palimpsest_dataflow/palimpsest/
metrics.rs1use std::collections::BTreeMap;
9
10use crate::palimpsest::toast::ToastStats;
11
12#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
14pub struct OperatorMemory {
15 pub rows: usize,
17 pub bytes: usize,
19}
20
21impl OperatorMemory {
22 #[must_use]
24 pub const fn new(rows: usize, bytes: usize) -> Self {
25 Self { rows, bytes }
26 }
27}
28
29#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
31pub struct UpqueryCounters {
32 pub hits: usize,
34 pub fetches: usize,
36 pub misses: usize,
38}
39
40#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
42pub struct OperatorSnapshot {
43 pub memory: OperatorMemory,
45 pub upqueries: UpqueryCounters,
47 pub toast: ToastStats,
49}
50
51#[derive(Debug, Clone, Default)]
53pub struct Metrics {
54 operators: BTreeMap<String, OperatorSnapshot>,
55}
56
57impl Metrics {
58 #[must_use]
60 pub const fn new() -> Self {
61 Self {
62 operators: BTreeMap::new(),
63 }
64 }
65
66 pub fn record_memory(&mut self, operator: impl Into<String>, memory: OperatorMemory) {
68 self.operators.entry(operator.into()).or_default().memory = memory;
69 }
70
71 pub fn record_upquery_hit(&mut self, operator: impl Into<String>) {
73 let entry = self.operators.entry(operator.into()).or_default();
74 entry.upqueries.hits = entry.upqueries.hits.saturating_add(1);
75 }
76
77 pub fn record_upquery_fetch(&mut self, operator: impl Into<String>) {
79 let entry = self.operators.entry(operator.into()).or_default();
80 entry.upqueries.fetches = entry.upqueries.fetches.saturating_add(1);
81 }
82
83 pub fn record_upquery_miss(&mut self, operator: impl Into<String>) {
85 let entry = self.operators.entry(operator.into()).or_default();
86 entry.upqueries.misses = entry.upqueries.misses.saturating_add(1);
87 }
88
89 pub fn record_toast(&mut self, operator: impl Into<String>, toast: ToastStats) {
91 self.operators.entry(operator.into()).or_default().toast = toast;
92 }
93
94 #[must_use]
96 pub fn snapshot(&self, operator: &str) -> Option<OperatorSnapshot> {
97 self.operators.get(operator).copied()
98 }
99
100 pub fn iter(&self) -> impl ExactSizeIterator<Item = (&str, &OperatorSnapshot)> {
102 self.operators
103 .iter()
104 .map(|(name, snapshot)| (name.as_str(), snapshot))
105 }
106
107 #[must_use]
109 pub fn len(&self) -> usize {
110 self.operators.len()
111 }
112
113 #[must_use]
115 pub fn is_empty(&self) -> bool {
116 self.operators.is_empty()
117 }
118
119 #[must_use]
121 pub fn total_memory(&self) -> OperatorMemory {
122 self.operators
123 .values()
124 .map(|snapshot| snapshot.memory)
125 .fold(OperatorMemory::default(), |acc, memory| OperatorMemory {
126 rows: acc.rows.saturating_add(memory.rows),
127 bytes: acc.bytes.saturating_add(memory.bytes),
128 })
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::{Metrics, OperatorMemory};
135 use crate::palimpsest::toast::ToastStats;
136
137 #[test]
138 fn record_memory_overwrites_previous_measurement() {
139 let mut metrics = Metrics::new();
140 metrics.record_memory("posts/filter", OperatorMemory::new(10, 256));
141 metrics.record_memory("posts/filter", OperatorMemory::new(15, 320));
142
143 assert_eq!(
144 metrics.snapshot("posts/filter").unwrap().memory,
145 OperatorMemory::new(15, 320)
146 );
147 }
148
149 #[test]
150 fn upquery_counters_are_per_operator_and_increment() {
151 let mut metrics = Metrics::new();
152 metrics.record_upquery_hit("posts/filter");
153 metrics.record_upquery_fetch("posts/filter");
154 metrics.record_upquery_miss("posts/filter");
155 metrics.record_upquery_hit("authors/filter");
156
157 let posts = metrics.snapshot("posts/filter").unwrap();
158 assert_eq!(posts.upqueries.hits, 1);
159 assert_eq!(posts.upqueries.fetches, 1);
160 assert_eq!(posts.upqueries.misses, 1);
161
162 let authors = metrics.snapshot("authors/filter").unwrap();
163 assert_eq!(authors.upqueries.hits, 1);
164 }
165
166 #[test]
167 fn record_toast_stores_last_snapshot() {
168 let mut metrics = Metrics::new();
169 metrics.record_toast(
170 "posts/filter",
171 ToastStats {
172 cached_hits: 4,
173 point_select_hits: 1,
174 misses: 0,
175 },
176 );
177
178 let stats = metrics.snapshot("posts/filter").unwrap().toast;
179 assert_eq!(stats.cached_hits, 4);
180 assert_eq!(stats.point_select_hits, 1);
181 }
182
183 #[test]
184 fn total_memory_sums_across_operators() {
185 let mut metrics = Metrics::new();
186 metrics.record_memory("a", OperatorMemory::new(1, 10));
187 metrics.record_memory("b", OperatorMemory::new(2, 20));
188 assert_eq!(metrics.total_memory(), OperatorMemory::new(3, 30));
189 assert_eq!(metrics.len(), 2);
190 }
191
192 #[test]
193 fn iter_yields_canonical_order() {
194 let mut metrics = Metrics::new();
195 metrics.record_memory("b", OperatorMemory::new(1, 10));
196 metrics.record_memory("a", OperatorMemory::new(2, 20));
197
198 let names: Vec<_> = metrics.iter().map(|(name, _)| name).collect();
199 assert_eq!(names, ["a", "b"]);
200 }
201}