// reddb_server/storage/timeseries/continuous_aggregate.rs
1//! Continuous aggregates — incremental time-bucket materialisations.
2//!
3//! A continuous aggregate keeps the result of
4//!
5//! ```sql
6//! SELECT time_bucket('5m', ts) AS bucket,
7//!        <aggs...>
8//! FROM metrics
9//! GROUP BY bucket;
10//! ```
11//!
12//! materialised so dashboards never re-scan the parent chunks.
13//! Refresh is **incremental**: the daemon tracks
14//! `last_refreshed_bucket`, reads only rows whose bucket is ≥ that
15//! watermark, and appends / upserts the new buckets. Old buckets
16//! stay immutable — matches the contract users expect from
17//! Timescale's `continuous_aggregate` + `refresh_continuous_aggregate`.
18//!
19//! This module owns:
20//! * [`ContinuousAggregateSpec`] — declarative definition
21//!   (`CREATE CONTINUOUS AGGREGATE`)
22//! * [`ContinuousAggregateState`] — the materialised bucket map +
23//!   watermark
24//! * [`ContinuousAggregateEngine`] — in-memory registry + refresh
25//!   driver that accepts a source-scan callback
26//!
27//! Physical storage + SQL dispatch wire in during the sprint that
28//! follows (needs the `MaterializedViewCache` extension for the
29//! `TimeWindow` refresh policy); tests here pin the refresh
30//! arithmetic end-to-end.
31
32use std::collections::{BTreeMap, HashMap};
33use std::sync::{Arc, Mutex};
34
35use super::aggregation::AggregationType;
36use super::retention::parse_duration_ns;
37
/// Column to aggregate + aggregation type. Matches the
/// `AggregationType` surface the time-bucket code already uses.
#[derive(Debug, Clone)]
pub struct ContinuousAggregateColumn {
    /// Output name of the aggregated column (`SELECT ... AS alias`).
    /// The refresh loop matches `RefreshPoint.values` keys against this.
    pub alias: String,
    /// Column in the parent series the values come from.
    /// NOTE(review): not consulted by the in-memory refresh here — the
    /// scan callback keys `RefreshPoint.values` by `alias`; presumably
    /// the SQL layer uses this when building the scan. Confirm on wire-up.
    pub source_column: String,
    /// Aggregation applied to the values within each bucket.
    pub agg: AggregationType,
}
46
/// Declarative definition of a continuous aggregate
/// (`CREATE CONTINUOUS AGGREGATE`).
#[derive(Debug, Clone)]
pub struct ContinuousAggregateSpec {
    /// Unique aggregate name; key in the engine's registry.
    pub name: String,
    /// Source time-series / hypertable this aggregate reads from.
    pub source: String,
    /// Size of the time bucket in nanoseconds. Expected to be ≥ 1
    /// (`from_durations` clamps it); bucket arithmetic divides by it.
    pub bucket_size_ns: u64,
    /// Aggregated columns.
    pub columns: Vec<ContinuousAggregateColumn>,
    /// Lag (ns) between now() and the newest bucket the refresh
    /// daemon is willing to materialise. Matches Timescale's
    /// `start_offset` — stops us from materialising a bucket whose
    /// source rows are still landing.
    pub refresh_lag_ns: u64,
    /// Maximum span (ns) a single refresh will materialise at once.
    /// Timescale calls this `max_interval_per_job`.
    pub max_interval_per_job_ns: u64,
}
65
66impl ContinuousAggregateSpec {
67    /// Convenience constructor from string durations.
68    pub fn from_durations(
69        name: impl Into<String>,
70        source: impl Into<String>,
71        bucket: &str,
72        columns: Vec<ContinuousAggregateColumn>,
73        refresh_lag: &str,
74        max_interval_per_job: &str,
75    ) -> Option<Self> {
76        Some(Self {
77            name: name.into(),
78            source: source.into(),
79            bucket_size_ns: parse_duration_ns(bucket)?.max(1),
80            columns,
81            refresh_lag_ns: parse_duration_ns(refresh_lag).unwrap_or(0),
82            max_interval_per_job_ns: parse_duration_ns(max_interval_per_job).unwrap_or(u64::MAX),
83        })
84    }
85
86    /// Align timestamp to bucket floor.
87    pub fn bucket_start(&self, ts_ns: u64) -> u64 {
88        (ts_ns / self.bucket_size_ns) * self.bucket_size_ns
89    }
90
91    pub fn bucket_end_exclusive(&self, ts_ns: u64) -> u64 {
92        self.bucket_start(ts_ns).saturating_add(self.bucket_size_ns)
93    }
94}
95
/// Per-bucket aggregator state. Stores the intermediate state each
/// aggregation type needs to combine additional rows when the
/// refresh daemon picks up where it left off.
///
/// NOTE(review): construct via [`BucketState::new`], which seeds
/// `min`/`max` with ±infinity. A plain `Default::default()` seeds
/// them with 0.0, which the comparison-based updates treat as a real
/// observation.
#[derive(Debug, Clone, Default)]
pub struct BucketState {
    // Number of finite values folded in.
    pub count: u64,
    // Running sum (feeds Sum and Avg).
    pub sum: f64,
    // Smallest observed value; `new()` seeds +inf as the identity.
    pub min: f64,
    // Largest observed value; `new()` seeds -inf as the identity.
    pub max: f64,
    // First finite value observed, in arrival order.
    pub first: Option<f64>,
    // Most recent finite value observed.
    pub last: Option<f64>,
    // True once any finite value landed; guards the min/max
    // sentinels from leaking out of `value()`.
    pub any_observed: bool,
}
109
110impl BucketState {
111    pub fn new() -> Self {
112        Self {
113            min: f64::INFINITY,
114            max: f64::NEG_INFINITY,
115            ..Self::default()
116        }
117    }
118
119    pub fn observe(&mut self, value: f64) {
120        if !value.is_finite() {
121            return;
122        }
123        self.count += 1;
124        self.sum += value;
125        if value < self.min {
126            self.min = value;
127        }
128        if value > self.max {
129            self.max = value;
130        }
131        if self.first.is_none() {
132            self.first = Some(value);
133        }
134        self.last = Some(value);
135        self.any_observed = true;
136    }
137
138    pub fn merge(&mut self, other: &BucketState) {
139        if !other.any_observed {
140            return;
141        }
142        self.count += other.count;
143        self.sum += other.sum;
144        if other.min < self.min {
145            self.min = other.min;
146        }
147        if other.max > self.max {
148            self.max = other.max;
149        }
150        if self.first.is_none() {
151            self.first = other.first;
152        }
153        if other.last.is_some() {
154            self.last = other.last;
155        }
156        self.any_observed = true;
157    }
158
159    pub fn value(&self, agg: AggregationType) -> f64 {
160        if !self.any_observed {
161            return 0.0;
162        }
163        match agg {
164            AggregationType::Count => self.count as f64,
165            AggregationType::Sum => self.sum,
166            AggregationType::Avg => {
167                if self.count == 0 {
168                    0.0
169                } else {
170                    self.sum / self.count as f64
171                }
172            }
173            AggregationType::Min => self.min,
174            AggregationType::Max => self.max,
175            AggregationType::First => self.first.unwrap_or(0.0),
176            AggregationType::Last => self.last.unwrap_or(0.0),
177        }
178    }
179}
180
/// Materialised bucket keyed by bucket-start timestamp.
#[derive(Debug, Clone, Default)]
pub struct ContinuousAggregateState {
    /// Per-alias per-bucket state.
    buckets: BTreeMap<u64, HashMap<String, BucketState>>,
    /// Exclusive end of the refreshed window: every bucket whose
    /// start is strictly below this watermark is "closed and
    /// materialised" (refresh skips rows with `ts >= watermark` and
    /// resumes from here). Starts at 0 so the first refresh
    /// processes everything.
    last_refreshed_bucket_ns: u64,
}
191
192impl ContinuousAggregateState {
193    pub fn new() -> Self {
194        Self::default()
195    }
196
197    pub fn last_refreshed_bucket_ns(&self) -> u64 {
198        self.last_refreshed_bucket_ns
199    }
200
201    pub fn bucket_count(&self) -> usize {
202        self.buckets.len()
203    }
204
205    /// Lookup a single bucket's value for the given alias. Returns
206    /// `None` when the bucket has not been materialised yet or the
207    /// alias is unknown.
208    pub fn query(&self, bucket_start_ns: u64, alias: &str, agg: AggregationType) -> Option<f64> {
209        self.buckets
210            .get(&bucket_start_ns)
211            .and_then(|row| row.get(alias))
212            .map(|state| state.value(agg))
213    }
214
215    /// List every materialised bucket in ascending order.
216    pub fn buckets(&self) -> Vec<u64> {
217        self.buckets.keys().copied().collect()
218    }
219}
220
/// A row emitted by the source-scan callback during refresh.
#[derive(Debug, Clone)]
pub struct RefreshPoint {
    /// Row timestamp in nanoseconds. Rows outside the refresh
    /// window `[start, end)` are skipped by the engine.
    pub ts_ns: u64,
    /// `alias → value`. Allows one pass to feed every aggregate.
    pub values: HashMap<String, f64>,
}
228
/// Source-scan callback the engine uses to stream rows from the
/// parent during refresh. Receiving the `[start, end)` window lets
/// the callback restrict its scan — the core optimisation that
/// makes refreshes incremental rather than full.
///
/// Arguments, in order: source name, window start (ns, inclusive),
/// window end (ns, exclusive).
pub type ContinuousAggregateSource = Arc<dyn Fn(&str, u64, u64) -> Vec<RefreshPoint> + Send + Sync>;
234
/// In-memory registry + refresh driver for continuous aggregates.
/// Cheap to clone: all state is shared behind one `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct ContinuousAggregateEngine {
    inner: Arc<Mutex<EngineInner>>,
}
239
/// Mutex-guarded interior of the engine: declarative specs and the
/// corresponding materialised states, both keyed by aggregate name.
struct EngineInner {
    specs: HashMap<String, ContinuousAggregateSpec>,
    states: HashMap<String, ContinuousAggregateState>,
}
244
245impl std::fmt::Debug for ContinuousAggregateEngine {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        let guard = match self.inner.lock() {
248            Ok(g) => g,
249            Err(p) => p.into_inner(),
250        };
251        f.debug_struct("ContinuousAggregateEngine")
252            .field("aggregates", &guard.specs.len())
253            .finish()
254    }
255}
256
257impl ContinuousAggregateEngine {
258    pub fn new() -> Self {
259        Self {
260            inner: Arc::new(Mutex::new(EngineInner {
261                specs: HashMap::new(),
262                states: HashMap::new(),
263            })),
264        }
265    }
266
267    pub fn register(&self, spec: ContinuousAggregateSpec) {
268        let mut guard = match self.inner.lock() {
269            Ok(g) => g,
270            Err(p) => p.into_inner(),
271        };
272        guard.specs.insert(spec.name.clone(), spec.clone());
273        guard
274            .states
275            .entry(spec.name.clone())
276            .or_insert_with(ContinuousAggregateState::new);
277    }
278
279    pub fn drop_aggregate(&self, name: &str) {
280        let mut guard = match self.inner.lock() {
281            Ok(g) => g,
282            Err(p) => p.into_inner(),
283        };
284        guard.specs.remove(name);
285        guard.states.remove(name);
286    }
287
288    pub fn list(&self) -> Vec<ContinuousAggregateSpec> {
289        let guard = match self.inner.lock() {
290            Ok(g) => g,
291            Err(p) => p.into_inner(),
292        };
293        guard.specs.values().cloned().collect()
294    }
295
296    pub fn state(&self, name: &str) -> Option<ContinuousAggregateState> {
297        let guard = match self.inner.lock() {
298            Ok(g) => g,
299            Err(p) => p.into_inner(),
300        };
301        guard.states.get(name).cloned()
302    }
303
304    /// Refresh a single aggregate: consult `now_ns`, compute the
305    /// `[start, end)` window that can safely land (bounded by
306    /// `refresh_lag_ns` and `max_interval_per_job_ns`), call the
307    /// source callback, and fold the returned points into the
308    /// materialised buckets. Returns the number of points absorbed.
309    pub fn refresh(&self, name: &str, now_ns: u64, source: &ContinuousAggregateSource) -> u64 {
310        let spec = match self.get_spec(name) {
311            Some(s) => s,
312            None => return 0,
313        };
314        let state_snapshot = self.get_state(name).unwrap_or_default();
315
316        // End of the window: `now - refresh_lag`, aligned to bucket.
317        let latest_safe = now_ns.saturating_sub(spec.refresh_lag_ns);
318        let end_bucket = spec.bucket_start(latest_safe);
319        let start_bucket = state_snapshot.last_refreshed_bucket_ns;
320
321        if end_bucket <= start_bucket {
322            return 0;
323        }
324
325        // Cap by max_interval_per_job so no single refresh runs
326        // unbounded when the aggregate has been idle for ages.
327        let max_span = spec.max_interval_per_job_ns;
328        let end_bucket = if end_bucket.saturating_sub(start_bucket) > max_span {
329            start_bucket.saturating_add(max_span)
330        } else {
331            end_bucket
332        };
333
334        let points = source(&spec.source, start_bucket, end_bucket);
335        let absorbed = points.len() as u64;
336
337        // Apply.
338        let mut guard = match self.inner.lock() {
339            Ok(g) => g,
340            Err(p) => p.into_inner(),
341        };
342        let state = guard
343            .states
344            .entry(name.to_string())
345            .or_insert_with(ContinuousAggregateState::new);
346
347        for point in points {
348            if point.ts_ns < start_bucket || point.ts_ns >= end_bucket {
349                continue; // callback may return out-of-window rows; ignore
350            }
351            let bucket_start = spec.bucket_start(point.ts_ns);
352            let row = state
353                .buckets
354                .entry(bucket_start)
355                .or_insert_with(HashMap::new);
356            for col in &spec.columns {
357                if let Some(value) = point.values.get(&col.alias) {
358                    row.entry(col.alias.clone())
359                        .or_insert_with(BucketState::new)
360                        .observe(*value);
361                }
362            }
363        }
364        state.last_refreshed_bucket_ns = end_bucket;
365        absorbed
366    }
367
368    fn get_spec(&self, name: &str) -> Option<ContinuousAggregateSpec> {
369        let guard = match self.inner.lock() {
370            Ok(g) => g,
371            Err(p) => p.into_inner(),
372        };
373        guard.specs.get(name).cloned()
374    }
375
376    fn get_state(&self, name: &str) -> Option<ContinuousAggregateState> {
377        let guard = match self.inner.lock() {
378            Ok(g) => g,
379            Err(p) => p.into_inner(),
380        };
381        guard.states.get(name).cloned()
382    }
383}
384
impl Default for ContinuousAggregateEngine {
    /// Equivalent to [`ContinuousAggregateEngine::new`]: an engine
    /// with no registered aggregates.
    fn default() -> Self {
        Self::new()
    }
}
390
#[cfg(test)]
mod tests {
    use super::*;

