coap_zero/endpoint/
incoming.rs

1// Copyright Open Logistics Foundation
2//
3// Licensed under the Open Logistics Foundation License 1.3.
4// For details on the licensing terms, see the LICENSE file.
5// SPDX-License-Identifier: OLFL-1.3
6
7//! The incoming communication path
8
9use core::marker::PhantomData;
10
11use embedded_nal::UdpClientStack;
12use embedded_timers::clock::Clock;
13use heapless::Vec;
14
15use crate::message::{
16    codes::ResponseCode, encoded_message::EncodedMessage, options::CoapOption, Message, Type,
17};
18
19use super::{
20    error::Error, Connection, MessageBuffer, MessageIdentification, RetransmissionState,
21    RetransmissionTimeout, TransmissionParameters,
22};
23
24/// The state in which the incoming communication path is currently
25#[derive(Debug, Clone, Copy)]
26pub enum IncomingState {
27    /// New requests can be handled
28    //
29    // In this state, it depends on the contained [`LastResponse`] what the internal message
30    // buffer contains. Since it should not be required, access to it is forbidden.
31    Idle(LastResponse),
32    /// A request was received and needs to be handled by the user. The contained `bool` determines
33    /// if the request was confirmable (`true`) or non-confirmable (`false`).
34    ///
35    /// In any case, the response may be sent with [`IncomingCommunication::schedule_response`]
36    /// immediately. For CON requests, this will result in a piggybacked response. For NON
37    /// requests, a NON response will be sent which is considered the default case for NON requests
38    /// (see RFC 7252 5.2.3 Non-confirmable: "If the request message is Non-confirmable, then the
39    /// response SHOULD be returned in a Non-confirmable message as well.")
40    ///
41    /// Furthermore, [`IncomingCommunication::schedule_rst`] may be called in both cases which will
42    /// transmit a RST message.
43    ///
44    /// If a different behavior is desired, the following different options exist:
45    /// - In `Received(true)`, [`IncomingCommunication::schedule_empty_ack`] allows to send an
46    /// empty ACK which will trigger a state transition via [`IncomingState::SendingAck`]
47    /// into [`IncomingState::AwaitingResponse`] where separate responses may be sent.
48    /// Additionally, [`IncomingCommunication::schedule_piggybacked_response`] may be called which
49    /// is identical to [`IncomingCommunication::schedule_response`] in this case.
50    /// - In `Received(false)`, [`IncomingCommunication::schedule_con_response`] and
51    /// [`IncomingCommunication::schedule_non_response`] are available, the second being
52    /// identical to [`IncomingCommunication::schedule_response`] in this case.
53    ///
54    /// In this state, the message buffer contains the request message. Thus, the request
55    /// message may be obtained by calling [`IncomingCommunication::request`].
56    Received(bool),
57    /// An ACK is being sent. Afterwards, we will go into the [`IncomingState::AwaitingResponse`]
58    /// state.
59    //
60    // In this state, the message buffer still contains the request message and may be obtained
61    // with [`IncomingCommunication::request`].
62    SendingAck,
63    /// A RST message is being sent. Afterwards, we will go back to the `Idle` state.
64    SendingRst,
65    /// A piggybacked response is being sent. Afterwards, we will go into the `Idle` state.
66    //
67    // In this state, the message buffer contains the piggybacked response message.
68    SendingPiggybacked,
69    /// The request has been acknowledged, i.e. a separate response is expected. In this state,
70    /// [`IncomingCommunication::schedule_con_response`] and
71    /// [`IncomingCommunication::schedule_non_response`] are available and either of those
72    /// _must_ be called by the user.
73    ///
74    /// In this state, the message buffer still contains the request message and may be obtained
75    /// with [`IncomingCommunication::request`].
76    AwaitingResponse,
77    /// A separate CON response is being sent. We will stay in this state and try retransmissions
78    /// until an ACK to this message is received or until the whole communication attempt (with
79    /// retries) times out.
80    //
81    // In this state, the message buffer contains the separate CON response message.
82    SendingCon(RetransmissionState),
83    /// A separate NON response is being sent.
84    //
85    // In this state, the message buffer contains the separate NON response message.
86    SendingNon,
87}
88
89/// Distinguishes different cases on how we can respond to a request
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum LastResponse {
92    /// We have not yet answered any request
93    //
94    // The internal message_buffer should be `None` in this case.
95    NoResponse,
96    /// The last request was answered with a RST message
97    //
98    // The internal message buffer contains the last request because it has not been overwritten
99    // with an actual response. It should not be required anyways.
100    RespRst,
101    /// The last request was answered with a CON message
102    ///
103    /// The contained `bool` stores if the CON message was successfully acked. If `false`,
104    /// transmitting the CON timed out.
105    //
106    // The internal message buffer contains the CON response that was sent out.
107    RespCon(bool),
108    /// The last request was answered with a separate NON response or a piggybacked response (in
109    /// both cases, we do not know if the requester has received our response)
110    //
111    // The internal message buffer contains the response we have sent out before.
112    RespNon,
113}
114
115/// What has happened in the [`IncomingCommunication`] path when calling
116/// [`CoapEndpoint::process`](super::CoapEndpoint::process)
117///
118/// This type is annotated with `must_use` because some events require user interaction to avoid
119/// getting stuck in specific states. See the documentation for the specific events for more
120/// information.
121#[must_use]
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum IncomingEvent<'a> {
124    /// No _real_ event happened. This pseudo-event is included as a variant here to allow concise
125    /// event handling on the user side.
126    Nothing,
127    /// A new request was received. The user _must_ respond to this event. The event contains the
128    /// `bool` flag if the request is confirmable (`true`) or non-confirmable (`false`) and the
129    /// request message itself.
130    ///
131    /// In any case, the response may be sent with [`IncomingCommunication::schedule_response`]
132    /// immediately. For CON requests, this will result in a piggybacked response. For NON
133    /// requests, a NON response will be sent. For other options, see the
134    /// [`IncomingState::Received`] state.
135    ///
136    /// The contained [`EncodedMessage`] keeps the borrow of the [`Endpoint`](super::CoapEndpoint)
137    /// alive which might be undesirable in some situations. To resolve this, it may safely be `drop`ped, it can be accessed later via [`IncomingCommunication::request`].
138    Request(bool, EncodedMessage<'a>),
139    /// We have received the same request again. Depending on how we have already handled the
140    /// request, appropriate action is taken automatically. For example, if the request has already
141    /// been ACKed but a separate response is still missing, we send the ACK again. If we have sent
142    /// a NON response before which may not have reached the other endpoint, we send it again.
143    DuplicatedRequest,
144    /// We have sent an ACK. With this event, a state transition into
145    /// [`IncomingState::AwaitingResponse`] takes place in which more user interaction is
146    /// _required_.
147    SendAck,
148    /// A separate CON response has been (re-)sent. NON and piggybacked responses directly result
149    /// in a [`IncomingEvent::Success`] event.
150    SendCon,
151    /// We have successfully finished the communication (from our perspective). If our response was
152    /// piggybacked or a separate NON message, we have no guarantee that the other endpoint has
153    /// already received our response. If our response was a separate CON message, the
154    /// corresponding ACK has been received and we know that the other endpoint has received our
155    /// response.
156    Success,
157    /// We have sent a RST message. With this event, a state transition into
158    /// [`IncomingState::Idle`] takes place.
159    SendRst,
160    /// Sending the separate CON response has timed out. Piggybacked or separate NON responses will
161    /// not be ACKed so they can not time out.
162    Timeout,
163    /// We have received a RST when sending a separate CON response
164    RecvRst,
165}
166
167/// Incoming communication path. This handles incoming requests and pings which will be
168/// automatically answered with RST messages.
169#[derive(Debug)]
170pub struct IncomingCommunication<
171    'a,
172    UDP: UdpClientStack,
173    CLOCK: Clock,
174    const BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
175    const MAX_OPTION_COUNT: usize = { crate::DEFAULT_MAX_OPTION_COUNT },
176    const MAX_OPTION_SIZE: usize = { crate::DEFAULT_MAX_OPTION_SIZE },
177> {
178    message_buffer: MessageBuffer<BUFFER_SIZE>,
179    /// Depending on the state, contains the request we are currently handling or the last request
180    /// we have handled (for de-duplication)
181    last_request: Option<MessageIdentification>,
182    state: IncomingState,
183    clock: &'a CLOCK,
184    transmission_parameters: TransmissionParameters,
185    /// Next message ID to create/schedule a separate response. Since the endpoint is in charge of
186    /// the message ID generation, the next message ID must be determined in advance.
187    pub(super) next_message_id: Option<u16>,
188    /// Random value between 0 and 1 to initialize the RetransmissionState required to schedule a
189    /// CON message. Since the endpoint owns the RNG, the next random number must be determined in
190    /// advance.
191    pub(super) next_random: Option<f32>,
192    _udp: PhantomData<UDP>,
193}
194
195impl<
196        'a,
197        UDP,
198        CLOCK,
199        const BUFFER_SIZE: usize,
200        const MAX_OPTION_COUNT: usize,
201        const MAX_OPTION_SIZE: usize,
202    > IncomingCommunication<'a, UDP, CLOCK, BUFFER_SIZE, MAX_OPTION_COUNT, MAX_OPTION_SIZE>
203where
204    UDP: UdpClientStack,
205    CLOCK: Clock,
206{
207    /// Initializes the incoming communication path in the [`IncomingState::Idle`] state so it is
208    /// ready to handle incoming requests.
209    pub fn new(clock: &'a CLOCK, transmission_parameters: TransmissionParameters) -> Self {
210        Self {
211            message_buffer: MessageBuffer::default(),
212            last_request: None,
213            state: IncomingState::Idle(LastResponse::NoResponse),
214            clock,
215            transmission_parameters,
216            next_message_id: None,
217            next_random: None,
218            _udp: PhantomData,
219        }
220    }
221
222    /// Returns the current state of the `IncomingCommunication`. Depending on the state, the user
223    /// _must_ take action to drive the state forward, see the documentation to the states in
224    /// [`IncomingState`].
225    pub fn state(&self) -> IncomingState {
226        self.state
227    }
228
229    /// Resets the internal state machine
230    pub fn reset(&mut self) {
231        *self = Self::new(self.clock, self.transmission_parameters);
232    }
233
234    /// Returns the current request message which is the same that was returned in the `Request`
235    /// event before. Therefore, usage of this method is only required if the decision between
236    /// ACK + separate response and a piggybacked response shall be postponed (the message from the
237    /// `Request` event can probably not be stored due to lifetime requirements).
238    ///
239    /// This method is only available in [`IncomingState::Received`], [`IncomingState::SendingAck`]
240    /// and [`IncomingState::AwaitingResponse`] and will return [`Error::Forbidden`] otherwise.
241    pub fn request(&self) -> Result<EncodedMessage, Error<<UDP as UdpClientStack>::Error>> {
242        use IncomingState::*;
243        match self.state {
244            Received(_) | SendingAck | AwaitingResponse => {
245                Ok(self.message_buffer.message().unwrap())
246            }
247            _ => Err(Error::Forbidden),
248        }
249    }
250
251    /// Schedules an immediate response to be sent.
252    ///
253    /// If a confirmable request was received, a piggy-backed response will be scheduled.
254    /// In case of a non-confirmable request, the response will be send as non-confirmable as well.
255    ///
256    /// For more control, the user can check if the request was confirmable or non-confirmable
257    /// themselves and call [`IncomingCommunication::schedule_empty_ack`],
258    /// [`IncomingCommunication::schedule_piggybacked_response`],
259    /// [`IncomingCommunication::schedule_con_response`] or
260    /// [`IncomingCommunication::schedule_non_response`] accordingly.
261    pub fn schedule_response(
262        &mut self,
263        code: ResponseCode,
264        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
265        payload: Option<&[u8]>,
266    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
267        match self.state {
268            IncomingState::Received(true) => {
269                self.schedule_piggybacked_response(code, options, payload)
270            }
271            IncomingState::Received(false) => self.schedule_non_response(code, options, payload),
272            _ => Err(Error::Forbidden),
273        }
274    }
275
276    /// Schedules an Acknowledgement to be sent.
277    ///
278    /// The ACK will fit the `message_id` for the last received request
279    pub fn schedule_empty_ack(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
280        if !matches!(self.state, IncomingState::Received(true)) {
281            return Err(Error::Forbidden);
282        }
283        self.state = IncomingState::SendingAck;
284        Ok(())
285    }
286
287    /// Schedules a RST message to be sent.
288    pub fn schedule_rst(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
289        if !matches!(self.state, IncomingState::Received(_)) {
290            return Err(Error::Forbidden);
291        }
292        self.state = IncomingState::SendingRst;
293        Ok(())
294    }
295
296    /// Schedules an Acknowledgement with piggy-backed response
297    ///
298    /// The ACK will fit the `message_id` for the last received request.
299    /// The response is defined by the given details (`code`, `payload` and `additional_options`).
300    pub fn schedule_piggybacked_response(
301        &mut self,
302        code: ResponseCode,
303        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
304        payload: Option<&[u8]>,
305    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
306        if !matches!(self.state, IncomingState::Received(true)) {
307            return Err(Error::Busy);
308        }
309
310        let ident = self.last_request.as_ref().unwrap();
311
312        let message: Message<MAX_OPTION_COUNT> = Message::new(
313            Type::Acknowledgement,
314            code.into(),
315            ident.id,
316            ident.token,
317            options,
318            payload,
319        );
320        self.message_buffer.encode(message)?;
321
322        self.state = IncomingState::SendingPiggybacked;
323
324        Ok(())
325    }
326
327    /// Schedules a message to be sent as a separate CON response
328    pub fn schedule_con_response(
329        &mut self,
330        code: ResponseCode,
331        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
332        payload: Option<&[u8]>,
333    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
334        self.schedule_separate_response(Type::Confirmable, code, options, payload)?;
335        let retransmission_state = RetransmissionState::new(
336            self.transmission_parameters,
337            self.next_random.take().unwrap(),
338        );
339        self.state = IncomingState::SendingCon(retransmission_state);
340        Ok(())
341    }
342
343    /// Schedules a message to be sent as a separate NON response
344    pub fn schedule_non_response(
345        &mut self,
346        code: ResponseCode,
347        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
348        payload: Option<&[u8]>,
349    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
350        self.schedule_separate_response(Type::NonConfirmable, code, options, payload)?;
351        self.state = IncomingState::SendingNon;
352        Ok(())
353    }
354
355    /// Schedules a message to be sent as a response for an incoming request.
356    ///
357    /// Internal method to handle CON and NON cases
358    fn schedule_separate_response(
359        &mut self,
360        response_type: Type,
361        code: ResponseCode,
362        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
363        payload: Option<&[u8]>,
364    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
365        if !matches!(self.state, IncomingState::AwaitingResponse)
366            && !matches!(self.state, IncomingState::Received(false))
367        {
368            return Err(Error::Busy);
369        }
370
371        let message: Message<MAX_OPTION_COUNT> = Message::new(
372            response_type,
373            code.into(),
374            self.next_message_id.unwrap(),
375            self.last_request.unwrap().token,
376            options,
377            payload,
378        );
379
380        self.message_buffer.encode(message)?;
381
382        self.next_message_id = None;
383
384        Ok(())
385    }
386
387    /// Drives the internal state machine
388    ///
389    /// If we receive a new request in a non-Idle state, it is ignored here. This seems to be the
390    /// right thing to do if we imagine a setup with NSTART>1 because then, an other
391    /// [`IncomingCommunication`] could handle this request. Currently, this will result in
392    /// unhandled messages in the endpoint which is responsible to react accordingly. It is advised
393    /// to silently ignore the request to trigger a retransmission on the requester's side.
394    pub(crate) fn process_incoming(
395        &mut self,
396        connection: &mut Connection<UDP>,
397        received: &mut Option<EncodedMessage<'_>>,
398    ) -> Result<IncomingEvent, Error<<UDP as UdpClientStack>::Error>> {
399        use IncomingEvent::*;
400        use IncomingState::*;
401        use LastResponse::*;
402
403        // We can generate events when we handle the received message or when an internal
404        // retransmission timeout occurs. So whenever either of those happens, we should skip the
405        // other because we can not return two events at the same time. Since we can only handle a
406        // received message _now_, we start by handling messages and skip checking timeouts. If the
407        // user calls the process method often enough, the timeout will trigger anyways in the next
408        // iteration.
409        //
410        // Message de-duplication happens in nearly all states. To not duplicate the de-duplication
411        // code in every state, we handle de-duplication in a first match and the other cases in a
412        // second match.
413
414        if let (Some(message), Some(last_message_ident)) = (&received, self.last_request) {
415            if message.is_request().unwrap()
416                && last_message_ident.id == message.message_id()
417                && last_message_ident.token == message.token().unwrap()
418            {
419                match self.state {
420                    Idle(NoResponse) => panic!(
421                        "In the Idle(NoResponse) state, the internal message buffer should be None"
422                    ),
423                    Idle(RespRst) => {
424                        // Previously, we have responded with an ACK. We should do so now, too.
425                        connection.send(&EncodedMessage::rst(last_message_ident.id))?;
426                    }
427                    Idle(RespCon(true)) => {
428                        // The requester has already received and acked our response. So this
429                        // must be a request which has been sent before their ACK. We should
430                        // ignore it.
431                    }
432                    Idle(RespCon(false)) => {
433                        // Our CON response (with retries!) timed out. We have already tried a lot
434                        // to respond but the connection just seems to be bad. Who knows how this
435                        // request reached us again, let's just not handle it again.
436                    }
437                    Idle(RespNon) => {
438                        // We have responded with a NON message which may have not have reached the
439                        // other endpoint. Let's just try again.
440                        connection.send(self.message_buffer.message().unwrap().data)?;
441                    }
442                    Received(_) => {
443                        // We do not know the response yet so we can only ignore it
444                    }
445                    SendingAck | SendingRst | SendingPiggybacked | SendingNon => {
446                        // We can ignore it because a message will be sent now anyways
447                    }
448                    AwaitingResponse => {
449                        // We have already ACKed the request but we do not know the separate
450                        // response yet. It looks like the ACK did not reach the other endpoint, so
451                        // let's send it again.
452                        connection.send(&EncodedMessage::ack(last_message_ident.id))?;
453                    }
454                    SendingCon(_) => {
455                        // We are in the process of (re-)transmitting a CON response anyways so
456                        // we do not need to ACK again
457                    }
458                }
459                received.take();
460                return Ok(DuplicatedRequest);
461            }
462        }
463
464        match self.state {
465            Idle(_) => {
466                // When we get passed some received data,
467                // we need to check if we just received a request.
468                if let Some(message) = received {
469                    if message.is_request().unwrap() {
470                        // New request, we save all metadata and return the message
471                        self.last_request = Some(MessageIdentification {
472                            id: message.message_id(),
473                            token: message.token().unwrap(),
474                        });
475                        let confirmable = message.message_type() == Type::Confirmable;
476                        self.message_buffer.replace_with(received.take().unwrap())?;
477                        self.state = Received(confirmable);
478                        return Ok(Request(confirmable, self.message_buffer.message().unwrap()));
479                    }
480                }
481            }
482            Received(_) | AwaitingResponse => {
483                // We actually don't have anything to do
484                // since we are just waiting for the user to ACK.
485                // In case we are already in state AwaitingResponse,
486                // we wait for the user to send the separate response.
487            }
488            SendingAck => {
489                let ident = self.last_request.as_ref().unwrap();
490                connection.send(&EncodedMessage::ack(ident.id))?;
491                self.state = AwaitingResponse;
492                return Ok(SendAck);
493            }
494            SendingRst => {
495                let ident = self.last_request.as_ref().unwrap();
496                connection.send(&EncodedMessage::rst(ident.id))?;
497                self.state = Idle(RespRst);
498                return Ok(SendRst);
499            }
500            SendingPiggybacked | SendingNon => {
501                connection.send(self.message_buffer.message().unwrap().data)?;
502                self.state = Idle(RespNon);
503                return Ok(Success);
504            }
505            SendingCon(ref mut retransmission_state) => {
506                if let Some(message) = received {
507                    if message.message_type() == Type::Acknowledgement
508                        && message.message_id()
509                            == self.message_buffer.message().unwrap().message_id()
510                    {
511                        received.take();
512                        self.state = Idle(RespCon(true));
513                        return Ok(Success);
514                    } else if message.message_type() == Type::Reset
515                        && message.message_id()
516                            == self.message_buffer.message().unwrap().message_id()
517                    {
518                        received.take();
519                        self.state = Idle(RespCon(false));
520                        return Ok(RecvRst);
521                    }
522                }
523                match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
524                    Ok(true) => {
525                        connection.send(self.message_buffer.message().unwrap().data)?;
526                        return Ok(SendCon);
527                    }
528                    Ok(false) => {}
529                    Err(RetransmissionTimeout) => {
530                        self.state = Idle(RespCon(false));
531                        // Timeout is an Event not an Error because it is specified in the protocol
532                        return Ok(Timeout);
533                    }
534                }
535            }
536        }
537
538        Ok(Nothing)
539    }
540}
541
542#[cfg(test)]
543mod tests {
544    use core::{cell::RefCell, time::Duration};
545
546    use embedded_hal::prelude::_embedded_hal_blocking_rng_Read;
547    use embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
548    use heapless::Vec;
549    use mockall::predicate::*;
550    use mockall::*;
551
552    use crate::{
553        endpoint::incoming::{IncomingEvent, IncomingState},
554        message::{
555            codes::{RequestCode, SuccessCode},
556            token::{Token, TokenLength},
557            Message, Type,
558        },
559    };
560
561    use super::{super::CoapEndpoint, TransmissionParameters};
562
563    #[derive(Debug)]
564    struct Random {
565        value: u128,
566    }
567
568    impl embedded_hal::blocking::rng::Read for Random {
569        type Error = std::io::Error;
570
571        fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
572            self.value += 1;
573
574            let buf_len = buf.len();
575
576            buf[..buf_len].copy_from_slice(&self.value.to_le_bytes()[..buf_len]);
577
578            Ok(())
579        }
580    }
581
582    #[derive(Debug)]
583    struct StackError;
584
585    struct Socket;
586
587    mock! {
588        Stack {}
589
590        impl UdpClientStack for Stack {
591            type UdpSocket = Socket;
592            type Error = StackError;
593
594            fn socket(&mut self) -> Result<Socket, StackError>;
595            fn connect(
596                &mut self,
597                socket: &mut Socket,
598                remote: SocketAddr
599            ) -> Result<(), StackError>;
600            fn send(
601                &mut self,
602                socket: &mut Socket,
603                buffer: &[u8]
604            ) -> Result<(), nb::Error<StackError>>;
605            fn receive(
606                &mut self,
607                socket: &mut Socket,
608                buffer: &mut [u8]
609            ) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
610            fn close(&mut self, socket: Socket) -> Result<(), StackError>;
611        }
612    }
613
614    #[derive(Debug)]
615    struct MyClock {
616        last_time: RefCell<Duration>,
617        now: RefCell<Duration>,
618    }
619
620    impl embedded_timers::clock::Clock for MyClock {
621        fn try_now(
622            &self,
623        ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
624            *self.last_time.borrow_mut() = *self.now.borrow();
625            Ok(*self.now.borrow())
626        }
627    }
628
629    impl MyClock {
630        fn advance(&self, step: Duration) {
631            *self.now.borrow_mut() = *self.last_time.borrow() + step;
632        }
633    }
634
635    #[test]
636    fn receive_con_get_piggybacked() {
637        let mut stack = MockStack::default();
638
639        let clock = MyClock {
640            last_time: RefCell::new(Duration::from_secs(0)),
641            now: RefCell::new(Duration::from_secs(1)),
642        };
643
644        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
645
646        let mut endpoint: CoapEndpoint<
647            '_,
648            MockStack,
649            Random,
650            MyClock,
651            8,
652            32,
653            128,
654            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
655        > = CoapEndpoint::try_new(
656            TransmissionParameters::default(),
657            Random { value: 0 },
658            &clock,
659            &mut receive_buffer,
660        )
661        .unwrap();
662
663        stack.expect_socket().once().return_once(|| Ok(Socket));
664        stack.expect_connect().once().return_once(|_, _| Ok(()));
665
666        endpoint
667            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
668            .unwrap();
669
670        stack
671            .expect_receive()
672            .once()
673            .return_once(|_, _| Err(nb::Error::WouldBlock));
674
675        let (incoming, _, _) = endpoint.process(&mut stack).unwrap();
676        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
677        assert!(matches!(
678            endpoint.incoming_communication.state,
679            IncomingState::Idle(_)
680        ));
681
682        let next_message_id = endpoint.message_id_counter.next();
683        let mut token = Token::default();
684        token.length = TokenLength::Eight;
685        endpoint.rng.read(&mut token.bytes).unwrap();
686        let request: Message<'_> = Message::new(
687            Type::Confirmable,
688            RequestCode::Get.into(),
689            next_message_id,
690            token,
691            Vec::new(),
692            Some(b"/hello"),
693        );
694        let response: Message<'_> = Message::new(
695            Type::Acknowledgement,
696            SuccessCode::Content.into(),
697            next_message_id,
698            token,
699            Vec::new(),
700            Some(b"world"),
701        );
702        let mut response_buf = [0_u8; 32];
703        let response_length = response.encode(&mut response_buf).unwrap().message_length();
704
705        stack.expect_receive().once().return_once(move |_, buffer| {
706            let encoded_message = request.encode(buffer).unwrap();
707
708            Ok((
709                encoded_message.message_length(),
710                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
711            ))
712        });
713
714        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
715        assert!(matches!(incoming.unwrap(), IncomingEvent::Request(true, _)));
716        assert!(matches!(
717            endpoint.incoming_communication.state,
718            IncomingState::Received(true)
719        ));
720        // TODO Test Message
721
722        endpoint
723            .incoming()
724            .schedule_piggybacked_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
725            .unwrap();
726
727        stack
728            .expect_receive()
729            .once()
730            .return_once(|_, _| Err(nb::Error::WouldBlock));
731        stack.expect_send().once().return_once(move |_, buffer| {
732            assert_eq!(buffer, &response_buf[..response_length]);
733
734            Ok(())
735        });
736
737        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
738        assert!(matches!(incoming.unwrap(), IncomingEvent::Success));
739        assert!(matches!(
740            endpoint.incoming_communication.state,
741            IncomingState::Idle(_)
742        ));
743    }
744
745    #[test]
746    fn receive_non_get_non() {
747        let mut stack = MockStack::default();
748
749        let clock = MyClock {
750            last_time: RefCell::new(Duration::from_secs(0)),
751            now: RefCell::new(Duration::from_secs(1)),
752        };
753
754        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
755
756        let mut endpoint: CoapEndpoint<
757            '_,
758            MockStack,
759            Random,
760            MyClock,
761            8,
762            32,
763            128,
764            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
765        > = CoapEndpoint::try_new(
766            TransmissionParameters::default(),
767            Random { value: 0 },
768            &clock,
769            &mut receive_buffer,
770        )
771        .unwrap();
772
773        stack.expect_socket().once().return_once(|| Ok(Socket));
774        stack.expect_connect().once().return_once(|_, _| Ok(()));
775
776        endpoint
777            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
778            .unwrap();
779
780        stack
781            .expect_receive()
782            .once()
783            .return_once(|_, _| Err(nb::Error::WouldBlock));
784
785        let (incoming, _, _) = endpoint.process(&mut stack).unwrap();
786        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
787        assert!(matches!(
788            endpoint.incoming_communication.state,
789            IncomingState::Idle(_)
790        ));
791
792        let next_message_id = endpoint.message_id_counter.next();
793        let mut token = Token::default();
794        token.length = TokenLength::Eight;
795        endpoint.rng.read(&mut token.bytes).unwrap();
796        let request: Message<'_> = Message::new(
797            Type::NonConfirmable,
798            RequestCode::Get.into(),
799            next_message_id,
800            token,
801            Vec::new(),
802            Some(b"/hello"),
803        );
804        let response: Message<'_> = Message::new(
805            Type::NonConfirmable,
806            SuccessCode::Content.into(),
807            endpoint.incoming_communication.next_message_id.unwrap(),
808            token,
809            Vec::new(),
810            Some(b"world"),
811        );
812        let mut response_buf = [0_u8; 32];
813        let response_length = response.encode(&mut response_buf).unwrap().message_length();
814
815        stack.expect_receive().once().return_once(move |_, buffer| {
816            let encoded_message = request.encode(buffer).unwrap();
817
818            Ok((
819                encoded_message.message_length(),
820                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
821            ))
822        });
823
824        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
825        assert!(matches!(
826            incoming.unwrap(),
827            IncomingEvent::Request(false, _)
828        ));
829        assert!(matches!(
830            endpoint.incoming_communication.state,
831            IncomingState::Received(false)
832        ));
833        // TODO Test Message
834
835        stack
836            .expect_receive()
837            .once()
838            .return_once(|_, _| Err(nb::Error::WouldBlock));
839
840        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
841        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
842        assert!(matches!(
843            endpoint.incoming_communication.state,
844            IncomingState::Received(false)
845        ));
846
847        endpoint
848            .incoming()
849            .schedule_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
850            .unwrap();
851
852        stack
853            .expect_receive()
854            .once()
855            .return_once(|_, _| Err(nb::Error::WouldBlock));
856        stack.expect_send().once().return_once(move |_, buffer| {
857            assert_eq!(buffer, &response_buf[..response_length]);
858
859            Ok(())
860        });
861
862        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
863        assert!(matches!(incoming.unwrap(), IncomingEvent::Success));
864        assert!(matches!(
865            endpoint.incoming_communication.state,
866            IncomingState::Idle(_)
867        ));
868    }
869
870    #[test]
871    fn receive_non_get_con() {
872        let mut stack = MockStack::default();
873
874        let clock = MyClock {
875            last_time: RefCell::new(Duration::from_secs(0)),
876            now: RefCell::new(Duration::from_secs(1)),
877        };
878
879        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
880
881        let mut endpoint: CoapEndpoint<
882            '_,
883            MockStack,
884            Random,
885            MyClock,
886            8,
887            32,
888            128,
889            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
890        > = CoapEndpoint::try_new(
891            TransmissionParameters::default(),
892            Random { value: 0 },
893            &clock,
894            &mut receive_buffer,
895        )
896        .unwrap();
897
898        let next_message_id = endpoint.message_id_counter.next();
899        let mut token = Token::default();
900        token.length = TokenLength::Eight;
901        endpoint.rng.read(&mut token.bytes).unwrap();
902        let request: Message<'_> = Message::new(
903            Type::NonConfirmable,
904            RequestCode::Get.into(),
905            next_message_id,
906            token,
907            Vec::new(),
908            Some(b"/hello"),
909        );
910
911        stack.expect_socket().once().return_once(|| Ok(Socket));
912        stack.expect_connect().once().return_once(|_, _| Ok(()));
913
914        endpoint
915            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
916            .unwrap();
917
918        stack.expect_receive().once().return_once(move |_, buffer| {
919            let encoded_message = request.encode(buffer).unwrap();
920
921            Ok((
922                encoded_message.message_length(),
923                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
924            ))
925        });
926
927        assert!(matches!(
928            endpoint.incoming_communication.state,
929            IncomingState::Idle(_)
930        ));
931        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
932        assert!(matches!(
933            incoming.unwrap(),
934            IncomingEvent::Request(false, _)
935        ));
936        assert!(matches!(
937            endpoint.incoming_communication.state,
938            IncomingState::Received(false)
939        ));
940
941        endpoint
942            .incoming()
943            .schedule_con_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
944            .unwrap();
945
946        stack
947            .expect_receive()
948            .once()
949            .return_once(|_, _| Err(nb::Error::WouldBlock));
950        stack.expect_send().once().return_once(|_, _| Ok(()));
951
952        assert!(matches!(
953            endpoint.incoming_communication.state,
954            IncomingState::SendingCon(_)
955        ));
956        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
957        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
958        // TODO We have sent the message and waiting for the ack now. Staying in SendingCon feels a little unintuitive.
959        assert!(matches!(
960            endpoint.incoming_communication.state,
961            IncomingState::SendingCon(_)
962        ));
963
964        let ack_id = endpoint
965            .incoming_communication
966            .message_buffer
967            .message()
968            .unwrap()
969            .message_id();
970        let ack = Message::<0>::new_ack(ack_id);
971
972        stack.expect_receive().once().return_once(move |_, buffer| {
973            let encoded_ack = ack.encode(buffer).unwrap();
974            Ok((
975                encoded_ack.message_length(),
976                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
977            ))
978        });
979
980        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
981        assert!(matches!(incoming.unwrap(), IncomingEvent::Success));
982        assert!(matches!(
983            endpoint.incoming_communication.state,
984            IncomingState::Idle(_)
985        ));
986    }
987
988    #[test]
989    fn receive_non_get_con_timeout() {
990        let mut stack = MockStack::default();
991
992        let clock = MyClock {
993            last_time: RefCell::new(Duration::from_secs(0)),
994            now: RefCell::new(Duration::from_secs(1)),
995        };
996
997        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
998
999        let mut endpoint: CoapEndpoint<
1000            '_,
1001            MockStack,
1002            Random,
1003            MyClock,
1004            8,
1005            32,
1006            128,
1007            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1008        > = CoapEndpoint::try_new(
1009            TransmissionParameters::default(),
1010            Random { value: 0 },
1011            &clock,
1012            &mut receive_buffer,
1013        )
1014        .unwrap();
1015
1016        let next_message_id = endpoint.message_id_counter.next();
1017        let mut token = Token::default();
1018        token.length = TokenLength::Eight;
1019        endpoint.rng.read(&mut token.bytes).unwrap();
1020        let request: Message<'_> = Message::new(
1021            Type::NonConfirmable,
1022            RequestCode::Get.into(),
1023            next_message_id,
1024            token,
1025            Vec::new(),
1026            Some(b"/hello"),
1027        );
1028
1029        stack.expect_socket().once().return_once(|| Ok(Socket));
1030        stack.expect_connect().once().return_once(|_, _| Ok(()));
1031
1032        endpoint
1033            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1034            .unwrap();
1035
1036        stack.expect_receive().once().return_once(move |_, buffer| {
1037            let encoded_message = request.encode(buffer).unwrap();
1038
1039            Ok((
1040                encoded_message.message_length(),
1041                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
1042            ))
1043        });
1044
1045        assert!(matches!(
1046            endpoint.incoming_communication.state,
1047            IncomingState::Idle(_)
1048        ));
1049        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1050        assert!(matches!(
1051            incoming.unwrap(),
1052            IncomingEvent::Request(false, _)
1053        ));
1054        assert!(matches!(
1055            endpoint.incoming_communication.state,
1056            IncomingState::Received(false)
1057        ));
1058
1059        endpoint
1060            .incoming()
1061            .schedule_con_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
1062            .unwrap();
1063
1064        stack
1065            .expect_receive()
1066            .returning(move |_, _| Err(nb::Error::WouldBlock));
1067        stack.expect_send().returning(|_, _| Ok(()));
1068
1069        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1070        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
1071        assert!(matches!(
1072            endpoint.incoming_communication.state,
1073            IncomingState::SendingCon(_)
1074        ));
1075
1076        // 4 Retries
1077        clock.advance(Duration::from_secs(3));
1078        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1079        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
1080        assert!(matches!(
1081            endpoint.incoming_communication.state,
1082            IncomingState::SendingCon(_)
1083        ));
1084
1085        clock.advance(Duration::from_secs(5));
1086        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1087        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
1088        assert!(matches!(
1089            endpoint.incoming_communication.state,
1090            IncomingState::SendingCon(_)
1091        ));
1092
1093        clock.advance(Duration::from_secs(9));
1094        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1095        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
1096        assert!(matches!(
1097            endpoint.incoming_communication.state,
1098            IncomingState::SendingCon(_)
1099        ));
1100
1101        clock.advance(Duration::from_secs(17));
1102        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1103        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
1104        assert!(matches!(
1105            endpoint.incoming_communication.state,
1106            IncomingState::SendingCon(_)
1107        ));
1108
1109        // Timeout
1110        clock.advance(Duration::from_secs(33));
1111        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1112        assert!(matches!(incoming.unwrap(), IncomingEvent::Timeout));
1113        assert!(matches!(
1114            endpoint.incoming_communication.state,
1115            IncomingState::Idle(_)
1116        ));
1117    }
1118
1119    #[test]
1120    fn receive_piggybacked_response() {
1121        let mut stack = MockStack::default();
1122
1123        let clock = MyClock {
1124            last_time: RefCell::new(Duration::from_secs(0)),
1125            now: RefCell::new(Duration::from_secs(1)),
1126        };
1127
1128        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1129
1130        let mut endpoint: CoapEndpoint<
1131            '_,
1132            MockStack,
1133            Random,
1134            MyClock,
1135            8,
1136            32,
1137            128,
1138            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1139        > = CoapEndpoint::try_new(
1140            TransmissionParameters::default(),
1141            Random { value: 0 },
1142            &clock,
1143            &mut receive_buffer,
1144        )
1145        .unwrap();
1146
1147        let message_id = endpoint.message_id_counter.next();
1148        let mut token = Token::default();
1149        token.length = TokenLength::Eight;
1150        endpoint.rng.read(&mut token.bytes).unwrap();
1151        let message: Message<'_> = Message::new(
1152            Type::Acknowledgement,
1153            SuccessCode::Content.into(),
1154            message_id,
1155            token,
1156            Vec::new(),
1157            Some(b"world"),
1158        );
1159
1160        stack.expect_socket().once().return_once(|| Ok(Socket));
1161        stack.expect_connect().once().return_once(|_, _| Ok(()));
1162
1163        endpoint
1164            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1165            .unwrap();
1166
1167        stack.expect_receive().once().return_once(move |_, buffer| {
1168            let encoded_message = message.encode(buffer).unwrap();
1169
1170            Ok((
1171                encoded_message.message_length(),
1172                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
1173            ))
1174        });
1175        // The Response should be handled by outgoing or be unhandled, so incoming should do nothing
1176        // at all.
1177
1178        assert!(matches!(
1179            endpoint.incoming_communication.state,
1180            IncomingState::Idle(_)
1181        ));
1182        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1183        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
1184        assert!(matches!(
1185            endpoint.incoming_communication.state,
1186            IncomingState::Idle(_)
1187        ));
1188    }
1189
1190    #[test]
1191    fn receive_ack() {
1192        let mut stack = MockStack::default();
1193
1194        let clock = MyClock {
1195            last_time: RefCell::new(Duration::from_secs(0)),
1196            now: RefCell::new(Duration::from_secs(1)),
1197        };
1198
1199        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1200
1201        let mut endpoint: CoapEndpoint<
1202            '_,
1203            MockStack,
1204            Random,
1205            MyClock,
1206            8,
1207            32,
1208            128,
1209            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1210        > = CoapEndpoint::try_new(
1211            TransmissionParameters::default(),
1212            Random { value: 0 },
1213            &clock,
1214            &mut receive_buffer,
1215        )
1216        .unwrap();
1217
1218        let message_id = endpoint.message_id_counter.next();
1219
1220        stack.expect_socket().once().return_once(|| Ok(Socket));
1221        stack.expect_connect().once().return_once(|_, _| Ok(()));
1222
1223        endpoint
1224            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1225            .unwrap();
1226
1227        stack.expect_receive().once().return_once(move |_, buffer| {
1228            let encoded_message = Message::<0>::new_ack(message_id).encode(buffer).unwrap();
1229
1230            Ok((
1231                encoded_message.message_length(),
1232                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
1233            ))
1234        });
1235        // The ACK is not handled at all because it is not expected anywhere, so incoming should do
1236        // nothing at all.
1237
1238        assert!(matches!(
1239            endpoint.incoming_communication.state,
1240            IncomingState::Idle(_)
1241        ));
1242        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1243        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
1244        assert!(matches!(
1245            endpoint.incoming_communication.state,
1246            IncomingState::Idle(_)
1247        ));
1248    }
1249
1250    #[test]
1251    fn receive_reset() {
1252        let mut stack = MockStack::default();
1253
1254        let clock = MyClock {
1255            last_time: RefCell::new(Duration::from_secs(0)),
1256            now: RefCell::new(Duration::from_secs(1)),
1257        };
1258
1259        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1260
1261        let mut endpoint: CoapEndpoint<
1262            '_,
1263            MockStack,
1264            Random,
1265            MyClock,
1266            8,
1267            32,
1268            128,
1269            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1270        > = CoapEndpoint::try_new(
1271            TransmissionParameters::default(),
1272            Random { value: 0 },
1273            &clock,
1274            &mut receive_buffer,
1275        )
1276        .unwrap();
1277
1278        let message_id = endpoint.message_id_counter.next();
1279        let message: Message<'_> = Message::new_rst(message_id);
1280
1281        stack.expect_socket().once().return_once(|| Ok(Socket));
1282        stack.expect_connect().once().return_once(|_, _| Ok(()));
1283
1284        endpoint
1285            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1286            .unwrap();
1287
1288        stack.expect_receive().once().return_once(move |_, buffer| {
1289            let encoded_message = message.encode(buffer).unwrap();
1290
1291            Ok((
1292                encoded_message.message_length(),
1293                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
1294            ))
1295        });
1296        // The Response should be handled by outgoing, so incoming should do nothing at all.
1297
1298        assert!(matches!(
1299            endpoint.incoming_communication.state,
1300            IncomingState::Idle(_)
1301        ));
1302        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
1303        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
1304        assert!(matches!(
1305            endpoint.incoming_communication.state,
1306            IncomingState::Idle(_)
1307        ));
1308    }
1309}