1pub mod connect;
23pub mod error;
24pub mod incoming;
25pub mod outgoing;
26
27use core::time::Duration;
28
29use byteorder::{ByteOrder, LittleEndian};
30use embedded_hal::blocking::rng;
31use embedded_nal::{SocketAddr, UdpClientStack};
32use embedded_timers::clock::Clock;
33
34use crate::message::{self, encoded_message::EncodedMessage, token::Token, Message};
35
36use self::{error::Error, incoming::IncomingCommunication, outgoing::OutgoingCommunication};
37
38pub type Uri = iri_string::types::RiReferenceStr<iri_string::spec::UriSpec>;
40
41pub const DEFAULT_COAP_PORT: u16 = 5683;
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub enum EndpointEvent<'a> {
47 Nothing,
49 MsgFormatErr(message::Error),
51 Ping,
54 Unhandled(EncodedMessage<'a>),
57}
58
59#[derive(Debug, Clone, Copy)]
61pub struct TransmissionParameters {
62 pub ack_timeout: Duration,
64 pub ack_random_factor: f32,
66 pub max_retransmit: u32,
68}
69
70impl Default for TransmissionParameters {
72 fn default() -> Self {
73 Self {
74 ack_timeout: Duration::from_secs(2),
75 ack_random_factor: 1.5,
76 max_retransmit: 4,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Copy)]
84pub struct RetransmissionState {
85 retransmission_counter: Option<u32>,
88 last_transmission_instant: Option<embedded_timers::clock::Instant>,
91 initial_timeout: Duration,
94 max_retransmit: u32,
96}
97
98pub struct RetransmissionTimeout;
100
101impl RetransmissionState {
102 pub fn new(transmission_parameters: TransmissionParameters, random: f32) -> Self {
109 let min_timeout = transmission_parameters.ack_timeout.as_millis() as f32;
110 let max_timeout = min_timeout * transmission_parameters.ack_random_factor;
111 let initial_timeout_millis = max_timeout * random + min_timeout * (1.0 - random);
112 let initial_timeout = Duration::from_millis(initial_timeout_millis as u64);
113 Self {
114 retransmission_counter: None,
115 last_transmission_instant: None,
116 initial_timeout,
117 max_retransmit: transmission_parameters.max_retransmit,
118 }
119 }
120
121 pub fn retransmit_required(
130 &mut self,
131 now: embedded_timers::clock::Instant,
132 ) -> Result<bool, RetransmissionTimeout> {
133 match (self.retransmission_counter, self.last_transmission_instant) {
134 (Some(retransmission_counter), Some(last_instant)) => {
135 let waited = now - last_instant;
140 let timeout = self.initial_timeout * 2u32.pow(retransmission_counter);
141 if waited < timeout {
142 Ok(false)
143 } else if retransmission_counter < self.max_retransmit {
144 self.retransmission_counter = Some(retransmission_counter + 1);
145 self.last_transmission_instant = Some(now);
146 Ok(true)
147 } else {
148 Err(RetransmissionTimeout)
149 }
150 }
151 _ => {
152 self.retransmission_counter = Some(0);
153 self.last_transmission_instant = Some(now);
154 Ok(true)
155 }
156 }
157 }
158}
159
160fn get_random<RNG: rng::Read>(rng: &mut RNG) -> Result<f32, RNG::Error> {
162 let mut random_buf = [0_u8; 4];
163 rng.read(&mut random_buf)?;
164 Ok(u32::from_be_bytes(random_buf) as f32 / (u32::MAX - 1) as f32)
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
170struct MessageId(u16);
171
172impl MessageId {
173 fn try_new<RNG: rng::Read>(rng: &mut RNG) -> Result<Self, RNG::Error> {
178 let mut counter_bytes = [0_u8; 2];
179
180 while counter_bytes == [0, 0] || counter_bytes == [0xFF, 0xFF] {
181 rng.read(&mut counter_bytes)?;
182 }
183
184 Ok(Self(LittleEndian::read_u16(&counter_bytes)))
185 }
186
187 fn next(&mut self) -> u16 {
189 self.0 += 1;
190
191 if self.0 == u16::MAX || self.0 == 0 {
192 self.0 = 1;
193 }
194
195 self.0
196 }
197}
198
199#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
200pub(crate) struct MessageIdentification {
201 pub(crate) id: u16,
202 pub(crate) token: Token,
203}
204
205#[derive(Debug)]
206pub(crate) struct MessageBuffer<const BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE }> {
207 buffer: [u8; BUFFER_SIZE],
208 length: usize,
209}
210
211impl<const BUFFER_SIZE: usize> Default for MessageBuffer<BUFFER_SIZE> {
212 fn default() -> Self {
213 Self {
214 buffer: [0_u8; BUFFER_SIZE],
215 length: 0,
216 }
217 }
218}
219
220impl<const BUFFER_SIZE: usize> MessageBuffer<BUFFER_SIZE> {
221 fn replace_with<E>(&mut self, message: EncodedMessage<'_>) -> Result<(), Error<E>> {
223 if message.data.len() > self.buffer.len() {
224 return Err(Error::OutOfMemory);
225 }
226
227 self.length = message.data.len();
228
229 self.buffer[..self.length].copy_from_slice(message.data);
230
231 Ok(())
232 }
233
234 fn encode<const OPTION_COUNT: usize, E>(
235 &mut self,
236 message: Message<OPTION_COUNT>,
237 ) -> Result<(), Error<E>> {
238 match message.encode(&mut self.buffer) {
239 Ok(encoded) => {
240 self.length = encoded.message_length();
241 Ok(())
242 }
243 Err(e) => {
244 self.length = 0;
247 Err(Error::MessageError(e))
248 }
249 }
250 }
251
252 pub fn message(&self) -> Option<EncodedMessage<'_>> {
254 if self.length > 0 {
255 Some(EncodedMessage::try_new(&self.buffer[..self.length]).unwrap())
256 } else {
257 None
258 }
259 }
260}
261
262pub(crate) struct Connection<'a, UDP: UdpClientStack> {
264 client_stack: &'a mut UDP,
265 link: &'a mut ConnectionLink<UDP::UdpSocket>,
266}
267
268impl<'a, UDP: UdpClientStack> Connection<'a, UDP> {
269 fn send(&mut self, packet: &[u8]) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
271 nb::block!(self.client_stack.send(&mut self.link.socket, packet)).map_err(Error::Network)
272 }
273 fn receive(
275 &mut self,
276 buffer: &mut [u8],
277 ) -> Result<Option<usize>, Error<<UDP as UdpClientStack>::Error>> {
278 match self.client_stack.receive(&mut self.link.socket, buffer) {
279 Ok((message_size, sender)) => {
280 if sender == self.link.addr {
281 Ok(Some(message_size))
282 } else {
283 Ok(None)
286 }
287 }
288 Err(nb::Error::WouldBlock) => Ok(None),
289 Err(nb::Error::Other(e)) => Err(Error::Network(e)),
290 }
291 }
292}
293
294#[derive(Debug)]
295pub(crate) struct ConnectionLink<Socket> {
296 socket: Socket,
297 addr: SocketAddr,
298}
299
300#[derive(Debug)]
302pub struct CoapEndpoint<
303 'a,
304 UDP,
305 RNG,
306 CLOCK,
307 const MAX_OPTION_COUNT: usize = { crate::DEFAULT_MAX_OPTION_COUNT },
308 const MAX_OPTION_SIZE: usize = { crate::DEFAULT_MAX_OPTION_SIZE },
309 const INCOMING_BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
310 const OUTGOING_BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
311 const RECEIVE_BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
312> where
313 UDP: UdpClientStack,
314 RNG: rng::Read,
315 CLOCK: Clock,
316{
317 connection_link: Option<ConnectionLink<UDP::UdpSocket>>,
318 message_id_counter: MessageId,
319 rng: RNG,
320 incoming_communication: IncomingCommunication<
321 'a,
322 UDP,
323 CLOCK,
324 INCOMING_BUFFER_SIZE,
325 MAX_OPTION_COUNT,
326 MAX_OPTION_SIZE,
327 >,
328 outgoing_communication: OutgoingCommunication<
329 'a,
330 CLOCK,
331 UDP,
332 OUTGOING_BUFFER_SIZE,
333 MAX_OPTION_COUNT,
334 MAX_OPTION_SIZE,
335 >,
336 receive_buffer: &'a mut [u8],
339}
340
341impl<
342 'a,
343 UDP,
344 RNG,
345 CLOCK,
346 const MAX_OPTION_COUNT: usize,
347 const MAX_OPTION_SIZE: usize,
348 const INCOMING_BUFFER_SIZE: usize,
349 const OUTGOING_BUFFER_SIZE: usize,
350 const RECEIVE_BUFFER_SIZE: usize,
351 >
352 CoapEndpoint<
353 'a,
354 UDP,
355 RNG,
356 CLOCK,
357 MAX_OPTION_COUNT,
358 MAX_OPTION_SIZE,
359 INCOMING_BUFFER_SIZE,
360 OUTGOING_BUFFER_SIZE,
361 RECEIVE_BUFFER_SIZE,
362 >
363where
364 UDP: UdpClientStack,
365 RNG: rng::Read,
366 CLOCK: Clock,
367{
368 pub fn try_new(
372 transmission_parameters: TransmissionParameters,
373 mut rng: RNG,
374 clock: &'a CLOCK,
375 receive_buffer: &'a mut [u8],
376 ) -> Result<Self, Error<<UDP as UdpClientStack>::Error>> {
377 let message_id_counter = MessageId::try_new(&mut rng).map_err(|_| Error::Rng)?;
378
379 let mut slf = Self {
380 connection_link: None,
381 message_id_counter,
382 rng,
383 incoming_communication: IncomingCommunication::new(clock, transmission_parameters),
384 outgoing_communication: OutgoingCommunication::new(clock, transmission_parameters),
385 receive_buffer,
386 };
387
388 if slf.incoming_communication.next_message_id.is_none() {
389 slf.incoming_communication.next_message_id = Some(slf.message_id_counter.next());
390 }
391 let random = get_random(&mut slf.rng).map_err(|_| Error::Rng)?;
392 slf.incoming_communication.next_random = Some(random);
393
394 if slf.outgoing_communication.next_message_id.is_none() {
395 slf.outgoing_communication.next_message_id = Some(slf.message_id_counter.next());
396 }
397 if slf.outgoing_communication.next_token.is_none() {
398 slf.outgoing_communication.next_token = Some(
399 Token::try_new(crate::message::token::TokenLength::Eight, &mut slf.rng)
400 .map_err(|_| Error::Rng)?,
401 );
402 }
403 let random = get_random(&mut slf.rng).map_err(|_| Error::Rng)?;
404 slf.outgoing_communication.next_random = Some(random);
405
406 Ok(slf)
407 }
408
409 pub fn incoming(
411 &mut self,
412 ) -> &mut IncomingCommunication<
413 'a,
414 UDP,
415 CLOCK,
416 INCOMING_BUFFER_SIZE,
417 MAX_OPTION_COUNT,
418 MAX_OPTION_SIZE,
419 > {
420 &mut self.incoming_communication
421 }
422
423 pub fn outgoing(
425 &mut self,
426 ) -> &mut OutgoingCommunication<
427 'a,
428 CLOCK,
429 UDP,
430 OUTGOING_BUFFER_SIZE,
431 MAX_OPTION_COUNT,
432 MAX_OPTION_SIZE,
433 > {
434 &mut self.outgoing_communication
435 }
436
437 pub fn process(
457 &mut self,
458 client_stack: &mut UDP,
459 ) -> Result<
460 (
461 Result<incoming::IncomingEvent, Error<<UDP as UdpClientStack>::Error>>,
462 Result<outgoing::OutgoingEvent, Error<<UDP as UdpClientStack>::Error>>,
463 EndpointEvent,
464 ),
465 Error<<UDP as UdpClientStack>::Error>,
466 > {
467 if self.incoming_communication.next_message_id.is_none() {
468 self.incoming_communication.next_message_id = Some(self.message_id_counter.next());
469 }
470 if self.incoming_communication.next_random.is_none() {
471 let random = get_random(&mut self.rng).map_err(|_| Error::Rng)?;
472 self.incoming_communication.next_random = Some(random);
473 }
474
475 if self.outgoing_communication.next_message_id.is_none() {
476 self.outgoing_communication.next_message_id = Some(self.message_id_counter.next());
477 }
478 if self.outgoing_communication.next_token.is_none() {
479 self.outgoing_communication.next_token = Some(
480 Token::try_new(crate::message::token::TokenLength::Eight, &mut self.rng)
481 .map_err(|_| Error::Rng)?,
482 );
483 }
484 if self.outgoing_communication.next_random.is_none() {
485 let random = get_random(&mut self.rng).map_err(|_| Error::Rng)?;
486 self.outgoing_communication.next_random = Some(random);
487 }
488
489 let Some(ref mut link) = self.connection_link else {
490 return Err(Error::NotConnected);
491 };
492 let mut connection = Connection { client_stack, link };
493
494 let mut endpoint_event = EndpointEvent::Nothing;
499
500 let mut received_message = None;
501 if let Some(message_len) = connection.receive(self.receive_buffer)? {
502 match EncodedMessage::try_new(&self.receive_buffer[..message_len]) {
503 Ok(message) => {
504 match message.check_msg_format::<MAX_OPTION_SIZE>() {
505 Ok(()) => received_message = Some(message),
506 Err(e) => {
507 use message::Type::{Confirmable, NonConfirmable};
510 if matches!(message.message_type(), Confirmable | NonConfirmable) {
511 connection.send(&EncodedMessage::rst(message.message_id()))?;
512 }
513 endpoint_event = EndpointEvent::MsgFormatErr(e);
514 }
515 }
516 }
517 Err(e) => {
518 endpoint_event = EndpointEvent::MsgFormatErr(e);
521 }
522 }
523 }
524
525 if let Some(message) = received_message.as_mut() {
526 if message.is_ping().unwrap() {
527 connection.send(&EncodedMessage::rst(message.message_id()))?;
528 received_message = None;
529 endpoint_event = EndpointEvent::Ping;
530 }
531 }
532
533 let outgoing_result = self
534 .outgoing_communication
535 .process_outgoing(&mut connection, &mut received_message);
536
537 let incoming_result = self
538 .incoming_communication
539 .process_incoming(&mut connection, &mut received_message);
540
541 if let Some(message) = received_message {
542 use message::Type::{Confirmable, NonConfirmable};
543 if matches!(message.message_type(), Confirmable | NonConfirmable)
544 && message.is_response().unwrap()
545 {
546 connection.send(&EncodedMessage::rst(message.message_id()))?;
547 }
548 endpoint_event = EndpointEvent::Unhandled(message);
549 }
550
551 Ok((incoming_result, outgoing_result, endpoint_event))
552 }
553}
554
555#[cfg(test)]
556mod tests {
557 use core::{cell::RefCell, time::Duration};
558
559 use embedded_hal::prelude::_embedded_hal_blocking_rng_Read;
560 use embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
561 use heapless::Vec;
562 use mockall::predicate::*;
563 use mockall::*;
564
565 use crate::{
566 endpoint::outgoing::OutgoingEvent,
567 message::{
568 codes::RequestCode,
569 encoded_message::EncodedMessage,
570 token::{Token, TokenLength},
571 Message, Type,
572 },
573 };
574
575 use super::{incoming::IncomingEvent, CoapEndpoint, EndpointEvent, TransmissionParameters};
576
577 #[derive(Debug)]
578 struct Random {
579 value: u128,
580 }
581
582 impl embedded_hal::blocking::rng::Read for Random {
583 type Error = std::io::Error;
584
585 fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
586 self.value += 1;
587
588 let buf_len = buf.len();
589
590 buf[..buf_len].copy_from_slice(&self.value.to_le_bytes()[..buf_len]);
591
592 Ok(())
593 }
594 }
595
596 #[derive(Debug)]
597 struct StackError;
598
599 struct Socket;
600
601 mock! {
602 Stack {}
603
604 impl UdpClientStack for Stack {
605 type UdpSocket = Socket;
606 type Error = StackError;
607
608 fn socket(&mut self) -> Result<Socket, StackError>;
609 fn connect(
610 &mut self,
611 socket: &mut Socket,
612 remote: SocketAddr
613 ) -> Result<(), StackError>;
614 fn send(
615 &mut self,
616 socket: &mut Socket,
617 buffer: &[u8]
618 ) -> Result<(), nb::Error<StackError>>;
619 fn receive(
620 &mut self,
621 socket: &mut Socket,
622 buffer: &mut [u8]
623 ) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
624 fn close(&mut self, socket: Socket) -> Result<(), StackError>;
625 }
626 }
627
628 #[derive(Debug)]
629 struct MyClock {
630 last_time: RefCell<Duration>,
631 now: RefCell<Duration>,
632 }
633
634 impl embedded_timers::clock::Clock for MyClock {
635 fn try_now(
636 &self,
637 ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
638 *self.last_time.borrow_mut() = *self.now.borrow();
639 Ok(*self.now.borrow())
640 }
641 }
642
643 #[test]
644 fn nothing() {
645 let mut stack = MockStack::default();
646
647 let clock = MyClock {
648 last_time: RefCell::new(Duration::from_secs(0)),
649 now: RefCell::new(Duration::from_secs(1)),
650 };
651
652 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
653
654 let mut endpoint: CoapEndpoint<
655 '_,
656 MockStack,
657 Random,
658 MyClock,
659 8,
660 32,
661 128,
662 > = CoapEndpoint::try_new(
664 TransmissionParameters::default(),
665 Random { value: 0 },
666 &clock,
667 &mut receive_buffer,
668 )
669 .unwrap();
670
671 stack.expect_socket().once().return_once(|| Ok(Socket));
672 stack.expect_connect().once().return_once(|_, _| Ok(()));
673
674 endpoint
675 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
676 .unwrap();
677
678 stack
679 .expect_receive()
680 .once()
681 .return_once(|_, _| Err(nb::Error::WouldBlock));
682
683 let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
684 assert!(matches!(endpoint_event, EndpointEvent::Nothing));
685 }
686
687 #[test]
688 fn message_format_error() {
689 let mut stack = MockStack::default();
690
691 let clock = MyClock {
692 last_time: RefCell::new(Duration::from_secs(0)),
693 now: RefCell::new(Duration::from_secs(1)),
694 };
695
696 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
697
698 let mut endpoint: CoapEndpoint<
699 '_,
700 MockStack,
701 Random,
702 MyClock,
703 8,
704 32,
705 128,
706 > = CoapEndpoint::try_new(
708 TransmissionParameters::default(),
709 Random { value: 0 },
710 &clock,
711 &mut receive_buffer,
712 )
713 .unwrap();
714
715 stack.expect_socket().once().return_once(|| Ok(Socket));
716 stack.expect_connect().once().return_once(|_, _| Ok(()));
717
718 endpoint
719 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
720 .unwrap();
721
722 stack.expect_receive().once().return_once(|_, buffer| {
723 let malformed_message = [
724 72, 1, 0, 4, 5, 0, 0, 0, 0, 0, 0, 0, 254, 47, 104, 101, 108, 108, 111,
727 ];
728
729 buffer[..malformed_message.len()].copy_from_slice(&malformed_message);
730
731 Ok((
732 malformed_message.len(),
733 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
734 ))
735 });
736
737 let (incoming_event, _, _) = endpoint.process(&mut stack).unwrap();
738 match incoming_event.unwrap() {
739 IncomingEvent::Request(_con, req) => {
740 let msg: Result<Message<'_, 8>, crate::message::Error> = req.try_into();
741 if !matches!(msg, Err(crate::message::Error::InvalidOption(_))) {
742 panic!("Expected Err(InvalidOption), got: {msg:?}");
743 }
744 }
745 event => {
746 panic!("Expected IncomingEvent::Request, got: {event:?}");
747 }
748 }
749
750 stack.expect_receive().once().return_once(|_, buffer| {
751 let malformed_message = [
752 72, 5, 0, 4, 5, 0, 0, 0, 0, 0, 0, 0, 255, 47, 104, 101, 108, 108, 111,
754 ];
755
756 buffer[..malformed_message.len()].copy_from_slice(&malformed_message);
757
758 Ok((
759 malformed_message.len(),
760 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
761 ))
762 });
763 stack.expect_send().once().return_once(move |_, buf| {
764 assert_eq!(buf, crate::message::encoded_message::EncodedMessage::rst(4));
766 Ok(())
767 });
768
769 let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
770 assert!(matches!(endpoint_event, EndpointEvent::MsgFormatErr(_)));
771 }
772
773 #[test]
774 fn ping_pong() {
775 let mut stack = MockStack::default();
776
777 let clock = MyClock {
778 last_time: RefCell::new(Duration::from_secs(0)),
779 now: RefCell::new(Duration::from_secs(1)),
780 };
781
782 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
783
784 let mut endpoint: CoapEndpoint<
785 '_,
786 MockStack,
787 Random,
788 MyClock,
789 8,
790 32,
791 128,
792 > = CoapEndpoint::try_new(
794 TransmissionParameters::default(),
795 Random { value: 0 },
796 &clock,
797 &mut receive_buffer,
798 )
799 .unwrap();
800
801 stack.expect_socket().once().return_once(|| Ok(Socket));
802 stack.expect_connect().once().return_once(|_, _| Ok(()));
803
804 endpoint
805 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
806 .unwrap();
807
808 let message_id = 1;
809 let ping: Message<0> = Message::new_ping(message_id);
810 let mut pong_buf = [0_u8; 4];
811 let _encoded_pong = EncodedMessage::new_rst(message_id, &mut pong_buf);
812
813 stack.expect_receive().once().return_once(move |_, buffer| {
814 let encoded_message = ping.encode(buffer).unwrap();
815
816 Ok((
817 encoded_message.message_length(),
818 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
819 ))
820 });
821 stack.expect_send().once().return_once(move |_, buf| {
822 assert_eq!(buf, pong_buf);
823 Ok(())
824 });
825
826 let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
827 assert!(matches!(endpoint_event, EndpointEvent::Ping));
828
829 stack
830 .expect_receive()
831 .once()
832 .return_once(|_, _| Err(nb::Error::WouldBlock));
833
834 let (_, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
835 assert!(matches!(endpoint_event, EndpointEvent::Nothing));
836 }
837
838 #[test]
839 fn unhandled() {
840 let mut stack = MockStack::default();
841
842 let clock = MyClock {
843 last_time: RefCell::new(Duration::from_secs(0)),
844 now: RefCell::new(Duration::from_secs(1)),
845 };
846
847 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
848
849 let mut endpoint: CoapEndpoint<
850 '_,
851 MockStack,
852 Random,
853 MyClock,
854 8,
855 32,
856 128,
857 > = CoapEndpoint::try_new(
859 TransmissionParameters::default(),
860 Random { value: 0 },
861 &clock,
862 &mut receive_buffer,
863 )
864 .unwrap();
865
866 stack.expect_socket().once().return_once(|| Ok(Socket));
867 stack.expect_connect().once().return_once(|_, _| Ok(()));
868
869 endpoint
870 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
871 .unwrap();
872
873 let next_message_id_1 = endpoint.message_id_counter.next();
874 let mut token_1 = Token::default();
875 token_1.length = TokenLength::Eight;
876 endpoint.rng.read(&mut token_1.bytes).unwrap();
877 let request_1: Message<'_> = Message::new(
878 Type::Confirmable,
879 RequestCode::Get.into(),
880 next_message_id_1,
881 token_1,
882 Vec::new(),
883 Some(b"/hello"),
884 );
885
886 let next_message_id_2 = endpoint.message_id_counter.next();
887 let mut token_2 = Token::default();
888 token_2.length = TokenLength::Eight;
889 endpoint.rng.read(&mut token_2.bytes).unwrap();
890 let request_2: Message<'_> = Message::new(
891 Type::Confirmable,
892 RequestCode::Get.into(),
893 next_message_id_2,
894 token_2,
895 Vec::new(),
896 Some(b"/hello"),
897 );
898 let request_test = request_2.clone();
899
900 stack.expect_receive().once().return_once(move |_, buffer| {
901 let encoded_message = request_1.encode(buffer).unwrap();
902
903 Ok((
904 encoded_message.message_length(),
905 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
906 ))
907 });
908
909 let (incoming, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
910 assert!(matches!(incoming.unwrap(), IncomingEvent::Request(true, _)));
911 assert!(matches!(endpoint_event, EndpointEvent::Nothing));
912
913 stack.expect_receive().once().return_once(move |_, buffer| {
914 let encoded_message = request_2.encode(buffer).unwrap();
915
916 Ok((
917 encoded_message.message_length(),
918 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
919 ))
920 });
921
922 let (incoming, _, endpoint_event) = endpoint.process(&mut stack).unwrap();
923 assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
924 assert!(matches!(endpoint_event, EndpointEvent::Unhandled(_)));
925
926 match endpoint_event {
927 EndpointEvent::Unhandled(message) => {
928 assert_eq!(Message::try_from(message).unwrap(), request_test)
929 }
930 _ => (),
931 }
932 }
933
934 #[test]
935 fn ignore_message() {
936 let mut stack = MockStack::default();
937
938 let clock = MyClock {
939 last_time: RefCell::new(Duration::from_secs(0)),
940 now: RefCell::new(Duration::from_secs(1)),
941 };
942
943 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
944
945 let mut endpoint: CoapEndpoint<
946 '_,
947 MockStack,
948 Random,
949 MyClock,
950 8,
951 32,
952 128,
953 > = CoapEndpoint::try_new(
955 TransmissionParameters::default(),
956 Random { value: 0 },
957 &clock,
958 &mut receive_buffer,
959 )
960 .unwrap();
961
962 stack.expect_socket().once().return_once(|| Ok(Socket));
963 stack.expect_connect().once().return_once(|_, _| Ok(()));
964
965 endpoint
966 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
967 .unwrap();
968
969 let next_message_id = endpoint.message_id_counter.next();
970 let mut token = Token::default();
971 token.length = TokenLength::Eight;
972 endpoint.rng.read(&mut token.bytes).unwrap();
973 let request: Message<'_> = Message::new(
974 Type::Confirmable,
975 RequestCode::Get.into(),
976 next_message_id,
977 token,
978 Vec::new(),
979 Some(b"/hello"),
980 );
981
982 stack.expect_receive().once().return_once(move |_, buffer| {
983 let encoded_message = request.encode(buffer).unwrap();
984
985 Ok((
986 encoded_message.message_length(),
987 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 5683),
989 ))
990 });
991
992 let (incoming, outgoing, endpoint_event) = endpoint.process(&mut stack).unwrap();
993 assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
994 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
995 assert!(matches!(endpoint_event, EndpointEvent::Nothing));
996 }
997}