1use std::collections::VecDeque;
2use std::env;
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use parking_lot::Mutex;
9use serde_json::Value;
10
11use crate::runner::engine::{ExecutionObserver, NodeEvent};
12use crate::validate::ValidationIssue;
13
14use super::model::{TraceEnvelope, TraceError, TraceFlow, TraceHash, TracePack, TraceStep};
15
/// Trace file path used when no output path is configured.
const DEFAULT_TRACE_FILE: &str = "trace.json";
/// Number of most recent steps kept in the in-memory trace buffer by default.
const DEFAULT_BUFFER_SIZE: usize = 20;
/// Algorithm label recorded alongside every payload hash.
const HASH_ALGORITHM: &str = "blake3";

/// Settings that control whether, where, and how much execution tracing is recorded.
#[derive(Clone, Debug)]
pub struct TraceConfig {
    /// When steps are recorded and the trace is written; see `TraceMode`.
    pub mode: TraceMode,
    /// Destination file for the serialized trace.
    pub out_path: PathBuf,
    /// Maximum number of most recent steps retained; older ones are dropped.
    pub buffer_size: usize,
    /// Whether each step also stores the full invocation (component, operation, payload).
    pub capture_inputs: bool,
}

impl TraceConfig {
    /// Builds a configuration from the process environment.
    ///
    /// * `GREENTIC_TRACE_OUT` — output path; unset or empty falls back to `trace.json`.
    /// * `GREENTIC_TRACE_CAPTURE_INPUTS` — `1` or `true` (case-insensitive, surrounding
    ///   whitespace ignored) enables input capture; any other value, or unset, disables it.
    ///
    /// The mode is always `TraceMode::On` and the buffer holds `DEFAULT_BUFFER_SIZE` steps;
    /// use `with_overrides` / `with_capture_inputs` to adjust.
    pub fn from_env() -> Self {
        // An empty value would otherwise become an empty path that can never be written.
        let out_path = env::var_os("GREENTIC_TRACE_OUT")
            .filter(|value| !value.is_empty())
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from(DEFAULT_TRACE_FILE));
        let capture_inputs = env::var("GREENTIC_TRACE_CAPTURE_INPUTS")
            .map(|value| {
                let value = value.trim();
                value == "1" || value.eq_ignore_ascii_case("true")
            })
            .unwrap_or(false);
        Self {
            mode: TraceMode::On,
            out_path,
            buffer_size: DEFAULT_BUFFER_SIZE,
            capture_inputs,
        }
    }

    /// Replaces the mode and, when `out_path` is `Some`, the output path.
    pub fn with_overrides(mut self, mode: TraceMode, out_path: Option<PathBuf>) -> Self {
        self.mode = mode;
        if let Some(path) = out_path {
            self.out_path = path;
        }
        self
    }

    /// Enables or disables capturing full invocation snapshots for each step.
    pub fn with_capture_inputs(mut self, capture: bool) -> Self {
        self.capture_inputs = capture;
        self
    }
}

