Skip to main content

simple_agents_workflow/
recorder.rs

1use std::sync::{Arc, Mutex};
2
3use thiserror::Error;
4
5use crate::trace::{
6    TraceEvent, TraceEventKind, TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata,
7};
8
9/// Thread-safe in-memory recorder for workflow trace events.
10#[derive(Debug, Clone)]
11pub struct TraceRecorder {
12    state: Arc<Mutex<TraceRecorderState>>,
13}
14
15#[derive(Debug, Clone)]
16struct TraceRecorderState {
17    trace: WorkflowTrace,
18    next_seq: u64,
19    finalized: bool,
20}
21
22impl TraceRecorder {
23    /// Creates a new recorder with empty event history.
24    pub fn new(metadata: WorkflowTraceMetadata) -> Self {
25        Self {
26            state: Arc::new(Mutex::new(TraceRecorderState {
27                trace: WorkflowTrace {
28                    metadata,
29                    events: Vec::new(),
30                },
31                next_seq: 0,
32                finalized: false,
33            })),
34        }
35    }
36
37    /// Appends an arbitrary event to the trace.
38    pub fn append_event(
39        &self,
40        timestamp_unix_ms: u64,
41        kind: TraceEventKind,
42    ) -> Result<TraceEvent, TraceRecordError> {
43        let mut state = self
44            .state
45            .lock()
46            .unwrap_or_else(|poisoned| poisoned.into_inner());
47
48        if state.finalized {
49            return Err(TraceRecordError::AlreadyFinalized);
50        }
51
52        let event = TraceEvent {
53            seq: state.next_seq,
54            timestamp_unix_ms,
55            kind,
56        };
57
58        state.next_seq = state.next_seq.saturating_add(1);
59        state.trace.events.push(event.clone());
60        Ok(event)
61    }
62
63    /// Appends a node enter event.
64    pub fn record_node_enter(
65        &self,
66        timestamp_unix_ms: u64,
67        node_id: impl Into<String>,
68    ) -> Result<TraceEvent, TraceRecordError> {
69        self.append_event(
70            timestamp_unix_ms,
71            TraceEventKind::NodeEnter {
72                node_id: node_id.into(),
73            },
74        )
75    }
76
77    /// Appends a node exit event.
78    pub fn record_node_exit(
79        &self,
80        timestamp_unix_ms: u64,
81        node_id: impl Into<String>,
82    ) -> Result<TraceEvent, TraceRecordError> {
83        self.append_event(
84            timestamp_unix_ms,
85            TraceEventKind::NodeExit {
86                node_id: node_id.into(),
87            },
88        )
89    }
90
91    /// Appends a node error event.
92    pub fn record_node_error(
93        &self,
94        timestamp_unix_ms: u64,
95        node_id: impl Into<String>,
96        message: impl Into<String>,
97    ) -> Result<TraceEvent, TraceRecordError> {
98        self.append_event(
99            timestamp_unix_ms,
100            TraceEventKind::NodeError {
101                node_id: node_id.into(),
102                message: message.into(),
103            },
104        )
105    }
106
107    /// Appends a workflow terminal event.
108    pub fn record_terminal(
109        &self,
110        timestamp_unix_ms: u64,
111        status: TraceTerminalStatus,
112    ) -> Result<TraceEvent, TraceRecordError> {
113        self.append_event(timestamp_unix_ms, TraceEventKind::Terminal { status })
114    }
115
116    /// Returns a snapshot of the current trace.
117    pub fn snapshot(&self) -> WorkflowTrace {
118        let state = self
119            .state
120            .lock()
121            .unwrap_or_else(|poisoned| poisoned.into_inner());
122        state.trace.clone()
123    }
124
125    /// Finalizes the recorder and returns the immutable trace.
126    pub fn finalize(&self, finished_at_unix_ms: u64) -> Result<WorkflowTrace, TraceRecordError> {
127        let mut state = self
128            .state
129            .lock()
130            .unwrap_or_else(|poisoned| poisoned.into_inner());
131
132        if state.finalized {
133            return Err(TraceRecordError::AlreadyFinalized);
134        }
135
136        state.trace.metadata.finished_at_unix_ms = Some(finished_at_unix_ms);
137        state.finalized = true;
138        Ok(state.trace.clone())
139    }
140}
141
142/// Trace recorder failures.
143#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
144pub enum TraceRecordError {
145    /// Mutation attempted after recorder finalization.
146    #[error("trace recorder is already finalized")]
147    AlreadyFinalized,
148}