1#![doc = include_str!("../README.md")]
2#![deny(missing_docs)]
3
4use std::{collections::HashMap, time::Instant};
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use tracing::{Span, debug, info, info_span, warn};
9
10#[derive(Debug, Clone, Serialize, Deserialize, Default)]
16pub struct CollectedMeta {
17 #[serde(skip_serializing_if = "HashMap::is_empty")]
19 pub sections: HashMap<String, Value>,
20
21 #[serde(skip_serializing_if = "Vec::is_empty")]
23 pub issues: Vec<String>,
24
25 #[serde(skip_serializing_if = "Option::is_none")]
27 pub input_rows: Option<usize>,
28
29 #[serde(skip_serializing_if = "Option::is_none")]
31 pub output_rows: Option<usize>,
32
33 #[serde(skip_serializing_if = "Option::is_none")]
35 pub processing_time_ms: Option<u64>,
36}
37
38impl CollectedMeta {
39 pub fn get_section<T: for<'de> Deserialize<'de>>(&self, name: &str) -> Option<T> {
41 self.sections
42 .get(name)
43 .and_then(|v| serde_json::from_value(v.clone()).ok())
44 }
45
46 pub fn has_issues(&self) -> bool {
48 !self.issues.is_empty()
49 }
50
51 pub fn warning_count(&self) -> usize {
53 self.issues.iter().filter(|i| i.contains("[WARN]")).count()
54 }
55
56 pub fn error_count(&self) -> usize {
58 self.issues.iter().filter(|i| i.contains("[ERROR]")).count()
59 }
60}
61
62#[derive(Debug)]
71pub struct TimedSection<'a> {
72 collector: &'a mut MetaCollector,
73 name: String,
74 span: Span,
75 start: Instant,
76}
77
78impl<'a> TimedSection<'a> {
79 pub fn finish(self) {
81 let elapsed_ms = self.start.elapsed().as_millis() as u64;
82 let _enter = self.span.enter();
83 info!(elapsed_ms, "section complete");
84
85 let timing = TimingMeta { elapsed_ms };
86 if let Ok(value) = serde_json::to_value(&timing) {
87 self.collector.sections.insert(self.name, value);
88 }
89 }
90
91 pub fn finish_with_data<T: Serialize>(self, data: &T) {
93 let elapsed_ms = self.start.elapsed().as_millis() as u64;
94 let _enter = self.span.enter();
95 info!(elapsed_ms, "section complete");
96
97 let timed = TimedSectionMeta {
98 elapsed_ms,
99 data: serde_json::to_value(data).unwrap_or_default(),
100 };
101 if let Ok(value) = serde_json::to_value(&timed) {
102 self.collector.sections.insert(self.name, value);
103 }
104 }
105
106 pub fn finish_with_error(self, error: impl Into<String>) {
108 let elapsed_ms = self.start.elapsed().as_millis() as u64;
109 let error = error.into();
110 let _enter = self.span.enter();
111 tracing::error!(elapsed_ms, error = %error, "section failed");
112
113 self.collector
114 .add_issue(format!("[ERROR] {}: {}", self.name, error));
115
116 let timing = TimingMeta { elapsed_ms };
117 if let Ok(value) = serde_json::to_value(&timing) {
118 self.collector.sections.insert(self.name, value);
119 }
120 }
121}
122
/// Timing-only payload stored for a timed section by [`TimedSection::finish`]
/// and [`TimedSection::finish_with_error`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimingMeta {
    /// Time between the start and the end of the section, in milliseconds.
    pub elapsed_ms: u64,
}
129
/// Payload stored for a timed section by [`TimedSection::finish_with_data`]:
/// the elapsed time together with caller-supplied data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimedSectionMeta {
    /// Time between the start and the end of the section, in milliseconds.
    pub elapsed_ms: u64,
    /// Caller-supplied data serialized to JSON.
    pub data: Value,
}
138
/// Accumulates named metadata sections, issue messages and row counts, and is
/// turned into a [`CollectedMeta`] by [`MetaCollector::build`].
#[derive(Debug)]
pub struct MetaCollector {
    // Captured in `new`; basis for `elapsed_ms` and the final processing time.
    start_time: Instant,
    // Sections recorded so far, keyed by name.
    sections: HashMap<String, Value>,
    // Issue messages in insertion order.
    issues: Vec<String>,
    // Row counts; `None` until the matching setter is called.
    input_rows: Option<usize>,
    output_rows: Option<usize>,
}
157
158impl MetaCollector {
159 pub fn new() -> Self {
161 debug!("MetaCollector created");
162 Self {
163 start_time: Instant::now(),
164 sections: HashMap::new(),
165 issues: Vec::new(),
166 input_rows: None,
167 output_rows: None,
168 }
169 }
170
171 pub fn add_section<T: Serialize>(&mut self, name: impl Into<String>, data: &T) {
178 let name = name.into();
179 if let Ok(value) = serde_json::to_value(data) {
180 debug!(section = %name, "meta section added");
181 self.sections.insert(name, value);
182 }
183 }
184
185 pub fn merge_section<T: Serialize>(&mut self, name: impl Into<String>, data: &T) {
188 let name = name.into();
189 if let Ok(new_value) = serde_json::to_value(data) {
190 if let Some(existing) = self.sections.get_mut(&name)
191 && let (Value::Object(existing_map), Value::Object(new_map)) =
192 (existing, &new_value)
193 {
194 for (k, v) in new_map {
195 existing_map.insert(k.clone(), v.clone());
196 }
197 debug!(section = %name, "meta section merged");
198 return;
199 }
200 debug!(section = %name, "meta section added");
201 self.sections.insert(name, new_value);
202 }
203 }
204
205 pub fn get_section(&self, name: &str) -> Option<&Value> {
207 self.sections.get(name)
208 }
209
210 pub fn has_section(&self, name: &str) -> bool {
212 self.sections.contains_key(name)
213 }
214
215 pub fn add_issue(&mut self, issue: impl Into<String>) {
221 let issue = issue.into();
222 if issue.contains("[ERROR]") {
223 tracing::error!(issue = %issue, "validation error");
224 } else {
225 warn!(issue = %issue, "validation issue");
226 }
227 self.issues.push(issue);
228 }
229
230 pub fn add_issues(&mut self, issues: impl IntoIterator<Item = String>) {
232 for issue in issues {
233 self.add_issue(issue);
234 }
235 }
236
237 pub fn has_issues(&self) -> bool {
239 !self.issues.is_empty()
240 }
241
242 pub fn issues(&self) -> &[String] {
244 &self.issues
245 }
246
247 pub fn set_input_rows(&mut self, rows: usize) {
253 debug!(input_rows = rows, "input rows recorded");
254 self.input_rows = Some(rows);
255 }
256
257 pub fn set_output_rows(&mut self, rows: usize) {
259 debug!(output_rows = rows, "output rows recorded");
260 self.output_rows = Some(rows);
261 }
262
263 pub fn set_rows(&mut self, input: usize, output: usize) {
265 self.set_input_rows(input);
266 self.set_output_rows(output);
267 }
268
269 pub fn input_rows(&self) -> Option<usize> {
271 self.input_rows
272 }
273
274 pub fn output_rows(&self) -> Option<usize> {
276 self.output_rows
277 }
278
279 pub fn timed_section(&mut self, name: impl Into<String>) -> TimedSection<'_> {
289 let name = name.into();
290 let span = info_span!("meta_section", name = %name);
291 span.in_scope(|| info!("section started"));
292
293 TimedSection {
294 collector: self,
295 name,
296 span,
297 start: Instant::now(),
298 }
299 }
300
301 pub fn elapsed_ms(&self) -> u64 {
307 self.start_time.elapsed().as_millis() as u64
308 }
309
310 pub fn build(self) -> CollectedMeta {
316 let elapsed = self.elapsed_ms();
317 info!(
318 elapsed_ms = elapsed,
319 sections = self.sections.len(),
320 issues = self.issues.len(),
321 "MetaCollector finalized"
322 );
323
324 CollectedMeta {
325 sections: self.sections,
326 issues: self.issues,
327 input_rows: self.input_rows,
328 output_rows: self.output_rows,
329 processing_time_ms: Some(elapsed),
330 }
331 }
332}
333
334impl Default for MetaCollector {
335 fn default() -> Self {
336 Self::new()
337 }
338}
339
340pub fn record_input_rows(collector: Option<&mut MetaCollector>, rows: usize) {
346 if let Some(c) = collector {
347 c.set_input_rows(rows);
348 }
349}
350
351pub fn record_output_rows(collector: Option<&mut MetaCollector>, rows: usize) {
353 if let Some(c) = collector {
354 c.set_output_rows(rows);
355 }
356}
357
358pub fn record_rows(collector: Option<&mut MetaCollector>, input: usize, output: usize) {
360 if let Some(c) = collector {
361 c.set_rows(input, output);
362 }
363}
364
365pub fn record_issue(collector: Option<&mut MetaCollector>, issue: impl Into<String>) {
367 if let Some(c) = collector {
368 c.add_issue(issue);
369 }
370}
371
372pub fn record_issues(
374 collector: Option<&mut MetaCollector>,
375 issues: impl IntoIterator<Item = String>,
376) {
377 if let Some(c) = collector {
378 c.add_issues(issues);
379 }
380}
381
382pub fn record_section<T: Serialize>(
384 collector: Option<&mut MetaCollector>,
385 name: impl Into<String>,
386 data: &T,
387) {
388 if let Some(c) = collector {
389 c.add_section(name, data);
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Small serializable payload shared by several tests.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct TestStats {
        count: usize,
        label: String,
    }

    // A section and both row counts survive `build` and can be read back typed.
    #[test]
    fn test_basic_collection() {
        let mut collector = MetaCollector::new();
        collector.add_section(
            "stats",
            &TestStats {
                count: 42,
                label: "test".into(),
            },
        );
        collector.set_rows(1000, 500);

        let meta = collector.build();

        assert_eq!(meta.input_rows, Some(1000));
        assert_eq!(meta.output_rows, Some(500));
        assert!(meta.processing_time_ms.is_some());

        let stats: TestStats = meta.get_section("stats").unwrap();
        assert_eq!(stats.count, 42);
    }

    // Issues are kept in order and classified by their `[WARN]` / `[ERROR]` tag.
    #[test]
    fn test_issues() {
        let mut collector = MetaCollector::new();
        collector.add_issue("[WARN] Something might be wrong");
        collector.add_issue("[ERROR] Something is definitely wrong");
        collector.add_issue("[WARN] Another warning");

        assert!(collector.has_issues());
        assert_eq!(collector.issues().len(), 3);

        let meta = collector.build();
        assert_eq!(meta.warning_count(), 2);
        assert_eq!(meta.error_count(), 1);
    }

    // Merging an object into an existing object section unions their keys.
    #[test]
    fn test_merge_section() {
        let mut collector = MetaCollector::new();

        #[derive(Serialize)]
        struct Part1 {
            a: i32,
        }

        #[derive(Serialize)]
        struct Part2 {
            b: i32,
        }

        collector.add_section("combined", &Part1 { a: 1 });
        collector.merge_section("combined", &Part2 { b: 2 });

        let meta = collector.build();
        let section = meta.sections.get("combined").unwrap();

        assert_eq!(section.get("a").unwrap().as_i64(), Some(1));
        assert_eq!(section.get("b").unwrap().as_i64(), Some(2));
    }

    // Merging falls back to a plain insert when there is nothing to merge into,
    // and to a replacement when the new value is not a JSON object.
    #[test]
    fn test_merge_section_fallbacks() {
        let mut collector = MetaCollector::new();

        collector.merge_section(
            "fresh",
            &TestStats {
                count: 1,
                label: "first".into(),
            },
        );
        assert!(collector.has_section("fresh"));

        collector.merge_section("fresh", &7);
        assert_eq!(
            collector.get_section("fresh").and_then(Value::as_i64),
            Some(7)
        );
    }

    // `finish` stores the elapsed time under the section name.
    #[test]
    fn test_timed_section() {
        let mut collector = MetaCollector::new();

        {
            let section = collector.timed_section("work");
            std::thread::sleep(std::time::Duration::from_millis(10));
            section.finish();
        }

        let meta = collector.build();
        let timing: TimingMeta = meta.get_section("work").unwrap();
        assert!(timing.elapsed_ms >= 10);
    }

    // `finish_with_data` stores the caller's data next to the timing.
    #[test]
    fn test_timed_section_with_data() {
        let mut collector = MetaCollector::new();

        {
            let section = collector.timed_section("work");
            section.finish_with_data(&TestStats {
                count: 99,
                label: "done".into(),
            });
        }

        let meta = collector.build();
        let timed: TimedSectionMeta = meta.get_section("work").unwrap();

        let stats: TestStats = serde_json::from_value(timed.data).unwrap();
        assert_eq!(stats.count, 99);
    }

    // `finish_with_error` records both an `[ERROR]` issue and the timing.
    #[test]
    fn test_timed_section_with_error() {
        let mut collector = MetaCollector::new();

        collector.timed_section("work").finish_with_error("boom");

        assert_eq!(collector.issues().len(), 1);
        assert_eq!(collector.issues()[0], "[ERROR] work: boom");

        let meta = collector.build();
        assert_eq!(meta.error_count(), 1);
        assert_eq!(meta.warning_count(), 0);

        let timing: Option<TimingMeta> = meta.get_section("work");
        assert!(timing.is_some());
    }

    // The free functions forward to the collector when one is supplied.
    #[test]
    fn test_convenience_functions_with_some() {
        let mut collector = MetaCollector::new();

        record_input_rows(Some(&mut collector), 1000);
        record_output_rows(Some(&mut collector), 500);
        record_issue(Some(&mut collector), "[WARN] test");
        record_section(Some(&mut collector), "test", &42);

        assert_eq!(collector.input_rows(), Some(1000));
        assert_eq!(collector.output_rows(), Some(500));
        assert!(collector.has_issues());
        assert!(collector.has_section("test"));
    }

    // The batch variants forward to the collector as well.
    #[test]
    fn test_convenience_batch_functions() {
        let mut collector = MetaCollector::new();

        record_rows(Some(&mut collector), 10, 4);
        record_issues(
            Some(&mut collector),
            vec!["[WARN] a".to_string(), "[ERROR] b".to_string()],
        );

        assert_eq!(collector.input_rows(), Some(10));
        assert_eq!(collector.output_rows(), Some(4));
        assert_eq!(collector.issues().len(), 2);

        // With no collector the calls must simply do nothing.
        record_rows(None, 1, 1);
        record_issues(None, Vec::new());
    }

    // With no collector every free function is a no-op and must not panic.
    #[test]
    fn test_convenience_functions_with_none() {
        record_input_rows(None, 1000);
        record_output_rows(None, 500);
        record_issue(None, "[WARN] test");
        record_section(None, "test", &42);
    }

    // The collector's clock starts at construction.
    #[test]
    fn test_elapsed_time() {
        let collector = MetaCollector::new();
        std::thread::sleep(std::time::Duration::from_millis(5));
        assert!(collector.elapsed_ms() >= 5);
    }
}