use serde_json::{Value, json};
use super::super::helpers::optional_string;
use super::content::build_reasoning_text;
use super::metadata::{
TimelineSemanticInfo, timeline_collapse_hint, timeline_metadata, timeline_stream_metadata,
};
use super::metadata_merge::{finalize_timeline_entries, merge_timeline_metadata};
use super::normalize::timeline_entry_id;
use super::summary::timeline_summary_from_diff_text;
use crate::bridge_protocol::{PersistedEvent, TimelineEntry};
use crate::storage::PRIMARY_RUNTIME_ID;
pub(super) fn timeline_entries_from_events(events: &[PersistedEvent]) -> Vec<TimelineEntry> {
let mut entries = Vec::new();
for event in events {
match event.event_type.as_str() {
"item_started" | "item_completed" | "turn_plan_updated" | "item/started"
| "item/completed" | "turn/plan/updated" => {
if let Ok(entry) = serde_json::from_value::<TimelineEntry>(event.payload.clone()) {
upsert_timeline_entry(&mut entries, entry);
continue;
}
}
"message_delta" | "item/agentMessage/delta" => {
apply_event_delta(&mut entries, event, "agentMessage", "Codex", "")
}
"command_output_delta" | "item/commandExecution/outputDelta" => {
apply_event_delta(&mut entries, event, "commandExecution", "命令输出", "")
}
"file_change_output_delta" | "item/fileChange/outputDelta" => {
apply_event_delta(&mut entries, event, "fileChange", "文件改动", "")
}
"plan_delta" | "item/plan/delta" => {
apply_event_delta(&mut entries, event, "plan", "执行计划", "")
}
"mcp_tool_call_progress" | "item/mcpToolCall/progress" => {
apply_event_delta(&mut entries, event, "mcpToolCall", "MCP 工具", "\n")
}
"reasoning_text_delta" | "item/reasoning/textDelta" => {
apply_reasoning_event_delta(&mut entries, event, false)
}
"reasoning_summary_part_added" | "item/reasoning/summaryPartAdded" => {
ensure_reasoning_event_entry(&mut entries, event);
continue;
}
"reasoning_summary_text_delta" | "item/reasoning/summaryTextDelta" => {
apply_reasoning_event_delta(&mut entries, event, true)
}
"diff_updated" | "turn/diff/updated" => {
if let Some(entry) = timeline_entry_from_diff_event(event) {
upsert_timeline_entry(&mut entries, entry);
continue;
}
}
_ => {}
}
entries.push(TimelineEntry {
id: format!("event-{}", event.seq),
runtime_id: event
.runtime_id
.clone()
.unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string()),
thread_id: event.thread_id.clone().unwrap_or_default(),
turn_id: optional_string(&event.payload, "turnId"),
item_id: optional_string(&event.payload, "itemId"),
entry_type: event.event_type.clone(),
title: Some(event.event_type.clone()),
text: event
.payload
.get("delta")
.and_then(Value::as_str)
.map(ToOwned::to_owned)
.unwrap_or_else(|| {
serde_json::to_string_pretty(&event.payload).unwrap_or_default()
}),
status: optional_string(&event.payload, "status"),
metadata: Value::Null,
});
}
finalize_timeline_entries(&mut entries);
entries
}
fn upsert_timeline_entry(entries: &mut Vec<TimelineEntry>, entry: TimelineEntry) {
if entry.entry_type == "plan" {
entries.retain(|existing| {
!(existing.entry_type == "plan"
&& existing.turn_id == entry.turn_id
&& existing
.item_id
.as_deref()
.is_some_and(|item_id| item_id.starts_with("turn-plan:"))
&& entry
.item_id
.as_deref()
.is_some_and(|item_id| !item_id.starts_with("turn-plan:")))
});
}
let index = entries.iter().rposition(|existing| {
existing.id == entry.id
|| (entry.item_id.is_some()
&& existing.item_id == entry.item_id
&& existing.thread_id == entry.thread_id)
});
if let Some(index) = index {
let existing = &entries[index];
let mut merged_entry = entry;
if merged_entry.text.is_empty() {
merged_entry.text = existing.text.clone();
}
if merged_entry.title.is_none() {
merged_entry.title = existing.title.clone();
}
if merged_entry.status.is_none() {
merged_entry.status = existing.status.clone();
}
if merged_entry.metadata.is_null() {
merged_entry.metadata = existing.metadata.clone();
} else if !existing.metadata.is_null() {
let mut merged_metadata = existing.metadata.clone();
merge_timeline_metadata(&mut merged_metadata, merged_entry.metadata.clone());
merged_entry.metadata = merged_metadata;
}
entries[index] = merged_entry;
} else {
entries.push(entry);
}
}
fn apply_event_delta(
entries: &mut Vec<TimelineEntry>,
event: &PersistedEvent,
default_entry_type: &str,
default_title: &str,
separator: &str,
) {
let runtime_id = event
.runtime_id
.clone()
.unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string());
let thread_id = event.thread_id.clone().unwrap_or_default();
let turn_id = optional_string(&event.payload, "turnId");
let item_id = optional_string(&event.payload, "itemId");
let entry_type = optional_string(&event.payload, "entryType")
.unwrap_or_else(|| default_entry_type.to_string());
let title =
optional_string(&event.payload, "title").or_else(|| Some(default_title.to_string()));
let status = optional_string(&event.payload, "status");
let metadata = event
.payload
.get("metadata")
.cloned()
.unwrap_or(Value::Null);
let delta = optional_string(&event.payload, "delta").unwrap_or_default();
let entry_id = timeline_entry_id(turn_id.as_deref(), item_id.as_deref(), &entry_type);
let index = entries.iter().rposition(|entry| {
entry.item_id == item_id && entry.thread_id == thread_id && entry.turn_id == turn_id
});
if let Some(index) = index {
entries[index].entry_type = entry_type.clone();
if !entries[index].text.is_empty() && !separator.is_empty() && !delta.is_empty() {
entries[index].text.push_str(separator);
}
entries[index].text.push_str(&delta);
entries[index].status = status.or(entries[index].status.clone());
if !metadata.is_null() {
merge_timeline_metadata(&mut entries[index].metadata, metadata);
}
if let Some(title) = title {
entries[index].title = Some(title);
}
} else {
entries.push(TimelineEntry {
id: entry_id,
runtime_id,
thread_id,
turn_id,
item_id,
entry_type,
title,
text: delta,
status,
metadata,
});
}
}
fn ensure_reasoning_event_entry(entries: &mut Vec<TimelineEntry>, event: &PersistedEvent) {
let runtime_id = event
.runtime_id
.clone()
.unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string());
let thread_id = event.thread_id.clone().unwrap_or_default();
let turn_id = optional_string(&event.payload, "turnId");
let item_id = optional_string(&event.payload, "itemId");
let entry_id = timeline_entry_id(turn_id.as_deref(), item_id.as_deref(), "reasoning");
let metadata = event
.payload
.get("metadata")
.cloned()
.unwrap_or(Value::Null);
if entries
.iter()
.any(|entry| entry.item_id == item_id && entry.entry_type == "reasoning")
{
return;
}
entries.push(TimelineEntry {
id: entry_id,
runtime_id,
thread_id,
turn_id,
item_id,
entry_type: "reasoning".to_string(),
title: Some("思考过程".to_string()),
text: String::new(),
status: optional_string(&event.payload, "status"),
metadata,
});
}
fn apply_reasoning_event_delta(
entries: &mut Vec<TimelineEntry>,
event: &PersistedEvent,
summary: bool,
) {
ensure_reasoning_event_entry(entries, event);
let Some(index) = entries.iter().rposition(|entry| {
entry.item_id == optional_string(&event.payload, "itemId")
&& entry.entry_type == "reasoning"
}) else {
return;
};
let delta = optional_string(&event.payload, "delta").unwrap_or_default();
let target_index = if summary {
event.payload.get("summaryIndex").and_then(Value::as_i64)
} else {
event.payload.get("contentIndex").and_then(Value::as_i64)
};
let metadata = event
.payload
.get("metadata")
.cloned()
.unwrap_or(Value::Null);
if !metadata.is_null() {
merge_timeline_metadata(&mut entries[index].metadata, metadata);
}
if let Some(target_index) = target_index {
if let Some(payload) = entries[index].metadata.get_mut("payload") {
let container_key = if summary { "summary" } else { "content" };
let item_type = if summary {
"summary_text"
} else {
"reasoning_text"
};
ensure_text_slot(payload, container_key, target_index as usize, item_type);
append_text_slot(payload, container_key, target_index as usize, &delta);
entries[index].text = build_reasoning_text(
payload
.get("summary")
.and_then(Value::as_array)
.map(Vec::as_slice),
payload
.get("content")
.and_then(Value::as_array)
.map(Vec::as_slice),
);
}
}
}
fn ensure_text_slot(payload: &mut Value, container_key: &str, index: usize, item_type: &str) {
if !payload.is_object() {
*payload = json!({});
}
let object = payload.as_object_mut().expect("payload 应为对象");
let container = object
.entry(container_key.to_string())
.or_insert_with(|| json!([]));
if !container.is_array() {
*container = json!([]);
}
let array = container.as_array_mut().expect("container 应为数组");
while array.len() <= index {
array.push(json!({
"type": item_type,
"text": "",
}));
}
}
fn append_text_slot(payload: &mut Value, container_key: &str, index: usize, delta: &str) {
if let Some(item) = payload
.get_mut(container_key)
.and_then(Value::as_array_mut)
.and_then(|array| array.get_mut(index))
.and_then(Value::as_object_mut)
{
let text = item
.entry("text".to_string())
.or_insert_with(|| Value::String(String::new()));
let buffer = text.as_str().unwrap_or_default().to_string() + delta;
*text = Value::String(buffer);
}
}
fn timeline_entry_from_diff_event(event: &PersistedEvent) -> Option<TimelineEntry> {
let thread_id = event.thread_id.clone().unwrap_or_default();
let turn_id = optional_string(&event.payload, "turnId");
let diff = optional_string(&event.payload, "diff")?;
Some(TimelineEntry {
id: format!(
"{}:diff",
turn_id.clone().unwrap_or_else(|| "turn".to_string())
),
runtime_id: event
.runtime_id
.clone()
.unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string()),
thread_id,
turn_id,
item_id: Some("diff".to_string()),
entry_type: "diff".to_string(),
title: Some("统一 Diff".to_string()),
text: diff.clone(),
status: None,
metadata: timeline_metadata(
"legacy_event",
"turn/diff/updated",
"diff",
timeline_collapse_hint("diff", "diff"),
timeline_stream_metadata(true, false, None, None, None),
None,
json!({}),
Some(&TimelineSemanticInfo::new(
"edited", None, "high", "primary",
)),
Some(&timeline_summary_from_diff_text(&diff)),
event.payload.clone(),
),
})
}