#[cfg(feature = "wire")]
use plushie_core::outgoing_message::OutgoingMessage;
#[cfg(feature = "wire")]
use serde_json::Value;
#[cfg(feature = "wire")]
use std::collections::HashMap;
#[cfg(feature = "wire")]
use std::io;
#[cfg(feature = "wire")]
use super::bridge::Bridge;
#[cfg(feature = "wire")]
use super::effect_tracker::{self, EffectTracker};
#[cfg(feature = "wire")]
use super::event_bridge::SinkEvent;
#[cfg(feature = "wire")]
use crate::App;
#[cfg(feature = "wire")]
use crate::command::Command;
#[cfg(feature = "wire")]
use crate::event::{EffectEvent, EffectResult, Event};
#[cfg(feature = "wire")]
use crate::runtime::tree_diff;
#[cfg(feature = "wire")]
use crate::settings::ExitReason;
/// Extracts the protocol version advertised in a renderer hello message.
///
/// Looks up the `protocol_version` key and, only when that key is absent,
/// the `protocol` key. Whatever value is found is handed to
/// `plushie_core::protocol::json_protocol_version`; the result is `None`
/// when neither key exists or when that conversion yields `None`.
#[cfg(feature = "wire")]
fn hello_protocol_version(hello: &Value) -> Option<u32> {
    let advertised = match hello.get("protocol_version") {
        Some(found) => Some(found),
        None => hello.get("protocol"),
    };
    advertised.and_then(plushie_core::protocol::json_protocol_version)
}
/// Reads `field` from the hello message as a string slice.
///
/// Returns the literal `"unknown"` when the key is missing or its value is
/// not a JSON string.
#[cfg(feature = "wire")]
fn hello_string_field<'a>(hello: &'a Value, field: &str) -> &'a str {
    match hello.get(field).and_then(Value::as_str) {
        Some(text) => text,
        None => "unknown",
    }
}
/// Reads `field` from the hello message as a string slice.
///
/// Returns the literal `"None"` when the key is missing or its value is not
/// a JSON string.
#[cfg(feature = "wire")]
fn hello_optional_string_field<'a>(hello: &'a Value, field: &str) -> &'a str {
    if let Some(text) = hello.get(field).and_then(Value::as_str) {
        text
    } else {
        "None"
    }
}
/// Builds the one-line summary of a renderer hello message used for logging.
///
/// The protocol version is rendered as a decimal number, or `unknown` when
/// `hello_protocol_version` returns `None`. `name`, `codec`, `mode` and
/// `backend` fall back to `unknown`; `transport` falls back to `None`.
#[cfg(feature = "wire")]
fn renderer_hello_log_message(hello: &Value) -> String {
    let protocol = match hello_protocol_version(hello) {
        Some(version) => version.to_string(),
        None => "unknown".to_string(),
    };
    let name = hello_string_field(hello, "name");
    let codec = hello_string_field(hello, "codec");
    let mode = hello_string_field(hello, "mode");
    let backend = hello_string_field(hello, "backend");
    let transport = hello_optional_string_field(hello, "transport");
    format!(
        "renderer hello: name={} protocol={} codec={} mode={} backend={} transport={}",
        name, protocol, codec, mode, backend, transport
    )
}
/// Logs the renderer hello summary at `info` level.
#[cfg(feature = "wire")]
fn log_renderer_hello(hello: &Value) {
    let summary = renderer_hello_log_message(hello);
    log::info!("{}", summary);
}
/// Runs app `A` in wire mode against the renderer binary at `binary_path`.
///
/// Delegates to `run_wire_inner` without an external tokio runtime handle,
/// so the async task manager builds its own runtime.
#[cfg(feature = "wire")]
pub fn run_wire<A: App>(binary_path: &str) -> crate::Result {
    let no_runtime: Option<tokio::runtime::Handle> = None;
    run_wire_inner::<A>(binary_path, no_runtime)
}
/// Runs app `A` in wire mode against the renderer binary at `binary_path`,
/// spawning async work on the caller-supplied tokio `runtime` handle.
#[cfg(feature = "wire")]
pub fn run_wire_with_runtime<A: App>(
    binary_path: &str,
    runtime: tokio::runtime::Handle,
) -> crate::Result {
    let external = Some(runtime);
    run_wire_inner::<A>(binary_path, external)
}
/// Runs app `A` against an already-running renderer reached through the
/// socket described by `opts`.
///
/// Delegates to `run_connect_inner` without an external tokio runtime
/// handle, so the async task manager builds its own runtime.
#[cfg(feature = "wire")]
pub fn run_connect<A: App>(opts: crate::ConnectOpts) -> crate::Result {
    let no_runtime: Option<tokio::runtime::Handle> = None;
    run_connect_inner::<A>(opts, no_runtime)
}
/// Runs app `A` against an already-running renderer reached through the
/// socket described by `opts`, spawning async work on the caller-supplied
/// tokio `runtime` handle.
#[cfg(feature = "wire")]
pub fn run_connect_with_runtime<A: App>(
    opts: crate::ConnectOpts,
    runtime: tokio::runtime::Handle,
) -> crate::Result {
    let external = Some(runtime);
    run_connect_inner::<A>(opts, external)
}
/// Connects to a renderer socket and runs a single session over it.
///
/// The socket address comes from `opts.socket`, falling back to the
/// `PLUSHIE_SOCKET` environment variable; if neither is available an
/// `InvalidSettings` error is returned. When `opts.token` is set it is
/// copied into the settings under the `token` key before the session starts.
#[cfg(feature = "wire")]
fn run_connect_inner<A: App>(
    opts: crate::ConnectOpts,
    runtime: Option<tokio::runtime::Handle>,
) -> crate::Result {
    // Explicit option first, environment second.
    let socket_str = match opts.socket.clone() {
        Some(explicit) => explicit,
        None => std::env::var("PLUSHIE_SOCKET").map_err(|_| {
            crate::Error::InvalidSettings(
                "no socket address supplied: pass `ConnectOpts.socket`, set \
                 PLUSHIE_SOCKET, or use `--plushie-socket <path>`"
                    .to_string(),
            )
        })?,
    };

    let adapter = super::socket::SocketAdapter::connect(&socket_str)?;
    // Only the presence of the token is logged, never its value.
    let token_state = if opts.token.is_some() {
        "present"
    } else {
        "none"
    };
    log::info!(
        "plushie::run_connect: connected to renderer at {:?} (token: {})",
        adapter.addr,
        token_state
    );

    let bridge = Bridge::connect(adapter.stream)?;
    let mut settings = build_settings::<A>();
    if let Some(tok) = opts.token.as_deref() {
        settings["token"] = Value::String(tok.to_string());
    }
    run_session_single::<A>(bridge, settings, runtime)
}
#[cfg(feature = "wire")]
fn run_session_single<A: App>(
mut bridge: Bridge,
settings: Value,
runtime: Option<tokio::runtime::Handle>,
) -> crate::Result {
let (mut model, init_cmd) = A::init();
let mut sub_manager = crate::runtime::subscriptions::SubscriptionManager::new();
let mut effect_tracker = EffectTracker::new();
let mut async_mgr = AsyncTaskManager::new(runtime);
let mut view_errors = crate::runtime::view_errors::ViewErrors::default();
let mut window_sync = crate::runtime::windows::WindowSync::new();
let seed = plushie_core::protocol::TreeNode {
id: String::new(),
type_name: "container".to_string(),
props: plushie_core::protocol::Props::from(plushie_core::protocol::PropMap::new()),
children: vec![],
};
let mut current_tree = match crate::runtime::view_errors::run_guarded_view_wire::<A>(
&mut view_errors,
&model,
&seed,
) {
crate::runtime::view_errors::ViewOutcome::Ok(tree, _) => tree,
crate::runtime::view_errors::ViewOutcome::Panicked { last_good, .. } => last_good,
};
bridge.send_settings(&settings)?;
let hello = bridge.receive()?;
log_renderer_hello(&hello);
let expected = plushie_core::protocol::PROTOCOL_VERSION;
let remote_protocol = hello_protocol_version(&hello);
if remote_protocol != Some(expected) {
log::error!(
"protocol version mismatch: SDK expects {expected}, renderer advertised {remote_protocol:?}"
);
drop(bridge);
return Err(crate::Error::ProtocolVersionMismatch {
expected,
got: remote_protocol,
});
}
if let Some(remote) = hello.get("version").and_then(|v| v.as_str())
&& remote != crate::RENDERER_VERSION
{
log::warn!(
"renderer version skew: SDK built against {expected}, \
renderer reports {got}",
expected = crate::RENDERER_VERSION,
got = remote,
);
}
if let Some(codec_str) = hello.get("codec").and_then(|v| v.as_str()) {
let codec = match codec_str {
"msgpack" => super::bridge::Codec::MsgPack,
"json" => super::bridge::Codec::Json,
other => {
log::warn!("renderer advertised unknown codec `{other}`; keeping JSON");
super::bridge::Codec::Json
}
};
bridge.set_codec(codec);
}
bridge.start_reader()?;
let snapshot_value = serde_json::to_value(¤t_tree)
.map_err(|e| crate::Error::WireEncode(format!("snapshot: {e}")))?;
bridge.send_snapshot(&snapshot_value)?;
for op in window_sync.sync(¤t_tree, &settings) {
dispatch_window_sync_op(&mut bridge, &op)?;
}
if let Err(e) = execute_wire_command(&mut bridge, init_cmd, &mut effect_tracker, &mut async_mgr)
{
log::error!("initial command execution failed: {e}");
}
let new_subs = A::subscribe(&model);
validate_subscription_windows(&new_subs, ¤t_tree);
apply_wire_sub_ops(&mut bridge, &mut async_mgr, sub_manager.sync(new_subs))?;
let policy = A::restart_policy();
let reason = run_session::<A>(
&mut bridge,
&mut model,
&mut current_tree,
&mut effect_tracker,
&mut async_mgr,
&mut sub_manager,
&mut view_errors,
&mut window_sync,
&settings,
policy.heartbeat_interval,
false,
);
log::warn!("plushie wire: renderer exited ({})", reason.label());
A::handle_renderer_exit(&mut model, reason.clone());
if matches!(reason, ExitReason::Shutdown) {
flush_effects_on_shutdown::<A>(&mut model, &mut effect_tracker);
return Ok(());
}
flush_effects_on_shutdown::<A>(&mut model, &mut effect_tracker);
Err(crate::Error::RendererExit(reason))
}
/// Spawns the renderer binary and supervises it, restarting per the app's
/// restart policy.
///
/// The model and the first tree are created once, before the loop. Each loop
/// iteration spawns a renderer, performs the handshake (settings, hello,
/// protocol check, codec selection), sends the current tree as a snapshot,
/// re-syncs windows and subscriptions and then drives `run_session`.
///
/// How the loop ends or continues depends on the exit reason:
/// - `Shutdown`: flush in-flight effects and return `Ok(())`.
/// - `RendererSwap`: reset the restart counter and respawn immediately
///   (with the `dev` feature, after trying to rediscover the binary).
/// - anything else: give up with `MaxRestartsReached` once
///   `restart_count >= policy.max_restarts`, otherwise sleep
///   `restart_delay * 2^restart_count` (saturating) and respawn.
///
/// Spawn, handshake and send failures inside the loop return immediately
/// through `?`.
#[cfg(feature = "wire")]
fn run_wire_inner<A: App>(
    binary_path: &str,
    runtime: Option<tokio::runtime::Handle>,
) -> crate::Result {
    let settings = build_settings::<A>();
    let policy = A::restart_policy();
    let (mut model, init_cmd) = A::init();
    let mut sub_manager = crate::runtime::subscriptions::SubscriptionManager::new();
    let mut effect_tracker = EffectTracker::new();
    let mut async_mgr = AsyncTaskManager::new(runtime);
    let mut view_errors = crate::runtime::view_errors::ViewErrors::default();
    let mut window_sync = crate::runtime::windows::WindowSync::new();

    // Empty container used as the "previous tree" for the very first render.
    let seed = plushie_core::protocol::TreeNode {
        id: String::new(),
        type_name: "container".to_string(),
        props: plushie_core::protocol::Props::from(plushie_core::protocol::PropMap::new()),
        children: vec![],
    };
    let mut current_tree = match crate::runtime::view_errors::run_guarded_view_wire::<A>(
        &mut view_errors,
        &model,
        &seed,
    ) {
        crate::runtime::view_errors::ViewOutcome::Ok(tree, _) => tree,
        crate::runtime::view_errors::ViewOutcome::Panicked { last_good, .. } => last_good,
    };

    let mut restart_count: u32 = 0;
    // The init command runs only against the first renderer; `take()` below
    // leaves `None` for every later iteration.
    let mut pending_init: Option<Command> = Some(init_cmd);
    // Only written by the dev-mode swap path, hence the conditional allow.
    #[cfg_attr(not(feature = "dev"), allow(unused_mut, unused_assignments))]
    let mut binary_owned: Option<String> = None;

    loop {
        let active_binary: &str = binary_owned.as_deref().unwrap_or(binary_path);
        let mut bridge = Bridge::spawn(active_binary)
            .map_err(|e| crate::Error::spawn(active_binary.to_string(), e))?;

        // Handshake: settings out, hello in.
        bridge.send_settings(&settings)?;
        let hello = bridge.receive()?;
        log_renderer_hello(&hello);

        let expected = plushie_core::protocol::PROTOCOL_VERSION;
        let remote_protocol = hello_protocol_version(&hello);
        if remote_protocol != Some(expected) {
            log::error!(
                "protocol version mismatch: SDK expects {expected}, renderer advertised {remote_protocol:?}"
            );
            drop(bridge);
            return Err(crate::Error::ProtocolVersionMismatch {
                expected,
                got: remote_protocol,
            });
        }

        // A differing renderer build version is only worth a warning.
        if let Some(remote) = hello.get("version").and_then(|v| v.as_str())
            && remote != crate::RENDERER_VERSION
        {
            log::warn!(
                "renderer version skew: SDK built against {expected}, \
                 renderer reports {got}; run `cargo install plushie-renderer --version {expected}` \
                 if this is unexpected",
                expected = crate::RENDERER_VERSION,
                got = remote,
            );
        }

        if let Some(codec_str) = hello.get("codec").and_then(|v| v.as_str()) {
            let codec = match codec_str {
                "msgpack" => super::bridge::Codec::MsgPack,
                "json" => super::bridge::Codec::Json,
                other => {
                    log::warn!("renderer advertised unknown codec `{other}`; keeping JSON");
                    super::bridge::Codec::Json
                }
            };
            bridge.set_codec(codec);
        }

        bridge.start_reader()?;
        let snapshot_value = serde_json::to_value(&current_tree)
            .map_err(|e| crate::Error::WireEncode(format!("snapshot: {e}")))?;
        bridge.send_snapshot(&snapshot_value)?;

        // After a restart, drop all SDK-side async work and start window
        // tracking from scratch so the sync below is computed against an
        // empty state.
        if restart_count > 0 {
            async_mgr.cancel_all_running();
            async_mgr.clear_all_timers();
            async_mgr.clear_all_effect_timeouts();
            async_mgr.clear_all_send_after();
            window_sync = crate::runtime::windows::WindowSync::new();
        }
        for op in window_sync.sync(&current_tree, &settings) {
            dispatch_window_sync_op(&mut bridge, &op)?;
        }

        // A failing init command is logged but does not abort the session.
        if let Some(cmd) = pending_init.take()
            && let Err(e) =
                execute_wire_command(&mut bridge, cmd, &mut effect_tracker, &mut async_mgr)
        {
            log::error!("initial command execution failed: {e}");
        }

        let new_subs = A::subscribe(&model);
        if restart_count > 0 {
            // Fresh manager so every subscription is sent to the new renderer.
            sub_manager = crate::runtime::subscriptions::SubscriptionManager::new();
        }
        validate_subscription_windows(&new_subs, &current_tree);
        apply_wire_sub_ops(&mut bridge, &mut async_mgr, sub_manager.sync(new_subs))?;

        if restart_count > 0 {
            // Effects still tracked from the previous renderer are reported
            // to the app as `RendererRestarted`. The update outcome (and any
            // command it carries) is discarded.
            for (tag, _kind) in effect_tracker.flush_all() {
                let event = Event::Effect(EffectEvent {
                    tag,
                    result: EffectResult::RendererRestarted,
                });
                let _ = crate::runtime::view_errors::run_guarded_update::<A>(
                    &mut view_errors,
                    &mut model,
                    event,
                );
            }
        }

        let reason = run_session::<A>(
            &mut bridge,
            &mut model,
            &mut current_tree,
            &mut effect_tracker,
            &mut async_mgr,
            &mut sub_manager,
            &mut view_errors,
            &mut window_sync,
            &settings,
            policy.heartbeat_interval,
            true,
        );
        log::warn!(
            "plushie wire: renderer exited ({}); restart count = {}",
            reason.label(),
            restart_count
        );
        A::handle_renderer_exit(&mut model, reason.clone());

        if matches!(reason, ExitReason::Shutdown) {
            flush_effects_on_shutdown::<A>(&mut model, &mut effect_tracker);
            return Ok(());
        }

        if matches!(reason, ExitReason::RendererSwap) {
            // NOTE(review): resetting `restart_count` to 0 means the
            // restart-only resets at the top of the next iteration (task
            // cancellation, fresh WindowSync / SubscriptionManager,
            // RendererRestarted flush) are skipped for the swapped-in
            // renderer — confirm this is intended.
            restart_count = 0;
            #[cfg(feature = "dev")]
            {
                match crate::runner::wire_discovery::discover_renderer() {
                    Ok(fresh) => {
                        log::info!("plushie wire: swap-discovered renderer at {fresh}");
                        binary_owned = Some(fresh);
                    }
                    Err(e) => {
                        log::warn!(
                            "plushie wire: swap-rediscovery failed ({e}); \
                             reusing current binary path"
                        );
                    }
                }
            }
            drop(bridge);
            continue;
        }

        if restart_count >= policy.max_restarts {
            let final_reason = ExitReason::MaxRestartsReached {
                last_reason: Box::new(reason.clone()),
            };
            A::handle_renderer_exit(&mut model, final_reason.clone());
            flush_effects_on_shutdown::<A>(&mut model, &mut effect_tracker);
            return Err(crate::Error::RendererExit(final_reason));
        }

        // Exponential backoff; both the power and the multiply saturate.
        let delay = policy
            .restart_delay
            .saturating_mul(2u32.saturating_pow(restart_count));
        log::info!(
            "plushie wire: restarting renderer in {}ms (attempt {}/{})",
            delay.as_millis(),
            restart_count + 1,
            policy.max_restarts
        );
        std::thread::sleep(delay);
        restart_count += 1;
        drop(bridge);
    }
}
/// Resolves every still-tracked effect as `EffectResult::Shutdown`.
///
/// Does nothing when the tracker reports no pending effects. Otherwise each
/// flushed tag is delivered to the app through a guarded update that uses a
/// throwaway `ViewErrors`; the update outcome is discarded.
#[cfg(feature = "wire")]
fn flush_effects_on_shutdown<A: App>(model: &mut A::Model, effect_tracker: &mut EffectTracker) {
    let pending = effect_tracker.pending_count();
    if pending > 0 {
        log::info!("wire shutdown: flushing {pending} in-flight effect(s) as Shutdown");
        let mut shutdown_errors = crate::runtime::view_errors::ViewErrors::default();
        for (tag, _kind) in effect_tracker.flush_all() {
            let _ = crate::runtime::view_errors::run_guarded_update::<A>(
                &mut shutdown_errors,
                model,
                Event::Effect(EffectEvent {
                    tag,
                    result: EffectResult::Shutdown,
                }),
            );
        }
    }
}
/// Drives one renderer session until it ends and returns why it ended.
///
/// Each loop pass waits for one incoming bridge item, converts a received
/// message into SDK events and feeds them through `process_event`, then
/// drains the async task manager and processes those events as well.
///
/// Returns:
/// - `ExitReason::RendererSwap` (dev builds only) when a swap signal was
///   drained and `manages_renderer_lifecycle` is true;
/// - the result of `classify_exit` when the bridge reports an error;
/// - `ExitReason::HeartbeatTimeout` when a receive timeout reaches the main
///   `match` below.
///
/// Errors returned by `process_event` are logged and do not end the session.
#[allow(clippy::too_many_arguments)]
#[cfg(feature = "wire")]
fn run_session<A: App>(
    bridge: &mut Bridge,
    model: &mut A::Model,
    current_tree: &mut plushie_core::protocol::TreeNode,
    effect_tracker: &mut EffectTracker,
    async_mgr: &mut AsyncTaskManager,
    sub_manager: &mut crate::runtime::subscriptions::SubscriptionManager,
    view_errors: &mut crate::runtime::view_errors::ViewErrors,
    window_sync: &mut crate::runtime::windows::WindowSync,
    base_settings: &Value,
    heartbeat_interval: Option<std::time::Duration>,
    manages_renderer_lifecycle: bool,
) -> ExitReason {
    // Dev builds poll at most every 250ms so control signals are checked
    // regularly, even when the heartbeat is longer or disabled.
    #[cfg(feature = "dev")]
    let poll_interval = heartbeat_interval
        .unwrap_or(std::time::Duration::from_millis(250))
        .min(std::time::Duration::from_millis(250));
    // Non-dev builds hand the heartbeat interval straight to `recv_timeout`.
    #[cfg(not(feature = "dev"))]
    let poll_interval_opt = heartbeat_interval;
    // Dev builds track the time of the last non-timeout item themselves,
    // because the poll interval may be shorter than the heartbeat.
    #[cfg(feature = "dev")]
    let mut since_last_msg = std::time::Instant::now();
    loop {
        #[cfg(feature = "dev")]
        {
            // Signals are drained on every pass; a swap only ends the session
            // when this caller owns the renderer process.
            if handle_dev_control_signals() && manages_renderer_lifecycle {
                return ExitReason::RendererSwap;
            }
        }
        // Keeps the parameter "used" when the dev block is compiled out.
        #[cfg(not(feature = "dev"))]
        let _ = manages_renderer_lifecycle;
        #[cfg(feature = "dev")]
        let incoming = bridge.recv_timeout(Some(poll_interval));
        #[cfg(not(feature = "dev"))]
        let incoming = bridge.recv_timeout(poll_interval_opt);
        // Dev builds: a poll timeout only counts as a heartbeat timeout once
        // the full heartbeat has elapsed since the last non-timeout item.
        // `continue` restarts the loop, so the async drain at the bottom is
        // not reached on that pass.
        #[cfg(feature = "dev")]
        let incoming = match (&incoming, heartbeat_interval) {
            (super::bridge::Incoming::Timeout, Some(hb)) => {
                if since_last_msg.elapsed() >= hb {
                    incoming
                } else {
                    continue;
                }
            }
            (super::bridge::Incoming::Timeout, None) => continue,
            _ => {
                since_last_msg = std::time::Instant::now();
                incoming
            }
        };
        match incoming {
            super::bridge::Incoming::Message(raw) => {
                // One wire message can expand to several SDK events.
                let events = wire_to_sdk_events(&raw, effect_tracker, async_mgr);
                for event in events {
                    if let Err(e) = process_event::<A>(
                        model,
                        event,
                        bridge,
                        current_tree,
                        effect_tracker,
                        async_mgr,
                        sub_manager,
                        view_errors,
                        window_sync,
                        base_settings,
                    ) {
                        log::error!("command execution failed: {e}");
                    }
                }
            }
            super::bridge::Incoming::Error(e) => {
                log::error!("renderer connection lost: {e}");
                return classify_exit(bridge, &e);
            }
            super::bridge::Incoming::Timeout => {
                log::warn!(
                    "plushie wire: no message in {:?}, triggering restart",
                    heartbeat_interval
                );
                return ExitReason::HeartbeatTimeout;
            }
        }
        // Deliver whatever the async tasks queued (results, timers, delayed
        // events, effect timeouts).
        for sink_event in async_mgr.drain() {
            let event = match sink_event {
                SinkEvent::EffectTimeout { wire_id } => {
                    // The tracker decides whether the timeout still matters:
                    // if the effect was already resolved, nothing is emitted.
                    match effect_tracker.resolve(&wire_id) {
                        Some((tag, kind)) => {
                            log::debug!(
                                "wire effect timeout resolved: wire_id={wire_id} tag={tag} kind={kind}"
                            );
                            Some(Event::Effect(EffectEvent {
                                tag,
                                result: EffectResult::Timeout,
                            }))
                        }
                        None => {
                            log::debug!(
                                "wire effect timeout fired after resolution: wire_id={wire_id}"
                            );
                            None
                        }
                    }
                }
                other => super::event_bridge::sink_event_to_sdk(other),
            };
            if let Some(event) = event
                && let Err(e) = process_event::<A>(
                    model,
                    event,
                    bridge,
                    current_tree,
                    effect_tracker,
                    async_mgr,
                    sub_manager,
                    view_errors,
                    window_sync,
                    base_settings,
                )
            {
                log::error!("async event processing failed: {e}");
            }
        }
    }
}
/// Runs one full update cycle for a single SDK event.
///
/// Steps, in order:
/// 1. (dev builds) let `crate::dev::intercept_event` swallow the event.
/// 2. Run the guarded `update`; the returned command is executed even when
///    the update panicked.
/// 3. Re-render through the guarded view runner, falling back to the
///    `last_good` tree on panic and logging normalization warnings.
/// 4. Send window sync ops, then the tree diff as a patch.
/// 5. Store the new tree and re-sync subscriptions.
///
/// # Errors
///
/// Returns the first failure from command execution, window/patch/
/// subscription sends, or `Error::WireEncode` when a patch op cannot be
/// serialized. In the encode-failure case `current_tree` is left untouched,
/// so the SDK never records a tree the renderer did not receive and the next
/// cycle diffs from the last tree that was actually sent.
#[allow(clippy::too_many_arguments)]
#[cfg(feature = "wire")]
fn process_event<A: App>(
    model: &mut A::Model,
    event: Event,
    bridge: &mut Bridge,
    current_tree: &mut plushie_core::protocol::TreeNode,
    effect_tracker: &mut EffectTracker,
    async_mgr: &mut AsyncTaskManager,
    sub_manager: &mut crate::runtime::subscriptions::SubscriptionManager,
    view_errors: &mut crate::runtime::view_errors::ViewErrors,
    window_sync: &mut crate::runtime::windows::WindowSync,
    base_settings: &Value,
) -> crate::Result {
    #[cfg(feature = "dev")]
    {
        if crate::dev::intercept_event(&event) {
            return Ok(());
        }
    }

    let cmd = match crate::runtime::view_errors::run_guarded_update::<A>(view_errors, model, event)
    {
        crate::runtime::view_errors::UpdateOutcome::Ok(cmd) => cmd,
        crate::runtime::view_errors::UpdateOutcome::Panicked { cmd, .. } => cmd,
    };
    execute_wire_command(bridge, cmd, effect_tracker, async_mgr)?;

    let outcome =
        crate::runtime::view_errors::run_guarded_view_wire::<A>(view_errors, model, current_tree);
    let new_tree = match outcome {
        crate::runtime::view_errors::ViewOutcome::Ok(tree, warnings) => {
            for warning in &warnings {
                log::warn!("view normalization: {warning}");
            }
            tree
        }
        crate::runtime::view_errors::ViewOutcome::Panicked { last_good, .. } => last_good,
    };

    for op in window_sync.sync(&new_tree, base_settings) {
        dispatch_window_sync_op(bridge, &op)?;
    }

    let patches = tree_diff::diff_tree(current_tree, &new_tree);
    if !patches.is_empty() {
        // Fail the whole patch instead of silently dropping ops that do not
        // serialize: a partial patch would leave the renderer's tree out of
        // step with `current_tree`.
        let ops = patches
            .iter()
            .map(serde_json::to_value)
            .collect::<Result<Vec<Value>, _>>()
            .map_err(|e| crate::Error::WireEncode(format!("patch: {e}")))?;
        bridge.send_patch(&ops)?;
    }
    *current_tree = new_tree;

    let new_subs = A::subscribe(model);
    validate_subscription_windows(&new_subs, current_tree);
    apply_wire_sub_ops(bridge, async_mgr, sub_manager.sync(new_subs))?;
    Ok(())
}
/// Emits an `UnknownWindow` diagnostic warning for every subscription whose
/// window id is not among the windows detected in `tree`.
///
/// Subscriptions without a window id are ignored.
#[cfg(feature = "wire")]
fn validate_subscription_windows(
    subs: &[crate::subscription::Subscription],
    tree: &plushie_core::protocol::TreeNode,
) {
    let windows = crate::runtime::windows::detect_windows(tree);
    for sub in subs {
        let Some(wid) = sub.window_id() else {
            continue;
        };
        if windows.contains(wid) {
            continue;
        }
        plushie_core::diagnostics::warn(plushie_core::Diagnostic::UnknownWindow {
            window_id: wid.to_string(),
            subscription_tag: sub.kind().to_string(),
        });
    }
}
/// Sends one window sync operation to the renderer as an `open`, `close` or
/// `update` window op. `close` carries a JSON `null` payload.
#[cfg(feature = "wire")]
fn dispatch_window_sync_op(
    bridge: &mut Bridge,
    op: &crate::runtime::windows::WindowSyncOp,
) -> crate::Result {
    match op {
        crate::runtime::windows::WindowSyncOp::Open {
            window_id: target,
            settings: config,
        } => bridge.send_window_op("open", target, config),
        crate::runtime::windows::WindowSyncOp::Close { window_id: target } => {
            let no_payload = Value::Null;
            bridge.send_window_op("close", target, &no_payload)
        }
        crate::runtime::windows::WindowSyncOp::Update {
            window_id: target,
            settings: config,
        } => bridge.send_window_op("update", target, config),
    }
}
/// Drains all pending dev control signals and reports whether a renderer
/// swap was requested. Every swap request is logged at `info` level.
#[cfg(all(feature = "wire", feature = "dev"))]
fn handle_dev_control_signals() -> bool {
    let mut swap_requested = false;
    crate::dev::drain_control_signals()
        .into_iter()
        .for_each(|signal| match signal {
            crate::dev::ControlSignal::SwapRenderer => {
                log::info!("plushie wire: dev-mode swap requested; restarting renderer");
                swap_requested = true;
            }
        });
    swap_requested
}
/// Maps a bridge I/O error to an exit reason.
///
/// `UnexpectedEof` becomes `ExitReason::ConnectionLost`. Any other error
/// kind becomes `ExitReason::Crash`, carrying the error text and whatever
/// `bridge.try_reap()` returns as the exit code.
#[cfg(feature = "wire")]
fn classify_exit(bridge: &mut Bridge, err: &io::Error) -> ExitReason {
    if matches!(err.kind(), io::ErrorKind::UnexpectedEof) {
        return ExitReason::ConnectionLost;
    }
    let code = bridge.try_reap();
    let message = err.to_string();
    ExitReason::Crash { message, code }
}
/// A renderer message after its `type` field has been inspected and the
/// fields relevant to that type have been pulled out of the raw JSON.
#[cfg(feature = "wire")]
#[derive(Debug)]
enum IncomingRendererMessage {
    /// A message of type `event`.
    Event {
        /// Value of the `family` field.
        family: String,
        /// Value of the `id` field.
        id: String,
        /// Copy of the `value` field, if present.
        value: Option<Value>,
        /// Value of the `tag` field, if present.
        tag: Option<String>,
        /// Key modifiers deserialized from the `modifiers` field, if present.
        modifiers: Option<plushie_core::protocol::KeyModifiers>,
        /// Value of the `captured` field, if present.
        captured: Option<bool>,
    },
    /// A message of type `effect_response`.
    EffectResponse {
        /// Wire id of the effect request being answered.
        id: String,
        /// Status string taken from the `status` field.
        status: &'static str,
        /// Copy of the `result` field, if present.
        result: Option<Value>,
        /// Value of the `error` field, if present.
        error: Option<String>,
    },
    /// A message of type `query_response` or `op_query_response`.
    QueryResponse {
        /// Value of the `kind` field.
        kind: String,
        /// Tag identifying the query being answered.
        tag: String,
        /// Payload of the response.
        data: Value,
    },
    /// A message of type `interact_step`, carrying nested wire messages.
    InteractStep {
        #[allow(dead_code)]
        id: String,
        /// Entries of the `events` array.
        events: Vec<Value>,
    },
    /// A message of type `interact_response`, carrying nested wire messages.
    InteractResponse {
        #[allow(dead_code)]
        id: String,
        /// Entries of the `events` array.
        events: Vec<Value>,
    },
    /// A message whose `type` is none of the above.
    Unknown {
        /// The unrecognised `type` string.
        msg_type: String,
        /// The complete original message.
        #[allow(dead_code)] raw: Value,
    },
}
/// Decodes a raw renderer JSON message into an `IncomingRendererMessage`.
///
/// Returns `None` when a required string field is missing or not a string:
/// `type` for every message, `family` for `event`, and `id` for
/// `effect_response`. All other fields fall back to defaults:
/// - missing string ids/kinds/tags become the empty string;
/// - an `effect_response` status other than `ok`, `cancelled` or
///   `unsupported` is treated as `error`;
/// - a query response takes its tag from `tag` (or `id` when `tag` is
///   absent) and its data from `result` (or `data` when `result` is absent,
///   else JSON `null`);
/// - a missing or non-array `events` field becomes an empty list.
///
/// Unrecognised `type` values produce `Unknown` with a copy of the message.
#[cfg(feature = "wire")]
fn decode_incoming(msg: &Value) -> Option<IncomingRendererMessage> {
    /// String value stored under `key`, if any.
    fn text<'a>(msg: &'a Value, key: &str) -> Option<&'a str> {
        msg.get(key).and_then(Value::as_str)
    }

    /// Owned string under `key`, or the empty string.
    fn text_or_empty(msg: &Value, key: &str) -> String {
        text(msg, key).unwrap_or_default().to_string()
    }

    /// Copy of the `events` array, or an empty list.
    fn event_list(msg: &Value) -> Vec<Value> {
        match msg.get("events").and_then(Value::as_array) {
            Some(items) => items.clone(),
            None => Vec::new(),
        }
    }

    let msg_type = text(msg, "type")?;
    Some(match msg_type {
        "event" => IncomingRendererMessage::Event {
            family: text(msg, "family")?.to_string(),
            id: text_or_empty(msg, "id"),
            value: msg.get("value").cloned(),
            tag: text(msg, "tag").map(ToString::to_string),
            modifiers: msg
                .get("modifiers")
                .and_then(|raw| serde_json::from_value(raw.clone()).ok()),
            captured: msg.get("captured").and_then(Value::as_bool),
        },
        "effect_response" => {
            let id = text(msg, "id")?.to_string();
            let status = match text(msg, "status") {
                Some("ok") => "ok",
                Some("cancelled") => "cancelled",
                Some("unsupported") => "unsupported",
                _ => "error",
            };
            IncomingRendererMessage::EffectResponse {
                id,
                status,
                result: msg.get("result").cloned(),
                error: text(msg, "error").map(String::from),
            }
        }
        "query_response" | "op_query_response" => {
            // The `id` fallback applies only when `tag` is absent, not when
            // `tag` is present but not a string.
            let tag_source = match msg.get("tag") {
                Some(found) => Some(found),
                None => msg.get("id"),
            };
            let data_source = match msg.get("result") {
                Some(found) => Some(found),
                None => msg.get("data"),
            };
            IncomingRendererMessage::QueryResponse {
                kind: text_or_empty(msg, "kind"),
                tag: tag_source
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string(),
                data: data_source.cloned().unwrap_or(Value::Null),
            }
        }
        "interact_step" => IncomingRendererMessage::InteractStep {
            id: text_or_empty(msg, "id"),
            events: event_list(msg),
        },
        "interact_response" => IncomingRendererMessage::InteractResponse {
            id: text_or_empty(msg, "id"),
            events: event_list(msg),
        },
        unrecognised => IncomingRendererMessage::Unknown {
            msg_type: unrecognised.to_string(),
            raw: msg.clone(),
        },
    })
}
/// Converts one raw renderer message into zero or more SDK events.
///
/// - Undecodable messages raise an `UnknownMessageType` diagnostic with an
///   empty type, are logged in full, and yield no events.
/// - `interact_step` / `interact_response` recurse into each nested entry
///   and concatenate the results.
/// - Unknown types raise an `UnknownMessageType` diagnostic and yield no
///   events.
/// - An `effect_response` always cancels the matching timeout task. If the
///   tracker still knows the wire id, the response is parsed into an
///   `Event::Effect` for the tracked tag; otherwise it is forwarded
///   untracked through `sink_event_to_sdk`.
/// - Plain events and query responses are forwarded through
///   `sink_event_to_sdk`.
#[cfg(feature = "wire")]
fn wire_to_sdk_events(
    msg: &Value,
    effect_tracker: &mut EffectTracker,
    async_mgr: &mut AsyncTaskManager,
) -> Vec<Event> {
    use super::event_bridge::{SinkEvent, sink_event_to_sdk};
    use plushie_core::protocol::{EffectResponse, OutgoingEvent};

    let decoded = match decode_incoming(msg) {
        Some(decoded) => decoded,
        None => {
            plushie_core::diagnostics::error(plushie_core::Diagnostic::UnknownMessageType {
                msg_type: String::new(),
            });
            log::error!("raw unknown-type message: {msg}");
            return Vec::new();
        }
    };

    let forwarded = match decoded {
        IncomingRendererMessage::InteractStep { events, .. }
        | IncomingRendererMessage::InteractResponse { events, .. } => {
            let mut flattened = Vec::new();
            for nested in &events {
                flattened.extend(wire_to_sdk_events(nested, effect_tracker, async_mgr));
            }
            return flattened;
        }
        IncomingRendererMessage::Unknown { msg_type, .. } => {
            plushie_core::diagnostics::error(plushie_core::Diagnostic::UnknownMessageType {
                msg_type,
            });
            return Vec::new();
        }
        IncomingRendererMessage::Event {
            family,
            id,
            value,
            tag,
            modifiers,
            captured,
        } => {
            let mut outgoing = OutgoingEvent::generic(family, id, value);
            outgoing.tag = tag;
            outgoing.modifiers = modifiers;
            outgoing.captured = captured;
            SinkEvent::Event(outgoing)
        }
        IncomingRendererMessage::EffectResponse {
            id: wire_id,
            status,
            result,
            error,
        } => {
            // The response arrived, so the pending timeout must not fire.
            async_mgr.cancel_effect_timeout(&wire_id);
            match effect_tracker.resolve(&wire_id) {
                Some((tag, kind)) => {
                    log::debug!(
                        "wire effect response resolved: wire_id={wire_id} tag={tag} kind={kind} status={status}"
                    );
                    // Prefer the result payload; fall back to the error text.
                    let error_as_value = error.as_ref().map(|text| Value::String(text.clone()));
                    let payload = match result.as_ref() {
                        Some(found) => Some(found),
                        None => error_as_value.as_ref(),
                    };
                    let parsed = EffectResult::parse(&kind, status, payload);
                    return vec![Event::Effect(EffectEvent {
                        tag,
                        result: parsed,
                    })];
                }
                None => {
                    log::debug!(
                        "wire effect response without tracked request: wire_id={wire_id} status={status}"
                    );
                    SinkEvent::EffectResponse(EffectResponse {
                        message_type: "effect_response",
                        session: String::new(),
                        id: wire_id,
                        status,
                        result,
                        error,
                    })
                }
            }
        }
        IncomingRendererMessage::QueryResponse { kind, tag, data } => {
            SinkEvent::QueryResponse { kind, tag, data }
        }
    };

    sink_event_to_sdk(forwarded).into_iter().collect()
}
/// Where the tokio runtime used for wire-mode async work comes from.
#[cfg(feature = "wire")]
enum RuntimeBacking {
    /// A handle to a runtime supplied by the caller.
    Handle(tokio::runtime::Handle),
    /// A runtime created and owned by the async task manager.
    Owned(tokio::runtime::Runtime),
}
#[cfg(feature = "wire")]
impl RuntimeBacking {
    /// Returns a cloned handle to the underlying runtime, whichever variant
    /// holds it.
    fn handle(&self) -> tokio::runtime::Handle {
        let borrowed: &tokio::runtime::Handle = match self {
            Self::Handle(external) => external,
            Self::Owned(runtime) => runtime.handle(),
        };
        borrowed.clone()
    }
}
/// Owns the async side of a wire session: spawned tasks, timers, delayed
/// events and effect timeouts, plus the channel they report back on.
#[cfg(feature = "wire")]
struct AsyncTaskManager {
    /// Runtime that all tasks are spawned on.
    runtime: RuntimeBacking,
    /// Sending half of the bounded channel, cloned into each spawned task.
    tx: std::sync::mpsc::SyncSender<SinkEvent>,
    /// Receiving half of the channel; emptied by `drain`.
    rx: std::sync::mpsc::Receiver<SinkEvent>,
    /// Async and stream tasks, keyed by tag.
    running: HashMap<String, tokio::task::JoinHandle<()>>,
    /// Effect timeout tasks, keyed by wire id.
    effect_timeouts: HashMap<String, tokio::task::JoinHandle<()>>,
    /// Repeating timer tasks, keyed by tag.
    timers: HashMap<String, tokio::task::JoinHandle<()>>,
    /// Handles of delayed-event tasks created by `send_after`.
    send_after_handles: Vec<tokio::task::JoinHandle<()>>,
}
#[cfg(feature = "wire")]
impl AsyncTaskManager {
    /// Capacity of the bounded channel that tasks report back on.
    const CHANNEL_CAPACITY: usize = 1024;

