Skip to main content

reddb_server/storage/query/parser/
timeseries.rs

1//! Parser for CREATE/DROP TIMESERIES
2
3use super::super::ast::{
4    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
5    QueryExpr,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
14    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut retention_ms = None;
19        let mut chunk_size = None;
20        let mut downsample_policies = Vec::new();
21        let mut session_key: Option<String> = None;
22        let mut session_gap_ms: Option<u64> = None;
23
24        // Parse optional clauses in any order
25        loop {
26            if self.consume(&Token::Retention)? {
27                let value = self.parse_float()?;
28                let unit = self.parse_duration_unit()?;
29                retention_ms = Some((value * unit) as u64);
30            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
31                chunk_size = Some(self.parse_integer()? as usize);
32            } else if self.consume_ident_ci("DOWNSAMPLE")? {
33                downsample_policies.push(self.parse_downsample_policy_spec()?);
34                while self.consume(&Token::Comma)? {
35                    downsample_policies.push(self.parse_downsample_policy_spec()?);
36                }
37            } else if self.consume(&Token::With)? {
38                // `WITH SESSION_KEY <col> SESSION_GAP <duration>` — both
39                // clauses are paired so the SESSIONIZE operator (slice
40                // 2+) has a complete default. Order is fixed
41                // (SESSION_KEY first) to keep the grammar simple; one
42                // without the other is a parse error.
43                self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
44            } else {
45                break;
46            }
47        }
48
49        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
50            name,
51            retention_ms,
52            chunk_size,
53            downsample_policies,
54            if_not_exists,
55            hypertable: None,
56            session_key,
57            session_gap_ms,
58        }))
59    }
60
61    /// Parse `SESSION_KEY <ident> SESSION_GAP <duration>` after a
62    /// `WITH` token has been consumed. Both clauses are required; a
63    /// SESSION_KEY without a SESSION_GAP (or vice-versa) is rejected
64    /// at parse time so the descriptor never carries half a pairing.
65    fn parse_with_session_clause(
66        &mut self,
67        session_key: &mut Option<String>,
68        session_gap_ms: &mut Option<u64>,
69    ) -> Result<(), ParseError> {
70        if !self.consume_ident_ci("SESSION_KEY")? {
71            return Err(ParseError::new(
72                "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
73                self.position(),
74            ));
75        }
76        let key = self.expect_ident()?;
77        if !self.consume_ident_ci("SESSION_GAP")? {
78            return Err(ParseError::new(
79                "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
80                self.position(),
81            ));
82        }
83        let value = self.parse_float()?;
84        let unit = self.parse_duration_unit()?;
85        *session_key = Some(key);
86        *session_gap_ms = Some((value * unit) as u64);
87        Ok(())
88    }
89
90    /// Parse CREATE METRICS body (after CREATE METRICS consumed).
91    ///
92    /// v0 intentionally establishes only the collection contract. Ingestion,
93    /// series registry, and Prometheus adapter slices build on this metadata.
94    pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
95        let if_not_exists = self.match_if_not_exists()?;
96        let name = self.expect_ident()?;
97
98        let mut raw_retention_ms = None;
99        let mut tenant_by = None;
100        let mut downsample_policies = Vec::new();
101
102        loop {
103            if self.consume(&Token::Retention)? {
104                let value = self.parse_float()?;
105                let unit = self.parse_duration_unit()?;
106                raw_retention_ms = Some((value * unit) as u64);
107            } else if self.consume_ident_ci("DOWNSAMPLE")? {
108                downsample_policies.push(self.parse_downsample_policy_spec()?);
109                while self.consume(&Token::Comma)? {
110                    downsample_policies.push(self.parse_downsample_policy_spec()?);
111                }
112            } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
113                self.expect(Token::By)?;
114                self.expect(Token::LParen)?;
115                let mut path = self.expect_ident_or_keyword()?;
116                while self.consume(&Token::Dot)? {
117                    let next = self.expect_ident_or_keyword()?;
118                    path = format!("{path}.{next}");
119                }
120                self.expect(Token::RParen)?;
121                tenant_by = Some(path);
122            } else {
123                break;
124            }
125        }
126
127        Ok(QueryExpr::CreateTable(CreateTableQuery {
128            collection_model: CollectionModel::Metrics,
129            name,
130            columns: Vec::new(),
131            if_not_exists,
132            default_ttl_ms: raw_retention_ms,
133            metrics_rollup_policies: downsample_policies,
134            context_index_fields: Vec::new(),
135            context_index_enabled: false,
136            timestamps: false,
137            partition_by: None,
138            tenant_by,
139            append_only: true,
140            subscriptions: Vec::new(),
141            analytics_config: Vec::new(),
142            vault_own_master_key: false,
143        }))
144    }
145
146    /// Parse CREATE METRIC body (after CREATE METRIC consumed).
147    pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
148        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
149        while self.consume(&Token::Dot)? {
150            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
151            path = format!("{path}.{next}");
152        }
153
154        let mut kind = None;
155        let mut role = None;
156        let mut source: Option<String> = None;
157        let mut query: Option<String> = None;
158        let mut window_ms: Option<u64> = None;
159        let mut time_field: Option<String> = None;
160        loop {
161            if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
162                kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
163            } else if self.consume_ident_ci("ROLE")? {
164                role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
165            } else if self.consume_ident_ci("SOURCE")? {
166                source = Some(self.expect_ident_or_keyword()?);
167            } else if self.consume_ident_ci("QUERY")? {
168                let value = self.parse_literal_value()?;
169                match value {
170                    crate::storage::schema::Value::Text(s) => query = Some(s.to_string()),
171                    other => {
172                        return Err(ParseError::new(
173                            format!("derived metric QUERY expects a string literal, got {other:?}"),
174                            self.position(),
175                        ));
176                    }
177                }
178            } else if self.consume_ident_ci("WINDOW")? {
179                let value = self.parse_float()?;
180                let unit = self.parse_duration_unit()?;
181                window_ms = Some((value * unit) as u64);
182            } else if self.consume_ident_ci("TIME_FIELD")? {
183                time_field = Some(self.expect_ident_or_keyword()?);
184            } else {
185                break;
186            }
187        }
188
189        Ok(QueryExpr::CreateMetric(
190            crate::storage::query::ast::CreateMetricQuery {
191                path,
192                kind: kind.ok_or_else(|| {
193                    ParseError::new(
194                        "metric descriptor requires TYPE or KIND".to_string(),
195                        self.position(),
196                    )
197                })?,
198                role: role.ok_or_else(|| {
199                    ParseError::new(
200                        "metric descriptor requires ROLE".to_string(),
201                        self.position(),
202                    )
203                })?,
204                source,
205                query,
206                window_ms,
207                time_field,
208            },
209        ))
210    }
211
212    /// Parse ALTER METRIC body (after ALTER METRIC consumed).
213    ///
214    /// Grammar:
215    ///   ALTER METRIC <dotted.path> SET ROLE <ident>
216    ///   ALTER METRIC <dotted.path> SET KIND <ident>      -- rejected at runtime
217    ///   ALTER METRIC <dotted.path> SET TYPE <ident>      -- rejected at runtime
218    ///   ALTER METRIC <dotted.path> SET PATH <dotted>     -- rejected at runtime
219    ///
220    /// Immutable-field attempts parse so the runtime can return a
221    /// structured "field X cannot be changed" error explaining *why*.
222    pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
223        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
224        while self.consume(&Token::Dot)? {
225            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
226            path = format!("{path}.{next}");
227        }
228
229        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
230            return Err(ParseError::expected(
231                vec!["SET"],
232                self.peek(),
233                self.position(),
234            ));
235        }
236
237        let mut set_role = None;
238        let mut attempted_kind = None;
239        let mut attempted_path = None;
240
241        if self.consume_ident_ci("ROLE")? {
242            set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
243        } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
244            attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
245        } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
246            let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
247            while self.consume(&Token::Dot)? {
248                let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
249                new_path = format!("{new_path}.{next}");
250            }
251            attempted_path = Some(new_path);
252        } else {
253            return Err(ParseError::expected(
254                vec!["ROLE", "KIND", "TYPE", "PATH"],
255                self.peek(),
256                self.position(),
257            ));
258        }
259
260        Ok(QueryExpr::AlterMetric(
261            crate::storage::query::ast::AlterMetricQuery {
262                path,
263                set_role,
264                attempted_kind,
265                attempted_path,
266            },
267        ))
268    }
269
270    /// Parse CREATE SLO body (after CREATE SLO consumed).
271    ///
272    /// Grammar:
273    ///   CREATE SLO <dotted.path>
274    ///     ON <metric.dotted.path>
275    ///     TARGET <number>
276    ///     WINDOW <number> <duration_unit>
277    ///
278    /// Clauses are positional after the SLO path so the grammar stays
279    /// tight; the parser leaves semantic validation (metric exists, role
280    /// = sli, target in range) to the runtime catalog where the error
281    /// wording can reference the live catalog state.
282    pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
283        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
284        while self.consume(&Token::Dot)? {
285            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
286            path = format!("{path}.{next}");
287        }
288
289        if !self.consume(&Token::On)? {
290            return Err(ParseError::expected(
291                vec!["ON"],
292                self.peek(),
293                self.position(),
294            ));
295        }
296
297        let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
298        while self.consume(&Token::Dot)? {
299            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
300            metric_path = format!("{metric_path}.{next}");
301        }
302
303        let mut target: Option<f64> = None;
304        let mut window_ms: Option<u64> = None;
305
306        loop {
307            if self.consume_ident_ci("TARGET")? {
308                target = Some(self.parse_float()?);
309            } else if self.consume_ident_ci("WINDOW")? {
310                let value = self.parse_float()?;
311                let unit = self.parse_duration_unit()?;
312                window_ms = Some((value * unit) as u64);
313            } else {
314                break;
315            }
316        }
317
318        Ok(QueryExpr::CreateSlo(CreateSloQuery {
319            path,
320            metric_path,
321            target: target.ok_or_else(|| {
322                ParseError::new(
323                    "SLO descriptor requires TARGET <number>".to_string(),
324                    self.position(),
325                )
326            })?,
327            window_ms: window_ms.ok_or_else(|| {
328                ParseError::new(
329                    "SLO descriptor requires WINDOW <duration>".to_string(),
330                    self.position(),
331                )
332            })?,
333        }))
334    }
335
336    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
337    ///
338    ///   CREATE HYPERTABLE metrics
339    ///     TIME_COLUMN ts
340    ///     CHUNK_INTERVAL '1d'
341    ///     [TTL '90d']
342    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
343    ///
344    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
345    /// TIMESERIES`, with the `hypertable` field populated. The
346    /// runtime dispatcher registers the spec on the RedDB-wide
347    /// `HypertableRegistry` alongside creating the collection.
348    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
349        let if_not_exists = self.match_if_not_exists()?;
350        let name = self.expect_ident()?;
351
352        let mut time_column: Option<String> = None;
353        let mut chunk_interval_ns: Option<u64> = None;
354        let mut ttl_ns: Option<u64> = None;
355        let mut retention_ms = None;
356
357        loop {
358            if self.consume_ident_ci("TIME_COLUMN")? {
359                time_column = Some(self.expect_ident()?);
360            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
361                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
362            } else if self.consume_ident_ci("TTL")? {
363                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
364            } else if self.consume(&Token::Retention)? {
365                let value = self.parse_float()?;
366                let unit = self.parse_duration_unit()?;
367                retention_ms = Some((value * unit) as u64);
368            } else {
369                break;
370            }
371        }
372
373        let time_column = time_column.ok_or_else(|| {
374            ParseError::new(
375                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
376                self.position(),
377            )
378        })?;
379        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
380            ParseError::new(
381                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
382                self.position(),
383            )
384        })?;
385
386        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
387            name,
388            retention_ms,
389            chunk_size: None,
390            downsample_policies: Vec::new(),
391            if_not_exists,
392            hypertable: Some(HypertableDdl {
393                time_column,
394                chunk_interval_ns,
395                default_ttl_ns: ttl_ns,
396            }),
397            session_key: None,
398            session_gap_ms: None,
399        }))
400    }
401
402    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
403    /// resolve it to nanoseconds using the shared retention grammar.
404    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
405        let pos = self.position();
406        let value = self.parse_literal_value()?;
407        match value {
408            crate::storage::schema::Value::Text(s) => {
409                crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
410                    ParseError::new(
411                        // F-05: `s` is caller-controlled string-literal bytes.
412                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
413                        // before reaching downstream serialization sinks.
414                        // `clause` is a static internal label and stays bare.
415                        format!("{clause} duration {s:?} is not a valid duration literal"),
416                        pos,
417                    )
418                })
419            }
420            other => Err(ParseError::new(
421                format!("{clause} expects a string duration literal, got {other:?}"),
422                pos,
423            )),
424        }
425    }
426
427    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
428    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
429        let if_exists = self.match_if_exists()?;
430        let name = self.parse_drop_collection_name()?;
431        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
432            name,
433            if_exists,
434        }))
435    }
436
437    /// Parse a duration unit and return the multiplier in milliseconds
438    pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
439        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
440        // dedicated tokens, not `Token::Ident`, so they need their
441        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
442        // no canonical duration meaning today but were silently
443        // falling through to the seconds default — surface a clear
444        // error instead.
445        match self.peek().clone() {
446            Token::Ident(ref unit) => {
447                let mult = match unit.to_ascii_lowercase().as_str() {
448                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
449                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
450                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
451                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
452                    "d" | "day" | "days" => 86_400_000.0,
453                    other => {
454                        return Err(ParseError::new(
455                            // F-05: `other` is caller-controlled identifier
456                            // text. Render via `{:?}` so embedded CR/LF/NUL/
457                            // quotes are escaped before the message reaches
458                            // downstream serialization sinks.
459                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
460                            self.position(),
461                        ));
462                    }
463                };
464                self.advance()?;
465                Ok(mult)
466            }
467            Token::Min => {
468                // `MIN` keyword used as the minute alias.
469                self.advance()?;
470                Ok(60_000.0)
471            }
472            Token::Max | Token::Avg => {
473                // These keywords have no duration semantics; reject
474                // explicitly so a stray aggregate keyword does not
475                // silently default to seconds.
476                let kw = self.peek().clone();
477                Err(ParseError::new(
478                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
479                    self.position(),
480                ))
481            }
482            _ => Ok(1_000.0), // default: seconds
483        }
484    }
485
486    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
487        let target = self.parse_resolution_spec()?;
488        self.expect(Token::Colon)?;
489        let source = self.parse_resolution_spec()?;
490        let aggregation = if self.consume(&Token::Colon)? {
491            self.expect_ident_or_keyword()?.to_ascii_lowercase()
492        } else {
493            "avg".to_string()
494        };
495        Ok(format!("{target}:{source}:{aggregation}"))
496    }
497
498    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
499        match self.peek().clone() {
500            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
501                self.advance()?;
502                Ok(value.to_ascii_lowercase())
503            }
504            Token::Integer(value) => {
505                self.advance()?;
506                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
507                Ok(format!("{value}{unit}"))
508            }
509            Token::Float(value) => {
510                self.advance()?;
511                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
512                let number = if value.fract().abs() < f64::EPSILON {
513                    format!("{}", value as i64)
514                } else {
515                    value.to_string()
516                };
517                Ok(format!("{number}{unit}"))
518            }
519            other => Err(ParseError::new(
520                format!(
521                    "expected duration literal for downsample policy, got {}",
522                    other
523                ),
524                self.position(),
525            )),
526        }
527    }
528}