use core::marker::PhantomData;
use core::time::Duration;
use embedded_nal::UdpClientStack;
use embedded_timers::clock::{Clock, Instant};
use heapless::Vec;
use crate::message::{
codes::{Code, RequestCode, ResponseCode},
encoded_message::EncodedMessage,
options::{CoapOption, CoapOptionName},
token::Token,
Message, Type,
};
use super::{
error::Error, Connection, MessageBuffer, MessageIdentification, RetransmissionState,
RetransmissionTimeout, TransmissionParameters, Uri,
};
/// Progress of the exchange currently owned by the outgoing side of the endpoint.
#[derive(Debug, Clone, Copy)]
pub enum OutgoingState {
/// No exchange is in flight. The flag is `true` while a received response is
/// stored in the message buffer and can be fetched through `response()`.
Idle(bool),
/// A confirmable request is being (re)transmitted. The `Duration` is the
/// timeout handed to `schedule_con`; it is added to the current time when an
/// empty ACK arrives to obtain the deadline for the separate response.
SendingCon(RetransmissionState, Duration),
/// An empty ACK was received; a separate response is awaited until this instant.
AwaitingResponse(Instant),
/// A non-confirmable request is in flight. The flag is `true` while a
/// transmission is still pending and is cleared once the message was sent.
/// The `Duration` is the timeout measured from the last transmission.
SendingNon(RetransmissionState, bool, Duration),
/// A ping is in flight; the fields are used exactly as for `SendingNon`.
SendingPing(RetransmissionState, bool, Duration),
/// A confirmable notification is being (re)transmitted.
SendingNotificationCon(RetransmissionState),
/// A non-confirmable notification is waiting to be sent once.
SendingNotificationNon,
}
/// Result of one `process_outgoing` step, reported to the caller.
#[derive(Debug)]
pub enum OutgoingEvent<'a> {
/// Nothing noteworthy happened in this step.
Nothing,
/// The confirmable request was (re)transmitted.
SendCon,
/// The non-confirmable request was transmitted.
SendNon,
/// The ping was transmitted.
SendPing,
/// An empty ACK for the confirmable request arrived; the state machine now
/// waits for a separate response.
AckReceived,
/// The exchange completed; carries the message now stored in the message buffer.
Success(EncodedMessage<'a>),
/// The exchange was given up because no answer arrived in time.
Timeout,
/// An ACK with the request's message ID carried a response with a different token.
PiggybackedWrongToken,
/// The peer answered the request with a Reset message.
ResetReceived,
/// A message belonging to the previously finished exchange arrived while idle
/// and was dropped.
DuplicatedResponse,
/// The notification was (re)transmitted.
SendNotification,
/// The confirmable notification was acknowledged.
NotificationAck,
/// The peer rejected a notification with a Reset; carries the notification's token.
NotificationRst(Token),
/// Retransmissions of the confirmable notification were exhausted.
NotificationTimeout,
}
/// Identification of the most recently finished outgoing exchange.
///
/// `process_outgoing` consults this while idle to recognise late or duplicated
/// answers to an exchange that has already been closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum LastOutgoing {
/// No exchange has been finished yet.
Nothing,
/// The last finished exchange was a request (CON, NON or ping).
Request(MessageIdentification),
/// The last finished exchange was a notification.
Notification(MessageIdentification),
}
/// State machine for a single outgoing exchange (request, ping or notification).
///
/// Holds one message buffer that first contains the outgoing message and, after
/// a successful request, the received response.
#[derive(Debug)]
pub struct OutgoingCommunication<
'a,
CLOCK,
UDP: UdpClientStack,
const BUFFER_SIZE: usize,
const MAX_OPTION_COUNT: usize,
const MAX_OPTION_SIZE: usize,
> where
CLOCK: Clock,
UDP: UdpClientStack,
{
// Outgoing message while an exchange is active; the response once it succeeded.
message_buffer: MessageBuffer<BUFFER_SIZE>,
// Current position in the exchange state machine.
state: OutgoingState,
// Time source for retransmission and timeout decisions.
clock: &'a CLOCK,
// Parameters handed to every new `RetransmissionState`.
transmission_parameters: TransmissionParameters,
// Message ID for the next outgoing message; consumed when a message is encoded.
// NOTE(review): presumably refilled by the owning endpoint - not visible here.
pub(super) next_message_id: Option<u16>,
// Token for the next request; consumed when a request is encoded.
pub(super) next_token: Option<Token>,
// Random value for the next `RetransmissionState`; consumed when scheduling.
pub(super) next_random: Option<f32>,
// Identification of the last finished exchange, used for duplicate detection.
last_outgoing: LastOutgoing,
// Ties the struct to the UDP stack type, which is only needed for the error type.
_udp: PhantomData<UDP>,
}
impl<
'a,
CLOCK,
UDP,
const BUFFER_SIZE: usize,
const MAX_OPTION_COUNT: usize,
const MAX_OPTION_SIZE: usize,
> OutgoingCommunication<'a, CLOCK, UDP, BUFFER_SIZE, MAX_OPTION_COUNT, MAX_OPTION_SIZE>
where
CLOCK: Clock,
UDP: UdpClientStack,
{
/// Creates an idle outgoing state machine.
///
/// The message buffer starts out in its default state, no previous exchange is
/// remembered and none of the pre-generated values (message ID, token, random
/// number) is available yet.
pub fn new(clock: &'a CLOCK, transmission_parameters: TransmissionParameters) -> Self {
    let state = OutgoingState::Idle(false);
    let last_outgoing = LastOutgoing::Nothing;
    Self {
        clock,
        transmission_parameters,
        state,
        last_outgoing,
        message_buffer: MessageBuffer::default(),
        next_message_id: None,
        next_token: None,
        next_random: None,
        _udp: PhantomData,
    }
}
/// Returns a copy of the current state of the outgoing state machine.
pub fn state(&self) -> OutgoingState {
self.state
}
/// Discards all exchange state by replacing `self` with a freshly constructed
/// instance that keeps only the clock and the transmission parameters.
///
/// Because `new` initialises them to `None`, this also drops any pending
/// `next_message_id`, `next_token` and `next_random` values.
pub fn reset(&mut self) {
*self = Self::new(self.clock, self.transmission_parameters);
}
/// Closes the current request exchange.
///
/// The message ID and token of the request still held in the message buffer
/// are remembered as the last outgoing request. With `Some(response)` the
/// buffer is then overwritten with that response and the state becomes
/// `Idle(true)`; with `None` the state becomes `Idle(false)`.
///
/// # Errors
/// Propagates a failure of `MessageBuffer::replace_with`; the state is left
/// untouched in that case.
///
/// # Panics
/// Panics if the buffer yields no message or the message's token cannot be read.
fn finish_request(
    &mut self,
    response: Option<EncodedMessage>,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
    let request = self.message_buffer.message().unwrap();
    let identification = MessageIdentification {
        id: request.message_id(),
        token: request.token().unwrap(),
    };
    self.last_outgoing = LastOutgoing::Request(identification);

    self.state = match response {
        Some(response) => {
            self.message_buffer.replace_with(response)?;
            OutgoingState::Idle(true)
        }
        None => OutgoingState::Idle(false),
    };
    Ok(())
}
/// Closes the current notification exchange.
///
/// Remembers the message ID and token of the notification held in the message
/// buffer as the last outgoing notification and returns to `Idle(false)`.
///
/// # Panics
/// Panics if the buffer yields no message or the message's token cannot be read.
fn finish_notification(&mut self) {
    let notification = self.message_buffer.message().unwrap();
    let identification = MessageIdentification {
        id: notification.message_id(),
        token: notification.token().unwrap(),
    };
    self.last_outgoing = LastOutgoing::Notification(identification);
    self.state = OutgoingState::Idle(false);
}
pub fn response(&self) -> Result<EncodedMessage, Error<<UDP as UdpClientStack>::Error>> {
if let OutgoingState::Idle(true) = self.state {
Ok(self.message_buffer.message().unwrap())
} else {
Err(Error::Forbidden)
}
}
/// Encodes a non-confirmable request and queues it for transmission.
///
/// `timeout` is stored in the `SendingNon` state and later compared against the
/// time elapsed since the last transmission.
///
/// # Errors
/// Propagates any error from encoding the request, including `Error::Busy` when
/// another exchange is still active.
///
/// # Panics
/// Panics if no token, message ID or random value has been provided beforehand.
pub fn schedule_non(
    &mut self,
    code: RequestCode,
    options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
    payload: Option<&[u8]>,
    timeout: Duration,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
    self.encode_request(Type::NonConfirmable, code, options, payload)?;

    let random = self.next_random.take().unwrap();
    let retransmission = RetransmissionState::new(self.transmission_parameters, random);
    // `true`: the first transmission is still outstanding.
    self.state = OutgoingState::SendingNon(retransmission, true, timeout);
    Ok(())
}
/// Encodes a confirmable request and queues it for transmission.
///
/// `timeout` is stored in the `SendingCon` state; once an empty ACK arrives it
/// determines how long a separate response is awaited.
///
/// # Errors
/// Propagates any error from encoding the request, including `Error::Busy` when
/// another exchange is still active.
///
/// # Panics
/// Panics if no token, message ID or random value has been provided beforehand.
pub fn schedule_con(
    &mut self,
    code: RequestCode,
    options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
    payload: Option<&[u8]>,
    timeout: Duration,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
    self.encode_request(Type::Confirmable, code, options, payload)?;

    let random = self.next_random.take().unwrap();
    let retransmission = RetransmissionState::new(self.transmission_parameters, random);
    self.state = OutgoingState::SendingCon(retransmission, timeout);
    Ok(())
}
/// Encodes a request of the given type into the message buffer using the
/// pre-generated token.
///
/// The token is only consumed after encoding succeeded, so a failed attempt
/// leaves it available for the next one.
///
/// # Errors
/// Propagates any error from `encode_message`.
///
/// # Panics
/// Panics if `next_token` is `None`.
fn encode_request(
    &mut self,
    typ: Type,
    code: RequestCode,
    options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
    payload: Option<&[u8]>,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
    let request_token = self.next_token.unwrap();
    self.encode_message(typ, code.into(), request_token, options, payload)?;
    // Encoding succeeded: the token is now in use and must not be handed out again.
    self.next_token.take();
    Ok(())
}
/// Encodes a notification carrying `token` and queues it for transmission.
///
/// A confirmable notification enters `SendingNotificationCon` with a fresh
/// retransmission state; a non-confirmable one enters `SendingNotificationNon`.
///
/// # Errors
/// Propagates any error from `encode_message`, including `Error::Busy` when
/// another exchange is still active.
///
/// # Panics
/// Panics if no message ID is available, or - for confirmable notifications -
/// if no random value is available.
pub fn schedule_notification(
    &mut self,
    confirmable: bool,
    code: ResponseCode,
    token: Token,
    options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
    payload: Option<&[u8]>,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
    let typ = if confirmable {
        Type::Confirmable
    } else {
        Type::NonConfirmable
    };
    self.encode_message(typ, code.into(), token, options, payload)?;

    self.state = if confirmable {
        // Only confirmable notifications are retransmitted, so only they
        // consume the random value.
        let random = self.next_random.take().unwrap();
        OutgoingState::SendingNotificationCon(RetransmissionState::new(
            self.transmission_parameters,
            random,
        ))
    } else {
        OutgoingState::SendingNotificationNon
    };
    Ok(())
}
fn encode_message(
&mut self,
typ: Type,
code: Code,
token: Token,
options: Vec<CoapOption<'_>, MAX_OPTION_COUNT>,
payload: Option<&[u8]>,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
if !matches!(self.state, OutgoingState::Idle(_)) {
return Err(Error::Busy);
}
let id = self.next_message_id.unwrap();
let message: Message<MAX_OPTION_COUNT> =
Message::new(typ, code, id, token, options, payload);
self.message_buffer.encode(message)?;
self.next_message_id = None;
Ok(())
}
pub fn schedule_ping(
&mut self,
timeout: Duration,
) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
if !matches!(self.state, OutgoingState::Idle(_)) {
return Err(Error::Busy);
}
let message = Message::<0>::new_ping(self.next_message_id.unwrap());
self.message_buffer.encode(message)?;
self.next_message_id = None;
let retransmission_state = RetransmissionState::new(
self.transmission_parameters,
self.next_random.take().unwrap(),
);
self.state = OutgoingState::SendingPing(retransmission_state, true, timeout);
Ok(())
}
/// Aborts the exchange that is currently in progress.
///
/// The aborted exchange is still remembered as the last outgoing one, so late
/// answers to it can be recognised while idle.
///
/// # Errors
/// Returns `Error::Forbidden` when idle, i.e. when there is nothing to cancel.
pub fn cancel(&mut self) -> Result<(), Error<<UDP as UdpClientStack>::Error>> {
    match self.state {
        OutgoingState::Idle(_) => Err(Error::Forbidden),
        OutgoingState::SendingNotificationCon(_) | OutgoingState::SendingNotificationNon => {
            self.finish_notification();
            Ok(())
        }
        // Every remaining state belongs to a request-style exchange.
        OutgoingState::SendingCon(..)
        | OutgoingState::AwaitingResponse(_)
        | OutgoingState::SendingNon(..)
        | OutgoingState::SendingPing(..) => self.finish_request(None),
    }
}
/// Advances the outgoing state machine by one step.
///
/// `received` optionally holds a freshly received message. Whenever this
/// function consumes that message it takes it out of the option, leaving `None`
/// behind; messages it does not recognise are left in place. Transmissions and
/// acknowledgements are sent through `connection`.
///
/// Returns the event describing what happened in this step, or
/// `OutgoingEvent::Nothing`.
///
/// # Errors
/// Propagates errors from `connection.send` and from storing a response in the
/// message buffer.
///
/// # Panics
/// Panics if the clock cannot be read, if the message buffer yields no message
/// while an exchange is active, or if a code/token accessor of a message fails.
pub(crate) fn process_outgoing(
&mut self,
connection: &mut Connection<UDP>,
received: &mut Option<EncodedMessage>,
) -> Result<OutgoingEvent, Error<<UDP as UdpClientStack>::Error>> {
use OutgoingEvent::*;
use OutgoingState::*;
// While idle, filter out late or duplicated answers to the last finished exchange.
if let OutgoingState::Idle(_) = self.state {
use LastOutgoing::{Notification, Request};
if let (Some(message), Request(last_request)) = (&received, self.last_outgoing) {
// A response with the last request's token: acknowledge it if it is
// confirmable, then drop it.
if message.is_response().unwrap() && last_request.token == message.token().unwrap()
{
if message.message_type() == Type::Confirmable {
connection.send(&EncodedMessage::ack(message.message_id()))?;
}
received.take();
return Ok(OutgoingEvent::DuplicatedResponse);
}
// A late ACK or RST carrying the last request's message ID.
if matches!(message.message_type(), Type::Acknowledgement | Type::Reset)
&& message.message_id() == last_request.id
{
received.take();
return Ok(OutgoingEvent::DuplicatedResponse);
}
}
if let (Some(message), Notification(last_notify)) = (&received, self.last_outgoing) {
// A late ACK for the last notification is silently dropped.
if matches!(message.message_type(), Type::Acknowledgement)
&& message.message_id() == last_notify.id
{
received.take();
return Ok(OutgoingEvent::DuplicatedResponse);
}
// A late RST for the last notification is still reported with its token.
if matches!(message.message_type(), Type::Reset)
&& message.message_id() == last_notify.id
{
received.take();
return Ok(OutgoingEvent::NotificationRst(last_notify.token));
}
}
}
match self.state {
Idle(_) => (), SendingCon(ref mut retransmission_state, duration) => {
if let Some(message) = received {
let out_message = self.message_buffer.message().unwrap();
if message.message_type() == Type::Acknowledgement
&& message.message_id() == out_message.message_id()
{
if message.is_response().unwrap() {
// Piggybacked response: accepted only if the token matches.
if message.token().unwrap() == out_message.token().unwrap() {
self.finish_request(Some(received.take().unwrap()))?;
return Ok(Success(self.message_buffer.message().unwrap()));
} else {
// The message stays in `received`; the state is unchanged.
return Ok(PiggybackedWrongToken);
}
} else {
// Empty ACK: wait for a separate response until now + duration.
let end = self.clock.try_now().unwrap() + duration;
self.state = AwaitingResponse(end);
return Ok(AckReceived);
}
} else if message.is_response().unwrap()
&& message.token().unwrap() == out_message.token().unwrap()
{
// Response matched by token that is not an ACK with our message ID;
// acknowledge it if it is confirmable.
if message.message_type() == Type::Confirmable {
connection.send(&EncodedMessage::ack(message.message_id()))?;
}
self.finish_request(Some(received.take().unwrap()))?;
return Ok(Success(self.message_buffer.message().unwrap()));
} else if message.message_type() == Type::Reset
&& message.message_id() == out_message.message_id()
{
self.finish_request(None)?;
return Ok(ResetReceived);
}
}
// No matching message: let the retransmission state decide what to do.
match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
Ok(true) => {
connection.send(self.message_buffer.message().unwrap().data)?;
return Ok(SendCon);
}
Ok(false) => {}
Err(RetransmissionTimeout) => {
self.finish_request(None)?;
return Ok(Timeout);
}
}
}
AwaitingResponse(end) => {
if let Some(message) = received {
let out_message = self.message_buffer.message().unwrap();
// Separate response, matched by token only.
if message.is_response().unwrap()
&& message.token().unwrap() == out_message.token().unwrap()
{
if message.message_type() == Type::Confirmable {
connection.send(&EncodedMessage::ack(message.message_id()))?;
}
self.finish_request(Some(received.take().unwrap()))?;
return Ok(Success(self.message_buffer.message().unwrap()));
}
}
if self.clock.try_now().unwrap() > end {
self.finish_request(None)?;
return Ok(Timeout);
}
}
SendingNon(ref mut retransmission_state, ref mut retransmit_requested, timeout)
| SendingPing(ref mut retransmission_state, ref mut retransmit_requested, timeout) => {
if let Some(message) = received {
let out_message = self.message_buffer.message().unwrap();
if message.message_type() == Type::Reset
&& message.message_id() == out_message.message_id()
{
if matches!(self.state, SendingNon(..)) {
self.finish_request(None)?;
return Ok(ResetReceived);
} else {
// For a ping, the matching RST is the expected answer.
self.finish_request(Some(received.take().unwrap()))?;
return Ok(Success(self.message_buffer.message().unwrap()));
}
} else if message.is_response().unwrap()
&& message.token().unwrap() == out_message.token().unwrap()
{
if matches!(self.state, SendingNon(..)) {
if message.message_type() == Type::Confirmable {
// NOTE(review): every other branch sends
// `EncodedMessage::ack(..)`; this one encodes into a local
// buffer via `new_ack` and sends the buffer - confirm that
// both produce the same bytes and consider unifying.
let mut ack_buf = [0_u8; 4];
let _ack =
EncodedMessage::new_ack(message.message_id(), &mut ack_buf);
connection.send(&ack_buf)?
}
self.finish_request(Some(received.take().unwrap()))?;
return Ok(Success(self.message_buffer.message().unwrap()));
} else {
// A token-matching response while pinging is not consumed.
return Ok(Nothing);
}
}
}
// The flag is set when scheduling and cleared after the message was sent,
// so the message is transmitted once.
if *retransmit_requested {
match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
Ok(true) => {
*retransmit_requested = false;
connection.send(self.message_buffer.message().unwrap().data)?;
if matches!(self.state, SendingNon(..)) {
return Ok(OutgoingEvent::SendNon);
} else {
return Ok(OutgoingEvent::SendPing);
}
}
Ok(false) => {}
Err(RetransmissionTimeout) => {
self.finish_request(None)?;
return Ok(Timeout);
}
}
}
// NOTE(review): this unwrap assumes a transmission has already been
// recorded in the retransmission state - confirm that
// `retransmit_required` cannot return `Ok(false)` before the first send.
let waiting_time = self.clock.try_now().unwrap()
- retransmission_state.last_transmission_instant.unwrap();
if waiting_time > timeout {
self.finish_request(None)?;
return Ok(Timeout);
}
}
SendingNotificationCon(ref mut retransmission_state) => {
if let Some(message) = received {
let out_message = self.message_buffer.message().unwrap();
// ACK and RST are matched by message ID; neither is taken out of `received`.
if message.message_type() == Type::Acknowledgement
&& message.message_id() == out_message.message_id()
{
self.finish_notification();
return Ok(NotificationAck);
} else if message.message_type() == Type::Reset
&& message.message_id() == out_message.message_id()
{
let token = out_message.token().unwrap();
self.finish_notification();
return Ok(NotificationRst(token));
}
}
match retransmission_state.retransmit_required(self.clock.try_now().unwrap()) {
Ok(true) => {
connection.send(self.message_buffer.message().unwrap().data)?;
return Ok(SendNotification);
}
Ok(false) => {}
Err(RetransmissionTimeout) => {
self.finish_notification();
return Ok(NotificationTimeout);
}
}
}
SendingNotificationNon => {
// Non-confirmable notifications are sent once and finished immediately.
connection.send(self.message_buffer.message().unwrap().data)?;
self.finish_notification();
return Ok(SendNotification);
}
}
Ok(Nothing)
}
pub fn parse_options<'options>(
&self,
url: &'options str,
additional_options: Vec<CoapOption<'options>, MAX_OPTION_COUNT>,
) -> Result<Vec<CoapOption<'options>, MAX_OPTION_COUNT>, Error<<UDP as UdpClientStack>::Error>>
{
let mut options = Vec::<CoapOption<'_>, MAX_OPTION_COUNT>::new();
let uri = Uri::new(url).unwrap();
let path = uri.path_str();
for path in path.split('/') {
if !path.is_empty() {
options
.push(CoapOption {
name: CoapOptionName::UriPath,
value: path.as_bytes(),
})
.map_err(|_| Error::OutOfMemory)?;
}
}
if let Some(queries) = uri.query_str() {
for query in queries.split('&') {
options
.push(CoapOption {
name: CoapOptionName::UriQuery,
value: query.as_bytes(),
})
.map_err(|_| Error::OutOfMemory)?;
}
};
for option in additional_options {
options.push(option).map_err(|_| Error::OutOfMemory)?;
}
Ok(options)
}
}
#[cfg(test)]
mod tests {
use core::{cell::RefCell, time::Duration};
use embedded_nal::{SocketAddr, UdpClientStack};
use heapless::Vec;
use mockall::predicate::*;
use mockall::*;
use crate::{
endpoint::outgoing::{OutgoingEvent, OutgoingState},
message::{
codes::{RequestCode, SuccessCode},
Message, Type,
},
};
use super::{super::CoapEndpoint, TransmissionParameters};
/// Deterministic stand-in for a random number generator used by the tests.
#[derive(Debug)]
struct Random {
// Counter that is incremented on every read and whose bytes form the output.
value: u128,
}
/// Deterministic "random" source: each read increments the counter and fills
/// the buffer with the counter's little-endian bytes.
impl embedded_hal::blocking::rng::Read for Random {
    type Error = std::io::Error;