    /// Creates a manager that spawns on `external` when given, or on a newly
    /// built multi-thread tokio runtime with two worker threads otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the owned runtime cannot be built.
    fn new(external: Option<tokio::runtime::Handle>) -> Self {
        let runtime = match external {
            Some(handle) => RuntimeBacking::Handle(handle),
            None => {
                let rt = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .build()
                    .expect("failed to create tokio runtime for wire async");
                RuntimeBacking::Owned(rt)
            }
        };
        let (tx, rx) = std::sync::mpsc::sync_channel(Self::CHANNEL_CAPACITY);
        Self {
            runtime,
            tx,
            rx,
            running: HashMap::new(),
            effect_timeouts: HashMap::new(),
            timers: HashMap::new(),
            send_after_handles: Vec::new(),
        }
    }

    /// Schedules an `EffectTimeout` for `wire_id` after `duration`, aborting
    /// any timeout task already registered under the same wire id.
    fn schedule_effect_timeout(&mut self, wire_id: String, duration: std::time::Duration) {
        if let Some(handle) = self.effect_timeouts.remove(&wire_id) {
            log::debug!("wire effect timeout task cancelled: wire_id={wire_id}");
            handle.abort();
        }
        log::debug!("wire effect timeout task scheduled: wire_id={wire_id} duration={duration:?}");
        let tx = self.tx.clone();
        let wire_id_for_task = wire_id.clone();
        let handle = self.runtime.handle().spawn(async move {
            tokio::time::sleep(duration).await;
            log::debug!("wire effect timeout task fired: wire_id={wire_id_for_task}");
            let _ = tx.send(SinkEvent::EffectTimeout {
                wire_id: wire_id_for_task,
            });
        });
        self.effect_timeouts.insert(wire_id, handle);
    }

