Skip to main content

reddb_server/storage/query/parser/
kv.rs

1//! Parser for KV commands: `KV PUT key = value [EXPIRE n unit] [IF NOT EXISTS]`,
2//! `KV GET key`, `KV DELETE key`, `KV INCR key [BY n] [EXPIRE dur]`,
3//! `KV CAS key EXPECT <val|NULL> SET <val> [EXPIRE dur]`.
4//!
5//! Syntax summary:
6//! ```text
7//! KV PUT  <key> = <value> [EXPIRE <n> [unit]] [IF NOT EXISTS]
8//! KV PUT  <key> = <value> [EXPIRE <n> [unit]] [TAGS [tag, ...]]
9//! KV GET  <key>
10//! KV DELETE <key>
11//! INVALIDATE TAGS [tag, ...] FROM <collection>
12//! KV INCR <key> [BY <n>] [EXPIRE <n> [unit]]
13//! KV DECR <key> [BY <n>] [EXPIRE <n> [unit]]   -- sugar for INCR BY -n
14//! KV CAS  <key> EXPECT <value|NULL> SET <value> [EXPIRE <n> [unit]]
15//! ```
16//!
17//! Key forms:
18//! - Bare:   `name`          → collection = "kv_default", key = "name"
19//! - Dotted: `sessions.abc`  → collection = "sessions", key = "abc"
20
21use super::super::ast::{KvCommand, QueryExpr};
22use super::super::lexer::Token;
23use super::error::ParseError;
24use super::Parser;
25use crate::catalog::CollectionModel;
26
27/// Default collection used when a bare (non-dotted) key is specified.
28pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
29
30impl<'a> Parser<'a> {
31    /// Parse `KV <verb> …` (called after the leading `KV` token is consumed).
32    pub fn parse_kv_command(&mut self) -> Result<QueryExpr, ParseError> {
33        self.expect(Token::Kv)?;
34        self.parse_keyed_command_body(CollectionModel::Kv)
35    }
36
37    /// Parse `VAULT <verb> …` (called before consuming the leading identifier).
38    pub fn parse_vault_command(&mut self) -> Result<QueryExpr, ParseError> {
39        if !self.consume_ident_ci("VAULT")? {
40            return Err(ParseError::expected(
41                vec!["VAULT"],
42                self.peek(),
43                self.position(),
44            ));
45        }
46        self.parse_keyed_command_body(CollectionModel::Vault)
47    }
48
49    fn parse_keyed_command_body(
50        &mut self,
51        model: CollectionModel,
52    ) -> Result<QueryExpr, ParseError> {
53        match self.peek().clone() {
54            Token::Ident(ref name) if name.eq_ignore_ascii_case("PUT") => {
55                self.advance()?;
56                self.parse_kv_put(model)
57            }
58            Token::Ident(ref name) if name.eq_ignore_ascii_case("GET") => {
59                self.advance()?;
60                let (collection, key) = self.parse_kv_key(model)?;
61                Ok(QueryExpr::KvCommand(KvCommand::Get {
62                    model,
63                    collection,
64                    key,
65                }))
66            }
67            Token::Ident(ref name) if name.eq_ignore_ascii_case("UNSEAL") => {
68                self.advance()?;
69                if model != CollectionModel::Vault {
70                    return Err(ParseError::expected(
71                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
72                        self.peek(),
73                        self.position(),
74                    ));
75                }
76                let (collection, key) = self.parse_kv_key(model)?;
77                let version = self.parse_optional_vault_version()?;
78                Ok(QueryExpr::KvCommand(KvCommand::Unseal {
79                    collection,
80                    key,
81                    version,
82                }))
83            }
84            Token::Ident(ref name) if name.eq_ignore_ascii_case("ROTATE") => {
85                self.advance()?;
86                if model != CollectionModel::Vault {
87                    return Err(ParseError::expected(
88                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
89                        self.peek(),
90                        self.position(),
91                    ));
92                }
93                self.parse_vault_rotate_body()
94            }
95            Token::Ident(ref name) if name.eq_ignore_ascii_case("HISTORY") => {
96                self.advance()?;
97                if model != CollectionModel::Vault {
98                    return Err(ParseError::expected(
99                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
100                        self.peek(),
101                        self.position(),
102                    ));
103                }
104                let (collection, key) = self.parse_kv_key(model)?;
105                Ok(QueryExpr::KvCommand(KvCommand::History { collection, key }))
106            }
107            Token::Purge => {
108                self.advance()?;
109                if model != CollectionModel::Vault {
110                    return Err(ParseError::expected(
111                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
112                        self.peek(),
113                        self.position(),
114                    ));
115                }
116                let (collection, key) = self.parse_kv_key(model)?;
117                Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
118            }
119            Token::Ident(ref name) if name.eq_ignore_ascii_case("PURGE") => {
120                self.advance()?;
121                if model != CollectionModel::Vault {
122                    return Err(ParseError::expected(
123                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
124                        self.peek(),
125                        self.position(),
126                    ));
127                }
128                let (collection, key) = self.parse_kv_key(model)?;
129                Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
130            }
131            Token::Ident(ref name) if name.eq_ignore_ascii_case("LIST") => {
132                self.advance()?;
133                self.parse_keyed_list(model)
134            }
135            Token::Ident(ref name) if name.eq_ignore_ascii_case("WATCH") => {
136                self.advance()?;
137                self.parse_kv_watch(model)
138            }
139            Token::Delete => {
140                self.advance()?;
141                let (collection, key) = self.parse_kv_key(model)?;
142                Ok(QueryExpr::KvCommand(KvCommand::Delete {
143                    model,
144                    collection,
145                    key,
146                }))
147            }
148            Token::Ident(ref name) if name.eq_ignore_ascii_case("DELETE") => {
149                self.advance()?;
150                let (collection, key) = self.parse_kv_key(model)?;
151                Ok(QueryExpr::KvCommand(KvCommand::Delete {
152                    model,
153                    collection,
154                    key,
155                }))
156            }
157            Token::Ident(ref name) if name.eq_ignore_ascii_case("INCR") => {
158                self.advance()?;
159                self.parse_kv_incr(model, 1)
160            }
161            Token::Ident(ref name) if name.eq_ignore_ascii_case("DECR") => {
162                self.advance()?;
163                self.parse_kv_incr(model, -1)
164            }
165            Token::Ident(ref name) if name.eq_ignore_ascii_case("CAS") => {
166                self.advance()?;
167                self.parse_kv_cas(model)
168            }
169            Token::Ident(ref name) if name.eq_ignore_ascii_case("INVALIDATE") => {
170                self.advance()?;
171                self.parse_kv_invalidate_tags_after_invalidate()
172            }
173            _ => Err(ParseError::expected(
174                if model == CollectionModel::Vault {
175                    vec![
176                        "PUT", "GET", "UNSEAL", "ROTATE", "HISTORY", "LIST", "WATCH", "DELETE",
177                        "PURGE", "INCR", "DECR", "CAS",
178                    ]
179                } else {
180                    vec![
181                        "PUT",
182                        "GET",
183                        "WATCH",
184                        "DELETE",
185                        "INCR",
186                        "DECR",
187                        "CAS",
188                        "INVALIDATE",
189                    ]
190                },
191                self.peek(),
192                self.position(),
193            )),
194        }
195    }
196
197    pub(crate) fn parse_vault_list_after_list(&mut self) -> Result<QueryExpr, ParseError> {
198        if !self.consume_ident_ci("VAULT")? {
199            return Err(ParseError::expected(
200                vec!["VAULT"],
201                self.peek(),
202                self.position(),
203            ));
204        }
205        self.parse_keyed_list(CollectionModel::Vault)
206    }
207
208    pub(crate) fn parse_vault_watch_after_watch(&mut self) -> Result<QueryExpr, ParseError> {
209        if !self.consume_ident_ci("VAULT")? {
210            return Err(ParseError::expected(
211                vec!["VAULT"],
212                self.peek(),
213                self.position(),
214            ));
215        }
216        self.parse_kv_watch(CollectionModel::Vault)
217    }
218
219    /// Parse `UNSEAL VAULT <collection.key>`.
220    pub fn parse_unseal_vault_command(&mut self) -> Result<QueryExpr, ParseError> {
221        if !self.consume_ident_ci("UNSEAL")? {
222            return Err(ParseError::expected(
223                vec!["UNSEAL"],
224                self.peek(),
225                self.position(),
226            ));
227        }
228        if !self.consume_ident_ci("VAULT")? {
229            return Err(ParseError::expected(
230                vec!["VAULT"],
231                self.peek(),
232                self.position(),
233            ));
234        }
235        let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
236        let version = self.parse_optional_vault_version()?;
237        Ok(QueryExpr::KvCommand(KvCommand::Unseal {
238            collection,
239            key,
240            version,
241        }))
242    }
243
244    /// Parse top-level `ROTATE/HISTORY/DELETE/PURGE VAULT <collection.key>`.
245    pub fn parse_vault_lifecycle_command(&mut self) -> Result<QueryExpr, ParseError> {
246        let operation = if matches!(self.peek(), Token::Purge) {
247            self.advance()?;
248            "PURGE".to_string()
249        } else {
250            self.expect_ident_or_keyword()?.to_ascii_uppercase()
251        };
252        if !self.consume_ident_ci("VAULT")? {
253            return Err(ParseError::expected(
254                vec!["VAULT"],
255                self.peek(),
256                self.position(),
257            ));
258        }
259        match operation.as_str() {
260            "ROTATE" => self.parse_vault_rotate_body(),
261            "HISTORY" => {
262                let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
263                Ok(QueryExpr::KvCommand(KvCommand::History { collection, key }))
264            }
265            "DELETE" => {
266                let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
267                Ok(QueryExpr::KvCommand(KvCommand::Delete {
268                    model: CollectionModel::Vault,
269                    collection,
270                    key,
271                }))
272            }
273            "PURGE" => {
274                let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
275                Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
276            }
277            _ => Err(ParseError::expected(
278                vec!["ROTATE", "HISTORY", "DELETE", "PURGE"],
279                self.peek(),
280                self.position(),
281            )),
282        }
283    }
284
285    fn parse_vault_rotate_body(&mut self) -> Result<QueryExpr, ParseError> {
286        let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
287        self.expect(Token::Eq)?;
288        let value = self.parse_value()?;
289        let tags = if self.consume_ident_ci("TAGS")? {
290            self.parse_kv_tag_list()?
291        } else {
292            Vec::new()
293        };
294        Ok(QueryExpr::KvCommand(KvCommand::Rotate {
295            collection,
296            key,
297            value,
298            tags,
299        }))
300    }
301
302    fn parse_optional_vault_version(&mut self) -> Result<Option<i64>, ParseError> {
303        if self.consume_ident_ci("VERSION")? {
304            return Ok(Some(self.parse_float()?.round() as i64));
305        }
306        Ok(None)
307    }
308
309    fn parse_kv_put(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
310        let (collection, key) = self.parse_kv_key(model)?;
311
312        // Expect `=`
313        if !self.consume(&Token::Eq)? {
314            return Err(ParseError::expected(
315                vec!["="],
316                self.peek(),
317                self.position(),
318            ));
319        }
320
321        let value = self.parse_value()?;
322
323        let mut ttl_ms: Option<u64> = None;
324        let mut tags: Vec<String> = Vec::new();
325        let mut if_not_exists = false;
326
327        loop {
328            if self.consume_ident_ci("EXPIRE")? {
329                let n = self.parse_float()?;
330                let unit = self.parse_kv_duration_unit()?;
331                ttl_ms = Some((n * unit) as u64);
332            } else if self.consume_ident_ci("TAGS")? {
333                tags = self.parse_kv_tag_list()?;
334            } else if self.consume(&Token::If)? {
335                // IF NOT EXISTS
336                if !self.consume(&Token::Not)? && !self.consume_ident_ci("NOT")? {
337                    return Err(ParseError::expected(
338                        vec!["NOT"],
339                        self.peek(),
340                        self.position(),
341                    ));
342                }
343                if !self.consume(&Token::Exists)? && !self.consume_ident_ci("EXISTS")? {
344                    return Err(ParseError::expected(
345                        vec!["EXISTS"],
346                        self.peek(),
347                        self.position(),
348                    ));
349                }
350                if_not_exists = true;
351            } else {
352                break;
353            }
354        }
355
356        Ok(QueryExpr::KvCommand(KvCommand::Put {
357            model,
358            collection,
359            key,
360            value,
361            ttl_ms,
362            tags,
363            if_not_exists,
364        }))
365    }
366
367    /// Parse `INVALIDATE TAGS [tag, ...] FROM collection`.
368    pub(crate) fn parse_kv_invalidate_tags_after_invalidate(
369        &mut self,
370    ) -> Result<QueryExpr, ParseError> {
371        if !self.consume_ident_ci("TAGS")? {
372            return Err(ParseError::expected(
373                vec!["TAGS"],
374                self.peek(),
375                self.position(),
376            ));
377        }
378        let tags = self.parse_kv_tag_list()?;
379        if !self.consume(&Token::From)? && !self.consume_ident_ci("FROM")? {
380            return Err(ParseError::expected(
381                vec!["FROM"],
382                self.peek(),
383                self.position(),
384            ));
385        }
386        let collection = self.parse_keyed_collection_name()?;
387        Ok(QueryExpr::KvCommand(KvCommand::InvalidateTags {
388            collection,
389            tags,
390        }))
391    }
392
393    /// Parse a key that may be bare (`name`) or dotted (`collection.key`).
394    /// Returns `(collection, key)`.
395    pub(crate) fn parse_kv_key(
396        &mut self,
397        model: CollectionModel,
398    ) -> Result<(String, String), ParseError> {
399        let first = self.expect_ident()?;
400        if !self.consume(&Token::Dot)? {
401            return Ok((KV_DEFAULT_COLLECTION.to_string(), first));
402        }
403
404        let mut segments = vec![first, self.expect_ident_or_keyword()?];
405        while self.consume(&Token::Dot)? {
406            segments.push(self.expect_ident_or_keyword()?);
407        }
408
409        if model == CollectionModel::Vault {
410            let lower_segments: Vec<String> = segments
411                .iter()
412                .map(|segment| segment.to_ascii_lowercase())
413                .collect();
414            if lower_segments.len() >= 3
415                && lower_segments[0] == "red"
416                && lower_segments[1] == "vault"
417            {
418                return Ok(("red.vault".to_string(), lower_segments[2..].join(".")));
419            }
420            if lower_segments.len() >= 3
421                && lower_segments[0] == "red"
422                && lower_segments[1] == "secret"
423            {
424                return Ok(("red.vault".to_string(), lower_segments[2..].join(".")));
425            }
426            if lower_segments.len() >= 2 && lower_segments[0] == "secret" {
427                return Ok(("red.vault".to_string(), lower_segments[1..].join(".")));
428            }
429        }
430
431        Ok((segments.remove(0), segments.join(".")))
432    }
433
434    fn parse_keyed_list(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
435        let collection = self.expect_ident_or_keyword()?;
436        let mut prefix = None;
437        let mut limit = None;
438        let mut offset = 0usize;
439        loop {
440            if self.consume_ident_ci("PREFIX")? {
441                prefix = Some(self.expect_ident_or_keyword()?.to_string());
442            } else if self.consume(&Token::Limit)? || self.consume_ident_ci("LIMIT")? {
443                limit = Some(self.parse_float()?.round().max(0.0) as usize);
444            } else if self.consume(&Token::Offset)? || self.consume_ident_ci("OFFSET")? {
445                offset = self.parse_float()?.round().max(0.0) as usize;
446            } else {
447                break;
448            }
449        }
450        Ok(QueryExpr::KvCommand(KvCommand::List {
451            model,
452            collection,
453            prefix,
454            limit,
455            offset,
456        }))
457    }
458
459    pub(crate) fn parse_kv_watch(
460        &mut self,
461        model: CollectionModel,
462    ) -> Result<QueryExpr, ParseError> {
463        let first = self.expect_ident()?;
464        let (collection, key, prefix) = if model != CollectionModel::Kv {
465            let mut collection = first;
466            if self.consume(&Token::Dot)? {
467                let next = self.expect_ident_or_keyword()?;
468                collection = format!("{collection}.{next}");
469            }
470            if self.consume_ident_ci("PREFIX")? {
471                (collection, self.expect_ident_or_keyword()?, true)
472            } else {
473                (collection, self.expect_ident_or_keyword()?, false)
474            }
475        } else if self.consume(&Token::Dot)? {
476            if self.consume(&Token::Star)? {
477                (KV_DEFAULT_COLLECTION.to_string(), first, true)
478            } else {
479                let key = self.expect_ident_or_keyword()?;
480                if self.consume(&Token::Dot)? {
481                    self.expect(Token::Star)?;
482                    (first, key, true)
483                } else {
484                    (first, key, false)
485                }
486            }
487        } else {
488            (KV_DEFAULT_COLLECTION.to_string(), first, false)
489        };
490
491        let from_lsn = if self.consume(&Token::From)? || self.consume_ident_ci("FROM")? {
492            if !self.consume_ident_ci("LSN")? {
493                return Err(ParseError::expected(
494                    vec!["LSN"],
495                    self.peek(),
496                    self.position(),
497                ));
498            }
499            Some(self.parse_float()?.round() as u64)
500        } else {
501            None
502        };
503
504        Ok(QueryExpr::KvCommand(KvCommand::Watch {
505            model,
506            collection,
507            key,
508            prefix,
509            from_lsn,
510        }))
511    }
512
513    fn parse_keyed_collection_name(&mut self) -> Result<String, ParseError> {
514        let mut collection = self.expect_ident_or_keyword()?;
515        if self.consume(&Token::Dot)? {
516            let next = self.expect_ident_or_keyword()?;
517            collection = format!("{collection}.{next}");
518        }
519        Ok(collection)
520    }
521
522    /// Parse `INCR/DECR key [BY n] [EXPIRE dur]`. `sign` is +1 or -1.
523    fn parse_kv_incr(
524        &mut self,
525        model: CollectionModel,
526        sign: i64,
527    ) -> Result<QueryExpr, ParseError> {
528        let (collection, key) = self.parse_kv_key(model)?;
529        let mut by: i64 = sign;
530        let mut ttl_ms: Option<u64> = None;
531
532        loop {
533            if self.consume(&Token::By)? || self.consume_ident_ci("BY")? {
534                let n = self.parse_float()?;
535                by = sign * (n.round() as i64).max(1);
536            } else if self.consume_ident_ci("EXPIRE")? {
537                let n = self.parse_float()?;
538                let unit = self.parse_kv_duration_unit()?;
539                ttl_ms = Some((n * unit) as u64);
540            } else {
541                break;
542            }
543        }
544
545        Ok(QueryExpr::KvCommand(KvCommand::Incr {
546            model,
547            collection,
548            key,
549            by,
550            ttl_ms,
551        }))
552    }
553
554    pub(crate) fn parse_kv_tag_list(&mut self) -> Result<Vec<String>, ParseError> {
555        self.expect(Token::LBracket)?;
556        let mut tags = Vec::new();
557        while !self.check(&Token::RBracket) {
558            let tag = self.parse_kv_tag()?;
559            if !tag.is_empty() {
560                tags.push(tag);
561            }
562            if !self.consume(&Token::Comma)? {
563                break;
564            }
565        }
566        self.expect(Token::RBracket)?;
567        Ok(tags)
568    }
569
570    fn parse_kv_tag(&mut self) -> Result<String, ParseError> {
571        let mut tag = String::new();
572        loop {
573            match self.peek().clone() {
574                Token::Comma | Token::RBracket | Token::Eof => break,
575                Token::Ident(part) | Token::String(part) => {
576                    self.advance()?;
577                    tag.push_str(&part);
578                }
579                Token::Integer(n) => {
580                    self.advance()?;
581                    tag.push_str(&n.to_string());
582                }
583                Token::Float(n) => {
584                    self.advance()?;
585                    tag.push_str(&n.to_string());
586                }
587                Token::Colon => {
588                    self.advance()?;
589                    tag.push(':');
590                }
591                Token::Dot => {
592                    self.advance()?;
593                    tag.push('.');
594                }
595                Token::Dash => {
596                    self.advance()?;
597                    tag.push('-');
598                }
599                other => {
600                    return Err(ParseError::expected(vec!["tag"], &other, self.position()));
601                }
602            }
603        }
604        Ok(tag)
605    }
606
607    /// Parse `KV CAS key EXPECT <val|NULL> SET <val> [EXPIRE dur]`.
608    fn parse_kv_cas(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
609        let (collection, key) = self.parse_kv_key(model)?;
610
611        // EXPECT <value | NULL>
612        if !self.consume_ident_ci("EXPECT")? {
613            return Err(ParseError::expected(
614                vec!["EXPECT"],
615                self.peek(),
616                self.position(),
617            ));
618        }
619        let expected = if matches!(self.peek(), Token::Null) {
620            self.advance()?;
621            None
622        } else {
623            Some(self.parse_value()?)
624        };
625
626        // SET <value>
627        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
628            return Err(ParseError::expected(
629                vec!["SET"],
630                self.peek(),
631                self.position(),
632            ));
633        }
634        let new_value = self.parse_value()?;
635
636        // Optional EXPIRE
637        let mut ttl_ms: Option<u64> = None;
638        if self.consume_ident_ci("EXPIRE")? {
639            let n = self.parse_float()?;
640            let unit = self.parse_kv_duration_unit()?;
641            ttl_ms = Some((n * unit) as u64);
642        }
643
644        Ok(QueryExpr::KvCommand(KvCommand::Cas {
645            model,
646            collection,
647            key,
648            expected,
649            new_value,
650            ttl_ms,
651        }))
652    }
653
654    /// Duration unit multiplier to milliseconds, defaulting to seconds.
655    fn parse_kv_duration_unit(&mut self) -> Result<f64, ParseError> {
656        let mult = match self.peek().clone() {
657            Token::Min => 60_000.0,
658            Token::Ident(ref unit) => match unit.to_ascii_lowercase().as_str() {
659                "ms" => 1.0,
660                "s" | "sec" | "secs" => 1_000.0,
661                "m" | "min" | "mins" => 60_000.0,
662                "h" | "hr" | "hrs" => 3_600_000.0,
663                "d" | "day" | "days" => 86_400_000.0,
664                _ => return Ok(1_000.0),
665            },
666            _ => return Ok(1_000.0),
667        };
668        self.advance()?;
669        Ok(mult)
670    }
671}