reddb_server/storage/query/parser/
timeseries.rs1use super::super::ast::{CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl, QueryExpr};
4use super::super::lexer::Token;
5use super::error::ParseError;
6use super::Parser;
7
8impl<'a> Parser<'a> {
9 pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
11 let if_not_exists = self.match_if_not_exists()?;
12 let name = self.expect_ident()?;
13
14 let mut retention_ms = None;
15 let mut chunk_size = None;
16 let mut downsample_policies = Vec::new();
17
18 loop {
20 if self.consume(&Token::Retention)? {
21 let value = self.parse_float()?;
22 let unit = self.parse_duration_unit()?;
23 retention_ms = Some((value * unit) as u64);
24 } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
25 chunk_size = Some(self.parse_integer()? as usize);
26 } else if self.consume_ident_ci("DOWNSAMPLE")? {
27 downsample_policies.push(self.parse_downsample_policy_spec()?);
28 while self.consume(&Token::Comma)? {
29 downsample_policies.push(self.parse_downsample_policy_spec()?);
30 }
31 } else {
32 break;
33 }
34 }
35
36 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
37 name,
38 retention_ms,
39 chunk_size,
40 downsample_policies,
41 if_not_exists,
42 hypertable: None,
43 }))
44 }
45
46 pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
59 let if_not_exists = self.match_if_not_exists()?;
60 let name = self.expect_ident()?;
61
62 let mut time_column: Option<String> = None;
63 let mut chunk_interval_ns: Option<u64> = None;
64 let mut ttl_ns: Option<u64> = None;
65 let mut retention_ms = None;
66
67 loop {
68 if self.consume_ident_ci("TIME_COLUMN")? {
69 time_column = Some(self.expect_ident()?);
70 } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
71 chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
72 } else if self.consume_ident_ci("TTL")? {
73 ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
74 } else if self.consume(&Token::Retention)? {
75 let value = self.parse_float()?;
76 let unit = self.parse_duration_unit()?;
77 retention_ms = Some((value * unit) as u64);
78 } else {
79 break;
80 }
81 }
82
83 let time_column = time_column.ok_or_else(|| {
84 ParseError::new(
85 "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
86 self.position(),
87 )
88 })?;
89 let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
90 ParseError::new(
91 "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
92 self.position(),
93 )
94 })?;
95
96 Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
97 name,
98 retention_ms,
99 chunk_size: None,
100 downsample_policies: Vec::new(),
101 if_not_exists,
102 hypertable: Some(HypertableDdl {
103 time_column,
104 chunk_interval_ns,
105 default_ttl_ns: ttl_ns,
106 }),
107 }))
108 }
109
110 fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
113 let pos = self.position();
114 let value = self.parse_literal_value()?;
115 match value {
116 crate::storage::schema::Value::Text(s) => {
117 crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
118 ParseError::new(
119 format!("{clause} duration {s:?} is not a valid duration literal"),
124 pos,
125 )
126 })
127 }
128 other => Err(ParseError::new(
129 format!("{clause} expects a string duration literal, got {other:?}"),
130 pos,
131 )),
132 }
133 }
134
135 pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
137 let if_exists = self.match_if_exists()?;
138 let name = self.parse_drop_collection_name()?;
139 Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
140 name,
141 if_exists,
142 }))
143 }
144
145 fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
147 match self.peek().clone() {
154 Token::Ident(ref unit) => {
155 let mult = match unit.to_ascii_lowercase().as_str() {
156 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
157 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
158 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
159 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
160 "d" | "day" | "days" => 86_400_000.0,
161 other => {
162 return Err(ParseError::new(
163 format!("unknown duration unit {other:?}, expected s/m/h/d"),
168 self.position(),
169 ));
170 }
171 };
172 self.advance()?;
173 Ok(mult)
174 }
175 Token::Min => {
176 self.advance()?;
178 Ok(60_000.0)
179 }
180 Token::Max | Token::Avg => {
181 let kw = self.peek().clone();
185 Err(ParseError::new(
186 format!("unknown duration unit '{}', expected s/m/h/d", kw),
187 self.position(),
188 ))
189 }
190 _ => Ok(1_000.0), }
192 }
193
194 fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
195 let target = self.parse_resolution_spec()?;
196 self.expect(Token::Colon)?;
197 let source = self.parse_resolution_spec()?;
198 let aggregation = if self.consume(&Token::Colon)? {
199 self.expect_ident_or_keyword()?.to_ascii_lowercase()
200 } else {
201 "avg".to_string()
202 };
203 Ok(format!("{target}:{source}:{aggregation}"))
204 }
205
206 fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
207 match self.peek().clone() {
208 Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
209 self.advance()?;
210 Ok(value.to_ascii_lowercase())
211 }
212 Token::Integer(value) => {
213 self.advance()?;
214 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
215 Ok(format!("{value}{unit}"))
216 }
217 Token::Float(value) => {
218 self.advance()?;
219 let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
220 let number = if value.fract().abs() < f64::EPSILON {
221 format!("{}", value as i64)
222 } else {
223 value.to_string()
224 };
225 Ok(format!("{number}{unit}"))
226 }
227 other => Err(ParseError::new(
228 format!(
229 "expected duration literal for downsample policy, got {}",
230 other
231 ),
232 self.position(),
233 )),
234 }
235 }
236}