Skip to main content

nodedb_types/timeseries/
continuous_agg.rs

// SPDX-License-Identifier: Apache-2.0

//! Continuous aggregate definition types.
//!
//! Shared between Origin and Lite. Origin uses these for SQL DDL parsing
//! and the continuous aggregate manager. Lite uses them for the embedded
//! continuous aggregate engine and its DDL handler.
8
9use serde::{Deserialize, Serialize};
10
11/// Definition of a continuous aggregate.
12#[derive(
13    Debug,
14    Clone,
15    PartialEq,
16    Serialize,
17    Deserialize,
18    zerompk::ToMessagePack,
19    zerompk::FromMessagePack,
20)]
21pub struct ContinuousAggregateDef {
22    /// Name of this aggregate (e.g., "metrics_1m").
23    pub name: String,
24    /// Source collection or aggregate to read from.
25    pub source: String,
26    /// Time bucket interval string (e.g., "1m", "1h", "1d").
27    pub bucket_interval: String,
28    /// Bucket interval in milliseconds (computed from bucket_interval).
29    pub bucket_interval_ms: i64,
30    /// Columns to GROUP BY (tag/symbol columns).
31    pub group_by: Vec<String>,
32    /// Aggregate expressions to compute.
33    pub aggregates: Vec<AggregateExpr>,
34    /// When to refresh.
35    pub refresh_policy: RefreshPolicy,
36    /// Retention period in milliseconds (0 = infinite, independent of source).
37    pub retention_period_ms: u64,
38    /// Whether this aggregate is currently stale (schema change invalidation).
39    pub stale: bool,
40}
41
42/// An aggregate expression: function + source column → result column.
43#[derive(
44    Debug,
45    Clone,
46    PartialEq,
47    Serialize,
48    Deserialize,
49    zerompk::ToMessagePack,
50    zerompk::FromMessagePack,
51)]
52pub struct AggregateExpr {
53    /// Aggregate function.
54    pub function: AggFunction,
55    /// Source column name (e.g., "cpu"). "*" for COUNT.
56    pub source_column: String,
57    /// Output column name (e.g., "cpu_avg"). Auto-generated if empty.
58    pub output_column: String,
59}
60
61/// Supported aggregate functions.
62#[derive(
63    Debug,
64    Clone,
65    PartialEq,
66    Serialize,
67    Deserialize,
68    zerompk::ToMessagePack,
69    zerompk::FromMessagePack,
70)]
71#[serde(rename_all = "snake_case")]
72#[non_exhaustive]
73pub enum AggFunction {
74    #[serde(rename = "sum")]
75    Sum,
76    #[serde(rename = "count")]
77    Count,
78    #[serde(rename = "min")]
79    Min,
80    #[serde(rename = "max")]
81    Max,
82    #[serde(rename = "avg")]
83    Avg,
84    #[serde(rename = "first")]
85    First,
86    #[serde(rename = "last")]
87    Last,
88    /// Approximate count distinct via HyperLogLog.
89    #[serde(rename = "count_distinct")]
90    CountDistinct,
91    /// Approximate percentile via TDigest. Inner value is the quantile (0.0–1.0).
92    #[serde(rename = "percentile")]
93    Percentile(f64),
94    /// Approximate top-K heavy hitters via SpaceSaving. Inner value is K.
95    #[serde(rename = "topk")]
96    TopK(usize),
97}
98
99impl AggFunction {
100    pub fn as_str(&self) -> &'static str {
101        match self {
102            Self::Sum => "sum",
103            Self::Count => "count",
104            Self::Min => "min",
105            Self::Max => "max",
106            Self::Avg => "avg",
107            Self::First => "first",
108            Self::Last => "last",
109            Self::CountDistinct => "count_distinct",
110            Self::Percentile(_) => "percentile",
111            Self::TopK(_) => "topk",
112        }
113    }
114
115    /// Whether this function requires sketch state in PartialAggregate.
116    pub fn uses_sketch(&self) -> bool {
117        matches!(
118            self,
119            Self::CountDistinct | Self::Percentile(_) | Self::TopK(_)
120        )
121    }
122}
123
124/// When to refresh the aggregate.
125#[derive(
126    Debug,
127    Clone,
128    Default,
129    PartialEq,
130    Eq,
131    Serialize,
132    Deserialize,
133    zerompk::ToMessagePack,
134    zerompk::FromMessagePack,
135)]
136#[serde(rename_all = "snake_case")]
137#[non_exhaustive]
138pub enum RefreshPolicy {
139    /// Refresh on every memtable flush. Lowest latency.
140    #[default]
141    #[serde(rename = "on_flush")]
142    OnFlush,
143    /// Refresh when a partition is sealed. Lower CPU cost.
144    #[serde(rename = "on_seal")]
145    OnSeal,
146    /// Refresh every N milliseconds.
147    #[serde(rename = "periodic")]
148    Periodic(u64),
149    /// Only refresh via explicit REFRESH command.
150    #[serde(rename = "manual")]
151    Manual,
152}