use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use anyhow::{Context, Result};
use serde_json::{Value, json};
use tracing::warn;
use super::super::helpers::optional_string;
/// One `exec_command` invocation reconstructed from a rollout file.
///
/// Records are keyed by `call_id` and merged from the rollout lines that mention that id:
/// the `exec_command` function call, its `function_call_output`, and the matching
/// `exec_command_end` event. Each optional field stays `None` (or empty) until some line
/// supplies a usable value.
#[derive(Debug, Clone, PartialEq)]
pub(super) struct RolloutCommandRecord {
/// Tool call id shared by the function call, its output, and the exec event.
pub(super) call_id: String,
/// Turn the call belongs to: the `turn_id` of an `exec_command_end` event if present,
/// otherwise the turn that was current when the call was first seen.
pub(super) turn_id: Option<String>,
/// Command text: the `cmd` argument of the function call, or the script / shell-joined
/// argv of the `exec_command_end` event.
pub(super) raw_command: Option<String>,
/// Working directory (the `workdir` argument or the event's `cwd`).
pub(super) workdir: Option<String>,
/// JSON action objects (`read`, `listFiles`, `search`, `run`) describing the command,
/// taken from the event's `parsed_cmd` or guessed heuristically from the command text.
pub(super) command_actions: Vec<Value>,
/// Command output: the text after `Output:` in the function call output, or the event's
/// `aggregated_output`.
pub(super) output_text: Option<String>,
/// Process exit code, if one was reported.
pub(super) exit_code: Option<i64>,
/// Status string: the event's own `status`, or derived from the exit code
/// (`"completed"` for 0, `"failed"` otherwise).
pub(super) status: Option<String>,
/// Zero-based index of the first rollout line that mentioned this call; used to order
/// the returned records.
pub(super) rollout_index: usize,
}
impl RolloutCommandRecord {
    /// Creates an empty record for `call_id`, first seen at rollout line `rollout_index`.
    ///
    /// Every other field starts unset; later rollout lines fill them in.
    fn new(call_id: &str, rollout_index: usize) -> Self {
        let call_id = call_id.to_owned();
        Self {
            call_id,
            rollout_index,
            turn_id: None,
            raw_command: None,
            workdir: None,
            command_actions: vec![],
            output_text: None,
            exit_code: None,
            status: None,
        }
    }
}
/// Parses the rollout JSONL file at `path` and returns one [`RolloutCommandRecord`] per
/// `exec_command` call id, merged across the `response_item` and `event_msg` lines that
/// mention it.
///
/// Blank lines, lines that are not valid UTF-8, and lines that are not valid JSON are logged
/// and skipped. Any other read error is logged and stops reading; the records gathered so
/// far are still returned.
///
/// Records are returned sorted by the index of the first line that mentioned each call.
///
/// # Errors
///
/// Returns an error only when the file cannot be opened.
pub(super) fn load_rollout_command_records(path: &Path) -> Result<Vec<RolloutCommandRecord>> {
    let file =
        File::open(path).with_context(|| format!("打开 rollout 文件失败: {}", path.display()))?;
    let reader = BufReader::new(file);
    // Turn announced by the most recent `turn_context` / `turn_started` line; response items
    // are attributed to it when they carry no turn of their own.
    let mut current_turn_id: Option<String> = None;
    let mut records = HashMap::<String, RolloutCommandRecord>::new();
    for (rollout_index, line_result) in reader.lines().enumerate() {
        let line = match line_result {
            Ok(line) => line,
            // Invalid UTF-8: the offending bytes were already consumed, so the next call
            // resumes at the following line and skipping is safe.
            Err(error) if error.kind() == std::io::ErrorKind::InvalidData => {
                warn!(
                    "读取 rollout 行失败,已跳过: path={} index={} error={error}",
                    path.display(),
                    rollout_index
                );
                continue;
            }
            // Any other error (e.g. `path` is a directory) may repeat on every call because
            // the `lines()` iterator does not stop after an error; `continue` would spin
            // forever, so keep what we have and stop.
            Err(error) => {
                warn!(
                    "读取 rollout 文件失败,已停止读取: path={} index={} error={error}",
                    path.display(),
                    rollout_index
                );
                break;
            }
        };
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let value: Value = match serde_json::from_str(trimmed) {
            Ok(value) => value,
            Err(error) => {
                warn!(
                    "解析 rollout JSON 行失败,已跳过: path={} index={} error={error}",
                    path.display(),
                    rollout_index
                );
                continue;
            }
        };
        if let Some(turn_id) = extract_current_turn_id(&value) {
            current_turn_id = Some(turn_id);
        }
        match value.get("type").and_then(Value::as_str) {
            Some("response_item") => handle_response_item(
                &value,
                current_turn_id.as_deref(),
                rollout_index,
                &mut records,
            ),
            Some("event_msg") => handle_event_msg(&value, rollout_index, &mut records),
            _ => {}
        }
    }
    let mut items = records.into_values().collect::<Vec<_>>();
    items.sort_by_key(|record| record.rollout_index);
    Ok(items)
}
/// Merges one `response_item` rollout line into `records`.
///
/// * `function_call` items named `exec_command` contribute the command (`cmd`), working
///   directory (`workdir`), and heuristic command actions, all read from the JSON-encoded
///   `arguments` string.
/// * `function_call_output` items contribute the output text, exit code, and derived status.
///
/// Either kind creates the record on first sight and attributes it to `current_turn_id`
/// when it has no turn yet. Fields that are already set are never overwritten. Any other
/// payload is ignored.
///
/// NOTE(review): `function_call_output` carries no tool name, so a record is created for the
/// output of *any* function call, not only `exec_command` — confirm this is intended.
fn handle_response_item(
    value: &Value,
    current_turn_id: Option<&str>,
    rollout_index: usize,
    records: &mut HashMap<String, RolloutCommandRecord>,
) {
    let payload = value.get("payload").unwrap_or(&Value::Null);
    let payload_type = payload.get("type").and_then(Value::as_str);
    let is_exec_call = payload_type == Some("function_call")
        && payload.get("name").and_then(Value::as_str) == Some("exec_command");
    if !is_exec_call && payload_type != Some("function_call_output") {
        return;
    }
    let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
        return;
    };
    let record = records
        .entry(call_id.to_string())
        .or_insert_with(|| RolloutCommandRecord::new(call_id, rollout_index));
    if record.turn_id.is_none() {
        record.turn_id = current_turn_id.map(ToOwned::to_owned);
    }

    if is_exec_call {
        // `arguments` is itself a JSON document serialized into a string.
        let arguments = payload
            .get("arguments")
            .and_then(Value::as_str)
            .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
            .unwrap_or(Value::Null);
        let cmd = optional_string(&arguments, "cmd");
        if record.raw_command.is_none() {
            record.raw_command = normalize_non_blank(cmd.clone());
        }
        if record.workdir.is_none() {
            record.workdir = normalize_non_blank(optional_string(&arguments, "workdir"));
        }
        if record.command_actions.is_empty() {
            record.command_actions =
                heuristic_command_actions(cmd.as_deref(), record.workdir.as_deref());
        }
        return;
    }

    // `function_call_output`: the record exists now even if no text can be extracted.
    let Some(raw_text) = function_call_output_text(payload.get("output")) else {
        return;
    };
    if record.output_text.is_none() {
        record.output_text = extract_command_output_text(&raw_text);
    }
    if record.exit_code.is_none() {
        record.exit_code = parse_exit_code(&raw_text);
    }
    if record.status.is_none() {
        record.status = record.exit_code.map(status_from_exit_code);
    }
}
fn handle_event_msg(
value: &Value,
rollout_index: usize,
records: &mut HashMap<String, RolloutCommandRecord>,
) {
let payload = value.get("payload").unwrap_or(&Value::Null);
if payload.get("type").and_then(Value::as_str) != Some("exec_command_end") {
return;
}
let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
return;
};
let record = records
.entry(call_id.to_string())
.or_insert_with(|| RolloutCommandRecord::new(call_id, rollout_index));
if let Some(turn_id) = optional_string(payload, "turn_id") {
record.turn_id = Some(turn_id);
}
if record.raw_command.is_none() {
record.raw_command = raw_command_from_exec_event(payload);
}
if record.workdir.is_none() {
record.workdir = normalize_non_blank(optional_string(payload, "cwd"));
}
let parsed_actions = command_actions_from_parsed(
payload.get("parsed_cmd").unwrap_or(&Value::Null),
record.raw_command.as_deref(),
);
if !parsed_actions.is_empty() {
record.command_actions = parsed_actions;
} else if record.command_actions.is_empty() {
record.command_actions =
heuristic_command_actions(record.raw_command.as_deref(), record.workdir.as_deref());
}
if record.output_text.is_none() {
record.output_text = normalize_non_blank(optional_string(payload, "aggregated_output"));
}
if record.exit_code.is_none() {
record.exit_code = payload.get("exit_code").and_then(Value::as_i64);
}
if record.status.is_none() {
record.status = normalize_non_blank(optional_string(payload, "status"));
}
}
/// Returns the turn id announced by this rollout line, if it announces one.
///
/// Two line shapes announce a turn:
/// * `turn_context` lines, via `payload.turn_id`;
/// * `event_msg` lines whose payload type is `turn_started`, via `payload.turn_id` or,
///   failing that, `payload.turn.id`.
fn extract_current_turn_id(value: &Value) -> Option<String> {
    let line_type = value.get("type").and_then(Value::as_str)?;
    let payload = value.get("payload")?;
    match line_type {
        "turn_context" => payload.get("turn_id")?.as_str().map(str::to_owned),
        "event_msg" if payload.get("type").and_then(Value::as_str) == Some("turn_started") => {
            optional_string(payload, "turn_id")
                .or_else(|| payload.get("turn").and_then(|turn| optional_string(turn, "id")))
        }
        _ => None,
    }
}
/// Rebuilds a display command from the `command` argv array of an exec event.
///
/// Non-string argv entries are ignored. For `<shell> -lc <script> ...` the script itself is
/// returned; any other argv is shell-quoted and joined with spaces. Returns `None` when the
/// argv is missing or empty, or when the resulting text is blank.
fn raw_command_from_exec_event(payload: &Value) -> Option<String> {
    let argv: Vec<&str> = match payload.get("command").and_then(Value::as_array) {
        Some(items) => items.iter().filter_map(Value::as_str).collect(),
        None => Vec::new(),
    };
    let command = match argv.as_slice() {
        [] => return None,
        // Show the script the shell ran rather than the shell wrapper around it.
        [_, flag, script, ..] if *flag == "-lc" => (*script).to_owned(),
        _ => shell_join(&argv),
    };
    normalize_non_blank(Some(command))
}
fn command_actions_from_parsed(parsed_cmd: &Value, fallback_command: Option<&str>) -> Vec<Value> {
parsed_cmd
.as_array()
.into_iter()
.flatten()
.filter_map(|item| match item.get("type").and_then(Value::as_str) {
Some("read") => Some(json!({
"type": "read",
"command": optional_string(item, "cmd").or_else(|| fallback_command.map(ToOwned::to_owned)).unwrap_or_default(),
"name": optional_string(item, "name").unwrap_or_else(|| optional_string(item, "path").unwrap_or_default()),
"path": optional_string(item, "path").unwrap_or_default(),
})),
Some("list_files") | Some("listFiles") => Some(json!({
"type": "listFiles",
"command": optional_string(item, "cmd").or_else(|| fallback_command.map(ToOwned::to_owned)).unwrap_or_default(),
"path": optional_string(item, "path"),
})),
Some("search") => Some(json!({
"type": "search",
"command": optional_string(item, "cmd").or_else(|| fallback_command.map(ToOwned::to_owned)).unwrap_or_default(),
"query": optional_string(item, "query"),
"path": optional_string(item, "path"),
})),
Some("unknown") => Some(json!({
"type": "run",
"command": optional_string(item, "cmd").or_else(|| fallback_command.map(ToOwned::to_owned)).unwrap_or_default(),
})),
_ => None,
})
.collect()
}
/// Guesses a single action object for `raw_command` when no structured `parsed_cmd` exists.
///
/// Classification runs on the lower-cased command, in priority order: search, then read,
/// then list. Anything else is `run`. `read` actions name the inferred target file, falling
/// back to the whole command. Returns an empty vector when the command is missing or blank.
fn heuristic_command_actions(raw_command: Option<&str>, workdir: Option<&str>) -> Vec<Value> {
    let Some(command) = raw_command
        .filter(|text| !text.trim().is_empty())
        .map(str::to_owned)
    else {
        return Vec::new();
    };
    let lowered = command.to_ascii_lowercase();

    let action = if looks_like_search_command(&lowered) {
        json!({ "type": "search", "command": command, "path": workdir })
    } else if looks_like_read_command(&lowered) {
        // Infer from the original casing so the reported file name is accurate.
        let name = infer_read_target(&command).unwrap_or_else(|| command.clone());
        json!({ "type": "read", "command": command, "name": name })
    } else if looks_like_list_command(&lowered) {
        json!({ "type": "listFiles", "command": command, "path": workdir })
    } else {
        json!({ "type": "run", "command": command })
    };
    vec![action]
}
/// Heuristically decides whether a lower-cased command searches file contents or names.
///
/// True when the command mentions `rg ` or `grep `, or runs `find ` with a name/path filter.
fn looks_like_search_command(command: &str) -> bool {
    const SEARCH_TOOLS: [&str; 2] = ["rg ", "grep "];
    const FIND_FILTERS: [&str; 4] = [" -name ", " -path ", " -iname ", " -ipath "];

    if SEARCH_TOOLS.iter().any(|tool| command.contains(tool)) {
        return true;
    }
    command.contains("find ") && FIND_FILTERS.iter().any(|filter| command.contains(filter))
}
/// Heuristically decides whether a lower-cased command prints file contents
/// (`cat`, `sed -n`, `head`, or `tail` at the start of the command).
fn looks_like_read_command(command: &str) -> bool {
    const READ_PREFIXES: [&str; 4] = ["cat ", "sed -n ", "head ", "tail "];
    READ_PREFIXES
        .iter()
        .any(|prefix| command.starts_with(prefix))
}
/// Heuristically decides whether a lower-cased command lists files or directories.
///
/// Matches `pwd`, any command containing a standalone `ls` word (so bare `ls` and
/// `cd dir && ls` count, while `git ls-files` does not), `rg --files`, a leading `fd`
/// (with or without arguments), and `find` invocations that
/// [`looks_like_search_command`] does not treat as a search.
fn looks_like_list_command(command: &str) -> bool {
    // Word-based matching: substring checks such as `" ls "` missed a bare `ls` and a
    // trailing `... && ls`.
    let first_word = command.split_whitespace().next();
    command.starts_with("pwd")
        || command.split_whitespace().any(|word| word == "ls")
        || command.contains("rg --files")
        || first_word == Some("fd")
        || (command.contains("find ") && !looks_like_search_command(command))
}
/// Guesses the file a read command targets: the last whitespace-separated token that, after
/// stripping quotes and brackets, is non-empty, not a flag, and not `|`, `&&`, or `;`.
///
/// Returns that token's final path component when it has one (e.g. `src/main.rs` becomes
/// `main.rs`), otherwise the token itself. Returns `None` if no token qualifies.
fn infer_read_target(command: &str) -> Option<String> {
    const SEPARATORS: [&str; 3] = ["|", "&&", ";"];

    let candidate = command
        .split_whitespace()
        .rev()
        .map(trim_shell_token)
        .find(|token| {
            !token.is_empty() && !token.starts_with('-') && !SEPARATORS.contains(&token.as_str())
        })?;
    let file_name = Path::new(&candidate)
        .file_name()
        .and_then(|name| name.to_str())
        .map(str::to_owned);
    Some(file_name.unwrap_or(candidate))
}
/// Extracts the text of a `function_call_output` payload's `output` value.
///
/// * A string is returned verbatim.
/// * An array has its items' texts joined with newlines.
/// * An object is searched in the order `body`, `content`, then a non-blank `text` field.
///
/// Anything else, or a missing value, yields `None`.
fn function_call_output_text(value: Option<&Value>) -> Option<String> {
    match value? {
        Value::String(text) => Some(text.to_owned()),
        Value::Array(items) => function_call_output_items_to_text(items),
        Value::Object(fields) => ["body", "content"]
            .iter()
            .find_map(|key| function_call_output_text(fields.get(*key)))
            .or_else(|| {
                fields
                    .get("text")
                    .and_then(Value::as_str)
                    .and_then(|text| normalize_non_blank(Some(text.to_owned())))
            }),
        _ => None,
    }
}
fn function_call_output_items_to_text(items: &[Value]) -> Option<String> {
let lines = items
.iter()
.filter_map(function_call_output_item_text)
.collect::<Vec<_>>();
normalize_non_blank(Some(lines.join("\n")))
}
/// Extracts the text of one item of an array-shaped function call output.
///
/// A non-blank string item is returned as-is. An object item is searched in the order:
/// non-blank `text`, then `body`, then `content`. Other item kinds yield `None`.
fn function_call_output_item_text(item: &Value) -> Option<String> {
    if let Value::String(text) = item {
        return normalize_non_blank(Some(text.clone()));
    }
    let Value::Object(fields) = item else {
        return None;
    };
    let direct_text = fields
        .get("text")
        .and_then(Value::as_str)
        .and_then(|text| normalize_non_blank(Some(text.to_owned())));
    direct_text
        .or_else(|| function_call_output_text(fields.get("body")))
        .or_else(|| function_call_output_text(fields.get("content")))
}
/// Returns everything after the first `Output:\n` marker in an exec tool result, or `None`
/// when the marker is absent or nothing but whitespace follows it.
fn extract_command_output_text(raw_text: &str) -> Option<String> {
    const MARKER: &str = "Output:\n";
    let body_start = raw_text.find(MARKER)? + MARKER.len();
    normalize_non_blank(Some(raw_text[body_start..].to_owned()))
}
/// Parses the integer on the rest of the line after the first
/// `Process exited with code ` marker. Returns `None` when the marker is absent or the value
/// is not an integer.
fn parse_exit_code(raw_text: &str) -> Option<i64> {
    const PREFIX: &str = "Process exited with code ";
    let after_prefix = &raw_text[raw_text.find(PREFIX)? + PREFIX.len()..];
    let first_line = after_prefix.lines().next()?;
    first_line.trim().parse().ok()
}
/// Maps an exit code to a status: `"completed"` for 0, `"failed"` for anything else.
fn status_from_exit_code(exit_code: i64) -> String {
    let status = match exit_code {
        0 => "completed",
        _ => "failed",
    };
    status.to_owned()
}
/// Joins `args` into a single POSIX-shell command line for display.
///
/// An argument made only of ASCII alphanumerics and `/`, `.`, `_`, `-` is emitted as-is.
/// Any other argument, including the empty string, is wrapped in single quotes. An embedded
/// single quote is written as `'"'"'`: close the quote, emit a double-quoted `'`, then reopen.
fn shell_join(args: &[&str]) -> String {
    args.iter()
        .map(|arg| {
            // An empty argument must still be quoted, or it would vanish from the line.
            let is_plain = !arg.is_empty()
                && arg
                    .chars()
                    .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '/' | '.' | '_' | '-'));
            if is_plain {
                (*arg).to_string()
            } else {
                // Raw string: the literal text is `'"'"'` (no backslashes).
                format!("'{}'", arg.replace('\'', r#"'"'"'"#))
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}
/// Strips leading and trailing quote, backtick, parenthesis, and square-bracket characters
/// from a shell token.
fn trim_shell_token(token: &str) -> String {
    const WRAPPING_CHARS: &[char] = &['"', '\'', '`', '(', ')', '[', ']'];
    token.trim_matches(WRAPPING_CHARS).to_owned()
}
/// Drops strings that are empty or whitespace-only. Non-blank strings are returned
/// unchanged; they are not trimmed.
fn normalize_non_blank(value: Option<String>) -> Option<String> {
    value.filter(|text| !text.trim().is_empty())
}