use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use super::super::{
now_rfc3339, run_replay_oracle_trace, ReplayAllowlistRule, ReplayExpectation, ReplayOracleTrace,
};
use super::api::{crystallize_traces, synthesize_candidate_from_trace};
use super::types::{
CrystallizationAction, CrystallizationArtifacts, CrystallizationCost,
CrystallizationSideEffect, CrystallizationTrace, CrystallizeOptions, WorkflowCandidate,
};
use super::util::hash_bytes;
use crate::value::VmError;
/// Source tag stamped on every trace and action produced by the trajectory tap
/// (trace `source`, and the `source` key in trace/action metadata).
pub const TRAJECTORY_SOURCE: &str = "agent_loop_trajectory";
// Default minimum Jaccard similarity between the tool-signature multisets of two
// adjacent turns for the later turn to extend the current segment.
const DEFAULT_SIMILARITY_THRESHOLD: f64 = 0.5;
// Default minimum number of turns a segment needs to be emitted as a trace.
const DEFAULT_MIN_SEGMENT_LEN: usize = 2;
// Default maximum number of turns in one segment; longer runs are split.
const DEFAULT_MAX_SEGMENT_LEN: usize = 12;
// Default maximum allowed ratio of diverged deterministic steps during
// verification; 0.0 means any divergence fails verification.
const DEFAULT_DIVERGENCE_TOLERANCE: f64 = 0.0;
/// One recorded iteration of an agent loop, as consumed by [`TrajectoryTap`].
///
/// `#[serde(default)]` lets partially populated records deserialize; missing
/// fields take their `Default` values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct AgentTurnRecord {
    /// Loop iteration index; used in action/trace ids and the trace's
    /// `iteration_span` metadata.
    pub iteration: usize,
    /// Session this turn belongs to; copied into the model-call action metadata.
    pub session_id: String,
    /// Turn start timestamp as a string (`turn_record` fills it from
    /// `now_rfc3339`). The first turn's value becomes the trace's `started_at`.
    pub started_at: Option<String>,
    /// Turn finish timestamp as a string. The last turn's value becomes the
    /// trace's `finished_at`.
    pub finished_at: Option<String>,
    /// Whether the turn succeeded. A turn is only eligible for a segment when
    /// this is true and every tool call is completed.
    pub success: bool,
    /// Tool calls issued during the turn, in order.
    pub tool_calls: Vec<AgentTurnToolCall>,
    /// Provider name; added to model-call action metadata when present.
    pub provider: Option<String>,
    /// Model name; used as the model-call action name (falls back to
    /// `"agent_turn"`) and as the cost's model.
    pub model: Option<String>,
    /// Prompt tokens, copied into the model-call action cost.
    pub input_tokens: i64,
    /// Completion tokens, copied into the model-call action cost.
    pub output_tokens: i64,
    /// Turn duration; becomes the model-call action duration and cost
    /// `wall_ms` (0 when absent).
    pub duration_ms: Option<i64>,
    /// Assistant reply, recorded as the model-call action's output.
    pub assistant_text: Option<String>,
    /// Extra metadata copied onto the model-call action. The keys `source`,
    /// `iteration`, `session_id` (and `provider` when set) are overwritten.
    pub metadata: BTreeMap<String, JsonValue>,
}
/// A single tool invocation recorded within an [`AgentTurnRecord`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct AgentTurnToolCall {
    /// Call id, used as the tool-call action id. When empty,
    /// `turn_<iteration>_<tool_name>` is used instead.
    pub tool_call_id: String,
    /// Tool name; used as the action name and in the call signature.
    pub tool_name: String,
    /// Call status; only `"completed"` (ASCII case-insensitive) counts as success.
    pub status: String,
    /// Raw tool input. When it is a JSON object, its keys contribute to the
    /// signature and fill in `parameters` entries that are not already set.
    pub raw_input: JsonValue,
    /// Raw tool output, recorded as the action's output and observed output.
    pub raw_output: Option<JsonValue>,
    /// Capabilities copied onto the tool-call action.
    pub capabilities: Vec<String>,
    /// Side effects copied onto the tool-call action.
    pub side_effects: Vec<CrystallizationSideEffect>,
    /// Call duration copied onto the tool-call action.
    pub duration_ms: Option<i64>,
    /// Explicit parameters. These take precedence over `raw_input` keys, and
    /// their keys contribute to the signature.
    pub parameters: BTreeMap<String, JsonValue>,
}
impl AgentTurnToolCall {
    /// Returns `true` when `status` equals `"completed"`, ignoring ASCII case.
    fn is_completed(&self) -> bool {
        let status: &str = &self.status;
        status.eq_ignore_ascii_case("completed")
    }