    /// Aborts the timeout task registered for `wire_id`, if there is one.
    fn cancel_effect_timeout(&mut self, wire_id: &str) {
        if let Some(handle) = self.effect_timeouts.remove(wire_id) {
            log::debug!("wire effect timeout task cancelled: wire_id={wire_id}");
            handle.abort();
        }
    }

    /// Starts a repeating timer that queues an `Event::Timer` for `tag` every
    /// `interval`, replacing any timer already running under that tag.
    ///
    /// The timestamp is milliseconds since the Unix epoch (0 if the system
    /// clock reads earlier than the epoch).
    fn start_timer(&mut self, tag: String, interval: std::time::Duration) {
        if let Some(handle) = self.timers.remove(&tag) {
            handle.abort();
        }
        let tx = self.tx.clone();
        let tag_for_task = tag.clone();
        let handle = self.runtime.handle().spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            // Consume the first tick before the loop so the first event is
            // queued only after a second tick.
            ticker.tick().await;
            loop {
                ticker.tick().await;
                let millis = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis();
                // Saturate instead of silently wrapping on overflow.
                let timestamp = u64::try_from(millis).unwrap_or(u64::MAX);
                let event = Event::Timer(crate::event::TimerEvent {
                    tag: tag_for_task.clone(),
                    timestamp,
                });
                // A send error means the receiver is gone; stop ticking.
                if tx.send(SinkEvent::DelayedEvent(event)).is_err() {
                    break;
                }
            }
        });
        self.timers.insert(tag, handle);
    }

    /// Aborts the timer registered under `tag`, if there is one.
    fn stop_timer(&mut self, tag: &str) {
        if let Some(handle) = self.timers.remove(tag) {
            handle.abort();
        }
    }

    /// Spawns a one-shot async task under `tag`, aborting any task already
    /// running under that tag. Its result is queued as `AsyncResult`.
    fn spawn_async(&mut self, tag: String, task_fn: crate::command::AsyncTaskFn) {
        if let Some(handle) = self.running.remove(&tag) {
            handle.abort();
        }
        let tx = self.tx.clone();
        let tag_clone = tag.clone();
        let handle = self.runtime.handle().spawn(async move {
            let future = (task_fn)();
            let result = super::run_task_with_panic_guard(&tag_clone, future).await;
            let _ = tx.send(SinkEvent::AsyncResult {
                tag: tag_clone,
                result,
            });
        });
        self.running.insert(tag, handle);
    }

    /// Spawns a streaming task under `tag`, aborting any task already running
    /// under that tag. Values emitted through the emitter are queued as
    /// `StreamValue`; the final result is queued as `AsyncResult`.
    fn spawn_stream(&mut self, tag: String, task_fn: crate::command::StreamTaskFn) {
        if let Some(handle) = self.running.remove(&tag) {
            handle.abort();
        }
        let tx_stream = self.tx.clone();
        let tx_final = self.tx.clone();
        let tag_for_result = tag.clone();
        let emitter = crate::command::StreamEmitter::buffered(&tag);
        // The sink receives the tag from the emitter, so it needs no copy of
        // its own.
        emitter.attach_sink(Box::new(move |t, value| {
            let _ = tx_stream.send(SinkEvent::StreamValue { tag: t, value });
        }));
        let handle = self.runtime.handle().spawn(async move {
            let future = (task_fn)(emitter);
            let result = super::run_task_with_panic_guard(&tag_for_result, future).await;
            let _ = tx_final.send(SinkEvent::AsyncResult {
                tag: tag_for_result,
                result,
            });
        });
        self.running.insert(tag, handle);
    }

    /// Aborts the async or stream task running under `tag`, if there is one.
    fn cancel(&mut self, tag: &str) {
        if let Some(handle) = self.running.remove(tag) {
            handle.abort();
        }
    }

    /// Queues `event` for the next `drain` without spawning a task.
    ///
    /// Uses a non-blocking send: the receiver lives in this same struct, so
    /// a blocking `send` on a full queue could never be unblocked by the
    /// caller draining it. When the queue is full the event is dropped and
    /// an error is logged.
    fn deliver_sink_event(&self, event: SinkEvent) {
        if let Err(err) = self.tx.try_send(event) {
            log::error!("wire async queue rejected locally delivered event: {err}");
        }
    }

    /// Queues `event` as a `DelayedEvent` after `delay`.
    ///
    /// Finished delayed-event handles are pruned on each call so the handle
    /// list does not grow without bound.
    fn send_after(&mut self, delay: std::time::Duration, event: crate::event::Event) {
        let tx = self.tx.clone();
        let handle = self.runtime.handle().spawn(async move {
            use futures::FutureExt;
            let fut = async move { tokio::time::sleep(delay).await };
            // A panic inside the sleep future is swallowed; the event is
            // still sent afterwards.
            let _ = std::panic::AssertUnwindSafe(fut).catch_unwind().await;
            let _ = tx.send(SinkEvent::DelayedEvent(event));
        });
        self.send_after_handles
            .retain(|h: &tokio::task::JoinHandle<()>| !h.is_finished());
        self.send_after_handles.push(handle);
    }

    /// Aborts every running async and stream task.
    fn cancel_all_running(&mut self) {
        for (_, handle) in self.running.drain() {
            handle.abort();
        }
    }

    /// Aborts every timer task.
    fn clear_all_timers(&mut self) {
        for (_, handle) in self.timers.drain() {
            handle.abort();
        }
    }

    /// Aborts every effect timeout task.
    fn clear_all_effect_timeouts(&mut self) {
        for (_, handle) in self.effect_timeouts.drain() {
            handle.abort();
        }
    }

    /// Aborts every pending delayed-event task.
    fn clear_all_send_after(&mut self) {
        for handle in self.send_after_handles.drain(..) {
            handle.abort();
        }
    }

    /// Takes every queued event without blocking.
    ///
    /// Bookkeeping entries for tasks that have reported in are removed along
    /// the way: the `running` entry for an `AsyncResult` and the
    /// `effect_timeouts` entry for an `EffectTimeout`.
    fn drain(&mut self) -> Vec<SinkEvent> {
        let mut events = Vec::new();
        while let Ok(event) = self.rx.try_recv() {
            match &event {
                SinkEvent::AsyncResult { tag, .. } => {
                    self.running.remove(tag);
                }
                SinkEvent::EffectTimeout { wire_id } => {
                    self.effect_timeouts.remove(wire_id);
                }
                _ => {}
            }
            events.push(event);
        }
        events
    }
}
/// Executes one app command in wire mode.
///
/// Renderer-bound commands are sent over `bridge`; async, stream, cancel and
/// delayed-event commands are handed to `async_mgr`. A batch runs its
/// commands in order and stops at the first error.
#[cfg(feature = "wire")]
fn execute_wire_command(
    bridge: &mut Bridge,
    cmd: Command,
    effect_tracker: &mut EffectTracker,
    async_mgr: &mut AsyncTaskManager,
) -> crate::Result {
    match cmd {
        Command::None => Ok(()),
        Command::Exit => bridge.send_widget_op("exit", &Value::Null),
        Command::Batch(cmds) => {
            for nested in cmds {
                execute_wire_command(bridge, nested, effect_tracker, async_mgr)?;
            }
            Ok(())
        }
        Command::Renderer(op) => execute_wire_renderer_op(bridge, &op, effect_tracker, async_mgr),
        Command::Async { tag, task } => {
            async_mgr.spawn_async(tag, task);
            Ok(())
        }
        Command::Stream { tag, task } => {
            async_mgr.spawn_stream(tag, task);
            Ok(())
        }
        Command::Cancel { tag } => {
            async_mgr.cancel(&tag);
            Ok(())
        }
        Command::SendAfter { delay, event } => {
            async_mgr.send_after(delay, *event);
            Ok(())
        }
    }
}
/// Translates one `RendererOp` into the matching wire message and sends it.
///
/// Most variants map directly onto a single bridge call. Two need extra
/// bookkeeping:
/// - `Effect`: the request is registered with `effect_tracker`, which
///   returns the wire id and any prior request it replaced. A replaced
///   request is reported to the app as `Cancelled`. A timeout task is
///   scheduled (explicit timeout, or the default for the effect kind) before
///   the request is sent.
/// - `Image`: binary payloads are passed through `base64_encode` before
///   being placed in the JSON payload.
///
/// Variants this function does not recognise are logged as warnings and
/// skipped with `Ok(())`.
#[cfg(feature = "wire")]
fn execute_wire_renderer_op(
    bridge: &mut Bridge,
    op: &plushie_core::ops::RendererOp,
    effect_tracker: &mut EffectTracker,
    async_mgr: &mut AsyncTaskManager,
) -> crate::Result {
    use plushie_core::ops::{ImageOp, RendererOp};
    use serde_json::json;

    match op {
        RendererOp::Command { id, family, value } => bridge.send_command(id, family, value),
        RendererOp::Commands(batch) => bridge.send_commands(batch.clone()),
        RendererOp::FocusNext => {
            let no_args = json!({});
            bridge.send_widget_op("focus_next", &no_args)
        }
        RendererOp::FocusPrevious => {
            let no_args = json!({});
            bridge.send_widget_op("focus_previous", &no_args)
        }
        RendererOp::FocusNextWithin { scope } => {
            let args = json!({"scope": scope});
            bridge.send_widget_op("focus_next_within", &args)
        }
        RendererOp::FocusPreviousWithin { scope } => {
            let args = json!({"scope": scope});
            bridge.send_widget_op("focus_previous_within", &args)
        }
        RendererOp::Window(window_op) => execute_wire_window_op(bridge, window_op),
        RendererOp::WindowQuery(query) => {
            let (name, target, body) = query.to_wire();
            bridge.send_window_op(name, &target, &body)
        }
        RendererOp::SystemOp(system_op) => {
            let (name, payload) = system_op.to_wire();
            let message = OutgoingMessage::SystemOp {
                session: String::new(),
                op: name.to_string(),
                payload,
            };
            bridge.send(&message)
        }
        RendererOp::SystemQuery(query) => {
            let (name, payload) = query.to_wire();
            let message = OutgoingMessage::SystemQuery {
                session: String::new(),
                op: name.to_string(),
                payload,
            };
            bridge.send(&message)
        }
        RendererOp::Effect {
            tag,
            request,
            timeout,
        } => {
            let kind = request.kind();
            let deadline = match timeout {
                Some(explicit) => *explicit,
                None => effect_tracker::default_timeout(kind),
            };
            let (wire_id, replaced) = effect_tracker.track_with_replacement(tag, kind, deadline);
            // Tell the app that the request this one displaced was cancelled.
            if let Some((prior_tag, _prior_kind)) = replaced {
                let cancelled = Event::Effect(EffectEvent {
                    tag: prior_tag,
                    result: EffectResult::Cancelled,
                });
                async_mgr.deliver_sink_event(SinkEvent::DelayedEvent(cancelled));
            }
            async_mgr.schedule_effect_timeout(wire_id.clone(), deadline);
            let (_, payload) = plushie_core::ops::effect_request_to_wire(request);
            bridge.send_effect(&wire_id, kind, &payload)
        }
        RendererOp::Image(image_op) => {
            // `None` marks an image op this function has no wire form for.
            let wire_form = match image_op {
                ImageOp::Create { handle, data } => Some((
                    "create_image",
                    json!({"handle": handle, "data": base64_encode(data)}),
                )),
                ImageOp::CreateRaw {
                    handle,
                    width,
                    height,
                    pixels,
                } => Some((
                    "create_image",
                    json!({"handle": handle, "pixels": base64_encode(pixels),
                           "width": width, "height": height}),
                )),
                ImageOp::Update { handle, data } => Some((
                    "update_image",
                    json!({"handle": handle, "data": base64_encode(data)}),
                )),
                ImageOp::UpdateRaw {
                    handle,
                    width,
                    height,
                    pixels,
                } => Some((
                    "update_image",
                    json!({"handle": handle, "pixels": base64_encode(pixels),
                           "width": width, "height": height}),
                )),
                ImageOp::Delete(handle) => Some(("delete_image", json!({"handle": handle}))),
                ImageOp::List { tag } => Some(("list", json!({"tag": tag}))),
                ImageOp::Clear => Some(("clear", json!({}))),
                _ => None,
            };
            let Some((name, payload)) = wire_form else {
                log::warn!("wire mode: unhandled ImageOp variant; op skipped");
                return Ok(());
            };
            bridge.send(&OutgoingMessage::ImageOp {
                session: String::new(),
                op: name.to_string(),
                payload,
            })
        }
        RendererOp::Announce { text, politeness } => {
            let level = match politeness {
                plushie_core::types::Live::Polite => "polite",
                plushie_core::types::Live::Assertive => "assertive",
            };
            let args = json!({
                "text": text,
                "politeness": level,
            });
            bridge.send_widget_op("announce", &args)
        }
        RendererOp::LoadFont { family, bytes } => bridge.send_load_font(family, bytes),
        RendererOp::Subscribe {
            kind,
            tag,
            max_rate,
            window_id,
        } => {
            let window = window_id.as_deref();
            bridge.send_subscribe(kind, tag, *max_rate, window)
        }
        RendererOp::Unsubscribe { kind, tag } => bridge.send_unsubscribe(kind, tag),
        RendererOp::TreeHash { tag } => {
            let args = json!({"tag": tag});
            bridge.send_widget_op("tree_hash", &args)
        }
        RendererOp::FindFocused { tag } => {
            let args = json!({"tag": tag});
            bridge.send_widget_op("find_focused", &args)
        }
        RendererOp::AdvanceFrame { timestamp } => {
            let message = OutgoingMessage::AdvanceFrame {
                session: String::new(),
                timestamp: *timestamp,
            };
            bridge.send(&message)
        }
        _ => {
            log::warn!("wire mode: unhandled RendererOp variant; op skipped");
            Ok(())
        }
    }
}
/// Sends a single window operation to the renderer through the wire bridge.
///
/// The op is lowered with `to_wire()` into three parts (operation name,
/// window id, payload) which are passed to `Bridge::send_window_op`. The
/// result of that send is returned unchanged.
#[cfg(feature = "wire")]
fn execute_wire_window_op(bridge: &mut Bridge, op: &plushie_core::ops::WindowOp) -> crate::Result {
    let (name, target_window, args) = op.to_wire();
    bridge.send_window_op(name, &target_window, &args)
}
/// Encodes `data` as a base64 string using the `base64` crate's
/// general-purpose `STANDARD` engine.
#[cfg(feature = "wire")]
fn base64_encode(data: &[u8]) -> String {
    // The trait is only needed for method resolution, so import it anonymously.
    use base64::Engine as _;
    use base64::engine::general_purpose::STANDARD;

    STANDARD.encode(data)
}
fn build_settings<A: App>() -> Value {
let mut json = A::settings().to_wire_json();
if let Value::Object(ref mut map) = json {
map.insert(
"protocol_version".to_string(),
serde_json::json!(plushie_core::protocol::PROTOCOL_VERSION),
);
}
json
}
/// Applies a batch of subscription operations, in order.
///
/// `Subscribe` and `Unsubscribe` are sent to the renderer through `bridge`;
/// `StartTimer` and `StopTimer` are passed to `async_mgr`. Processing stops at
/// the first bridge send that fails and that error is returned; later ops in
/// the batch are then not applied.
#[cfg(feature = "wire")]
fn apply_wire_sub_ops(
    bridge: &mut Bridge,
    async_mgr: &mut AsyncTaskManager,
    ops: Vec<crate::runtime::subscriptions::SubOp>,
) -> crate::Result {
    use crate::runtime::subscriptions::SubOp;

    ops.into_iter().try_for_each(|sub_op| -> crate::Result {
        match sub_op {
            SubOp::Subscribe {
                kind,
                tag,
                max_rate,
                window_id,
            } => bridge.send_subscribe(&kind, &tag, max_rate, window_id.as_deref()),
            SubOp::Unsubscribe { kind, tag } => bridge.send_unsubscribe(&kind, &tag),
            SubOp::StartTimer { tag, interval } => {
                async_mgr.start_timer(tag, interval);
                Ok(())
            }
            SubOp::StopTimer { tag } => {
                async_mgr.stop_timer(&tag);
                Ok(())
            }
        }
    })
}
#[cfg(test)]
mod async_mgr_restart_cleanup_tests {
    use super::*;
    use std::time::Duration;

