1use core::marker::PhantomData;
10use core::time::Duration;
11
12use embedded_nal::UdpClientStack;
13use embedded_timers::clock::{Clock, Instant};
14use heapless::Vec;
15
16use crate::message::{
17 codes::{Code, RequestCode, ResponseCode},
18 encoded_message::EncodedMessage,
19 options::{CoapOption, CoapOptionName},
20 token::Token,
21 Message, Type,
22};
23
24use super::{
25 error::Error, Connection, MessageBuffer, MessageIdentification, RetransmissionState,
26 RetransmissionTimeout, TransmissionParameters, Uri,
27};
28
29#[derive(Debug, Clone, Copy)]
31pub enum OutgoingState {
32 Idle(bool),
36 SendingCon(RetransmissionState, Duration),
40 AwaitingResponse(Instant),
43 SendingNon(RetransmissionState, bool, Duration),
54 SendingPing(RetransmissionState, bool, Duration),
60 SendingNotificationCon(RetransmissionState),
62 SendingNotificationNon,
64}
65
66#[derive(Debug)]
73pub enum OutgoingEvent<'a> {
74 Nothing,
77 SendCon,
79 SendNon,
81 SendPing,
83 AckReceived,
86 Success(EncodedMessage<'a>),
90 Timeout,
92 PiggybackedWrongToken,
94 ResetReceived,
97 DuplicatedResponse,
100 SendNotification,
104 NotificationAck,
107 NotificationRst(Token),
111 NotificationTimeout,
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
116enum LastOutgoing {
117 Nothing,
118 Request(MessageIdentification),
119 Notification(MessageIdentification),
120}
121
122#[derive(Debug)]
125pub struct OutgoingCommunication<
126 'a,
127 CLOCK,
128 UDP: UdpClientStack,
129 const BUFFER_SIZE: usize,
130 const MAX_OPTION_COUNT: usize,
131 const MAX_OPTION_SIZE: usize,
132> where
133 CLOCK: Clock,
134 UDP: UdpClientStack,
135{
136 message_buffer: MessageBuffer<BUFFER_SIZE>,
141 state: OutgoingState,
142 clock: &'a CLOCK,
143 transmission_parameters: TransmissionParameters,
144 pub(super) next_message_id: Option<u16>,
148 pub(super) next_token: Option<Token>,
149 pub(super) next_random: Option<f32>,
153 last_outgoing: LastOutgoing,
156 _udp: PhantomData<UDP>,
157}
158
159impl<
160 'a,
161 CLOCK,
162 UDP,
163 const BUFFER_SIZE: usize,
164 const MAX_OPTION_COUNT: usize,
165 const MAX_OPTION_SIZE: usize,
166 > OutgoingCommunication<'a, CLOCK, UDP, BUFFER_SIZE, MAX_OPTION_COUNT, MAX_OPTION_SIZE>
167where
168 CLOCK: Clock,
169 UDP: UdpClientStack,
170{
171 pub fn new(clock: &'a CLOCK, transmission_parameters: TransmissionParameters) -> Self {
174 Self {
175 message_buffer: MessageBuffer::default(),
176 state: OutgoingState::Idle(false),
177 clock,
178 transmission_parameters,
179 next_message_id: None,
180 next_token: None,
181 next_random: None,
182 last_outgoing: LastOutgoing::Nothing,
183 _udp: PhantomData,
184 }
185 }
186
187 pub fn state(&self) -> OutgoingState {
191 self.state
192 }
193
194 pub fn reset(&mut self) {
196 *self = Self::new(self.clock, self.transmission_parameters);
197 }
198
199 fn finish_request(
206 &mut self,
207 response: Option<EncodedMessage>,
208 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
209 self.last_outgoing = LastOutgoing::Request(MessageIdentification {
210 id: self.message_buffer.message().unwrap().message_id(),
211 token: self.message_buffer.message().unwrap().token().unwrap(),
212 });
213 if let Some(response) = response {
214 self.message_buffer.replace_with(response)?;
215 self.state = OutgoingState::Idle(true);
216 } else {
217 self.state = OutgoingState::Idle(false);
218 }
219 Ok(())
220 }
221
222 fn finish_notification(&mut self) {
226 self.last_outgoing = LastOutgoing::Notification(MessageIdentification {
227 id: self.message_buffer.message().unwrap().message_id(),
228 token: self.message_buffer.message().unwrap().token().unwrap(),
229 });
230 self.state = OutgoingState::Idle(false);
231 }
232
233 pub fn response(&self) -> Result<EncodedMessage, Error<<UDP as UdpClientStack>::Error>> {
241 if let OutgoingState::Idle(true) = self.state {
242 Ok(self.message_buffer.message().unwrap())
243 } else {
244 Err(Error::Forbidden)
245 }
246 }
247
248 pub fn schedule_non(
254 &mut self,
255 code: RequestCode,
256 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
257 payload: Option<&[u8]>,
258 timeout: Duration,
259 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
260 self.encode_request(Type::NonConfirmable, code, options, payload)?;
261
262 let retransmission_state = RetransmissionState::new(
263 self.transmission_parameters,
264 self.next_random.take().unwrap(),
265 );
266 self.state = OutgoingState::SendingNon(retransmission_state, true, timeout);
267
268 Ok(())
269 }
270
271 pub fn schedule_con(
277 &mut self,
278 code: RequestCode,
279 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
280 payload: Option<&[u8]>,
281 timeout: Duration,
282 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
283 self.encode_request(Type::Confirmable, code, options, payload)?;
284 let retransmission_state = RetransmissionState::new(
285 self.transmission_parameters,
286 self.next_random.take().unwrap(),
287 );
288 self.state = OutgoingState::SendingCon(retransmission_state, timeout);
289 Ok(())
290 }
291
292 fn encode_request(
296 &mut self,
297 typ: Type,
298 code: RequestCode,
299 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
300 payload: Option<&[u8]>,
301 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
302 let token = self.next_token.unwrap();
304
305 self.encode_message(typ, code.into(), token, options, payload)?;
306
307 self.next_token = None;
308
309 Ok(())
310 }
311
312 pub fn schedule_notification(
317 &mut self,
318 confirmable: bool,
319 code: ResponseCode,
320 token: Token,
321 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
322 payload: Option<&[u8]>,
323 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
324 let typ = if confirmable {
325 Type::Confirmable
326 } else {
327 Type::NonConfirmable
328 };
329
330 self.encode_message(typ, code.into(), token, options, payload)?;
331
332 if confirmable {
333 let retransmission_state = RetransmissionState::new(
334 self.transmission_parameters,
335 self.next_random.take().unwrap(),
336 );
337 self.state = OutgoingState::SendingNotificationCon(retransmission_state);
338 } else {
339 self.state = OutgoingState::SendingNotificationNon;
340 }
341
342 Ok(())
343 }
344
345 fn encode_message(
349 &mut self,
350 typ: Type,
351 code: Code,
352 token: Token,
353 options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
354 payload: Option<&[u8]>,
355 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
356 if !matches!(self.state, OutgoingState::Idle(_)) {
357 return Err(Error::Busy);
358 }
359 let id = self.next_message_id.unwrap();
361
362 let message: Message<MAX_OPTION_COUNT> =
363 Message::new(typ, code, id, token, options, payload);
364
365 self.message_buffer.encode(message)?;
366
367 self.next_message_id = None;
368
369 Ok(())
370 }
371
372 pub fn schedule_ping(
374 &mut self,
375 timeout: Duration,
376 ) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
377 if !matches!(self.state, OutgoingState::Idle(_)) {
378 return Err(Error::Busy);
379 }
380
381 let message = Message::<0>::new_ping(self.next_message_id.unwrap());
382 self.message_buffer.encode(message)?;
383 self.next_message_id = None;
384
385 let retransmission_state = RetransmissionState::new(
386 self.transmission_parameters,
387 self.next_random.take().unwrap(),
388 );
389
390 self.state = OutgoingState::SendingPing(retransmission_state, true, timeout);
391
392 Ok(())
393 }
394
395 pub fn cancel(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
411 match self.state {
412 OutgoingState::Idle(_) => Err(Error::Forbidden),
413 OutgoingState::SendingNotificationCon(_) | OutgoingState::SendingNotificationNon => {
414 self.finish_notification();
415 Ok(())
416 }
417 _ => {
418 self.finish_request(None)?;
419 Ok(())
420 }
421 }
422 }
423
424 pub(crate) fn process_outgoing(
425 &mut self,
426 connection: &mut Connection<UDP>,
427 received: &mut Option<EncodedMessage>,
428 ) -> Result<OutgoingEvent, Error<<UDP as UdpClientStack>::Error>> {
429 use OutgoingEvent::*;
430 use OutgoingState::*;
431
432 if let OutgoingState::Idle(_) = self.state {
440 use LastOutgoing::{Notification, Request};
441 if let (Some(message), Request(last_request)) = (&received, self.last_outgoing) {
442 if message.is_response().unwrap() && last_request.token == message.token().unwrap()
445 {
446 if message.message_type() == Type::Confirmable {
447 connection.send(&EncodedMessage::ack(message.message_id()))?;
448 }
449 received.take();
450 return Ok(OutgoingEvent::DuplicatedResponse);
451 }
452 if matches!(message.message_type(), Type::Acknowledgement | Type::Reset)
454 && message.message_id() == last_request.id
455 {
456 received.take();
457 return Ok(OutgoingEvent::DuplicatedResponse);
458 }
459 }
460 if let (Some(message), Notification(last_notify)) = (&received, self.last_outgoing) {
461 if matches!(message.message_type(), Type::Acknowledgement)
462 && message.message_id() == last_notify.id
463 {
464 received.take();
465 return Ok(OutgoingEvent::DuplicatedResponse);
466 }
467 if matches!(message.message_type(), Type::Reset)
471 && message.message_id() == last_notify.id
472 {
473 received.take();
474 return Ok(OutgoingEvent::NotificationRst(last_notify.token));
475 }
476 }
477 }
478
479 match self.state {
480 Idle(_) => (), SendingCon(ref mut retransmission_state, duration) => {
482 if let Some(message) = received {
483 let out_message = self.message_buffer.message().unwrap();
484
485 if message.message_type() == Type::Acknowledgement
486 && message.message_id() == out_message.message_id()
487 {
488 if message.is_response().unwrap() {
489 if message.token().unwrap() == out_message.token().unwrap() {
490 self.finish_request(Some(received.take().unwrap()))?;
492 return Ok(Success(self.message_buffer.message().unwrap()));
493 } else {
494 return Ok(PiggybackedWrongToken);
496 }
497 } else {
498 let end = self.clock.try_now().unwrap() + duration;
500 self.state = AwaitingResponse(end);
501 return Ok(AckReceived);
502 }
503 } else if message.is_response().unwrap()
504 && message.token().unwrap() == out_message.token().unwrap()
505 {
506 if message.message_type() == Type::Confirmable {
509 connection.send(&EncodedMessage::ack(message.message_id()))?;
510 }
511 self.finish_request(Some(received.take().unwrap()))?;
512 return Ok(Success(self.message_buffer.message().unwrap()));
513 } else if message.message_type() == Type::Reset
514 && message.message_id() == out_message.message_id()
515 {
516 self.finish_request(None)?;
517 return Ok(ResetReceived);
518 }
519 }
520 match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
521 Ok(true) => {
522 connection.send(self.message_buffer.message().unwrap().data)?;
523 return Ok(SendCon);
524 }
525 Ok(false) => {}
526 Err(RetransmissionTimeout) => {
527 self.finish_request(None)?;
528 return Ok(Timeout);
529 }
530 }
531 }
532 AwaitingResponse(end) => {
533 if let Some(message) = received {
534 let out_message = self.message_buffer.message().unwrap();
535 if message.is_response().unwrap()
536 && message.token().unwrap() == out_message.token().unwrap()
537 {
538 if message.message_type() == Type::Confirmable {
540 connection.send(&EncodedMessage::ack(message.message_id()))?;
541 }
542 self.finish_request(Some(received.take().unwrap()))?;
543 return Ok(Success(self.message_buffer.message().unwrap()));
544 }
545 }
546 if self.clock.try_now().unwrap() > end {
547 self.finish_request(None)?;
548 return Ok(Timeout);
549 }
550 }
551 SendingNon(ref mut retransmission_state, ref mut retransmit_requested, timeout)
554 | SendingPing(ref mut retransmission_state, ref mut retransmit_requested, timeout) => {
555 if let Some(message) = received {
556 let out_message = self.message_buffer.message().unwrap();
557
558 if message.message_type() == Type::Reset
559 && message.message_id() == out_message.message_id()
560 {
561 if matches!(self.state, SendingNon(..)) {
562 self.finish_request(None)?;
563 return Ok(ResetReceived);
564 } else {
565 self.finish_request(Some(received.take().unwrap()))?;
566 return Ok(Success(self.message_buffer.message().unwrap()));
567 }
568 } else if message.is_response().unwrap()
569 && message.token().unwrap() == out_message.token().unwrap()
570 {
571 if matches!(self.state, SendingNon(..)) {
572 if message.message_type() == Type::Confirmable {
573 let mut ack_buf = [0_u8; 4];
574 let _ack =
575 EncodedMessage::new_ack(message.message_id(), &mut ack_buf);
576 connection.send(&ack_buf)?
577 }
578 self.finish_request(Some(received.take().unwrap()))?;
579 return Ok(Success(self.message_buffer.message().unwrap()));
580 } else {
581 return Ok(Nothing);
590 }
591 }
592 }
593 if *retransmit_requested {
594 match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
595 Ok(true) => {
596 *retransmit_requested = false;
597 connection.send(self.message_buffer.message().unwrap().data)?;
598 if matches!(self.state, SendingNon(..)) {
599 return Ok(OutgoingEvent::SendNon);
600 } else {
601 return Ok(OutgoingEvent::SendPing);
602 }
603 }
604 Ok(false) => {}
605 Err(RetransmissionTimeout) => {
606 self.finish_request(None)?;
607 return Ok(Timeout);
608 }
609 }
610 }
611 let waiting_time = self.clock.try_now().unwrap()
612 - retransmission_state.last_transmission_instant.unwrap();
613 if waiting_time > timeout {
614 self.finish_request(None)?;
615 return Ok(Timeout);
616 }
617 }
618 SendingNotificationCon(ref mut retransmission_state) => {
619 if let Some(message) = received {
620 let out_message = self.message_buffer.message().unwrap();
621
622 if message.message_type() == Type::Acknowledgement
623 && message.message_id() == out_message.message_id()
624 {
625 self.finish_notification();
626 return Ok(NotificationAck);
627 } else if message.message_type() == Type::Reset
628 && message.message_id() == out_message.message_id()
629 {
630 let token = out_message.token().unwrap();
631 self.finish_notification();
632 return Ok(NotificationRst(token));
633 }
634 }
635 match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
636 Ok(true) => {
637 connection.send(self.message_buffer.message().unwrap().data)?;
638 return Ok(SendNotification);
639 }
640 Ok(false) => {}
641 Err(RetransmissionTimeout) => {
642 self.finish_notification();
643 return Ok(NotificationTimeout);
644 }
645 }
646 }
647 SendingNotificationNon => {
648 connection.send(self.message_buffer.message().unwrap().data)?;
649 self.finish_notification();
650 return Ok(SendNotification);
651 }
652 }
653
654 Ok(Nothing)
655 }
656
657 pub fn parse_options<'options>(
671 &self,
672 url: &'options str,
673 additional_options: Vec<CoapOption<'options>, MAX_OPTION_COUNT>,
674 ) -> Result<Vec<CoapOption<'options>, MAX_OPTION_COUNT>, Error<<UDP as UdpClientStack>::Error>>
675 {
676 let mut options = Vec::<CoapOption<'_>, MAX_OPTION_COUNT>::new();
684
685 let uri = Uri::new(url).unwrap();
686 let path = uri.path_str();
687
688 for path in path.split('/') {
689 if !path.is_empty() {
690 options
692 .push(CoapOption {
693 name: CoapOptionName::UriPath,
694 value: path.as_bytes(),
695 })
696 .map_err(|_| Error::OutOfMemory)?;
697 }
698 }
699
700 if let Some(queries) = uri.query_str() {
701 for query in queries.split('&') {
702 options
703 .push(CoapOption {
704 name: CoapOptionName::UriQuery,
705 value: query.as_bytes(),
707 })
708 .map_err(|_| Error::OutOfMemory)?;
709 }
710 };
711
712 for option in additional_options {
713 options.push(option).map_err(|_| Error::OutOfMemory)?;
714 }
715
716 Ok(options)
717 }
718}
719
720#[cfg(test)]
721mod tests {
722 use core::{cell::RefCell, time::Duration};
723
724 use embedded_nal::{SocketAddr, UdpClientStack};
725 use heapless::Vec;
726 use mockall::predicate::*;
727 use mockall::*;
728
729 use crate::{
730 endpoint::outgoing::{OutgoingEvent, OutgoingState},
731 message::{
732 codes::{RequestCode, SuccessCode},
733 Message, Type,
734 },
735 };
736
737 use super::{super::CoapEndpoint, TransmissionParameters};
738
739 #[derive(Debug)]
740 struct Random {
741 value: u128,
742 }
743
744 impl embedded_hal::blocking::rng::Read for Random {
745 type Error = std::io::Error;
746
747 fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
748 self.value += 1;
749
750 let buf_len = buf.len();
751
752 buf[..buf_len].copy_from_slice(&self.value.to_le_bytes()[..buf_len]);
753
754 Ok(())
755 }
756 }
757
758 #[derive(Debug)]
759 struct StackError;
760
761 struct Socket;
762
763 mock! {
764 Stack {}
765
766 impl UdpClientStack for Stack {
767 type UdpSocket = Socket;
768 type Error = StackError;
769
770 fn socket(&mut self) -> Result<Socket, StackError>;
771 fn connect(
772 &mut self,
773 socket: &mut Socket,
774 remote: SocketAddr
775 ) -> Result<(), StackError>;
776 fn send(
777 &mut self,
778 socket: &mut Socket,
779 buffer: &[u8]
780 ) -> Result<(), nb::Error<StackError>>;
781 fn receive(
782 &mut self,
783 socket: &mut Socket,
784 buffer: &mut [u8]
785 ) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
786 fn close(&mut self, socket: Socket) -> Result<(), StackError>;
787 }
788 }
789
790 #[derive(Debug)]
791 struct MyClock {
792 last_time: RefCell<Duration>,
793 now: RefCell<Duration>,
794 }
795
796 impl embedded_timers::clock::Clock for MyClock {
797 fn try_now(
798 &self,
799 ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
800 *self.last_time.borrow_mut() = *self.now.borrow();
801 Ok(*self.now.borrow())
802 }
803 }
804
805 impl MyClock {
806 fn advance(&self, step: Duration) {
807 *self.now.borrow_mut() = *self.last_time.borrow() + step;
808 }
809 }
810
811 #[test]
812 fn send_ping() {
813 let mut stack = MockStack::default();
814
815 let clock = MyClock {
816 last_time: RefCell::new(Duration::from_secs(0)),
817 now: RefCell::new(Duration::from_secs(1)),
818 };
819
820 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
821
822 let mut endpoint: CoapEndpoint<
823 '_,
824 MockStack,
825 Random,
826 MyClock,
827 8,
828 32,
829 128,
830 > = CoapEndpoint::try_new(
832 TransmissionParameters::default(),
833 Random { value: 0 },
834 &clock,
835 &mut receive_buffer,
836 )
837 .unwrap();
838
839 stack.expect_socket().once().return_once(|| Ok(Socket));
840 stack.expect_connect().once().return_once(|_, _| Ok(()));
841
842 endpoint
843 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
844 .unwrap();
845
846 stack
847 .expect_receive()
848 .once()
849 .return_once(|_, _| Err(nb::Error::WouldBlock));
850
851 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
852 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
853 assert!(matches!(
854 endpoint.outgoing_communication.state,
855 OutgoingState::Idle(_)
856 ));
857
858 let message_id = endpoint.outgoing_communication.next_message_id.unwrap();
859 let ping_buf = crate::message::encoded_message::EncodedMessage::ping(message_id);
860 let pong: Message<'_> = Message::new_rst(message_id);
861
862 stack
863 .expect_receive()
864 .once()
865 .return_once(move |_, _| Err(nb::Error::WouldBlock));
866 stack.expect_send().once().return_once(move |_, buf| {
867 assert_eq!(buf, ping_buf);
868 Ok(())
869 });
870
871 endpoint
872 .outgoing()
873 .schedule_ping(Duration::from_secs(1))
874 .unwrap();
875
876 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
877 assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendPing));
878 assert!(matches!(
879 endpoint.outgoing_communication.state,
880 OutgoingState::SendingPing(..)
881 ));
882
883 stack.expect_receive().once().return_once(move |_, buffer| {
884 let encoded_message = pong.encode(buffer).unwrap();
885
886 Ok((
887 encoded_message.message_length(),
888 "127.0.0.1:5683".parse().unwrap(),
889 ))
890 });
891
892 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
893 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Success(_)));
894 assert!(matches!(
895 endpoint.outgoing_communication.state,
896 OutgoingState::Idle(true)
897 ));
898 }
899
900 #[test]
901 fn send_con_get_piggybacked() {
902 let mut stack = MockStack::default();
903
904 let clock = MyClock {
905 last_time: RefCell::new(Duration::from_secs(0)),
906 now: RefCell::new(Duration::from_secs(1)),
907 };
908
909 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
910
911 let mut endpoint: CoapEndpoint<
912 '_,
913 MockStack,
914 Random,
915 MyClock,
916 8,
917 32,
918 128,
919 > = CoapEndpoint::try_new(
921 TransmissionParameters::default(),
922 Random { value: 0 },
923 &clock,
924 &mut receive_buffer,
925 )
926 .unwrap();
927
928 stack.expect_socket().once().return_once(|| Ok(Socket));
929 stack.expect_connect().once().return_once(|_, _| Ok(()));
930
931 endpoint
932 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
933 .unwrap();
934
935 stack
936 .expect_receive()
937 .once()
938 .return_once(|_, _| Err(nb::Error::WouldBlock));
939
940 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
941 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
942 assert!(matches!(
943 endpoint.outgoing_communication.state,
944 OutgoingState::Idle(_)
945 ));
946
947 let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
948 let request_token = endpoint.outgoing_communication.next_token.unwrap();
949 let request: Message<'_> = Message::new(
950 Type::Confirmable,
951 RequestCode::Get.into(),
952 request_id,
953 request_token,
954 Vec::new(),
955 Some(b"/hello"),
956 );
957 let mut request_buf = [0_u8; 32];
958 let request_length = request.encode(&mut request_buf).unwrap().message_length();
959 let response: Message<'_> = Message::new(
960 Type::Acknowledgement,
961 SuccessCode::Content.into(),
962 request_id,
963 request_token,
964 Vec::new(),
965 Some(b"world"),
966 );
967 let mut response_buf = [0_u8; 32];
968 let response_length = response.encode(&mut response_buf).unwrap().message_length();
969
970 endpoint
971 .outgoing()
972 .schedule_con(
973 RequestCode::Get,
974 Vec::new(),
975 Some(b"/hello"),
976 Duration::from_secs(1),
977 )
978 .unwrap();
979
980 stack
981 .expect_receive()
982 .once()
983 .return_once(|_, _| Err(nb::Error::WouldBlock));
984 stack.expect_send().once().return_once(move |_, buffer| {
985 assert_eq!(buffer, &request_buf[..request_length]);
986
987 Ok(())
988 });
989
990 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
991 assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
992 assert!(matches!(
993 endpoint.outgoing_communication.state,
994 OutgoingState::SendingCon(..)
995 ));
996
997 stack.expect_receive().once().return_once(move |_, buffer| {
998 buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
999
1000 Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1001 });
1002
1003 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1004 let outgoing = outgoing.unwrap();
1005 assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1006 match outgoing {
1007 OutgoingEvent::Success(message) => {
1008 assert_eq!(message, response.encode(&mut response_buf).unwrap());
1009 }
1010 _ => (),
1011 }
1012 assert!(matches!(
1013 endpoint.outgoing_communication.state,
1014 OutgoingState::Idle(_)
1015 ));
1016 }
1017
1018 #[test]
1019 fn send_con_get_separate() {
1020 let mut stack = MockStack::default();
1021
1022 let clock = MyClock {
1023 last_time: RefCell::new(Duration::from_secs(0)),
1024 now: RefCell::new(Duration::from_secs(1)),
1025 };
1026
1027 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1028
1029 let mut endpoint: CoapEndpoint<
1030 '_,
1031 MockStack,
1032 Random,
1033 MyClock,
1034 8,
1035 32,
1036 128,
1037 > = CoapEndpoint::try_new(
1039 TransmissionParameters::default(),
1040 Random { value: 0 },
1041 &clock,
1042 &mut receive_buffer,
1043 )
1044 .unwrap();
1045
1046 stack.expect_socket().once().return_once(|| Ok(Socket));
1047 stack.expect_connect().once().return_once(|_, _| Ok(()));
1048
1049 endpoint
1050 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1051 .unwrap();
1052
1053 stack
1054 .expect_receive()
1055 .once()
1056 .return_once(|_, _| Err(nb::Error::WouldBlock));
1057
1058 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1059 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1060 assert!(matches!(
1061 endpoint.outgoing_communication.state,
1062 OutgoingState::Idle(_)
1063 ));
1064
1065 let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1066 let request_token = endpoint.outgoing_communication.next_token.unwrap();
1067 let request: Message<'_> = Message::new(
1068 Type::Confirmable,
1069 RequestCode::Get.into(),
1070 request_id,
1071 request_token,
1072 Vec::new(),
1073 Some(b"/hello"),
1074 );
1075 let mut request_buf = [0_u8; 32];
1076 let request_length = request.encode(&mut request_buf).unwrap().message_length();
1077
1078 let ack: Message<'_> = Message::new_ack(request_id);
1079 let mut ack_buf = [0_u8; 4];
1080 let ack_length = ack.encode(&mut ack_buf).unwrap().message_length();
1081
1082 let response: Message<'_> = Message::new(
1083 Type::Acknowledgement,
1084 SuccessCode::Content.into(),
1085 request_id,
1086 request_token,
1087 Vec::new(),
1088 Some(b"world"),
1089 );
1090 let mut response_buf = [0_u8; 32];
1091 let response_length = response.encode(&mut response_buf).unwrap().message_length();
1092
1093 endpoint
1094 .outgoing()
1095 .schedule_con(
1096 RequestCode::Get,
1097 Vec::new(),
1098 Some(b"/hello"),
1099 Duration::from_secs(1),
1100 )
1101 .unwrap();
1102
1103 stack
1104 .expect_receive()
1105 .once()
1106 .return_once(|_, _| Err(nb::Error::WouldBlock));
1107 stack.expect_send().once().return_once(move |_, buffer| {
1108 assert_eq!(buffer, &request_buf[..request_length]);
1109
1110 Ok(())
1111 });
1112
1113 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1114 assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
1115 assert!(matches!(
1116 endpoint.outgoing_communication.state,
1117 OutgoingState::SendingCon(..)
1118 ));
1119
1120 stack.expect_receive().once().return_once(move |_, buffer| {
1121 buffer[..ack_length].copy_from_slice(&ack_buf[..ack_length]);
1122
1123 Ok((ack_length, "127.0.0.1:5683".parse().unwrap()))
1124 });
1125
1126 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1127 assert!(matches!(outgoing.unwrap(), OutgoingEvent::AckReceived));
1128 assert!(matches!(
1129 endpoint.outgoing_communication.state,
1130 OutgoingState::AwaitingResponse(..)
1131 ));
1132
1133 stack.expect_receive().once().return_once(move |_, buffer| {
1134 buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
1135
1136 Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1137 });
1138
1139 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1140 let outgoing = outgoing.unwrap();
1141 assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1142 match outgoing {
1143 OutgoingEvent::Success(message) => {
1144 assert_eq!(message, response.encode(&mut response_buf).unwrap());
1145 }
1146 _ => (),
1147 }
1148 assert!(matches!(
1149 endpoint.outgoing_communication.state,
1150 OutgoingState::Idle(_)
1151 ));
1152 }
1153
1154 #[test]
1155 fn send_non_get_non() {
1156 let mut stack = MockStack::default();
1157
1158 let clock = MyClock {
1159 last_time: RefCell::new(Duration::from_secs(0)),
1160 now: RefCell::new(Duration::from_secs(1)),
1161 };
1162
1163 let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
1164
1165 let mut endpoint: CoapEndpoint<
1166 '_,
1167 MockStack,
1168 Random,
1169 MyClock,
1170 8,
1171 32,
1172 128,
1173 > = CoapEndpoint::try_new(
1175 TransmissionParameters::default(),
1176 Random { value: 0 },
1177 &clock,
1178 &mut receive_buffer,
1179 )
1180 .unwrap();
1181
1182 stack.expect_socket().once().return_once(|| Ok(Socket));
1183 stack.expect_connect().once().return_once(|_, _| Ok(()));
1184
1185 endpoint
1186 .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
1187 .unwrap();
1188
1189 stack
1190 .expect_receive()
1191 .once()
1192 .return_once(|_, _| Err(nb::Error::WouldBlock));
1193
1194 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1195 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1196 assert!(matches!(
1197 endpoint.outgoing_communication.state,
1198 OutgoingState::Idle(_)
1199 ));
1200
1201 let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
1202 let request_token = endpoint.outgoing_communication.next_token.unwrap();
1203 let request: Message<'_> = Message::new(
1204 Type::NonConfirmable,
1205 RequestCode::Get.into(),
1206 request_id,
1207 request_token,
1208 Vec::new(),
1209 Some(b"/hello"),
1210 );
1211 let mut request_buf = [0_u8; 32];
1212 let request_length = request.encode(&mut request_buf).unwrap().message_length();
1213
1214 let response: Message<'_> = Message::new(
1215 Type::NonConfirmable,
1216 SuccessCode::Content.into(),
1217 request_id + 1,
1218 request_token,
1219 Vec::new(),
1220 Some(b"world"),
1221 );
1222 let mut response_buf = [0_u8; 32];
1223 let response_length = response.encode(&mut response_buf).unwrap().message_length();
1224
1225 endpoint
1226 .outgoing()
1227 .schedule_non(
1228 RequestCode::Get,
1229 Vec::new(),
1230 Some(b"/hello"),
1231 Duration::from_secs(1),
1232 )
1233 .unwrap();
1234
1235 stack
1236 .expect_receive()
1237 .once()
1238 .return_once(|_, _| Err(nb::Error::WouldBlock));
1239 stack.expect_send().once().return_once(move |_, buffer| {
1240 assert_eq!(buffer, &request_buf[..request_length]);
1241
1242 Ok(())
1243 });
1244
1245 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1246 assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendNon));
1247 assert!(matches!(
1248 endpoint.outgoing_communication.state,
1249 OutgoingState::SendingNon(..)
1250 ));
1251
1252 stack
1253 .expect_receive()
1254 .once()
1255 .return_once(|_, _| Err(nb::Error::WouldBlock));
1256
1257 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1258 assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
1259 assert!(matches!(
1260 endpoint.outgoing_communication.state,
1261 OutgoingState::SendingNon(..)
1262 ));
1263
1264 stack.expect_receive().once().return_once(move |_, buffer| {
1265 buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
1266
1267 Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
1268 });
1269
1270 let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
1271 let outgoing = outgoing.unwrap();
1272 assert!(matches!(outgoing, OutgoingEvent::Success(_)));
1273 match outgoing {
1274 OutgoingEvent::Success(message) => {
1275 assert_eq!(message, response.encode(&mut response_buf).unwrap());
1276 }
1277 _ => (),
1278 }
1279 assert!(matches!(
1280 endpoint.outgoing_communication.state,
1281 OutgoingState::Idle(_)
1282 ));
1283 }
1284
/// A non-confirmable GET that is answered with a *confirmable* response.
///
/// The test drives the endpoint through: idle -> NON request sent -> waiting -> CON response
/// received. It expects the response to be reported as `OutgoingEvent::Success`, exactly one
/// ACK for the response's message ID to be sent, and the outgoing state to return to idle.
#[test]
fn send_non_get_con() {
    let mut stack = MockStack::default();

    let clock = MyClock {
        last_time: RefCell::new(Duration::from_secs(0)),
        now: RefCell::new(Duration::from_secs(1)),
    };

    let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

    let mut endpoint: CoapEndpoint<'_, MockStack, Random, MyClock, 8, 32, 128> =
        CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

    stack.expect_socket().once().return_once(|| Ok(Socket));
    stack.expect_connect().once().return_once(|_, _| Ok(()));

    endpoint
        .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
        .unwrap();

    // With nothing scheduled, processing must be a no-op.
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));

    // Build the request the endpoint is expected to put on the wire, using the message ID and
    // token it has queued up for its next outgoing message.
    let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
    let request_token = endpoint.outgoing_communication.next_token.unwrap();
    let request: Message<'_> = Message::new(
        Type::NonConfirmable,
        RequestCode::Get.into(),
        request_id,
        request_token,
        Vec::new(),
        Some(b"/hello"),
    );
    let mut request_buf = [0_u8; 32];
    let request_length = request.encode(&mut request_buf).unwrap().message_length();

    // The response only needs a message ID different from the request's. Wrapping arithmetic
    // avoids an overflow panic should the endpoint's next message ID ever be `u16::MAX`.
    let response_id = request_id.wrapping_add(1);
    let response: Message<'_> = Message::new(
        Type::Confirmable,
        SuccessCode::Content.into(),
        response_id,
        request_token,
        Vec::new(),
        Some(b"world"),
    );
    let mut response_buf = [0_u8; 32];
    let response_length = response.encode(&mut response_buf).unwrap().message_length();

    // The ACK the endpoint must send back for the confirmable response.
    let ack: Message<'_> = Message::new_ack(response_id);
    let mut ack_buf = [0_u8; 4];
    let ack_length = ack.encode(&mut ack_buf).unwrap().message_length();

    endpoint
        .outgoing()
        .schedule_non(
            RequestCode::Get,
            Vec::new(),
            Some(b"/hello"),
            Duration::from_secs(1),
        )
        .unwrap();

    // First pass after scheduling: the request is transmitted.
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));
    stack.expect_send().once().return_once(move |_, buffer| {
        assert_eq!(buffer, &request_buf[..request_length]);

        Ok(())
    });

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendNon));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::SendingNon(..)
    ));

    // No response yet: the endpoint keeps waiting.
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::SendingNon(..)
    ));

    // Deliver the confirmable response and expect exactly one ACK in return.
    stack.expect_receive().once().return_once(move |_, buffer| {
        buffer[..response_length].copy_from_slice(&response_buf[..response_length]);

        Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
    });
    stack.expect_send().once().return_once(move |_, buffer| {
        // Compare against the encoded length only, mirroring the request assertion above.
        assert_eq!(buffer, &ack_buf[..ack_length]);

        Ok(())
    });

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    match outgoing.unwrap() {
        OutgoingEvent::Success(message) => {
            assert_eq!(message, response.encode(&mut response_buf).unwrap());
        }
        other => panic!("expected OutgoingEvent::Success, got {:?}", other),
    }
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));
}
1425
/// Reset path of a confirmable request.
///
/// A CON GET is scheduled and transmitted; afterwards a RST carrying the request's message ID
/// is fed back through the mocked socket. The endpoint is expected to report
/// `OutgoingEvent::ResetReceived` and to end up idle again.
#[test]
fn send_con_get_rst() {
    // Arms the mock for exactly one `receive` poll that yields no datagram.
    fn expect_empty_poll(udp: &mut MockStack) {
        udp.expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));
    }

    const PEER: &str = "127.0.0.1:5683";

    let mut udp = MockStack::default();
    let mock_clock = MyClock {
        last_time: RefCell::new(Duration::from_secs(0)),
        now: RefCell::new(Duration::from_secs(1)),
    };
    let mut rx_storage = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

    let mut coap: CoapEndpoint<'_, MockStack, Random, MyClock, 8, 32, 128> =
        CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &mock_clock,
            &mut rx_storage,
        )
        .unwrap();

    // Open the mocked socket and connect it to the peer.
    udp.expect_socket().once().return_once(|| Ok(Socket));
    udp.expect_connect().once().return_once(|_, _| Ok(()));
    coap.connect_to_addr(&mut udp, PEER.parse().unwrap())
        .unwrap();

    // Nothing is scheduled yet, so a processing pass reports nothing and stays idle.
    expect_empty_poll(&mut udp);
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));

    // Pre-compute the wire image of the request the endpoint is expected to emit.
    let msg_id = coap.outgoing_communication.next_message_id.unwrap();
    let token = coap.outgoing_communication.next_token.unwrap();
    let expected_request: Message<'_> = Message::new(
        Type::Confirmable,
        RequestCode::Get.into(),
        msg_id,
        token,
        Vec::new(),
        Some(b"/hello"),
    );
    let mut request_wire = [0_u8; 32];
    let request_len = expected_request
        .encode(&mut request_wire)
        .unwrap()
        .message_length();

    // The reset message that will later be handed to the endpoint.
    let rst: Message<'_> = Message::new_rst(msg_id);
    let mut rst_wire = [0_u8; 4];
    let rst_len = rst.encode(&mut rst_wire).unwrap().message_length();

    coap.outgoing()
        .schedule_con(
            RequestCode::Get,
            Vec::new(),
            Some(b"/hello"),
            Duration::from_secs(1),
        )
        .unwrap();

    // First pass after scheduling: the request goes out.
    expect_empty_poll(&mut udp);
    udp.expect_send().once().return_once(move |_, sent| {
        assert_eq!(sent, &request_wire[..request_len]);

        Ok(())
    });
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::SendCon));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Second pass: still no answer, the request stays in flight.
    expect_empty_poll(&mut udp);
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Third pass: the peer answers with the reset.
    udp.expect_receive().once().return_once(move |_, target| {
        target[..rst_len].copy_from_slice(&rst_wire[..rst_len]);

        Ok((rst_len, PEER.parse().unwrap()))
    });
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::ResetReceived));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));
}
1543
/// A piggybacked response whose token does not match the request.
///
/// The endpoint sends a CON GET and then receives an ACK that carries a response code and the
/// request's message ID, but a different token. The test expects
/// `OutgoingEvent::PiggybackedWrongToken`, with the request remaining in `SendingCon`, and a
/// subsequent `cancel()` to bring the state back to `Idle(false)`.
#[test]
fn send_con_get_piggybacked_wrong_token() {
    let mut stack = MockStack::default();

    let clock = MyClock {
        last_time: RefCell::new(Duration::from_secs(0)),
        now: RefCell::new(Duration::from_secs(1)),
    };

    let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

    let mut endpoint: CoapEndpoint<'_, MockStack, Random, MyClock, 8, 32, 128> =
        CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

    stack.expect_socket().once().return_once(|| Ok(Socket));
    stack.expect_connect().once().return_once(|_, _| Ok(()));

    endpoint
        .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
        .unwrap();

    // With nothing scheduled, processing must be a no-op.
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));

    // Build the request the endpoint is expected to put on the wire.
    let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
    let request_token = endpoint.outgoing_communication.next_token.unwrap();
    let request: Message<'_> = Message::new(
        Type::Confirmable,
        RequestCode::Get.into(),
        request_id,
        request_token,
        Vec::new(),
        Some(b"/hello"),
    );
    let mut request_buf = [0_u8; 32];
    let request_length = request.encode(&mut request_buf).unwrap().message_length();

    // Derive a token that is guaranteed to differ from the request's. Wrapping arithmetic
    // keeps this from panicking on overflow when the first token byte happens to be 0xFF.
    let mut response_token = request_token;
    response_token.bytes[0] = response_token.bytes[0].wrapping_add(1);
    let response: Message<'_> = Message::new(
        Type::Acknowledgement,
        SuccessCode::Content.into(),
        request_id,
        response_token,
        Vec::new(),
        Some(b"world"),
    );
    let mut response_buf = [0_u8; 32];
    let response_length = response.encode(&mut response_buf).unwrap().message_length();

    endpoint
        .outgoing()
        .schedule_con(
            RequestCode::Get,
            Vec::new(),
            Some(b"/hello"),
            Duration::from_secs(1),
        )
        .unwrap();

    // First pass after scheduling: the request is transmitted. The send expectation carries no
    // call-count constraint; every send that does happen must carry the request bytes.
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));
    stack.expect_send().returning(move |_, buffer| {
        assert_eq!(buffer, &request_buf[..request_length]);

        Ok(())
    });

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Deliver the piggybacked response with the mismatching token.
    stack.expect_receive().once().return_once(move |_, buffer| {
        buffer[..response_length].copy_from_slice(&response_buf[..response_length]);

        Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
    });

    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    let outgoing = outgoing.unwrap();
    assert!(matches!(outgoing, OutgoingEvent::PiggybackedWrongToken));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // The request is still in flight; cancelling it must return the state machine to idle.
    endpoint.outgoing().cancel().unwrap();
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::Idle(false)
    ));
}
1665
/// Retransmission and timeout of an unanswered confirmable request.
///
/// A CON GET is scheduled and transmitted, and the peer never answers. After each of four
/// clock advances the endpoint is expected to send the request again (`SendCon`); after the
/// fifth advance it must give up with `OutgoingEvent::Timeout` and fall back to `Idle(false)`.
#[test]
fn send_con_get_timeout() {
    // Arms the mock for exactly one `receive` poll that yields no datagram.
    fn expect_empty_poll(udp: &mut MockStack) {
        udp.expect_receive()
            .once()
            .return_once(|_, _| Err(nb::Error::WouldBlock));
    }

    let mut udp = MockStack::default();
    let mock_clock = MyClock {
        last_time: RefCell::new(Duration::from_secs(0)),
        now: RefCell::new(Duration::from_secs(1)),
    };
    let mut rx_storage = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];

    let mut coap: CoapEndpoint<'_, MockStack, Random, MyClock, 8, 32, 128> =
        CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &mock_clock,
            &mut rx_storage,
        )
        .unwrap();

    // Open the mocked socket and connect it to the peer.
    udp.expect_socket().once().return_once(|| Ok(Socket));
    udp.expect_connect().once().return_once(|_, _| Ok(()));
    coap.connect_to_addr(&mut udp, "127.0.0.1:5683".parse().unwrap())
        .unwrap();

    // Nothing is scheduled yet, so a processing pass reports nothing and stays idle.
    expect_empty_poll(&mut udp);
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));

    // Pre-compute the wire image of the request; every (re)transmission must match it.
    let msg_id = coap.outgoing_communication.next_message_id.unwrap();
    let token = coap.outgoing_communication.next_token.unwrap();
    let expected_request: Message<'_> = Message::new(
        Type::Confirmable,
        RequestCode::Get.into(),
        msg_id,
        token,
        Vec::new(),
        Some(b"/hello"),
    );
    let mut request_wire = [0_u8; 32];
    let request_len = expected_request
        .encode(&mut request_wire)
        .unwrap()
        .message_length();

    coap.outgoing()
        .schedule_con(
            RequestCode::Get,
            Vec::new(),
            Some(b"/hello"),
            Duration::from_secs(1),
        )
        .unwrap();

    // Initial transmission. The send expectation is deliberately unbounded so that it also
    // serves all retransmissions below.
    expect_empty_poll(&mut udp);
    udp.expect_send().returning(move |_, sent| {
        assert_eq!(sent, &request_wire[..request_len]);

        Ok(())
    });
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::SendCon));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // From here on the peer stays silent for every poll.
    udp.expect_receive()
        .returning(|_, _| Err(nb::Error::WouldBlock));

    // Without any clock movement nothing happens.
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Each of these clock advances must trigger one retransmission.
    for secs in [3_u64, 5, 9, 17].iter().copied() {
        mock_clock.advance(Duration::from_secs(secs));
        let (_, event, _) = coap.process(&mut udp).unwrap();
        assert!(matches!(event.unwrap(), OutgoingEvent::SendCon));
        assert!(matches!(
            coap.outgoing_communication.state,
            OutgoingState::SendingCon(..)
        ));
    }

    // One more advance and the endpoint gives up on the request.
    mock_clock.advance(Duration::from_secs(33));
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Timeout));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::Idle(false)
    ));
}
1806}