Skip to main content

ralph_workflow/monitoring/memory_metrics/
collector.rs

// MemoryMetricsCollector: collects and stores memory snapshots during pipeline execution.
2
3use super::backends::TelemetryBackend;
4use super::snapshot::MemorySnapshot;
5
/// Upper bound on the number of snapshots a collector retains.
///
/// Once the stored count exceeds this value, the oldest snapshots are
/// dropped so that at most this many remain.
const DEFAULT_MAX_SNAPSHOTS: usize = 1024;
7
8/// Memory metrics collector for pipeline execution.
9#[derive(Debug)]
10pub struct MemoryMetricsCollector {
11    snapshots: Vec<MemorySnapshot>,
12    snapshot_interval: u32,
13}
14
15impl MemoryMetricsCollector {
16    /// Create a new metrics collector.
17    ///
18    /// # Arguments
19    ///
20    /// * `snapshot_interval` - Take snapshot every N iterations (0 = disabled)
21    #[must_use]
22    pub const fn new(snapshot_interval: u32) -> Self {
23        Self {
24            snapshots: Vec::new(),
25            snapshot_interval,
26        }
27    }
28
29    fn enforce_snapshot_limit(&mut self) {
30        if self.snapshots.len() > DEFAULT_MAX_SNAPSHOTS {
31            let excess = self.snapshots.len() - DEFAULT_MAX_SNAPSHOTS;
32            self.snapshots.drain(0..excess);
33        }
34    }
35
36    /// Record a snapshot if at snapshot interval.
37    pub fn maybe_record(&mut self, state: &crate::reducer::PipelineState) {
38        if self.snapshot_interval == 0 {
39            return;
40        }
41
42        // Treat iteration 0 as "pre-run" (initial state). Recording here is surprising
43        // and skews exported metrics since 0 is a multiple of any non-zero interval.
44        if state.iteration == 0 {
45            return;
46        }
47
48        if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
49            self.snapshots
50                .push(MemorySnapshot::from_pipeline_state(state));
51            self.enforce_snapshot_limit();
52        }
53    }
54
55    /// Get all recorded snapshots.
56    #[must_use]
57    pub fn snapshots(&self) -> &[MemorySnapshot] {
58        &self.snapshots
59    }
60
61    /// Export snapshots as JSON for external analysis.
62    ///
63    /// # Errors
64    ///
65    /// Returns error if the operation fails.
66    pub fn export_json(&self) -> serde_json::Result<String> {
67        serde_json::to_string_pretty(&self.snapshots)
68    }
69
70    /// Record a snapshot and send to telemetry backend.
71    pub fn record_and_emit(
72        &mut self,
73        state: &crate::reducer::PipelineState,
74        backend: &mut dyn TelemetryBackend,
75    ) {
76        if self.snapshot_interval == 0 {
77            return;
78        }
79
80        if state.iteration == 0 {
81            return;
82        }
83
84        if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
85            let snapshot = MemorySnapshot::from_pipeline_state(state);
86            backend.emit_snapshot(&snapshot);
87            self.snapshots.push(snapshot);
88            self.enforce_snapshot_limit();
89        }
90    }
91}