    /// Delay/interval used for everything scheduled in this module
    /// (presumably long enough that nothing fires while the test runs).
    const PENDING: Duration = Duration::from_secs(60);

    /// Creates a manager via `AsyncTaskManager::new(None)`.
    fn fresh_mgr() -> AsyncTaskManager {
        AsyncTaskManager::new(None)
    }

    /// Populates every table the manager tracks (timers, effect timeouts,
    /// send-after handles, running tasks), runs the four cleanup helpers and
    /// checks that each table ends up empty.
    #[test]
    fn cleanup_helpers_clear_all_async_mgr_tables() {
        let mut manager = fresh_mgr();

        // Two timers, one effect timeout, one delayed event, one async task.
        manager.start_timer("tick".into(), PENDING);
        manager.start_timer("hover".into(), PENDING);
        manager.schedule_effect_timeout("wire-1".into(), PENDING);
        let delayed_event = crate::event::Event::Timer(crate::event::TimerEvent {
            tag: "delayed".into(),
            timestamp: 0,
        });
        manager.send_after(PENDING, delayed_event);
        manager.spawn_async(
            "task-a".into(),
            Box::new(|| Box::pin(async { Ok(serde_json::Value::Null) })),
        );

        // Sanity-check that the setup registered everything.
        assert_eq!(manager.timers.len(), 2);
        assert_eq!(manager.effect_timeouts.len(), 1);
        assert_eq!(manager.send_after_handles.len(), 1);
        assert_eq!(manager.running.len(), 1);

        manager.cancel_all_running();
        manager.clear_all_timers();
        manager.clear_all_effect_timeouts();
        manager.clear_all_send_after();

        assert!(manager.timers.is_empty(), "timers should be cleared");
        assert!(
            manager.effect_timeouts.is_empty(),
            "effect_timeouts should be cleared"
        );
        assert!(
            manager.send_after_handles.is_empty(),
            "send_after handles should be cleared"
        );
        assert!(manager.running.is_empty(), "running tasks should be cleared");
    }
}
#[cfg(test)]
mod build_settings_tests {
    use super::*;
    use crate::settings::Settings;
    use crate::widget::WidgetRegistrar;
    use crate::{App, Event, Subscription};

