Skip to main content

greentic_runner_host/trace/
recorder.rs

1use std::collections::VecDeque;
2use std::env;
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::time::Instant;
6
7use anyhow::{Context, Result};
8use parking_lot::Mutex;
9use serde_json::Value;
10
11use crate::runner::engine::{ExecutionObserver, NodeEvent};
12use crate::validate::ValidationIssue;
13
14use super::model::{TraceEnvelope, TraceError, TraceFlow, TraceHash, TracePack, TraceStep};
15
/// File name used for the trace output when `GREENTIC_TRACE_OUT` is not set; also the fallback
/// base name for the temporary file created by `write_trace_atomic`.
const DEFAULT_TRACE_FILE: &str = "trace.json";
/// Buffer size assigned by `TraceConfig::from_env` (maximum number of retained steps).
const DEFAULT_BUFFER_SIZE: usize = 20;
/// Algorithm label stored in every `TraceHash` produced by `hash_value`.
const HASH_ALGORITHM: &str = "blake3";
19
/// Settings that control whether, where, and how much a `TraceRecorder` records.
#[derive(Clone, Debug)]
pub struct TraceConfig {
    /// Gates recording and decides which flush calls actually write a file.
    pub mode: TraceMode,
    /// Destination path of the trace file.
    pub out_path: PathBuf,
    /// Maximum number of steps kept in the recorder's buffer; the oldest steps are dropped first.
    pub buffer_size: usize,
    /// When true, each recorded step also carries the invocation JSON
    /// (component id, operation, payload).
    pub capture_inputs: bool,
}
27
28impl TraceConfig {
29    pub fn from_env() -> Self {
30        let out_path = env::var_os("GREENTIC_TRACE_OUT")
31            .map(PathBuf::from)
32            .unwrap_or_else(|| PathBuf::from(DEFAULT_TRACE_FILE));
33        Self {
34            mode: TraceMode::On,
35            out_path,
36            buffer_size: DEFAULT_BUFFER_SIZE,
37            capture_inputs: env::var("GREENTIC_TRACE_CAPTURE_INPUTS").ok().as_deref() == Some("1"),
38        }
39    }
40
41    pub fn with_overrides(mut self, mode: TraceMode, out_path: Option<PathBuf>) -> Self {
42        self.mode = mode;
43        if let Some(path) = out_path {
44            self.out_path = path;
45        }
46        self
47    }
48
49    pub fn with_capture_inputs(mut self, capture: bool) -> Self {
50        self.capture_inputs = capture;
51        self
52    }
53}
54
/// Recording mode checked by `TraceRecorder`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TraceMode {
    /// Observer callbacks return immediately and no flush call writes a file.
    Off,
    /// Steps are recorded; `flush_error` and `flush_buffer` write the trace, `flush_success`
    /// does not.
    On,
    /// Steps are recorded and every flush call, including `flush_success`, writes the trace.
    Always,
}
61
/// Pack identification data: a reference string plus an optional resolved digest.
///
/// NOTE(review): nothing in the visible part of this file uses this type. It holds the same two
/// pack fields as `TraceContext`, so presumably callers use it to build one — confirm.
#[derive(Clone, Debug)]
pub struct PackTraceInfo {
    /// Reference string naming the pack.
    pub pack_ref: String,
    /// Digest the reference resolved to, when known.
    pub resolved_digest: Option<String>,
}
67
/// Identifies the pack and flow a trace belongs to.
///
/// `TraceRecorder::build_trace` copies these values into the `pack` and `flow` sections of the
/// written `TraceEnvelope`.
#[derive(Clone, Debug)]
pub struct TraceContext {
    /// Reference string naming the pack; written as `pack.pack_ref`.
    pub pack_ref: String,
    /// Digest the pack reference resolved to, when known; written as `pack.resolved_digest`.
    pub resolved_digest: Option<String>,
    /// Flow identifier; written as `flow.id`.
    pub flow_id: String,
    /// Flow version; written as `flow.version`.
    pub flow_version: String,
}
75
/// Collects per-node trace steps through its `ExecutionObserver` implementation and writes them
/// to disk as a `TraceEnvelope`.
pub struct TraceRecorder {
    /// Mode, output path, buffer bound, and input-capture switch.
    config: TraceConfig,
    /// Pack and flow identity copied into every written trace.
    context: TraceContext,
    /// Mutable recording state; kept behind a mutex because all callbacks take `&self`.
    state: Mutex<TraceState>,
}
81
/// Mutable state of a `TraceRecorder`, guarded by the recorder's mutex.
struct TraceState {
    /// Finished steps, oldest first; trimmed from the front to `TraceConfig::buffer_size`.
    buffer: VecDeque<TraceStep>,
    /// Step opened by `on_node_start` that has not yet been closed by an end/error callback.
    in_flight: Option<InFlightStep>,
    /// Set once a flush has started writing; later flush calls return without writing again.
    flushed: bool,
}
87
/// Data captured at `on_node_start` and held until the node ends, fails, or the flow is flushed
/// with an error.
struct InFlightStep {
    /// Identifier of the node being executed.
    node_id: String,
    /// Identifier of the component the node invokes.
    component_id: String,
    /// Operation name resolved at start time (`"unknown"` when none could be resolved).
    operation: String,
    /// Hash of the node's input payload.
    input_hash: TraceHash,
    /// Start time, used to compute `duration_ms` when the step is closed.
    started_at: Instant,
    /// Validation issues reported through `on_validation` while this step was in flight.
    validation_issues: Vec<ValidationIssue>,
    /// Invocation JSON, present only when `TraceConfig::capture_inputs` is enabled.
    invocation_json: Option<Value>,
}
97
98impl TraceRecorder {
99    pub fn new(config: TraceConfig, context: TraceContext) -> Self {
100        Self {
101            config,
102            context,
103            state: Mutex::new(TraceState {
104                buffer: VecDeque::new(),
105                in_flight: None,
106                flushed: false,
107            }),
108        }
109    }
110
111    pub fn mode(&self) -> TraceMode {
112        self.config.mode
113    }
114
115    pub fn flush_success(&self) -> Result<()> {
116        if self.config.mode != TraceMode::Always {
117            return Ok(());
118        }
119        self.flush_with_steps(None)
120    }
121
122    pub fn flush_error(&self, err: &dyn std::error::Error) -> Result<()> {
123        if self.config.mode == TraceMode::Off {
124            return Ok(());
125        }
126        self.flush_with_steps(Some(err))
127    }
128
129    pub fn flush_buffer(&self) -> Result<()> {
130        if self.config.mode == TraceMode::Off {
131            return Ok(());
132        }
133        self.flush_with_steps(None)
134    }
135
136    fn flush_with_steps(&self, fallback_error: Option<&dyn std::error::Error>) -> Result<()> {
137        let mut state = self.state.lock();
138        if state.flushed {
139            return Ok(());
140        }
141        if let Some(err) = fallback_error {
142            let step = if let Some(in_flight) = state.in_flight.take() {
143                TraceStep {
144                    node_id: in_flight.node_id,
145                    component_id: in_flight.component_id,
146                    operation: in_flight.operation,
147                    input_hash: in_flight.input_hash,
148                    invocation_json: in_flight.invocation_json,
149                    invocation_path: None,
150                    output_hash: None,
151                    state_delta_hash: None,
152                    duration_ms: in_flight.started_at.elapsed().as_millis() as u64,
153                    validation_issues: in_flight.validation_issues,
154                    error: Some(TraceError {
155                        code: "node_error".to_string(),
156                        message: err.to_string(),
157                        details: Value::Null,
158                    }),
159                }
160            } else {
161                TraceStep {
162                    node_id: "unknown".to_string(),
163                    component_id: "unknown".to_string(),
164                    operation: "unknown".to_string(),
165                    input_hash: hash_value(&Value::Null),
166                    invocation_json: None,
167                    invocation_path: None,
168                    output_hash: None,
169                    state_delta_hash: None,
170                    duration_ms: 0,
171                    validation_issues: Vec::new(),
172                    error: Some(TraceError {
173                        code: "flow_error".to_string(),
174                        message: err.to_string(),
175                        details: Value::Null,
176                    }),
177                }
178            };
179            state.buffer.push_back(step);
180            while state.buffer.len() > self.config.buffer_size {
181                state.buffer.pop_front();
182            }
183        }
184        let steps = state.buffer.iter().cloned().collect::<Vec<_>>();
185        state.flushed = true;
186        drop(state);
187        let trace = self.build_trace(steps);
188        write_trace_atomic(&self.config.out_path, &trace)?;
189        Ok(())
190    }
191
192    fn build_trace(&self, steps: Vec<TraceStep>) -> TraceEnvelope {
193        TraceEnvelope {
194            trace_version: 1,
195            runner_version: Some(env!("CARGO_PKG_VERSION").to_string()),
196            git_sha: git_sha(),
197            pack: TracePack {
198                pack_ref: self.context.pack_ref.clone(),
199                resolved_digest: self.context.resolved_digest.clone(),
200            },
201            flow: TraceFlow {
202                id: self.context.flow_id.clone(),
203                version: self.context.flow_version.clone(),
204            },
205            steps,
206        }
207    }
208}
209
210impl ExecutionObserver for TraceRecorder {
211    fn on_node_start(&self, event: &NodeEvent<'_>) {
212        if self.config.mode == TraceMode::Off {
213            return;
214        }
215        let operation = event
216            .node
217            .operation_name()
218            .or_else(|| event.node.operation_in_mapping())
219            .unwrap_or("unknown")
220            .to_string();
221        let input_hash = hash_value(event.payload);
222        let component_id = event.node.component_id().to_string();
223        let mut state = self.state.lock();
224        state.in_flight = Some(InFlightStep {
225            node_id: event.node_id.to_string(),
226            component_id: component_id.clone(),
227            operation,
228            input_hash,
229            started_at: Instant::now(),
230            validation_issues: Vec::new(),
231            invocation_json: if self.config.capture_inputs {
232                Some(build_invocation(event, &component_id))
233            } else {
234                None
235            },
236        });
237    }
238
239    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
240        if self.config.mode == TraceMode::Off {
241            return;
242        }
243        let output_hash = hash_value(output);
244        let mut state = self.state.lock();
245        let step = if let Some(in_flight) = state.in_flight.take() {
246            TraceStep {
247                node_id: in_flight.node_id,
248                component_id: in_flight.component_id,
249                operation: in_flight.operation,
250                input_hash: in_flight.input_hash,
251                invocation_json: in_flight.invocation_json,
252                invocation_path: None,
253                output_hash: Some(output_hash),
254                state_delta_hash: None,
255                duration_ms: in_flight.started_at.elapsed().as_millis() as u64,
256                validation_issues: in_flight.validation_issues,
257                error: None,
258            }
259        } else {
260            TraceStep {
261                node_id: event.node_id.to_string(),
262                component_id: event.node.component_id().to_string(),
263                operation: event.node.operation_name().unwrap_or("unknown").to_string(),
264                input_hash: hash_value(event.payload),
265                invocation_json: if self.config.capture_inputs {
266                    Some(build_invocation(event, event.node.component_id()))
267                } else {
268                    None
269                },
270                invocation_path: None,
271                output_hash: Some(output_hash),
272                state_delta_hash: None,
273                duration_ms: 0,
274                validation_issues: Vec::new(),
275                error: None,
276            }
277        };
278        state.buffer.push_back(step);
279        while state.buffer.len() > self.config.buffer_size {
280            state.buffer.pop_front();
281        }
282    }
283
284    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
285        if self.config.mode == TraceMode::Off {
286            return;
287        }
288        let mut state = self.state.lock();
289        let step = if let Some(in_flight) = state.in_flight.take() {
290            TraceStep {
291                node_id: in_flight.node_id,
292                component_id: in_flight.component_id,
293                operation: in_flight.operation,
294                input_hash: in_flight.input_hash,
295                invocation_json: in_flight.invocation_json,
296                invocation_path: None,
297                output_hash: None,
298                state_delta_hash: None,
299                duration_ms: in_flight.started_at.elapsed().as_millis() as u64,
300                validation_issues: in_flight.validation_issues,
301                error: Some(TraceError {
302                    code: "node_error".to_string(),
303                    message: error.to_string(),
304                    details: Value::Null,
305                }),
306            }
307        } else {
308            TraceStep {
309                node_id: event.node_id.to_string(),
310                component_id: event.node.component_id().to_string(),
311                operation: event.node.operation_name().unwrap_or("unknown").to_string(),
312                input_hash: hash_value(event.payload),
313                invocation_json: if self.config.capture_inputs {
314                    Some(build_invocation(event, event.node.component_id()))
315                } else {
316                    None
317                },
318                invocation_path: None,
319                output_hash: None,
320                state_delta_hash: None,
321                duration_ms: 0,
322                validation_issues: Vec::new(),
323                error: Some(TraceError {
324                    code: "node_error".to_string(),
325                    message: error.to_string(),
326                    details: Value::Null,
327                }),
328            }
329        };
330        state.buffer.push_back(step);
331        while state.buffer.len() > self.config.buffer_size {
332            state.buffer.pop_front();
333        }
334        drop(state);
335        if let Err(err) = self.flush_buffer() {
336            tracing::warn!(error = %err, "failed to write trace");
337        }
338    }
339
340    fn on_validation(&self, _event: &NodeEvent<'_>, issues: &[ValidationIssue]) {
341        if self.config.mode == TraceMode::Off || issues.is_empty() {
342            return;
343        }
344        let mut state = self.state.lock();
345        if let Some(in_flight) = state.in_flight.as_mut() {
346            in_flight.validation_issues.extend_from_slice(issues);
347        }
348    }
349}
350
351fn hash_value(value: &Value) -> TraceHash {
352    let bytes = serde_json::to_vec(value).unwrap_or_default();
353    let digest = blake3::hash(&bytes).to_hex().to_string();
354    TraceHash {
355        algorithm: HASH_ALGORITHM.to_string(),
356        value: digest,
357    }
358}
359
360fn build_invocation(event: &NodeEvent<'_>, component_id: &str) -> Value {
361    serde_json::json!({
362        "component_id": component_id,
363        "operation": event
364            .node
365            .operation_name()
366            .or_else(|| event.node.operation_in_mapping())
367            .unwrap_or("unknown")
368            .to_string(),
369        "payload": event.payload,
370    })
371}
372
373fn write_trace_atomic(path: &Path, trace: &TraceEnvelope) -> Result<()> {
374    if let Some(parent) = path.parent() {
375        fs::create_dir_all(parent)
376            .with_context(|| format!("failed to create {}", parent.display()))?;
377    }
378    let file_name = path
379        .file_name()
380        .and_then(|name| name.to_str())
381        .unwrap_or(DEFAULT_TRACE_FILE);
382    let tmp = path.with_file_name(format!("{file_name}.tmp"));
383    let payload = serde_json::to_vec_pretty(trace).context("serialize trace")?;
384    fs::write(&tmp, payload).with_context(|| format!("write {}", tmp.display()))?;
385    fs::rename(&tmp, path)
386        .with_context(|| format!("rename {} -> {}", tmp.display(), path.display()))?;
387    Ok(())
388}
389
/// Returns the commit SHA advertised by the environment, if any.
///
/// `GIT_SHA` is consulted first, then `GITHUB_SHA`. Each value is trimmed, and a variable that
/// is unset, not valid Unicode, or blank after trimming is skipped, so an empty `GIT_SHA` does
/// not hide a usable `GITHUB_SHA`.
fn git_sha() -> Option<String> {
    ["GIT_SHA", "GITHUB_SHA"].iter().find_map(|name| {
        let raw = env::var(name).ok()?;
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    })
}