Skip to main content

palimpsest_dataflow/palimpsest/
metrics.rs

1//! Per-operator memory and lookup instrumentation.
2//!
3//! Operators report their arrangement size (in rows and bytes) and the
4//! upquery / TOAST counters surfaced by sibling modules. The collected
5//! state is exposed as a snapshot so tests, logging, and future Prometheus
6//! exporters share one shape.
7
8use std::collections::BTreeMap;
9
10use crate::palimpsest::toast::ToastStats;
11
/// Resource footprint reported by a single operator.
///
/// The value is a plain pair of counters; it is `Copy`, and its `Default`
/// is the all-zero measurement.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct OperatorMemory {
    /// Count of records currently materialized in the arrangement.
    pub rows: usize,
    /// Approximate size of the arrangement, in bytes.
    pub bytes: usize,
}

impl OperatorMemory {
    /// Constructs a measurement from a row count and a byte count.
    ///
    /// Both values are stored exactly as given; nothing is validated or
    /// derived. Usable in `const` contexts.
    #[must_use]
    pub const fn new(rows: usize, bytes: usize) -> Self {
        Self { rows, bytes }
    }
}
28
/// Per-operator tallies of upquery outcomes.
///
/// All three counters start at zero via `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct UpqueryCounters {
    /// Upqueries answered from the materialized arrangement.
    pub hits: usize,
    /// Upqueries that needed a fetch from upstream.
    pub fetches: usize,
    /// Upqueries that came back empty.
    pub misses: usize,
}
39
40/// Snapshot of every counter tracked for a single operator.
41#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
42pub struct OperatorSnapshot {
43    /// Memory footprint.
44    pub memory: OperatorMemory,
45    /// Upquery counters.
46    pub upqueries: UpqueryCounters,
47    /// TOAST resolver counters.
48    pub toast: ToastStats,
49}
50
51/// Registry that aggregates per-operator metrics by canonical operator name.
52#[derive(Debug, Clone, Default)]
53pub struct Metrics {
54    operators: BTreeMap<String, OperatorSnapshot>,
55}
56
57impl Metrics {
58    /// Creates an empty metrics registry.
59    #[must_use]
60    pub const fn new() -> Self {
61        Self {
62            operators: BTreeMap::new(),
63        }
64    }
65
66    /// Reports a fresh memory measurement for `operator`.
67    pub fn record_memory(&mut self, operator: impl Into<String>, memory: OperatorMemory) {
68        self.operators.entry(operator.into()).or_default().memory = memory;
69    }
70
71    /// Reports an arrangement-hit upquery.
72    pub fn record_upquery_hit(&mut self, operator: impl Into<String>) {
73        let entry = self.operators.entry(operator.into()).or_default();
74        entry.upqueries.hits = entry.upqueries.hits.saturating_add(1);
75    }
76
77    /// Reports an upquery that had to fetch from upstream.
78    pub fn record_upquery_fetch(&mut self, operator: impl Into<String>) {
79        let entry = self.operators.entry(operator.into()).or_default();
80        entry.upqueries.fetches = entry.upqueries.fetches.saturating_add(1);
81    }
82
83    /// Reports an upquery that found nothing.
84    pub fn record_upquery_miss(&mut self, operator: impl Into<String>) {
85        let entry = self.operators.entry(operator.into()).or_default();
86        entry.upqueries.misses = entry.upqueries.misses.saturating_add(1);
87    }
88
89    /// Stores the latest `ToastStats` for `operator`.
90    pub fn record_toast(&mut self, operator: impl Into<String>, toast: ToastStats) {
91        self.operators.entry(operator.into()).or_default().toast = toast;
92    }
93
94    /// Returns the snapshot for `operator`, if recorded.
95    #[must_use]
96    pub fn snapshot(&self, operator: &str) -> Option<OperatorSnapshot> {
97        self.operators.get(operator).copied()
98    }
99
100    /// Iterates over `(operator, snapshot)` pairs in canonical order.
101    pub fn iter(&self) -> impl ExactSizeIterator<Item = (&str, &OperatorSnapshot)> {
102        self.operators
103            .iter()
104            .map(|(name, snapshot)| (name.as_str(), snapshot))
105    }
106
107    /// Returns the number of operators tracked.
108    #[must_use]
109    pub fn len(&self) -> usize {
110        self.operators.len()
111    }
112
113    /// Returns true when no operators have reported metrics.
114    #[must_use]
115    pub fn is_empty(&self) -> bool {
116        self.operators.is_empty()
117    }
118
119    /// Aggregates memory totals across every operator.
120    #[must_use]
121    pub fn total_memory(&self) -> OperatorMemory {
122        self.operators
123            .values()
124            .map(|snapshot| snapshot.memory)
125            .fold(OperatorMemory::default(), |acc, memory| OperatorMemory {
126                rows: acc.rows.saturating_add(memory.rows),
127                bytes: acc.bytes.saturating_add(memory.bytes),
128            })
129    }
130}
131
#[cfg(test)]
mod tests {
    //! Unit tests for the metrics registry: overwrite semantics of the
    //! `record_*` setters, per-operator isolation of the upquery counters,
    //! ordering of `iter`, and the saturating aggregate in `total_memory`.

