Skip to main content

reddb_rql/parser/
timeseries.rs

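//! Parser for the time-series DDL family: CREATE/DROP TIMESERIES, CREATE METRICS,
//! CREATE/ALTER METRIC, CREATE SLO and CREATE HYPERTABLE.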
2
3use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
7    QueryExpr,
8};
9use crate::lexer::Token;
10use reddb_types::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
14    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15        let if_not_exists = self.match_if_not_exists()?;
16        let name = self.expect_ident()?;
17
18        let mut retention_ms = None;
19        let mut chunk_size = None;
20        let mut downsample_policies = Vec::new();
21        let mut session_key: Option<String> = None;
22        let mut session_gap_ms: Option<u64> = None;
23        let mut columnar = false;
24
25        // Parse optional clauses in any order
26        loop {
27            if self.consume(&Token::Retention)? {
28                let value = self.parse_float()?;
29                let unit = self.parse_duration_unit()?;
30                retention_ms = Some((value * unit) as u64);
31            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
32                chunk_size = Some(self.parse_integer()? as usize);
33            } else if self.consume_ident_ci("COLUMNAR")? {
34                // `COLUMNAR` — activate columnar analytical storage (#911).
35                columnar = true;
36            } else if self.consume_ident_ci("DOWNSAMPLE")? {
37                downsample_policies.push(self.parse_downsample_policy_spec()?);
38                while self.consume(&Token::Comma)? {
39                    downsample_policies.push(self.parse_downsample_policy_spec()?);
40                }
41            } else if self.consume(&Token::With)? {
42                // `WITH SESSION_KEY <col> SESSION_GAP <duration>` — both
43                // clauses are paired so the SESSIONIZE operator (slice
44                // 2+) has a complete default. Order is fixed
45                // (SESSION_KEY first) to keep the grammar simple; one
46                // without the other is a parse error.
47                self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
48            } else {
49                break;
50            }
51        }
52
53        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
54            name,
55            retention_ms,
56            chunk_size,
57            downsample_policies,
58            if_not_exists,
59            hypertable: None,
60            session_key,
61            session_gap_ms,
62            columnar,
63        }))
64    }
65
66    /// Parse `SESSION_KEY <ident> SESSION_GAP <duration>` after a
67    /// `WITH` token has been consumed. Both clauses are required; a
68    /// SESSION_KEY without a SESSION_GAP (or vice-versa) is rejected
69    /// at parse time so the descriptor never carries half a pairing.
70    fn parse_with_session_clause(
71        &mut self,
72        session_key: &mut Option<String>,
73        session_gap_ms: &mut Option<u64>,
74    ) -> Result<(), ParseError> {
75        if !self.consume_ident_ci("SESSION_KEY")? {
76            return Err(ParseError::new(
77                "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
78                self.position(),
79            ));
80        }
81        let key = self.expect_ident()?;
82        if !self.consume_ident_ci("SESSION_GAP")? {
83            return Err(ParseError::new(
84                "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
85                self.position(),
86            ));
87        }
88        let value = self.parse_float()?;
89        let unit = self.parse_duration_unit()?;
90        *session_key = Some(key);
91        *session_gap_ms = Some((value * unit) as u64);
92        Ok(())
93    }
94
95    /// Parse CREATE METRICS body (after CREATE METRICS consumed).
96    ///
97    /// v0 intentionally establishes only the collection contract. Ingestion,
98    /// series registry, and Prometheus adapter slices build on this metadata.
99    pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
100        let if_not_exists = self.match_if_not_exists()?;
101        let name = self.expect_ident()?;
102
103        let mut raw_retention_ms = None;
104        let mut tenant_by = None;
105        let mut downsample_policies = Vec::new();
106
107        loop {
108            if self.consume(&Token::Retention)? {
109                let value = self.parse_float()?;
110                let unit = self.parse_duration_unit()?;
111                raw_retention_ms = Some((value * unit) as u64);
112            } else if self.consume_ident_ci("DOWNSAMPLE")? {
113                downsample_policies.push(self.parse_downsample_policy_spec()?);
114                while self.consume(&Token::Comma)? {
115                    downsample_policies.push(self.parse_downsample_policy_spec()?);
116                }
117            } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
118                self.expect(Token::By)?;
119                self.expect(Token::LParen)?;
120                let mut path = self.expect_ident_or_keyword()?;
121                while self.consume(&Token::Dot)? {
122                    let next = self.expect_ident_or_keyword()?;
123                    path = format!("{path}.{next}");
124                }
125                self.expect(Token::RParen)?;
126                tenant_by = Some(path);
127            } else {
128                break;
129            }
130        }
131
132        Ok(QueryExpr::CreateTable(CreateTableQuery {
133            collection_model: CollectionModel::Metrics,
134            name,
135            columns: Vec::new(),
136            if_not_exists,
137            default_ttl_ms: raw_retention_ms,
138            metrics_rollup_policies: downsample_policies,
139            context_index_fields: Vec::new(),
140            context_index_enabled: false,
141            timestamps: false,
142            partition_by: None,
143            tenant_by,
144            append_only: true,
145            subscriptions: Vec::new(),
146            analytics_config: Vec::new(),
147            vault_own_master_key: false,
148            ai_policy: None,
149        }))
150    }
151
152    /// Parse CREATE METRIC body (after CREATE METRIC consumed).
153    pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
154        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
155        while self.consume(&Token::Dot)? {
156            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
157            path = format!("{path}.{next}");
158        }
159
160        let mut kind = None;
161        let mut role = None;
162        let mut source: Option<String> = None;
163        let mut query: Option<String> = None;
164        let mut window_ms: Option<u64> = None;
165        let mut time_field: Option<String> = None;
166        loop {
167            if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
168                kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
169            } else if self.consume_ident_ci("ROLE")? {
170                role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
171            } else if self.consume_ident_ci("SOURCE")? {
172                source = Some(self.expect_ident_or_keyword()?);
173            } else if self.consume_ident_ci("QUERY")? {
174                let value = self.parse_literal_value()?;
175                match value {
176                    reddb_types::types::Value::Text(s) => query = Some(s.to_string()),
177                    other => {
178                        return Err(ParseError::new(
179                            format!("derived metric QUERY expects a string literal, got {other:?}"),
180                            self.position(),
181                        ));
182                    }
183                }
184            } else if self.consume_ident_ci("WINDOW")? {
185                let value = self.parse_float()?;
186                let unit = self.parse_duration_unit()?;
187                window_ms = Some((value * unit) as u64);
188            } else if self.consume_ident_ci("TIME_FIELD")? {
189                time_field = Some(self.expect_ident_or_keyword()?);
190            } else {
191                break;
192            }
193        }
194
195        Ok(QueryExpr::CreateMetric(crate::ast::CreateMetricQuery {
196            path,
197            kind: kind.ok_or_else(|| {
198                ParseError::new(
199                    "metric descriptor requires TYPE or KIND".to_string(),
200                    self.position(),
201                )
202            })?,
203            role: role.ok_or_else(|| {
204                ParseError::new(
205                    "metric descriptor requires ROLE".to_string(),
206                    self.position(),
207                )
208            })?,
209            source,
210            query,
211            window_ms,
212            time_field,
213        }))
214    }
215
216    /// Parse ALTER METRIC body (after ALTER METRIC consumed).
217    ///
218    /// Grammar:
219    ///   ALTER METRIC <dotted.path> SET ROLE <ident>
220    ///   ALTER METRIC <dotted.path> SET KIND <ident>      -- rejected at runtime
221    ///   ALTER METRIC <dotted.path> SET TYPE <ident>      -- rejected at runtime
222    ///   ALTER METRIC <dotted.path> SET PATH <dotted>     -- rejected at runtime
223    ///
224    /// Immutable-field attempts parse so the runtime can return a
225    /// structured "field X cannot be changed" error explaining *why*.
226    pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
227        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
228        while self.consume(&Token::Dot)? {
229            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
230            path = format!("{path}.{next}");
231        }
232
233        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
234            return Err(ParseError::expected(
235                vec!["SET"],
236                self.peek(),
237                self.position(),
238            ));
239        }
240
241        let mut set_role = None;
242        let mut attempted_kind = None;
243        let mut attempted_path = None;
244
245        if self.consume_ident_ci("ROLE")? {
246            set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
247        } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
248            attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
249        } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
250            let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
251            while self.consume(&Token::Dot)? {
252                let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
253                new_path = format!("{new_path}.{next}");
254            }
255            attempted_path = Some(new_path);
256        } else {
257            return Err(ParseError::expected(
258                vec!["ROLE", "KIND", "TYPE", "PATH"],
259                self.peek(),
260                self.position(),
261            ));
262        }
263
264        Ok(QueryExpr::AlterMetric(crate::ast::AlterMetricQuery {
265            path,
266            set_role,
267            attempted_kind,
268            attempted_path,
269        }))
270    }
271
272    /// Parse CREATE SLO body (after CREATE SLO consumed).
273    ///
274    /// Grammar:
275    ///   CREATE SLO <dotted.path>
276    ///     ON <metric.dotted.path>
277    ///     TARGET <number>
278    ///     WINDOW <number> <duration_unit>
279    ///
280    /// Clauses are positional after the SLO path so the grammar stays
281    /// tight; the parser leaves semantic validation (metric exists, role
282    /// = sli, target in range) to the runtime catalog where the error
283    /// wording can reference the live catalog state.
284    pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
285        let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
286        while self.consume(&Token::Dot)? {
287            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
288            path = format!("{path}.{next}");
289        }
290
291        if !self.consume(&Token::On)? {
292            return Err(ParseError::expected(
293                vec!["ON"],
294                self.peek(),
295                self.position(),
296            ));
297        }
298
299        let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
300        while self.consume(&Token::Dot)? {
301            let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
302            metric_path = format!("{metric_path}.{next}");
303        }
304
305        let mut target: Option<f64> = None;
306        let mut window_ms: Option<u64> = None;
307
308        loop {
309            if self.consume_ident_ci("TARGET")? {
310                target = Some(self.parse_float()?);
311            } else if self.consume_ident_ci("WINDOW")? {
312                let value = self.parse_float()?;
313                let unit = self.parse_duration_unit()?;
314                window_ms = Some((value * unit) as u64);
315            } else {
316                break;
317            }
318        }
319
320        Ok(QueryExpr::CreateSlo(CreateSloQuery {
321            path,
322            metric_path,
323            target: target.ok_or_else(|| {
324                ParseError::new(
325                    "SLO descriptor requires TARGET <number>".to_string(),
326                    self.position(),
327                )
328            })?,
329            window_ms: window_ms.ok_or_else(|| {
330                ParseError::new(
331                    "SLO descriptor requires WINDOW <duration>".to_string(),
332                    self.position(),
333                )
334            })?,
335        }))
336    }
337
338    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
339    ///
340    ///   CREATE HYPERTABLE metrics
341    ///     TIME_COLUMN ts
342    ///     CHUNK_INTERVAL '1d'
343    ///     [TTL '90d']
344    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
345    ///
346    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
347    /// TIMESERIES`, with the `hypertable` field populated. The
348    /// runtime dispatcher registers the spec on the RedDB-wide
349    /// `HypertableRegistry` alongside creating the collection.
350    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
351        let if_not_exists = self.match_if_not_exists()?;
352        let name = self.expect_ident()?;
353
354        let mut time_column: Option<String> = None;
355        let mut chunk_interval_ns: Option<u64> = None;
356        let mut ttl_ns: Option<u64> = None;
357        let mut retention_ms = None;
358        let mut columnar = false;
359
360        loop {
361            if self.consume_ident_ci("TIME_COLUMN")? {
362                time_column = Some(self.expect_ident()?);
363            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
364                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
365            } else if self.consume_ident_ci("COLUMNAR")? {
366                // `COLUMNAR` — activate columnar analytical storage (#911).
367                columnar = true;
368            } else if self.consume_ident_ci("TTL")? {
369                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
370            } else if self.consume(&Token::Retention)? {
371                let value = self.parse_float()?;
372                let unit = self.parse_duration_unit()?;
373                retention_ms = Some((value * unit) as u64);
374            } else {
375                break;
376            }
377        }
378
379        let time_column = time_column.ok_or_else(|| {
380            ParseError::new(
381                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
382                self.position(),
383            )
384        })?;
385        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
386            ParseError::new(
387                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
388                self.position(),
389            )
390        })?;
391
392        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
393            name,
394            retention_ms,
395            chunk_size: None,
396            downsample_policies: Vec::new(),
397            if_not_exists,
398            hypertable: Some(HypertableDdl {
399                time_column,
400                chunk_interval_ns,
401                default_ttl_ns: ttl_ns,
402            }),
403            session_key: None,
404            session_gap_ms: None,
405            columnar,
406        }))
407    }
408
409    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
410    /// resolve it to nanoseconds using the shared retention grammar.
411    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
412        let pos = self.position();
413        let value = self.parse_literal_value()?;
414        match value {
415            reddb_types::types::Value::Text(s) => {
416                reddb_types::duration::parse_duration_ns(&s).ok_or_else(|| {
417                    ParseError::new(
418                        // F-05: `s` is caller-controlled string-literal bytes.
419                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
420                        // before reaching downstream serialization sinks.
421                        // `clause` is a static internal label and stays bare.
422                        format!("{clause} duration {s:?} is not a valid duration literal"),
423                        pos,
424                    )
425                })
426            }
427            other => Err(ParseError::new(
428                format!("{clause} expects a string duration literal, got {other:?}"),
429                pos,
430            )),
431        }
432    }
433
434    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
435    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
436        let if_exists = self.match_if_exists()?;
437        let name = self.parse_drop_collection_name()?;
438        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
439            name,
440            if_exists,
441        }))
442    }
443
444    /// Parse a duration unit and return the multiplier in milliseconds
445    pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
446        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
447        // dedicated tokens, not `Token::Ident`, so they need their
448        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
449        // no canonical duration meaning today but were silently
450        // falling through to the seconds default — surface a clear
451        // error instead.
452        match self.peek().clone() {
453            Token::Ident(ref unit) => {
454                let mult = match unit.to_ascii_lowercase().as_str() {
455                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
456                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
457                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
458                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
459                    "d" | "day" | "days" => 86_400_000.0,
460                    other => {
461                        return Err(ParseError::new(
462                            // F-05: `other` is caller-controlled identifier
463                            // text. Render via `{:?}` so embedded CR/LF/NUL/
464                            // quotes are escaped before the message reaches
465                            // downstream serialization sinks.
466                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
467                            self.position(),
468                        ));
469                    }
470                };
471                self.advance()?;
472                Ok(mult)
473            }
474            Token::Min => {
475                // `MIN` keyword used as the minute alias.
476                self.advance()?;
477                Ok(60_000.0)
478            }
479            Token::Max | Token::Avg => {
480                // These keywords have no duration semantics; reject
481                // explicitly so a stray aggregate keyword does not
482                // silently default to seconds.
483                let kw = self.peek().clone();
484                Err(ParseError::new(
485                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
486                    self.position(),
487                ))
488            }
489            _ => Ok(1_000.0), // default: seconds
490        }
491    }
492
493    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
494        let target = self.parse_resolution_spec()?;
495        self.expect(Token::Colon)?;
496        let source = self.parse_resolution_spec()?;
497        let aggregation = if self.consume(&Token::Colon)? {
498            self.expect_ident_or_keyword()?.to_ascii_lowercase()
499        } else {
500            "avg".to_string()
501        };
502        Ok(format!("{target}:{source}:{aggregation}"))
503    }
504
505    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
506        match self.peek().clone() {
507            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
508                self.advance()?;
509                Ok(value.to_ascii_lowercase())
510            }
511            Token::Integer(value) => {
512                self.advance()?;
513                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
514                Ok(format!("{value}{unit}"))
515            }
516            Token::Float(value) => {
517                self.advance()?;
518                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
519                let number = if value.fract().abs() < f64::EPSILON {
520                    format!("{}", value as i64)
521                } else {
522                    value.to_string()
523                };
524                Ok(format!("{number}{unit}"))
525            }
526            other => Err(ParseError::new(
527                format!(
528                    "expected duration literal for downsample policy, got {}",
529                    other
530                ),
531                self.position(),
532            )),
533        }
534    }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_types::catalog::CollectionModel;

    /// Run the crate-level parser and keep only the parsed expression.
    fn parse_query(input: &str) -> Result<QueryExpr, ParseError> {
        let parsed = crate::parser::parse(input)?;
        Ok(parsed.query)
    }

    /// Clauses in a non-canonical order, the default aggregation and the
    /// `COLUMNAR` flag all land in the timeseries AST.
    #[test]
    fn create_timeseries_accepts_clause_order_defaults_and_columnar() {
        let sql = "CREATE TIMESERIES IF NOT EXISTS readings COLUMNAR DOWNSAMPLE 1h:raw \
                   RETENTION 2 h CHUNKSIZE 64";
        let ts = match parse_query(sql).unwrap() {
            QueryExpr::CreateTimeSeries(ts) => ts,
            _ => panic!("expected create timeseries"),
        };

        assert_eq!(ts.name, "readings");
        assert!(ts.if_not_exists);
        assert!(ts.columnar);
        assert_eq!(ts.retention_ms, Some(2 * 3_600_000));
        assert_eq!(ts.chunk_size, Some(64));
        assert_eq!(ts.downsample_policies, vec!["1h:raw:avg"]);
        assert_eq!(ts.session_key, None);
        assert_eq!(ts.session_gap_ms, None);
        assert!(ts.hypertable.is_none());
    }

    /// `CREATE METRICS` yields a metrics-model table with its fixed
    /// defaults plus whatever optional clauses were given.
    #[test]
    fn create_metrics_sets_collection_defaults_and_optional_clauses() {
        let sql = "CREATE METRICS IF NOT EXISTS telemetry RETENTION 30 m \
                   DOWNSAMPLE 5m:raw:max TENANT BY (ctx.tenant)";
        let table = match parse_query(sql).unwrap() {
            QueryExpr::CreateTable(table) => table,
            _ => panic!("expected metrics collection"),
        };

        assert_eq!(table.collection_model, CollectionModel::Metrics);
        assert_eq!(table.name, "telemetry");
        assert!(table.if_not_exists);
        assert_eq!(table.default_ttl_ms, Some(30 * 60_000));
        assert_eq!(table.metrics_rollup_policies, vec!["5m:raw:max"]);
        assert_eq!(table.tenant_by.as_deref(), Some("ctx.tenant"));
        assert!(table.append_only);
        assert!(table.columns.is_empty());
    }

    /// Descriptor statements: CREATE METRIC, ALTER METRIC and CREATE SLO.
    #[test]
    fn create_metric_alter_metric_and_slo_parse_descriptor_forms() {
        // CREATE METRIC with every optional clause present.
        let create_sql = "CREATE METRIC Svc.Latency.P99 TYPE gauge ROLE sli SOURCE rollups \
                          QUERY 'SELECT p99 FROM rollups' WINDOW 5 min TIME_FIELD observed_at";
        let descriptor = match parse_query(create_sql).unwrap() {
            QueryExpr::CreateMetric(descriptor) => descriptor,
            _ => panic!("expected create metric"),
        };
        assert_eq!(descriptor.path, "svc.latency.p99");
        assert_eq!(descriptor.kind, "gauge");
        assert_eq!(descriptor.role, "sli");
        assert_eq!(descriptor.source.as_deref(), Some("rollups"));
        assert_eq!(descriptor.query.as_deref(), Some("SELECT p99 FROM rollups"));
        assert_eq!(descriptor.window_ms, Some(5 * 60_000));
        assert_eq!(descriptor.time_field.as_deref(), Some("observed_at"));

        // ALTER METRIC on an immutable field still parses.
        let alter_sql = "ALTER METRIC Svc.Latency.P99 SET PATH svc.latency.p95";
        let change = match parse_query(alter_sql).unwrap() {
            QueryExpr::AlterMetric(change) => change,
            _ => panic!("expected alter metric"),
        };
        assert_eq!(change.path, "svc.latency.p99");
        assert_eq!(change.set_role, None);
        assert_eq!(change.attempted_kind, None);
        assert_eq!(change.attempted_path.as_deref(), Some("svc.latency.p95"));

        // CREATE SLO with both required clauses.
        let slo_sql = "CREATE SLO Api.Availability ON Svc.Latency.P99 TARGET 0.999 WINDOW 28 d";
        let objective = match parse_query(slo_sql).unwrap() {
            QueryExpr::CreateSlo(objective) => objective,
            _ => panic!("expected create slo"),
        };
        assert_eq!(objective.path, "api.availability");
        assert_eq!(objective.metric_path, "svc.latency.p99");
        assert!((objective.target - 0.999).abs() < f64::EPSILON);
        assert_eq!(objective.window_ms, 28 * 86_400_000);
    }

    /// CREATE HYPERTABLE reuses the timeseries AST; DROP TIMESERIES accepts
    /// `IF EXISTS` and a dotted wildcard name.
    #[test]
    fn create_hypertable_and_drop_timeseries_parse_variants() {
        let create_sql = "CREATE HYPERTABLE IF NOT EXISTS events TIME_COLUMN ts COLUMNAR \
                          CHUNK_INTERVAL '30m' TTL '10s' RETENTION 1 h";
        let ts = match parse_query(create_sql).unwrap() {
            QueryExpr::CreateTimeSeries(ts) => ts,
            _ => panic!("expected hypertable as timeseries"),
        };
        let ddl = ts.hypertable.expect("hypertable ddl");
        assert_eq!(ts.name, "events");
        assert!(ts.if_not_exists);
        assert!(ts.columnar);
        assert_eq!(ts.retention_ms, Some(3_600_000));
        assert_eq!(ddl.time_column, "ts");
        assert_eq!(ddl.chunk_interval_ns, 30 * 60 * 1_000_000_000);
        assert_eq!(ddl.default_ttl_ns, Some(10 * 1_000_000_000));

        let dropped = parse_query("DROP TIMESERIES IF EXISTS tenant.metrics.*").unwrap();
        let is_expected_drop = match dropped {
            QueryExpr::DropTimeSeries(drop) => drop.name == "tenant.metrics.*" && drop.if_exists,
            _ => false,
        };
        assert!(is_expected_drop);
    }

    /// Each malformed statement must be rejected by the parser.
    #[test]
    fn timeseries_metric_and_slo_errors_are_reported() {
        let rejected = [
            "CREATE TIMESERIES events WITH RETENTION 1 d",
            "CREATE METRICS telemetry TENANT (ctx.tenant)",
            "CREATE METRIC svc.latency TYPE gauge",
            "CREATE METRIC svc.latency TYPE gauge ROLE sli QUERY 42",
            "ALTER METRIC svc.latency ROLE sli",
            "CREATE SLO api.availability ON svc.latency WINDOW 1 h",
            "CREATE HYPERTABLE events TIME_COLUMN ts CHUNK_INTERVAL 'not-duration'",
        ];
        for sql in rejected {
            assert!(parse_query(sql).is_err(), "{sql} should not parse");
        }
    }
}
668}