1use crate::call::{ApiCall, Call};
2use crate::device::RpcDevice;
3use crate::error::{Error, Result};
4use crate::response::{self, Response};
5use arrayvec::ArrayVec;
6use mio::unix::SourceFd;
7use mio::{Events, Interest, Poll, Token};
8use serde::Serialize;
9use std::cell::RefCell;
10use std::fs::{File, OpenOptions};
11use std::io::{ErrorKind as IoErrorKind, Read, Write};
12use std::marker::PhantomData;
13use std::os::unix::io::AsRawFd;
14use std::path::Path;
15use std::rc::Rc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use termios::Termios;
18
/// Process-wide counter used to hand out a distinct [`Token`] value to each
/// bus created by `DeviceBus::new`.
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);

/// Size in bytes of the buffers used to assemble an outgoing message and to
/// receive an incoming one, delimiter bytes included.
pub(crate) const MAX_MESSAGE_SIZE: usize = 4096;
23
/// Handle to an opened device bus.
///
/// The handle wraps a reference-counted `Inner`, so cloning it is cheap and
/// every clone operates on the same underlying file and poll instance.
#[derive(Clone, Debug)]
pub struct DeviceBus(Rc<Inner>);
28
29impl DeviceBus {
30 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
32 let bus = OpenOptions::new()
33 .read(true)
34 .write(true)
35 .open(path.as_ref())?;
36
37 let fd = bus.as_raw_fd();
38 let bus_token = Token(NEXT_TOKEN.fetch_add(1, Ordering::Relaxed));
39
40 let mut termios = Termios::from_fd(fd)?;
44 termios::cfmakeraw(&mut termios);
45 termios.c_lflag &= !termios::ECHO;
46 termios::tcsetattr(fd, termios::TCSANOW, &termios)?;
47
48 let poll = Poll::new()?;
49 poll.registry()
50 .register(&mut SourceFd(&fd), bus_token, Interest::READABLE)?;
51
52 Ok(Self(Rc::new(Inner {
53 bus,
54 events: RefCell::new(Events::with_capacity(16)),
55 poll: RefCell::new(poll),
56 _not_send_sync: PhantomData,
57 })))
58 }
59
60 pub fn call<T: ApiCall + Serialize>(&self, call: Call<T>) -> Result<T::Response> {
63 self.write_message(call)?;
64 self.read_message::<T>()
65 }
66
67 pub fn find<D: RpcDevice>(&self) -> Result<Option<D>> {
69 self.find_by_name(D::IDENTIFIER)
70 }
71
72 pub fn find_by_name<D: RpcDevice>(&self, name: &str) -> Result<Option<D>> {
74 let response::List(list) = self.call(Call::list())?;
75
76 let device = list
77 .iter()
78 .find(|&desc| {
79 desc.type_names
80 .iter()
81 .any(|identifier| &**identifier == name)
82 })
83 .map(|desc| D::new(desc.device_id, self));
84
85 Ok(device)
86 }
87
88 pub fn write_message<T: ApiCall + Serialize>(&self, message: Call<T>) -> Result<()> {
90 let mut write_buffer = const { ArrayVec::<_, MAX_MESSAGE_SIZE>::new_const() };
91
92 write_buffer
93 .try_push(b'\0')
94 .map_err(|_| Error::MessageLengthExceeded)?;
95
96 serde_json::to_writer(&mut write_buffer, &message).map_err(Error::from)?;
97
98 write_buffer
99 .try_push(b'\0')
100 .map_err(|_| Error::MessageLengthExceeded)?;
101
102 (&self.0.bus)
103 .write_all(write_buffer.as_slice())
104 .map_err(Error::from)?;
105
106 (&self.0.bus).flush()?;
107
108 Ok(())
109 }
110
111 pub fn read_message<T: ApiCall>(&self) -> Result<T::Response> {
113 let mut read_buffer = const { [0u8; MAX_MESSAGE_SIZE] };
114 let mut total_bytes = 0;
115
116 loop {
117 let bytes_read = self.read(&mut read_buffer[total_bytes..])?;
118
119 if bytes_read > 0 {
120 total_bytes += bytes_read;
121
122 if bytes_read > 1 && read_buffer[total_bytes - 1] == b'\0' {
123 break;
124 }
125 } else {
126 return Err(Error::ReadZero);
127 }
128 }
129
130 let msg_slice = &read_buffer[1..total_bytes - 1];
132
133 serde_json::from_slice::<Response<T>>(msg_slice)
134 .map_err(Error::from)?
135 .into()
136 }
137
138 fn read(&self, buf: &mut [u8]) -> Result<usize> {
139 {
140 let mut poll = self.0.poll.borrow_mut();
141 let mut events = self.0.events.borrow_mut();
142 events.clear();
143
144 while let Err(e) = poll.poll(&mut events, None) {
145 if e.kind() != IoErrorKind::Interrupted {
146 return Err(e.into());
147 }
148 }
149 }
150
151 loop {
152 let result = (&self.0.bus).read(buf);
153
154 match result {
155 Ok(n) => break Ok(n),
156 Err(e) if e.kind() != IoErrorKind::Interrupted => {
157 break Err(e.into());
158 }
159 _ => continue,
160 }
161 }
162 }
163}
164
/// Shared state behind every clone of a `DeviceBus` handle.
#[derive(Debug)]
struct Inner {
    /// The opened bus device; all message reads and writes go through it.
    bus: File,
    /// Poll instance with the bus descriptor registered for readability.
    poll: RefCell<Poll>,
    /// Event storage handed to each `poll` call.
    events: RefCell<Events>,

    /// Marker holding a raw pointer type, which is neither `Send` nor `Sync`,
    /// so this struct is neither as well.
    _not_send_sync: PhantomData<*mut ()>,
}