Skip to main content

gear_core/message/
context.rs

1// Copyright (C) Gear Technologies Inc.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4use crate::{
5    buffer::Payload,
6    ids::{ActorId, MessageId, ReservationId, prelude::*},
7    message::{
8        Dispatch, HandleMessage, HandlePacket, IncomingMessage, InitMessage, InitPacket,
9        ReplyMessage, ReplyPacket,
10    },
11    reservation::{GasReserver, ReservationNonce},
12};
13use alloc::{
14    collections::{BTreeMap, BTreeSet},
15    vec::Vec,
16};
17use gear_core_errors::{ExecutionError, ExtError, MessageError as Error, MessageError};
18use parity_scale_codec::{Decode, Encode};
19use scale_decode::DecodeAsType;
20use scale_encode::EncodeAsType;
21use scale_info::TypeInfo;
22
23use super::{DispatchKind, IncomingDispatch, Packet};
24
/// Settings that parameterize message context behavior: fees and
/// limits on outgoing messages.
#[derive(Clone, Copy, Debug, Default)]
pub struct ContextSettings {
    /// Fee charged for sending a message.
    pub sending_fee: u64,
    /// Fee charged for sending a scheduled message.
    pub scheduled_sending_fee: u64,
    /// Fee charged for calling wait.
    pub waiting_fee: u64,
    /// Fee charged for waking messages.
    pub waking_fee: u64,
    /// Fee charged for creating a reservation.
    pub reservation_fee: u64,
    /// Maximum number of outgoing messages a program may create while
    /// processing the current message.
    pub outgoing_limit: u32,
    /// Maximum number of bytes in outgoing messages during the current execution.
    pub outgoing_bytes_limit: u32,
}

