Skip to main content

reddb_server/storage/query/parser/
timeseries.rs

//! Parser for CREATE TIMESERIES / CREATE METRICS / CREATE HYPERTABLE and DROP TIMESERIES
2
3use super::super::ast::{
4    CreateTableQuery, CreateTimeSeriesQuery, DropTimeSeriesQuery, HypertableDdl, QueryExpr,
5};
6use super::super::lexer::Token;
7use super::error::ParseError;
8use super::Parser;
9use crate::catalog::CollectionModel;
10
11impl<'a> Parser<'a> {
12    /// Parse CREATE TIMESERIES body (after CREATE TIMESERIES consumed)
13    pub fn parse_create_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
14        let if_not_exists = self.match_if_not_exists()?;
15        let name = self.expect_ident()?;
16
17        let mut retention_ms = None;
18        let mut chunk_size = None;
19        let mut downsample_policies = Vec::new();
20        let mut session_key: Option<String> = None;
21        let mut session_gap_ms: Option<u64> = None;
22
23        // Parse optional clauses in any order
24        loop {
25            if self.consume(&Token::Retention)? {
26                let value = self.parse_float()?;
27                let unit = self.parse_duration_unit()?;
28                retention_ms = Some((value * unit) as u64);
29            } else if self.consume_ident_ci("CHUNK_SIZE")? || self.consume_ident_ci("CHUNKSIZE")? {
30                chunk_size = Some(self.parse_integer()? as usize);
31            } else if self.consume_ident_ci("DOWNSAMPLE")? {
32                downsample_policies.push(self.parse_downsample_policy_spec()?);
33                while self.consume(&Token::Comma)? {
34                    downsample_policies.push(self.parse_downsample_policy_spec()?);
35                }
36            } else if self.consume(&Token::With)? {
37                // `WITH SESSION_KEY <col> SESSION_GAP <duration>` — both
38                // clauses are paired so the SESSIONIZE operator (slice
39                // 2+) has a complete default. Order is fixed
40                // (SESSION_KEY first) to keep the grammar simple; one
41                // without the other is a parse error.
42                self.parse_with_session_clause(&mut session_key, &mut session_gap_ms)?;
43            } else {
44                break;
45            }
46        }
47
48        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
49            name,
50            retention_ms,
51            chunk_size,
52            downsample_policies,
53            if_not_exists,
54            hypertable: None,
55            session_key,
56            session_gap_ms,
57        }))
58    }
59
60    /// Parse `SESSION_KEY <ident> SESSION_GAP <duration>` after a
61    /// `WITH` token has been consumed. Both clauses are required; a
62    /// SESSION_KEY without a SESSION_GAP (or vice-versa) is rejected
63    /// at parse time so the descriptor never carries half a pairing.
64    fn parse_with_session_clause(
65        &mut self,
66        session_key: &mut Option<String>,
67        session_gap_ms: &mut Option<u64>,
68    ) -> Result<(), ParseError> {
69        if !self.consume_ident_ci("SESSION_KEY")? {
70            return Err(ParseError::new(
71                "expected SESSION_KEY after WITH on CREATE TIMESERIES".to_string(),
72                self.position(),
73            ));
74        }
75        let key = self.expect_ident()?;
76        if !self.consume_ident_ci("SESSION_GAP")? {
77            return Err(ParseError::new(
78                "WITH SESSION_KEY requires a paired SESSION_GAP <duration>".to_string(),
79                self.position(),
80            ));
81        }
82        let value = self.parse_float()?;
83        let unit = self.parse_duration_unit()?;
84        *session_key = Some(key);
85        *session_gap_ms = Some((value * unit) as u64);
86        Ok(())
87    }
88
89    /// Parse CREATE METRICS body (after CREATE METRICS consumed).
90    ///
91    /// v0 intentionally establishes only the collection contract. Ingestion,
92    /// series registry, and Prometheus adapter slices build on this metadata.
93    pub fn parse_create_metrics_body(&mut self) -> Result<QueryExpr, ParseError> {
94        let if_not_exists = self.match_if_not_exists()?;
95        let name = self.expect_ident()?;
96
97        let mut raw_retention_ms = None;
98        let mut tenant_by = None;
99        let mut downsample_policies = Vec::new();
100
101        loop {
102            if self.consume(&Token::Retention)? {
103                let value = self.parse_float()?;
104                let unit = self.parse_duration_unit()?;
105                raw_retention_ms = Some((value * unit) as u64);
106            } else if self.consume_ident_ci("DOWNSAMPLE")? {
107                downsample_policies.push(self.parse_downsample_policy_spec()?);
108                while self.consume(&Token::Comma)? {
109                    downsample_policies.push(self.parse_downsample_policy_spec()?);
110                }
111            } else if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
112                self.expect(Token::By)?;
113                self.expect(Token::LParen)?;
114                let mut path = self.expect_ident_or_keyword()?;
115                while self.consume(&Token::Dot)? {
116                    let next = self.expect_ident_or_keyword()?;
117                    path = format!("{path}.{next}");
118                }
119                self.expect(Token::RParen)?;
120                tenant_by = Some(path);
121            } else {
122                break;
123            }
124        }
125
126        Ok(QueryExpr::CreateTable(CreateTableQuery {
127            collection_model: CollectionModel::Metrics,
128            name,
129            columns: Vec::new(),
130            if_not_exists,
131            default_ttl_ms: raw_retention_ms,
132            metrics_rollup_policies: downsample_policies,
133            context_index_fields: Vec::new(),
134            context_index_enabled: false,
135            timestamps: false,
136            partition_by: None,
137            tenant_by,
138            append_only: true,
139            subscriptions: Vec::new(),
140            vault_own_master_key: false,
141        }))
142    }
143
144    /// Parse CREATE HYPERTABLE body — TimescaleDB-style.
145    ///
146    ///   CREATE HYPERTABLE metrics
147    ///     TIME_COLUMN ts
148    ///     CHUNK_INTERVAL '1d'
149    ///     [TTL '90d']
150    ///     [RETENTION 90 DAYS]          -- collection-level TTL (ms)
151    ///
152    /// Produces the same `CreateTimeSeriesQuery` AST as `CREATE
153    /// TIMESERIES`, with the `hypertable` field populated. The
154    /// runtime dispatcher registers the spec on the RedDB-wide
155    /// `HypertableRegistry` alongside creating the collection.
156    pub fn parse_create_hypertable_body(&mut self) -> Result<QueryExpr, ParseError> {
157        let if_not_exists = self.match_if_not_exists()?;
158        let name = self.expect_ident()?;
159
160        let mut time_column: Option<String> = None;
161        let mut chunk_interval_ns: Option<u64> = None;
162        let mut ttl_ns: Option<u64> = None;
163        let mut retention_ms = None;
164
165        loop {
166            if self.consume_ident_ci("TIME_COLUMN")? {
167                time_column = Some(self.expect_ident()?);
168            } else if self.consume_ident_ci("CHUNK_INTERVAL")? {
169                chunk_interval_ns = Some(self.parse_duration_ns_literal("CHUNK_INTERVAL")?);
170            } else if self.consume_ident_ci("TTL")? {
171                ttl_ns = Some(self.parse_duration_ns_literal("TTL")?);
172            } else if self.consume(&Token::Retention)? {
173                let value = self.parse_float()?;
174                let unit = self.parse_duration_unit()?;
175                retention_ms = Some((value * unit) as u64);
176            } else {
177                break;
178            }
179        }
180
181        let time_column = time_column.ok_or_else(|| {
182            ParseError::new(
183                "CREATE HYPERTABLE requires TIME_COLUMN <ident>".to_string(),
184                self.position(),
185            )
186        })?;
187        let chunk_interval_ns = chunk_interval_ns.ok_or_else(|| {
188            ParseError::new(
189                "CREATE HYPERTABLE requires CHUNK_INTERVAL '<duration>' (e.g. '1d')".to_string(),
190                self.position(),
191            )
192        })?;
193
194        Ok(QueryExpr::CreateTimeSeries(CreateTimeSeriesQuery {
195            name,
196            retention_ms,
197            chunk_size: None,
198            downsample_policies: Vec::new(),
199            if_not_exists,
200            hypertable: Some(HypertableDdl {
201                time_column,
202                chunk_interval_ns,
203                default_ttl_ns: ttl_ns,
204            }),
205            session_key: None,
206            session_gap_ms: None,
207        }))
208    }
209
210    /// Accept a string-literal duration (`'1d'`, `'5m'`, `'30s'`, …) and
211    /// resolve it to nanoseconds using the shared retention grammar.
212    fn parse_duration_ns_literal(&mut self, clause: &str) -> Result<u64, ParseError> {
213        let pos = self.position();
214        let value = self.parse_literal_value()?;
215        match value {
216            crate::storage::schema::Value::Text(s) => {
217                crate::storage::timeseries::retention::parse_duration_ns(&s).ok_or_else(|| {
218                    ParseError::new(
219                        // F-05: `s` is caller-controlled string-literal bytes.
220                        // Render via `{:?}` so CR/LF/NUL/quotes are escaped
221                        // before reaching downstream serialization sinks.
222                        // `clause` is a static internal label and stays bare.
223                        format!("{clause} duration {s:?} is not a valid duration literal"),
224                        pos,
225                    )
226                })
227            }
228            other => Err(ParseError::new(
229                format!("{clause} expects a string duration literal, got {other:?}"),
230                pos,
231            )),
232        }
233    }
234
235    /// Parse DROP TIMESERIES body (after DROP TIMESERIES consumed)
236    pub fn parse_drop_timeseries_body(&mut self) -> Result<QueryExpr, ParseError> {
237        let if_exists = self.match_if_exists()?;
238        let name = self.parse_drop_collection_name()?;
239        Ok(QueryExpr::DropTimeSeries(DropTimeSeriesQuery {
240            name,
241            if_exists,
242        }))
243    }
244
245    /// Parse a duration unit and return the multiplier in milliseconds
246    pub fn parse_duration_unit(&mut self) -> Result<f64, ParseError> {
247        // Aggregate-function keywords (`MIN`, `MAX`, `AVG`) lex as
248        // dedicated tokens, not `Token::Ident`, so they need their
249        // own arms. `MIN` is the minute alias; `MAX` and `AVG` have
250        // no canonical duration meaning today but were silently
251        // falling through to the seconds default — surface a clear
252        // error instead.
253        match self.peek().clone() {
254            Token::Ident(ref unit) => {
255                let mult = match unit.to_ascii_lowercase().as_str() {
256                    "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
257                    "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
258                    "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
259                    "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
260                    "d" | "day" | "days" => 86_400_000.0,
261                    other => {
262                        return Err(ParseError::new(
263                            // F-05: `other` is caller-controlled identifier
264                            // text. Render via `{:?}` so embedded CR/LF/NUL/
265                            // quotes are escaped before the message reaches
266                            // downstream serialization sinks.
267                            format!("unknown duration unit {other:?}, expected s/m/h/d"),
268                            self.position(),
269                        ));
270                    }
271                };
272                self.advance()?;
273                Ok(mult)
274            }
275            Token::Min => {
276                // `MIN` keyword used as the minute alias.
277                self.advance()?;
278                Ok(60_000.0)
279            }
280            Token::Max | Token::Avg => {
281                // These keywords have no duration semantics; reject
282                // explicitly so a stray aggregate keyword does not
283                // silently default to seconds.
284                let kw = self.peek().clone();
285                Err(ParseError::new(
286                    format!("unknown duration unit '{}', expected s/m/h/d", kw),
287                    self.position(),
288                ))
289            }
290            _ => Ok(1_000.0), // default: seconds
291        }
292    }
293
294    fn parse_downsample_policy_spec(&mut self) -> Result<String, ParseError> {
295        let target = self.parse_resolution_spec()?;
296        self.expect(Token::Colon)?;
297        let source = self.parse_resolution_spec()?;
298        let aggregation = if self.consume(&Token::Colon)? {
299            self.expect_ident_or_keyword()?.to_ascii_lowercase()
300        } else {
301            "avg".to_string()
302        };
303        Ok(format!("{target}:{source}:{aggregation}"))
304    }
305
306    fn parse_resolution_spec(&mut self) -> Result<String, ParseError> {
307        match self.peek().clone() {
308            Token::Ident(value) if value.eq_ignore_ascii_case("raw") => {
309                self.advance()?;
310                Ok(value.to_ascii_lowercase())
311            }
312            Token::Integer(value) => {
313                self.advance()?;
314                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
315                Ok(format!("{value}{unit}"))
316            }
317            Token::Float(value) => {
318                self.advance()?;
319                let unit = self.expect_ident_or_keyword()?.to_ascii_lowercase();
320                let number = if value.fract().abs() < f64::EPSILON {
321                    format!("{}", value as i64)
322                } else {
323                    value.to_string()
324                };
325                Ok(format!("{number}{unit}"))
326            }
327            other => Err(ParseError::new(
328                format!(
329                    "expected duration literal for downsample policy, got {}",
330                    other
331                ),
332                self.position(),
333            )),
334        }
335    }
336}