1use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6 CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
7 QueryExpr,
8};
9use crate::lexer::Token;
10use reddb_types::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut retention_ms = None;
19 let mut chunk_size = None;
20 let mut downsample_policies = Vec::new();
21 let mut session_key: Option<String> = None;
22 let mut session_gap_ms: Option<u64> = None;
23 let mut columnar = false;
24
25 loop {
27 if self.consume(&Token::Retention)? {
28 let value = self.parse_float()?;
29 let unit = self.parse_duration_unit()?;
30 retention_ms = Some((value * unit) as u64);
31 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
32 chunk_size = Some(self.parse_integer()? as usize);
33 } else if self.consume_ident_ci("COLUMNAR")? {
34 columnar = true;
36 } else if self.consume_ident_ci("DOWNSAMPLE")? {
37 downsample_policies.push(self.parse_downsample_policy_spec()?);
38 while self.consume(&Token::Comma)? {
39 downsample_policies.push(self.parse_downsample_policy_spec()?);
40 }
41 } else if self.consume(&Token::With)? {
42 self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
48 } else {
49 break;
50 }
51 }
52
53 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
54 name,
55 retention_ms,
56 chunk_size,
57 downsample_policies,
58 if_not_exists,
59 hypertable: None,
60 session_key,
61 session_gap_ms,
62 columnar,
63 }))
64 }
65
66 fn parse_with_session_clause(
71 &mut self,
72 session_key: &mut Option<String>,
73 session_gap_ms: &mut Option<u64>,
74 ) -> Result<(), ParseError> {
75 if !self.consume_ident_ci("SESSION_KEY")? {
76 return Err(ParseError::new(
77 "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
78 self.position(),
79 ));
80 }
81 let key = self.expect_ident()?;
82 if !self.consume_ident_ci("SESSION_GAP")? {
83 return Err(ParseError::new(
84 "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
85 self.position(),
86 ));
87 }
88 let value = self.parse_float()?;
89 let unit = self.parse_duration_unit()?;
90 *session_key = Some(key);
91 *session_gap_ms = Some((value * unit) as u64);
92 Ok(())
93 }
94
95 pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
100 let if_not_exists = self.match_if_not_exists()?;
101 let name = self.expect_ident()?;
102
103 let mut raw_retention_ms = None;
104 let mut tenant_by = None;
105 let mut downsample_policies = Vec::new();
106
107 loop {
108 if self.consume(&Token::Retention)? {
109 let value = self.parse_float()?;
110 let unit = self.parse_duration_unit()?;
111 raw_retention_ms = Some((value * unit) as u64);
112 } else if self.consume_ident_ci("DOWNSAMPLE")? {
113 downsample_policies.push(self.parse_downsample_policy_spec()?);
114 while self.consume(&Token::Comma)? {
115 downsample_policies.push(self.parse_downsample_policy_spec()?);
116 }
117 } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
118 self.expect(Token::By)?;
119 self.expect(Token::LParen)?;
120 let mut path = self.expect_ident_or_keyword()?;
121 while self.consume(&Token::Dot)? {
122 let next = self.expect_ident_or_keyword()?;
123 path = format!("{path}.{next}");
124 }
125 self.expect(Token::RParen)?;
126 tenant_by = Some(path);
127 } else {
128 break;
129 }
130 }
131
132 Ok(QueryExpr::CreateTable(CreateTableQuery {
133 collection_model: CollectionModel::Metrics,
134 name,
135 columns: Vec::new(),
136 if_not_exists,
137 default_ttl_ms: raw_retention_ms,
138 metrics_rollup_policies: downsample_policies,
139 context_index_fields: Vec::new(),
140 context_index_enabled: false,
141 timestamps: false,
142 partition_by: None,
143 tenant_by,
144 append_only: true,
145 subscriptions: Vec::new(),
146 analytics_config: Vec::new(),
147 vault_own_master_key: false,
148 }))
149 }
150
151 pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
153 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
154 while self.consume(&Token::Dot)? {
155 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
156 path = format!("{path}.{next}");
157 }
158
159 let mut kind = None;
160 let mut role = None;
161 let mut source: Option<String> = None;
162 let mut query: Option<String> = None;
163 let mut window_ms: Option<u64> = None;
164 let mut time_field: Option<String> = None;
165 loop {
166 if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
167 kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
168 } else if self.consume_ident_ci("ROLE")? {
169 role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
170 } else if self.consume_ident_ci("SOURCE")? {
171 source = Some(self.expect_ident_or_keyword()?);
172 } else if self.consume_ident_ci("QUERY")? {
173 let value = self.parse_literal_value()?;
174 match value {
175 reddb_types::types::Value::Text(s) => query = Some(s.to_string()),
176 other => {
177 return Err(ParseError::new(
178 format!("derived metric QUERY expects a string literal, got {other:?}"),
179 self.position(),
180 ));
181 }
182 }
183 } else if self.consume_ident_ci("WINDOW")? {
184 let value = self.parse_float()?;
185 let unit = self.parse_duration_unit()?;
186 window_ms = Some((value * unit) as u64);
187 } else if self.consume_ident_ci("TIME_FIELD")? {
188 time_field = Some(self.expect_ident_or_keyword()?);
189 } else {
190 break;
191 }
192 }
193
194 Ok(QueryExpr::CreateMetric(crate::ast::CreateMetricQuery {
195 path,
196 kind: kind.ok_or_else(|| {
197 ParseError::new(
198 "metric descriptor requires TYPE or KIND".to_string(),
199 self.position(),
200 )
201 })?,
202 role: role.ok_or_else(|| {
203 ParseError::new(
204 "metric descriptor requires ROLE".to_string(),
205 self.position(),
206 )
207 })?,
208 source,
209 query,
210 window_ms,
211 time_field,
212 }))
213 }
214
215 pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
226 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
227 while self.consume(&Token::Dot)? {
228 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
229 path = format!("{path}.{next}");
230 }
231
232 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
233 return Err(ParseError::expected(
234 vec!["SET"],
235 self.peek(),
236 self.position(),
237 ));
238 }
239
240 let mut set_role = None;
241 let mut attempted_kind = None;
242 let mut attempted_path = None;
243
244 if self.consume_ident_ci("ROLE")? {
245 set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
246 } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
247 attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
248 } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
249 let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
250 while self.consume(&Token::Dot)? {
251 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
252 new_path = format!("{new_path}.{next}");
253 }
254 attempted_path = Some(new_path);
255 } else {
256 return Err(ParseError::expected(
257 vec!["ROLE", "KIND", "TYPE", "PATH"],
258 self.peek(),
259 self.position(),
260 ));
261 }
262
263 Ok(QueryExpr::AlterMetric(crate::ast::AlterMetricQuery {
264 path,
265 set_role,
266 attempted_kind,
267 attempted_path,
268 }))
269 }
270
271 pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
284 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
285 while self.consume(&Token::Dot)? {
286 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
287 path = format!("{path}.{next}");
288 }
289
290 if !self.consume(&Token::On)? {
291 return Err(ParseError::expected(
292 vec!["ON"],
293 self.peek(),
294 self.position(),
295 ));
296 }
297
298 let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
299 while self.consume(&Token::Dot)? {
300 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
301 metric_path = format!("{metric_path}.{next}");
302 }
303
304 let mut target: Option<f64> = None;
305 let mut window_ms: Option<u64> = None;
306
307 loop {
308 if self.consume_ident_ci("TARGET")? {
309 target = Some(self.parse_float()?);
310 } else if self.consume_ident_ci("WINDOW")? {
311 let value = self.parse_float()?;
312 let unit = self.parse_duration_unit()?;
313 window_ms = Some((value * unit) as u64);
314 } else {
315 break;
316 }
317 }
318
319 Ok(QueryExpr::CreateSlo(CreateSloQuery {
320 path,
321 metric_path,
322 target: target.ok_or_else(|| {
323 ParseError::new(
324 "SLO descriptor requires TARGET <number>".to_string(),
325 self.position(),
326 )
327 })?,
328 window_ms: window_ms.ok_or_else(|| {
329 ParseError::new(
330 "SLO descriptor requires WINDOW <duration>".to_string(),
331 self.position(),
332 )
333 })?,
334 }))
335 }
336
337 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
350 let if_not_exists = self.match_if_not_exists()?;
351 let name = self.expect_ident()?;
352
353 let mut time_column: Option<String> = None;
354 let mut chunk_interval_ns: Option<u64> = None;
355 let mut ttl_ns: Option<u64> = None;
356 let mut retention_ms = None;
357 let mut columnar = false;
358
359 loop {
360 if self.consume_ident_ci("TIME_COLUMN")? {
361 time_column = Some(self.expect_ident()?);
362 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
363 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
364 } else if self.consume_ident_ci("COLUMNAR")? {
365 columnar = true;
367 } else if self.consume_ident_ci("TTL")? {
368 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
369 } else if self.consume(&Token::Retention)? {
370 let value = self.parse_float()?;
371 let unit = self.parse_duration_unit()?;
372 retention_ms = Some((value * unit) as u64);
373 } else {
374 break;
375 }
376 }
377
378 let time_column = time_column.ok_or_else(|| {
379 ParseError::new(
380 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
381 self.position(),
382 )
383 })?;
384 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
385 ParseError::new(
386 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
387 self.position(),
388 )
389 })?;
390
391 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
392 name,
393 retention_ms,
394 chunk_size: None,
395 downsample_policies: Vec::new(),
396 if_not_exists,
397 hypertable: Some(HypertableDdl {
398 time_column,
399 chunk_interval_ns,
400 default_ttl_ns: ttl_ns,
401 }),
402 session_key: None,
403 session_gap_ms: None,
404 columnar,
405 }))
406 }
407
408 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
411 let pos = self.position();
412 let value = self.parse_literal_value()?;
413 match value {
414 reddb_types::types::Value::Text(s) => {
415 reddb_types::duration::parse_duration_ns(&s).ok_or_else(|| {
416 ParseError::new(
417 format!("{clause} duration {s:?} is not a valid duration literal"),
422 pos,
423 )
424 })
425 }
426 other => Err(ParseError::new(
427 format!("{clause} expects a string duration literal, got {other:?}"),
428 pos,
429 )),
430 }
431 }
432
433 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
435 let if_exists = self.match_if_exists()?;
436 let name = self.parse_drop_collection_name()?;
437 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
438 name,
439 if_exists,
440 }))
441 }
442
443 pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
445 match self.peek().clone() {
452 Token::Ident(ref unit) => {
453 let mult = match unit.to_ascii_lowercase().as_str() {
454 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
455 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
456 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
457 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
458 "d" | "day" | "days" => 86_400_000.0,
459 other => {
460 return Err(ParseError::new(
461 format!("unknown duration unit {other:?}, expected s/m/h/d"),
466 self.position(),
467 ));
468 }
469 };
470 self.advance()?;
471 Ok(mult)
472 }
473 Token::Min => {
474 self.advance()?;
476 Ok(60_000.0)
477 }
478 Token::Max | Token::Avg => {
479 let kw = self.peek().clone();
483 Err(ParseError::new(
484 format!("unknown duration unit '{}', expected s/m/h/d", kw),
485 self.position(),
486 ))
487 }
488 _ => Ok(1_000.0), }
490 }
491
492 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
493 let target = self.parse_resolution_spec()?;
494 self.expect(Token::Colon)?;
495 let source = self.parse_resolution_spec()?;
496 let aggregation = if self.consume(&Token::Colon)? {
497 self.expect_ident_or_keyword()?.to_ascii_lowercase()
498 } else {
499 "avg".to_string()
500 };
501 Ok(format!("{target}:{source}:{aggregation}"))
502 }
503
504 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
505 match self.peek().clone() {
506 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
507 self.advance()?;
508 Ok(value.to_ascii_lowercase())
509 }
510 Token::Integer(value) => {
511 self.advance()?;
512 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
513 Ok(format!("{value}{unit}"))
514 }
515 Token::Float(value) => {
516 self.advance()?;
517 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
518 let number = if value.fract().abs() < f64::EPSILON {
519 format!("{}", value as i64)
520 } else {
521 value.to_string()
522 };
523 Ok(format!("{number}{unit}"))
524 }
525 other => Err(ParseError::new(
526 format!(
527 "expected duration literal for downsample policy, got {}",
528 other
529 ),
530 self.position(),
531 )),
532 }
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_types::catalog::CollectionModel;

    /// Runs the crate-level parser and keeps only the `query` field of its
    /// output.
    fn parse_query(input: &str) -> Result<QueryExpr, ParseError> {
        let parsed = crate::parser::parse(input)?;
        Ok(parsed.query)
    }

    /// Clauses may appear in any order; omitted session and hypertable
    /// settings stay unset.
    #[test]
    fn create_timeseries_accepts_clause_order_defaults_and_columnar() {
        let parsed = parse_query(
            "CREATE TIMESERIES IF NOT EXISTS readings COLUMNAR DOWNSAMPLE 1h:raw \
             RETENTION 2 h CHUNKSIZE 64",
        )
        .unwrap();

        let series = match parsed {
            QueryExpr::CreateTimeSeries(series) => series,
            _ => panic!("expected create timeseries"),
        };

        assert_eq!(series.name, "readings");
        assert!(series.if_not_exists);
        assert!(series.columnar);
        assert_eq!(series.retention_ms, Some(2 * 3_600_000));
        assert_eq!(series.chunk_size, Some(64));
        assert_eq!(series.downsample_policies, vec!["1h:raw:avg"]);
        assert_eq!(series.session_key, None);
        assert_eq!(series.session_gap_ms, None);
        assert!(series.hypertable.is_none());
    }

    /// `CREATE METRICS` yields an append-only, column-less metrics table.
    #[test]
    fn create_metrics_sets_collection_defaults_and_optional_clauses() {
        let parsed = parse_query(
            "CREATE METRICS IF NOT EXISTS telemetry RETENTION 30 m \
             DOWNSAMPLE 5m:raw:max TENANT BY (ctx.tenant)",
        )
        .unwrap();

        let table = match parsed {
            QueryExpr::CreateTable(table) => table,
            _ => panic!("expected metrics collection"),
        };

        assert_eq!(table.collection_model, CollectionModel::Metrics);
        assert_eq!(table.name, "telemetry");
        assert!(table.if_not_exists);
        assert_eq!(table.default_ttl_ms, Some(30 * 60_000));
        assert_eq!(table.metrics_rollup_policies, vec!["5m:raw:max"]);
        assert_eq!(table.tenant_by.as_deref(), Some("ctx.tenant"));
        assert!(table.append_only);
        assert!(table.columns.is_empty());
    }

    /// Descriptor statements lower-case their dotted paths and keywords.
    #[test]
    fn create_metric_alter_metric_and_slo_parse_descriptor_forms() {
        // CREATE METRIC with every optional clause.
        let parsed = parse_query(
            "CREATE METRIC Svc.Latency.P99 TYPE gauge ROLE sli SOURCE rollups \
             QUERY 'SELECT p99 FROM rollups' WINDOW 5 min TIME_FIELD observed_at",
        )
        .unwrap();
        let metric = match parsed {
            QueryExpr::CreateMetric(metric) => metric,
            _ => panic!("expected create metric"),
        };
        assert_eq!(metric.path, "svc.latency.p99");
        assert_eq!(metric.kind, "gauge");
        assert_eq!(metric.role, "sli");
        assert_eq!(metric.source.as_deref(), Some("rollups"));
        assert_eq!(metric.query.as_deref(), Some("SELECT p99 FROM rollups"));
        assert_eq!(metric.window_ms, Some(5 * 60_000));
        assert_eq!(metric.time_field.as_deref(), Some("observed_at"));

        // ALTER METRIC ... SET PATH only fills `attempted_path`.
        let parsed = parse_query("ALTER METRIC Svc.Latency.P99 SET PATH svc.latency.p95").unwrap();
        let alter = match parsed {
            QueryExpr::AlterMetric(alter) => alter,
            _ => panic!("expected alter metric"),
        };
        assert_eq!(alter.path, "svc.latency.p99");
        assert_eq!(alter.set_role, None);
        assert_eq!(alter.attempted_kind, None);
        assert_eq!(alter.attempted_path.as_deref(), Some("svc.latency.p95"));

        // CREATE SLO with target and window.
        let parsed =
            parse_query("CREATE SLO Api.Availability ON Svc.Latency.P99 TARGET 0.999 WINDOW 28 d")
                .unwrap();
        let slo = match parsed {
            QueryExpr::CreateSlo(slo) => slo,
            _ => panic!("expected create slo"),
        };
        assert_eq!(slo.path, "api.availability");
        assert_eq!(slo.metric_path, "svc.latency.p99");
        assert!((slo.target - 0.999).abs() < f64::EPSILON);
        assert_eq!(slo.window_ms, 28 * 86_400_000);
    }

    /// A hypertable is reported as a time series with hypertable DDL; a
    /// drop statement keeps its name as written.
    #[test]
    fn create_hypertable_and_drop_timeseries_parse_variants() {
        let parsed = parse_query(
            "CREATE HYPERTABLE IF NOT EXISTS events TIME_COLUMN ts COLUMNAR \
             CHUNK_INTERVAL '30m' TTL '10s' RETENTION 1 h",
        )
        .unwrap();
        let series = match parsed {
            QueryExpr::CreateTimeSeries(series) => series,
            _ => panic!("expected hypertable as timeseries"),
        };
        let ddl = series.hypertable.expect("hypertable ddl");
        assert_eq!(series.name, "events");
        assert!(series.if_not_exists);
        assert!(series.columnar);
        assert_eq!(series.retention_ms, Some(3_600_000));
        assert_eq!(ddl.time_column, "ts");
        assert_eq!(ddl.chunk_interval_ns, 30 * 60 * 1_000_000_000);
        assert_eq!(ddl.default_ttl_ns, Some(10 * 1_000_000_000));

        let dropped = parse_query("DROP TIMESERIES IF EXISTS tenant.metrics.*").unwrap();
        let is_expected_drop = matches!(
            &dropped,
            QueryExpr::DropTimeSeries(drop) if drop.name == "tenant.metrics.*" && drop.if_exists
        );
        assert!(is_expected_drop);
    }

    /// Each malformed statement must be rejected by the parser.
    #[test]
    fn timeseries_metric_and_slo_errors_are_reported() {
        const REJECTED: [&str; 7] = [
            "CREATE TIMESERIES events WITH RETENTION 1 d",
            "CREATE METRICS telemetry TENANT (ctx.tenant)",
            "CREATE METRIC svc.latency TYPE gauge",
            "CREATE METRIC svc.latency TYPE gauge ROLE sli QUERY 42",
            "ALTER METRIC svc.latency ROLE sli",
            "CREATE SLO api.availability ON svc.latency WINDOW 1 h",
            "CREATE HYPERTABLE events TIME_COLUMN ts CHUNK_INTERVAL 'not-duration'",
        ];

        for sql in REJECTED {
            let outcome = parse_query(sql);
            assert!(outcome.is_err(), "{sql} should not parse");
        }
    }
}