use serde_json::json;
use tokio::time::{Duration, timeout};
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{ThreadRenderNode, ThreadRenderSnapshot};
use crate::storage::PRIMARY_RUNTIME_ID;
use super::super::events::handle_app_server_message;
use super::support::bootstrap_test_state;
/// Streams a `commandExecution` output delta under one turn id, then completes the same item
/// under a different turn id with no aggregated output in the payload, and checks that the
/// cached render snapshot's exec group still carries the streamed text and the command.
#[tokio::test]
async fn command_execution_completion_keeps_streamed_output_when_turn_id_changes() {
    let state = bootstrap_test_state().await;
    // Both notifications are given the same two-second budget.
    let deadline = Duration::from_secs(2);

    // Step 1: streamed output arrives for `item-ran` under `turn-delta`.
    let delta_notification = AppServerInbound::Notification {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        method: String::from("item/commandExecution/outputDelta"),
        params: json!({
            "threadId": "thread-render-exec-delta",
            "turnId": "turn-delta",
            "itemId": "item-ran",
            "delta": "partial output\n"
        }),
    };
    timeout(deadline, handle_app_server_message(&state, delta_notification))
        .await
        .expect("处理 commandExecution outputDelta 超时")
        .expect("处理 commandExecution outputDelta 失败");

    // Step 2: the same item completes, but the turn id differs from the one used while streaming.
    let completed_notification = AppServerInbound::Notification {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        method: String::from("item/completed"),
        params: json!({
            "threadId": "thread-render-exec-delta",
            "turnId": "turn-completed",
            "item": {
                "id": "item-ran",
                "type": "commandExecution",
                "status": "completed",
                "command": "echo partial",
                "exitCode": 0,
                "commandActions": [
                    {
                        "type": "run",
                        "command": "echo partial"
                    }
                ]
            }
        }),
    };
    timeout(deadline, handle_app_server_message(&state, completed_notification))
        .await
        .expect("处理 commandExecution item/completed 超时")
        .expect("处理 commandExecution item/completed 失败");

    // Step 3: inspect the first exec group in the snapshot cached for this thread.
    let cache = state
        .thread_render_snapshots
        .lock()
        .expect("thread render snapshots poisoned");
    let cached_snapshot = cache
        .get("thread-render-exec-delta")
        .expect("应缓存 thread render snapshot");
    let (kind, output_text, commands) = cached_snapshot
        .nodes
        .iter()
        .find_map(|node| {
            if let ThreadRenderNode::ExecGroup {
                kind,
                output_text,
                commands,
                ..
            } = node
            {
                Some((kind.clone(), output_text.clone(), commands.clone()))
            } else {
                None
            }
        })
        .expect("应存在 exec group 节点");

    assert_eq!("ran", kind);
    assert_eq!(Some(String::from("partial output\n")), output_text);
    assert_eq!(1, commands.len());
    assert_eq!("echo partial", commands[0].text);
}
/// Streams a `commandExecution` output delta, then completes the same item with an
/// `aggregatedOutput` that differs from the streamed text, and checks that the cached render
/// snapshot's exec group reports the aggregated output rather than the streamed delta.
#[tokio::test]
async fn command_execution_completion_prefers_final_aggregated_output() {
    let state = bootstrap_test_state().await;
    // Both notifications are given the same two-second budget.
    let deadline = Duration::from_secs(2);

    // Step 1: a delta whose text is expected to be superseded by the completion payload.
    let delta_notification = AppServerInbound::Notification {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        method: String::from("item/commandExecution/outputDelta"),
        params: json!({
            "threadId": "thread-render-exec-final",
            "turnId": "turn-final",
            "itemId": "item-ran-final",
            "delta": "stale output\n"
        }),
    };
    timeout(deadline, handle_app_server_message(&state, delta_notification))
        .await
        .expect("处理 commandExecution outputDelta 超时")
        .expect("处理 commandExecution outputDelta 失败");

    // Step 2: completion for the same item and turn, carrying `aggregatedOutput`.
    let completed_notification = AppServerInbound::Notification {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        method: String::from("item/completed"),
        params: json!({
            "threadId": "thread-render-exec-final",
            "turnId": "turn-final",
            "item": {
                "id": "item-ran-final",
                "type": "commandExecution",
                "status": "completed",
                "command": "printf final",
                "aggregatedOutput": "final output\n",
                "exitCode": 0,
                "commandActions": [
                    {
                        "type": "run",
                        "command": "printf final"
                    }
                ]
            }
        }),
    };
    timeout(deadline, handle_app_server_message(&state, completed_notification))
        .await
        .expect("处理 commandExecution item/completed 超时")
        .expect("处理 commandExecution item/completed 失败");

    // Step 3: take the first exec group that actually has output text.
    let cache = state
        .thread_render_snapshots
        .lock()
        .expect("thread render snapshots poisoned");
    let cached_snapshot = cache
        .get("thread-render-exec-final")
        .expect("应缓存 thread render snapshot");
    let rendered_output = cached_snapshot
        .nodes
        .iter()
        .find_map(|node| {
            if let ThreadRenderNode::ExecGroup { output_text, .. } = node {
                output_text.clone()
            } else {
                None
            }
        })
        .expect("应存在 exec group 输出");

    assert_eq!("final output\n", rendered_output);
}
/// Caches a snapshot whose exec group has output text, then caches a second snapshot for the
/// same thread whose exec group shares the `item_id` but has a different turn id and no output.
/// The snapshot returned by the second call is expected to still expose the earlier output.
#[tokio::test]
async fn cache_thread_render_snapshot_backfills_missing_exec_output() {
    let state = bootstrap_test_state().await;
    let thread_id = "thread-render-exec-cache";

    // First snapshot: the exec group carries output text.
    let live_node = ThreadRenderNode::ExecGroup {
        id: String::from("turn-live:item-cache:exec"),
        turn_id: Some(String::from("turn-live")),
        item_id: Some(String::from("item-cache")),
        title: String::from("Ran"),
        kind: String::from("ran"),
        state: String::from("completed"),
        commands: Vec::new(),
        output_text: Some(String::from("streamed output\n")),
        exit_code: Some(0),
    };
    let existing = state.cache_thread_render_snapshot(ThreadRenderSnapshot {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        thread_id: thread_id.to_owned(),
        revision: 1,
        status_surface: None,
        nodes: vec![live_node],
    });
    let existing_output = existing.nodes.iter().find_map(|node| {
        if let ThreadRenderNode::ExecGroup { output_text, .. } = node {
            output_text.clone()
        } else {
            None
        }
    });
    assert_eq!(Some(String::from("streamed output\n")), existing_output);

    // Second snapshot: same `item_id`, different turn id and node id, and no output text.
    let read_node = ThreadRenderNode::ExecGroup {
        id: String::from("turn-read:item-cache:exec"),
        turn_id: Some(String::from("turn-read")),
        item_id: Some(String::from("item-cache")),
        title: String::from("Ran"),
        kind: String::from("ran"),
        state: String::from("completed"),
        commands: Vec::new(),
        output_text: None,
        exit_code: Some(0),
    };
    let merged = state.cache_thread_render_snapshot(ThreadRenderSnapshot {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        thread_id: thread_id.to_owned(),
        revision: 1,
        status_surface: None,
        nodes: vec![read_node],
    });
    let merged_output = merged.nodes.iter().find_map(|node| {
        if let ThreadRenderNode::ExecGroup { output_text, .. } = node {
            output_text.clone()
        } else {
            None
        }
    });
    assert_eq!(Some(String::from("streamed output\n")), merged_output);
}