#![allow(clippy::missing_errors_doc)]
#[cfg(feature = "wire")]
use std::io::{self, BufRead, BufReader, Read, Write};
#[cfg(feature = "wire")]
use std::process::{Child, Command as ProcessCommand, Stdio};
#[cfg(feature = "wire")]
use std::sync::Arc;
#[cfg(feature = "wire")]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "wire")]
use std::sync::mpsc;
#[cfg(feature = "wire")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "wire")]
use std::time::Duration;
#[cfg(feature = "wire")]
use plushie_core::outgoing_message::OutgoingMessage;
#[cfg(feature = "wire")]
use serde_json::Value;
#[cfg(feature = "wire")]
use super::socket::SocketStream;
/// Wire encoding used for messages exchanged with the renderer.
#[cfg(feature = "wire")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
/// One JSON document per newline-terminated line.
Json,
/// MessagePack payload preceded by a 4-byte big-endian length prefix.
MsgPack,
}
/// Type-erased byte source that can be moved to the reader thread.
#[cfg(feature = "wire")]
type BoxedReader = Box<dyn Read + Send>;
/// Write side and lifecycle handle of the underlying connection.
#[cfg(feature = "wire")]
enum Transport {
/// Renderer running as a child process, addressed through its stdin.
Subprocess {
/// Handle used to poll, kill and reap the child.
child: Child,
/// Pipe to the child's stdin; `None` once it has been closed.
stdin: Option<Box<dyn Write + Send>>,
},
/// Renderer reached over an already-connected socket.
Socket {
/// Write half of the socket.
writer: Box<dyn Write + Send>,
/// Handle to the same socket, kept so the connection can be shut down.
shutdown_handle: SocketStream,
},
}
/// Connection to a renderer over either a child process's stdio or a socket.
///
/// Messages can be read synchronously with `receive`, or on a background
/// thread after `start_reader` (then consumed through `recv_timeout`).
#[cfg(feature = "wire")]
pub struct Bridge {
/// Write side plus the handle used to terminate the connection.
transport: Transport,
/// Encoding used for outgoing messages and for synchronous reads.
codec: Codec,
/// Buffered read side; present until `start_reader` hands it to the thread.
sync_reader: Option<BufReader<BoxedReader>>,
/// Background reader state; present once `start_reader` has succeeded.
reader: Option<ReaderHandle>,
}
/// Bookkeeping for the background reader thread.
#[cfg(feature = "wire")]
struct ReaderHandle {
/// Receives every decode result produced by the reader thread.
rx: mpsc::Receiver<io::Result<Value>>,
/// Set to `true` to ask the thread to exit before its next read.
stop: Arc<AtomicBool>,
/// Join handle of the thread; taken when the thread is joined.
thread: Option<JoinHandle<()>>,
}
/// Outcome of waiting for a message from the background reader.
#[cfg(feature = "wire")]
pub enum Incoming {
/// A message was decoded successfully.
Message(Value),
/// Reading or decoding failed, or the reader is not running / has gone away.
Error(io::Error),
/// No message arrived within the requested timeout.
Timeout,
}
#[cfg(feature = "wire")]
impl Bridge {
/// Launches the renderer binary at `binary_path` and wraps its stdio.
///
/// The child's environment is cleared and replaced with the variables from
/// `crate::runner::env::renderer_env()`. Its stdin and stdout are piped,
/// while stderr is inherited from this process. The returned bridge starts
/// with [`Codec::Json`] and a synchronous reader (64 KiB buffer) over the
/// child's stdout.
///
/// # Errors
///
/// Returns the spawn error, or a `BrokenPipe` error when one of the pipes
/// is not available on the spawned child.
pub fn spawn(binary_path: &str) -> io::Result<Self> {
    let mut command = ProcessCommand::new(binary_path);
    command
        .env_clear()
        .envs(crate::runner::env::renderer_env())
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit());
    let mut child = command.spawn()?;

