codex-mobile-bridge 0.2.6

Remote bridge and service manager for codex-mobile.
Documentation
use serde_json::{json, Value};

use super::super::helpers::optional_string;
use super::content::build_reasoning_text;
use super::metadata::{
    timeline_collapse_hint, timeline_metadata, timeline_stream_metadata, TimelineSemanticInfo,
};
use super::metadata_merge::{finalize_timeline_entries, merge_timeline_metadata};
use super::normalize::timeline_entry_id;
use super::summary::timeline_summary_from_diff_text;
use crate::bridge_protocol::{PersistedEvent, TimelineEntry};
use crate::storage::PRIMARY_RUNTIME_ID;

/// Rebuilds timeline entries by replaying `events` in slice order.
///
/// Dispatch is by `event.event_type`; every kind is accepted under both its snake_case name and
/// its slash-separated name:
/// - item started/completed and turn-plan updates: the payload is deserialized as a
///   [`TimelineEntry`] and upserted;
/// - message, command-output, file-change, plan and MCP-progress deltas: the `delta` text is
///   appended to the matching entry, which is created on first sight;
/// - reasoning events: routed to the reasoning helpers;
/// - diff updates: converted into a `diff` entry and upserted.
///
/// Events with an unrecognized type, lifecycle payloads that fail to deserialize, and diff
/// updates that carry no `diff` string are kept as raw `event-{seq}` entries. The collected
/// list is passed through `finalize_timeline_entries` before it is returned.
pub(super) fn timeline_entries_from_events(events: &[PersistedEvent]) -> Vec<TimelineEntry> {
    let mut entries = Vec::new();
    for event in events {
        match event.event_type.as_str() {
            "item_started" | "item_completed" | "turn_plan_updated" | "item/started"
            | "item/completed" | "turn/plan/updated" => {
                if let Ok(entry) = serde_json::from_value::<TimelineEntry>(event.payload.clone()) {
                    upsert_timeline_entry(&mut entries, entry);
                    continue;
                }
                // Undecodable payload: fall through to the raw fallback entry below.
            }
            // A handled delta is fully folded into its entry, so every delta arm skips the raw
            // fallback; otherwise each streamed chunk would also show up as its own
            // `event-{seq}` entry next to the accumulated one.
            "message_delta" | "item/agentMessage/delta" => {
                apply_event_delta(&mut entries, event, "agentMessage", "Codex", "");
                continue;
            }
            "command_output_delta" | "item/commandExecution/outputDelta" => {
                apply_event_delta(&mut entries, event, "commandExecution", "命令输出", "");
                continue;
            }
            "file_change_output_delta" | "item/fileChange/outputDelta" => {
                apply_event_delta(&mut entries, event, "fileChange", "文件改动", "");
                continue;
            }
            "plan_delta" | "item/plan/delta" => {
                apply_event_delta(&mut entries, event, "plan", "执行计划", "");
                continue;
            }
            "mcp_tool_call_progress" | "item/mcpToolCall/progress" => {
                // Progress messages are joined with a newline instead of being concatenated.
                apply_event_delta(&mut entries, event, "mcpToolCall", "MCP 工具", "\n");
                continue;
            }
            "reasoning_text_delta" | "item/reasoning/textDelta" => {
                apply_reasoning_event_delta(&mut entries, event, false);
                continue;
            }
            "reasoning_summary_part_added" | "item/reasoning/summaryPartAdded" => {
                ensure_reasoning_event_entry(&mut entries, event);
                continue;
            }
            "reasoning_summary_text_delta" | "item/reasoning/summaryTextDelta" => {
                apply_reasoning_event_delta(&mut entries, event, true);
                continue;
            }
            "diff_updated" | "turn/diff/updated" => {
                if let Some(entry) = timeline_entry_from_diff_event(event) {
                    upsert_timeline_entry(&mut entries, entry);
                    continue;
                }
                // No `diff` string in the payload: fall through to the raw fallback entry.
            }
            _ => {}
        }

        // Raw fallback: keep the event visible under its own type. The text is the payload's
        // `delta` string when present, otherwise the pretty-printed payload.
        entries.push(TimelineEntry {
            id: format!("event-{}", event.seq),
            runtime_id: event
                .runtime_id
                .clone()
                .unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string()),
            thread_id: event.thread_id.clone().unwrap_or_default(),
            turn_id: optional_string(&event.payload, "turnId"),
            item_id: optional_string(&event.payload, "itemId"),
            entry_type: event.event_type.clone(),
            title: Some(event.event_type.clone()),
            text: event
                .payload
                .get("delta")
                .and_then(Value::as_str)
                .map(ToOwned::to_owned)
                .unwrap_or_else(|| {
                    serde_json::to_string_pretty(&event.payload).unwrap_or_default()
                }),
            status: optional_string(&event.payload, "status"),
            metadata: Value::Null,
        });
    }
    finalize_timeline_entries(&mut entries);
    entries
}

