use packbytes::{FromBytes, ToBytes, ByteArray};
use tokio::io::AsyncReadExt;
use serial2_tokio::{SerialPort, CharSize, StopBits, Parity};
use std::{
path::Path,
task::{Poll, Waker},
future::poll_fn,
collections::HashMap,
mem::transmute,
vec::Vec,
ops::{Deref, DerefMut},
time::Duration,
};
use crate::{
mutex::*,
command::{Command, MAX_COMMAND, checksum, self},
registers::{CommandError, SlaveSize, VirtualSize},
};
use super::{Error, usize_to_message};
/// Bus master driving one serial line.
///
/// Holds two handles on the same serial port (one is a clone of the other) plus the
/// table of exchanges that are waiting for a response.
pub struct Master {
    /// Port handle read by [`Master::run`]; locked for the whole lifetime of the run loop.
    receive: BusyMutex<SerialPort>,
    /// Port handle written by [`Topic::send`]; locked for the duration of one frame.
    transmit: BusyMutex<SerialPort>,
    /// Exchanges registered by live [`Topic`]s, keyed by the token placed in their header.
    pending: BusyMutex<HashMap<Token, Pending>>,
    /// Longest time [`Topic::receive`] waits for a result before giving up.
    timeout: Duration,
}
/// State shared between a [`Topic`] and the receive loop for one registered token.
struct Pending {
    /// Header sent with every request of the topic; responses are compared against it.
    command: Command,
    /// View on the topic's buffer whose lifetime was erased in `Topic::new`.
    /// It is only valid while the owning `Topic` is alive.
    buffer: &'static mut [u8],
    /// Waker of the task currently waiting in `Topic::receive`, if any.
    waker: Option<Waker>,
    /// Outcome stored by the receive loop and taken by `Topic::receive`.
    result: Option<Result<u8, Error>>,
}
/// Identifier carried in a command header; the receive loop uses it to look up
/// the pending exchange a response belongs to.
type Token = u16;
impl Master {
pub fn new(path: impl AsRef<Path>, rate: u32) -> Result<Self, std::io::Error> {
let bus1 = SerialPort::open(path, |mut settings: serial2_tokio::Settings| {
settings.set_raw();
settings.set_baud_rate(rate)?;
settings.set_char_size(CharSize::Bits8);
settings.set_stop_bits(StopBits::One);
settings.set_parity(Parity::Even);
Ok(settings)
})?;
let bus2 = bus1.try_clone()?;
Ok(Self {
receive: BusyMutex::from(bus1),
transmit: BusyMutex::from(bus2),
pending: BusyMutex::from(HashMap::new()),
timeout: Duration::from_millis(100),
})
}
pub async fn run(&self) -> Result<(), std::io::Error> {
let mut bus = self.receive.try_lock().expect("run function called twice");
let mut receive = [0u8; MAX_COMMAND];
loop {
const HEADER: usize = <Command as FromBytes>::Bytes::SIZE;
bus.read_exact(&mut receive[.. HEADER+1]).await?;
while checksum(&receive[.. HEADER]) != receive[HEADER] {
receive[.. HEADER+1].rotate_left(1);
bus.read_exact(&mut receive[HEADER .. HEADER+1]).await?;
}
let header = Command::from_be_bytes(receive[.. HEADER].try_into().unwrap());
let data = &mut receive[.. usize::from(header.size)];
bus.read_exact(data).await?;
let mut pending = self.pending.lock().await;
if let Some(buffer) = pending.get_mut(&header.token) {
if !( buffer.command.token == header.token
&& buffer.command.access.fixed() == header.access.fixed()
&& buffer.command.access.topological() == header.access.topological()
&& buffer.command.access.read() == header.access.read()
&& (buffer.command.address == header.address
|| header.access.topological()
&& buffer.command.address.register() == header.address.register())
&& buffer.command.size == header.size )
{
buffer.result = Some(Err(Error::Master("reponse header mismatch")));
}
else if header.access.error() {
buffer.result = Some(Err(Error::Slave(CommandError::Unknown)));
}
else if header.checksum != checksum(data) {
buffer.result = Some(Err(Error::Master("data checksum mismatch")));
}
else {
buffer.buffer.copy_from_slice(data);
buffer.result = Some(Ok(header.executed));
}
if let Some(waker) = buffer.waker.take() {
waker.wake();
}
}
}
}
}
/// Handle on one exchange registered on a [`Master`].
///
/// Creating a topic inserts an entry in the master's pending table; dropping it removes
/// that entry again.
pub struct Topic<'m> {
    /// Master the topic is registered on.
    master: &'m Master,
    /// Key of this topic's entry in the master's pending table.
    token: Token,
    /// Storage behind the slice held by the pending entry.
    /// It is never read through this field, hence the lint exemption, but it must stay
    /// alive for as long as the entry exists.
    #[allow(unused)]
    buffer: PinnedBuffer<'m>,
}
/// Target of a [`Topic`], selecting which access flag is set in its command header.
///
/// For the two slave variants the first field is used as the slave part and the second
/// as the slave-local part of the command address.
#[derive(Copy, Clone)]
pub enum Address {
    /// Slave address sent with the `topological` access flag set.
    Topological(u16, SlaveSize),
    /// Slave address sent with the `fixed` access flag set.
    Fixed(u16, SlaveSize),
    /// Address converted directly into a command address, with neither flag set.
    Virtual(VirtualSize),
}
impl<'m> Topic<'m> {
pub async fn new(master: &'m Master, address: Address, mut buffer: PinnedBuffer<'m>) -> Result<Self, Error> {
let mut pending = master.pending.lock().await;
let first = rand::random::<u16>();
let token = loop {
if let Some(token) = (0 ..= u16::try_from(pending.len()).unwrap())
.map(|i| i.wrapping_add(first))
.filter(|k| ! pending.contains_key(&k))
.next()
{break token}
};
let mut command = Command::default();
command.token = token;
command.size = usize_to_message(buffer.len())?;
match address {
Address::Topological(slave, local) => {
command.access.set_topological(true);
command.address = command::Address::new(slave, local).into();
},
Address::Fixed(slave, local) => {
command.access.set_fixed(true);
command.address = command::Address::new(slave, local).into();
},
Address::Virtual(global) => {
command.address = command::Address::from(global);
},
}
pending.insert(token, Pending {
command: command,
buffer: unsafe {transmute::<&mut [u8], &mut [u8]>(buffer.deref_mut())},
waker: None,
result: None,
});
Ok(Self{master, token, buffer})
}
pub async fn send(&self, read: bool, write: bool, data: Option<&[u8]>) -> Result<(), Error> {
let mut pending = self.master.pending.lock().await;
let buffer = pending.get_mut(&self.token).unwrap();
let data = data.unwrap_or(buffer.buffer);
buffer.command.checksum = checksum(data);
buffer.command.access.set_read(read);
buffer.command.access.set_write(write);
{
let bus = self.master.transmit.lock().await;
let header = buffer.command.to_be_bytes();
bus.write_all(&header).await?;
bus.write_all(&checksum(&header).to_be_bytes()).await?;
bus.write_all(data).await?;
}
Ok(())
}
pub async fn receive(&self, mut copy: Option<&mut [u8]>) -> Result<u8, Error> {
let polling = poll_fn(|context| {
if let Some(mut pending) = self.master.pending.try_lock() {
let buffer = pending.get_mut(&self.token).unwrap();
if let Some(result) = buffer.result.take() {
if let Some(dst) = copy.take() {
dst.copy_from_slice(buffer.buffer);
}
return Poll::Ready(result)
}
buffer.waker.replace(context.waker().clone());
}
Poll::Pending
});
tokio::time::timeout(self.master.timeout, polling).await
.map_err(|_| Error::Timeout)?
}
pub async fn get(&self, dst: &mut [u8]) {
let pending = self.master.pending.lock().await;
let buffer = pending.get(&self.token).unwrap();
dst.copy_from_slice(buffer.buffer);
}
}
impl Drop for Topic<'_> {
    /// Unregisters the topic from its master.
    ///
    /// `drop` cannot await, so the pending table is acquired by retrying `try_lock`,
    /// yielding the OS thread between attempts.
    fn drop(&mut self) {
        let mut registry = loop {
            match self.master.pending.try_lock() {
                Some(guard) => break guard,
                None => std::thread::yield_now(),
            }
        };
        registry.remove(&self.token);
    }
}
/// Byte storage handed to a topic, either lent by the caller or owned.
pub enum PinnedBuffer<'s> {
    /// Slice borrowed from the caller for `'s`.
    Borrowed(&'s mut [u8]),
    /// Heap allocation owned by the buffer itself.
    Owned(Vec<u8>),
}
impl Deref for PinnedBuffer<'_> {
    type Target = [u8];

    /// Gives read access to the bytes, whichever variant stores them.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Owned(bytes) => bytes.as_slice(),
            Self::Borrowed(bytes) => &**bytes,
        }
    }
}
impl DerefMut for PinnedBuffer<'_> {
    /// Gives write access to the bytes, whichever variant stores them.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Owned(bytes) => bytes.as_mut_slice(),
            Self::Borrowed(bytes) => &mut **bytes,
        }
    }
}