reddb_server/storage/query/parser/
timeseries.rs1use super::super::ast::{
4 CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl, QueryExpr,
5};
6use super::super::lexer::Token;
7use super::error::ParseError;
8use super::Parser;
9use crate::catalog::CollectionModel;
10
11impl<'a> Parser<'a> {
12 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
14 let if_not_exists = self.match_if_not_exists()?;
15 let name = self.expect_ident()?;
16
17 let mut retention_ms = None;
18 let mut chunk_size = None;
19 let mut downsample_policies = Vec::new();
20
21 loop {
23 if self.consume(&Token::Retention)? {
24 let value = self.parse_float()?;
25 let unit = self.parse_duration_unit()?;
26 retention_ms = Some((value * unit) as u64);
27 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
28 chunk_size = Some(self.parse_integer()? as usize);
29 } else if self.consume_ident_ci("DOWNSAMPLE")? {
30 downsample_policies.push(self.parse_downsample_policy_spec()?);
31 while self.consume(&Token::Comma)? {
32 downsample_policies.push(self.parse_downsample_policy_spec()?);
33 }
34 } else {
35 break;
36 }
37 }
38
39 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
40 name,
41 retention_ms,
42 chunk_size,
43 downsample_policies,
44 if_not_exists,
45 hypertable: None,
46 }))
47 }
48
49 pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
54 let if_not_exists = self.match_if_not_exists()?;
55 let name = self.expect_ident()?;
56
57 let mut raw_retention_ms = None;
58 let mut tenant_by = None;
59 let mut downsample_policies = Vec::new();
60
61 loop {
62 if self.consume(&Token::Retention)? {
63 let value = self.parse_float()?;
64 let unit = self.parse_duration_unit()?;
65 raw_retention_ms = Some((value * unit) as u64);
66 } else if self.consume_ident_ci("DOWNSAMPLE")? {
67 downsample_policies.push(self.parse_downsample_policy_spec()?);
68 while self.consume(&Token::Comma)? {
69 downsample_policies.push(self.parse_downsample_policy_spec()?);
70 }
71 } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
72 self.expect(Token::By)?;
73 self.expect(Token::LParen)?;
74 let mut path = self.expect_ident_or_keyword()?;
75 while self.consume(&Token::Dot)? {
76 let next = self.expect_ident_or_keyword()?;
77 path = format!("{path}.{next}");
78 }
79 self.expect(Token::RParen)?;
80 tenant_by = Some(path);
81 } else {
82 break;
83 }
84 }
85
86 Ok(QueryExpr::CreateTable(CreateTableQuery {
87 collection_model: CollectionModel::Metrics,
88 name,
89 columns: Vec::new(),
90 if_not_exists,
91 default_ttl_ms: raw_retention_ms,
92 metrics_rollup_policies: downsample_policies,
93 context_index_fields: Vec::new(),
94 context_index_enabled: false,
95 timestamps: false,
96 partition_by: None,
97 tenant_by,
98 append_only: true,
99 subscriptions: Vec::new(),
100 vault_own_master_key: false,
101 }))
102 }
103
104 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
117 let if_not_exists = self.match_if_not_exists()?;
118 let name = self.expect_ident()?;
119
120 let mut time_column: Option<String> = None;
121 let mut chunk_interval_ns: Option<u64> = None;
122 let mut ttl_ns: Option<u64> = None;
123 let mut retention_ms = None;
124
125 loop {
126 if self.consume_ident_ci("TIME_COLUMN")? {
127 time_column = Some(self.expect_ident()?);
128 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
129 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
130 } else if self.consume_ident_ci("TTL")? {
131 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
132 } else if self.consume(&Token::Retention)? {
133 let value = self.parse_float()?;
134 let unit = self.parse_duration_unit()?;
135 retention_ms = Some((value * unit) as u64);
136 } else {
137 break;
138 }
139 }
140
141 let time_column = time_column.ok_or_else(|| {
142 ParseError::new(
143 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
144 self.position(),
145 )
146 })?;
147 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
148 ParseError::new(
149 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
150 self.position(),
151 )
152 })?;
153
154 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
155 name,
156 retention_ms,
157 chunk_size: None,
158 downsample_policies: Vec::new(),
159 if_not_exists,
160 hypertable: Some(HypertableDdl {
161 time_column,
162 chunk_interval_ns,
163 default_ttl_ns: ttl_ns,
164 }),
165 }))
166 }
167
168 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
171 let pos = self.position();
172 let value = self.parse_literal_value()?;
173 match value {
174 crate::storage::schema::Value::Text(s) => {
175 crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
176 ParseError::new(
177 format!("{clause} duration {s:?} is not a valid duration literal"),
182 pos,
183 )
184 })
185 }
186 other => Err(ParseError::new(
187 format!("{clause} expects a string duration literal, got {other:?}"),
188 pos,
189 )),
190 }
191 }
192
193 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
195 let if_exists = self.match_if_exists()?;
196 let name = self.parse_drop_collection_name()?;
197 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
198 name,
199 if_exists,
200 }))
201 }
202
203 fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
205 match self.peek().clone() {
212 Token::Ident(ref unit) => {
213 let mult = match unit.to_ascii_lowercase().as_str() {
214 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
215 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
216 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
217 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
218 "d" | "day" | "days" => 86_400_000.0,
219 other => {
220 return Err(ParseError::new(
221 format!("unknown duration unit {other:?}, expected s/m/h/d"),
226 self.position(),
227 ));
228 }
229 };
230 self.advance()?;
231 Ok(mult)
232 }
233 Token::Min => {
234 self.advance()?;
236 Ok(60_000.0)
237 }
238 Token::Max | Token::Avg => {
239 let kw = self.peek().clone();
243 Err(ParseError::new(
244 format!("unknown duration unit '{}', expected s/m/h/d", kw),
245 self.position(),
246 ))
247 }
248 _ => Ok(1_000.0), }
250 }
251
252 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
253 let target = self.parse_resolution_spec()?;
254 self.expect(Token::Colon)?;
255 let source = self.parse_resolution_spec()?;
256 let aggregation = if self.consume(&Token::Colon)? {
257 self.expect_ident_or_keyword()?.to_ascii_lowercase()
258 } else {
259 "avg".to_string()
260 };
261 Ok(format!("{target}:{source}:{aggregation}"))
262 }
263
264 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
265 match self.peek().clone() {
266 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
267 self.advance()?;
268 Ok(value.to_ascii_lowercase())
269 }
270 Token::Integer(value) => {
271 self.advance()?;
272 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
273 Ok(format!("{value}{unit}"))
274 }
275 Token::Float(value) => {
276 self.advance()?;
277 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
278 let number = if value.fract().abs() < f64::EPSILON {
279 format!("{}", value as i64)
280 } else {
281 value.to_string()
282 };
283 Ok(format!("{number}{unit}"))
284 }
285 other => Err(ParseError::new(
286 format!(
287 "expected duration literal for downsample policy, got {}",
288 other
289 ),
290 self.position(),
291 )),
292 }
293 }
294}