coap_zero/endpoint/
mod.rs

1// Copyright Open Logistics Foundation
2//
3// Licensed under the Open Logistics Foundation License 1.3.
4// For details on the licensing terms, see the LICENSE file.
5// SPDX-License-Identifier: OLFL-1.3
6
7//! A CoAP endpoint that can be used for 1-to-1 communication.
8//!
9//! This can act both as a "server" and "client", which means that requests can be sent in both
10//! directions via a single connection.
11//! As stated in [RFC 7252, Section 4.8](https://www.rfc-editor.org/rfc/rfc7252#section-4.8),
12//! the Endpoint assumes `NSTART = 1`.
13//! This means that at any given time, only one single outgoing request and
14//! one single incoming request (including sending the corresponding response) can be handled.
15//!
16//! This Endpoint does not implement a listening server.
17//! So it is not possible to listen to a port and handle any incoming connection.
18//! Instead, only 1-to-1 connections can be established via ip, uri or socket.
19//!
20//! Currently, the [`Token`] size is fixed to 8 bytes (or 0 for Empty messages).
21
22pub mod connect;
23pub mod error;
24pub mod incoming;
25pub mod outgoing;
26
27use core::time::Duration;
28
29use byteorder::{ByteOrder, LittleEndian};
30use embedded_hal::blocking::rng;
31use embedded_nal::{SocketAddr, UdpClientStack};
32use embedded_timers::clock::Clock;
33
34use crate::message::{self, encoded_message::EncodedMessage, token::Token, Message};
35
36use self::{error::Error, incoming::IncomingCommunication, outgoing::OutgoingCommunication};
37
38/// Type alias for an [iri_string] URI
39pub type Uri = iri_string::types::RiReferenceStr<iri_string::spec::UriSpec>;
40
/// Default UDP port number used for CoAP communication
pub const DEFAULT_COAP_PORT: u16 = 5_683;
43
44/// Events that are generated on the endpoint-level
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub enum EndpointEvent<'a> {
47    /// No _real_ event happened
48    Nothing,
49    /// A message was received which did not pass the [`EncodedMessage::check_msg_format`] test
50    MsgFormatErr(message::Error),
51    /// A "ping" message (empty CON) was received. It has been automatically answered with a RST
52    /// message.
53    Ping,
54    /// We have received a message that could not be handled in any meaningful way. It has been
55    /// automatically answered with a RST message.
56    Unhandled(EncodedMessage<'a>),
57}
58
/// Tunable transmission parameters of a CoAP Endpoint
#[derive(Debug, Clone, Copy)]
pub struct TransmissionParameters {
    /// Lower bound of the initial retransmission timeout
    pub ack_timeout: Duration,
    /// Factor which spans the range of the initial timeout: it is chosen between `ack_timeout`
    /// and `ack_timeout * ack_random_factor`
    pub ack_random_factor: f32,
    /// Upper limit for the number of retransmissions of a single message
    pub max_retransmit: u32,
}