impl ContextSettings {
    /// Builds settings with the given outgoing message limits.
    ///
    /// Every other field keeps its `Default` value.
    pub fn with_outgoing_limits(outgoing_limit: u32, outgoing_bytes_limit: u32) -> Self {
        let defaults = Self::default();

        Self {
            outgoing_limit,
            outgoing_bytes_limit,
            ..defaults
        }
    }
}
54
/// Dispatch or message with additional information.
///
/// Tuple layout: the item itself, its `u32` delay and an optional reservation id.
pub type OutgoingMessageInfo<T> = (T, u32, Option<ReservationId>);
/// Dispatch or message with an optional reservation id, but without a delay.
pub type OutgoingMessageInfoNoDelay<T> = (T, Option<ReservationId>);
58
/// Context outcome dispatches and awakening ids.
///
/// This is the flattened form returned by `ContextOutcome::drain`.
pub struct ContextOutcomeDrain {
    /// Outgoing dispatches to be sent, each with its delay and optional reservation id.
    pub outgoing_dispatches: Vec<OutgoingMessageInfo<Dispatch>>,
    /// Messages to be woken, each paired with its wake delay.
    pub awakening: Vec<(MessageId, u32)>,
    /// Reply deposits to be provided, each paired with its gas limit.
    pub reply_deposits: Vec<(MessageId, u64)>,
    /// Whether this execution sent out a reply.
    pub reply_sent: bool,
}
70
/// Context outcome.
///
/// Contains all outgoing messages and wakes that should be done after execution.
#[derive(Clone, Debug)]
pub struct ContextOutcome {
    // Program initialization messages created during the execution.
    init: Vec<OutgoingMessageInfo<InitMessage>>,
    // Handle messages committed during the execution.
    handle: Vec<OutgoingMessageInfo<HandleMessage>>,
    // Reply committed during the execution, if any (a reply carries no delay).
    reply: Option<OutgoingMessageInfoNoDelay<ReplyMessage>>,
    // Messages to wake; the `u32` is the wake delay.
    awakening: Vec<(MessageId, u32)>,
    // Reply deposits; the `u64` is the gas limit.
    // TODO: add Option<ReservationId> after #1828
    reply_deposits: Vec<(MessageId, u64)>,
    // Additional information section: data needed to turn the stored
    // messages into dispatches when the outcome is drained.
    program_id: ActorId,
    source: ActorId,
    origin_msg_id: MessageId,
}
89
90impl ContextOutcome {
91    /// Create new ContextOutcome.
92    fn new(program_id: ActorId, source: ActorId, origin_msg_id: MessageId) -> Self {
93        Self {
94            init: Vec::new(),
95            handle: Vec::new(),
96            reply: None,
97            awakening: Vec::new(),
98            reply_deposits: Vec::new(),
99            program_id,
100            source,
101            origin_msg_id,
102        }
103    }
104
105    /// Destructs outcome after execution and returns provided dispatches and awaken message ids.
106    pub fn drain(self) -> ContextOutcomeDrain {
107        let mut dispatches = Vec::new();
108        let reply_sent = self.reply.is_some();
109
110        for (msg, delay, reservation) in self.init.into_iter() {
111            dispatches.push((msg.into_dispatch(self.program_id), delay, reservation));
112        }
113
114        for (msg, delay, reservation) in self.handle.into_iter() {
115            dispatches.push((msg.into_dispatch(self.program_id), delay, reservation));
116        }
117
118        if let Some((msg, reservation)) = self.reply {
119            dispatches.push((
120                msg.into_dispatch(self.program_id, self.source, self.origin_msg_id),
121                0,
122                reservation,
123            ));
124        };
125
126        ContextOutcomeDrain {
127            outgoing_dispatches: dispatches,
128            awakening: self.awakening,
129            reply_deposits: self.reply_deposits,
130            reply_sent,
131        }
132    }
133}
/// Store of current temporary message execution context.
// NOTE(review): this type derives `TypeInfo`, which may capture `///` docs into
// type metadata, so field notes below are plain comments — confirm before
// converting them into doc comments.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, Decode, Encode, TypeInfo)]
pub struct OutgoingPayloads {
    // Payload slots keyed by handle. `Some(payload)` is a payload still being
    // built; `None` marks a slot whose message was already created
    // (committed handle message or program init).
    handles: BTreeMap<u32, Option<Payload>>,
    // Payload accumulated for the reply before it is committed.
    reply: Option<Payload>,
    // Bytes accounted for outgoing messages in this execution; checked
    // against `ContextSettings::outgoing_bytes_limit`.
    bytes_counter: u32,
}
141
/// Store of previous message execution context.
// NOTE(review): this type derives `TypeInfo`, which may capture `///` docs into
// type metadata, so existing doc text is left untouched and extra notes are
// plain comments — confirm before editing the `///` lines.
#[derive(
    Clone, Debug, Default, PartialEq, Eq, Hash, Decode, DecodeAsType, Encode, EncodeAsType, TypeInfo,
)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct ContextStore {
    // Ids of programs for which an init message was already created; used to
    // reject duplicate initialization.
    initialized: BTreeSet<ActorId>,
    // Reservation nonce, synchronized from the gas reserver.
    reservation_nonce: ReservationNonce,
    // Accumulated system reservation, `None` until the first addition.
    system_reservation: Option<u64>,
    /// Used to prevent creating messages with the same ID in DB. Before this was achieved by using `outgoing.len()`
    /// but now it is moved to [OutgoingPayloads] thus we need to keep nonce here. Now to calculate nonce we simple increment `local_nonce`
    /// in each `init` call.
    local_nonce: u32,
}
156
157impl ContextStore {
158    // TODO: Remove, only used in migrations (#issue 3721)
159    /// Create a new context store with the provided parameters.
160    pub fn new(
161        initialized: BTreeSet<ActorId>,
162        reservation_nonce: ReservationNonce,
163        system_reservation: Option<u64>,
164        local_nonce: u32,
165    ) -> Self {
166        Self {
167            initialized,
168            reservation_nonce,
169            system_reservation,
170            local_nonce,
171        }
172    }
173
174    /// Returns stored within message context reservation nonce.
175    ///
176    /// Will be non zero, if any reservations were created during
177    /// previous execution of the message.
178    pub(crate) fn reservation_nonce(&self) -> ReservationNonce {
179        self.reservation_nonce
180    }
181
182    /// Set reservation nonce from gas reserver.
183    ///
184    /// Gas reserver has actual nonce state during/after execution.
185    pub fn set_reservation_nonce(&mut self, gas_reserver: &GasReserver) {
186        self.reservation_nonce = gas_reserver.nonce();
187    }
188
189    /// Set system reservation.
190    pub fn add_system_reservation(&mut self, amount: u64) {
191        let reservation = &mut self.system_reservation;
192        *reservation = reservation
193            .map(|reservation| reservation.saturating_add(amount))
194            .or(Some(amount));
195    }
196
197    /// Get system reservation.
198    pub fn system_reservation(&self) -> Option<u64> {
199        self.system_reservation
200    }
201}
202
/// Context of currently processing incoming message.
#[derive(Clone, Debug)]
pub struct MessageContext {
    // Kind of the dispatch being processed; decides whether replying is allowed.
    kind: DispatchKind,
    // The incoming message being processed.
    current: IncomingMessage,
    // Outgoing messages, wakes and reply deposits produced so far.
    outcome: ContextOutcome,
    // State carried over from previous executions of the same message.
    store: ContextStore,
    // Payloads being built during this execution; starts empty in `new`.
    outgoing_payloads: OutgoingPayloads,
    // Fees and limits applied to this execution.
    settings: ContextSettings,
}
213
214impl MessageContext {
215    /// Create new message context.
216    /// Returns `None` if outgoing messages bytes limit exceeded.
217    pub fn new(dispatch: IncomingDispatch, program_id: ActorId, settings: ContextSettings) -> Self {
218        let (kind, message, store) = dispatch.into_parts();
219
220        Self {
221            kind,
222            outcome: ContextOutcome::new(program_id, message.source(), message.id()),
223            current: message,
224            store: store.unwrap_or_default(),
225            outgoing_payloads: OutgoingPayloads::default(),
226            settings,
227        }
228    }
229
230    /// Getter for inner settings.
231    pub fn settings(&self) -> &ContextSettings {
232        &self.settings
233    }
234
235    /// Getter for inner dispatch kind
236    pub fn kind(&self) -> DispatchKind {
237        self.kind
238    }
239
240    fn check_reply_availability(&self) -> Result<(), ExecutionError> {
241        if !matches!(self.kind, DispatchKind::Init | DispatchKind::Handle) {
242            return Err(ExecutionError::IncorrectEntryForReply);
243        }
244
245        Ok(())
246    }
247
248    fn increase_counter(counter: u32, amount: impl TryInto<u32>, limit: u32) -> Option<u32> {
249        TryInto::<u32>::try_into(amount)
250            .ok()
251            .and_then(|amount| counter.checked_add(amount))
252            .and_then(|counter| (counter <= limit).then_some(counter))
253    }
254
255    /// Return bool defining was reply sent within the execution.
256    pub fn reply_sent(&self) -> bool {
257        self.outcome.reply.is_some()
258    }
259
260    /// Send a new program initialization message.
261    ///
262    /// Generates a new message from provided data packet.
263    /// Returns message id and generated program id.
264    pub fn init_program(
265        &mut self,
266        packet: InitPacket,
267        delay: u32,
268    ) -> Result<(MessageId, ActorId), Error> {
269        let program_id = packet.destination();
270
271        if self.store.initialized.contains(&program_id) {
272            return Err(Error::DuplicateInit);
273        }
274
275        let last = self.store.local_nonce;
276
277        if last >= self.settings.outgoing_limit {
278            return Err(Error::OutgoingMessagesAmountLimitExceeded);
279        }
280
281        let message_id = MessageId::generate_outgoing(self.current.id(), last);
282        let message = InitMessage::from_packet(message_id, packet);
283        self.store.local_nonce += 1;
284        self.outgoing_payloads.handles.insert(last, None);
285        self.store.initialized.insert(program_id);
286        self.outcome.init.push((message, delay, None));
287
288        Ok((message_id, program_id))
289    }
290
    /// Commit a handle message previously opened with `send_init`.
    ///
    /// Generates the message from the provided data packet, prepending the
    /// payload stored under `handle` to the packet's own payload.
    /// Returns message id.
    ///
    /// # Errors
    ///
    /// - `OutOfBounds` if `handle` is unknown;
    /// - `LateAccess` if the payload under `handle` was already taken;
    /// - `OutgoingMessagesBytesLimitExceeded` if the packet payload does not
    ///   fit into the outgoing bytes limit;
    /// - `MaxMessageSizeExceed` if the combined payload is too large.
    ///
    /// On the last two errors the stored payload is put back, so the handle
    /// stays usable.
    pub fn send_commit(
        &mut self,
        handle: u32,
        mut packet: HandlePacket,
        delay: u32,
        reservation: Option<ReservationId>,
    ) -> Result<MessageId, Error> {
        let outgoing = self
            .outgoing_payloads
            .handles
            .get_mut(&handle)
            .ok_or(Error::OutOfBounds)?;
        // Taking the payload leaves `None` in the slot, which marks the handle as committed.
        let data = outgoing.take().ok_or(Error::LateAccess)?;

        // Every failure path hands `data` back so it can be restored into the slot below.
        let do_send_commit = || {
            let Some(new_outgoing_bytes) = Self::increase_counter(
                self.outgoing_payloads.bytes_counter,
                packet.payload_len(),
                self.settings.outgoing_bytes_limit,
            ) else {
                return Err((Error::OutgoingMessagesBytesLimitExceeded, data));
            };

            packet
                .try_prepend(data)
                .map_err(|data| (Error::MaxMessageSizeExceed, data))?;

            let message_id = MessageId::generate_outgoing(self.current.id(), handle);
            let message = HandleMessage::from_packet(message_id, packet);

            self.outcome.handle.push((message, delay, reservation));

            // The bytes counter is increased rather than decreased on commit,
            // because it also accounts for messages that are already
            // committed during this execution.
            // `Self::new` starts every execution with fresh (default)
            // outgoing payloads, so bytes committed here are not carried
            // over to subsequent executions of the message.
            self.outgoing_payloads.bytes_counter = new_outgoing_bytes;

            Ok(message_id)
        };

        do_send_commit().map_err(|(err, data)| {
            *outgoing = Some(data);
            err
        })
    }
344
345    /// Provide space for storing payload for future message creation.
346    ///
347    /// Returns it's handle.
348    pub fn send_init(&mut self) -> Result<u32, Error> {
349        let last = self.store.local_nonce;
350        if last < self.settings.outgoing_limit {
351            self.store.local_nonce += 1;
352            self.outgoing_payloads
353                .handles
354                .insert(last, Some(Default::default()));
355
356            Ok(last)
357        } else {
358            Err(Error::OutgoingMessagesAmountLimitExceeded)
359        }
360    }
361
362    /// Pushes payload into stored payload by handle.
363    pub fn send_push(&mut self, handle: u32, buffer: &[u8]) -> Result<(), Error> {
364        let data = match self.outgoing_payloads.handles.get_mut(&handle) {
365            Some(Some(data)) => data,
366            Some(None) => return Err(Error::LateAccess),
367            None => return Err(Error::OutOfBounds),
368        };
369
370        let new_outgoing_bytes = Self::increase_counter(
371            self.outgoing_payloads.bytes_counter,
372            buffer.len(),
373            self.settings.outgoing_bytes_limit,
374        )
375        .ok_or(Error::OutgoingMessagesBytesLimitExceeded)?;
376
377        data.try_extend_from_slice(buffer)
378            .map_err(|_| Error::MaxMessageSizeExceed)?;
379
380        self.outgoing_payloads.bytes_counter = new_outgoing_bytes;
381
382        Ok(())
383    }
384
385    /// Pushes the incoming buffer/payload into stored payload by handle.
386    pub fn send_push_input(&mut self, handle: u32, range: CheckedRange) -> Result<(), Error> {
387        let data = match self.outgoing_payloads.handles.get_mut(&handle) {
388            Some(Some(data)) => data,
389            Some(None) => return Err(Error::LateAccess),
390            None => return Err(Error::OutOfBounds),
391        };
392
393        let bytes_amount = range.len();
394        let CheckedRange {
395            offset,
396            excluded_end,
397        } = range;
398
399        let new_outgoing_bytes = Self::increase_counter(
400            self.outgoing_payloads.bytes_counter,
401            bytes_amount,
402            self.settings.outgoing_bytes_limit,
403        )
404        .ok_or(Error::OutgoingMessagesBytesLimitExceeded)?;
405
406        data.try_extend_from_slice(&self.current.payload()[offset..excluded_end])
407            .map_err(|_| Error::MaxMessageSizeExceed)?;
408
409        self.outgoing_payloads.bytes_counter = new_outgoing_bytes;
410
411        Ok(())
412    }
413
414    /// Check if provided `offset`/`len` are correct for the current payload
415    /// limits. Result `CheckedRange` instance is accepted by
416    /// `send_push_input`/`reply_push_input` and has the method `len`
417    /// allowing to charge gas before the calls.
418    pub fn check_input_range(&self, offset: u32, len: u32) -> Result<CheckedRange, Error> {
419        let input_len = self.current.payload().len();
420        let offset = offset as usize;
421        let len = len as usize;
422
423        // Check `offset` is not out of bounds.
424        if offset >= input_len {
425            return Err(Error::OutOfBoundsInputSliceOffset);
426        }
427
428        // Check `len` for the current `offset` doesn't refer to the slice out of input bounds.
429        let available_len = input_len - offset;
430        if len > available_len {
431            return Err(Error::OutOfBoundsInputSliceLength);
432        }
433
434        Ok(CheckedRange {
435            offset,
436            // guaranteed to be `<= input.len()`, because of the check upper
437            excluded_end: offset.saturating_add(len),
438        })
439    }
440
441    /// Send reply message.
442    ///
443    /// Generates reply from provided data packet and stored reply payload.
444    /// Returns message id.
445    pub fn reply_commit(
446        &mut self,
447        mut packet: ReplyPacket,
448        reservation: Option<ReservationId>,
449    ) -> Result<MessageId, ExtError> {
450        self.check_reply_availability()?;
451
452        if self.reply_sent() {
453            return Err(Error::DuplicateReply.into());
454        }
455
456        let data = self.outgoing_payloads.reply.take().unwrap_or_default();
457
458        if let Err(data) = packet.try_prepend(data) {
459            self.outgoing_payloads.reply = Some(data);
460            return Err(Error::MaxMessageSizeExceed.into());
461        }
462
463        let message_id = MessageId::generate_reply(self.current.id());
464        let message = ReplyMessage::from_packet(message_id, packet);
465
466        self.outcome.reply = Some((message, reservation));
467
468        Ok(message_id)
469    }
470
471    /// Pushes payload into stored reply payload.
472    pub fn reply_push(&mut self, buffer: &[u8]) -> Result<(), ExtError> {
473        self.check_reply_availability()?;
474
475        if self.reply_sent() {
476            return Err(Error::LateAccess.into());
477        }
478
479        // NOTE: it's normal to not undone `get_or_insert_with` in case of error
480        self.outgoing_payloads
481            .reply
482            .get_or_insert_with(Default::default)
483            .try_extend_from_slice(buffer)
484            .map_err(|_| Error::MaxMessageSizeExceed.into())
485    }
486
487    /// Return reply destination.
488    pub fn reply_destination(&self) -> ActorId {
489        self.outcome.source
490    }
491
492    /// Pushes the incoming message buffer into stored reply payload.
493    pub fn reply_push_input(&mut self, range: CheckedRange) -> Result<(), ExtError> {
494        self.check_reply_availability()?;
495
496        if self.reply_sent() {
497            return Err(Error::LateAccess.into());
498        }
499
500        let CheckedRange {
501            offset,
502            excluded_end,
503        } = range;
504
505        // NOTE: it's normal to not undone `get_or_insert_with` in case of error
506        self.outgoing_payloads
507            .reply
508            .get_or_insert_with(Default::default)
509            .try_extend_from_slice(&self.current.payload()[offset..excluded_end])
510            .map_err(|_| Error::MaxMessageSizeExceed.into())
511    }
512
513    /// Wake message by it's message id.
514    pub fn wake(&mut self, waker_id: MessageId, delay: u32) -> Result<(), Error> {
515        if !self.outcome.awakening.iter().any(|v| v.0 == waker_id) {
516            self.outcome.awakening.push((waker_id, delay));
517            Ok(())
518        } else {
519            Err(Error::DuplicateWaking)
520        }
521    }
522
523    /// Create deposit to handle future reply on message id was sent.
524    pub fn reply_deposit(
525        &mut self,
526        message_id: MessageId,
527        amount: u64,
528    ) -> Result<(), MessageError> {
529        if self
530            .outcome
531            .reply_deposits
532            .iter()
533            .any(|(mid, _)| mid == &message_id)
534        {
535            return Err(MessageError::DuplicateReplyDeposit);
536        }
537
538        if !self
539            .outcome
540            .handle
541            .iter()
542            .any(|(message, ..)| message.id() == message_id)
543            && !self
544                .outcome
545                .init
546                .iter()
547                .any(|(message, ..)| message.id() == message_id)
548        {
549            return Err(MessageError::IncorrectMessageForReplyDeposit);
550        }
551
552        self.outcome.reply_deposits.push((message_id, amount));
553
554        Ok(())
555    }
556
557    /// Current processing incoming message.
558    pub fn current(&self) -> &IncomingMessage {
559        &self.current
560    }
561
562    /// Current program's id.
563    pub fn program_id(&self) -> ActorId {
564        self.outcome.program_id
565    }
566
567    /// Destructs context after execution and returns provided outcome and store.
568    pub fn drain(self) -> (ContextOutcome, ContextStore) {
569        let Self { outcome, store, .. } = self;
570
571        (outcome, store)
572    }
573}
574
/// Range of the incoming message payload that was validated against the
/// payload bounds.
///
/// Instances are produced by `MessageContext::check_input_range` and consumed
/// by `MessageContext::send_push_input` / `MessageContext::reply_push_input`.
/// The fields are private, so outside of this module a range can only be
/// obtained through the check.
#[derive(Debug)]
pub struct CheckedRange {
    /// Index of the first byte of the range.
    offset: usize,
    /// Index one past the last byte of the range.
    excluded_end: usize,
}