/// Inserts `entry` into `entries`, or merges it into the stored entry it supersedes.
///
/// When `entry` is a `plan` whose `item_id` is set and does not start with `turn-plan:`, every
/// stored `plan` entry of the same turn whose `item_id` does start with `turn-plan:` is removed
/// first.
///
/// The entry to replace is the last stored one that has the same `id`, or (when
/// `entry.item_id` is `Some`) the same `item_id` and `thread_id`. On replacement the incoming
/// entry wins, except that an empty `text` and a missing `title` or `status` are inherited from
/// the replaced entry. Metadata is inherited when the incoming value is null, and merged onto
/// the stored value with `merge_timeline_metadata` when both are non-null. Without a match the
/// entry is appended.
fn upsert_timeline_entry(entries: &mut Vec<TimelineEntry>, entry: TimelineEntry) {
    const TURN_PLAN_PREFIX: &str = "turn-plan:";

    let replaces_turn_plan = entry.entry_type == "plan"
        && entry
            .item_id
            .as_deref()
            .is_some_and(|id| !id.starts_with(TURN_PLAN_PREFIX));
    if replaces_turn_plan {
        entries.retain(|stored| {
            let is_turn_plan_of_same_turn = stored.entry_type == "plan"
                && stored.turn_id == entry.turn_id
                && stored
                    .item_id
                    .as_deref()
                    .is_some_and(|id| id.starts_with(TURN_PLAN_PREFIX));
            !is_turn_plan_of_same_turn
        });
    }

    let slot = entries.iter().rposition(|stored| {
        let same_item = entry.item_id.is_some()
            && stored.item_id == entry.item_id
            && stored.thread_id == entry.thread_id;
        stored.id == entry.id || same_item
    });
    let Some(slot) = slot else {
        entries.push(entry);
        return;
    };

    // Swap the incoming entry in, then back-fill whatever it left blank from the old one.
    let previous = std::mem::replace(&mut entries[slot], entry);
    let current = &mut entries[slot];
    if current.text.is_empty() {
        current.text = previous.text;
    }
    if current.title.is_none() {
        current.title = previous.title;
    }
    if current.status.is_none() {
        current.status = previous.status;
    }
    if current.metadata.is_null() {
        current.metadata = previous.metadata;
    } else if !previous.metadata.is_null() {
        // The stored metadata is the base; the incoming metadata is merged on top of it.
        let incoming = std::mem::take(&mut current.metadata);
        let mut combined = previous.metadata;
        merge_timeline_metadata(&mut combined, incoming);
        current.metadata = combined;
    }
}

/// Folds one streaming delta event into the timeline.
///
/// The target is the last entry whose `item_id`, `thread_id` and `turn_id` all equal the
/// event's (`itemId` and `turnId` are read from the payload). If one exists, the payload's
/// `delta` is appended to its text, preceded by `separator` only when the existing text, the
/// separator and the delta are all non-empty. Its entry type and title are overwritten, its
/// status is replaced only when the payload carries one, and non-null payload `metadata` is
/// merged in. Otherwise a new entry is appended with the delta as its initial text.
///
/// `default_entry_type` and `default_title` are used when the payload has no `entryType` /
/// `title` of its own.
fn apply_event_delta(
    entries: &mut Vec<TimelineEntry>,
    event: &PersistedEvent,
    default_entry_type: &str,
    default_title: &str,
    separator: &str,
) {
    let payload = &event.payload;
    let thread_id = event.thread_id.clone().unwrap_or_default();
    let turn_id = optional_string(payload, "turnId");
    let item_id = optional_string(payload, "itemId");
    let entry_type =
        optional_string(payload, "entryType").unwrap_or_else(|| default_entry_type.to_string());
    let title = optional_string(payload, "title").unwrap_or_else(|| default_title.to_string());
    let status = optional_string(payload, "status");
    let metadata = payload.get("metadata").cloned().unwrap_or(Value::Null);
    let delta = optional_string(payload, "delta").unwrap_or_default();

    // Search from the back so the most recent matching entry receives the delta.
    let existing = entries.iter_mut().rev().find(|candidate| {
        candidate.item_id == item_id
            && candidate.thread_id == thread_id
            && candidate.turn_id == turn_id
    });
    if let Some(target) = existing {
        let needs_separator =
            !(target.text.is_empty() || separator.is_empty() || delta.is_empty());
        if needs_separator {
            target.text.push_str(separator);
        }
        target.text.push_str(&delta);
        target.entry_type = entry_type;
        target.title = Some(title);
        if status.is_some() {
            target.status = status;
        }
        if !metadata.is_null() {
            merge_timeline_metadata(&mut target.metadata, metadata);
        }
        return;
    }

    let id = timeline_entry_id(turn_id.as_deref(), item_id.as_deref(), &entry_type);
    let runtime_id = event
        .runtime_id
        .clone()
        .unwrap_or_else(|| PRIMARY_RUNTIME_ID.to_string());
    entries.push(TimelineEntry {
        id,
        runtime_id,
        thread_id,
        turn_id,
        item_id,
        entry_type,
        title: Some(title),
        text: delta,
        status,
        metadata,
    });
}

