use anyhow::{Context, Result};
use serde_json::{Value, json};
use crate::state::BridgeState;
use crate::state::helpers::{optional_string, required_string};
use crate::state::timeline::{
normalize_delta_payload, timeline_entry_from_plan_update, timeline_entry_from_thread_item,
};
/// Handles a `turn/started` notification by re-emitting it as a bridge event.
///
/// The emitted payload carries `runtimeId`, `threadId`, `turnId` and the
/// `turn` value taken from `params` (serialized as `null` when absent).
///
/// # Errors
///
/// Propagates the error from `required_string` when `threadId` cannot be
/// read from `params`, and any error returned by `emit_event`.
pub(super) async fn handle_turn_started(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    let thread_id = required_string(&params, "threadId")?;
    let turn = params.get("turn");
    // A missing or non-string `turn.id` falls back to an empty string so the
    // event is still emitted.
    let turn_id = turn
        .and_then(|turn| turn.get("id"))
        .and_then(Value::as_str)
        .unwrap_or_default();
    state.emit_event(
        "turn/started",
        Some(runtime_id),
        Some(thread_id),
        json!({
            "runtimeId": runtime_id,
            "threadId": thread_id,
            "turnId": turn_id,
            "turn": turn,
        }),
    )?;
    Ok(())
}
/// Re-emits an `item/agentMessage/delta` notification as a bridge event.
///
/// The event payload is whatever `normalize_delta_payload` builds from
/// `params`; the event is tagged with the `threadId` string from `params`
/// when one is present.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_agent_message_delta(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/agentMessage/delta";

    // Copy the thread id out first: `params` is moved into the payload builder.
    let thread = params
        .get("threadId")
        .and_then(Value::as_str)
        .map(str::to_owned);
    let payload = normalize_delta_payload(
        runtime_id,
        params,
        "agentMessage",
        Some(String::from("Codex")),
        None,
        EVENT,
        json!({}),
        None,
        None,
    );

    state.emit_event(EVENT, Some(runtime_id), thread.as_deref(), payload)?;
    Ok(())
}
/// Re-emits an `item/commandExecution/outputDelta` notification as a bridge
/// event.
///
/// The payload comes from `normalize_delta_payload`, called with the
/// `"commandExecution"` and `"inProgress"` markers; the event is tagged with
/// the `threadId` string from `params` when one is present.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_command_execution_output_delta(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/commandExecution/outputDelta";

    // Copy the thread id out first: `params` is moved into the payload builder.
    let thread = params
        .get("threadId")
        .and_then(Value::as_str)
        .map(str::to_owned);
    let payload = normalize_delta_payload(
        runtime_id,
        params,
        "commandExecution",
        Some(String::from("命令输出")),
        Some(String::from("inProgress")),
        EVENT,
        json!({}),
        None,
        None,
    );

    state.emit_event(EVENT, Some(runtime_id), thread.as_deref(), payload)?;
    Ok(())
}
/// Re-emits an `item/fileChange/outputDelta` notification as a bridge event.
///
/// The payload comes from `normalize_delta_payload`, called with the
/// `"fileChange"` and `"inProgress"` markers; the event is tagged with the
/// `threadId` string from `params` when one is present.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_file_change_output_delta(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/fileChange/outputDelta";

    // Copy the thread id out first: `params` is moved into the payload builder.
    let thread = params
        .get("threadId")
        .and_then(Value::as_str)
        .map(str::to_owned);
    let payload = normalize_delta_payload(
        runtime_id,
        params,
        "fileChange",
        Some(String::from("文件改动")),
        Some(String::from("inProgress")),
        EVENT,
        json!({}),
        None,
        None,
    );

    state.emit_event(EVENT, Some(runtime_id), thread.as_deref(), payload)?;
    Ok(())
}
/// Re-emits a `turn/diff/updated` notification as a bridge event.
///
/// The payload exposes `runtimeId`, the `threadId`, `turnId` and `diff`
/// values found in `params` (each `null` when absent), plus the complete
/// `params` object under `raw`.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_turn_diff_updated(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    // `json!` serializes from borrows, so `params` stays usable afterwards.
    let payload = json!({
        "runtimeId": runtime_id,
        "threadId": params.get("threadId"),
        "turnId": params.get("turnId"),
        "diff": params.get("diff"),
        "raw": params,
    });
    let thread = params.get("threadId").and_then(Value::as_str);

    state.emit_event("turn/diff/updated", Some(runtime_id), thread, payload)?;
    Ok(())
}
/// Handles a `turn/plan/updated` notification.
///
/// Builds a timeline entry with `timeline_entry_from_plan_update` from the
/// `threadId`, `turnId`, `explanation` and `plan` fields of `params`, then
/// emits the serialized entry as a `turn/plan/updated` event.
///
/// # Errors
///
/// Propagates the error from `required_string` when `threadId` or `turnId`
/// cannot be read, a serialization failure from `serde_json::to_value`, and
/// any error returned by `emit_event`.
pub(super) async fn handle_turn_plan_updated(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    let thread_id = required_string(&params, "threadId")?;
    let turn_id = required_string(&params, "turnId")?;
    // A missing `plan` is replaced by an empty JSON array.
    let plan = params.get("plan").cloned().unwrap_or_else(|| json!([]));
    let entry = timeline_entry_from_plan_update(
        runtime_id,
        thread_id,
        turn_id,
        optional_string(&params, "explanation"),
        plan,
    );
    state.emit_event(
        "turn/plan/updated",
        Some(runtime_id),
        Some(thread_id),
        serde_json::to_value(entry)?,
    )?;
    Ok(())
}
/// Handles an `item/started` notification.
///
/// When `params.item` is a JSON object and `timeline_entry_from_thread_item`
/// yields an entry for it, the serialized entry is emitted as an
/// `item/started` event. Otherwise nothing is emitted and `Ok(())` is
/// returned.
///
/// # Errors
///
/// Propagates the error from `required_string` when `threadId` or `turnId`
/// cannot be read, a serialization failure from `serde_json::to_value`, and
/// any error returned by `emit_event`.
pub(super) async fn handle_item_started(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    let thread_id = required_string(&params, "threadId")?;
    let turn_id = required_string(&params, "turnId")?;
    // Only object-shaped items are considered; the value is passed by
    // reference, so the item map is not cloned.
    let entry = params
        .get("item")
        .filter(|item| item.is_object())
        .and_then(|item| {
            timeline_entry_from_thread_item(
                runtime_id,
                thread_id,
                Some(turn_id),
                item,
                "stream_event",
                true,
                false,
            )
        });
    if let Some(entry) = entry {
        state.emit_event(
            "item/started",
            Some(runtime_id),
            Some(thread_id),
            serde_json::to_value(entry)?,
        )?;
    }
    Ok(())
}
/// Handles an `item/completed` notification.
///
/// When `params.item` is a JSON object and `timeline_entry_from_thread_item`
/// yields an entry for it, the serialized entry is emitted as an
/// `item/completed` event. Otherwise nothing is emitted and `Ok(())` is
/// returned.
///
/// # Errors
///
/// Propagates the error from `required_string` when `threadId` or `turnId`
/// cannot be read, a serialization failure from `serde_json::to_value`, and
/// any error returned by `emit_event`.
pub(super) async fn handle_item_completed(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    let thread_id = required_string(&params, "threadId")?;
    let turn_id = required_string(&params, "turnId")?;
    // Only object-shaped items are considered; the value is passed by
    // reference, so the item map is not cloned.
    let entry = params
        .get("item")
        .filter(|item| item.is_object())
        .and_then(|item| {
            timeline_entry_from_thread_item(
                runtime_id,
                thread_id,
                Some(turn_id),
                item,
                "stream_event",
                false,
                true,
            )
        });
    if let Some(entry) = entry {
        state.emit_event(
            "item/completed",
            Some(runtime_id),
            Some(thread_id),
            serde_json::to_value(entry)?,
        )?;
    }
    Ok(())
}
/// Re-emits an `item/plan/delta` notification as a bridge event.
///
/// The payload comes from `normalize_delta_payload`, called with the
/// `"plan"` and `"inProgress"` markers; the event is tagged with the
/// `threadId` string from `params` when one is present.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_item_plan_delta(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/plan/delta";

    // Copy the thread id out first: `params` is moved into the payload builder.
    let thread = params
        .get("threadId")
        .and_then(Value::as_str)
        .map(str::to_owned);
    let payload = normalize_delta_payload(
        runtime_id,
        params,
        "plan",
        Some(String::from("执行计划")),
        Some(String::from("inProgress")),
        EVENT,
        json!({}),
        None,
        None,
    );

    state.emit_event(EVENT, Some(runtime_id), thread.as_deref(), payload)?;
    Ok(())
}
/// Re-emits an `item/mcpToolCall/progress` notification as a bridge event.
///
/// Instead of forwarding `params` unchanged, a reduced object is handed to
/// `normalize_delta_payload`: it keeps `threadId`, `turnId` and `itemId`, and
/// places the incoming `message` value under the `delta` key.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_mcp_tool_call_progress(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/mcpToolCall/progress";

    // Re-shape the progress message so it is passed on under `delta`.
    let reshaped = json!({
        "threadId": params.get("threadId"),
        "turnId": params.get("turnId"),
        "itemId": params.get("itemId"),
        "delta": params.get("message"),
    });
    let payload = normalize_delta_payload(
        runtime_id,
        reshaped,
        "mcpToolCall",
        Some(String::from("MCP 工具")),
        Some(String::from("inProgress")),
        EVENT,
        json!({}),
        None,
        None,
    );
    // `params` is only borrowed above, so the thread id can stay borrowed.
    let thread = params.get("threadId").and_then(Value::as_str);

    state.emit_event(EVENT, Some(runtime_id), thread, payload)?;
    Ok(())
}
/// Re-emits an `item/reasoning/textDelta` notification as a bridge event.
///
/// The payload comes from `normalize_delta_payload`, which receives `params`,
/// an object with empty `summary` and `content` arrays, and the integer
/// `contentIndex` read from `params` (if any) as its final argument.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_reasoning_text_delta(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/reasoning/textDelta";

    // Read everything needed from `params` before it is moved below.
    let thread = params
        .get("threadId")
        .and_then(Value::as_str)
        .map(str::to_owned);
    let content_idx = params.get("contentIndex").and_then(Value::as_i64);
    let payload = normalize_delta_payload(
        runtime_id,
        params,
        "reasoning",
        Some(String::from("思考过程")),
        Some(String::from("inProgress")),
        EVENT,
        json!({ "summary": [], "content": [] }),
        None,
        content_idx,
    );

    state.emit_event(EVENT, Some(runtime_id), thread.as_deref(), payload)?;
    Ok(())
}
/// Re-emits an `item/reasoning/summaryPartAdded` notification as a bridge
/// event.
///
/// A reduced object is handed to `normalize_delta_payload`: it keeps
/// `threadId`, `turnId` and `itemId` from `params` and sets `delta` to an
/// empty string. The integer `summaryIndex` from `params` (if any) is passed
/// along as well.
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_reasoning_summary_part_added(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/reasoning/summaryPartAdded";

    let summary_idx = params.get("summaryIndex").and_then(Value::as_i64);
    // This notification forwards no text of its own, so `delta` is empty.
    let reshaped = json!({
        "threadId": params.get("threadId"),
        "turnId": params.get("turnId"),
        "itemId": params.get("itemId"),
        "delta": "",
    });
    let payload = normalize_delta_payload(
        runtime_id,
        reshaped,
        "reasoning",
        Some(String::from("思考摘要")),
        Some(String::from("inProgress")),
        EVENT,
        json!({ "summary": [], "content": [] }),
        summary_idx,
        None,
    );
    // `params` is only borrowed above, so the thread id can stay borrowed.
    let thread = params.get("threadId").and_then(Value::as_str);

    state.emit_event(EVENT, Some(runtime_id), thread, payload)?;
    Ok(())
}
/// Re-emits an `item/reasoning/summaryTextDelta` notification as a bridge
/// event.
///
/// The payload comes from `normalize_delta_payload`, which receives `params`,
/// an object with empty `summary` and `content` arrays, and the integer
/// `summaryIndex` read from `params` (if any).
///
/// # Errors
///
/// Returns any error produced by `emit_event`.
pub(super) async fn handle_reasoning_summary_text_delta(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    const EVENT: &str = "item/reasoning/summaryTextDelta";

    // Read everything needed from `params` before it is moved below.
    let thread = params
        .get("threadId")
        .and_then(Value::as_str)
        .map(str::to_owned);
    let summary_idx = params.get("summaryIndex").and_then(Value::as_i64);
    let payload = normalize_delta_payload(
        runtime_id,
        params,
        "reasoning",
        Some(String::from("思考摘要")),
        Some(String::from("inProgress")),
        EVENT,
        json!({ "summary": [], "content": [] }),
        summary_idx,
        None,
    );

    state.emit_event(EVENT, Some(runtime_id), thread.as_deref(), payload)?;
    Ok(())
}
/// Handles a `turn/completed` notification.
///
/// When `turn.id` is a string, `cleanup_staged_turn_inputs` is called for
/// that turn first. A `turn/completed` event is then emitted carrying
/// `runtimeId`, `threadId`, the turn's `id`, `status` and `error` values
/// (each `null` when absent) and the whole `turn` object.
///
/// # Errors
///
/// Fails when `threadId` cannot be read via `required_string`, when `params`
/// has no `turn` field, or when `cleanup_staged_turn_inputs` or `emit_event`
/// returns an error.
pub(super) async fn handle_turn_completed(
    state: &BridgeState,
    runtime_id: &str,
    params: Value,
) -> Result<()> {
    let thread_id = required_string(&params, "threadId")?;
    let turn = params.get("turn").context("turn/completed 缺少 turn")?;
    let turn_id = turn.get("id");
    // Cleanup runs before the event is emitted; a cleanup error aborts the
    // handler and no event is sent.
    if let Some(turn_id) = turn_id.and_then(Value::as_str) {
        state.cleanup_staged_turn_inputs(turn_id)?;
    }
    state.emit_event(
        "turn/completed",
        Some(runtime_id),
        Some(thread_id),
        json!({
            "runtimeId": runtime_id,
            "threadId": thread_id,
            "turnId": turn_id,
            "status": turn.get("status"),
            "error": turn.get("error"),
            "turn": turn,
        }),
    )?;
    Ok(())
}