reddb_server/storage/query/parser/
timeseries.rs1use super::super::ast::{
4 CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl, QueryExpr,
5};
6use super::super::lexer::Token;
7use super::error::ParseError;
8use super::Parser;
9use crate::catalog::CollectionModel;
10
11impl<'a> Parser<'a> {
12 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
14 let if_not_exists = self.match_if_not_exists()?;
15 let name = self.expect_ident()?;
16
17 let mut retention_ms = None;
18 let mut chunk_size = None;
19 let mut downsample_policies = Vec::new();
20 let mut session_key: Option<String> = None;
21 let mut session_gap_ms: Option<u64> = None;
22
23 loop {
25 if self.consume(&Token::Retention)? {
26 let value = self.parse_float()?;
27 let unit = self.parse_duration_unit()?;
28 retention_ms = Some((value * unit) as u64);
29 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
30 chunk_size = Some(self.parse_integer()? as usize);
31 } else if self.consume_ident_ci("DOWNSAMPLE")? {
32 downsample_policies.push(self.parse_downsample_policy_spec()?);
33 while self.consume(&Token::Comma)? {
34 downsample_policies.push(self.parse_downsample_policy_spec()?);
35 }
36 } else if self.consume(&Token::With)? {
37 self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
43 } else {
44 break;
45 }
46 }
47
48 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
49 name,
50 retention_ms,
51 chunk_size,
52 downsample_policies,
53 if_not_exists,
54 hypertable: None,
55 session_key,
56 session_gap_ms,
57 }))
58 }
59
60 fn parse_with_session_clause(
65 &mut self,
66 session_key: &mut Option<String>,
67 session_gap_ms: &mut Option<u64>,
68 ) -> Result<(), ParseError> {
69 if !self.consume_ident_ci("SESSION_KEY")? {
70 return Err(ParseError::new(
71 "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
72 self.position(),
73 ));
74 }
75 let key = self.expect_ident()?;
76 if !self.consume_ident_ci("SESSION_GAP")? {
77 return Err(ParseError::new(
78 "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
79 self.position(),
80 ));
81 }
82 let value = self.parse_float()?;
83 let unit = self.parse_duration_unit()?;
84 *session_key = Some(key);
85 *session_gap_ms = Some((value * unit) as u64);
86 Ok(())
87 }
88
89 pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
94 let if_not_exists = self.match_if_not_exists()?;
95 let name = self.expect_ident()?;
96
97 let mut raw_retention_ms = None;
98 let mut tenant_by = None;
99 let mut downsample_policies = Vec::new();
100
101 loop {
102 if self.consume(&Token::Retention)? {
103 let value = self.parse_float()?;
104 let unit = self.parse_duration_unit()?;
105 raw_retention_ms = Some((value * unit) as u64);
106 } else if self.consume_ident_ci("DOWNSAMPLE")? {
107 downsample_policies.push(self.parse_downsample_policy_spec()?);
108 while self.consume(&Token::Comma)? {
109 downsample_policies.push(self.parse_downsample_policy_spec()?);
110 }
111 } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
112 self.expect(Token::By)?;
113 self.expect(Token::LParen)?;
114 let mut path = self.expect_ident_or_keyword()?;
115 while self.consume(&Token::Dot)? {
116 let next = self.expect_ident_or_keyword()?;
117 path = format!("{path}.{next}");
118 }
119 self.expect(Token::RParen)?;
120 tenant_by = Some(path);
121 } else {
122 break;
123 }
124 }
125
126 Ok(QueryExpr::CreateTable(CreateTableQuery {
127 collection_model: CollectionModel::Metrics,
128 name,
129 columns: Vec::new(),
130 if_not_exists,
131 default_ttl_ms: raw_retention_ms,
132 metrics_rollup_policies: downsample_policies,
133 context_index_fields: Vec::new(),
134 context_index_enabled: false,
135 timestamps: false,
136 partition_by: None,
137 tenant_by,
138 append_only: true,
139 subscriptions: Vec::new(),
140 vault_own_master_key: false,
141 }))
142 }
143
144 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
157 let if_not_exists = self.match_if_not_exists()?;
158 let name = self.expect_ident()?;
159
160 let mut time_column: Option<String> = None;
161 let mut chunk_interval_ns: Option<u64> = None;
162 let mut ttl_ns: Option<u64> = None;
163 let mut retention_ms = None;
164
165 loop {
166 if self.consume_ident_ci("TIME_COLUMN")? {
167 time_column = Some(self.expect_ident()?);
168 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
169 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
170 } else if self.consume_ident_ci("TTL")? {
171 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
172 } else if self.consume(&Token::Retention)? {
173 let value = self.parse_float()?;
174 let unit = self.parse_duration_unit()?;
175 retention_ms = Some((value * unit) as u64);
176 } else {
177 break;
178 }
179 }
180
181 let time_column = time_column.ok_or_else(|| {
182 ParseError::new(
183 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
184 self.position(),
185 )
186 })?;
187 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
188 ParseError::new(
189 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
190 self.position(),
191 )
192 })?;
193
194 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
195 name,
196 retention_ms,
197 chunk_size: None,
198 downsample_policies: Vec::new(),
199 if_not_exists,
200 hypertable: Some(HypertableDdl {
201 time_column,
202 chunk_interval_ns,
203 default_ttl_ns: ttl_ns,
204 }),
205 session_key: None,
206 session_gap_ms: None,
207 }))
208 }
209
210 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
213 let pos = self.position();
214 let value = self.parse_literal_value()?;
215 match value {
216 crate::storage::schema::Value::Text(s) => {
217 crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
218 ParseError::new(
219 format!("{clause} duration {s:?} is not a valid duration literal"),
224 pos,
225 )
226 })
227 }
228 other => Err(ParseError::new(
229 format!("{clause} expects a string duration literal, got {other:?}"),
230 pos,
231 )),
232 }
233 }
234
235 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
237 let if_exists = self.match_if_exists()?;
238 let name = self.parse_drop_collection_name()?;
239 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
240 name,
241 if_exists,
242 }))
243 }
244
245 pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
247 match self.peek().clone() {
254 Token::Ident(ref unit) => {
255 let mult = match unit.to_ascii_lowercase().as_str() {
256 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
257 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
258 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
259 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
260 "d" | "day" | "days" => 86_400_000.0,
261 other => {
262 return Err(ParseError::new(
263 format!("unknown duration unit {other:?}, expected s/m/h/d"),
268 self.position(),
269 ));
270 }
271 };
272 self.advance()?;
273 Ok(mult)
274 }
275 Token::Min => {
276 self.advance()?;
278 Ok(60_000.0)
279 }
280 Token::Max | Token::Avg => {
281 let kw = self.peek().clone();
285 Err(ParseError::new(
286 format!("unknown duration unit '{}', expected s/m/h/d", kw),
287 self.position(),
288 ))
289 }
290 _ => Ok(1_000.0), }
292 }
293
294 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
295 let target = self.parse_resolution_spec()?;
296 self.expect(Token::Colon)?;
297 let source = self.parse_resolution_spec()?;
298 let aggregation = if self.consume(&Token::Colon)? {
299 self.expect_ident_or_keyword()?.to_ascii_lowercase()
300 } else {
301 "avg".to_string()
302 };
303 Ok(format!("{target}:{source}:{aggregation}"))
304 }
305
306 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
307 match self.peek().clone() {
308 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
309 self.advance()?;
310 Ok(value.to_ascii_lowercase())
311 }
312 Token::Integer(value) => {
313 self.advance()?;
314 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
315 Ok(format!("{value}{unit}"))
316 }
317 Token::Float(value) => {
318 self.advance()?;
319 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
320 let number = if value.fract().abs() < f64::EPSILON {
321 format!("{}", value as i64)
322 } else {
323 value.to_string()
324 };
325 Ok(format!("{number}{unit}"))
326 }
327 other => Err(ParseError::new(
328 format!(
329 "expected duration literal for downsample policy, got {}",
330 other
331 ),
332 self.position(),
333 )),
334 }
335 }
336}