    /// Minimal app whose settings list two required widgets.
    struct AppWithRequired;

    impl App for AppWithRequired {
        type Model = ();

        fn init() -> (Self::Model, crate::Command) {
            ((), crate::Command::none())
        }

        fn update(_model: &mut Self::Model, _event: Event) -> crate::Command {
            crate::Command::none()
        }

        fn view(_model: &Self::Model, _widgets: &mut WidgetRegistrar) -> crate::ViewList {
            crate::ui::window("main").into()
        }

        fn subscribe(_model: &Self::Model) -> Vec<Subscription> {
            Vec::new()
        }

        fn settings() -> Settings {
            Settings {
                required_widgets: vec!["gauge".into(), "custom_chart".into()],
                ..Settings::default()
            }
        }
    }

    /// Minimal app that does not override `settings()`.
    struct AppWithoutRequired;

    impl App for AppWithoutRequired {
        type Model = ();

        fn init() -> (Self::Model, crate::Command) {
            ((), crate::Command::none())
        }

        fn update(_model: &mut Self::Model, _event: Event) -> crate::Command {
            crate::Command::none()
        }

        fn view(_model: &Self::Model, _widgets: &mut WidgetRegistrar) -> crate::ViewList {
            crate::ui::window("main").into()
        }

        fn subscribe(_model: &Self::Model) -> Vec<Subscription> {
            Vec::new()
        }
    }