    // Nanosecond durations used throughout the tests.
    const MINUTE: u64 = 60_000_000_000;
    const HOUR: u64 = 60 * MINUTE;

    /// Five-minute buckets over `metrics`, materialising avg + max of
    /// the same source column under two aliases; no lag, no job cap.
    fn spec() -> ContinuousAggregateSpec {
        ContinuousAggregateSpec {
            name: "five_min_load".into(),
            source: "metrics".into(),
            bucket_size_ns: 5 * MINUTE,
            columns: vec![
                ContinuousAggregateColumn {
                    alias: "avg_load".into(),
                    source_column: "load".into(),
                    agg: AggregationType::Avg,
                },
                ContinuousAggregateColumn {
                    alias: "max_load".into(),
                    source_column: "load".into(),
                    agg: AggregationType::Max,
                },
            ],
            refresh_lag_ns: 0,
            max_interval_per_job_ns: u64::MAX,
        }
    }

    /// Build a source callback serving the given `(ts, value)` rows,
    /// honouring the `[start, end)` window like a real scan would.
    fn points(values: Vec<(u64, f64)>) -> ContinuousAggregateSource {
        Arc::new(move |_source, start, end| {
            values
                .iter()
                .filter(|(ts, _)| *ts >= start && *ts < end)
                .map(|(ts, v)| {
                    let mut map = HashMap::new();
                    map.insert("avg_load".to_string(), *v);
                    map.insert("max_load".to_string(), *v);
                    RefreshPoint {
                        ts_ns: *ts,
                        values: map,
                    }
                })
                .collect()
        })
    }

