reddb_server/storage/query/parser/
timeseries.rs1use super::super::ast::{
4 CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
5 QueryExpr,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut retention_ms = None;
19 let mut chunk_size = None;
20 let mut downsample_policies = Vec::new();
21 let mut session_key: Option<String> = None;
22 let mut session_gap_ms: Option<u64> = None;
23 let mut columnar = false;
24
25 loop {
27 if self.consume(&Token::Retention)? {
28 let value = self.parse_float()?;
29 let unit = self.parse_duration_unit()?;
30 retention_ms = Some((value * unit) as u64);
31 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
32 chunk_size = Some(self.parse_integer()? as usize);
33 } else if self.consume_ident_ci("COLUMNAR")? {
34 columnar = true;
36 } else if self.consume_ident_ci("DOWNSAMPLE")? {
37 downsample_policies.push(self.parse_downsample_policy_spec()?);
38 while self.consume(&Token::Comma)? {
39 downsample_policies.push(self.parse_downsample_policy_spec()?);
40 }
41 } else if self.consume(&Token::With)? {
42 self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
48 } else {
49 break;
50 }
51 }
52
53 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
54 name,
55 retention_ms,
56 chunk_size,
57 downsample_policies,
58 if_not_exists,
59 hypertable: None,
60 session_key,
61 session_gap_ms,
62 columnar,
63 }))
64 }
65
66 fn parse_with_session_clause(
71 &mut self,
72 session_key: &mut Option<String>,
73 session_gap_ms: &mut Option<u64>,
74 ) -> Result<(), ParseError> {
75 if !self.consume_ident_ci("SESSION_KEY")? {
76 return Err(ParseError::new(
77 "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
78 self.position(),
79 ));
80 }
81 let key = self.expect_ident()?;
82 if !self.consume_ident_ci("SESSION_GAP")? {
83 return Err(ParseError::new(
84 "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
85 self.position(),
86 ));
87 }
88 let value = self.parse_float()?;
89 let unit = self.parse_duration_unit()?;
90 *session_key = Some(key);
91 *session_gap_ms = Some((value * unit) as u64);
92 Ok(())
93 }
94
95 pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
100 let if_not_exists = self.match_if_not_exists()?;
101 let name = self.expect_ident()?;
102
103 let mut raw_retention_ms = None;
104 let mut tenant_by = None;
105 let mut downsample_policies = Vec::new();
106
107 loop {
108 if self.consume(&Token::Retention)? {
109 let value = self.parse_float()?;
110 let unit = self.parse_duration_unit()?;
111 raw_retention_ms = Some((value * unit) as u64);
112 } else if self.consume_ident_ci("DOWNSAMPLE")? {
113 downsample_policies.push(self.parse_downsample_policy_spec()?);
114 while self.consume(&Token::Comma)? {
115 downsample_policies.push(self.parse_downsample_policy_spec()?);
116 }
117 } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
118 self.expect(Token::By)?;
119 self.expect(Token::LParen)?;
120 let mut path = self.expect_ident_or_keyword()?;
121 while self.consume(&Token::Dot)? {
122 let next = self.expect_ident_or_keyword()?;
123 path = format!("{path}.{next}");
124 }
125 self.expect(Token::RParen)?;
126 tenant_by = Some(path);
127 } else {
128 break;
129 }
130 }
131
132 Ok(QueryExpr::CreateTable(CreateTableQuery {
133 collection_model: CollectionModel::Metrics,
134 name,
135 columns: Vec::new(),
136 if_not_exists,
137 default_ttl_ms: raw_retention_ms,
138 metrics_rollup_policies: downsample_policies,
139 context_index_fields: Vec::new(),
140 context_index_enabled: false,
141 timestamps: false,
142 partition_by: None,
143 tenant_by,
144 append_only: true,
145 subscriptions: Vec::new(),
146 analytics_config: Vec::new(),
147 vault_own_master_key: false,
148 }))
149 }
150
151 pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
153 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
154 while self.consume(&Token::Dot)? {
155 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
156 path = format!("{path}.{next}");
157 }
158
159 let mut kind = None;
160 let mut role = None;
161 let mut source: Option<String> = None;
162 let mut query: Option<String> = None;
163 let mut window_ms: Option<u64> = None;
164 let mut time_field: Option<String> = None;
165 loop {
166 if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
167 kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
168 } else if self.consume_ident_ci("ROLE")? {
169 role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
170 } else if self.consume_ident_ci("SOURCE")? {
171 source = Some(self.expect_ident_or_keyword()?);
172 } else if self.consume_ident_ci("QUERY")? {
173 let value = self.parse_literal_value()?;
174 match value {
175 crate::storage::schema::Value::Text(s) => query = Some(s.to_string()),
176 other => {
177 return Err(ParseError::new(
178 format!("derived metric QUERY expects a string literal, got {other:?}"),
179 self.position(),
180 ));
181 }
182 }
183 } else if self.consume_ident_ci("WINDOW")? {
184 let value = self.parse_float()?;
185 let unit = self.parse_duration_unit()?;
186 window_ms = Some((value * unit) as u64);
187 } else if self.consume_ident_ci("TIME_FIELD")? {
188 time_field = Some(self.expect_ident_or_keyword()?);
189 } else {
190 break;
191 }
192 }
193
194 Ok(QueryExpr::CreateMetric(
195 crate::storage::query::ast::CreateMetricQuery {
196 path,
197 kind: kind.ok_or_else(|| {
198 ParseError::new(
199 "metric descriptor requires TYPE or KIND".to_string(),
200 self.position(),
201 )
202 })?,
203 role: role.ok_or_else(|| {
204 ParseError::new(
205 "metric descriptor requires ROLE".to_string(),
206 self.position(),
207 )
208 })?,
209 source,
210 query,
211 window_ms,
212 time_field,
213 },
214 ))
215 }
216
217 pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
228 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
229 while self.consume(&Token::Dot)? {
230 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
231 path = format!("{path}.{next}");
232 }
233
234 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
235 return Err(ParseError::expected(
236 vec!["SET"],
237 self.peek(),
238 self.position(),
239 ));
240 }
241
242 let mut set_role = None;
243 let mut attempted_kind = None;
244 let mut attempted_path = None;
245
246 if self.consume_ident_ci("ROLE")? {
247 set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
248 } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
249 attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
250 } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
251 let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
252 while self.consume(&Token::Dot)? {
253 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
254 new_path = format!("{new_path}.{next}");
255 }
256 attempted_path = Some(new_path);
257 } else {
258 return Err(ParseError::expected(
259 vec!["ROLE", "KIND", "TYPE", "PATH"],
260 self.peek(),
261 self.position(),
262 ));
263 }
264
265 Ok(QueryExpr::AlterMetric(
266 crate::storage::query::ast::AlterMetricQuery {
267 path,
268 set_role,
269 attempted_kind,
270 attempted_path,
271 },
272 ))
273 }
274
275 pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
288 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
289 while self.consume(&Token::Dot)? {
290 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
291 path = format!("{path}.{next}");
292 }
293
294 if !self.consume(&Token::On)? {
295 return Err(ParseError::expected(
296 vec!["ON"],
297 self.peek(),
298 self.position(),
299 ));
300 }
301
302 let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
303 while self.consume(&Token::Dot)? {
304 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
305 metric_path = format!("{metric_path}.{next}");
306 }
307
308 let mut target: Option<f64> = None;
309 let mut window_ms: Option<u64> = None;
310
311 loop {
312 if self.consume_ident_ci("TARGET")? {
313 target = Some(self.parse_float()?);
314 } else if self.consume_ident_ci("WINDOW")? {
315 let value = self.parse_float()?;
316 let unit = self.parse_duration_unit()?;
317 window_ms = Some((value * unit) as u64);
318 } else {
319 break;
320 }
321 }
322
323 Ok(QueryExpr::CreateSlo(CreateSloQuery {
324 path,
325 metric_path,
326 target: target.ok_or_else(|| {
327 ParseError::new(
328 "SLO descriptor requires TARGET <number>".to_string(),
329 self.position(),
330 )
331 })?,
332 window_ms: window_ms.ok_or_else(|| {
333 ParseError::new(
334 "SLO descriptor requires WINDOW <duration>".to_string(),
335 self.position(),
336 )
337 })?,
338 }))
339 }
340
341 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
354 let if_not_exists = self.match_if_not_exists()?;
355 let name = self.expect_ident()?;
356
357 let mut time_column: Option<String> = None;
358 let mut chunk_interval_ns: Option<u64> = None;
359 let mut ttl_ns: Option<u64> = None;
360 let mut retention_ms = None;
361 let mut columnar = false;
362
363 loop {
364 if self.consume_ident_ci("TIME_COLUMN")? {
365 time_column = Some(self.expect_ident()?);
366 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
367 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
368 } else if self.consume_ident_ci("COLUMNAR")? {
369 columnar = true;
371 } else if self.consume_ident_ci("TTL")? {
372 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
373 } else if self.consume(&Token::Retention)? {
374 let value = self.parse_float()?;
375 let unit = self.parse_duration_unit()?;
376 retention_ms = Some((value * unit) as u64);
377 } else {
378 break;
379 }
380 }
381
382 let time_column = time_column.ok_or_else(|| {
383 ParseError::new(
384 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
385 self.position(),
386 )
387 })?;
388 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
389 ParseError::new(
390 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
391 self.position(),
392 )
393 })?;
394
395 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
396 name,
397 retention_ms,
398 chunk_size: None,
399 downsample_policies: Vec::new(),
400 if_not_exists,
401 hypertable: Some(HypertableDdl {
402 time_column,
403 chunk_interval_ns,
404 default_ttl_ns: ttl_ns,
405 }),
406 session_key: None,
407 session_gap_ms: None,
408 columnar,
409 }))
410 }
411
412 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
415 let pos = self.position();
416 let value = self.parse_literal_value()?;
417 match value {
418 crate::storage::schema::Value::Text(s) => {
419 crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
420 ParseError::new(
421 format!("{clause} duration {s:?} is not a valid duration literal"),
426 pos,
427 )
428 })
429 }
430 other => Err(ParseError::new(
431 format!("{clause} expects a string duration literal, got {other:?}"),
432 pos,
433 )),
434 }
435 }
436
437 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
439 let if_exists = self.match_if_exists()?;
440 let name = self.parse_drop_collection_name()?;
441 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
442 name,
443 if_exists,
444 }))
445 }
446
447 pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
449 match self.peek().clone() {
456 Token::Ident(ref unit) => {
457 let mult = match unit.to_ascii_lowercase().as_str() {
458 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
459 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
460 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
461 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
462 "d" | "day" | "days" => 86_400_000.0,
463 other => {
464 return Err(ParseError::new(
465 format!("unknown duration unit {other:?}, expected s/m/h/d"),
470 self.position(),
471 ));
472 }
473 };
474 self.advance()?;
475 Ok(mult)
476 }
477 Token::Min => {
478 self.advance()?;
480 Ok(60_000.0)
481 }
482 Token::Max | Token::Avg => {
483 let kw = self.peek().clone();
487 Err(ParseError::new(
488 format!("unknown duration unit '{}', expected s/m/h/d", kw),
489 self.position(),
490 ))
491 }
492 _ => Ok(1_000.0), }
494 }
495
496 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
497 let target = self.parse_resolution_spec()?;
498 self.expect(Token::Colon)?;
499 let source = self.parse_resolution_spec()?;
500 let aggregation = if self.consume(&Token::Colon)? {
501 self.expect_ident_or_keyword()?.to_ascii_lowercase()
502 } else {
503 "avg".to_string()
504 };
505 Ok(format!("{target}:{source}:{aggregation}"))
506 }
507
508 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
509 match self.peek().clone() {
510 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
511 self.advance()?;
512 Ok(value.to_ascii_lowercase())
513 }
514 Token::Integer(value) => {
515 self.advance()?;
516 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
517 Ok(format!("{value}{unit}"))
518 }
519 Token::Float(value) => {
520 self.advance()?;
521 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
522 let number = if value.fract().abs() < f64::EPSILON {
523 format!("{}", value as i64)
524 } else {
525 value.to_string()
526 };
527 Ok(format!("{number}{unit}"))
528 }
529 other => Err(ParseError::new(
530 format!(
531 "expected duration literal for downsample policy, got {}",
532 other
533 ),
534 self.position(),
535 )),
536 }
537 }
538}