Skip to main content

lash_trace/
lib.rs

1use std::collections::BTreeMap;
2use std::fs::OpenOptions;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use sha2::{Digest, Sha256};
10
11mod lashlang_graph;
12#[cfg(feature = "otel")]
13pub mod otel;
14
15pub use lashlang_graph::{
16    TraceLashlangEdgeSelection, TraceLashlangGraph, TraceLashlangGraphChildLink,
17    TraceLashlangGraphEdge, TraceLashlangGraphNode, TraceLashlangGraphStore,
18    TraceLashlangNodeStatus,
19};
20
/// Schema version that [`TraceRecord::new_with_timestamp`] writes into
/// [`TraceRecord::schema_version`].
pub const TRACE_SCHEMA_VERSION: u32 = 2;
22
23#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum TraceLevel {
26    #[default]
27    Standard,
28    Extended,
29}
30
31impl TraceLevel {
32    pub fn is_extended(self) -> bool {
33        matches!(self, Self::Extended)
34    }
35}
36
37#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
38pub struct TraceContext {
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub run_id: Option<String>,
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub experiment_id: Option<String>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub candidate_id: Option<String>,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub candidate_parent_id: Option<String>,
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub example_id: Option<String>,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub split: Option<String>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub session_id: Option<String>,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub turn_id: Option<String>,
55    /// Stable id of the span this record represents (e.g. `turn:<session>:<turn>`,
56    /// `llm:<call_id>`, `tool:<call_id>`). Populated by the runtime for turn /
57    /// llm / tool / session records so a consumer can build a nested span tree
58    /// from `(graph_node_id, parent_graph_node_id)` with a single `id -> span`
59    /// map. Lashlang execution sets its own graph node id here.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub graph_node_id: Option<String>,
62    /// Id of the enclosing span — the value of some other record's
63    /// `graph_node_id`. A turn's parent is its causal origin (the spawning tool
64    /// call / effect, via `caused_by`) when known, otherwise the session root.
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub parent_graph_node_id: Option<String>,
67    #[serde(default, skip_serializing_if = "Option::is_none")]
68    pub turn_index: Option<usize>,
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub protocol_iteration: Option<usize>,
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub effect_id: Option<String>,
73    #[serde(default, skip_serializing_if = "Option::is_none")]
74    pub llm_call_id: Option<String>,
75    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
76    pub metadata: BTreeMap<String, Value>,
77}
78
79impl TraceContext {
80    pub fn for_session(mut self, session_id: impl Into<String>) -> Self {
81        self.session_id = Some(session_id.into());
82        self
83    }
84
85    pub fn for_turn_index(mut self, turn_index: usize) -> Self {
86        self.turn_index = Some(turn_index);
87        self
88    }
89
90    pub fn for_turn(mut self, turn_id: impl Into<String>) -> Self {
91        self.turn_id = Some(turn_id.into());
92        self
93    }
94
95    pub fn for_protocol_iteration(mut self, protocol_iteration: usize) -> Self {
96        self.protocol_iteration = Some(protocol_iteration);
97        self
98    }
99
100    pub fn for_llm_call(mut self, llm_call_id: impl Into<String>) -> Self {
101        self.llm_call_id = Some(llm_call_id.into());
102        self
103    }
104}
105
106#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
107pub struct TraceRecord {
108    pub schema_version: u32,
109    pub id: String,
110    pub timestamp: String,
111    pub context: TraceContext,
112    #[serde(flatten)]
113    pub event: TraceEvent,
114}
115
116impl TraceRecord {
117    pub fn new(context: TraceContext, event: TraceEvent) -> Self {
118        Self::new_with_timestamp(context, event, chrono::Utc::now())
119    }
120
121    pub fn new_with_timestamp(
122        context: TraceContext,
123        event: TraceEvent,
124        timestamp: chrono::DateTime<chrono::Utc>,
125    ) -> Self {
126        Self {
127            schema_version: TRACE_SCHEMA_VERSION,
128            id: uuid::Uuid::new_v4().to_string(),
129            timestamp: timestamp.to_rfc3339(),
130            context,
131            event,
132        }
133    }
134}
135
136#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
137#[serde(tag = "type", rename_all = "snake_case")]
138#[allow(
139    clippy::large_enum_variant,
140    reason = "TraceEvent is a public DTO; keeping event payloads inline preserves ergonomic pattern matching"
141)]
142pub enum TraceEvent {
143    SessionStarted {
144        #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
145        metadata: BTreeMap<String, Value>,
146    },
147    TurnStarted {
148        #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
149        metadata: BTreeMap<String, Value>,
150    },
151    PromptBuilt {
152        prompt_hash: String,
153        prompt_chars: usize,
154        #[serde(default, skip_serializing_if = "Vec::is_empty")]
155        components: Vec<TracePromptComponent>,
156    },
157    LlmCallStarted {
158        request: TraceLlmRequest,
159    },
160    LlmCallCompleted {
161        response: TraceLlmResponse,
162        #[serde(default, skip_serializing_if = "Option::is_none")]
163        usage: Option<TraceTokenUsage>,
164        #[serde(default, skip_serializing_if = "Option::is_none")]
165        provider_usage: Option<Value>,
166        #[serde(default, skip_serializing_if = "Option::is_none")]
167        stream_summary: Option<Value>,
168    },
169    LlmCallFailed {
170        error: TraceError,
171        #[serde(default, skip_serializing_if = "Option::is_none")]
172        stream_summary: Option<Value>,
173    },
174    ProviderStreamEvent {
175        event: TraceProviderStreamEvent,
176    },
177    RuntimeStreamEvent {
178        event: TraceRuntimeStreamEvent,
179    },
180    ToolCallStarted {
181        call_id: Option<String>,
182        name: String,
183        args: Value,
184    },
185    ToolCallCompleted {
186        call_id: Option<String>,
187        name: String,
188        args: Value,
189        output: TraceToolCallOutput,
190        duration_ms: u64,
191    },
192    ProtocolStep {
193        plugin_id: String,
194        payload: Value,
195    },
196    TokenUsage {
197        usage: TraceTokenUsage,
198        #[serde(default, skip_serializing_if = "Option::is_none")]
199        cumulative: Option<TraceTokenUsage>,
200    },
201    LashlangExecution {
202        event: TraceLashlangExecutionEvent,
203    },
204    TurnCompleted {
205        status: String,
206        done_reason: String,
207        #[serde(default, skip_serializing_if = "Option::is_none")]
208        agent_frame_switch: Option<TraceAgentFrameSwitch>,
209    },
210    Custom {
211        name: String,
212        payload: Value,
213    },
214}
215
216#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
217pub struct TraceToolCallOutput {
218    pub outcome: TraceToolCallOutcome,
219    #[serde(default, skip_serializing_if = "Option::is_none")]
220    pub control: Option<Value>,
221}
222
223impl TraceToolCallOutput {
224    pub fn status(&self) -> TraceToolCallStatus {
225        match self.outcome {
226            TraceToolCallOutcome::Success(_) => TraceToolCallStatus::Success,
227            TraceToolCallOutcome::Failure(_) => TraceToolCallStatus::Failure,
228            TraceToolCallOutcome::Cancelled(_) => TraceToolCallStatus::Cancelled,
229        }
230    }
231
232    pub fn is_success(&self) -> bool {
233        self.status() == TraceToolCallStatus::Success
234    }
235
236    pub fn value_for_projection(&self) -> Value {
237        match &self.outcome {
238            TraceToolCallOutcome::Success(value)
239            | TraceToolCallOutcome::Failure(value)
240            | TraceToolCallOutcome::Cancelled(value) => value.clone(),
241        }
242    }
243}
244
245#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
246#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
247pub enum TraceToolCallOutcome {
248    Success(Value),
249    Failure(Value),
250    Cancelled(Value),
251}
252
253#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
254#[serde(rename_all = "snake_case")]
255pub enum TraceToolCallStatus {
256    Success,
257    Failure,
258    Cancelled,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct TracePromptComponent {
263    pub id: String,
264    pub kind: String,
265    pub hash: String,
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub chars: Option<usize>,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271pub struct TraceLlmRequest {
272    pub model: String,
273    #[serde(default, skip_serializing_if = "Option::is_none")]
274    pub model_variant: Option<String>,
275    pub messages: Vec<TraceLlmMessage>,
276    #[serde(default, skip_serializing_if = "Vec::is_empty")]
277    pub attachments: Vec<TraceAttachment>,
278    #[serde(default, skip_serializing_if = "Vec::is_empty")]
279    pub tools: Vec<TraceToolSpec>,
280    pub tool_choice: String,
281    #[serde(default, skip_serializing_if = "Option::is_none")]
282    pub output_spec: Option<Value>,
283    pub stream: bool,
284}
285
286#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
287pub struct TraceLlmMessage {
288    pub role: String,
289    pub blocks: Vec<TraceContentBlock>,
290}
291
292#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
293#[serde(tag = "kind", rename_all = "snake_case")]
294pub enum TraceContentBlock {
295    Text {
296        text: String,
297        #[serde(default, skip_serializing_if = "is_false")]
298        cache_breakpoint: bool,
299    },
300    Image {
301        attachment_idx: usize,
302    },
303    ToolCall {
304        call_id: Option<String>,
305        tool_name: String,
306        input_json: Value,
307        item_id: Option<String>,
308        has_signature: bool,
309    },
310    ToolResult {
311        call_id: Option<String>,
312        tool_name: Option<String>,
313        content: String,
314    },
315    Reasoning {
316        text: String,
317        item_id: Option<String>,
318        summary: Vec<String>,
319        has_encrypted: bool,
320        redacted: bool,
321    },
322}
323
/// `skip_serializing_if` predicate: `true` when the flag is `false`, so the
/// field is left out of the JSON.
fn is_false(value: &bool) -> bool {
    !value
}
327
328#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
329pub struct TraceAttachment {
330    pub mime: String,
331    #[serde(default, skip_serializing_if = "Option::is_none")]
332    pub filename: Option<String>,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub bytes_sha256: Option<String>,
335    #[serde(default, skip_serializing_if = "Option::is_none")]
336    pub bytes_len: Option<usize>,
337}
338
339#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
340pub struct TraceToolSpec {
341    pub name: String,
342    pub description: String,
343    pub input_schema: Value,
344    pub output_schema: Value,
345}
346
347#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
348pub struct TraceLlmResponse {
349    pub text: String,
350    pub duration_ms: u64,
351    #[serde(default, skip_serializing_if = "Option::is_none")]
352    pub terminal_reason: Option<String>,
353    #[serde(default, skip_serializing_if = "Option::is_none")]
354    pub parts: Option<Value>,
355}
356
357#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
358pub struct TraceProviderStreamEvent {
359    pub provider: String,
360    pub sequence: u64,
361    pub elapsed_ms: u64,
362    pub event_name: String,
363    #[serde(default, skip_serializing_if = "Option::is_none")]
364    pub item_id: Option<String>,
365    #[serde(default, skip_serializing_if = "Option::is_none")]
366    pub output_index: Option<i64>,
367    pub raw_len: usize,
368    pub raw_sha256: String,
369    #[serde(default, skip_serializing_if = "Option::is_none")]
370    pub raw_json: Option<Value>,
371}
372
373#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
374pub struct TraceRuntimeStreamEvent {
375    pub sequence: u64,
376    pub elapsed_ms: u64,
377    pub event_name: String,
378    #[serde(default, skip_serializing_if = "Option::is_none")]
379    pub raw_text: Option<String>,
380    #[serde(default, skip_serializing_if = "Option::is_none")]
381    pub visible_text: Option<String>,
382    #[serde(default, skip_serializing_if = "Option::is_none")]
383    pub item_id: Option<String>,
384    #[serde(default, skip_serializing_if = "Option::is_none")]
385    pub output_index: Option<i64>,
386    #[serde(default, skip_serializing_if = "Option::is_none")]
387    pub call_id: Option<String>,
388    #[serde(default, skip_serializing_if = "Option::is_none")]
389    pub tool_name: Option<String>,
390    #[serde(default, skip_serializing_if = "Option::is_none")]
391    pub input_json: Option<Value>,
392    #[serde(default, skip_serializing_if = "Option::is_none")]
393    pub usage: Option<TraceTokenUsage>,
394}
395
396#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
397pub struct TraceTokenUsage {
398    pub input_tokens: i64,
399    pub output_tokens: i64,
400    pub cache_read_input_tokens: i64,
401    pub cache_write_input_tokens: i64,
402    pub reasoning_output_tokens: i64,
403}
404
405#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
406pub struct TraceAgentFrameSwitch {
407    pub frame_id: String,
408}
409
410#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
411pub struct TraceRuntimeScope {
412    pub session_id: String,
413    #[serde(default, skip_serializing_if = "Option::is_none")]
414    pub turn_id: Option<String>,
415    #[serde(default, skip_serializing_if = "Option::is_none")]
416    pub turn_index: Option<usize>,
417    #[serde(default, skip_serializing_if = "Option::is_none")]
418    pub protocol_iteration: Option<usize>,
419}
420
421impl TraceRuntimeScope {
422    pub fn new(session_id: impl Into<String>) -> Self {
423        Self {
424            session_id: session_id.into(),
425            turn_id: None,
426            turn_index: None,
427            protocol_iteration: None,
428        }
429    }
430}
431
432#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
433#[serde(tag = "type", rename_all = "snake_case")]
434pub enum TraceRuntimeSubject {
435    Effect { effect_id: String, kind: String },
436    Process { process_id: String },
437}
438
439impl TraceRuntimeSubject {
440    pub fn graph_key(&self, scope: &TraceRuntimeScope) -> String {
441        match self {
442            Self::Effect { effect_id, .. } => match scope.turn_id.as_deref() {
443                Some(turn_id) if !turn_id.is_empty() => {
444                    format!("effect:{}:{turn_id}:{effect_id}", scope.session_id)
445                }
446                _ => format!("effect:{}:{effect_id}", scope.session_id),
447            },
448            Self::Process { process_id } => format!("process:{process_id}"),
449        }
450    }
451}
452
453#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
454pub struct TraceLashlangExecutionIdentity {
455    pub scope: TraceRuntimeScope,
456    pub subject: TraceRuntimeSubject,
457    pub module_ref: String,
458    pub entry_kind: String,
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    pub entry_ref: Option<String>,
461    pub entry_name: String,
462}
463
464impl TraceLashlangExecutionIdentity {
465    pub fn graph_key(&self) -> String {
466        self.subject.graph_key(&self.scope)
467    }
468}
469
470#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
471#[serde(tag = "kind", rename_all = "snake_case")]
472pub enum TraceLashlangExecutionEvent {
473    ExecutionStarted {
474        event_key: String,
475        identity: TraceLashlangExecutionIdentity,
476        execution_map: TraceLashlangMap,
477    },
478    ExecutionFinished {
479        event_key: String,
480        identity: TraceLashlangExecutionIdentity,
481        status: TraceLashlangStatus,
482        #[serde(default, skip_serializing_if = "Option::is_none")]
483        error: Option<String>,
484    },
485    NodeStarted {
486        event_key: String,
487        identity: TraceLashlangExecutionIdentity,
488        node_id: String,
489        node_kind: String,
490        label: String,
491        occurrence: u64,
492    },
493    NodeCompleted {
494        event_key: String,
495        identity: TraceLashlangExecutionIdentity,
496        node_id: String,
497        node_kind: String,
498        label: String,
499        occurrence: u64,
500    },
501    NodeFailed {
502        event_key: String,
503        identity: TraceLashlangExecutionIdentity,
504        node_id: String,
505        node_kind: String,
506        label: String,
507        occurrence: u64,
508        error: String,
509    },
510    BranchSelected {
511        event_key: String,
512        identity: TraceLashlangExecutionIdentity,
513        node_id: String,
514        occurrence: u64,
515        edge_id: String,
516        selected: TraceBranchSelection,
517    },
518    ChildStarted {
519        event_key: String,
520        identity: TraceLashlangExecutionIdentity,
521        parent_node_id: String,
522        occurrence: u64,
523        child: TraceLashlangChildExecution,
524    },
525}
526
527#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
528pub struct TraceLashlangChildExecution {
529    pub scope: TraceRuntimeScope,
530    pub subject: TraceRuntimeSubject,
531    #[serde(default, skip_serializing_if = "Option::is_none")]
532    pub module_ref: Option<String>,
533    #[serde(default, skip_serializing_if = "Option::is_none")]
534    pub entry_ref: Option<String>,
535    #[serde(default, skip_serializing_if = "Option::is_none")]
536    pub entry_name: Option<String>,
537}
538
539impl TraceLashlangChildExecution {
540    pub fn graph_key(&self) -> String {
541        self.subject.graph_key(&self.scope)
542    }
543}
544
545#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
546#[serde(rename_all = "snake_case")]
547pub enum TraceLashlangStatus {
548    Running,
549    Completed,
550    Failed,
551    Cancelled,
552}
553
554#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
555#[serde(rename_all = "snake_case")]
556pub enum TraceBranchSelection {
557    Then,
558    Else,
559}
560
561#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
562pub struct TraceLashlangMap {
563    pub module_ref: String,
564    pub entry_kind: String,
565    #[serde(default, skip_serializing_if = "Option::is_none")]
566    pub entry_ref: Option<String>,
567    pub entry_name: String,
568    #[serde(default)]
569    pub nodes: Vec<TraceLashlangMapNode>,
570    #[serde(default)]
571    pub edges: Vec<TraceLashlangMapEdge>,
572}
573
574#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
575pub struct TraceLashlangMapNode {
576    pub id: String,
577    pub kind: String,
578    pub label: String,
579    #[serde(default, skip_serializing_if = "Option::is_none")]
580    pub label_metadata: Option<TraceLabelMetadata>,
581}
582
583#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
584pub struct TraceLabelMetadata {
585    pub title: String,
586    #[serde(default, skip_serializing_if = "Option::is_none")]
587    pub description: Option<String>,
588}
589
590#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
591pub struct TraceLashlangMapEdge {
592    pub id: String,
593    pub from: String,
594    pub to: String,
595    pub label: String,
596}
597
598#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
599pub struct TraceError {
600    pub message: String,
601    pub retryable: bool,
602    #[serde(default, skip_serializing_if = "Option::is_none")]
603    pub terminal_reason: Option<String>,
604    #[serde(default, skip_serializing_if = "Option::is_none")]
605    pub code: Option<String>,
606    #[serde(default, skip_serializing_if = "Option::is_none")]
607    pub raw: Option<String>,
608}
609
610#[derive(Debug, thiserror::Error)]
611pub enum TraceSinkError {
612    #[error("failed to serialize trace record: {0}")]
613    Serialize(#[from] serde_json::Error),
614    #[error("trace sink lock poisoned")]
615    LockPoisoned,
616    #[error("failed to create trace directory {path}: {source}")]
617    CreateDir { path: PathBuf, source: io::Error },
618    #[error("failed to open trace file {path}: {source}")]
619    Open { path: PathBuf, source: io::Error },
620    #[error("failed to write trace file {path}: {source}")]
621    Write { path: PathBuf, source: io::Error },
622}
623
624pub trait TraceSink: Send + Sync {
625    fn append(&self, record: &TraceRecord) -> Result<(), TraceSinkError>;
626
627    /// Force any buffered trace data this sink owns to durable storage.
628    ///
629    /// Hosts call this before process exit so records that a sink has not yet
630    /// committed are not lost. The default is a no-op: sinks that write each
631    /// record through on [`append`](Self::append) (or that delegate durability
632    /// to a host-owned exporter) have nothing of their own to flush. Sinks that
633    /// buffer — or that can force an `fsync` — override this.
634    fn flush(&self) -> Result<(), TraceSinkError> {
635        Ok(())
636    }
637}
638
/// Appends trace records to a JSON Lines file, one record per line.
///
/// Constructing the sink performs no I/O. The file and any missing parent
/// directories are created by [`TraceSink::append`].
pub struct JsonlTraceSink {
    path: PathBuf,
    /// Serializes appends and flushes made through this sink.
    lock: Mutex<()>,
}

impl JsonlTraceSink {
    /// Create a sink that will write to `path`.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path: PathBuf = path.into();
        let lock = Mutex::default();
        Self { path, lock }
    }

    /// The file this sink appends to.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
656
657impl TraceSink for JsonlTraceSink {
658    fn append(&self, record: &TraceRecord) -> Result<(), TraceSinkError> {
659        let line = serde_json::to_string(record)?;
660        let _guard = self.lock.lock().map_err(|_| TraceSinkError::LockPoisoned)?;
661        if let Some(parent) = self.path.parent()
662            && !parent.as_os_str().is_empty()
663        {
664            std::fs::create_dir_all(parent).map_err(|source| TraceSinkError::CreateDir {
665                path: parent.to_path_buf(),
666                source,
667            })?;
668        }
669        let mut file = OpenOptions::new()
670            .create(true)
671            .append(true)
672            .open(&self.path)
673            .map_err(|source| TraceSinkError::Open {
674                path: self.path.clone(),
675                source,
676            })?;
677        writeln!(file, "{line}").map_err(|source| TraceSinkError::Write {
678            path: self.path.clone(),
679            source,
680        })
681    }
682
683    /// `fsync` the trace file to durable storage.
684    ///
685    /// Each [`append`](Self::append) opens, appends, and closes the file, so a
686    /// record's bytes already reach the OS as it is written — this sink holds no
687    /// in-process buffer. `flush` goes one step further and forces an `fsync` so
688    /// the OS page cache is pushed to disk, which is the honest guarantee a host
689    /// wants before exit. If no record has been written yet the file may not
690    /// exist; that is a no-op rather than an error (nothing to sync), and we do
691    /// not create an empty file just to sync it.
692    fn flush(&self) -> Result<(), TraceSinkError> {
693        let _guard = self.lock.lock().map_err(|_| TraceSinkError::LockPoisoned)?;
694        match OpenOptions::new().append(true).open(&self.path) {
695            Ok(file) => file.sync_all().map_err(|source| TraceSinkError::Write {
696                path: self.path.clone(),
697                source,
698            }),
699            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
700            Err(source) => Err(TraceSinkError::Open {
701                path: self.path.clone(),
702                source,
703            }),
704        }
705    }
706}
707
708/// Writes each trace record as one JSON line to stderr — handy for `cargo run`
709/// debugging without a trace file.
710#[derive(Default)]
711pub struct StderrTraceSink {
712    lock: Mutex<()>,
713}
714
715impl TraceSink for StderrTraceSink {
716    fn append(&self, record: &TraceRecord) -> Result<(), TraceSinkError> {
717        let line = serde_json::to_string(record)?;
718        let _guard = self.lock.lock().map_err(|_| TraceSinkError::LockPoisoned)?;
719        eprintln!("{line}");
720        Ok(())
721    }
722}
723
724/// Fans each trace record out to several sinks in order (e.g. stderr + a JSONL
725/// file). Stops at the first sink that errors.
726pub struct TeeTraceSink {
727    sinks: Vec<Arc<dyn TraceSink>>,
728}
729
730impl TeeTraceSink {
731    pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TraceSink>>) -> Self {
732        Self {
733            sinks: sinks.into_iter().collect(),
734        }
735    }
736}
737
738impl TraceSink for TeeTraceSink {
739    fn append(&self, record: &TraceRecord) -> Result<(), TraceSinkError> {
740        for sink in &self.sinks {
741            sink.append(record)?;
742        }
743        Ok(())
744    }
745
746    /// Flush every wrapped sink, stopping at the first that errors.
747    fn flush(&self) -> Result<(), TraceSinkError> {
748        for sink in &self.sinks {
749            sink.flush()?;
750        }
751        Ok(())
752    }
753}
754
755pub fn sha256_hex(input: impl AsRef<[u8]>) -> String {
756    let mut hasher = Sha256::new();
757    hasher.update(input.as_ref());
758    format!("{:x}", hasher.finalize())
759}
760
761pub fn json_hash(value: &Value) -> String {
762    sha256_hex(serde_json::to_vec(value).unwrap_or_default())
763}
764
#[cfg(test)]
mod tests {
    use super::*;

    /// Unique scratch path under the OS temp dir.
    ///
    /// The directory is not created up front. On drop it is removed together
    /// with its contents, so tests clean up even when an assertion panics.
    struct TempDir(PathBuf);

    impl TempDir {
        fn new() -> Self {
            Self(std::env::temp_dir().join(format!("lash-trace-{}", uuid::Uuid::new_v4())))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// A `custom` event record in the `root` session with a fixed payload.
    fn custom_record(name: &str) -> TraceRecord {
        TraceRecord::new(
            TraceContext::default().for_session("root"),
            TraceEvent::Custom {
                name: name.to_string(),
                payload: serde_json::json!({"ok": true}),
            },
        )
    }

    #[test]
    fn jsonl_sink_writes_record() {
        let dir = TempDir::new();
        std::fs::create_dir_all(dir.path()).unwrap();
        let path = dir.path().join("trace.jsonl");
        let sink = JsonlTraceSink::new(&path);
        sink.append(&custom_record("test.event")).unwrap();
        let text = std::fs::read_to_string(&path).unwrap();
        assert!(text.contains("\"type\":\"custom\""));
        assert!(text.contains("\"session_id\":\"root\""));
    }

    #[test]
    fn jsonl_sink_writes_one_newline_terminated_line_per_record() {
        let dir = TempDir::new();
        let path = dir.path().join("trace.jsonl");
        let sink = JsonlTraceSink::new(&path);
        sink.append(&custom_record("first")).unwrap();
        sink.append(&custom_record("second")).unwrap();

        let text = std::fs::read_to_string(&path).unwrap();
        assert!(text.ends_with('\n'));
        let names: Vec<String> = text
            .lines()
            .map(|line| {
                let record: TraceRecord = serde_json::from_str(line).unwrap();
                match record.event {
                    TraceEvent::Custom { name, .. } => name,
                    other => panic!("unexpected event: {other:?}"),
                }
            })
            .collect();
        assert_eq!(names, ["first", "second"]);
    }

    #[test]
    fn jsonl_sink_flush_is_noop_before_first_write() {
        let dir = TempDir::new();
        let path = dir.path().join("trace.jsonl");
        let sink = JsonlTraceSink::new(&path);
        sink.flush().unwrap();
        assert!(!path.exists());

        sink.append(&custom_record("test.event")).unwrap();
        sink.flush().unwrap();
        assert!(path.exists());
    }

    #[test]
    fn tool_start_and_frame_switch_records_are_jsonl_shaped() {
        let started = TraceRecord::new(
            TraceContext::default().for_session("root"),
            TraceEvent::ToolCallStarted {
                call_id: Some("call-1".to_string()),
                name: "read_file".to_string(),
                args: serde_json::json!({"path": "README.md"}),
            },
        );
        let completed = TraceRecord::new(
            TraceContext::default().for_session("root"),
            TraceEvent::TurnCompleted {
                status: "completed".to_string(),
                done_reason: "modelstop".to_string(),
                agent_frame_switch: Some(TraceAgentFrameSwitch {
                    frame_id: "frame-1".to_string(),
                }),
            },
        );

        let started_json = serde_json::to_value(started).unwrap();
        assert_eq!(started_json["type"], "tool_call_started");
        assert_eq!(started_json["call_id"], "call-1");

        let completed_json = serde_json::to_value(completed).unwrap();
        assert_eq!(completed_json["type"], "turn_completed");
        assert_eq!(completed_json["agent_frame_switch"]["frame_id"], "frame-1");
    }

    #[test]
    fn lashlang_execution_records_are_jsonl_shaped() {
        let identity = TraceLashlangExecutionIdentity {
            scope: TraceRuntimeScope::new("s1"),
            subject: TraceRuntimeSubject::Process {
                process_id: "p1".to_string(),
            },
            module_ref: "module".to_string(),
            entry_kind: "process".to_string(),
            entry_ref: Some("component:0".to_string()),
            entry_name: "main".to_string(),
        };
        let event = TraceLashlangExecutionEvent::NodeStarted {
            event_key: "process:p1:node:n1:1:started".to_string(),
            identity,
            node_id: "n1".to_string(),
            node_kind: "resource_operation".to_string(),
            label: "read_file".to_string(),
            occurrence: 1,
        };
        let record = TraceRecord::new(
            TraceContext::default().for_session("s1"),
            TraceEvent::LashlangExecution { event },
        );

        let json = serde_json::to_value(&record).expect("serialize lashlang execution");
        assert_eq!(json["type"], "lashlang_execution");
        assert_eq!(json["event"]["kind"], "node_started");
        assert_eq!(json["event"]["event_key"], "process:p1:node:n1:1:started");

        let round_trip =
            serde_json::from_value::<TraceRecord>(json).expect("deserialize lashlang execution");
        assert!(matches!(
            round_trip.event,
            TraceEvent::LashlangExecution {
                event: TraceLashlangExecutionEvent::NodeStarted { .. }
            }
        ));
    }

    #[test]
    fn tool_completion_serializes_typed_failure_output() {
        let record = TraceRecord::new(
            TraceContext::default().for_session("root"),
            TraceEvent::ToolCallCompleted {
                call_id: Some("call-1".to_string()),
                name: "read_file".to_string(),
                args: serde_json::json!({"path": "missing"}),
                output: TraceToolCallOutput {
                    outcome: TraceToolCallOutcome::Failure(serde_json::json!({
                        "class": "invalid_request",
                        "code": "invalid_tool_args",
                        "message": "bad args",
                        "source": "runtime",
                        "retry": { "type": "never" },
                        "raw": { "path": "missing" }
                    })),
                    control: None,
                },
                duration_ms: 3,
            },
        );

        let json = serde_json::to_value(record).unwrap();
        assert_eq!(json["type"], "tool_call_completed");
        assert_eq!(json["output"]["outcome"]["status"], "failure");
        assert_eq!(
            json["output"]["outcome"]["payload"]["code"],
            "invalid_tool_args"
        );
        assert_eq!(
            json["output"]["outcome"]["payload"]["raw"]["path"],
            "missing"
        );
    }

    #[test]
    fn runtime_subject_graph_key_includes_turn_only_when_set() {
        let mut scope = TraceRuntimeScope::new("s1");
        let effect = TraceRuntimeSubject::Effect {
            effect_id: "e1".to_string(),
            kind: "tool".to_string(),
        };
        assert_eq!(effect.graph_key(&scope), "effect:s1:e1");
        scope.turn_id = Some(String::new());
        assert_eq!(effect.graph_key(&scope), "effect:s1:e1");
        scope.turn_id = Some("t1".to_string());
        assert_eq!(effect.graph_key(&scope), "effect:s1:t1:e1");

        let process = TraceRuntimeSubject::Process {
            process_id: "p1".to_string(),
        };
        assert_eq!(process.graph_key(&scope), "process:p1");
    }

    #[test]
    fn tee_sink_appends_to_every_sink() {
        let dir = TempDir::new();
        let first = dir.path().join("first.jsonl");
        let second = dir.path().join("second.jsonl");
        let sinks: Vec<Arc<dyn TraceSink>> = vec![
            Arc::new(JsonlTraceSink::new(&first)),
            Arc::new(JsonlTraceSink::new(&second)),
        ];
        let tee = TeeTraceSink::new(sinks);
        tee.append(&custom_record("fan.out")).unwrap();
        tee.flush().unwrap();
        for path in [&first, &second] {
            let text = std::fs::read_to_string(path).unwrap();
            assert!(text.contains("\"name\":\"fan.out\""));
        }
    }

    #[test]
    fn jsonl_sink_creates_parent_directories() {
        let dir = TempDir::new();
        let path = dir.path().join("nested").join("trace.jsonl");
        let sink = JsonlTraceSink::new(&path);
        sink.append(&TraceRecord::new(
            TraceContext::default().for_session("root"),
            TraceEvent::RuntimeStreamEvent {
                event: TraceRuntimeStreamEvent {
                    sequence: 1,
                    elapsed_ms: 0,
                    event_name: "delta".to_string(),
                    raw_text: Some("hello".to_string()),
                    visible_text: Some("hello".to_string()),
                    item_id: None,
                    output_index: None,
                    call_id: None,
                    tool_name: None,
                    input_json: None,
                    usage: None,
                },
            },
        ))
        .unwrap();
        assert!(path.exists());
    }
}