use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex, MutexGuard};

use super::aggregation::AggregationType;
use super::retention::parse_duration_ns;

/// One output column of a continuous aggregate: which source column to
/// read, how to aggregate it, and the alias it is exposed under.
#[derive(Debug, Clone)]
pub struct ContinuousAggregateColumn {
    /// Output name; also the key used to read values out of
    /// `RefreshPoint.values` during refresh and to query results.
    pub alias: String,
    /// Column in the source series this aggregate is derived from.
    /// NOTE(review): not consulted by the refresh path in this file —
    /// sources key `RefreshPoint.values` by `alias`; confirm intended use.
    pub source_column: String,
    /// Aggregation applied within each time bucket.
    pub agg: AggregationType,
}

/// Definition of a continuous aggregate: a named, bucketed rollup over a
/// source series, refreshed incrementally.
#[derive(Debug, Clone)]
pub struct ContinuousAggregateSpec {
    /// Unique name the engine registers and refreshes this aggregate under.
    pub name: String,
    /// Name of the source series passed to the refresh source callback.
    pub source: String,
    /// Bucket width in nanoseconds; `from_durations` clamps this to >= 1
    /// so bucket arithmetic never divides by zero.
    pub bucket_size_ns: u64,
    /// Output columns materialized per bucket.
    pub columns: Vec<ContinuousAggregateColumn>,
    /// How far behind `now` refresh stays, so late-arriving data can land
    /// before its bucket is materialized.
    pub refresh_lag_ns: u64,
    /// Upper bound on the time span one `refresh` call will process;
    /// `u64::MAX` means unbounded.
    pub max_interval_per_job_ns: u64,
}

66impl ContinuousAggregateSpec {
67 pub fn from_durations(
69 name: impl Into<String>,
70 source: impl Into<String>,
71 bucket: &str,
72 columns: Vec<ContinuousAggregateColumn>,
73 refresh_lag: &str,
74 max_interval_per_job: &str,
75 ) -> Option<Self> {
76 Some(Self {
77 name: name.into(),
78 source: source.into(),
79 bucket_size_ns: parse_duration_ns(bucket)?.max(1),
80 columns,
81 refresh_lag_ns: parse_duration_ns(refresh_lag).unwrap_or(0),
82 max_interval_per_job_ns: parse_duration_ns(max_interval_per_job).unwrap_or(u64::MAX),
83 })
84 }
85
86 pub fn bucket_start(&self, ts_ns: u64) -> u64 {
88 (ts_ns / self.bucket_size_ns) * self.bucket_size_ns
89 }
90
91 pub fn bucket_end_exclusive(&self, ts_ns: u64) -> u64 {
92 self.bucket_start(ts_ns).saturating_add(self.bucket_size_ns)
93 }
94}
95
/// Running accumulator for one (bucket, column) cell. Tracks enough
/// statistics to answer every `AggregationType` without re-reading data.
#[derive(Debug, Clone, Default)]
pub struct BucketState {
    // Number of finite samples observed.
    pub count: u64,
    // Sum of observed samples (used for Sum and Avg).
    pub sum: f64,
    // Smallest observed sample; `new()` seeds this with +infinity.
    pub min: f64,
    // Largest observed sample; `new()` seeds this with -infinity.
    pub max: f64,
    // First sample ever observed, if any.
    pub first: Option<f64>,
    // Most recent sample observed, if any.
    pub last: Option<f64>,
    // True once any sample has been observed or merged in; `value()`
    // returns 0.0 for every aggregation while this is false.
    pub any_observed: bool,
}

110impl BucketState {
111 pub fn new() -> Self {
112 Self {
113 min: f64::INFINITY,
114 max: f64::NEG_INFINITY,
115 ..Self::default()
116 }
117 }
118
119 pub fn observe(&mut self, value: f64) {
120 if !value.is_finite() {
121 return;
122 }
123 self.count += 1;
124 self.sum += value;
125 if value < self.min {
126 self.min = value;
127 }
128 if value > self.max {
129 self.max = value;
130 }
131 if self.first.is_none() {
132 self.first = Some(value);
133 }
134 self.last = Some(value);
135 self.any_observed = true;
136 }
137
138 pub fn merge(&mut self, other: &BucketState) {
139 if !other.any_observed {
140 return;
141 }
142 self.count += other.count;
143 self.sum += other.sum;
144 if other.min < self.min {
145 self.min = other.min;
146 }
147 if other.max > self.max {
148 self.max = other.max;
149 }
150 if self.first.is_none() {
151 self.first = other.first;
152 }
153 if other.last.is_some() {
154 self.last = other.last;
155 }
156 self.any_observed = true;
157 }
158
159 pub fn value(&self, agg: AggregationType) -> f64 {
160 if !self.any_observed {
161 return 0.0;
162 }
163 match agg {
164 AggregationType::Count => self.count as f64,
165 AggregationType::Sum => self.sum,
166 AggregationType::Avg => {
167 if self.count == 0 {
168 0.0
169 } else {
170 self.sum / self.count as f64
171 }
172 }
173 AggregationType::Min => self.min,
174 AggregationType::Max => self.max,
175 AggregationType::First => self.first.unwrap_or(0.0),
176 AggregationType::Last => self.last.unwrap_or(0.0),
177 }
178 }
179}
180
/// Materialized state for one continuous aggregate.
#[derive(Debug, Clone, Default)]
pub struct ContinuousAggregateState {
    // Bucket start timestamp (ns) -> per-alias accumulators for that bucket.
    buckets: BTreeMap<u64, HashMap<String, BucketState>>,
    // Exclusive watermark: start of the first bucket NOT yet refreshed.
    // Refresh resumes from here on the next job.
    last_refreshed_bucket_ns: u64,
}