    /// Fills `buf` with the little-endian bytes of the incremented counter.
    ///
    /// Buffers longer than the 16 counter bytes are filled by repeating those
    /// bytes instead of panicking on an out-of-range slice; shorter buffers get
    /// exactly the same content as before.
    fn read(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
        self.value += 1;
        let counter_bytes = self.value.to_le_bytes();
        for (target, source) in buf.iter_mut().zip(counter_bytes.iter().cycle()) {
            *target = *source;
        }
        Ok(())
    }
}
/// Error type of the mocked UDP stack; carries no information.
#[derive(Debug)]
struct StackError;
/// Socket handle of the mocked UDP stack; carries no state.
struct Socket;
// Generates `MockStack`, a mockall-based implementation of `UdpClientStack`
// whose socket/connect/send/receive/close calls are scripted per test through
// `expect_*` expectations.
mock! {
Stack {}
impl UdpClientStack for Stack {
type UdpSocket = Socket;
type Error = StackError;
fn socket(&mut self) -> Result<Socket, StackError>;
fn connect(
&mut self,
socket: &mut Socket,
remote: SocketAddr
) -> Result<(), StackError>;
fn send(
&mut self,
socket: &mut Socket,
buffer: &[u8]
) -> Result<(), nb::Error<StackError>>;
fn receive(
&mut self,
socket: &mut Socket,
buffer: &mut [u8]
) -> Result<(usize, SocketAddr), nb::Error<StackError>>;
fn close(&mut self, socket: Socket) -> Result<(), StackError>;
}
}
/// Manually driven clock for the tests.
#[derive(Debug)]
struct MyClock {
// Time value that was returned by the most recent `try_now` call.
last_time: RefCell<Duration>,
// Time value that the next `try_now` call will return.
now: RefCell<Duration>,
}
impl embedded_timers::clock::Clock for MyClock {
    /// Returns the configured current time and remembers it as the last time
    /// that was handed out. Never fails.
    fn try_now(
        &self,
    ) -> Result<embedded_timers::clock::Instant, embedded_timers::clock::ClockError> {
        let current = *self.now.borrow();
        *self.last_time.borrow_mut() = current;
        Ok(current)
    }
}
impl MyClock {
    /// Moves the clock to `step` after the time most recently returned by
    /// `try_now` (not after the currently configured time).
    fn advance(&self, step: Duration) {
        let last_reported = *self.last_time.borrow();
        *self.now.borrow_mut() = last_reported + step;
    }
}
/// A scheduled ping is transmitted and a Reset carrying the ping's message ID
/// completes the exchange successfully.
#[test]
fn send_ping() {
// Test fixtures: mocked UDP stack, manual clock and the endpoint under test.
let mut stack = MockStack::default();
let clock = MyClock {
last_time: RefCell::new(Duration::from_secs(0)),
now: RefCell::new(Duration::from_secs(1)),
};
let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
let mut endpoint: CoapEndpoint<
'_,
MockStack,
Random,
MyClock,
8,
32,
128,
> = CoapEndpoint::try_new(
TransmissionParameters::default(),
Random { value: 0 },
&clock,
&mut receive_buffer,
)
.unwrap();
stack.expect_socket().once().return_once(|| Ok(Socket));
stack.expect_connect().once().return_once(|_, _| Ok(()));
endpoint
.connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
.unwrap();
// First step with nothing to receive: the outgoing side stays idle.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
// Build the expected ping and the Reset answer from the message ID the ping will use.
let message_id = endpoint.outgoing_communication.next_message_id.unwrap();
let ping_buf = crate::message::encoded_message::EncodedMessage::ping(message_id);
let pong: Message<'_> = Message::new_rst(message_id);
stack
.expect_receive()
.once()
.return_once(move |_, _| Err(nb::Error::WouldBlock));
stack.expect_send().once().return_once(move |_, buf| {
assert_eq!(buf, ping_buf);
Ok(())
});
// Schedule the ping; the next step must transmit it.
endpoint
.outgoing()
.schedule_ping(Duration::from_secs(1))
.unwrap();
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendPing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingPing(..)
));
// Deliver the Reset: the ping is reported as successful and stored as the response.
stack.expect_receive().once().return_once(move |_, buffer| {
let encoded_message = pong.encode(buffer).unwrap();
Ok((
encoded_message.message_length(),
"127.0.0.1:5683".parse().unwrap(),
))
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Success(_)));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(true)
));
}
/// A confirmable GET is answered by an ACK that carries the response
/// (piggybacked response) with the request's message ID and token.
#[test]
fn send_con_get_piggybacked() {
// Test fixtures: mocked UDP stack, manual clock and the endpoint under test.
let mut stack = MockStack::default();
let clock = MyClock {
last_time: RefCell::new(Duration::from_secs(0)),
now: RefCell::new(Duration::from_secs(1)),
};
let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
let mut endpoint: CoapEndpoint<
'_,
MockStack,
Random,
MyClock,
8,
32,
128,
> = CoapEndpoint::try_new(
TransmissionParameters::default(),
Random { value: 0 },
&clock,
&mut receive_buffer,
)
.unwrap();
stack.expect_socket().once().return_once(|| Ok(Socket));
stack.expect_connect().once().return_once(|_, _| Ok(()));
endpoint
.connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
.unwrap();
// First step with nothing to receive: the outgoing side stays idle.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
// Pre-compute the expected request bytes and the piggybacked response from the
// message ID and token the endpoint is going to use.
let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
let request_token = endpoint.outgoing_communication.next_token.unwrap();
let request: Message<'_> = Message::new(
Type::Confirmable,
RequestCode::Get.into(),
request_id,
request_token,
Vec::new(),
Some(b"/hello"),
);
let mut request_buf = [0_u8; 32];
let request_length = request.encode(&mut request_buf).unwrap().message_length();
let response: Message<'_> = Message::new(
Type::Acknowledgement,
SuccessCode::Content.into(),
request_id,
request_token,
Vec::new(),
Some(b"world"),
);
let mut response_buf = [0_u8; 32];
let response_length = response.encode(&mut response_buf).unwrap().message_length();
// Schedule the request; the next step must transmit exactly the expected bytes.
endpoint
.outgoing()
.schedule_con(
RequestCode::Get,
Vec::new(),
Some(b"/hello"),
Duration::from_secs(1),
)
.unwrap();
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
stack.expect_send().once().return_once(move |_, buffer| {
assert_eq!(buffer, &request_buf[..request_length]);
Ok(())
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingCon(..)
));
// Deliver the piggybacked response: the exchange succeeds with that message.
stack.expect_receive().once().return_once(move |_, buffer| {
buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
let outgoing = outgoing.unwrap();
assert!(matches!(outgoing, OutgoingEvent::Success(_)));
match outgoing {
OutgoingEvent::Success(message) => {
assert_eq!(message, response.encode(&mut response_buf).unwrap());
}
_ => (),
}
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
}
/// A confirmable GET is first acknowledged with an empty ACK and then answered
/// by a separately delivered response carrying the request's token.
#[test]
fn send_con_get_separate() {
// Test fixtures: mocked UDP stack, manual clock and the endpoint under test.
let mut stack = MockStack::default();
let clock = MyClock {
last_time: RefCell::new(Duration::from_secs(0)),
now: RefCell::new(Duration::from_secs(1)),
};
let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
let mut endpoint: CoapEndpoint<
'_,
MockStack,
Random,
MyClock,
8,
32,
128,
> = CoapEndpoint::try_new(
TransmissionParameters::default(),
Random { value: 0 },
&clock,
&mut receive_buffer,
)
.unwrap();
stack.expect_socket().once().return_once(|| Ok(Socket));
stack.expect_connect().once().return_once(|_, _| Ok(()));
endpoint
.connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
.unwrap();
// First step with nothing to receive: the outgoing side stays idle.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
// Pre-compute the expected request bytes, the empty ACK and the later response.
let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
let request_token = endpoint.outgoing_communication.next_token.unwrap();
let request: Message<'_> = Message::new(
Type::Confirmable,
RequestCode::Get.into(),
request_id,
request_token,
Vec::new(),
Some(b"/hello"),
);
let mut request_buf = [0_u8; 32];
let request_length = request.encode(&mut request_buf).unwrap().message_length();
let ack: Message<'_> = Message::new_ack(request_id);
let mut ack_buf = [0_u8; 4];
let ack_length = ack.encode(&mut ack_buf).unwrap().message_length();
let response: Message<'_> = Message::new(
Type::Acknowledgement,
SuccessCode::Content.into(),
request_id,
request_token,
Vec::new(),
Some(b"world"),
);
let mut response_buf = [0_u8; 32];
let response_length = response.encode(&mut response_buf).unwrap().message_length();
// Schedule the request; the next step must transmit exactly the expected bytes.
endpoint
.outgoing()
.schedule_con(
RequestCode::Get,
Vec::new(),
Some(b"/hello"),
Duration::from_secs(1),
)
.unwrap();
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
stack.expect_send().once().return_once(move |_, buffer| {
assert_eq!(buffer, &request_buf[..request_length]);
Ok(())
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingCon(..)
));
// Deliver the empty ACK: the state machine starts waiting for the separate response.
stack.expect_receive().once().return_once(move |_, buffer| {
buffer[..ack_length].copy_from_slice(&ack_buf[..ack_length]);
Ok((ack_length, "127.0.0.1:5683".parse().unwrap()))
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::AckReceived));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::AwaitingResponse(..)
));
// Deliver the response: it is matched by token and completes the exchange.
stack.expect_receive().once().return_once(move |_, buffer| {
buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
let outgoing = outgoing.unwrap();
assert!(matches!(outgoing, OutgoingEvent::Success(_)));
match outgoing {
OutgoingEvent::Success(message) => {
assert_eq!(message, response.encode(&mut response_buf).unwrap());
}
_ => (),
}
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
}
/// A non-confirmable GET is answered by a non-confirmable response that carries
/// the request's token and a different message ID.
#[test]
fn send_non_get_non() {
// Test fixtures: mocked UDP stack, manual clock and the endpoint under test.
let mut stack = MockStack::default();
let clock = MyClock {
last_time: RefCell::new(Duration::from_secs(0)),
now: RefCell::new(Duration::from_secs(1)),
};
let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
let mut endpoint: CoapEndpoint<
'_,
MockStack,
Random,
MyClock,
8,
32,
128,
> = CoapEndpoint::try_new(
TransmissionParameters::default(),
Random { value: 0 },
&clock,
&mut receive_buffer,
)
.unwrap();
stack.expect_socket().once().return_once(|| Ok(Socket));
stack.expect_connect().once().return_once(|_, _| Ok(()));
endpoint
.connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
.unwrap();
// First step with nothing to receive: the outgoing side stays idle.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
// Pre-compute the expected request bytes and the non-confirmable response.
let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
let request_token = endpoint.outgoing_communication.next_token.unwrap();
let request: Message<'_> = Message::new(
Type::NonConfirmable,
RequestCode::Get.into(),
request_id,
request_token,
Vec::new(),
Some(b"/hello"),
);
let mut request_buf = [0_u8; 32];
let request_length = request.encode(&mut request_buf).unwrap().message_length();
let response: Message<'_> = Message::new(
Type::NonConfirmable,
SuccessCode::Content.into(),
request_id + 1,
request_token,
Vec::new(),
Some(b"world"),
);
let mut response_buf = [0_u8; 32];
let response_length = response.encode(&mut response_buf).unwrap().message_length();
// Schedule the request; the next step must transmit exactly the expected bytes.
endpoint
.outgoing()
.schedule_non(
RequestCode::Get,
Vec::new(),
Some(b"/hello"),
Duration::from_secs(1),
)
.unwrap();
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
stack.expect_send().once().return_once(move |_, buffer| {
assert_eq!(buffer, &request_buf[..request_length]);
Ok(())
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendNon));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingNon(..)
));
// A step without input: nothing is sent again (no send expectation is set)
// and the state is unchanged.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingNon(..)
));
// Deliver the response: it is matched by token and completes the exchange.
stack.expect_receive().once().return_once(move |_, buffer| {
buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
let outgoing = outgoing.unwrap();
assert!(matches!(outgoing, OutgoingEvent::Success(_)));
match outgoing {
OutgoingEvent::Success(message) => {
assert_eq!(message, response.encode(&mut response_buf).unwrap());
}
_ => (),
}
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
}
/// A non-confirmable GET is answered by a confirmable response; the endpoint
/// must acknowledge that response and report success.
#[test]
fn send_non_get_con() {
// Test fixtures: mocked UDP stack, manual clock and the endpoint under test.
let mut stack = MockStack::default();
let clock = MyClock {
last_time: RefCell::new(Duration::from_secs(0)),
now: RefCell::new(Duration::from_secs(1)),
};
let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
let mut endpoint: CoapEndpoint<
'_,
MockStack,
Random,
MyClock,
8,
32,
128,
> = CoapEndpoint::try_new(
TransmissionParameters::default(),
Random { value: 0 },
&clock,
&mut receive_buffer,
)
.unwrap();
stack.expect_socket().once().return_once(|| Ok(Socket));
stack.expect_connect().once().return_once(|_, _| Ok(()));
endpoint
.connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
.unwrap();
// First step with nothing to receive: the outgoing side stays idle.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
// Pre-compute the expected request bytes, the confirmable response and the
// ACK the endpoint has to send for that response.
let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
let request_token = endpoint.outgoing_communication.next_token.unwrap();
let request: Message<'_> = Message::new(
Type::NonConfirmable,
RequestCode::Get.into(),
request_id,
request_token,
Vec::new(),
Some(b"/hello"),
);
let mut request_buf = [0_u8; 32];
let request_length = request.encode(&mut request_buf).unwrap().message_length();
let response_id = request_id + 1;
let response: Message<'_> = Message::new(
Type::Confirmable,
SuccessCode::Content.into(),
response_id,
request_token,
Vec::new(),
Some(b"world"),
);
let mut response_buf = [0_u8; 32];
let response_length = response.encode(&mut response_buf).unwrap().message_length();
let ack: Message<'_> = Message::new_ack(response_id);
let mut ack_buf = [0_u8; 4];
let _ack_length = ack.encode(&mut ack_buf).unwrap().message_length();
// Schedule the request; the next step must transmit exactly the expected bytes.
endpoint
.outgoing()
.schedule_non(
RequestCode::Get,
Vec::new(),
Some(b"/hello"),
Duration::from_secs(1),
)
.unwrap();
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
stack.expect_send().once().return_once(move |_, buffer| {
assert_eq!(buffer, &request_buf[..request_length]);
Ok(())
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendNon));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingNon(..)
));
// A step without input: nothing is sent again and the state is unchanged.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingNon(..)
));
// Deliver the confirmable response; the endpoint must answer with the ACK.
stack.expect_receive().once().return_once(move |_, buffer| {
buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
});
stack.expect_send().once().return_once(move |_, buffer| {
assert_eq!(buffer, ack_buf);
Ok(())
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
let outgoing = outgoing.unwrap();
assert!(matches!(outgoing, OutgoingEvent::Success(_)));
match outgoing {
OutgoingEvent::Success(message) => {
assert_eq!(message, response.encode(&mut response_buf).unwrap());
}
_ => (),
}
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
}
/// A confirmable GET is rejected by a Reset carrying the request's message ID;
/// the exchange ends with `ResetReceived`.
#[test]
fn send_con_get_rst() {
// Test fixtures: mocked UDP stack, manual clock and the endpoint under test.
let mut stack = MockStack::default();
let clock = MyClock {
last_time: RefCell::new(Duration::from_secs(0)),
now: RefCell::new(Duration::from_secs(1)),
};
let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
let mut endpoint: CoapEndpoint<
'_,
MockStack,
Random,
MyClock,
8,
32,
128,
> = CoapEndpoint::try_new(
TransmissionParameters::default(),
Random { value: 0 },
&clock,
&mut receive_buffer,
)
.unwrap();
stack.expect_socket().once().return_once(|| Ok(Socket));
stack.expect_connect().once().return_once(|_, _| Ok(()));
endpoint
.connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
.unwrap();
// First step with nothing to receive: the outgoing side stays idle.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
// Pre-compute the expected request bytes and the Reset answer.
let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
let request_token = endpoint.outgoing_communication.next_token.unwrap();
let request: Message<'_> = Message::new(
Type::Confirmable,
RequestCode::Get.into(),
request_id,
request_token,
Vec::new(),
Some(b"/hello"),
);
let mut request_buf = [0_u8; 32];
let request_length = request.encode(&mut request_buf).unwrap().message_length();
let reset: Message<'_> = Message::new_rst(request_id);
let mut reset_buf = [0_u8; 4];
let reset_length = reset.encode(&mut reset_buf).unwrap().message_length();
// Schedule the request; the next step must transmit exactly the expected bytes.
endpoint
.outgoing()
.schedule_con(
RequestCode::Get,
Vec::new(),
Some(b"/hello"),
Duration::from_secs(1),
)
.unwrap();
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
stack.expect_send().once().return_once(move |_, buffer| {
assert_eq!(buffer, &request_buf[..request_length]);
Ok(())
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingCon(..)
));
// A step without input and without clock progress: no retransmission happens
// (no send expectation is set) and the state is unchanged.
stack
.expect_receive()
.once()
.return_once(|_, _| Err(nb::Error::WouldBlock));
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::SendingCon(..)
));
// Deliver the Reset: the request is abandoned and the state returns to idle.
stack.expect_receive().once().return_once(move |_, buffer| {
buffer[..reset_length].copy_from_slice(&reset_buf[..reset_length]);
Ok((reset_length, "127.0.0.1:5683".parse().unwrap()))
});
let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
let outgoing = outgoing.unwrap();
assert!(matches!(outgoing, OutgoingEvent::ResetReceived));
assert!(matches!(
endpoint.outgoing_communication.state,
OutgoingState::Idle(_)
));
}
/// A piggybacked ACK that carries the request's message ID but a different
/// token must be reported as `PiggybackedWrongToken` and must leave the
/// confirmable request in the `SendingCon` state. Cancelling afterwards
/// returns the outgoing state machine to `Idle(false)`.
#[test]
fn send_con_get_piggybacked_wrong_token() {
    let mut stack = MockStack::default();
    let clock = MyClock {
        last_time: RefCell::new(Duration::from_secs(0)),
        now: RefCell::new(Duration::from_secs(1)),
    };
    let mut receive_buffer = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
    let mut endpoint: CoapEndpoint<'_, MockStack, Random, MyClock, 8, 32, 128> =
        CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut receive_buffer,
        )
        .unwrap();

    // Open the (mocked) socket and connect to the peer.
    stack.expect_socket().once().return_once(|| Ok(Socket));
    stack.expect_connect().once().return_once(|_, _| Ok(()));
    endpoint
        .connect_to_addr(&mut stack, "127.0.0.1:5683".parse().unwrap())
        .unwrap();

    // With nothing scheduled and nothing received the endpoint stays idle.
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));
    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));

    // Build the exact bytes we expect on the wire, using the message ID and
    // token the endpoint has queued up for its next request.
    let request_id = endpoint.outgoing_communication.next_message_id.unwrap();
    let request_token = endpoint.outgoing_communication.next_token.unwrap();
    let request: Message<'_> = Message::new(
        Type::Confirmable,
        RequestCode::Get.into(),
        request_id,
        request_token,
        Vec::new(),
        Some(b"/hello"),
    );
    let mut request_buf = [0_u8; 32];
    let request_length = request.encode(&mut request_buf).unwrap().message_length();

    // Craft an ACK with the matching message ID but a token that differs in
    // its first byte. `wrapping_add` keeps the byte different from the
    // original without risking an overflow panic should it be 0xFF.
    let mut response_token = request_token;
    response_token.bytes[0] = response_token.bytes[0].wrapping_add(1);
    let response: Message<'_> = Message::new(
        Type::Acknowledgement,
        SuccessCode::Content.into(),
        request_id,
        response_token,
        Vec::new(),
        Some(b"world"),
    );
    let mut response_buf = [0_u8; 32];
    let response_length = response.encode(&mut response_buf).unwrap().message_length();

    // Schedule the confirmable GET and let the endpoint transmit it.
    endpoint
        .outgoing()
        .schedule_con(
            RequestCode::Get,
            Vec::new(),
            Some(b"/hello"),
            Duration::from_secs(1),
        )
        .unwrap();
    stack
        .expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));
    // No call-count constraint on purpose: every send that does happen must
    // carry exactly the expected request bytes.
    stack.expect_send().returning(move |_, buffer| {
        assert_eq!(buffer, &request_buf[..request_length]);
        Ok(())
    });
    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    assert!(matches!(outgoing.unwrap(), OutgoingEvent::SendCon));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Deliver the wrong-token ACK: it is flagged, and the request stays in
    // flight.
    stack.expect_receive().once().return_once(move |_, buffer| {
        buffer[..response_length].copy_from_slice(&response_buf[..response_length]);
        Ok((response_length, "127.0.0.1:5683".parse().unwrap()))
    });
    let (_, outgoing, _) = endpoint.process(&mut stack).unwrap();
    let outgoing = outgoing.unwrap();
    assert!(matches!(outgoing, OutgoingEvent::PiggybackedWrongToken));
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Cancelling the pending request brings the state machine back to idle.
    endpoint.outgoing().cancel().unwrap();
    assert!(matches!(
        endpoint.outgoing_communication.state,
        OutgoingState::Idle(false)
    ));
}
/// A confirmable GET that never receives a reply is retransmitted after each
/// of four clock advances and finally reported as `Timeout`, after which the
/// outgoing state machine is back in `Idle(false)`.
#[test]
fn send_con_get_timeout() {
    let mut udp = MockStack::default();
    let clock = MyClock {
        last_time: RefCell::new(Duration::from_secs(0)),
        now: RefCell::new(Duration::from_secs(1)),
    };
    let mut rx_storage = [0_u8; crate::DEFAULT_COAP_MESSAGE_SIZE];
    let mut coap: CoapEndpoint<'_, MockStack, Random, MyClock, 8, 32, 128> =
        CoapEndpoint::try_new(
            TransmissionParameters::default(),
            Random { value: 0 },
            &clock,
            &mut rx_storage,
        )
        .unwrap();

    // Socket setup against the mocked UDP stack.
    udp.expect_socket().once().return_once(|| Ok(Socket));
    udp.expect_connect().once().return_once(|_, _| Ok(()));
    coap.connect_to_addr(&mut udp, "127.0.0.1:5683".parse().unwrap())
        .unwrap();

    // First poll: nothing queued, nothing received.
    udp.expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::Idle(_)
    ));

    // Pre-encode the request the endpoint is expected to emit, based on the
    // message ID and token it currently holds for its next request.
    let expected_id = coap.outgoing_communication.next_message_id.unwrap();
    let expected_token = coap.outgoing_communication.next_token.unwrap();
    let expected: Message<'_> = Message::new(
        Type::Confirmable,
        RequestCode::Get.into(),
        expected_id,
        expected_token,
        Vec::new(),
        Some(b"/hello"),
    );
    let mut expected_bytes = [0_u8; 32];
    let expected_len = expected
        .encode(&mut expected_bytes)
        .unwrap()
        .message_length();

    coap.outgoing()
        .schedule_con(
            RequestCode::Get,
            Vec::new(),
            Some(b"/hello"),
            Duration::from_secs(1),
        )
        .unwrap();

    // Initial transmission. The send expectation has no call-count limit
    // because the same bytes are sent again on every retransmission below.
    udp.expect_receive()
        .once()
        .return_once(|_, _| Err(nb::Error::WouldBlock));
    udp.expect_send().returning(move |_, buffer| {
        assert_eq!(buffer, &expected_bytes[..expected_len]);
        Ok(())
    });
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::SendCon));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // From here on the peer stays silent for every poll.
    udp.expect_receive()
        .returning(|_, _| Err(nb::Error::WouldBlock));

    // Polling again without moving the clock does not resend anything.
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Nothing));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::SendingCon(..)
    ));

    // Each of these clock advances is followed by one retransmission.
    // NOTE(review): the values look like "doubling timeout + 1 s" for the
    // default transmission parameters; confirm against
    // `TransmissionParameters`.
    for &elapsed_secs in &[3_u64, 5, 9, 17] {
        clock.advance(Duration::from_secs(elapsed_secs));
        let (_, event, _) = coap.process(&mut udp).unwrap();
        assert!(matches!(event.unwrap(), OutgoingEvent::SendCon));
        assert!(matches!(
            coap.outgoing_communication.state,
            OutgoingState::SendingCon(..)
        ));
    }

    // After the last advance the request is given up and reported as timed out.
    clock.advance(Duration::from_secs(33));
    let (_, event, _) = coap.process(&mut udp).unwrap();
    assert!(matches!(event.unwrap(), OutgoingEvent::Timeout));
    assert!(matches!(
        coap.outgoing_communication.state,
        OutgoingState::Idle(false)
    ));
}
}