use std::collections::HashMap;
use anyhow::Result;
use serde_json::{Value, json};
use super::helpers::{normalize_name, optional_string, required_string};
use crate::bridge_protocol::{
PersistedEvent, ThreadStatusInfo, ThreadSummary, ThreadTokenUsage, TimelineEntry,
WorkspaceRecord,
};
use crate::storage::PRIMARY_RUNTIME_ID;
/// Version tag embedded in every timeline metadata blob so consumers can
/// detect incompatible metadata layouts; bump when the metadata shape changes.
const TIMELINE_SCHEMA_VERSION: i64 = 4;
/// Semantic classification of a timeline entry (what the step *did*:
/// ran / explored / edited / waited), attached to entry metadata.
#[derive(Clone, Debug)]
struct TimelineSemanticInfo {
    // Classification kind, e.g. "ran", "explored", "edited", "waited".
    kind: String,
    // Optional refinement of the kind, e.g. "read", "search", "wait_agents".
    detail: Option<String>,
    // How certain the classifier was ("high" for explicit signals, "low" for heuristics).
    confidence: String,
    // "primary" for the originating call, "output" for inherited output entries.
    role: String,
}
impl TimelineSemanticInfo {
    /// Constructs a semantic record from owned or convertible parts.
    fn new(
        kind: impl Into<String>,
        detail: Option<String>,
        confidence: impl Into<String>,
        role: impl Into<String>,
    ) -> Self {
        let kind = kind.into();
        let confidence = confidence.into();
        let role = role.into();
        Self {
            kind,
            detail,
            confidence,
            role,
        }
    }

    /// Serializes the record into the JSON shape stored in entry metadata.
    fn to_value(&self) -> Value {
        json!({
            "kind": self.kind,
            "detail": self.detail,
            "confidence": self.confidence,
            "role": self.role,
        })
    }

    /// Parses a record back out of metadata JSON. `kind` is required;
    /// `confidence` and `role` fall back to "low" / "primary".
    fn from_value(value: &Value) -> Option<Self> {
        let object = value.as_object()?;
        let kind = object.get("kind").and_then(Value::as_str)?.to_string();
        let detail = object
            .get("detail")
            .and_then(Value::as_str)
            .map(str::to_string);
        let confidence = object
            .get("confidence")
            .and_then(Value::as_str)
            .unwrap_or("low")
            .to_string();
        let role = object
            .get("role")
            .and_then(Value::as_str)
            .unwrap_or("primary")
            .to_string();
        Some(Self {
            kind,
            detail,
            confidence,
            role,
        })
    }
}
/// Lifecycle state of a timeline entry ("waiting" / "streaming" /
/// "completed" / "failed" / "declined"), derived from the entry's status.
#[derive(Clone, Debug)]
struct TimelineLifecycleInfo {
    // Derived lifecycle stage name.
    stage: String,
    // The original (trimmed, non-empty) status string, when one was present.
    source_status: Option<String>,
    // Whether the entry already carries renderable content.
    has_visible_content: bool,
}
impl TimelineLifecycleInfo {
    /// Constructs a lifecycle record from a stage name and source status.
    fn new(
        stage: impl Into<String>,
        source_status: Option<String>,
        has_visible_content: bool,
    ) -> Self {
        let stage = stage.into();
        Self {
            stage,
            source_status,
            has_visible_content,
        }
    }

    /// Serializes the record into the JSON shape stored in entry metadata.
    fn to_value(&self) -> Value {
        json!({
            "stage": self.stage,
            "sourceStatus": self.source_status,
            "hasVisibleContent": self.has_visible_content,
        })
    }
}
/// Compact, display-oriented summary of a timeline entry (title, command,
/// search query, touched paths, diff line counts, ...). All fields optional.
#[derive(Clone, Debug, Default)]
struct TimelineSummaryInfo {
    // Headline, e.g. "Ran" / "Explored" / "Edited" / "Waited".
    title: Option<String>,
    // Short label refining the title, e.g. "Read" / "Search".
    label: Option<String>,
    // The shell command that was executed, when applicable.
    command: Option<String>,
    // Search query, for explore/search entries.
    query: Option<String>,
    // Deduplicated list of touched files/paths.
    targets: Vec<String>,
    // First (representative) path for edit summaries.
    primary_path: Option<String>,
    // Number of files touched by an edit.
    file_count: Option<i64>,
    // Added line count across the edit.
    add_lines: Option<i64>,
    // Removed line count across the edit.
    remove_lines: Option<i64>,
    // Number of agents/threads being waited on, for "waited" entries.
    wait_count: Option<i64>,
}
impl TimelineSummaryInfo {
    /// Serializes the summary into the JSON shape stored in entry metadata.
    fn to_value(&self) -> Value {
        json!({
            "title": self.title,
            "label": self.label,
            "command": self.command,
            "query": self.query,
            "targets": self.targets,
            "primaryPath": self.primary_path,
            "fileCount": self.file_count,
            "addLines": self.add_lines,
            "removeLines": self.remove_lines,
            "waitCount": self.wait_count,
        })
    }

