Skip to main content

meta_tracing/
lib.rs

1#![doc = include_str!("../README.md")]
2#![deny(missing_docs)]
3
4use std::{collections::HashMap, time::Instant};
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use tracing::{Span, debug, info, info_span, warn};
9
10// =============================================================================
11// CollectedMeta - Final Output
12// =============================================================================
13
14/// Accumulated metadata - the final serializable output
15#[derive(Debug, Clone, Serialize, Deserialize, Default)]
16pub struct CollectedMeta {
17    /// Named sections from different packages
18    #[serde(skip_serializing_if = "HashMap::is_empty")]
19    pub sections: HashMap<String, Value>,
20
21    /// Issues/warnings from any source
22    #[serde(skip_serializing_if = "Vec::is_empty")]
23    pub issues: Vec<String>,
24
25    /// Row tracking
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub input_rows: Option<usize>,
28
29    /// Number of rows produced
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub output_rows: Option<usize>,
32
33    /// Processing time in milliseconds
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub processing_time_ms: Option<u64>,
36}
37
38impl CollectedMeta {
39    /// Get a section as a specific type
40    pub fn get_section<T: for<'de> Deserialize<'de>>(&self, name: &str) -> Option<T> {
41        self.sections
42            .get(name)
43            .and_then(|v| serde_json::from_value(v.clone()).ok())
44    }
45
46    /// Check if there are any issues
47    pub fn has_issues(&self) -> bool {
48        !self.issues.is_empty()
49    }
50
51    /// Count warnings
52    pub fn warning_count(&self) -> usize {
53        self.issues.iter().filter(|i| i.contains("[WARN]")).count()
54    }
55
56    /// Count errors
57    pub fn error_count(&self) -> usize {
58        self.issues.iter().filter(|i| i.contains("[ERROR]")).count()
59    }
60}
61
62// =============================================================================
63// TimedSection - Span-based Timing
64// =============================================================================
65
66/// A timed section that creates a tracing span and captures elapsed time.
67///
68/// Created via `MetaCollector::timed_section()`. When finished, the elapsed
69/// time is recorded both to tracing and to the metadata.
70#[derive(Debug)]
71pub struct TimedSection<'a> {
72    collector: &'a mut MetaCollector,
73    name: String,
74    span: Span,
75    start: Instant,
76}
77
78impl<'a> TimedSection<'a> {
79    /// Finish the section without additional data
80    pub fn finish(self) {
81        let elapsed_ms = self.start.elapsed().as_millis() as u64;
82        let _enter = self.span.enter();
83        info!(elapsed_ms, "section complete");
84
85        let timing = TimingMeta { elapsed_ms };
86        if let Ok(value) = serde_json::to_value(&timing) {
87            self.collector.sections.insert(self.name, value);
88        }
89    }
90
91    /// Finish the section with additional data
92    pub fn finish_with_data<T: Serialize>(self, data: &T) {
93        let elapsed_ms = self.start.elapsed().as_millis() as u64;
94        let _enter = self.span.enter();
95        info!(elapsed_ms, "section complete");
96
97        let timed = TimedSectionMeta {
98            elapsed_ms,
99            data: serde_json::to_value(data).unwrap_or_default(),
100        };
101        if let Ok(value) = serde_json::to_value(&timed) {
102            self.collector.sections.insert(self.name, value);
103        }
104    }
105
106    /// Finish the section, recording an error
107    pub fn finish_with_error(self, error: impl Into<String>) {
108        let elapsed_ms = self.start.elapsed().as_millis() as u64;
109        let error = error.into();
110        let _enter = self.span.enter();
111        tracing::error!(elapsed_ms, error = %error, "section failed");
112
113        self.collector
114            .add_issue(format!("[ERROR] {}: {}", self.name, error));
115
116        let timing = TimingMeta { elapsed_ms };
117        if let Ok(value) = serde_json::to_value(&timing) {
118            self.collector.sections.insert(self.name, value);
119        }
120    }
121}
122
123/// Timing metadata for a section
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct TimingMeta {
126    /// Wall-clock time spent inside the section
127    pub elapsed_ms: u64,
128}
129
130/// Timed section metadata with data
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct TimedSectionMeta {
133    /// Wall-clock time spent inside the section
134    pub elapsed_ms: u64,
135    /// Arbitrary JSON payload attached by the caller
136    pub data: Value,
137}
138
139// =============================================================================
140// MetaCollector
141// =============================================================================
142
143/// Accumulates metadata during processing with automatic tracing.
144///
145/// Every addition is traced at the appropriate level:
146/// - `add_section`: DEBUG level
147/// - `add_issue`: WARN level for warnings, ERROR for errors
148/// - `timed_section`: INFO span with timing
149#[derive(Debug)]
150pub struct MetaCollector {
151    start_time: Instant,
152    sections: HashMap<String, Value>,
153    issues: Vec<String>,
154    input_rows: Option<usize>,
155    output_rows: Option<usize>,
156}
157
158impl MetaCollector {
159    /// Create a new MetaCollector
160    pub fn new() -> Self {
161        debug!("MetaCollector created");
162        Self {
163            start_time: Instant::now(),
164            sections: HashMap::new(),
165            issues: Vec::new(),
166            input_rows: None,
167            output_rows: None,
168        }
169    }
170
171    // -------------------------------------------------------------------------
172    // Section Management
173    // -------------------------------------------------------------------------
174
175    /// Add a named section with any serializable data.
176    /// Automatically traces at DEBUG level.
177    pub fn add_section<T: Serialize>(&mut self, name: impl Into<String>, data: &T) {
178        let name = name.into();
179        if let Ok(value) = serde_json::to_value(data) {
180            debug!(section = %name, "meta section added");
181            self.sections.insert(name, value);
182        }
183    }
184
185    /// Add or merge into a named section.
186    /// If section exists and both are objects, merges keys.
187    pub fn merge_section<T: Serialize>(&mut self, name: impl Into<String>, data: &T) {
188        let name = name.into();
189        if let Ok(new_value) = serde_json::to_value(data) {
190            if let Some(existing) = self.sections.get_mut(&name)
191                && let (Value::Object(existing_map), Value::Object(new_map)) =
192                    (existing, &new_value)
193            {
194                for (k, v) in new_map {
195                    existing_map.insert(k.clone(), v.clone());
196                }
197                debug!(section = %name, "meta section merged");
198                return;
199            }
200            debug!(section = %name, "meta section added");
201            self.sections.insert(name, new_value);
202        }
203    }
204
205    /// Get a section by name
206    pub fn get_section(&self, name: &str) -> Option<&Value> {
207        self.sections.get(name)
208    }
209
210    /// Check if a section exists
211    pub fn has_section(&self, name: &str) -> bool {
212        self.sections.contains_key(name)
213    }
214
215    // -------------------------------------------------------------------------
216    // Issues
217    // -------------------------------------------------------------------------
218
219    /// Add an issue. Automatically traces at WARN or ERROR level.
220    pub fn add_issue(&mut self, issue: impl Into<String>) {
221        let issue = issue.into();
222        if issue.contains("[ERROR]") {
223            tracing::error!(issue = %issue, "validation error");
224        } else {
225            warn!(issue = %issue, "validation issue");
226        }
227        self.issues.push(issue);
228    }
229
230    /// Add multiple issues
231    pub fn add_issues(&mut self, issues: impl IntoIterator<Item = String>) {
232        for issue in issues {
233            self.add_issue(issue);
234        }
235    }
236
237    /// Check if there are any issues
238    pub fn has_issues(&self) -> bool {
239        !self.issues.is_empty()
240    }
241
242    /// Get all issues
243    pub fn issues(&self) -> &[String] {
244        &self.issues
245    }
246
247    // -------------------------------------------------------------------------
248    // Row Tracking
249    // -------------------------------------------------------------------------
250
251    /// Set input row count
252    pub fn set_input_rows(&mut self, rows: usize) {
253        debug!(input_rows = rows, "input rows recorded");
254        self.input_rows = Some(rows);
255    }
256
257    /// Set output row count
258    pub fn set_output_rows(&mut self, rows: usize) {
259        debug!(output_rows = rows, "output rows recorded");
260        self.output_rows = Some(rows);
261    }
262
263    /// Set both input and output rows
264    pub fn set_rows(&mut self, input: usize, output: usize) {
265        self.set_input_rows(input);
266        self.set_output_rows(output);
267    }
268
269    /// Get input rows
270    pub fn input_rows(&self) -> Option<usize> {
271        self.input_rows
272    }
273
274    /// Get output rows
275    pub fn output_rows(&self) -> Option<usize> {
276        self.output_rows
277    }
278
279    // -------------------------------------------------------------------------
280    // Timed Sections
281    // -------------------------------------------------------------------------
282
283    /// Start a timed section with a tracing span.
284    ///
285    /// Returns a `TimedSection` that must be finished via `.finish()` or
286    /// `.finish_with_data()`. The elapsed time is recorded both to tracing
287    /// and to the metadata.
288    pub fn timed_section(&mut self, name: impl Into<String>) -> TimedSection<'_> {
289        let name = name.into();
290        let span = info_span!("meta_section", name = %name);
291        span.in_scope(|| info!("section started"));
292
293        TimedSection {
294            collector: self,
295            name,
296            span,
297            start: Instant::now(),
298        }
299    }
300
301    // -------------------------------------------------------------------------
302    // Timing
303    // -------------------------------------------------------------------------
304
305    /// Get elapsed time since collector creation
306    pub fn elapsed_ms(&self) -> u64 {
307        self.start_time.elapsed().as_millis() as u64
308    }
309
310    // -------------------------------------------------------------------------
311    // Build
312    // -------------------------------------------------------------------------
313
314    /// Build the final CollectedMeta
315    pub fn build(self) -> CollectedMeta {
316        let elapsed = self.elapsed_ms();
317        info!(
318            elapsed_ms = elapsed,
319            sections = self.sections.len(),
320            issues = self.issues.len(),
321            "MetaCollector finalized"
322        );
323
324        CollectedMeta {
325            sections: self.sections,
326            issues: self.issues,
327            input_rows: self.input_rows,
328            output_rows: self.output_rows,
329            processing_time_ms: Some(elapsed),
330        }
331    }
332}
333
334impl Default for MetaCollector {
335    fn default() -> Self {
336        Self::new()
337    }
338}
339
340// =============================================================================
341// Convenience Functions for Option<&mut MetaCollector>
342// =============================================================================
343
344/// Record input rows if collector is present
345pub fn record_input_rows(collector: Option<&mut MetaCollector>, rows: usize) {
346    if let Some(c) = collector {
347        c.set_input_rows(rows);
348    }
349}
350
351/// Record output rows if collector is present
352pub fn record_output_rows(collector: Option<&mut MetaCollector>, rows: usize) {
353    if let Some(c) = collector {
354        c.set_output_rows(rows);
355    }
356}
357
358/// Record both input and output rows if collector is present
359pub fn record_rows(collector: Option<&mut MetaCollector>, input: usize, output: usize) {
360    if let Some(c) = collector {
361        c.set_rows(input, output);
362    }
363}
364
365/// Add an issue if collector is present
366pub fn record_issue(collector: Option<&mut MetaCollector>, issue: impl Into<String>) {
367    if let Some(c) = collector {
368        c.add_issue(issue);
369    }
370}
371
372/// Add multiple issues if collector is present
373pub fn record_issues(
374    collector: Option<&mut MetaCollector>,
375    issues: impl IntoIterator<Item = String>,
376) {
377    if let Some(c) = collector {
378        c.add_issues(issues);
379    }
380}
381
382/// Add a section if collector is present
383pub fn record_section<T: Serialize>(
384    collector: Option<&mut MetaCollector>,
385    name: impl Into<String>,
386    data: &T,
387) {
388    if let Some(c) = collector {
389        c.add_section(name, data);
390    }
391}
392
393// =============================================================================
394// Tests
395// =============================================================================
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Small serializable payload used as section data in the tests.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct TestStats {
        count: usize,
        label: String,
    }

    #[test]
    fn test_basic_collection() {
        let mut collector = MetaCollector::new();
        collector.add_section(
            "stats",
            &TestStats {
                count: 42,
                label: "test".into(),
            },
        );
        collector.set_rows(1000, 500);

        let meta = collector.build();

        assert_eq!(meta.input_rows, Some(1000));
        assert_eq!(meta.output_rows, Some(500));
        assert!(meta.processing_time_ms.is_some());

        let stats: TestStats = meta.get_section("stats").unwrap();
        assert_eq!(stats.count, 42);
    }

    #[test]
    fn test_get_section_missing_or_wrong_type() {
        let mut collector = MetaCollector::new();
        collector.add_section("number", &42);

        let meta = collector.build();

        // Unknown name and type mismatch both yield `None`.
        assert!(meta.get_section::<TestStats>("absent").is_none());
        assert!(meta.get_section::<TestStats>("number").is_none());
    }

    #[test]
    fn test_issues() {
        let mut collector = MetaCollector::new();
        collector.add_issue("[WARN] Something might be wrong");
        collector.add_issue("[ERROR] Something is definitely wrong");
        collector.add_issue("[WARN] Another warning");

        assert!(collector.has_issues());
        assert_eq!(collector.issues().len(), 3);

        let meta = collector.build();
        assert_eq!(meta.warning_count(), 2);
        assert_eq!(meta.error_count(), 1);
    }

    #[test]
    fn test_merge_section() {
        let mut collector = MetaCollector::new();

        #[derive(Serialize)]
        struct Part1 {
            a: i32,
        }

        #[derive(Serialize)]
        struct Part2 {
            b: i32,
        }

        collector.add_section("combined", &Part1 { a: 1 });
        collector.merge_section("combined", &Part2 { b: 2 });

        let meta = collector.build();
        let section = meta.sections.get("combined").unwrap();

        assert_eq!(section.get("a").unwrap().as_i64(), Some(1));
        assert_eq!(section.get("b").unwrap().as_i64(), Some(2));
    }

    #[test]
    fn test_merge_section_fallbacks() {
        let mut collector = MetaCollector::new();

        #[derive(Serialize)]
        struct Part {
            a: i32,
        }

        // Merging into a missing section simply adds it.
        collector.merge_section("fresh", &Part { a: 1 });
        assert_eq!(
            collector
                .get_section("fresh")
                .and_then(|v| v.get("a"))
                .and_then(Value::as_i64),
            Some(1)
        );

        // Merging an object into a non-object section replaces it.
        collector.add_section("scalar", &7);
        collector.merge_section("scalar", &Part { a: 2 });
        assert_eq!(
            collector
                .get_section("scalar")
                .and_then(|v| v.get("a"))
                .and_then(Value::as_i64),
            Some(2)
        );
    }

    #[test]
    fn test_timed_section() {
        let mut collector = MetaCollector::new();

        {
            let section = collector.timed_section("work");
            std::thread::sleep(std::time::Duration::from_millis(10));
            section.finish();
        }

        let meta = collector.build();
        let timing: TimingMeta = meta.get_section("work").unwrap();
        assert!(timing.elapsed_ms >= 10);
    }

    #[test]
    fn test_timed_section_with_data() {
        let mut collector = MetaCollector::new();

        {
            let section = collector.timed_section("work");
            section.finish_with_data(&TestStats {
                count: 99,
                label: "done".into(),
            });
        }

        let meta = collector.build();
        let timed: TimedSectionMeta = meta.get_section("work").unwrap();

        let stats: TestStats = serde_json::from_value(timed.data).unwrap();
        assert_eq!(stats.count, 99);
    }

    #[test]
    fn test_timed_section_with_error() {
        let mut collector = MetaCollector::new();

        collector.timed_section("load").finish_with_error("boom");

        assert_eq!(collector.issues().len(), 1);
        assert_eq!(collector.issues()[0], "[ERROR] load: boom");

        let meta = collector.build();
        assert_eq!(meta.error_count(), 1);
        // The timing is still stored even though the section failed.
        assert!(meta.get_section::<TimingMeta>("load").is_some());
    }

    #[test]
    fn test_convenience_functions_with_some() {
        let mut collector = MetaCollector::new();

        record_input_rows(Some(&mut collector), 1000);
        record_output_rows(Some(&mut collector), 500);
        record_issue(Some(&mut collector), "[WARN] test");
        record_section(Some(&mut collector), "test", &42);

        assert_eq!(collector.input_rows(), Some(1000));
        assert_eq!(collector.output_rows(), Some(500));
        assert!(collector.has_issues());
        assert!(collector.has_section("test"));
    }

    #[test]
    fn test_record_rows_and_issues_with_some() {
        let mut collector = MetaCollector::new();

        record_rows(Some(&mut collector), 10, 7);
        record_issues(
            Some(&mut collector),
            vec!["[WARN] first".to_string(), "[ERROR] second".to_string()],
        );

        assert_eq!(collector.input_rows(), Some(10));
        assert_eq!(collector.output_rows(), Some(7));
        assert_eq!(collector.issues().len(), 2);
    }

    #[test]
    fn test_convenience_functions_with_none() {
        // Should not panic
        record_input_rows(None, 1000);
        record_output_rows(None, 500);
        record_rows(None, 1000, 500);
        record_issue(None, "[WARN] test");
        record_issues(None, vec!["[WARN] test".to_string()]);
        record_section(None, "test", &42);
    }

    #[test]
    fn test_elapsed_time() {
        let collector = MetaCollector::new();
        std::thread::sleep(std::time::Duration::from_millis(5));
        assert!(collector.elapsed_ms() >= 5);
    }
}