1use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
7 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
8 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
9 PartitionSpec, QueryExpr, TruncateQuery,
10};
11use crate::lexer::Token;
12use reddb_types::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use reddb_types::types::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 metrics_rollup_policies: Vec::new(),
79 context_index_fields,
80 context_index_enabled,
81 timestamps,
82 partition_by: None,
83 tenant_by: None,
84 append_only: false,
85 subscriptions,
86 analytics_config: Vec::new(),
87 vault_own_master_key: false,
88 ai_policy: None,
89 }))
90 }
91
92 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
94 self.expect(Token::Drop)?;
95 self.expect(Token::Table)?;
96 self.parse_drop_table_body()
97 }
98
    /// Parse the remainder of a `CREATE TABLE` statement (everything after the
    /// `CREATE TABLE` keywords): `[IF NOT EXISTS] <name> (<column defs>)`
    /// followed by optional table options.
    ///
    /// Options are accepted in this order:
    /// 1. Zero or more `WITH` clauses. `WITH EVENTS …` appends a subscription
    ///    descriptor. Any other `WITH` is followed either by a single option or
    ///    by a parenthesised, comma-separated list `WITH (opt, opt, …)`. Each
    ///    option is one of `CONTEXT_INDEX = <bool>`,
    ///    `CONTEXT INDEX ON (<field>, …)`, `TIMESTAMPS = <bool>`, `EMBED …`,
    ///    `MODERATE …`, `VISION …`, `APPEND_ONLY = <bool>` or
    ///    `TENANT_BY [=] '<column>'`. Anything else is handed to
    ///    `parse_create_table_ttl_clause`.
    /// 2. `PARTITION BY RANGE | LIST | HASH (<column>)`.
    /// 3. `APPEND ONLY`, looked for only while `append_only` is still false.
    /// 4. `TENANT BY (<dotted.path>)`, looked for only when `WITH TENANT_BY`
    ///    did not already set a tenant column.
    ///
    /// # Errors
    /// Returns a [`ParseError`] in these cases:
    /// * malformed syntax;
    /// * a repeated EMBED / MODERATE / VISION clause;
    /// * a `TENANT_BY` value that is not a text literal.
    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
        let if_not_exists = self.match_if_not_exists()?;
        let name = self.expect_ident()?;

        // Column list: one or more comma-separated column definitions.
        self.expect(Token::LParen)?;
        let mut columns = Vec::new();
        loop {
            let col = self.parse_column_def()?;
            columns.push(col);
            if !self.consume(&Token::Comma)? {
                break;
            }
        }
        self.expect(Token::RParen)?;

        let mut default_ttl_ms = None;
        let mut context_index_fields = Vec::new();
        let mut context_index_enabled = false;
        let mut timestamps = false;
        let mut tenant_by: Option<String> = None;
        let mut append_only = false;
        let mut subscriptions = Vec::new();
        let mut ai_policy = reddb_types::catalog::AiPolicy::default();

        while self.consume(&Token::With)? {
            // `WITH EVENTS` is recognised before the optional `(`, so it is not
            // accepted inside a parenthesised option list.
            if self.consume_ident_ci("EVENTS")? {
                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
                continue;
            }
            // `WITH (a, b, …)` groups several options. Without the paren, the
            // loop below runs exactly once.
            let has_parens = self.consume(&Token::LParen)?;

            loop {
                if self.consume_ident_ci("CONTEXT_INDEX")? {
                    context_index_enabled = self.parse_bool_assign()?;
                } else if self.consume_ident_ci("CONTEXT")? {
                    // Long form `CONTEXT INDEX ON (f1, f2, …)`: records the
                    // fields and implicitly enables the context index.
                    if !self.consume(&Token::Index)? {
                        return Err(ParseError::expected(
                            vec!["INDEX"],
                            self.peek(),
                            self.position(),
                        ));
                    }
                    self.expect(Token::On)?;
                    self.expect(Token::LParen)?;
                    loop {
                        context_index_fields.push(self.expect_ident()?);
                        if !self.consume(&Token::Comma)? {
                            break;
                        }
                    }
                    self.expect(Token::RParen)?;
                    context_index_enabled = true;
                } else if self.consume_ident_ci("TIMESTAMPS")? {
                    timestamps = self.parse_bool_assign()?;
                } else if self.consume_ident_ci("EMBED")? {
                    // Each AI sub-policy may be given at most once.
                    if ai_policy.embed.is_some() {
                        return Err(ParseError::new(
                            "duplicate EMBED clause in AI policy".to_string(),
                            self.position(),
                        ));
                    }
                    ai_policy.embed = Some(self.parse_ai_embed_policy()?);
                } else if self.consume_ident_ci("MODERATE")? {
                    if ai_policy.moderate.is_some() {
                        return Err(ParseError::new(
                            "duplicate MODERATE clause in AI policy".to_string(),
                            self.position(),
                        ));
                    }
                    ai_policy.moderate = Some(self.parse_ai_moderate_policy()?);
                } else if self.consume_ident_ci("VISION")? {
                    if ai_policy.vision.is_some() {
                        return Err(ParseError::new(
                            "duplicate VISION clause in AI policy".to_string(),
                            self.position(),
                        ));
                    }
                    ai_policy.vision = Some(self.parse_ai_vision_policy()?);
                } else if self.consume_ident_ci("APPEND_ONLY")? {
                    append_only = self.parse_bool_assign()?;
                } else if self.consume_ident_ci("TENANT_BY")? {
                    // The `=` is optional: `TENANT_BY = 'col'` and
                    // `TENANT_BY 'col'` both parse.
                    let _ = self.consume(&Token::Eq)?;
                    let value = self.parse_literal_value()?;
                    match value {
                        Value::Text(col) => tenant_by = Some(col.to_string()),
                        other => {
                            return Err(ParseError::new(
                                format!("WITH tenant_by expects a text literal, got {other:?}"),
                                self.position(),
                            ));
                        }
                    }
                } else {
                    // Fallback: any unrecognised option goes to the TTL parser.
                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
                }
                // Inside `WITH (…)`: a comma means another option follows;
                // otherwise the list must close here.
                if has_parens {
                    if self.consume(&Token::Comma)? {
                        continue;
                    }
                    self.expect(Token::RParen)?;
                }
                break;
            }
        }

        // Optional `PARTITION BY <kind> (<column>)`.
        let partition_by = if self.consume(&Token::Partition)? {
            self.expect(Token::By)?;
            let kind = if self.consume(&Token::Range)? {
                PartitionKind::Range
            } else if self.consume(&Token::List)? {
                PartitionKind::List
            } else if self.consume(&Token::Hash)? {
                PartitionKind::Hash
            } else {
                return Err(ParseError::expected(
                    vec!["RANGE", "LIST", "HASH"],
                    self.peek(),
                    self.position(),
                ));
            };
            self.expect(Token::LParen)?;
            let column = self.expect_ident()?;
            self.expect(Token::RParen)?;
            Some(PartitionSpec { kind, column })
        } else {
            None
        };

        // Trailing `APPEND ONLY` form. It is not looked for (and the tokens stay
        // unconsumed) once `WITH APPEND_ONLY = true` has set the flag.
        if !append_only && self.consume_ident_ci("APPEND")? {
            if !self.consume_ident_ci("ONLY")? {
                return Err(ParseError::expected(
                    vec!["ONLY"],
                    self.peek(),
                    self.position(),
                ));
            }
            append_only = true;
        }

        // Trailing `TENANT BY (path)` form. Path segments may be keywords and
        // are joined with `.`. It is skipped when `WITH TENANT_BY` already set
        // a column.
        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
            self.expect(Token::By)?;
            self.expect(Token::LParen)?;
            let mut path = self.expect_ident_or_keyword()?;
            while self.consume(&Token::Dot)? {
                let next = self.expect_ident_or_keyword()?;
                path = format!("{path}.{next}");
            }
            self.expect(Token::RParen)?;
            tenant_by = Some(path);
        }

        Ok(QueryExpr::CreateTable(CreateTableQuery {
            collection_model: CollectionModel::Table,
            name,
            columns,
            if_not_exists,
            default_ttl_ms,
            metrics_rollup_policies: Vec::new(),
            context_index_fields,
            context_index_enabled,
            timestamps,
            partition_by,
            tenant_by,
            append_only,
            subscriptions,
            analytics_config: Vec::new(),
            vault_own_master_key: false,
            // Attach the AI policy only when `AiPolicy::is_empty` reports it
            // non-empty (presumably: at least one EMBED/MODERATE/VISION given).
            ai_policy: (!ai_policy.is_empty()).then_some(ai_policy),
        }))
    }
