ralph_workflow/monitoring/memory_metrics/
collector.rs1use super::backends::TelemetryBackend;
4use super::snapshot::MemorySnapshot;
5
/// Upper bound on how many snapshots the collector keeps in memory.
///
/// Once the stored history grows past this count, the oldest entries are
/// discarded so the buffer cannot grow without limit.
const DEFAULT_MAX_SNAPSHOTS: usize = 1024;
7
8#[derive(Debug)]
10pub struct MemoryMetricsCollector {
11 snapshots: Vec<MemorySnapshot>,
12 snapshot_interval: u32,
13}
14
15impl MemoryMetricsCollector {
16 #[must_use]
22 pub const fn new(snapshot_interval: u32) -> Self {
23 Self {
24 snapshots: Vec::new(),
25 snapshot_interval,
26 }
27 }
28
29 fn enforce_snapshot_limit(&mut self) {
30 if self.snapshots.len() > DEFAULT_MAX_SNAPSHOTS {
31 let excess = self.snapshots.len() - DEFAULT_MAX_SNAPSHOTS;
32 self.snapshots.drain(0..excess);
33 }
34 }
35
36 pub fn maybe_record(&mut self, state: &crate::reducer::PipelineState) {
38 if self.snapshot_interval == 0 {
39 return;
40 }
41
42 if state.iteration == 0 {
45 return;
46 }
47
48 if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
49 self.snapshots
50 .push(MemorySnapshot::from_pipeline_state(state));
51 self.enforce_snapshot_limit();
52 }
53 }
54
55 #[must_use]
57 pub fn snapshots(&self) -> &[MemorySnapshot] {
58 &self.snapshots
59 }
60
61 pub fn export_json(&self) -> serde_json::Result<String> {
67 serde_json::to_string_pretty(&self.snapshots)
68 }
69
70 pub fn record_and_emit(
72 &mut self,
73 state: &crate::reducer::PipelineState,
74 backend: &mut dyn TelemetryBackend,
75 ) {
76 if self.snapshot_interval == 0 {
77 return;
78 }
79
80 if state.iteration == 0 {
81 return;
82 }
83
84 if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
85 let snapshot = MemorySnapshot::from_pipeline_state(state);
86 backend.emit_snapshot(&snapshot);
87 self.snapshots.push(snapshot);
88 self.enforce_snapshot_limit();
89 }
90 }
91}