1use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
7 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
8 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
9 PartitionSpec, QueryExpr, TruncateQuery,
10};
11use crate::lexer::Token;
12use reddb_types::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use reddb_types::types::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 metrics_rollup_policies: Vec::new(),
79 context_index_fields,
80 context_index_enabled,
81 timestamps,
82 partition_by: None,
83 tenant_by: None,
84 append_only: false,
85 subscriptions,
86 analytics_config: Vec::new(),
87 vault_own_master_key: false,
88 ai_policy: None,
89 }))
90 }
91
92 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
94 self.expect(Token::Drop)?;
95 self.expect(Token::Table)?;
96 self.parse_drop_table_body()
97 }
98
99 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
101 let if_not_exists = self.match_if_not_exists()?;
102 let name = self.expect_ident()?;
103
104 self.expect(Token::LParen)?;
105 let mut columns = Vec::new();
106 loop {
107 let col = self.parse_column_def()?;
108 columns.push(col);
109 if !self.consume(&Token::Comma)? {
110 break;
111 }
112 }
113 self.expect(Token::RParen)?;
114
115 let mut default_ttl_ms = None;
116 let mut context_index_fields = Vec::new();
117 let mut context_index_enabled = false;
118 let mut timestamps = false;
119 let mut tenant_by: Option<String> = None;
120 let mut append_only = false;
121 let mut subscriptions = Vec::new();
122 let mut ai_policy = reddb_types::catalog::AiPolicy::default();
123
124 while self.consume(&Token::With)? {
125 if self.consume_ident_ci("EVENTS")? {
126 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
127 continue;
128 }
129 let has_parens = self.consume(&Token::LParen)?;
136
137 loop {
138 if self.consume_ident_ci("CONTEXT_INDEX")? {
139 context_index_enabled = self.parse_bool_assign()?;
140 } else if self.consume_ident_ci("CONTEXT")? {
141 if !self.consume(&Token::Index)? {
142 return Err(ParseError::expected(
143 vec!["INDEX"],
144 self.peek(),
145 self.position(),
146 ));
147 }
148 self.expect(Token::On)?;
149 self.expect(Token::LParen)?;
150 loop {
151 context_index_fields.push(self.expect_ident()?);
152 if !self.consume(&Token::Comma)? {
153 break;
154 }
155 }
156 self.expect(Token::RParen)?;
157 context_index_enabled = true;
158 } else if self.consume_ident_ci("TIMESTAMPS")? {
159 timestamps = self.parse_bool_assign()?;
160 } else if self.consume_ident_ci("EMBED")? {
161 if ai_policy.embed.is_some() {
162 return Err(ParseError::new(
163 "duplicate EMBED clause in AI policy".to_string(),
164 self.position(),
165 ));
166 }
167 ai_policy.embed = Some(self.parse_ai_embed_policy()?);
168 } else if self.consume_ident_ci("MODERATE")? {
169 if ai_policy.moderate.is_some() {
170 return Err(ParseError::new(
171 "duplicate MODERATE clause in AI policy".to_string(),
172 self.position(),
173 ));
174 }
175 ai_policy.moderate = Some(self.parse_ai_moderate_policy()?);
176 } else if self.consume_ident_ci("VISION")? {
177 if ai_policy.vision.is_some() {
178 return Err(ParseError::new(
179 "duplicate VISION clause in AI policy".to_string(),
180 self.position(),
181 ));
182 }
183 ai_policy.vision = Some(self.parse_ai_vision_policy()?);
184 } else if self.consume_ident_ci("APPEND_ONLY")? {
185 append_only = self.parse_bool_assign()?;
186 } else if self.consume_ident_ci("TENANT_BY")? {
187 let _ = self.consume(&Token::Eq)?;
190 let value = self.parse_literal_value()?;
191 match value {
192 Value::Text(col) => tenant_by = Some(col.to_string()),
193 other => {
194 return Err(ParseError::new(
195 format!("WITH tenant_by expects a text literal, got {other:?}"),
196 self.position(),
197 ));
198 }
199 }
200 } else {
201 default_ttl_ms = self.parse_create_table_ttl_clause()?;
202 }
203 if has_parens {
204 if self.consume(&Token::Comma)? {
205 continue;
206 }
207 self.expect(Token::RParen)?;
208 }
209 break;
210 }
211 }
212
213 let partition_by = if self.consume(&Token::Partition)? {
215 self.expect(Token::By)?;
216 let kind = if self.consume(&Token::Range)? {
217 PartitionKind::Range
218 } else if self.consume(&Token::List)? {
219 PartitionKind::List
220 } else if self.consume(&Token::Hash)? {
221 PartitionKind::Hash
222 } else {
223 return Err(ParseError::expected(
224 vec!["RANGE", "LIST", "HASH"],
225 self.peek(),
226 self.position(),
227 ));
228 };
229 self.expect(Token::LParen)?;
230 let column = self.expect_ident()?;
231 self.expect(Token::RParen)?;
232 Some(PartitionSpec { kind, column })
233 } else {
234 None
235 };
236
237 if !append_only && self.consume_ident_ci("APPEND")? {
242 if !self.consume_ident_ci("ONLY")? {
243 return Err(ParseError::expected(
244 vec!["ONLY"],
245 self.peek(),
246 self.position(),
247 ));
248 }
249 append_only = true;
250 }
251
252 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
263 self.expect(Token::By)?;
264 self.expect(Token::LParen)?;
265 let mut path = self.expect_ident_or_keyword()?;
269 while self.consume(&Token::Dot)? {
270 let next = self.expect_ident_or_keyword()?;
271 path = format!("{path}.{next}");
272 }
273 self.expect(Token::RParen)?;
274 tenant_by = Some(path);
275 }
276
277 Ok(QueryExpr::CreateTable(CreateTableQuery {
278 collection_model: CollectionModel::Table,
279 name,
280 columns,
281 if_not_exists,
282 default_ttl_ms,
283 metrics_rollup_policies: Vec::new(),
284 context_index_fields,
285 context_index_enabled,
286 timestamps,
287 partition_by,
288 tenant_by,
289 append_only,
290 subscriptions,
291 analytics_config: Vec::new(),
292 vault_own_master_key: false,
293 ai_policy: (!ai_policy.is_empty()).then_some(ai_policy),
294 }))
295 }
296
297 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
303 self.expect(Token::Explain)?;
304 self.expect(Token::Alter)?;
305 self.expect(Token::For)?;
306 self.expect(Token::Create)?;
307 self.expect(Token::Table)?;
308
309 let body = self.parse_create_table_body()?;
310 let target = match body {
311 QueryExpr::CreateTable(t) => t,
312 _ => {
313 return Err(ParseError::new(
314 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
315 .to_string(),
316 self.position(),
317 ));
318 }
319 };
320
321 let format = if self.consume(&Token::Format)? {
322 if self.consume(&Token::Json)? {
323 ExplainFormat::Json
324 } else if self.consume_ident_ci("SQL")? {
325 ExplainFormat::Sql
326 } else {
327 return Err(ParseError::expected(
328 vec!["JSON", "SQL"],
329 self.peek(),
330 self.position(),
331 ));
332 }
333 } else {
334 ExplainFormat::Sql
335 };
336
337 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
338 target,
339 format,
340 }))
341 }
342
343 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
345 let if_exists = self.match_if_exists()?;
346 let name = self.parse_drop_collection_name()?;
347 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
348 }
349
350 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
351 let if_exists = self.match_if_exists()?;
352 let name = self.parse_drop_collection_name()?;
353 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
354 }
355
356 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
357 let if_exists = self.match_if_exists()?;
358 let name = self.parse_drop_collection_name()?;
359 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
360 }
361
362 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
363 let if_exists = self.match_if_exists()?;
364 let name = self.parse_drop_collection_name()?;
365 Ok(QueryExpr::DropDocument(DropDocumentQuery {
366 name,
367 if_exists,
368 }))
369 }
370
371 pub fn parse_create_keyed_body(
372 &mut self,
373 model: CollectionModel,
374 ) -> Result<QueryExpr, ParseError> {
375 let if_not_exists = self.match_if_not_exists()?;
376 let name = self.parse_drop_collection_name()?;
377 let vault_own_master_key =
378 if model == CollectionModel::Vault && self.consume(&Token::With)? {
379 if !self.consume_ident_ci("OWN")? {
380 return Err(ParseError::expected(
381 vec!["OWN"],
382 self.peek(),
383 self.position(),
384 ));
385 }
386 if !self.consume_ident_ci("MASTER")? {
387 return Err(ParseError::expected(
388 vec!["MASTER"],
389 self.peek(),
390 self.position(),
391 ));
392 }
393 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
394 return Err(ParseError::expected(
395 vec!["KEY"],
396 self.peek(),
397 self.position(),
398 ));
399 }
400 true
401 } else {
402 false
403 };
404 let analytics_config = if model == CollectionModel::Graph && self.consume(&Token::With)? {
408 if !self.consume_ident_ci("ANALYTICS")? {
409 return Err(ParseError::expected(
410 vec!["ANALYTICS"],
411 self.peek(),
412 self.position(),
413 ));
414 }
415 self.parse_analytics_clause()?
416 } else {
417 Vec::new()
418 };
419 Ok(QueryExpr::CreateTable(CreateTableQuery {
420 collection_model: model,
421 name,
422 columns: Vec::new(),
423 if_not_exists,
424 default_ttl_ms: None,
425 metrics_rollup_policies: Vec::new(),
426 context_index_fields: Vec::new(),
427 context_index_enabled: false,
428 timestamps: false,
429 partition_by: None,
430 tenant_by: None,
431 append_only: false,
432 subscriptions: Vec::new(),
433 analytics_config,
434 vault_own_master_key,
435 ai_policy: None,
436 }))
437 }
438
439 fn parse_analytics_clause(
445 &mut self,
446 ) -> Result<Vec<reddb_types::catalog::AnalyticsViewDescriptor>, ParseError> {
447 use reddb_types::catalog::{AnalyticsOutput, AnalyticsViewDescriptor};
448
449 self.expect(Token::LParen)?;
450 let mut views: Vec<AnalyticsViewDescriptor> = Vec::new();
451 loop {
452 let output_name = self.parse_analytics_output_name()?;
453 let output = AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
454 ParseError::new(
455 format!(
456 "unknown analytics output '{output_name}': expected communities, components, or centrality"
457 ),
458 self.position(),
459 )
460 })?;
461 if views.iter().any(|view| view.output == output) {
462 return Err(ParseError::new(
463 format!("duplicate analytics output '{output_name}'"),
464 self.position(),
465 ));
466 }
467 let mut view = AnalyticsViewDescriptor {
468 output,
469 algorithm: None,
470 resolution: None,
471 max_iterations: None,
472 tolerance: None,
473 };
474 if self.consume(&Token::LParen)? {
475 loop {
476 let key = self.parse_analytics_option_key()?;
477 self.expect(Token::Eq)?;
478 match key.as_str() {
479 "using" => {
480 view.algorithm =
481 Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
482 }
483 "resolution" => view.resolution = Some(self.parse_float()?),
484 "max_iterations" => view.max_iterations = Some(self.parse_integer()?),
485 "tolerance" => view.tolerance = Some(self.parse_float()?),
486 other => {
487 return Err(ParseError::new(
488 format!(
489 "unknown analytics option '{other}': expected using, resolution, max_iterations, or tolerance"
490 ),
491 self.position(),
492 ))
493 }
494 }
495 if !self.consume(&Token::Comma)? {
496 break;
497 }
498 }
499 self.expect(Token::RParen)?;
500 }
501 views.push(view);
502 if !self.consume(&Token::Comma)? {
503 break;
504 }
505 }
506 self.expect(Token::RParen)?;
507 if views.is_empty() {
508 return Err(ParseError::new(
509 "WITH ANALYTICS requires at least one output".to_string(),
510 self.position(),
511 ));
512 }
513 Ok(views)
514 }
515
516 fn parse_analytics_output_name(&mut self) -> Result<String, ParseError> {
520 match self.peek() {
521 Token::Components => {
522 self.advance()?;
523 Ok("components".to_string())
524 }
525 Token::Centrality => {
526 self.advance()?;
527 Ok("centrality".to_string())
528 }
529 _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
530 }
531 }
532
533 fn parse_analytics_option_key(&mut self) -> Result<String, ParseError> {
536 match self.peek() {
537 Token::Using => {
538 self.advance()?;
539 Ok("using".to_string())
540 }
541 Token::MaxIterations => {
542 self.advance()?;
543 Ok("max_iterations".to_string())
544 }
545 _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
546 }
547 }
548
549 pub fn parse_create_collection_model_body(
550 &mut self,
551 model: CollectionModel,
552 ) -> Result<QueryExpr, ParseError> {
553 self.parse_create_keyed_body(model)
554 }
555
556 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
557 let if_not_exists = self.match_if_not_exists()?;
558 let name = self.parse_drop_collection_name()?;
559 if !self.consume_ident_ci("KIND")? {
560 return Err(ParseError::expected(
561 vec!["KIND"],
562 self.peek(),
563 self.position(),
564 ));
565 }
566 let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
567 while self.consume(&Token::Dot)? {
568 let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
569 kind.push('.');
570 kind.push_str(&part);
571 }
572 let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
573 if !self.consume_ident_ci("DIM")? {
574 return Err(ParseError::expected(
575 vec!["DIM"],
576 self.peek(),
577 self.position(),
578 ));
579 }
580 let dimension = self.parse_integer()?;
581 if dimension <= 0 {
582 return Err(ParseError::new(
583 "VECTOR DIM must be a positive integer".to_string(),
584 self.position(),
585 ));
586 }
587 let metric = if self.consume(&Token::Metric)? {
588 self.parse_distance_metric()?
589 } else {
590 reddb_types::distance::DistanceMetric::Cosine
591 };
592 (Some(dimension as usize), Some(metric))
593 } else {
594 (None, None)
595 };
596 let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
597 self.parse_signed_by_list()?
598 } else {
599 Vec::new()
600 };
601 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
602 name,
603 kind,
604 if_not_exists,
605 vector_dimension,
606 vector_metric,
607 allowed_signers,
608 }))
609 }
610
611 fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
615 let hex = match self.peek().clone() {
616 Token::String(s) => {
617 self.advance()?;
618 s
619 }
620 _ => {
621 return Err(ParseError::expected(
622 vec!["string literal (ed25519 pubkey hex)"],
623 self.peek(),
624 self.position(),
625 ));
626 }
627 };
628 decode_hex_32(&hex).map_err(|msg| {
629 ParseError::new(
630 format!("SIGNER pubkey '{hex}' invalid: {msg}"),
631 self.position(),
632 )
633 })
634 }
635
636 fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
641 self.expect(Token::LParen)?;
642 let mut out = Vec::new();
643 loop {
644 let hex = match self.peek().clone() {
645 Token::String(s) => {
646 self.advance()?;
647 s
648 }
649 _ => {
650 return Err(ParseError::expected(
651 vec!["string literal (ed25519 pubkey hex)"],
652 self.peek(),
653 self.position(),
654 ));
655 }
656 };
657 let bytes = decode_hex_32(&hex).map_err(|msg| {
658 ParseError::new(
659 format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
660 self.position(),
661 )
662 })?;
663 out.push(bytes);
664 if !self.consume(&Token::Comma)? {
665 break;
666 }
667 }
668 self.expect(Token::RParen)?;
669 if out.is_empty() {
670 return Err(ParseError::new(
671 "SIGNED_BY list must contain at least one pubkey".to_string(),
672 self.position(),
673 ));
674 }
675 Ok(out)
676 }
677
678 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
679 let if_not_exists = self.match_if_not_exists()?;
680 let name = self.parse_drop_collection_name()?;
681 if !self.consume_ident_ci("DIM")? {
682 return Err(ParseError::expected(
683 vec!["DIM"],
684 self.peek(),
685 self.position(),
686 ));
687 }
688 let dimension = self.parse_integer()?;
689 if dimension <= 0 {
690 return Err(ParseError::new(
691 "VECTOR DIM must be a positive integer".to_string(),
692 self.position(),
693 ));
694 }
695 let metric = if self.consume(&Token::Metric)? {
696 self.parse_distance_metric()?
697 } else {
698 reddb_types::distance::DistanceMetric::Cosine
699 };
700 Ok(QueryExpr::CreateVector(CreateVectorQuery {
701 name,
702 dimension: dimension as usize,
703 metric,
704 if_not_exists,
705 }))
706 }
707
708 pub fn parse_drop_keyed_body(
709 &mut self,
710 model: CollectionModel,
711 ) -> Result<QueryExpr, ParseError> {
712 let if_exists = self.match_if_exists()?;
713 let name = self.parse_drop_collection_name()?;
714 Ok(QueryExpr::DropKv(DropKvQuery {
715 name,
716 if_exists,
717 model,
718 }))
719 }
720
721 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
722 self.parse_drop_keyed_body(CollectionModel::Kv)
723 }
724
725 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
726 self.parse_drop_collection_model_body(None)
727 }
728
729 pub fn parse_drop_collection_model_body(
730 &mut self,
731 model: Option<CollectionModel>,
732 ) -> Result<QueryExpr, ParseError> {
733 let if_exists = self.match_if_exists()?;
734 let name = self.parse_drop_collection_name()?;
735 Ok(QueryExpr::DropCollection(DropCollectionQuery {
736 name,
737 if_exists,
738 model,
739 }))
740 }
741
742 pub fn parse_truncate_body(
743 &mut self,
744 model: Option<CollectionModel>,
745 ) -> Result<QueryExpr, ParseError> {
746 let if_exists = self.match_if_exists()?;
747 let name = self.parse_drop_collection_name()?;
748 Ok(QueryExpr::Truncate(TruncateQuery {
749 name,
750 model,
751 if_exists,
752 }))
753 }
754
755 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
756 let mut name = self.expect_ident()?;
757 while self.consume(&Token::Dot)? {
758 if self.consume(&Token::Star)? {
759 name.push_str(".*");
760 break;
761 }
762 let next = self.expect_ident_or_keyword()?;
763 name = format!("{name}.{next}");
764 }
765 Ok(name)
766 }
767
768 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
775 self.expect(Token::Alter)?;
776 if !self.consume(&Token::Table)?
777 && !self.consume(&Token::Collection)?
778 && !self.consume_ident_ci("COLLECTION")?
779 {
780 return Err(ParseError::expected(
781 vec!["TABLE", "COLLECTION"],
782 self.peek(),
783 self.position(),
784 ));
785 }
786 let name = self.expect_ident()?;
787
788 let mut operations = Vec::new();
789 loop {
790 let op = self.parse_alter_operation(&name)?;
791 operations.push(op);
792 if !self.consume(&Token::Comma)? {
793 break;
794 }
795 }
796
797 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
798 }
799
800 pub fn parse_alter_graph_query(&mut self) -> Result<QueryExpr, ParseError> {
809 self.expect(Token::Alter)?;
810 self.expect(Token::Graph)?;
811 let name = self.expect_ident()?;
812
813 let mut operations = Vec::new();
814 loop {
815 operations.push(self.parse_alter_graph_operation()?);
816 if !self.consume(&Token::Comma)? {
817 break;
818 }
819 }
820
821 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
822 }
823
824 fn parse_alter_graph_operation(&mut self) -> Result<AlterOperation, ParseError> {
827 if self.consume(&Token::Add)? {
828 if !self.consume_ident_ci("ANALYTICS")? {
829 return Err(ParseError::expected(
830 vec!["ANALYTICS"],
831 self.peek(),
832 self.position(),
833 ));
834 }
835 let views = self.parse_analytics_clause()?;
838 Ok(AlterOperation::AddAnalytics(views))
839 } else if self.consume(&Token::Drop)? {
840 if !self.consume_ident_ci("ANALYTICS")? {
841 return Err(ParseError::expected(
842 vec!["ANALYTICS"],
843 self.peek(),
844 self.position(),
845 ));
846 }
847 let output_name = self.parse_analytics_output_name()?;
848 let output = reddb_types::catalog::AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
849 ParseError::new(
850 format!(
851 "unknown analytics output '{output_name}': expected communities, components, or centrality"
852 ),
853 self.position(),
854 )
855 })?;
856 Ok(AlterOperation::DropAnalytics(output))
857 } else {
858 Err(ParseError::expected(
859 vec!["ADD", "DROP"],
860 self.peek(),
861 self.position(),
862 ))
863 }
864 }
865
866 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
868 if self.consume(&Token::Add)? {
869 if self.consume_ident_ci("SUBSCRIPTION")? {
870 let sub_name = self.expect_ident()?;
872 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
873 Ok(AlterOperation::AddSubscription {
874 name: sub_name,
875 descriptor,
876 })
877 } else if self.consume_ident_ci("SIGNER")? {
878 let pubkey = self.parse_single_signer_hex()?;
880 Ok(AlterOperation::AddSigner { pubkey })
881 } else {
882 let _ = self.consume(&Token::Column)?;
884 let col_def = self.parse_column_def()?;
885 Ok(AlterOperation::AddColumn(col_def))
886 }
887 } else if self.consume_ident_ci("REVOKE")? {
888 if !self.consume_ident_ci("SIGNER")? {
890 return Err(ParseError::expected(
891 vec!["SIGNER"],
892 self.peek(),
893 self.position(),
894 ));
895 }
896 let pubkey = self.parse_single_signer_hex()?;
897 Ok(AlterOperation::RevokeSigner { pubkey })
898 } else if self.consume(&Token::Drop)? {
899 if self.consume_ident_ci("SUBSCRIPTION")? {
900 let sub_name = self.expect_ident()?;
902 Ok(AlterOperation::DropSubscription { name: sub_name })
903 } else {
904 let _ = self.consume(&Token::Column)?;
906 let col_name = self.expect_ident()?;
907 Ok(AlterOperation::DropColumn(col_name))
908 }
909 } else if self.consume(&Token::Rename)? {
910 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
913 self.expect(Token::To)?;
914 let to = self.expect_ident()?;
915 Ok(AlterOperation::RenameColumn { from, to })
916 } else if self.consume(&Token::Attach)? {
917 self.expect(Token::Partition)?;
919 let child = self.expect_ident()?;
920 self.expect(Token::For)?;
921 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
925 return Err(ParseError::expected(
926 vec!["VALUES"],
927 self.peek(),
928 self.position(),
929 ));
930 }
931 let bound = self.collect_remaining_tokens_as_string()?;
932 Ok(AlterOperation::AttachPartition { child, bound })
933 } else if self.consume(&Token::Detach)? {
934 self.expect(Token::Partition)?;
936 let child = self.expect_ident()?;
937 Ok(AlterOperation::DetachPartition { child })
938 } else if self.consume(&Token::Enable)? {
939 if self.consume_ident_ci("EVENTS")? {
941 Ok(AlterOperation::EnableEvents(
942 self.parse_subscription_descriptor(table_name.to_string())?,
943 ))
944 } else if self.consume_ident_ci("TENANCY")? {
945 self.expect(Token::On)?;
946 self.expect(Token::LParen)?;
947 let mut path = self.expect_ident_or_keyword()?;
949 while self.consume(&Token::Dot)? {
950 let next = self.expect_ident_or_keyword()?;
951 path = format!("{path}.{next}");
952 }
953 self.expect(Token::RParen)?;
954 Ok(AlterOperation::EnableTenancy { column: path })
955 } else {
956 self.expect(Token::Row)?;
957 self.expect(Token::Level)?;
958 self.expect(Token::Security)?;
959 Ok(AlterOperation::EnableRowLevelSecurity)
960 }
961 } else if self.consume(&Token::Disable)? {
962 if self.consume_ident_ci("EVENTS")? {
964 Ok(AlterOperation::DisableEvents)
965 } else if self.consume_ident_ci("TENANCY")? {
966 Ok(AlterOperation::DisableTenancy)
967 } else {
968 self.expect(Token::Row)?;
969 self.expect(Token::Level)?;
970 self.expect(Token::Security)?;
971 Ok(AlterOperation::DisableRowLevelSecurity)
972 }
973 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
974 if self.consume_ident_ci("APPEND_ONLY")? {
977 let on = self.parse_bool_assign()?;
978 Ok(AlterOperation::SetAppendOnly(on))
979 } else if self.consume_ident_ci("VERSIONED")? {
980 let on = self.parse_bool_assign()?;
981 Ok(AlterOperation::SetVersioned(on))
982 } else if self.consume(&Token::Retention)? {
983 let value = self.parse_float()?;
987 let unit = self.parse_duration_unit()?;
988 Ok(AlterOperation::SetRetention {
989 duration_ms: (value * unit) as u64,
990 })
991 } else {
992 Err(ParseError::expected(
993 vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
994 self.peek(),
995 self.position(),
996 ))
997 }
998 } else if self.consume_ident_ci("UNSET")? {
999 if self.consume(&Token::Retention)? {
1001 Ok(AlterOperation::UnsetRetention)
1002 } else {
1003 Err(ParseError::expected(
1004 vec!["RETENTION"],
1005 self.peek(),
1006 self.position(),
1007 ))
1008 }
1009 } else {
1010 Err(ParseError::expected(
1011 vec![
1012 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
1013 "UNSET",
1014 ],
1015 self.peek(),
1016 self.position(),
1017 ))
1018 }
1019 }
1020
1021 fn parse_subscription_descriptor(
1022 &mut self,
1023 source: String,
1024 ) -> Result<SubscriptionDescriptor, ParseError> {
1025 let mut ops_filter = Vec::new();
1026 if self.consume(&Token::LParen)? {
1027 loop {
1028 let op = if self.consume(&Token::Insert)? {
1029 SubscriptionOperation::Insert
1030 } else if self.consume(&Token::Update)? {
1031 SubscriptionOperation::Update
1032 } else if self.consume(&Token::Delete)? {
1033 SubscriptionOperation::Delete
1034 } else {
1035 return Err(ParseError::expected(
1036 vec!["INSERT", "UPDATE", "DELETE"],
1037 self.peek(),
1038 self.position(),
1039 ));
1040 };
1041 ops_filter.push(op);
1042 if !self.consume(&Token::Comma)? {
1043 break;
1044 }
1045 }
1046 self.expect(Token::RParen)?;
1047 }
1048
1049 let target_queue = if self.consume(&Token::To)? {
1050 self.expect_ident()?
1051 } else {
1052 format!("{source}_events")
1053 };
1054
1055 let mut redact_fields = Vec::new();
1056 if self.consume_ident_ci("REDACT")? {
1057 self.expect(Token::LParen)?;
1058 loop {
1059 redact_fields.push(self.parse_dotted_redact_path()?);
1060 if !self.consume(&Token::Comma)? {
1061 break;
1062 }
1063 }
1064 self.expect(Token::RParen)?;
1065 }
1066
1067 let where_filter = if self.consume(&Token::Where)? {
1068 Some(self.collect_subscription_where_filter()?)
1069 } else {
1070 None
1071 };
1072
1073 let all_tenants = if self.consume(&Token::On)? {
1075 self.expect(Token::All)?;
1076 if !self.consume_ident_ci("TENANTS")? {
1077 return Err(ParseError::expected(
1078 vec!["TENANTS"],
1079 self.peek(),
1080 self.position(),
1081 ));
1082 }
1083 true
1084 } else {
1085 false
1086 };
1087
1088 if self.consume_ident_ci("REQUIRES")? {
1090 self.consume_ident_ci("CAPABILITY")?;
1091 self.advance()?;
1093 }
1094
1095 Ok(SubscriptionDescriptor {
1096 name: String::new(),
1097 source,
1098 target_queue,
1099 ops_filter,
1100 where_filter,
1101 redact_fields,
1102 enabled: true,
1103 all_tenants,
1104 })
1105 }
1106
1107 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
1109 let mut parts = Vec::new();
1110 if self.consume(&Token::Star)? {
1111 parts.push("*".to_string());
1112 } else {
1113 parts.push(self.expect_ident_or_keyword()?);
1114 }
1115 while self.consume(&Token::Dot)? {
1116 if self.consume(&Token::Star)? {
1117 parts.push("*".to_string());
1118 } else {
1119 parts.push(self.expect_ident_or_keyword()?);
1120 }
1121 }
1122 Ok(parts.join("."))
1123 }
1124
1125 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
1126 let mut parts = Vec::new();
1127 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1128 parts.push(self.peek().to_string());
1129 self.advance()?;
1130 }
1131 if parts.is_empty() {
1132 return Err(ParseError::expected(
1133 vec!["predicate"],
1134 self.peek(),
1135 self.position(),
1136 ));
1137 }
1138 Ok(parts.join(" "))
1139 }
1140
1141 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
1146 let mut parts: Vec<String> = Vec::new();
1147 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1148 parts.push(self.peek().to_string());
1149 self.advance()?;
1150 }
1151 Ok(parts.join(" "))
1152 }
1153
1154 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
1156 let name = self.expect_column_ident()?;
1157 let sql_type = self.parse_column_type()?;
1158 let data_type = sql_type.to_string();
1159
1160 let mut def = CreateColumnDef {
1161 name,
1162 data_type,
1163 sql_type: sql_type.clone(),
1164 not_null: false,
1165 default: None,
1166 compress: None,
1167 unique: false,
1168 primary_key: false,
1169 enum_variants: sql_type.enum_variants().unwrap_or_default(),
1170 array_element: sql_type.array_element_type(),
1171 decimal_precision: sql_type.decimal_precision(),
1172 };
1173
1174 loop {
1176 if self.match_not_null()? {
1177 def.not_null = true;
1178 } else if self.consume(&Token::Default)? {
1179 self.expect(Token::Eq)?;
1180 def.default = Some(self.parse_literal_string_for_ddl()?);
1181 } else if self.consume(&Token::Compress)? {
1182 self.expect(Token::Colon)?;
1183 def.compress = Some(self.parse_integer()? as u8);
1184 } else if self.consume(&Token::Unique)? {
1185 def.unique = true;
1186 } else if self.match_primary_key()? {
1187 def.primary_key = true;
1188 } else {
1189 break;
1190 }
1191 }
1192
1193 Ok(def)
1194 }
1195
1196 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
1198 let type_name = self.expect_ident_or_keyword()?;
1199 if self.consume(&Token::LParen)? {
1200 let inner = self.parse_type_params()?;
1201 self.expect(Token::RParen)?;
1202 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
1203 } else {
1204 Ok(SqlTypeName::new(type_name))
1205 }
1206 }
1207
1208 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
1210 let mut parts = Vec::new();
1211 loop {
1212 match self.peek().clone() {
1213 Token::String(s) => {
1214 let s = s.clone();
1215 self.advance()?;
1216 parts.push(TypeModifier::StringLiteral(s));
1217 }
1218 Token::Integer(n) => {
1219 self.advance()?;
1220 parts.push(TypeModifier::Number(n as u32));
1221 }
1222 _ => {
1223 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1224 }
1225 }
1226 if !self.consume(&Token::Comma)? {
1227 break;
1228 }
1229 }
1230 Ok(parts)
1231 }
1232
1233 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1235 match self.peek().clone() {
1236 Token::String(s) => {
1237 let s = s.clone();
1238 self.advance()?;
1239 Ok(s)
1240 }
1241 Token::Integer(n) => {
1242 self.advance()?;
1243 Ok(n.to_string())
1244 }
1245 Token::Float(n) => {
1246 self.advance()?;
1247 Ok(n.to_string())
1248 }
1249 Token::True => {
1250 self.advance()?;
1251 Ok("true".to_string())
1252 }
1253 Token::False => {
1254 self.advance()?;
1255 Ok("false".to_string())
1256 }
1257 Token::Null => {
1258 self.advance()?;
1259 Ok("null".to_string())
1260 }
1261 ref other => Err(ParseError::expected(
1262 vec!["string", "number", "true", "false", "null"],
1263 other,
1264 self.position(),
1265 )),
1266 }
1267 }
1268
1269 fn check_ttl_keyword(&self) -> bool {
1270 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1271 }
1272
1273 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1276 self.expect(Token::Eq)?;
1277 match self.peek() {
1278 Token::True => {
1279 self.advance()?;
1280 Ok(true)
1281 }
1282 Token::False => {
1283 self.advance()?;
1284 Ok(false)
1285 }
1286 other => Err(ParseError::expected(
1287 vec!["true", "false"],
1288 other,
1289 self.position(),
1290 )),
1291 }
1292 }
1293
1294 fn parse_ai_string_list(&mut self) -> Result<Vec<String>, ParseError> {
1298 self.expect(Token::LParen)?;
1299 let mut out = Vec::new();
1300 loop {
1301 out.push(self.parse_string()?);
1302 if !self.consume(&Token::Comma)? {
1303 break;
1304 }
1305 }
1306 self.expect(Token::RParen)?;
1307 Ok(out)
1308 }
1309
1310 fn parse_ai_bool(&mut self) -> Result<bool, ParseError> {
1312 match self.peek() {
1313 Token::True => {
1314 self.advance()?;
1315 Ok(true)
1316 }
1317 Token::False => {
1318 self.advance()?;
1319 Ok(false)
1320 }
1321 other => Err(ParseError::expected(
1322 vec!["true", "false"],
1323 other,
1324 self.position(),
1325 )),
1326 }
1327 }
1328
1329 fn parse_ai_word(&mut self) -> Result<String, ParseError> {
1332 if matches!(self.peek(), Token::String(_)) {
1333 self.parse_string()
1334 } else {
1335 self.expect_ident_or_keyword()
1336 }
1337 }
1338
1339 fn parse_ai_embed_policy(&mut self) -> Result<reddb_types::catalog::EmbedPolicy, ParseError> {
1341 self.expect(Token::LParen)?;
1342 let mut fields: Vec<String> = Vec::new();
1343 let mut provider: Option<String> = None;
1344 let mut model: Option<String> = None;
1345 loop {
1346 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1347 self.expect(Token::Eq)?;
1348 match key.as_str() {
1349 "fields" => fields = self.parse_ai_string_list()?,
1350 "provider" => provider = Some(self.parse_string()?),
1351 "model" => model = Some(self.parse_string()?),
1352 other => {
1353 return Err(ParseError::new(
1354 format!(
1355 "unsupported EMBED policy option {other:?}; supported: fields, provider, model"
1356 ),
1357 self.position(),
1358 ));
1359 }
1360 }
1361 if !self.consume(&Token::Comma)? {
1362 break;
1363 }
1364 }
1365 self.expect(Token::RParen)?;
1366 if fields.is_empty() {
1367 return Err(ParseError::new(
1368 "EMBED policy requires fields = ('<col>', ...)".to_string(),
1369 self.position(),
1370 ));
1371 }
1372 let provider = provider.ok_or_else(|| {
1373 ParseError::new(
1374 "EMBED policy requires provider = '<token>'".to_string(),
1375 self.position(),
1376 )
1377 })?;
1378 let model = model.ok_or_else(|| {
1379 ParseError::new(
1380 "EMBED policy requires model = '<name>'".to_string(),
1381 self.position(),
1382 )
1383 })?;
1384 Ok(reddb_types::catalog::EmbedPolicy {
1385 fields,
1386 provider,
1387 model,
1388 })
1389 }
1390
1391 fn parse_ai_moderate_policy(
1393 &mut self,
1394 ) -> Result<reddb_types::catalog::ModeratePolicy, ParseError> {
1395 use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1396 self.expect(Token::LParen)?;
1397 let mut fields: Vec<String> = Vec::new();
1398 let mut provider: Option<String> = None;
1399 let mut model: Option<String> = None;
1400 let mut sync_gate = false;
1401 let mut degraded_mode = ModerateDegradedMode::default();
1402 let mut reject_action = ModerateRejectAction::default();
1403 let mut hard_delete_on_reject = false;
1404 loop {
1405 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1406 self.expect(Token::Eq)?;
1407 match key.as_str() {
1408 "fields" => fields = self.parse_ai_string_list()?,
1409 "provider" => provider = Some(self.parse_string()?),
1410 "model" => model = Some(self.parse_string()?),
1411 "sync" | "sync_gate" => sync_gate = self.parse_ai_bool()?,
1412 "hard_delete" | "hard_delete_on_reject" => {
1413 hard_delete_on_reject = self.parse_ai_bool()?
1414 }
1415 "degraded" | "degraded_mode" => {
1416 let word = self.parse_ai_word()?;
1417 degraded_mode = ModerateDegradedMode::from_str(&word).ok_or_else(|| {
1418 ParseError::new(
1419 format!(
1420 "unsupported MODERATE degraded mode {word:?}; supported: open, closed"
1421 ),
1422 self.position(),
1423 )
1424 })?;
1425 }
1426 "on_reject" | "reject_action" => {
1427 let word = self.parse_ai_word()?;
1428 reject_action = ModerateRejectAction::from_str(&word).ok_or_else(|| {
1429 ParseError::new(
1430 format!(
1431 "unsupported MODERATE reject action {word:?}; supported: reject, flag, redact"
1432 ),
1433 self.position(),
1434 )
1435 })?;
1436 }
1437 other => {
1438 return Err(ParseError::new(
1439 format!(
1440 "unsupported MODERATE policy option {other:?}; supported: fields, provider, model, sync, degraded, on_reject, hard_delete"
1441 ),
1442 self.position(),
1443 ));
1444 }
1445 }
1446 if !self.consume(&Token::Comma)? {
1447 break;
1448 }
1449 }
1450 self.expect(Token::RParen)?;
1451 if fields.is_empty() {
1452 return Err(ParseError::new(
1453 "MODERATE policy requires fields = ('<col>', ...)".to_string(),
1454 self.position(),
1455 ));
1456 }
1457 let provider = provider.ok_or_else(|| {
1458 ParseError::new(
1459 "MODERATE policy requires provider = '<token>'".to_string(),
1460 self.position(),
1461 )
1462 })?;
1463 let model = model.ok_or_else(|| {
1464 ParseError::new(
1465 "MODERATE policy requires model = '<name>'".to_string(),
1466 self.position(),
1467 )
1468 })?;
1469 Ok(reddb_types::catalog::ModeratePolicy {
1470 fields,
1471 provider,
1472 model,
1473 sync_gate,
1474 degraded_mode,
1475 reject_action,
1476 hard_delete_on_reject,
1477 })
1478 }
1479
1480 fn parse_ai_vision_policy(&mut self) -> Result<reddb_types::catalog::VisionPolicy, ParseError> {
1482 self.expect(Token::LParen)?;
1483 let mut image_field: Option<String> = None;
1484 let mut output_kinds: Vec<String> = Vec::new();
1485 let mut provider: Option<String> = None;
1486 let mut model: Option<String> = None;
1487 loop {
1488 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1489 self.expect(Token::Eq)?;
1490 match key.as_str() {
1491 "image_field" => image_field = Some(self.parse_string()?),
1492 "outputs" | "output_kinds" => output_kinds = self.parse_ai_string_list()?,
1493 "provider" => provider = Some(self.parse_string()?),
1494 "model" => model = Some(self.parse_string()?),
1495 other => {
1496 return Err(ParseError::new(
1497 format!(
1498 "unsupported VISION policy option {other:?}; supported: image_field, outputs, provider, model"
1499 ),
1500 self.position(),
1501 ));
1502 }
1503 }
1504 if !self.consume(&Token::Comma)? {
1505 break;
1506 }
1507 }
1508 self.expect(Token::RParen)?;
1509 let image_field = image_field.ok_or_else(|| {
1510 ParseError::new(
1511 "VISION policy requires image_field = '<col>'".to_string(),
1512 self.position(),
1513 )
1514 })?;
1515 if output_kinds.is_empty() {
1516 return Err(ParseError::new(
1517 "VISION policy requires outputs = ('<kind>', ...)".to_string(),
1518 self.position(),
1519 ));
1520 }
1521 let provider = provider.ok_or_else(|| {
1522 ParseError::new(
1523 "VISION policy requires provider = '<token>'".to_string(),
1524 self.position(),
1525 )
1526 })?;
1527 let model = model.ok_or_else(|| {
1528 ParseError::new(
1529 "VISION policy requires model = '<name>'".to_string(),
1530 self.position(),
1531 )
1532 })?;
1533 Ok(reddb_types::catalog::VisionPolicy {
1534 image_field,
1535 output_kinds,
1536 provider,
1537 model,
1538 })
1539 }
1540
1541 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1542 if self.consume_ident_ci(expected)? {
1543 Ok(())
1544 } else {
1545 Err(ParseError::expected(
1546 vec![expected],
1547 self.peek(),
1548 self.position(),
1549 ))
1550 }
1551 }
1552
1553 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1554 let option_name = self.expect_ident_or_keyword()?;
1555 if !option_name.eq_ignore_ascii_case("ttl") {
1556 return Err(ParseError::new(
1557 format!(
1561 "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1562 ),
1563 self.position(),
1564 ));
1565 }
1566
1567 let ttl_value = self.parse_float()?;
1568 let ttl_unit = match self.peek() {
1569 Token::Ident(unit) => {
1570 let unit = unit.clone();
1571 self.advance()?;
1572 unit
1573 }
1574 _ => "s".to_string(),
1575 };
1576
1577 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1578 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1579 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1580 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1581 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1582 "d" | "day" | "days" => 86_400_000.0,
1583 other => {
1584 return Err(ParseError::new(
1585 format!(
1589 "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1590 ),
1591 self.position(),
1592 ));
1593 }
1594 };
1595
1596 if !ttl_value.is_finite() || ttl_value < 0.0 {
1597 return Err(ParseError::new(
1598 "TTL must be a finite, non-negative duration".to_string(),
1599 self.position(),
1600 ));
1601 }
1602
1603 let ttl_ms = ttl_value * multiplier_ms;
1604 if ttl_ms > u64::MAX as f64 {
1605 return Err(ParseError::new(
1606 "TTL duration is too large".to_string(),
1607 self.position(),
1608 ));
1609 }
1610 if ttl_ms.fract().abs() >= f64::EPSILON {
1611 return Err(ParseError::new(
1612 "TTL duration must resolve to a whole number of milliseconds".to_string(),
1613 self.position(),
1614 ));
1615 }
1616
1617 Ok(Some(ttl_ms as u64))
1618 }
1619
1620 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1622 if self.check(&Token::If) {
1623 self.advance()?;
1624 self.expect(Token::Not)?;
1625 self.expect(Token::Exists)?;
1626 Ok(true)
1627 } else {
1628 Ok(false)
1629 }
1630 }
1631
1632 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1634 if self.check(&Token::If) {
1635 self.advance()?;
1636 self.expect(Token::Exists)?;
1637 Ok(true)
1638 } else {
1639 Ok(false)
1640 }
1641 }
1642
1643 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1645 if self.check(&Token::Not) {
1646 self.advance()?; if self.check(&Token::Null) {
1650 self.advance()?; Ok(true)
1652 } else {
1653 Err(ParseError::expected(
1657 vec!["NULL (after NOT)"],
1658 self.peek(),
1659 self.position(),
1660 ))
1661 }
1662 } else {
1663 Ok(false)
1664 }
1665 }
1666
1667 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1669 if self.check(&Token::Primary) {
1670 self.advance()?;
1671 self.expect(Token::Key)?;
1672 Ok(true)
1673 } else {
1674 Ok(false)
1675 }
1676 }
1677}
1678
1679fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1684 if s.len() != 64 {
1685 return Err(format!("expected 64 hex chars, got {}", s.len()));
1686 }
1687 let mut out = [0u8; 32];
1688 let bytes = s.as_bytes();
1689 for i in 0..32 {
1690 let hi = hex_nibble(bytes[i * 2])?;
1691 let lo = hex_nibble(bytes[i * 2 + 1])?;
1692 out[i] = (hi << 4) | lo;
1693 }
1694 Ok(out)
1695}
1696
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value `0..=15`.
///
/// Any other byte yields an error message that shows the byte as a `char`.
fn hex_nibble(c: u8) -> Result<u8, String> {
    let as_char = c as char;
    as_char
        .to_digit(16)
        // A base-16 digit is always below 16, so narrowing to u8 is lossless.
        .map(|digit| digit as u8)
        .ok_or_else(|| format!("non-hex char: {:?}", as_char))
}
1705
/// Unit tests for the DDL parsing entry points and helpers of this file.
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_types::catalog::{AnalyticsOutput, CollectionModel, SubscriptionOperation};

    /// Builds a `Parser` over `input`, panicking with the input and the
    /// error when `Parser::new` fails.
    fn parser(input: &str) -> Parser<'_> {
        Parser::new(input).unwrap_or_else(|err| panic!("failed to lex {input:?}: {err:?}"))
    }

    /// `parse_create_table_body` accepts `IF NOT EXISTS`, a parenthesized
    /// `WITH (...)` option list and trailing `PARTITION BY` / `TENANT BY`
    /// clauses, and rejects a non-text `tenant_by` value.
    #[test]
    fn parse_create_table_body_parenthesized_options_and_trailing_clauses() {
        // The input starts at the table name / `IF NOT EXISTS`; the leading
        // `CREATE TABLE` keywords are not part of what the body parser sees.
        let QueryExpr::CreateTable(table) = parser(
            "IF NOT EXISTS events (id INT, tenant_meta TEXT) \
             WITH (tenant_by = 'tenant_id', append_only = true, timestamps = false) \
             PARTITION BY HASH (id) TENANT BY (tenant_meta.tenant)",
        )
        .parse_create_table_body()
        .expect("create table body") else {
            panic!("Expected CreateTableQuery");
        };

        assert_eq!(table.name, "events");
        assert!(table.if_not_exists);
        assert!(table.append_only);
        assert!(!table.timestamps);
        // NOTE(review): the input also has a trailing
        // `TENANT BY (tenant_meta.tenant)`, yet `tenant_by` is expected to be
        // the `WITH` value; how the two interact is decided by the parser,
        // not visible here.
        assert_eq!(table.tenant_by.as_deref(), Some("tenant_id"));
        assert_eq!(
            table
                .partition_by
                .as_ref()
                .map(|spec| (spec.kind, spec.column.as_str())),
            Some((PartitionKind::Hash, "id"))
        );

        // `tenant_by` given as an integer must be rejected.
        let err = parser("bad (id INT) WITH (tenant_by = 42)")
            .parse_create_table_body()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("WITH tenant_by expects a text literal"),
            "{err}"
        );
    }

    /// A single `WITH (...)` list carrying `EMBED`, `MODERATE` and `VISION`
    /// blocks populates all three sections of the table's AI policy.
    #[test]
    fn parse_create_table_ai_policy_round_trips_all_modalities() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        let QueryExpr::CreateTable(table) = parser(
            "posts (id INT, title TEXT, body TEXT, photo TEXT) WITH ( \
             EMBED (fields = ('title', 'body'), provider = 'openai', model = 'text-embedding-3-small'), \
             MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest', sync = true, degraded = closed, on_reject = flag, hard_delete = true), \
             VISION (image_field = 'photo', outputs = ('caption', 'tags'), provider = 'openai', model = 'gpt-4o') \
             )",
        )
        .parse_create_table_body()
        .expect("create table body with ai policy") else {
            panic!("Expected CreateTableQuery");
        };

        let policy = table.ai_policy.expect("ai policy present");

        let embed = policy.embed.expect("embed block");
        assert_eq!(embed.fields, vec!["title".to_string(), "body".to_string()]);
        assert_eq!(embed.provider, "openai");
        assert_eq!(embed.model, "text-embedding-3-small");

        let moderate = policy.moderate.expect("moderate block");
        assert_eq!(moderate.fields, vec!["body".to_string()]);
        assert!(moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Closed);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Flag);
        assert!(moderate.hard_delete_on_reject);

        let vision = policy.vision.expect("vision block");
        assert_eq!(vision.image_field, "photo");
        assert_eq!(
            vision.output_kinds,
            vec!["caption".to_string(), "tags".to_string()]
        );
        assert_eq!(vision.model, "gpt-4o");
    }

    /// The long option spellings of `MODERATE` (`sync_gate`, `degraded_mode`,
    /// `reject_action`, `hard_delete_on_reject`) parse, and each error branch
    /// of the policy parser reports its own message.
    #[test]
    fn parse_moderate_policy_aliases_and_error_branches() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        // Long-form aliases for every optional setting.
        let QueryExpr::CreateTable(table) = parser(
            "t (id INT, body TEXT) WITH ( \
             MODERATE (fields = ('body'), provider = 'openai', model = 'm', \
             sync_gate = true, degraded_mode = open, reject_action = redact, \
             hard_delete_on_reject = true) \
             )",
        )
        .parse_create_table_body()
        .expect("alias spellings parse") else {
            panic!("Expected CreateTableQuery");
        };
        let moderate = table
            .ai_policy
            .expect("policy")
            .moderate
            .expect("moderate block");
        assert!(moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Redact);
        assert!(moderate.hard_delete_on_reject);

        // Each pair is (input, substring the error message must contain).
        for (sql, needle) in [
            // Unknown option name.
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
                "unsupported MODERATE policy option",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', degraded = sideways))",
                "unsupported MODERATE degraded mode",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', on_reject = explode))",
                "unsupported MODERATE reject action",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (provider = 'o', model = 'm'))",
                "MODERATE policy requires fields",
            ),
        ] {
            let err = parser(sql)
                .parse_create_table_body()
                .expect_err("moderate policy error");
            assert!(format!("{err}").contains(needle), "got: {err}");
        }
    }

    /// Error branches of the `EMBED` and `VISION` policy parsers, plus the
    /// `output_kinds` alias for `outputs`.
    #[test]
    fn parse_embed_and_vision_policy_error_branches() {
        // Each pair is (input, substring the error message must contain).
        for (sql, needle) in [
            // EMBED: unknown option, then each missing required option.
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
                "unsupported EMBED policy option",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (provider = 'o', model = 'm'))",
                "EMBED policy requires fields",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), model = 'm'))",
                "EMBED policy requires provider",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o'))",
                "EMBED policy requires model",
            ),
            (
                "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', provider = 'o', model = 'm', bogus = 1))",
                "unsupported VISION policy option",
            ),
            (
                "t (id INT, photo TEXT) WITH (VISION (provider = 'o', model = 'm'))",
                "VISION policy requires image_field",
            ),
        ] {
            let err = parser(sql)
                .parse_create_table_body()
                .expect_err("ai policy error");
            assert!(format!("{err}").contains(needle), "got: {err}");
        }

        // `output_kinds` is accepted in place of `outputs`.
        let QueryExpr::CreateTable(table) = parser(
            "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', \
             output_kinds = ('caption'), provider = 'o', model = 'm'))",
        )
        .parse_create_table_body()
        .expect("vision output_kinds alias") else {
            panic!("Expected CreateTableQuery");
        };
        let vision = table
            .ai_policy
            .expect("policy")
            .vision
            .expect("vision block");
        assert_eq!(vision.output_kinds, vec!["caption".to_string()]);
    }

    /// A `MODERATE` block with only the required options gets the default
    /// optional settings, and a table without any `WITH` clause has no AI
    /// policy at all.
    #[test]
    fn parse_create_table_ai_policy_defaults_and_no_clause() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        // Only the required options are present.
        let QueryExpr::CreateTable(table) = parser(
            "msgs (id INT, body TEXT) WITH ( \
             MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest') \
             )",
        )
        .parse_create_table_body()
        .expect("create table body") else {
            panic!("Expected CreateTableQuery");
        };
        let moderate = table
            .ai_policy
            .expect("policy")
            .moderate
            .expect("moderate block");
        assert!(!moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Reject);
        assert!(!moderate.hard_delete_on_reject);

        // No `WITH` clause: the policy must be absent, not an empty value.
        let QueryExpr::CreateTable(plain) = parser("plain (id INT)")
            .parse_create_table_body()
            .expect("create table body")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert!(plain.ai_policy.is_none());
    }

    /// Malformed AI policy clauses are rejected: a missing required option,
    /// an unknown option, an unknown degraded mode, and a repeated `EMBED`
    /// clause.
    #[test]
    fn parse_create_table_ai_policy_rejects_malformed_clauses() {
        // EMBED without `provider`.
        let err = parser("t (id INT) WITH (EMBED (fields = ('body'), model = 'm'))")
            .parse_create_table_body()
            .unwrap_err();
        assert!(
            err.to_string().contains("EMBED policy requires provider"),
            "{err}"
        );

        // VISION with an unknown option.
        let err = parser(
            "t (id INT) WITH (VISION (image_field = 'p', outputs = ('caption'), provider = 'openai', model = 'm', bogus = 1))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(
            err.to_string().contains("unsupported VISION policy option"),
            "{err}"
        );

        // MODERATE with an unrecognized degraded mode.
        let err = parser(
            "t (id INT) WITH (MODERATE (fields = ('body'), provider = 'openai', model = 'm', degraded = maybe))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("unsupported MODERATE degraded mode"),
            "{err}"
        );

        // The same clause kind given twice.
        let err = parser(
            "t (id INT) WITH (EMBED (fields = ('a'), provider = 'openai', model = 'm'), EMBED (fields = ('b'), provider = 'openai', model = 'm'))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(err.to_string().contains("duplicate EMBED clause"), "{err}");
    }

    /// Keyed collection bodies: vault with its own master key, graph with an
    /// analytics view, the error for an unknown graph `WITH` option, the
    /// rejection of `WITH ANALYTICS` on a KV collection, and a dotted
    /// wildcard name in a keyed drop.
    #[test]
    fn parse_keyed_bodies_cover_vault_analytics_and_dotted_drop_names() {
        let QueryExpr::CreateTable(vault) =
            parser("IF NOT EXISTS tenant.secrets WITH OWN MASTER KEY")
                .parse_create_keyed_body(CollectionModel::Vault)
                .expect("create vault")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert_eq!(vault.collection_model, CollectionModel::Vault);
        assert_eq!(vault.name, "tenant.secrets");
        assert!(vault.if_not_exists);
        assert!(vault.vault_own_master_key);

        let QueryExpr::CreateTable(graph) = parser(
            "g WITH ANALYTICS (centrality (using = pagerank, max_iterations = 12, tolerance = 0.001))",
        )
        .parse_create_keyed_body(CollectionModel::Graph)
        .expect("create graph")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert_eq!(graph.analytics_config.len(), 1);
        let view = &graph.analytics_config[0];
        assert_eq!(view.output, AnalyticsOutput::Centrality);
        assert_eq!(view.algorithm.as_deref(), Some("pagerank"));
        assert_eq!(view.max_iterations, Some(12));
        assert_eq!(view.tolerance, Some(0.001));

        let err = parser("g WITH OTHER")
            .parse_create_keyed_body(CollectionModel::Graph)
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");

        // This case goes through the full `parse()` entry point, so the
        // input includes the `CREATE KV` keywords.
        assert!(parser("CREATE KV cache WITH ANALYTICS (components)")
            .parse()
            .unwrap_err()
            .to_string()
            .contains("Unexpected token after query"));

        let QueryExpr::DropKv(drop) = parser("IF EXISTS tenant.cache.*")
            .parse_drop_keyed_body(CollectionModel::Kv)
            .expect("drop kv")
        else {
            panic!("Expected DropKvQuery");
        };
        assert_eq!(drop.name, "tenant.cache.*");
        assert!(drop.if_exists);
        assert_eq!(drop.model, CollectionModel::Kv);
    }

    /// `SIGNED_BY (...)` on a collection decodes each 64-digit hex key
    /// (either case) and rejects non-string and wrong-length entries.
    #[test]
    fn parse_collection_signed_by_list_and_errors() {
        // One lower-case and one upper-case key.
        let pk_a = "aa".repeat(32);
        let pk_b = "BB".repeat(32);
        let QueryExpr::CreateCollection(collection) =
            parser(&format!("signed KIND graph SIGNED_BY ('{pk_a}', '{pk_b}')"))
                .parse_create_collection_body()
                .expect("create collection")
        else {
            panic!("Expected CreateCollectionQuery");
        };
        assert_eq!(collection.allowed_signers, vec![[0xaau8; 32], [0xBBu8; 32]]);

        let err = parser("signed KIND graph SIGNED_BY (42)")
            .parse_create_collection_body()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("string literal (ed25519 pubkey hex)"),
            "{err}"
        );

        // Valid hex, but only 8 digits long.
        let err = parser("signed KIND graph SIGNED_BY ('deadbeef')")
            .parse_create_collection_body()
            .unwrap_err();
        assert!(err.to_string().contains("expected 64 hex chars"), "{err}");
    }

    /// One `ALTER COLLECTION` statement carrying fourteen comma-separated
    /// operations; each parsed operation is checked by position.
    #[test]
    fn parse_alter_operations_cover_subscriptions_partitions_tenancy_and_signers() {
        let pk = "11".repeat(32);
        let QueryExpr::AlterTable(alter) = parser(&format!(
            "ALTER COLLECTION audit \
             ADD SUBSCRIPTION pii TO audit_events REDACT (payload.ssn, *.secret) WHERE level = 'warn', \
             DROP SUBSCRIPTION pii, \
             ADD SIGNER '{pk}', \
             REVOKE SIGNER '{pk}', \
             ATTACH PARTITION audit_2026 FOR VALUES FROM (2026) TO (2027), \
             DETACH PARTITION audit_2026, \
             ENABLE EVENTS (INSERT, UPDATE) TO table_events ON ALL TENANTS, \
             DISABLE EVENTS, \
             ENABLE TENANCY ON (metadata.tenant), \
             DISABLE TENANCY, \
             SET APPEND_ONLY = true, \
             SET VERSIONED = false, \
             SET RETENTION 2 h, \
             UNSET RETENTION"
        ))
        .parse_alter_table_query()
        .expect("alter collection")
        else {
            panic!("Expected AlterTableQuery");
        };

        assert_eq!(alter.name, "audit");
        assert_eq!(alter.operations.len(), 14);
        match &alter.operations[0] {
            AlterOperation::AddSubscription { name, descriptor } => {
                assert_eq!(name, "pii");
                assert_eq!(descriptor.target_queue, "audit_events");
                assert_eq!(descriptor.redact_fields, vec!["payload.ssn", "*.secret"]);
                // The expected filter text has `level` rendered as `LEVEL`;
                // presumably the parser re-renders it from tokens — confirm
                // against the subscription parser.
                assert_eq!(descriptor.where_filter.as_deref(), Some("LEVEL = 'warn'"));
            }
            other => panic!("expected AddSubscription, got {other:?}"),
        }
        assert!(matches!(
            &alter.operations[1],
            AlterOperation::DropSubscription { name } if name == "pii"
        ));
        assert!(matches!(
            &alter.operations[2],
            AlterOperation::AddSigner { pubkey } if *pubkey == [0x11; 32]
        ));
        assert!(matches!(
            &alter.operations[3],
            AlterOperation::RevokeSigner { pubkey } if *pubkey == [0x11; 32]
        ));
        // The expected bound text is space-separated token by token.
        assert!(matches!(
            &alter.operations[4],
            AlterOperation::AttachPartition { child, bound }
                if child == "audit_2026" && bound == "FROM ( 2026 ) TO ( 2027 )"
        ));
        assert!(matches!(
            &alter.operations[5],
            AlterOperation::DetachPartition { child } if child == "audit_2026"
        ));
        match &alter.operations[6] {
            AlterOperation::EnableEvents(descriptor) => {
                assert_eq!(
                    descriptor.ops_filter,
                    vec![SubscriptionOperation::Insert, SubscriptionOperation::Update]
                );
                assert_eq!(descriptor.target_queue, "table_events");
                assert!(descriptor.all_tenants);
            }
            other => panic!("expected EnableEvents, got {other:?}"),
        }
        assert!(matches!(
            &alter.operations[7],
            AlterOperation::DisableEvents
        ));
        // The expected column has its first segment upper-cased
        // (`METADATA.tenant`) although the input spells it `metadata.tenant`.
        assert!(matches!(
            &alter.operations[8],
            AlterOperation::EnableTenancy { column } if column == "METADATA.tenant"
        ));
        assert!(matches!(
            &alter.operations[9],
            AlterOperation::DisableTenancy
        ));
        assert!(matches!(
            &alter.operations[10],
            AlterOperation::SetAppendOnly(true)
        ));
        assert!(matches!(
            &alter.operations[11],
            AlterOperation::SetVersioned(false)
        ));
        // `2 h` is two hours, i.e. 7_200_000 ms.
        assert!(matches!(
            &alter.operations[12],
            AlterOperation::SetRetention { duration_ms } if *duration_ms == 7_200_000
        ));
        assert!(matches!(
            &alter.operations[13],
            AlterOperation::UnsetRetention
        ));
    }

    /// `ALTER GRAPH ... ADD` / `DROP` followed by something other than
    /// `ANALYTICS` reports that `ANALYTICS` was expected.
    #[test]
    fn parse_alter_graph_analytics_keyword_errors() {
        let err = parser("ALTER GRAPH g ADD centrality")
            .parse_alter_graph_query()
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");

        let err = parser("ALTER GRAPH g DROP centrality")
            .parse_alter_graph_query()
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");
    }

    /// `decode_hex_32` decodes a valid 64-digit string and reports both a
    /// wrong length and a non-hex character.
    #[test]
    fn decode_hex_32_reports_length_and_character_errors() {
        assert_eq!(decode_hex_32(&"0f".repeat(32)).unwrap(), [0x0f; 32]);
        assert_eq!(
            decode_hex_32("deadbeef").unwrap_err(),
            "expected 64 hex chars, got 8"
        );
        // Correct length (64) but `g` is not a hex digit.
        assert!(decode_hex_32(&"gg".repeat(32))
            .unwrap_err()
            .contains("non-hex char"));
    }
}