Skip to main content

ralph_workflow/monitoring/memory_metrics/
collector.rs

1// MemoryMetricsCollector: collects and stores memory snapshots during pipeline execution.
2
3use super::backends::TelemetryBackend;
4use super::snapshot::MemorySnapshot;
5
/// Maximum number of snapshots a collector retains.
///
/// When a newly recorded snapshot would push the list past this length, the
/// entries at the front of the list (the ones recorded first) are discarded.
const DEFAULT_MAX_SNAPSHOTS: usize = 1024;
7
/// Memory metrics collector for pipeline execution.
///
/// Holds the snapshots recorded so far together with the sampling interval.
/// The recording methods take `&self` and return an updated collector instead
/// of mutating this one in place.
#[derive(Debug, Clone)]
pub struct MemoryMetricsCollector {
    /// Recorded snapshots, in the order they were appended (oldest first).
    snapshots: Vec<MemorySnapshot>,
    /// A snapshot is taken every `snapshot_interval` iterations; `0` disables recording.
    snapshot_interval: u32,
}
14
15impl MemoryMetricsCollector {
16    /// Create a new metrics collector.
17    ///
18    /// # Arguments
19    ///
20    /// * `snapshot_interval` - Take snapshot every N iterations (0 = disabled)
21    #[must_use]
22    pub const fn new(snapshot_interval: u32) -> Self {
23        Self {
24            snapshots: Vec::new(),
25            snapshot_interval,
26        }
27    }
28
29    fn apply_snapshot_limit(snapshots: Vec<MemorySnapshot>) -> Vec<MemorySnapshot> {
30        if snapshots.len() > DEFAULT_MAX_SNAPSHOTS {
31            let skip_count = snapshots.len() - DEFAULT_MAX_SNAPSHOTS;
32            snapshots.into_iter().skip(skip_count).collect()
33        } else {
34            snapshots
35        }
36    }
37
38    /// Record a snapshot if at snapshot interval.
39    #[must_use]
40    pub fn maybe_record(&self, state: &crate::reducer::PipelineState) -> Self {
41        if self.snapshot_interval == 0 {
42            return self.clone();
43        }
44
45        if state.iteration == 0 {
46            return self.clone();
47        }
48
49        if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
50            let snapshots = self
51                .snapshots
52                .clone()
53                .into_iter()
54                .chain(std::iter::once(MemorySnapshot::from_pipeline_state(state)))
55                .collect();
56            let snapshots = Self::apply_snapshot_limit(snapshots);
57            Self {
58                snapshots,
59                snapshot_interval: self.snapshot_interval,
60            }
61        } else {
62            self.clone()
63        }
64    }
65
66    /// Get all recorded snapshots.
67    #[must_use]
68    pub fn snapshots(&self) -> &[MemorySnapshot] {
69        &self.snapshots
70    }
71
72    /// Export snapshots as JSON for external analysis.
73    ///
74    /// # Errors
75    ///
76    /// Returns error if the operation fails.
77    pub fn export_json(&self) -> serde_json::Result<String> {
78        serde_json::to_string_pretty(&self.snapshots)
79    }
80
81    /// Record a snapshot and send to telemetry backend.
82    #[must_use]
83    pub fn record_and_emit(
84        &self,
85        state: &crate::reducer::PipelineState,
86        backend: &dyn TelemetryBackend,
87    ) -> Self {
88        if self.snapshot_interval == 0 {
89            return self.clone();
90        }
91
92        if state.iteration == 0 {
93            return self.clone();
94        }
95
96        if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
97            let snapshot = MemorySnapshot::from_pipeline_state(state);
98            backend.emit_snapshot(&snapshot);
99            let snapshots = self
100                .snapshots
101                .clone()
102                .into_iter()
103                .chain(std::iter::once(snapshot))
104                .collect();
105            let snapshots = Self::apply_snapshot_limit(snapshots);
106            Self {
107                snapshots,
108                snapshot_interval: self.snapshot_interval,
109            }
110        } else {
111            self.clone()
112        }
113    }
114}