codex-mobile-bridge 0.3.15

Remote bridge and service manager for codex-mobile.
Documentation
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};

use serde_json::{Map, Value, json};
use tracing::warn;

use super::history_rollout::{RolloutCommandRecord, load_rollout_command_records};

pub(super) fn enrich_thread_history_from_rollout(thread: &mut Value, codex_home: Option<&str>) {
    let Some(thread_id) = thread.get("id").and_then(Value::as_str) else {
        return;
    };
    let Some(rollout_path) = resolve_rollout_path(thread, codex_home, thread_id) else {
        return;
    };
    let records = match load_rollout_command_records(&rollout_path) {
        Ok(records) => records,
        Err(error) => {
            warn!(
                "读取历史 rollout 失败,已跳过补偿: thread_id={thread_id} path={} error={error}",
                rollout_path.display()
            );
            return;
        }
    };
    if records.is_empty() {
        return;
    }
    apply_rollout_records(thread, &records);
}

fn resolve_rollout_path(
    thread: &Value,
    codex_home: Option<&str>,
    thread_id: &str,
) -> Option<PathBuf> {
    if let Some(path) = thread.get("path").and_then(Value::as_str) {
        let candidate = PathBuf::from(path);
        if candidate.is_file() {
            return Some(candidate);
        }
        warn!(
            "thread.path 指向的 rollout 文件不存在,尝试回退到 CODEX_HOME 扫描: thread_id={thread_id} path={}",
            candidate.display()
        );
    }

    let codex_home = codex_home.filter(|value| !value.trim().is_empty())?;
    for relative in ["sessions", "archived_sessions"] {
        let root = Path::new(codex_home).join(relative);
        if let Some(path) = find_rollout_path(&root, thread_id, 0) {
            return Some(path);
        }
    }
    None
}

fn find_rollout_path(root: &Path, thread_id: &str, depth: usize) -> Option<PathBuf> {
    if depth > 4 || !root.is_dir() {
        return None;
    }
    let entries = fs::read_dir(root).ok()?;
    for entry in entries.flatten() {
        let path = entry.path();
        if path.is_dir() {
            if let Some(found) = find_rollout_path(&path, thread_id, depth + 1) {
                return Some(found);
            }
            continue;
        }
        let file_name = path.file_name().and_then(|name| name.to_str())?;
        if file_name.starts_with("rollout-")
            && file_name.ends_with(".jsonl")
            && file_name.contains(thread_id)
        {
            return Some(path);
        }
    }
    None
}

fn apply_rollout_records(thread: &mut Value, records: &[RolloutCommandRecord]) {
    let Some(turns) = thread.get_mut("turns").and_then(Value::as_array_mut) else {
        return;
    };

    let record_by_call_id = records
        .iter()
        .map(|record| (record.call_id.as_str(), record))
        .collect::<HashMap<_, _>>();

    let mut existing_call_ids = HashSet::new();
    for turn in turns.iter_mut() {
        let Some(items) = ensure_turn_items(turn) else {
            continue;
        };
        for item in items.iter_mut() {
            if item.get("type").and_then(Value::as_str) != Some("commandExecution") {
                continue;
            }
            let Some(call_id) = item.get("id").and_then(Value::as_str) else {
                continue;
            };
            existing_call_ids.insert(call_id.to_string());
            if let Some(record) = record_by_call_id.get(call_id) {
                patch_existing_command_item(item, record);
            }
        }
    }

    let mut missing_by_turn = HashMap::<String, Vec<&RolloutCommandRecord>>::new();
    for record in records {
        if existing_call_ids.contains(&record.call_id) {
            continue;
        }
        let Some(turn_id) = record.turn_id.as_ref() else {
            continue;
        };
        missing_by_turn
            .entry(turn_id.clone())
            .or_default()
            .push(record);
    }

    for turn in turns.iter_mut() {
        let Some(turn_id) = turn.get("id").and_then(Value::as_str) else {
            continue;
        };
        let Some(records) = missing_by_turn.get(turn_id) else {
            continue;
        };
        let Some(items) = ensure_turn_items(turn) else {
            continue;
        };
        insert_missing_command_items(items, records);
    }
}

