1use core::marker::PhantomData;
10
11use embedded_nal::UdpClientStack;
12use embedded_timers::clock::Clock;
13use heapless::Vec;
14
15use crate::message::{
16 codes::ResponseCode, encoded_message::EncodedMessage, options::CoapOption, Message, Type,
17};
18
19use super::{
20 error::Error, Connection, MessageBuffer, MessageIdentification, RetransmissionState,
21 RetransmissionTimeout, TransmissionParameters,
22};
23
/// State of the request/response exchange driven by [`IncomingCommunication`].
#[derive(Debug, Clone, Copy)]
pub enum IncomingState {
    /// No request is being handled; carries how the previous exchange (if any) ended.
    Idle(LastResponse),
    /// A request has been stored in the message buffer and awaits a decision by the
    /// application. The flag is `true` when the request was confirmable.
    Received(bool),
    /// An empty ACK is scheduled and is sent by the next `process_incoming` call.
    SendingAck,
    /// A RST is scheduled and is sent by the next `process_incoming` call.
    SendingRst,
    /// A piggybacked (ACK) response is encoded in the buffer and waits to be sent.
    SendingPiggybacked,
    /// The empty ACK has been sent; a separate response still has to be scheduled.
    AwaitingResponse,
    /// A confirmable response is being (re)transmitted until it is acknowledged,
    /// reset or times out.
    SendingCon(RetransmissionState),
    /// A non-confirmable response is encoded in the buffer and waits to be sent.
    SendingNon,
}
88
/// How the most recent exchange ended; stored inside [`IncomingState::Idle`] so that
/// duplicates of the last request can be answered appropriately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LastResponse {
    /// Initial value: no request has been handled yet.
    NoResponse,
    /// The last request was answered with a RST.
    RespRst,
    /// The last request was answered with a confirmable response. `true` when the
    /// peer acknowledged it, `false` when it was reset or timed out.
    RespCon(bool),
    /// The last response was sent exactly once from the message buffer (set after
    /// both piggybacked and non-confirmable responses).
    RespNon,
}
114
/// Outcome of a single `process_incoming` call.
#[must_use]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IncomingEvent<'a> {
    /// Nothing of interest happened.
    Nothing,
    /// A new request was accepted. The flag is `true` when it is confirmable; the
    /// message is the copy held in the internal buffer.
    Request(bool, EncodedMessage<'a>),
    /// The received message repeated the last accepted request (same message id and
    /// token) and was consumed.
    DuplicatedRequest,
    /// The scheduled empty ACK was sent.
    SendAck,
    /// The confirmable response was (re)transmitted.
    SendCon,
    /// The exchange finished: a piggybacked or non-confirmable response was sent, or
    /// the confirmable response was acknowledged.
    Success,
    /// The scheduled RST was sent.
    SendRst,
    /// The confirmable response was not acknowledged before retransmission gave up.
    Timeout,
    /// The peer answered the confirmable response with a RST.
    RecvRst,
}
166
/// Handles requests received from the peer and the responses sent back for them.
///
/// The type is a state machine (see [`IncomingState`]) that is advanced by
/// `process_incoming` and by the `schedule_*` methods.
#[derive(Debug)]
pub struct IncomingCommunication<
    'a,
    UDP: UdpClientStack,
    CLOCK: Clock,
    const BUFFER_SIZE: usize = { crate::DEFAULT_COAP_MESSAGE_SIZE },
    const MAX_OPTION_COUNT: usize = { crate::DEFAULT_MAX_OPTION_COUNT },
    const MAX_OPTION_SIZE: usize = { crate::DEFAULT_MAX_OPTION_SIZE },
