1use std::time::{Duration, Instant};
9
10use serde::Serialize;
11
12use crate::protocol::{DbOp, FnType};
13
14#[derive(Debug, Clone, Serialize)]
20pub struct FnTrace {
21 pub call_id: String,
22 pub fn_name: String,
23 pub fn_type: FnType,
24 pub user_id: Option<String>,
25 pub started_at: u64,
26 pub duration_ms: f64,
27 pub outcome: FnOutcome,
28 pub ops: Vec<OpTrace>,
29 pub stream_bytes: u64,
30 pub stream_chunks: u32,
31 pub schedules: Vec<ScheduleTrace>,
32}
33
34#[derive(Debug, Clone, Serialize)]
36#[serde(tag = "status")]
37pub enum FnOutcome {
38 #[serde(rename = "ok")]
39 Ok {
40 #[serde(skip_serializing_if = "Option::is_none")]
41 value: Option<serde_json::Value>,
42 },
43 #[serde(rename = "error")]
44 Error { code: String, message: String },
45 #[serde(rename = "rolled_back")]
46 RolledBack { code: String, message: String },
47}
48
49#[derive(Debug, Clone, Serialize)]
51pub struct OpTrace {
52 pub op: DbOp,
53 pub entity: String,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub id: Option<String>,
56 pub duration_ms: f64,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub row_count: Option<usize>,
59 pub ok: bool,
60}
61
62#[derive(Debug, Clone, Serialize)]
64pub struct ScheduleTrace {
65 pub fn_name: String,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub delay_ms: Option<u64>,
68 #[serde(skip_serializing_if = "Option::is_none")]
69 pub run_at: Option<u64>,
70}
71
72pub struct TraceBuilder {
82 call_id: String,
83 fn_name: String,
84 fn_type: FnType,
85 pub(crate) user_id: Option<String>,
86 pub(crate) tenant_id: Option<String>,
90 started_at: u64,
91 start_instant: Instant,
92 ops: Vec<OpTrace>,
93 stream_bytes: u64,
94 stream_chunks: u32,
95 schedules: Vec<ScheduleTrace>,
96}
97
98impl TraceBuilder {
99 pub fn new(call_id: String, fn_name: String, fn_type: FnType, user_id: Option<String>) -> Self {
100 Self::new_with_tenant(call_id, fn_name, fn_type, user_id, None)
101 }
102
103 pub fn new_with_tenant(
104 call_id: String,
105 fn_name: String,
106 fn_type: FnType,
107 user_id: Option<String>,
108 tenant_id: Option<String>,
109 ) -> Self {
110 let now_epoch = std::time::SystemTime::now()
111 .duration_since(std::time::UNIX_EPOCH)
112 .unwrap_or_default()
113 .as_millis() as u64;
114
115 Self {
116 call_id,
117 fn_name,
118 fn_type,
119 user_id,
120 tenant_id,
121 started_at: now_epoch,
122 start_instant: Instant::now(),
123 ops: Vec::new(),
124 stream_bytes: 0,
125 stream_chunks: 0,
126 schedules: Vec::new(),
127 }
128 }
129
130 pub fn tenant_id(&self) -> Option<&str> {
133 self.tenant_id.as_deref()
134 }
135
136 pub fn record_op(
138 &mut self,
139 op: DbOp,
140 entity: &str,
141 id: Option<&str>,
142 duration: Duration,
143 row_count: Option<usize>,
144 ok: bool,
145 ) {
146 self.ops.push(OpTrace {
147 op,
148 entity: entity.to_string(),
149 id: id.map(|s| s.to_string()),
150 duration_ms: duration.as_secs_f64() * 1000.0,
151 row_count,
152 ok,
153 });
154 }
155
156 pub fn record_stream_chunk(&mut self, bytes: usize) {
158 self.stream_bytes += bytes as u64;
159 self.stream_chunks += 1;
160 }
161
162 pub fn record_schedule(&mut self, fn_name: &str, delay_ms: Option<u64>, run_at: Option<u64>) {
164 self.schedules.push(ScheduleTrace {
165 fn_name: fn_name.to_string(),
166 delay_ms,
167 run_at,
168 });
169 }
170
171 pub fn finish_ok(self, value: Option<serde_json::Value>) -> FnTrace {
173 self.finish(FnOutcome::Ok { value })
174 }
175
176 pub fn finish_error(self, code: String, message: String) -> FnTrace {
178 self.finish(FnOutcome::Error { code, message })
179 }
180
181 pub fn finish_rolled_back(self, code: String, message: String) -> FnTrace {
183 self.finish(FnOutcome::RolledBack { code, message })
184 }
185
186 fn finish(self, outcome: FnOutcome) -> FnTrace {
187 FnTrace {
188 call_id: self.call_id,
189 fn_name: self.fn_name,
190 fn_type: self.fn_type,
191 user_id: self.user_id,
192 started_at: self.started_at,
193 duration_ms: self.start_instant.elapsed().as_secs_f64() * 1000.0,
194 outcome,
195 ops: self.ops,
196 stream_bytes: self.stream_bytes,
197 stream_chunks: self.stream_chunks,
198 schedules: self.schedules,
199 }
200 }
201}
202
203pub struct TraceLog {
212 traces: std::sync::Mutex<TraceRing>,
213}
214
215struct TraceRing {
216 buf: Vec<FnTrace>,
217 capacity: usize,
218 write_pos: usize,
219 count: usize,
220}
221
222impl TraceLog {
223 pub fn new(capacity: usize) -> Self {
224 Self {
225 traces: std::sync::Mutex::new(TraceRing {
226 buf: Vec::with_capacity(capacity),
227 capacity,
228 write_pos: 0,
229 count: 0,
230 }),
231 }
232 }
233
234 pub fn push(&self, trace: FnTrace) {
236 let mut ring = self.traces.lock().unwrap();
237 let cap = ring.capacity;
238 if ring.buf.len() < cap {
239 ring.buf.push(trace);
240 } else {
241 let pos = ring.write_pos;
242 ring.buf[pos] = trace;
243 }
244 ring.write_pos = (ring.write_pos + 1) % cap;
245 ring.count += 1;
246 }
247
248 pub fn recent(&self, limit: usize) -> Vec<FnTrace> {
250 let ring = self.traces.lock().unwrap();
251 let len = ring.buf.len();
252 if len == 0 {
253 return vec![];
254 }
255
256 let take = limit.min(len);
257 let mut result = Vec::with_capacity(take);
258
259 let start = if ring.write_pos == 0 {
261 len - 1
262 } else {
263 ring.write_pos - 1
264 };
265 let mut i = start;
266 for _ in 0..take {
267 result.push(ring.buf[i].clone());
268 if i == 0 {
269 i = len - 1;
270 } else {
271 i -= 1;
272 }
273 }
274 result
275 }
276
277 pub fn by_fn(&self, fn_name: &str, limit: usize) -> Vec<FnTrace> {
279 self.recent(self.len())
280 .into_iter()
281 .filter(|t| t.fn_name == fn_name)
282 .take(limit)
283 .collect()
284 }
285
286 pub fn errors(&self, limit: usize) -> Vec<FnTrace> {
288 self.recent(self.len())
289 .into_iter()
290 .filter(|t| !matches!(t.outcome, FnOutcome::Ok { .. }))
291 .take(limit)
292 .collect()
293 }
294
295 pub fn total_count(&self) -> usize {
297 self.traces.lock().unwrap().count
298 }
299
300 pub fn len(&self) -> usize {
302 self.traces.lock().unwrap().buf.len()
303 }
304
305 pub fn is_empty(&self) -> bool {
306 self.len() == 0
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313
314 fn make_trace(name: &str, duration_ms: f64) -> FnTrace {
315 FnTrace {
316 call_id: format!("c_{name}"),
317 fn_name: name.to_string(),
318 fn_type: FnType::Mutation,
319 user_id: Some("user_1".to_string()),
320 started_at: 1000,
321 duration_ms,
322 outcome: FnOutcome::Ok { value: None },
323 ops: vec![],
324 stream_bytes: 0,
325 stream_chunks: 0,
326 schedules: vec![],
327 }
328 }
329
330 #[test]
331 fn trace_builder_records_ops() {
332 let mut builder = TraceBuilder::new(
333 "c1".into(),
334 "placeBid".into(),
335 FnType::Mutation,
336 Some("user_1".into()),
337 );
338
339 builder.record_op(
340 DbOp::Get,
341 "Lot",
342 Some("lot_1"),
343 Duration::from_micros(100),
344 Some(1),
345 true,
346 );
347 builder.record_op(
348 DbOp::Insert,
349 "Bid",
350 None,
351 Duration::from_micros(150),
352 None,
353 true,
354 );
355 builder.record_stream_chunk(42);
356 builder.record_stream_chunk(18);
357 builder.record_schedule("closeLot", Some(5000), None);
358
359 let trace = builder.finish_ok(Some(serde_json::json!({"accepted": true})));
360
361 assert_eq!(trace.fn_name, "placeBid");
362 assert_eq!(trace.ops.len(), 2);
363 assert_eq!(trace.stream_bytes, 60);
364 assert_eq!(trace.stream_chunks, 2);
365 assert_eq!(trace.schedules.len(), 1);
366 }
367
368 #[test]
369 fn trace_log_ring_buffer() {
370 let log = TraceLog::new(3);
371
372 log.push(make_trace("a", 1.0));
373 log.push(make_trace("b", 2.0));
374 log.push(make_trace("c", 3.0));
375 log.push(make_trace("d", 4.0)); assert_eq!(log.len(), 3);
378 assert_eq!(log.total_count(), 4);
379
380 let recent = log.recent(10);
381 assert_eq!(recent.len(), 3);
382 assert_eq!(recent[0].fn_name, "d"); assert_eq!(recent[1].fn_name, "c");
384 assert_eq!(recent[2].fn_name, "b");
385 }
386
387 #[test]
388 fn trace_log_by_fn() {
389 let log = TraceLog::new(100);
390 log.push(make_trace("placeBid", 1.0));
391 log.push(make_trace("getLots", 0.5));
392 log.push(make_trace("placeBid", 1.2));
393
394 let bids = log.by_fn("placeBid", 10);
395 assert_eq!(bids.len(), 2);
396 }
397
398 #[test]
399 fn trace_log_errors() {
400 let log = TraceLog::new(100);
401 log.push(make_trace("a", 1.0));
402
403 let mut err_trace = make_trace("b", 2.0);
404 err_trace.outcome = FnOutcome::Error {
405 code: "BID_TOO_LOW".into(),
406 message: "too low".into(),
407 };
408 log.push(err_trace);
409
410 let errors = log.errors(10);
411 assert_eq!(errors.len(), 1);
412 assert_eq!(errors[0].fn_name, "b");
413 }
414
415 #[test]
416 fn trace_serializes() {
417 let trace = make_trace("test", 1.5);
418 let json = serde_json::to_string(&trace).unwrap();
419 assert!(json.contains("\"fn_name\":\"test\""));
420 assert!(json.contains("\"status\":\"ok\""));
421 }
422}