    let Some(stdout) = child.stdout.take() else {
        return Err(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "stdout unavailable",
        ));
    };
    let Some(stdin) = child.stdin.take() else {
        return Err(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "stdin unavailable",
        ));
    };

    let read_side: BoxedReader = Box::new(stdout);
    let write_side: Box<dyn Write + Send> = Box::new(stdin);
    Ok(Self {
        transport: Transport::Subprocess {
            child,
            stdin: Some(write_side),
        },
        codec: Codec::Json,
        sync_reader: Some(BufReader::with_capacity(64 * 1024, read_side)),
        reader: None,
    })
}
/// Wraps an already-connected socket in a bridge.
///
/// The stream is cloned twice, once for reading and once for writing. The
/// original handle is kept so the connection can be shut down later. The
/// bridge starts with [`Codec::Json`] and a synchronous reader with a
/// 64 KiB buffer.
///
/// # Errors
///
/// Returns the error from `try_clone` if either clone fails.
pub fn connect(stream: SocketStream) -> io::Result<Self> {
    let read_side: BoxedReader = Box::new(stream.try_clone()?);
    let write_side: Box<dyn Write + Send> = Box::new(stream.try_clone()?);
    let buffered = BufReader::with_capacity(64 * 1024, read_side);
    Ok(Self {
        transport: Transport::Socket {
            writer: write_side,
            shutdown_handle: stream,
        },
        codec: Codec::Json,
        sync_reader: Some(buffered),
        reader: None,
    })
}
/// Moves the read side onto a background thread named
/// `plushie-wire-reader`.
///
/// Calling this while a reader thread is already registered is a no-op.
/// The thread decodes messages with the codec that is current at the time
/// of this call and forwards each result through a bounded channel of 256
/// entries. After a successful call, `receive` no longer has a reader to
/// use.
///
/// # Errors
///
/// Returns `BrokenPipe` if the synchronous reader is no longer available,
/// or the OS error if the thread cannot be spawned.
pub fn start_reader(&mut self) -> io::Result<()> {
    if self.reader.is_some() {
        return Ok(());
    }
    let Some(source) = self.sync_reader.take() else {
        return Err(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "reader already taken",
        ));
    };

    let codec = self.codec;
    let stop = Arc::new(AtomicBool::new(false));
    let worker_stop = Arc::clone(&stop);
    let (tx, rx) = mpsc::sync_channel::<io::Result<Value>>(256);

    let worker = thread::Builder::new()
        .name("plushie-wire-reader".into())
        .spawn(move || reader_loop(source, codec, tx, worker_stop))?;

    self.reader = Some(ReaderHandle {
        rx,
        stop,
        thread: Some(worker),
    });
    Ok(())
}
pub fn recv_timeout(&mut self, timeout: Option<Duration>) -> Incoming {
let Some(reader) = self.reader.as_ref() else {
return Incoming::Error(io::Error::other("reader not started"));
};
let recv_result = match timeout {
Some(dur) => reader.rx.recv_timeout(dur),
None => reader
.rx
.recv()
.map_err(|_| mpsc::RecvTimeoutError::Disconnected),
};
match recv_result {
Ok(Ok(msg)) => Incoming::Message(msg),
Ok(Err(e)) => Incoming::Error(e),
Err(mpsc::RecvTimeoutError::Timeout) => Incoming::Timeout,
Err(mpsc::RecvTimeoutError::Disconnected) => Incoming::Error(io::Error::new(
io::ErrorKind::UnexpectedEof,
"reader disconnected",
)),
}
}
/// Signals the background reader thread to stop and waits for it to exit.
///
/// Does nothing when no reader thread is registered.
///
/// The receiving end of the channel is dropped *before* joining. The reader
/// thread pushes results into a bounded channel, so when the queue is full
/// it is parked inside `send` and cannot observe the stop flag. Keeping the
/// receiver alive across the join would therefore deadlock. Disconnecting
/// the receiver makes that `send` fail, which lets the thread return.
///
/// The thread only checks the stop flag between reads. A thread blocked in
/// a read must still be released by closing the transport (as `Drop` does)
/// before this call can return.
pub fn stop_reader(&mut self) {
    let Some(mut reader) = self.reader.take() else {
        return;
    };
    reader.stop.store(true, Ordering::SeqCst);
    let worker = reader.thread.take();
    // Disconnect the channel so a sender blocked on a full queue wakes up.
    drop(reader);
    if let Some(handle) = worker {
        // A panic in the reader thread is deliberately ignored here.
        let _ = handle.join();
    }
}
/// Borrows the write side of the active transport.
///
/// # Errors
///
/// Returns `BrokenPipe` when the subprocess stdin has already been closed.
fn writer_mut(&mut self) -> io::Result<&mut dyn Write> {
    match &mut self.transport {
        Transport::Socket { writer, .. } => Ok(writer.as_mut()),
        Transport::Subprocess { stdin, .. } => match stdin.as_deref_mut() {
            Some(pipe) => Ok(pipe),
            None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin closed")),
        },
    }
}
pub fn send(&mut self, message: &OutgoingMessage) -> crate::Result {
let codec = self.codec;
let writer = self.writer_mut().map_err(crate::Error::Io)?;
match codec {
Codec::Json => {
let json = serde_json::to_string(message)
.map_err(|e| crate::Error::WireEncode(e.to_string()))?;
writeln!(writer, "{json}")?;
writer.flush()?;
}
Codec::MsgPack => {
let bytes = rmp_serde::to_vec_named(message)
.map_err(|e| crate::Error::WireEncode(e.to_string()))?;
let len = (bytes.len() as u32).to_be_bytes();
writer.write_all(&len)?;
writer.write_all(&bytes)?;
writer.flush()?;
}
}
Ok(())
}
pub fn send_load_font(&mut self, family: &str, bytes: &[u8]) -> crate::Result {
let codec = self.codec;
let writer = self.writer_mut().map_err(crate::Error::Io)?;
match codec {
Codec::Json => {
use base64::Engine;
let b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
let message = OutgoingMessage::LoadFont {
session: String::new(),
payload: serde_json::json!({
"family": family,
"data": b64,
}),
};
let json = serde_json::to_string(&message)
.map_err(|e| crate::Error::WireEncode(e.to_string()))?;
writeln!(writer, "{json}")?;
writer.flush()?;
}
Codec::MsgPack => {
use rmpv::Value as V;
let payload = V::Map(vec![
(V::String("family".into()), V::String(family.into())),
(V::String("data".into()), V::Binary(bytes.to_vec())),
]);
let message = V::Map(vec![
(V::String("type".into()), V::String("load_font".into())),
(V::String("session".into()), V::String(String::new().into())),
(V::String("payload".into()), payload),
]);
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &message)
.map_err(|e| crate::Error::WireEncode(e.to_string()))?;
let len = u32::try_from(buf.len())
.map_err(|_| crate::Error::WireEncode("frame exceeds 4 GiB".into()))?;
writer.write_all(&len.to_be_bytes())?;
writer.write_all(&buf)?;
writer.flush()?;
}
}
Ok(())
}
/// Sends a `Settings` message carrying a copy of `settings`, with an empty
/// session id.
pub fn send_settings(&mut self, settings: &Value) -> crate::Result {
    let message = OutgoingMessage::Settings {
        session: String::new(),
        settings: settings.clone(),
    };
    self.send(&message)
}
/// Sends a `Snapshot` message carrying a copy of `tree`, with an empty
/// session id.
pub fn send_snapshot(&mut self, tree: &Value) -> crate::Result {
    let message = OutgoingMessage::Snapshot {
        session: String::new(),
        tree: tree.clone(),
    };
    self.send(&message)
}
/// Sends a `Patch` message carrying a copy of `ops`, with an empty
/// session id.
pub fn send_patch(&mut self, ops: &[Value]) -> crate::Result {
    let message = OutgoingMessage::Patch {
        session: String::new(),
        ops: Vec::from(ops),
    };
    self.send(&message)
}
/// Sends a `Subscribe` message for the given `kind` and `tag`, with an
/// empty session id.
///
/// `max_rate` and `window_id` are forwarded as given; both are optional.
pub fn send_subscribe(
    &mut self,
    kind: &str,
    tag: &str,
    max_rate: Option<u32>,
    window_id: Option<&str>,
) -> crate::Result {
    let message = OutgoingMessage::Subscribe {
        session: String::new(),
        kind: kind.to_owned(),
        tag: tag.to_owned(),
        max_rate,
        window_id: window_id.map(str::to_owned),
    };
    self.send(&message)
}
/// Sends an `Unsubscribe` message for the given `kind` and `tag`, with an
/// empty session id.
pub fn send_unsubscribe(&mut self, kind: &str, tag: &str) -> crate::Result {
    let message = OutgoingMessage::Unsubscribe {
        session: String::new(),
        kind: kind.to_owned(),
        tag: tag.to_owned(),
    };
    self.send(&message)
}
/// Sends a `WidgetOp` message naming `op` with a copy of `payload`, with
/// an empty session id.
pub fn send_widget_op(&mut self, op: &str, payload: &Value) -> crate::Result {
    let message = OutgoingMessage::WidgetOp {
        session: String::new(),
        op: op.to_owned(),
        payload: payload.clone(),
    };
    self.send(&message)
}
/// Sends a single `Command` message with the given `id`, `family` and a
/// copy of `value`, with an empty session id.
pub fn send_command(&mut self, id: &str, family: &str, value: &Value) -> crate::Result {
    let message = OutgoingMessage::Command {
        session: String::new(),
        id: id.to_owned(),
        family: family.to_owned(),
        value: value.clone(),
    };
    self.send(&message)
}
/// Sends a `Commands` message that takes ownership of `commands`, with an
/// empty session id.
pub fn send_commands(
    &mut self,
    commands: Vec<plushie_core::ops::WidgetCommand>,
) -> crate::Result {
    let message = OutgoingMessage::Commands {
        session: String::new(),
        commands,
    };
    self.send(&message)
}
/// Sends a `WindowOp` message naming `op` for `window_id` with a copy of
/// `payload`, with an empty session id.
pub fn send_window_op(&mut self, op: &str, window_id: &str, payload: &Value) -> crate::Result {
    let message = OutgoingMessage::WindowOp {
        session: String::new(),
        op: op.to_owned(),
        window_id: window_id.to_owned(),
        payload: payload.clone(),
    };
    self.send(&message)
}
/// Sends an `Effect` message with the given `id`, `kind` and a copy of
/// `payload`, with an empty session id.
pub fn send_effect(&mut self, id: &str, kind: &str, payload: &Value) -> crate::Result {
    let message = OutgoingMessage::Effect {
        session: String::new(),
        id: id.to_owned(),
        kind: kind.to_owned(),
        payload: payload.clone(),
    };
    self.send(&message)
}
/// Sends an `Interact` message with the given `id` and `action`, plus
/// copies of `selector` and `payload`, with an empty session id.
pub fn send_interact(
    &mut self,
    id: &str,
    action: &str,
    selector: &Value,
    payload: &Value,
) -> crate::Result {
    let message = OutgoingMessage::Interact {
        session: String::new(),
        id: id.to_owned(),
        action: action.to_owned(),
        selector: selector.clone(),
        payload: payload.clone(),
    };
    self.send(&message)
}
pub fn send_query(
&mut self,
id: &str,
target: &str,
selector: Option<&Value>,
) -> crate::Result {
self.send(&OutgoingMessage::Query {
session: String::new(),
id: id.to_string(),
target: target.to_string(),
selector: selector.cloned(),
})
}
/// Sends a `Reset` message with the given `id` and an empty session id.
pub fn send_reset(&mut self, id: &str) -> crate::Result {
    let message = OutgoingMessage::Reset {
        session: String::new(),
        id: id.to_owned(),
    };
    self.send(&message)
}
/// Sends a `RegisterEffectStub` message for `kind` with a copy of
/// `response`, with an empty session id.
pub fn send_register_effect_stub(&mut self, kind: &str, response: &Value) -> crate::Result {
    let message = OutgoingMessage::RegisterEffectStub {
        session: String::new(),
        kind: kind.to_owned(),
        response: response.clone(),
    };
    self.send(&message)
}
/// Sends an `UnregisterEffectStub` message for `kind`, with an empty
/// session id.
pub fn send_unregister_effect_stub(&mut self, kind: &str) -> crate::Result {
    let message = OutgoingMessage::UnregisterEffectStub {
        session: String::new(),
        kind: kind.to_owned(),
    };
    self.send(&message)
}
/// Reads and decodes one message synchronously using the current codec.
///
/// # Errors
///
/// Returns `BrokenPipe` once the reader has been handed to the background
/// thread, and otherwise whatever `read_one` reports.
pub fn receive(&mut self) -> io::Result<Value> {
    let codec = self.codec;
    match self.sync_reader.as_mut() {
        Some(reader) => read_one(reader, codec),
        None => Err(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "reader already handed off",
        )),
    }
}
/// Reports whether the renderer is still considered to be running.
///
/// A subprocess counts as alive unless polling it yields an exit status; a
/// failed poll is treated as alive. A socket transport always reports
/// `true`.
pub fn is_alive(&mut self) -> bool {
    let Transport::Subprocess { child, .. } = &mut self.transport else {
        return true;
    };
    !matches!(child.try_wait(), Ok(Some(_)))
}
/// Forcibly terminates the connection.
///
/// For a subprocess this kills the child (it is not reaped here). For a
/// socket it shuts the stream down and always reports success.
///
/// # Errors
///
/// Returns the error from killing the child process.
pub fn kill(&mut self) -> io::Result<()> {
    match &mut self.transport {
        Transport::Socket {
            shutdown_handle, ..
        } => {
            shutdown_handle.shutdown();
        }
        Transport::Subprocess { child, .. } => return child.kill(),
    }
    Ok(())
}
/// Polls the child process without blocking and returns its exit code.
///
/// Returns `None` for a socket transport, while the child is still
/// running, when the poll fails, or when the exit status carries no code.
pub fn try_reap(&mut self) -> Option<i32> {
    let Transport::Subprocess { child, .. } = &mut self.transport else {
        return None;
    };
    child
        .try_wait()
        .ok()
        .flatten()
        .and_then(|status| status.code())
}
/// Blocks until the child process exits and returns its exit code, if any.
///
/// A socket transport has nothing to wait for and yields `Ok(None)`
/// immediately.
///
/// # Errors
///
/// Returns the error from waiting on the child process.
pub fn wait(&mut self) -> io::Result<Option<i32>> {
    match &mut self.transport {
        Transport::Socket { .. } => Ok(None),
        Transport::Subprocess { child, .. } => child.wait().map(|status| status.code()),
    }
}
/// Switches the codec used by subsequent `send*` and `receive` calls.
///
/// A reader thread that `start_reader` has already launched captured the
/// codec by value and keeps decoding with the one it was started with.
pub fn set_codec(&mut self, codec: Codec) {
    self.codec = codec;
}
}
/// Tears the connection down: asks the reader thread to stop, closes the
/// transport, joins the reader thread and finally reaps the child process.
#[cfg(feature = "wire")]
impl Drop for Bridge {
fn drop(&mut self) {
// Raise the stop flag first so the reader thread exits as soon as its
// current read returns.
if let Some(reader) = self.reader.as_ref() {
reader.stop.store(true, Ordering::SeqCst);
}
{
match &mut self.transport {
Transport::Subprocess { child, stdin } => {
// Close our end of the child's stdin (presumably so the renderer sees
// EOF and can exit on its own).
drop(stdin.take());
// Poll every 10 ms for up to 500 ms to give the child time to exit.
const GRACE: Duration = Duration::from_millis(500);
let deadline = std::time::Instant::now() + GRACE;
loop {
match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) if std::time::Instant::now() >= deadline => break,
Ok(None) => std::thread::sleep(Duration::from_millis(10)),
Err(_) => break,
}
}
// Still running after the grace period (or after a poll error): kill it.
if matches!(child.try_wait(), Ok(None)) {
let _ = child.kill();
}
}
Transport::Socket {
writer,
shutdown_handle,
} => {
// Swap the writer for a sink so the write half is dropped now, then shut
// the socket down.
drop(std::mem::replace(writer, Box::new(io::sink())));
shutdown_handle.shutdown();
}
}
}
// The transport is closed at this point; join the reader thread.
self.stop_reader();
// Reap the child; the wait result is intentionally ignored.
if let Transport::Subprocess { child, .. } = &mut self.transport {
let _ = child.wait();
}
}
}
/// Reads and decodes exactly one message from `reader` using `codec`.
///
/// * [`Codec::Json`]: reads one line (at most 64 MiB plus one byte are
///   consumed) and parses it as JSON.
/// * [`Codec::MsgPack`]: reads a 4-byte big-endian length, then that many
///   payload bytes, checks the nesting depth and decodes the payload.
///
/// # Errors
///
/// * `UnexpectedEof` with the text `renderer closed` when the JSON stream
///   ends before any byte is read.
/// * `InvalidData` when a message exceeds 64 MiB (a `BufferOverflow`
///   diagnostic is also reported) or when the MessagePack depth check fails.
/// * Any underlying I/O error, and decode errors wrapped via
///   `io::Error::other`.
#[cfg(feature = "wire")]
fn read_one<R: Read>(reader: &mut BufReader<R>, codec: Codec) -> io::Result<Value> {
    const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;

    /// Reports an oversized message and builds the matching I/O error.
    fn overflow(size: usize) -> io::Error {
        let diag = plushie_core::Diagnostic::BufferOverflow {
            size,
            limit: MAX_MESSAGE_SIZE,
        };
        plushie_core::diagnostics::error(diag.clone());
        io::Error::new(io::ErrorKind::InvalidData, diag.to_string())
    }

    match codec {
        Codec::MsgPack => {
            let mut header = [0u8; 4];
            reader.read_exact(&mut header)?;
            let declared = u32::from_be_bytes(header) as usize;
            // Validate the prefix before allocating the payload buffer.
            if declared > MAX_MESSAGE_SIZE {
                return Err(overflow(declared));
            }

            let mut frame = vec![0u8; declared];
            reader.read_exact(&mut frame)?;

            plushie_core::codec_safety::check_msgpack_depth(
                &frame,
                plushie_core::codec_safety::MAX_RMPV_DEPTH,
            )
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("msgpack depth check: {e}"),
                )
            })?;

            rmp_serde::from_slice(&frame).map_err(io::Error::other)
        }
        Codec::Json => {
            // One byte over the limit is allowed through so an oversized line
            // can be told apart from one that is exactly at the limit.
            let cap = (MAX_MESSAGE_SIZE + 1) as u64;
            let mut text = String::new();
            let consumed = reader.take(cap).read_line(&mut text)?;

            if consumed == 0 && text.is_empty() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "renderer closed",
                ));
            }
            if text.len() > MAX_MESSAGE_SIZE {
                return Err(overflow(text.len()));
            }

            serde_json::from_str(&text).map_err(io::Error::other)
        }
    }
}
/// Body of the background reader thread.
///
/// Repeatedly decodes one message and forwards the result through `tx`.
/// The loop ends when the stop flag is observed before a read, when the
/// receiver has gone away, or right after an error result has been
/// forwarded.
#[cfg(feature = "wire")]
fn reader_loop(
    mut reader: BufReader<BoxedReader>,
    codec: Codec,
    tx: mpsc::SyncSender<io::Result<Value>>,
    stop: Arc<AtomicBool>,
) {
    while !stop.load(Ordering::SeqCst) {
        let outcome = read_one(&mut reader, codec);
        let terminal = outcome.is_err();
        let delivered = tx.send(outcome).is_ok();
        if terminal || !delivered {
            break;
        }
    }
}
#[cfg(all(test, feature = "wire"))]
mod tests {
    use super::*;
    use std::io::{Cursor, Read};
    use std::net::{TcpListener, TcpStream};

