use std::sync::Arc;
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, sync::{mpsc, oneshot}, task::JoinHandle, time::Instant};
use tokio_serial::SerialPortBuilderExt;
use crate::{Function, Request, Response};
use serialport::SerialPort;
pub struct QueuedMaster {
handle: JoinHandle<()>,
sender: mpsc::Sender<Job>,
}
struct Job {
request: OwnedRequest,
baud_rate: u32,
respond_to: oneshot::Sender<Result<Response, crate::error::Error>>,
}
struct OwnedRequest {
modbus_id: u8,
function: Function,
timeout: core::time::Duration,
}
impl OwnedRequest {
fn from_borrowed(req: &Request<'_>) -> Self {
Self {
modbus_id: req.modbus_id(),
function: req.function().clone(),
timeout: req.timeout(),
}
}
fn as_request(&self) -> Request<'_> {
Request::new(self.modbus_id, &self.function, self.timeout)
}
}
impl QueuedMaster {
pub async fn new_rs485(path: &str, baud_rate: u32, buffer: usize) -> tokio_serial::Result<Arc<Self>> {
let port = tokio_serial::new(path, baud_rate)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.timeout(Self::idle_time_rs485(baud_rate))
.open_native_async()?;
let (sender, receiver) = mpsc::channel::<Job>(buffer);
let handle = tokio::task::spawn(Self::task(port, baud_rate, receiver));
Ok(Arc::new(Self {
handle,
sender,
}))
}
pub async fn send(&self, req: &Request<'_>, baud_rate: u32) -> Result<Response, crate::error::Error> {
let (sender, receiver) = oneshot::channel::<Result<Response, crate::error::Error>>();
let job = Job {
request: OwnedRequest::from_borrowed(req),
baud_rate,
respond_to: sender,
};
self.sender
.send(job)
.await
.map_err(|_| crate::error::Error::IO(std::io::ErrorKind::BrokenPipe.into()))?;
receiver
.await
.map_err(|_| crate::error::Error::IO(std::io::ErrorKind::BrokenPipe.into()))?
}
async fn task(mut port: tokio_serial::SerialStream, baud_rate: u32, mut receiver: mpsc::Receiver<Job>) {
let mut baud_rate = baud_rate;
let mut last_tx: Instant = Instant::now();
while let Some(Job { request, baud_rate: req_baud_rate, respond_to }) = receiver.recv().await {
let req = request.as_request();
if req_baud_rate != baud_rate {
if let Err(e) = port.set_baud_rate(req_baud_rate) {
let _ = respond_to.send(Err(crate::error::Error::IO(e.into())));
continue;
}
baud_rate = req_baud_rate;
}
let idle_until = last_tx + Self::idle_time_rs485(baud_rate);
let now = tokio::time::Instant::now();
if idle_until > now {
tokio::time::sleep_until(idle_until).await;
}
let frame = match req.to_bytes() {
Ok(frame) => frame,
Err(e) => {
let _ = respond_to.send(Err(crate::error::Error::Request(e)));
continue;
},
};
if let Err(e) = port.clear(serialport::ClearBuffer::Output) {
let _ = respond_to.send(Err(crate::error::Error::IO(e.into())));
continue;
}
if let Err(e) = Self::write(&mut port, &frame).await {
let _ = respond_to.send(Err(e));
continue;
}
last_tx = tokio::time::Instant::now();
if req.is_broadcasting() {
let _ = respond_to.send(Ok(Response::Success));
continue;
}
let post_tx_idle = Self::idle_time_rs485(baud_rate);
tokio::time::sleep(post_tx_idle).await;
let mut buf: [u8; 256] = [0; 256];
let len = match Self::read(&mut port, &mut buf, req.timeout(), req.function().expected_len()).await {
Ok(len) => len,
Err(e) => {
let _ = respond_to.send(Err(e));
continue;
},
};
if len == 0 {
let _ = respond_to.send(Err(crate::error::Error::IO(std::io::ErrorKind::TimedOut.into())));
continue;
}
let res = Response::from_bytes(&req, &buf[0..len]).map_err(|e| crate::error::Error::Response(e));
let _ = respond_to.send(res);
}
}
async fn write(port: &mut tokio_serial::SerialStream, frame: &[u8]) -> Result<(), crate::error::Error> {
port
.write_all(frame)
.await
.map_err(|e| crate::error::Error::IO(e))?;
port
.flush()
.await
.map_err(|e| crate::error::Error::IO(e))?;
Ok(())
}
async fn read(
port: &mut tokio_serial::SerialStream,
buf: &mut [u8],
timeout: core::time::Duration,
expected_len: usize,
) -> Result<usize, crate::error::Error> {
let mut len: usize = 0;
let deadline = tokio::time::Instant::now() + timeout;
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
break;
}
let remaining = deadline.saturating_duration_since(now);
let read_res =
tokio::time::timeout(remaining, port.read(&mut buf[len..])).await;
let n = match read_res {
Ok(Ok(n)) => n,
Ok(Err(ref e)) if e.kind() == std::io::ErrorKind::TimedOut => {
if len == 0 {
continue;
}
if len >= 5 && buf[1] & 0x80 != 0 {
break;
}
if len < expected_len {
continue;
}
break
}
Ok(Err(e)) => return Err(crate::error::Error::IO(e)),
Err(_) => break,
};
len += n;
if len >= buf.len() {
break;
}
}
Ok(len)
}
fn idle_time_rs485(baud_rate: u32) -> core::time::Duration {
const BITS_PER_CHAR: f64 = 10.0;
let seconds = 3.5 * BITS_PER_CHAR / baud_rate as f64;
core::time::Duration::from_secs_f64(seconds)
}
}
impl Drop for QueuedMaster {
fn drop(&mut self) {
self.handle.abort();
}
}