use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use serde_json::{Map, Value, json};
use tracing::warn;
use super::history_rollout::{RolloutCommandRecord, load_rollout_command_records};
/// Best-effort enrichment of a thread's command history from its rollout file.
///
/// Reads the thread's string `id`, locates the rollout file with
/// `resolve_rollout_path`, loads its command records and merges them into
/// `thread` via `apply_rollout_records`.
///
/// Nothing is merged when the thread has no string `id`, when no rollout file
/// can be located, when loading the records fails (a warning is logged), or
/// when the record list is empty.
pub(super) fn enrich_thread_history_from_rollout(thread: &mut Value, codex_home: Option<&str>) {
    let thread_id = match thread.get("id").and_then(Value::as_str) {
        Some(id) => id,
        None => return,
    };
    let rollout_path = match resolve_rollout_path(thread, codex_home, thread_id) {
        Some(path) => path,
        None => return,
    };

    match load_rollout_command_records(&rollout_path) {
        // Nothing to merge.
        Ok(records) if records.is_empty() => {}
        Ok(records) => apply_rollout_records(thread, &records),
        // A broken rollout file must not break history loading; log and move on.
        Err(error) => {
            warn!(
                "读取历史 rollout 失败,已跳过补偿: thread_id={thread_id} path={} error={error}",
                rollout_path.display()
            );
        }
    }
}
/// Determines which rollout file belongs to the given thread.
///
/// Resolution order:
/// 1. The thread's own `path` field, if it is a string naming an existing file.
///    If it names something that is not a file, a warning is logged and the
///    fallback below is tried.
/// 2. A scan of `<codex_home>/sessions` and then `<codex_home>/archived_sessions`
///    using `find_rollout_path`.
///
/// Returns `None` when `codex_home` is absent or blank, or when neither
/// directory yields a match.
fn resolve_rollout_path(
    thread: &Value,
    codex_home: Option<&str>,
    thread_id: &str,
) -> Option<PathBuf> {
    let recorded_path = thread
        .get("path")
        .and_then(Value::as_str)
        .map(PathBuf::from);
    if let Some(candidate) = recorded_path {
        if candidate.is_file() {
            return Some(candidate);
        }
        warn!(
            "thread.path 指向的 rollout 文件不存在,尝试回退到 CODEX_HOME 扫描: thread_id={thread_id} path={}",
            candidate.display()
        );
    }

    // A missing or whitespace-only home directory disables the fallback scan.
    let home = Path::new(codex_home.filter(|value| !value.trim().is_empty())?);
    ["sessions", "archived_sessions"]
        .iter()
        .find_map(|relative| find_rollout_path(&home.join(relative), thread_id, 0))
}
/// Recursively searches `root` for a rollout file belonging to `thread_id`.
///
/// A file matches when its name starts with `rollout-`, ends with `.jsonl`
/// and contains `thread_id`. Sub-directories are descended into as they are
/// encountered; the first match found is returned.
///
/// `depth` is the current recursion depth (callers start at `0`). The search
/// gives up once `depth` exceeds the maximum, or when `root` is not a readable
/// directory.
///
/// Entries whose file name is not valid UTF-8 are skipped rather than
/// aborting the scan of the directory, so one oddly named file cannot hide a
/// valid rollout file that sits next to it.
fn find_rollout_path(root: &Path, thread_id: &str, depth: usize) -> Option<PathBuf> {
    /// Deepest directory level (relative to the initial root) that is scanned.
    const MAX_SCAN_DEPTH: usize = 4;

    if depth > MAX_SCAN_DEPTH || !root.is_dir() {
        return None;
    }
    let entries = fs::read_dir(root).ok()?;
    // `flatten` drops entries that could not be read; the scan is best-effort.
    for entry in entries.flatten() {
        let path = entry.path();
        if path.is_dir() {
            if let Some(found) = find_rollout_path(&path, thread_id, depth + 1) {
                return Some(found);
            }
            continue;
        }
        // Skip (do not bail out on) names that cannot be represented as UTF-8.
        let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
            continue;
        };
        let is_rollout_for_thread = file_name.starts_with("rollout-")
            && file_name.ends_with(".jsonl")
            && file_name.contains(thread_id);
        if is_rollout_for_thread {
            return Some(path);
        }
    }
    None
}
/// Merges rollout command records into the `turns` array of `thread`.
///
/// Works in three passes:
/// 1. Every `commandExecution` item already present in a turn is patched from
///    the record with the same call id (item `id` == record `call_id`), and
///    its id is remembered.
/// 2. Records whose call id was not seen are grouped by their `turn_id`;
///    records without a `turn_id` are dropped.
/// 3. Each turn whose `id` has pending records gets them inserted through
///    `insert_missing_command_items`.
///
/// Does nothing when `thread` has no `turns` array. Turns that are objects
/// without an `items` key receive an empty `items` array during pass 1.
fn apply_rollout_records(thread: &mut Value, records: &[RolloutCommandRecord]) {
    let Some(turns) = thread.get_mut("turns").and_then(Value::as_array_mut) else {
        return;
    };

    // Later records win when several share a call id.
    let mut lookup: HashMap<&str, &RolloutCommandRecord> = HashMap::new();
    for record in records {
        lookup.insert(record.call_id.as_str(), record);
    }

    // Pass 1: patch items that already exist and remember their ids.
    let mut seen_call_ids: HashSet<String> = HashSet::new();
    for turn in turns.iter_mut() {
        let Some(items) = ensure_turn_items(turn) else {
            continue;
        };
        for item in items.iter_mut() {
            let is_command = item.get("type").and_then(Value::as_str) == Some("commandExecution");
            if !is_command {
                continue;
            }
            let Some(call_id) = item.get("id").and_then(Value::as_str).map(str::to_owned) else {
                continue;
            };
            if let Some(record) = lookup.get(call_id.as_str()) {
                patch_existing_command_item(item, record);
            }
            seen_call_ids.insert(call_id);
        }
    }

    // Pass 2: group the records that have no matching item by turn id.
    let mut pending_by_turn: HashMap<String, Vec<&RolloutCommandRecord>> = HashMap::new();
    let unseen = records
        .iter()
        .filter(|record| !seen_call_ids.contains(&record.call_id));
    for record in unseen {
        if let Some(turn_id) = record.turn_id.as_ref() {
            pending_by_turn
                .entry(turn_id.clone())
                .or_default()
                .push(record);
        }
    }

    // Pass 3: insert the grouped records into their turns.
    for turn in turns.iter_mut() {
        let pending = turn
            .get("id")
            .and_then(Value::as_str)
            .and_then(|turn_id| pending_by_turn.get(turn_id));
        let Some(pending) = pending else {
            continue;
        };
        let Some(items) = ensure_turn_items(turn) else {
            continue;
        };
        insert_missing_command_items(items, pending);
    }
}
/// Returns the mutable `items` array of a turn, creating an empty one if the
/// key is absent.
///
/// Yields `None` when `turn` is not a JSON object, or when `items` exists but
/// is not an array (the existing value is left as it is).
fn ensure_turn_items(turn: &mut Value) -> Option<&mut Vec<Value>> {
    turn.as_object_mut()?
        .entry("items")
        .or_insert_with(|| Value::Array(Vec::new()))
        .as_array_mut()
}
/// Fills gaps in an existing `commandExecution` item from a rollout record.
///
/// A field is only written when the item's current value is missing or
/// unusable AND the record can supply a replacement:
/// - `aggregatedOutput`: absent, not a string, or blank after trimming;
/// - `exitCode`: absent or not an integer representable as `i64`;
/// - `status`: absent or not a string;
/// - `commandActions`: absent, not an array, or an empty array;
/// - `command`: absent, not a string, or blank after trimming.
///
/// Values already present on the item are never overwritten.
fn patch_existing_command_item(item: &mut Value, record: &RolloutCommandRecord) {
    let needs_output = command_output_is_blank(item.get("aggregatedOutput"));
    if let (true, Some(output_text)) = (needs_output, record.output_text.as_ref()) {
        item["aggregatedOutput"] = Value::String(output_text.clone());
    }

    let needs_exit_code = item.get("exitCode").and_then(Value::as_i64).is_none();
    if let (true, Some(exit_code)) = (needs_exit_code, record.exit_code) {
        item["exitCode"] = json!(exit_code);
    }

    let needs_status = item.get("status").and_then(Value::as_str).is_none();
    if let (true, Some(status)) = (needs_status, record.status.as_ref()) {
        item["status"] = Value::String(status.clone());
    }

    let needs_actions = item
        .get("commandActions")
        .and_then(Value::as_array)
        .map_or(true, |actions| actions.is_empty());
    if needs_actions && !record.command_actions.is_empty() {
        item["commandActions"] = Value::Array(record.command_actions.clone());
    }

    let needs_command = item
        .get("command")
        .and_then(Value::as_str)
        .map_or(true, |command| command.trim().is_empty());
    if let (true, Some(command)) = (needs_command, record.raw_command.as_ref()) {
        item["command"] = Value::String(command.clone());
    }
}
/// Inserts command items built from `records` into a turn's `items`.
///
/// Each record is converted with `build_command_execution_item`; records that
/// yield no item are skipped. The new items keep the order of `records` and
/// are placed directly before the first final-answer agent message (see
/// `first_final_answer_index`), or appended at the end when there is none.
///
/// All new items are spliced in with one operation, so the tail of `items` is
/// shifted once instead of once per inserted record.
fn insert_missing_command_items(items: &mut Vec<Value>, records: &[&RolloutCommandRecord]) {
    let new_items: Vec<Value> = records
        .iter()
        .copied()
        .filter_map(build_command_execution_item)
        .collect();
    if new_items.is_empty() {
        return;
    }
    let insert_at = first_final_answer_index(items).unwrap_or(items.len());
    // An empty range removes nothing; the splice takes effect when the
    // returned iterator is dropped at the end of this statement.
    items.splice(insert_at..insert_at, new_items);
}
/// Returns the index of the first item whose `type` is `"agentMessage"` and
/// whose `phase` is `"final_answer"`, or `None` if no item matches.
fn first_final_answer_index(items: &[Value]) -> Option<usize> {
    for (index, item) in items.iter().enumerate() {
        let kind = item.get("type").and_then(Value::as_str);
        let phase = item.get("phase").and_then(Value::as_str);
        if matches!((kind, phase), (Some("agentMessage"), Some("final_answer"))) {
            return Some(index);
        }
    }
    None
}
/// Builds a `commandExecution` JSON item from a rollout record.
///
/// Returns `None` when the record has neither a non-blank raw command nor any
/// command actions.
///
/// The item always carries `id` (the record's call id), `type`, `status`,
/// `command` (the trimmed raw command, possibly empty) and `commandActions`.
/// `status` comes from the record, falling back to `status_from_exit_code`.
/// `aggregatedOutput` and `exitCode` are only added when the record has them.
fn build_command_execution_item(record: &RolloutCommandRecord) -> Option<Value> {
    let command = record.raw_command.as_deref().map_or("", str::trim);
    let has_command = !command.is_empty();
    let has_actions = !record.command_actions.is_empty();
    if !(has_command || has_actions) {
        return None;
    }

    let status = match record.status.as_ref() {
        Some(status) => status.clone(),
        None => status_from_exit_code(record.exit_code),
    };

    // Fields present on every generated item, in insertion order.
    let required = [
        ("id", Value::String(record.call_id.clone())),
        ("type", Value::String("commandExecution".to_string())),
        ("status", Value::String(status)),
        ("command", Value::String(command.to_string())),
        (
            "commandActions",
            Value::Array(record.command_actions.clone()),
        ),
    ];
    let mut fields = Map::new();
    for (key, value) in required {
        fields.insert(key.to_string(), value);
    }

    // Optional fields are omitted entirely rather than written as null.
    if let Some(output_text) = record.output_text.as_ref() {
        fields.insert(
            "aggregatedOutput".to_string(),
            Value::String(output_text.clone()),
        );
    }
    if let Some(exit_code) = record.exit_code {
        fields.insert("exitCode".to_string(), json!(exit_code));
    }

    Some(Value::Object(fields))
}
/// Reports whether a command output value should be treated as missing.
///
/// `true` when the value is absent, is not a JSON string, or is a string that
/// is empty after trimming whitespace.
fn command_output_is_blank(value: Option<&Value>) -> bool {
    match value.and_then(Value::as_str) {
        Some(text) => text.trim().is_empty(),
        None => true,
    }
}
/// Derives a status label from an optional exit code.
///
/// Returns `"failed"` for any non-zero exit code, and `"completed"` for an
/// exit code of `0` or when no exit code is known.
fn status_from_exit_code(exit_code: Option<i64>) -> String {
    let failed = matches!(exit_code, Some(code) if code != 0);
    let label = if failed { "failed" } else { "completed" };
    label.to_string()
}