use core::fmt::Write;
use naan::prelude::ResultExt;
use toad_array::Array;
use toad_len::Len;
use toad_map::{InsertError, Map};
use toad_msg::{Token, Type};
use toad_stem::Stem;
use super::{log, Step, StepOutput};
use crate::net::Addrd;
use crate::platform::{Effect, PlatformTypes};
use crate::req::Req;
use crate::resp::Resp;
use crate::todo::String;
use crate::{exec_inner_step, platform};
#[derive(Debug)]
pub struct HandleAcks<S, B> {
buffer: Stem<B>,
inner: S,
}
impl<S: Default, B: Default> Default for HandleAcks<S, B> {
fn default() -> Self {
Self { buffer: Default::default(),
inner: S::default() }
}
}
#[derive(Clone, PartialEq, Eq)]
pub enum Error<E> {
Inner(E),
ConBufferCapacityExhausted,
}
impl<E> From<E> for Error<E> {
fn from(e: E) -> Self {
Error::Inner(e)
}
}
impl<E: core::fmt::Debug> core::fmt::Debug for Error<E> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
| Self::ConBufferCapacityExhausted => f.debug_struct("ConBufferCapacityExhausted").finish(),
| Self::Inner(e) => e.fmt(f),
}
}
}
impl<E: super::Error> super::Error for Error<E> {}
macro_rules! common {
($in:expr, $msg:expr, $effects:expr, $buffer:expr) => {{
let msg: Addrd<&platform::Message<P>> = $msg;
match msg.data().ty {
Type::Ack if !$buffer.map_ref(|buf| buf.has(&msg.map(|m| m.token)))
=> {
let (size, sender, token) =
(msg.data().len(), msg.addr(), msg.data().token);
let tokens = $buffer.map_ref(
|buf| {
let mut tokens = String::<1000>::default();
write!(tokens, "[").ok();
buf.iter().enumerate().for_each(|(ix, (token, _))| {
write!(tokens, "{:?}", token).ok();
if ix < buf.len() - 1 {
write!(tokens, ",").ok();
}
});
write!(tokens, "]").ok();
tokens
});
let tokens = tokens.as_str();
log!(HandleAcks, $effects, log::Level::Warn, "Discarding {size}b ACK from {sender} addressing unknown {token:?}. Presently expecting acks for: {tokens}");
None
},
Type::Ack => {
let (size, sender, token) = (msg.data().len(), msg.addr(), (msg.data().id, msg.data().token));
log!(HandleAcks, $effects, log::Level::Trace, "Got {size}b ACK from {sender} for {token:?}");
$buffer.map_mut(|buf| buf.remove(&msg.as_ref().map(|m| m.token)));
if msg.data().code.kind() == toad_msg::CodeKind::Empty {
None
} else {
Some(Ok($in))
}
},
_ => Some(Ok($in))
}
}};
}
impl<P: PlatformTypes,
B: Map<Addrd<Token>, ()> + core::fmt::Debug,
E: super::Error,
S: Step<P, PollReq = Addrd<Req<P>>, PollResp = Addrd<Resp<P>>, Error = E>> Step<P>
for HandleAcks<S, B>
{
type PollReq = Addrd<Req<P>>;
type PollResp = Addrd<Resp<P>>;
type Error = Error<E>;
type Inner = S;
fn inner(&self) -> &S {
&self.inner
}
fn poll_req(&self,
snap: &crate::platform::Snapshot<P>,
effects: &mut <P as PlatformTypes>::Effects)
-> StepOutput<Self::PollReq, Self::Error> {
let req = exec_inner_step!(self.inner.poll_req(snap, effects), Error::Inner);
match req {
| Some(req) => {
let msg = req.as_ref().map(|r| r.as_ref());
common!(req, msg, effects, self.buffer)
},
| None => None,
}
}
fn poll_resp(&self,
snap: &crate::platform::Snapshot<P>,
effects: &mut <P as PlatformTypes>::Effects,
token: toad_msg::Token,
addr: no_std_net::SocketAddr)
-> StepOutput<Self::PollResp, Self::Error> {
let resp = exec_inner_step!(self.inner.poll_resp(snap, effects, token, addr),
Error::Inner);
match resp {
| Some(resp) => {
let msg = resp.as_ref().map(|r| r.as_ref());
common!(resp, msg, effects, self.buffer)
},
| None => None,
}
}
fn on_message_sent(&self,
snap: &platform::Snapshot<P>,
effects: &mut P::Effects,
msg: &Addrd<crate::platform::Message<P>>)
-> Result<(), Self::Error> {
self.inner
.on_message_sent(snap, effects, msg)
.map_err(Error::Inner)?;
match msg.data().ty {
| Type::Con => self.buffer
.map_mut(|buf| buf.insert(msg.as_ref().map(|m| m.token), ()))
.recover(|e| {
if matches!(e, InsertError::Exists(_)) {
Ok(())
} else {
Err(e)
}
})
.map_err(|_| Error::ConBufferCapacityExhausted),
| _ => Ok(()),
}
}
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use tinyvec::array_vec;
use toad_msg::{Code, Payload};
use super::*;
use crate::platform::Effect;
use crate::step::test::test_step;
use crate::test;
type InnerPollReq = Addrd<Req<test::Platform>>;
type InnerPollResp = Addrd<Resp<test::Platform>>;
type HandleAcks<S> = super::HandleAcks<S, BTreeMap<Addrd<Token>, ()>>;
fn test_message(ty: Type) -> Addrd<test::Message> {
use toad_msg::*;
Addrd(test::Message { ver: Default::default(),
ty,
id: Id(1),
code: Code::new(0, 1),
token: Token(array_vec!(_ => 1)),
payload: Payload(Default::default()),
opts: Default::default() },
test::dummy_addr())
}
test_step!(
GIVEN HandleAcks::<Dummy> where Dummy: {Step<PollReq = InnerPollReq, PollResp = InnerPollResp, Error = ()>};
WHEN inner_errors [
(inner.poll_req => { Some(Err(nb::Error::Other(()))) }),
(inner.poll_resp => { Some(Err(nb::Error::Other(()))) }),
(inner.on_message_sent = { |_, _| Err(()) })
]
THEN this_should_error [
(poll_req(_, _) should satisfy { |out| assert_eq!(out, Some(Err(nb::Error::Other(Error::Inner(()))))) }),
(poll_resp(_, _, _, _) should satisfy { |out| assert_eq!(out, Some(Err(nb::Error::Other(Error::Inner(()))))) }),
(on_message_sent(_, test::msg!(CON GET x.x.x.x:8080)) should satisfy { |out| assert_eq!(out, Err(Error::Inner(()))) })
]
);
test_step!(
GIVEN HandleAcks::<Dummy> where Dummy: {Step<PollReq = InnerPollReq, PollResp = InnerPollResp, Error = ()>};
WHEN inner_blocks [
(inner.poll_req => { Some(Err(nb::Error::WouldBlock)) }),
(inner.poll_resp => { Some(Err(nb::Error::WouldBlock)) })
]
THEN this_should_block [
(poll_req(_, _) should satisfy { |out| assert_eq!(out, Some(Err(nb::Error::WouldBlock))) }),
(poll_resp(_, _, _, _) should satisfy { |out| assert_eq!(out, Some(Err(nb::Error::WouldBlock))) })
]
);
test_step!(
GIVEN HandleAcks::<Dummy> where Dummy: {Step<PollReq = InnerPollReq, PollResp = InnerPollResp, Error = ()>};
WHEN unexpected_ack_received [
(inner.poll_req => { Some(Ok(test::msg!(ACK {0 . 01} x.x.x.x:8080).map(Req::from))) }),
(inner.poll_resp => { Some(Ok(test::msg!(ACK {2 . 05} x.x.x.x:8080).map(Resp::from))) }),
(inner.on_message_sent = { |_, _| Ok(()) })
]
THEN should_ignore [
(
poll_resp(
_,
_,
test_message(Type::Con).data().token,
crate::test::dummy_addr()
) should satisfy {
|out| assert_eq!(out, None)
}
),
(poll_req(_, _) should satisfy { |out| assert_eq!(out, None) }),
(
effects should satisfy {|effects| {
assert!(matches!(effects[0], Effect::Log(log::Level::Warn, _)));
assert!(matches!(effects[1], Effect::Log(log::Level::Warn, _)));
}}
)
]
);
#[test]
fn when_expected_piggybacked_ack_received_it_should_be_processed_and_returned() {
struct TestState {
token_last_sent: Option<Token>,
}
type Mock =
test::MockStep<TestState, Addrd<Req<test::Platform>>, Addrd<Resp<test::Platform>>, ()>;
let sut = HandleAcks::<Mock>::default();
sut.inner()
.init(TestState { token_last_sent: None })
.set_on_message_sent(|mock, _, _, msg| {
mock.state
.map_mut(|s| s.as_mut().unwrap().token_last_sent = Some(msg.data().token));
Ok(())
})
.set_poll_resp(|mock, _, _, poll_for_token, _| {
let mut msg = test::msg!(ACK {2 . 05} x.x.x.x:2222);
let token = mock.state
.map_ref(|s| s.as_ref().unwrap().token_last_sent.unwrap());
Addrd::data_mut(&mut msg).token = token;
assert_eq!(token, poll_for_token);
let p = Payload(format!("oink oink!").bytes().collect::<Vec<_>>());
Addrd::data_mut(&mut msg).payload = p;
Some(Ok(msg.map(Resp::from)))
});
let token = Token(array_vec![1, 2, 3, 4]);
let mut sent_req = test::msg!(CON GET x.x.x.x:2222);
let dest = sent_req.addr();
sent_req.as_mut().token = token;
let snap = test::snapshot();
let mut effs = Vec::<test::Effect>::new();
sut.on_message_sent(&snap, &mut effs, &sent_req).unwrap();
let res = sut.poll_resp(&snap, &mut effs, token, dest);
assert!(!effs.is_empty());
match &effs[0] {
| Effect::Log(lvl, _) => assert_eq!(*lvl, log::Level::Trace),
| e => panic!("{e:?}"),
}
assert_eq!(res.unwrap().unwrap().data().payload_string().unwrap(),
format!("oink oink!"));
}
#[test]
fn when_expected_empty_ack_received_it_should_be_processed_and_ignored() {
struct TestState {
token_last_sent: Option<Token>,
}
type Mock =
test::MockStep<TestState, Addrd<Req<test::Platform>>, Addrd<Resp<test::Platform>>, ()>;
let sut = HandleAcks::<Mock>::default();
sut.inner()
.init(TestState { token_last_sent: None })
.set_on_message_sent(|mock, _, _, msg| {
mock.state
.map_mut(|s| s.as_mut().unwrap().token_last_sent = Some(msg.data().token));
Ok(())
})
.set_poll_resp(|mock, _, _, poll_for_token, _| {
let mut msg = test::msg!(ACK {0 . 00} x.x.x.x:2222);
let token = mock.state
.map_ref(|s| s.as_ref().unwrap().token_last_sent.unwrap());
Addrd::data_mut(&mut msg).token = token;
assert_eq!(token, poll_for_token);
Some(Ok(msg.map(Resp::from)))
});
let token = Token(array_vec![1, 2, 3, 4]);
let mut sent_req = test::msg!(CON GET x.x.x.x:2222);
let dest = sent_req.addr();
sent_req.as_mut().token = token;
let snap = test::snapshot();
let mut effs = Vec::<test::Effect>::new();
sut.on_message_sent(&snap, &mut effs, &sent_req).unwrap();
let res = sut.poll_resp(&snap, &mut effs, token, dest);
assert!(!effs.is_empty());
match &effs[0] {
| Effect::Log(lvl, _) => assert_eq!(*lvl, log::Level::Trace),
| e => panic!("{e:?}"),
}
assert_eq!(res, None);
}
}