coap_zero/endpoint/
outgoing.rs

1// Copyright Open Logistics Foundation
2//
3// Licensed under the Open Logistics Foundation License 1.3.
4// For details on the licensing terms, see the LICENSE file.
5// SPDX-License-Identifier: OLFL-1.3
6
7//! The outgoing communication path
8
9use core::marker::PhantomData;
10use core::time::Duration;
11
12use embedded_nal::UdpClientStack;
13use embedded_timers::clock::{Clock, Instant};
14use heapless::Vec;
15
16use crate::message::{
17    codes::{Code, RequestCode, ResponseCode},
18    encoded_message::EncodedMessage,
19    options::{CoapOption, CoapOptionName},
20    token::Token,
21    Message, Type,
22};
23
24use super::{
25    error::Error, Connection, MessageBuffer, MessageIdentification, RetransmissionState,
26    RetransmissionTimeout, TransmissionParameters, Uri,
27};
28
29/// Current state of an outgoing communication
30#[derive(Debug, Clone, Copy)]
31pub enum OutgoingState {
32    /// Ready to send a new request. The contained `bool` stores is the last communication attempt
33    /// (if any) was successful. If so, the respective received response can be obtained with
34    /// [`OutgoingCommunication::response`].
35    Idle(bool),
36    /// Sending a CON request and waiting for an ACK or piggybacked response. The additional
37    /// [`Duration`] is used to start an application-level timeout to wait for a separate response
38    /// if an ACK is received.
39    SendingCon(RetransmissionState, Duration),
40    /// ACK received, waiting for separate response. The [`Instant`] is set to when we should time
41    /// out, i.e. state entry plus the [`Duration`] from [`SendingCon`](OutgoingState::SendingCon).
42    AwaitingResponse(Instant),
43    /// Sending a NON request and waiting for a response. The [`RetransmissionState`] is used to
44    /// limit the number of application-level (user-triggered) retransmissions. As a policy, we
45    /// limit ourselves to the same exponential backoff strategy that is used for CON messages,
46    /// i.e. if the user requests a retransmission but the last exponential backoff timeout has not
47    /// yet expired, the retransmission will be delayed. _If_ the next opportunity to (re)transmit
48    /// shall be used is stored in the `bool` value. This is set to `true` automatically when
49    /// `schedule_non` is called or when the user calls `schedule_retransmission`.
50    ///
51    /// The additional [`Duration`] is used to trigger application-level timeouts based on the
52    /// last (re)transmission instant.
53    SendingNon(RetransmissionState, bool, Duration),
54    /// Sending a ping and waiting for the associated RST message.
55    ///
56    /// From the messaging layer perspective, this is identical to [`OutgoingState::SendingNon`]
57    /// (waiting for a matching response without automatic retransmissions), its documentation
58    /// applies here, too.
59    SendingPing(RetransmissionState, bool, Duration),
60    /// Sending a CON notification and waiting for an ACK
61    SendingNotificationCon(RetransmissionState),
62    /// Sending a NON notification
63    SendingNotificationNon,
64}
65
66/// What has happened in the [`OutgoingCommunication`] path when calling
67/// [`CoapEndpoint::process`](super::CoapEndpoint::process)
68///
69/// This type is annotated with `must_use` because some events require user interaction to avoid
70/// getting stuck in specific states. See the documentation for the specific events for more
71/// information.
72#[derive(Debug)]
73pub enum OutgoingEvent<'a> {
74    /// No _real_ event happened. This pseudo-event is included as a variant here to allow concise
75    /// event handling on the user side.
76    Nothing,
77    /// A CON message has been sent
78    SendCon,
79    /// A NON message has been sent
80    SendNon,
81    /// A ping has been sent
82    SendPing,
83    /// We have received an ACK to our request. This brings us into the
84    /// [`OutgoingState::AwaitingResponse`] state.
85    AckReceived,
86    /// We got a response to our request and consider the whole communication as successful. The
87    /// response is included in the event. Afterwards, the `OutgoingState` will be [`Idle(true)`]
88    /// which allows to obtain the response via `message` for later use.
89    Success(EncodedMessage<'a>),
90    /// The communication attempt timed out.
91    Timeout,
92    /// We got a piggybacked response but the token does not match our request.
93    PiggybackedWrongToken,
94    /// We got a RST message. This cancels the ongoing request and brings us back to
95    /// [`Idle(false)`].
96    ResetReceived,
97    /// We have received a response again which we have received before. If it is a separate CON
98    /// response, the ACK is sent again automatically.
99    DuplicatedResponse,
100    /// The notification message has been sent. If it was a CON message, we still wait for the ACK
101    /// or RST. If it was a NON message, this includes a state transition into
102    /// [`OutgoingState::Idle`].
103    SendNotification,
104    /// A CON notification message has been acked. This includes a state transition into
105    /// [`OutgoingState::Idle`].
106    NotificationAck,
107    /// The (CON or NON) notification message has been responded to with a RST. This signifies that
108    /// the observer/receiver is not interested in further notifications about this resource. It
109    /// should be removed from the list of registered observers for this resource.
110    NotificationRst(Token),
111    /// Transmitting a CON notification (with retransmissions) has timed out.
112    NotificationTimeout,
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
116enum LastOutgoing {
117    Nothing,
118    Request(MessageIdentification),
119    Notification(MessageIdentification),
120}
121
122/// Outgoing communication path. Sends outgoing requests, pings and CoAP Observe notifications and
123/// handles retransmissions and timeouts automatically.
124#[derive(Debug)]
125pub struct OutgoingCommunication<
126    'a,
127    CLOCK,
128    UDP: UdpClientStack,
129    const BUFFER_SIZE: usize,
130    const MAX_OPTION_COUNT: usize,
131    const MAX_OPTION_SIZE: usize,
132> where
133    CLOCK: Clock,
134    UDP: UdpClientStack,
135{
136    /// The internal message_buffer which contains encoded messages
137    ///
138    /// For `OutgoingCommunication`, this contains our outgoing request in all states but the
139    /// OutgoingState::Idle state. In Idle(true), it contains the received response.
140    message_buffer: MessageBuffer<BUFFER_SIZE>,
141    state: OutgoingState,
142    clock: &'a CLOCK,
143    transmission_parameters: TransmissionParameters,
144    // The MessageId and the RNG for generating the Token are part of the endpoint
145    // but we need a MessageId and a Token for scheduling a Message.
146    // Therfore we get them from the Endpoint in its `process` function.
147    pub(super) next_message_id: Option<u16>,
148    pub(super) next_token: Option<Token>,
149    /// Random value between 0 and 1 to initialize the RetransmissionState required to schedule a
150    /// CON message. Since the endpoint owns the RNG, the next random number must be determined in
151    /// advance.
152    pub(super) next_random: Option<f32>,
153    /// The last request we have sent, used for message de-duplication. This is set in `finish`
154    /// when the current request is finished.
155    last_outgoing: LastOutgoing,
156    _udp: PhantomData<UDP>,
157}
158
159impl<
160        'a,
161        CLOCK,
162        UDP,
163        const BUFFER_SIZE: usize,
164        const MAX_OPTION_COUNT: usize,
165        const MAX_OPTION_SIZE: usize,
166    > OutgoingCommunication<'a, CLOCK, UDP, BUFFER_SIZE, MAX_OPTION_COUNT, MAX_OPTION_SIZE>
167where
168    CLOCK: Clock,
169    UDP: UdpClientStack,
170{
171    /// Initializes the outgoing communication path in the [`OutgoingState::Idle(false)`] state so
172    /// it is ready to send requests.
173    pub fn new(clock: &'a CLOCK, transmission_parameters: TransmissionParameters) -> Self {
174        Self {
175            message_buffer: MessageBuffer::default(),
176            state: OutgoingState::Idle(false),
177            clock,
178            transmission_parameters,
179            next_message_id: None,
180            next_token: None,
181            next_random: None,
182            last_outgoing: LastOutgoing::Nothing,
183            _udp: PhantomData,
184        }
185    }
186
187    /// Returns the current state of the `OutgoingCommunication`. Depending on the state, the user
188    /// _must_ take action to drive the state forward, see the documentation to the states in
189    /// [`OutgoingState`].
190    pub fn state(&self) -> OutgoingState {
191        self.state
192    }
193
194    /// Resets the internal state machine
195    pub fn reset(&mut self) {
196        *self = Self::new(self.clock, self.transmission_parameters);
197    }
198
199    /// Helper method to go back to Idle state
200    ///
201    /// 1. If a response is given, this response is stored in the internal message buffer and we go
202    ///    to Idle(true). Otherwise, we go to Idle(false).
203    /// 2. The current request is stored in self.last_outgoing. This may not be done before because
204    ///    the de-duplication code would de-duplicate the current request then.
205    fn finish_request(
206        &mut self,
207        response: Option<EncodedMessage>,
208    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
209        self.last_outgoing = LastOutgoing::Request(MessageIdentification {
210            id: self.message_buffer.message().unwrap().message_id(),
211            token: self.message_buffer.message().unwrap().token().unwrap(),
212        });
213        if let Some(response) = response {
214            self.message_buffer.replace_with(response)?;
215            self.state = OutgoingState::Idle(true);
216        } else {
217            self.state = OutgoingState::Idle(false);
218        }
219        Ok(())
220    }
221
222    /// Like [`finish_request`] but for finishing notifications
223    ///
224    /// The current notification is stored in self.last_outgoing and we go to Idle(false).
225    fn finish_notification(&mut self) {
226        self.last_outgoing = LastOutgoing::Notification(MessageIdentification {
227            id: self.message_buffer.message().unwrap().message_id(),
228            token: self.message_buffer.message().unwrap().token().unwrap(),
229        });
230        self.state = OutgoingState::Idle(false);
231    }
232
233    /// Returns the last response which is the same that was returned in the `Success` event
234    /// before. Therefore, usage of this method is only required if the response should be used
235    /// later (the message from the `Success` event can probably not be stored due to lifetime
236    /// requirements).
237    ///
238    /// This method is only available in [`Idle(true)`] and will return [`Error::Forbidden`]
239    /// otherwise.
240    pub fn response(&self) -> Result<EncodedMessage, Error<<UDP as UdpClientStack>::Error>> {
241        if let OutgoingState::Idle(true) = self.state {
242            Ok(self.message_buffer.message().unwrap())
243        } else {
244            Err(Error::Forbidden)
245        }
246    }
247
248    /// Schedules a non-confirmable request to be sent
249    ///
250    /// The message is constructed with the given details (`message_type`, `code`, `options` and
251    /// `payload`). The `timeout` is used for an application-level timeout which is used if no
252    /// further retransmissions are tried.
253    pub fn schedule_non(
254        &mut self,
255        code: RequestCode,
256        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
257        payload: Option<&[u8]>,
258        timeout: Duration,
259    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
260        self.encode_request(Type::NonConfirmable, code, options, payload)?;
261
262        let retransmission_state = RetransmissionState::new(
263            self.transmission_parameters,
264            self.next_random.take().unwrap(),
265        );
266        self.state = OutgoingState::SendingNon(retransmission_state, true, timeout);
267
268        Ok(())
269    }
270
271    /// Schedules a confirmable request to be sent
272    ///
273    /// The `timeout` is used for an application-level timeout which is started as soon as an ACK
274    /// is received (in case of piggybacked responses, we succeed immediately, so the timeout is
275    /// not needed).
276    pub fn schedule_con(
277        &mut self,
278        code: RequestCode,
279        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
280        payload: Option<&[u8]>,
281        timeout: Duration,
282    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
283        self.encode_request(Type::Confirmable, code, options, payload)?;
284        let retransmission_state = RetransmissionState::new(
285            self.transmission_parameters,
286            self.next_random.take().unwrap(),
287        );
288        self.state = OutgoingState::SendingCon(retransmission_state, timeout);
289        Ok(())
290    }
291
292    /// Encodes a request into `self.message_buffer`
293    ///
294    /// This consumes `self.next_token` if successful.
295    fn encode_request(
296        &mut self,
297        typ: Type,
298        code: RequestCode,
299        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
300        payload: Option<&[u8]>,
301    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
302        // Do not take() the token yet, only after we have successfully encoded the message
303        let token = self.next_token.unwrap();
304
305        self.encode_message(typ, code.into(), token, options, payload)?;
306
307        self.next_token = None;
308
309        Ok(())
310    }
311
312    /// Schedules a CoAP observe notification to be sent
313    ///
314    /// The `options` must already contain the Observe option because the sequence number
315    /// can not be determined in this method.
316    pub fn schedule_notification(
317        &mut self,
318        confirmable: bool,
319        code: ResponseCode,
320        token: Token,
321        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
322        payload: Option<&[u8]>,
323    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
324        let typ = if confirmable {
325            Type::Confirmable
326        } else {
327            Type::NonConfirmable
328        };
329
330        self.encode_message(typ, code.into(), token, options, payload)?;
331
332        if confirmable {
333            let retransmission_state = RetransmissionState::new(
334                self.transmission_parameters,
335                self.next_random.take().unwrap(),
336            );
337            self.state = OutgoingState::SendingNotificationCon(retransmission_state);
338        } else {
339            self.state = OutgoingState::SendingNotificationNon;
340        }
341
342        Ok(())
343    }
344
345    /// Encodes a message into `self.message_buffer`, may be used for requests and notifications
346    ///
347    /// This consumes `self.next_message_id` if successful.
348    fn encode_message(
349        &mut self,
350        typ: Type,
351        code: Code,
352        token: Token,
353        options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
354        payload: Option<&[u8]>,
355    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
356        if !matches!(self.state, OutgoingState::Idle(_)) {
357            return Err(Error::Busy);
358        }
359        // Do not take() the next values yet, only after we have successfully encoded the message
360        let id = self.next_message_id.unwrap();
361
362        let message: Message<MAX_OPTION_COUNT> =
363            Message::new(typ, code, id, token, options, payload);
364
365        self.message_buffer.encode(message)?;
366
367        self.next_message_id = None;
368
369        Ok(())
370    }
371
372    /// Schedules a ping to be sent
373    pub fn schedule_ping(
374        &mut self,
375        timeout: Duration,
376    ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
377        if !matches!(self.state, OutgoingState::Idle(_)) {
378            return Err(Error::Busy);
379        }
380
381        let message = Message::<0>::new_ping(self.next_message_id.unwrap());
382        self.message_buffer.encode(message)?;
383        self.next_message_id = None;
384
385        let retransmission_state = RetransmissionState::new(
386            self.transmission_parameters,
387            self.next_random.take().unwrap(),
388        );
389
390        self.state = OutgoingState::SendingPing(retransmission_state, true, timeout);
391
392        Ok(())
393    }
394
395    /// Cancels the currently ongoing outgoing communication
396    ///
397    /// This brings us back into the [`OutgoingState::Idle(false)`] state. Also, this sets up
398    /// message de-duplication normally. This has the effect that if we receive a CON response for
399    /// a cancelled request, an ACK will be automatically sent. This may look surprising but:
400    /// 1. this behavior is the same as normally receiving+acknowledging the response and just
401    ///    ignoring the response afterwards (which would be valid) and
402    /// 2. this enables the other endpoint to stop retransmitting its CON response.
403    ///
404    /// If the currently ongoing communication is a notification, we will additionally be able to
405    /// recognize corresponding RST messages and generate [`OutgoingEvent::NotificationRst`]
406    /// events.
407    ///
408    /// This method is available in all states but the `Idle` state. If in `Idle` state, it returns
409    /// [`Error::Forbidden`].
410    pub fn cancel(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
411        match self.state {
412            OutgoingState::Idle(_) => Err(Error::Forbidden),
413            OutgoingState::SendingNotificationCon(_) | OutgoingState::SendingNotificationNon => {
414                self.finish_notification();
415                Ok(())
416            }
417            _ => {
418                self.finish_request(None)?;
419                Ok(())
420            }
421        }
422    }
423
424    pub(crate) fn process_outgoing(
425        &mut self,
426        connection: &mut Connection<UDP>,
427        received: &mut Option<EncodedMessage>,
428    ) -> Result<OutgoingEvent, Error<<UDP as UdpClientStack>::Error>> {
429        use OutgoingEvent::*;
430        use OutgoingState::*;
431
432        // Whenever we handle the message, we need to return the event immediately to not overwrite
433        // it with a possible later (re)transmission event. See IncomingCommunication for a longer
434        // explanation.
435        //
436        // To not duplicate the message de-duplication code in all states, de-duplication is
437        // handled first.
438
439        if let OutgoingState::Idle(_) = self.state {
440            use LastOutgoing::{Notification, Request};
441            if let (Some(message), Request(last_request)) = (&received, self.last_outgoing) {
442                // There are two types of messages which we have to de-duplicate:
443                // 1. Responses identified by their token ...
444                if message.is_response().unwrap() && last_request.token == message.token().unwrap()
445                {
446                    if message.message_type() == Type::Confirmable {
447                        connection.send(&EncodedMessage::ack(message.message_id()))?;
448                    }
449                    received.take();
450                    return Ok(OutgoingEvent::DuplicatedResponse);
451                }
452                // 2. ... and ACKs/RSTs identified by their message id
453                if matches!(message.message_type(), Type::Acknowledgement | Type::Reset)
454                    && message.message_id() == last_request.id
455                {
456                    received.take();
457                    return Ok(OutgoingEvent::DuplicatedResponse);
458                }
459            }
460            if let (Some(message), Notification(last_notify)) = (&received, self.last_outgoing) {
461                if matches!(message.message_type(), Type::Acknowledgement)
462                    && message.message_id() == last_notify.id
463                {
464                    received.take();
465                    return Ok(OutgoingEvent::DuplicatedResponse);
466                }
467                // For notifications we have sent, we especially have to take care of receiving
468                // RSTs in response to NON notifications to let the user know that the
469                // observer/receiver lost interest in the observed resource.
470                if matches!(message.message_type(), Type::Reset)
471                    && message.message_id() == last_notify.id
472                {
473                    received.take();
474                    return Ok(OutgoingEvent::NotificationRst(last_notify.token));
475                }
476            }
477        }
478
479        match self.state {
480            Idle(_) => (), // Probably nothing to do here
481            SendingCon(ref mut retransmission_state, duration) => {
482                if let Some(message) = received {
483                    let out_message = self.message_buffer.message().unwrap();
484
485                    if message.message_type() == Type::Acknowledgement
486                        && message.message_id() == out_message.message_id()
487                    {
488                        if message.is_response().unwrap() {
489                            if message.token().unwrap() == out_message.token().unwrap() {
490                                // piggybacked response
491                                self.finish_request(Some(received.take().unwrap()))?;
492                                return Ok(Success(self.message_buffer.message().unwrap()));
493                            } else {
494                                // No state transition because we could still receive the correct response
495                                return Ok(PiggybackedWrongToken);
496                            }
497                        } else {
498                            // ACK
499                            let end = self.clock.try_now().unwrap() + duration;
500                            self.state = AwaitingResponse(end);
501                            return Ok(AckReceived);
502                        }
503                    } else if message.is_response().unwrap()
504                        && message.token().unwrap() == out_message.token().unwrap()
505                    {
506                        // separate response arriving earlier than ACK (which may have a
507                        // transmission failure)
508                        if message.message_type() == Type::Confirmable {
509                            connection.send(&EncodedMessage::ack(message.message_id()))?;
510                        }
511                        self.finish_request(Some(received.take().unwrap()))?;
512                        return Ok(Success(self.message_buffer.message().unwrap()));
513                    } else if message.message_type() == Type::Reset
514                        && message.message_id() == out_message.message_id()
515                    {
516                        self.finish_request(None)?;
517                        return Ok(ResetReceived);
518                    }
519                }
520                match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
521                    Ok(true) => {
522                        connection.send(self.message_buffer.message().unwrap().data)?;
523                        return Ok(SendCon);
524                    }
525                    Ok(false) => {}
526                    Err(RetransmissionTimeout) => {
527                        self.finish_request(None)?;
528                        return Ok(Timeout);
529                    }
530                }
531            }
532            AwaitingResponse(end) => {
533                if let Some(message) = received {
534                    let out_message = self.message_buffer.message().unwrap();
535                    if message.is_response().unwrap()
536                        && message.token().unwrap() == out_message.token().unwrap()
537                    {
538                        // separate response
539                        if message.message_type() == Type::Confirmable {
540                            connection.send(&EncodedMessage::ack(message.message_id()))?;
541                        }
542                        self.finish_request(Some(received.take().unwrap()))?;
543                        return Ok(Success(self.message_buffer.message().unwrap()));
544                    }
545                }
546                if self.clock.try_now().unwrap() > end {
547                    self.finish_request(None)?;
548                    return Ok(Timeout);
549                }
550            }
551            // Retransmissions are handled the same way for pings and NON messages, so we handle
552            // both cases together. We only have to handle responses and RST messages differently.
553            SendingNon(ref mut retransmission_state, ref mut retransmit_requested, timeout)
554            | SendingPing(ref mut retransmission_state, ref mut retransmit_requested, timeout) => {
555                if let Some(message) = received {
556                    let out_message = self.message_buffer.message().unwrap();
557
558                    if message.message_type() == Type::Reset
559                        && message.message_id() == out_message.message_id()
560                    {
561                        if matches!(self.state, SendingNon(..)) {
562                            self.finish_request(None)?;
563                            return Ok(ResetReceived);
564                        } else {
565                            self.finish_request(Some(received.take().unwrap()))?;
566                            return Ok(Success(self.message_buffer.message().unwrap()));
567                        }
568                    } else if message.is_response().unwrap()
569                        && message.token().unwrap() == out_message.token().unwrap()
570                    {
571                        if matches!(self.state, SendingNon(..)) {
572                            if message.message_type() == Type::Confirmable {
573                                let mut ack_buf = [0_u8; 4];
574                                let _ack =
575                                    EncodedMessage::new_ack(message.message_id(), &mut ack_buf);
576                                connection.send(&ack_buf)?
577                            }
578                            self.finish_request(Some(received.take().unwrap()))?;
579                            return Ok(Success(self.message_buffer.message().unwrap()));
580                        } else {
581                            // Without this return, the `matches!` match above to distinguish
582                            // between SendingNon and SendingPing is forbidden because self is
583                            // already mutably borrowed. With this return, the borrow checker is
584                            // satisfied.
585                            //
586                            // Practically, this will just delay a (re)transmission attempt for
587                            // pings whenever a response with an empty token is received here, i.e.
588                            // this will have no practical relevance.
589                            return Ok(Nothing);
590                        }
591                    }
592                }
593                if *retransmit_requested {
594                    match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
595                        Ok(true) => {
596                            *retransmit_requested = false;
597                            connection.send(self.message_buffer.message().unwrap().data)?;
598                            if matches!(self.state, SendingNon(..)) {
599                                return Ok(OutgoingEvent::SendNon);
600                            } else {
601                                return Ok(OutgoingEvent::SendPing);
602                            }
603                        }
604                        Ok(false) => {}
605                        Err(RetransmissionTimeout) => {
606                            self.finish_request(None)?;
607                            return Ok(Timeout);
608                        }
609                    }
610                }
611                let waiting_time = self.clock.try_now().unwrap()
612                    - retransmission_state.last_transmission_instant.unwrap();
613                if waiting_time > timeout {
614                    self.finish_request(None)?;
615                    return Ok(Timeout);
616                }
617            }
618            SendingNotificationCon(ref mut retransmission_state) => {
619                if let Some(message) = received {
620                    let out_message = self.message_buffer.message().unwrap();
621
622                    if message.message_type() == Type::Acknowledgement
623                        && message.message_id() == out_message.message_id()
624                    {
625                        self.finish_notification();
626                        return Ok(NotificationAck);
627                    } else if message.message_type() == Type::Reset
628                        && message.message_id() == out_message.message_id()
629                    {
630                        let token = out_message.token().unwrap();
631                        self.finish_notification();
632                        return Ok(NotificationRst(token));
633                    }
634                }
635                match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
636                    Ok(true) => {
637                        connection.send(self.message_buffer.message().unwrap().data)?;
638                        return Ok(SendNotification);
639                    }
640                    Ok(false) => {}
641                    Err(RetransmissionTimeout) => {
642                        self.finish_notification();
643                        return Ok(NotificationTimeout);
644                    }
645                }
646            }
647            SendingNotificationNon => {
648                connection.send(self.message_buffer.message().unwrap().data)?;
649                self.finish_notification();
650                return Ok(SendNotification);
651            }
652        }
653
654        Ok(Nothing)
655    }
656
657    /// Associated helper method to easily create the appropriate request options from an [`Uri`]
658    ///
659    /// Takes the request `url` and a list of additional options. The `url` is parsed into
660    /// [`UriPath`](CoapOptionName::UriPath)s and [`UriQuery`](CoapOptionName::UriQuery)s and
661    /// merged with the additional options.
662    // TODO Maybe, we should remove the parse_options helper method?
663    // From the spec:
664    //    "Implementation Note:  Unfortunately, over time, the URI format has
665    //  acquired significant complexity.  [...] Percent-encoding is
666    //  crucial for data transparency but may lead to unusual results such
667    //  as a slash character in a path component."
668    //  -> I guess we can not just take the path and split at '/'. I see that this method is
669    //  helpful but it is hard enough to get this right.
670    pub fn parse_options<'options>(
671        &self,
672        url: &'options str,
673        additional_options: Vec<CoapOption<'options>, MAX_OPTION_COUNT>,
674    ) -> Result<Vec<CoapOption<'options>, MAX_OPTION_COUNT>, Error<<UDP as UdpClientStack>::Error>>
675    {
676        // Parse the URI into the appropriate Options:
677
678        // Options are supposed to be sorted, so we sort them into the SortedLinkedList.
679        // We get the path segments into a vec before, because we need to insert them in reverse
680        // order into the linked list. This results in a rather large amount of memory allocs, but
681        // they are only on the stack, so it is kind of fine.
682        // Unfortunately, we can't get a slice out of that, so we move that into a Vec then.
683        let mut options = Vec::<CoapOption<'_>, MAX_OPTION_COUNT>::new();
684
685        let uri = Uri::new(url).unwrap();
686        let path = uri.path_str();
687
688        for path in path.split('/') {
689            if !path.is_empty() {
690                // TODO Error
691                options
692                    .push(CoapOption {
693                        name: CoapOptionName::UriPath,
694                        value: path.as_bytes(),
695                    })
696                    .map_err(|_| Error::OutOfMemory)?;
697            }
698        }
699
700        if let Some(queries) = uri.query_str() {
701            for query in queries.split('&') {
702                options
703                    .push(CoapOption {
704                        name: CoapOptionName::UriQuery,
705                        // TODO Error
706                        value: query.as_bytes(),
707                    })
708                    .map_err(|_| Error::OutOfMemory)?;
709            }
710        };
711
712        for option in additional_options {
713            options.push(option).map_err(|_| Error::OutOfMemory)?;
714        }
715
716        Ok(options)
717    }
718}
719
720#[cfg(test)]
721mod tests {
722    use core::{cell::RefCell, time::Duration};
723
724    use embedded_nal::{SocketAddr, UdpClientStack};
725    use heapless::Vec;
726    use mockall::predicate::*;
727    use mockall::*;
728
729    use crate::{
730        endpoint::outgoing::{OutgoingEvent, OutgoingState},
731        message::{
732            codes::{RequestCode, SuccessCode},
733            Message, Type,
734        },
735    };
736
737    use super::{super::CoapEndpoint, TransmissionParameters};
738
739    #[derive(Debug)]
740    struct Random {
741        value: u128,
742    }
743
744    impl embedded_hal::blocking::rng::Read for Random {
745        type Error = std::io::Error;
746
747        fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
748            self.value += 1;
749
750            let buf_len = buf.len();
751
752            buf[..buf_len].copy_from_slice(&self.value.to_le_bytes()[..buf_len]);
753
754            Ok(())
755        }
756    }
757
758    #[derive(Debug)]
759    struct StackError;
760
761    struct Socket;
762
763    mock! {
764        Stack {}
765
766        impl UdpClientStack for Stack {
767            type UdpSocket = Socket;
768            type Error = StackError;
769
770            fn socket(&mut self) -> Result<Socket, StackError>;
771            fn connect(
772                &mut self,
773                socket: &mut Socket,
774                remote: SocketAddr
775            ) -> Result<(), StackError>;
776            fn send(
777                &mut self,
778                socket: &mut Socket,
779                buffer: &[u8]
780            ) -> Result<(), nb::Error<StackError>>;
781            fn receive(
782                &mut self,
783                socket: &mut Socket,
784                buffer: &mut [u8]
785            ) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
786            fn close(&mut self, socket: Socket) -> Result<(), StackError>;
787        }
788    }
789
790    #[derive(Debug)]
791    struct MyClock {
792        last_time: RefCell<Duration>,
793        now: RefCell<Duration>,
794    }
795
796    impl embedded_timers::clock::Clock for MyClock {
797        fn try_now(
798            &self,
799        ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
800            *self.last_time.borrow_mut() = *self.now.borrow();
801            Ok(*self.now.borrow())
802        }
803    }
804
805    impl MyClock {
806        fn advance(&self, step: Duration) {
807            *self.now.borrow_mut() = *self.last_time.borrow() + step;
808        }
809    }
810
811    #[test]
812    fn send_ping() {
813        let mut stack = MockStack::default();
814
815        let clock = MyClock {
816            last_time: RefCell::new(Duration::from_secs(0)),
817            now: RefCell::new(Duration::from_secs(1)),
818        };
819
820        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
821
822        let mut endpoint: CoapEndpoint<
823            '_,
824            MockStack,
825            Random,
826            MyClock,
827            8,
828            32,
829            128,
830            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
831        > = CoapEndpoint::try_new(
832            TransmissionParameters::default(),
833            Random { value: 0 },
834            &clock,
835            &mut receive_buffer,
836        )
837        .unwrap();
838
839        stack.expect_socket().once().return_once(|| Ok(Socket));
840        stack.expect_connect().once().return_once(|_, _| Ok(()));
841
842        endpoint
843            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
844            .unwrap();
845
846        stack
847            .expect_receive()
848            .once()
849            .return_once(|_, _| Err(nb::Error::WouldBlock));
850
851        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
852        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
853        assert!(matches!(
854            endpoint.outgoing_communication.state,
855            OutgoingState::Idle(_)
856        ));
857
858        let message_id = endpoint.outgoing_communication.next_message_id.unwrap();
859        let ping_buf = crate::message::encoded_message::EncodedMessage::ping(message_id);
860        let pong: Message<'_> = Message::new_rst(message_id);
861
862        stack
863            .expect_receive()
864            .once()
865            .return_once(move |_, _| Err(nb::Error::WouldBlock));
866        stack.expect_send().once().return_once(move |_, buf| {
867            assert_eq!(buf, ping_buf);
868            Ok(())
869        });
870
871        endpoint
872            .outgoing()
873            .schedule_ping(Duration::from_secs(1))
874            .unwrap();
875
876        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
877        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendPing));
878        assert!(matches!(
879            endpoint.outgoing_communication.state,
880            OutgoingState::SendingPing(..)
881        ));
882
883        stack.expect_receive().once().return_once(move |_, buffer| {
884            let encoded_message = pong.encode(buffer).unwrap();
885
886            Ok((
887                encoded_message.message_length(),
888                "127.0.0.1:5683".parse().unwrap(),
889            ))
890        });
891
892        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
893        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Success(_)));
894        assert!(matches!(
895            endpoint.outgoing_communication.state,
896            OutgoingState::Idle(true)
897        ));
898    }
899
900    #[test]
901    fn send_con_get_piggybacked() {
902        let mut stack = MockStack::default();
903
904        let clock = MyClock {
905            last_time: RefCell::new(Duration::from_secs(0)),
906            now: RefCell::new(Duration::from_secs(1)),
907        };
908
909        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
910
911        let mut endpoint: CoapEndpoint<
912            '_,
913            MockStack,
914            Random,
915            MyClock,
916            8,
917            32,
918            128,
919            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
920        > = CoapEndpoint::try_new(
921            TransmissionParameters::default(),
922            Random { value: 0 },
923            &clock,
924            &mut receive_buffer,
925        )
926        .unwrap();
927
928        stack.expect_socket().once().return_once(|| Ok(Socket));
929        stack.expect_connect().once().return_once(|_, _| Ok(()));
930
931        endpoint
932            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
933            .unwrap();
934
935        stack
936            .expect_receive()
937            .once()
938            .return_once(|_, _| Err(nb::Error::WouldBlock));
939
940        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
941        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
942        assert!(matches!(
943            endpoint.outgoing_communication.state,
944            OutgoingState::Idle(_)
945        ));
946
947        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
948        let request_token = endpoint.outgoing_communication.next_token.unwrap();
949        let request: Message<'_> = Message::new(
950            Type::Confirmable,
951            RequestCode::Get.into(),
952            request_id,
953            request_token,
954            Vec::new(),
955            Some(b"/hello"),
956        );
957        let mut request_buf = [0_u8; 32];
958        let request_length = request.encode(&mut request_buf).unwrap().message_length();
959        let response: Message<'_> = Message::new(
960            Type::Acknowledgement,
961            SuccessCode::Content.into(),
962            request_id,
963            request_token,
964            Vec::new(),
965            Some(b"world"),
966        );
967        let mut response_buf = [0_u8; 32];
968        let response_length = response.encode(&mut response_buf).unwrap().message_length();
969
970        endpoint
971            .outgoing()
972            .schedule_con(
973                RequestCode::Get,
974                Vec::new(),
975                Some(b"/hello"),
976                Duration::from_secs(1),
977            )
978            .unwrap();
979
980        stack
981            .expect_receive()
982            .once()
983            .return_once(|_, _| Err(nb::Error::WouldBlock));
984        stack.expect_send().once().return_once(move |_, buffer| {
985            assert_eq!(buffer, &request_buf[..request_length]);
986
987            Ok(())
988        });
989
990        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
991        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
992        assert!(matches!(
993            endpoint.outgoing_communication.state,
994            OutgoingState::SendingCon(..)
995        ));
996
997        stack.expect_receive().once().return_once(move |_, buffer| {
998            buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
999
1000            Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1001        });
1002
1003        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1004        let outgoing = outgoing.unwrap();
1005        assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1006        match outgoing {
1007            OutgoingEvent::Success(message) => {
1008                assert_eq!(message, response.encode(&mut response_buf).unwrap());
1009            }
1010            _ => (),
1011        }
1012        assert!(matches!(
1013            endpoint.outgoing_communication.state,
1014            OutgoingState::Idle(_)
1015        ));
1016    }
1017
1018    #[test]
1019    fn send_con_get_separate() {
1020        let mut stack = MockStack::default();
1021
1022        let clock = MyClock {
1023            last_time: RefCell::new(Duration::from_secs(0)),
1024            now: RefCell::new(Duration::from_secs(1)),
1025        };
1026
1027        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1028
1029        let mut endpoint: CoapEndpoint<
1030            '_,
1031            MockStack,
1032            Random,
1033            MyClock,
1034            8,
1035            32,
1036            128,
1037            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1038        > = CoapEndpoint::try_new(
1039            TransmissionParameters::default(),
1040            Random { value: 0 },
1041            &clock,
1042            &mut receive_buffer,
1043        )
1044        .unwrap();
1045
1046        stack.expect_socket().once().return_once(|| Ok(Socket));
1047        stack.expect_connect().once().return_once(|_, _| Ok(()));
1048
1049        endpoint
1050            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1051            .unwrap();
1052
1053        stack
1054            .expect_receive()
1055            .once()
1056            .return_once(|_, _| Err(nb::Error::WouldBlock));
1057
1058        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1059        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1060        assert!(matches!(
1061            endpoint.outgoing_communication.state,
1062            OutgoingState::Idle(_)
1063        ));
1064
1065        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1066        let request_token = endpoint.outgoing_communication.next_token.unwrap();
1067        let request: Message<'_> = Message::new(
1068            Type::Confirmable,
1069            RequestCode::Get.into(),
1070            request_id,
1071            request_token,
1072            Vec::new(),
1073            Some(b"/hello"),
1074        );
1075        let mut request_buf = [0_u8; 32];
1076        let request_length = request.encode(&mut request_buf).unwrap().message_length();
1077
1078        let ack: Message<'_> = Message::new_ack(request_id);
1079        let mut ack_buf = [0_u8; 4];
1080        let ack_length = ack.encode(&mut ack_buf).unwrap().message_length();
1081
1082        let response: Message<'_> = Message::new(
1083            Type::Acknowledgement,
1084            SuccessCode::Content.into(),
1085            request_id,
1086            request_token,
1087            Vec::new(),
1088            Some(b"world"),
1089        );
1090        let mut response_buf = [0_u8; 32];
1091        let response_length = response.encode(&mut response_buf).unwrap().message_length();
1092
1093        endpoint
1094            .outgoing()
1095            .schedule_con(
1096                RequestCode::Get,
1097                Vec::new(),
1098                Some(b"/hello"),
1099                Duration::from_secs(1),
1100            )
1101            .unwrap();
1102
1103        stack
1104            .expect_receive()
1105            .once()
1106            .return_once(|_, _| Err(nb::Error::WouldBlock));
1107        stack.expect_send().once().return_once(move |_, buffer| {
1108            assert_eq!(buffer, &request_buf[..request_length]);
1109
1110            Ok(())
1111        });
1112
1113        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1114        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1115        assert!(matches!(
1116            endpoint.outgoing_communication.state,
1117            OutgoingState::SendingCon(..)
1118        ));
1119
1120        stack.expect_receive().once().return_once(move |_, buffer| {
1121            buffer[..ack_length].copy_from_slice(&ack_buf[..ack_length]);
1122
1123            Ok((ack_length, "127.0.0.1:5683".parse().unwrap()))
1124        });
1125
1126        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1127        assert!(matches!(outgoing.unwrap(), OutgoingEvent::AckReceived));
1128        assert!(matches!(
1129            endpoint.outgoing_communication.state,
1130            OutgoingState::AwaitingResponse(..)
1131        ));
1132
1133        stack.expect_receive().once().return_once(move |_, buffer| {
1134            buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
1135
1136            Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1137        });
1138
1139        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1140        let outgoing = outgoing.unwrap();
1141        assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1142        match outgoing {
1143            OutgoingEvent::Success(message) => {
1144                assert_eq!(message, response.encode(&mut response_buf).unwrap());
1145            }
1146            _ => (),
1147        }
1148        assert!(matches!(
1149            endpoint.outgoing_communication.state,
1150            OutgoingState::Idle(_)
1151        ));
1152    }
1153
1154    #[test]
1155    fn send_non_get_non() {
1156        let mut stack = MockStack::default();
1157
1158        let clock = MyClock {
1159            last_time: RefCell::new(Duration::from_secs(0)),
1160            now: RefCell::new(Duration::from_secs(1)),
1161        };
1162
1163        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1164
1165        let mut endpoint: CoapEndpoint<
1166            '_,
1167            MockStack,
1168            Random,
1169            MyClock,
1170            8,
1171            32,
1172            128,
1173            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1174        > = CoapEndpoint::try_new(
1175            TransmissionParameters::default(),
1176            Random { value: 0 },
1177            &clock,
1178            &mut receive_buffer,
1179        )
1180        .unwrap();
1181
1182        stack.expect_socket().once().return_once(|| Ok(Socket));
1183        stack.expect_connect().once().return_once(|_, _| Ok(()));
1184
1185        endpoint
1186            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1187            .unwrap();
1188
1189        stack
1190            .expect_receive()
1191            .once()
1192            .return_once(|_, _| Err(nb::Error::WouldBlock));
1193
1194        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1195        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1196        assert!(matches!(
1197            endpoint.outgoing_communication.state,
1198            OutgoingState::Idle(_)
1199        ));
1200
1201        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1202        let request_token = endpoint.outgoing_communication.next_token.unwrap();
1203        let request: Message<'_> = Message::new(
1204            Type::NonConfirmable,
1205            RequestCode::Get.into(),
1206            request_id,
1207            request_token,
1208            Vec::new(),
1209            Some(b"/hello"),
1210        );
1211        let mut request_buf = [0_u8; 32];
1212        let request_length = request.encode(&mut request_buf).unwrap().message_length();
1213
1214        let response: Message<'_> = Message::new(
1215            Type::NonConfirmable,
1216            SuccessCode::Content.into(),
1217            request_id + 1,
1218            request_token,
1219            Vec::new(),
1220            Some(b"world"),
1221        );
1222        let mut response_buf = [0_u8; 32];
1223        let response_length = response.encode(&mut response_buf).unwrap().message_length();
1224
1225        endpoint
1226            .outgoing()
1227            .schedule_non(
1228                RequestCode::Get,
1229                Vec::new(),
1230                Some(b"/hello"),
1231                Duration::from_secs(1),
1232            )
1233            .unwrap();
1234
1235        stack
1236            .expect_receive()
1237            .once()
1238            .return_once(|_, _| Err(nb::Error::WouldBlock));
1239        stack.expect_send().once().return_once(move |_, buffer| {
1240            assert_eq!(buffer, &request_buf[..request_length]);
1241
1242            Ok(())
1243        });
1244
1245        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1246        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendNon));
1247        assert!(matches!(
1248            endpoint.outgoing_communication.state,
1249            OutgoingState::SendingNon(..)
1250        ));
1251
1252        stack
1253            .expect_receive()
1254            .once()
1255            .return_once(|_, _| Err(nb::Error::WouldBlock));
1256
1257        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1258        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1259        assert!(matches!(
1260            endpoint.outgoing_communication.state,
1261            OutgoingState::SendingNon(..)
1262        ));
1263
1264        stack.expect_receive().once().return_once(move |_, buffer| {
1265            buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
1266
1267            Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1268        });
1269
1270        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1271        let outgoing = outgoing.unwrap();
1272        assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1273        match outgoing {
1274            OutgoingEvent::Success(message) => {
1275                assert_eq!(message, response.encode(&mut response_buf).unwrap());
1276            }
1277            _ => (),
1278        }
1279        assert!(matches!(
1280            endpoint.outgoing_communication.state,
1281            OutgoingState::Idle(_)
1282        ));
1283    }
1284
1285    #[test]
1286    fn send_non_get_con() {
1287        let mut stack = MockStack::default();
1288
1289        let clock = MyClock {
1290            last_time: RefCell::new(Duration::from_secs(0)),
1291            now: RefCell::new(Duration::from_secs(1)),
1292        };
1293
1294        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1295
1296        let mut endpoint: CoapEndpoint<
1297            '_,
1298            MockStack,
1299            Random,
1300            MyClock,
1301            8,
1302            32,
1303            128,
1304            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1305        > = CoapEndpoint::try_new(
1306            TransmissionParameters::default(),
1307            Random { value: 0 },
1308            &clock,
1309            &mut receive_buffer,
1310        )
1311        .unwrap();
1312
1313        stack.expect_socket().once().return_once(|| Ok(Socket));
1314        stack.expect_connect().once().return_once(|_, _| Ok(()));
1315
1316        endpoint
1317            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1318            .unwrap();
1319
1320        stack
1321            .expect_receive()
1322            .once()
1323            .return_once(|_, _| Err(nb::Error::WouldBlock));
1324
1325        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1326        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1327        assert!(matches!(
1328            endpoint.outgoing_communication.state,
1329            OutgoingState::Idle(_)
1330        ));
1331
1332        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1333        let request_token = endpoint.outgoing_communication.next_token.unwrap();
1334        let request: Message<'_> = Message::new(
1335            Type::NonConfirmable,
1336            RequestCode::Get.into(),
1337            request_id,
1338            request_token,
1339            Vec::new(),
1340            Some(b"/hello"),
1341        );
1342        let mut request_buf = [0_u8; 32];
1343        let request_length = request.encode(&mut request_buf).unwrap().message_length();
1344
1345        let response_id = request_id + 1;
1346        let response: Message<'_> = Message::new(
1347            Type::Confirmable,
1348            SuccessCode::Content.into(),
1349            response_id,
1350            request_token,
1351            Vec::new(),
1352            Some(b"world"),
1353        );
1354        let mut response_buf = [0_u8; 32];
1355        let response_length = response.encode(&mut response_buf).unwrap().message_length();
1356
1357        let ack: Message<'_> = Message::new_ack(response_id);
1358        let mut ack_buf = [0_u8; 4];
1359        let _ack_length = ack.encode(&mut ack_buf).unwrap().message_length();
1360
1361        endpoint
1362            .outgoing()
1363            .schedule_non(
1364                RequestCode::Get,
1365                Vec::new(),
1366                Some(b"/hello"),
1367                Duration::from_secs(1),
1368            )
1369            .unwrap();
1370
1371        stack
1372            .expect_receive()
1373            .once()
1374            .return_once(|_, _| Err(nb::Error::WouldBlock));
1375        stack.expect_send().once().return_once(move |_, buffer| {
1376            assert_eq!(buffer, &request_buf[..request_length]);
1377
1378            Ok(())
1379        });
1380
1381        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1382        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendNon));
1383        assert!(matches!(
1384            endpoint.outgoing_communication.state,
1385            OutgoingState::SendingNon(..)
1386        ));
1387
1388        stack
1389            .expect_receive()
1390            .once()
1391            .return_once(|_, _| Err(nb::Error::WouldBlock));
1392
1393        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1394        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1395        assert!(matches!(
1396            endpoint.outgoing_communication.state,
1397            OutgoingState::SendingNon(..)
1398        ));
1399
1400        stack.expect_receive().once().return_once(move |_, buffer| {
1401            buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
1402
1403            Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1404        });
1405        stack.expect_send().once().return_once(move |_, buffer| {
1406            assert_eq!(buffer, ack_buf);
1407
1408            Ok(())
1409        });
1410
1411        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1412        let outgoing = outgoing.unwrap();
1413        assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1414        match outgoing {
1415            OutgoingEvent::Success(message) => {
1416                assert_eq!(message, response.encode(&mut response_buf).unwrap());
1417            }
1418            _ => (),
1419        }
1420        assert!(matches!(
1421            endpoint.outgoing_communication.state,
1422            OutgoingState::Idle(_)
1423        ));
1424    }
1425
1426    #[test]
1427    fn send_con_get_rst() {
1428        let mut stack = MockStack::default();
1429
1430        let clock = MyClock {
1431            last_time: RefCell::new(Duration::from_secs(0)),
1432            now: RefCell::new(Duration::from_secs(1)),
1433        };
1434
1435        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1436
1437        let mut endpoint: CoapEndpoint<
1438            '_,
1439            MockStack,
1440            Random,
1441            MyClock,
1442            8,
1443            32,
1444            128,
1445            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1446        > = CoapEndpoint::try_new(
1447            TransmissionParameters::default(),
1448            Random { value: 0 },
1449            &clock,
1450            &mut receive_buffer,
1451        )
1452        .unwrap();
1453
1454        stack.expect_socket().once().return_once(|| Ok(Socket));
1455        stack.expect_connect().once().return_once(|_, _| Ok(()));
1456
1457        endpoint
1458            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1459            .unwrap();
1460
1461        stack
1462            .expect_receive()
1463            .once()
1464            .return_once(|_, _| Err(nb::Error::WouldBlock));
1465
1466        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1467        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1468        assert!(matches!(
1469            endpoint.outgoing_communication.state,
1470            OutgoingState::Idle(_)
1471        ));
1472
1473        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1474        let request_token = endpoint.outgoing_communication.next_token.unwrap();
1475        let request: Message<'_> = Message::new(
1476            Type::Confirmable,
1477            RequestCode::Get.into(),
1478            request_id,
1479            request_token,
1480            Vec::new(),
1481            Some(b"/hello"),
1482        );
1483        let mut request_buf = [0_u8; 32];
1484        let request_length = request.encode(&mut request_buf).unwrap().message_length();
1485
1486        let reset: Message<'_> = Message::new_rst(request_id);
1487        let mut reset_buf = [0_u8; 4];
1488        let reset_length = reset.encode(&mut reset_buf).unwrap().message_length();
1489
1490        endpoint
1491            .outgoing()
1492            .schedule_con(
1493                RequestCode::Get,
1494                Vec::new(),
1495                Some(b"/hello"),
1496                Duration::from_secs(1),
1497            )
1498            .unwrap();
1499
1500        stack
1501            .expect_receive()
1502            .once()
1503            .return_once(|_, _| Err(nb::Error::WouldBlock));
1504        stack.expect_send().once().return_once(move |_, buffer| {
1505            assert_eq!(buffer, &request_buf[..request_length]);
1506
1507            Ok(())
1508        });
1509
1510        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1511        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1512        assert!(matches!(
1513            endpoint.outgoing_communication.state,
1514            OutgoingState::SendingCon(..)
1515        ));
1516
1517        stack
1518            .expect_receive()
1519            .once()
1520            .return_once(|_, _| Err(nb::Error::WouldBlock));
1521
1522        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1523        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1524        assert!(matches!(
1525            endpoint.outgoing_communication.state,
1526            OutgoingState::SendingCon(..)
1527        ));
1528
1529        stack.expect_receive().once().return_once(move |_, buffer| {
1530            buffer[..reset_length].copy_from_slice(&reset_buf[..reset_length]);
1531
1532            Ok((reset_length, "127.0.0.1:5683".parse().unwrap()))
1533        });
1534
1535        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1536        let outgoing = outgoing.unwrap();
1537        assert!(matches!(outgoing, OutgoingEvent::ResetReceived));
1538        assert!(matches!(
1539            endpoint.outgoing_communication.state,
1540            OutgoingState::Idle(_)
1541        ));
1542    }
1543
1544    #[test]
1545    fn send_con_get_piggybacked_wrong_token() {
1546        let mut stack = MockStack::default();
1547
1548        let clock = MyClock {
1549            last_time: RefCell::new(Duration::from_secs(0)),
1550            now: RefCell::new(Duration::from_secs(1)),
1551        };
1552
1553        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1554
1555        let mut endpoint: CoapEndpoint<
1556            '_,
1557            MockStack,
1558            Random,
1559            MyClock,
1560            8,
1561            32,
1562            128,
1563            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1564        > = CoapEndpoint::try_new(
1565            TransmissionParameters::default(),
1566            Random { value: 0 },
1567            &clock,
1568            &mut receive_buffer,
1569        )
1570        .unwrap();
1571
1572        stack.expect_socket().once().return_once(|| Ok(Socket));
1573        stack.expect_connect().once().return_once(|_, _| Ok(()));
1574
1575        endpoint
1576            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1577            .unwrap();
1578
1579        stack
1580            .expect_receive()
1581            .once()
1582            .return_once(|_, _| Err(nb::Error::WouldBlock));
1583
1584        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1585        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1586        assert!(matches!(
1587            endpoint.outgoing_communication.state,
1588            OutgoingState::Idle(_)
1589        ));
1590
1591        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1592        let request_token = endpoint.outgoing_communication.next_token.unwrap();
1593        let request: Message<'_> = Message::new(
1594            Type::Confirmable,
1595            RequestCode::Get.into(),
1596            request_id,
1597            request_token,
1598            Vec::new(),
1599            Some(b"/hello"),
1600        );
1601        let mut request_buf = [0_u8; 32];
1602        let request_length = request.encode(&mut request_buf).unwrap().message_length();
1603
1604        // Manipulate the token to get the according Event
1605        let mut response_token = request_token;
1606        response_token.bytes[0] += 1;
1607        let response: Message<'_> = Message::new(
1608            Type::Acknowledgement,
1609            SuccessCode::Content.into(),
1610            request_id,
1611            response_token,
1612            Vec::new(),
1613            Some(b"world"),
1614        );
1615        let mut response_buf = [0_u8; 32];
1616        let response_length = response.encode(&mut response_buf).unwrap().message_length();
1617
1618        endpoint
1619            .outgoing()
1620            .schedule_con(
1621                RequestCode::Get,
1622                Vec::new(),
1623                Some(b"/hello"),
1624                Duration::from_secs(1),
1625            )
1626            .unwrap();
1627
1628        stack
1629            .expect_receive()
1630            .once()
1631            .return_once(|_, _| Err(nb::Error::WouldBlock));
1632        stack.expect_send().returning(move |_, buffer| {
1633            assert_eq!(buffer, &request_buf[..request_length]);
1634
1635            Ok(())
1636        });
1637
1638        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1639        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1640        assert!(matches!(
1641            endpoint.outgoing_communication.state,
1642            OutgoingState::SendingCon(..)
1643        ));
1644
1645        stack.expect_receive().once().return_once(move |_, buffer| {
1646            buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
1647
1648            Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1649        });
1650
1651        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1652        let outgoing = outgoing.unwrap();
1653        assert!(matches!(outgoing, OutgoingEvent::PiggybackedWrongToken));
1654        assert!(matches!(
1655            endpoint.outgoing_communication.state,
1656            OutgoingState::SendingCon(..)
1657        ));
1658
1659        endpoint.outgoing().cancel().unwrap();
1660        assert!(matches!(
1661            endpoint.outgoing_communication.state,
1662            OutgoingState::Idle(false)
1663        ));
1664    }
1665
1666    #[test]
1667    fn send_con_get_timeout() {
1668        let mut stack = MockStack::default();
1669
1670        let clock = MyClock {
1671            last_time: RefCell::new(Duration::from_secs(0)),
1672            now: RefCell::new(Duration::from_secs(1)),
1673        };
1674
1675        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1676
1677        let mut endpoint: CoapEndpoint<
1678            '_,
1679            MockStack,
1680            Random,
1681            MyClock,
1682            8,
1683            32,
1684            128,
1685            //{ coap_zero::DEFAULT_COAP_MESSAGE_SIZE },
1686        > = CoapEndpoint::try_new(
1687            TransmissionParameters::default(),
1688            Random { value: 0 },
1689            &clock,
1690            &mut receive_buffer,
1691        )
1692        .unwrap();
1693
1694        stack.expect_socket().once().return_once(|| Ok(Socket));
1695        stack.expect_connect().once().return_once(|_, _| Ok(()));
1696
1697        endpoint
1698            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1699            .unwrap();
1700
1701        stack
1702            .expect_receive()
1703            .once()
1704            .return_once(|_, _| Err(nb::Error::WouldBlock));
1705
1706        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1707        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1708        assert!(matches!(
1709            endpoint.outgoing_communication.state,
1710            OutgoingState::Idle(_)
1711        ));
1712
1713        let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1714        let request_token = endpoint.outgoing_communication.next_token.unwrap();
1715        let request: Message<'_> = Message::new(
1716            Type::Confirmable,
1717            RequestCode::Get.into(),
1718            request_id,
1719            request_token,
1720            Vec::new(),
1721            Some(b"/hello"),
1722        );
1723        let mut request_buf = [0_u8; 32];
1724        let request_length = request.encode(&mut request_buf).unwrap().message_length();
1725
1726        endpoint
1727            .outgoing()
1728            .schedule_con(
1729                RequestCode::Get,
1730                Vec::new(),
1731                Some(b"/hello"),
1732                Duration::from_secs(1),
1733            )
1734            .unwrap();
1735
1736        stack
1737            .expect_receive()
1738            .once()
1739            .return_once(|_, _| Err(nb::Error::WouldBlock));
1740        stack.expect_send().returning(move |_, buffer| {
1741            assert_eq!(buffer, &request_buf[..request_length]);
1742
1743            Ok(())
1744        });
1745
1746        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1747        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1748        assert!(matches!(
1749            endpoint.outgoing_communication.state,
1750            OutgoingState::SendingCon(..)
1751        ));
1752
1753        stack
1754            .expect_receive()
1755            .returning(|_, _| Err(nb::Error::WouldBlock));
1756
1757        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1758        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1759        assert!(matches!(
1760            endpoint.outgoing_communication.state,
1761            OutgoingState::SendingCon(..)
1762        ));
1763
1764        // 4 Retries
1765        clock.advance(Duration::from_secs(3));
1766        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1767        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1768        assert!(matches!(
1769            endpoint.outgoing_communication.state,
1770            OutgoingState::SendingCon(..)
1771        ));
1772
1773        clock.advance(Duration::from_secs(5));
1774        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1775        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1776        assert!(matches!(
1777            endpoint.outgoing_communication.state,
1778            OutgoingState::SendingCon(..)
1779        ));
1780
1781        clock.advance(Duration::from_secs(9));
1782        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1783        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1784        assert!(matches!(
1785            endpoint.outgoing_communication.state,
1786            OutgoingState::SendingCon(..)
1787        ));
1788
1789        clock.advance(Duration::from_secs(17));
1790        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1791        assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1792        assert!(matches!(
1793            endpoint.outgoing_communication.state,
1794            OutgoingState::SendingCon(..)
1795        ));
1796
1797        // Timeout
1798        clock.advance(Duration::from_secs(33));
1799        let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1800        assert!(matches!(outgoing.unwrap(), OutgoingEvent::Timeout));
1801        assert!(matches!(
1802            endpoint.outgoing_communication.state,
1803            OutgoingState::Idle(false)
1804        ));
1805    }
1806}