/// Makes sure a `reasoning` entry exists for the item referenced by `event`.
///
/// Nothing happens when any entry of type `reasoning` already has the payload's `itemId`.
/// Otherwise an entry with empty text and the title `思考过程` is appended; its status and
/// metadata are taken from the payload's `status` and `metadata` fields.
fn ensure_reasoning_event_entry(entries: &mut Vec<TimelineEntry>, event: &PersistedEvent) {
    let payload = &event.payload;
    let item_id = optional_string(payload, "itemId");

    let already_present = entries
        .iter()
        .any(|existing| existing.entry_type == "reasoning" && existing.item_id == item_id);
    if already_present {
        return;
    }

    let turn_id = optional_string(payload, "turnId");
    let id = timeline_entry_id(turn_id.as_deref(), item_id.as_deref(), "reasoning");
    let runtime_id = match &event.runtime_id {
        Some(runtime_id) => runtime_id.clone(),
        None => PRIMARY_RUNTIME_ID.to_string(),
    };
    entries.push(TimelineEntry {
        id,
        runtime_id,
        thread_id: event.thread_id.clone().unwrap_or_default(),
        turn_id,
        item_id,
        entry_type: "reasoning".to_string(),
        title: Some("思考过程".to_string()),
        text: String::new(),
        status: optional_string(payload, "status"),
        metadata: payload.get("metadata").cloned().unwrap_or(Value::Null),
    });
}

/// Applies a reasoning text delta (`summary == false`) or reasoning summary delta
/// (`summary == true`) to the reasoning entry of the event's item.
///
/// The entry is created first if it does not exist. Non-null payload `metadata` is merged into
/// the entry's metadata. The delta is then appended to the slot selected by the payload's
/// `summaryIndex` / `contentIndex` inside the `summary` / `content` array of the entry
/// metadata's `payload` value, and the entry text is rebuilt from both arrays with
/// `build_reasoning_text`.
///
/// The delta text is ignored (after the metadata merge) when the index is missing, is not an
/// integer, is negative or does not fit in `usize`, or when the entry metadata has no `payload`
/// key.
fn apply_reasoning_event_delta(
    entries: &mut Vec<TimelineEntry>,
    event: &PersistedEvent,
    summary: bool,
) {
    ensure_reasoning_event_entry(entries, event);

    // Resolve the item id once instead of re-allocating it for every entry inspected.
    let item_id = optional_string(&event.payload, "itemId");
    let Some(entry) = entries
        .iter_mut()
        .rev()
        .find(|entry| entry.item_id == item_id && entry.entry_type == "reasoning")
    else {
        return;
    };

    if let Some(metadata) = event
        .payload
        .get("metadata")
        .filter(|value| !value.is_null())
    {
        merge_timeline_metadata(&mut entry.metadata, metadata.clone());
    }

    let (index_key, container_key, item_type) = if summary {
        ("summaryIndex", "summary", "summary_text")
    } else {
        ("contentIndex", "content", "reasoning_text")
    };
    // Checked conversion: a plain `as usize` cast would turn a negative index into an enormous
    // one and make `ensure_text_slot` push placeholder slots until memory is exhausted.
    let Some(target_index) = event
        .payload
        .get(index_key)
        .and_then(Value::as_i64)
        .and_then(|raw| usize::try_from(raw).ok())
    else {
        return;
    };
    let Some(payload) = entry.metadata.get_mut("payload") else {
        return;
    };

    let delta = optional_string(&event.payload, "delta").unwrap_or_default();
    ensure_text_slot(payload, container_key, target_index, item_type);
    append_text_slot(payload, container_key, target_index, &delta);
    entry.text = build_reasoning_text(
        payload
            .get("summary")
            .and_then(Value::as_array)
            .map(Vec::as_slice),
        payload
            .get("content")
            .and_then(Value::as_array)
            .map(Vec::as_slice),
    );
}