> {
    // Holds the accepted request first and is later overwritten with the encoded response.
    message_buffer: MessageBuffer<BUFFER_SIZE>,
    // Message id and token of the most recently accepted request; used to detect duplicates
    // and to address the response.
    last_request: Option<MessageIdentification>,
    // Current position in the exchange.
    state: IncomingState,
    // Time source queried when checking whether a retransmission is due.
    clock: &'a CLOCK,
    // Handed to `RetransmissionState::new` for confirmable responses.
    transmission_parameters: TransmissionParameters,
    // Message id consumed by the next separate (CON/NON) response.
    // NOTE(review): presumably refilled by the parent module after use — confirm.
    pub(super) next_message_id: Option<u16>,
    // Random value consumed when a confirmable response is scheduled.
    // NOTE(review): presumably refilled by the parent module after use — confirm.
    pub(super) next_random: Option<f32>,
    // Ties the type to a UDP stack without storing one.
    _udp: PhantomData<UDP>,
}
194
195impl<
196 'a,
197 UDP,
198 CLOCK,
199 const BUFFER_SIZE: usize,
200 const MAX_OPTION_COUNT: usize,
201 const MAX_OPTION_SIZE: usize,
202 > IncomingCommunication<'a, UDP, CLOCK, BUFFER_SIZE, MAX_OPTION_COUNT, MAX_OPTION_SIZE>
203where
204 UDP: UdpClientStack,
205 CLOCK: Clock,
206{
207 pub fn new(clock: &'a CLOCK, transmission_parameters: TransmissionParameters) -> Self {
210 Self {
211 message_buffer: MessageBuffer::default(),
212 last_request: None,
213 state: IncomingState::Idle(LastResponse::NoResponse),
214 clock,
215 transmission_parameters,
216 next_message_id: None,
217 next_random: None,
218 _udp: PhantomData,
219 }
220 }
221
    /// Returns a copy of the current state of the exchange.
    pub fn state(&self) -> IncomingState {
        self.state
    }
228
229 pub fn reset(&mut self) {
231 *self = Self::new(self.clock, self.transmission_parameters);
232 }
233
234 pub fn request(&self) -> Result<EncodedMessage, Error<<UDP as UdpClientStack>::Error>> {
242 use IncomingState::*;
243 match self.state {
244 Received(_) | SendingAck | AwaitingResponse => {
245 Ok(self.message_buffer.message().unwrap())
246 }
247 _ => Err(Error::Forbidden),
248 }
249 }
250
251 pub fn schedule_response(
262 &mut self,
263 code: ResponseCode,
264 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
265 payload: Option<&[u8]>,
266 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
267 match self.state {
268 IncomingState::Received(true) => {
269 self.schedule_piggybacked_response(code, options, payload)
270 }
271 IncomingState::Received(false) => self.schedule_non_response(code, options, payload),
272 _ => Err(Error::Forbidden),
273 }
274 }
275
276 pub fn schedule_empty_ack(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
280 if !matches!(self.state, IncomingState::Received(true)) {
281 return Err(Error::Forbidden);
282 }
283 self.state = IncomingState::SendingAck;
284 Ok(())
285 }
286
287 pub fn schedule_rst(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
289 if !matches!(self.state, IncomingState::Received(_)) {
290 return Err(Error::Forbidden);
291 }
292 self.state = IncomingState::SendingRst;
293 Ok(())
294 }
295
296 pub fn schedule_piggybacked_response(
301 &mut self,
302 code: ResponseCode,
303 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
304 payload: Option<&[u8]>,
305 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
306 if !matches!(self.state, IncomingState::Received(true)) {
307 return Err(Error::Busy);
308 }
309
310 let ident = self.last_request.as_ref().unwrap();
311
312 let message: Message<MAX_OPTION_COUNT> = Message::new(
313 Type::Acknowledgement,
314 code.into(),
315 ident.id,
316 ident.token,
317 options,
318 payload,
319 );
320 self.message_buffer.encode(message)?;
321
322 self.state = IncomingState::SendingPiggybacked;
323
324 Ok(())
325 }
326
327 pub fn schedule_con_response(
329 &mut self,
330 code: ResponseCode,
331 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
332 payload: Option<&[u8]>,
333 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
334 self.schedule_separate_response(Type::Confirmable, code, options, payload)?;
335 let retransmission_state = RetransmissionState::new(
336 self.transmission_parameters,
337 self.next_random.take().unwrap(),
338 );
339 self.state = IncomingState::SendingCon(retransmission_state);
340 Ok(())
341 }
342
343 pub fn schedule_non_response(
345 &mut self,
346 code: ResponseCode,
347 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
348 payload: Option<&[u8]>,
349 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
350 self.schedule_separate_response(Type::NonConfirmable, code, options, payload)?;
351 self.state = IncomingState::SendingNon;
352 Ok(())
353 }
354
355 fn schedule_separate_response(
359 &mut self,
360 response_type: Type,
361 code: ResponseCode,
362 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
363 payload: Option<&[u8]>,
364 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
365 if !matches!(self.state, IncomingState::AwaitingResponse)
366 && !matches!(self.state, IncomingState::Received(false))
367 {
368 return Err(Error::Busy);
369 }
370
371 let message: Message<MAX_OPTION_COUNT> = Message::new(
372 response_type,
373 code.into(),
374 self.next_message_id.unwrap(),
375 self.last_request.unwrap().token,
376 options,
377 payload,
378 );
379
380 self.message_buffer.encode(message)?;
381
382 self.next_message_id = None;
383
384 Ok(())
385 }
386
387 pub(crate) fn process_incoming(
395 &mut self,
396 connection: &mut Connection<UDP>,
397 received: &mut Option<EncodedMessage<'_>>,
398 ) -> Result<IncomingEvent, Error<<UDP as UdpClientStack>::Error>> {
399 use IncomingEvent::*;
400 use IncomingState::*;
401 use LastResponse::*;
402
403 if let (Some(message), Some(last_message_ident)) = (&received, self.last_request) {
415 if message.is_request().unwrap()
416 && last_message_ident.id == message.message_id()
417 && last_message_ident.token == message.token().unwrap()
418 {
419 match self.state {
420 Idle(NoResponse) => panic!(
421 "In the Idle(NoResponse) state, the internal message buffer should be None"
422 ),
423 Idle(RespRst) => {
424 connection.send(&EncodedMessage::rst(last_message_ident.id))?;
426 }
427 Idle(RespCon(true)) => {
428 }
432 Idle(RespCon(false)) => {
433 }
437 Idle(RespNon) => {
438 connection.send(self.message_buffer.message().unwrap().data)?;
441 }
442 Received(_) => {
443 }
445 SendingAck | SendingRst | SendingPiggybacked | SendingNon => {
446 }
448 AwaitingResponse => {
449 connection.send(&EncodedMessage::ack(last_message_ident.id))?;
453 }
454 SendingCon(_) => {
455 }
458 }
459 received.take();
460 return Ok(DuplicatedRequest);
461 }
462 }
463
464 match self.state {
465 Idle(_) => {
466 if let Some(message) = received {
469 if message.is_request().unwrap() {
470 self.last_request = Some(MessageIdentification {
472 id: message.message_id(),
473 token: message.token().unwrap(),
474 });
475 let confirmable = message.message_type() == Type::Confirmable;
476 self.message_buffer.replace_with(received.take().unwrap())?;
477 self.state = Received(confirmable);
478 return Ok(Request(confirmable, self.message_buffer.message().unwrap()));
479 }
480 }
481 }
482 Received(_) | AwaitingResponse => {
483 }
488 SendingAck => {
489 let ident = self.last_request.as_ref().unwrap();
490 connection.send(&EncodedMessage::ack(ident.id))?;
491 self.state = AwaitingResponse;
492 return Ok(SendAck);
493 }
494 SendingRst => {
495 let ident = self.last_request.as_ref().unwrap();
496 connection.send(&EncodedMessage::rst(ident.id))?;
497 self.state = Idle(RespRst);
498 return Ok(SendRst);
499 }
500 SendingPiggybacked | SendingNon => {
501 connection.send(self.message_buffer.message().unwrap().data)?;
502 self.state = Idle(RespNon);
503 return Ok(Success);
504 }
505 SendingCon(ref mut retransmission_state) => {
506 if let Some(message) = received {
507 if message.message_type() == Type::Acknowledgement
508 && message.message_id()
509 == self.message_buffer.message().unwrap().message_id()
510 {
511 received.take();
512 self.state = Idle(RespCon(true));
513 return Ok(Success);
514 } else if message.message_type() == Type::Reset
515 && message.message_id()
516 == self.message_buffer.message().unwrap().message_id()
517 {
518 received.take();
519 self.state = Idle(RespCon(false));
520 return Ok(RecvRst);
521 }
522 }
523 match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
524 Ok(true) => {
525 connection.send(self.message_buffer.message().unwrap().data)?;
526 return Ok(SendCon);
527 }
528 Ok(false) => {}
529 Err(RetransmissionTimeout) => {
530 self.state = Idle(RespCon(false));
531 return Ok(Timeout);
533 }
534 }
535 }
536 }
537
538 Ok(Nothing)
539 }
540}
541
542#[cfg(test)]
543mod tests {
544 use core::{cell::RefCell, time::Duration};
545
546 use embedded_hal::prelude::_embedded_hal_blocking_rng_Read;
547 use embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
548 use heapless::Vec;
549 use mockall::predicate::*;
550 use mockall::*;
551
552 use crate::{
553 endpoint::incoming::{IncomingEvent, IncomingState},
554 message::{
555 codes::{RequestCode, SuccessCode},
556 token::{Token, TokenLength},
557 Message, Type,
558 },
559 };
560
561 use super::{super::CoapEndpoint, TransmissionParameters};
562
    /// Deterministic stand-in for a random number generator: a plain counter.
    #[derive(Debug)]
    struct Random {
        // Counter whose little-endian bytes are handed out by `read`.
        value: u128,
    }
567
568 impl embedded_hal::blocking::rng::Read for Random {
569 type Error = std::io::Error;
570
571 fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
572 self.value += 1;
573
574 let buf_len = buf.len();
575
576 buf[..buf_len].copy_from_slice(&self.value.to_le_bytes()[..buf_len]);
577
578 Ok(())
579 }
580 }
581
    /// Error type of the mocked UDP stack; carries no information.
    #[derive(Debug)]
    struct StackError;

    /// Socket handle of the mocked UDP stack; carries no state.
    struct Socket;