192impl ContinuousAggregateState {
193 pub fn new() -> Self {
194 Self::default()
195 }
196
197 pub fn last_refreshed_bucket_ns(&self) -> u64 {
198 self.last_refreshed_bucket_ns
199 }
200
201 pub fn bucket_count(&self) -> usize {
202 self.buckets.len()
203 }
204
205 pub fn query(&self, bucket_start_ns: u64, alias: &str, agg: AggregationType) -> Option<f64> {
209 self.buckets
210 .get(&bucket_start_ns)
211 .and_then(|row| row.get(alias))
212 .map(|state| state.value(agg))
213 }
214
215 pub fn buckets(&self) -> Vec<u64> {
217 self.buckets.keys().copied().collect()
218 }
219}
220
/// One raw data point returned by a refresh source.
#[derive(Debug, Clone)]
pub struct RefreshPoint {
    /// Timestamp of the point in nanoseconds.
    pub ts_ns: u64,
    /// Values keyed by column alias; `refresh` reads the key matching
    /// each spec column's `alias`.
    pub values: HashMap<String, f64>,
}

/// Callback that fetches raw points for `(source_name, start_ns, end_ns)`.
/// `refresh` treats the window as half-open: `[start_ns, end_ns)`.
pub type ContinuousAggregateSource = Arc<dyn Fn(&str, u64, u64) -> Vec<RefreshPoint> + Send + Sync>;

/// Thread-safe registry and refresher for continuous aggregates.
/// Cloning is cheap: clones share the same inner state via `Arc`.
#[derive(Clone)]
pub struct ContinuousAggregateEngine {
    // Specs and materialized states behind one mutex.
    inner: Arc<Mutex<EngineInner>>,
}

/// Mutex-protected engine internals: both maps are keyed by aggregate name.
struct EngineInner {
    // Registered aggregate definitions.
    specs: HashMap<String, ContinuousAggregateSpec>,
    // Materialized bucket state per aggregate.
    states: HashMap<String, ContinuousAggregateState>,
}

