simple_agents_workflow/
recorder.rs1use std::sync::{Arc, Mutex};
2
3use thiserror::Error;
4
5use crate::trace::{
6 TraceEvent, TraceEventKind, TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata,
7};
8
9#[derive(Debug, Clone)]
11pub struct TraceRecorder {
12 state: Arc<Mutex<TraceRecorderState>>,
13}
14
15#[derive(Debug, Clone)]
16struct TraceRecorderState {
17 trace: WorkflowTrace,
18 next_seq: u64,
19 finalized: bool,
20}
21
22impl TraceRecorder {
23 pub fn new(metadata: WorkflowTraceMetadata) -> Self {
25 Self {
26 state: Arc::new(Mutex::new(TraceRecorderState {
27 trace: WorkflowTrace {
28 metadata,
29 events: Vec::new(),
30 },
31 next_seq: 0,
32 finalized: false,
33 })),
34 }
35 }
36
37 pub fn append_event(
39 &self,
40 timestamp_unix_ms: u64,
41 kind: TraceEventKind,
42 ) -> Result<TraceEvent, TraceRecordError> {
43 let mut state = self
44 .state
45 .lock()
46 .unwrap_or_else(|poisoned| poisoned.into_inner());
47
48 if state.finalized {
49 return Err(TraceRecordError::AlreadyFinalized);
50 }
51
52 let event = TraceEvent {
53 seq: state.next_seq,
54 timestamp_unix_ms,
55 kind,
56 };
57
58 state.next_seq = state.next_seq.saturating_add(1);
59 state.trace.events.push(event.clone());
60 Ok(event)
61 }
62
63 pub fn record_node_enter(
65 &self,
66 timestamp_unix_ms: u64,
67 node_id: impl Into<String>,
68 ) -> Result<TraceEvent, TraceRecordError> {
69 self.append_event(
70 timestamp_unix_ms,
71 TraceEventKind::NodeEnter {
72 node_id: node_id.into(),
73 },
74 )
75 }
76
77 pub fn record_node_exit(
79 &self,
80 timestamp_unix_ms: u64,
81 node_id: impl Into<String>,
82 ) -> Result<TraceEvent, TraceRecordError> {
83 self.append_event(
84 timestamp_unix_ms,
85 TraceEventKind::NodeExit {
86 node_id: node_id.into(),
87 },
88 )
89 }
90
91 pub fn record_node_error(
93 &self,
94 timestamp_unix_ms: u64,
95 node_id: impl Into<String>,
96 message: impl Into<String>,
97 ) -> Result<TraceEvent, TraceRecordError> {
98 self.append_event(
99 timestamp_unix_ms,
100 TraceEventKind::NodeError {
101 node_id: node_id.into(),
102 message: message.into(),
103 },
104 )
105 }
106
107 pub fn record_terminal(
109 &self,
110 timestamp_unix_ms: u64,
111 status: TraceTerminalStatus,
112 ) -> Result<TraceEvent, TraceRecordError> {
113 self.append_event(timestamp_unix_ms, TraceEventKind::Terminal { status })
114 }
115
116 pub fn snapshot(&self) -> WorkflowTrace {
118 let state = self
119 .state
120 .lock()
121 .unwrap_or_else(|poisoned| poisoned.into_inner());
122 state.trace.clone()
123 }
124
125 pub fn finalize(&self, finished_at_unix_ms: u64) -> Result<WorkflowTrace, TraceRecordError> {
127 let mut state = self
128 .state
129 .lock()
130 .unwrap_or_else(|poisoned| poisoned.into_inner());
131
132 if state.finalized {
133 return Err(TraceRecordError::AlreadyFinalized);
134 }
135
136 state.trace.metadata.finished_at_unix_ms = Some(finished_at_unix_ms);
137 state.finalized = true;
138 Ok(state.trace.clone())
139 }
140}
141
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
144pub enum TraceRecordError {
145 #[error("trace recorder is already finalized")]
147 AlreadyFinalized,
148}