586
    // Mock implementation of `UdpClientStack`, used by the tests below as `MockStack`.
    // Each test registers the expected `socket`/`connect`/`send`/`receive` calls
    // before driving the endpoint.
    mock! {
        Stack {}

        impl UdpClientStack for Stack {
            type UdpSocket = Socket;
            type Error = StackError;

            fn socket(&mut self) -> Result<Socket, StackError>;
            fn connect(
                &mut self,
                socket: &mut Socket,
                remote: SocketAddr
            ) -> Result<(), StackError>;
            fn send(
                &mut self,
                socket: &mut Socket,
                buffer: &[u8]
            ) -> Result<(), nb::Error<StackError>>;
            fn receive(
                &mut self,
                socket: &mut Socket,
                buffer: &mut [u8]
            ) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
            fn close(&mut self, socket: Socket) -> Result<(), StackError>;
        }
    }
613
    /// Manually driven clock for the tests.
    #[derive(Debug)]
    struct MyClock {
        // Instant that was handed out by the most recent `try_now` call.
        last_time: RefCell<Duration>,
        // Instant that the next `try_now` call will report.
        now: RefCell<Duration>,
    }
619
620 impl embedded_timers::clock::Clock for MyClock {
621 fn try_now(
622 &self,
623 ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
624 *self.last_time.borrow_mut() = *self.now.borrow();
625 Ok(*self.now.borrow())
626 }
627 }
628
629 impl MyClock {
630 fn advance(&self, step: Duration) {
631 *self.now.borrow_mut() = *self.last_time.borrow() + step;
632 }
633 }
634
    // A confirmable GET is received and answered with a piggybacked response.
    #[test]
    fn receive_con_get_piggybacked() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // First poll: nothing to receive, so nothing happens and the state stays idle.
        stack
            .expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));

        let (incoming, _, _) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));

        // Build the confirmable request and the ACK response expected for it; both
        // share the same message id and token.
        let next_message_id = endpoint.message_id_counter.next();
        let mut token = Token::default();
        token.length = TokenLength::Eight;
        endpoint.rng.read(&mut token.bytes).unwrap();
        let request: Message<'_> = Message::new(
            Type::Confirmable,
            RequestCode::Get.into(),
            next_message_id,
            token,
            Vec::new(),
            Some(b"/hello"),
        );
        let response: Message<'_> = Message::new(
            Type::Acknowledgement,
            SuccessCode::Content.into(),
            next_message_id,
            token,
            Vec::new(),
            Some(b"world"),
        );
        let mut response_buf = [0_u8; 32];
        let response_length = response.encode(&mut response_buf).unwrap().message_length();

        // Second poll: the stack delivers the encoded request.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = request.encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });

        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Request(true, _)));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Received(true)
        ));
        // The application answers directly with a piggybacked response.
        endpoint
            .incoming()
            .schedule_piggybacked_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
            .unwrap();

        // Third poll: no new datagram; the scheduled response must be sent and match
        // the expected encoding byte for byte.
        stack
            .expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));
        stack.expect_send().once().return_once(move |_, buffer| {
            assert_eq!(buffer, &response_buf[..response_length]);

            Ok(())
        });

        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Success));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