    #[test]
    fn refresh_fills_buckets_until_now_minus_lag() {
        let engine = ContinuousAggregateEngine::new();
        engine.register(spec());
        let source = points(vec![
            (0, 10.0),
            (MINUTE, 20.0),
            (5 * MINUTE, 5.0),
            (6 * MINUTE, 15.0),
        ]);
        let absorbed = engine.refresh("five_min_load", 15 * MINUTE, &source);
        assert_eq!(absorbed, 4);
        let state = engine.state("five_min_load").unwrap();
        let buckets = state.buckets();
        assert_eq!(buckets, vec![0, 5 * MINUTE]);
        // avg of 10 and 20; float compare with tolerance.
        assert!((state.query(0, "avg_load", AggregationType::Avg).unwrap() - 15.0).abs() < 1e-9);
        assert_eq!(
            state.query(0, "max_load", AggregationType::Max).unwrap(),
            20.0
        );
        assert_eq!(
            state
                .query(5 * MINUTE, "max_load", AggregationType::Max)
                .unwrap(),
            15.0
        );
    }

    #[test]
    fn refresh_is_incremental_across_two_calls() {
        let engine = ContinuousAggregateEngine::new();
        engine.register(spec());

        // First batch lives in the first bucket (0..5m).
        let source1 = points(vec![(MINUTE, 10.0), (2 * MINUTE, 20.0)]);
        engine.refresh("five_min_load", 5 * MINUTE, &source1);

        // Second batch appends to a new bucket (5m..10m) without
        // reprocessing the first.
        let source2 = points(vec![(6 * MINUTE, 100.0), (7 * MINUTE, 50.0)]);
        engine.refresh("five_min_load", 10 * MINUTE, &source2);

        let state = engine.state("five_min_load").unwrap();
        assert_eq!(state.bucket_count(), 2);
        assert_eq!(
            state
                .query(5 * MINUTE, "avg_load", AggregationType::Avg)
                .unwrap(),
            75.0
        );
    }