    /// Builds a connected loopback TCP pair: the bridge side and the peer.
    fn bridge_socket_pair() -> (SocketStream, TcpStream) {
        let listener = TcpListener::bind(("127.0.0.1", 0)).expect("bind loopback listener");
        let local = listener.local_addr().expect("listener address");
        let bridge_side = TcpStream::connect(local).expect("connect client");
        let (peer_side, _) = listener.accept().expect("accept server");
        (SocketStream::Tcp(bridge_side), peer_side)
    }

    /// Reads one length-prefixed frame and returns its payload bytes.
    fn read_msgpack_frame(reader: &mut impl Read) -> Vec<u8> {
        let mut prefix = [0u8; 4];
        reader.read_exact(&mut prefix).expect("read frame length");
        let mut body = vec![0u8; u32::from_be_bytes(prefix) as usize];
        reader.read_exact(&mut body).expect("read frame payload");
        body
    }

    /// Two JSON lines in one buffer must decode as two separate messages.
    #[test]
    fn read_one_json_back_to_back_messages_are_both_decoded() {
        let input = b"{\"type\":\"hello\",\"n\":1}\n{\"type\":\"hello\",\"n\":2}\n".to_vec();
        let mut reader = BufReader::new(Cursor::new(input));

        let first = read_one(&mut reader, Codec::Json).expect("first decode");
        let second = read_one(&mut reader, Codec::Json).expect("second decode");

        assert_eq!(first.get("n").and_then(|v| v.as_u64()), Some(1));
        assert_eq!(second.get("n").and_then(|v| v.as_u64()), Some(2));
    }