    /// Widget names supplied by the app appear, in order, under
    /// `required_widgets` in the built settings.
    #[test]
    fn required_widgets_populated_lands_on_wire() {
        let built = build_settings::<AppWithRequired>();
        let entries = built
            .get("required_widgets")
            .and_then(|field| field.as_array())
            .expect("required_widgets should be present when the App supplies names");
        let names: Vec<&str> = entries
            .iter()
            .filter_map(|entry| entry.as_str())
            .collect();
        assert_eq!(names, vec!["gauge", "custom_chart"]);
    }

    /// With no required widgets the key is left out of the built settings.
    #[test]
    fn required_widgets_empty_is_omitted() {
        let json = build_settings::<AppWithoutRequired>();
        let field = json.get("required_widgets");
        assert!(
            field.is_none(),
            "empty required_widgets should not appear on the wire; got: {json}"
        );
    }
}
// Gated on `wire` as well as `test`: `hello_protocol_version` and
// `renderer_hello_log_message` are only compiled with that feature, so a
// test-only gate makes test builds without `wire` fail to compile.
#[cfg(all(test, feature = "wire"))]
mod hello_protocol_tests {
    use super::*;

    /// `protocol_version` takes precedence when both keys are present.
    #[test]
    fn protocol_version_wins_over_legacy_protocol() {
        let expected = plushie_core::protocol::PROTOCOL_VERSION;
        let hello = serde_json::json!({
            "protocol_version": expected,
            "protocol": expected + 1,
        });
        assert_eq!(hello_protocol_version(&hello), Some(expected));
    }

