1use super::super::ast::{KvCommand, QueryExpr};
23use super::super::lexer::Token;
24use super::error::ParseError;
25use super::Parser;
26use crate::catalog::CollectionModel;
27
/// Collection name substituted when a KV key is written without an explicit
/// collection qualifier (i.e. a bare `key` rather than `collection.key`).
pub const KV_DEFAULT_COLLECTION: &str = "kv_default";
30
31impl<'a> Parser<'a> {
32 pub fn parse_kv_command(&mut self) -> Result<QueryExpr, ParseError> {
34 self.expect(Token::Kv)?;
35 self.parse_keyed_command_body(CollectionModel::Kv)
36 }
37
38 pub fn parse_vault_command(&mut self) -> Result<QueryExpr, ParseError> {
40 if !self.consume_ident_ci("VAULT")? {
41 return Err(ParseError::expected(
42 vec!["VAULT"],
43 self.peek(),
44 self.position(),
45 ));
46 }
47 self.parse_keyed_command_body(CollectionModel::Vault)
48 }
49
50 fn parse_keyed_command_body(
51 &mut self,
52 model: CollectionModel,
53 ) -> Result<QueryExpr, ParseError> {
54 match self.peek().clone() {
55 Token::Ident(ref name) if name.eq_ignore_ascii_case("PUT") => {
56 self.advance()?;
57 self.parse_kv_put(model)
58 }
59 Token::Ident(ref name) if name.eq_ignore_ascii_case("GET") => {
60 self.advance()?;
61 let (collection, key) = self.parse_kv_key(model)?;
62 Ok(QueryExpr::KvCommand(KvCommand::Get {
63 model,
64 collection,
65 key,
66 }))
67 }
68 Token::Ident(ref name) if name.eq_ignore_ascii_case("UNSEAL") => {
69 self.advance()?;
70 if model != CollectionModel::Vault {
71 return Err(ParseError::expected(
72 vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
73 self.peek(),
74 self.position(),
75 ));
76 }
77 let (collection, key) = self.parse_kv_key(model)?;
78 let version = self.parse_optional_vault_version()?;
79 Ok(QueryExpr::KvCommand(KvCommand::Unseal {
80 collection,
81 key,
82 version,
83 }))
84 }
85 Token::Ident(ref name) if name.eq_ignore_ascii_case("ROTATE") => {
86 self.advance()?;
87 if model != CollectionModel::Vault {
88 return Err(ParseError::expected(
89 vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
90 self.peek(),
91 self.position(),
92 ));
93 }
94 self.parse_vault_rotate_body()
95 }
96 Token::Ident(ref name) if name.eq_ignore_ascii_case("HISTORY") => {
97 self.advance()?;
98 if model != CollectionModel::Vault {
99 return Err(ParseError::expected(
100 vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
101 self.peek(),
102 self.position(),
103 ));
104 }
105 let (collection, key) = self.parse_kv_key(model)?;
106 Ok(QueryExpr::KvCommand(KvCommand::History { collection, key }))
107 }
108 Token::Purge => {
109 self.advance()?;
110 if model != CollectionModel::Vault {
111 return Err(ParseError::expected(
112 vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
113 self.peek(),
114 self.position(),
115 ));
116 }
117 let (collection, key) = self.parse_kv_key(model)?;
118 Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
119 }
120 Token::Ident(ref name) if name.eq_ignore_ascii_case("PURGE") => {
121 self.advance()?;
122 if model != CollectionModel::Vault {
123 return Err(ParseError::expected(
124 vec!["PUT", "GET", "DELETE", "INCR", "DECR", "CAS"],
125 self.peek(),
126 self.position(),
127 ));
128 }
129 let (collection, key) = self.parse_kv_key(model)?;
130 Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
131 }
132 Token::Ident(ref name) if name.eq_ignore_ascii_case("LIST") => {
133 self.advance()?;
134 self.parse_keyed_list(model)
135 }
136 Token::Ident(ref name) if name.eq_ignore_ascii_case("WATCH") => {
137 self.advance()?;
138 self.parse_kv_watch(model)
139 }
140 Token::Delete => {
141 self.advance()?;
142 let (collection, key) = self.parse_kv_key(model)?;
143 Ok(QueryExpr::KvCommand(KvCommand::Delete {
144 model,
145 collection,
146 key,
147 }))
148 }
149 Token::Ident(ref name) if name.eq_ignore_ascii_case("DELETE") => {
150 self.advance()?;
151 let (collection, key) = self.parse_kv_key(model)?;
152 Ok(QueryExpr::KvCommand(KvCommand::Delete {
153 model,
154 collection,
155 key,
156 }))
157 }
158 Token::Ident(ref name) if name.eq_ignore_ascii_case("INCR") => {
159 self.advance()?;
160 self.parse_kv_incr(model, 1)
161 }
162 Token::Ident(ref name) if name.eq_ignore_ascii_case("DECR") => {
163 self.advance()?;
164 self.parse_kv_incr(model, -1)
165 }
166 Token::Ident(ref name) if name.eq_ignore_ascii_case("CAS") => {
167 self.advance()?;
168 self.parse_kv_cas(model)
169 }
170 Token::Ident(ref name) if name.eq_ignore_ascii_case("INVALIDATE") => {
171 self.advance()?;
172 self.parse_kv_invalidate_tags_after_invalidate()
173 }
174 _ => Err(ParseError::expected(
175 if model == CollectionModel::Vault {
176 vec![
177 "PUT", "GET", "UNSEAL", "ROTATE", "HISTORY", "LIST", "WATCH", "DELETE",
178 "PURGE", "INCR", "DECR", "CAS",
179 ]
180 } else {
181 vec![
182 "PUT",
183 "GET",
184 "WATCH",
185 "DELETE",
186 "INCR",
187 "DECR",
188 "CAS",
189 "INVALIDATE",
190 ]
191 },
192 self.peek(),
193 self.position(),
194 )),
195 }
196 }
197
198 pub(crate) fn parse_vault_list_after_list(&mut self) -> Result<QueryExpr, ParseError> {
199 if !self.consume_ident_ci("VAULT")? {
200 return Err(ParseError::expected(
201 vec!["VAULT"],
202 self.peek(),
203 self.position(),
204 ));
205 }
206 self.parse_keyed_list(CollectionModel::Vault)
207 }
208
209 pub(crate) fn parse_vault_watch_after_watch(&mut self) -> Result<QueryExpr, ParseError> {
210 if !self.consume_ident_ci("VAULT")? {
211 return Err(ParseError::expected(
212 vec!["VAULT"],
213 self.peek(),
214 self.position(),
215 ));
216 }
217 self.parse_kv_watch(CollectionModel::Vault)
218 }
219
220 pub fn parse_unseal_vault_command(&mut self) -> Result<QueryExpr, ParseError> {
222 if !self.consume_ident_ci("UNSEAL")? {
223 return Err(ParseError::expected(
224 vec!["UNSEAL"],
225 self.peek(),
226 self.position(),
227 ));
228 }
229 if !self.consume_ident_ci("VAULT")? {
230 return Err(ParseError::expected(
231 vec!["VAULT"],
232 self.peek(),
233 self.position(),
234 ));
235 }
236 let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
237 let version = self.parse_optional_vault_version()?;
238 Ok(QueryExpr::KvCommand(KvCommand::Unseal {
239 collection,
240 key,
241 version,
242 }))
243 }
244
245 pub fn parse_vault_lifecycle_command(&mut self) -> Result<QueryExpr, ParseError> {
247 let operation = if matches!(self.peek(), Token::Purge) {
248 self.advance()?;
249 "PURGE".to_string()
250 } else {
251 self.expect_ident_or_keyword()?.to_ascii_uppercase()
252 };
253 if !self.consume_ident_ci("VAULT")? {
254 return Err(ParseError::expected(
255 vec!["VAULT"],
256 self.peek(),
257 self.position(),
258 ));
259 }
260 match operation.as_str() {
261 "ROTATE" => self.parse_vault_rotate_body(),
262 "HISTORY" => {
263 let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
264 Ok(QueryExpr::KvCommand(KvCommand::History { collection, key }))
265 }
266 "DELETE" => {
267 let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
268 Ok(QueryExpr::KvCommand(KvCommand::Delete {
269 model: CollectionModel::Vault,
270 collection,
271 key,
272 }))
273 }
274 "PURGE" => {
275 let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
276 Ok(QueryExpr::KvCommand(KvCommand::Purge { collection, key }))
277 }
278 _ => Err(ParseError::expected(
279 vec!["ROTATE", "HISTORY", "DELETE", "PURGE"],
280 self.peek(),
281 self.position(),
282 )),
283 }
284 }
285
286 fn parse_vault_rotate_body(&mut self) -> Result<QueryExpr, ParseError> {
287 let (collection, key) = self.parse_kv_key(CollectionModel::Vault)?;
288 self.expect(Token::Eq)?;
289 let value = self.parse_value()?;
290 let tags = if self.consume_ident_ci("TAGS")? {
291 self.parse_kv_tag_list()?
292 } else {
293 Vec::new()
294 };
295 Ok(QueryExpr::KvCommand(KvCommand::Rotate {
296 collection,
297 key,
298 value,
299 tags,
300 }))
301 }
302
303 fn parse_optional_vault_version(&mut self) -> Result<Option<i64>, ParseError> {
304 if self.consume_ident_ci("VERSION")? {
305 return Ok(Some(self.parse_float()?.round() as i64));
306 }
307 Ok(None)
308 }
309
310 fn parse_kv_put(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
311 let (collection, key) = self.parse_kv_key(model)?;
312
313 if !self.consume(&Token::Eq)? {
315 return Err(ParseError::expected(
316 vec!["="],
317 self.peek(),
318 self.position(),
319 ));
320 }
321
322 let value = self.parse_value()?;
323
324 let mut ttl_ms: Option<u64> = None;
325 let mut tags: Vec<String> = Vec::new();
326 let mut if_not_exists = false;
327
328 loop {
329 if self.consume_ident_ci("EXPIRE")? {
330 let n = self.parse_float()?;
331 let unit = self.parse_kv_duration_unit()?;
332 ttl_ms = Some((n * unit) as u64);
333 } else if self.consume_ident_ci("TAGS")? {
334 tags = self.parse_kv_tag_list()?;
335 } else if self.consume(&Token::If)? {
336 if !self.consume(&Token::Not)? && !self.consume_ident_ci("NOT")? {
338 return Err(ParseError::expected(
339 vec!["NOT"],
340 self.peek(),
341 self.position(),
342 ));
343 }
344 if !self.consume(&Token::Exists)? && !self.consume_ident_ci("EXISTS")? {
345 return Err(ParseError::expected(
346 vec!["EXISTS"],
347 self.peek(),
348 self.position(),
349 ));
350 }
351 if_not_exists = true;
352 } else {
353 break;
354 }
355 }
356
357 Ok(QueryExpr::KvCommand(KvCommand::Put {
358 model,
359 collection,
360 key,
361 value,
362 ttl_ms,
363 tags,
364 if_not_exists,
365 }))
366 }
367
368 pub(crate) fn parse_kv_invalidate_tags_after_invalidate(
370 &mut self,
371 ) -> Result<QueryExpr, ParseError> {
372 if !self.consume_ident_ci("TAGS")? {
373 return Err(ParseError::expected(
374 vec!["TAGS"],
375 self.peek(),
376 self.position(),
377 ));
378 }
379 let tags = self.parse_kv_tag_list()?;
380 if !self.consume(&Token::From)? && !self.consume_ident_ci("FROM")? {
381 return Err(ParseError::expected(
382 vec!["FROM"],
383 self.peek(),
384 self.position(),
385 ));
386 }
387 let collection = self.parse_keyed_collection_name()?;
388 Ok(QueryExpr::KvCommand(KvCommand::InvalidateTags {
389 collection,
390 tags,
391 }))
392 }
393
394 pub(crate) fn parse_kv_key(
398 &mut self,
399 model: CollectionModel,
400 ) -> Result<(String, String), ParseError> {
401 let first = self.parse_kv_key_part()?;
402 if self.consume(&Token::Colon)? {
403 let second = self.parse_kv_key_part()?;
404 return Err(self.unquoted_kv_special_key_error(format!("'{first}:{second}'")));
405 }
406
407 if !self.consume(&Token::Dot)? {
408 return Ok((KV_DEFAULT_COLLECTION.to_string(), first));
409 }
410
411 let mut segments = vec![first, self.parse_kv_key_part()?];
412 while self.consume(&Token::Dot)? {
413 segments.push(self.parse_kv_key_part()?);
414 }
415 if self.consume(&Token::Colon)? {
416 let next = self.parse_kv_key_part()?;
417 let mut key = segments[1..].join(".");
418 key.push(':');
419 key.push_str(&next);
420 return Err(self.unquoted_kv_special_key_error(format!("{}.'{}'", segments[0], key)));
421 }
422
423 if model == CollectionModel::Vault {
424 let lower_segments: Vec<String> = segments
425 .iter()
426 .map(|segment| segment.to_ascii_lowercase())
427 .collect();
428 if lower_segments.len() >= 3
429 && lower_segments[0] == "red"
430 && lower_segments[1] == "vault"
431 {
432 return Ok(("red.vault".to_string(), lower_segments[2..].join(".")));
433 }
434 if lower_segments.len() >= 3
435 && lower_segments[0] == "red"
436 && lower_segments[1] == "secret"
437 {
438 return Ok(("red.vault".to_string(), lower_segments[2..].join(".")));
439 }
440 if lower_segments.len() >= 2 && lower_segments[0] == "secret" {
441 return Ok(("red.vault".to_string(), lower_segments[1..].join(".")));
442 }
443 }
444
445 Ok((segments.remove(0), segments.join(".")))
446 }
447
448 fn unquoted_kv_special_key_error(&self, suggestion: String) -> ParseError {
449 ParseError::new(
450 format!("KV keys containing ':' must be quoted as string literals; use {suggestion}"),
451 self.position(),
452 )
453 }
454
455 fn parse_kv_key_part(&mut self) -> Result<String, ParseError> {
456 match self.peek().clone() {
457 Token::String(value) => {
458 self.advance()?;
459 Ok(value)
460 }
461 Token::Ident(_) => self.expect_ident(),
462 _ => self.expect_ident_or_keyword(),
463 }
464 }
465
466 fn parse_keyed_list(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
467 let collection = self.expect_ident_or_keyword()?;
468 let mut prefix = None;
469 let mut limit = None;
470 let mut offset = 0usize;
471 loop {
472 if self.consume_ident_ci("PREFIX")? {
473 prefix = Some(self.expect_ident_or_keyword()?.to_string());
474 } else if self.consume(&Token::Limit)? || self.consume_ident_ci("LIMIT")? {
475 limit = Some(self.parse_float()?.round().max(0.0) as usize);
476 } else if self.consume(&Token::Offset)? || self.consume_ident_ci("OFFSET")? {
477 offset = self.parse_float()?.round().max(0.0) as usize;
478 } else {
479 break;
480 }
481 }
482 Ok(QueryExpr::KvCommand(KvCommand::List {
483 model,
484 collection,
485 prefix,
486 limit,
487 offset,
488 }))
489 }
490
491 pub(crate) fn parse_kv_watch(
492 &mut self,
493 model: CollectionModel,
494 ) -> Result<QueryExpr, ParseError> {
495 let first = self.expect_ident()?;
496 let (collection, key, prefix) = if model != CollectionModel::Kv {
497 let mut collection = first;
498 if self.consume(&Token::Dot)? {
499 let next = self.expect_ident_or_keyword()?;
500 collection = format!("{collection}.{next}");
501 }
502 if self.consume_ident_ci("PREFIX")? {
503 (collection, self.expect_ident_or_keyword()?, true)
504 } else {
505 (collection, self.expect_ident_or_keyword()?, false)
506 }
507 } else if self.consume(&Token::Dot)? {
508 if self.consume(&Token::Star)? {
509 (KV_DEFAULT_COLLECTION.to_string(), first, true)
510 } else {
511 let key = self.expect_ident_or_keyword()?;
512 if self.consume(&Token::Dot)? {
513 self.expect(Token::Star)?;
514 (first, key, true)
515 } else {
516 (first, key, false)
517 }
518 }
519 } else {
520 (KV_DEFAULT_COLLECTION.to_string(), first, false)
521 };
522
523 let from_lsn = if self.consume(&Token::From)? || self.consume_ident_ci("FROM")? {
524 if !self.consume_ident_ci("LSN")? {
525 return Err(ParseError::expected(
526 vec!["LSN"],
527 self.peek(),
528 self.position(),
529 ));
530 }
531 Some(self.parse_float()?.round() as u64)
532 } else {
533 None
534 };
535
536 Ok(QueryExpr::KvCommand(KvCommand::Watch {
537 model,
538 collection,
539 key,
540 prefix,
541 from_lsn,
542 }))
543 }
544
545 fn parse_keyed_collection_name(&mut self) -> Result<String, ParseError> {
546 let mut collection = self.expect_ident_or_keyword()?;
547 if self.consume(&Token::Dot)? {
548 let next = self.expect_ident_or_keyword()?;
549 collection = format!("{collection}.{next}");
550 }
551 Ok(collection)
552 }
553
554 fn parse_kv_incr(
556 &mut self,
557 model: CollectionModel,
558 sign: i64,
559 ) -> Result<QueryExpr, ParseError> {
560 let (collection, key) = self.parse_kv_key(model)?;
561 let mut by: i64 = sign;
562 let mut ttl_ms: Option<u64> = None;
563
564 loop {
565 if self.consume(&Token::By)? || self.consume_ident_ci("BY")? {
566 let n = self.parse_float()?;
567 by = sign * (n.round() as i64).max(1);
568 } else if self.consume_ident_ci("EXPIRE")? {
569 let n = self.parse_float()?;
570 let unit = self.parse_kv_duration_unit()?;
571 ttl_ms = Some((n * unit) as u64);
572 } else {
573 break;
574 }
575 }
576
577 Ok(QueryExpr::KvCommand(KvCommand::Incr {
578 model,
579 collection,
580 key,
581 by,
582 ttl_ms,
583 }))
584 }
585
586 pub(crate) fn parse_kv_tag_list(&mut self) -> Result<Vec<String>, ParseError> {
587 self.expect(Token::LBracket)?;
588 let mut tags = Vec::new();
589 while !self.check(&Token::RBracket) {
590 let tag = self.parse_kv_tag()?;
591 if !tag.is_empty() {
592 tags.push(tag);
593 }
594 if !self.consume(&Token::Comma)? {
595 break;
596 }
597 }
598 self.expect(Token::RBracket)?;
599 Ok(tags)
600 }
601
602 fn parse_kv_tag(&mut self) -> Result<String, ParseError> {
603 let mut tag = String::new();
604 loop {
605 match self.peek().clone() {
606 Token::Comma | Token::RBracket | Token::Eof => break,
607 Token::Ident(part) | Token::String(part) => {
608 self.advance()?;
609 tag.push_str(&part);
610 }
611 Token::Integer(n) => {
612 self.advance()?;
613 tag.push_str(&n.to_string());
614 }
615 Token::Float(n) => {
616 self.advance()?;
617 tag.push_str(&n.to_string());
618 }
619 Token::Colon => {
620 self.advance()?;
621 tag.push(':');
622 }
623 Token::Dot => {
624 self.advance()?;
625 tag.push('.');
626 }
627 Token::Dash => {
628 self.advance()?;
629 tag.push('-');
630 }
631 other => {
632 return Err(ParseError::expected(vec!["tag"], &other, self.position()));
633 }
634 }
635 }
636 Ok(tag)
637 }
638
639 fn parse_kv_cas(&mut self, model: CollectionModel) -> Result<QueryExpr, ParseError> {
641 let (collection, key) = self.parse_kv_key(model)?;
642
643 if !self.consume_ident_ci("EXPECT")? {
645 return Err(ParseError::expected(
646 vec!["EXPECT"],
647 self.peek(),
648 self.position(),
649 ));
650 }
651 let expected = if matches!(self.peek(), Token::Null) {
652 self.advance()?;
653 None
654 } else {
655 Some(self.parse_value()?)
656 };
657
658 if !self.consume(&Token::Set)? && !self.consume_ident_ci("SET")? {
660 return Err(ParseError::expected(
661 vec!["SET"],
662 self.peek(),
663 self.position(),
664 ));
665 }
666 let new_value = self.parse_value()?;
667
668 let mut ttl_ms: Option<u64> = None;
670 if self.consume_ident_ci("EXPIRE")? {
671 let n = self.parse_float()?;
672 let unit = self.parse_kv_duration_unit()?;
673 ttl_ms = Some((n * unit) as u64);
674 }
675
676 Ok(QueryExpr::KvCommand(KvCommand::Cas {
677 model,
678 collection,
679 key,
680 expected,
681 new_value,
682 ttl_ms,
683 }))
684 }
685
686 fn parse_kv_duration_unit(&mut self) -> Result<f64, ParseError> {
688 let mult = match self.peek().clone() {
689 Token::Min => 60_000.0,
690 Token::Ident(ref unit) => match unit.to_ascii_lowercase().as_str() {
691 "ms" => 1.0,
692 "s" | "sec" | "secs" => 1_000.0,
693 "m" | "min" | "mins" => 60_000.0,
694 "h" | "hr" | "hrs" => 3_600_000.0,
695 "d" | "day" | "days" => 86_400_000.0,
696 _ => return Ok(1_000.0),
697 },
698 _ => return Ok(1_000.0),
699 };
700 self.advance()?;
701 Ok(mult)
702 }
703}