296
297 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
303 self.expect(Token::Explain)?;
304 self.expect(Token::Alter)?;
305 self.expect(Token::For)?;
306 self.expect(Token::Create)?;
307 self.expect(Token::Table)?;
308
309 let body = self.parse_create_table_body()?;
310 let target = match body {
311 QueryExpr::CreateTable(t) => t,
312 _ => {
313 return Err(ParseError::new(
314 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
315 .to_string(),
316 self.position(),
317 ));
318 }
319 };
320
321 let format = if self.consume(&Token::Format)? {
322 if self.consume(&Token::Json)? {
323 ExplainFormat::Json
324 } else if self.consume_ident_ci("SQL")? {
325 ExplainFormat::Sql
326 } else {
327 return Err(ParseError::expected(
328 vec!["JSON", "SQL"],
329 self.peek(),
330 self.position(),
331 ));
332 }
333 } else {
334 ExplainFormat::Sql
335 };
336
337 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
338 target,
339 format,
340 }))
341 }
342
343 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
345 let if_exists = self.match_if_exists()?;
346 let name = self.parse_drop_collection_name()?;
347 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
348 }
349
350 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
351 let if_exists = self.match_if_exists()?;
352 let name = self.parse_drop_collection_name()?;
353 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
354 }
355
356 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
357 let if_exists = self.match_if_exists()?;
358 let name = self.parse_drop_collection_name()?;
359 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
360 }
361
362 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
363 let if_exists = self.match_if_exists()?;
364 let name = self.parse_drop_collection_name()?;
365 Ok(QueryExpr::DropDocument(DropDocumentQuery {
366 name,
367 if_exists,
368 }))
369 }
370
371 pub fn parse_create_keyed_body(
372 &mut self,
373 model: CollectionModel,
374 ) -> Result<QueryExpr, ParseError> {
375 let if_not_exists = self.match_if_not_exists()?;
376 let name = self.parse_drop_collection_name()?;
377 let vault_own_master_key =
378 if model == CollectionModel::Vault && self.consume(&Token::With)? {
379 if !self.consume_ident_ci("OWN")? {
380 return Err(ParseError::expected(
381 vec!["OWN"],
382 self.peek(),
383 self.position(),
384 ));
385 }
386 if !self.consume_ident_ci("MASTER")? {
387 return Err(ParseError::expected(
388 vec!["MASTER"],
389 self.peek(),
390 self.position(),
391 ));
392 }
393 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
394 return Err(ParseError::expected(
395 vec!["KEY"],
396 self.peek(),
397 self.position(),
398 ));
399 }
400 true
401 } else {
402 false
403 };
404 let analytics_config = if model == CollectionModel::Graph && self.consume(&Token::With)? {
408 if !self.consume_ident_ci("ANALYTICS")? {
409 return Err(ParseError::expected(
410 vec!["ANALYTICS"],
411 self.peek(),
412 self.position(),
413 ));
414 }
415 self.parse_analytics_clause()?
416 } else {
417 Vec::new()
418 };
419 Ok(QueryExpr::CreateTable(CreateTableQuery {
420 collection_model: model,
421 name,
422 columns: Vec::new(),
423 if_not_exists,
424 default_ttl_ms: None,
425 metrics_rollup_policies: Vec::new(),
426 context_index_fields: Vec::new(),
427 context_index_enabled: false,
428 timestamps: false,
429 partition_by: None,
430 tenant_by: None,
431 append_only: false,
432 subscriptions: Vec::new(),
433 analytics_config,
434 vault_own_master_key,
435 ai_policy: None,
436 }))
437 }
438
439 fn parse_analytics_clause(
445 &mut self,
446 ) -> Result<Vec<reddb_types::catalog::AnalyticsViewDescriptor>, ParseError> {
447 use reddb_types::catalog::{AnalyticsOutput, AnalyticsViewDescriptor};
448
449 self.expect(Token::LParen)?;
450 let mut views: Vec<AnalyticsViewDescriptor> = Vec::new();
451 loop {
452 let output_name = self.parse_analytics_output_name()?;
453 let output = AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
454 ParseError::new(
455 format!(
456 "unknown analytics output '{output_name}': expected communities, components, or centrality"
457 ),
458 self.position(),
459 )
460 })?;
461 if views.iter().any(|view| view.output == output) {
462 return Err(ParseError::new(
463 format!("duplicate analytics output '{output_name}'"),
464 self.position(),
465 ));
466 }
467 let mut view = AnalyticsViewDescriptor {
468 output,
469 algorithm: None,
470 resolution: None,
471 max_iterations: None,
472 tolerance: None,
473 };
474 if self.consume(&Token::LParen)? {
475 loop {
476 let key = self.parse_analytics_option_key()?;
477 self.expect(Token::Eq)?;
478 match key.as_str() {
479 "using" => {
480 view.algorithm =
481 Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
482 }
483 "resolution" => view.resolution = Some(self.parse_float()?),
484 "max_iterations" => view.max_iterations = Some(self.parse_integer()?),
485 "tolerance" => view.tolerance = Some(self.parse_float()?),
486 other => {
487 return Err(ParseError::new(
488 format!(
489 "unknown analytics option '{other}': expected using, resolution, max_iterations, or tolerance"
490 ),
491 self.position(),
492 ))
493 }
494 }
495 if !self.consume(&Token::Comma)? {
496 break;
497 }
498 }
499 self.expect(Token::RParen)?;
500 }
501 views.push(view);
502 if !self.consume(&Token::Comma)? {
503 break;
504 }
505 }
506 self.expect(Token::RParen)?;
507 if views.is_empty() {
508 return Err(ParseError::new(
509 "WITH ANALYTICS requires at least one output".to_string(),
510 self.position(),
511 ));
512 }
513 Ok(views)
514 }
515
516 fn parse_analytics_output_name(&mut self) -> Result<String, ParseError> {
520 match self.peek() {
521 Token::Components => {
522 self.advance()?;
523 Ok("components".to_string())
524 }
525 Token::Centrality => {
526 self.advance()?;
527 Ok("centrality".to_string())
528 }
529 _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
530 }
531 }
532
533 fn parse_analytics_option_key(&mut self) -> Result<String, ParseError> {
536 match self.peek() {
537 Token::Using => {
538 self.advance()?;
539 Ok("using".to_string())
540 }
541 Token::MaxIterations => {
542 self.advance()?;
543 Ok("max_iterations".to_string())
544 }
545 _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
546 }
547 }
548
549 pub fn parse_create_collection_model_body(
550 &mut self,
551 model: CollectionModel,
552 ) -> Result<QueryExpr, ParseError> {
553 self.parse_create_keyed_body(model)
554 }
555
556 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
557 let if_not_exists = self.match_if_not_exists()?;
558 let name = self.parse_drop_collection_name()?;
559 if !self.consume_ident_ci("KIND")? {
560 return Err(ParseError::expected(
561 vec!["KIND"],
562 self.peek(),
563 self.position(),
564 ));
565 }
566 let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
567 while self.consume(&Token::Dot)? {
568 let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
569 kind.push('.');
570 kind.push_str(&part);
571 }
572 let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
573 if !self.consume_ident_ci("DIM")? {
574 return Err(ParseError::expected(
575 vec!["DIM"],
576 self.peek(),
577 self.position(),
578 ));
579 }
580 let dimension = self.parse_integer()?;
581 if dimension <= 0 {
582 return Err(ParseError::new(
583 "VECTOR DIM must be a positive integer".to_string(),
584 self.position(),
585 ));
586 }
587 let metric = if self.consume(&Token::Metric)? {
588 self.parse_distance_metric()?
589 } else {
590 reddb_types::distance::DistanceMetric::Cosine
591 };
592 (Some(dimension as usize), Some(metric))
593 } else {
594 (None, None)
595 };
596 let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
597 self.parse_signed_by_list()?
598 } else {
599 Vec::new()
600 };
601 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
602 name,
603 kind,
604 if_not_exists,
605 vector_dimension,
606 vector_metric,
607 allowed_signers,
608 }))
609 }
610
611 fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
615 let hex = match self.peek().clone() {
616 Token::String(s) => {
617 self.advance()?;
618 s
619 }
620 _ => {
621 return Err(ParseError::expected(
622 vec!["string literal (ed25519 pubkey hex)"],
623 self.peek(),
624 self.position(),
625 ));
626 }
627 };
628 decode_hex_32(&hex).map_err(|msg| {
629 ParseError::new(
630 format!("SIGNER pubkey '{hex}' invalid: {msg}"),
631 self.position(),
632 )
633 })
634 }
635
636 fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
641 self.expect(Token::LParen)?;
642 let mut out = Vec::new();
643 loop {
644 let hex = match self.peek().clone() {
645 Token::String(s) => {
646 self.advance()?;
647 s
648 }
649 _ => {
650 return Err(ParseError::expected(
651 vec!["string literal (ed25519 pubkey hex)"],
652 self.peek(),
653 self.position(),
654 ));
655 }
656 };
657 let bytes = decode_hex_32(&hex).map_err(|msg| {
658 ParseError::new(
659 format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
660 self.position(),
661 )
662 })?;
663 out.push(bytes);
664 if !self.consume(&Token::Comma)? {
665 break;
666 }
667 }
668 self.expect(Token::RParen)?;
669 if out.is_empty() {
670 return Err(ParseError::new(
671 "SIGNED_BY list must contain at least one pubkey".to_string(),
672 self.position(),
673 ));
674 }
675 Ok(out)
676 }
677
678 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
679 let if_not_exists = self.match_if_not_exists()?;
680 let name = self.parse_drop_collection_name()?;
681 if !self.consume_ident_ci("DIM")? {
682 return Err(ParseError::expected(
683 vec!["DIM"],
684 self.peek(),
685 self.position(),
686 ));
687 }
688 let dimension = self.parse_integer()?;
689 if dimension <= 0 {
690 return Err(ParseError::new(
691 "VECTOR DIM must be a positive integer".to_string(),
692 self.position(),
693 ));
694 }
695 let metric = if self.consume(&Token::Metric)? {
696 self.parse_distance_metric()?
697 } else {
698 reddb_types::distance::DistanceMetric::Cosine
699 };
700 Ok(QueryExpr::CreateVector(CreateVectorQuery {
701 name,
702 dimension: dimension as usize,
703 metric,
704 if_not_exists,
705 }))
706 }
707
708 pub fn parse_drop_keyed_body(
709 &mut self,
710 model: CollectionModel,
711 ) -> Result<QueryExpr, ParseError> {
712 let if_exists = self.match_if_exists()?;
713 let name = self.parse_drop_collection_name()?;
714 Ok(QueryExpr::DropKv(DropKvQuery {
715 name,
716 if_exists,
717 model,
718 }))
719 }
720
721 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
722 self.parse_drop_keyed_body(CollectionModel::Kv)
723 }
724
725 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
726 self.parse_drop_collection_model_body(None)
727 }
728
729 pub fn parse_drop_collection_model_body(
730 &mut self,
731 model: Option<CollectionModel>,
732 ) -> Result<QueryExpr, ParseError> {
733 let if_exists = self.match_if_exists()?;
734 let name = self.parse_drop_collection_name()?;
735 Ok(QueryExpr::DropCollection(DropCollectionQuery {
736 name,
737 if_exists,
738 model,
739 }))
740 }
741
742 pub fn parse_truncate_body(
743 &mut self,
744 model: Option<CollectionModel>,
745 ) -> Result<QueryExpr, ParseError> {
746 let if_exists = self.match_if_exists()?;
747 let name = self.parse_drop_collection_name()?;
748 Ok(QueryExpr::Truncate(TruncateQuery {
749 name,
750 model,
751 if_exists,
752 }))
753 }
754
755 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
756 let mut name = self.expect_ident()?;
757 while self.consume(&Token::Dot)? {
758 if self.consume(&Token::Star)? {
759 name.push_str(".*");
760 break;
761 }
762 let next = self.expect_ident_or_keyword()?;
763 name = format!("{name}.{next}");
764 }
765 Ok(name)
766 }
767
768 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
775 self.expect(Token::Alter)?;
776 if !self.consume(&Token::Table)?
777 && !self.consume(&Token::Collection)?
778 && !self.consume_ident_ci("COLLECTION")?
779 {
780 return Err(ParseError::expected(
781 vec!["TABLE", "COLLECTION"],
782 self.peek(),
783 self.position(),
784 ));
785 }
786 let name = self.expect_ident()?;
787
788 let mut operations = Vec::new();
789 loop {
790 let op = self.parse_alter_operation(&name)?;
791 operations.push(op);
792 if !self.consume(&Token::Comma)? {
793 break;
794 }
795 }
796
797 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
798 }
799
800 pub fn parse_alter_graph_query(&mut self) -> Result<QueryExpr, ParseError> {
809 self.expect(Token::Alter)?;
810 self.expect(Token::Graph)?;
811 let name = self.expect_ident()?;
812
813 let mut operations = Vec::new();
814 loop {
815 operations.push(self.parse_alter_graph_operation()?);
816 if !self.consume(&Token::Comma)? {
817 break;
818 }
819 }
820
821 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
822 }
823
824 fn parse_alter_graph_operation(&mut self) -> Result<AlterOperation, ParseError> {
827 if self.consume(&Token::Add)? {
828 if !self.consume_ident_ci("ANALYTICS")? {
829 return Err(ParseError::expected(
830 vec!["ANALYTICS"],
831 self.peek(),
832 self.position(),
833 ));
834 }
835 let views = self.parse_analytics_clause()?;
838 Ok(AlterOperation::AddAnalytics(views))
839 } else if self.consume(&Token::Drop)? {
840 if !self.consume_ident_ci("ANALYTICS")? {
841 return Err(ParseError::expected(
842 vec!["ANALYTICS"],
843 self.peek(),
844 self.position(),
845 ));
846 }
847 let output_name = self.parse_analytics_output_name()?;
848 let output = reddb_types::catalog::AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
849 ParseError::new(
850 format!(
851 "unknown analytics output '{output_name}': expected communities, components, or centrality"
852 ),
853 self.position(),
854 )
855 })?;
856 Ok(AlterOperation::DropAnalytics(output))
857 } else {
858 Err(ParseError::expected(
859 vec!["ADD", "DROP"],
860 self.peek(),
861 self.position(),
862 ))
863 }
864 }
865
866 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
868 if self.consume(&Token::Add)? {
869 if self.consume_ident_ci("SUBSCRIPTION")? {
870 let sub_name = self.expect_ident_or_keyword()?;
874 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
875 Ok(AlterOperation::AddSubscription {
876 name: sub_name,
877 descriptor,
878 })
879 } else if self.consume_ident_ci("SIGNER")? {
880 let pubkey = self.parse_single_signer_hex()?;
882 Ok(AlterOperation::AddSigner { pubkey })
883 } else {
884 let _ = self.consume(&Token::Column)?;
886 let col_def = self.parse_column_def()?;
887 Ok(AlterOperation::AddColumn(col_def))
888 }
889 } else if self.consume_ident_ci("REVOKE")? {
890 if !self.consume_ident_ci("SIGNER")? {
892 return Err(ParseError::expected(
893 vec!["SIGNER"],
894 self.peek(),
895 self.position(),
896 ));
897 }
898 let pubkey = self.parse_single_signer_hex()?;
899 Ok(AlterOperation::RevokeSigner { pubkey })
900 } else if self.consume(&Token::Drop)? {
901 if self.consume_ident_ci("SUBSCRIPTION")? {
902 let sub_name = self.expect_ident_or_keyword()?;
904 Ok(AlterOperation::DropSubscription { name: sub_name })
905 } else {
906 let _ = self.consume(&Token::Column)?;
908 let col_name = self.expect_ident()?;
909 Ok(AlterOperation::DropColumn(col_name))
910 }
911 } else if self.consume(&Token::Rename)? {
912 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
915 self.expect(Token::To)?;
916 let to = self.expect_ident()?;
917 Ok(AlterOperation::RenameColumn { from, to })
918 } else if self.consume(&Token::Attach)? {
919 self.expect(Token::Partition)?;
921 let child = self.expect_ident()?;
922 self.expect(Token::For)?;
923 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
927 return Err(ParseError::expected(
928 vec!["VALUES"],
929 self.peek(),
930 self.position(),
931 ));
932 }
933 let bound = self.collect_remaining_tokens_as_string()?;
934 Ok(AlterOperation::AttachPartition { child, bound })
935 } else if self.consume(&Token::Detach)? {
936 self.expect(Token::Partition)?;
938 let child = self.expect_ident()?;
939 Ok(AlterOperation::DetachPartition { child })
940 } else if self.consume(&Token::Enable)? {
941 if self.consume_ident_ci("EVENTS")? {
943 Ok(AlterOperation::EnableEvents(
944 self.parse_subscription_descriptor(table_name.to_string())?,
945 ))
946 } else if self.consume_ident_ci("TENANCY")? {
947 self.expect(Token::On)?;
948 self.expect(Token::LParen)?;
949 let mut path = self.expect_ident_or_keyword()?;
951 while self.consume(&Token::Dot)? {
952 let next = self.expect_ident_or_keyword()?;
953 path = format!("{path}.{next}");
954 }
955 self.expect(Token::RParen)?;
956 Ok(AlterOperation::EnableTenancy { column: path })
957 } else {
958 self.expect(Token::Row)?;
959 self.expect(Token::Level)?;
960 self.expect(Token::Security)?;
961 Ok(AlterOperation::EnableRowLevelSecurity)
962 }
963 } else if self.consume(&Token::Disable)? {
964 if self.consume_ident_ci("EVENTS")? {
966 Ok(AlterOperation::DisableEvents)
967 } else if self.consume_ident_ci("TENANCY")? {
968 Ok(AlterOperation::DisableTenancy)
969 } else {
970 self.expect(Token::Row)?;
971 self.expect(Token::Level)?;
972 self.expect(Token::Security)?;
973 Ok(AlterOperation::DisableRowLevelSecurity)
974 }
975 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
976 if self.consume_ident_ci("APPEND_ONLY")? {
979 let on = self.parse_bool_assign()?;
980 Ok(AlterOperation::SetAppendOnly(on))
981 } else if self.consume_ident_ci("VERSIONED")? {
982 let on = self.parse_bool_assign()?;
983 Ok(AlterOperation::SetVersioned(on))
984 } else if self.consume(&Token::Retention)? {
985 let value = self.parse_float()?;
989 let unit = self.parse_duration_unit()?;
990 Ok(AlterOperation::SetRetention {
991 duration_ms: (value * unit) as u64,
992 })
993 } else {
994 Err(ParseError::expected(
995 vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
996 self.peek(),
997 self.position(),
998 ))
999 }
1000 } else if self.consume_ident_ci("UNSET")? {
1001 if self.consume(&Token::Retention)? {
1003 Ok(AlterOperation::UnsetRetention)
1004 } else {
1005 Err(ParseError::expected(
1006 vec!["RETENTION"],
1007 self.peek(),
1008 self.position(),
1009 ))
1010 }
1011 } else {
1012 Err(ParseError::expected(
1013 vec![
1014 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
1015 "UNSET",
1016 ],
1017 self.peek(),
1018 self.position(),
1019 ))
1020 }
1021 }
1022
1023 fn parse_subscription_descriptor(
1024 &mut self,
1025 source: String,
1026 ) -> Result<SubscriptionDescriptor, ParseError> {
1027 let mut ops_filter = Vec::new();
1028 if self.consume(&Token::LParen)? {
1029 loop {
1030 let op = if self.consume(&Token::Insert)? {
1031 SubscriptionOperation::Insert
1032 } else if self.consume(&Token::Update)? {
1033 SubscriptionOperation::Update
1034 } else if self.consume(&Token::Delete)? {
1035 SubscriptionOperation::Delete
1036 } else {
1037 return Err(ParseError::expected(
1038 vec!["INSERT", "UPDATE", "DELETE"],
1039 self.peek(),
1040 self.position(),
1041 ));
1042 };
1043 ops_filter.push(op);
1044 if !self.consume(&Token::Comma)? {
1045 break;
1046 }
1047 }
1048 self.expect(Token::RParen)?;
1049 }
1050
1051 let target_queue = if self.consume(&Token::To)? {
1052 self.expect_ident()?
1053 } else {
1054 format!("{source}_events")
1055 };
1056
1057 let mut redact_fields = Vec::new();
1058 if self.consume_ident_ci("REDACT")? {
1059 self.expect(Token::LParen)?;
1060 loop {
1061 redact_fields.push(self.parse_dotted_redact_path()?);
1062 if !self.consume(&Token::Comma)? {
1063 break;
1064 }
1065 }
1066 self.expect(Token::RParen)?;
1067 }
1068
1069 let where_filter = if self.consume(&Token::Where)? {
1070 Some(self.collect_subscription_where_filter()?)
1071 } else {
1072 None
1073 };
1074
1075 let all_tenants = if self.consume(&Token::On)? {
1077 self.expect(Token::All)?;
1078 if !self.consume_ident_ci("TENANTS")? {
1079 return Err(ParseError::expected(
1080 vec!["TENANTS"],
1081 self.peek(),
1082 self.position(),
1083 ));
1084 }
1085 true
1086 } else {
1087 false
1088 };
1089
1090 if self.consume_ident_ci("REQUIRES")? {
1092 self.consume_ident_ci("CAPABILITY")?;
1093 self.advance()?;
1095 }
1096
1097 Ok(SubscriptionDescriptor {
1098 name: String::new(),
1099 source,
1100 target_queue,
1101 ops_filter,
1102 where_filter,
1103 redact_fields,
1104 enabled: true,
1105 all_tenants,
1106 })
1107 }
1108
1109 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
1111 let mut parts = Vec::new();
1112 if self.consume(&Token::Star)? {
1113 parts.push("*".to_string());
1114 } else {
1115 parts.push(self.expect_ident_or_keyword()?);
1116 }
1117 while self.consume(&Token::Dot)? {
1118 if self.consume(&Token::Star)? {
1119 parts.push("*".to_string());
1120 } else {
1121 parts.push(self.expect_ident_or_keyword()?);
1122 }
1123 }
1124 Ok(parts.join("."))
1125 }
1126
1127 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
1128 let mut parts = Vec::new();
1129 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1130 parts.push(self.peek().to_string());
1131 self.advance()?;
1132 }
1133 if parts.is_empty() {
1134 return Err(ParseError::expected(
1135 vec!["predicate"],
1136 self.peek(),
1137 self.position(),
1138 ));
1139 }
1140 Ok(parts.join(" "))
1141 }
1142
1143 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
1148 let mut parts: Vec<String> = Vec::new();
1149 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1150 parts.push(self.peek().to_string());
1151 self.advance()?;
1152 }
1153 Ok(parts.join(" "))
1154 }
1155
1156 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
1158 let name = self.expect_column_ident()?;
1159 let sql_type = self.parse_column_type()?;
1160 let data_type = sql_type.to_string();
1161
1162 let mut def = CreateColumnDef {
1163 name,
1164 data_type,
1165 sql_type: sql_type.clone(),
1166 not_null: false,
1167 default: None,
1168 compress: None,
1169 unique: false,
1170 primary_key: false,
1171 enum_variants: sql_type.enum_variants().unwrap_or_default(),
1172 array_element: sql_type.array_element_type(),
1173 decimal_precision: sql_type.decimal_precision(),
1174 };
1175
1176 loop {
1178 if self.match_not_null()? {
1179 def.not_null = true;
1180 } else if self.consume(&Token::Default)? {
1181 self.expect(Token::Eq)?;
1182 def.default = Some(self.parse_literal_string_for_ddl()?);
1183 } else if self.consume(&Token::Compress)? {
1184 self.expect(Token::Colon)?;
1185 def.compress = Some(self.parse_integer()? as u8);
1186 } else if self.consume(&Token::Unique)? {
1187 def.unique = true;
1188 } else if self.match_primary_key()? {
1189 def.primary_key = true;
1190 } else {
1191 break;
1192 }
1193 }
1194
1195 Ok(def)
1196 }
1197
1198 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
1200 let type_name = self.expect_ident_or_keyword()?;
1201 if self.consume(&Token::LParen)? {
1202 let inner = self.parse_type_params()?;
1203 self.expect(Token::RParen)?;
1204 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
1205 } else {
1206 Ok(SqlTypeName::new(type_name))
1207 }
1208 }
1209
1210 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
1212 let mut parts = Vec::new();
1213 loop {
1214 match self.peek().clone() {
1215 Token::String(s) => {
1216 let s = s.clone();
1217 self.advance()?;
1218 parts.push(TypeModifier::StringLiteral(s));
1219 }
1220 Token::Integer(n) => {
1221 self.advance()?;
1222 parts.push(TypeModifier::Number(n as u32));
1223 }
1224 _ => {
1225 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1226 }
1227 }
1228 if !self.consume(&Token::Comma)? {
1229 break;
1230 }
1231 }
1232 Ok(parts)
1233 }
1234
1235 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1237 match self.peek().clone() {
1238 Token::String(s) => {
1239 let s = s.clone();
1240 self.advance()?;
1241 Ok(s)
1242 }
1243 Token::Integer(n) => {
1244 self.advance()?;
1245 Ok(n.to_string())
1246 }
1247 Token::Float(n) => {
1248 self.advance()?;
1249 Ok(n.to_string())
1250 }
1251 Token::True => {
1252 self.advance()?;
1253 Ok("true".to_string())
1254 }
1255 Token::False => {
1256 self.advance()?;
1257 Ok("false".to_string())
1258 }
1259 Token::Null => {
1260 self.advance()?;
1261 Ok("null".to_string())
1262 }
1263 ref other => Err(ParseError::expected(
1264 vec!["string", "number", "true", "false", "null"],
1265 other,
1266 self.position(),
1267 )),
1268 }
1269 }
1270
1271 fn check_ttl_keyword(&self) -> bool {
1272 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1273 }
1274
1275 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1278 self.expect(Token::Eq)?;
1279 match self.peek() {
1280 Token::True => {
1281 self.advance()?;
1282 Ok(true)
1283 }
1284 Token::False => {
1285 self.advance()?;
1286 Ok(false)
1287 }
1288 other => Err(ParseError::expected(
1289 vec!["true", "false"],
1290 other,
1291 self.position(),
1292 )),
1293 }
1294 }
1295
1296 fn parse_ai_string_list(&mut self) -> Result<Vec<String>, ParseError> {
1300 self.expect(Token::LParen)?;
1301 let mut out = Vec::new();
1302 loop {
1303 out.push(self.parse_string()?);
1304 if !self.consume(&Token::Comma)? {
1305 break;
1306 }
1307 }
1308 self.expect(Token::RParen)?;
1309 Ok(out)
1310 }
1311
1312 fn parse_ai_bool(&mut self) -> Result<bool, ParseError> {
1314 match self.peek() {
1315 Token::True => {
1316 self.advance()?;
1317 Ok(true)
1318 }
1319 Token::False => {
1320 self.advance()?;
1321 Ok(false)
1322 }
1323 other => Err(ParseError::expected(
1324 vec!["true", "false"],
1325 other,
1326 self.position(),
1327 )),
1328 }
1329 }
1330
1331 fn parse_ai_word(&mut self) -> Result<String, ParseError> {
1334 if matches!(self.peek(), Token::String(_)) {
1335 self.parse_string()
1336 } else {
1337 self.expect_ident_or_keyword()
1338 }
1339 }
1340
1341 fn parse_ai_embed_policy(&mut self) -> Result<reddb_types::catalog::EmbedPolicy, ParseError> {
1343 self.expect(Token::LParen)?;
1344 let mut fields: Vec<String> = Vec::new();
1345 let mut provider: Option<String> = None;
1346 let mut model: Option<String> = None;
1347 loop {
1348 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1349 self.expect(Token::Eq)?;
1350 match key.as_str() {
1351 "fields" => fields = self.parse_ai_string_list()?,
1352 "provider" => provider = Some(self.parse_string()?),
1353 "model" => model = Some(self.parse_string()?),
1354 other => {
1355 return Err(ParseError::new(
1356 format!(
1357 "unsupported EMBED policy option {other:?}; supported: fields, provider, model"
1358 ),
1359 self.position(),
1360 ));
1361 }
1362 }
1363 if !self.consume(&Token::Comma)? {
1364 break;
1365 }
1366 }
1367 self.expect(Token::RParen)?;
1368 if fields.is_empty() {
1369 return Err(ParseError::new(
1370 "EMBED policy requires fields = ('<col>', ...)".to_string(),
1371 self.position(),
1372 ));
1373 }
1374 let provider = provider.ok_or_else(|| {
1375 ParseError::new(
1376 "EMBED policy requires provider = '<token>'".to_string(),
1377 self.position(),
1378 )
1379 })?;
1380 let model = model.ok_or_else(|| {
1381 ParseError::new(
1382 "EMBED policy requires model = '<name>'".to_string(),
1383 self.position(),
1384 )
1385 })?;
1386 Ok(reddb_types::catalog::EmbedPolicy {
1387 fields,
1388 provider,
1389 model,
1390 })
1391 }
1392
1393 fn parse_ai_moderate_policy(
1395 &mut self,
1396 ) -> Result<reddb_types::catalog::ModeratePolicy, ParseError> {
1397 use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1398 self.expect(Token::LParen)?;
1399 let mut fields: Vec<String> = Vec::new();
1400 let mut provider: Option<String> = None;
1401 let mut model: Option<String> = None;
1402 let mut sync_gate = false;
1403 let mut degraded_mode = ModerateDegradedMode::default();
1404 let mut reject_action = ModerateRejectAction::default();
1405 let mut hard_delete_on_reject = false;
1406 loop {
1407 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1408 self.expect(Token::Eq)?;
1409 match key.as_str() {
1410 "fields" => fields = self.parse_ai_string_list()?,
1411 "provider" => provider = Some(self.parse_string()?),
1412 "model" => model = Some(self.parse_string()?),
1413 "sync" | "sync_gate" => sync_gate = self.parse_ai_bool()?,
1414 "hard_delete" | "hard_delete_on_reject" => {
1415 hard_delete_on_reject = self.parse_ai_bool()?
1416 }
1417 "degraded" | "degraded_mode" => {
1418 let word = self.parse_ai_word()?;
1419 degraded_mode = ModerateDegradedMode::from_str(&word).ok_or_else(|| {
1420 ParseError::new(
1421 format!(
1422 "unsupported MODERATE degraded mode {word:?}; supported: open, closed"
1423 ),
1424 self.position(),
1425 )
1426 })?;
1427 }
1428 "on_reject" | "reject_action" => {
1429 let word = self.parse_ai_word()?;
1430 reject_action = ModerateRejectAction::from_str(&word).ok_or_else(|| {
1431 ParseError::new(
1432 format!(
1433 "unsupported MODERATE reject action {word:?}; supported: reject, flag, redact"
1434 ),
1435 self.position(),
1436 )
1437 })?;
1438 }
1439 other => {
1440 return Err(ParseError::new(
1441 format!(
1442 "unsupported MODERATE policy option {other:?}; supported: fields, provider, model, sync, degraded, on_reject, hard_delete"
1443 ),
1444 self.position(),
1445 ));
1446 }
1447 }
1448 if !self.consume(&Token::Comma)? {
1449 break;
1450 }
1451 }
1452 self.expect(Token::RParen)?;
1453 if fields.is_empty() {
1454 return Err(ParseError::new(
1455 "MODERATE policy requires fields = ('<col>', ...)".to_string(),
1456 self.position(),
1457 ));
1458 }
1459 let provider = provider.ok_or_else(|| {
1460 ParseError::new(
1461 "MODERATE policy requires provider = '<token>'".to_string(),
1462 self.position(),
1463 )
1464 })?;
1465 let model = model.ok_or_else(|| {
1466 ParseError::new(
1467 "MODERATE policy requires model = '<name>'".to_string(),
1468 self.position(),
1469 )
1470 })?;
1471 Ok(reddb_types::catalog::ModeratePolicy {
1472 fields,
1473 provider,
1474 model,
1475 sync_gate,
1476 degraded_mode,
1477 reject_action,
1478 hard_delete_on_reject,
1479 })
1480 }
1481
1482 fn parse_ai_vision_policy(&mut self) -> Result<reddb_types::catalog::VisionPolicy, ParseError> {
1484 self.expect(Token::LParen)?;
1485 let mut image_field: Option<String> = None;
1486 let mut output_kinds: Vec<String> = Vec::new();
1487 let mut provider: Option<String> = None;
1488 let mut model: Option<String> = None;
1489 loop {
1490 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1491 self.expect(Token::Eq)?;
1492 match key.as_str() {
1493 "image_field" => image_field = Some(self.parse_string()?),
1494 "outputs" | "output_kinds" => output_kinds = self.parse_ai_string_list()?,
1495 "provider" => provider = Some(self.parse_string()?),
1496 "model" => model = Some(self.parse_string()?),
1497 other => {
1498 return Err(ParseError::new(
1499 format!(
1500 "unsupported VISION policy option {other:?}; supported: image_field, outputs, provider, model"
1501 ),
1502 self.position(),
1503 ));
1504 }
1505 }
1506 if !self.consume(&Token::Comma)? {
1507 break;
1508 }
1509 }
1510 self.expect(Token::RParen)?;
1511 let image_field = image_field.ok_or_else(|| {
1512 ParseError::new(
1513 "VISION policy requires image_field = '<col>'".to_string(),
1514 self.position(),
1515 )
1516 })?;
1517 if output_kinds.is_empty() {
1518 return Err(ParseError::new(
1519 "VISION policy requires outputs = ('<kind>', ...)".to_string(),
1520 self.position(),
1521 ));
1522 }
1523 let provider = provider.ok_or_else(|| {
1524 ParseError::new(
1525 "VISION policy requires provider = '<token>'".to_string(),
1526 self.position(),
1527 )
1528 })?;
1529 let model = model.ok_or_else(|| {
1530 ParseError::new(
1531 "VISION policy requires model = '<name>'".to_string(),
1532 self.position(),
1533 )
1534 })?;
1535 Ok(reddb_types::catalog::VisionPolicy {
1536 image_field,
1537 output_kinds,
1538 provider,
1539 model,
1540 })
1541 }
1542
1543 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1544 if self.consume_ident_ci(expected)? {
1545 Ok(())
1546 } else {
1547 Err(ParseError::expected(
1548 vec![expected],
1549 self.peek(),
1550 self.position(),
1551 ))
1552 }
1553 }
1554
1555 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1556 let option_name = self.expect_ident_or_keyword()?;
1557 if !option_name.eq_ignore_ascii_case("ttl") {
1558 return Err(ParseError::new(
1559 format!(
1563 "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1564 ),
1565 self.position(),
1566 ));
1567 }
1568
1569 let ttl_value = self.parse_float()?;
1570 let ttl_unit = match self.peek() {
1571 Token::Ident(unit) => {
1572 let unit = unit.clone();
1573 self.advance()?;
1574 unit
1575 }
1576 _ => "s".to_string(),
1577 };
1578
1579 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1580 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1581 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1582 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1583 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1584 "d" | "day" | "days" => 86_400_000.0,
1585 other => {
1586 return Err(ParseError::new(
1587 format!(
1591 "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1592 ),
1593 self.position(),
1594 ));
1595 }
1596 };
1597
1598 if !ttl_value.is_finite() || ttl_value < 0.0 {
1599 return Err(ParseError::new(
1600 "TTL must be a finite, non-negative duration".to_string(),
1601 self.position(),
1602 ));
1603 }
1604
1605 let ttl_ms = ttl_value * multiplier_ms;
1606 if ttl_ms > u64::MAX as f64 {
1607 return Err(ParseError::new(
1608 "TTL duration is too large".to_string(),
1609 self.position(),
1610 ));
1611 }
1612 if ttl_ms.fract().abs() >= f64::EPSILON {
1613 return Err(ParseError::new(
1614 "TTL duration must resolve to a whole number of milliseconds".to_string(),
1615 self.position(),
1616 ));
1617 }
1618
1619 Ok(Some(ttl_ms as u64))
1620 }
1621
1622 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1624 if self.check(&Token::If) {
1625 self.advance()?;
1626 self.expect(Token::Not)?;
1627 self.expect(Token::Exists)?;
1628 Ok(true)
1629 } else {
1630 Ok(false)
1631 }
1632 }
1633
1634 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1636 if self.check(&Token::If) {
1637 self.advance()?;
1638 self.expect(Token::Exists)?;
1639 Ok(true)
1640 } else {
1641 Ok(false)
1642 }
1643 }
1644
1645 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1647 if self.check(&Token::Not) {
1648 self.advance()?; if self.check(&Token::Null) {
1652 self.advance()?; Ok(true)
1654 } else {
1655 Err(ParseError::expected(
1659 vec!["NULL (after NOT)"],
1660 self.peek(),
1661 self.position(),
1662 ))
1663 }
1664 } else {
1665 Ok(false)
1666 }
1667 }
1668
1669 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1671 if self.check(&Token::Primary) {
1672 self.advance()?;
1673 self.expect(Token::Key)?;
1674 Ok(true)
1675 } else {
1676 Ok(false)
1677 }
1678 }
1679}
1680
1681fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1686 if s.len() != 64 {
1687 return Err(format!("expected 64 hex chars, got {}", s.len()));
1688 }
1689 let mut out = [0u8; 32];
1690 let bytes = s.as_bytes();
1691 for i in 0..32 {
1692 let hi = hex_nibble(bytes[i * 2])?;
1693 let lo = hex_nibble(bytes[i * 2 + 1])?;
1694 out[i] = (hi << 4) | lo;
1695 }
1696 Ok(out)
1697}
1698
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value `0..=15`.
///
/// # Errors
/// Returns `"non-hex char: <char>"` for any other byte. The byte is shown as
/// the `char` with the same code point.
fn hex_nibble(c: u8) -> Result<u8, String> {
    let as_char = char::from(c);
    // `to_digit(16)` accepts exactly the ASCII hex digits of either case.
    as_char
        .to_digit(16)
        .map(|digit| digit as u8)
        .ok_or_else(|| format!("non-hex char: {:?}", c as char))
}
1707
// Unit tests for the DDL parsing helpers: CREATE TABLE bodies (options and
// AI policies), keyed collection create/drop, CREATE COLLECTION signers,
// ALTER operations, and the hex decoder.
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_types::catalog::{AnalyticsOutput, CollectionModel, SubscriptionOperation};

    // Builds a `Parser` over `input`, panicking with the lexer error if
    // construction fails.
    fn parser(input: &str) -> Parser<'_> {
        Parser::new(input).unwrap_or_else(|err| panic!("failed to lex {input:?}: {err:?}"))
    }

    // Covers `WITH (...)` key/value options plus trailing PARTITION BY and
    // TENANT BY clauses, and the type error for a non-text `tenant_by`.
    #[test]
    fn parse_create_table_body_parenthesized_options_and_trailing_clauses() {
        let QueryExpr::CreateTable(table) = parser(
            "IF NOT EXISTS events (id INT, tenant_meta TEXT) \
             WITH (tenant_by = 'tenant_id', append_only = true, timestamps = false) \
             PARTITION BY HASH (id) TENANT BY (tenant_meta.tenant)",
        )
        .parse_create_table_body()
        .expect("create table body") else {
            panic!("Expected CreateTableQuery");
        };

        assert_eq!(table.name, "events");
        assert!(table.if_not_exists);
        assert!(table.append_only);
        assert!(!table.timestamps);
        assert_eq!(table.tenant_by.as_deref(), Some("tenant_id"));
        assert_eq!(
            table
                .partition_by
                .as_ref()
                .map(|spec| (spec.kind, spec.column.as_str())),
            Some((PartitionKind::Hash, "id"))
        );

        // `tenant_by` must be a text literal; an integer is rejected.
        let err = parser("bad (id INT) WITH (tenant_by = 42)")
            .parse_create_table_body()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("WITH tenant_by expects a text literal"),
            "{err}"
        );
    }

    // EMBED, MODERATE and VISION blocks in one WITH list all parse, and
    // every option value lands in the resulting policy.
    #[test]
    fn parse_create_table_ai_policy_round_trips_all_modalities() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        let QueryExpr::CreateTable(table) = parser(
            "posts (id INT, title TEXT, body TEXT, photo TEXT) WITH ( \
             EMBED (fields = ('title', 'body'), provider = 'openai', model = 'text-embedding-3-small'), \
             MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest', sync = true, degraded = closed, on_reject = flag, hard_delete = true), \
             VISION (image_field = 'photo', outputs = ('caption', 'tags'), provider = 'openai', model = 'gpt-4o') \
             )",
        )
        .parse_create_table_body()
        .expect("create table body with ai policy") else {
            panic!("Expected CreateTableQuery");
        };

        let policy = table.ai_policy.expect("ai policy present");

        let embed = policy.embed.expect("embed block");
        assert_eq!(embed.fields, vec!["title".to_string(), "body".to_string()]);
        assert_eq!(embed.provider, "openai");
        assert_eq!(embed.model, "text-embedding-3-small");

        let moderate = policy.moderate.expect("moderate block");
        assert_eq!(moderate.fields, vec!["body".to_string()]);
        assert!(moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Closed);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Flag);
        assert!(moderate.hard_delete_on_reject);

        let vision = policy.vision.expect("vision block");
        assert_eq!(vision.image_field, "photo");
        assert_eq!(
            vision.output_kinds,
            vec!["caption".to_string(), "tags".to_string()]
        );
        assert_eq!(vision.model, "gpt-4o");
    }

    // MODERATE long-form alias keys parse, and each error branch (unknown
    // key, bad degraded mode, bad reject action, missing fields) reports its
    // own message.
    #[test]
    fn parse_moderate_policy_aliases_and_error_branches() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        let QueryExpr::CreateTable(table) = parser(
            "t (id INT, body TEXT) WITH ( \
             MODERATE (fields = ('body'), provider = 'openai', model = 'm', \
             sync_gate = true, degraded_mode = open, reject_action = redact, \
             hard_delete_on_reject = true) \
             )",
        )
        .parse_create_table_body()
        .expect("alias spellings parse") else {
            panic!("Expected CreateTableQuery");
        };
        let moderate = table
            .ai_policy
            .expect("policy")
            .moderate
            .expect("moderate block");
        assert!(moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Redact);
        assert!(moderate.hard_delete_on_reject);

        // Each (sql, needle) pair must fail with an error containing `needle`.
        for (sql, needle) in [
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
                "unsupported MODERATE policy option",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', degraded = sideways))",
                "unsupported MODERATE degraded mode",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', on_reject = explode))",
                "unsupported MODERATE reject action",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (provider = 'o', model = 'm'))",
                "MODERATE policy requires fields",
            ),
        ] {
            let err = parser(sql)
                .parse_create_table_body()
                .expect_err("moderate policy error");
            assert!(format!("{err}").contains(needle), "got: {err}");
        }
    }

    // Error branches for EMBED and VISION, plus the VISION `output_kinds`
    // alias for `outputs`.
    #[test]
    fn parse_embed_and_vision_policy_error_branches() {
        // Each (sql, needle) pair must fail with an error containing `needle`.
        for (sql, needle) in [
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
                "unsupported EMBED policy option",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (provider = 'o', model = 'm'))",
                "EMBED policy requires fields",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), model = 'm'))",
                "EMBED policy requires provider",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o'))",
                "EMBED policy requires model",
            ),
            (
                "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', provider = 'o', model = 'm', bogus = 1))",
                "unsupported VISION policy option",
            ),
            (
                "t (id INT, photo TEXT) WITH (VISION (provider = 'o', model = 'm'))",
                "VISION policy requires image_field",
            ),
        ] {
            let err = parser(sql)
                .parse_create_table_body()
                .expect_err("ai policy error");
            assert!(format!("{err}").contains(needle), "got: {err}");
        }

        let QueryExpr::CreateTable(table) = parser(
            "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', \
             output_kinds = ('caption'), provider = 'o', model = 'm'))",
        )
        .parse_create_table_body()
        .expect("vision output_kinds alias") else {
            panic!("Expected CreateTableQuery");
        };
        let vision = table
            .ai_policy
            .expect("policy")
            .vision
            .expect("vision block");
        assert_eq!(vision.output_kinds, vec!["caption".to_string()]);
    }

    // Omitted MODERATE options fall back to their defaults, and a table
    // without a WITH clause has no AI policy at all.
    #[test]
    fn parse_create_table_ai_policy_defaults_and_no_clause() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        let QueryExpr::CreateTable(table) = parser(
            "msgs (id INT, body TEXT) WITH ( \
             MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest') \
             )",
        )
        .parse_create_table_body()
        .expect("create table body") else {
            panic!("Expected CreateTableQuery");
        };
        let moderate = table
            .ai_policy
            .expect("policy")
            .moderate
            .expect("moderate block");
        assert!(!moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Reject);
        assert!(!moderate.hard_delete_on_reject);

        let QueryExpr::CreateTable(plain) = parser("plain (id INT)")
            .parse_create_table_body()
            .expect("create table body")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert!(plain.ai_policy.is_none());
    }

    // Malformed AI policy clauses are rejected, including a second EMBED
    // clause in the same WITH list.
    #[test]
    fn parse_create_table_ai_policy_rejects_malformed_clauses() {
        let err = parser("t (id INT) WITH (EMBED (fields = ('body'), model = 'm'))")
            .parse_create_table_body()
            .unwrap_err();
        assert!(
            err.to_string().contains("EMBED policy requires provider"),
            "{err}"
        );

        let err = parser(
            "t (id INT) WITH (VISION (image_field = 'p', outputs = ('caption'), provider = 'openai', model = 'm', bogus = 1))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(
            err.to_string().contains("unsupported VISION policy option"),
            "{err}"
        );

        let err = parser(
            "t (id INT) WITH (MODERATE (fields = ('body'), provider = 'openai', model = 'm', degraded = maybe))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("unsupported MODERATE degraded mode"),
            "{err}"
        );

        let err = parser(
            "t (id INT) WITH (EMBED (fields = ('a'), provider = 'openai', model = 'm'), EMBED (fields = ('b'), provider = 'openai', model = 'm'))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(err.to_string().contains("duplicate EMBED clause"), "{err}");
    }

    // Keyed create/drop bodies. Covers the vault OWN MASTER KEY flag, graph
    // ANALYTICS views, an unknown graph WITH keyword, rejection of ANALYTICS
    // on a KV create, and dotted/wildcard names for DROP.
    #[test]
    fn parse_keyed_bodies_cover_vault_analytics_and_dotted_drop_names() {
        let QueryExpr::CreateTable(vault) =
            parser("IF NOT EXISTS tenant.secrets WITH OWN MASTER KEY")
                .parse_create_keyed_body(CollectionModel::Vault)
                .expect("create vault")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert_eq!(vault.collection_model, CollectionModel::Vault);
        assert_eq!(vault.name, "tenant.secrets");
        assert!(vault.if_not_exists);
        assert!(vault.vault_own_master_key);

        let QueryExpr::CreateTable(graph) = parser(
            "g WITH ANALYTICS (centrality (using = pagerank, max_iterations = 12, tolerance = 0.001))",
        )
        .parse_create_keyed_body(CollectionModel::Graph)
        .expect("create graph")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert_eq!(graph.analytics_config.len(), 1);
        let view = &graph.analytics_config[0];
        assert_eq!(view.output, AnalyticsOutput::Centrality);
        assert_eq!(view.algorithm.as_deref(), Some("pagerank"));
        assert_eq!(view.max_iterations, Some(12));
        assert_eq!(view.tolerance, Some(0.001));

        let err = parser("g WITH OTHER")
            .parse_create_keyed_body(CollectionModel::Graph)
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");

        // Through the full `parse()` entry point, the ANALYTICS clause after a
        // KV create is left over and reported as trailing input.
        assert!(parser("CREATE KV cache WITH ANALYTICS (components)")
            .parse()
            .unwrap_err()
            .to_string()
            .contains("Unexpected token after query"));

        let QueryExpr::DropKv(drop) = parser("IF EXISTS tenant.cache.*")
            .parse_drop_keyed_body(CollectionModel::Kv)
            .expect("drop kv")
        else {
            panic!("Expected DropKvQuery");
        };
        assert_eq!(drop.name, "tenant.cache.*");
        assert!(drop.if_exists);
        assert_eq!(drop.model, CollectionModel::Kv);
    }

    // SIGNED_BY takes a list of 64-char hex pubkeys, in either case. A
    // non-string entry and a wrong-length key are both rejected.
    #[test]
    fn parse_collection_signed_by_list_and_errors() {
        let pk_a = "aa".repeat(32);
        let pk_b = "BB".repeat(32);
        let QueryExpr::CreateCollection(collection) =
            parser(&format!("signed KIND graph SIGNED_BY ('{pk_a}', '{pk_b}')"))
                .parse_create_collection_body()
                .expect("create collection")
        else {
            panic!("Expected CreateCollectionQuery");
        };
        assert_eq!(collection.allowed_signers, vec![[0xaau8; 32], [0xBBu8; 32]]);

        let err = parser("signed KIND graph SIGNED_BY (42)")
            .parse_create_collection_body()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("string literal (ed25519 pubkey hex)"),
            "{err}"
        );

        let err = parser("signed KIND graph SIGNED_BY ('deadbeef')")
            .parse_create_collection_body()
            .unwrap_err();
        assert!(err.to_string().contains("expected 64 hex chars"), "{err}");
    }

    // Chains fourteen comma-separated ALTER operations in one statement and
    // checks each parsed variant, in order. Some expected strings differ in
    // case from the input (`level` -> `LEVEL`, `metadata` -> `METADATA`) and
    // partition bounds come back space-separated. These reflect how the
    // parser re-renders those fragments.
    #[test]
    fn parse_alter_operations_cover_subscriptions_partitions_tenancy_and_signers() {
        let pk = "11".repeat(32);
        let QueryExpr::AlterTable(alter) = parser(&format!(
            "ALTER COLLECTION audit \
             ADD SUBSCRIPTION pii TO audit_events REDACT (payload.ssn, *.secret) WHERE level = 'warn', \
             DROP SUBSCRIPTION pii, \
             ADD SIGNER '{pk}', \
             REVOKE SIGNER '{pk}', \
             ATTACH PARTITION audit_2026 FOR VALUES FROM (2026) TO (2027), \
             DETACH PARTITION audit_2026, \
             ENABLE EVENTS (INSERT, UPDATE) TO table_events ON ALL TENANTS, \
             DISABLE EVENTS, \
             ENABLE TENANCY ON (metadata.tenant), \
             DISABLE TENANCY, \
             SET APPEND_ONLY = true, \
             SET VERSIONED = false, \
             SET RETENTION 2 h, \
             UNSET RETENTION"
        ))
        .parse_alter_table_query()
        .expect("alter collection")
        else {
            panic!("Expected AlterTableQuery");
        };

        assert_eq!(alter.name, "audit");
        assert_eq!(alter.operations.len(), 14);
        match &alter.operations[0] {
            AlterOperation::AddSubscription { name, descriptor } => {
                assert_eq!(name, "pii");
                assert_eq!(descriptor.target_queue, "audit_events");
                assert_eq!(descriptor.redact_fields, vec!["payload.ssn", "*.secret"]);
                assert_eq!(descriptor.where_filter.as_deref(), Some("LEVEL = 'warn'"));
            }
            other => panic!("expected AddSubscription, got {other:?}"),
        }
        assert!(matches!(
            &alter.operations[1],
            AlterOperation::DropSubscription { name } if name == "pii"
        ));
        assert!(matches!(
            &alter.operations[2],
            AlterOperation::AddSigner { pubkey } if *pubkey == [0x11; 32]
        ));
        assert!(matches!(
            &alter.operations[3],
            AlterOperation::RevokeSigner { pubkey } if *pubkey == [0x11; 32]
        ));
        assert!(matches!(
            &alter.operations[4],
            AlterOperation::AttachPartition { child, bound }
                if child == "audit_2026" && bound == "FROM ( 2026 ) TO ( 2027 )"
        ));
        assert!(matches!(
            &alter.operations[5],
            AlterOperation::DetachPartition { child } if child == "audit_2026"
        ));
        match &alter.operations[6] {
            AlterOperation::EnableEvents(descriptor) => {
                assert_eq!(
                    descriptor.ops_filter,
                    vec![SubscriptionOperation::Insert, SubscriptionOperation::Update]
                );
                assert_eq!(descriptor.target_queue, "table_events");
                assert!(descriptor.all_tenants);
            }
            other => panic!("expected EnableEvents, got {other:?}"),
        }
        assert!(matches!(
            &alter.operations[7],
            AlterOperation::DisableEvents
        ));
        assert!(matches!(
            &alter.operations[8],
            AlterOperation::EnableTenancy { column } if column == "METADATA.tenant"
        ));
        assert!(matches!(
            &alter.operations[9],
            AlterOperation::DisableTenancy
        ));
        assert!(matches!(
            &alter.operations[10],
            AlterOperation::SetAppendOnly(true)
        ));
        assert!(matches!(
            &alter.operations[11],
            AlterOperation::SetVersioned(false)
        ));
        // `SET RETENTION 2 h` is stored in milliseconds: 2 * 3_600_000.
        assert!(matches!(
            &alter.operations[12],
            AlterOperation::SetRetention { duration_ms } if *duration_ms == 7_200_000
        ));
        assert!(matches!(
            &alter.operations[13],
            AlterOperation::UnsetRetention
        ));
    }

    // ALTER GRAPH ... ADD/DROP must be followed by the ANALYTICS keyword.
    #[test]
    fn parse_alter_graph_analytics_keyword_errors() {
        let err = parser("ALTER GRAPH g ADD centrality")
            .parse_alter_graph_query()
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");

        let err = parser("ALTER GRAPH g DROP centrality")
            .parse_alter_graph_query()
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");
    }

    // `decode_hex_32` round-trips a valid key, reports the input length on
    // a size mismatch, and names the offending character otherwise.
    #[test]
    fn decode_hex_32_reports_length_and_character_errors() {
        assert_eq!(decode_hex_32(&"0f".repeat(32)).unwrap(), [0x0f; 32]);
        assert_eq!(
            decode_hex_32("deadbeef").unwrap_err(),
            "expected 64 hex chars, got 8"
        );
        assert!(decode_hex_32(&"gg".repeat(32))
            .unwrap_err()
            .contains("non-hex char"));
    }
}