use serde_json::Value;
use std::io::{BufRead, BufReader, Read, Write};
use std::sync::Mutex;
use thiserror::Error;
use tokio::sync::mpsc;
#[derive(Debug, Error)]
pub enum TransportError {
#[error("transport closed")]
Closed,
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("json: {0}")]
Json(#[from] serde_json::Error),
}
pub struct ReaderHandle {
rx: mpsc::UnboundedReceiver<Result<Value, TransportError>>,
}
impl ReaderHandle {
pub async fn recv(&mut self) -> Option<Result<Value, TransportError>> {
self.rx.recv().await
}
}
pub struct WriterHandle {
inner: Mutex<Box<dyn Write + Send>>,
}
impl WriterHandle {
pub fn send(&self, value: &Value) -> Result<(), TransportError> {
let payload = serde_json::to_vec(value)?;
let mut guard = self.inner.lock().map_err(|_| TransportError::Closed)?;
guard.write_all(&payload)?;
guard.write_all(b"\0")?;
guard.flush()?;
Ok(())
}
}
pub struct Transport {
pub reader: ReaderHandle,
pub writer: WriterHandle,
}
impl Transport {
pub fn new<R, W>(read: R, write: W) -> Self
where
R: Read + Send + 'static,
W: Write + Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
std::thread::Builder::new()
.name("webkit-reader".into())
.spawn(move || drain_reader(read, &tx))
.unwrap_or_else(|e| panic!("spawn webkit-reader: {e}"));
Transport {
reader: ReaderHandle { rx },
writer: WriterHandle {
inner: Mutex::new(Box::new(write)),
},
}
}
}
fn drain_reader<R: Read>(read: R, tx: &mpsc::UnboundedSender<Result<Value, TransportError>>) {
let mut buf = BufReader::new(read);
loop {
let mut frame = Vec::with_capacity(1024);
match buf.read_until(0, &mut frame) {
Ok(0) => break, Ok(_) => {
if frame.last() == Some(&0) {
frame.pop();
}
if frame.is_empty() {
continue;
}
let parsed = serde_json::from_slice::<Value>(&frame).map_err(TransportError::Json);
if tx.send(parsed).is_err() {
break;
}
},
Err(e) => {
let _ = tx.send(Err(TransportError::Io(e)));
break;
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use std::sync::Arc;
struct WriterRef(Arc<Mutex<Vec<u8>>>);
impl Write for WriterRef {
fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(b);
Ok(b.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[tokio::test]
async fn reader_decodes_nul_delimited_frames() {
let payload = b"{\"id\":1}\0{\"method\":\"Foo\"}\0";
let mut transport = Transport::new(Cursor::new(payload.to_vec()), Vec::<u8>::new());
let first = transport.reader.recv().await.unwrap().unwrap();
assert_eq!(first["id"], 1);
let second = transport.reader.recv().await.unwrap().unwrap();
assert_eq!(second["method"], "Foo");
assert!(transport.reader.recv().await.is_none());
}
#[tokio::test]
async fn writer_appends_nul() {
let buf_handle = Arc::new(Mutex::new(Vec::<u8>::new()));
let transport = Transport::new(Cursor::new(Vec::<u8>::new()), WriterRef(buf_handle.clone()));
transport.writer.send(&serde_json::json!({"id": 42})).unwrap();
let buf = buf_handle.lock().unwrap();
assert_eq!(&buf[..], b"{\"id\":42}\0");
}
}