use std::collections::VecDeque;
use std::fmt::{Debug, Display, Formatter};
use std::time::Duration;
use nusb::Interface;
use nusb::transfer::{Queue, RequestBuffer, TransferError};
use smol::Task;
use smol::lock::Mutex;
use super::serial_port;
use crate::devices::abort;
use crate::messages::{CMD_LEN_MAX, Dispatcher};
/// Endpoint address used to open the bulk-in queue that the background task reads from.
const IN_ENDPOINT: u8 = 0x81;
/// Number of bulk-in requests submitted up front and kept pending on the bulk-in queue.
const N_TRANSFERS: usize = 3;
/// Endpoint address used to open the bulk-out queue that commands are submitted to.
const OUT_ENDPOINT: u8 = 0x02;
/// Delay awaited at the top of every iteration of the background receive loop.
const TIME: Duration = Duration::from_millis(10);
/// Owns the USB queues for one device: a background task that receives and dispatches
/// incoming messages, and a mutex-guarded bulk-out queue for outgoing commands.
pub(super) struct Communicator<const CH: usize> {
/// Used to look up message lengths and to dispatch complete incoming messages.
dispatcher: Dispatcher<CH>,
/// Handle of the background receive task. It is never read; it is stored so the handle
/// lives as long as the communicator (smol cancels a task whose `Task` handle is dropped).
#[allow(unused)]
incoming: Task<()>,
/// Bulk-out queue that commands are submitted to; the async mutex serialises senders.
pub(super) outgoing: Mutex<Queue<Vec<u8>>>,
}
impl<const CH: usize> Communicator<CH> {
/// Builds a communicator on top of `interface`.
///
/// Runs `serial_port::init` on the interface, opens the bulk-out queue on
/// [`OUT_ENDPOINT`] and starts the background receive task, which gets its own
/// clone of `dispatcher`.
pub(super) async fn new(interface: Interface, dispatcher: Dispatcher<CH>) -> Self {
    log::debug!("{dispatcher} COMMUNICATOR::NEW (requested)");
    serial_port::init(&interface).await;

    // The outbound queue must be opened first: `spawn` takes ownership of the interface.
    let outgoing = Mutex::new(interface.bulk_out_queue(OUT_ENDPOINT));
    let incoming = Self::spawn(interface, dispatcher.clone());

    log::debug!("{dispatcher} COMMUNICATOR::NEW (success)");
    Self {
        dispatcher,
        incoming,
        outgoing,
    }
}
fn handle_error(error: TransferError) {
match error {
_ => abort(format!("Background task error : {}", error)),
}
}
/// Starts the background task that receives, reassembles and dispatches incoming messages.
///
/// [`N_TRANSFERS`] bulk-in requests of [`CMD_LEN_MAX`] bytes are submitted on
/// [`IN_ENDPOINT`] before the task is spawned. The task then loops forever:
///
/// 1. waits [`TIME`], then awaits the next completed transfer;
/// 2. returns the transfer's error, if any, which ends the loop and is passed to
///    [`Self::handle_error`];
/// 3. appends the payload (everything after the first two bytes) to a byte queue;
/// 4. while at least six bytes are queued, reads the two-byte message ID, asks the
///    dispatcher for the message length and, once that many bytes are available,
///    drains them and dispatches the message;
/// 5. resubmits the transfer's buffer so the number of pending requests stays constant.
///
/// The status of *every* completion is checked, including those that carry no payload,
/// so a failed transfer (for example after a disconnect) is reported instead of being
/// resubmitted indefinitely.
fn spawn(interface: Interface, dispatcher: Dispatcher<CH>) -> Task<()> {
    // Leading bytes of every completed transfer that are not part of the message stream.
    // NOTE(review): presumably status bytes prepended by the serial converter — confirm.
    const PREFIX_LEN: usize = 2;
    // Bytes that must be queued before a message ID is read and its length looked up.
    // NOTE(review): presumably the fixed message header size — confirm.
    const HEADER_LEN: usize = 6;

    log::debug!("{dispatcher} SPAWN (requested)");
    let mut endpoint = interface.bulk_in_queue(IN_ENDPOINT);
    while endpoint.pending() < N_TRANSFERS {
        endpoint.submit(RequestBuffer::new(CMD_LEN_MAX));
    }

    // Holds received bytes until they form at least one complete message.
    let mut queue: VecDeque<u8> = VecDeque::with_capacity(N_TRANSFERS * CMD_LEN_MAX);

    let mut listen = async move || -> Result<(), TransferError> {
        log::debug!("{dispatcher} SPAWN (starting background task)");
        loop {
            smol::Timer::after(TIME).await;
            let completion = endpoint.next_complete().await;

            // Check the status before looking at the data: a failed transfer may carry
            // no payload at all and must not be silently resubmitted.
            completion.status?;

            if completion.data.len() > PREFIX_LEN {
                let payload = &completion.data[PREFIX_LEN..];
                log::trace!(
                    "BACKGROUND {} RECEIVED {:02X?}",
                    dispatcher.serial_number(),
                    payload,
                );
                queue.extend(payload);

                while queue.len() >= HEADER_LEN {
                    let id = [queue[0], queue[1]];
                    log::trace!(
                        "BACKGROUND {} MESSAGE ID {:02X?}",
                        dispatcher.serial_number(),
                        id
                    );
                    let len = dispatcher.length(&id).await;
                    if queue.len() < len {
                        // The rest of this message arrives with a later transfer.
                        log::trace!(
                            "BACKGROUND {} INCOMPLETE (waiting) QUEUE {} REQUIRE {}",
                            dispatcher.serial_number(),
                            queue.len(),
                            len,
                        );
                        break;
                    }
                    let msg = queue.drain(..len).collect();
                    log::trace!(
                        "BACKGROUND {} DISPATCH {:02X?}",
                        dispatcher.serial_number(),
                        msg
                    );
                    dispatcher.dispatch(msg, CH).await;
                }
            }

            // Hand the buffer back so the same number of requests stays pending.
            endpoint.submit(RequestBuffer::reuse(completion.data, CMD_LEN_MAX));
        }
    };

    smol::spawn(async move {
        if let Err(error) = listen().await {
            Self::handle_error(error);
        }
    })
}
/// Submits `command` to the bulk-out queue.
///
/// The queue lock is held only for the submission itself. This method returns as soon
/// as the transfer has been queued; it does not wait for the transfer to complete.
// NOTE(review): completions of the out queue are not collected here — confirm that
// another user of `outgoing` reaps them, otherwise finished transfers accumulate.
pub(super) async fn send(&self, command: Vec<u8>) {
    log::trace!("{self} SEND (requested) {command:02X?}");
    {
        let mut out_queue = self.outgoing.lock().await;
        out_queue.submit(command);
    }
    log::trace!("{self} SEND (success)");
}
/// Returns a clone of the dispatcher held by this communicator.
pub(super) fn get_dispatcher(&self) -> Dispatcher<CH> {
    Clone::clone(&self.dispatcher)
}
}
impl<const CH: usize> Debug for Communicator<CH> {
    /// Writes `COMMUNICATOR { <dispatcher> }`, where `<dispatcher>` is the dispatcher's
    /// `Display` output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let dispatcher = &self.dispatcher;
        write!(f, "COMMUNICATOR {{ {dispatcher} }}")
    }
}
impl<const CH: usize> Display for Communicator<CH> {
    /// Writes `COMMUNICATOR <serial number>`, using the serial number reported by the
    /// dispatcher.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let serial_number = self.dispatcher.serial_number();
        write!(f, "COMMUNICATOR {serial_number}")
    }
}