    /// Parses a summary back out of metadata JSON; missing or mistyped
    /// fields simply become `None` / empty.
    fn from_value(value: &Value) -> Option<Self> {
        let object = value.as_object()?;
        // Local accessors keep the field-by-field extraction uniform.
        let text = |key: &str| object.get(key).and_then(Value::as_str).map(str::to_string);
        let count = |key: &str| object.get(key).and_then(Value::as_i64);
        let targets = object
            .get("targets")
            .and_then(Value::as_array)
            .map(|items| {
                items
                    .iter()
                    .filter_map(Value::as_str)
                    .map(str::to_string)
                    .collect::<Vec<String>>()
            })
            .unwrap_or_default();
        Some(Self {
            title: text("title"),
            label: text("label"),
            command: text("command"),
            query: text("query"),
            targets,
            primary_path: text("primaryPath"),
            file_count: count("fileCount"),
            add_lines: count("addLines"),
            remove_lines: count("removeLines"),
            wait_count: count("waitCount"),
        })
    }
}
/// Converts a raw thread JSON object from the runtime into a `ThreadSummary`.
///
/// `workspace`, when present, supplies the workspace id; `archived` is passed
/// through unchanged. Fails if the thread is missing `id` or `cwd`.
pub(super) fn normalize_thread(
    runtime_id: &str,
    thread: &Value,
    workspace: Option<&WorkspaceRecord>,
    archived: bool,
) -> Result<ThreadSummary> {
    let cwd = required_string(thread, "cwd")?.to_string();
    // A missing status is treated as JSON null so the status normalizer
    // falls back to "unknown".
    let status_value = thread.get("status").unwrap_or(&Value::Null);
    let status_info = normalize_thread_status_info(status_value);
    let status = status_info.kind.clone();
    let is_loaded = status != "notLoaded";
    let is_active = status == "active";
    // Non-string "source" values are kept, stringified in their JSON-encoded form.
    let source = match thread.get("source") {
        Some(Value::String(value)) => value.clone(),
        Some(other) => other.to_string(),
        None => "unknown".to_string(),
    };
    Ok(ThreadSummary {
        id: required_string(thread, "id")?.to_string(),
        runtime_id: runtime_id.to_string(),
        workspace_id: workspace.map(|item| item.id.clone()),
        name: normalize_name(optional_string(thread, "name")),
        note: None,
        preview: optional_string(thread, "preview").unwrap_or_default(),
        cwd,
        status,
        status_info,
        // Accept either the current "tokenUsage" key or the legacy "usage" key.
        token_usage: thread
            .get("tokenUsage")
            .or_else(|| thread.get("usage"))
            .map(normalize_thread_token_usage),
        model_provider: optional_string(thread, "modelProvider")
            .unwrap_or_else(|| "unknown".to_string()),
        source,
        // Missing/non-integer timestamps default to 0.
        created_at: thread
            .get("createdAt")
            .and_then(Value::as_i64)
            .unwrap_or_default(),
        updated_at: thread
            .get("updatedAt")
            .and_then(Value::as_i64)
            .unwrap_or_default(),
        is_loaded,
        is_active,
        archived,
    })
}
/// Normalizes a thread status value that may be either an object with a
/// "type" field or a bare string; anything else yields kind "unknown".
fn normalize_thread_status_info(status_value: &Value) -> ThreadStatusInfo {
    let kind = status_value
        .get("type")
        .and_then(Value::as_str)
        .or_else(|| status_value.as_str())
        .unwrap_or("unknown")
        .to_string();
    ThreadStatusInfo {
        kind,
        reason: optional_string(status_value, "reason"),
        raw: status_value.clone(),
    }
}
/// Extracts token usage counts, accepting both camelCase and snake_case keys.
fn normalize_thread_token_usage(value: &Value) -> ThreadTokenUsage {
    // One accessor for the repeated "try both spellings" lookups.
    let count = |keys: &[&str]| timeline_first_i64(value, keys);
    ThreadTokenUsage {
        input_tokens: count(&["inputTokens", "input_tokens"]),
        cached_input_tokens: count(&["cachedInputTokens", "cached_input_tokens"]),
        output_tokens: count(&["outputTokens", "output_tokens"]),
        reasoning_tokens: count(&["reasoningTokens", "reasoning_tokens"]),
        total_tokens: count(&["totalTokens", "total_tokens"]),
        raw: value.clone(),
        updated_at_ms: value
            .get("updatedAtMs")
            .and_then(Value::as_i64)
            .unwrap_or_default(),
    }
}
/// Returns the first key in `keys` that resolves to an i64 on `value`.
fn timeline_first_i64(value: &Value, keys: &[&str]) -> Option<i64> {
    for key in keys {
        if let Some(number) = value.get(*key).and_then(Value::as_i64) {
            return Some(number);
        }
    }
    None
}
/// Builds the persisted payload for a streaming delta event (agent message,
/// command output, reasoning text, ...).
///
/// `payload` and `params` are stored verbatim inside the metadata so the raw
/// wire form can be reconstructed; `summary_index`/`content_index` position
/// the delta within a reasoning item. The lifecycle stage becomes
/// "streaming" only when the delta carries non-whitespace text.
pub(super) fn normalize_delta_payload(
    runtime_id: &str,
    params: Value,
    entry_type: &str,
    title: Option<String>,
    status: Option<String>,
    raw_type: &str,
    payload: Value,
    summary_index: Option<i64>,
    content_index: Option<i64>,
) -> Value {
    let delta = params
        .get("delta")
        .and_then(Value::as_str)
        .unwrap_or_default();
    // authoritative=false: deltas are always provisional stream fragments.
    let lifecycle = timeline_lifecycle_info(false, status.as_deref(), !delta.trim().is_empty());
    json!({
        "runtimeId": runtime_id,
        "threadId": params.get("threadId"),
        "turnId": params.get("turnId"),
        "itemId": params.get("itemId"),
        "delta": params.get("delta"),
        "entryType": entry_type,
        "title": title,
        "status": status,
        "metadata": timeline_metadata(
            "stream_event",
            raw_type,
            timeline_render_kind(entry_type),
            timeline_collapse_hint(timeline_render_kind(entry_type), entry_type),
            timeline_stream_metadata(true, false, None, summary_index, content_index),
            Some(&lifecycle),
            payload,
            None,
            None,
            params,
        ),
        "summaryIndex": summary_index,
        "contentIndex": content_index,
    })
}
/// Builds authoritative timeline entries from a full thread snapshot by
/// walking every item of every turn, then running the finalize pass.
pub(super) fn timeline_entries_from_thread(
    runtime_id: &str,
    thread: &Value,
) -> Result<Vec<TimelineEntry>> {
    let thread_id = required_string(thread, "id")?.to_string();
    let mut entries = Vec::new();
    let turns = thread.get("turns").and_then(Value::as_array);
    for turn in turns.into_iter().flatten() {
        let turn_id = optional_string(turn, "id");
        let items = turn.get("items").and_then(Value::as_array);
        for item in items.into_iter().flatten() {
            // Snapshot items are not streaming and are authoritative.
            let entry = timeline_entry_from_thread_item(
                runtime_id,
                &thread_id,
                turn_id.as_deref(),
                item,
                "thread_item",
                false,
                true,
            );
            if let Some(entry) = entry {
                entries.push(entry);
            }
        }
    }
    finalize_timeline_entries(&mut entries);
    Ok(entries)
}
/// Converts one thread item (authoritative snapshot or streamed) into a
/// `TimelineEntry`.
///
/// Returns `None` when the item has no `type` field. Helpers are invoked in
/// dependency order: payload → semantic → summary → lifecycle → metadata.
pub(super) fn timeline_entry_from_thread_item(
    runtime_id: &str,
    thread_id: &str,
    turn_id: Option<&str>,
    item: &Value,
    source_kind: &str,
    is_streaming: bool,
    authoritative: bool,
) -> Option<TimelineEntry> {
    let item_id = optional_string(item, "id");
    // Items without a "type" cannot be classified; drop them.
    let raw_type = required_string(item, "type").ok()?.to_string();
    let entry_type = canonical_timeline_entry_type(&raw_type, item);
    let render_kind = timeline_render_kind(&entry_type);
    let stream_phase = optional_string(item, "phase");
    let payload = timeline_payload_from_thread_item(&entry_type, item);
    let semantic = timeline_semantic_from_item(&raw_type, &entry_type, item, &payload);
    let summary = timeline_summary_from_item(&entry_type, item, &payload, semantic.as_ref());
    let text = timeline_text_from_thread_item(&entry_type, item);
    let entry_status = timeline_entry_status(&entry_type, item, is_streaming);
    let lifecycle = timeline_lifecycle_info(
        authoritative,
        entry_status.as_deref(),
        timeline_has_visible_content(&text, &payload, summary.as_ref()),
    );
    let metadata = timeline_metadata(
        source_kind,
        &raw_type,
        render_kind,
        timeline_collapse_hint(render_kind, &entry_type),
        timeline_stream_metadata(
            is_streaming,
            authoritative,
            stream_phase.clone(),
            None,
            None,
        ),
        Some(&lifecycle),
        payload,
        semantic.as_ref(),
        summary.as_ref(),
        // The full raw item is stored as the wire representation.
        item.clone(),
    );
    Some(TimelineEntry {
        id: timeline_entry_id(turn_id, item_id.as_deref(), &entry_type),
        runtime_id: runtime_id.to_string(),
        thread_id: thread_id.to_string(),
        turn_id: turn_id.map(ToOwned::to_owned),
        item_id,
        entry_type: entry_type.clone(),
        title: timeline_entry_title(&entry_type, item),
        text,
        status: entry_status,
        metadata,
    })
}
/// Builds a timeline entry for a `turn/plan/updated` stream event.
///
/// The entry is keyed by a synthetic `turn-plan:{turn_id}` item id so
/// repeated plan updates within the same turn upsert one entry.
pub(super) fn timeline_entry_from_plan_update(
    runtime_id: &str,
    thread_id: &str,
    turn_id: &str,
    explanation: Option<String>,
    plan: Value,
) -> TimelineEntry {
    let item_id = format!("turn-plan:{turn_id}");
    // The same JSON is used both as the metadata payload and as the wire
    // representation; build it once instead of duplicating the literal.
    let plan_payload = json!({
        "explanation": explanation,
        "plan": plan,
    });
    TimelineEntry {
        id: timeline_entry_id(Some(turn_id), Some(&item_id), "plan"),
        runtime_id: runtime_id.to_string(),
        thread_id: thread_id.to_string(),
        turn_id: Some(turn_id.to_string()),
        item_id: Some(item_id),
        entry_type: "plan".to_string(),
        title: Some("执行计划".to_string()),
        text: format_plan_payload(explanation.as_deref(), plan.as_array()),
        // Plan updates are always in-progress; completion arrives as an
        // authoritative item later.
        status: Some("inProgress".to_string()),
        metadata: timeline_metadata(
            "stream_event",
            "turn/plan/updated",
            "plan",
            timeline_collapse_hint("plan", "plan"),
            timeline_stream_metadata(true, false, None, None, None),
            None,
            plan_payload.clone(),
            None,
            None,
            plan_payload,
        ),
    }
}
/// Rebuilds a timeline from the persisted event log.
///
/// Structured item/plan/diff events are upserted as complete entries; delta
/// events are applied onto the entry they belong to. Arms that fully consume
/// an event `continue`; anything else falls through to a generic fallback
/// entry so no persisted event is silently lost.
pub(super) fn timeline_entries_from_events(events: &[PersistedEvent]) -> Vec<TimelineEntry> {
    let mut entries = Vec::new();
    for event in events {
        match event.event_type.as_str() {
            // Authoritative snapshots: deserialize and upsert in place.
            "item_started" | "item_completed" | "turn_plan_updated" | "item/started"
            | "item/completed" | "turn/plan/updated" => {
                if let Ok(entry) = serde_json::from_value::<TimelineEntry>(event.payload.clone()) {
                    upsert_timeline_entry(&mut entries, entry);
                    continue;
                }
                // Deserialization failed: fall through to the generic entry.
            }
            "message_delta" | "item/agentMessage/delta" => {
                apply_event_delta(&mut entries, event, "agentMessage", "Codex", "")
            }
            "command_output_delta" | "item/commandExecution/outputDelta" => {
                apply_event_delta(&mut entries, event, "commandExecution", "命令输出", "")
            }
            "file_change_output_delta" | "item/fileChange/outputDelta" => {
                apply_event_delta(&mut entries, event, "fileChange", "文件改动", "")
            }
            "plan_delta" | "item/plan/delta" => {
                apply_event_delta(&mut entries, event, "plan", "执行计划", "")
            }
            "mcp_tool_call_progress" | "item/mcpToolCall/progress" => {
                apply_event_delta(&mut entries, event, "mcpToolCall", "MCP 工具", "\n")
            }
            "reasoning_text_delta" | "item/reasoning/textDelta" => {
                apply_reasoning_event_delta(&mut entries, event, false)
            }
            "reasoning_summary_part_added" | "item/reasoning/summaryPartAdded" => {
                ensure_reasoning_event_entry(&mut entries, event);
                continue;
            }
            "reasoning_summary_text_delta" | "item/reasoning/summaryTextDelta" => {
                apply_reasoning_event_delta(&mut entries, event, true)
            }
            "diff_updated" | "turn/diff/updated" => {
                if let Some(entry) = timeline_entry_from_diff_event(event) {
                    upsert_timeline_entry(&mut entries, entry);
                    continue;
                }
            }
            _ => {}
        }
        // NOTE(review): the delta arms above do NOT `continue`, so every delta
        // event also appends this raw "event-{seq}" fallback entry in addition
        // to whatever apply_event_delta did. Confirm this duplication is
        // intentional (apply_event_delta is defined elsewhere in this file).
        entries.push(TimelineEntry {
            id: format!("event-{}", event.seq),
            runtime_id: event
                .runtime_id
                .clone()
                .unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string()),
            thread_id: event.thread_id.clone().unwrap_or_default(),
            turn_id: optional_string(&event.payload, "turnId"),
            item_id: optional_string(&event.payload, "itemId"),
            entry_type: event.event_type.clone(),
            title: Some(event.event_type.clone()),
            // Prefer the delta text; otherwise show the pretty-printed payload.
            text: event
                .payload
                .get("delta")
                .and_then(Value::as_str)
                .map(ToOwned::to_owned)
                .unwrap_or_else(|| {
                    serde_json::to_string_pretty(&event.payload).unwrap_or_default()
                }),
            status: optional_string(&event.payload, "status"),
            metadata: Value::Null,
        });
    }
    finalize_timeline_entries(&mut entries);
    entries
}
/// Derives a stable entry id from turn and item ids, with literal "turn"
/// and the raw type standing in for missing pieces.
fn timeline_entry_id(turn_id: Option<&str>, item_id: Option<&str>, raw_type: &str) -> String {
    let turn = turn_id.unwrap_or("turn");
    let item = item_id.unwrap_or(raw_type);
    format!("{turn}:{item}")
}
/// Maps legacy/raw item type names onto their canonical entry types;
/// "message" items are disambiguated via their role field.
fn canonical_timeline_entry_type(raw_type: &str, item: &Value) -> String {
    let canonical = match raw_type {
        "message" => match message_role(item).as_deref() {
            Some("user") => "userMessage",
            Some("assistant") => "agentMessage",
            Some("system") | Some("developer") => "hookPrompt",
            // Unknown/absent role: keep the raw type untouched.
            _ => raw_type,
        },
        "user_message" => "userMessage",
        "agent_message" => "agentMessage",
        "collabAgentToolCall" => "collabToolCall",
        "command_output" => "commandExecution",
        "file_change_output" => "fileChange",
        other => other,
    };
    canonical.to_string()
}
/// Reads the item's "role" field, trimmed and lowercased for comparison.
fn message_role(item: &Value) -> Option<String> {
    let role = optional_string(item, "role")?;
    Some(role.trim().to_lowercase())
}
/// Maps an entry type onto the UI render bucket:
/// user / agent / thinking / plan / tool / diff / system / fallback.
fn timeline_render_kind(raw_type: &str) -> &'static str {
    // Everything that renders as a tool invocation card.
    const TOOL_TYPES: &[&str] = &[
        "commandExecution",
        "mcpToolCall",
        "dynamicToolCall",
        "webSearch",
        "imageGeneration",
        "imageView",
        "collabToolCall",
        "collabAgentToolCall",
        "function_call",
        "function_call_output",
        "custom_tool_call",
        "custom_tool_call_output",
        "file_search_call",
        "web_search_call",
        "computer_call",
        "computer_call_output",
        "code_interpreter_call",
        "shell_call",
        "shell_call_output",
        "local_shell_call",
        "local_shell_call_output",
        "apply_patch_call",
        "apply_patch_call_output",
        "image_generation_call",
        "mcp_call",
        "mcp_approval_request",
        "mcp_approval_response",
        "mcp_list_tools",
        "tool_search_output",
    ];
    const SYSTEM_TYPES: &[&str] = &[
        "hookPrompt",
        "contextCompaction",
        "enteredReviewMode",
        "exitedReviewMode",
        "compaction",
    ];
    match raw_type {
        "userMessage" => "user",
        "agentMessage" | "message" => "agent",
        "reasoning" | "reasoning_text" | "summary_text" => "thinking",
        "plan" => "plan",
        "fileChange" | "diff" => "diff",
        ty if TOOL_TYPES.contains(&ty) => "tool",
        ty if SYSTEM_TYPES.contains(&ty) => "system",
        _ => "fallback",
    }
}
/// Produces the collapse-behavior hint for a render kind: chat bubbles never
/// collapse, tool/thinking/diff output defaults to collapsed with size
/// thresholds, everything else is collapsible but expanded.
fn timeline_collapse_hint(render_kind: &str, _raw_type: &str) -> Value {
    let hint = |enabled: bool, prefer_collapsed: bool, lines: i64, chars: i64| {
        json!({
            "enabled": enabled,
            "preferCollapsed": prefer_collapsed,
            "lineThreshold": lines,
            "charThreshold": chars,
        })
    };
    match render_kind {
        "user" | "agent" => hint(false, false, 0, 0),
        "tool" | "thinking" => hint(true, true, 8, 320),
        "diff" => hint(true, true, 12, 480),
        _ => hint(true, false, 8, 320),
    }
}
/// Builds the "stream" section of entry metadata: whether the entry is
/// streaming, whether it is authoritative, and where a reasoning delta
/// belongs (`phase`, `summaryIndex`, `contentIndex`).
fn timeline_stream_metadata(
    is_streaming: bool,
    authoritative: bool,
    phase: Option<String>,
    summary_index: Option<i64>,
    content_index: Option<i64>,
) -> Value {
    json!({
        "isStreaming": is_streaming,
        "authoritative": authoritative,
        "phase": phase,
        "summaryIndex": summary_index,
        "contentIndex": content_index,
    })
}
/// Derives the lifecycle stage: authoritative entries are terminal
/// (completed / failed / declined by status); streamed entries are
/// "streaming" once they show content, otherwise "waiting".
fn timeline_lifecycle_info(
    authoritative: bool,
    status: Option<&str>,
    has_visible_content: bool,
) -> TimelineLifecycleInfo {
    // Keep only a trimmed, non-empty source status.
    let source_status = status
        .map(str::trim)
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_string);
    let stage = if authoritative {
        let lowered = source_status.as_deref().map(str::to_lowercase);
        match lowered.as_deref() {
            Some("failed") => "failed",
            Some("declined") => "declined",
            _ => "completed",
        }
    } else if has_visible_content {
        "streaming"
    } else {
        "waiting"
    };
    TimelineLifecycleInfo::new(stage, source_status, has_visible_content)
}
/// True when the entry has anything worth rendering: non-blank text, a
/// payload with visible fields, or a non-empty summary.
fn timeline_has_visible_content(
    text: &str,
    payload: &Value,
    summary: Option<&TimelineSummaryInfo>,
) -> bool {
    if !text.trim().is_empty() {
        return true;
    }
    if timeline_payload_has_visible_content(payload) {
        return true;
    }
    summary.is_some_and(summary_has_visible_content)
}
/// Recursively checks whether a payload value carries renderable content.
/// For objects, only a whitelist of display-relevant keys is inspected.
fn timeline_payload_has_visible_content(value: &Value) -> bool {
    // Object keys whose values may contribute visible content.
    const VISIBLE_KEYS: &[&str] = &[
        "text",
        "content",
        "summary",
        "output",
        "result",
        "error",
        "stdout",
        "stderr",
        "changes",
        "query",
        "path",
        "targets",
        "command",
        "review",
        "diff",
        "explanation",
        "label",
        "title",
        "primaryPath",
    ];
    match value {
        Value::Null => false,
        Value::Bool(flag) => *flag,
        Value::Number(_) => true,
        Value::String(text) => !text.trim().is_empty(),
        Value::Array(items) => items.iter().any(timeline_payload_has_visible_content),
        Value::Object(object) => object.iter().any(|(key, child)| {
            VISIBLE_KEYS.contains(&key.as_str()) && timeline_payload_has_visible_content(child)
        }),
    }
}
/// True when any summary field would render something: a non-blank string
/// field, at least one target, or a positive count.
fn summary_has_visible_content(summary: &TimelineSummaryInfo) -> bool {
    let non_blank =
        |field: &Option<String>| field.as_deref().is_some_and(|text| !text.trim().is_empty());
    let positive = |count: Option<i64>| count.unwrap_or_default() > 0;
    non_blank(&summary.title)
        || non_blank(&summary.label)
        || non_blank(&summary.command)
        || non_blank(&summary.query)
        || non_blank(&summary.primary_path)
        || !summary.targets.is_empty()
        || positive(summary.file_count)
        || positive(summary.add_lines)
        || positive(summary.remove_lines)
        || positive(summary.wait_count)
}
/// Assembles the versioned metadata blob attached to every timeline entry.
///
/// Optional sections (lifecycle/semantic/summary) serialize to JSON null
/// when absent; `payload` is the normalized content and `wire` the raw
/// wire-format value it was derived from.
fn timeline_metadata(
    source_kind: &str,
    raw_type: &str,
    render_kind: &str,
    collapse_hint: Value,
    stream: Value,
    lifecycle: Option<&TimelineLifecycleInfo>,
    payload: Value,
    semantic: Option<&TimelineSemanticInfo>,
    summary: Option<&TimelineSummaryInfo>,
    wire: Value,
) -> Value {
    json!({
        "schemaVersion": TIMELINE_SCHEMA_VERSION,
        "sourceKind": source_kind,
        "rawType": raw_type,
        "renderKind": render_kind,
        "collapseHint": collapse_hint,
        "stream": stream,
        "lifecycle": lifecycle.map(TimelineLifecycleInfo::to_value).unwrap_or(Value::Null),
        "payload": payload,
        "wire": wire,
        "semantic": semantic.map(TimelineSemanticInfo::to_value).unwrap_or(Value::Null),
        "summary": summary.map(TimelineSummaryInfo::to_value).unwrap_or(Value::Null),
    })
}
/// Classifies an item into a semantic kind (ran / explored / edited /
/// waited), or `None` when no classification applies.
///
/// Confidence is "high" for explicit runtime signals (command actions,
/// dedicated entry types) and "low" for heuristics over raw call types.
fn timeline_semantic_from_item(
    raw_type: &str,
    entry_type: &str,
    item: &Value,
    payload: &Value,
) -> Option<TimelineSemanticInfo> {
    match entry_type {
        "commandExecution" => {
            let actions = payload
                .get("commandActions")
                .and_then(Value::as_array)
                .map(Vec::as_slice)
                .unwrap_or(&[]);
            // Commands made purely of read/list/search actions are "explored".
            if let Some(detail) = explored_detail_from_command_actions(actions) {
                return Some(TimelineSemanticInfo::new(
                    "explored",
                    Some(detail.to_string()),
                    "high",
                    "primary",
                ));
            }
            // Anything else executed via commandExecution counts as "ran".
            Some(TimelineSemanticInfo::new(
                "ran",
                Some("run".to_string()),
                "high",
                "primary",
            ))
        }
        "webSearch" => Some(TimelineSemanticInfo::new(
            "explored",
            Some("search".to_string()),
            "high",
            "primary",
        )),
        "fileChange" | "diff" => Some(TimelineSemanticInfo::new("edited", None, "high", "primary")),
        "collabToolCall" => {
            // Only the "wait" collaboration tool maps to a semantic kind.
            let tool = payload_string(payload, "tool")
                .or_else(|| optional_string(item, "tool"))
                .unwrap_or_default();
            if normalize_token(&tool) == "wait" {
                return Some(TimelineSemanticInfo::new(
                    "waited",
                    Some("wait_agents".to_string()),
                    "high",
                    "primary",
                ));
            }
            None
        }
        // Raw shell calls need command-text heuristics.
        "shell_call" | "local_shell_call" => classify_shell_like_semantic(payload, "primary"),
        "web_search_call" | "file_search_call" => Some(TimelineSemanticInfo::new(
            "explored",
            Some("search".to_string()),
            "low",
            "primary",
        )),
        "apply_patch_call" | "apply_patch_call_output" => {
            // Only classify as "edited" when the payload actually looks like a patch.
            let patch_like = payload_string(payload, "operation")
                .or_else(|| payload_string(payload, "output"))
                .is_some_and(|text| looks_like_patch_text(&text));
            patch_like.then(|| {
                TimelineSemanticInfo::new(
                    "edited",
                    None,
                    "low",
                    if entry_type.ends_with("_output") {
                        "output"
                    } else {
                        "primary"
                    },
                )
            })
        }
        _ => {
            // Catch spelling variants (e.g. "web-search-call") via normalization.
            let normalized_raw_type = normalize_token(raw_type);
            if normalized_raw_type == "websearchcall" || normalized_raw_type == "filesearchcall" {
                return Some(TimelineSemanticInfo::new(
                    "explored",
                    Some("search".to_string()),
                    "low",
                    "primary",
                ));
            }
            None
        }
    }
}
/// Dispatches to the per-kind summary builder; no semantic means no summary.
fn timeline_summary_from_item(
    entry_type: &str,
    item: &Value,
    payload: &Value,
    semantic: Option<&TimelineSemanticInfo>,
) -> Option<TimelineSummaryInfo> {
    let semantic = semantic?;
    let summary = match semantic.kind.as_str() {
        "ran" => timeline_summary_for_ran(item, payload, semantic),
        "explored" => timeline_summary_for_explored(item, payload, semantic),
        "edited" => timeline_summary_for_edited(entry_type, item, payload, semantic),
        "waited" => timeline_summary_for_waited(payload, semantic),
        _ => return None,
    };
    Some(summary)
}
/// Summary for "ran" entries: title/label plus the executed command,
/// resolved from the payload first and the raw item as a fallback.
fn timeline_summary_for_ran(
    item: &Value,
    payload: &Value,
    semantic: &TimelineSemanticInfo,
) -> TimelineSummaryInfo {
    let command = payload_string(payload, "command")
        .or_else(|| first_shell_command(payload))
        .or_else(|| optional_string(item, "command"));
    TimelineSummaryInfo {
        title: Some(semantic_title(&semantic.kind).to_string()),
        label: Some(detail_label(semantic.detail.as_deref()).to_string()),
        command,
        ..TimelineSummaryInfo::default()
    }
}
/// Summary for "explored" entries: collects the command, search query and
/// touched targets, preferring structured `commandActions` over raw payload
/// fields and falling back to the item itself.
fn timeline_summary_for_explored(
    item: &Value,
    payload: &Value,
    semantic: &TimelineSemanticInfo,
) -> TimelineSummaryInfo {
    let mut summary = TimelineSummaryInfo {
        title: Some(semantic_title(&semantic.kind).to_string()),
        label: Some(detail_label(semantic.detail.as_deref()).to_string()),
        command: payload_string(payload, "command")
            .or_else(|| first_shell_command(payload))
            .or_else(|| optional_string(item, "command")),
        query: payload_string(payload, "query")
            .or_else(|| first_search_query(payload))
            .or_else(|| extract_query_from_web_search_action(payload)),
        ..TimelineSummaryInfo::default()
    };
    // Structured command actions provide targets and (first) search query.
    if let Some(actions) = payload.get("commandActions").and_then(Value::as_array) {
        for action in actions {
            if let Some(target) = command_action_target(action) {
                push_unique(&mut summary.targets, target);
            }
            if summary.query.is_none() {
                summary.query = command_action_query(action);
            }
        }
    }
    // Without action-derived targets, fall back to top-level payload fields.
    if summary.targets.is_empty() {
        extract_targets_from_payload(payload)
            .into_iter()
            .for_each(|target| push_unique(&mut summary.targets, target));
    }
    if summary.command.is_none() {
        summary.command = optional_string(item, "command");
    }
    summary
}
/// Summary for "edited" entries, dispatching on the entry type:
/// `fileChange` aggregates structured change records, `diff` parses the
/// raw diff value, and apply-patch calls parse the patch text embedded in
/// the payload's `operation`/`output`.
fn timeline_summary_for_edited(
    entry_type: &str,
    item: &Value,
    payload: &Value,
    semantic: &TimelineSemanticInfo,
) -> TimelineSummaryInfo {
    if entry_type == "fileChange" {
        let changes = payload
            .get("changes")
            .and_then(Value::as_array)
            .map(Vec::as_slice)
            .unwrap_or(&[]);
        return timeline_summary_from_file_changes(changes, semantic);
    }
    if entry_type == "diff" {
        // NOTE(review): pretty_json is defined elsewhere in this file —
        // presumably stringifies the diff value for line counting; confirm.
        return timeline_summary_from_diff_text(&pretty_json(
            item.get("diff").unwrap_or(&Value::Null),
        ));
    }
    let patch_text = payload_string(payload, "operation")
        .or_else(|| payload_string(payload, "output"))
        .unwrap_or_default();
    let mut summary = timeline_summary_from_diff_text(&patch_text);
    summary.title = Some(semantic_title(&semantic.kind).to_string());
    summary.label = Some(semantic_title(&semantic.kind).to_string());
    summary
}
/// Summary for "waited" entries: counts waited-on agents, preferring the
/// explicit receiver list over the agent-state map.
fn timeline_summary_for_waited(
    payload: &Value,
    semantic: &TimelineSemanticInfo,
) -> TimelineSummaryInfo {
    let receiver_count = payload
        .get("receiverThreadIds")
        .and_then(Value::as_array)
        .map(|ids| ids.len() as i64);
    let agent_count = payload
        .get("agentsStates")
        .and_then(Value::as_object)
        .map(|states| states.len() as i64);
    TimelineSummaryInfo {
        title: Some(semantic_title(&semantic.kind).to_string()),
        label: Some(detail_label(semantic.detail.as_deref()).to_string()),
        wait_count: receiver_count.or(agent_count),
        ..TimelineSummaryInfo::default()
    }
}
/// Aggregates structured file-change records: first path becomes the
/// primary path; added/removed line counts are summed across all changes.
fn timeline_summary_from_file_changes(
    changes: &[Value],
    semantic: &TimelineSemanticInfo,
) -> TimelineSummaryInfo {
    let title = semantic_title(&semantic.kind).to_string();
    let mut summary = TimelineSummaryInfo {
        title: Some(title.clone()),
        label: Some(title),
        file_count: Some(changes.len() as i64),
        ..TimelineSummaryInfo::default()
    };
    let mut total_added = 0_i64;
    let mut total_removed = 0_i64;
    for change in changes {
        if summary.primary_path.is_none() {
            summary.primary_path = change_path(change);
        }
        let (added, removed) = summarize_change_counts(change);
        total_added += added;
        total_removed += removed;
    }
    summary.add_lines = Some(total_added);
    summary.remove_lines = Some(total_removed);
    summary
}
/// Builds an "Edited" summary from a raw diff/patch string.
fn timeline_summary_from_diff_text(diff: &str) -> TimelineSummaryInfo {
    let (primary_path, file_count, add_lines, remove_lines) = summarize_diff_text(diff);
    let edited = Some("Edited".to_string());
    TimelineSummaryInfo {
        title: edited.clone(),
        label: edited,
        primary_path,
        file_count: Some(file_count),
        add_lines: Some(add_lines),
        remove_lines: Some(remove_lines),
        ..TimelineSummaryInfo::default()
    }
}
/// Display title for a semantic kind; unknown kinds render as "Tool".
fn semantic_title(kind: &str) -> &'static str {
    const TITLES: &[(&str, &str)] = &[
        ("ran", "Ran"),
        ("explored", "Explored"),
        ("edited", "Edited"),
        ("waited", "Waited"),
    ];
    TITLES
        .iter()
        .find(|(key, _)| *key == kind)
        .map_or("Tool", |(_, title)| *title)
}
/// Display label for a semantic detail; absent or unknown details are "Run".
fn detail_label(detail: Option<&str>) -> &'static str {
    match detail {
        Some("read") => "Read",
        Some("list") => "List",
        Some("search") => "Search",
        Some("mixed") => "Mixed",
        Some("wait_agents") => "Wait",
        _ => "Run",
    }
}
/// Heuristically classifies a raw shell-like call as read/list/search/run
/// by inspecting the first command in its payload.
///
/// The command is unwrapped from `sh -c`/`bash -lc` wrappers and normalized
/// before matching. Each program matches either bare or followed by
/// arguments; this also gives `git grep` a bare-name match, which the
/// previous prefix-only check missed while every other program had one.
fn classify_shell_like_semantic(payload: &Value, role: &str) -> Option<TimelineSemanticInfo> {
    let command = first_shell_command(payload)?;
    let normalized = normalize_token(&unwrap_shell_wrapper(&command));
    // True when `normalized` is exactly `program` or starts with "program ".
    let invokes =
        |program: &str| normalized == program || normalized.starts_with(&format!("{program} "));
    let detail = if ["cat", "sed", "head", "tail", "bat"].into_iter().any(invokes) {
        "read"
    } else if ["ls", "tree", "find", "fd"].into_iter().any(invokes) {
        "list"
    } else if ["rg", "grep", "git grep", "findstr"].into_iter().any(invokes) {
        "search"
    } else {
        "run"
    };
    let kind = if detail == "run" { "ran" } else { "explored" };
    Some(TimelineSemanticInfo::new(
        kind,
        Some(detail.to_string()),
        "low",
        role.to_string(),
    ))
}
/// Derives an "explored" detail from structured command actions.
///
/// Returns `None` for empty action lists or when any action is not a
/// read/listFiles/search action; otherwise the single observed category,
/// or "mixed" when several were seen.
fn explored_detail_from_command_actions(actions: &[Value]) -> Option<&'static str> {
    if actions.is_empty() {
        return None;
    }
    let (mut has_read, mut has_list, mut has_search) = (false, false, false);
    for action in actions {
        let action_type = normalize_token(&payload_string(action, "type").unwrap_or_default());
        match action_type.as_str() {
            "read" => has_read = true,
            "listfiles" => has_list = true,
            "search" => has_search = true,
            // Any non-exploration action disqualifies the whole command.
            _ => return None,
        }
    }
    let detail = match (has_read, has_list, has_search) {
        (true, false, false) => "read",
        (false, true, false) => "list",
        (false, false, true) => "search",
        _ => "mixed",
    };
    Some(detail)
}
/// Post-processes a timeline so tool-call *output* entries inherit the
/// semantic/summary classification of their originating call.
///
/// Pass 1 indexes every non-output semantic by (thread, turn, callId);
/// pass 2 copies it onto matching output entries lacking their own
/// semantic, with the role rewritten to "output".
fn finalize_timeline_entries(entries: &mut [TimelineEntry]) {
    let mut semantic_by_call: HashMap<
        (String, String, String),
        (TimelineSemanticInfo, TimelineSummaryInfo),
    > = HashMap::new();
    // Pass 1: collect classifications from originating (non-output) entries.
    for entry in entries.iter() {
        let Some(call_id) = metadata_call_id(&entry.metadata) else {
            continue;
        };
        let Some(semantic) = metadata_semantic(&entry.metadata) else {
            continue;
        };
        if semantic.role == "output" {
            continue;
        }
        let summary = metadata_summary(&entry.metadata).unwrap_or_default();
        semantic_by_call.insert(
            (
                entry.thread_id.clone(),
                turn_lookup_key(entry.turn_id.as_deref()),
                call_id,
            ),
            (semantic, summary),
        );
    }
    // Pass 2: let unclassified output entries inherit the call's semantic.
    for entry in entries.iter_mut() {
        if !is_output_entry_type(&entry.entry_type) {
            continue;
        }
        // Don't overwrite an output entry that classified itself.
        if metadata_semantic(&entry.metadata).is_some() {
            continue;
        }
        let Some(call_id) = metadata_call_id(&entry.metadata) else {
            continue;
        };
        let lookup_key = (
            entry.thread_id.clone(),
            turn_lookup_key(entry.turn_id.as_deref()),
            call_id,
        );
        let Some((semantic, summary)) = semantic_by_call.get(&lookup_key) else {
            continue;
        };
        let mut inherited_semantic = semantic.clone();
        inherited_semantic.role = "output".to_string();
        set_metadata_semantic(entry, &inherited_semantic, Some(summary));
    }
}
/// True for entry types that represent the *output* half of a tool call.
fn is_output_entry_type(entry_type: &str) -> bool {
    [
        "function_call_output",
        "custom_tool_call_output",
        "shell_call_output",
        "local_shell_call_output",
        "apply_patch_call_output",
        "computer_call_output",
    ]
    .contains(&entry_type)
}
/// Map key for a turn id; missing turns key on the empty string.
fn turn_lookup_key(turn_id: Option<&str>) -> String {
    match turn_id {
        Some(id) => id.to_string(),
        None => String::new(),
    }
}
fn metadata_call_id(metadata: &Value) -> Option<String> {
metadata
.get("payload")
.and_then(|payload| payload.get("callId"))
.and_then(Value::as_str)
.map(ToOwned::to_owned)
}
/// Parses the "semantic" section of entry metadata, if any.
fn metadata_semantic(metadata: &Value) -> Option<TimelineSemanticInfo> {
    metadata
        .get("semantic")
        .and_then(TimelineSemanticInfo::from_value)
}
/// Parses the "summary" section of entry metadata, if any.
fn metadata_summary(metadata: &Value) -> Option<TimelineSummaryInfo> {
    metadata
        .get("summary")
        .and_then(TimelineSummaryInfo::from_value)
}
/// Writes (or overwrites) the semantic — and optionally the summary —
/// sections of an entry's metadata, initializing metadata to an empty
/// object when it isn't one.
fn set_metadata_semantic(
    entry: &mut TimelineEntry,
    semantic: &TimelineSemanticInfo,
    summary: Option<&TimelineSummaryInfo>,
) {
    if !entry.metadata.is_object() {
        entry.metadata = json!({});
    }
    if let Some(metadata_object) = entry.metadata.as_object_mut() {
        metadata_object.insert("semantic".to_string(), semantic.to_value());
        if let Some(summary) = summary {
            metadata_object.insert("summary".to_string(), summary.to_value());
        }
    }
}
/// Reads an owned string field from a JSON object; non-string values yield `None`.
fn payload_string(payload: &Value, key: &str) -> Option<String> {
    match payload.get(key) {
        Some(Value::String(text)) => Some(text.clone()),
        _ => None,
    }
}
/// Resolves the first command from a payload: the top-level "command"
/// string, or the first usable element of the "commands" array (either a
/// string or an object carrying a "command" field).
fn first_shell_command(payload: &Value) -> Option<String> {
    if let Some(command) = payload_string(payload, "command") {
        return Some(command);
    }
    let commands = payload.get("commands").and_then(Value::as_array)?;
    commands.iter().find_map(|item| match item {
        Value::String(text) => Some(text.clone()),
        Value::Object(object) => object
            .get("command")
            .and_then(Value::as_str)
            .map(str::to_string),
        _ => None,
    })
}
/// Resolves the first search query from a payload: the top-level "query"
/// string, or the first usable element of the "queries" array (a string,
/// or an object carrying "query" or "text").
fn first_search_query(payload: &Value) -> Option<String> {
    if let Some(query) = payload_string(payload, "query") {
        return Some(query);
    }
    let queries = payload.get("queries").and_then(Value::as_array)?;
    queries.iter().find_map(|item| match item {
        Value::String(text) => Some(text.clone()),
        Value::Object(object) => object
            .get("query")
            .or_else(|| object.get("text"))
            .and_then(Value::as_str)
            .map(str::to_string),
        _ => None,
    })
}
/// Pulls a search query out of a payload's "action" object, trying
/// "query"/"pattern" first and then the first string in "queries".
fn extract_query_from_web_search_action(payload: &Value) -> Option<String> {
    let action = payload.get("action")?;
    let direct = action
        .get("query")
        .or_else(|| action.get("pattern"))
        .and_then(Value::as_str)
        .map(str::to_string);
    if direct.is_some() {
        return direct;
    }
    action
        .get("queries")
        .and_then(Value::as_array)?
        .iter()
        .find_map(|item| item.as_str().map(str::to_string))
}
/// Extracts the target (file name or path) of a single command action,
/// depending on the action's normalized type.
fn command_action_target(action: &Value) -> Option<String> {
    let field = |key: &str| payload_string(action, key);
    let action_type = normalize_token(&field("type").unwrap_or_default());
    match action_type.as_str() {
        "read" => field("name").or_else(|| field("path")),
        "listfiles" => field("path").or_else(|| field("command")),
        "search" => field("path"),
        _ => None,
    }
}
/// Extracts the query of a search-type command action; other action types
/// yield `None`.
fn command_action_query(action: &Value) -> Option<String> {
    let action_type = normalize_token(&payload_string(action, "type").unwrap_or_default());
    if action_type != "search" {
        return None;
    }
    payload_string(action, "query").or_else(|| payload_string(action, "command"))
}
/// Fallback target extraction straight from the payload: the "path" field
/// plus any string elements of "queries", deduplicated in order.
fn extract_targets_from_payload(payload: &Value) -> Vec<String> {
    let mut targets = Vec::new();
    if let Some(path) = payload_string(payload, "path") {
        push_unique(&mut targets, path);
    }
    let queries = payload.get("queries").and_then(Value::as_array);
    for item in queries.into_iter().flatten() {
        if let Some(text) = item.as_str() {
            push_unique(&mut targets, text.to_string());
        }
    }
    targets
}
/// Strips a `sh -c` / `bash -lc` wrapper, returning the inner script with
/// surrounding quotes removed; unwrapped commands are returned trimmed.
fn unwrap_shell_wrapper(command: &str) -> String {
    let trimmed = command.trim();
    // First marker that splits the command wins (" -lc " checked first).
    let script = [" -lc ", " -c "]
        .into_iter()
        .find_map(|marker| trimmed.split_once(marker).map(|(_, rest)| rest));
    match script {
        Some(rest) => rest
            .trim()
            .trim_matches('\'')
            .trim_matches('"')
            .to_string(),
        None => trimmed.to_string(),
    }
}
/// Canonicalizes a token for comparison: lowercase, with underscores,
/// hyphens and all whitespace collapsed to single spaces.
fn normalize_token(value: &str) -> String {
    let lowered = value.trim().to_lowercase();
    let separated = lowered.replace(['_', '-', '\n', '\r', '\t'], " ");
    separated.split_whitespace().collect::<Vec<_>>().join(" ")
}
/// Appends `candidate` unless it is blank or already present.
fn push_unique(values: &mut Vec<String>, candidate: String) {
    let is_blank = candidate.trim().is_empty();
    if !is_blank && !values.contains(&candidate) {
        values.push(candidate);
    }
}
/// Reads the change's path, accepting "path" or the legacy "filePath" key.
fn change_path(change: &Value) -> Option<String> {
    for key in ["path", "filePath"] {
        if let Some(path) = optional_string(change, key) {
            return Some(path);
        }
    }
    None
}
/// Estimates (added, removed) line counts for one file-change record.
///
/// An embedded diff/patch body is parsed when recognizable; otherwise the
/// change kind decides: whole-file adds count every content line as added,
/// deletes as removed, and unknown kinds contribute (0, 0).
/// NOTE(review): `count_content_lines` is defined elsewhere in this file —
/// presumably counts the lines of `diff`; confirm its exact semantics.
fn summarize_change_counts(change: &Value) -> (i64, i64) {
    // Several key spellings are accepted for both the kind and the body.
    let kind = optional_string(change, "kind")
        .or_else(|| optional_string(change, "type"))
        .or_else(|| optional_string(change, "status"))
        .unwrap_or_default();
    let diff = optional_string(change, "diff")
        .or_else(|| optional_string(change, "content"))
        .unwrap_or_default();
    if looks_like_patch_text(&diff) {
        let (_, _, add_lines, remove_lines) = summarize_diff_text(&diff);
        return (add_lines, remove_lines);
    }
    match normalize_token(&kind).as_str() {
        "add" | "added" => (count_content_lines(&diff), 0),
        "delete" | "deleted" => (0, count_content_lines(&diff)),
        _ => (0, 0),
    }
}
/// Heuristic: does `text` look like a diff/patch? True when any well-known
/// patch marker (apply-patch envelope, git header, hunk or file markers)
/// appears anywhere in the trimmed text.
fn looks_like_patch_text(text: &str) -> bool {
    let trimmed = text.trim();
    ["*** Begin Patch", "diff --git ", "@@", "+++ ", "--- "]
        .iter()
        .copied()
        .any(|needle| trimmed.contains(needle))
}
/// Dispatches diff summarization by dialect: apply-patch envelopes
/// (`*** Begin Patch`) vs. unified diffs.
fn summarize_diff_text(diff: &str) -> (Option<String>, i64, i64, i64) {
    if diff.contains("*** Begin Patch") {
        summarize_apply_patch_text(diff)
    } else {
        summarize_unified_diff_text(diff)
    }
}
/// Summarizes an apply-patch envelope as
/// `(primary_path, file_count, add_lines, remove_lines)`.
/// The first file header seen supplies `primary_path`; `file_count` is
/// reported as at least 1 so callers can treat the diff as non-empty.
fn summarize_apply_patch_text(diff: &str) -> (Option<String>, i64, i64, i64) {
    const FILE_MARKERS: [&str; 3] = [
        "*** Update File: ",
        "*** Add File: ",
        "*** Delete File: ",
    ];
    let mut primary_path: Option<String> = None;
    let mut file_count = 0_i64;
    let mut add_lines = 0_i64;
    let mut remove_lines = 0_i64;
    for line in diff.lines() {
        let file_header = FILE_MARKERS
            .iter()
            .find_map(|marker| line.strip_prefix(marker));
        if let Some(path) = file_header {
            file_count += 1;
            primary_path.get_or_insert_with(|| path.trim().to_string());
        } else if line.starts_with("+++")
            || line.starts_with("---")
            || line.starts_with("***")
            || line.starts_with("@@")
        {
            // Envelope/hunk markers carry no content — skip.
        } else if line.starts_with('+') {
            add_lines += 1;
        } else if line.starts_with('-') {
            remove_lines += 1;
        }
    }
    (primary_path, file_count.max(1), add_lines, remove_lines)
}
/// Summarizes a unified diff as
/// `(primary_path, file_count, add_lines, remove_lines)`.
/// `+++` headers count files (skipping `/dev/null`), `diff --git` lines can
/// seed the primary path, and `file_count` is reported as at least 1.
fn summarize_unified_diff_text(diff: &str) -> (Option<String>, i64, i64, i64) {
    let mut primary_path: Option<String> = None;
    let mut file_count = 0_i64;
    let mut add_lines = 0_i64;
    let mut remove_lines = 0_i64;
    for line in diff.lines() {
        let new_file = line
            .strip_prefix("+++ b/")
            .or_else(|| line.strip_prefix("+++ "));
        if let Some(path) = new_file {
            // `/dev/null` means a deleted file's "new" side — not a real file.
            if path != "/dev/null" {
                file_count += 1;
                primary_path.get_or_insert_with(|| path.trim().to_string());
            }
        } else if line.starts_with("diff --git ") {
            if primary_path.is_none() {
                // Token layout: diff --git a/<old> b/<new>; take the old side.
                primary_path = line
                    .split_whitespace()
                    .nth(2)
                    .map(|path| path.trim_start_matches("a/").to_string());
            }
        } else if line.starts_with("+++") || line.starts_with("---") {
            // Remaining header lines without a usable path — skip.
        } else if line.starts_with('+') {
            add_lines += 1;
        } else if line.starts_with('-') {
            remove_lines += 1;
        }
    }
    (primary_path, file_count.max(1), add_lines, remove_lines)
}
/// Counts the lines in `text` (zero for the empty string).
fn count_content_lines(text: &str) -> i64 {
    text.lines().fold(0_i64, |total, _| total + 1)
}
/// Maps a timeline entry type (plus its payload `item`) to the Chinese
/// display title shown in the UI.
///
/// Some titles embed payload data (the executed command, the MCP
/// server/tool pair, a tool or function name). Always returns `Some`;
/// an unrecognized `entry_type` is echoed back verbatim as the title.
fn timeline_entry_title(entry_type: &str, item: &Value) -> Option<String> {
    match entry_type {
        "userMessage" => Some("你".to_string()),
        "agentMessage" => {
            // Title varies with the message phase: final answer vs. interim
            // commentary vs. a generic agent message.
            let phase = optional_string(item, "phase");
            if matches!(phase.as_deref(), Some("final_answer")) {
                Some("最终回复".to_string())
            } else if matches!(phase.as_deref(), Some("commentary")) {
                Some("中间回复".to_string())
            } else {
                Some("Codex".to_string())
            }
        }
        "hookPrompt" => Some("系统提示".to_string()),
        "plan" => Some("执行计划".to_string()),
        "reasoning" | "reasoning_text" | "summary_text" => Some("思考过程".to_string()),
        "commandExecution" => {
            // Prefer the command line itself as the title.
            optional_string(item, "command").or_else(|| Some("命令输出".to_string()))
        }
        "fileChange" => Some("文件改动".to_string()),
        "mcpToolCall" => {
            let server = optional_string(item, "server");
            let tool = optional_string(item, "tool");
            match (server, tool) {
                (Some(server), Some(tool)) => Some(format!("{server} / {tool}")),
                (_, Some(tool)) => Some(tool),
                _ => Some("MCP 工具".to_string()),
            }
        }
        "dynamicToolCall" => optional_string(item, "tool").or_else(|| Some("动态工具".to_string())),
        "collabToolCall" => optional_string(item, "tool").or_else(|| Some("协作代理".to_string())),
        "webSearch" => optional_string(item, "query").or_else(|| Some("网络搜索".to_string())),
        "imageView" => Some("查看图片".to_string()),
        "imageGeneration" => Some("图像生成".to_string()),
        "function_call" => optional_string(item, "name").or_else(|| Some("函数调用".to_string())),
        "function_call_output" => {
            optional_string(item, "name").or_else(|| Some("函数结果".to_string()))
        }
        "custom_tool_call" => {
            optional_string(item, "name").or_else(|| Some("自定义工具".to_string()))
        }
        "custom_tool_call_output" => {
            optional_string(item, "name").or_else(|| Some("自定义工具结果".to_string()))
        }
        "file_search_call" => Some("文件搜索".to_string()),
        "web_search_call" => Some("网络搜索".to_string()),
        "code_interpreter_call" => Some("代码解释器".to_string()),
        "shell_call" | "local_shell_call" => Some("Shell 调用".to_string()),
        "shell_call_output" | "local_shell_call_output" => Some("Shell 输出".to_string()),
        "apply_patch_call" => Some("补丁调用".to_string()),
        "apply_patch_call_output" => Some("补丁结果".to_string()),
        "image_generation_call" => Some("图像生成".to_string()),
        "mcp_call" => {
            // Raw-protocol MCP items use snake_case keys, unlike "mcpToolCall".
            let server = optional_string(item, "server_label");
            let tool = optional_string(item, "name");
            match (server, tool) {
                (Some(server), Some(tool)) => Some(format!("{server} / {tool}")),
                (_, Some(tool)) => Some(tool),
                _ => Some("MCP 调用".to_string()),
            }
        }
        "mcp_approval_request" => Some("MCP 审批请求".to_string()),
        "mcp_approval_response" => Some("MCP 审批结果".to_string()),
        "mcp_list_tools" => Some("MCP 工具列表".to_string()),
        "enteredReviewMode" => Some("进入 Review 模式".to_string()),
        "exitedReviewMode" => Some("退出 Review 模式".to_string()),
        "contextCompaction" | "compaction" => Some("上下文压缩".to_string()),
        // Unknown types: surface the raw type so nothing is hidden.
        other => Some(other.to_string()),
    }
}
/// Resolves the status of a timeline entry: an explicit `status` field for
/// entry types that carry one, otherwise "inProgress" while streaming,
/// otherwise `None`.
fn timeline_entry_status(entry_type: &str, item: &Value, is_streaming: bool) -> Option<String> {
    // Entry types whose payload may carry an explicit `status` field.
    const STATUS_BEARING_TYPES: &[&str] = &[
        "userMessage",
        "agentMessage",
        "hookPrompt",
        "reasoning",
        "plan",
        "reasoning_text",
        "summary_text",
        "commandExecution",
        "fileChange",
        "mcpToolCall",
        "dynamicToolCall",
        "collabToolCall",
        "imageGeneration",
        "function_call",
        "function_call_output",
        "custom_tool_call",
        "custom_tool_call_output",
        "file_search_call",
        "web_search_call",
        "computer_call",
        "computer_call_output",
        "code_interpreter_call",
        "shell_call",
        "shell_call_output",
        "local_shell_call",
        "local_shell_call_output",
        "apply_patch_call",
        "apply_patch_call_output",
        "image_generation_call",
        "mcp_call",
        "mcp_approval_request",
        "mcp_approval_response",
        "mcp_list_tools",
    ];
    let explicit_status = if STATUS_BEARING_TYPES.contains(&entry_type) {
        optional_string(item, "status")
    } else {
        None
    };
    explicit_status.or_else(|| is_streaming.then(|| "inProgress".to_string()))
}
/// Derives the display text of a timeline entry from a raw thread item.
///
/// Each item type has its own extraction rule — message content blocks,
/// aggregated command output, formatted tool arguments/results, fixed
/// notices for mode switches and compaction. Unknown types fall back to a
/// pretty-printed JSON dump of the whole item.
///
/// Fix: `"contextCompaction"` and `"compaction"` previously had two separate
/// arms with identical bodies (clippy `match_same_arms`); they are merged.
fn timeline_text_from_thread_item(entry_type: &str, item: &Value) -> String {
    match entry_type {
        // Prefer structured content blocks; fall back to the flat `text`.
        "userMessage" => item
            .get("content")
            .and_then(Value::as_array)
            .map(|items| extract_content_items_text(items))
            .filter(|text| !text.trim().is_empty())
            .or_else(|| optional_string(item, "text"))
            .unwrap_or_default(),
        "hookPrompt" => pretty_json(item.get("fragments").unwrap_or(&Value::Null)),
        // Inverse priority vs. userMessage: flat `text` first, then blocks.
        "agentMessage" => optional_string(item, "text")
            .or_else(|| {
                item.get("content")
                    .and_then(Value::as_array)
                    .map(|items| extract_content_items_text(items))
            })
            .unwrap_or_default(),
        "plan" => optional_string(item, "text").unwrap_or_default(),
        "reasoning" | "reasoning_text" | "summary_text" => build_reasoning_text(
            item.get("summary")
                .and_then(Value::as_array)
                .map(Vec::as_slice),
            item.get("content")
                .and_then(Value::as_array)
                .map(Vec::as_slice),
        ),
        "commandExecution" => optional_string(item, "aggregatedOutput").unwrap_or_default(),
        "fileChange" => format_file_changes(item.get("changes").and_then(Value::as_array)),
        "mcpToolCall" => format_tool_result(item, &["result", "error", "arguments"]),
        "dynamicToolCall" => format_tool_result(item, &["contentItems", "arguments"]),
        "collabToolCall" => format_tool_result(item, &["agentsStates", "prompt"]),
        "webSearch" => build_web_search_text(item),
        "imageView" => optional_string(item, "path").unwrap_or_default(),
        "imageGeneration" => optional_string(item, "result").unwrap_or_default(),
        "function_call" => format_tool_result(item, &["arguments"]),
        "function_call_output" => format_tool_result(item, &["output"]),
        "custom_tool_call" => format_tool_result(item, &["input", "arguments", "call_input"]),
        "custom_tool_call_output" => format_tool_result(item, &["output"]),
        "file_search_call" => format_tool_result(item, &["queries", "results"]),
        "web_search_call" => format_tool_result(item, &["query", "action", "results"]),
        "computer_call" => format_tool_result(item, &["action", "arguments"]),
        "computer_call_output" => format_tool_result(item, &["output"]),
        "code_interpreter_call" => format_tool_result(item, &["code", "outputs"]),
        "shell_call" | "local_shell_call" => {
            format_tool_result(item, &["action", "command", "commands"])
        }
        "shell_call_output" | "local_shell_call_output" => {
            format_tool_result(item, &["output", "stdout", "stderr"])
        }
        "apply_patch_call" => format_tool_result(item, &["operation"]),
        "apply_patch_call_output" => format_tool_result(item, &["output"]),
        "image_generation_call" => optional_string(item, "result").unwrap_or_default(),
        "mcp_call" => format_tool_result(item, &["arguments", "output", "error"]),
        "mcp_approval_request" => format_tool_result(item, &["arguments", "reason"]),
        "mcp_approval_response" => format_tool_result(item, &["reason"]),
        "mcp_list_tools" => format_tool_result(item, &["tools", "error"]),
        "enteredReviewMode" => "已进入 Review 模式。".to_string(),
        "exitedReviewMode" => "已退出 Review 模式。".to_string(),
        "contextCompaction" | "compaction" => "上下文已压缩。".to_string(),
        _ => pretty_json(item),
    }
}
/// Projects a raw thread item into the normalized metadata `payload` stored
/// on a timeline entry, keeping only the fields relevant for each item type.
///
/// Conventions visible in the arms below:
/// - absent fields are preserved as JSON `null`, except fields the consumer
///   expects as arrays (`fragments`, `summary`, `content`, `commandActions`,
///   `changes`), which default to `[]`;
/// - snake_case protocol keys (`server_label`, `call_input`, `call_id`) are
///   re-exposed under camelCase (`serverLabel`, `callInput`, `callId`);
/// - unknown types are passed through unchanged (`item.clone()`).
fn timeline_payload_from_thread_item(raw_type: &str, item: &Value) -> Value {
    match raw_type {
        "userMessage" => json!({
            "role": item.get("role").cloned().unwrap_or_else(|| json!("user")),
            "content": item.get("content").cloned().unwrap_or(Value::Null),
            "text": item.get("text").cloned().unwrap_or(Value::Null),
        }),
        "message" => json!({
            "role": item.get("role").cloned().unwrap_or(Value::Null),
            "phase": item.get("phase").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
            "content": item.get("content").cloned().unwrap_or(Value::Null),
            "text": item.get("text").cloned().unwrap_or(Value::Null),
        }),
        "hookPrompt" => json!({
            "fragments": item.get("fragments").cloned().unwrap_or_else(|| json!([])),
        }),
        "agentMessage" => json!({
            "role": item.get("role").cloned().unwrap_or_else(|| json!("assistant")),
            "phase": item.get("phase").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
            "memoryCitation": item.get("memoryCitation").cloned().unwrap_or(Value::Null),
            "content": item.get("content").cloned().unwrap_or(Value::Null),
            "text": item.get("text").cloned().unwrap_or(Value::Null),
        }),
        "plan" => json!({
            "text": item.get("text").cloned().unwrap_or(Value::Null),
        }),
        "reasoning" | "reasoning_text" | "summary_text" => json!({
            "summary": item.get("summary").cloned().unwrap_or_else(|| json!([])),
            "content": item.get("content").cloned().unwrap_or_else(|| json!([])),
        }),
        "commandExecution" => json!({
            "command": item.get("command").cloned().unwrap_or(Value::Null),
            "commandActions": item.get("commandActions").cloned().unwrap_or_else(|| json!([])),
            "cwd": item.get("cwd").cloned().unwrap_or(Value::Null),
            "exitCode": item.get("exitCode").cloned().unwrap_or(Value::Null),
            "durationMs": item.get("durationMs").cloned().unwrap_or(Value::Null),
            "processId": item.get("processId").cloned().unwrap_or(Value::Null),
            "source": item.get("source").cloned().unwrap_or(Value::Null),
        }),
        "fileChange" => json!({
            "changes": item.get("changes").cloned().unwrap_or_else(|| json!([])),
        }),
        "mcpToolCall" => json!({
            "server": item.get("server").cloned().unwrap_or(Value::Null),
            "tool": item.get("tool").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
            "result": item.get("result").cloned().unwrap_or(Value::Null),
            "error": item.get("error").cloned().unwrap_or(Value::Null),
            "durationMs": item.get("durationMs").cloned().unwrap_or(Value::Null),
        }),
        "dynamicToolCall" => json!({
            "tool": item.get("tool").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
            "contentItems": item.get("contentItems").cloned().unwrap_or(Value::Null),
            "success": item.get("success").cloned().unwrap_or(Value::Null),
            "durationMs": item.get("durationMs").cloned().unwrap_or(Value::Null),
        }),
        "collabToolCall" | "collabAgentToolCall" => json!({
            "tool": item.get("tool").cloned().unwrap_or(Value::Null),
            "agentsStates": item.get("agentsStates").cloned().unwrap_or(Value::Null),
            "model": item.get("model").cloned().unwrap_or(Value::Null),
            "prompt": item.get("prompt").cloned().unwrap_or(Value::Null),
            "reasoningEffort": item.get("reasoningEffort").cloned().unwrap_or(Value::Null),
            "receiverThreadIds": item.get("receiverThreadIds").cloned().unwrap_or(Value::Null),
            "senderThreadId": item.get("senderThreadId").cloned().unwrap_or(Value::Null),
        }),
        "webSearch" => json!({
            "query": item.get("query").cloned().unwrap_or(Value::Null),
            "action": item.get("action").cloned().unwrap_or(Value::Null),
        }),
        "imageView" => json!({
            "path": item.get("path").cloned().unwrap_or(Value::Null),
        }),
        "imageGeneration" => json!({
            "result": item.get("result").cloned().unwrap_or(Value::Null),
            "revisedPrompt": item.get("revisedPrompt").cloned().unwrap_or(Value::Null),
            "savedPath": item.get("savedPath").cloned().unwrap_or(Value::Null),
        }),
        // Raw-protocol items below: `callId` is resolved via call_id_value,
        // which accepts both `call_id` and `callId` spellings.
        "function_call" => json!({
            "name": item.get("name").cloned().unwrap_or(Value::Null),
            "namespace": item.get("namespace").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
            "callId": call_id_value(item),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "function_call_output" => json!({
            "callId": call_id_value(item),
            "output": item.get("output").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "custom_tool_call" => json!({
            "name": item.get("name").cloned().unwrap_or(Value::Null),
            "input": item.get("input").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
            "callInput": item.get("call_input").cloned().unwrap_or(Value::Null),
            "callId": call_id_value(item),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "custom_tool_call_output" => json!({
            "callId": call_id_value(item),
            "output": item.get("output").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "file_search_call" => json!({
            "callId": call_id_value(item),
            "queries": item.get("queries").cloned().unwrap_or(Value::Null),
            "results": item.get("results").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "web_search_call" => json!({
            "callId": call_id_value(item),
            "query": item.get("query").cloned().unwrap_or(Value::Null),
            "action": item.get("action").cloned().unwrap_or(Value::Null),
            "results": item.get("results").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "computer_call" => json!({
            "callId": call_id_value(item),
            "action": item.get("action").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "computer_call_output" => json!({
            "callId": call_id_value(item),
            "output": item.get("output").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "code_interpreter_call" => json!({
            "callId": call_id_value(item),
            "code": item.get("code").cloned().unwrap_or(Value::Null),
            "outputs": item.get("outputs").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "shell_call" | "local_shell_call" => json!({
            "callId": call_id_value(item),
            "action": item.get("action").cloned().unwrap_or(Value::Null),
            "command": item.get("command").cloned().unwrap_or(Value::Null),
            "commands": item.get("commands").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "shell_call_output" | "local_shell_call_output" => json!({
            "callId": call_id_value(item),
            "output": item.get("output").cloned().unwrap_or(Value::Null),
            "stdout": item.get("stdout").cloned().unwrap_or(Value::Null),
            "stderr": item.get("stderr").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "apply_patch_call" => json!({
            "callId": call_id_value(item),
            "operation": item.get("operation").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "apply_patch_call_output" => json!({
            "callId": call_id_value(item),
            "output": item.get("output").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "image_generation_call" => json!({
            "callId": call_id_value(item),
            "result": item.get("result").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "mcp_call" => json!({
            "callId": call_id_value(item),
            "serverLabel": item.get("server_label").cloned().unwrap_or(Value::Null),
            "name": item.get("name").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
            "output": item.get("output").cloned().unwrap_or(Value::Null),
            "error": item.get("error").cloned().unwrap_or(Value::Null),
            "status": item.get("status").cloned().unwrap_or(Value::Null),
        }),
        "mcp_approval_request" => json!({
            "serverLabel": item.get("server_label").cloned().unwrap_or(Value::Null),
            "name": item.get("name").cloned().unwrap_or(Value::Null),
            "arguments": item.get("arguments").cloned().unwrap_or(Value::Null),
        }),
        "mcp_approval_response" => json!({
            "approve": item.get("approve").cloned().unwrap_or(Value::Null),
            "reason": item.get("reason").cloned().unwrap_or(Value::Null),
        }),
        "mcp_list_tools" => json!({
            "serverLabel": item.get("server_label").cloned().unwrap_or(Value::Null),
            "tools": item.get("tools").cloned().unwrap_or(Value::Null),
            "error": item.get("error").cloned().unwrap_or(Value::Null),
        }),
        "enteredReviewMode" | "exitedReviewMode" => json!({
            "review": item.get("review").cloned().unwrap_or(Value::Null),
        }),
        // Compaction carries no payload of its own.
        "contextCompaction" | "compaction" => json!({}),
        _ => item.clone(),
    }
}
fn extract_content_items_text(items: &[Value]) -> String {
items
.iter()
.filter_map(extract_content_item_text)
.collect::<Vec<_>>()
.join("\n")
}
/// Extracts human-readable text from a single content item. Images and
/// files are rendered with bracketed markers; returns `None` when nothing
/// printable can be derived.
fn extract_content_item_text(item: &Value) -> Option<String> {
    let item_type = optional_string(item, "type");
    let text = optional_string(item, "text");
    match item_type.as_deref() {
        Some("input_text" | "output_text" | "text" | "reasoning_text" | "summary_text") => text,
        Some("input_image") => {
            optional_string(item, "image_url").map(|url| format!("[图片] {url}"))
        }
        Some("input_file") => optional_string(item, "filename")
            .or_else(|| optional_string(item, "file_id"))
            .map(|name| format!("[文件] {name}")),
        Some("refusal") => optional_string(item, "refusal"),
        _ => text.or_else(|| {
            if !item.is_null() && !item.is_object() {
                // Scalars/arrays fall back to their compact JSON rendering.
                Some(item.to_string())
            } else {
                let pretty = pretty_json(item);
                (!pretty.is_empty()).then_some(pretty)
            }
        }),
    }
}
/// Combines reasoning summary and content blocks into one display string.
/// When both sides are present they are labeled sections; a single present
/// side is returned as-is; empty input yields an empty string.
fn build_reasoning_text(summary: Option<&[Value]>, content: Option<&[Value]>) -> String {
    let collect = |items: Option<&[Value]>| {
        items
            .map(extract_content_items_text)
            .unwrap_or_default()
            .trim()
            .to_string()
    };
    let summary_text = collect(summary);
    let content_text = collect(content);
    if summary_text.is_empty() {
        content_text
    } else if content_text.is_empty() {
        summary_text
    } else {
        format!("思考摘要\n{summary_text}\n\n思考内容\n{content_text}")
    }
}
/// Renders a plan as one line per step, `[status] step`, optionally
/// preceded by a non-blank explanation line.
fn format_plan_payload(explanation: Option<&str>, plan: Option<&Vec<Value>>) -> String {
    let mut lines = Vec::new();
    match explanation {
        Some(text) if !text.trim().is_empty() => lines.push(text.trim().to_string()),
        _ => {}
    }
    for step in plan.into_iter().flatten() {
        let step_text = optional_string(step, "step").unwrap_or_else(|| pretty_json(step));
        let status_label = match optional_string(step, "status") {
            Some(value) => format_plan_status(&value),
            // Steps without a status default to "pending".
            None => "待处理",
        };
        lines.push(format!("[{status_label}] {step_text}"));
    }
    lines.join("\n")
}
/// Builds the display text for a web search: the query alone, the action
/// JSON alone, or both separated by a blank line.
fn build_web_search_text(item: &Value) -> String {
    let query = optional_string(item, "query").unwrap_or_default();
    match item.get("action") {
        None | Some(Value::Null) => query,
        Some(action) => {
            if query.trim().is_empty() {
                pretty_json(action)
            } else {
                format!("{query}\n\n{}", pretty_json(action))
            }
        }
    }
}
/// Returns the item's call id, accepting both `call_id` and `callId`
/// spellings; `Null` when neither key exists.
fn call_id_value(item: &Value) -> Value {
    for key in ["call_id", "callId"] {
        if let Some(value) = item.get(key) {
            return value.clone();
        }
    }
    Value::Null
}
/// Renders the requested payload fields as labeled sections
/// (`label\nvalue`), joined by blank lines. Missing or null fields are
/// skipped; well-known keys get Chinese labels, others use the raw key.
fn format_tool_result(item: &Value, keys: &[&str]) -> String {
    let mut sections = Vec::new();
    for key in keys {
        let Some(value) = item.get(*key) else {
            continue;
        };
        if value.is_null() {
            continue;
        }
        let label = match *key {
            "arguments" => "参数",
            "result" | "contentItems" => "结果",
            "error" => "错误",
            "agentsStates" => "代理状态",
            "prompt" => "提示词",
            other => other,
        };
        sections.push(format!("{label}\n{}", pretty_json(value)));
    }
    sections.join("\n\n")
}
fn format_file_changes(changes: Option<&Vec<Value>>) -> String {
let Some(changes) = changes else {
return String::new();
};
let summary = changes
.iter()
.map(|change| {
let kind = optional_string(change, "type")
.or_else(|| optional_string(change, "status"))
.unwrap_or_else(|| "change".to_string());
let path = optional_string(change, "path")
.or_else(|| optional_string(change, "filePath"))
.unwrap_or_else(|| pretty_json(change));
format!("{kind}: {path}")
})
.collect::<Vec<_>>()
.join("\n");
if summary.trim().is_empty() {
pretty_json(&Value::Array(changes.clone()))
} else {
summary
}
}
/// Pretty-prints a JSON value: `Null` becomes an empty string, strings are
/// returned unquoted, everything else is indented JSON.
fn pretty_json(value: &Value) -> String {
    if value.is_null() {
        return String::new();
    }
    if let Value::String(text) = value {
        return text.clone();
    }
    serde_json::to_string_pretty(value).unwrap_or_default()
}
fn merge_timeline_metadata(existing: &mut Value, incoming: Value) {
if existing.is_null() || !existing.is_object() || !incoming.is_object() {
*existing = incoming;
return;
}
let Some(existing_object) = existing.as_object_mut() else {
*existing = incoming;
return;
};
let Some(incoming_object) = incoming.as_object() else {
*existing = incoming;
return;
};
for (key, value) in incoming_object {
match key.as_str() {
"payload" => merge_timeline_payload(existing_object, value),
"stream" | "lifecycle" | "semantic" | "summary" => {
merge_timeline_object(existing_object, key, value)
}
_ => {
existing_object.insert(key.clone(), value.clone());
}
}
}
}
/// Merges an incoming `payload` object into the metadata map's `payload`
/// slot. Non-object operands replace the slot wholesale. While merging,
/// empty incoming `summary`/`content` arrays never clobber non-empty
/// existing ones (protects accumulated streaming text).
fn merge_timeline_payload(existing_object: &mut serde_json::Map<String, Value>, incoming: &Value) {
    let slot = existing_object
        .entry("payload".to_string())
        .or_insert_with(|| json!({}));
    if !slot.is_object() || !incoming.is_object() {
        *slot = incoming.clone();
        return;
    }
    let Some(target) = slot.as_object_mut() else {
        return; // unreachable: is_object() was checked above
    };
    let Some(source) = incoming.as_object() else {
        return; // unreachable: is_object() was checked above
    };
    for (key, value) in source {
        let keeps_existing = matches!(key.as_str(), "summary" | "content")
            && value.as_array().is_some_and(|array| array.is_empty())
            && target
                .get(key)
                .and_then(Value::as_array)
                .is_some_and(|array| !array.is_empty());
        if keeps_existing {
            continue;
        }
        target.insert(key.clone(), value.clone());
    }
}
/// Shallow-merges an incoming sub-object into `existing_object[key]`.
/// Null incoming values are ignored; non-object operands replace the slot
/// wholesale; otherwise every incoming child key overwrites the target's.
fn merge_timeline_object(
    existing_object: &mut serde_json::Map<String, Value>,
    key: &str,
    incoming: &Value,
) {
    if incoming.is_null() {
        return;
    }
    let slot = existing_object
        .entry(key.to_string())
        .or_insert_with(|| json!({}));
    if !slot.is_object() || !incoming.is_object() {
        *slot = incoming.clone();
        return;
    }
    let Some(target) = slot.as_object_mut() else {
        return; // unreachable: is_object() was checked above
    };
    let Some(source) = incoming.as_object() else {
        return; // unreachable: is_object() was checked above
    };
    for (child_key, child_value) in source {
        target.insert(child_key.clone(), child_value.clone());
    }
}
/// Translates a plan step status into its Chinese label; anything
/// unrecognized (including "pending") renders as "待处理".
fn format_plan_status(status: &str) -> &'static str {
    match status {
        "completed" => "已完成",
        "inProgress" | "in_progress" => "进行中",
        _ => "待处理",
    }
}
/// Inserts `entry` or merges it into an existing entry matched by id, or by
/// (item_id, thread_id). A "real" plan item first evicts synthetic
/// `turn-plan:*` placeholders for the same turn. Empty fields on the
/// incoming entry inherit the matched entry's values; metadata objects are
/// deep-merged via `merge_timeline_metadata`.
fn upsert_timeline_entry(entries: &mut Vec<TimelineEntry>, entry: TimelineEntry) {
    if entry.entry_type == "plan" {
        let incoming_is_real = entry
            .item_id
            .as_deref()
            .is_some_and(|item_id| !item_id.starts_with("turn-plan:"));
        if incoming_is_real {
            entries.retain(|candidate| {
                !(candidate.entry_type == "plan"
                    && candidate.turn_id == entry.turn_id
                    && candidate
                        .item_id
                        .as_deref()
                        .is_some_and(|item_id| item_id.starts_with("turn-plan:")))
            });
        }
    }
    // Latest match wins, hence the reverse search.
    let matched = entries.iter().rposition(|candidate| {
        candidate.id == entry.id
            || (entry.item_id.is_some()
                && candidate.item_id == entry.item_id
                && candidate.thread_id == entry.thread_id)
    });
    let Some(slot) = matched else {
        entries.push(entry);
        return;
    };
    let previous = &entries[slot];
    let mut merged = entry;
    if merged.text.is_empty() {
        merged.text = previous.text.clone();
    }
    if merged.title.is_none() {
        merged.title = previous.title.clone();
    }
    if merged.status.is_none() {
        merged.status = previous.status.clone();
    }
    if merged.metadata.is_null() {
        merged.metadata = previous.metadata.clone();
    } else if !previous.metadata.is_null() {
        let mut combined = previous.metadata.clone();
        merge_timeline_metadata(&mut combined, merged.metadata.clone());
        merged.metadata = combined;
    }
    entries[slot] = merged;
}
/// Applies a streaming delta event to the timeline: appends the delta text
/// (with `separator` between chunks) to the entry matching
/// (item_id, thread_id, turn_id), merging status/title/metadata; when no
/// entry matches, a fresh one is created from the event payload and the
/// provided defaults.
///
/// Fixes vs. the previous version: `status.or(entries[index].status.clone())`
/// cloned the existing status unconditionally even when the event supplied
/// one; `entry_id` (and an `entry_type` clone) were computed eagerly even on
/// the merge path where they were unused; repeated `entries[index]` indexing
/// replaced with a single `&mut` binding.
fn apply_event_delta(
    entries: &mut Vec<TimelineEntry>,
    event: &PersistedEvent,
    default_entry_type: &str,
    default_title: &str,
    separator: &str,
) {
    let runtime_id = event
        .runtime_id
        .clone()
        .unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string());
    let thread_id = event.thread_id.clone().unwrap_or_default();
    let turn_id = optional_string(&event.payload, "turnId");
    let item_id = optional_string(&event.payload, "itemId");
    let entry_type = optional_string(&event.payload, "entryType")
        .unwrap_or_else(|| default_entry_type.to_string());
    let title =
        optional_string(&event.payload, "title").or_else(|| Some(default_title.to_string()));
    let status = optional_string(&event.payload, "status");
    let metadata = event
        .payload
        .get("metadata")
        .cloned()
        .unwrap_or(Value::Null);
    let delta = optional_string(&event.payload, "delta").unwrap_or_default();
    // Latest matching entry wins, hence the reverse search.
    let index = entries.iter().rposition(|entry| {
        entry.item_id == item_id && entry.thread_id == thread_id && entry.turn_id == turn_id
    });
    if let Some(index) = index {
        let entry = &mut entries[index];
        entry.entry_type = entry_type;
        if !entry.text.is_empty() && !separator.is_empty() && !delta.is_empty() {
            entry.text.push_str(separator);
        }
        entry.text.push_str(&delta);
        // Only overwrite the status when the event actually carries one.
        if status.is_some() {
            entry.status = status;
        }
        if !metadata.is_null() {
            merge_timeline_metadata(&mut entry.metadata, metadata);
        }
        if let Some(title) = title {
            entry.title = Some(title);
        }
    } else {
        entries.push(TimelineEntry {
            id: timeline_entry_id(turn_id.as_deref(), item_id.as_deref(), &entry_type),
            runtime_id,
            thread_id,
            turn_id,
            item_id,
            entry_type,
            title,
            text: delta,
            status,
            metadata,
        });
    }
}
/// Guarantees a "reasoning" timeline entry exists for the event's item id,
/// creating an empty-text placeholder when none is present.
fn ensure_reasoning_event_entry(entries: &mut Vec<TimelineEntry>, event: &PersistedEvent) {
    let item_id = optional_string(&event.payload, "itemId");
    let already_present = entries
        .iter()
        .any(|entry| entry.item_id == item_id && entry.entry_type == "reasoning");
    if already_present {
        return;
    }
    let turn_id = optional_string(&event.payload, "turnId");
    entries.push(TimelineEntry {
        id: timeline_entry_id(turn_id.as_deref(), item_id.as_deref(), "reasoning"),
        runtime_id: event
            .runtime_id
            .clone()
            .unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string()),
        thread_id: event.thread_id.clone().unwrap_or_default(),
        turn_id,
        item_id,
        entry_type: "reasoning".to_string(),
        title: Some("思考过程".to_string()),
        text: String::new(),
        status: optional_string(&event.payload, "status"),
        metadata: event
            .payload
            .get("metadata")
            .cloned()
            .unwrap_or(Value::Null),
    });
}
/// Applies a reasoning delta event: appends the delta to the indexed
/// summary/content slot of the matching "reasoning" entry's payload, then
/// rebuilds the entry's display text from the accumulated slots. Metadata
/// carried by the event is merged first.
///
/// Fixes vs. the previous version: `optional_string(&event.payload, "itemId")`
/// was re-evaluated inside the `rposition` closure for every candidate
/// (loop-invariant work); `target_index as usize` on a negative i64 would
/// wrap to a huge index and make `ensure_text_slot` pad an enormous array —
/// negative indices are now treated as absent via `usize::try_from`.
fn apply_reasoning_event_delta(
    entries: &mut Vec<TimelineEntry>,
    event: &PersistedEvent,
    summary: bool,
) {
    ensure_reasoning_event_entry(entries, event);
    let item_id = optional_string(&event.payload, "itemId");
    let Some(index) = entries
        .iter()
        .rposition(|entry| entry.item_id == item_id && entry.entry_type == "reasoning")
    else {
        return;
    };
    let delta = optional_string(&event.payload, "delta").unwrap_or_default();
    // `summary` selects which parallel array (and index field) the delta
    // targets.
    let target_index = if summary {
        event.payload.get("summaryIndex").and_then(Value::as_i64)
    } else {
        event.payload.get("contentIndex").and_then(Value::as_i64)
    };
    let metadata = event
        .payload
        .get("metadata")
        .cloned()
        .unwrap_or(Value::Null);
    if !metadata.is_null() {
        merge_timeline_metadata(&mut entries[index].metadata, metadata);
    }
    // Missing or negative indices mean there is no slot to update.
    let Some(slot) = target_index.and_then(|value| usize::try_from(value).ok()) else {
        return;
    };
    if let Some(payload) = entries[index].metadata.get_mut("payload") {
        let container_key = if summary { "summary" } else { "content" };
        let item_type = if summary {
            "summary_text"
        } else {
            "reasoning_text"
        };
        ensure_text_slot(payload, container_key, slot, item_type);
        append_text_slot(payload, container_key, slot, &delta);
        entries[index].text = build_reasoning_text(
            payload
                .get("summary")
                .and_then(Value::as_array)
                .map(Vec::as_slice),
            payload
                .get("content")
                .and_then(Value::as_array)
                .map(Vec::as_slice),
        );
    }
}
/// Guarantees `payload[container_key][index]` exists, coercing the payload
/// to an object and the container to an array if needed, and padding with
/// `{ "type": item_type, "text": "" }` placeholders up to `index`.
fn ensure_text_slot(payload: &mut Value, container_key: &str, index: usize, item_type: &str) {
    if !payload.is_object() {
        *payload = json!({});
    }
    let container = payload
        .as_object_mut()
        .expect("payload 应为对象")
        .entry(container_key.to_string())
        .or_insert_with(|| json!([]));
    if !container.is_array() {
        *container = json!([]);
    }
    let items = container.as_array_mut().expect("container 应为数组");
    // Guarded so resize_with never truncates an already-longer array.
    if items.len() <= index {
        items.resize_with(index + 1, || {
            json!({
                "type": item_type,
                "text": "",
            })
        });
    }
}
/// Appends `delta` to the `text` field of `payload[container_key][index]`,
/// creating the field as an empty string when missing. Does nothing when
/// the slot does not exist or is not an object.
fn append_text_slot(payload: &mut Value, container_key: &str, index: usize, delta: &str) {
    let slot = payload
        .get_mut(container_key)
        .and_then(Value::as_array_mut)
        .and_then(|array| array.get_mut(index))
        .and_then(Value::as_object_mut);
    let Some(item) = slot else {
        return;
    };
    let text = item
        .entry("text".to_string())
        .or_insert_with(|| Value::String(String::new()));
    let mut combined = text.as_str().unwrap_or_default().to_string();
    combined.push_str(delta);
    *text = Value::String(combined);
}
/// Builds a "diff" timeline entry from a legacy `turn/diff/updated` event;
/// returns `None` when the payload carries no `diff` string.
///
/// Fixes vs. the previous version: the metadata is now built before the
/// struct literal so `diff` can be moved into `text` instead of cloned, and
/// `turn_id.clone().unwrap_or_else(|| "turn".to_string())` was replaced by a
/// borrow-based fallback, dropping two needless allocations.
fn timeline_entry_from_diff_event(event: &PersistedEvent) -> Option<TimelineEntry> {
    let thread_id = event.thread_id.clone().unwrap_or_default();
    let turn_id = optional_string(&event.payload, "turnId");
    let diff = optional_string(&event.payload, "diff")?;
    let metadata = timeline_metadata(
        "legacy_event",
        "turn/diff/updated",
        "diff",
        timeline_collapse_hint("diff", "diff"),
        timeline_stream_metadata(true, false, None, None, None),
        None,
        json!({}),
        Some(&TimelineSemanticInfo::new(
            "edited", None, "high", "primary",
        )),
        Some(&timeline_summary_from_diff_text(&diff)),
        event.payload.clone(),
    );
    Some(TimelineEntry {
        id: format!("{}:diff", turn_id.as_deref().unwrap_or("turn")),
        runtime_id: event
            .runtime_id
            .clone()
            .unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string()),
        thread_id,
        turn_id,
        item_id: Some("diff".to_string()),
        entry_type: "diff".to_string(),
        title: Some("统一 Diff".to_string()),
        text: diff,
        status: None,
        metadata,
    })
}