use core::{fmt::Debug, ops::DerefMut};
use embassy_sync::{blocking_mutex::raw::RawMutex, mutex::Mutex};
use embassy_time::{with_timeout, Duration, TimeoutError};
use rand_core::RngCore;
use crate::{
frame_pool::{FrameBox, RawFrameSlice, SendFrameBox, WireFrameBox},
peer::{Peer, INCOMING_SIZE, OUTGOING_SIZE},
CmdAddr, Error, FrameSerial, MAX_TARGETS,
};
pub const REPLY_TIMEOUT: Duration = Duration::from_millis(1);
pub struct Controller<
R: RawMutex + 'static,
const IN: usize = INCOMING_SIZE,
const OUT: usize = OUTGOING_SIZE,
> {
peers: Mutex<R, [Peer<IN, OUT>; MAX_TARGETS]>,
}
impl<R: RawMutex + 'static, const IN: usize, const OUT: usize> Controller<R, IN, OUT> {
const ONE: Peer<IN, OUT> = Peer::<IN, OUT>::const_new();
pub const fn uninit() -> Controller<R, IN, OUT> {
Self {
peers: Mutex::new([Self::ONE; MAX_TARGETS]),
}
}
pub async fn init(&self, sli: &mut RawFrameSlice) {
assert!(sli.capacity() >= (INCOMING_SIZE * MAX_TARGETS));
let mut inner = self.peers.lock().await;
for m in inner.iter_mut() {
let mut split = sli.split(INCOMING_SIZE).unwrap();
core::mem::swap(sli, &mut split);
assert_eq!(split.capacity(), INCOMING_SIZE);
m.set_pool(split);
}
}
}
impl<R: RawMutex + 'static> Controller<R> {
pub async fn step<T, Rand>(
&self,
serial: &mut T,
rand: &mut Rand,
) -> Result<(), Error<T::SerError>>
where
T: FrameSerial,
Rand: RngCore,
{
let mut inner = self.peers.lock().await;
serve_peers(inner.deref_mut(), serial).await?;
complete_pendings(inner.deref_mut(), serial).await?;
offer_addr(inner.deref_mut(), serial, rand).await?;
Ok(())
}
}
impl<R: RawMutex + 'static> Controller<R> {
pub async fn send(&self, mac: u64, frame: SendFrameBox) -> Result<(), SendError> {
self.peers
.lock()
.await
.iter_mut()
.find(|p| p.is_active_mac(mac))
.ok_or(SendError::NoMatchingMac)
.and_then(|p| {
p.enqueue_outgoing(frame.into_inner())
.map_err(SendError::QueueFull)
})
}
pub async fn recv_from(&self, mac: u64) -> Result<WireFrameBox, RecvError> {
self.peers
.lock()
.await
.iter_mut()
.find(|p| p.is_active_mac(mac))
.ok_or(RecvError::NoMatchingMac)
.and_then(|p| p.dequeue_incoming().ok_or(RecvError::NoMessage))
.map(WireFrameBox::new_unchecked)
}
pub async fn connected(&self) -> heapless::Vec<u64, { MAX_TARGETS + 1 }> {
self.peers
.lock()
.await
.iter()
.filter_map(|p| p.is_active().then_some(p.mac()))
.collect()
}
}
pub enum SendError {
NoMatchingMac,
QueueFull(FrameBox),
}
impl Debug for SendError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_str("SendError::")?;
let remain = match self {
SendError::NoMatchingMac => "NoMatchingMac",
SendError::QueueFull(_) => "QueueFull(...)",
};
f.write_str(remain)
}
}
#[derive(Debug, PartialEq)]
pub enum RecvError {
NoMatchingMac,
NoMessage,
}
async fn serve_peers<T: FrameSerial>(
inner: &mut [Peer; MAX_TARGETS],
serial: &mut T,
) -> Result<(), Error<T::SerError>> {
for (i, p) in inner.iter_mut().enumerate() {
if !p.is_active() {
continue;
}
let Some(mut rx) = p.alloc_incoming() else {
nut_warn!("Couldn't alloc incoming!");
p.increment_error();
continue;
};
let mut maybe_out = p.dequeue_outgoing();
let mut fallback = [0u8; 1];
let to_send = match maybe_out.as_deref_mut() {
Some(fb) => fb,
None => &mut fallback,
};
to_send[0] = CmdAddr::SelectAddr(i as u8).into();
serial.send_frame(to_send).await?;
let rxto = with_timeout(REPLY_TIMEOUT, serial.recv(&mut rx));
match rxto.await {
Ok(Ok(tf)) => {
let len = tf.frame.len();
if len != 0 && tf.frame[0] == CmdAddr::ReplyFromAddr(i as u8).into() {
p.set_success();
if len > 1 {
nut_trace!("Got msg len {=usize} for {=usize}", len, i);
rx.set_len(len);
p.enqueue_incoming(rx);
}
} else {
nut_warn!("Error with {=usize} len is {=usize}", i, len);
p.increment_error();
}
}
Ok(Err(e)) => {
p.increment_error();
return Err(e);
}
Err(TimeoutError) => {
p.increment_error();
}
}
}
Ok(())
}
async fn complete_pendings<T: FrameSerial>(
inner: &mut [Peer; MAX_TARGETS],
serial: &mut T,
) -> Result<(), Error<T::SerError>> {
for (i, p) in inner.iter_mut().enumerate() {
let Some(mac) = p.is_pending() else {
continue;
};
let mut out_buf = [0u8; 9];
out_buf[0] = CmdAddr::DiscoverySuccess(i as u8).into();
out_buf[1..9].copy_from_slice(&mac.to_le_bytes());
let mut in_buf = [0u8; 2];
serial.send_frame(&out_buf).await?;
let rxto = with_timeout(REPLY_TIMEOUT, serial.recv(&mut in_buf));
match rxto.await {
Ok(Ok(tf)) => {
let frame = tf.frame;
let good_len = frame.len() == 1;
let good_hdr = good_len && frame[0] == CmdAddr::ReplyFromAddr(i as u8).into();
if good_hdr {
nut_info!("Promoting to active {=usize} {=u64}", i, mac);
p.promote_to_active();
} else {
p.increment_error();
}
}
Ok(Err(_e)) => {
p.increment_error();
continue;
}
Err(TimeoutError) => {
p.increment_error();
}
}
}
Ok(())
}
async fn offer_addr<T: FrameSerial, R: RngCore>(
inner: &mut [Peer; MAX_TARGETS],
serial: &mut T,
rand: &mut R,
) -> Result<(), Error<T::SerError>> {
let Some((i, p)) = inner.iter_mut().enumerate().find(|(_i, p)| p.is_idle()) else {
return Ok(());
};
let mut out_buf = [0u8; 9];
out_buf[0] = CmdAddr::DiscoveryOffer(i as u8).into();
rand.fill_bytes(&mut out_buf[1..9]);
serial.send_frame(&out_buf).await?;
let mut in_buf = [0u8; 10];
let rxto = with_timeout(Duration::from_millis(1), serial.recv(&mut in_buf));
match rxto.await {
Ok(Ok(tf)) => {
let frame = tf.frame;
let good_len = frame.len() == 9;
let good_hdr = good_len && frame[0] == CmdAddr::DiscoveryClaim(i as u8).into();
if good_hdr {
let mut mac = [0u8; 8];
let rand_iter = out_buf[1..9].iter();
let resp_iter = frame[1..9].iter();
mac.iter_mut()
.zip(rand_iter.zip(resp_iter))
.for_each(|(d, (a, b))| *d = *a ^ *b);
p.promote_to_pending(u64::from_le_bytes(mac));
}
}
Ok(Err(e)) => return Err(e),
Err(_) => {
}
}
Ok(())
}