use crate::call::{ApiCall, Call};
use crate::device::RpcDevice;
use crate::error::{Error, Result};
use crate::response::{self, Response};
use arrayvec::ArrayVec;
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token};
use serde::Serialize;
use std::cell::RefCell;
use std::fs::{File, OpenOptions};
use std::io::{ErrorKind as IoErrorKind, Read, Write};
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use termios::Termios;
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);
pub(crate) const MAX_MESSAGE_SIZE: usize = 4096;
#[derive(Clone, Debug)]
pub struct DeviceBus(Rc<Inner>);
impl DeviceBus {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let bus = OpenOptions::new()
.read(true)
.write(true)
.open(path.as_ref())?;
let fd = bus.as_raw_fd();
let bus_token = Token(NEXT_TOKEN.fetch_add(1, Ordering::Relaxed));
let mut termios = Termios::from_fd(fd)?;
termios::cfmakeraw(&mut termios);
termios.c_lflag &= !termios::ECHO;
termios::tcsetattr(fd, termios::TCSANOW, &termios)?;
let poll = Poll::new()?;
poll.registry()
.register(&mut SourceFd(&fd), bus_token, Interest::READABLE)?;
Ok(Self(Rc::new(Inner {
bus,
events: RefCell::new(Events::with_capacity(16)),
poll: RefCell::new(poll),
_not_send_sync: PhantomData,
})))
}
pub fn call<T: ApiCall + Serialize>(&self, call: Call<T>) -> Result<Response<T>> {
self.write_message(call)?;
self.read_message()
}
pub fn find<D: RpcDevice>(&self) -> Result<Option<D>> {
self.find_by_name(D::IDENTIFIER)
}
pub fn find_by_name<D: RpcDevice>(&self, name: &str) -> Result<Option<D>> {
let list_result = self.call(Call::list())?.into();
let response::List(list) = match list_result {
Ok(response) => response,
Err(e) => return Err(e.into()),
};
let device = list
.iter()
.find(|&desc| {
desc.type_names
.iter()
.any(|identifier| &**identifier == name)
})
.map(|desc| D::new(desc.device_id, self));
Ok(device)
}
pub fn write_message<T: ApiCall + Serialize>(&self, message: Call<T>) -> Result<()> {
let mut write_buffer = const { ArrayVec::<_, MAX_MESSAGE_SIZE>::new_const() };
write_buffer
.try_push(b'\0')
.map_err(|_| Error::MessageLengthExceeded)?;
serde_json::to_writer(&mut write_buffer, &message).map_err(Error::from)?;
write_buffer
.try_push(b'\0')
.map_err(|_| Error::MessageLengthExceeded)?;
(&self.0.bus)
.write_all(write_buffer.as_slice())
.map_err(Error::from)
}
pub fn read_message<T: ApiCall>(&self) -> Result<Response<T>> {
let mut read_buffer = const { ArrayVec::<_, MAX_MESSAGE_SIZE>::new_const() };
let mut total_bytes = 0;
loop {
let bytes_read = self.read(&mut read_buffer)?;
if bytes_read > 0 {
total_bytes += bytes_read;
if read_buffer.len() > 1 && read_buffer.last().is_some_and(|&byte| byte == b'\0') {
break;
}
} else {
return Err(Error::ReadZero);
}
}
let msg_slice = &read_buffer[1..total_bytes - 1];
let response = serde_json::from_slice(msg_slice).map_err(Error::from)?;
Ok(response)
}
fn read(&self, buf: &mut ArrayVec<u8, MAX_MESSAGE_SIZE>) -> Result<usize> {
{
let mut poll = self.0.poll.borrow_mut();
let mut events = self.0.events.borrow_mut();
events.clear();
while let Err(e) = poll.poll(&mut events, None) {
if e.kind() != IoErrorKind::Interrupted {
return Err(e.into());
}
}
}
loop {
let result = (&self.0.bus).read(buf);
match result {
Ok(n) => break Ok(n),
Err(e) if e.kind() != IoErrorKind::Interrupted => {
break Err(e.into());
}
_ => continue,
}
}
}
}
#[derive(Debug)]
struct Inner {
bus: File,
poll: RefCell<Poll>,
events: RefCell<Events>,
_not_send_sync: PhantomData<*mut ()>,
}