744
    // A non-confirmable GET is received and answered with a non-confirmable response.
    #[test]
    fn receive_non_get_non() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // First poll: nothing to receive, so nothing happens and the state stays idle.
        stack
            .expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));

        let (incoming, _, _) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));

        // Build the non-confirmable request. The expected response carries the same
        // token but the message id currently reserved for the next separate response.
        let next_message_id = endpoint.message_id_counter.next();
        let mut token = Token::default();
        token.length = TokenLength::Eight;
        endpoint.rng.read(&mut token.bytes).unwrap();
        let request: Message<'_> = Message::new(
            Type::NonConfirmable,
            RequestCode::Get.into(),
            next_message_id,
            token,
            Vec::new(),
            Some(b"/hello"),
        );
        let response: Message<'_> = Message::new(
            Type::NonConfirmable,
            SuccessCode::Content.into(),
            endpoint.incoming_communication.next_message_id.unwrap(),
            token,
            Vec::new(),
            Some(b"world"),
        );
        let mut response_buf = [0_u8; 32];
        let response_length = response.encode(&mut response_buf).unwrap().message_length();

        // Second poll: the stack delivers the encoded request.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = request.encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });

        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(
            incoming.unwrap(),
            IncomingEvent::Request(false, _)
        ));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Received(false)
        ));
        // Third poll without input: the request stays pending until it is answered.
        stack
            .expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));

        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Received(false)
        ));

        // `schedule_response` picks the non-confirmable response for this request.
        endpoint
            .incoming()
            .schedule_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
            .unwrap();

        // Fourth poll: the scheduled response must be sent and match the expected
        // encoding byte for byte.
        stack
            .expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));
        stack.expect_send().once().return_once(move |_, buffer| {
            assert_eq!(buffer, &response_buf[..response_length]);

            Ok(())
        });

        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Success));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