fn ensure_turn_items(turn: &mut Value) -> Option<&mut Vec<Value>> {
    let object = turn.as_object_mut()?;
    if !object.contains_key("items") {
        object.insert("items".to_string(), Value::Array(Vec::new()));
    }
    object.get_mut("items").and_then(Value::as_array_mut)
}

fn patch_existing_command_item(item: &mut Value, record: &RolloutCommandRecord) {
    if command_output_is_blank(item.get("aggregatedOutput"))
        && let Some(output_text) = record.output_text.as_ref()
    {
        item["aggregatedOutput"] = Value::String(output_text.clone());
    }
    if item.get("exitCode").and_then(Value::as_i64).is_none()
        && let Some(exit_code) = record.exit_code
    {
        item["exitCode"] = json!(exit_code);
    }
    if item.get("status").and_then(Value::as_str).is_none()
        && let Some(status) = record.status.as_ref()
    {
        item["status"] = Value::String(status.clone());
    }
    if item
        .get("commandActions")
        .and_then(Value::as_array)
        .is_none_or(|actions| actions.is_empty())
        && !record.command_actions.is_empty()
    {
        item["commandActions"] = Value::Array(record.command_actions.clone());
    }
    if item
        .get("command")
        .and_then(Value::as_str)
        .is_none_or(|command| command.trim().is_empty())
        && let Some(command) = record.raw_command.as_ref()
    {
        item["command"] = Value::String(command.clone());
    }
}

fn insert_missing_command_items(items: &mut Vec<Value>, records: &[&RolloutCommandRecord]) {
    let insert_at = first_final_answer_index(items).unwrap_or(items.len());
    for (offset, record) in records
        .iter()
        .copied()
        .filter_map(build_command_execution_item)
        .enumerate()
    {
        items.insert(insert_at + offset, record);
    }
}

fn first_final_answer_index(items: &[Value]) -> Option<usize> {
    items.iter().position(|item| {
        item.get("type").and_then(Value::as_str) == Some("agentMessage")
            && item.get("phase").and_then(Value::as_str) == Some("final_answer")
    })
}

fn build_command_execution_item(record: &RolloutCommandRecord) -> Option<Value> {
    let command = record
        .raw_command
        .as_ref()
        .map(|value| value.trim())
        .unwrap_or("");
    if command.is_empty() && record.command_actions.is_empty() {
        return None;
    }

    let mut item = Map::new();
    item.insert("id".to_string(), Value::String(record.call_id.clone()));
    item.insert(
        "type".to_string(),
        Value::String("commandExecution".to_string()),
    );
    item.insert(
        "status".to_string(),
        Value::String(
            record
                .status
                .clone()
                .unwrap_or_else(|| status_from_exit_code(record.exit_code)),
        ),
    );
    item.insert("command".to_string(), Value::String(command.to_string()));
    item.insert(
        "commandActions".to_string(),
        Value::Array(record.command_actions.clone()),
    );
    if let Some(output_text) = record.output_text.as_ref() {
        item.insert(
            "aggregatedOutput".to_string(),
            Value::String(output_text.clone()),
        );
    }
    if let Some(exit_code) = record.exit_code {
        item.insert("exitCode".to_string(), json!(exit_code));
    }
    Some(Value::Object(item))
}

fn command_output_is_blank(value: Option<&Value>) -> bool {
    value
        .and_then(Value::as_str)
        .is_none_or(|text| text.trim().is_empty())
}

fn status_from_exit_code(exit_code: Option<i64>) -> String {
    match exit_code {
        Some(0) | None => "completed".to_string(),
        Some(_) => "failed".to_string(),
    }
}