mod activity;
mod cache;
mod format;
mod items;
mod updates;
use anyhow::Result;
use serde_json::Value;
use serde_json::json;
use self::activity::{push_terminal_interaction, upsert_approval_review, upsert_hook_event};
use self::items::upsert_nodes_from_item;
use self::updates::{
append_reasoning_delta, apply_thread_status, apply_turn_completed, ensure_reasoning_node,
extract_message, push_notice, set_status_surface, upsert_assistant_delta,
upsert_exec_output_delta, upsert_file_change_placeholder, upsert_mcp_progress,
upsert_plan_delta, upsert_plan_update_node,
};
use crate::bridge_protocol::{StatusSurfaceState, ThreadRenderNode, ThreadRenderSnapshot};
use crate::state::BridgeState;
use crate::state::helpers::{optional_string, required_string};
pub(super) const PREVIEW_LIMIT: usize = 240;
pub(super) fn empty_thread_render_snapshot(
runtime_id: &str,
thread_id: &str,
) -> ThreadRenderSnapshot {
ThreadRenderSnapshot {
runtime_id: runtime_id.to_string(),
thread_id: thread_id.to_string(),
revision: 0,
status_surface: None,
nodes: Vec::new(),
}
}
pub(super) fn build_thread_render_snapshot(
runtime_id: &str,
thread: &Value,
) -> Result<ThreadRenderSnapshot> {
let thread_id = required_string(thread, "id")?;
let mut snapshot = empty_thread_render_snapshot(runtime_id, thread_id);
for turn in thread
.get("turns")
.and_then(Value::as_array)
.into_iter()
.flatten()
{
let turn_id = optional_string(turn, "id");
for item in turn
.get("items")
.and_then(Value::as_array)
.into_iter()
.flatten()
{
upsert_nodes_from_item(&mut snapshot, turn_id.as_deref(), item, false);
}
}
snapshot.status_surface = thread_status_surface(thread);
snapshot.revision = snapshot.nodes.len() as i64;
if let Some(status_surface) = snapshot.status_surface.as_mut() {
status_surface.revision = snapshot.revision;
}
Ok(snapshot)
}
pub(super) fn apply_app_server_notification(
snapshot: &mut ThreadRenderSnapshot,
method: &str,
params: &Value,
) -> bool {
let changed = match method {
"turn/started" => {
set_status_surface(snapshot, working_status("Working", None));
true
}
"turn/completed" => {
apply_turn_completed(snapshot, params);
true
}
"thread/status/changed" => {
apply_thread_status(snapshot, params);
true
}
"turn/plan/updated" => {
upsert_plan_update_node(snapshot, params);
true
}
"item/started" | "item/completed" => {
let turn_id = optional_string(params, "turnId");
if let Some(item) = params.get("item") {
upsert_nodes_from_item(
snapshot,
turn_id.as_deref(),
item,
method == "item/started",
);
true
} else {
false
}
}
"item/agentMessage/delta" => {
upsert_assistant_delta(snapshot, params);
true
}
"item/plan/delta" => {
upsert_plan_delta(snapshot, params);
true
}
"item/commandExecution/outputDelta" => {
upsert_exec_output_delta(snapshot, params);
true
}
"item/fileChange/outputDelta" => {
upsert_file_change_placeholder(snapshot, params);
true
}
"item/mcpToolCall/progress" => {
upsert_mcp_progress(snapshot, params);
true
}
"item/reasoning/summaryPartAdded" => {
ensure_reasoning_node(snapshot, params);
true
}
"item/reasoning/summaryTextDelta" => {
append_reasoning_delta(snapshot, params, true);
true
}
"item/reasoning/textDelta" => {
append_reasoning_delta(snapshot, params, false);
true
}
"thread/compacted" => {
push_notice(
snapshot,
params,
"Context compacted",
extract_message(params),
);
true
}
"hook/started" => {
upsert_hook_event(snapshot, params, true);
true
}
"hook/completed" => {
upsert_hook_event(snapshot, params, false);
true
}
"item/autoApprovalReview/started" => {
upsert_approval_review(snapshot, params, true);
true
}
"item/autoApprovalReview/completed" => {
upsert_approval_review(snapshot, params, false);
true
}
"item/commandExecution/terminalInteraction" => {
push_terminal_interaction(snapshot, params);
true
}
"model/rerouted" => {
let detail = Some(format!(
"{} -> {} ({})",
optional_string(params, "fromModel").unwrap_or_default(),
optional_string(params, "toModel").unwrap_or_default(),
optional_string(params, "reason").unwrap_or_default()
));
push_notice(snapshot, params, "Model rerouted", detail);
true
}
"error" => {
push_notice(snapshot, params, "Error", extract_message(params));
true
}
_ => false,
};
if changed {
snapshot.revision += 1;
if let Some(status_surface) = snapshot.status_surface.as_mut() {
status_surface.revision = snapshot.revision;
}
}
changed
}
pub(super) fn render_notice_node(
turn_id: Option<&str>,
item_id: Option<String>,
title: impl Into<String>,
detail: Option<String>,
) -> ThreadRenderNode {
ThreadRenderNode::InfoNotice {
id: render_node_id(turn_id, item_id.as_deref(), "notice"),
turn_id: turn_id.map(ToOwned::to_owned),
item_id,
title: title.into(),
detail,
}
}
pub(super) fn render_collab_title(tool: &str, status: &str, receiver_ids: &[String]) -> String {
let receiver = receiver_ids
.first()
.cloned()
.unwrap_or_else(|| "agent".to_string());
match (tool, status, receiver_ids.len()) {
("spawnAgent", "inProgress", _) => "Spawning agent".to_string(),
("spawnAgent", _, 1) => format!("Spawned {receiver}"),
("sendInput", "inProgress", _) => format!("Sending input to {receiver}"),
("sendInput", _, _) => format!("Sent input to {receiver}"),
("resumeAgent", "inProgress", _) => format!("Resuming {receiver}"),
("resumeAgent", _, _) => format!("Resumed {receiver}"),
("closeAgent", _, _) => format!("Closed {receiver}"),
("wait", "inProgress", 1) => format!("Waiting for {receiver}"),
("wait", "inProgress", count) => format!("Waiting for {count} agents"),
("wait", _, _) => "Finished waiting".to_string(),
_ => tool.to_string(),
}
}
pub(super) fn render_node_id(turn_id: Option<&str>, item_id: Option<&str>, suffix: &str) -> String {
format!(
"{}:{}:{}",
turn_id.unwrap_or("turn"),
item_id.unwrap_or("item"),
suffix
)
}
pub(super) fn optional_turn_id_from_params(params: &Value) -> Option<String> {
params
.get("turn")
.and_then(|turn| turn.get("id"))
.and_then(Value::as_str)
.map(ToOwned::to_owned)
.or_else(|| optional_string(params, "turnId"))
}
pub(super) fn node_id_value(node: &ThreadRenderNode) -> &str {
match node {
ThreadRenderNode::UserMessage { id, .. }
| ThreadRenderNode::AssistantMarkdown { id, .. }
| ThreadRenderNode::FinalSeparator { id, .. }
| ThreadRenderNode::ReasoningSummary { id, .. }
| ThreadRenderNode::PlanUpdate { id, .. }
| ThreadRenderNode::ProposedPlan { id, .. }
| ThreadRenderNode::ExecGroup { id, .. }
| ThreadRenderNode::FileChange { id, .. }
| ThreadRenderNode::McpToolCall { id, .. }
| ThreadRenderNode::DynamicToolCall { id, .. }
| ThreadRenderNode::WebSearch { id, .. }
| ThreadRenderNode::TerminalInteraction { id, .. }
| ThreadRenderNode::CollabEvent { id, .. }
| ThreadRenderNode::HookEvent { id, .. }
| ThreadRenderNode::ApprovalReview { id, .. }
| ThreadRenderNode::InfoNotice { id, .. }
| ThreadRenderNode::ViewImage { id, .. }
| ThreadRenderNode::ImageGeneration { id, .. } => id,
}
}
pub(super) fn node_turn_id(node: &ThreadRenderNode) -> Option<&str> {
match node {
ThreadRenderNode::UserMessage { turn_id, .. }
| ThreadRenderNode::AssistantMarkdown { turn_id, .. }
| ThreadRenderNode::FinalSeparator { turn_id, .. }
| ThreadRenderNode::ReasoningSummary { turn_id, .. }
| ThreadRenderNode::PlanUpdate { turn_id, .. }
| ThreadRenderNode::ProposedPlan { turn_id, .. }
| ThreadRenderNode::ExecGroup { turn_id, .. }
| ThreadRenderNode::FileChange { turn_id, .. }
| ThreadRenderNode::McpToolCall { turn_id, .. }
| ThreadRenderNode::DynamicToolCall { turn_id, .. }
| ThreadRenderNode::WebSearch { turn_id, .. }
| ThreadRenderNode::TerminalInteraction { turn_id, .. }
| ThreadRenderNode::CollabEvent { turn_id, .. }
| ThreadRenderNode::HookEvent { turn_id, .. }
| ThreadRenderNode::ApprovalReview { turn_id, .. }
| ThreadRenderNode::InfoNotice { turn_id, .. }
| ThreadRenderNode::ViewImage { turn_id, .. }
| ThreadRenderNode::ImageGeneration { turn_id, .. } => turn_id.as_deref(),
}
}
pub(super) fn node_item_id(node: &ThreadRenderNode) -> Option<&str> {
match node {
ThreadRenderNode::UserMessage { item_id, .. }
| ThreadRenderNode::AssistantMarkdown { item_id, .. }
| ThreadRenderNode::ReasoningSummary { item_id, .. }
| ThreadRenderNode::PlanUpdate { item_id, .. }
| ThreadRenderNode::ProposedPlan { item_id, .. }
| ThreadRenderNode::ExecGroup { item_id, .. }
| ThreadRenderNode::FileChange { item_id, .. }
| ThreadRenderNode::McpToolCall { item_id, .. }
| ThreadRenderNode::DynamicToolCall { item_id, .. }
| ThreadRenderNode::WebSearch { item_id, .. }
| ThreadRenderNode::TerminalInteraction { item_id, .. }
| ThreadRenderNode::CollabEvent { item_id, .. }
| ThreadRenderNode::HookEvent { item_id, .. }
| ThreadRenderNode::ApprovalReview { item_id, .. }
| ThreadRenderNode::InfoNotice { item_id, .. }
| ThreadRenderNode::ViewImage { item_id, .. }
| ThreadRenderNode::ImageGeneration { item_id, .. } => item_id.as_deref(),
ThreadRenderNode::FinalSeparator { .. } => None,
}
}
pub(super) fn upsert_node_by_id(snapshot: &mut ThreadRenderSnapshot, node: ThreadRenderNode) {
let node_id = node_id_value(&node).to_string();
if let Some(existing) = snapshot
.nodes
.iter_mut()
.find(|existing| node_id_value(existing) == node_id)
{
*existing = node;
return;
}
snapshot.nodes.push(node);
}
pub(super) fn replace_item_nodes(
snapshot: &mut ThreadRenderSnapshot,
item_id: &str,
new_nodes: Vec<ThreadRenderNode>,
) {
let mut insert_at = snapshot.nodes.len();
let mut retained = Vec::with_capacity(snapshot.nodes.len() + new_nodes.len());
for node in snapshot.nodes.drain(..) {
if node_item_id(&node) == Some(item_id) {
insert_at = insert_at.min(retained.len());
continue;
}
retained.push(node);
}
if insert_at > retained.len() {
insert_at = retained.len();
}
retained.splice(insert_at..insert_at, new_nodes);
snapshot.nodes = retained;
}
pub(super) fn set_inline_message_from_last_node(snapshot: &mut ThreadRenderSnapshot) {
let message = snapshot.nodes.last().and_then(inline_message_for_node);
if let Some(status_surface) = snapshot.status_surface.as_mut() {
status_surface.inline_message = message;
}
}
pub(super) fn working_status(header: &str, details: Option<String>) -> StatusSurfaceState {
StatusSurfaceState {
kind: "running".to_string(),
header: header.to_string(),
details,
inline_message: None,
interrupt_visible: true,
revision: 0,
}
}
fn inline_message_for_node(node: &ThreadRenderNode) -> Option<String> {
match node {
ThreadRenderNode::ExecGroup { title, .. }
| ThreadRenderNode::McpToolCall { title, .. }
| ThreadRenderNode::DynamicToolCall { title, .. }
| ThreadRenderNode::TerminalInteraction { title, .. }
| ThreadRenderNode::PlanUpdate { title, .. }
| ThreadRenderNode::CollabEvent { title, .. }
| ThreadRenderNode::HookEvent { title, .. }
| ThreadRenderNode::ApprovalReview { title, .. }
| ThreadRenderNode::InfoNotice { title, .. } => Some(title.clone()),
_ => None,
}
}
fn thread_status_surface(thread: &Value) -> Option<StatusSurfaceState> {
let status = thread
.get("status")
.and_then(Value::as_object)
.and_then(|status| status.get("state"))
.and_then(Value::as_str)
.or_else(|| thread.get("status").and_then(Value::as_str))?;
match status {
"inProgress" | "active" => Some(working_status("Working", None)),
"failed" => Some(working_status("Failed", None)),
_ => None,
}
}
impl BridgeState {
pub(super) fn apply_thread_render_notification(
&self,
runtime_id: &str,
method: &str,
params: &Value,
) -> Result<()> {
let Some(thread_id) = params.get("threadId").and_then(Value::as_str) else {
return Ok(());
};
let mut snapshots = self
.thread_render_snapshots
.lock()
.expect("thread render snapshots poisoned");
let snapshot = snapshots
.entry(thread_id.to_string())
.or_insert_with(|| empty_thread_render_snapshot(runtime_id, thread_id));
if !apply_app_server_notification(snapshot, method, params) {
return Ok(());
}
let payload = json!({
"runtimeId": runtime_id,
"threadId": thread_id,
"patch": {
"runtimeId": runtime_id,
"threadId": thread_id,
"snapshot": snapshot.clone(),
},
});
drop(snapshots);
self.emit_event(
"thread/renderPatch",
Some(runtime_id),
Some(thread_id),
payload,
)
}
}