    /// Builds a comparison key of the form `tool_call:<tool_name>:<k1,k2,...>`.
    ///
    /// The key list is the sorted, de-duplicated union of the `parameters` keys
    /// and the keys returned by `json_scalar_keys` for `raw_input`. Parameter
    /// values are not part of the signature.
    fn signature(&self) -> String {
        let mut keys: Vec<String> = self.parameters.keys().cloned().collect();
        keys.extend(json_scalar_keys(&self.raw_input));
        keys.sort_unstable();
        keys.dedup();
        let joined_keys = keys.join(",");
        format!("tool_call:{}:{}", self.tool_name, joined_keys)
    }
}
/// Converts a sequence of [`AgentTurnRecord`]s into crystallization traces.
///
/// Consecutive successful, similar turns are grouped into segments, and each
/// segment becomes one trace. Build with [`TrajectoryTap::new`] and the
/// `with_*` methods.
#[derive(Clone, Debug)]
pub struct TrajectoryTap {
    // Session id used as the prefix of every trace id and in trace metadata.
    session_id: String,
    // Optional workflow id copied onto every emitted trace.
    workflow_id: Option<String>,
    // Minimum adjacent-turn similarity (clamped to [0, 1]) needed to extend a segment.
    similarity_threshold: f64,
    // Segments with fewer turns than this are discarded (always >= 1).
    min_segment_len: usize,
    // Segments stop growing once they contain this many turns (always >= min).
    max_segment_len: usize,
    // Overrides the default replay allowlist when set.
    replay_allowlist: Option<Vec<ReplayAllowlistRule>>,
}
impl TrajectoryTap {
    /// Creates a tap for `session_id` with the module defaults.
    ///
    /// The defaults are: similarity threshold 0.5, segment length 2..=12, no
    /// workflow id, and the built-in trajectory replay allowlist.
    pub fn new(session_id: impl Into<String>) -> Self {
        Self {
            session_id: session_id.into(),
            workflow_id: None,
            similarity_threshold: DEFAULT_SIMILARITY_THRESHOLD,
            min_segment_len: DEFAULT_MIN_SEGMENT_LEN,
            max_segment_len: DEFAULT_MAX_SEGMENT_LEN,
            replay_allowlist: None,
        }
    }

    /// Tags every emitted trace with `workflow_id`.
    pub fn with_workflow_id(mut self, workflow_id: impl Into<String>) -> Self {
        self.workflow_id = Some(workflow_id.into());
        self
    }

    /// Sets the minimum adjacent-turn similarity required to extend a segment.
    ///
    /// The value is clamped to `[0.0, 1.0]`. A NaN value is ignored and the
    /// previous threshold is kept. `f64::clamp` passes NaN through unchanged,
    /// and every `>=` comparison against NaN is false. A NaN threshold would
    /// therefore silently stop every segment from growing past one turn.
    pub fn with_similarity_threshold(mut self, value: f64) -> Self {
        if !value.is_nan() {
            self.similarity_threshold = value.clamp(0.0, 1.0);
        }
        self
    }

    /// Sets the inclusive segment-length bounds, measured in turns.
    ///
    /// `min` is raised to at least 1, and `max` to at least the resulting `min`.
    pub fn with_segment_len(mut self, min: usize, max: usize) -> Self {
        self.min_segment_len = min.max(1);
        self.max_segment_len = max.max(self.min_segment_len);
        self
    }

    /// Replaces the replay allowlist attached to every emitted trace.
    ///
    /// Without this call, `default_trajectory_allowlist()` is used.
    pub fn with_replay_allowlist(mut self, rules: Vec<ReplayAllowlistRule>) -> Self {
        self.replay_allowlist = Some(rules);
        self
    }