    /// The legacy `protocol` key is used when `protocol_version` is absent.
    #[test]
    fn legacy_protocol_is_fallback() {
        let expected = plushie_core::protocol::PROTOCOL_VERSION;
        let hello = serde_json::json!({
            "protocol": expected,
        });
        assert_eq!(hello_protocol_version(&hello), Some(expected));
    }

    /// A value one past `u32::MAX` yields `None`.
    #[test]
    fn out_of_range_protocol_is_rejected() {
        let hello = serde_json::json!({
            "protocol_version": u64::from(u32::MAX) + 1,
        });
        assert_eq!(hello_protocol_version(&hello), None);
    }

    /// A fractional number yields `None`.
    #[test]
    fn non_integer_protocol_is_rejected() {
        let hello = serde_json::json!({
            "protocol_version": 1.5,
        });
        assert_eq!(hello_protocol_version(&hello), None);
    }

    /// The largest `u32` is still a valid version.
    #[test]
    fn u32_max_protocol_is_accepted() {
        let hello = serde_json::json!({
            "protocol_version": u32::MAX,
        });
        assert_eq!(hello_protocol_version(&hello), Some(u32::MAX));
    }

    /// Every handshake field present in the hello shows up in the log line.
    #[test]
    fn hello_log_message_includes_handshake_details() {
        let hello = serde_json::json!({
            "name": "plushie-renderer",
            "protocol_version": 7,
            "protocol": 8,
            "codec": "msgpack",
            "mode": "headless",
            "backend": "tiny-skia",
            "transport": "stdio",
        });
        assert_eq!(
            renderer_hello_log_message(&hello),
            "renderer hello: name=plushie-renderer protocol=7 codec=msgpack mode=headless backend=tiny-skia transport=stdio"
        );
    }

