Skip to main content

clark_agent/
trajectory.rs

//! Trajectory capture: an ordered, failable, sequence-numbered record of a run.
//!
//! ## Why a separate sink
//!
//! [`crate::event::EventSink`] is the loop's streaming UI channel: a
//! best-effort `emit(event)` that returns nothing, with the explicit
//! contract "failures must not propagate out of the loop." That makes
//! it the right shape for streaming to a UI but the wrong shape for
//! trajectory capture, where a single dropped event corrupts replay
//! and evaluation.
//!
//! `TrajectorySink` is the durable counterpart. It makes three guarantees
//! that the event sink does not:
//!
//! 1. **Ordered.** Every record carries a monotonic per-run `seq`. A
//!    consumer that observes `seq = N` is guaranteed to have observed
//!    every record with `seq < N`.
//!
//! 2. **Failable.** `record` returns `Result<_, TrajectoryError>`. The
//!    sink itself stays pure — it surfaces failures and never decides
//!    what the loop does. When wired into the loop, [`TrajectoryRecorder`]
//!    logs each failure with `tracing::warn!` and continues; callers that
//!    need to escalate must observe errors at the sink layer (for example,
//!    by wrapping their sink).
//!
//! 3. **Run-scoped.** Records are keyed by `run_id` (when known) so
//!    parent and child trajectories can live in the same store under
//!    different ids. The recorder restarts `seq` at `0` whenever it
//!    observes `AgentStart`.
//!
//! ## Wiring
//!
//! Construct a [`TrajectoryRecorder`] around a `TrajectorySink`, then
//! register the recorder as an `EventObserver` plugin. The recorder
//! filters [`AgentEvent`]s into [`TrajectoryRecord`]s, stamps each
//! with a sequence number, and forwards it to the sink.
//!
//! ```ignore
//! let sink: Arc<dyn TrajectorySink> = Arc::new(InMemoryTrajectorySink::default());
//! let recorder = Arc::new(TrajectoryRecorder::new(sink.clone()));
//! AgentBuilder::new()
//!     .event_observer_arc(recorder.clone())
//!     .stream(...)
//!     .build()
//! ```
//!
//! After the run, read the records back from the sink; they are ordered by `seq`.
46
47use std::sync::atomic::{AtomicU64, Ordering};
48use std::sync::Arc;
49
50use async_trait::async_trait;
51use serde::{Deserialize, Serialize};
52use tokio::sync::Mutex;
53
54use crate::event::AgentEvent;
55use crate::plugin::{Plugin, PluginCapabilities};
56use crate::types::{AgentMessage, RunIdentity};
57
// ─── Records ───────────────────────────────────────────────────────
59
/// Current schema version written into every [`TrajectoryRecord`].
///
/// Increment this whenever the shape of a record or its payload changes in
/// a way that consumers must branch on. Each persisted record keeps the
/// version it was written with, so a reader can migrate or reject it by
/// checking [`TrajectoryRecord::schema_version`]. Records that predate the
/// field deserialize with version `0` (via the serde default), which
/// therefore means "written before versioning existed".
pub const TRAJECTORY_SCHEMA_VERSION: u32 = 1;
68
/// Serde default for [`TrajectoryRecord::schema_version`].
///
/// Deliberately yields `0` rather than [`TRAJECTORY_SCHEMA_VERSION`]: a
/// record with no version field must read as "pre-versioning / unknown",
/// never as if it had been written against the current schema.
fn pre_versioning_schema() -> u32 {
    u32::MIN
}
76
77/// One durable record emitted from a run.
78///
79/// Each record carries a monotonic per-run `seq`, an optional
80/// `run_id` (resolved as soon as the loop emits
81/// [`AgentEvent::RunIdentified`]; `None` for events that precede
82/// it), and a typed [`TrajectoryPayload`].
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct TrajectoryRecord {
85    /// Schema version this record was written with. New records are
86    /// stamped with [`TRAJECTORY_SCHEMA_VERSION`]; records written before
87    /// versioning existed deserialize as `0`.
88    #[serde(default = "pre_versioning_schema")]
89    pub schema_version: u32,
90    pub seq: u64,
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub run_id: Option<String>,
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    pub parent_run_id: Option<String>,
95    #[serde(default)]
96    pub depth: usize,
97    /// UNIX milliseconds at the time the record was produced.
98    pub recorded_at_unix_ms: u64,
99    pub payload: TrajectoryPayload,
100}
101
102/// Typed payload of a trajectory record. The variants are a curated
103/// subset of [`AgentEvent`] — the things a replay or eval actually
104/// needs. Streaming-only events (`MessageUpdate`, `ToolExecutionUpdate`)
105/// are intentionally omitted; they belong on the streaming channel.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(tag = "kind", rename_all = "snake_case")]
108pub enum TrajectoryPayload {
109    RunStarted {
110        identity: RunIdentity,
111    },
112    RunEnded {
113        outcome: String,
114        new_messages: Vec<AgentMessage>,
115    },
116    TurnStarted,
117    TurnEnded {
118        assistant: AgentMessage,
119        tool_results: Vec<AgentMessage>,
120    },
121    /// Messages appended to the transcript (user, assistant, or tool
122    /// result). Final form only — no streaming deltas.
123    MessageAppended {
124        message: AgentMessage,
125    },
126    ToolStarted {
127        tool_call_id: String,
128        tool_name: String,
129        args: serde_json::Value,
130    },
131    ToolEnded {
132        tool_call_id: String,
133        tool_name: String,
134        result: crate::tool::ToolResult,
135        is_error: bool,
136    },
137    /// One LLM call's request snapshot, post-transform and post-gate.
138    /// Captures the typed view of "what the model saw this turn."
139    ProviderRequestPrepared {
140        iteration: usize,
141        model_id: Option<String>,
142        system_prompt_chars: usize,
143        message_count: usize,
144        tool_count: usize,
145        tools: Vec<String>,
146    },
147    /// A `ContextTransform` plugin ran. Carries only the plugin name
148    /// and the before/after message counts so durable trajectories
149    /// stay compact; the full diff stays on the streaming channel for
150    /// listeners that want it.
151    ContextTransformApplied {
152        iteration: usize,
153        plugin: String,
154        before_count: usize,
155        after_count: usize,
156    },
157    /// A `ToolGate` plugin contributed an allowlist for this turn.
158    ToolGateApplied {
159        iteration: usize,
160        plugin: String,
161        #[serde(default, skip_serializing_if = "Option::is_none")]
162        allow: Option<Vec<String>>,
163    },
164    /// The final intersected allowlist would have been empty, so the loop
165    /// selected a deterministic owner allowlist instead of advertising zero
166    /// tools to the provider.
167    ToolGateConflictResolved {
168        iteration: usize,
169        plugins: Vec<String>,
170        #[serde(default, skip_serializing_if = "Option::is_none")]
171        chosen_plugin: Option<String>,
172        allow: Vec<String>,
173        reason: String,
174    },
175    /// The loop discarded a truncated turn and re-streamed.
176    OutputTokensEscalation {
177        attempt: u8,
178        prev_cap: u32,
179        new_cap: u32,
180    },
181}
182
183/// Errors a `TrajectorySink` may surface.
184#[derive(Debug, thiserror::Error)]
185pub enum TrajectoryError {
186    #[error("trajectory sink rejected record: {0}")]
187    Rejected(String),
188    #[error("trajectory sink i/o failure: {0}")]
189    Io(String),
190}
191
// ─── Sink trait ────────────────────────────────────────────────────
193
194/// Durable trajectory sink. Implementations persist records in order;
195/// returning `Err` surfaces the failure to the caller (the
196/// [`TrajectoryRecorder`] applies the loop's chosen policy).
197///
198/// Implementations MUST preserve the order in which `record` is
199/// called. The sink does not need to be re-entrant; the recorder
200/// serializes calls with an internal mutex when wired as an
201/// `EventObserver`.
202#[async_trait]
203pub trait TrajectorySink: Send + Sync {
204    async fn record(&self, record: TrajectoryRecord) -> Result<(), TrajectoryError>;
205}
206
// ─── In-memory sink ───────────────────────────────────────────────
208
209/// Simple `Vec`-backed sink. Useful for tests, eval harnesses that
210/// load the whole trajectory in memory, and the replay path.
211#[derive(Debug, Default)]
212pub struct InMemoryTrajectorySink {
213    records: Mutex<Vec<TrajectoryRecord>>,
214}
215
216impl InMemoryTrajectorySink {
217    pub fn new() -> Self {
218        Self::default()
219    }
220
221    pub async fn snapshot(&self) -> Vec<TrajectoryRecord> {
222        self.records.lock().await.clone()
223    }
224
225    pub async fn len(&self) -> usize {
226        self.records.lock().await.len()
227    }
228
229    pub async fn is_empty(&self) -> bool {
230        self.records.lock().await.is_empty()
231    }
232}
233
234#[async_trait]
235impl TrajectorySink for InMemoryTrajectorySink {
236    async fn record(&self, record: TrajectoryRecord) -> Result<(), TrajectoryError> {
237        self.records.lock().await.push(record);
238        Ok(())
239    }
240}
241
// ─── Recorder (EventObserver) ─────────────────────────────────────
243
244/// Plugin that filters [`AgentEvent`]s into [`TrajectoryRecord`]s and
245/// forwards to a [`TrajectorySink`]. Stamps each record with a
246/// monotonic per-run sequence number and resolves the run id from
247/// [`AgentEvent::RunIdentified`].
248///
249/// Register as an `EventObserver` on the parent run; child runs that
250/// reuse the same recorder share the sequence space and stay
251/// distinguishable by `run_id`/`parent_run_id`. For a strict
252/// per-run sequence reset, register a fresh recorder per spawn.
253pub struct TrajectoryRecorder {
254    sink: Arc<dyn TrajectorySink>,
255    seq: AtomicU64,
256    identity: Mutex<Option<RunIdentity>>,
257}
258
259impl TrajectoryRecorder {
260    pub fn new(sink: Arc<dyn TrajectorySink>) -> Self {
261        Self {
262            sink,
263            seq: AtomicU64::new(0),
264            identity: Mutex::new(None),
265        }
266    }
267
268    async fn record(&self, payload: TrajectoryPayload) {
269        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
270        let identity = self.identity.lock().await.clone();
271        let recorded_at_unix_ms = std::time::SystemTime::now()
272            .duration_since(std::time::UNIX_EPOCH)
273            .map(|d| d.as_millis() as u64)
274            .unwrap_or(0);
275        let record = TrajectoryRecord {
276            schema_version: TRAJECTORY_SCHEMA_VERSION,
277            seq,
278            run_id: identity.as_ref().map(|i| i.run_id.clone()),
279            parent_run_id: identity.as_ref().and_then(|i| i.parent_run_id.clone()),
280            depth: identity.as_ref().map(|i| i.depth).unwrap_or(0),
281            recorded_at_unix_ms,
282            payload,
283        };
284        if let Err(e) = self.sink.record(record).await {
285            tracing::warn!(error = %e, "trajectory sink rejected record; continuing");
286        }
287    }
288}
289
290impl Plugin for TrajectoryRecorder {
291    fn name(&self) -> &'static str {
292        "trajectory_recorder"
293    }
294
295    fn capabilities(&self) -> PluginCapabilities {
296        PluginCapabilities::event_observer()
297    }
298}
299
300#[async_trait]
301impl crate::plugin::EventObserver for TrajectoryRecorder {
302    async fn on_event(&self, event: &AgentEvent) {
303        match event {
304            AgentEvent::AgentStart => {
305                // Reset sequence and identity for a fresh run. Concurrent
306                // re-use across runs is not supported — register a fresh
307                // recorder per run if you need per-run isolation.
308                self.seq.store(0, Ordering::SeqCst);
309                *self.identity.lock().await = None;
310            }
311            AgentEvent::RunIdentified { identity } => {
312                *self.identity.lock().await = Some(identity.clone());
313                self.record(TrajectoryPayload::RunStarted {
314                    identity: identity.clone(),
315                })
316                .await;
317            }
318            AgentEvent::AgentEnd { messages } => {
319                self.record(TrajectoryPayload::RunEnded {
320                    outcome: "ended".to_string(),
321                    new_messages: messages.clone(),
322                })
323                .await;
324            }
325            AgentEvent::TurnStart => {
326                self.record(TrajectoryPayload::TurnStarted).await;
327            }
328            AgentEvent::TurnEnd {
329                message,
330                tool_results,
331            } => {
332                self.record(TrajectoryPayload::TurnEnded {
333                    assistant: message.clone(),
334                    tool_results: tool_results.clone(),
335                })
336                .await;
337            }
338            AgentEvent::MessageEnd { message } => {
339                self.record(TrajectoryPayload::MessageAppended {
340                    message: message.clone(),
341                })
342                .await;
343            }
344            AgentEvent::ToolExecutionStart {
345                tool_call_id,
346                tool_name,
347                args,
348            } => {
349                self.record(TrajectoryPayload::ToolStarted {
350                    tool_call_id: tool_call_id.clone(),
351                    tool_name: tool_name.clone(),
352                    args: args.clone(),
353                })
354                .await;
355            }
356            AgentEvent::ToolExecutionEnd {
357                tool_call_id,
358                tool_name,
359                result,
360                is_error,
361            } => {
362                self.record(TrajectoryPayload::ToolEnded {
363                    tool_call_id: tool_call_id.clone(),
364                    tool_name: tool_name.clone(),
365                    result: result.clone(),
366                    is_error: *is_error,
367                })
368                .await;
369            }
370            AgentEvent::ProviderRequestPrepared {
371                iteration,
372                model_id,
373                system_prompt,
374                messages,
375                tools,
376                ..
377            } => {
378                self.record(TrajectoryPayload::ProviderRequestPrepared {
379                    iteration: *iteration,
380                    model_id: model_id.clone(),
381                    system_prompt_chars: system_prompt.chars().count(),
382                    message_count: messages.len(),
383                    tool_count: tools.len(),
384                    tools: tools.iter().map(|t| t.name.clone()).collect(),
385                })
386                .await;
387            }
388            AgentEvent::ContextTransformApplied {
389                iteration,
390                plugin,
391                before,
392                after,
393            } => {
394                self.record(TrajectoryPayload::ContextTransformApplied {
395                    iteration: *iteration,
396                    plugin: (*plugin).to_string(),
397                    before_count: before.len(),
398                    after_count: after.len(),
399                })
400                .await;
401            }
402            AgentEvent::ToolGateApplied {
403                iteration,
404                plugin,
405                allow,
406            } => {
407                self.record(TrajectoryPayload::ToolGateApplied {
408                    iteration: *iteration,
409                    plugin: (*plugin).to_string(),
410                    allow: allow.clone(),
411                })
412                .await;
413            }
414            AgentEvent::ToolGateConflictResolved {
415                iteration,
416                plugins,
417                chosen_plugin,
418                allow,
419                reason,
420            } => {
421                self.record(TrajectoryPayload::ToolGateConflictResolved {
422                    iteration: *iteration,
423                    plugins: plugins.clone(),
424                    chosen_plugin: chosen_plugin.clone(),
425                    allow: allow.clone(),
426                    reason: reason.clone(),
427                })
428                .await;
429            }
430            AgentEvent::OutputTokensEscalation {
431                attempt,
432                prev_cap,
433                new_cap,
434            } => {
435                self.record(TrajectoryPayload::OutputTokensEscalation {
436                    attempt: *attempt,
437                    prev_cap: *prev_cap,
438                    new_cap: *new_cap,
439                })
440                .await;
441            }
442            AgentEvent::MessageStart { .. }
443            | AgentEvent::MessageUpdate { .. }
444            | AgentEvent::ToolExecutionUpdate { .. } => {
445                // Streaming-only deltas. The streaming `EventSink` is
446                // the right channel for these; durable trajectory
447                // captures the final assembled message via
448                // `MessageEnd`/`TurnEnd`.
449            }
450        }
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plugin::EventObserver;
    use crate::types::{AssistantContent, StopReason};

    /// Build an empty in-memory sink and a recorder that writes into it.
    fn wired_recorder() -> (Arc<InMemoryTrajectorySink>, TrajectoryRecorder) {
        let sink = Arc::new(InMemoryTrajectorySink::new());
        let recorder = TrajectoryRecorder::new(sink.clone());
        (sink, recorder)
    }

    #[tokio::test]
    async fn recorder_writes_ordered_records_with_run_id() {
        let (sink, recorder) = wired_recorder();
        let identity = RunIdentity::root().with_conversation_id("conv-1");
        let empty_turn = AgentMessage::Assistant {
            content: AssistantContent { blocks: Vec::new() },
            stop_reason: StopReason::EndTurn,
            error_message: None,
            timestamp: None,
            usage: None,
        };

        let script = [
            AgentEvent::AgentStart,
            AgentEvent::RunIdentified {
                identity: identity.clone(),
            },
            AgentEvent::TurnStart,
            AgentEvent::TurnEnd {
                message: empty_turn,
                tool_results: Vec::new(),
            },
            AgentEvent::AgentEnd {
                messages: Vec::new(),
            },
        ];
        for event in &script {
            recorder.on_event(event).await;
        }

        let records = sink.snapshot().await;
        // AgentStart only resets state, so RunIdentified yields record 0.
        assert_eq!(records.len(), 4);
        assert!(matches!(records[0].payload, TrajectoryPayload::RunStarted { .. }));
        assert!(matches!(records[1].payload, TrajectoryPayload::TurnStarted));
        assert!(matches!(records[2].payload, TrajectoryPayload::TurnEnded { .. }));
        assert!(matches!(records[3].payload, TrajectoryPayload::RunEnded { .. }));

        let expected_run_id = identity.run_id.as_str();
        for (expected_seq, record) in (0_u64..).zip(records.iter()) {
            assert_eq!(record.seq, expected_seq);
            assert_eq!(record.run_id.as_deref(), Some(expected_run_id));
            assert_eq!(
                record.schema_version, TRAJECTORY_SCHEMA_VERSION,
                "new records carry the current schema version"
            );
        }
    }

    #[test]
    fn record_missing_schema_version_deserializes_as_pre_versioning() {
        // Persisted before `schema_version` existed: it must still load, and
        // it must read as `0` rather than impersonating the current schema.
        let legacy = serde_json::json!({
            "seq": 7,
            "recorded_at_unix_ms": 123,
            "payload": { "kind": "turn_started" }
        });
        let parsed: TrajectoryRecord =
            serde_json::from_value(legacy).expect("legacy record deserializes");
        assert_eq!((parsed.schema_version, parsed.seq), (0, 7));
    }

    #[test]
    fn record_round_trips_with_schema_version() {
        let original = TrajectoryRecord {
            schema_version: TRAJECTORY_SCHEMA_VERSION,
            seq: 1,
            run_id: Some("r1".into()),
            parent_run_id: None,
            depth: 0,
            recorded_at_unix_ms: 1,
            payload: TrajectoryPayload::TurnStarted,
        };
        let encoded = serde_json::to_value(&original).expect("serialize");
        assert_eq!(
            encoded["schema_version"],
            serde_json::json!(TRAJECTORY_SCHEMA_VERSION)
        );
        let decoded: TrajectoryRecord = serde_json::from_value(encoded).expect("deserialize");
        assert_eq!(decoded.schema_version, TRAJECTORY_SCHEMA_VERSION);
    }

    #[tokio::test]
    async fn recorder_skips_streaming_only_events() {
        let (sink, recorder) = wired_recorder();
        let greeting = AgentMessage::User {
            content: crate::types::UserContent::Text("hi".into()),
            timestamp: None,
        };

        let streaming_only = [
            AgentEvent::MessageStart { message: greeting },
            AgentEvent::ToolExecutionUpdate {
                tool_call_id: "1".into(),
                tool_name: "shell".into(),
                partial: crate::tool::ToolResult::text("partial"),
            },
        ];
        for event in &streaming_only {
            recorder.on_event(event).await;
        }

        assert!(sink.is_empty().await);
    }
}