Skip to main content

pylon_functions/
trace.rs

1//! Automatic instrumentation for function executions.
2//!
3//! Every function call produces a [`FnTrace`] with zero developer effort.
4//! The Rust runtime timestamps each protocol message as it passes through,
5//! building a complete trace of all DB operations, stream chunks, scheduled
6//! functions, and the final outcome.
7
8use std::time::{Duration, Instant};
9
10use serde::Serialize;
11
12use crate::protocol::{DbOp, FnType};
13
14// ---------------------------------------------------------------------------
15// Trace types
16// ---------------------------------------------------------------------------
17
/// A complete trace of a single function execution.
///
/// Normally produced by one of [`TraceBuilder`]'s `finish_*` methods; the
/// field notes below describe the values the builder writes.
#[derive(Debug, Clone, Serialize)]
pub struct FnTrace {
    /// Call identifier handed to the builder when the call started.
    pub call_id: String,
    /// Name of the function that was executed.
    pub fn_name: String,
    /// Function type handed to the builder when the call started.
    pub fn_type: FnType,
    /// User id handed to the builder, if any.
    // NOTE(review): presumably the authenticated caller — confirm against the runner.
    pub user_id: Option<String>,
    /// Wall-clock start time in milliseconds since the Unix epoch.
    pub started_at: u64,
    /// Time between builder creation and finish, in milliseconds.
    pub duration_ms: f64,
    /// How the function completed.
    pub outcome: FnOutcome,
    /// DB operations, in the order they were recorded.
    pub ops: Vec<OpTrace>,
    /// Sum of the byte counts of all recorded stream chunks.
    pub stream_bytes: u64,
    /// Number of recorded stream chunks.
    pub stream_chunks: u32,
    /// Scheduled functions, in the order they were recorded.
    pub schedules: Vec<ScheduleTrace>,
}
33
34/// How the function completed.
35#[derive(Debug, Clone, Serialize)]
36#[serde(tag = "status")]
37pub enum FnOutcome {
38    #[serde(rename = "ok")]
39    Ok {
40        #[serde(skip_serializing_if = "Option::is_none")]
41        value: Option<serde_json::Value>,
42    },
43    #[serde(rename = "error")]
44    Error { code: String, message: String },
45    #[serde(rename = "rolled_back")]
46    RolledBack { code: String, message: String },
47}
48
/// Trace of a single DB operation within a function.
///
/// One entry is appended per `TraceBuilder::record_op` call.
#[derive(Debug, Clone, Serialize)]
pub struct OpTrace {
    /// Kind of DB operation that was performed.
    pub op: DbOp,
    /// Entity the operation targeted, as passed by the caller.
    // NOTE(review): presumably the entity/table name — confirm with the caller.
    pub entity: String,
    /// Id passed by the caller, if any; omitted from the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Duration of the operation in milliseconds.
    pub duration_ms: f64,
    /// Row count passed by the caller, if any; omitted from the serialized
    /// form when `None`.
    // NOTE(review): presumably rows returned/affected — confirm with the caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_count: Option<usize>,
    /// Success flag passed by the caller.
    pub ok: bool,
}
61
/// Trace of a scheduled function call.
///
/// One entry is appended per `TraceBuilder::record_schedule` call. Both
/// timing fields are optional and are omitted from the serialized form when
/// `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ScheduleTrace {
    /// Name of the function that was scheduled.
    pub fn_name: String,
    /// Delay in milliseconds, if one was given.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub delay_ms: Option<u64>,
    /// Requested run time, if one was given.
    // NOTE(review): presumably an absolute timestamp in epoch milliseconds —
    // the unit is not visible here; confirm against the scheduler.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub run_at: Option<u64>,
}
71
72// ---------------------------------------------------------------------------
73// Trace builder — accumulates during execution
74// ---------------------------------------------------------------------------
75
/// Accumulates trace data during a function execution.
///
/// Created at the start of each function call. Each protocol message
/// updates the builder. When the function completes, one of the `finish_*`
/// methods consumes the builder and produces the final [`FnTrace`].
pub struct TraceBuilder {
    /// Call identifier; copied into the finished trace.
    call_id: String,
    /// Name of the function being executed; copied into the finished trace.
    fn_name: String,
    /// Type of the function being executed; copied into the finished trace.
    fn_type: FnType,
    /// User id supplied at construction, if any; copied into the finished
    /// trace.
    pub(crate) user_id: Option<String>,
    /// Active tenant at call time. Threaded through so nested calls
    /// (action → mutation) can inherit it when row-level policies gate
    /// every write the action emits.
    ///
    /// Not copied into the finished [`FnTrace`].
    pub(crate) tenant_id: Option<String>,
    /// Wall-clock start time in milliseconds since the Unix epoch.
    started_at: u64,
    /// Reference point captured at construction; the total duration is the
    /// time elapsed since this instant when the builder is finished.
    start_instant: Instant,
    /// DB operations recorded so far, in call order.
    ops: Vec<OpTrace>,
    /// Running total of streamed bytes.
    stream_bytes: u64,
    /// Running count of streamed chunks.
    stream_chunks: u32,
    /// Scheduled functions recorded so far, in call order.
    schedules: Vec<ScheduleTrace>,
}
97
98impl TraceBuilder {
99    pub fn new(call_id: String, fn_name: String, fn_type: FnType, user_id: Option<String>) -> Self {
100        Self::new_with_tenant(call_id, fn_name, fn_type, user_id, None)
101    }
102
103    pub fn new_with_tenant(
104        call_id: String,
105        fn_name: String,
106        fn_type: FnType,
107        user_id: Option<String>,
108        tenant_id: Option<String>,
109    ) -> Self {
110        let now_epoch = std::time::SystemTime::now()
111            .duration_since(std::time::UNIX_EPOCH)
112            .unwrap_or_default()
113            .as_millis() as u64;
114
115        Self {
116            call_id,
117            fn_name,
118            fn_type,
119            user_id,
120            tenant_id,
121            started_at: now_epoch,
122            start_instant: Instant::now(),
123            ops: Vec::new(),
124            stream_bytes: 0,
125            stream_chunks: 0,
126            schedules: Vec::new(),
127        }
128    }
129
130    /// Tenant at call time. Used by the nested-call path in the runner to
131    /// carry tenant id down to helper mutations an action invokes.
132    pub fn tenant_id(&self) -> Option<&str> {
133        self.tenant_id.as_deref()
134    }
135
136    /// Record a completed DB operation.
137    pub fn record_op(
138        &mut self,
139        op: DbOp,
140        entity: &str,
141        id: Option<&str>,
142        duration: Duration,
143        row_count: Option<usize>,
144        ok: bool,
145    ) {
146        self.ops.push(OpTrace {
147            op,
148            entity: entity.to_string(),
149            id: id.map(|s| s.to_string()),
150            duration_ms: duration.as_secs_f64() * 1000.0,
151            row_count,
152            ok,
153        });
154    }
155
156    /// Record a stream chunk sent to the client.
157    pub fn record_stream_chunk(&mut self, bytes: usize) {
158        self.stream_bytes += bytes as u64;
159        self.stream_chunks += 1;
160    }
161
162    /// Record a scheduled function.
163    pub fn record_schedule(&mut self, fn_name: &str, delay_ms: Option<u64>, run_at: Option<u64>) {
164        self.schedules.push(ScheduleTrace {
165            fn_name: fn_name.to_string(),
166            delay_ms,
167            run_at,
168        });
169    }
170
171    /// Finalize the trace with a successful outcome.
172    pub fn finish_ok(self, value: Option<serde_json::Value>) -> FnTrace {
173        self.finish(FnOutcome::Ok { value })
174    }
175
176    /// Finalize the trace with an error outcome.
177    pub fn finish_error(self, code: String, message: String) -> FnTrace {
178        self.finish(FnOutcome::Error { code, message })
179    }
180
181    /// Finalize the trace with a rollback outcome.
182    pub fn finish_rolled_back(self, code: String, message: String) -> FnTrace {
183        self.finish(FnOutcome::RolledBack { code, message })
184    }
185
186    fn finish(self, outcome: FnOutcome) -> FnTrace {
187        FnTrace {
188            call_id: self.call_id,
189            fn_name: self.fn_name,
190            fn_type: self.fn_type,
191            user_id: self.user_id,
192            started_at: self.started_at,
193            duration_ms: self.start_instant.elapsed().as_secs_f64() * 1000.0,
194            outcome,
195            ops: self.ops,
196            stream_bytes: self.stream_bytes,
197            stream_chunks: self.stream_chunks,
198            schedules: self.schedules,
199        }
200    }
201}
202
203// ---------------------------------------------------------------------------
204// Trace log — bounded ring buffer of recent traces
205// ---------------------------------------------------------------------------
206
/// A bounded ring buffer of recent function traces.
///
/// Thread-safe. Stores the most recent `capacity` traces. Oldest entries
/// are evicted when the buffer is full.
pub struct TraceLog {
    /// Ring state behind a mutex; every method locks it for the duration of
    /// the call, which is what allows use through a shared `&TraceLog`.
    traces: std::sync::Mutex<TraceRing>,
}
214
/// Mutable state of a [`TraceLog`], always accessed under its mutex.
struct TraceRing {
    /// Stored traces. Grows by appending until it holds `capacity` entries;
    /// after that, slots are overwritten in place.
    buf: Vec<FnTrace>,
    /// Maximum number of traces retained.
    capacity: usize,
    /// Index of the slot the next push writes to; wraps modulo `capacity`.
    /// The newest stored trace sits in the slot just before it.
    write_pos: usize,
    /// Number of traces ever pushed, including those already evicted.
    count: usize,
}
221
222impl TraceLog {
223    pub fn new(capacity: usize) -> Self {
224        Self {
225            traces: std::sync::Mutex::new(TraceRing {
226                buf: Vec::with_capacity(capacity),
227                capacity,
228                write_pos: 0,
229                count: 0,
230            }),
231        }
232    }
233
234    /// Record a completed trace.
235    pub fn push(&self, trace: FnTrace) {
236        let mut ring = self.traces.lock().unwrap();
237        let cap = ring.capacity;
238        if ring.buf.len() < cap {
239            ring.buf.push(trace);
240        } else {
241            let pos = ring.write_pos;
242            ring.buf[pos] = trace;
243        }
244        ring.write_pos = (ring.write_pos + 1) % cap;
245        ring.count += 1;
246    }
247
248    /// Query recent traces, newest first.
249    pub fn recent(&self, limit: usize) -> Vec<FnTrace> {
250        let ring = self.traces.lock().unwrap();
251        let len = ring.buf.len();
252        if len == 0 {
253            return vec![];
254        }
255
256        let take = limit.min(len);
257        let mut result = Vec::with_capacity(take);
258
259        // Walk backwards from the most recent write position.
260        let start = if ring.write_pos == 0 {
261            len - 1
262        } else {
263            ring.write_pos - 1
264        };
265        let mut i = start;
266        for _ in 0..take {
267            result.push(ring.buf[i].clone());
268            if i == 0 {
269                i = len - 1;
270            } else {
271                i -= 1;
272            }
273        }
274        result
275    }
276
277    /// Query traces filtered by function name, newest first.
278    pub fn by_fn(&self, fn_name: &str, limit: usize) -> Vec<FnTrace> {
279        self.recent(self.len())
280            .into_iter()
281            .filter(|t| t.fn_name == fn_name)
282            .take(limit)
283            .collect()
284    }
285
286    /// Query only error/rollback traces, newest first.
287    pub fn errors(&self, limit: usize) -> Vec<FnTrace> {
288        self.recent(self.len())
289            .into_iter()
290            .filter(|t| !matches!(t.outcome, FnOutcome::Ok { .. }))
291            .take(limit)
292            .collect()
293    }
294
295    /// Total traces recorded (including evicted).
296    pub fn total_count(&self) -> usize {
297        self.traces.lock().unwrap().count
298    }
299
300    /// Current buffer size.
301    pub fn len(&self) -> usize {
302        self.traces.lock().unwrap().buf.len()
303    }
304
305    pub fn is_empty(&self) -> bool {
306        self.len() == 0
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a successful mutation trace with fixed metadata; only the
    /// function name and duration vary between tests.
    fn make_trace(name: &str, duration_ms: f64) -> FnTrace {
        FnTrace {
            call_id: format!("c_{name}"),
            fn_name: name.to_string(),
            fn_type: FnType::Mutation,
            user_id: Some("user_1".to_string()),
            started_at: 1000,
            duration_ms,
            outcome: FnOutcome::Ok { value: None },
            ops: vec![],
            stream_bytes: 0,
            stream_chunks: 0,
            schedules: vec![],
        }
    }

    /// Function names of `traces`, in order, for compact order assertions.
    fn names(traces: &[FnTrace]) -> Vec<&str> {
        traces.iter().map(|t| t.fn_name.as_str()).collect()
    }

    #[test]
    fn trace_builder_records_ops() {
        let mut builder = TraceBuilder::new(
            "c1".into(),
            "placeBid".into(),
            FnType::Mutation,
            Some("user_1".into()),
        );

        builder.record_op(
            DbOp::Get,
            "Lot",
            Some("lot_1"),
            Duration::from_micros(100),
            Some(1),
            true,
        );
        builder.record_op(
            DbOp::Insert,
            "Bid",
            None,
            Duration::from_micros(150),
            None,
            true,
        );
        builder.record_stream_chunk(42);
        builder.record_stream_chunk(18);
        builder.record_schedule("closeLot", Some(5000), None);

        let trace = builder.finish_ok(Some(serde_json::json!({"accepted": true})));

        assert_eq!(trace.call_id, "c1");
        assert_eq!(trace.fn_name, "placeBid");
        assert_eq!(trace.user_id.as_deref(), Some("user_1"));
        assert!(matches!(trace.outcome, FnOutcome::Ok { value: Some(_) }));

        // Operations keep their recording order and arguments.
        assert_eq!(trace.ops.len(), 2);
        assert_eq!(trace.ops[0].entity, "Lot");
        assert_eq!(trace.ops[0].id.as_deref(), Some("lot_1"));
        assert_eq!(trace.ops[0].row_count, Some(1));
        assert_eq!(trace.ops[1].entity, "Bid");
        assert!(trace.ops[1].id.is_none());
        assert!(trace.ops[1].row_count.is_none());

        assert_eq!(trace.stream_bytes, 60);
        assert_eq!(trace.stream_chunks, 2);

        assert_eq!(trace.schedules.len(), 1);
        assert_eq!(trace.schedules[0].fn_name, "closeLot");
        assert_eq!(trace.schedules[0].delay_ms, Some(5000));
        assert!(trace.schedules[0].run_at.is_none());
    }

    #[test]
    fn trace_builder_failure_outcomes() {
        let failed = TraceBuilder::new("c2".into(), "placeBid".into(), FnType::Mutation, None)
            .finish_error("BID_TOO_LOW".into(), "too low".into());
        match failed.outcome {
            FnOutcome::Error { code, message } => {
                assert_eq!(code, "BID_TOO_LOW");
                assert_eq!(message, "too low");
            }
            other => panic!("expected error outcome, got {other:?}"),
        }

        let undone = TraceBuilder::new("c3".into(), "placeBid".into(), FnType::Mutation, None)
            .finish_rolled_back("TX_CONFLICT".into(), "retry".into());
        match undone.outcome {
            FnOutcome::RolledBack { code, message } => {
                assert_eq!(code, "TX_CONFLICT");
                assert_eq!(message, "retry");
            }
            other => panic!("expected rolled-back outcome, got {other:?}"),
        }
    }

    #[test]
    fn trace_builder_tenant_accessor() {
        let without = TraceBuilder::new("c4".into(), "f".into(), FnType::Mutation, None);
        assert_eq!(without.tenant_id(), None);

        let with = TraceBuilder::new_with_tenant(
            "c5".into(),
            "f".into(),
            FnType::Mutation,
            None,
            Some("tenant_1".into()),
        );
        assert_eq!(with.tenant_id(), Some("tenant_1"));
    }

    #[test]
    fn trace_log_ring_buffer() {
        let log = TraceLog::new(3);

        log.push(make_trace("a", 1.0));
        log.push(make_trace("b", 2.0));
        log.push(make_trace("c", 3.0));
        log.push(make_trace("d", 4.0)); // evicts "a"

        assert_eq!(log.len(), 3);
        assert_eq!(log.total_count(), 4);

        let recent = log.recent(10);
        assert_eq!(recent.len(), 3);
        assert_eq!(recent[0].fn_name, "d"); // newest first
        assert_eq!(recent[1].fn_name, "c");
        assert_eq!(recent[2].fn_name, "b");
    }

    #[test]
    fn trace_log_recent_respects_limit() {
        let log = TraceLog::new(3);
        for name in ["a", "b", "c", "d"] {
            log.push(make_trace(name, 1.0));
        }

        let two = log.recent(2);
        assert_eq!(names(&two), ["d", "c"]);
        assert!(log.recent(0).is_empty());
    }

    #[test]
    fn trace_log_empty_and_partially_filled() {
        let log = TraceLog::new(5);
        assert!(log.is_empty());
        assert_eq!(log.total_count(), 0);
        assert!(log.recent(5).is_empty());

        log.push(make_trace("a", 1.0));
        log.push(make_trace("b", 2.0));
        assert!(!log.is_empty());
        assert_eq!(log.len(), 2);

        let recent = log.recent(10);
        assert_eq!(names(&recent), ["b", "a"]);
    }

    #[test]
    fn trace_log_wraps_repeatedly() {
        let log = TraceLog::new(2);
        for name in ["a", "b", "c", "d", "e"] {
            log.push(make_trace(name, 1.0));
        }

        assert_eq!(log.len(), 2);
        assert_eq!(log.total_count(), 5);

        let recent = log.recent(10);
        assert_eq!(names(&recent), ["e", "d"]);
    }

    #[test]
    fn trace_log_by_fn() {
        let log = TraceLog::new(100);
        log.push(make_trace("placeBid", 1.0));
        log.push(make_trace("getLots", 0.5));
        log.push(make_trace("placeBid", 1.2));

        let bids = log.by_fn("placeBid", 10);
        assert_eq!(bids.len(), 2);
        // Newest first: the 1.2 ms call was pushed last.
        assert_eq!(bids[0].duration_ms, 1.2);
        assert_eq!(bids[1].duration_ms, 1.0);

        let newest_only = log.by_fn("placeBid", 1);
        assert_eq!(newest_only.len(), 1);
        assert_eq!(newest_only[0].duration_ms, 1.2);

        assert!(log.by_fn("missing", 10).is_empty());
    }

    #[test]
    fn trace_log_errors() {
        let log = TraceLog::new(100);
        log.push(make_trace("a", 1.0));

        let mut err_trace = make_trace("b", 2.0);
        err_trace.outcome = FnOutcome::Error {
            code: "BID_TOO_LOW".into(),
            message: "too low".into(),
        };
        log.push(err_trace);

        let errors = log.errors(10);
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].fn_name, "b");
    }

    #[test]
    fn trace_log_errors_include_rollbacks() {
        let log = TraceLog::new(100);
        log.push(make_trace("a", 1.0));

        let mut failed = make_trace("b", 2.0);
        failed.outcome = FnOutcome::Error {
            code: "BID_TOO_LOW".into(),
            message: "too low".into(),
        };
        log.push(failed);

        let mut undone = make_trace("c", 3.0);
        undone.outcome = FnOutcome::RolledBack {
            code: "TX_CONFLICT".into(),
            message: "retry".into(),
        };
        log.push(undone);

        let all = log.errors(10);
        assert_eq!(names(&all), ["c", "b"]);

        let newest = log.errors(1);
        assert_eq!(names(&newest), ["c"]);
    }

    #[test]
    fn trace_serializes() {
        let trace = make_trace("test", 1.5);
        let json = serde_json::to_string(&trace).unwrap();
        assert!(json.contains("\"fn_name\":\"test\""));
        assert!(json.contains("\"status\":\"ok\""));
    }

    #[test]
    fn outcome_serializes_with_status_tag() {
        // `value` is skipped entirely when absent.
        let ok_empty = serde_json::to_string(&FnOutcome::Ok { value: None }).unwrap();
        assert_eq!(ok_empty, r#"{"status":"ok"}"#);

        let ok_value = serde_json::to_string(&FnOutcome::Ok {
            value: Some(serde_json::json!(1)),
        })
        .unwrap();
        assert_eq!(ok_value, r#"{"status":"ok","value":1}"#);

        let failed = serde_json::to_string(&FnOutcome::Error {
            code: "BID_TOO_LOW".into(),
            message: "too low".into(),
        })
        .unwrap();
        assert_eq!(
            failed,
            r#"{"status":"error","code":"BID_TOO_LOW","message":"too low"}"#
        );

        let undone = serde_json::to_string(&FnOutcome::RolledBack {
            code: "TX_CONFLICT".into(),
            message: "retry".into(),
        })
        .unwrap();
        assert_eq!(
            undone,
            r#"{"status":"rolled_back","code":"TX_CONFLICT","message":"retry"}"#
        );
    }
}