Skip to main content

reddb_rql/parser/
timeseries.rs

//! Parser for the time-series DDL family: CREATE/DROP TIMESERIES, CREATE METRICS, CREATE/ALTER METRIC, CREATE SLO and CREATE HYPERTABLE
2
3use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
7    QueryExpr,
8};
9use crate::lexer::Token;
10use reddb_types::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
14    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut retention_ms = None;
19        let mut chunk_size = None;
20        let mut downsample_policies = Vec::new();
21        let mut session_key: Option<String> = None;
22        let mut session_gap_ms: Option<u64> = None;
23        let mut columnar = false;
24
25        // Parse optional clauses in any order
26        loop {
27            if self.consume(&Token::Retention)? {
28                let value = self.parse_float()?;
29                let unit = self.parse_duration_unit()?;
30                retention_ms = Some((value * unit) as u64);
31            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
32                chunk_size = Some(self.parse_integer()? as usize);
33            } else if self.consume_ident_ci("COLUMNAR")? {
34                // `COLUMNAR` — activate columnar analytical storage (#911).
35                columnar = true;
36            } else if self.consume_ident_ci("DOWNSAMPLE")? {
37                downsample_policies.push(self.parse_downsample_policy_spec()?);
38                while self.consume(&Token::Comma)? {
39                    downsample_policies.push(self.parse_downsample_policy_spec()?);
40                }
41            } else if self.consume(&Token::With)? {
42                // `WITH SESSION_KEY <col> SESSION_GAP <duration>` — both
43                // clauses are paired so the SESSIONIZE operator (slice
44                // 2+) has a complete default. Order is fixed
45                // (SESSION_KEY first) to keep the grammar simple; one
46                // without the other is a parse error.
47                self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
48            } else {
49                break;
50            }
51        }
52
53        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
54            name,
55            retention_ms,
56            chunk_size,
57            downsample_policies,
58            if_not_exists,
59            hypertable: None,
60            session_key,
61            session_gap_ms,
62            columnar,
63        }))
64    }
65
66    /// Parse `SESSION_KEY <ident> SESSION_GAP <duration>` after a
67    /// `WITH` token has been consumed. Both clauses are required; a
68    /// SESSION_KEY without a SESSION_GAP (or vice-versa) is rejected
69    /// at parse time so the descriptor never carries half a pairing.
70    fn parse_with_session_clause(
71        &mut self,
72        session_key: &mut Option<String>,
73        session_gap_ms: &mut Option<u64>,
74    ) -> Result<(), ParseError> {
75        if !self.consume_ident_ci("SESSION_KEY")? {
76            return Err(ParseError::new(
77                "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
78                self.position(),
79            ));
80        }
81        let key = self.expect_ident()?;
82        if !self.consume_ident_ci("SESSION_GAP")? {
83            return Err(ParseError::new(
84                "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
85                self.position(),
86            ));
87        }
88        let value = self.parse_float()?;
89        let unit = self.parse_duration_unit()?;
90        *session_key = Some(key);
91        *session_gap_ms = Some((value * unit) as u64);
92        Ok(())
93    }
94
95    /// Parse CREATE METRICS body (after CREATE METRICS consumed).
96    ///
97    /// v0 intentionally establishes only the collection contract. Ingestion,
98    /// series registry, and Prometheus adapter slices build on this metadata.
99    pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
100        let if_not_exists = self.match_if_not_exists()?;
101        let name = self.expect_ident()?;
102
103        let mut raw_retention_ms = None;
104        let mut tenant_by = None;
105        let mut downsample_policies = Vec::new();
106
107        loop {
108            if self.consume(&Token::Retention)? {
109                let value = self.parse_float()?;
110                let unit = self.parse_duration_unit()?;
111                raw_retention_ms = Some((value * unit) as u64);
112            } else if self.consume_ident_ci("DOWNSAMPLE")? {
113                downsample_policies.push(self.parse_downsample_policy_spec()?);
114                while self.consume(&Token::Comma)? {
115                    downsample_policies.push(self.parse_downsample_policy_spec()?);
116                }
117            } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
118                self.expect(Token::By)?;
119                self.expect(Token::LParen)?;
120                let mut path = self.expect_ident_or_keyword()?;
121                while self.consume(&Token::Dot)? {
122                    let next = self.expect_ident_or_keyword()?;
123                    path = format!("{path}.{next}");
124                }
125                self.expect(Token::RParen)?;
126                tenant_by = Some(path);
127            } else {
128                break;
129            }
130        }
131
132        Ok(QueryExpr::CreateTable(CreateTableQuery {
133            collection_model: CollectionModel::Metrics,
134            name,
135            columns: Vec::new(),
136            if_not_exists,
137            default_ttl_ms: raw_retention_ms,
138            metrics_rollup_policies: downsample_policies,
139            context_index_fields: Vec::new(),
140            context_index_enabled: false,
141            timestamps: false,
142            partition_by: None,
143            tenant_by,
144            append_only: true,
145            subscriptions: Vec::new(),
146            analytics_config: Vec::new(),
147            vault_own_master_key: false,
148        }))
149    }
150
151    /// Parse CREATE METRIC body (after CREATE METRIC consumed).
152    pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
153        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
154        while self.consume(&Token::Dot)? {
155            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
156            path = format!("{path}.{next}");
157        }
158
159        let mut kind = None;
160        let mut role = None;
161        let mut source: Option<String> = None;
162        let mut query: Option<String> = None;
163        let mut window_ms: Option<u64> = None;
164        let mut time_field: Option<String> = None;
165        loop {
166            if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
167                kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
168            } else if self.consume_ident_ci("ROLE")? {
169                role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
170            } else if self.consume_ident_ci("SOURCE")? {
171                source = Some(self.expect_ident_or_keyword()?);
172            } else if self.consume_ident_ci("QUERY")? {
173                let value = self.parse_literal_value()?;
174                match value {
175                    reddb_types::types::Value::Text(s) => query = Some(s.to_string()),
176                    other => {
177                        return Err(ParseError::new(
178                            format!("derived metric QUERY expects a string literal, got {other:?}"),
179                            self.position(),
180                        ));
181                    }
182                }
183            } else if self.consume_ident_ci("WINDOW")? {
184                let value = self.parse_float()?;
185                let unit = self.parse_duration_unit()?;
186                window_ms = Some((value * unit) as u64);
187            } else if self.consume_ident_ci("TIME_FIELD")? {
188                time_field = Some(self.expect_ident_or_keyword()?);
189            } else {
190                break;
191            }
192        }
193
194        Ok(QueryExpr::CreateMetric(crate::ast::CreateMetricQuery {
195            path,
196            kind: kind.ok_or_else(|| {
197                ParseError::new(
198                    "metric descriptor requires TYPE or KIND".to_string(),
199                    self.position(),
200                )
201            })?,
202            role: role.ok_or_else(|| {
203                ParseError::new(
204                    "metric descriptor requires ROLE".to_string(),
205                    self.position(),
206                )
207            })?,
208            source,
209            query,
210            window_ms,
211            time_field,
212        }))
213    }
214
215    /// Parse ALTER METRIC body (after ALTER METRIC consumed).
216    ///
217    /// Grammar:
218    ///   ALTER METRIC <dotted.path> SET ROLE <ident>
219    ///   ALTER METRIC <dotted.path> SET KIND <ident>      -- rejected at runtime
220    ///   ALTER METRIC <dotted.path> SET TYPE <ident>      -- rejected at runtime
221    ///   ALTER METRIC <dotted.path> SET PATH <dotted>     -- rejected at runtime
222    ///
223    /// Immutable-field attempts parse so the runtime can return a
224    /// structured "field X cannot be changed" error explaining *why*.
225    pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
226        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
227        while self.consume(&Token::Dot)? {
228            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
229            path = format!("{path}.{next}");
230        }
231
232        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
233            return Err(ParseError::expected(
234                vec!["SET"],
235                self.peek(),
236                self.position(),
237            ));
238        }
239
240        let mut set_role = None;
241        let mut attempted_kind = None;
242        let mut attempted_path = None;
243
244        if self.consume_ident_ci("ROLE")? {
245            set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
246        } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
247            attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
248        } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
249            let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
250            while self.consume(&Token::Dot)? {
251                let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
252                new_path = format!("{new_path}.{next}");
253            }
254            attempted_path = Some(new_path);
255        } else {
256            return Err(ParseError::expected(
257                vec!["ROLE", "KIND", "TYPE", "PATH"],
258                self.peek(),
259                self.position(),
260            ));
261        }
262
263        Ok(QueryExpr::AlterMetric(crate::ast::AlterMetricQuery {
264            path,
265            set_role,
266            attempted_kind,
267            attempted_path,
268        }))
269    }
270
271    /// Parse CREATE SLO body (after CREATE SLO consumed).
272    ///
273    /// Grammar:
274    ///   CREATE SLO <dotted.path>
275    ///     ON <metric.dotted.path>
276    ///     TARGET <number>
277    ///     WINDOW <number> <duration_unit>
278    ///
279    /// Clauses are positional after the SLO path so the grammar stays
280    /// tight; the parser leaves semantic validation (metric exists, role
281    /// = sli, target in range) to the runtime catalog where the error
282    /// wording can reference the live catalog state.
283    pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
284        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
285        while self.consume(&Token::Dot)? {
286            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
287            path = format!("{path}.{next}");
288        }
289
290        if !self.consume(&Token::On)? {
291            return Err(ParseError::expected(
292                vec!["ON"],
293                self.peek(),
294                self.position(),
295            ));
296        }
297
298        let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
299        while self.consume(&Token::Dot)? {
300            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
301            metric_path = format!("{metric_path}.{next}");
302        }
303
304        let mut target: Option<f64> = None;
305        let mut window_ms: Option<u64> = None;
306
307        loop {
308            if self.consume_ident_ci("TARGET")? {
309                target = Some(self.parse_float()?);
310            } else if self.consume_ident_ci("WINDOW")? {
311                let value = self.parse_float()?;
312                let unit = self.parse_duration_unit()?;
313                window_ms = Some((value * unit) as u64);
314            } else {
315                break;
316            }
317        }
318
319        Ok(QueryExpr::CreateSlo(CreateSloQuery {
320            path,
321            metric_path,
322            target: target.ok_or_else(|| {
323                ParseError::new(
324                    "SLO descriptor requires TARGET <number>".to_string(),
325                    self.position(),
326                )
327            })?,
328            window_ms: window_ms.ok_or_else(|| {
329                ParseError::new(
330                    "SLO descriptor requires WINDOW <duration>".to_string(),
331                    self.position(),
332                )
333            })?,
334        }))
335    }
336
337    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
338    ///
339    ///   CREATE HYPERTABLE metrics
340    ///     TIME_COLUMN ts
341    ///     CHUNK_INTERVAL '1d'
342    ///     [TTL '90d']
343    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
344    ///
345    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
346    /// TIMESERIES`, with the `hypertable` field populated. The
347    /// runtime dispatcher registers the spec on the RedDB-wide
348    /// `HypertableRegistry` alongside creating the collection.
349    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
350        let if_not_exists = self.match_if_not_exists()?;
351        let name = self.expect_ident()?;
352
353        let mut time_column: Option<String> = None;
354        let mut chunk_interval_ns: Option<u64> = None;
355        let mut ttl_ns: Option<u64> = None;
356        let mut retention_ms = None;
357        let mut columnar = false;
358
359        loop {
360            if self.consume_ident_ci("TIME_COLUMN")? {
361                time_column = Some(self.expect_ident()?);
362            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
363                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
364            } else if self.consume_ident_ci("COLUMNAR")? {
365                // `COLUMNAR` — activate columnar analytical storage (#911).
366                columnar = true;
367            } else if self.consume_ident_ci("TTL")? {
368                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
369            } else if self.consume(&Token::Retention)? {
370                let value = self.parse_float()?;
371                let unit = self.parse_duration_unit()?;
372                retention_ms = Some((value * unit) as u64);
373            } else {
374                break;
375            }
376        }
377
378        let time_column = time_column.ok_or_else(|| {
379            ParseError::new(
380                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
381                self.position(),
382            )
383        })?;
384        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
385            ParseError::new(
386                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
387                self.position(),
388            )
389        })?;
390
391        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
392            name,
393            retention_ms,
394            chunk_size: None,
395            downsample_policies: Vec::new(),
396            if_not_exists,
397            hypertable: Some(HypertableDdl {
398                time_column,
399                chunk_interval_ns,
400                default_ttl_ns: ttl_ns,
401            }),
402            session_key: None,
403            session_gap_ms: None,
404            columnar,
405        }))
406    }
407
408    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
409    /// resolve it to nanoseconds using the shared retention grammar.
410    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
411        let pos = self.position();
412        let value = self.parse_literal_value()?;
413        match value {
414            reddb_types::types::Value::Text(s) => {
415                reddb_types::duration::parse_duration_ns(&s).ok_or_else(|| {
416                    ParseError::new(
417                        // F-05: `s` is caller-controlled string-literal bytes.
418                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
419                        // before reaching downstream serialization sinks.
420                        // `clause` is a static internal label and stays bare.
421                        format!("{clause} duration {s:?} is not a valid duration literal"),
422                        pos,
423                    )
424                })
425            }
426            other => Err(ParseError::new(
427                format!("{clause} expects a string duration literal, got {other:?}"),
428                pos,
429            )),
430        }
431    }
432
433    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
434    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
435        let if_exists = self.match_if_exists()?;
436        let name = self.parse_drop_collection_name()?;
437        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
438            name,
439            if_exists,
440        }))
441    }
442
443    /// Parse a duration unit and return the multiplier in milliseconds
444    pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
445        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
446        // dedicated tokens, not `Token::Ident`, so they need their
447        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
448        // no canonical duration meaning today but were silently
449        // falling through to the seconds default — surface a clear
450        // error instead.
451        match self.peek().clone() {
452            Token::Ident(ref unit) => {
453                let mult = match unit.to_ascii_lowercase().as_str() {
454                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
455                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
456                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
457                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
458                    "d" | "day" | "days" => 86_400_000.0,
459                    other => {
460                        return Err(ParseError::new(
461                            // F-05: `other` is caller-controlled identifier
462                            // text. Render via `{:?}` so embedded CR/LF/NUL/
463                            // quotes are escaped before the message reaches
464                            // downstream serialization sinks.
465                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
466                            self.position(),
467                        ));
468                    }
469                };
470                self.advance()?;
471                Ok(mult)
472            }
473            Token::Min => {
474                // `MIN` keyword used as the minute alias.
475                self.advance()?;
476                Ok(60_000.0)
477            }
478            Token::Max | Token::Avg => {
479                // These keywords have no duration semantics; reject
480                // explicitly so a stray aggregate keyword does not
481                // silently default to seconds.
482                let kw = self.peek().clone();
483                Err(ParseError::new(
484                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
485                    self.position(),
486                ))
487            }
488            _ => Ok(1_000.0), // default: seconds
489        }
490    }
491
492    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
493        let target = self.parse_resolution_spec()?;
494        self.expect(Token::Colon)?;
495        let source = self.parse_resolution_spec()?;
496        let aggregation = if self.consume(&Token::Colon)? {
497            self.expect_ident_or_keyword()?.to_ascii_lowercase()
498        } else {
499            "avg".to_string()
500        };
501        Ok(format!("{target}:{source}:{aggregation}"))
502    }
503
504    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
505        match self.peek().clone() {
506            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
507                self.advance()?;
508                Ok(value.to_ascii_lowercase())
509            }
510            Token::Integer(value) => {
511                self.advance()?;
512                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
513                Ok(format!("{value}{unit}"))
514            }
515            Token::Float(value) => {
516                self.advance()?;
517                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
518                let number = if value.fract().abs() < f64::EPSILON {
519                    format!("{}", value as i64)
520                } else {
521                    value.to_string()
522                };
523                Ok(format!("{number}{unit}"))
524            }
525            other => Err(ParseError::new(
526                format!(
527                    "expected duration literal for downsample policy, got {}",
528                    other
529                ),
530                self.position(),
531            )),
532        }
533    }
534}
535
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_types::catalog::CollectionModel;

    /// Run the crate-level parser and keep only the parsed expression.
    fn parse_query(input: &str) -> Result<QueryExpr, ParseError> {
        match crate::parser::parse(input) {
            Ok(parsed) => Ok(parsed.query),
            Err(err) => Err(err),
        }
    }

    /// Clauses in arbitrary order, the default `avg` aggregation and the
    /// `COLUMNAR` flag all land in the AST.
    #[test]
    fn create_timeseries_accepts_clause_order_defaults_and_columnar() {
        let sql = "CREATE TIMESERIES IF NOT EXISTS readings COLUMNAR DOWNSAMPLE 1h:raw \
                   RETENTION 2 h CHUNKSIZE 64";
        let ts = match parse_query(sql).unwrap() {
            QueryExpr::CreateTimeSeries(ts) => ts,
            _ => panic!("expected create timeseries"),
        };

        assert_eq!(ts.name, "readings");
        assert!(ts.if_not_exists);
        assert!(ts.columnar);
        assert_eq!(ts.retention_ms, Some(7_200_000));
        assert_eq!(ts.chunk_size, Some(64));
        assert_eq!(ts.downsample_policies, vec!["1h:raw:avg"]);
        assert!(ts.session_key.is_none());
        assert!(ts.session_gap_ms.is_none());
        assert!(ts.hypertable.is_none());
    }

    /// `CREATE METRICS` yields an append-only, column-less metrics
    /// collection carrying the optional clauses.
    #[test]
    fn create_metrics_sets_collection_defaults_and_optional_clauses() {
        let sql = "CREATE METRICS IF NOT EXISTS telemetry RETENTION 30 m \
                   DOWNSAMPLE 5m:raw:max TENANT BY (ctx.tenant)";
        let table = match parse_query(sql).unwrap() {
            QueryExpr::CreateTable(table) => table,
            _ => panic!("expected metrics collection"),
        };

        assert_eq!(table.collection_model, CollectionModel::Metrics);
        assert_eq!(table.name, "telemetry");
        assert!(table.if_not_exists);
        assert_eq!(table.default_ttl_ms, Some(1_800_000));
        assert_eq!(table.metrics_rollup_policies, vec!["5m:raw:max"]);
        assert_eq!(table.tenant_by.as_deref(), Some("ctx.tenant"));
        assert!(table.append_only);
        assert!(table.columns.is_empty());
    }

    /// Descriptor statements: CREATE METRIC, ALTER METRIC and CREATE SLO.
    #[test]
    fn create_metric_alter_metric_and_slo_parse_descriptor_forms() {
        // CREATE METRIC with every optional clause.
        let created = parse_query(
            "CREATE METRIC Svc.Latency.P99 TYPE gauge ROLE sli SOURCE rollups \
             QUERY 'SELECT p99 FROM rollups' WINDOW 5 min TIME_FIELD observed_at",
        )
        .unwrap();
        let metric = match created {
            QueryExpr::CreateMetric(metric) => metric,
            _ => panic!("expected create metric"),
        };
        assert_eq!(metric.path, "svc.latency.p99");
        assert_eq!(metric.kind, "gauge");
        assert_eq!(metric.role, "sli");
        assert_eq!(metric.source.as_deref(), Some("rollups"));
        assert_eq!(metric.query.as_deref(), Some("SELECT p99 FROM rollups"));
        assert_eq!(metric.window_ms, Some(300_000));
        assert_eq!(metric.time_field.as_deref(), Some("observed_at"));

        // ALTER METRIC on an immutable field still parses.
        let altered =
            parse_query("ALTER METRIC Svc.Latency.P99 SET PATH svc.latency.p95").unwrap();
        let alter = match altered {
            QueryExpr::AlterMetric(alter) => alter,
            _ => panic!("expected alter metric"),
        };
        assert_eq!(alter.path, "svc.latency.p99");
        assert!(alter.set_role.is_none());
        assert!(alter.attempted_kind.is_none());
        assert_eq!(alter.attempted_path.as_deref(), Some("svc.latency.p95"));

        // CREATE SLO with both required clauses.
        let slo_query =
            parse_query("CREATE SLO Api.Availability ON Svc.Latency.P99 TARGET 0.999 WINDOW 28 d")
                .unwrap();
        let slo = match slo_query {
            QueryExpr::CreateSlo(slo) => slo,
            _ => panic!("expected create slo"),
        };
        assert_eq!(slo.path, "api.availability");
        assert_eq!(slo.metric_path, "svc.latency.p99");
        assert!((slo.target - 0.999).abs() < f64::EPSILON);
        assert_eq!(slo.window_ms, 2_419_200_000);
    }

    /// CREATE HYPERTABLE maps onto the timeseries AST; DROP TIMESERIES
    /// accepts `IF EXISTS` and a dotted wildcard name.
    #[test]
    fn create_hypertable_and_drop_timeseries_parse_variants() {
        let sql = "CREATE HYPERTABLE IF NOT EXISTS events TIME_COLUMN ts COLUMNAR \
                   CHUNK_INTERVAL '30m' TTL '10s' RETENTION 1 h";
        let ts = match parse_query(sql).unwrap() {
            QueryExpr::CreateTimeSeries(ts) => ts,
            _ => panic!("expected hypertable as timeseries"),
        };
        let ddl = ts.hypertable.expect("hypertable ddl");
        assert_eq!(ts.name, "events");
        assert!(ts.if_not_exists);
        assert!(ts.columnar);
        assert_eq!(ts.retention_ms, Some(3_600_000));
        assert_eq!(ddl.time_column, "ts");
        assert_eq!(ddl.chunk_interval_ns, 1_800_000_000_000);
        assert_eq!(ddl.default_ttl_ns, Some(10_000_000_000));

        let dropped = parse_query("DROP TIMESERIES IF EXISTS tenant.metrics.*").unwrap();
        assert!(matches!(
            dropped,
            QueryExpr::DropTimeSeries(stmt) if stmt.name == "tenant.metrics.*" && stmt.if_exists
        ));
    }

    /// Each malformed statement must be rejected by the parser.
    #[test]
    fn timeseries_metric_and_slo_errors_are_reported() {
        let rejected = [
            "CREATE TIMESERIES events WITH RETENTION 1 d",
            "CREATE METRICS telemetry TENANT (ctx.tenant)",
            "CREATE METRIC svc.latency TYPE gauge",
            "CREATE METRIC svc.latency TYPE gauge ROLE sli QUERY 42",
            "ALTER METRIC svc.latency ROLE sli",
            "CREATE SLO api.availability ON svc.latency WINDOW 1 h",
            "CREATE HYPERTABLE events TIME_COLUMN ts CHUNK_INTERVAL 'not-duration'",
        ];
        for statement in rejected {
            assert!(
                parse_query(statement).is_err(),
                "{statement} should not parse"
            );
        }
    }
}