use crate::link::data::{self, Llid};
use crate::link::{MIN_DATA_PAYLOAD_BUF, MIN_DATA_PDU_BUF};
use crate::{bytes::*, Error};
use heapless::consts::U1;
use heapless::spsc::{self, MultiCore};
pub trait PacketQueue {
type Producer: Producer;
type Consumer: Consumer;
fn split(self) -> (Self::Producer, Self::Consumer);
}
pub trait Producer {
fn free_space(&self) -> u8;
fn produce_dyn(
&mut self,
payload_bytes: u8,
f: &mut dyn FnMut(&mut ByteWriter<'_>) -> Result<Llid, Error>,
) -> Result<(), Error>;
fn produce_with<E>(
&mut self,
payload_bytes: u8,
f: impl FnOnce(&mut ByteWriter<'_>) -> Result<Llid, E>,
) -> Result<(), E>
where
E: From<Error>,
Self: Sized,
{
let mut f = Some(f);
let mut r = None;
self.produce_dyn(payload_bytes, &mut |bytes| {
let f = f.take().unwrap();
let result = f(bytes);
if let Ok(llid) = result {
r = Some(Ok(()));
Ok(llid)
} else {
r = Some(result.map(|_| ()));
Err(Error::InvalidValue)
}
})
.ok();
r.unwrap()
}
}
pub trait Consumer {
fn has_data(&self) -> bool;
fn consume_raw_with<R>(
&mut self,
f: impl FnOnce(data::Header, &[u8]) -> Consume<R>,
) -> Result<R, Error>;
fn consume_pdu_with<R>(
&mut self,
f: impl FnOnce(data::Header, data::Pdu<'_, &[u8]>) -> Consume<R>,
) -> Result<R, Error> {
self.consume_raw_with(|header, raw| {
let pdu = match data::Pdu::parse(header, raw) {
Ok(pdu) => pdu,
Err(e) => return Consume::always(Err(e)),
};
f(header, pdu)
})
}
}
#[derive(Debug)]
pub struct Consume<T> {
should_consume: bool,
result: Result<T, Error>,
}
impl<T> Consume<T> {
pub fn new(consume: bool, result: Result<T, Error>) -> Self {
Self {
should_consume: consume,
result,
}
}
pub fn always(result: Result<T, Error>) -> Self {
Self {
should_consume: true,
result,
}
}
pub fn never(result: Result<T, Error>) -> Self {
Self {
should_consume: false,
result,
}
}
pub fn on_success(result: Result<T, Error>) -> Self {
Self {
should_consume: result.is_ok(),
result,
}
}
pub fn should_consume(&self) -> bool {
self.should_consume
}
pub fn result(&self) -> &Result<T, Error> {
&self.result
}
pub fn into_result(self) -> Result<T, Error> {
self.result
}
}
pub struct SimpleQueue {
inner: spsc::Queue<[u8; MIN_DATA_PDU_BUF], U1, u8, MultiCore>,
}
impl SimpleQueue {
pub const fn new() -> Self {
Self {
inner: spsc::Queue(heapless::i::Queue::u8()),
}
}
}
impl<'a> PacketQueue for &'a mut SimpleQueue {
type Producer = SimpleProducer<'a>;
type Consumer = SimpleConsumer<'a>;
fn split(self) -> (Self::Producer, Self::Consumer) {
let (p, c) = self.inner.split();
(SimpleProducer { inner: p }, SimpleConsumer { inner: c })
}
}
pub struct SimpleProducer<'a> {
inner: spsc::Producer<'a, [u8; MIN_DATA_PDU_BUF], U1, u8, MultiCore>,
}
impl<'a> Producer for SimpleProducer<'a> {
fn free_space(&self) -> u8 {
if self.inner.ready() {
MIN_DATA_PAYLOAD_BUF as u8
} else {
0
}
}
fn produce_dyn(
&mut self,
payload_bytes: u8,
f: &mut dyn FnMut(&mut ByteWriter<'_>) -> Result<Llid, Error>,
) -> Result<(), Error> {
assert!(usize::from(payload_bytes) <= MIN_DATA_PAYLOAD_BUF);
if !self.inner.ready() {
return Err(Error::Eof);
}
let mut buf = [0; MIN_DATA_PDU_BUF];
let mut writer = ByteWriter::new(&mut buf[2..]);
let free = writer.space_left();
let llid = f(&mut writer)?;
let used = free - writer.space_left();
let mut header = data::Header::new(llid);
header.set_payload_length(used as u8);
header.to_bytes(&mut ByteWriter::new(&mut buf[..2]))?;
self.inner.enqueue(buf).map_err(|_| ()).unwrap();
Ok(())
}
}
pub struct SimpleConsumer<'a> {
inner: spsc::Consumer<'a, [u8; MIN_DATA_PDU_BUF], U1, u8, MultiCore>,
}
impl<'a> Consumer for SimpleConsumer<'a> {
fn has_data(&self) -> bool {
self.inner.ready()
}
fn consume_raw_with<R>(
&mut self,
f: impl FnOnce(data::Header, &[u8]) -> Consume<R>,
) -> Result<R, Error> {
if let Some(packet) = self.inner.peek() {
let mut bytes = ByteReader::new(packet);
let raw_header: [u8; 2] = bytes.read_array().unwrap();
let header = data::Header::parse(&raw_header);
let pl_len = usize::from(header.payload_length());
let raw_payload = bytes.read_slice(pl_len)?;
let res = f(header, raw_payload);
if res.should_consume {
self.inner.dequeue().unwrap(); }
res.result
} else {
Err(Error::Eof)
}
}
}
pub fn run_tests(queue: impl PacketQueue) {
fn assert_empty(c: &mut impl Consumer) {
assert!(!c.has_data(), "empty queue `has_data()` returned true");
let err = c
.consume_raw_with(|_, _| -> Consume<()> {
unreachable!("`consume_raw_with` on empty queue invoked the callback");
})
.unwrap_err();
assert_eq!(
err,
Error::Eof,
"empty queues `consume_raw_with` didn't return expected `Error::Eof"
);
let err = c
.consume_pdu_with(|_, _| -> Consume<()> {
unreachable!("`consume_pdu_with` on empty queue invoked the callback");
})
.unwrap_err();
assert_eq!(
err,
Error::Eof,
"empty queues `consume_pdu_with` didn't return expected `Error::Eof"
);
}
let (mut p, mut c) = queue.split();
assert_empty(&mut c);
let free_space = p.free_space();
assert!(
free_space >= MIN_DATA_PAYLOAD_BUF as u8,
"empty queue has space for {} byte payload, need at least {}",
free_space,
MIN_DATA_PAYLOAD_BUF
);
p.produce_with(MIN_DATA_PAYLOAD_BUF as u8, |writer| -> Result<_, Error> {
assert_eq!(
writer.space_left(),
MIN_DATA_PAYLOAD_BUF,
"produce_with didn't pass ByteWriter with correct buffer"
);
writer.write_slice(&[0; MIN_DATA_PAYLOAD_BUF]).unwrap();
Ok(Llid::DataStart)
})
.expect("enqueuing packet failed");
assert!(
c.has_data(),
"consumer's `has_data()` still false after enqueuing packet"
);
c.consume_raw_with(|header, data| -> Consume<()> {
assert_eq!(usize::from(header.payload_length()), MIN_DATA_PAYLOAD_BUF);
assert_eq!(
data,
&[0; MIN_DATA_PAYLOAD_BUF][..],
"consume_raw_with didn't yield correct payload"
);
Consume::never(Ok(()))
})
.expect("consume_raw_with failed when data is available");
assert!(
c.has_data(),
"`has_data()` returned false after using `Consume::never`"
);
c.consume_pdu_with(|header, _| -> Consume<()> {
assert_eq!(usize::from(header.payload_length()), MIN_DATA_PAYLOAD_BUF);
Consume::always(Ok(()))
})
.expect("consume_pdu_with failed when data is available");
assert_empty(&mut c);
p.produce_with(0, |writer| -> Result<_, Error> {
assert_eq!(
writer.space_left(),
MIN_DATA_PAYLOAD_BUF,
"produce_with didn't pass ByteWriter with correct buffer"
);
Ok(Llid::DataStart)
})
.expect("enqueuing packet failed");
assert!(
c.has_data(),
"consumer's `has_data()` still false after enqueuing empty packet"
);
c.consume_raw_with(|header, data| -> Consume<()> {
assert_eq!(usize::from(header.payload_length()), 0);
assert_eq!(
data,
&[][..],
"consume_raw_with didn't yield correct empty payload"
);
Consume::never(Ok(()))
})
.expect("consume_raw_with failed when empty packet is available");
assert!(
c.has_data(),
"`has_data()` returned false after using `Consume::never`"
);
c.consume_pdu_with(|header, _| -> Consume<()> {
assert_eq!(usize::from(header.payload_length()), 0);
Consume::always(Ok(()))
})
.expect("consume_pdu_with failed when empty packet is available");
assert_empty(&mut c);
}
#[test]
fn simple_queue() {
run_tests(&mut SimpleQueue::new());
}