Skip to main content

reddb_server/storage/query/parser/
timeseries.rs

1//! Parser for CREATE/DROP TIMESERIES
2
3use super::super::ast::{
4    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
5    QueryExpr,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
14    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut retention_ms = None;
19        let mut chunk_size = None;
20        let mut downsample_policies = Vec::new();
21        let mut session_key: Option<String> = None;
22        let mut session_gap_ms: Option<u64> = None;
23        let mut columnar = false;
24
25        // Parse optional clauses in any order
26        loop {
27            if self.consume(&Token::Retention)? {
28                let value = self.parse_float()?;
29                let unit = self.parse_duration_unit()?;
30                retention_ms = Some((value * unit) as u64);
31            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
32                chunk_size = Some(self.parse_integer()? as usize);
33            } else if self.consume_ident_ci("COLUMNAR")? {
34                // `COLUMNAR` — activate columnar analytical storage (#911).
35                columnar = true;
36            } else if self.consume_ident_ci("DOWNSAMPLE")? {
37                downsample_policies.push(self.parse_downsample_policy_spec()?);
38                while self.consume(&Token::Comma)? {
39                    downsample_policies.push(self.parse_downsample_policy_spec()?);
40                }
41            } else if self.consume(&Token::With)? {
42                // `WITH SESSION_KEY <col> SESSION_GAP <duration>` — both
43                // clauses are paired so the SESSIONIZE operator (slice
44                // 2+) has a complete default. Order is fixed
45                // (SESSION_KEY first) to keep the grammar simple; one
46                // without the other is a parse error.
47                self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
48            } else {
49                break;
50            }
51        }
52
53        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
54            name,
55            retention_ms,
56            chunk_size,
57            downsample_policies,
58            if_not_exists,
59            hypertable: None,
60            session_key,
61            session_gap_ms,
62            columnar,
63        }))
64    }
65
66    /// Parse `SESSION_KEY <ident> SESSION_GAP <duration>` after a
67    /// `WITH` token has been consumed. Both clauses are required; a
68    /// SESSION_KEY without a SESSION_GAP (or vice-versa) is rejected
69    /// at parse time so the descriptor never carries half a pairing.
70    fn parse_with_session_clause(
71        &mut self,
72        session_key: &mut Option<String>,
73        session_gap_ms: &mut Option<u64>,
74    ) -> Result<(), ParseError> {
75        if !self.consume_ident_ci("SESSION_KEY")? {
76            return Err(ParseError::new(
77                "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
78                self.position(),
79            ));
80        }
81        let key = self.expect_ident()?;
82        if !self.consume_ident_ci("SESSION_GAP")? {
83            return Err(ParseError::new(
84                "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
85                self.position(),
86            ));
87        }
88        let value = self.parse_float()?;
89        let unit = self.parse_duration_unit()?;
90        *session_key = Some(key);
91        *session_gap_ms = Some((value * unit) as u64);
92        Ok(())
93    }
94
95    /// Parse CREATE METRICS body (after CREATE METRICS consumed).
96    ///
97    /// v0 intentionally establishes only the collection contract. Ingestion,
98    /// series registry, and Prometheus adapter slices build on this metadata.
99    pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
100        let if_not_exists = self.match_if_not_exists()?;
101        let name = self.expect_ident()?;
102
103        let mut raw_retention_ms = None;
104        let mut tenant_by = None;
105        let mut downsample_policies = Vec::new();
106
107        loop {
108            if self.consume(&Token::Retention)? {
109                let value = self.parse_float()?;
110                let unit = self.parse_duration_unit()?;
111                raw_retention_ms = Some((value * unit) as u64);
112            } else if self.consume_ident_ci("DOWNSAMPLE")? {
113                downsample_policies.push(self.parse_downsample_policy_spec()?);
114                while self.consume(&Token::Comma)? {
115                    downsample_policies.push(self.parse_downsample_policy_spec()?);
116                }
117            } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
118                self.expect(Token::By)?;
119                self.expect(Token::LParen)?;
120                let mut path = self.expect_ident_or_keyword()?;
121                while self.consume(&Token::Dot)? {
122                    let next = self.expect_ident_or_keyword()?;
123                    path = format!("{path}.{next}");
124                }
125                self.expect(Token::RParen)?;
126                tenant_by = Some(path);
127            } else {
128                break;
129            }
130        }
131
132        Ok(QueryExpr::CreateTable(CreateTableQuery {
133            collection_model: CollectionModel::Metrics,
134            name,
135            columns: Vec::new(),
136            if_not_exists,
137            default_ttl_ms: raw_retention_ms,
138            metrics_rollup_policies: downsample_policies,
139            context_index_fields: Vec::new(),
140            context_index_enabled: false,
141            timestamps: false,
142            partition_by: None,
143            tenant_by,
144            append_only: true,
145            subscriptions: Vec::new(),
146            analytics_config: Vec::new(),
147            vault_own_master_key: false,
148        }))
149    }
150
151    /// Parse CREATE METRIC body (after CREATE METRIC consumed).
152    pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
153        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
154        while self.consume(&Token::Dot)? {
155            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
156            path = format!("{path}.{next}");
157        }
158
159        let mut kind = None;
160        let mut role = None;
161        let mut source: Option<String> = None;
162        let mut query: Option<String> = None;
163        let mut window_ms: Option<u64> = None;
164        let mut time_field: Option<String> = None;
165        loop {
166            if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
167                kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
168            } else if self.consume_ident_ci("ROLE")? {
169                role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
170            } else if self.consume_ident_ci("SOURCE")? {
171                source = Some(self.expect_ident_or_keyword()?);
172            } else if self.consume_ident_ci("QUERY")? {
173                let value = self.parse_literal_value()?;
174                match value {
175                    crate::storage::schema::Value::Text(s) => query = Some(s.to_string()),
176                    other => {
177                        return Err(ParseError::new(
178                            format!("derived metric QUERY expects a string literal, got {other:?}"),
179                            self.position(),
180                        ));
181                    }
182                }
183            } else if self.consume_ident_ci("WINDOW")? {
184                let value = self.parse_float()?;
185                let unit = self.parse_duration_unit()?;
186                window_ms = Some((value * unit) as u64);
187            } else if self.consume_ident_ci("TIME_FIELD")? {
188                time_field = Some(self.expect_ident_or_keyword()?);
189            } else {
190                break;
191            }
192        }
193
194        Ok(QueryExpr::CreateMetric(
195            crate::storage::query::ast::CreateMetricQuery {
196                path,
197                kind: kind.ok_or_else(|| {
198                    ParseError::new(
199                        "metric descriptor requires TYPE or KIND".to_string(),
200                        self.position(),
201                    )
202                })?,
203                role: role.ok_or_else(|| {
204                    ParseError::new(
205                        "metric descriptor requires ROLE".to_string(),
206                        self.position(),
207                    )
208                })?,
209                source,
210                query,
211                window_ms,
212                time_field,
213            },
214        ))
215    }
216
217    /// Parse ALTER METRIC body (after ALTER METRIC consumed).
218    ///
219    /// Grammar:
220    ///   ALTER METRIC <dotted.path> SET ROLE <ident>
221    ///   ALTER METRIC <dotted.path> SET KIND <ident>      -- rejected at runtime
222    ///   ALTER METRIC <dotted.path> SET TYPE <ident>      -- rejected at runtime
223    ///   ALTER METRIC <dotted.path> SET PATH <dotted>     -- rejected at runtime
224    ///
225    /// Immutable-field attempts parse so the runtime can return a
226    /// structured "field X cannot be changed" error explaining *why*.
227    pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
228        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
229        while self.consume(&Token::Dot)? {
230            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
231            path = format!("{path}.{next}");
232        }
233
234        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
235            return Err(ParseError::expected(
236                vec!["SET"],
237                self.peek(),
238                self.position(),
239            ));
240        }
241
242        let mut set_role = None;
243        let mut attempted_kind = None;
244        let mut attempted_path = None;
245
246        if self.consume_ident_ci("ROLE")? {
247            set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
248        } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
249            attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
250        } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
251            let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
252            while self.consume(&Token::Dot)? {
253                let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
254                new_path = format!("{new_path}.{next}");
255            }
256            attempted_path = Some(new_path);
257        } else {
258            return Err(ParseError::expected(
259                vec!["ROLE", "KIND", "TYPE", "PATH"],
260                self.peek(),
261                self.position(),
262            ));
263        }
264
265        Ok(QueryExpr::AlterMetric(
266            crate::storage::query::ast::AlterMetricQuery {
267                path,
268                set_role,
269                attempted_kind,
270                attempted_path,
271            },
272        ))
273    }
274
275    /// Parse CREATE SLO body (after CREATE SLO consumed).
276    ///
277    /// Grammar:
278    ///   CREATE SLO <dotted.path>
279    ///     ON <metric.dotted.path>
280    ///     TARGET <number>
281    ///     WINDOW <number> <duration_unit>
282    ///
283    /// Clauses are positional after the SLO path so the grammar stays
284    /// tight; the parser leaves semantic validation (metric exists, role
285    /// = sli, target in range) to the runtime catalog where the error
286    /// wording can reference the live catalog state.
287    pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
288        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
289        while self.consume(&Token::Dot)? {
290            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
291            path = format!("{path}.{next}");
292        }
293
294        if !self.consume(&Token::On)? {
295            return Err(ParseError::expected(
296                vec!["ON"],
297                self.peek(),
298                self.position(),
299            ));
300        }
301
302        let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
303        while self.consume(&Token::Dot)? {
304            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
305            metric_path = format!("{metric_path}.{next}");
306        }
307
308        let mut target: Option<f64> = None;
309        let mut window_ms: Option<u64> = None;
310
311        loop {
312            if self.consume_ident_ci("TARGET")? {
313                target = Some(self.parse_float()?);
314            } else if self.consume_ident_ci("WINDOW")? {
315                let value = self.parse_float()?;
316                let unit = self.parse_duration_unit()?;
317                window_ms = Some((value * unit) as u64);
318            } else {
319                break;
320            }
321        }
322
323        Ok(QueryExpr::CreateSlo(CreateSloQuery {
324            path,
325            metric_path,
326            target: target.ok_or_else(|| {
327                ParseError::new(
328                    "SLO descriptor requires TARGET <number>".to_string(),
329                    self.position(),
330                )
331            })?,
332            window_ms: window_ms.ok_or_else(|| {
333                ParseError::new(
334                    "SLO descriptor requires WINDOW <duration>".to_string(),
335                    self.position(),
336                )
337            })?,
338        }))
339    }
340
341    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
342    ///
343    ///   CREATE HYPERTABLE metrics
344    ///     TIME_COLUMN ts
345    ///     CHUNK_INTERVAL '1d'
346    ///     [TTL '90d']
347    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
348    ///
349    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
350    /// TIMESERIES`, with the `hypertable` field populated. The
351    /// runtime dispatcher registers the spec on the RedDB-wide
352    /// `HypertableRegistry` alongside creating the collection.
353    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
354        let if_not_exists = self.match_if_not_exists()?;
355        let name = self.expect_ident()?;
356
357        let mut time_column: Option<String> = None;
358        let mut chunk_interval_ns: Option<u64> = None;
359        let mut ttl_ns: Option<u64> = None;
360        let mut retention_ms = None;
361        let mut columnar = false;
362
363        loop {
364            if self.consume_ident_ci("TIME_COLUMN")? {
365                time_column = Some(self.expect_ident()?);
366            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
367                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
368            } else if self.consume_ident_ci("COLUMNAR")? {
369                // `COLUMNAR` — activate columnar analytical storage (#911).
370                columnar = true;
371            } else if self.consume_ident_ci("TTL")? {
372                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
373            } else if self.consume(&Token::Retention)? {
374                let value = self.parse_float()?;
375                let unit = self.parse_duration_unit()?;
376                retention_ms = Some((value * unit) as u64);
377            } else {
378                break;
379            }
380        }
381
382        let time_column = time_column.ok_or_else(|| {
383            ParseError::new(
384                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
385                self.position(),
386            )
387        })?;
388        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
389            ParseError::new(
390                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
391                self.position(),
392            )
393        })?;
394
395        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
396            name,
397            retention_ms,
398            chunk_size: None,
399            downsample_policies: Vec::new(),
400            if_not_exists,
401            hypertable: Some(HypertableDdl {
402                time_column,
403                chunk_interval_ns,
404                default_ttl_ns: ttl_ns,
405            }),
406            session_key: None,
407            session_gap_ms: None,
408            columnar,
409        }))
410    }
411
412    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
413    /// resolve it to nanoseconds using the shared retention grammar.
414    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
415        let pos = self.position();
416        let value = self.parse_literal_value()?;
417        match value {
418            crate::storage::schema::Value::Text(s) => {
419                crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
420                    ParseError::new(
421                        // F-05: `s` is caller-controlled string-literal bytes.
422                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
423                        // before reaching downstream serialization sinks.
424                        // `clause` is a static internal label and stays bare.
425                        format!("{clause} duration {s:?} is not a valid duration literal"),
426                        pos,
427                    )
428                })
429            }
430            other => Err(ParseError::new(
431                format!("{clause} expects a string duration literal, got {other:?}"),
432                pos,
433            )),
434        }
435    }
436
437    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
438    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
439        let if_exists = self.match_if_exists()?;
440        let name = self.parse_drop_collection_name()?;
441        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
442            name,
443            if_exists,
444        }))
445    }
446
447    /// Parse a duration unit and return the multiplier in milliseconds
448    pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
449        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
450        // dedicated tokens, not `Token::Ident`, so they need their
451        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
452        // no canonical duration meaning today but were silently
453        // falling through to the seconds default — surface a clear
454        // error instead.
455        match self.peek().clone() {
456            Token::Ident(ref unit) => {
457                let mult = match unit.to_ascii_lowercase().as_str() {
458                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
459                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
460                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
461                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
462                    "d" | "day" | "days" => 86_400_000.0,
463                    other => {
464                        return Err(ParseError::new(
465                            // F-05: `other` is caller-controlled identifier
466                            // text. Render via `{:?}` so embedded CR/LF/NUL/
467                            // quotes are escaped before the message reaches
468                            // downstream serialization sinks.
469                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
470                            self.position(),
471                        ));
472                    }
473                };
474                self.advance()?;
475                Ok(mult)
476            }
477            Token::Min => {
478                // `MIN` keyword used as the minute alias.
479                self.advance()?;
480                Ok(60_000.0)
481            }
482            Token::Max | Token::Avg => {
483                // These keywords have no duration semantics; reject
484                // explicitly so a stray aggregate keyword does not
485                // silently default to seconds.
486                let kw = self.peek().clone();
487                Err(ParseError::new(
488                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
489                    self.position(),
490                ))
491            }
492            _ => Ok(1_000.0), // default: seconds
493        }
494    }
495
496    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
497        let target = self.parse_resolution_spec()?;
498        self.expect(Token::Colon)?;
499        let source = self.parse_resolution_spec()?;
500        let aggregation = if self.consume(&Token::Colon)? {
501            self.expect_ident_or_keyword()?.to_ascii_lowercase()
502        } else {
503            "avg".to_string()
504        };
505        Ok(format!("{target}:{source}:{aggregation}"))
506    }
507
508    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
509        match self.peek().clone() {
510            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
511                self.advance()?;
512                Ok(value.to_ascii_lowercase())
513            }
514            Token::Integer(value) => {
515                self.advance()?;
516                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
517                Ok(format!("{value}{unit}"))
518            }
519            Token::Float(value) => {
520                self.advance()?;
521                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
522                let number = if value.fract().abs() < f64::EPSILON {
523                    format!("{}", value as i64)
524                } else {
525                    value.to_string()
526                };
527                Ok(format!("{number}{unit}"))
528            }
529            other => Err(ParseError::new(
530                format!(
531                    "expected duration literal for downsample policy, got {}",
532                    other
533                ),
534                self.position(),
535            )),
536        }
537    }
538}