impl CheckedRange {
    /// Returns the number of bytes covered by the range.
    ///
    /// The value is meant for charging gas before the range is used. A
    /// malformed range never wraps around: an inverted range yields `0` and
    /// a length that does not fit into `u32` saturates at `u32::MAX`.
    pub fn len(&self) -> u32 {
        let len = self.excluded_end.saturating_sub(self.offset);

        // Saturate instead of silently truncating with `as`.
        u32::try_from(len).unwrap_or(u32::MAX)
    }

    /// Returns `true` if the range covers no bytes.
    pub fn is_empty(&self) -> bool {
        self.excluded_end <= self.offset
    }
}
585
586#[cfg(test)]
587mod tests {
588    use super::*;
589    use alloc::vec;
590    use core::convert::TryInto;
591
592    macro_rules! assert_ok {
593        ( $x:expr $(,)? ) => {
594            let is = $x;
595            match is {
596                Ok(_) => (),
597                _ => assert!(false, "Expected Ok(_). Got {:#?}", is),
598            }
599        };
600        ( $x:expr, $y:expr $(,)? ) => {
601            assert_eq!($x, Ok($y));
602        };
603    }
604
605    macro_rules! assert_err {
606        ( $x:expr , $y:expr $(,)? ) => {
607            assert_eq!($x, Err($y.into()));
608        };
609    }
610
    // Raw values for the id and the source of the incoming message built in
    // the tests below; named for clarity.
    const INCOMING_MESSAGE_ID: u64 = 3;
    const INCOMING_MESSAGE_SOURCE: u64 = 4;
614
615    #[test]
616    fn duplicated_init() {
617        let mut message_context = MessageContext::new(
618            Default::default(),
619            Default::default(),
620            ContextSettings::with_outgoing_limits(1024, u32::MAX),
621        );
622
623        // first init to default ActorId.
624        assert_ok!(message_context.init_program(Default::default(), 0));
625
626        // second init to same default ActorId should get error.
627        assert_err!(
628            message_context.init_program(Default::default(), 0),
629            Error::DuplicateInit,
630        );
631    }
632
633    #[test]
634    fn send_push_bytes_exceeded() {
635        let mut message_context = MessageContext::new(
636            Default::default(),
637            Default::default(),
638            ContextSettings::with_outgoing_limits(1024, 10),
639        );
640
641        let handle = message_context.send_init().unwrap();
642
643        // push 5 bytes
644        assert_ok!(message_context.send_push(handle, &[1, 2, 3, 4, 5]));
645
646        // push 5 bytes
647        assert_ok!(message_context.send_push(handle, &[1, 2, 3, 4, 5]));
648
649        // push 1 byte should get error.
650        assert_err!(
651            message_context.send_push(handle, &[1]),
652            Error::OutgoingMessagesBytesLimitExceeded,
653        );
654    }
655
656    #[test]
657    fn send_commit_bytes_exceeded() {
658        let mut message_context = MessageContext::new(
659            Default::default(),
660            Default::default(),
661            ContextSettings::with_outgoing_limits(1024, 10),
662        );
663
664        let handle = message_context.send_init().unwrap();
665
666        // push 5 bytes
667        assert_ok!(message_context.send_push(handle, &[1, 2, 3, 4, 5]));
668
669        // commit 6 bytes should get error.
670        assert_err!(
671            message_context.send_commit(
672                handle,
673                HandlePacket::new(
674                    Default::default(),
675                    Payload::try_from([1, 2, 3, 4, 5, 6].to_vec()).unwrap(),
676                    0
677                ),
678                0,
679                None
680            ),
681            Error::OutgoingMessagesBytesLimitExceeded,
682        );
683
684        // commit 5 bytes should be ok.
685        assert_ok!(message_context.send_commit(
686            handle,
687            HandlePacket::new(
688                Default::default(),
689                Payload::try_from([1, 2, 3, 4, 5].to_vec()).unwrap(),
690                0,
691            ),
692            0,
693            None,
694        ));
695
696        let messages = message_context.drain().0.drain().outgoing_dispatches;
697        assert_eq!(
698            messages[0].0.payload_bytes(),
699            [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
700        );
701    }
702
703    #[test]
704    fn send_commit_message_size_limit() {
705        let mut message_context = MessageContext::new(
706            Default::default(),
707            Default::default(),
708            ContextSettings::with_outgoing_limits(1024, u32::MAX),
709        );
710
711        let handle = message_context.send_init().unwrap();
712
713        // push 1 byte
714        assert_ok!(message_context.send_push(handle, &[1]));
715
716        let payload = Payload::repeat(2);
717        assert_err!(
718            message_context.send_commit(
719                handle,
720                HandlePacket::new(Default::default(), payload, 0),
721                0,
722                None
723            ),
724            Error::MaxMessageSizeExceed,
725        );
726
727        let payload = Payload::try_from(vec![1; Payload::MAX_LEN - 1]).unwrap();
728        assert_ok!(message_context.send_commit(
729            handle,
730            HandlePacket::new(Default::default(), payload, 0),
731            0,
732            None,
733        ));
734
735        let messages = message_context.drain().0.drain().outgoing_dispatches;
736        assert_eq!(
737            Payload::try_from(messages[0].0.payload_bytes().to_vec()).unwrap(),
738            Payload::repeat(1)
739        );
740    }
741
742    #[test]
743    fn send_push_input_bytes_exceeded() {
744        let incoming_message = IncomingMessage::new(
745            MessageId::from(INCOMING_MESSAGE_ID),
746            ActorId::from(INCOMING_MESSAGE_SOURCE),
747            vec![1, 2, 3, 4, 5].try_into().unwrap(),
748            0,
749            0,
750            None,
751        );
752
753        let incoming_dispatch = IncomingDispatch::new(DispatchKind::Handle, incoming_message, None);
754
755        // Creating a message context
756        let mut message_context = MessageContext::new(
757            incoming_dispatch,
758            Default::default(),
759            ContextSettings::with_outgoing_limits(1024, 10),
760        );
761
762        let handle = message_context.send_init().unwrap();
763
764        // push 5 bytes
765        assert_ok!(message_context.send_push_input(
766            handle,
767            CheckedRange {
768                offset: 0,
769                excluded_end: 5,
770            }
771        ));
772
773        // push 5 bytes
774        assert_ok!(message_context.send_push_input(
775            handle,
776            CheckedRange {
777                offset: 0,
778                excluded_end: 5,
779            }
780        ));
781
782        // push 1 byte should get error.
783        assert_err!(
784            message_context.send_push_input(
785                handle,
786                CheckedRange {
787                    offset: 0,
788                    excluded_end: 1,
789                }
790            ),
791            Error::OutgoingMessagesBytesLimitExceeded,
792        );
793    }
794
795    #[test]
796    fn outgoing_limit_exceeded() {
797        // Check that we can always send exactly outgoing_limit messages.
798        let max_n = 5;
799
800        for n in 0..=max_n {
801            // for outgoing_limit n checking that LimitExceeded will be after n's message.
802            let settings = ContextSettings::with_outgoing_limits(n, u32::MAX);
803
804            let mut message_context =
805                MessageContext::new(Default::default(), Default::default(), settings);
806            // send n messages
807            for _ in 0..n {
808                let handle = message_context.send_init().expect("unreachable");
809                message_context
810                    .send_push(handle, b"payload")
811                    .expect("unreachable");
812                message_context
813                    .send_commit(handle, HandlePacket::default(), 0, None)
814                    .expect("unreachable");
815            }
816            // n + 1 should get first error.
817            let limit_exceeded = message_context.send_init();
818            assert_eq!(
819                limit_exceeded,
820                Err(Error::OutgoingMessagesAmountLimitExceeded)
821            );
822
823            // we can't send messages in this MessageContext.
824            let limit_exceeded = message_context.init_program(Default::default(), 0);
825            assert_eq!(
826                limit_exceeded,
827                Err(Error::OutgoingMessagesAmountLimitExceeded)
828            );
829        }
830    }
831
832    #[test]
833    fn invalid_out_of_bounds() {
834        let mut message_context = MessageContext::new(
835            Default::default(),
836            Default::default(),
837            ContextSettings::with_outgoing_limits(1024, u32::MAX),
838        );
839
840        // Use invalid handle 0.
841        let out_of_bounds = message_context.send_commit(0, Default::default(), 0, None);
842        assert_eq!(out_of_bounds, Err(Error::OutOfBounds));
843
844        // make 0 valid.
845        let valid_handle = message_context.send_init().expect("unreachable");
846        assert_eq!(valid_handle, 0);
847
848        // Use valid handle 0.
849        assert_ok!(message_context.send_commit(0, Default::default(), 0, None));
850
851        // Use invalid handle 42.
852        assert_err!(
853            message_context.send_commit(42, Default::default(), 0, None),
854            Error::OutOfBounds,
855        );
856    }
857
858    #[test]
859    fn double_reply() {
860        let mut message_context = MessageContext::new(
861            Default::default(),
862            Default::default(),
863            ContextSettings::with_outgoing_limits(1024, u32::MAX),
864        );
865
866        // First reply.
867        assert_ok!(message_context.reply_commit(Default::default(), None));
868
869        // Reply twice in one message is forbidden.
870        assert_err!(
871            message_context.reply_commit(Default::default(), None),
872            Error::DuplicateReply,
873        );
874    }
875
876    #[test]
877    fn reply_commit_message_size_limit() {
878        let mut message_context =
879            MessageContext::new(Default::default(), Default::default(), Default::default());
880
881        assert_ok!(message_context.reply_push(&[1]));
882
883        let payload = Payload::repeat(2);
884        assert_err!(
885            message_context.reply_commit(ReplyPacket::new(payload, 0), None),
886            Error::MaxMessageSizeExceed,
887        );
888
889        let payload = Payload::try_from(vec![1; Payload::MAX_LEN - 1]).unwrap();
890        assert_ok!(message_context.reply_commit(ReplyPacket::new(payload, 0), None));
891
892        let messages = message_context.drain().0.drain().outgoing_dispatches;
893        assert_eq!(
894            Payload::try_from(messages[0].0.payload_bytes().to_vec()).unwrap(),
895            Payload::repeat(1)
896        );
897    }
898
899    #[test]
900    /// Test that covers full api of `MessageContext`
901    fn message_context_api() {
902        // Creating an incoming message around which the runner builds the `MessageContext`
903        let incoming_message = IncomingMessage::new(
904            MessageId::from(INCOMING_MESSAGE_ID),
905            ActorId::from(INCOMING_MESSAGE_SOURCE),
906            vec![1, 2].try_into().unwrap(),
907            0,
908            0,
909            None,
910        );
911
912        let incoming_dispatch = IncomingDispatch::new(DispatchKind::Handle, incoming_message, None);
913
914        // Creating a message context
915        let mut context = MessageContext::new(
916            incoming_dispatch,
917            Default::default(),
918            ContextSettings::with_outgoing_limits(1024, u32::MAX),
919        );
920
921        // Checking that the initial parameters of the context match the passed constants
922        assert_eq!(context.current().id(), MessageId::from(INCOMING_MESSAGE_ID));
923
924        // Creating a reply packet
925        let reply_packet = ReplyPacket::new(vec![0, 0].try_into().unwrap(), 0);
926
927        // Checking that we are able to initialize reply
928        assert_ok!(context.reply_push(&[1, 2, 3]));
929
930        // Setting reply message and making sure the operation was successful
931        assert_ok!(context.reply_commit(reply_packet.clone(), None));
932
933        // Checking that the `ReplyMessage` matches the passed one
934        assert_eq!(
935            context
936                .outcome
937                .reply
938                .as_ref()
939                .unwrap()
940                .0
941                .payload_bytes()
942                .to_vec(),
943            vec![1, 2, 3, 0, 0],
944        );
945
946        // Checking that repeated call `reply_push(...)` returns error and does not do anything
947        assert_err!(context.reply_push(&[1]), Error::LateAccess);
948        assert_eq!(
949            context
950                .outcome
951                .reply
952                .as_ref()
953                .unwrap()
954                .0
955                .payload_bytes()
956                .to_vec(),
957            vec![1, 2, 3, 0, 0],
958        );
959
960        // Checking that repeated call `reply_commit(...)` returns error and does not
961        assert_err!(
962            context.reply_commit(reply_packet, None),
963            Error::DuplicateReply
964        );
965
966        // Checking that at this point vector of outgoing messages is empty
967        assert!(context.outcome.handle.is_empty());
968
969        // Creating an expected handle for a future initialized message
970        let expected_handle = 0;
971
972        // Initializing message and compare its handle with expected one
973        assert_eq!(
974            context.send_init().expect("Error initializing new message"),
975            expected_handle
976        );
977
978        // And checking that it is not formed
979        assert!(
980            context
981                .outgoing_payloads
982                .handles
983                .get(&expected_handle)
984                .expect("This key should be")
985                .is_some()
986        );
987
988        // Checking that we are able to push payload for the
989        // message that we have not committed yet
990        assert_ok!(context.send_push(expected_handle, &[5, 7]));
991        assert_ok!(context.send_push(expected_handle, &[9]));
992
993        // Creating an outgoing packet to commit sending by parts
994        let commit_packet = HandlePacket::default();
995
996        // Checking if commit is successful
997        assert_ok!(context.send_commit(expected_handle, commit_packet, 0, None));
998
999        // Checking that we are **NOT** able to push payload for the message or
1000        // commit it if we already committed it or directly pushed before
1001        assert_err!(
1002            context.send_push(expected_handle, &[5, 7]),
1003            Error::LateAccess,
1004        );
1005        assert_err!(
1006            context.send_commit(expected_handle, HandlePacket::default(), 0, None),
1007            Error::LateAccess,
1008        );
1009
1010        // Creating a handle to push and do commit non-existent message
1011        let expected_handle = 15;
1012
1013        // Checking that we also get an error when trying
1014        // to commit or send a non-existent message
1015        assert_err!(context.send_push(expected_handle, &[0]), Error::OutOfBounds);
1016        assert_err!(
1017            context.send_commit(expected_handle, HandlePacket::default(), 0, None),
1018            Error::OutOfBounds,
1019        );
1020
1021        // Creating a handle to init and do not commit later
1022        // to show that the message will not be sent
1023        let expected_handle = 1;
1024
1025        assert_eq!(
1026            context.send_init().expect("Error initializing new message"),
1027            expected_handle
1028        );
1029        assert_ok!(context.send_push(expected_handle, &[2, 2]));
1030
1031        // Checking that reply message not lost and matches our initial
1032        assert!(context.outcome.reply.is_some());
1033        assert_eq!(
1034            context.outcome.reply.as_ref().unwrap().0.payload_bytes(),
1035            vec![1, 2, 3, 0, 0]
1036        );
1037
1038        // Checking that on drain we get only messages that were fully formed (directly sent or committed)
1039        let (expected_result, _) = context.drain();
1040        assert_eq!(expected_result.handle.len(), 1);
1041        assert_eq!(expected_result.handle[0].0.payload_bytes(), vec![5, 7, 9]);
1042    }
1043
1044    #[test]
1045    fn duplicate_waking() {
1046        let incoming_message = IncomingMessage::new(
1047            MessageId::from(INCOMING_MESSAGE_ID),
1048            ActorId::from(INCOMING_MESSAGE_SOURCE),
1049            vec![1, 2].try_into().unwrap(),
1050            0,
1051            0,
1052            None,
1053        );
1054
1055        let incoming_dispatch = IncomingDispatch::new(DispatchKind::Handle, incoming_message, None);
1056
1057        let mut context = MessageContext::new(
1058            incoming_dispatch,
1059            Default::default(),
1060            ContextSettings::with_outgoing_limits(1024, u32::MAX),
1061        );
1062
1063        context.wake(MessageId::default(), 10).unwrap();
1064
1065        assert_eq!(
1066            context.wake(MessageId::default(), 1),
1067            Err(Error::DuplicateWaking)
1068        );
1069    }
1070
1071    #[test]
1072    fn duplicate_reply_deposit() {
1073        let incoming_message = IncomingMessage::new(
1074            MessageId::from(INCOMING_MESSAGE_ID),
1075            ActorId::from(INCOMING_MESSAGE_SOURCE),
1076            vec![1, 2].try_into().unwrap(),
1077            0,
1078            0,
1079            None,
1080        );
1081
1082        let incoming_dispatch = IncomingDispatch::new(DispatchKind::Handle, incoming_message, None);
1083
1084        let mut message_context = MessageContext::new(
1085            incoming_dispatch,
1086            Default::default(),
1087            ContextSettings::with_outgoing_limits(1024, u32::MAX),
1088        );
1089
1090        let handle = message_context.send_init().expect("unreachable");
1091        message_context
1092            .send_push(handle, b"payload")
1093            .expect("unreachable");
1094        let message_id = message_context
1095            .send_commit(handle, HandlePacket::default(), 0, None)
1096            .expect("unreachable");
1097
1098        assert!(message_context.reply_deposit(message_id, 1234).is_ok());
1099        assert_err!(
1100            message_context.reply_deposit(message_id, 1234),
1101            MessageError::DuplicateReplyDeposit
1102        );
1103    }
1104
1105    #[test]
1106    fn inexistent_reply_deposit() {
1107        let incoming_message = IncomingMessage::new(
1108            MessageId::from(INCOMING_MESSAGE_ID),
1109            ActorId::from(INCOMING_MESSAGE_SOURCE),
1110            vec![1, 2].try_into().unwrap(),
1111            0,
1112            0,
1113            None,
1114        );
1115
1116        let incoming_dispatch = IncomingDispatch::new(DispatchKind::Handle, incoming_message, None);
1117
1118        let mut message_context = MessageContext::new(
1119            incoming_dispatch,
1120            Default::default(),
1121            ContextSettings::with_outgoing_limits(1024, u32::MAX),
1122        );
1123
1124        let message_id = message_context
1125            .reply_commit(ReplyPacket::default(), None)
1126            .expect("unreachable");
1127
1128        assert_err!(
1129            message_context.reply_deposit(message_id, 1234),
1130            MessageError::IncorrectMessageForReplyDeposit
1131        );
1132        assert_err!(
1133            message_context.reply_deposit(Default::default(), 1234),
1134            MessageError::IncorrectMessageForReplyDeposit
1135        );
1136    }
1137}