    /// Missing fields fall back to `unknown`, except `transport`, which
    /// falls back to `None`.
    #[test]
    fn hello_log_message_uses_fallbacks_for_missing_details() {
        let hello = serde_json::json!({});
        assert_eq!(
            renderer_hello_log_message(&hello),
            "renderer hello: name=unknown protocol=unknown codec=unknown mode=unknown backend=unknown transport=None"
        );
    }
}
#[cfg(all(test, feature = "wire"))]
mod wire_contract_tests {
    use super::*;

    /// An `interact_step` message wrapping a single click event on `btn1`,
    /// shared by the tests below.
    fn interact_step_with_click() -> serde_json::Value {
        serde_json::json!({
            "type": "interact_step",
            "session": "s1",
            "id": "i1",
            "events": [
                {
                    "type": "event",
                    "session": "s1",
                    "family": "click",
                    "id": "btn1",
                }
            ],
        })
    }

    /// `decode_incoming` classifies the message as `InteractStep` and keeps
    /// its id and nested events.
    #[test]
    fn decode_incoming_recognizes_interact_step() {
        let message = interact_step_with_click();
        let decoded = decode_incoming(&message).expect("message should decode");
        match decoded {
            IncomingRendererMessage::InteractStep { id, events } => {
                assert_eq!(id, "i1");
                assert_eq!(events.len(), 1);
                assert_eq!(events[0]["family"], "click");
            }
            other => panic!("expected interact_step, got {other:?}"),
        }
    }

    /// `wire_to_sdk_events` turns the nested click into one widget event.
    #[test]
    fn wire_to_sdk_events_unwraps_interact_step_events() {
        let message = interact_step_with_click();
        let mut tracker = EffectTracker::new();
        let mut manager = AsyncTaskManager::new(None);

        let sdk_events = wire_to_sdk_events(&message, &mut tracker, &mut manager);

        assert_eq!(sdk_events.len(), 1);
        match &sdk_events[0] {
            Event::Widget(widget) => {
                assert_eq!(widget.event_type, plushie_core::EventType::Click);
                assert_eq!(widget.scoped_id.id, "btn1");
            }
            other => panic!("expected widget event, got {other:?}"),
        }
    }
}