nodedb_types/timeseries/continuous_agg.rs
//! Continuous aggregate definition types.
//!
//! Shared between Origin and Lite. Origin uses these for SQL DDL parsing
//! and the continuous aggregate manager. Lite uses them for the embedded
//! continuous aggregate engine and its DDL handler.
6
7use serde::{Deserialize, Serialize};
8
9/// Definition of a continuous aggregate.
10#[derive(
11 Debug,
12 Clone,
13 PartialEq,
14 Serialize,
15 Deserialize,
16 zerompk::ToMessagePack,
17 zerompk::FromMessagePack,
18)]
19pub struct ContinuousAggregateDef {
20 /// Name of this aggregate (e.g., "metrics_1m").
21 pub name: String,
22 /// Source collection or aggregate to read from.
23 pub source: String,
24 /// Time bucket interval string (e.g., "1m", "1h", "1d").
25 pub bucket_interval: String,
26 /// Bucket interval in milliseconds (computed from bucket_interval).
27 pub bucket_interval_ms: i64,
28 /// Columns to GROUP BY (tag/symbol columns).
29 pub group_by: Vec<String>,
30 /// Aggregate expressions to compute.
31 pub aggregates: Vec<AggregateExpr>,
32 /// When to refresh.
33 pub refresh_policy: RefreshPolicy,
34 /// Retention period in milliseconds (0 = infinite, independent of source).
35 pub retention_period_ms: u64,
36 /// Whether this aggregate is currently stale (schema change invalidation).
37 pub stale: bool,
38}
39
40/// An aggregate expression: function + source column → result column.
41#[derive(
42 Debug,
43 Clone,
44 PartialEq,
45 Serialize,
46 Deserialize,
47 zerompk::ToMessagePack,
48 zerompk::FromMessagePack,
49)]
50pub struct AggregateExpr {
51 /// Aggregate function.
52 pub function: AggFunction,
53 /// Source column name (e.g., "cpu"). "*" for COUNT.
54 pub source_column: String,
55 /// Output column name (e.g., "cpu_avg"). Auto-generated if empty.
56 pub output_column: String,
57}
58
59/// Supported aggregate functions.
60#[derive(
61 Debug,
62 Clone,
63 PartialEq,
64 Serialize,
65 Deserialize,
66 zerompk::ToMessagePack,
67 zerompk::FromMessagePack,
68)]
69pub enum AggFunction {
70 Sum,
71 Count,
72 Min,
73 Max,
74 Avg,
75 First,
76 Last,
77 /// Approximate count distinct via HyperLogLog.
78 CountDistinct,
79 /// Approximate percentile via TDigest. Inner value is the quantile (0.0–1.0).
80 Percentile(f64),
81 /// Approximate top-K heavy hitters via SpaceSaving. Inner value is K.
82 TopK(usize),
83}
84
85impl AggFunction {
86 pub fn as_str(&self) -> &'static str {
87 match self {
88 Self::Sum => "sum",
89 Self::Count => "count",
90 Self::Min => "min",
91 Self::Max => "max",
92 Self::Avg => "avg",
93 Self::First => "first",
94 Self::Last => "last",
95 Self::CountDistinct => "count_distinct",
96 Self::Percentile(_) => "percentile",
97 Self::TopK(_) => "topk",
98 }
99 }
100
101 /// Whether this function requires sketch state in PartialAggregate.
102 pub fn uses_sketch(&self) -> bool {
103 matches!(
104 self,
105 Self::CountDistinct | Self::Percentile(_) | Self::TopK(_)
106 )
107 }
108}
109
110/// When to refresh the aggregate.
111#[derive(
112 Debug,
113 Clone,
114 Default,
115 PartialEq,
116 Eq,
117 Serialize,
118 Deserialize,
119 zerompk::ToMessagePack,
120 zerompk::FromMessagePack,
121)]
122pub enum RefreshPolicy {
123 /// Refresh on every memtable flush. Lowest latency.
124 #[default]
125 OnFlush,
126 /// Refresh when a partition is sealed. Lower CPU cost.
127 OnSeal,
128 /// Refresh every N milliseconds.
129 Periodic(u64),
130 /// Only refresh via explicit REFRESH command.
131 Manual,
132}