    /// Segments `turns` and converts each segment into a [`CrystallizationTrace`].
    ///
    /// Traces are returned in segment order.
    pub fn collect(&self, turns: &[AgentTurnRecord]) -> Vec<CrystallizationTrace> {
        self.segment_turns(turns)
            .into_iter()
            .map(|segment| self.trace_from_segment(segment))
            .collect()
    }

    /// Greedily groups consecutive turns into segments.
    ///
    /// Unsuccessful turns (see `turn_is_successful`) are skipped. Starting at a
    /// successful turn, the segment grows while all of the following hold:
    /// - the next turn is also successful;
    /// - the segment is shorter than `max_segment_len`;
    /// - the similarity of the adjacent turns' tool signatures is at least
    ///   `similarity_threshold`.
    ///
    /// Segments shorter than `min_segment_len` are dropped. Scanning resumes
    /// right after the segment either way.
    fn segment_turns<'a>(&self, turns: &'a [AgentTurnRecord]) -> Vec<&'a [AgentTurnRecord]> {
        let mut segments = Vec::new();
        let mut cursor = 0;
        while cursor < turns.len() {
            if !turn_is_successful(&turns[cursor]) {
                cursor += 1;
                continue;
            }
            let mut end = cursor + 1;
            while end < turns.len()
                && end - cursor < self.max_segment_len
                && turn_is_successful(&turns[end])
                && self.adjacent_similarity(&turns[end - 1], &turns[end])
                    >= self.similarity_threshold
            {
                end += 1;
            }
            if end - cursor >= self.min_segment_len {
                segments.push(&turns[cursor..end]);
            }
            cursor = end;
        }
        segments
    }

    /// Jaccard similarity of the tool-signature multisets of two turns.
    fn adjacent_similarity(&self, left: &AgentTurnRecord, right: &AgentTurnRecord) -> f64 {
        jaccard_similarity(
            &tool_signature_multiset(left),
            &tool_signature_multiset(right),
        )
    }

    /// Builds a trace from one segment.
    ///
    /// - The id is `<session>_trajectory_<first iteration>_<last iteration>`.
    /// - Each turn contributes one `model_call` action, followed by one
    ///   `tool_call` action per tool call.
    /// - `source_hash` hashes the JSON serialization of the actions. The
    ///   payload is empty if serialization fails.
    fn trace_from_segment(&self, turns: &[AgentTurnRecord]) -> CrystallizationTrace {
        let first_iteration = turns.first().map(|t| t.iteration).unwrap_or(0);
        let last_iteration = turns
            .last()
            .map(|t| t.iteration)
            .unwrap_or(first_iteration);
        let id = format!(
            "{}_trajectory_{}_{}",
            self.session_id, first_iteration, last_iteration,
        );
        let started_at = turns.first().and_then(|t| t.started_at.clone());
        let finished_at = turns.last().and_then(|t| t.finished_at.clone());

        // One model-call action per turn plus one action per tool call.
        let mut actions = Vec::with_capacity(turns.iter().map(|t| t.tool_calls.len() + 1).sum());
        for turn in turns {
            actions.push(model_call_action(turn));
            for call in &turn.tool_calls {
                actions.push(tool_call_action(turn.iteration, call));
            }
        }

        let mut metadata = BTreeMap::new();
        metadata.insert("source".to_string(), json!(TRAJECTORY_SOURCE));
        metadata.insert("session_id".to_string(), json!(self.session_id));
        metadata.insert(
            "iteration_span".to_string(),
            json!([first_iteration, last_iteration]),
        );
        metadata.insert("turn_count".to_string(), json!(turns.len()));

        let payload = serde_json::to_vec(&actions).unwrap_or_default();
        let replay_allowlist = self
            .replay_allowlist
            .clone()
            .unwrap_or_else(default_trajectory_allowlist);
        CrystallizationTrace {
            version: 1,
            id,
            source: Some(TRAJECTORY_SOURCE.to_string()),
            source_hash: Some(hash_bytes(&payload)),
            workflow_id: self.workflow_id.clone(),
            started_at,
            finished_at,
            actions,
            replay_allowlist,
            metadata,
            ..CrystallizationTrace::default()
        }
    }
}
fn turn_is_successful(turn: &AgentTurnRecord) -> bool {
turn.success && turn.tool_calls.iter().all(AgentTurnToolCall::is_completed)
}
fn tool_signature_multiset(turn: &AgentTurnRecord) -> Vec<String> {
let mut sigs = turn
.tool_calls
.iter()
.map(AgentTurnToolCall::signature)
.collect::<Vec<_>>();
sigs.sort();
sigs
}
/// Multiset Jaccard similarity between two signature lists.
///
/// Each signature counts as many times as it occurs. The result is
/// `|L ∩ R| / |L ∪ R|`: the intersection takes the minimum multiplicity of each
/// signature, and the union takes the maximum. The result is always in
/// `[0.0, 1.0]`. Two empty inputs are treated as identical and return `1.0`;
/// one empty and one non-empty input return `0.0`.
fn jaccard_similarity(left: &[String], right: &[String]) -> f64 {
    if left.is_empty() && right.is_empty() {
        return 1.0;
    }
    // Count each signature on the left, then consume those counts while walking
    // the right side. Each consumed count is one element of the multiset
    // intersection.
    let mut left_counts: BTreeMap<&str, usize> = BTreeMap::new();
    for sig in left {
        *left_counts.entry(sig.as_str()).or_insert(0) += 1;
    }
    let mut intersection = 0usize;
    for sig in right {
        if let Some(count) = left_counts
            .get_mut(sig.as_str())
            .filter(|count| **count > 0)
        {
            *count -= 1;
            intersection += 1;
        }
    }
    // For multisets, |L ∪ R| = |L| + |R| - |L ∩ R|. This is at least 1 here
    // because the inputs are not both empty. Mixing a de-duplicated union with a
    // counted intersection, as before, could exceed 1.0.
    let union = left.len() + right.len() - intersection;
    intersection as f64 / union as f64
}
/// Builds the non-deterministic `model_call` action that represents one agent turn.
///
/// The action id is `turn_<iteration>`. Its name is the model, or
/// `"agent_turn"` when no model is set. Its output is the assistant text.
/// Token counts and duration go into the cost. The turn's metadata is copied,
/// with `source`, `iteration`, `session_id` (and `provider` when known) set
/// on top.
fn model_call_action(turn: &AgentTurnRecord) -> CrystallizationAction {
    let mut metadata = turn.metadata.clone();
    let annotations = [
        ("source", json!(TRAJECTORY_SOURCE)),
        ("iteration", json!(turn.iteration)),
        ("session_id", json!(turn.session_id)),
    ];
    for (key, value) in annotations {
        metadata.insert(key.to_string(), value);
    }
    if let Some(provider) = turn.provider.as_ref() {
        metadata.insert("provider".to_string(), json!(provider));
    }

    let name = match &turn.model {
        Some(model) => model.clone(),
        None => "agent_turn".to_string(),
    };
    let observed_output = turn.assistant_text.as_ref().map(|text| json!(text));
    let cost = CrystallizationCost {
        model: turn.model.clone(),
        model_calls: 1,
        input_tokens: turn.input_tokens,
        output_tokens: turn.output_tokens,
        total_cost_usd: 0.0,
        wall_ms: turn.duration_ms.unwrap_or_default(),
    };

    CrystallizationAction {
        id: format!("turn_{}", turn.iteration),
        kind: "model_call".to_string(),
        name,
        timestamp: turn.started_at.clone(),
        inputs: JsonValue::Null,
        output: observed_output.clone(),
        observed_output,
        parameters: BTreeMap::new(),
        cost,
        duration_ms: turn.duration_ms,
        deterministic: Some(false),
        fuzzy: Some(true),
        metadata,
        ..CrystallizationAction::default()
    }
}
/// Builds the deterministic `tool_call` action for one tool invocation.
///
/// Parameters are the call's explicit `parameters`. When `raw_input` is a JSON
/// object, its entries fill in only the keys that are missing, so explicit
/// parameters win. The action id is the call id, or `turn_<iteration>_<tool>`
/// when the call id is empty.
///
/// NOTE(review): two unnamed calls to the same tool in one turn would share
/// that fallback id. Confirm whether downstream code requires unique ids.
fn tool_call_action(iteration: usize, call: &AgentTurnToolCall) -> CrystallizationAction {
    let mut parameters = call.parameters.clone();
    if let Some(raw_object) = call.raw_input.as_object() {
        raw_object.iter().for_each(|(key, value)| {
            parameters
                .entry(key.clone())
                .or_insert_with(|| value.clone());
        });
    }

    let metadata = BTreeMap::from([
        ("source".to_string(), json!(TRAJECTORY_SOURCE)),
        ("iteration".to_string(), json!(iteration)),
        ("tool_call_id".to_string(), json!(call.tool_call_id)),
        ("status".to_string(), json!(call.status)),
    ]);

    let id = if call.tool_call_id.is_empty() {
        format!("turn_{iteration}_{}", call.tool_name)
    } else {
        call.tool_call_id.clone()
    };

    CrystallizationAction {
        id,
        kind: "tool_call".to_string(),
        name: call.tool_name.clone(),
        inputs: call.raw_input.clone(),
        output: call.raw_output.clone(),
        observed_output: call.raw_output.clone(),
        parameters,
        side_effects: call.side_effects.clone(),
        capabilities: call.capabilities.clone(),
        duration_ms: call.duration_ms,
        deterministic: Some(true),
        fuzzy: Some(false),
        metadata,
        ..CrystallizationAction::default()
    }
}
fn json_scalar_keys(value: &JsonValue) -> Vec<String> {
match value {
JsonValue::Object(map) => map.keys().cloned().collect(),
_ => Vec::new(),
}
}
/// Replay allowlist used when a [`TrajectoryTap`] has no explicit override.
///
/// It tolerates a fresh `run_id` and reseated effect-receipt iteration indices.
fn default_trajectory_allowlist() -> Vec<ReplayAllowlistRule> {
    const RULES: [(&str, &str); 2] = [
        (
            "/run_id",
            "trajectory replay assigns a fresh run id per regeneration",
        ),
        (
            "/effect_receipts/*/iteration",
            "trajectory regeneration may reseat iteration indices",
        ),
    ];
    RULES
        .iter()
        .map(|&(path, reason)| ReplayAllowlistRule {
            path: path.to_owned(),
            reason: reason.to_owned(),
            replacement: None,
        })
        .collect()
}
/// Verifies `candidate` against the trace it was derived from, using the default
/// divergence tolerance of 0.0.
///
/// With that tolerance, any diverging deterministic step fails verification.
///
/// # Errors
/// Returns a human-readable reason when verification fails.
pub fn verify_trajectory_candidate(
    candidate: &WorkflowCandidate,
    original: &CrystallizationTrace,
) -> Result<(), String> {
    let tolerance = DEFAULT_DIVERGENCE_TOLERANCE;
    verify_trajectory_candidate_with_tolerance(candidate, original, tolerance)
}
/// Checks a candidate workflow against one source trace.
///
/// 1. Locate the candidate's steps in the trace. Use the `start_index` from the
///    candidate's example for this trace, or fall back to searching for the
///    candidate's `sequence_signature`.
/// 2. Compare every deterministic step that has an `expected_output` with the
///    trace action at the same offset (`observed_output`, falling back to
///    `output`). Fail when the diverged fraction of those checked steps exceeds
///    `tolerance`.
/// 3. If the trace has a recorded `replay_run`, build a regenerated copy of
///    that run and compare the two with the replay oracle, using the trace's
///    allowlist. The copy gets a fresh run id and the candidate's
///    `expected_receipts`. This step is skipped when neither the run nor the
///    candidate has effect receipts.
///
/// # Errors
/// Returns a descriptive message when any of these checks fails or the oracle
/// itself errors.
fn verify_trajectory_candidate_with_tolerance(
    candidate: &WorkflowCandidate,
    original: &CrystallizationTrace,
    tolerance: f64,
) -> Result<(), String> {
    let start_index = candidate
        .examples
        .iter()
        .find(|example| example.trace_id == original.id)
        .map(|example| example.start_index)
        .or_else(|| super::shadow::find_sequence_start(original, &candidate.sequence_signature))
        .ok_or_else(|| {
            format!(
                "trajectory verifier: candidate sequence not found in trace {}",
                original.id
            )
        })?;
    let end = start_index + candidate.steps.len();
    // The slice of trace actions the candidate's steps line up with.
    let Some(window) = original.actions.get(start_index..end) else {
        return Err(format!(
            "trajectory verifier: candidate sequence extends past trace {} actions",
            original.id
        ));
    };

    // Only deterministic steps that carry an expected output can be checked.
    // Counting the unverifiable ones would dilute the divergence ratio.
    let mut deterministic_checked = 0usize;
    let mut deterministic_diverged = 0usize;
    for (step, action) in candidate.steps.iter().zip(window) {
        if !matches!(step.segment, super::types::SegmentKind::Deterministic) {
            continue;
        }
        let Some(expected) = &step.expected_output else {
            continue;
        };
        deterministic_checked += 1;
        let actual = action.observed_output.as_ref().or(action.output.as_ref());
        if actual != Some(expected) {
            deterministic_diverged += 1;
        }
    }
    if deterministic_checked > 0 {
        let ratio = deterministic_diverged as f64 / deterministic_checked as f64;
        if ratio > tolerance {
            return Err(format!(
                "trajectory verifier: {deterministic_diverged}/{deterministic_checked} deterministic \
                 steps diverged from trace {} (tolerance {:.2})",
                original.id, tolerance
            ));
        }
    }

    let Some(first_run) = original.replay_run.as_ref() else {
        return Ok(());
    };
    if first_run.effect_receipts.is_empty() && candidate.expected_receipts.is_empty() {
        return Ok(());
    }
    // Re-run the original fixture with the candidate's receipts substituted and
    // check that the replay oracle still considers the two runs a match.
    let mut regenerated = first_run.clone();
    regenerated.run_id = format!("trajectory_regen_{}", candidate.id);
    regenerated.effect_receipts = candidate.expected_receipts.clone();
    let oracle = ReplayOracleTrace {
        name: format!("trajectory_verify_{}", candidate.id),
        description: Some(
            "trajectory tap regenerated-fixture replay check against the source trace".to_string(),
        ),
        expect: ReplayExpectation::Match,
        allowlist: original.replay_allowlist.clone(),
        first_run: first_run.clone(),
        second_run: regenerated,
        ..ReplayOracleTrace::default()
    };
    let report = run_replay_oracle_trace(&oracle).map_err(|error| {
        format!(
            "trajectory verifier: oracle error for {}: {error}",
            candidate.id
        )
    })?;
    if !report.passed {
        let detail = report
            .divergence
            .as_ref()
            .map(|div| format!("{}: {}", div.path, div.message))
            .unwrap_or_else(|| "replay oracle reported failure with no divergence".to_string());
        return Err(format!(
            "trajectory verifier: regenerated fixture diverged for {}: {detail}",
            candidate.id
        ));
    }
    Ok(())
}
/// Collects traces from `turns`, crystallizes them, and runs the trajectory verifier.
///
/// Returns `Ok(None)` when no segment qualifies as a trace.
///
/// - With fewer traces than `max(options.min_examples, 2)`, only the first trace
///   is synthesized into a candidate. A warning lists the ids of any others.
/// - Otherwise all traces go through `crystallize_traces`.
///
/// In both cases every collected trace is used for verification and returned
/// in [`TrajectoryIngestResult::traces`].
///
/// # Errors
/// Propagates errors from synthesis or crystallization.
pub fn ingest_agent_loop_trajectory(
    tap: &TrajectoryTap,
    turns: &[AgentTurnRecord],
    options: CrystallizeOptions,
) -> Result<Option<TrajectoryIngestResult>, VmError> {
    let traces = tap.collect(turns);
    if traces.is_empty() {
        return Ok(None);
    }
    // The full pool is kept for verification and for the caller, regardless of
    // which traces feed crystallization.
    let trace_pool = traces.clone();
    let mut artifacts = if traces.len() < options.min_examples.max(2) {
        let mut remaining = traces.into_iter();
        let primary = remaining.next().expect("non-empty by check above");
        let dropped_from_synthesis: Vec<String> = remaining.map(|t| t.id).collect();
        if !dropped_from_synthesis.is_empty() {
            tracing::warn!(
                target: "harn_vm::crystallize::trajectory",
                primary_trace_id = %primary.id,
                dropped_trace_ids = ?dropped_from_synthesis,
                min_examples = options.min_examples,
                segment_count = trace_pool.len(),
                "trajectory synthesis kept only the first trace; \
remaining traces are surfaced via TrajectoryIngestResult.traces \
but are not part of the synthesized candidate"
            );
        }
        synthesize_candidate_from_trace(primary, options, Vec::new(), None, None)?
    } else {
        crystallize_traces(traces, options)?
    };
    apply_trajectory_verifier(&mut artifacts, &trace_pool);
    Ok(Some(TrajectoryIngestResult {
        artifacts,
        traces: trace_pool,
    }))
}
/// Output of [`ingest_agent_loop_trajectory`].
#[derive(Clone, Debug)]
pub struct TrajectoryIngestResult {
    /// Crystallization artifacts after the trajectory verifier has run.
    pub artifacts: CrystallizationArtifacts,
    /// Every trace collected from the turns, including any that were not used
    /// for synthesis.
    pub traces: Vec<CrystallizationTrace>,
}
pub fn apply_trajectory_verifier(
artifacts: &mut CrystallizationArtifacts,
traces: &[CrystallizationTrace],
) {
let mut moved_ids = Vec::new();
for candidate in &mut artifacts.report.candidates {
for example in candidate.examples.clone() {
let Some(trace) = traces.iter().find(|trace| trace.id == example.trace_id) else {
continue;
};
if let Err(reason) = verify_trajectory_candidate(candidate, trace) {
candidate.rejection_reasons.push(reason);
moved_ids.push(candidate.id.clone());
break;
}
}
}
if moved_ids.is_empty() {
return;
}
let mut keep = Vec::new();
for candidate in std::mem::take(&mut artifacts.report.candidates) {
if moved_ids.contains(&candidate.id) {
artifacts.report.rejected_candidates.push(candidate);
} else {
keep.push(candidate);
}
}
artifacts.report.candidates = keep;
if artifacts
.report
.selected_candidate_id
.as_ref()
.is_some_and(|id| moved_ids.contains(id))
{
artifacts.report.selected_candidate_id = artifacts
.report
.candidates
.first()
.map(|candidate| candidate.id.clone());
if let Some(candidate) = artifacts.report.candidates.first() {
artifacts.harn_code = super::codegen::generate_harn_code(candidate);
artifacts.eval_pack_toml = super::codegen::generate_eval_pack(candidate);
} else {
artifacts.harn_code =
super::codegen::rejected_workflow_stub(&artifacts.report.rejected_candidates);
artifacts.eval_pack_toml.clear();
}
}
}
/// Convenience constructor for a successful turn with the given tool calls.
///
/// `started_at` and `finished_at` are each filled from a separate
/// `now_rfc3339()` call. All other fields take their defaults.
pub fn turn_record(
    iteration: usize,
    session_id: impl Into<String>,
    tool_calls: Vec<AgentTurnToolCall>,
) -> AgentTurnRecord {
    let started_at = now_rfc3339();
    let finished_at = now_rfc3339();
    AgentTurnRecord {
        iteration,
        session_id: session_id.into(),
        success: true,
        tool_calls,
        started_at: Some(started_at),
        finished_at: Some(finished_at),
        ..AgentTurnRecord::default()
    }
}
// Unit tests for trajectory segmentation, replay-allowlist handling, and the
// ingest + verifier path.
#[cfg(test)]
mod tests {
    use super::*;
    // Builds a completed tool call whose `parameters` and object `raw_input`
    // carry the same key/value pairs.
    fn call(name: &str, params: &[(&str, JsonValue)]) -> AgentTurnToolCall {
        let mut parameters = BTreeMap::new();
        let mut raw = serde_json::Map::new();
        for (key, value) in params {
            parameters.insert((*key).to_string(), value.clone());
            raw.insert((*key).to_string(), value.clone());
        }
        AgentTurnToolCall {
            tool_call_id: format!("call_{name}"),
            tool_name: name.to_string(),
            status: "completed".to_string(),
            raw_input: JsonValue::Object(raw),
            raw_output: Some(json!({"ok": true})),
            parameters,
            duration_ms: Some(10),
            ..AgentTurnToolCall::default()
        }
    }
    // Builds a successful turn in a fixed session with fixed token/duration values.
    fn turn(iteration: usize, calls: Vec<AgentTurnToolCall>) -> AgentTurnRecord {
        AgentTurnRecord {
            iteration,
            session_id: "session-test".to_string(),
            success: true,
            tool_calls: calls,
            input_tokens: 100,
            output_tokens: 50,
            duration_ms: Some(20),
            assistant_text: Some("ok".to_string()),
            ..AgentTurnRecord::default()
        }
    }
    // A failed turn breaks the run, so turns 1-2 and 4-5 form two separate segments.
    #[test]
    fn collects_consecutive_successful_turns_into_segments() {
        let turns = vec![
            turn(1, vec![call("git_status", &[("path", json!("."))])]),
            turn(2, vec![call("git_status", &[("path", json!("."))])]),
            AgentTurnRecord {
                success: false,
                ..turn(3, vec![call("git_status", &[("path", json!("."))])])
            },
            turn(4, vec![call("git_log", &[("path", json!("."))])]),
            turn(5, vec![call("git_log", &[("path", json!("."))])]),
        ];
        let tap = TrajectoryTap::new("s1");
        let traces = tap.collect(&turns);
        assert_eq!(traces.len(), 2);
        assert!(traces
            .iter()
            .all(|trace| trace.source.as_deref() == Some(TRAJECTORY_SOURCE)));
        assert!(traces
            .iter()
            .all(|trace| trace.metadata.get("source") == Some(&json!(TRAJECTORY_SOURCE))));
    }
    // With a threshold of 1.0, a change of tool name between turns 2 and 3 ends
    // the first segment.
    #[test]
    fn splits_segment_when_signatures_diverge() {
        let turns = vec![
            turn(1, vec![call("git_status", &[("path", json!("."))])]),
            turn(2, vec![call("git_status", &[("path", json!("."))])]),
            turn(3, vec![call("git_diff", &[("path", json!("."))])]),
            turn(4, vec![call("git_diff", &[("path", json!("."))])]),
        ];
        let tap = TrajectoryTap::new("s2").with_similarity_threshold(1.0);
        let traces = tap.collect(&turns);
        assert_eq!(traces.len(), 2, "expected one segment per signature group");
    }
    // A single turn is below the default minimum segment length of 2.
    #[test]
    fn segment_shorter_than_minimum_is_dropped() {
        let turns = vec![turn(1, vec![call("git_status", &[("path", json!("."))])])];
        let tap = TrajectoryTap::new("s3");
        assert!(tap.collect(&turns).is_empty());
    }
    // A custom allowlist replaces the default verbatim; without one, the default is used.
    #[test]
    fn collect_honors_custom_replay_allowlist() {
        let turns = vec![
            turn(1, vec![call("git_status", &[("path", json!("."))])]),
            turn(2, vec![call("git_status", &[("path", json!("."))])]),
        ];
        let custom = vec![
            ReplayAllowlistRule {
                path: "/effect_receipts/*/timestamp".to_string(),
                reason: "test override".to_string(),
                replacement: None,
            },
            ReplayAllowlistRule {
                path: "/custom_field".to_string(),
                reason: "test override".to_string(),
                replacement: None,
            },
        ];
        let tap = TrajectoryTap::new("s-allowlist").with_replay_allowlist(custom.clone());
        let traces = tap.collect(&turns);
        assert_eq!(traces.len(), 1);
        assert_eq!(
            traces[0].replay_allowlist, custom,
            "custom allowlist should be honored verbatim, not overridden by the default"
        );
        let default_tap = TrajectoryTap::new("s-default");
        let default_traces = default_tap.collect(&turns);
        assert_eq!(default_traces.len(), 1);
        assert_eq!(
            default_traces[0].replay_allowlist,
            default_trajectory_allowlist()
        );
    }
    // End-to-end: one segment takes the synthesis path, and the resulting
    // candidate should survive the verifier.
    #[test]
    fn verifier_passes_on_clean_candidate() {
        let turns = vec![
            turn(1, vec![call("git_status", &[("path", json!("."))])]),
            turn(2, vec![call("git_status", &[("path", json!("."))])]),
        ];
        let tap = TrajectoryTap::new("s4");
        let result = ingest_agent_loop_trajectory(
            &tap,
            &turns,
            CrystallizeOptions {
                min_examples: 1,
                workflow_name: Some("verifier_clean".to_string()),
                ..CrystallizeOptions::default()
            },
        )
        .expect("ingest")
        .expect("at least one trace");
        assert!(
            !result.artifacts.report.candidates.is_empty(),
            "expected at least one accepted candidate"
        );
    }
}