use std::io::{BufRead, BufReader, Read};
use std::sync::Mutex;
use std::thread;
use iced::futures::SinkExt;
use iced::stream;
use plushie_core::codec::Codec;
use plushie_core::message::StdinEvent;
use plushie_core::protocol::IncomingMessage;
use serde_json::Value;
pub(crate) type TransportReader = BufReader<Box<dyn Read + Send>>;
fn startup_exit(codec: &Codec, message: &str) -> ! {
log::error!("{message}");
let error = serde_json::json!({"type": "error", "message": message});
if let Ok(bytes) = codec.encode(&error) {
let _ = plushie_renderer::emitters::write_output(&bytes);
}
std::process::exit(1);
}
fn empty_settings(
reader: TransportReader,
) -> (Value, iced::Settings, Vec<Vec<u8>>, TransportReader) {
(
Value::Object(Default::default()),
iced::Settings::default(),
Vec::new(),
reader,
)
}
pub(crate) static STDIN_RX: Mutex<Option<tokio::sync::mpsc::Receiver<StdinEvent>>> =
Mutex::new(None);
pub(crate) fn stdin_subscription() -> impl iced::futures::Stream<Item = StdinEvent> {
stream::channel(32, async |mut sender| {
let mut rx = STDIN_RX
.lock()
.expect("STDIN_RX lock poisoned")
.take()
.expect("stdin_subscription: no receiver (called more than once?)");
while let Some(event) = rx.recv().await {
if sender.send(event).await.is_err() {
break;
}
}
})
}
pub(crate) fn spawn_stdin_reader(
sender: tokio::sync::mpsc::Sender<StdinEvent>,
mut reader: TransportReader,
) {
thread::spawn(move || {
let codec = Codec::get_global();
loop {
match codec.read_message(&mut reader) {
Ok(None) => {
let _ = sender.blocking_send(StdinEvent::Closed);
break;
}
Ok(Some(bytes)) => match codec.decode::<IncomingMessage>(&bytes) {
Ok(msg) => {
if sender.blocking_send(StdinEvent::Message(msg)).is_err() {
return;
}
}
Err(e) => {
let warning = format!("parse error: {e}");
if sender.blocking_send(StdinEvent::Warning(warning)).is_err() {
return;
}
}
},
Err(e) => {
let _ = sender.blocking_send(StdinEvent::Warning(format!("read error: {e}")));
let _ = sender.blocking_send(StdinEvent::Closed);
break;
}
}
}
});
}
pub(crate) fn read_initial_settings(
forced_codec: Option<Codec>,
mut reader: TransportReader,
) -> (Value, iced::Settings, Vec<Vec<u8>>, TransportReader) {
let codec = match forced_codec {
Some(c) => c,
None => {
let buf = match reader.fill_buf() {
Ok(buf) if !buf.is_empty() => buf,
Ok(_) => {
log::error!("stdin closed before settings received");
Codec::set_global(Codec::MsgPack);
return empty_settings(reader);
}
Err(e) => {
log::error!("stdin closed before settings received: {e}");
Codec::set_global(Codec::MsgPack);
return empty_settings(reader);
}
};
Codec::detect_from_first_byte(buf[0])
}
};
log::info!("wire codec: {codec}");
Codec::set_global(codec);
let payload = match codec.read_message(&mut reader) {
Ok(Some(bytes)) => bytes,
Ok(None) => {
log::error!("stdin closed before settings received");
return empty_settings(reader);
}
Err(e) => {
log::error!("failed to read initial settings: {e}");
return empty_settings(reader);
}
};
let msg: IncomingMessage = match codec.decode(&payload) {
Ok(m) => m,
Err(err) => {
startup_exit(&codec, &format!("failed to decode initial settings: {err}"));
}
};
match msg {
IncomingMessage::Settings { settings } => {
log::info!("initial settings received");
let expected = u64::from(plushie_core::protocol::PROTOCOL_VERSION);
if let Some(version) = settings.get("protocol_version").and_then(|v| v.as_u64()) {
if version != expected {
startup_exit(
&codec,
&format!(
"protocol version mismatch: host sent {version}, renderer expects {expected}"
),
);
}
} else {
log::warn!(
"no protocol_version in Settings, assuming compatible (expected {})",
expected
);
}
plushie_renderer::settings::apply_validate_props(&settings);
let iced_settings = plushie_renderer::settings::parse_iced_settings(&settings);
let mut font_bytes = plushie_renderer::settings::parse_inline_fonts(&settings);
if let Some(fonts) = settings.get("fonts").and_then(|v| v.as_array()) {
for font_val in fonts {
if let Some(path) = font_val.as_str() {
match std::fs::read(path) {
Ok(bytes) => {
log::info!("loaded font: {path}");
font_bytes.push(bytes);
}
Err(e) => {
log::error!("failed to load font {path}: {e}");
}
}
}
}
}
(settings, iced_settings, font_bytes, reader)
}
other => {
let variant = match &other {
IncomingMessage::Snapshot { .. } => "snapshot",
IncomingMessage::Patch { .. } => "patch",
IncomingMessage::Effect { .. } => "effect",
IncomingMessage::WidgetOp { .. } => "widget_op",
IncomingMessage::Subscribe { .. } => "subscribe",
IncomingMessage::Unsubscribe { .. } => "unsubscribe",
IncomingMessage::WindowOp { .. } => "window_op",
IncomingMessage::Settings { .. } => "settings",
IncomingMessage::Query { .. } => "query",
IncomingMessage::Interact { .. } => "interact",
IncomingMessage::TreeHash { .. } => "tree_hash",
IncomingMessage::Screenshot { .. } => "screenshot",
IncomingMessage::Reset { .. } => "reset",
IncomingMessage::ImageOp { .. } => "image_op",
IncomingMessage::ExtensionCommand { .. } => "extension_command",
IncomingMessage::ExtensionCommands { .. } => "extension_commands",
IncomingMessage::AdvanceFrame { .. } => "advance_frame",
};
log::error!("expected settings as first message, got {variant}");
empty_settings(reader)
}
}
}