245impl std::fmt::Debug for ContinuousAggregateEngine {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 let guard = match self.inner.lock() {
248 Ok(g) => g,
249 Err(p) => p.into_inner(),
250 };
251 f.debug_struct("ContinuousAggregateEngine")
252 .field("aggregates", &guard.specs.len())
253 .finish()
254 }
255}
256
257impl ContinuousAggregateEngine {
258 pub fn new() -> Self {
259 Self {
260 inner: Arc::new(Mutex::new(EngineInner {
261 specs: HashMap::new(),
262 states: HashMap::new(),
263 })),
264 }
265 }
266
267 pub fn register(&self, spec: ContinuousAggregateSpec) {
268 let mut guard = match self.inner.lock() {
269 Ok(g) => g,
270 Err(p) => p.into_inner(),
271 };
272 guard.specs.insert(spec.name.clone(), spec.clone());
273 guard
274 .states
275 .entry(spec.name.clone())
276 .or_insert_with(ContinuousAggregateState::new);
277 }
278
279 pub fn drop_aggregate(&self, name: &str) {
280 let mut guard = match self.inner.lock() {
281 Ok(g) => g,
282 Err(p) => p.into_inner(),
283 };
284 guard.specs.remove(name);
285 guard.states.remove(name);
286 }
287
288 pub fn list(&self) -> Vec<ContinuousAggregateSpec> {
289 let guard = match self.inner.lock() {
290 Ok(g) => g,
291 Err(p) => p.into_inner(),
292 };
293 guard.specs.values().cloned().collect()
294 }
295
296 pub fn state(&self, name: &str) -> Option<ContinuousAggregateState> {
297 let guard = match self.inner.lock() {
298 Ok(g) => g,
299 Err(p) => p.into_inner(),
300 };
301 guard.states.get(name).cloned()
302 }
303
304 pub fn refresh(&self, name: &str, now_ns: u64, source: &ContinuousAggregateSource) -> u64 {
310 let spec = match self.get_spec(name) {
311 Some(s) => s,
312 None => return 0,
313 };
314 let state_snapshot = self.get_state(name).unwrap_or_default();
315
316 let latest_safe = now_ns.saturating_sub(spec.refresh_lag_ns);
318 let end_bucket = spec.bucket_start(latest_safe);
319 let start_bucket = state_snapshot.last_refreshed_bucket_ns;
320
321 if end_bucket <= start_bucket {
322 return 0;
323 }
324
325 let max_span = spec.max_interval_per_job_ns;
328 let end_bucket = if end_bucket.saturating_sub(start_bucket) > max_span {
329 start_bucket.saturating_add(max_span)
330 } else {
331 end_bucket
332 };
333
334 let points = source(&spec.source, start_bucket, end_bucket);
335 let absorbed = points.len() as u64;
336
337 let mut guard = match self.inner.lock() {
339 Ok(g) => g,
340 Err(p) => p.into_inner(),
341 };
342 let state = guard
343 .states
344 .entry(name.to_string())
345 .or_insert_with(ContinuousAggregateState::new);
346
347 for point in points {
348 if point.ts_ns < start_bucket || point.ts_ns >= end_bucket {
349 continue; }
351 let bucket_start = spec.bucket_start(point.ts_ns);
352 let row = state
353 .buckets
354 .entry(bucket_start)
355 .or_insert_with(HashMap::new);
356 for col in &spec.columns {
357 if let Some(value) = point.values.get(&col.alias) {
358 row.entry(col.alias.clone())
359 .or_insert_with(BucketState::new)
360 .observe(*value);
361 }
362 }
363 }
364 state.last_refreshed_bucket_ns = end_bucket;
365 absorbed
366 }
367
368 fn get_spec(&self, name: &str) -> Option<ContinuousAggregateSpec> {
369 let guard = match self.inner.lock() {
370 Ok(g) => g,
371 Err(p) => p.into_inner(),
372 };
373 guard.specs.get(name).cloned()
374 }
375
376 fn get_state(&self, name: &str) -> Option<ContinuousAggregateState> {
377 let guard = match self.inner.lock() {
378 Ok(g) => g,
379 Err(p) => p.into_inner(),
380 };
381 guard.states.get(name).cloned()
382 }
383}
384
impl Default for ContinuousAggregateEngine {
    /// Equivalent to [`ContinuousAggregateEngine::new`]: an empty engine.
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // One minute / one hour in nanoseconds.
    const MINUTE: u64 = 60_000_000_000;
    const HOUR: u64 = 60 * MINUTE;

    // 5-minute-bucket spec over source "metrics" with an avg column and a
    // max column, no refresh lag, unbounded per-job window.
    fn spec() -> ContinuousAggregateSpec {
        ContinuousAggregateSpec {
            name: "five_min_load".into(),
            source: "metrics".into(),
            bucket_size_ns: 5 * MINUTE,
            columns: vec![
                ContinuousAggregateColumn {
                    alias: "avg_load".into(),
                    source_column: "load".into(),
                    agg: AggregationType::Avg,
                },
                ContinuousAggregateColumn {
                    alias: "max_load".into(),
                    source_column: "load".into(),
                    agg: AggregationType::Max,
                },
            ],
            refresh_lag_ns: 0,
            max_interval_per_job_ns: u64::MAX,
        }
    }

    // Builds a source callback serving the given (ts, value) points. It
    // honors the half-open [start, end) window and feeds each value to
    // both the "avg_load" and "max_load" aliases.
    fn points(values: Vec<(u64, f64)>) -> ContinuousAggregateSource {
        Arc::new(move |_source, start, end| {
            values
                .iter()
                .filter(|(ts, _)| *ts >= start && *ts < end)
                .map(|(ts, v)| {
                    let mut map = HashMap::new();
                    map.insert("avg_load".to_string(), *v);
                    map.insert("max_load".to_string(), *v);
                    RefreshPoint {
                        ts_ns: *ts,
                        values: map,
                    }
                })
                .collect()
        })
    }

