Skip to main content

reddb_server/storage/query/parser/
timeseries.rs

//! Parser for CREATE TIMESERIES / CREATE METRICS / CREATE HYPERTABLE and DROP TIMESERIES
2
3use super::super::ast::{
4    CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl, QueryExpr,
5};
6use super::super::lexer::Token;
7use super::error::ParseError;
8use super::Parser;
9use crate::catalog::CollectionModel;
10
11impl<'a> Parser<'a> {
12    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
13    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
14        let if_not_exists = self.match_if_not_exists()?;
15        let name = self.expect_ident()?;
16
17        let mut retention_ms = None;
18        let mut chunk_size = None;
19        let mut downsample_policies = Vec::new();
20
21        // Parse optional clauses in any order
22        loop {
23            if self.consume(&Token::Retention)? {
24                let value = self.parse_float()?;
25                let unit = self.parse_duration_unit()?;
26                retention_ms = Some((value * unit) as u64);
27            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
28                chunk_size = Some(self.parse_integer()? as usize);
29            } else if self.consume_ident_ci("DOWNSAMPLE")? {
30                downsample_policies.push(self.parse_downsample_policy_spec()?);
31                while self.consume(&Token::Comma)? {
32                    downsample_policies.push(self.parse_downsample_policy_spec()?);
33                }
34            } else {
35                break;
36            }
37        }
38
39        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
40            name,
41            retention_ms,
42            chunk_size,
43            downsample_policies,
44            if_not_exists,
45            hypertable: None,
46        }))
47    }
48
49    /// Parse CREATE METRICS body (after CREATE METRICS consumed).
50    ///
51    /// v0 intentionally establishes only the collection contract. Ingestion,
52    /// series registry, and Prometheus adapter slices build on this metadata.
53    pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
54        let if_not_exists = self.match_if_not_exists()?;
55        let name = self.expect_ident()?;
56
57        let mut raw_retention_ms = None;
58        let mut tenant_by = None;
59        let mut downsample_policies = Vec::new();
60
61        loop {
62            if self.consume(&Token::Retention)? {
63                let value = self.parse_float()?;
64                let unit = self.parse_duration_unit()?;
65                raw_retention_ms = Some((value * unit) as u64);
66            } else if self.consume_ident_ci("DOWNSAMPLE")? {
67                downsample_policies.push(self.parse_downsample_policy_spec()?);
68                while self.consume(&Token::Comma)? {
69                    downsample_policies.push(self.parse_downsample_policy_spec()?);
70                }
71            } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
72                self.expect(Token::By)?;
73                self.expect(Token::LParen)?;
74                let mut path = self.expect_ident_or_keyword()?;
75                while self.consume(&Token::Dot)? {
76                    let next = self.expect_ident_or_keyword()?;
77                    path = format!("{path}.{next}");
78                }
79                self.expect(Token::RParen)?;
80                tenant_by = Some(path);
81            } else {
82                break;
83            }
84        }
85
86        Ok(QueryExpr::CreateTable(CreateTableQuery {
87            collection_model: CollectionModel::Metrics,
88            name,
89            columns: Vec::new(),
90            if_not_exists,
91            default_ttl_ms: raw_retention_ms,
92            metrics_rollup_policies: downsample_policies,
93            context_index_fields: Vec::new(),
94            context_index_enabled: false,
95            timestamps: false,
96            partition_by: None,
97            tenant_by,
98            append_only: true,
99            subscriptions: Vec::new(),
100            vault_own_master_key: false,
101        }))
102    }
103
104    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
105    ///
106    ///   CREATE HYPERTABLE metrics
107    ///     TIME_COLUMN ts
108    ///     CHUNK_INTERVAL '1d'
109    ///     [TTL '90d']
110    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
111    ///
112    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
113    /// TIMESERIES`, with the `hypertable` field populated. The
114    /// runtime dispatcher registers the spec on the RedDB-wide
115    /// `HypertableRegistry` alongside creating the collection.
116    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
117        let if_not_exists = self.match_if_not_exists()?;
118        let name = self.expect_ident()?;
119
120        let mut time_column: Option<String> = None;
121        let mut chunk_interval_ns: Option<u64> = None;
122        let mut ttl_ns: Option<u64> = None;
123        let mut retention_ms = None;
124
125        loop {
126            if self.consume_ident_ci("TIME_COLUMN")? {
127                time_column = Some(self.expect_ident()?);
128            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
129                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
130            } else if self.consume_ident_ci("TTL")? {
131                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
132            } else if self.consume(&Token::Retention)? {
133                let value = self.parse_float()?;
134                let unit = self.parse_duration_unit()?;
135                retention_ms = Some((value * unit) as u64);
136            } else {
137                break;
138            }
139        }
140
141        let time_column = time_column.ok_or_else(|| {
142            ParseError::new(
143                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
144                self.position(),
145            )
146        })?;
147        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
148            ParseError::new(
149                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
150                self.position(),
151            )
152        })?;
153
154        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
155            name,
156            retention_ms,
157            chunk_size: None,
158            downsample_policies: Vec::new(),
159            if_not_exists,
160            hypertable: Some(HypertableDdl {
161                time_column,
162                chunk_interval_ns,
163                default_ttl_ns: ttl_ns,
164            }),
165        }))
166    }
167
168    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
169    /// resolve it to nanoseconds using the shared retention grammar.
170    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
171        let pos = self.position();
172        let value = self.parse_literal_value()?;
173        match value {
174            crate::storage::schema::Value::Text(s) => {
175                crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
176                    ParseError::new(
177                        // F-05: `s` is caller-controlled string-literal bytes.
178                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
179                        // before reaching downstream serialization sinks.
180                        // `clause` is a static internal label and stays bare.
181                        format!("{clause} duration {s:?} is not a valid duration literal"),
182                        pos,
183                    )
184                })
185            }
186            other => Err(ParseError::new(
187                format!("{clause} expects a string duration literal, got {other:?}"),
188                pos,
189            )),
190        }
191    }
192
193    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
194    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
195        let if_exists = self.match_if_exists()?;
196        let name = self.parse_drop_collection_name()?;
197        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
198            name,
199            if_exists,
200        }))
201    }
202
203    /// Parse a duration unit and return the multiplier in milliseconds
204    fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
205        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
206        // dedicated tokens, not `Token::Ident`, so they need their
207        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
208        // no canonical duration meaning today but were silently
209        // falling through to the seconds default — surface a clear
210        // error instead.
211        match self.peek().clone() {
212            Token::Ident(ref unit) => {
213                let mult = match unit.to_ascii_lowercase().as_str() {
214                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
215                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
216                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
217                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
218                    "d" | "day" | "days" => 86_400_000.0,
219                    other => {
220                        return Err(ParseError::new(
221                            // F-05: `other` is caller-controlled identifier
222                            // text. Render via `{:?}` so embedded CR/LF/NUL/
223                            // quotes are escaped before the message reaches
224                            // downstream serialization sinks.
225                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
226                            self.position(),
227                        ));
228                    }
229                };
230                self.advance()?;
231                Ok(mult)
232            }
233            Token::Min => {
234                // `MIN` keyword used as the minute alias.
235                self.advance()?;
236                Ok(60_000.0)
237            }
238            Token::Max | Token::Avg => {
239                // These keywords have no duration semantics; reject
240                // explicitly so a stray aggregate keyword does not
241                // silently default to seconds.
242                let kw = self.peek().clone();
243                Err(ParseError::new(
244                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
245                    self.position(),
246                ))
247            }
248            _ => Ok(1_000.0), // default: seconds
249        }
250    }
251
252    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
253        let target = self.parse_resolution_spec()?;
254        self.expect(Token::Colon)?;
255        let source = self.parse_resolution_spec()?;
256        let aggregation = if self.consume(&Token::Colon)? {
257            self.expect_ident_or_keyword()?.to_ascii_lowercase()
258        } else {
259            "avg".to_string()
260        };
261        Ok(format!("{target}:{source}:{aggregation}"))
262    }
263
264    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
265        match self.peek().clone() {
266            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
267                self.advance()?;
268                Ok(value.to_ascii_lowercase())
269            }
270            Token::Integer(value) => {
271                self.advance()?;
272                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
273                Ok(format!("{value}{unit}"))
274            }
275            Token::Float(value) => {
276                self.advance()?;
277                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
278                let number = if value.fract().abs() < f64::EPSILON {
279                    format!("{}", value as i64)
280                } else {
281                    value.to_string()
282                };
283                Ok(format!("{number}{unit}"))
284            }
285            other => Err(ParseError::new(
286                format!(
287                    "expected duration literal for downsample policy, got {}",
288                    other
289                ),
290                self.position(),
291            )),
292        }
293    }
294}