Skip to main content

harn_vm/orchestration/crystallize/
trajectory.rs

1//! Trajectory tap: ingest `agent_loop` turn-level records as candidate
2//! crystallization sources, alongside the Code Mode composition snippets
3//! already handled by [`super::api::crystallize_traces`].
4//!
5//! Background. Closed harn#1622 fed repeated Code Mode composition
6//! snippets into the crystallization pipeline. Closed harn#2436 extends
7//! the same `bundle → codegen → shadow → normalize` machinery to a
8//! second candidate source: turn-level trajectories emitted by
9//! `agent_loop`. Trajectory candidates carry source field
10//! [`TRAJECTORY_SOURCE`] (`"agent_loop_trajectory"`) so downstream
11//! consumers (cloud importers, receipt viewers) can distinguish them
12//! from composition-derived candidates.
13//!
14//! The tap is intentionally narrow: it groups *consecutive successful
15//! turns* by their tool-call signature similarity into one or more
16//! candidate segments. Each segment becomes a
17//! [`CrystallizationTrace`] with `source = "agent_loop_trajectory"`
18//! and per-action metadata so the existing mining and shadow pipelines
19//! treat the trace identically to any other source. The replay
20//! verifier ([`verify_trajectory_candidate`]) re-derives a regenerated
21//! fixture from the candidate steps and runs the existing replay oracle
22//! against the original trace; if the deterministic outputs diverge the
23//! candidate is rejected before it reaches `bundle`.
24//!
25//! `agent_loop` itself emits per-turn events through the event log
26//! (`AgentEvent::IterationEnd`, `ToolCall`, `ToolCallUpdate`, etc.).
27//! A future patch can build [`AgentTurnRecord`] values directly from
28//! those events; this module takes them already projected so callers
29//! (replay drivers, CLI subcommands, integration tests) can construct
30//! synthetic trajectories without standing up an entire agent runtime.
31
32use std::collections::BTreeMap;
33
34use serde::{Deserialize, Serialize};
35use serde_json::{json, Value as JsonValue};
36
37use super::super::{
38    now_rfc3339, run_replay_oracle_trace, ReplayAllowlistRule, ReplayExpectation, ReplayOracleTrace,
39};
40use super::api::{crystallize_traces, synthesize_candidate_from_trace};
41use super::types::{
42    CrystallizationAction, CrystallizationArtifacts, CrystallizationCost,
43    CrystallizationSideEffect, CrystallizationTrace, CrystallizeOptions, WorkflowCandidate,
44};
45use super::util::hash_bytes;
46use crate::value::VmError;
47
/// Source marker stamped on trajectory-derived traces and bundle
/// receipts. Consumers (cloud importers, receipt viewers) use this to
/// distinguish trajectory candidates from `code_mode_composition`
/// candidates and from release-fixture-derived single-trace candidates.
pub const TRAJECTORY_SOURCE: &str = "agent_loop_trajectory";

/// Default similarity threshold for grouping consecutive successful
/// turns. Two adjacent turns are part of the same segment if their
/// tool-call signature multisets overlap by at least this Jaccard
/// coefficient (the comparison is `>=`, so a similarity exactly equal
/// to the threshold still extends the segment). Tuned for
/// merge-captain-style workloads where the same 2-3 tools recur across
/// turns. Callers can override via
/// [`TrajectoryTap::with_similarity_threshold`].
const DEFAULT_SIMILARITY_THRESHOLD: f64 = 0.5;

/// Default minimum length for a segment to be emitted as a trace. Below
/// this we treat the run as too short to crystallize. The crystallize
/// pipeline itself enforces `DEFAULT_MIN_EXAMPLES` across traces, but
/// the per-segment floor keeps single-turn noise out of the trace pool.
const DEFAULT_MIN_SEGMENT_LEN: usize = 2;

/// Maximum length for a segment. A longer run of similar turns is cut
/// at this length and the remaining turns start a new segment, so a
/// runaway trace can't blow the crystallization budget. Matches the
/// cap in [`super::normalize::best_repeated_sequence`].
const DEFAULT_MAX_SEGMENT_LEN: usize = 12;

