Skip to main content

nodedb_types/timeseries/
continuous_agg.rs

1//! Continuous aggregate definition types.
2//!
3//! Shared between Origin and Lite. Origin uses these for SQL DDL parsing
4//! and the continuous aggregate manager. Lite uses them for the embedded
5//! continuous aggregate engine and its DDL handler.
6
7use serde::{Deserialize, Serialize};
8
9/// Definition of a continuous aggregate.
10#[derive(
11    Debug,
12    Clone,
13    PartialEq,
14    Serialize,
15    Deserialize,
16    zerompk::ToMessagePack,
17    zerompk::FromMessagePack,
18)]
19pub struct ContinuousAggregateDef {
20    /// Name of this aggregate (e.g., "metrics_1m").
21    pub name: String,
22    /// Source collection or aggregate to read from.
23    pub source: String,
24    /// Time bucket interval string (e.g., "1m", "1h", "1d").
25    pub bucket_interval: String,
26    /// Bucket interval in milliseconds (computed from bucket_interval).
27    pub bucket_interval_ms: i64,
28    /// Columns to GROUP BY (tag/symbol columns).
29    pub group_by: Vec<String>,
30    /// Aggregate expressions to compute.
31    pub aggregates: Vec<AggregateExpr>,
32    /// When to refresh.
33    pub refresh_policy: RefreshPolicy,
34    /// Retention period in milliseconds (0 = infinite, independent of source).
35    pub retention_period_ms: u64,
36    /// Whether this aggregate is currently stale (schema change invalidation).
37    pub stale: bool,
38}
39
40/// An aggregate expression: function + source column → result column.
41#[derive(
42    Debug,
43    Clone,
44    PartialEq,
45    Serialize,
46    Deserialize,
47    zerompk::ToMessagePack,
48    zerompk::FromMessagePack,
49)]
50pub struct AggregateExpr {
51    /// Aggregate function.
52    pub function: AggFunction,
53    /// Source column name (e.g., "cpu"). "*" for COUNT.
54    pub source_column: String,
55    /// Output column name (e.g., "cpu_avg"). Auto-generated if empty.
56    pub output_column: String,
57}
58
59/// Supported aggregate functions.
60#[derive(
61    Debug,
62    Clone,
63    PartialEq,
64    Serialize,
65    Deserialize,
66    zerompk::ToMessagePack,
67    zerompk::FromMessagePack,
68)]
69pub enum AggFunction {
70    Sum,
71    Count,
72    Min,
73    Max,
74    Avg,
75    First,
76    Last,
77    /// Approximate count distinct via HyperLogLog.
78    CountDistinct,
79    /// Approximate percentile via TDigest. Inner value is the quantile (0.0–1.0).
80    Percentile(f64),
81    /// Approximate top-K heavy hitters via SpaceSaving. Inner value is K.
82    TopK(usize),
83}
84
85impl AggFunction {
86    pub fn as_str(&self) -> &'static str {
87        match self {
88            Self::Sum => "sum",
89            Self::Count => "count",
90            Self::Min => "min",
91            Self::Max => "max",
92            Self::Avg => "avg",
93            Self::First => "first",
94            Self::Last => "last",
95            Self::CountDistinct => "count_distinct",
96            Self::Percentile(_) => "percentile",
97            Self::TopK(_) => "topk",
98        }
99    }
100
101    /// Whether this function requires sketch state in PartialAggregate.
102    pub fn uses_sketch(&self) -> bool {
103        matches!(
104            self,
105            Self::CountDistinct | Self::Percentile(_) | Self::TopK(_)
106        )
107    }
108}
109
110/// When to refresh the aggregate.
111#[derive(
112    Debug,
113    Clone,
114    Default,
115    PartialEq,
116    Eq,
117    Serialize,
118    Deserialize,
119    zerompk::ToMessagePack,
120    zerompk::FromMessagePack,
121)]
122pub enum RefreshPolicy {
123    /// Refresh on every memtable flush. Lowest latency.
124    #[default]
125    OnFlush,
126    /// Refresh when a partition is sealed. Lower CPU cost.
127    OnSeal,
128    /// Refresh every N milliseconds.
129    Periodic(u64),
130    /// Only refresh via explicit REFRESH command.
131    Manual,
132}