    #[test]
    fn refresh_respects_lag_window() {
        let engine = ContinuousAggregateEngine::new();
        let mut s = spec();
        s.refresh_lag_ns = 10 * MINUTE;
        engine.register(s);
        let source = points(vec![
            (0, 1.0),
            (MINUTE, 2.0),
            (5 * MINUTE, 3.0),
            (8 * MINUTE, 4.0),
        ]);
        // now = 12m, lag = 10m ⇒ window ends at 12m - 10m = 2m → bucket 0.
        let absorbed = engine.refresh("five_min_load", 12 * MINUTE, &source);
        // Points with ts in [0, 0) are all filtered.
        assert_eq!(absorbed, 0);
    }

    #[test]
    fn refresh_caps_work_per_job() {
        let engine = ContinuousAggregateEngine::new();
        let mut s = spec();
        s.max_interval_per_job_ns = 5 * MINUTE;
        engine.register(s);
        let source = points(vec![(0, 1.0), (5 * MINUTE, 2.0), (10 * MINUTE, 3.0)]);
        // Single refresh should only chew through 5 minutes of
        // buckets (0..5m), leaving the 5m/10m buckets for the next
        // cycle.
        engine.refresh("five_min_load", HOUR, &source);
        let state = engine.state("five_min_load").unwrap();
        assert_eq!(state.bucket_count(), 1);
        assert_eq!(state.last_refreshed_bucket_ns(), 5 * MINUTE);
    }