    /// A length prefix one byte over the limit is rejected up front.
    #[test]
    fn read_one_msgpack_rejects_oversized_length_prefix() {
        let oversize = (64 * 1024 * 1024 + 1) as u32;
        let mut reader = BufReader::new(Cursor::new(oversize.to_be_bytes().to_vec()));

        let err = read_one(&mut reader, Codec::MsgPack).expect_err("expected overflow error");

        let msg = err.to_string();
        assert!(
            msg.contains("buffer_overflow"),
            "unexpected error text: {msg}"
        );
    }

    /// A JSON line longer than the limit is rejected.
    #[test]
    fn read_one_json_rejects_oversized_line() {
        let mut bytes = vec![b'x'; 70 * 1024 * 1024];
        bytes.push(b'\n');
        let mut reader = BufReader::new(Cursor::new(bytes));

        let err = read_one(&mut reader, Codec::Json).expect_err("expected overflow error");

        assert!(
            err.to_string().contains("buffer_overflow"),
            "unexpected error text: {err}"
        );
    }

    /// Two MessagePack frames in one buffer must decode as two messages.
    #[test]
    fn read_one_msgpack_back_to_back_messages_are_both_decoded() {
        use serde_json::json;

        fn frame(value: &Value) -> Vec<u8> {
            let body = rmp_serde::to_vec_named(value).unwrap();
            let mut framed = (body.len() as u32).to_be_bytes().to_vec();
            framed.extend_from_slice(&body);
            framed
        }

        let mut input = frame(&json!({"type": "hello", "n": 1}));
        input.extend_from_slice(&frame(&json!({"type": "hello", "n": 2})));
        let mut reader = BufReader::new(Cursor::new(input));

        let first = read_one(&mut reader, Codec::MsgPack).expect("first decode");
        let second = read_one(&mut reader, Codec::MsgPack).expect("second decode");

        assert_eq!(first.get("n").and_then(|v| v.as_u64()), Some(1));
        assert_eq!(second.get("n").and_then(|v| v.as_u64()), Some(2));
    }

    /// The MessagePack frame written by `send` is the named-field encoding.
    #[test]
    fn send_msgpack_uses_renderer_named_message_shape() {
        use serde_json::json;

        let (bridge_side, mut peer_side) = bridge_socket_pair();
        let mut bridge = Bridge::connect(bridge_side).expect("connect bridge");
        bridge.set_codec(Codec::MsgPack);

        let message = OutgoingMessage::Settings {
            session: String::new(),
            settings: json!({
                "protocol_version": 1,
                "app_id": "test",
            }),
        };
        bridge.send(&message).expect("send message");

        let payload = read_msgpack_frame(&mut peer_side);
        assert_eq!(payload, rmp_serde::to_vec_named(&message).unwrap());

        let decoded: Value = rmp_serde::from_slice(&payload).expect("decode payload");
        assert_eq!(
            decoded.get("type").and_then(Value::as_str),
            Some("settings")
        );
        assert!(decoded.get("settings").is_some());
    }
}