use std::collections::HashSet;
use std::path::PathBuf;
use toolpath_codex::provider::to_view;
use toolpath_codex::{ResponseItem, RolloutItem, RolloutReader, derive};
use toolpath_convo::Role;
fn fixture_path() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/sample-codex-python.jsonl")
}
fn session() -> toolpath_codex::Session {
RolloutReader::read_session(fixture_path()).unwrap()
}
fn derived() -> toolpath::v1::Path {
derive::derive_path(&session(), &derive::DeriveConfig::default())
}
#[test]
fn all_steps_have_non_empty_timestamp() {
let path = derived();
for s in &path.steps {
assert!(
!s.step.timestamp.is_empty(),
"step {} has empty timestamp",
s.step.id
);
}
}
#[test]
fn step_timestamps_match_source_message_lines() {
let s = session();
let path = derive::derive_path(&s, &derive::DeriveConfig::default());
let step_timestamps: HashSet<&str> = path
.steps
.iter()
.map(|st| st.step.timestamp.as_str())
.collect();
let mut missing: Vec<String> = Vec::new();
for line in &s.lines {
if let RolloutItem::ResponseItem(ResponseItem::Message(_)) = line.item()
&& !step_timestamps.contains(line.timestamp.as_str())
{
missing.push(line.timestamp.clone());
}
}
assert!(
missing.is_empty(),
"source message timestamps missing from derived path: {:?}",
missing
);
}
#[test]
fn turn_timestamps_match_source_message_lines() {
let s = session();
let view = to_view(&s);
let turn_timestamps: HashSet<&str> = view.turns.iter().map(|t| t.timestamp.as_str()).collect();
for line in &s.lines {
if let RolloutItem::ResponseItem(ResponseItem::Message(_)) = line.item() {
assert!(
turn_timestamps.contains(line.timestamp.as_str()),
"source message line {} has no matching Turn",
line.timestamp
);
}
}
}
#[test]
fn parent_chain_is_linear_and_in_order() {
let path = derived();
let positions: std::collections::HashMap<&str, usize> = path
.steps
.iter()
.enumerate()
.map(|(i, s)| (s.step.id.as_str(), i))
.collect();
for (i, step) in path.steps.iter().enumerate() {
assert!(
step.step.parents.len() <= 1,
"step {} has {} parents — codex is expected to be linear",
step.step.id,
step.step.parents.len()
);
for parent in &step.step.parents {
let pi = positions.get(parent.as_str()).unwrap_or_else(|| {
panic!("step {} references missing parent {}", step.step.id, parent)
});
assert!(
*pi < i,
"step {} (index {}) references parent {} (index {}) — out of order",
step.step.id,
i,
parent,
pi
);
}
}
}
#[test]
fn head_equals_last_step_id() {
let path = derived();
let last = path.steps.last().expect("path has steps");
assert_eq!(path.path.head, last.step.id);
}
#[test]
fn actor_scheme_matches_source_role() {
let s = session();
let view = to_view(&s);
let path = derive::derive_path(&s, &derive::DeriveConfig::default());
let user_seen = view.turns.iter().any(|t| t.role == Role::User);
let assistant_seen = view.turns.iter().any(|t| t.role == Role::Assistant);
let system_seen = view.turns.iter().any(|t| t.role == Role::System);
let prefixes: HashSet<&str> = path
.steps
.iter()
.map(|s| s.step.actor.split(':').next().unwrap_or(""))
.collect();
if user_seen {
assert!(prefixes.contains("human"), "no step has a human:* actor");
}
if assistant_seen {
assert!(prefixes.contains("agent"), "no step has an agent:* actor");
}
if system_seen {
assert!(prefixes.contains("system"), "no step has a system:* actor");
}
}
fn collect_derived_tool_call_ids(path: &toolpath::v1::Path) -> HashSet<String> {
let mut ids = HashSet::new();
for step in &path.steps {
for change in step.change.values() {
let Some(struc) = change.structural.as_ref() else {
continue;
};
let Some(calls) = struc.extra.get("tool_calls") else {
continue;
};
let Some(arr) = calls.as_array() else {
continue;
};
for c in arr {
if let Some(id) = c.get("call_id").and_then(|v| v.as_str()) {
ids.insert(id.to_string());
}
}
}
}
ids
}
#[test]
fn every_function_call_call_id_surfaces_in_steps() {
let s = session();
let path = derive::derive_path(&s, &derive::DeriveConfig::default());
let derived_ids = collect_derived_tool_call_ids(&path);
for line in &s.lines {
match line.item() {
RolloutItem::ResponseItem(ResponseItem::FunctionCall(fc)) => {
assert!(
derived_ids.contains(&fc.call_id),
"function_call {} missing from derived path",
fc.call_id
);
}
RolloutItem::ResponseItem(ResponseItem::CustomToolCall(ct)) => {
assert!(
derived_ids.contains(&ct.call_id),
"custom_tool_call {} missing from derived path",
ct.call_id
);
}
_ => {}
}
}
}
#[test]
fn function_call_arguments_preserved_in_view() {
let s = session();
let view = to_view(&s);
let mut tool_by_id: std::collections::HashMap<&str, &toolpath_convo::ToolInvocation> =
std::collections::HashMap::new();
for t in &view.turns {
for tu in &t.tool_uses {
tool_by_id.insert(tu.id.as_str(), tu);
}
}
for line in &s.lines {
if let RolloutItem::ResponseItem(ResponseItem::FunctionCall(fc)) = line.item() {
let tu = tool_by_id
.get(fc.call_id.as_str())
.unwrap_or_else(|| panic!("function_call {} missing from view", fc.call_id));
assert_eq!(
tu.name, fc.name,
"tool invocation {} has wrong name",
fc.call_id
);
let raw_match = tu
.input
.get("raw_arguments")
.and_then(|v| v.as_str())
.map(|s| s == fc.arguments)
.unwrap_or(false);
let parsed_match = serde_json::from_str::<serde_json::Value>(&fc.arguments)
.ok()
.map(|v| v == tu.input)
.unwrap_or(false);
let raw_eq_input = tu
.input
.as_str()
.map(|s| s == fc.arguments)
.unwrap_or(false);
assert!(
raw_match || parsed_match || raw_eq_input,
"function_call {} arguments not preserved: source={:?}, got input={:?}",
fc.call_id,
fc.arguments,
tu.input
);
}
}
}
#[test]
fn patch_apply_files_all_surface_as_artifacts() {
let s = session();
let path = derive::derive_path(&s, &derive::DeriveConfig::default());
let artifact_keys: HashSet<&str> = path
.steps
.iter()
.flat_map(|s| s.change.keys().map(|k| k.as_str()))
.collect();
for line in &s.lines {
if let RolloutItem::EventMsg(toolpath_codex::EventMsg::PatchApplyEnd(patch)) = line.item() {
if !patch.success {
continue;
}
for file_path in patch.changes.keys() {
assert!(
artifact_keys.contains(file_path.as_str()),
"file {} from successful patch_apply_end not found in derived artifacts",
file_path
);
}
}
}
}