ralph_workflow/monitoring/memory_metrics/
collector.rs1use super::backends::TelemetryBackend;
4use super::snapshot::MemorySnapshot;
5
6const DEFAULT_MAX_SNAPSHOTS: usize = 1024;
7
8#[derive(Debug, Clone)]
10pub struct MemoryMetricsCollector {
11 snapshots: Vec<MemorySnapshot>,
12 snapshot_interval: u32,
13}
14
15impl MemoryMetricsCollector {
16 #[must_use]
22 pub const fn new(snapshot_interval: u32) -> Self {
23 Self {
24 snapshots: Vec::new(),
25 snapshot_interval,
26 }
27 }
28
29 fn apply_snapshot_limit(snapshots: Vec<MemorySnapshot>) -> Vec<MemorySnapshot> {
30 if snapshots.len() > DEFAULT_MAX_SNAPSHOTS {
31 let skip_count = snapshots.len() - DEFAULT_MAX_SNAPSHOTS;
32 snapshots.into_iter().skip(skip_count).collect()
33 } else {
34 snapshots
35 }
36 }
37
38 #[must_use]
40 pub fn maybe_record(&self, state: &crate::reducer::PipelineState) -> Self {
41 if self.snapshot_interval == 0 {
42 return self.clone();
43 }
44
45 if state.iteration == 0 {
46 return self.clone();
47 }
48
49 if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
50 let snapshots = self
51 .snapshots
52 .clone()
53 .into_iter()
54 .chain(std::iter::once(MemorySnapshot::from_pipeline_state(state)))
55 .collect();
56 let snapshots = Self::apply_snapshot_limit(snapshots);
57 Self {
58 snapshots,
59 snapshot_interval: self.snapshot_interval,
60 }
61 } else {
62 self.clone()
63 }
64 }
65
66 #[must_use]
68 pub fn snapshots(&self) -> &[MemorySnapshot] {
69 &self.snapshots
70 }
71
72 pub fn export_json(&self) -> serde_json::Result<String> {
78 serde_json::to_string_pretty(&self.snapshots)
79 }
80
81 #[must_use]
83 pub fn record_and_emit(
84 &self,
85 state: &crate::reducer::PipelineState,
86 backend: &dyn TelemetryBackend,
87 ) -> Self {
88 if self.snapshot_interval == 0 {
89 return self.clone();
90 }
91
92 if state.iteration == 0 {
93 return self.clone();
94 }
95
96 if state.iteration == 1 || state.iteration.is_multiple_of(self.snapshot_interval) {
97 let snapshot = MemorySnapshot::from_pipeline_state(state);
98 backend.emit_snapshot(&snapshot);
99 let snapshots = self
100 .snapshots
101 .clone()
102 .into_iter()
103 .chain(std::iter::once(snapshot))
104 .collect();
105 let snapshots = Self::apply_snapshot_limit(snapshots);
106 Self {
107 snapshots,
108 snapshot_interval: self.snapshot_interval,
109 }
110 } else {
111 self.clone()
112 }
113 }
114}