    use super::{Metrics, OperatorMemory, UpqueryCounters};
    use crate::palimpsest::toast::ToastStats;

    #[test]
    fn new_registry_is_empty() {
        let metrics = Metrics::new();
        assert!(metrics.is_empty());
        assert_eq!(metrics.len(), 0);
        assert_eq!(metrics.iter().len(), 0);
    }

    #[test]
    fn snapshot_of_unknown_operator_is_none() {
        let mut metrics = Metrics::new();
        metrics.record_upquery_hit("posts/filter");
        assert!(metrics.snapshot("authors/filter").is_none());
    }

    #[test]
    fn record_memory_overwrites_previous_measurement() {
        let mut metrics = Metrics::new();
        metrics.record_memory("posts/filter", OperatorMemory::new(10, 256));
        metrics.record_memory("posts/filter", OperatorMemory::new(15, 320));

        assert_eq!(
            metrics.snapshot("posts/filter").unwrap().memory,
            OperatorMemory::new(15, 320)
        );
        // Re-recording must not create a second entry.
        assert_eq!(metrics.len(), 1);
    }

    #[test]
    fn upquery_counters_are_per_operator_and_increment() {
        let mut metrics = Metrics::new();
        metrics.record_upquery_hit("posts/filter");
        metrics.record_upquery_fetch("posts/filter");
        metrics.record_upquery_miss("posts/filter");
        metrics.record_upquery_hit("authors/filter");

        let posts = metrics.snapshot("posts/filter").unwrap();
        assert_eq!(posts.upqueries.hits, 1);
        assert_eq!(posts.upqueries.fetches, 1);
        assert_eq!(posts.upqueries.misses, 1);

        // The second operator must only see its own hit, not the first
        // operator's fetch or miss.
        let authors = metrics.snapshot("authors/filter").unwrap();
        assert_eq!(
            authors.upqueries,
            UpqueryCounters {
                hits: 1,
                fetches: 0,
                misses: 0,
            }
        );
    }

    #[test]
    fn upquery_counters_accumulate_across_calls() {
        let mut metrics = Metrics::new();
        for _ in 0..3 {
            metrics.record_upquery_hit("posts/filter");
        }
        assert_eq!(metrics.snapshot("posts/filter").unwrap().upqueries.hits, 3);
    }

    #[test]
    fn record_toast_stores_last_snapshot() {
        let mut metrics = Metrics::new();
        metrics.record_toast(
            "posts/filter",
            ToastStats {
                cached_hits: 4,
                point_select_hits: 1,
                misses: 0,
            },
        );
        // Record a second time so the "last one wins" behaviour is
        // actually exercised.
        metrics.record_toast(
            "posts/filter",
            ToastStats {
                cached_hits: 9,
                point_select_hits: 2,
                misses: 3,
            },
        );

        let stats = metrics.snapshot("posts/filter").unwrap().toast;
        assert_eq!(stats.cached_hits, 9);
        assert_eq!(stats.point_select_hits, 2);
        assert_eq!(stats.misses, 3);
    }

    #[test]
    fn total_memory_sums_across_operators() {
        let mut metrics = Metrics::new();
        metrics.record_memory("a", OperatorMemory::new(1, 10));
        metrics.record_memory("b", OperatorMemory::new(2, 20));
        assert_eq!(metrics.total_memory(), OperatorMemory::new(3, 30));
        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn total_memory_of_empty_registry_is_zero() {
        assert_eq!(Metrics::new().total_memory(), OperatorMemory::default());
    }

    #[test]
    fn total_memory_saturates_instead_of_overflowing() {
        let mut metrics = Metrics::new();
        metrics.record_memory("a", OperatorMemory::new(usize::MAX, usize::MAX));
        metrics.record_memory("b", OperatorMemory::new(1, 1));
        assert_eq!(
            metrics.total_memory(),
            OperatorMemory::new(usize::MAX, usize::MAX)
        );
    }

    #[test]
    fn iter_yields_canonical_order() {
        let mut metrics = Metrics::new();
        metrics.record_memory("b", OperatorMemory::new(1, 10));
        metrics.record_memory("a", OperatorMemory::new(2, 20));

        let names: Vec<_> = metrics.iter().map(|(name, _)| name).collect();
        assert_eq!(names, ["a", "b"]);
        assert_eq!(metrics.iter().len(), metrics.len());
    }
}