/// Controls when a trace recorder records steps and writes the trace file.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TraceMode {
    /// Nothing is recorded and every flush is a no-op.
    Off,
    /// Steps are recorded; the trace is written on errors and explicit buffer flushes,
    /// but not after a successful run.
    On,
    /// Like `On`, but the trace is also written after a successful run.
    Always,
}
61
/// Identifies a pack by its reference and, when available, its resolved digest.
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed by callers
/// when assembling a `TraceContext` — confirm.
#[derive(Clone, Debug)]
pub struct PackTraceInfo {
    /// Pack reference string.
    pub pack_ref: String,
    /// Resolved digest of the pack, if one is available.
    pub resolved_digest: Option<String>,
}
67
/// Identity of the pack and flow being traced.
///
/// Every field is copied into the `pack` and `flow` sections of each written trace.
#[derive(Clone, Debug)]
pub struct TraceContext {
    /// Copied into `TracePack::pack_ref`.
    pub pack_ref: String,
    /// Copied into `TracePack::resolved_digest`.
    pub resolved_digest: Option<String>,
    /// Copied into `TraceFlow::id`.
    pub flow_id: String,
    /// Copied into `TraceFlow::version`.
    pub flow_version: String,
}
75
/// Execution observer that keeps a bounded window of recent node steps and writes them
/// to the configured output path as a trace file when flushed.
pub struct TraceRecorder {
    /// Mode, output path, buffer size and input-capture flag.
    config: TraceConfig,
    /// Pack/flow identity stamped onto the written trace.
    context: TraceContext,
    /// Mutable recording state; behind a mutex because observer callbacks only get `&self`.
    state: Mutex<TraceState>,
}
81
/// Mutable recording state guarded by `TraceRecorder::state`.
struct TraceState {
    /// Completed steps, oldest first; trimmed from the front to `config.buffer_size`.
    buffer: VecDeque<TraceStep>,
    /// Step opened by `on_node_start` that has not yet been closed by end/error.
    in_flight: Option<InFlightStep>,
    /// Set once a flush has claimed the write; subsequent flush calls return early.
    flushed: bool,
}
87
/// Data captured in `on_node_start` for the node that is currently executing.
struct InFlightStep {
    node_id: String,
    component_id: String,
    /// Resolved operation name, or `"unknown"` when the node exposes none.
    operation: String,
    /// Hash of the node's input payload.
    input_hash: TraceHash,
    /// Start time, used to compute the finished step's `duration_ms`.
    started_at: Instant,
    /// Issues reported via `on_validation` while this step is in flight.
    validation_issues: Vec<ValidationIssue>,
    /// Full invocation snapshot; only populated when `capture_inputs` is enabled.
    invocation_json: Option<Value>,
}
97
98impl TraceRecorder {
99 pub fn new(config: TraceConfig, context: TraceContext) -> Self {
100 Self {
101 config,
102 context,
103 state: Mutex::new(TraceState {
104 buffer: VecDeque::new(),
105 in_flight: None,
106 flushed: false,
107 }),
108 }
109 }
110
111 pub fn mode(&self) -> TraceMode {
112 self.config.mode
113 }
114
115 pub fn flush_success(&self) -> Result<()> {
116 if self.config.mode != TraceMode::Always {
117 return Ok(());
118 }
119 self.flush_with_steps(None)
120 }
121
122 pub fn flush_error(&self, err: &dyn std::error::Error) -> Result<()> {
123 if self.config.mode == TraceMode::Off {
124 return Ok(());
125 }
126 self.flush_with_steps(Some(err))
127 }
128
129 pub fn flush_buffer(&self) -> Result<()> {
130 if self.config.mode == TraceMode::Off {
131 return Ok(());
132 }
133 self.flush_with_steps(None)
134 }
135
136 fn flush_with_steps(&self, fallback_error: Option<&dyn std::error::Error>) -> Result<()> {
137 let mut state = self.state.lock();
138 if state.flushed {
139 return Ok(());
140 }
141 if let Some(err) = fallback_error {
142 let step = if let Some(in_flight) = state.in_flight.take() {
143 TraceStep {
144 node_id: in_flight.node_id,
145 component_id: in_flight.component_id,
146 operation: in_flight.operation,
147 input_hash: in_flight.input_hash,
148 invocation_json: in_flight.invocation_json,
149 invocation_path: None,
150 output_hash: None,
151 state_delta_hash: None,
152 duration_ms: in_flight.started_at.elapsed().as_millis() as u64,
153 validation_issues: in_flight.validation_issues,
154 error: Some(TraceError {
155 code: "node_error".to_string(),
156 message: err.to_string(),
157 details: Value::Null,
158 }),
159 }
160 } else {
161 TraceStep {
162 node_id: "unknown".to_string(),
163 component_id: "unknown".to_string(),
164 operation: "unknown".to_string(),
165 input_hash: hash_value(&Value::Null),
166 invocation_json: None,
167 invocation_path: None,
168 output_hash: None,
169 state_delta_hash: None,
170 duration_ms: 0,
171 validation_issues: Vec::new(),
172 error: Some(TraceError {
173 code: "flow_error".to_string(),
174 message: err.to_string(),
175 details: Value::Null,
176 }),
177 }
178 };
179 state.buffer.push_back(step);
180 while state.buffer.len() > self.config.buffer_size {
181 state.buffer.pop_front();
182 }
183 }
184 let steps = state.buffer.iter().cloned().collect::<Vec<_>>();
185 state.flushed = true;
186 drop(state);
187 let trace = self.build_trace(steps);
188 write_trace_atomic(&self.config.out_path, &trace)?;
189 Ok(())
190 }
191
192 fn build_trace(&self, steps: Vec<TraceStep>) -> TraceEnvelope {
193 TraceEnvelope {
194 trace_version: 1,
195 runner_version: Some(env!("CARGO_PKG_VERSION").to_string()),
196 git_sha: git_sha(),
197 pack: TracePack {
198 pack_ref: self.context.pack_ref.clone(),
199 resolved_digest: self.context.resolved_digest.clone(),
200 },
201 flow: TraceFlow {
202 id: self.context.flow_id.clone(),
203 version: self.context.flow_version.clone(),
204 },
205 steps,
206 }
207 }
208}
209
210impl ExecutionObserver for TraceRecorder {
211 fn on_node_start(&self, event: &NodeEvent<'_>) {
212 if self.config.mode == TraceMode::Off {
213 return;
214 }
215 let operation = event
216 .node
217 .operation_name()
218 .or_else(|| event.node.operation_in_mapping())
219 .unwrap_or("unknown")
220 .to_string();
221 let input_hash = hash_value(event.payload);
222 let component_id = event.node.component_id().to_string();
223 let mut state = self.state.lock();
224 state.in_flight = Some(InFlightStep {
225 node_id: event.node_id.to_string(),
226 component_id: component_id.clone(),
227 operation,
228 input_hash,
229 started_at: Instant::now(),
230 validation_issues: Vec::new(),
231 invocation_json: if self.config.capture_inputs {
232 Some(build_invocation(event, &component_id))
233 } else {
234 None
235 },
236 });
237 }
238
239 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
240 if self.config.mode == TraceMode::Off {
241 return;
242 }
243 let output_hash = hash_value(output);
244 let mut state = self.state.lock();
245 let step = if let Some(in_flight) = state.in_flight.take() {
246 TraceStep {
247 node_id: in_flight.node_id,
248 component_id: in_flight.component_id,
249 operation: in_flight.operation,
250 input_hash: in_flight.input_hash,
251 invocation_json: in_flight.invocation_json,
252 invocation_path: None,
253 output_hash: Some(output_hash),
254 state_delta_hash: None,
255 duration_ms: in_flight.started_at.elapsed().as_millis() as u64,
256 validation_issues: in_flight.validation_issues,
257 error: None,
258 }
259 } else {
260 TraceStep {
261 node_id: event.node_id.to_string(),
262 component_id: event.node.component_id().to_string(),
263 operation: event.node.operation_name().unwrap_or("unknown").to_string(),
264 input_hash: hash_value(event.payload),
265 invocation_json: if self.config.capture_inputs {
266 Some(build_invocation(event, event.node.component_id()))
267 } else {
268 None
269 },
270 invocation_path: None,
271 output_hash: Some(output_hash),
272 state_delta_hash: None,
273 duration_ms: 0,
274 validation_issues: Vec::new(),
275 error: None,
276 }
277 };
278 state.buffer.push_back(step);
279 while state.buffer.len() > self.config.buffer_size {
280 state.buffer.pop_front();
281 }
282 }
283
284 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
285 if self.config.mode == TraceMode::Off {
286 return;
287 }
288 let mut state = self.state.lock();
289 let step = if let Some(in_flight) = state.in_flight.take() {
290 TraceStep {
291 node_id: in_flight.node_id,
292 component_id: in_flight.component_id,
293 operation: in_flight.operation,
294 input_hash: in_flight.input_hash,
295 invocation_json: in_flight.invocation_json,
296 invocation_path: None,
297 output_hash: None,
298 state_delta_hash: None,
299 duration_ms: in_flight.started_at.elapsed().as_millis() as u64,
300 validation_issues: in_flight.validation_issues,
301 error: Some(TraceError {
302 code: "node_error".to_string(),
303 message: error.to_string(),
304 details: Value::Null,
305 }),
306 }
307 } else {
308 TraceStep {
309 node_id: event.node_id.to_string(),
310 component_id: event.node.component_id().to_string(),
311 operation: event.node.operation_name().unwrap_or("unknown").to_string(),
312 input_hash: hash_value(event.payload),
313 invocation_json: if self.config.capture_inputs {
314 Some(build_invocation(event, event.node.component_id()))
315 } else {
316 None
317 },
318 invocation_path: None,
319 output_hash: None,
320 state_delta_hash: None,
321 duration_ms: 0,
322 validation_issues: Vec::new(),
323 error: Some(TraceError {
324 code: "node_error".to_string(),
325 message: error.to_string(),
326 details: Value::Null,
327 }),
328 }
329 };
330 state.buffer.push_back(step);
331 while state.buffer.len() > self.config.buffer_size {
332 state.buffer.pop_front();
333 }
334 drop(state);
335 if let Err(err) = self.flush_buffer() {
336 tracing::warn!(error = %err, "failed to write trace");
337 }
338 }
339
340 fn on_validation(&self, _event: &NodeEvent<'_>, issues: &[ValidationIssue]) {
341 if self.config.mode == TraceMode::Off || issues.is_empty() {
342 return;
343 }
344 let mut state = self.state.lock();
345 if let Some(in_flight) = state.in_flight.as_mut() {
346 in_flight.validation_issues.extend_from_slice(issues);
347 }
348 }
349}
350
351fn hash_value(value: &Value) -> TraceHash {
352 let bytes = serde_json::to_vec(value).unwrap_or_default();
353 let digest = blake3::hash(&bytes).to_hex().to_string();
354 TraceHash {
355 algorithm: HASH_ALGORITHM.to_string(),
356 value: digest,
357 }
358}
359
360fn build_invocation(event: &NodeEvent<'_>, component_id: &str) -> Value {
361 serde_json::json!({
362 "component_id": component_id,
363 "operation": event
364 .node
365 .operation_name()
366 .or_else(|| event.node.operation_in_mapping())
367 .unwrap_or("unknown")
368 .to_string(),
369 "payload": event.payload,
370 })
371}
372
373fn write_trace_atomic(path: &Path, trace: &TraceEnvelope) -> Result<()> {
374 if let Some(parent) = path.parent() {
375 fs::create_dir_all(parent)
376 .with_context(|| format!("failed to create {}", parent.display()))?;
377 }
378 let file_name = path
379 .file_name()
380 .and_then(|name| name.to_str())
381 .unwrap_or(DEFAULT_TRACE_FILE);
382 let tmp = path.with_file_name(format!("{file_name}.tmp"));
383 let payload = serde_json::to_vec_pretty(trace).context("serialize trace")?;
384 fs::write(&tmp, payload).with_context(|| format!("write {}", tmp.display()))?;
385 fs::rename(&tmp, path)
386 .with_context(|| format!("rename {} -> {}", tmp.display(), path.display()))?;
387 Ok(())
388}
389
/// Returns the source revision from `GIT_SHA`, falling back to `GITHUB_SHA`.
///
/// Values are trimmed. A variable that is unset, not valid Unicode, or blank is skipped,
/// so it cannot mask the fallback. Returns `None` if neither yields a non-empty value.
fn git_sha() -> Option<String> {
    ["GIT_SHA", "GITHUB_SHA"].iter().find_map(|name| {
        env::var(name)
            .ok()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    })
}