869
    // A non-confirmable GET is answered with a confirmable response, which the peer
    // then acknowledges.
    #[test]
    fn receive_non_get_con() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        // Build the non-confirmable request that the stack will deliver.
        let next_message_id = endpoint.message_id_counter.next();
        let mut token = Token::default();
        token.length = TokenLength::Eight;
        endpoint.rng.read(&mut token.bytes).unwrap();
        let request: Message<'_> = Message::new(
            Type::NonConfirmable,
            RequestCode::Get.into(),
            next_message_id,
            token,
            Vec::new(),
            Some(b"/hello"),
        );

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // First poll: the stack delivers the encoded request.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = request.encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });

        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(
            incoming.unwrap(),
            IncomingEvent::Request(false, _)
        ));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Received(false)
        ));

        // The application answers with a confirmable response.
        endpoint
            .incoming()
            .schedule_con_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
            .unwrap();

        // Second poll: no new datagram; the response is transmitted once (content is
        // not checked here) and the state keeps waiting for the acknowledgement.
        stack
            .expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));
        stack.expect_send().once().return_once(|_, _| Ok(()));

        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));

        // Build an ACK that carries the message id of the response held in the buffer.
        let ack_id = endpoint
            .incoming_communication
            .message_buffer
            .message()
            .unwrap()
            .message_id();
        let ack = Message::<0>::new_ack(ack_id);

        // Third poll: the ACK arrives and completes the exchange.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_ack = ack.encode(buffer).unwrap();
            Ok((
                encoded_ack.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });

        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Success));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
