nodedb_types/timeseries/continuous_agg.rs
1//! Continuous aggregate definition types.
2//!
3//! Shared between Origin and Lite. Origin uses these for SQL DDL parsing
4//! and the continuous aggregate manager. Lite uses them for the embedded
5//! continuous aggregate engine and its DDL handler.
6
7use serde::{Deserialize, Serialize};
8
9/// Definition of a continuous aggregate.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ContinuousAggregateDef {
12 /// Name of this aggregate (e.g., "metrics_1m").
13 pub name: String,
14 /// Source collection or aggregate to read from.
15 pub source: String,
16 /// Time bucket interval string (e.g., "1m", "1h", "1d").
17 pub bucket_interval: String,
18 /// Bucket interval in milliseconds (computed from bucket_interval).
19 pub bucket_interval_ms: i64,
20 /// Columns to GROUP BY (tag/symbol columns).
21 pub group_by: Vec<String>,
22 /// Aggregate expressions to compute.
23 pub aggregates: Vec<AggregateExpr>,
24 /// When to refresh.
25 pub refresh_policy: RefreshPolicy,
26 /// Retention period in milliseconds (0 = infinite, independent of source).
27 pub retention_period_ms: u64,
28 /// Whether this aggregate is currently stale (schema change invalidation).
29 pub stale: bool,
30}
31
32/// An aggregate expression: function + source column → result column.
33#[derive(
34 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
35)]
36pub struct AggregateExpr {
37 /// Aggregate function.
38 pub function: AggFunction,
39 /// Source column name (e.g., "cpu"). "*" for COUNT.
40 pub source_column: String,
41 /// Output column name (e.g., "cpu_avg"). Auto-generated if empty.
42 pub output_column: String,
43}
44
45/// Supported aggregate functions.
46#[derive(
47 Debug,
48 Clone,
49 PartialEq,
50 Serialize,
51 Deserialize,
52 zerompk::ToMessagePack,
53 zerompk::FromMessagePack,
54)]
55pub enum AggFunction {
56 Sum,
57 Count,
58 Min,
59 Max,
60 Avg,
61 First,
62 Last,
63 /// Approximate count distinct via HyperLogLog.
64 CountDistinct,
65 /// Approximate percentile via TDigest. Inner value is the quantile (0.0–1.0).
66 Percentile(f64),
67 /// Approximate top-K heavy hitters via SpaceSaving. Inner value is K.
68 TopK(usize),
69}
70
71impl AggFunction {
72 pub fn as_str(&self) -> &'static str {
73 match self {
74 Self::Sum => "sum",
75 Self::Count => "count",
76 Self::Min => "min",
77 Self::Max => "max",
78 Self::Avg => "avg",
79 Self::First => "first",
80 Self::Last => "last",
81 Self::CountDistinct => "count_distinct",
82 Self::Percentile(_) => "percentile",
83 Self::TopK(_) => "topk",
84 }
85 }
86
87 /// Whether this function requires sketch state in PartialAggregate.
88 pub fn uses_sketch(&self) -> bool {
89 matches!(
90 self,
91 Self::CountDistinct | Self::Percentile(_) | Self::TopK(_)
92 )
93 }
94}
95
96/// When to refresh the aggregate.
97#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
98pub enum RefreshPolicy {
99 /// Refresh on every memtable flush. Lowest latency.
100 #[default]
101 OnFlush,
102 /// Refresh when a partition is sealed. Lower CPU cost.
103 OnSeal,
104 /// Refresh every N milliseconds.
105 Periodic(u64),
106 /// Only refresh via explicit REFRESH command.
107 Manual,
108}