Skip to main content

reddb_server/storage/query/parser/
kv.rs

1//! Parser for KV commands: `KV PUT key = value [EXPIRE n unit] [IF NOT EXISTS]`,
2//! `KV GET key`, `KV DELETE key`, `KV INCR key [BY n] [EXPIRE dur]`,
3//! `KV CAS key EXPECT <val|NULL> SET <val> [EXPIRE dur]`.
4//!
5//! Syntax summary:
6//! ```text
7//! KV PUT  <key> = <value> [EXPIRE <n> [unit]] [IF NOT EXISTS]
8//! KV PUT  <key> = <value> [EXPIRE <n> [unit]] [TAGS [tag, ...]]
9//! KV GET  <key>
10//! KV DELETE <key>
11//! INVALIDATE TAGS [tag, ...] FROM <collection>
12//! KV INCR <key> [BY <n>] [EXPIRE <n> [unit]]
13//! KV DECR <key> [BY <n>] [EXPIRE <n> [unit]]   -- sugar for INCR BY -n
14//! KV CAS  <key> EXPECT <value|NULL> SET <value> [EXPIRE <n> [unit]]
15//! ```
16//!
17//! Key forms:
18//! - Bare:   `name`          → collection = "kv_default", key = "name"
19//! - Dotted: `sessions.abc`  → collection = "sessions", key = "abc"
20//! - Quoted: `'a:b'` or `sessions.'a:b'` for keys with special characters
21
22use super::super::ast::{KvCommand, QueryExpr};
23use super::super::lexer::Token;
24use super::error::ParseError;
25use super::Parser;
26use crate::catalog::CollectionModel;
27
28/// Default collection used when a bare (non-dotted) key is specified.
29pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
30
31impl<'a> Parser<'a> {
32    /// Parse `KV <verb> …` (called after the leading `KV` token is consumed).
33    pub fn parse_kv_command(&mut self) -> Result<QueryExpr, ParseError> {
34        self.expect(Token::Kv)?;
35        self.parse_keyed_command_body(CollectionModel::Kv)
36    }
37
38    /// Parse `VAULT <verb> …` (called before consuming the leading identifier).
39    pub fn parse_vault_command(&mut self) -> Result<QueryExpr, ParseError> {
40        if !self.consume_ident_ci("VAULT")? {
41            return Err(ParseError::expected(
42                vec!["VAULT"],
43                self.peek(),
44                self.position(),
45            ));
46        }
47        self.parse_keyed_command_body(CollectionModel::Vault)
48    }
49
50    fn parse_keyed_command_body(
51        &mut self,
52        model: CollectionModel,
53    ) -> Result<QueryExpr, ParseError> {
54        match self.peek().clone() {
55            Token::Ident(ref name) if name.eq_ignore_ascii_case("PUT") => {
56                self.advance()?;
57                self.parse_kv_put(model)
58            }
59            Token::Ident(ref name) if name.eq_ignore_ascii_case("GET") => {
60                self.advance()?;
61                let (collection, key) = self.parse_kv_key(model)?;
62                Ok(QueryExpr::KvCommand(KvCommand::Get {
63                    model,
64                    collection,
65                    key,
66                }))
67            }
68            Token::Ident(ref name) if name.eq_ignore_ascii_case("UNSEAL") => {
69                self.advance()?;
70                if model != CollectionModel::Vault {
71                    return Err(ParseError::expected(
72                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
73                        self.peek(),
74                        self.position(),
75                    ));
76                }
77                let (collection, key) = self.parse_kv_key(model)?;
78                let version = self.parse_optional_vault_version()?;
79                Ok(QueryExpr::KvCommand(KvCommand::Unseal {
80                    collection,
81                    key,
82                    version,
83                }))
84            }
85            Token::Ident(ref name) if name.eq_ignore_ascii_case("ROTATE") => {
86                self.advance()?;
87                if model != CollectionModel::Vault {
88                    return Err(ParseError::expected(
89                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
90                        self.peek(),
91                        self.position(),
92                    ));
93                }
94                self.parse_vault_rotate_body()
95            }
96            Token::Ident(ref name) if name.eq_ignore_ascii_case("HISTORY") => {
97                self.advance()?;
98                if model != CollectionModel::Vault {
99                    return Err(ParseError::expected(
100                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
101                        self.peek(),
102                        self.position(),
103                    ));
104                }
105                let (collection, key) = self.parse_kv_key(model)?;
106                Ok(QueryExpr::KvCommand(KvCommand::History { collection, key }))
107            }
108            Token::Purge => {
109                self.advance()?;
110                if model != CollectionModel::Vault {
111                    return Err(ParseError::expected(
112                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
113                        self.peek(),
114                        self.position(),
115                    ));
116                }
117                let (collection, key) = self.parse_kv_key(model)?;
118                Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
119            }
120            Token::Ident(ref name) if name.eq_ignore_ascii_case("PURGE") => {
121                self.advance()?;
122                if model != CollectionModel::Vault {
123                    return Err(ParseError::expected(
124                        vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
125                        self.peek(),
126                        self.position(),
127                    ));
128                }
129                let (collection, key) = self.parse_kv_key(model)?;
130                Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
131            }
132            Token::Ident(ref name) if name.eq_ignore_ascii_case("LIST") => {
133                self.advance()?;
134                self.parse_keyed_list(model)
135            }
136            Token::Ident(ref name) if name.eq_ignore_ascii_case("WATCH") => {
137                self.advance()?;
138                self.parse_kv_watch(model)
139            }
140            Token::Delete => {
141                self.advance()?;
142                let (collection, key) = self.parse_kv_key(model)?;
143                Ok(QueryExpr::KvCommand(KvCommand::Delete {
144                    model,
145                    collection,
146                    key,
147                }))
148            }
149            Token::Ident(ref name) if name.eq_ignore_ascii_case("DELETE") => {
150                self.advance()?;
151                let (collection, key) = self.parse_kv_key(model)?;
152                Ok(QueryExpr::KvCommand(KvCommand::Delete {
153                    model,
154                    collection,
155                    key,
156                }))
157            }
158            Token::Ident(ref name) if name.eq_ignore_ascii_case("INCR") => {
159                self.advance()?;
160                self.parse_kv_incr(model, 1)
161            }
162            Token::Ident(ref name) if name.eq_ignore_ascii_case("DECR") => {
163                self.advance()?;
164                self.parse_kv_incr(model, -1)
165            }
166            Token::Ident(ref name) if name.eq_ignore_ascii_case("CAS") => {
167                self.advance()?;
168                self.parse_kv_cas(model)
169            }
170            Token::Ident(ref name) if name.eq_ignore_ascii_case("INVALIDATE") => {
171                self.advance()?;
172                self.parse_kv_invalidate_tags_after_invalidate()
173            }
174            _ => Err(ParseError::expected(
175                if model == CollectionModel::Vault {
176                    vec![
177                        "PUT", "GET", "UNSEAL", "ROTATE", "HISTORY", "LIST", "WATCH", "DELETE",
178                        "PURGE", "INCR", "DECR", "CAS",
179                    ]
180                } else {
181                    vec![
182                        "PUT",
183                        "GET",
184                        "WATCH",
185                        "DELETE",
186                        "INCR",
187                        "DECR",
188                        "CAS",
189                        "INVALIDATE",
190                    ]
191                },
192                self.peek(),
193                self.position(),
194            )),
195        }
196    }
197
198    pub(crate) fn parse_vault_list_after_list(&mut self) -> Result<QueryExpr, ParseError> {
199        if !self.consume_ident_ci("VAULT")? {
200            return Err(ParseError::expected(
201                vec!["VAULT"],
202                self.peek(),
203                self.position(),
204            ));
205        }
206        self.parse_keyed_list(CollectionModel::Vault)
207    }
208
209    pub(crate) fn parse_vault_watch_after_watch(&mut self) -> Result<QueryExpr, ParseError> {
210        if !self.consume_ident_ci("VAULT")? {
211            return Err(ParseError::expected(
212                vec!["VAULT"],
213                self.peek(),
214                self.position(),
215            ));
216        }
217        self.parse_kv_watch(CollectionModel::Vault)
218    }
219
220    /// Parse `UNSEAL VAULT <collection.key>`.
221    pub fn parse_unseal_vault_command(&mut self) -> Result<QueryExpr, ParseError> {
222        if !self.consume_ident_ci("UNSEAL")? {
223            return Err(ParseError::expected(
224                vec!["UNSEAL"],
225                self.peek(),
226                self.position(),
227            ));
228        }
229        if !self.consume_ident_ci("VAULT")? {
230            return Err(ParseError::expected(
231                vec!["VAULT"],
232                self.peek(),
233                self.position(),
234            ));
235        }
236        let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
237        let version = self.parse_optional_vault_version()?;
238        Ok(QueryExpr::KvCommand(KvCommand::Unseal {
239            collection,
240            key,
241            version,
242        }))
243    }
244
245    /// Parse top-level `ROTATE/HISTORY/DELETE/PURGE VAULT <collection.key>`.
246    pub fn parse_vault_lifecycle_command(&mut self) -> Result<QueryExpr, ParseError> {
247        let operation = if matches!(self.peek(), Token::Purge) {
248            self.advance()?;
249            "PURGE".to_string()
250        } else {
251            self.expect_ident_or_keyword()?.to_ascii_uppercase()
252        };
253        if !self.consume_ident_ci("VAULT")? {
254            return Err(ParseError::expected(
255                vec!["VAULT"],
256                self.peek(),
257                self.position(),
258            ));
259        }
260        match operation.as_str() {
261            "ROTATE" => self.parse_vault_rotate_body(),
262            "HISTORY" => {
263                let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
264                Ok(QueryExpr::KvCommand(KvCommand::History { collection, key }))
265            }
266            "DELETE" => {
267                let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
268                Ok(QueryExpr::KvCommand(KvCommand::Delete {
269                    model: CollectionModel::Vault,
270                    collection,
271                    key,
272                }))
273            }
274            "PURGE" => {
275                let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
276                Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
277            }
278            _ => Err(ParseError::expected(
279                vec!["ROTATE", "HISTORY", "DELETE", "PURGE"],
280                self.peek(),
281                self.position(),
282            )),
283        }
284    }
285
286    fn parse_vault_rotate_body(&mut self) -> Result<QueryExpr, ParseError> {
287        let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
288        self.expect(Token::Eq)?;
289        let value = self.parse_value()?;
290        let tags = if self.consume_ident_ci("TAGS")? {
291            self.parse_kv_tag_list()?
292        } else {
293            Vec::new()
294        };
295        Ok(QueryExpr::KvCommand(KvCommand::Rotate {
296            collection,
297            key,
298            value,
299            tags,
300        }))
301    }
302
303    fn parse_optional_vault_version(&mut self) -> Result<Option<i64>, ParseError> {
304        if self.consume_ident_ci("VERSION")? {
305            return Ok(Some(self.parse_float()?.round() as i64));
306        }
307        Ok(None)
308    }
309
310    fn parse_kv_put(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
311        let (collection, key) = self.parse_kv_key(model)?;
312
313        // Expect `=`
314        if !self.consume(&Token::Eq)? {
315            return Err(ParseError::expected(
316                vec!["="],
317                self.peek(),
318                self.position(),
319            ));
320        }
321
322        let value = self.parse_value()?;
323
324        let mut ttl_ms: Option<u64> = None;
325        let mut tags: Vec<String> = Vec::new();
326        let mut if_not_exists = false;
327
328        loop {
329            if self.consume_ident_ci("EXPIRE")? {
330                let n = self.parse_float()?;
331                let unit = self.parse_kv_duration_unit()?;
332                ttl_ms = Some((n * unit) as u64);
333            } else if self.consume_ident_ci("TAGS")? {
334                tags = self.parse_kv_tag_list()?;
335            } else if self.consume(&Token::If)? {
336                // IF NOT EXISTS
337                if !self.consume(&Token::Not)? && !self.consume_ident_ci("NOT")? {
338                    return Err(ParseError::expected(
339                        vec!["NOT"],
340                        self.peek(),
341                        self.position(),
342                    ));
343                }
344                if !self.consume(&Token::Exists)? && !self.consume_ident_ci("EXISTS")? {
345                    return Err(ParseError::expected(
346                        vec!["EXISTS"],
347                        self.peek(),
348                        self.position(),
349                    ));
350                }
351                if_not_exists = true;
352            } else {
353                break;
354            }
355        }
356
357        Ok(QueryExpr::KvCommand(KvCommand::Put {
358            model,
359            collection,
360            key,
361            value,
362            ttl_ms,
363            tags,
364            if_not_exists,
365        }))
366    }
367
368    /// Parse `INVALIDATE TAGS [tag, ...] FROM collection`.
369    pub(crate) fn parse_kv_invalidate_tags_after_invalidate(
370        &mut self,
371    ) -> Result<QueryExpr, ParseError> {
372        if !self.consume_ident_ci("TAGS")? {
373            return Err(ParseError::expected(
374                vec!["TAGS"],
375                self.peek(),
376                self.position(),
377            ));
378        }
379        let tags = self.parse_kv_tag_list()?;
380        if !self.consume(&Token::From)? && !self.consume_ident_ci("FROM")? {
381            return Err(ParseError::expected(
382                vec!["FROM"],
383                self.peek(),
384                self.position(),
385            ));
386        }
387        let collection = self.parse_keyed_collection_name()?;
388        Ok(QueryExpr::KvCommand(KvCommand::InvalidateTags {
389            collection,
390            tags,
391        }))
392    }
393
394    /// Parse a key that may be bare (`name`) or dotted (`collection.key`).
395    /// Keys with punctuation must be quoted as a string literal.
396    /// Returns `(collection, key)`.
397    pub(crate) fn parse_kv_key(
398        &mut self,
399        model: CollectionModel,
400    ) -> Result<(String, String), ParseError> {
401        let first = self.parse_kv_key_part()?;
402        if self.consume(&Token::Colon)? {
403            let second = self.parse_kv_key_part()?;
404            return Err(self.unquoted_kv_special_key_error(format!("'{first}:{second}'")));
405        }
406
407        if !self.consume(&Token::Dot)? {
408            return Ok((KV_DEFAULT_COLLECTION.to_string(), first));
409        }
410
411        let mut segments = vec![first, self.parse_kv_key_part()?];
412        while self.consume(&Token::Dot)? {
413            segments.push(self.parse_kv_key_part()?);
414        }
415        if self.consume(&Token::Colon)? {
416            let next = self.parse_kv_key_part()?;
417            let mut key = segments[1..].join(".");
418            key.push(':');
419            key.push_str(&next);
420            return Err(self.unquoted_kv_special_key_error(format!("{}.'{}'", segments[0], key)));
421        }
422
423        if model == CollectionModel::Vault {
424            let lower_segments: Vec<String> = segments
425                .iter()
426                .map(|segment| segment.to_ascii_lowercase())
427                .collect();
428            if lower_segments.len() >= 3
429                && lower_segments[0] == "red"
430                && lower_segments[1] == "vault"
431            {
432                return Ok(("red.vault".to_string(), lower_segments[2..].join(".")));
433            }
434            if lower_segments.len() >= 3
435                && lower_segments[0] == "red"
436                && lower_segments[1] == "secret"
437            {
438                return Ok(("red.vault".to_string(), lower_segments[2..].join(".")));
439            }
440            if lower_segments.len() >= 2 && lower_segments[0] == "secret" {
441                return Ok(("red.vault".to_string(), lower_segments[1..].join(".")));
442            }
443        }
444
445        Ok((segments.remove(0), segments.join(".")))
446    }
447
448    fn unquoted_kv_special_key_error(&self, suggestion: String) -> ParseError {
449        ParseError::new(
450            format!("KV keys containing ':' must be quoted as string literals; use {suggestion}"),
451            self.position(),
452        )
453    }
454
455    fn parse_kv_key_part(&mut self) -> Result<String, ParseError> {
456        match self.peek().clone() {
457            Token::String(value) => {
458                self.advance()?;
459                Ok(value)
460            }
461            Token::Ident(_) => self.expect_ident(),
462            _ => self.expect_ident_or_keyword(),
463        }
464    }
465
466    fn parse_keyed_list(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
467        let collection = self.expect_ident_or_keyword()?;
468        let mut prefix = None;
469        let mut limit = None;
470        let mut offset = 0usize;
471        loop {
472            if self.consume_ident_ci("PREFIX")? {
473                prefix = Some(self.expect_ident_or_keyword()?.to_string());
474            } else if self.consume(&Token::Limit)? || self.consume_ident_ci("LIMIT")? {
475                limit = Some(self.parse_float()?.round().max(0.0) as usize);
476            } else if self.consume(&Token::Offset)? || self.consume_ident_ci("OFFSET")? {
477                offset = self.parse_float()?.round().max(0.0) as usize;
478            } else {
479                break;
480            }
481        }
482        Ok(QueryExpr::KvCommand(KvCommand::List {
483            model,
484            collection,
485            prefix,
486            limit,
487            offset,
488        }))
489    }
490
491    pub(crate) fn parse_kv_watch(
492        &mut self,
493        model: CollectionModel,
494    ) -> Result<QueryExpr, ParseError> {
495        let first = self.expect_ident()?;
496        let (collection, key, prefix) = if model != CollectionModel::Kv {
497            let mut collection = first;
498            if self.consume(&Token::Dot)? {
499                let next = self.expect_ident_or_keyword()?;
500                collection = format!("{collection}.{next}");
501            }
502            if self.consume_ident_ci("PREFIX")? {
503                (collection, self.expect_ident_or_keyword()?, true)
504            } else {
505                (collection, self.expect_ident_or_keyword()?, false)
506            }
507        } else if self.consume(&Token::Dot)? {
508            if self.consume(&Token::Star)? {
509                (KV_DEFAULT_COLLECTION.to_string(), first, true)
510            } else {
511                let key = self.expect_ident_or_keyword()?;
512                if self.consume(&Token::Dot)? {
513                    self.expect(Token::Star)?;
514                    (first, key, true)
515                } else {
516                    (first, key, false)
517                }
518            }
519        } else {
520            (KV_DEFAULT_COLLECTION.to_string(), first, false)
521        };
522
523        let from_lsn = if self.consume(&Token::From)? || self.consume_ident_ci("FROM")? {
524            if !self.consume_ident_ci("LSN")? {
525                return Err(ParseError::expected(
526                    vec!["LSN"],
527                    self.peek(),
528                    self.position(),
529                ));
530            }
531            Some(self.parse_float()?.round() as u64)
532        } else {
533            None
534        };
535
536        Ok(QueryExpr::KvCommand(KvCommand::Watch {
537            model,
538            collection,
539            key,
540            prefix,
541            from_lsn,
542        }))
543    }
544
545    fn parse_keyed_collection_name(&mut self) -> Result<String, ParseError> {
546        let mut collection = self.expect_ident_or_keyword()?;
547        if self.consume(&Token::Dot)? {
548            let next = self.expect_ident_or_keyword()?;
549            collection = format!("{collection}.{next}");
550        }
551        Ok(collection)
552    }
553
554    /// Parse `INCR/DECR key [BY n] [EXPIRE dur]`. `sign` is +1 or -1.
555    fn parse_kv_incr(
556        &mut self,
557        model: CollectionModel,
558        sign: i64,
559    ) -> Result<QueryExpr, ParseError> {
560        let (collection, key) = self.parse_kv_key(model)?;
561        let mut by: i64 = sign;
562        let mut ttl_ms: Option<u64> = None;
563
564        loop {
565            if self.consume(&Token::By)? || self.consume_ident_ci("BY")? {
566                let n = self.parse_float()?;
567                by = sign * (n.round() as i64).max(1);
568            } else if self.consume_ident_ci("EXPIRE")? {
569                let n = self.parse_float()?;
570                let unit = self.parse_kv_duration_unit()?;
571                ttl_ms = Some((n * unit) as u64);
572            } else {
573                break;
574            }
575        }
576
577        Ok(QueryExpr::KvCommand(KvCommand::Incr {
578            model,
579            collection,
580            key,
581            by,
582            ttl_ms,
583        }))
584    }
585
586    pub(crate) fn parse_kv_tag_list(&mut self) -> Result<Vec<String>, ParseError> {
587        self.expect(Token::LBracket)?;
588        let mut tags = Vec::new();
589        while !self.check(&Token::RBracket) {
590            let tag = self.parse_kv_tag()?;
591            if !tag.is_empty() {
592                tags.push(tag);
593            }
594            if !self.consume(&Token::Comma)? {
595                break;
596            }
597        }
598        self.expect(Token::RBracket)?;
599        Ok(tags)
600    }
601
602    fn parse_kv_tag(&mut self) -> Result<String, ParseError> {
603        let mut tag = String::new();
604        loop {
605            match self.peek().clone() {
606                Token::Comma | Token::RBracket | Token::Eof => break,
607                Token::Ident(part) | Token::String(part) => {
608                    self.advance()?;
609                    tag.push_str(&part);
610                }
611                Token::Integer(n) => {
612                    self.advance()?;
613                    tag.push_str(&n.to_string());
614                }
615                Token::Float(n) => {
616                    self.advance()?;
617                    tag.push_str(&n.to_string());
618                }
619                Token::Colon => {
620                    self.advance()?;
621                    tag.push(':');
622                }
623                Token::Dot => {
624                    self.advance()?;
625                    tag.push('.');
626                }
627                Token::Dash => {
628                    self.advance()?;
629                    tag.push('-');
630                }
631                other => {
632                    return Err(ParseError::expected(vec!["tag"], &other, self.position()));
633                }
634            }
635        }
636        Ok(tag)
637    }
638
639    /// Parse `KV CAS key EXPECT <val|NULL> SET <val> [EXPIRE dur]`.
640    fn parse_kv_cas(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
641        let (collection, key) = self.parse_kv_key(model)?;
642
643        // EXPECT <value | NULL>
644        if !self.consume_ident_ci("EXPECT")? {
645            return Err(ParseError::expected(
646                vec!["EXPECT"],
647                self.peek(),
648                self.position(),
649            ));
650        }
651        let expected = if matches!(self.peek(), Token::Null) {
652            self.advance()?;
653            None
654        } else {
655            Some(self.parse_value()?)
656        };
657
658        // SET <value>
659        if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
660            return Err(ParseError::expected(
661                vec!["SET"],
662                self.peek(),
663                self.position(),
664            ));
665        }
666        let new_value = self.parse_value()?;
667
668        // Optional EXPIRE
669        let mut ttl_ms: Option<u64> = None;
670        if self.consume_ident_ci("EXPIRE")? {
671            let n = self.parse_float()?;
672            let unit = self.parse_kv_duration_unit()?;
673            ttl_ms = Some((n * unit) as u64);
674        }
675
676        Ok(QueryExpr::KvCommand(KvCommand::Cas {
677            model,
678            collection,
679            key,
680            expected,
681            new_value,
682            ttl_ms,
683        }))
684    }
685
686    /// Duration unit multiplier to milliseconds, defaulting to seconds.
687    fn parse_kv_duration_unit(&mut self) -> Result<f64, ParseError> {
688        let mult = match self.peek().clone() {
689            Token::Min => 60_000.0,
690            Token::Ident(ref unit) => match unit.to_ascii_lowercase().as_str() {
691                "ms" => 1.0,
692                "s" | "sec" | "secs" => 1_000.0,
693                "m" | "min" | "mins" => 60_000.0,
694                "h" | "hr" | "hrs" => 3_600_000.0,
695                "d" | "day" | "days" => 86_400_000.0,
696                _ => return Ok(1_000.0),
697            },
698            _ => return Ok(1_000.0),
699        };
700        self.advance()?;
701        Ok(mult)
702    }
703}