/// The default values follow the transmission parameters listed in
/// <https://datatracker.ietf.org/doc/html/rfc7252#section-4.8>
impl Default for TransmissionParameters {
    fn default() -> Self {
        let ack_timeout = Duration::from_secs(2);
        let ack_random_factor = 1.5;
        let max_retransmit = 4;

        TransmissionParameters {
            ack_timeout,
            ack_random_factor,
            max_retransmit,
        }
    }
}
80
81/// Stores the current (re)transmission state for the current message and implements the
82/// exponential backoff strategy for retransmissions
83#[derive(Debug, Clone, Copy)]
84pub struct RetransmissionState {
85    /// How often the message has already been retransmitted. If the message has not been sent at
86    /// all, this will be None.
87    retransmission_counter: Option<u32>,
88    /// When the last (re)transmission was sent. If the message has never been sent at all, this
89    /// will be None.
90    last_transmission_instant: Option<embedded_timers::clock::Instant>,
91    /// Initial timeout for the first transmission between
92    /// ACK_TIMEOUT and ACK_RANDOM_FACTOR * ACK_TIMEOUT.
93    initial_timeout: Duration,
94    /// Maximum number of retransmissions, copied from the `TransmissionParameters`
95    max_retransmit: u32,
96}
97
/// Error type for [`RetransmissionState::retransmit_required`]. It signals that the message has
/// already been retransmitted the maximum number of times and that the last timeout has expired
/// as well.
///
/// The type implements `Debug` (among other common traits) so that results carrying it can be
/// used with `unwrap`/`expect` and in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetransmissionTimeout;
100
101impl RetransmissionState {
102    /// Initializes this `RetransmissionState` with `retransmission_counter` set to None. This
103    /// assumes that the state is initialized from one of the `schedule_` methods which can not
104    /// actually transmit anything but only trigger an internal state change which requires that
105    /// the first message is sent in the next `process_` invocation.
106    ///
107    /// `random` must be a random value between 0 and 1.
108    pub fn new(transmission_parameters: TransmissionParameters, random: f32) -> Self {
109        let min_timeout = transmission_parameters.ack_timeout.as_millis() as f32;
110        let max_timeout = min_timeout * transmission_parameters.ack_random_factor;
111        let initial_timeout_millis = max_timeout * random + min_timeout * (1.0 - random);
112        let initial_timeout = Duration::from_millis(initial_timeout_millis as u64);
113        Self {
114            retransmission_counter: None,
115            last_transmission_instant: None,
116            initial_timeout,
117            max_retransmit: transmission_parameters.max_retransmit,
118        }
119    }
120
121    /// Returns if a retransmission is required, we should keep waiting or if the whole
122    /// transmission has timed out. Returns:
123    /// - `Some(true)` if a retransmission should happen
124    /// - `Some(false)` if we should keep waiting
125    /// - `Err(Error::Timeout)` if the transmission (with retransmissions) has timed out
126    ///
127    /// If a retransmission is required, this will increase the internal retransmission timer so
128    /// this assumes that the retransmission will be triggered.
129    pub fn retransmit_required(
130        &mut self,
131        now: embedded_timers::clock::Instant,
132    ) -> Result<bool, RetransmissionTimeout> {
133        match (self.retransmission_counter, self.last_transmission_instant) {
134            (Some(retransmission_counter), Some(last_instant)) => {
135                // From the CoAP RFC 4.2. Messages Transmitted Reliably:
136                // "When the timeout is triggered and the retransmission counter is less than
137                // MAX_RETRANSMIT, the message is retransmitted, the retransmission counter is
138                // incremented, and the timeout is doubled."
139                let waited = now - last_instant;
140                let timeout = self.initial_timeout * 2u32.pow(retransmission_counter);
141                if waited < timeout {
142                    Ok(false)
143                } else if retransmission_counter < self.max_retransmit {
144                    self.retransmission_counter = Some(retransmission_counter + 1);
145                    self.last_transmission_instant = Some(now);
146                    Ok(true)
147                } else {
148                    Err(RetransmissionTimeout)
149                }
150            }
151            _ => {
152                self.retransmission_counter = Some(0);
153                self.last_transmission_instant = Some(now);
154                Ok(true)
155            }
156        }
157    }
158}
159
160/// Gets a random number between 0 and 1
161fn get_random<RNG: rng::Read>(rng: &mut RNG) -> Result<f32, RNG::Error> {
162    let mut random_buf = [0_u8; 4];
163    rng.read(&mut random_buf)?;
164    // random number between 0 and 1
165    Ok(u32::from_be_bytes(random_buf) as f32 / (u32::MAX - 1) as f32)
166}
167
168/// Message ID that cannot be 0 or u16::MAX
169#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
170struct MessageId(u16);
171
172impl MessageId {
173    /// Tries to construct a new random Message ID
174    ///
175    /// ID = 0x0000 and ID = 0xFFFF won't get generated
176    // TODO I read somewhere that all 0s and all 1s have special meaning, but I couldn't find it again.
177    fn try_new<RNG: rng::Read>(rng: &mut RNG) -> Result<Self, RNG::Error> {
178        let mut counter_bytes = [0_u8; 2];
179
180        while counter_bytes == [0, 0] || counter_bytes == [0xFF, 0xFF] {
181            rng.read(&mut counter_bytes)?;
182        }
183
184        Ok(Self(LittleEndian::read_u16(&counter_bytes)))
185    }
186
187    /// Calculates and returns the next Message ID
188    fn next(&mut self) -> u16 {
189        self.0 += 1;
190
191        if self.0 == u16::MAX || self.0 == 0 {
192            self.0 = 1;
193        }
194
195        self.0
196    }
197}
198
199#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
200pub(crate) struct MessageIdentification {
201    pub(crate) id: u16,
202    pub(crate) token: Token,
203}
204
205#[derive(Debug)]
206pub(crate) struct MessageBuffer<const BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE }> {
207    buffer: [u8; BUFFER_SIZE],
208    length: usize,
209}
210
211impl<const BUFFER_SIZE: usize> Default for MessageBuffer<BUFFER_SIZE> {
212    fn default() -> Self {
213        Self {
214            buffer: [0_u8; BUFFER_SIZE],
215            length: 0,
216        }
217    }
218}
219
220impl<const BUFFER_SIZE: usize> MessageBuffer<BUFFER_SIZE> {
221    /// Copies the given Encoded Message to the internal buffer
222    fn replace_with<E>(&mut self, message: EncodedMessage<'_>) -> Result<(), Error<E>> {
223        if message.data.len() > self.buffer.len() {
224            return Err(Error::OutOfMemory);
225        }
226
227        self.length = message.data.len();
228
229        self.buffer[..self.length].copy_from_slice(message.data);
230
231        Ok(())
232    }
233
234    fn encode<const OPTION_COUNT: usize, E>(
235        &mut self,
236        message: Message<OPTION_COUNT>,
237    ) -> Result<(), Error<E>> {
238        match message.encode(&mut self.buffer) {
239            Ok(encoded) => {
240                self.length = encoded.message_length();
241                Ok(())
242            }
243            Err(e) => {
244                // We do not know when and how the encoding failed, so we need to invalidate the
245                // internal buffer
246                self.length = 0;
247                Err(Error::MessageError(e))
248            }
249        }
250    }
251
252    /// Returns the stored bytes as [`EncodedMessage`]
253    pub fn message(&self) -> Option<EncodedMessage<'_>> {
254        if self.length > 0 {
255            Some(EncodedMessage::try_new(&self.buffer[..self.length]).unwrap())
256        } else {
257            None
258        }
259    }
260}
261
262/// Helper type to conveniently handle sending and receiving on the `UdpClientStack` + `UdpSocket`
263pub(crate) struct Connection<'a, UDP: UdpClientStack> {
264    client_stack: &'a mut UDP,
265    link: &'a mut ConnectionLink<UDP::UdpSocket>,
266}
267
268impl<'a, UDP: UdpClientStack> Connection<'a, UDP> {
269    /// Helper method to block on a `UdpClientStack::send` and map the error appropriately
270    fn send(&mut self, packet: &[u8]) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
271        nb::block!(self.client_stack.send(&mut self.link.socket, packet)).map_err(Error::Network)
272    }
273    /// Helper method to map `WouldBlock` into `None` and `nb::Error::Other` to `endpoint::Error`
274    fn receive(
275        &mut self,
276        buffer: &mut [u8],
277    ) -> Result<Option<usize>, Error<<UDP as UdpClientStack>::Error>> {
278        match self.client_stack.receive(&mut self.link.socket, buffer) {
279            Ok((message_size, sender)) => {
280                if sender == self.link.addr {
281                    Ok(Some(message_size))
282                } else {
283                    // If the packet is not from our connection,
284                    // we just ignore it
285                    Ok(None)
286                }
287            }
288            Err(nb::Error::WouldBlock) => Ok(None),
289            Err(nb::Error::Other(e)) => Err(Error::Network(e)),
290        }
291    }
292}
293
294#[derive(Debug)]
295pub(crate) struct ConnectionLink<Socket> {
296    socket: Socket,
297    addr: SocketAddr,
298}
299
300/// Endpoint for 1-to-1 connection to another CoAP Endpoint
301#[derive(Debug)]
302pub struct CoapEndpoint<
303    'a,
304    UDP,
305    RNG,
306    CLOCK,
307    const MAX_OPTION_COUNT: usize = { crate::DEFAULT_MAX_OPTION_COUNT },
308    const MAX_OPTION_SIZE: usize = { crate::DEFAULT_MAX_OPTION_SIZE },
309    const INCOMING_BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
310    const OUTGOING_BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
311    const RECEIVE_BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
312> where
313    UDP: UdpClientStack,
314    RNG: rng::Read,
315    CLOCK: Clock,
316{
317    connection_link: Option<ConnectionLink<UDP::UdpSocket>>,
318    message_id_counter: MessageId,
319    rng: RNG,
320    incoming_communication: IncomingCommunication<
321        'a,
322        UDP,
323        CLOCK,
324        INCOMING_BUFFER_SIZE,
325        MAX_OPTION_COUNT,
326        MAX_OPTION_SIZE,
327    >,
328    outgoing_communication: OutgoingCommunication<
329        'a,
330        CLOCK,
331        UDP,
332        OUTGOING_BUFFER_SIZE,
333        MAX_OPTION_COUNT,
334        MAX_OPTION_SIZE,
335    >,
336    // TODO The receive buffer could be on the stack instead which is the same in worst-case
337    // scenarios but better otherwise
338    receive_buffer: &'a mut [u8],
339}
340
341impl<
342        'a,
343        UDP,
344        RNG,
345        CLOCK,
346        const MAX_OPTION_COUNT: usize,
347        const MAX_OPTION_SIZE: usize,
348        const INCOMING_BUFFER_SIZE: usize,
349        const OUTGOING_BUFFER_SIZE: usize,
350        const RECEIVE_BUFFER_SIZE: usize,
351    >
352    CoapEndpoint<
353        'a,
354        UDP,
355        RNG,
356        CLOCK,
357        MAX_OPTION_COUNT,
358        MAX_OPTION_SIZE,
359        INCOMING_BUFFER_SIZE,
360        OUTGOING_BUFFER_SIZE,
361        RECEIVE_BUFFER_SIZE,
362    >
363where
364    UDP: UdpClientStack,
365    RNG: rng::Read,
366    CLOCK: Clock,
367{
368    /// Constructs a new CoAP Endpoint.
369    ///
370    /// Initializes the message_id_counter randomly.
371    pub fn try_new(
372        transmission_parameters: TransmissionParameters,
373        mut rng: RNG,
374        clock: &'a CLOCK,
375        receive_buffer: &'a mut [u8],
376    ) -> Result<Self, Error<<UDP as UdpClientStack>::Error>> {
377        let message_id_counter = MessageId::try_new(&mut rng).map_err(|_| Error::Rng)?;
378
379        let mut slf = Self {
380            connection_link: None,
381            message_id_counter,
382            rng,
383            incoming_communication: IncomingCommunication::new(clock, transmission_parameters),
384            outgoing_communication: OutgoingCommunication::new(clock, transmission_parameters),
385            receive_buffer,
386        };
387
388        if slf.incoming_communication.next_message_id.is_none() {
389            slf.incoming_communication.next_message_id = Some(slf.message_id_counter.next());
390        }
391        let random = get_random(&mut slf.rng).map_err(|_| Error::Rng)?;
392        slf.incoming_communication.next_random = Some(random);
393
394        if slf.outgoing_communication.next_message_id.is_none() {
395            slf.outgoing_communication.next_message_id = Some(slf.message_id_counter.next());
396        }
397        if slf.outgoing_communication.next_token.is_none() {
398            slf.outgoing_communication.next_token = Some(
399                Token::try_new(crate::message::token::TokenLength::Eight, &mut slf.rng)
400                    .map_err(|_| Error::Rng)?,
401            );
402        }
403        let random = get_random(&mut slf.rng).map_err(|_| Error::Rng)?;
404        slf.outgoing_communication.next_random = Some(random);
405
406        Ok(slf)
407    }
408
409    /// Access the [`IncomingCommunication`]
410    pub fn incoming(
411        &mut self,
412    ) -> &mut IncomingCommunication<
413        'a,
414        UDP,
415        CLOCK,
416        INCOMING_BUFFER_SIZE,
417        MAX_OPTION_COUNT,
418        MAX_OPTION_SIZE,
419    > {
420        &mut self.incoming_communication
421    }
422
423    /// Access the [`OutgoingCommunication`]
424    pub fn outgoing(
425        &mut self,
426    ) -> &mut OutgoingCommunication<
427        'a,
428        CLOCK,
429        UDP,
430        OUTGOING_BUFFER_SIZE,
431        MAX_OPTION_COUNT,
432        MAX_OPTION_SIZE,
433    > {
434        &mut self.outgoing_communication
435    }
436
    /// Handles internal state machine.
    ///
    /// Has to be called periodically.
    /// If any ongoing process is present, this function has to be called at an interval
    /// less than CoapEndpoint::transmission_parameter.ack_timeout.
    ///
    /// This method receives a possibly pending RX message and offers this received message to
    /// [`IncomingCommunication`] and [`OutgoingCommunication`]. Incoming pings are automatically
    /// answered with a RST message and a corresponding [`EndpointEvent::Ping`] is generated. If
    /// neither the incoming nor the outgoing communication path have consumed the RX message, an
    /// [`EndpointEvent::Unhandled`] is generated.
    ///
    /// Unhandled requests are silently ignored because we may be able to handle a possible future
    /// retransmission. Unhandled NON and CON responses are automatically rejected with a
    /// RST message because there is no reason to believe that a response which was not consumed
    /// now will be handled in the future. Unhandled ACKs and RSTs are silently ignored because
    /// there must never be a response to ACKs and RSTs, see section 4.2. (Messages Transmitted
    /// Reliably) from RFC 7252: "More generally, recipients of Acknowledgement and Reset messages
    /// MUST NOT respond with either Acknowledgement or Reset messages."
    ///
    /// # Returns
    ///
    /// On success, a tuple of (in this order) the result of the incoming communication path, the
    /// result of the outgoing communication path and the endpoint-level event. Failures of the
    /// two communication paths are reported inside the tuple and do not make this method fail.
    ///
    /// # Errors
    ///
    /// - `Error::Rng` if a required random value or token could not be generated
    /// - `Error::NotConnected` if the endpoint has no connection
    /// - Any error from receiving the pending message or from sending one of the automatic RST
    ///   messages
    pub fn process(
        &mut self,
        client_stack: &mut UDP,
    ) -> Result<
        (
            Result<incoming::IncomingEvent, Error<<UDP as UdpClientStack>::Error>>,
            Result<outgoing::OutgoingEvent, Error<<UDP as UdpClientStack>::Error>>,
            EndpointEvent,
        ),
        Error<<UDP as UdpClientStack>::Error>,
    > {
        // Refill the values for the incoming path which are currently missing
        if self.incoming_communication.next_message_id.is_none() {
            self.incoming_communication.next_message_id = Some(self.message_id_counter.next());
        }
        if self.incoming_communication.next_random.is_none() {
            let random = get_random(&mut self.rng).map_err(|_| Error::Rng)?;
            self.incoming_communication.next_random = Some(random);
        }

        // Refill the values for the outgoing path which are currently missing
        if self.outgoing_communication.next_message_id.is_none() {
            self.outgoing_communication.next_message_id = Some(self.message_id_counter.next());
        }
        if self.outgoing_communication.next_token.is_none() {
            self.outgoing_communication.next_token = Some(
                Token::try_new(crate::message::token::TokenLength::Eight, &mut self.rng)
                    .map_err(|_| Error::Rng)?,
            );
        }
        if self.outgoing_communication.next_random.is_none() {
            let random = get_random(&mut self.rng).map_err(|_| Error::Rng)?;
            self.outgoing_communication.next_random = Some(random);
        }

        let Some(ref mut link) = self.connection_link else {
            return Err(Error::NotConnected);
        };
        let mut connection = Connection { client_stack, link };

        // It may feel strange to have a mutable event variable here because a later event could
        // theoretically overwrite an earlier event. But in the endpoint, events are only generated
        // when the received message is handled and consumed (or not generated at all). Therefore,
        // we will never overwrite anything else than the `Nothing` event.
        let mut endpoint_event = EndpointEvent::Nothing;

        // Stays `None` if nothing was received or if the received bytes are not a valid message
        let mut received_message = None;
        if let Some(message_len) = connection.receive(self.receive_buffer)? {
            match EncodedMessage::try_new(&self.receive_buffer[..message_len]) {
                Ok(message) => {
                    match message.check_msg_format::<MAX_OPTION_SIZE>() {
                        Ok(()) => received_message = Some(message),
                        Err(e) => {
                            // Malformed message received, send a RST if it was a NON or CON (never
                            // respond to ACKs or RSTs)
                            use message::Type::{Confirmable, NonConfirmable};
                            if matches!(message.message_type(), Confirmable | NonConfirmable) {
                                connection.send(&EncodedMessage::rst(message.message_id()))?;
                            }
                            endpoint_event = EndpointEvent::MsgFormatErr(e);
                        }
                    }
                }
                Err(e) => {
                    // We could not even construct the encoded message, i.e. message header
                    // incomplete -> no message ID to send a RST
                    endpoint_event = EndpointEvent::MsgFormatErr(e);
                }
            }
        }

        // Pings are answered right here and are not offered to the communication paths
        if let Some(message) = received_message.as_mut() {
            if message.is_ping().unwrap() {
                connection.send(&EncodedMessage::rst(message.message_id()))?;
                received_message = None;
                endpoint_event = EndpointEvent::Ping;
            }
        }

        // The outgoing path is offered the received message first, the incoming path second
        let outgoing_result = self
            .outgoing_communication
            .process_outgoing(&mut connection, &mut received_message);

        let incoming_result = self
            .incoming_communication
            .process_incoming(&mut connection, &mut received_message);

        // A message which is still present at this point has not been consumed by any path
        if let Some(message) = received_message {
            use message::Type::{Confirmable, NonConfirmable};
            if matches!(message.message_type(), Confirmable | NonConfirmable)
                && message.is_response().unwrap()
            {
                connection.send(&EncodedMessage::rst(message.message_id()))?;
            }
            endpoint_event = EndpointEvent::Unhandled(message);
        }

        Ok((incoming_result, outgoing_result, endpoint_event))
    }
553}
554
555#[cfg(test)]
556mod tests {
557    use core::{cell::RefCell, time::Duration};
558
559    use embedded_hal::prelude::_embedded_hal_blocking_rng_Read;
560    use embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
561    use heapless::Vec;
562    use mockall::predicate::*;
563    use mockall::*;
564
565    use crate::{
566        endpoint::outgoing::OutgoingEvent,
567        message::{
568            codes::RequestCode,
569            encoded_message::EncodedMessage,
570            token::{Token, TokenLength},
571            Message, Type,
572        },
573    };
574
575    use super::{incoming::IncomingEvent, CoapEndpoint, EndpointEvent, TransmissionParameters};
576
577    #[derive(Debug)]
578    struct Random {
579        value: u128,
580    }
581
582    impl embedded_hal::blocking::rng::Read for Random {
583        type Error = std::io::Error;
584
585        fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
586            self.value += 1;
587
588            let buf_len = buf.len();
589
590            buf[..buf_len].copy_from_slice(&self.value.to_le_bytes()[..buf_len]);
591
592            Ok(())
593        }
594    }
595
596    #[derive(Debug)]
597    struct StackError;
598
599    struct Socket;
600
601    mock! {
602        Stack {}
603
604        impl UdpClientStack for Stack {
605            type UdpSocket = Socket;
606            type Error = StackError;
607
608            fn socket(&mut self) -> Result<Socket, StackError>;
609            fn connect(
610                &mut self,
611                socket: &mut Socket,
612                remote: SocketAddr
613            ) -> Result<(), StackError>;
614            fn send(
615                &mut self,
616                socket: &mut Socket,
617                buffer: &[u8]
618            ) -> Result<(), nb::Error<StackError>>;
619            fn receive(
620                &mut self,
621                socket: &mut Socket,
622                buffer: &mut [u8]
623            ) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
624            fn close(&mut self, socket: Socket) -> Result<(), StackError>;
625        }
626    }
627
628    #[derive(Debug)]
629    struct MyClock {
630        last_time: RefCell<Duration>,
631        now: RefCell<Duration>,
632    }
633
634    impl embedded_timers::clock::Clock for MyClock {
635        fn try_now(
636            &self,
637        ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
638            *self.last_time.borrow_mut() = *self.now.borrow();
639            Ok(*self.now.borrow())
640        }
641    }
642
643    #[test]
644    fn nothing() {
645        let mut stack = MockStack::default();
646
647        let clock = MyClock {
648            last_time: RefCell::new(Duration::from_secs(0)),
649            now: RefCell::new(Duration::from_secs(1)),
650        };
651
652        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
653
654        let mut endpoint: CoapEndpoint<
655            '_,
656            MockStack,
657            Random,
658            MyClock,
659            8,
660            32,
661            128,
662            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
663        > = CoapEndpoint::try_new(
664            TransmissionParameters::default(),
665            Random { value: 0 },
666            &clock,
667            &mut receive_buffer,
668        )
669        .unwrap();
670
671        stack.expect_socket().once().return_once(|| Ok(Socket));
672        stack.expect_connect().once().return_once(|_, _| Ok(()));
673
674        endpoint
675            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
676            .unwrap();
677
678        stack
679            .expect_receive()
680            .once()
681            .return_once(|_, _| Err(nb::Error::WouldBlock));
682
683        let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
684        assert!(matches!(endpoint_event, EndpointEvent::Nothing));
685    }
686
687    #[test]
688    fn message_format_error() {
689        let mut stack = MockStack::default();
690
691        let clock = MyClock {
692            last_time: RefCell::new(Duration::from_secs(0)),
693            now: RefCell::new(Duration::from_secs(1)),
694        };
695
696        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
697
698        let mut endpoint: CoapEndpoint<
699            '_,
700            MockStack,
701            Random,
702            MyClock,
703            8,
704            32,
705            128,
706            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
707        > = CoapEndpoint::try_new(
708            TransmissionParameters::default(),
709            Random { value: 0 },
710            &clock,
711            &mut receive_buffer,
712        )
713        .unwrap();
714
715        stack.expect_socket().once().return_once(|| Ok(Socket));
716        stack.expect_connect().once().return_once(|_, _| Ok(()));
717
718        endpoint
719            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
720            .unwrap();
721
722        stack.expect_receive().once().return_once(|_, buffer| {
723            let malformed_message = [
724                // This is the CON GET /hello message from the other tests
725                // only changed the payload marker.   v Here 255 would be correct.
726                72, 1, 0, 4, 5, 0, 0, 0, 0, 0, 0, 0, 254, 47, 104, 101, 108, 108, 111,
727            ];
728
729            buffer[..malformed_message.len()].copy_from_slice(&malformed_message);
730
731            Ok((
732                malformed_message.len(),
733                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
734            ))
735        });
736
737        let (incoming_event, _, _) = endpoint.process(&mut stack).unwrap();
738        match incoming_event.unwrap() {
739            IncomingEvent::Request(_con, req) => {
740                let msg: Result<Message<'_, 8>, crate::message::Error> = req.try_into();
741                if !matches!(msg, Err(crate::message::Error::InvalidOption(_))) {
742                    panic!("Expected Err(InvalidOption), got: {msg:?}");
743                }
744            }
745            event => {
746                panic!("Expected IncomingEvent::Request, got: {event:?}");
747            }
748        }
749
750        stack.expect_receive().once().return_once(|_, buffer| {
751            let malformed_message = [
752                // This message is invalid because the Code is 0.05 which does not exist
753                72, 5, 0, 4, 5, 0, 0, 0, 0, 0, 0, 0, 255, 47, 104, 101, 108, 108, 111,
754            ];
755
756            buffer[..malformed_message.len()].copy_from_slice(&malformed_message);
757
758            Ok((
759                malformed_message.len(),
760                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
761            ))
762        });
763        stack.expect_send().once().return_once(move |_, buf| {
764            // The MsgFormatError will be responded automatically with a RST message
765            assert_eq!(buf, crate::message::encoded_message::EncodedMessage::rst(4));
766            Ok(())
767        });
768
769        let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
770        assert!(matches!(endpoint_event, EndpointEvent::MsgFormatErr(_)));
771    }
772
773    #[test]
774    fn ping_pong() {
775        let mut stack = MockStack::default();
776
777        let clock = MyClock {
778            last_time: RefCell::new(Duration::from_secs(0)),
779            now: RefCell::new(Duration::from_secs(1)),
780        };
781
782        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
783
784        let mut endpoint: CoapEndpoint<
785            '_,
786            MockStack,
787            Random,
788            MyClock,
789            8,
790            32,
791            128,
792            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
793        > = CoapEndpoint::try_new(
794            TransmissionParameters::default(),
795            Random { value: 0 },
796            &clock,
797            &mut receive_buffer,
798        )
799        .unwrap();
800
801        stack.expect_socket().once().return_once(|| Ok(Socket));
802        stack.expect_connect().once().return_once(|_, _| Ok(()));
803
804        endpoint
805            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
806            .unwrap();
807
808        let message_id = 1;
809        let ping: Message<0> = Message::new_ping(message_id);
810        let mut pong_buf = [0_u8; 4];
811        let _encoded_pong = EncodedMessage::new_rst(message_id, &mut pong_buf);
812
813        stack.expect_receive().once().return_once(move |_, buffer| {
814            let encoded_message = ping.encode(buffer).unwrap();
815
816            Ok((
817                encoded_message.message_length(),
818                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
819            ))
820        });
821        stack.expect_send().once().return_once(move |_, buf| {
822            assert_eq!(buf, pong_buf);
823            Ok(())
824        });
825
826        let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
827        assert!(matches!(endpoint_event, EndpointEvent::Ping));
828
829        stack
830            .expect_receive()
831            .once()
832            .return_once(|_, _| Err(nb::Error::WouldBlock));
833
834        let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
835        assert!(matches!(endpoint_event, EndpointEvent::Nothing));
836    }
837
838    #[test]
839    fn unhandled() {
840        let mut stack = MockStack::default();
841
842        let clock = MyClock {
843            last_time: RefCell::new(Duration::from_secs(0)),
844            now: RefCell::new(Duration::from_secs(1)),
845        };
846
847        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
848
849        let mut endpoint: CoapEndpoint<
850            '_,
851            MockStack,
852            Random,
853            MyClock,
854            8,
855            32,
856            128,
857            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
858        > = CoapEndpoint::try_new(
859            TransmissionParameters::default(),
860            Random { value: 0 },
861            &clock,
862            &mut receive_buffer,
863        )
864        .unwrap();
865
866        stack.expect_socket().once().return_once(|| Ok(Socket));
867        stack.expect_connect().once().return_once(|_, _| Ok(()));
868
869        endpoint
870            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
871            .unwrap();
872
873        let next_message_id_1 = endpoint.message_id_counter.next();
874        let mut token_1 = Token::default();
875        token_1.length = TokenLength::Eight;
876        endpoint.rng.read(&mut token_1.bytes).unwrap();
877        let request_1: Message<'_> = Message::new(
878            Type::Confirmable,
879            RequestCode::Get.into(),
880            next_message_id_1,
881            token_1,
882            Vec::new(),
883            Some(b"/hello"),
884        );
885
886        let next_message_id_2 = endpoint.message_id_counter.next();
887        let mut token_2 = Token::default();
888        token_2.length = TokenLength::Eight;
889        endpoint.rng.read(&mut token_2.bytes).unwrap();
890        let request_2: Message<'_> = Message::new(
891            Type::Confirmable,
892            RequestCode::Get.into(),
893            next_message_id_2,
894            token_2,
895            Vec::new(),
896            Some(b"/hello"),
897        );
898        let request_test = request_2.clone();
899
900        stack.expect_receive().once().return_once(move |_, buffer| {
901            let encoded_message = request_1.encode(buffer).unwrap();
902
903            Ok((
904                encoded_message.message_length(),
905                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
906            ))
907        });
908
909        let (incoming, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
910        assert!(matches!(incoming.unwrap(), IncomingEvent::Request(true, _)));
911        assert!(matches!(endpoint_event, EndpointEvent::Nothing));
912
913        stack.expect_receive().once().return_once(move |_, buffer| {
914            let encoded_message = request_2.encode(buffer).unwrap();
915
916            Ok((
917                encoded_message.message_length(),
918                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
919            ))
920        });
921
922        let (incoming, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
923        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
924        assert!(matches!(endpoint_event, EndpointEvent::Unhandled(_)));
925
926        match endpoint_event {
927            EndpointEvent::Unhandled(message) => {
928                assert_eq!(Message::try_from(message).unwrap(), request_test)
929            }
930            _ => (),
931        }
932    }
933
934    #[test]
935    fn ignore_message() {
936        let mut stack = MockStack::default();
937
938        let clock = MyClock {
939            last_time: RefCell::new(Duration::from_secs(0)),
940            now: RefCell::new(Duration::from_secs(1)),
941        };
942
943        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
944
945        let mut endpoint: CoapEndpoint<
946            '_,
947            MockStack,
948            Random,
949            MyClock,
950            8,
951            32,
952            128,
953            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
954        > = CoapEndpoint::try_new(
955            TransmissionParameters::default(),
956            Random { value: 0 },
957            &clock,
958            &mut receive_buffer,
959        )
960        .unwrap();
961
962        stack.expect_socket().once().return_once(|| Ok(Socket));
963        stack.expect_connect().once().return_once(|_, _| Ok(()));
964
965        endpoint
966            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
967            .unwrap();
968
969        let next_message_id = endpoint.message_id_counter.next();
970        let mut token = Token::default();
971        token.length = TokenLength::Eight;
972        endpoint.rng.read(&mut token.bytes).unwrap();
973        let request: Message<'_> = Message::new(
974            Type::Confirmable,
975            RequestCode::Get.into(),
976            next_message_id,
977            token,
978            Vec::new(),
979            Some(b"/hello"),
980        );
981
982        stack.expect_receive().once().return_once(move |_, buffer| {
983            let encoded_message = request.encode(buffer).unwrap();
984
985            Ok((
986                encoded_message.message_length(),
987                // We expect only messages from 127.0.0.1                        v
988                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 5683),
989            ))
990        });
991
992        let (incoming, outgoing, endpoint_event) = endpoint.process(&mut stack).unwrap();
993        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
994        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
995        assert!(matches!(endpoint_event, EndpointEvent::Nothing));
996    }
997}