1use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6 CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
7 QueryExpr,
8};
9use crate::lexer::Token;
10use reddb_types::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut retention_ms = None;
19 let mut chunk_size = None;
20 let mut downsample_policies = Vec::new();
21 let mut session_key: Option<String> = None;
22 let mut session_gap_ms: Option<u64> = None;
23 let mut columnar = false;
24
25 loop {
27 if self.consume(&Token::Retention)? {
28 let value = self.parse_float()?;
29 let unit = self.parse_duration_unit()?;
30 retention_ms = Some((value * unit) as u64);
31 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
32 chunk_size = Some(self.parse_integer()? as usize);
33 } else if self.consume_ident_ci("COLUMNAR")? {
34 columnar = true;
36 } else if self.consume_ident_ci("DOWNSAMPLE")? {
37 downsample_policies.push(self.parse_downsample_policy_spec()?);
38 while self.consume(&Token::Comma)? {
39 downsample_policies.push(self.parse_downsample_policy_spec()?);
40 }
41 } else if self.consume(&Token::With)? {
42 self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
48 } else {
49 break;
50 }
51 }
52
53 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
54 name,
55 retention_ms,
56 chunk_size,
57 downsample_policies,
58 if_not_exists,
59 hypertable: None,
60 session_key,
61 session_gap_ms,
62 columnar,
63 }))
64 }
65
66 fn parse_with_session_clause(
71 &mut self,
72 session_key: &mut Option<String>,
73 session_gap_ms: &mut Option<u64>,
74 ) -> Result<(), ParseError> {
75 if !self.consume_ident_ci("SESSION_KEY")? {
76 return Err(ParseError::new(
77 "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
78 self.position(),
79 ));
80 }
81 let key = self.expect_ident()?;
82 if !self.consume_ident_ci("SESSION_GAP")? {
83 return Err(ParseError::new(
84 "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
85 self.position(),
86 ));
87 }
88 let value = self.parse_float()?;
89 let unit = self.parse_duration_unit()?;
90 *session_key = Some(key);
91 *session_gap_ms = Some((value * unit) as u64);
92 Ok(())
93 }
94
95 pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
100 let if_not_exists = self.match_if_not_exists()?;
101 let name = self.expect_ident()?;
102
103 let mut raw_retention_ms = None;
104 let mut tenant_by = None;
105 let mut downsample_policies = Vec::new();
106
107 loop {
108 if self.consume(&Token::Retention)? {
109 let value = self.parse_float()?;
110 let unit = self.parse_duration_unit()?;
111 raw_retention_ms = Some((value * unit) as u64);
112 } else if self.consume_ident_ci("DOWNSAMPLE")? {
113 downsample_policies.push(self.parse_downsample_policy_spec()?);
114 while self.consume(&Token::Comma)? {
115 downsample_policies.push(self.parse_downsample_policy_spec()?);
116 }
117 } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
118 self.expect(Token::By)?;
119 self.expect(Token::LParen)?;
120 let mut path = self.expect_ident_or_keyword()?;
121 while self.consume(&Token::Dot)? {
122 let next = self.expect_ident_or_keyword()?;
123 path = format!("{path}.{next}");
124 }
125 self.expect(Token::RParen)?;
126 tenant_by = Some(path);
127 } else {
128 break;
129 }
130 }
131
132 Ok(QueryExpr::CreateTable(CreateTableQuery {
133 collection_model: CollectionModel::Metrics,
134 name,
135 columns: Vec::new(),
136 if_not_exists,
137 default_ttl_ms: raw_retention_ms,
138 metrics_rollup_policies: downsample_policies,
139 context_index_fields: Vec::new(),
140 context_index_enabled: false,
141 timestamps: false,
142 partition_by: None,
143 tenant_by,
144 append_only: true,
145 subscriptions: Vec::new(),
146 analytics_config: Vec::new(),
147 vault_own_master_key: false,
148 ai_policy: None,
149 }))
150 }
151
152 pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
154 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
155 while self.consume(&Token::Dot)? {
156 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
157 path = format!("{path}.{next}");
158 }
159
160 let mut kind = None;
161 let mut role = None;
162 let mut source: Option<String> = None;
163 let mut query: Option<String> = None;
164 let mut window_ms: Option<u64> = None;
165 let mut time_field: Option<String> = None;
166 loop {
167 if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
168 kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
169 } else if self.consume_ident_ci("ROLE")? {
170 role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
171 } else if self.consume_ident_ci("SOURCE")? {
172 source = Some(self.expect_ident_or_keyword()?);
173 } else if self.consume_ident_ci("QUERY")? {
174 let value = self.parse_literal_value()?;
175 match value {
176 reddb_types::types::Value::Text(s) => query = Some(s.to_string()),
177 other => {
178 return Err(ParseError::new(
179 format!("derived metric QUERY expects a string literal, got {other:?}"),
180 self.position(),
181 ));
182 }
183 }
184 } else if self.consume_ident_ci("WINDOW")? {
185 let value = self.parse_float()?;
186 let unit = self.parse_duration_unit()?;
187 window_ms = Some((value * unit) as u64);
188 } else if self.consume_ident_ci("TIME_FIELD")? {
189 time_field = Some(self.expect_ident_or_keyword()?);
190 } else {
191 break;
192 }
193 }
194
195 Ok(QueryExpr::CreateMetric(crate::ast::CreateMetricQuery {
196 path,
197 kind: kind.ok_or_else(|| {
198 ParseError::new(
199 "metric descriptor requires TYPE or KIND".to_string(),
200 self.position(),
201 )
202 })?,
203 role: role.ok_or_else(|| {
204 ParseError::new(
205 "metric descriptor requires ROLE".to_string(),
206 self.position(),
207 )
208 })?,
209 source,
210 query,
211 window_ms,
212 time_field,
213 }))
214 }
215
216 pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
227 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
228 while self.consume(&Token::Dot)? {
229 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
230 path = format!("{path}.{next}");
231 }
232
233 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
234 return Err(ParseError::expected(
235 vec!["SET"],
236 self.peek(),
237 self.position(),
238 ));
239 }
240
241 let mut set_role = None;
242 let mut attempted_kind = None;
243 let mut attempted_path = None;
244
245 if self.consume_ident_ci("ROLE")? {
246 set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
247 } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
248 attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
249 } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
250 let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
251 while self.consume(&Token::Dot)? {
252 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
253 new_path = format!("{new_path}.{next}");
254 }
255 attempted_path = Some(new_path);
256 } else {
257 return Err(ParseError::expected(
258 vec!["ROLE", "KIND", "TYPE", "PATH"],
259 self.peek(),
260 self.position(),
261 ));
262 }
263
264 Ok(QueryExpr::AlterMetric(crate::ast::AlterMetricQuery {
265 path,
266 set_role,
267 attempted_kind,
268 attempted_path,
269 }))
270 }
271
272 pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
285 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
286 while self.consume(&Token::Dot)? {
287 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
288 path = format!("{path}.{next}");
289 }
290
291 if !self.consume(&Token::On)? {
292 return Err(ParseError::expected(
293 vec!["ON"],
294 self.peek(),
295 self.position(),
296 ));
297 }
298
299 let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
300 while self.consume(&Token::Dot)? {
301 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
302 metric_path = format!("{metric_path}.{next}");
303 }
304
305 let mut target: Option<f64> = None;
306 let mut window_ms: Option<u64> = None;
307
308 loop {
309 if self.consume_ident_ci("TARGET")? {
310 target = Some(self.parse_float()?);
311 } else if self.consume_ident_ci("WINDOW")? {
312 let value = self.parse_float()?;
313 let unit = self.parse_duration_unit()?;
314 window_ms = Some((value * unit) as u64);
315 } else {
316 break;
317 }
318 }
319
320 Ok(QueryExpr::CreateSlo(CreateSloQuery {
321 path,
322 metric_path,
323 target: target.ok_or_else(|| {
324 ParseError::new(
325 "SLO descriptor requires TARGET <number>".to_string(),
326 self.position(),
327 )
328 })?,
329 window_ms: window_ms.ok_or_else(|| {
330 ParseError::new(
331 "SLO descriptor requires WINDOW <duration>".to_string(),
332 self.position(),
333 )
334 })?,
335 }))
336 }
337
338 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
351 let if_not_exists = self.match_if_not_exists()?;
352 let name = self.expect_ident()?;
353
354 let mut time_column: Option<String> = None;
355 let mut chunk_interval_ns: Option<u64> = None;
356 let mut ttl_ns: Option<u64> = None;
357 let mut retention_ms = None;
358 let mut columnar = false;
359
360 loop {
361 if self.consume_ident_ci("TIME_COLUMN")? {
362 time_column = Some(self.expect_ident()?);
363 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
364 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
365 } else if self.consume_ident_ci("COLUMNAR")? {
366 columnar = true;
368 } else if self.consume_ident_ci("TTL")? {
369 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
370 } else if self.consume(&Token::Retention)? {
371 let value = self.parse_float()?;
372 let unit = self.parse_duration_unit()?;
373 retention_ms = Some((value * unit) as u64);
374 } else {
375 break;
376 }
377 }
378
379 let time_column = time_column.ok_or_else(|| {
380 ParseError::new(
381 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
382 self.position(),
383 )
384 })?;
385 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
386 ParseError::new(
387 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
388 self.position(),
389 )
390 })?;
391
392 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
393 name,
394 retention_ms,
395 chunk_size: None,
396 downsample_policies: Vec::new(),
397 if_not_exists,
398 hypertable: Some(HypertableDdl {
399 time_column,
400 chunk_interval_ns,
401 default_ttl_ns: ttl_ns,
402 }),
403 session_key: None,
404 session_gap_ms: None,
405 columnar,
406 }))
407 }
408
409 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
412 let pos = self.position();
413 let value = self.parse_literal_value()?;
414 match value {
415 reddb_types::types::Value::Text(s) => {
416 reddb_types::duration::parse_duration_ns(&s).ok_or_else(|| {
417 ParseError::new(
418 format!("{clause} duration {s:?} is not a valid duration literal"),
423 pos,
424 )
425 })
426 }
427 other => Err(ParseError::new(
428 format!("{clause} expects a string duration literal, got {other:?}"),
429 pos,
430 )),
431 }
432 }
433
434 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
436 let if_exists = self.match_if_exists()?;
437 let name = self.parse_drop_collection_name()?;
438 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
439 name,
440 if_exists,
441 }))
442 }
443
444 pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
446 match self.peek().clone() {
453 Token::Ident(ref unit) => {
454 let mult = match unit.to_ascii_lowercase().as_str() {
455 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
456 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
457 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
458 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
459 "d" | "day" | "days" => 86_400_000.0,
460 other => {
461 return Err(ParseError::new(
462 format!("unknown duration unit {other:?}, expected s/m/h/d"),
467 self.position(),
468 ));
469 }
470 };
471 self.advance()?;
472 Ok(mult)
473 }
474 Token::Min => {
475 self.advance()?;
477 Ok(60_000.0)
478 }
479 Token::Max | Token::Avg => {
480 let kw = self.peek().clone();
484 Err(ParseError::new(
485 format!("unknown duration unit '{}', expected s/m/h/d", kw),
486 self.position(),
487 ))
488 }
489 _ => Ok(1_000.0), }
491 }
492
493 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
494 let target = self.parse_resolution_spec()?;
495 self.expect(Token::Colon)?;
496 let source = self.parse_resolution_spec()?;
497 let aggregation = if self.consume(&Token::Colon)? {
498 self.expect_ident_or_keyword()?.to_ascii_lowercase()
499 } else {
500 "avg".to_string()
501 };
502 Ok(format!("{target}:{source}:{aggregation}"))
503 }
504
505 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
506 match self.peek().clone() {
507 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
508 self.advance()?;
509 Ok(value.to_ascii_lowercase())
510 }
511 Token::Integer(value) => {
512 self.advance()?;
513 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
514 Ok(format!("{value}{unit}"))
515 }
516 Token::Float(value) => {
517 self.advance()?;
518 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
519 let number = if value.fract().abs() < f64::EPSILON {
520 format!("{}", value as i64)
521 } else {
522 value.to_string()
523 };
524 Ok(format!("{number}{unit}"))
525 }
526 other => Err(ParseError::new(
527 format!(
528 "expected duration literal for downsample policy, got {}",
529 other
530 ),
531 self.position(),
532 )),
533 }
534 }
535}
536
537#[cfg(test)]
538mod tests {
539 use super::*;
540 use reddb_types::catalog::CollectionModel;
541
542 fn parse_query(input: &str) -> Result<QueryExpr, ParseError> {
543 crate::parser::parse(input).map(|query| query.query)
544 }
545
/// `CREATE TIMESERIES` accepts its clauses in an order different from the
/// one the parser checks them in, defaults the downsample aggregation to
/// `avg`, and leaves the session and hypertable fields unset.
#[test]
fn create_timeseries_accepts_clause_order_defaults_and_columnar() {
    let sql = "CREATE TIMESERIES IF NOT EXISTS readings COLUMNAR DOWNSAMPLE 1h:raw \
               RETENTION 2 h CHUNKSIZE 64";

    let parsed = match parse_query(sql).unwrap() {
        QueryExpr::CreateTimeSeries(parsed) => parsed,
        _ => panic!("expected create timeseries"),
    };

    // Identity and flags.
    assert_eq!(parsed.name, "readings");
    assert!(parsed.if_not_exists);
    assert!(parsed.columnar);

    // Explicit clauses.
    assert_eq!(parsed.retention_ms, Some(2 * 3_600_000));
    assert_eq!(parsed.chunk_size, Some(64));
    assert_eq!(parsed.downsample_policies, vec!["1h:raw:avg"]);

    // Everything that was not mentioned stays empty.
    assert_eq!(parsed.session_key, None);
    assert_eq!(parsed.session_gap_ms, None);
    assert!(parsed.hypertable.is_none());
}
567
/// `CREATE METRICS` yields an append-only, column-less metrics table and
/// honours the RETENTION, DOWNSAMPLE and TENANT BY clauses.
#[test]
fn create_metrics_sets_collection_defaults_and_optional_clauses() {
    let sql = "CREATE METRICS IF NOT EXISTS telemetry RETENTION 30 m \
               DOWNSAMPLE 5m:raw:max TENANT BY (ctx.tenant)";

    let table = match parse_query(sql).unwrap() {
        QueryExpr::CreateTable(table) => table,
        _ => panic!("expected metrics collection"),
    };

    assert_eq!(table.collection_model, CollectionModel::Metrics);
    assert_eq!(table.name, "telemetry");
    assert!(table.if_not_exists);

    // Values taken from the optional clauses.
    assert_eq!(table.default_ttl_ms, Some(30 * 60_000));
    assert_eq!(table.metrics_rollup_policies, vec!["5m:raw:max"]);
    assert_eq!(table.tenant_by.as_deref(), Some("ctx.tenant"));

    // Fixed defaults of the metrics model.
    assert!(table.append_only);
    assert!(table.columns.is_empty());
}
588
/// Covers the three descriptor statements in one pass: `CREATE METRIC`,
/// `ALTER METRIC ... SET PATH`, and `CREATE SLO`. Mixed-case paths are
/// expected to come back lowercased.
#[test]
fn create_metric_alter_metric_and_slo_parse_descriptor_forms() {
    // CREATE METRIC with every optional clause present.
    let create_sql = "CREATE METRIC Svc.Latency.P99 TYPE gauge ROLE sli SOURCE rollups \
                      QUERY 'SELECT p99 FROM rollups' WINDOW 5 min TIME_FIELD observed_at";
    let metric = match parse_query(create_sql).unwrap() {
        QueryExpr::CreateMetric(metric) => metric,
        _ => panic!("expected create metric"),
    };
    assert_eq!(metric.path, "svc.latency.p99");
    assert_eq!(metric.kind, "gauge");
    assert_eq!(metric.role, "sli");
    assert_eq!(metric.source.as_deref(), Some("rollups"));
    assert_eq!(metric.query.as_deref(), Some("SELECT p99 FROM rollups"));
    assert_eq!(metric.window_ms, Some(5 * 60_000));
    assert_eq!(metric.time_field.as_deref(), Some("observed_at"));

    // ALTER METRIC ... SET PATH records only the attempted path.
    let alter_sql = "ALTER METRIC Svc.Latency.P99 SET PATH svc.latency.p95";
    let alter = match parse_query(alter_sql).unwrap() {
        QueryExpr::AlterMetric(alter) => alter,
        _ => panic!("expected alter metric"),
    };
    assert_eq!(alter.path, "svc.latency.p99");
    assert_eq!(alter.set_role, None);
    assert_eq!(alter.attempted_kind, None);
    assert_eq!(alter.attempted_path.as_deref(), Some("svc.latency.p95"));

    // CREATE SLO with both required clauses.
    let slo_sql = "CREATE SLO Api.Availability ON Svc.Latency.P99 TARGET 0.999 WINDOW 28 d";
    let slo = match parse_query(slo_sql).unwrap() {
        QueryExpr::CreateSlo(slo) => slo,
        _ => panic!("expected create slo"),
    };
    assert_eq!(slo.path, "api.availability");
    assert_eq!(slo.metric_path, "svc.latency.p99");
    assert!((slo.target - 0.999).abs() < f64::EPSILON);
    assert_eq!(slo.window_ms, 28 * 86_400_000);
}
627
/// `CREATE HYPERTABLE` is represented as a time-series query carrying a
/// hypertable section, and `DROP TIMESERIES IF EXISTS` accepts a dotted
/// wildcard name.
#[test]
fn create_hypertable_and_drop_timeseries_parse_variants() {
    let create_sql = "CREATE HYPERTABLE IF NOT EXISTS events TIME_COLUMN ts COLUMNAR \
                      CHUNK_INTERVAL '30m' TTL '10s' RETENTION 1 h";
    let created = match parse_query(create_sql).unwrap() {
        QueryExpr::CreateTimeSeries(created) => created,
        _ => panic!("expected hypertable as timeseries"),
    };
    let ddl = created.hypertable.expect("hypertable ddl");

    assert_eq!(created.name, "events");
    assert!(created.if_not_exists);
    assert!(created.columnar);
    assert_eq!(created.retention_ms, Some(3_600_000));

    // Quoted durations are reported in nanoseconds.
    assert_eq!(ddl.time_column, "ts");
    assert_eq!(ddl.chunk_interval_ns, 30 * 60 * 1_000_000_000);
    assert_eq!(ddl.default_ttl_ns, Some(10 * 1_000_000_000));

    let dropped = parse_query("DROP TIMESERIES IF EXISTS tenant.metrics.*").unwrap();
    let is_expected_drop = match dropped {
        QueryExpr::DropTimeSeries(drop) => drop.name == "tenant.metrics.*" && drop.if_exists,
        _ => false,
    };
    assert!(is_expected_drop);
}
653
/// Each statement below is malformed in one way and must be rejected by the
/// parser.
#[test]
fn timeseries_metric_and_slo_errors_are_reported() {
    let rejected = [
        // WITH must be followed by SESSION_KEY.
        "CREATE TIMESERIES events WITH RETENTION 1 d",
        // TENANT without BY.
        "CREATE METRICS telemetry TENANT (ctx.tenant)",
        // ROLE is required.
        "CREATE METRIC svc.latency TYPE gauge",
        // QUERY must be a string literal.
        "CREATE METRIC svc.latency TYPE gauge ROLE sli QUERY 42",
        // SET is required before the attribute.
        "ALTER METRIC svc.latency ROLE sli",
        // TARGET is required.
        "CREATE SLO api.availability ON svc.latency WINDOW 1 h",
        // CHUNK_INTERVAL text is not a duration.
        "CREATE HYPERTABLE events TIME_COLUMN ts CHUNK_INTERVAL 'not-duration'",
    ];

    for sql in rejected {
        let outcome = parse_query(sql);
        assert!(outcome.is_err(), "{sql} should not parse");
    }
}
668}