987
    // A non-confirmable GET is answered with a confirmable response that is never
    // acknowledged: the response is retransmitted several times and finally times out.
    #[test]
    fn receive_non_get_con_timeout() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        // Build the non-confirmable request that the stack will deliver.
        let next_message_id = endpoint.message_id_counter.next();
        let mut token = Token::default();
        token.length = TokenLength::Eight;
        endpoint.rng.read(&mut token.bytes).unwrap();
        let request: Message<'_> = Message::new(
            Type::NonConfirmable,
            RequestCode::Get.into(),
            next_message_id,
            token,
            Vec::new(),
            Some(b"/hello"),
        );

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // First poll: the stack delivers the encoded request.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = request.encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });

        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(
            incoming.unwrap(),
            IncomingEvent::Request(false, _)
        ));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Received(false)
        ));

        // The application answers with a confirmable response.
        endpoint
            .incoming()
            .schedule_con_response(SuccessCode::Content.into(), Vec::new(), Some(b"world"))
            .unwrap();

        // From here on the peer stays silent and any number of sends is accepted.
        stack
            .expect_receive()
            .returning(move |_, _| Err(nb::Error::WouldBlock));
        stack.expect_send().returning(|_, _| Ok(()));

        // Initial transmission of the response.
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));

        // Each of the following clock advances is expected to trigger one retransmission.
        // NOTE(review): the step sizes presumably follow the back-off derived from
        // `TransmissionParameters::default()` — confirm against that type.
        clock.advance(Duration::from_secs(3));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));

        clock.advance(Duration::from_secs(5));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));

        clock.advance(Duration::from_secs(9));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));

        clock.advance(Duration::from_secs(17));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::SendCon));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::SendingCon(_)
        ));

        // After the final advance no further retransmission happens: the exchange
        // times out and the state returns to idle.
        clock.advance(Duration::from_secs(33));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Timeout));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
1118
    // An ACK carrying a response (not a request) arrives while the incoming side is
    // idle: the incoming side reports nothing and stays idle.
    #[test]
    fn receive_piggybacked_response() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        // Build the response message that the stack will deliver.
        let message_id = endpoint.message_id_counter.next();
        let mut token = Token::default();
        token.length = TokenLength::Eight;
        endpoint.rng.read(&mut token.bytes).unwrap();
        let message: Message<'_> = Message::new(
            Type::Acknowledgement,
            SuccessCode::Content.into(),
            message_id,
            token,
            Vec::new(),
            Some(b"world"),
        );

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // The single poll delivers the response message.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = message.encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
1189
    // An empty ACK arrives while the incoming side is idle: the incoming side reports
    // nothing and stays idle.
    #[test]
    fn receive_ack() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        let message_id = endpoint.message_id_counter.next();

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // The single poll delivers an ACK built with the id drawn above.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = Message::<0>::new_ack(message_id).encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
1249
    // A RST arrives while the incoming side is idle: the incoming side reports
    // nothing and stays idle.
    #[test]
    fn receive_reset() {
        let mut stack = MockStack::default();

        let clock = MyClock {
            last_time: RefCell::new(Duration::from_secs(0)),
            now: RefCell::new(Duration::from_secs(1)),
        };

        let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

        let mut endpoint: CoapEndpoint<
            '_,
            MockStack,
            Random,
            MyClock,
            8,
            32,
            128,
        > = CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

        // Build the RST that the stack will deliver.
        let message_id = endpoint.message_id_counter.next();
        let message: Message<'_> = Message::new_rst(message_id);

        // Connecting requires exactly one socket creation and one connect call.
        stack.expect_socket().once().return_once(|| Ok(Socket));
        stack.expect_connect().once().return_once(|_, _| Ok(()));

        endpoint
            .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
            .unwrap();

        // The single poll delivers the RST.
        stack.expect_receive().once().return_once(move |_, buffer| {
            let encoded_message = message.encode(buffer).unwrap();

            Ok((
                encoded_message.message_length(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5683),
            ))
        });
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
        let (incoming, _outgoing, _endpoint) = endpoint.process(&mut stack).unwrap();
        assert!(matches!(incoming.unwrap(), IncomingEvent::Nothing));
        assert!(matches!(
            endpoint.incoming_communication.state,
            IncomingState::Idle(_)
        ));
    }
1309}