Skip to main content

reddb_server/storage/query/parser/
timeseries.rs

//! Parser for CREATE TIMESERIES, CREATE HYPERTABLE and DROP TIMESERIES
2
3use super::super::ast::{CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl, QueryExpr};
4use super::super::lexer::Token;
5use super::error::ParseError;
6use super::Parser;
7
8impl<'a> Parser<'a> {
9    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
10    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
11        let if_not_exists = self.match_if_not_exists()?;
12        let name = self.expect_ident()?;
13
14        let mut retention_ms = None;
15        let mut chunk_size = None;
16        let mut downsample_policies = Vec::new();
17
18        // Parse optional clauses in any order
19        loop {
20            if self.consume(&Token::Retention)? {
21                let value = self.parse_float()?;
22                let unit = self.parse_duration_unit()?;
23                retention_ms = Some((value * unit) as u64);
24            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
25                chunk_size = Some(self.parse_integer()? as usize);
26            } else if self.consume_ident_ci("DOWNSAMPLE")? {
27                downsample_policies.push(self.parse_downsample_policy_spec()?);
28                while self.consume(&Token::Comma)? {
29                    downsample_policies.push(self.parse_downsample_policy_spec()?);
30                }
31            } else {
32                break;
33            }
34        }
35
36        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
37            name,
38            retention_ms,
39            chunk_size,
40            downsample_policies,
41            if_not_exists,
42            hypertable: None,
43        }))
44    }
45
46    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
47    ///
48    ///   CREATE HYPERTABLE metrics
49    ///     TIME_COLUMN ts
50    ///     CHUNK_INTERVAL '1d'
51    ///     [TTL '90d']
52    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
53    ///
54    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
55    /// TIMESERIES`, with the `hypertable` field populated. The
56    /// runtime dispatcher registers the spec on the RedDB-wide
57    /// `HypertableRegistry` alongside creating the collection.
58    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
59        let if_not_exists = self.match_if_not_exists()?;
60        let name = self.expect_ident()?;
61
62        let mut time_column: Option<String> = None;
63        let mut chunk_interval_ns: Option<u64> = None;
64        let mut ttl_ns: Option<u64> = None;
65        let mut retention_ms = None;
66
67        loop {
68            if self.consume_ident_ci("TIME_COLUMN")? {
69                time_column = Some(self.expect_ident()?);
70            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
71                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
72            } else if self.consume_ident_ci("TTL")? {
73                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
74            } else if self.consume(&Token::Retention)? {
75                let value = self.parse_float()?;
76                let unit = self.parse_duration_unit()?;
77                retention_ms = Some((value * unit) as u64);
78            } else {
79                break;
80            }
81        }
82
83        let time_column = time_column.ok_or_else(|| {
84            ParseError::new(
85                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
86                self.position(),
87            )
88        })?;
89        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
90            ParseError::new(
91                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
92                self.position(),
93            )
94        })?;
95
96        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
97            name,
98            retention_ms,
99            chunk_size: None,
100            downsample_policies: Vec::new(),
101            if_not_exists,
102            hypertable: Some(HypertableDdl {
103                time_column,
104                chunk_interval_ns,
105                default_ttl_ns: ttl_ns,
106            }),
107        }))
108    }
109
110    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
111    /// resolve it to nanoseconds using the shared retention grammar.
112    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
113        let pos = self.position();
114        let value = self.parse_literal_value()?;
115        match value {
116            crate::storage::schema::Value::Text(s) => {
117                crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
118                    ParseError::new(
119                        // F-05: `s` is caller-controlled string-literal bytes.
120                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
121                        // before reaching downstream serialization sinks.
122                        // `clause` is a static internal label and stays bare.
123                        format!("{clause} duration {s:?} is not a valid duration literal"),
124                        pos,
125                    )
126                })
127            }
128            other => Err(ParseError::new(
129                format!("{clause} expects a string duration literal, got {other:?}"),
130                pos,
131            )),
132        }
133    }
134
135    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
136    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
137        let if_exists = self.match_if_exists()?;
138        let name = self.parse_drop_collection_name()?;
139        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
140            name,
141            if_exists,
142        }))
143    }
144
145    /// Parse a duration unit and return the multiplier in milliseconds
146    fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
147        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
148        // dedicated tokens, not `Token::Ident`, so they need their
149        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
150        // no canonical duration meaning today but were silently
151        // falling through to the seconds default — surface a clear
152        // error instead.
153        match self.peek().clone() {
154            Token::Ident(ref unit) => {
155                let mult = match unit.to_ascii_lowercase().as_str() {
156                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
157                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
158                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
159                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
160                    "d" | "day" | "days" => 86_400_000.0,
161                    other => {
162                        return Err(ParseError::new(
163                            // F-05: `other` is caller-controlled identifier
164                            // text. Render via `{:?}` so embedded CR/LF/NUL/
165                            // quotes are escaped before the message reaches
166                            // downstream serialization sinks.
167                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
168                            self.position(),
169                        ));
170                    }
171                };
172                self.advance()?;
173                Ok(mult)
174            }
175            Token::Min => {
176                // `MIN` keyword used as the minute alias.
177                self.advance()?;
178                Ok(60_000.0)
179            }
180            Token::Max | Token::Avg => {
181                // These keywords have no duration semantics; reject
182                // explicitly so a stray aggregate keyword does not
183                // silently default to seconds.
184                let kw = self.peek().clone();
185                Err(ParseError::new(
186                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
187                    self.position(),
188                ))
189            }
190            _ => Ok(1_000.0), // default: seconds
191        }
192    }
193
194    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
195        let target = self.parse_resolution_spec()?;
196        self.expect(Token::Colon)?;
197        let source = self.parse_resolution_spec()?;
198        let aggregation = if self.consume(&Token::Colon)? {
199            self.expect_ident_or_keyword()?.to_ascii_lowercase()
200        } else {
201            "avg".to_string()
202        };
203        Ok(format!("{target}:{source}:{aggregation}"))
204    }
205
206    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
207        match self.peek().clone() {
208            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
209                self.advance()?;
210                Ok(value.to_ascii_lowercase())
211            }
212            Token::Integer(value) => {
213                self.advance()?;
214                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
215                Ok(format!("{value}{unit}"))
216            }
217            Token::Float(value) => {
218                self.advance()?;
219                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
220                let number = if value.fract().abs() < f64::EPSILON {
221                    format!("{}", value as i64)
222                } else {
223                    value.to_string()
224                };
225                Ok(format!("{number}{unit}"))
226            }
227            other => Err(ParseError::new(
228                format!(
229                    "expected duration literal for downsample policy, got {}",
230                    other
231                ),
232                self.position(),
233            )),
234        }
235    }
236}