reddb_server/storage/query/parser/
timeseries.rs1use super::super::ast::{
4 CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl,
5 QueryExpr,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::catalog::CollectionModel;
11
12impl<'a> Parser<'a> {
13 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
15 let if_not_exists = self.match_if_not_exists()?;
16 let name = self.expect_ident()?;
17
18 let mut retention_ms = None;
19 let mut chunk_size = None;
20 let mut downsample_policies = Vec::new();
21 let mut session_key: Option<String> = None;
22 let mut session_gap_ms: Option<u64> = None;
23
24 loop {
26 if self.consume(&Token::Retention)? {
27 let value = self.parse_float()?;
28 let unit = self.parse_duration_unit()?;
29 retention_ms = Some((value * unit) as u64);
30 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
31 chunk_size = Some(self.parse_integer()? as usize);
32 } else if self.consume_ident_ci("DOWNSAMPLE")? {
33 downsample_policies.push(self.parse_downsample_policy_spec()?);
34 while self.consume(&Token::Comma)? {
35 downsample_policies.push(self.parse_downsample_policy_spec()?);
36 }
37 } else if self.consume(&Token::With)? {
38 self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
44 } else {
45 break;
46 }
47 }
48
49 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
50 name,
51 retention_ms,
52 chunk_size,
53 downsample_policies,
54 if_not_exists,
55 hypertable: None,
56 session_key,
57 session_gap_ms,
58 }))
59 }
60
61 fn parse_with_session_clause(
66 &mut self,
67 session_key: &mut Option<String>,
68 session_gap_ms: &mut Option<u64>,
69 ) -> Result<(), ParseError> {
70 if !self.consume_ident_ci("SESSION_KEY")? {
71 return Err(ParseError::new(
72 "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
73 self.position(),
74 ));
75 }
76 let key = self.expect_ident()?;
77 if !self.consume_ident_ci("SESSION_GAP")? {
78 return Err(ParseError::new(
79 "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
80 self.position(),
81 ));
82 }
83 let value = self.parse_float()?;
84 let unit = self.parse_duration_unit()?;
85 *session_key = Some(key);
86 *session_gap_ms = Some((value * unit) as u64);
87 Ok(())
88 }
89
90 pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
95 let if_not_exists = self.match_if_not_exists()?;
96 let name = self.expect_ident()?;
97
98 let mut raw_retention_ms = None;
99 let mut tenant_by = None;
100 let mut downsample_policies = Vec::new();
101
102 loop {
103 if self.consume(&Token::Retention)? {
104 let value = self.parse_float()?;
105 let unit = self.parse_duration_unit()?;
106 raw_retention_ms = Some((value * unit) as u64);
107 } else if self.consume_ident_ci("DOWNSAMPLE")? {
108 downsample_policies.push(self.parse_downsample_policy_spec()?);
109 while self.consume(&Token::Comma)? {
110 downsample_policies.push(self.parse_downsample_policy_spec()?);
111 }
112 } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
113 self.expect(Token::By)?;
114 self.expect(Token::LParen)?;
115 let mut path = self.expect_ident_or_keyword()?;
116 while self.consume(&Token::Dot)? {
117 let next = self.expect_ident_or_keyword()?;
118 path = format!("{path}.{next}");
119 }
120 self.expect(Token::RParen)?;
121 tenant_by = Some(path);
122 } else {
123 break;
124 }
125 }
126
127 Ok(QueryExpr::CreateTable(CreateTableQuery {
128 collection_model: CollectionModel::Metrics,
129 name,
130 columns: Vec::new(),
131 if_not_exists,
132 default_ttl_ms: raw_retention_ms,
133 metrics_rollup_policies: downsample_policies,
134 context_index_fields: Vec::new(),
135 context_index_enabled: false,
136 timestamps: false,
137 partition_by: None,
138 tenant_by,
139 append_only: true,
140 subscriptions: Vec::new(),
141 analytics_config: Vec::new(),
142 vault_own_master_key: false,
143 }))
144 }
145
146 pub fn parse_create_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
148 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
149 while self.consume(&Token::Dot)? {
150 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
151 path = format!("{path}.{next}");
152 }
153
154 let mut kind = None;
155 let mut role = None;
156 let mut source: Option<String> = None;
157 let mut query: Option<String> = None;
158 let mut window_ms: Option<u64> = None;
159 let mut time_field: Option<String> = None;
160 loop {
161 if self.consume_ident_ci("TYPE")? || self.consume_ident_ci("KIND")? {
162 kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
163 } else if self.consume_ident_ci("ROLE")? {
164 role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
165 } else if self.consume_ident_ci("SOURCE")? {
166 source = Some(self.expect_ident_or_keyword()?);
167 } else if self.consume_ident_ci("QUERY")? {
168 let value = self.parse_literal_value()?;
169 match value {
170 crate::storage::schema::Value::Text(s) => query = Some(s.to_string()),
171 other => {
172 return Err(ParseError::new(
173 format!("derived metric QUERY expects a string literal, got {other:?}"),
174 self.position(),
175 ));
176 }
177 }
178 } else if self.consume_ident_ci("WINDOW")? {
179 let value = self.parse_float()?;
180 let unit = self.parse_duration_unit()?;
181 window_ms = Some((value * unit) as u64);
182 } else if self.consume_ident_ci("TIME_FIELD")? {
183 time_field = Some(self.expect_ident_or_keyword()?);
184 } else {
185 break;
186 }
187 }
188
189 Ok(QueryExpr::CreateMetric(
190 crate::storage::query::ast::CreateMetricQuery {
191 path,
192 kind: kind.ok_or_else(|| {
193 ParseError::new(
194 "metric descriptor requires TYPE or KIND".to_string(),
195 self.position(),
196 )
197 })?,
198 role: role.ok_or_else(|| {
199 ParseError::new(
200 "metric descriptor requires ROLE".to_string(),
201 self.position(),
202 )
203 })?,
204 source,
205 query,
206 window_ms,
207 time_field,
208 },
209 ))
210 }
211
212 pub fn parse_alter_metric_body(&mut self) -> Result<QueryExpr, ParseError> {
223 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
224 while self.consume(&Token::Dot)? {
225 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
226 path = format!("{path}.{next}");
227 }
228
229 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
230 return Err(ParseError::expected(
231 vec!["SET"],
232 self.peek(),
233 self.position(),
234 ));
235 }
236
237 let mut set_role = None;
238 let mut attempted_kind = None;
239 let mut attempted_path = None;
240
241 if self.consume_ident_ci("ROLE")? {
242 set_role = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
243 } else if self.consume_ident_ci("KIND")? || self.consume_ident_ci("TYPE")? {
244 attempted_kind = Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
245 } else if self.consume(&Token::Path)? || self.consume_ident_ci("PATH")? {
246 let mut new_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
247 while self.consume(&Token::Dot)? {
248 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
249 new_path = format!("{new_path}.{next}");
250 }
251 attempted_path = Some(new_path);
252 } else {
253 return Err(ParseError::expected(
254 vec!["ROLE", "KIND", "TYPE", "PATH"],
255 self.peek(),
256 self.position(),
257 ));
258 }
259
260 Ok(QueryExpr::AlterMetric(
261 crate::storage::query::ast::AlterMetricQuery {
262 path,
263 set_role,
264 attempted_kind,
265 attempted_path,
266 },
267 ))
268 }
269
270 pub fn parse_create_slo_body(&mut self) -> Result<QueryExpr, ParseError> {
283 let mut path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
284 while self.consume(&Token::Dot)? {
285 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
286 path = format!("{path}.{next}");
287 }
288
289 if !self.consume(&Token::On)? {
290 return Err(ParseError::expected(
291 vec!["ON"],
292 self.peek(),
293 self.position(),
294 ));
295 }
296
297 let mut metric_path = self.expect_ident_or_keyword()?.to_ascii_lowercase();
298 while self.consume(&Token::Dot)? {
299 let next = self.expect_ident_or_keyword()?.to_ascii_lowercase();
300 metric_path = format!("{metric_path}.{next}");
301 }
302
303 let mut target: Option<f64> = None;
304 let mut window_ms: Option<u64> = None;
305
306 loop {
307 if self.consume_ident_ci("TARGET")? {
308 target = Some(self.parse_float()?);
309 } else if self.consume_ident_ci("WINDOW")? {
310 let value = self.parse_float()?;
311 let unit = self.parse_duration_unit()?;
312 window_ms = Some((value * unit) as u64);
313 } else {
314 break;
315 }
316 }
317
318 Ok(QueryExpr::CreateSlo(CreateSloQuery {
319 path,
320 metric_path,
321 target: target.ok_or_else(|| {
322 ParseError::new(
323 "SLO descriptor requires TARGET <number>".to_string(),
324 self.position(),
325 )
326 })?,
327 window_ms: window_ms.ok_or_else(|| {
328 ParseError::new(
329 "SLO descriptor requires WINDOW <duration>".to_string(),
330 self.position(),
331 )
332 })?,
333 }))
334 }
335
336 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
349 let if_not_exists = self.match_if_not_exists()?;
350 let name = self.expect_ident()?;
351
352 let mut time_column: Option<String> = None;
353 let mut chunk_interval_ns: Option<u64> = None;
354 let mut ttl_ns: Option<u64> = None;
355 let mut retention_ms = None;
356
357 loop {
358 if self.consume_ident_ci("TIME_COLUMN")? {
359 time_column = Some(self.expect_ident()?);
360 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
361 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
362 } else if self.consume_ident_ci("TTL")? {
363 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
364 } else if self.consume(&Token::Retention)? {
365 let value = self.parse_float()?;
366 let unit = self.parse_duration_unit()?;
367 retention_ms = Some((value * unit) as u64);
368 } else {
369 break;
370 }
371 }
372
373 let time_column = time_column.ok_or_else(|| {
374 ParseError::new(
375 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
376 self.position(),
377 )
378 })?;
379 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
380 ParseError::new(
381 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
382 self.position(),
383 )
384 })?;
385
386 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
387 name,
388 retention_ms,
389 chunk_size: None,
390 downsample_policies: Vec::new(),
391 if_not_exists,
392 hypertable: Some(HypertableDdl {
393 time_column,
394 chunk_interval_ns,
395 default_ttl_ns: ttl_ns,
396 }),
397 session_key: None,
398 session_gap_ms: None,
399 }))
400 }
401
402 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
405 let pos = self.position();
406 let value = self.parse_literal_value()?;
407 match value {
408 crate::storage::schema::Value::Text(s) => {
409 crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
410 ParseError::new(
411 format!("{clause} duration {s:?} is not a valid duration literal"),
416 pos,
417 )
418 })
419 }
420 other => Err(ParseError::new(
421 format!("{clause} expects a string duration literal, got {other:?}"),
422 pos,
423 )),
424 }
425 }
426
427 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
429 let if_exists = self.match_if_exists()?;
430 let name = self.parse_drop_collection_name()?;
431 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
432 name,
433 if_exists,
434 }))
435 }
436
437 pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
439 match self.peek().clone() {
446 Token::Ident(ref unit) => {
447 let mult = match unit.to_ascii_lowercase().as_str() {
448 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
449 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
450 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
451 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
452 "d" | "day" | "days" => 86_400_000.0,
453 other => {
454 return Err(ParseError::new(
455 format!("unknown duration unit {other:?}, expected s/m/h/d"),
460 self.position(),
461 ));
462 }
463 };
464 self.advance()?;
465 Ok(mult)
466 }
467 Token::Min => {
468 self.advance()?;
470 Ok(60_000.0)
471 }
472 Token::Max | Token::Avg => {
473 let kw = self.peek().clone();
477 Err(ParseError::new(
478 format!("unknown duration unit '{}', expected s/m/h/d", kw),
479 self.position(),
480 ))
481 }
482 _ => Ok(1_000.0), }
484 }
485
486 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
487 let target = self.parse_resolution_spec()?;
488 self.expect(Token::Colon)?;
489 let source = self.parse_resolution_spec()?;
490 let aggregation = if self.consume(&Token::Colon)? {
491 self.expect_ident_or_keyword()?.to_ascii_lowercase()
492 } else {
493 "avg".to_string()
494 };
495 Ok(format!("{target}:{source}:{aggregation}"))
496 }
497
498 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
499 match self.peek().clone() {
500 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
501 self.advance()?;
502 Ok(value.to_ascii_lowercase())
503 }
504 Token::Integer(value) => {
505 self.advance()?;
506 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
507 Ok(format!("{value}{unit}"))
508 }
509 Token::Float(value) => {
510 self.advance()?;
511 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
512 let number = if value.fract().abs() < f64::EPSILON {
513 format!("{}", value as i64)
514 } else {
515 value.to_string()
516 };
517 Ok(format!("{number}{unit}"))
518 }
519 other => Err(ParseError::new(
520 format!(
521 "expected duration literal for downsample policy, got {}",
522 other
523 ),
524 self.position(),
525 )),
526 }
527 }
528}