/// Guarantees that `payload[container_key][index]` exists.
///
/// `payload` is replaced by an empty object when it is not an object, and the container is
/// replaced by an empty array when it is missing or not an array. The array is then padded up
/// to and including `index` with `{"type": item_type, "text": ""}` placeholders; slots that
/// already exist are left untouched.
fn ensure_text_slot(payload: &mut Value, container_key: &str, index: usize, item_type: &str) {
    if payload.as_object().is_none() {
        *payload = json!({});
    }
    let fields = payload.as_object_mut().expect("payload 应为对象");

    let slot_list = fields
        .entry(container_key.to_string())
        .or_insert_with(|| json!([]));
    if slot_list.as_array().is_none() {
        *slot_list = json!([]);
    }
    let slots = slot_list.as_array_mut().expect("container 应为数组");

    // The range is empty when the slot is already present.
    for _ in slots.len()..=index {
        slots.push(json!({
            "type": item_type,
            "text": "",
        }));
    }
}

/// Appends `delta` to the `text` field of `payload[container_key][index]`.
///
/// Does nothing when the container is missing or not an array, the index is out of range, or
/// the slot is not an object. A missing `text` field starts out empty; a `text` value that is
/// not a string is replaced by `delta`.
fn append_text_slot(payload: &mut Value, container_key: &str, index: usize, delta: &str) {
    let Some(item) = payload
        .get_mut(container_key)
        .and_then(Value::as_array_mut)
        .and_then(|array| array.get_mut(index))
        .and_then(Value::as_object_mut)
    else {
        return;
    };

    let text = item
        .entry("text".to_string())
        .or_insert_with(|| Value::String(String::new()));
    match text {
        // Grow the stored string in place rather than copying the whole accumulated text into
        // a fresh allocation for every delta.
        Value::String(buffer) => buffer.push_str(delta),
        other => *other = Value::String(delta.to_owned()),
    }
}

/// Builds the `diff` timeline entry for a diff-updated event.
///
/// Returns `None` when the payload has no `diff` string. Otherwise the entry id is
/// `{turnId}:diff` (with the literal `turn` standing in for a missing `turnId`), the item id is
/// `diff`, the text is the diff itself, and the metadata is produced by `timeline_metadata`
/// from the diff summary and a copy of the event payload.
fn timeline_entry_from_diff_event(event: &PersistedEvent) -> Option<TimelineEntry> {
    let payload = &event.payload;
    let diff = optional_string(payload, "diff")?;
    let turn_id = optional_string(payload, "turnId");

    let collapse_hint = timeline_collapse_hint("diff", "diff");
    let stream = timeline_stream_metadata(true, false, None, None, None);
    let semantic = TimelineSemanticInfo::new("edited", None, "high", "primary");
    let summary = timeline_summary_from_diff_text(&diff);
    let metadata = timeline_metadata(
        "legacy_event",
        "turn/diff/updated",
        "diff",
        collapse_hint,
        stream,
        None,
        json!({}),
        Some(&semantic),
        Some(&summary),
        payload.clone(),
    );

    let id = format!("{}:diff", turn_id.as_deref().unwrap_or("turn"));
    let runtime_id = match &event.runtime_id {
        Some(runtime_id) => runtime_id.clone(),
        None => PRIMARY_RUNTIME_ID.to_string(),
    };
    Some(TimelineEntry {
        id,
        runtime_id,
        thread_id: event.thread_id.clone().unwrap_or_default(),
        turn_id,
        item_id: Some("diff".to_string()),
        entry_type: "diff".to_string(),
        title: Some("统一 Diff".to_string()),
        text: diff,
        status: None,
        metadata,
    })
}