    // A single refresh with no lag materializes every complete bucket up
    // to now, with correct per-bucket avg/max.
    #[test]
    fn refresh_fills_buckets_until_now_minus_lag() {
        let engine = ContinuousAggregateEngine::new();
        engine.register(spec());
        let source = points(vec![
            (0, 10.0),
            (MINUTE, 20.0),
            (5 * MINUTE, 5.0),
            (6 * MINUTE, 15.0),
        ]);
        let absorbed = engine.refresh("five_min_load", 15 * MINUTE, &source);
        assert_eq!(absorbed, 4);
        let state = engine.state("five_min_load").unwrap();
        let buckets = state.buckets();
        assert_eq!(buckets, vec![0, 5 * MINUTE]);
        assert!((state.query(0, "avg_load", AggregationType::Avg).unwrap() - 15.0).abs() < 1e-9);
        assert_eq!(
            state.query(0, "max_load", AggregationType::Max).unwrap(),
            20.0
        );
        assert_eq!(
            state
                .query(5 * MINUTE, "max_load", AggregationType::Max)
                .unwrap(),
            15.0
        );
    }

    // The watermark advances between calls, so a second refresh only
    // materializes buckets beyond the first call's end.
    #[test]
    fn refresh_is_incremental_across_two_calls() {
        let engine = ContinuousAggregateEngine::new();
        engine.register(spec());

        let source1 = points(vec![(MINUTE, 10.0), (2 * MINUTE, 20.0)]);
        engine.refresh("five_min_load", 5 * MINUTE, &source1);

        let source2 = points(vec![(6 * MINUTE, 100.0), (7 * MINUTE, 50.0)]);
        engine.refresh("five_min_load", 10 * MINUTE, &source2);

        let state = engine.state("five_min_load").unwrap();
        assert_eq!(state.bucket_count(), 2);
        assert_eq!(
            state
                .query(5 * MINUTE, "avg_load", AggregationType::Avg)
                .unwrap(),
            75.0
        );
    }

    // With a 10-minute lag and now = 12 minutes, the safe edge is at
    // 2 minutes, whose bucket start is 0 — no complete bucket is ready.
    #[test]
    fn refresh_respects_lag_window() {
        let engine = ContinuousAggregateEngine::new();
        let mut s = spec();
        s.refresh_lag_ns = 10 * MINUTE;
        engine.register(s);
        let source = points(vec![
            (0, 1.0),
            (MINUTE, 2.0),
            (5 * MINUTE, 3.0),
            (8 * MINUTE, 4.0),
        ]);
        let absorbed = engine.refresh("five_min_load", 12 * MINUTE, &source);
        assert_eq!(absorbed, 0);
    }

    // A 5-minute per-job cap means one refresh covers only [0, 5m) even
    // though an hour of data is pending; the watermark stops at the cap.
    #[test]
    fn refresh_caps_work_per_job() {
        let engine = ContinuousAggregateEngine::new();
        let mut s = spec();
        s.max_interval_per_job_ns = 5 * MINUTE;
        engine.register(s);
        let source = points(vec![(0, 1.0), (5 * MINUTE, 2.0), (10 * MINUTE, 3.0)]);
        engine.refresh("five_min_load", HOUR, &source);
        let state = engine.state("five_min_load").unwrap();
        assert_eq!(state.bucket_count(), 1);
        assert_eq!(state.last_refreshed_bucket_ns(), 5 * MINUTE);
    }

    // Refreshing a name that was never registered absorbs nothing.
    #[test]
    fn refresh_of_unknown_aggregate_is_a_noop() {
        let engine = ContinuousAggregateEngine::new();
        let source: ContinuousAggregateSource = Arc::new(|_, _, _| Vec::new());
        assert_eq!(engine.refresh("does_not_exist", 0, &source), 0);
    }

    // merge sums counts/sums and takes the extreme min/max of both sides.
    #[test]
    fn bucket_state_merges_cumulative_counts() {
        let mut a = BucketState::new();
        a.observe(1.0);
        a.observe(3.0);
        let mut b = BucketState::new();
        b.observe(5.0);
        a.merge(&b);
        assert_eq!(a.count, 3);
        assert_eq!(a.sum, 9.0);
        assert_eq!(a.min, 1.0);
        assert_eq!(a.max, 5.0);
    }

    // Duration strings ("1h", "5m", "1d") are parsed into nanoseconds.
    #[test]
    fn spec_from_durations_parses_intervals() {
        let spec = ContinuousAggregateSpec::from_durations(
            "hourly",
            "metrics",
            "1h",
            vec![ContinuousAggregateColumn {
                alias: "c".into(),
                source_column: "v".into(),
                agg: AggregationType::Count,
            }],
            "5m",
            "1d",
        )
        .unwrap();
        assert_eq!(spec.bucket_size_ns, HOUR);
        assert_eq!(spec.refresh_lag_ns, 5 * MINUTE);
    }
}