Skip to main content

nodedb_types/timeseries/
continuous_agg.rs

1//! Continuous aggregate definition types.
2//!
3//! Shared between Origin and Lite. Origin uses these for SQL DDL parsing
4//! and the continuous aggregate manager. Lite uses them for the embedded
5//! continuous aggregate engine and its DDL handler.
6
7use serde::{Deserialize, Serialize};
8
/// Definition of a continuous aggregate.
///
/// Plain data record with public fields and no invariants enforced by the
/// type itself: it names the aggregate and its source, stores the bucket
/// width twice (as the interval string and as milliseconds), and carries the
/// grouping columns, aggregate expressions, refresh policy, retention period
/// and a staleness flag. Serializable through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContinuousAggregateDef {
    /// Name of this aggregate (e.g., "metrics_1m").
    pub name: String,
    /// Source collection or aggregate to read from.
    pub source: String,
    /// Time bucket interval string (e.g., "1m", "1h", "1d").
    pub bucket_interval: String,
    /// Bucket interval in milliseconds (computed from bucket_interval).
    ///
    /// NOTE(review): both fields are public, so nothing in this type keeps
    /// this value in sync with `bucket_interval`. It is also signed (`i64`)
    /// while `retention_period_ms` is `u64`; presumably callers must reject
    /// zero or negative widths — confirm where the definition is built.
    pub bucket_interval_ms: i64,
    /// Columns to GROUP BY (tag/symbol columns).
    pub group_by: Vec<String>,
    /// Aggregate expressions to compute.
    pub aggregates: Vec<AggregateExpr>,
    /// When to refresh.
    pub refresh_policy: RefreshPolicy,
    /// Retention period in milliseconds (0 = infinite, independent of source).
    pub retention_period_ms: u64,
    /// Whether this aggregate is currently stale (schema change invalidation).
    pub stale: bool,
}
31
/// An aggregate expression: function + source column → result column.
///
/// Besides the serde derives, this type also derives the `zerompk`
/// MessagePack encode/decode traits.
#[derive(
    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct AggregateExpr {
    /// Aggregate function.
    pub function: AggFunction,
    /// Source column name (e.g., "cpu"). "*" for COUNT.
    pub source_column: String,
    /// Output column name (e.g., "cpu_avg"). Auto-generated if empty.
    ///
    /// NOTE(review): the auto-generation is not implemented in this type;
    /// presumably whoever consumes the definition fills it in — confirm.
    pub output_column: String,
}
44
/// Supported aggregate functions.
///
/// Seven plain variants plus three that are documented as approximate
/// (`CountDistinct`, `Percentile`, `TopK`); the latter two carry a parameter.
/// Only `PartialEq` is derived: `Eq` and `Hash` are not available because
/// `Percentile` holds an `f64`. As a consequence, `Percentile(f64::NAN)`
/// does not compare equal to itself.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub enum AggFunction {
    Sum,
    Count,
    Min,
    Max,
    Avg,
    First,
    Last,
    /// Approximate count distinct via HyperLogLog.
    CountDistinct,
    /// Approximate percentile via TDigest. Inner value is the quantile (0.0–1.0).
    ///
    /// NOTE(review): the 0.0–1.0 range is not enforced by the type; any
    /// `f64` can be stored here — confirm validation happens at parse time.
    Percentile(f64),
    /// Approximate top-K heavy hitters via SpaceSaving. Inner value is K.
    ///
    /// NOTE(review): `TopK(0)` is representable; confirm it is rejected upstream.
    TopK(usize),
}
70
71impl AggFunction {
72    pub fn as_str(&self) -> &'static str {
73        match self {
74            Self::Sum => "sum",
75            Self::Count => "count",
76            Self::Min => "min",
77            Self::Max => "max",
78            Self::Avg => "avg",
79            Self::First => "first",
80            Self::Last => "last",
81            Self::CountDistinct => "count_distinct",
82            Self::Percentile(_) => "percentile",
83            Self::TopK(_) => "topk",
84        }
85    }
86
87    /// Whether this function requires sketch state in PartialAggregate.
88    pub fn uses_sketch(&self) -> bool {
89        matches!(
90            self,
91            Self::CountDistinct | Self::Percentile(_) | Self::TopK(_)
92        )
93    }
94}
95
/// When to refresh the aggregate.
///
/// `Default` is derived and yields [`RefreshPolicy::OnFlush`] (the variant
/// marked `#[default]`). The type is comparable with `==` and serializable
/// through serde.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum RefreshPolicy {
    /// Refresh on every memtable flush. Lowest latency.
    #[default]
    OnFlush,
    /// Refresh when a partition is sealed. Lower CPU cost.
    OnSeal,
    /// Refresh every N milliseconds.
    ///
    /// NOTE(review): `Periodic(0)` is representable; confirm how the
    /// scheduler treats a zero period.
    Periodic(u64),
    /// Only refresh via explicit REFRESH command.
    Manual,
}