    #[test]
    fn refresh_of_unknown_aggregate_is_a_noop() {
        let engine = ContinuousAggregateEngine::new();
        let source: ContinuousAggregateSource = Arc::new(|_, _, _| Vec::new());
        assert_eq!(engine.refresh("does_not_exist", 0, &source), 0);
    }

    #[test]
    fn bucket_state_merges_cumulative_counts() {
        let mut a = BucketState::new();
        a.observe(1.0);
        a.observe(3.0);
        let mut b = BucketState::new();
        b.observe(5.0);
        a.merge(&b);
        assert_eq!(a.count, 3);
        assert_eq!(a.sum, 9.0);
        assert_eq!(a.min, 1.0);
        assert_eq!(a.max, 5.0);
    }

    #[test]
    fn spec_from_durations_parses_intervals() {
        let spec = ContinuousAggregateSpec::from_durations(
            "hourly",
            "metrics",
            "1h",
            vec![ContinuousAggregateColumn {
                alias: "c".into(),
                source_column: "v".into(),
                agg: AggregationType::Count,
            }],
            "5m",
            "1d",
        )
        .unwrap();
        assert_eq!(spec.bucket_size_ns, HOUR);
        assert_eq!(spec.refresh_lag_ns, 5 * MINUTE);
    }
}