/// Tolerance for the replay verifier. We count how many deterministic
/// steps diverge between the regenerated fixture and the original
/// trace; if more than this fraction diverges the candidate is rejected
/// rather than just warned about. At `0.0` a single diverging
/// deterministic step is enough to reject.
const DEFAULT_DIVERGENCE_TOLERANCE: f64 = 0.0;
78
/// Per-turn snapshot of an `agent_loop` round-trip. One record covers
/// one model call plus the tool calls dispatched in response to it.
/// Built by replay drivers or by walking
/// [`crate::agent_events::AgentEvent`] streams.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct AgentTurnRecord {
    /// Turn index within the session. Used for the synthesized
    /// `turn_{iteration}` action id and for the trace id / iteration
    /// span of the segment the turn lands in.
    pub iteration: usize,
    /// Session identifier copied into the synthesized `model_call`
    /// action's metadata.
    pub session_id: String,
    /// Turn start time. The first turn's value becomes the segment
    /// trace's `started_at`.
    pub started_at: Option<String>,
    /// Turn end time. The last turn's value becomes the segment
    /// trace's `finished_at`.
    pub finished_at: Option<String>,
    /// `true` when the turn produced no failed tool calls and no
    /// terminal error from the loop. Failed turns split a segment: a
    /// run of successful turns interrupted by a failure becomes two
    /// candidate segments rather than one. The collector additionally
    /// requires every entry in `tool_calls` to have a `completed`
    /// status before treating the turn as successful.
    pub success: bool,
    /// Tool calls dispatched on this turn, in order. Each becomes one
    /// `tool_call` action following the turn's `model_call` action.
    pub tool_calls: Vec<AgentTurnToolCall>,
    /// Provider name; copied into the `model_call` metadata when set.
    pub provider: Option<String>,
    /// Model name; used as the `model_call` action name (falling back
    /// to `agent_turn`) and recorded on the action's cost.
    pub model: Option<String>,
    /// Input token count recorded on the `model_call` action's cost.
    pub input_tokens: i64,
    /// Output token count recorded on the `model_call` action's cost.
    pub output_tokens: i64,
    /// Turn duration in milliseconds; missing values are recorded as
    /// `0` wall time on the action's cost.
    pub duration_ms: Option<i64>,
    /// Final assistant text emitted on the turn, if any. Used as the
    /// observed output for the synthesized `model_call` action so the
    /// shadow oracle can compare deterministic prefixes across replays.
    pub assistant_text: Option<String>,
    /// Free-form metadata copied into the synthesized model_call
    /// action. Trajectory callers commonly inject `goal`,
    /// `success_criteria`, and any policy decisions taken during the
    /// turn. Caller-supplied `source`, `iteration`, and `session_id`
    /// keys (and `provider`, when the turn has one) are replaced by
    /// the collector's own values — [`TRAJECTORY_SOURCE`] is always
    /// stamped on top regardless.
    pub metadata: BTreeMap<String, JsonValue>,
}
112
/// Per-tool-call snapshot used by [`AgentTurnRecord`]. Mirrors the
/// fields the existing [`CrystallizationAction`] pipeline already
/// consumes so the trajectory tap stays a thin adapter.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct AgentTurnToolCall {
    /// Identifier of the call. Used verbatim as the action id; when
    /// empty the action id falls back to `turn_{iteration}_{tool_name}`.
    pub tool_call_id: String,
    /// Tool name; becomes the action name and part of the signature.
    pub tool_name: String,
    /// Status string from `ToolCallUpdate.status` (`completed`,
    /// `failed`, …). Anything other than `completed` (compared ASCII
    /// case-insensitively) excludes the containing turn from the
    /// success run.
    pub status: String,
    /// Raw tool input. When it is a JSON object its top-level keys
    /// contribute to the call signature and are merged into the
    /// action's parameters.
    pub raw_input: JsonValue,
    /// Raw tool output; recorded as both `output` and
    /// `observed_output` on the action.
    pub raw_output: Option<JsonValue>,
    /// Capabilities copied onto the action unchanged.
    pub capabilities: Vec<String>,
    /// Side effects copied onto the action unchanged.
    pub side_effects: Vec<CrystallizationSideEffect>,
    /// Call duration in milliseconds, copied onto the action.
    pub duration_ms: Option<i64>,
    /// Caller-provided scalar parameter map. The mining pipeline
    /// extracts varying scalar values from `raw_input` automatically;
    /// this map is for callers who already have a typed projection
    /// (e.g. release identity fields, classifier labels) and want
    /// them to surface as first-class workflow parameters. Entries
    /// here take precedence over same-named `raw_input` keys.
    pub parameters: BTreeMap<String, JsonValue>,
}
137
138impl AgentTurnToolCall {
139    fn is_completed(&self) -> bool {
140        self.status.eq_ignore_ascii_case("completed")
141    }
142
143    fn signature(&self) -> String {
144        // Match `action_signature` shape so two trajectory traces
145        // collapse onto the same sequence_signature inside
146        // `mine_candidates` when their tool name + parameter keys
147        // align.
148        let mut parameter_keys = self
149            .parameters
150            .keys()
151            .cloned()
152            .chain(json_scalar_keys(&self.raw_input))
153            .collect::<Vec<_>>();
154        parameter_keys.sort();
155        parameter_keys.dedup();
156        format!("tool_call:{}:{}", self.tool_name, parameter_keys.join(","))
157    }
158}
159
/// Collector that converts a slice of [`AgentTurnRecord`]s into
/// trajectory-derived [`CrystallizationTrace`]s. Held as a struct so
/// callers can configure thresholds without a long argument list.
#[derive(Clone, Debug)]
pub struct TrajectoryTap {
    /// Session id used as the prefix of every trace id and recorded in
    /// each trace's metadata.
    session_id: String,
    /// Optional workflow id copied onto every produced trace.
    workflow_id: Option<String>,
    /// Minimum similarity between two adjacent turns for the later
    /// turn to join the current segment.
    similarity_threshold: f64,
    /// Segments with fewer turns than this are dropped.
    min_segment_len: usize,
    /// Segments stop growing once they hold this many turns.
    max_segment_len: usize,
    /// Caller-supplied replay allowlist. When `None` we fall back to
    /// [`default_trajectory_allowlist`]; when `Some` the caller-supplied
    /// value is honored verbatim. This lets richer policies (e.g. extra
    /// per-receipt id paths) flow through without being silently dropped
    /// by the trace builder.
    replay_allowlist: Option<Vec<ReplayAllowlistRule>>,
}
177
178impl TrajectoryTap {
179    pub fn new(session_id: impl Into<String>) -> Self {
180        Self {
181            session_id: session_id.into(),
182            workflow_id: None,
183            similarity_threshold: DEFAULT_SIMILARITY_THRESHOLD,
184            min_segment_len: DEFAULT_MIN_SEGMENT_LEN,
185            max_segment_len: DEFAULT_MAX_SEGMENT_LEN,
186            replay_allowlist: None,
187        }
188    }
189
190    pub fn with_workflow_id(mut self, workflow_id: impl Into<String>) -> Self {
191        self.workflow_id = Some(workflow_id.into());
192        self
193    }
194
195    pub fn with_similarity_threshold(mut self, value: f64) -> Self {
196        self.similarity_threshold = value.clamp(0.0, 1.0);
197        self
198    }
199
200    pub fn with_segment_len(mut self, min: usize, max: usize) -> Self {
201        self.min_segment_len = min.max(1);
202        self.max_segment_len = max.max(self.min_segment_len);
203        self
204    }
205
206    /// Override the replay allowlist applied to traces produced by this
207    /// tap. Pass the full set of rules — the default rules are not
208    /// implicitly merged in.
209    pub fn with_replay_allowlist(mut self, rules: Vec<ReplayAllowlistRule>) -> Self {
210        self.replay_allowlist = Some(rules);
211        self
212    }
213
214    /// Group consecutive successful turns into one or more trajectory
215    /// traces. Returns an empty vec when no run of successful turns
216    /// reaches `min_segment_len`.
217    pub fn collect(&self, turns: &[AgentTurnRecord]) -> Vec<CrystallizationTrace> {
218        let mut traces = Vec::new();
219        for segment in self.segment_turns(turns) {
220            traces.push(self.trace_from_segment(segment));
221        }
222        traces
223    }
224
225    fn segment_turns<'a>(&self, turns: &'a [AgentTurnRecord]) -> Vec<&'a [AgentTurnRecord]> {
226        if turns.is_empty() {
227            return Vec::new();
228        }
229        let mut segments = Vec::new();
230        let mut cursor = 0;
231        while cursor < turns.len() {
232            if !turn_is_successful(&turns[cursor]) {
233                cursor += 1;
234                continue;
235            }
236            let mut end = cursor + 1;
237            while end < turns.len()
238                && end - cursor < self.max_segment_len
239                && turn_is_successful(&turns[end])
240                && self.adjacent_similarity(&turns[end - 1], &turns[end])
241                    >= self.similarity_threshold
242            {
243                end += 1;
244            }
245            if end - cursor >= self.min_segment_len {
246                segments.push(&turns[cursor..end]);
247            }
248            cursor = end;
249        }
250        segments
251    }
252
253    fn adjacent_similarity(&self, left: &AgentTurnRecord, right: &AgentTurnRecord) -> f64 {
254        jaccard_similarity(
255            &tool_signature_multiset(left),
256            &tool_signature_multiset(right),
257        )
258    }
259
260    fn trace_from_segment(&self, turns: &[AgentTurnRecord]) -> CrystallizationTrace {
261        let segment_index = turns.first().map(|t| t.iteration).unwrap_or(0);
262        let id = format!(
263            "{}_trajectory_{}_{}",
264            self.session_id,
265            segment_index,
266            turns.last().map(|t| t.iteration).unwrap_or(segment_index),
267        );
268        let started_at = turns.first().and_then(|t| t.started_at.clone());
269        let finished_at = turns.last().and_then(|t| t.finished_at.clone());
270        let mut actions = Vec::with_capacity(turns.iter().map(|t| t.tool_calls.len() + 1).sum());
271        for turn in turns {
272            actions.push(model_call_action(turn));
273            for call in &turn.tool_calls {
274                actions.push(tool_call_action(turn.iteration, call));
275            }
276        }
277
278        let mut metadata = BTreeMap::new();
279        metadata.insert("source".to_string(), json!(TRAJECTORY_SOURCE));
280        metadata.insert("session_id".to_string(), json!(self.session_id));
281        metadata.insert(
282            "iteration_span".to_string(),
283            json!([
284                segment_index,
285                turns.last().map(|t| t.iteration).unwrap_or(segment_index)
286            ]),
287        );
288        metadata.insert("turn_count".to_string(), json!(turns.len()));
289
290        let payload = serde_json::to_vec(&actions).unwrap_or_default();
291        let replay_allowlist = self
292            .replay_allowlist
293            .clone()
294            .unwrap_or_else(default_trajectory_allowlist);
295        CrystallizationTrace {
296            version: 1,
297            id,
298            source: Some(TRAJECTORY_SOURCE.to_string()),
299            source_hash: Some(hash_bytes(&payload)),
300            workflow_id: self.workflow_id.clone(),
301            started_at,
302            finished_at,
303            actions,
304            replay_allowlist,
305            metadata,
306            ..CrystallizationTrace::default()
307        }
308    }
309}
310
311fn turn_is_successful(turn: &AgentTurnRecord) -> bool {
312    turn.success && turn.tool_calls.iter().all(AgentTurnToolCall::is_completed)
313}
314
315fn tool_signature_multiset(turn: &AgentTurnRecord) -> Vec<String> {
316    let mut sigs = turn
317        .tool_calls
318        .iter()
319        .map(AgentTurnToolCall::signature)
320        .collect::<Vec<_>>();
321    sigs.sort();
322    sigs
323}
324
/// Multiset Jaccard coefficient of two signature lists.
///
/// Both the intersection and the union are counted with multiplicity
/// (`|L ∩ R| / (|L| + |R| - |L ∩ R|)`), so the result always lies in
/// `[0.0, 1.0]`. Dividing the multiset intersection by a de-duplicated
/// union instead would let repeated signatures push the value above 1
/// (e.g. `[a, a]` vs `[a, a]` would score 2.0). For inputs without
/// repeated signatures this is the ordinary set Jaccard index.
///
/// The inputs do not need to be sorted. Two empty lists score `1.0`.
fn jaccard_similarity(left: &[String], right: &[String]) -> f64 {
    if left.is_empty() && right.is_empty() {
        // Two turns with no tool calls (pure assistant chat) count as
        // similar — they trivially share the empty signature set.
        return 1.0;
    }
    // Pair each left signature with at most one not-yet-matched right
    // signature so multiplicities are respected.
    let mut unmatched: Vec<&String> = right.iter().collect();
    let mut intersection = 0usize;
    for sig in left {
        if let Some(pos) = unmatched.iter().position(|other| *other == sig) {
            unmatched.swap_remove(pos);
            intersection += 1;
        }
    }
    // `intersection <= min(|L|, |R|)` and at least one side is
    // non-empty here, so this neither underflows nor reaches zero.
    let union_len = left.len() + right.len() - intersection;
    intersection as f64 / union_len as f64
}
349
350fn model_call_action(turn: &AgentTurnRecord) -> CrystallizationAction {
351    let mut metadata = turn.metadata.clone();
352    metadata.insert("source".to_string(), json!(TRAJECTORY_SOURCE));
353    metadata.insert("iteration".to_string(), json!(turn.iteration));
354    metadata.insert("session_id".to_string(), json!(turn.session_id));
355    if let Some(provider) = &turn.provider {
356        metadata.insert("provider".to_string(), json!(provider));
357    }
358    let output = turn.assistant_text.as_ref().map(|text| json!(text));
359    CrystallizationAction {
360        id: format!("turn_{}", turn.iteration),
361        kind: "model_call".to_string(),
362        name: turn
363            .model
364            .clone()
365            .unwrap_or_else(|| "agent_turn".to_string()),
366        timestamp: turn.started_at.clone(),
367        inputs: JsonValue::Null,
368        output: output.clone(),
369        observed_output: output,
370        parameters: BTreeMap::new(),
371        cost: CrystallizationCost {
372            model: turn.model.clone(),
373            model_calls: 1,
374            input_tokens: turn.input_tokens,
375            output_tokens: turn.output_tokens,
376            total_cost_usd: 0.0,
377            wall_ms: turn.duration_ms.unwrap_or_default(),
378        },
379        duration_ms: turn.duration_ms,
380        deterministic: Some(false),
381        fuzzy: Some(true),
382        metadata,
383        ..CrystallizationAction::default()
384    }
385}
386
387fn tool_call_action(iteration: usize, call: &AgentTurnToolCall) -> CrystallizationAction {
388    let mut parameters = call.parameters.clone();
389    if let JsonValue::Object(map) = &call.raw_input {
390        for (key, value) in map {
391            parameters
392                .entry(key.clone())
393                .or_insert_with(|| value.clone());
394        }
395    }
396    let mut metadata = BTreeMap::new();
397    metadata.insert("source".to_string(), json!(TRAJECTORY_SOURCE));
398    metadata.insert("iteration".to_string(), json!(iteration));
399    metadata.insert("tool_call_id".to_string(), json!(call.tool_call_id));
400    metadata.insert("status".to_string(), json!(call.status));
401    CrystallizationAction {
402        id: if call.tool_call_id.is_empty() {
403            format!("turn_{iteration}_{}", call.tool_name)
404        } else {
405            call.tool_call_id.clone()
406        },
407        kind: "tool_call".to_string(),
408        name: call.tool_name.clone(),
409        inputs: call.raw_input.clone(),
410        output: call.raw_output.clone(),
411        observed_output: call.raw_output.clone(),
412        parameters,
413        side_effects: call.side_effects.clone(),
414        capabilities: call.capabilities.clone(),
415        duration_ms: call.duration_ms,
416        deterministic: Some(true),
417        fuzzy: Some(false),
418        metadata,
419        ..CrystallizationAction::default()
420    }
421}
422
423fn json_scalar_keys(value: &JsonValue) -> Vec<String> {
424    match value {
425        JsonValue::Object(map) => map.keys().cloned().collect(),
426        _ => Vec::new(),
427    }
428}
429
430fn default_trajectory_allowlist() -> Vec<ReplayAllowlistRule> {
431    vec![
432        ReplayAllowlistRule {
433            path: "/run_id".to_string(),
434            reason: "trajectory replay assigns a fresh run id per regeneration".to_string(),
435            replacement: None,
436        },
437        ReplayAllowlistRule {
438            path: "/effect_receipts/*/iteration".to_string(),
439            reason: "trajectory regeneration may reseat iteration indices".to_string(),
440            replacement: None,
441        },
442    ]
443}
444
/// Verifier gate for trajectory-derived candidates. Re-derives a
/// "regenerated fixture" from the candidate's expected receipts and
/// runs the existing replay oracle against the original trace's
/// recorded replay run (skipped when the trace has none). The oracle
/// catches divergence at the receipt level; this wrapper additionally
/// compares the expected output of each deterministic step against
/// the output recorded on the matching action of `original`.
///
/// Delegates to `verify_trajectory_candidate_with_tolerance` with
/// [`DEFAULT_DIVERGENCE_TOLERANCE`]. With that default of `0.0` a
/// single diverging deterministic step rejects the candidate. Steps
/// outside deterministic segments are not compared at all.
///
/// Returns `Ok(())` when the candidate is safe to keep, or an error
/// string suitable for pushing onto
/// [`WorkflowCandidate::rejection_reasons`].
pub fn verify_trajectory_candidate(
    candidate: &WorkflowCandidate,
    original: &CrystallizationTrace,
) -> Result<(), String> {
    verify_trajectory_candidate_with_tolerance(candidate, original, DEFAULT_DIVERGENCE_TOLERANCE)
}
461
462fn verify_trajectory_candidate_with_tolerance(
463    candidate: &WorkflowCandidate,
464    original: &CrystallizationTrace,
465    tolerance: f64,
466) -> Result<(), String> {
467    // 1. Sequence-signature check: the candidate's signature must
468    //    appear inside the original trace at the recorded start index
469    //    (or anywhere, if no example points to this trace). If we can't
470    //    relocate the sequence the candidate isn't actually derived
471    //    from this trace and shouldn't be promoted from it.
472    let start_index = candidate
473        .examples
474        .iter()
475        .find(|example| example.trace_id == original.id)
476        .map(|example| example.start_index)
477        .or_else(|| super::shadow::find_sequence_start(original, &candidate.sequence_signature))
478        .ok_or_else(|| {
479            format!(
480                "trajectory verifier: candidate sequence not found in trace {}",
481                original.id
482            )
483        })?;
484
485    let end = start_index + candidate.steps.len();
486    if end > original.actions.len() {
487        return Err(format!(
488            "trajectory verifier: candidate sequence extends past trace {} actions",
489            original.id
490        ));
491    }
492
493    // 2. Deterministic-output check with tolerance. The shadow path
494    //    already does a strict comparison; this is the trajectory
495    //    relaxation so an LLM-rewritten transient string doesn't fail
496    //    the whole gate.
497    let mut deterministic_total = 0usize;
498    let mut deterministic_diverged = 0usize;
499    for (offset, step) in candidate.steps.iter().enumerate() {
500        if !matches!(step.segment, super::types::SegmentKind::Deterministic) {
501            continue;
502        }
503        deterministic_total += 1;
504        let Some(expected) = &step.expected_output else {
505            continue;
506        };
507        let actual = original.actions[start_index + offset]
508            .observed_output
509            .as_ref()
510            .or(original.actions[start_index + offset].output.as_ref());
511        if actual != Some(expected) {
512            deterministic_diverged += 1;
513        }
514    }
515    if deterministic_total > 0 {
516        let ratio = deterministic_diverged as f64 / deterministic_total as f64;
517        if ratio > tolerance {
518            return Err(format!(
519                "trajectory verifier: {deterministic_diverged}/{deterministic_total} deterministic \
520                 steps diverged from trace {} (tolerance {:.2})",
521                original.id, tolerance
522            ));
523        }
524    }
525
526    // 3. Replay oracle on the regenerated fixture. We synthesize a
527    //    minimal `ReplayTraceRun` from the candidate's expected
528    //    receipts and compare it against the trace's recorded replay
529    //    run (when present). Traces with no replay run skip the
530    //    oracle — there's nothing to compare against.
531    let Some(first_run) = original.replay_run.as_ref() else {
532        return Ok(());
533    };
534    if first_run.effect_receipts.is_empty() && candidate.expected_receipts.is_empty() {
535        return Ok(());
536    }
537    let mut regenerated = first_run.clone();
538    regenerated.run_id = format!("trajectory_regen_{}", candidate.id);
539    regenerated.effect_receipts = candidate.expected_receipts.clone();
540    let oracle = ReplayOracleTrace {
541        name: format!("trajectory_verify_{}", candidate.id),
542        description: Some(
543            "trajectory tap regenerated-fixture replay check against the source trace".to_string(),
544        ),
545        expect: ReplayExpectation::Match,
546        allowlist: original.replay_allowlist.clone(),
547        first_run: first_run.clone(),
548        second_run: regenerated,
549        ..ReplayOracleTrace::default()
550    };
551    let report = run_replay_oracle_trace(&oracle).map_err(|error| {
552        format!(
553            "trajectory verifier: oracle error for {}: {error}",
554            candidate.id
555        )
556    })?;
557    if !report.passed {
558        let detail = report
559            .divergence
560            .as_ref()
561            .map(|div| format!("{}: {}", div.path, div.message))
562            .unwrap_or_else(|| "replay oracle reported failure with no divergence".to_string());
563        return Err(format!(
564            "trajectory verifier: regenerated fixture diverged for {}: {detail}",
565            candidate.id
566        ));
567    }
568    Ok(())
569}
570
571/// Top-level convenience: collect trajectories from `turns`, feed them
572/// through the existing crystallization pipeline, and run the
573/// trajectory replay verifier on every accepted candidate. Mirrors
574/// [`super::release_fixture::ingest_release_fixture`] in shape so the
575/// CLI / orchestrator can route trajectory ingestion through one
576/// surface.
577///
578/// Returns `Ok(None)` when no segments cleared `min_segment_len`. When
579/// fewer than `min_examples` segments are produced, falls back to
580/// [`synthesize_candidate_from_trace`] using the first trace so a
581/// short trajectory still yields a candidate. Any additional segments
582/// not picked up by synthesis are still returned in
583/// [`TrajectoryIngestResult::traces`] so callers / verifiers / bundle
584/// builders can see them, and a `tracing::warn!` is emitted listing the
585/// ids of the traces the synthesis path did not consume.
586pub fn ingest_agent_loop_trajectory(
587    tap: &TrajectoryTap,
588    turns: &[AgentTurnRecord],
589    options: CrystallizeOptions,
590) -> Result<Option<TrajectoryIngestResult>, VmError> {
591    let traces = tap.collect(turns);
592    if traces.is_empty() {
593        return Ok(None);
594    }
595    let needs_synthesis = traces.len() < options.min_examples.max(2);
596    let (mut artifacts, trace_pool) = if needs_synthesis {
597        // Synthesis builds a single-trace candidate, but we must not
598        // silently discard the other traces — they still belong to
599        // the result so the bundle pipeline, verifier, and any
600        // downstream auditor can observe the full ingested set.
601        let trace_pool = traces.clone();
602        let mut iter = traces.into_iter();
603        let primary = iter.next().expect("non-empty by check above");
604        let dropped_from_synthesis: Vec<String> = iter.map(|t| t.id).collect();
605        if !dropped_from_synthesis.is_empty() {
606            tracing::warn!(
607                target: "harn_vm::crystallize::trajectory",
608                primary_trace_id = %primary.id,
609                dropped_trace_ids = ?dropped_from_synthesis,
610                min_examples = options.min_examples,
611                segment_count = trace_pool.len(),
612                "trajectory synthesis kept only the first trace; \
613                 remaining traces are surfaced via TrajectoryIngestResult.traces \
614                 but are not part of the synthesized candidate"
615            );
616        }
617        let artifacts = synthesize_candidate_from_trace(primary, options, Vec::new(), None, None)?;
618        (artifacts, trace_pool)
619    } else {
620        let trace_pool = traces.clone();
621        let artifacts = crystallize_traces(traces, options)?;
622        (artifacts, trace_pool)
623    };
624
625    apply_trajectory_verifier(&mut artifacts, &trace_pool);
626
627    Ok(Some(TrajectoryIngestResult {
628        artifacts,
629        traces: trace_pool,
630    }))
631}
632
/// Bundle of artifacts produced by [`ingest_agent_loop_trajectory`].
/// `traces` is the same slice that should be passed to
/// [`super::bundle::build_crystallization_bundle`] so the bundle's
/// fixtures and source-trace references line up.
#[derive(Clone, Debug)]
pub struct TrajectoryIngestResult {
    /// Crystallization output after the trajectory verifier has moved
    /// any failing candidates to the rejected list.
    pub artifacts: CrystallizationArtifacts,
    /// Every trace collected from the turns, including traces the
    /// single-trace synthesis fallback did not consume.
    pub traces: Vec<CrystallizationTrace>,
}
642
643/// Run [`verify_trajectory_candidate`] across every accepted candidate
644/// in `artifacts`. Candidates whose verifier fails move from
645/// `candidates` to `rejected_candidates` and gain a rejection reason.
646pub fn apply_trajectory_verifier(
647    artifacts: &mut CrystallizationArtifacts,
648    traces: &[CrystallizationTrace],
649) {
650    let mut moved_ids = Vec::new();
651    for candidate in &mut artifacts.report.candidates {
652        // We verify against every trace the candidate references; the
653        // first failure is enough to disqualify the candidate.
654        for example in candidate.examples.clone() {
655            let Some(trace) = traces.iter().find(|trace| trace.id == example.trace_id) else {
656                continue;
657            };
658            if let Err(reason) = verify_trajectory_candidate(candidate, trace) {
659                candidate.rejection_reasons.push(reason);
660                moved_ids.push(candidate.id.clone());
661                break;
662            }
663        }
664    }
665    if moved_ids.is_empty() {
666        return;
667    }
668    let mut keep = Vec::new();
669    for candidate in std::mem::take(&mut artifacts.report.candidates) {
670        if moved_ids.contains(&candidate.id) {
671            artifacts.report.rejected_candidates.push(candidate);
672        } else {
673            keep.push(candidate);
674        }
675    }
676    artifacts.report.candidates = keep;
677    if artifacts
678        .report
679        .selected_candidate_id
680        .as_ref()
681        .is_some_and(|id| moved_ids.contains(id))
682    {
683        artifacts.report.selected_candidate_id = artifacts
684            .report
685            .candidates
686            .first()
687            .map(|candidate| candidate.id.clone());
688        if let Some(candidate) = artifacts.report.candidates.first() {
689            artifacts.harn_code = super::codegen::generate_harn_code(candidate);
690            artifacts.eval_pack_toml = super::codegen::generate_eval_pack(candidate);
691        } else {
692            artifacts.harn_code =
693                super::codegen::rejected_workflow_stub(&artifacts.report.rejected_candidates);
694            artifacts.eval_pack_toml.clear();
695        }
696    }
697}
698
699/// Convenience constructor for an [`AgentTurnRecord`] used by tests
700/// and replay drivers that have only the tool calls and want defaults
701/// for the rest.
702pub fn turn_record(
703    iteration: usize,
704    session_id: impl Into<String>,
705    tool_calls: Vec<AgentTurnToolCall>,
706) -> AgentTurnRecord {
707    AgentTurnRecord {
708        iteration,
709        session_id: session_id.into(),
710        success: true,
711        tool_calls,
712        started_at: Some(now_rfc3339()),
713        finished_at: Some(now_rfc3339()),
714        ..AgentTurnRecord::default()
715    }
716}
717
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `"completed"` tool call named `name` whose `parameters` and
    /// `raw_input` both carry the given key/value pairs.
    fn call(name: &str, params: &[(&str, JsonValue)]) -> AgentTurnToolCall {
        // Yields owned `(String, JsonValue)` pairs in the caller's order; it
        // is consumed twice, once per target map.
        let owned_pairs = || {
            params
                .iter()
                .map(|(key, value)| ((*key).to_string(), value.clone()))
        };
        let parameters: BTreeMap<String, JsonValue> = owned_pairs().collect();
        let raw: serde_json::Map<String, JsonValue> = owned_pairs().collect();

        AgentTurnToolCall {
            tool_call_id: format!("call_{name}"),
            tool_name: name.to_owned(),
            status: String::from("completed"),
            raw_input: JsonValue::Object(raw),
            raw_output: Some(json!({"ok": true})),
            parameters,
            duration_ms: Some(10),
            ..AgentTurnToolCall::default()
        }
    }

    /// Builds a successful turn for the fixed `"session-test"` session with
    /// canned token counts, duration, and assistant text.
    fn turn(iteration: usize, calls: Vec<AgentTurnToolCall>) -> AgentTurnRecord {
        AgentTurnRecord {
            iteration,
            session_id: String::from("session-test"),
            success: true,
            tool_calls: calls,
            input_tokens: 100,
            output_tokens: 50,
            duration_ms: Some(20),
            assistant_text: Some(String::from("ok")),
            ..AgentTurnRecord::default()
        }
    }

    /// Builds a successful turn containing exactly one call to `tool` with
    /// the single parameter `path = "."`.
    fn path_turn(iteration: usize, tool: &str) -> AgentTurnRecord {
        turn(iteration, vec![call(tool, &[("path", json!("."))])])
    }

    #[test]
    fn collects_consecutive_successful_turns_into_segments() {
        // Turn 3 is flipped to a failure so it sits between the two runs of
        // successful turns.
        let mut failed = path_turn(3, "git_status");
        failed.success = false;

        let turns = vec![
            path_turn(1, "git_status"),
            path_turn(2, "git_status"),
            failed,
            path_turn(4, "git_log"),
            path_turn(5, "git_log"),
        ];
        let tap = TrajectoryTap::new("s1");
        let traces = tap.collect(&turns);

        assert_eq!(traces.len(), 2);
        // Every trace must be tagged as trajectory-derived on the struct...
        for trace in &traces {
            assert!(trace.source.as_deref() == Some(TRAJECTORY_SOURCE));
        }
        // ...and in its metadata map.
        for trace in &traces {
            assert!(trace.metadata.get("source") == Some(&json!(TRAJECTORY_SOURCE)));
        }
    }

    #[test]
    fn splits_segment_when_signatures_diverge() {
        // The signature of a tool call depends on its name AND its
        // parameter keys, so `git_status` and `git_diff` turns differ even
        // though their parameters are identical. The tap is pinned to a
        // similarity threshold of 1.0 here instead of relying on the
        // default.
        let turns = vec![
            path_turn(1, "git_status"),
            path_turn(2, "git_status"),
            path_turn(3, "git_diff"),
            path_turn(4, "git_diff"),
        ];
        let tap = TrajectoryTap::new("s2").with_similarity_threshold(1.0);
        let segments = tap.collect(&turns);
        assert_eq!(
            segments.len(),
            2,
            "expected one segment per signature group"
        );
    }

    #[test]
    fn segment_shorter_than_minimum_is_dropped() {
        // A lone turn yields no trace under the tap's default settings.
        let turns = vec![path_turn(1, "git_status")];
        let tap = TrajectoryTap::new("s3");
        let traces = tap.collect(&turns);
        assert!(traces.is_empty());
    }

    #[test]
    fn collect_honors_custom_replay_allowlist() {
        let turns = vec![path_turn(1, "git_status"), path_turn(2, "git_status")];

        // Both override rules share the same reason and carry no replacement.
        let rule = |path: &str| ReplayAllowlistRule {
            path: path.to_string(),
            reason: "test override".to_string(),
            replacement: None,
        };
        let custom = vec![rule("/effect_receipts/*/timestamp"), rule("/custom_field")];

        let tap = TrajectoryTap::new("s-allowlist").with_replay_allowlist(custom.clone());
        let traces = tap.collect(&turns);
        assert_eq!(traces.len(), 1);
        assert_eq!(
            traces[0].replay_allowlist, custom,
            "custom allowlist should be honored verbatim, not overridden by the default"
        );

        // Sanity: with no override the default is still used.
        let default_tap = TrajectoryTap::new("s-default");
        let default_traces = default_tap.collect(&turns);
        assert_eq!(default_traces.len(), 1);
        assert_eq!(
            default_traces[0].replay_allowlist,
            default_trajectory_allowlist()
        );
    }

    #[test]
    fn verifier_passes_on_clean_candidate() {
        let turns = vec![path_turn(1, "git_status"), path_turn(2, "git_status")];
        let tap = TrajectoryTap::new("s4");

        // One example is enough for this test; everything else stays default.
        let options = CrystallizeOptions {
            min_examples: 1,
            workflow_name: Some("verifier_clean".to_string()),
            ..CrystallizeOptions::default()
        };

        let result = ingest_agent_loop_trajectory(&tap, &turns, options)
            .expect("ingest")
            .expect("at least one trace");

        assert!(
            !result.artifacts.report.candidates.is_empty(),
            "expected at least one accepted candidate"
        );
    }
}