oc2_hlapi/
bus.rs

1use crate::call::{ApiCall, Call};
2use crate::device::RpcDevice;
3use crate::error::{Error, Result};
4use crate::response::{self, Response};
5use arrayvec::ArrayVec;
6use mio::unix::SourceFd;
7use mio::{Events, Interest, Poll, Token};
8use serde::Serialize;
9use std::cell::RefCell;
10use std::fs::{File, OpenOptions};
11use std::io::{ErrorKind as IoErrorKind, Read, Write};
12use std::marker::PhantomData;
13use std::os::unix::io::AsRawFd;
14use std::path::Path;
15use std::rc::Rc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use termios::Termios;
18
19static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);
20
21/// The maximum size of one message that can be handled by the HLAPI in bytes. 4 KiB by default.
22pub(crate) const MAX_MESSAGE_SIZE: usize = 4096;
23
24/// The operating system's device bus. This is represented by a device file in Linux which acts as a
25/// serial console to read and write HLAPI RPC messages.
26#[derive(Clone, Debug)]
27pub struct DeviceBus(Rc<Inner>);
28
29impl DeviceBus {
30    /// Creates a new device bus at the specified path.
31    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
32        let bus = OpenOptions::new()
33            .read(true)
34            .write(true)
35            .open(path.as_ref())?;
36
37        let fd = bus.as_raw_fd();
38        let bus_token = Token(NEXT_TOKEN.fetch_add(1, Ordering::Relaxed));
39
40        // Sets options to not echo back the input to the device bus, and immediately applies that
41        // change. Without this, writing to the device bus will just hang the applicaton.
42        // Taken from https://docs.rs/miku-rpc/0.1.4/src/miku_rpc/bus.rs.html#34-37
43        let mut termios = Termios::from_fd(fd)?;
44        termios::cfmakeraw(&mut termios);
45        termios.c_lflag &= !termios::ECHO;
46        termios::tcsetattr(fd, termios::TCSANOW, &termios)?;
47
48        let poll = Poll::new()?;
49        poll.registry()
50            .register(&mut SourceFd(&fd), bus_token, Interest::READABLE)?;
51
52        Ok(Self(Rc::new(Inner {
53            bus,
54            events: RefCell::new(Events::with_capacity(16)),
55            poll: RefCell::new(poll),
56            _not_send_sync: PhantomData,
57        })))
58    }
59
60    /// Calls an RPC method. A convenience method for writing to the device bus and then reading an
61    /// RPC value returned.
62    pub fn call<T: ApiCall + Serialize>(&self, call: Call<T>) -> Result<T::Response> {
63        self.write_message(call)?;
64        self.read_message::<T>()
65    }
66
67    /// Finds a device or module by its RpcDevice identifier.
68    pub fn find<D: RpcDevice>(&self) -> Result<Option<D>> {
69        self.find_by_name(D::IDENTIFIER)
70    }
71
72    /// Finds a device or module by its name.
73    pub fn find_by_name<D: RpcDevice>(&self, name: &str) -> Result<Option<D>> {
74        let response::List(list) = self.call(Call::list())?;
75
76        let device = list
77            .iter()
78            .find(|&desc| {
79                desc.type_names
80                    .iter()
81                    .any(|identifier| &**identifier == name)
82            })
83            .map(|desc| D::new(desc.device_id, self));
84
85        Ok(device)
86    }
87
88    /// Writes an RPC message.
89    pub fn write_message<T: ApiCall + Serialize>(&self, message: Call<T>) -> Result<()> {
90        let mut write_buffer = const { ArrayVec::<_, MAX_MESSAGE_SIZE>::new_const() };
91
92        write_buffer
93            .try_push(b'\0')
94            .map_err(|_| Error::MessageLengthExceeded)?;
95
96        serde_json::to_writer(&mut write_buffer, &message).map_err(Error::from)?;
97
98        write_buffer
99            .try_push(b'\0')
100            .map_err(|_| Error::MessageLengthExceeded)?;
101
102        (&self.0.bus)
103            .write_all(write_buffer.as_slice())
104            .map_err(Error::from)?;
105
106        (&self.0.bus).flush()?;
107
108        Ok(())
109    }
110
111    /// Reads an RPC message.
112    pub fn read_message<T: ApiCall>(&self) -> Result<T::Response> {
113        let mut read_buffer = const { [0u8; MAX_MESSAGE_SIZE] };
114        let mut total_bytes = 0;
115
116        loop {
117            let bytes_read = self.read(&mut read_buffer[total_bytes..])?;
118
119            if bytes_read > 0 {
120                total_bytes += bytes_read;
121
122                if bytes_read > 1 && read_buffer[total_bytes - 1] == b'\0' {
123                    break;
124                }
125            } else {
126                return Err(Error::ReadZero);
127            }
128        }
129
130        // The message without the null bytes at the start and end.
131        let msg_slice = &read_buffer[1..total_bytes - 1];
132
133        serde_json::from_slice::<Response<T>>(msg_slice)
134            .map_err(Error::from)?
135            .into()
136    }
137
138    fn read(&self, buf: &mut [u8]) -> Result<usize> {
139        {
140            let mut poll = self.0.poll.borrow_mut();
141            let mut events = self.0.events.borrow_mut();
142            events.clear();
143
144            while let Err(e) = poll.poll(&mut events, None) {
145                if e.kind() != IoErrorKind::Interrupted {
146                    return Err(e.into());
147                }
148            }
149        }
150
151        loop {
152            let result = (&self.0.bus).read(buf);
153
154            match result {
155                Ok(n) => break Ok(n),
156                Err(e) if e.kind() != IoErrorKind::Interrupted => {
157                    break Err(e.into());
158                }
159                _ => continue,
160            }
161        }
162    }
163}
164
165#[derive(Debug)]
166struct Inner {
167    bus: File,
168    poll: RefCell<Poll>,
169    events: RefCell<Events>,
170
171    // Ensures that this struct is not Send or Sync
172    _not_send_sync: PhantomData<*mut ()>,
173}