1use super::super::ast::{
4 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7 PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 metrics_rollup_policies: Vec::new(),
79 context_index_fields,
80 context_index_enabled,
81 timestamps,
82 partition_by: None,
83 tenant_by: None,
84 append_only: false,
85 subscriptions,
86 analytics_config: Vec::new(),
87 vault_own_master_key: false,
88 }))
89 }
90
91 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
93 self.expect(Token::Drop)?;
94 self.expect(Token::Table)?;
95 self.parse_drop_table_body()
96 }
97
98 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
100 let if_not_exists = self.match_if_not_exists()?;
101 let name = self.expect_ident()?;
102
103 self.expect(Token::LParen)?;
104 let mut columns = Vec::new();
105 loop {
106 let col = self.parse_column_def()?;
107 columns.push(col);
108 if !self.consume(&Token::Comma)? {
109 break;
110 }
111 }
112 self.expect(Token::RParen)?;
113
114 let mut default_ttl_ms = None;
115 let mut context_index_fields = Vec::new();
116 let mut context_index_enabled = false;
117 let mut timestamps = false;
118 let mut tenant_by: Option<String> = None;
119 let mut append_only = false;
120 let mut subscriptions = Vec::new();
121
122 while self.consume(&Token::With)? {
123 if self.consume_ident_ci("EVENTS")? {
124 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
125 continue;
126 }
127 let has_parens = self.consume(&Token::LParen)?;
134
135 loop {
136 if self.consume_ident_ci("CONTEXT_INDEX")? {
137 context_index_enabled = self.parse_bool_assign()?;
138 } else if self.consume_ident_ci("CONTEXT")? {
139 if !self.consume(&Token::Index)? {
140 return Err(ParseError::expected(
141 vec!["INDEX"],
142 self.peek(),
143 self.position(),
144 ));
145 }
146 self.expect(Token::On)?;
147 self.expect(Token::LParen)?;
148 loop {
149 context_index_fields.push(self.expect_ident()?);
150 if !self.consume(&Token::Comma)? {
151 break;
152 }
153 }
154 self.expect(Token::RParen)?;
155 context_index_enabled = true;
156 } else if self.consume_ident_ci("TIMESTAMPS")? {
157 timestamps = self.parse_bool_assign()?;
158 } else if self.consume_ident_ci("APPEND_ONLY")? {
159 append_only = self.parse_bool_assign()?;
160 } else if self.consume_ident_ci("TENANT_BY")? {
161 let _ = self.consume(&Token::Eq)?;
164 let value = self.parse_literal_value()?;
165 match value {
166 Value::Text(col) => tenant_by = Some(col.to_string()),
167 other => {
168 return Err(ParseError::new(
169 format!("WITH tenant_by expects a text literal, got {other:?}"),
170 self.position(),
171 ));
172 }
173 }
174 } else {
175 default_ttl_ms = self.parse_create_table_ttl_clause()?;
176 }
177 if has_parens {
178 if self.consume(&Token::Comma)? {
179 continue;
180 }
181 self.expect(Token::RParen)?;
182 }
183 break;
184 }
185 }
186
187 let partition_by = if self.consume(&Token::Partition)? {
189 self.expect(Token::By)?;
190 let kind = if self.consume(&Token::Range)? {
191 PartitionKind::Range
192 } else if self.consume(&Token::List)? {
193 PartitionKind::List
194 } else if self.consume(&Token::Hash)? {
195 PartitionKind::Hash
196 } else {
197 return Err(ParseError::expected(
198 vec!["RANGE", "LIST", "HASH"],
199 self.peek(),
200 self.position(),
201 ));
202 };
203 self.expect(Token::LParen)?;
204 let column = self.expect_ident()?;
205 self.expect(Token::RParen)?;
206 Some(PartitionSpec { kind, column })
207 } else {
208 None
209 };
210
211 if !append_only && self.consume_ident_ci("APPEND")? {
216 if !self.consume_ident_ci("ONLY")? {
217 return Err(ParseError::expected(
218 vec!["ONLY"],
219 self.peek(),
220 self.position(),
221 ));
222 }
223 append_only = true;
224 }
225
226 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
237 self.expect(Token::By)?;
238 self.expect(Token::LParen)?;
239 let mut path = self.expect_ident_or_keyword()?;
243 while self.consume(&Token::Dot)? {
244 let next = self.expect_ident_or_keyword()?;
245 path = format!("{path}.{next}");
246 }
247 self.expect(Token::RParen)?;
248 tenant_by = Some(path);
249 }
250
251 Ok(QueryExpr::CreateTable(CreateTableQuery {
252 collection_model: CollectionModel::Table,
253 name,
254 columns,
255 if_not_exists,
256 default_ttl_ms,
257 metrics_rollup_policies: Vec::new(),
258 context_index_fields,
259 context_index_enabled,
260 timestamps,
261 partition_by,
262 tenant_by,
263 append_only,
264 subscriptions,
265 analytics_config: Vec::new(),
266 vault_own_master_key: false,
267 }))
268 }
269
270 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
276 self.expect(Token::Explain)?;
277 self.expect(Token::Alter)?;
278 self.expect(Token::For)?;
279 self.expect(Token::Create)?;
280 self.expect(Token::Table)?;
281
282 let body = self.parse_create_table_body()?;
283 let target = match body {
284 QueryExpr::CreateTable(t) => t,
285 _ => {
286 return Err(ParseError::new(
287 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
288 .to_string(),
289 self.position(),
290 ));
291 }
292 };
293
294 let format = if self.consume(&Token::Format)? {
295 if self.consume(&Token::Json)? {
296 ExplainFormat::Json
297 } else if self.consume_ident_ci("SQL")? {
298 ExplainFormat::Sql
299 } else {
300 return Err(ParseError::expected(
301 vec!["JSON", "SQL"],
302 self.peek(),
303 self.position(),
304 ));
305 }
306 } else {
307 ExplainFormat::Sql
308 };
309
310 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
311 target,
312 format,
313 }))
314 }
315
316 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
318 let if_exists = self.match_if_exists()?;
319 let name = self.parse_drop_collection_name()?;
320 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
321 }
322
323 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
324 let if_exists = self.match_if_exists()?;
325 let name = self.parse_drop_collection_name()?;
326 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
327 }
328
329 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
330 let if_exists = self.match_if_exists()?;
331 let name = self.parse_drop_collection_name()?;
332 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
333 }
334
335 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
336 let if_exists = self.match_if_exists()?;
337 let name = self.parse_drop_collection_name()?;
338 Ok(QueryExpr::DropDocument(DropDocumentQuery {
339 name,
340 if_exists,
341 }))
342 }
343
344 pub fn parse_create_keyed_body(
345 &mut self,
346 model: CollectionModel,
347 ) -> Result<QueryExpr, ParseError> {
348 let if_not_exists = self.match_if_not_exists()?;
349 let name = self.parse_drop_collection_name()?;
350 let vault_own_master_key =
351 if model == CollectionModel::Vault && self.consume(&Token::With)? {
352 if !self.consume_ident_ci("OWN")? {
353 return Err(ParseError::expected(
354 vec!["OWN"],
355 self.peek(),
356 self.position(),
357 ));
358 }
359 if !self.consume_ident_ci("MASTER")? {
360 return Err(ParseError::expected(
361 vec!["MASTER"],
362 self.peek(),
363 self.position(),
364 ));
365 }
366 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
367 return Err(ParseError::expected(
368 vec!["KEY"],
369 self.peek(),
370 self.position(),
371 ));
372 }
373 true
374 } else {
375 false
376 };
377 let analytics_config = if model == CollectionModel::Graph && self.consume(&Token::With)? {
381 if !self.consume_ident_ci("ANALYTICS")? {
382 return Err(ParseError::expected(
383 vec!["ANALYTICS"],
384 self.peek(),
385 self.position(),
386 ));
387 }
388 self.parse_analytics_clause()?
389 } else {
390 Vec::new()
391 };
392 Ok(QueryExpr::CreateTable(CreateTableQuery {
393 collection_model: model,
394 name,
395 columns: Vec::new(),
396 if_not_exists,
397 default_ttl_ms: None,
398 metrics_rollup_policies: Vec::new(),
399 context_index_fields: Vec::new(),
400 context_index_enabled: false,
401 timestamps: false,
402 partition_by: None,
403 tenant_by: None,
404 append_only: false,
405 subscriptions: Vec::new(),
406 analytics_config,
407 vault_own_master_key,
408 }))
409 }
410
411 fn parse_analytics_clause(
417 &mut self,
418 ) -> Result<Vec<crate::catalog::AnalyticsViewDescriptor>, ParseError> {
419 use crate::catalog::{AnalyticsOutput, AnalyticsViewDescriptor};
420
421 self.expect(Token::LParen)?;
422 let mut views: Vec<AnalyticsViewDescriptor> = Vec::new();
423 loop {
424 let output_name = self.parse_analytics_output_name()?;
425 let output = AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
426 ParseError::new(
427 format!(
428 "unknown analytics output '{output_name}': expected communities, components, or centrality"
429 ),
430 self.position(),
431 )
432 })?;
433 if views.iter().any(|view| view.output == output) {
434 return Err(ParseError::new(
435 format!("duplicate analytics output '{output_name}'"),
436 self.position(),
437 ));
438 }
439 let mut view = AnalyticsViewDescriptor {
440 output,
441 algorithm: None,
442 resolution: None,
443 max_iterations: None,
444 tolerance: None,
445 };
446 if self.consume(&Token::LParen)? {
447 loop {
448 let key = self.parse_analytics_option_key()?;
449 self.expect(Token::Eq)?;
450 match key.as_str() {
451 "using" => {
452 view.algorithm =
453 Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
454 }
455 "resolution" => view.resolution = Some(self.parse_float()?),
456 "max_iterations" => view.max_iterations = Some(self.parse_integer()?),
457 "tolerance" => view.tolerance = Some(self.parse_float()?),
458 other => {
459 return Err(ParseError::new(
460 format!(
461 "unknown analytics option '{other}': expected using, resolution, max_iterations, or tolerance"
462 ),
463 self.position(),
464 ))
465 }
466 }
467 if !self.consume(&Token::Comma)? {
468 break;
469 }
470 }
471 self.expect(Token::RParen)?;
472 }
473 views.push(view);
474 if !self.consume(&Token::Comma)? {
475 break;
476 }
477 }
478 self.expect(Token::RParen)?;
479 if views.is_empty() {
480 return Err(ParseError::new(
481 "WITH ANALYTICS requires at least one output".to_string(),
482 self.position(),
483 ));
484 }
485 Ok(views)
486 }
487
488 fn parse_analytics_output_name(&mut self) -> Result<String, ParseError> {
492 match self.peek() {
493 Token::Components => {
494 self.advance()?;
495 Ok("components".to_string())
496 }
497 Token::Centrality => {
498 self.advance()?;
499 Ok("centrality".to_string())
500 }
501 _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
502 }
503 }
504
505 fn parse_analytics_option_key(&mut self) -> Result<String, ParseError> {
508 match self.peek() {
509 Token::Using => {
510 self.advance()?;
511 Ok("using".to_string())
512 }
513 Token::MaxIterations => {
514 self.advance()?;
515 Ok("max_iterations".to_string())
516 }
517 _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
518 }
519 }
520
521 pub fn parse_create_collection_model_body(
522 &mut self,
523 model: CollectionModel,
524 ) -> Result<QueryExpr, ParseError> {
525 self.parse_create_keyed_body(model)
526 }
527
528 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
529 let if_not_exists = self.match_if_not_exists()?;
530 let name = self.parse_drop_collection_name()?;
531 if !self.consume_ident_ci("KIND")? {
532 return Err(ParseError::expected(
533 vec!["KIND"],
534 self.peek(),
535 self.position(),
536 ));
537 }
538 let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
539 while self.consume(&Token::Dot)? {
540 let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
541 kind.push('.');
542 kind.push_str(&part);
543 }
544 let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
545 if !self.consume_ident_ci("DIM")? {
546 return Err(ParseError::expected(
547 vec!["DIM"],
548 self.peek(),
549 self.position(),
550 ));
551 }
552 let dimension = self.parse_integer()?;
553 if dimension <= 0 {
554 return Err(ParseError::new(
555 "VECTOR DIM must be a positive integer".to_string(),
556 self.position(),
557 ));
558 }
559 let metric = if self.consume(&Token::Metric)? {
560 self.parse_distance_metric()?
561 } else {
562 crate::storage::engine::distance::DistanceMetric::Cosine
563 };
564 (Some(dimension as usize), Some(metric))
565 } else {
566 (None, None)
567 };
568 let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
569 self.parse_signed_by_list()?
570 } else {
571 Vec::new()
572 };
573 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
574 name,
575 kind,
576 if_not_exists,
577 vector_dimension,
578 vector_metric,
579 allowed_signers,
580 }))
581 }
582
583 fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
587 let hex = match self.peek().clone() {
588 Token::String(s) => {
589 self.advance()?;
590 s
591 }
592 _ => {
593 return Err(ParseError::expected(
594 vec!["string literal (ed25519 pubkey hex)"],
595 self.peek(),
596 self.position(),
597 ));
598 }
599 };
600 decode_hex_32(&hex).map_err(|msg| {
601 ParseError::new(
602 format!("SIGNER pubkey '{hex}' invalid: {msg}"),
603 self.position(),
604 )
605 })
606 }
607
608 fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
613 self.expect(Token::LParen)?;
614 let mut out = Vec::new();
615 loop {
616 let hex = match self.peek().clone() {
617 Token::String(s) => {
618 self.advance()?;
619 s
620 }
621 _ => {
622 return Err(ParseError::expected(
623 vec!["string literal (ed25519 pubkey hex)"],
624 self.peek(),
625 self.position(),
626 ));
627 }
628 };
629 let bytes = decode_hex_32(&hex).map_err(|msg| {
630 ParseError::new(
631 format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
632 self.position(),
633 )
634 })?;
635 out.push(bytes);
636 if !self.consume(&Token::Comma)? {
637 break;
638 }
639 }
640 self.expect(Token::RParen)?;
641 if out.is_empty() {
642 return Err(ParseError::new(
643 "SIGNED_BY list must contain at least one pubkey".to_string(),
644 self.position(),
645 ));
646 }
647 Ok(out)
648 }
649
650 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
651 let if_not_exists = self.match_if_not_exists()?;
652 let name = self.parse_drop_collection_name()?;
653 if !self.consume_ident_ci("DIM")? {
654 return Err(ParseError::expected(
655 vec!["DIM"],
656 self.peek(),
657 self.position(),
658 ));
659 }
660 let dimension = self.parse_integer()?;
661 if dimension <= 0 {
662 return Err(ParseError::new(
663 "VECTOR DIM must be a positive integer".to_string(),
664 self.position(),
665 ));
666 }
667 let metric = if self.consume(&Token::Metric)? {
668 self.parse_distance_metric()?
669 } else {
670 crate::storage::engine::distance::DistanceMetric::Cosine
671 };
672 Ok(QueryExpr::CreateVector(CreateVectorQuery {
673 name,
674 dimension: dimension as usize,
675 metric,
676 if_not_exists,
677 }))
678 }
679
680 pub fn parse_drop_keyed_body(
681 &mut self,
682 model: CollectionModel,
683 ) -> Result<QueryExpr, ParseError> {
684 let if_exists = self.match_if_exists()?;
685 let name = self.parse_drop_collection_name()?;
686 Ok(QueryExpr::DropKv(DropKvQuery {
687 name,
688 if_exists,
689 model,
690 }))
691 }
692
693 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
694 self.parse_drop_keyed_body(CollectionModel::Kv)
695 }
696
697 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
698 self.parse_drop_collection_model_body(None)
699 }
700
701 pub fn parse_drop_collection_model_body(
702 &mut self,
703 model: Option<CollectionModel>,
704 ) -> Result<QueryExpr, ParseError> {
705 let if_exists = self.match_if_exists()?;
706 let name = self.parse_drop_collection_name()?;
707 Ok(QueryExpr::DropCollection(DropCollectionQuery {
708 name,
709 if_exists,
710 model,
711 }))
712 }
713
714 pub fn parse_truncate_body(
715 &mut self,
716 model: Option<CollectionModel>,
717 ) -> Result<QueryExpr, ParseError> {
718 let if_exists = self.match_if_exists()?;
719 let name = self.parse_drop_collection_name()?;
720 Ok(QueryExpr::Truncate(TruncateQuery {
721 name,
722 model,
723 if_exists,
724 }))
725 }
726
727 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
728 let mut name = self.expect_ident()?;
729 while self.consume(&Token::Dot)? {
730 if self.consume(&Token::Star)? {
731 name.push_str(".*");
732 break;
733 }
734 let next = self.expect_ident_or_keyword()?;
735 name = format!("{name}.{next}");
736 }
737 Ok(name)
738 }
739
740 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
747 self.expect(Token::Alter)?;
748 if !self.consume(&Token::Table)?
749 && !self.consume(&Token::Collection)?
750 && !self.consume_ident_ci("COLLECTION")?
751 {
752 return Err(ParseError::expected(
753 vec!["TABLE", "COLLECTION"],
754 self.peek(),
755 self.position(),
756 ));
757 }
758 let name = self.expect_ident()?;
759
760 let mut operations = Vec::new();
761 loop {
762 let op = self.parse_alter_operation(&name)?;
763 operations.push(op);
764 if !self.consume(&Token::Comma)? {
765 break;
766 }
767 }
768
769 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
770 }
771
772 pub fn parse_alter_graph_query(&mut self) -> Result<QueryExpr, ParseError> {
781 self.expect(Token::Alter)?;
782 self.expect(Token::Graph)?;
783 let name = self.expect_ident()?;
784
785 let mut operations = Vec::new();
786 loop {
787 operations.push(self.parse_alter_graph_operation()?);
788 if !self.consume(&Token::Comma)? {
789 break;
790 }
791 }
792
793 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
794 }
795
796 fn parse_alter_graph_operation(&mut self) -> Result<AlterOperation, ParseError> {
799 if self.consume(&Token::Add)? {
800 if !self.consume_ident_ci("ANALYTICS")? {
801 return Err(ParseError::expected(
802 vec!["ANALYTICS"],
803 self.peek(),
804 self.position(),
805 ));
806 }
807 let views = self.parse_analytics_clause()?;
810 Ok(AlterOperation::AddAnalytics(views))
811 } else if self.consume(&Token::Drop)? {
812 if !self.consume_ident_ci("ANALYTICS")? {
813 return Err(ParseError::expected(
814 vec!["ANALYTICS"],
815 self.peek(),
816 self.position(),
817 ));
818 }
819 let output_name = self.parse_analytics_output_name()?;
820 let output = crate::catalog::AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
821 ParseError::new(
822 format!(
823 "unknown analytics output '{output_name}': expected communities, components, or centrality"
824 ),
825 self.position(),
826 )
827 })?;
828 Ok(AlterOperation::DropAnalytics(output))
829 } else {
830 Err(ParseError::expected(
831 vec!["ADD", "DROP"],
832 self.peek(),
833 self.position(),
834 ))
835 }
836 }
837
838 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
840 if self.consume(&Token::Add)? {
841 if self.consume_ident_ci("SUBSCRIPTION")? {
842 let sub_name = self.expect_ident()?;
844 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
845 Ok(AlterOperation::AddSubscription {
846 name: sub_name,
847 descriptor,
848 })
849 } else if self.consume_ident_ci("SIGNER")? {
850 let pubkey = self.parse_single_signer_hex()?;
852 Ok(AlterOperation::AddSigner { pubkey })
853 } else {
854 let _ = self.consume(&Token::Column)?;
856 let col_def = self.parse_column_def()?;
857 Ok(AlterOperation::AddColumn(col_def))
858 }
859 } else if self.consume_ident_ci("REVOKE")? {
860 if !self.consume_ident_ci("SIGNER")? {
862 return Err(ParseError::expected(
863 vec!["SIGNER"],
864 self.peek(),
865 self.position(),
866 ));
867 }
868 let pubkey = self.parse_single_signer_hex()?;
869 Ok(AlterOperation::RevokeSigner { pubkey })
870 } else if self.consume(&Token::Drop)? {
871 if self.consume_ident_ci("SUBSCRIPTION")? {
872 let sub_name = self.expect_ident()?;
874 Ok(AlterOperation::DropSubscription { name: sub_name })
875 } else {
876 let _ = self.consume(&Token::Column)?;
878 let col_name = self.expect_ident()?;
879 Ok(AlterOperation::DropColumn(col_name))
880 }
881 } else if self.consume(&Token::Rename)? {
882 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
885 self.expect(Token::To)?;
886 let to = self.expect_ident()?;
887 Ok(AlterOperation::RenameColumn { from, to })
888 } else if self.consume(&Token::Attach)? {
889 self.expect(Token::Partition)?;
891 let child = self.expect_ident()?;
892 self.expect(Token::For)?;
893 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
897 return Err(ParseError::expected(
898 vec!["VALUES"],
899 self.peek(),
900 self.position(),
901 ));
902 }
903 let bound = self.collect_remaining_tokens_as_string()?;
904 Ok(AlterOperation::AttachPartition { child, bound })
905 } else if self.consume(&Token::Detach)? {
906 self.expect(Token::Partition)?;
908 let child = self.expect_ident()?;
909 Ok(AlterOperation::DetachPartition { child })
910 } else if self.consume(&Token::Enable)? {
911 if self.consume_ident_ci("EVENTS")? {
913 Ok(AlterOperation::EnableEvents(
914 self.parse_subscription_descriptor(table_name.to_string())?,
915 ))
916 } else if self.consume_ident_ci("TENANCY")? {
917 self.expect(Token::On)?;
918 self.expect(Token::LParen)?;
919 let mut path = self.expect_ident_or_keyword()?;
921 while self.consume(&Token::Dot)? {
922 let next = self.expect_ident_or_keyword()?;
923 path = format!("{path}.{next}");
924 }
925 self.expect(Token::RParen)?;
926 Ok(AlterOperation::EnableTenancy { column: path })
927 } else {
928 self.expect(Token::Row)?;
929 self.expect(Token::Level)?;
930 self.expect(Token::Security)?;
931 Ok(AlterOperation::EnableRowLevelSecurity)
932 }
933 } else if self.consume(&Token::Disable)? {
934 if self.consume_ident_ci("EVENTS")? {
936 Ok(AlterOperation::DisableEvents)
937 } else if self.consume_ident_ci("TENANCY")? {
938 Ok(AlterOperation::DisableTenancy)
939 } else {
940 self.expect(Token::Row)?;
941 self.expect(Token::Level)?;
942 self.expect(Token::Security)?;
943 Ok(AlterOperation::DisableRowLevelSecurity)
944 }
945 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
946 if self.consume_ident_ci("APPEND_ONLY")? {
949 let on = self.parse_bool_assign()?;
950 Ok(AlterOperation::SetAppendOnly(on))
951 } else if self.consume_ident_ci("VERSIONED")? {
952 let on = self.parse_bool_assign()?;
953 Ok(AlterOperation::SetVersioned(on))
954 } else if self.consume(&Token::Retention)? {
955 let value = self.parse_float()?;
959 let unit = self.parse_duration_unit()?;
960 Ok(AlterOperation::SetRetention {
961 duration_ms: (value * unit) as u64,
962 })
963 } else {
964 Err(ParseError::expected(
965 vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
966 self.peek(),
967 self.position(),
968 ))
969 }
970 } else if self.consume_ident_ci("UNSET")? {
971 if self.consume(&Token::Retention)? {
973 Ok(AlterOperation::UnsetRetention)
974 } else {
975 Err(ParseError::expected(
976 vec!["RETENTION"],
977 self.peek(),
978 self.position(),
979 ))
980 }
981 } else {
982 Err(ParseError::expected(
983 vec![
984 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
985 "UNSET",
986 ],
987 self.peek(),
988 self.position(),
989 ))
990 }
991 }
992
993 fn parse_subscription_descriptor(
994 &mut self,
995 source: String,
996 ) -> Result<SubscriptionDescriptor, ParseError> {
997 let mut ops_filter = Vec::new();
998 if self.consume(&Token::LParen)? {
999 loop {
1000 let op = if self.consume(&Token::Insert)? {
1001 SubscriptionOperation::Insert
1002 } else if self.consume(&Token::Update)? {
1003 SubscriptionOperation::Update
1004 } else if self.consume(&Token::Delete)? {
1005 SubscriptionOperation::Delete
1006 } else {
1007 return Err(ParseError::expected(
1008 vec!["INSERT", "UPDATE", "DELETE"],
1009 self.peek(),
1010 self.position(),
1011 ));
1012 };
1013 ops_filter.push(op);
1014 if !self.consume(&Token::Comma)? {
1015 break;
1016 }
1017 }
1018 self.expect(Token::RParen)?;
1019 }
1020
1021 let target_queue = if self.consume(&Token::To)? {
1022 self.expect_ident()?
1023 } else {
1024 format!("{source}_events")
1025 };
1026
1027 let mut redact_fields = Vec::new();
1028 if self.consume_ident_ci("REDACT")? {
1029 self.expect(Token::LParen)?;
1030 loop {
1031 redact_fields.push(self.parse_dotted_redact_path()?);
1032 if !self.consume(&Token::Comma)? {
1033 break;
1034 }
1035 }
1036 self.expect(Token::RParen)?;
1037 }
1038
1039 let where_filter = if self.consume(&Token::Where)? {
1040 Some(self.collect_subscription_where_filter()?)
1041 } else {
1042 None
1043 };
1044
1045 let all_tenants = if self.consume(&Token::On)? {
1047 self.expect(Token::All)?;
1048 if !self.consume_ident_ci("TENANTS")? {
1049 return Err(ParseError::expected(
1050 vec!["TENANTS"],
1051 self.peek(),
1052 self.position(),
1053 ));
1054 }
1055 true
1056 } else {
1057 false
1058 };
1059
1060 if self.consume_ident_ci("REQUIRES")? {
1062 self.consume_ident_ci("CAPABILITY")?;
1063 self.advance()?;
1065 }
1066
1067 Ok(SubscriptionDescriptor {
1068 name: String::new(),
1069 source,
1070 target_queue,
1071 ops_filter,
1072 where_filter,
1073 redact_fields,
1074 enabled: true,
1075 all_tenants,
1076 })
1077 }
1078
1079 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
1081 let mut parts = Vec::new();
1082 if self.consume(&Token::Star)? {
1083 parts.push("*".to_string());
1084 } else {
1085 parts.push(self.expect_ident_or_keyword()?);
1086 }
1087 while self.consume(&Token::Dot)? {
1088 if self.consume(&Token::Star)? {
1089 parts.push("*".to_string());
1090 } else {
1091 parts.push(self.expect_ident_or_keyword()?);
1092 }
1093 }
1094 Ok(parts.join("."))
1095 }
1096
1097 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
1098 let mut parts = Vec::new();
1099 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1100 parts.push(self.peek().to_string());
1101 self.advance()?;
1102 }
1103 if parts.is_empty() {
1104 return Err(ParseError::expected(
1105 vec!["predicate"],
1106 self.peek(),
1107 self.position(),
1108 ));
1109 }
1110 Ok(parts.join(" "))
1111 }
1112
1113 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
1118 let mut parts: Vec<String> = Vec::new();
1119 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1120 parts.push(self.peek().to_string());
1121 self.advance()?;
1122 }
1123 Ok(parts.join(" "))
1124 }
1125
1126 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
1128 let name = self.expect_column_ident()?;
1129 let sql_type = self.parse_column_type()?;
1130 let data_type = sql_type.to_string();
1131
1132 let mut def = CreateColumnDef {
1133 name,
1134 data_type,
1135 sql_type: sql_type.clone(),
1136 not_null: false,
1137 default: None,
1138 compress: None,
1139 unique: false,
1140 primary_key: false,
1141 enum_variants: sql_type.enum_variants().unwrap_or_default(),
1142 array_element: sql_type.array_element_type(),
1143 decimal_precision: sql_type.decimal_precision(),
1144 };
1145
1146 loop {
1148 if self.match_not_null()? {
1149 def.not_null = true;
1150 } else if self.consume(&Token::Default)? {
1151 self.expect(Token::Eq)?;
1152 def.default = Some(self.parse_literal_string_for_ddl()?);
1153 } else if self.consume(&Token::Compress)? {
1154 self.expect(Token::Colon)?;
1155 def.compress = Some(self.parse_integer()? as u8);
1156 } else if self.consume(&Token::Unique)? {
1157 def.unique = true;
1158 } else if self.match_primary_key()? {
1159 def.primary_key = true;
1160 } else {
1161 break;
1162 }
1163 }
1164
1165 Ok(def)
1166 }
1167
1168 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
1170 let type_name = self.expect_ident_or_keyword()?;
1171 if self.consume(&Token::LParen)? {
1172 let inner = self.parse_type_params()?;
1173 self.expect(Token::RParen)?;
1174 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
1175 } else {
1176 Ok(SqlTypeName::new(type_name))
1177 }
1178 }
1179
1180 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
1182 let mut parts = Vec::new();
1183 loop {
1184 match self.peek().clone() {
1185 Token::String(s) => {
1186 let s = s.clone();
1187 self.advance()?;
1188 parts.push(TypeModifier::StringLiteral(s));
1189 }
1190 Token::Integer(n) => {
1191 self.advance()?;
1192 parts.push(TypeModifier::Number(n as u32));
1193 }
1194 _ => {
1195 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1196 }
1197 }
1198 if !self.consume(&Token::Comma)? {
1199 break;
1200 }
1201 }
1202 Ok(parts)
1203 }
1204
1205 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1207 match self.peek().clone() {
1208 Token::String(s) => {
1209 let s = s.clone();
1210 self.advance()?;
1211 Ok(s)
1212 }
1213 Token::Integer(n) => {
1214 self.advance()?;
1215 Ok(n.to_string())
1216 }
1217 Token::Float(n) => {
1218 self.advance()?;
1219 Ok(n.to_string())
1220 }
1221 Token::True => {
1222 self.advance()?;
1223 Ok("true".to_string())
1224 }
1225 Token::False => {
1226 self.advance()?;
1227 Ok("false".to_string())
1228 }
1229 Token::Null => {
1230 self.advance()?;
1231 Ok("null".to_string())
1232 }
1233 ref other => Err(ParseError::expected(
1234 vec!["string", "number", "true", "false", "null"],
1235 other,
1236 self.position(),
1237 )),
1238 }
1239 }
1240
1241 fn check_ttl_keyword(&self) -> bool {
1242 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1243 }
1244
1245 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1248 self.expect(Token::Eq)?;
1249 match self.peek() {
1250 Token::True => {
1251 self.advance()?;
1252 Ok(true)
1253 }
1254 Token::False => {
1255 self.advance()?;
1256 Ok(false)
1257 }
1258 other => Err(ParseError::expected(
1259 vec!["true", "false"],
1260 other,
1261 self.position(),
1262 )),
1263 }
1264 }
1265
1266 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1267 if self.consume_ident_ci(expected)? {
1268 Ok(())
1269 } else {
1270 Err(ParseError::expected(
1271 vec![expected],
1272 self.peek(),
1273 self.position(),
1274 ))
1275 }
1276 }
1277
1278 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1279 let option_name = self.expect_ident_or_keyword()?;
1280 if !option_name.eq_ignore_ascii_case("ttl") {
1281 return Err(ParseError::new(
1282 format!(
1286 "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1287 ),
1288 self.position(),
1289 ));
1290 }
1291
1292 let ttl_value = self.parse_float()?;
1293 let ttl_unit = match self.peek() {
1294 Token::Ident(unit) => {
1295 let unit = unit.clone();
1296 self.advance()?;
1297 unit
1298 }
1299 _ => "s".to_string(),
1300 };
1301
1302 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1303 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1304 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1305 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1306 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1307 "d" | "day" | "days" => 86_400_000.0,
1308 other => {
1309 return Err(ParseError::new(
1310 format!(
1314 "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1315 ),
1316 self.position(),
1317 ));
1318 }
1319 };
1320
1321 if !ttl_value.is_finite() || ttl_value < 0.0 {
1322 return Err(ParseError::new(
1323 "TTL must be a finite, non-negative duration".to_string(),
1324 self.position(),
1325 ));
1326 }
1327
1328 let ttl_ms = ttl_value * multiplier_ms;
1329 if ttl_ms > u64::MAX as f64 {
1330 return Err(ParseError::new(
1331 "TTL duration is too large".to_string(),
1332 self.position(),
1333 ));
1334 }
1335 if ttl_ms.fract().abs() >= f64::EPSILON {
1336 return Err(ParseError::new(
1337 "TTL duration must resolve to a whole number of milliseconds".to_string(),
1338 self.position(),
1339 ));
1340 }
1341
1342 Ok(Some(ttl_ms as u64))
1343 }
1344
1345 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1347 if self.check(&Token::If) {
1348 self.advance()?;
1349 self.expect(Token::Not)?;
1350 self.expect(Token::Exists)?;
1351 Ok(true)
1352 } else {
1353 Ok(false)
1354 }
1355 }
1356
1357 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1359 if self.check(&Token::If) {
1360 self.advance()?;
1361 self.expect(Token::Exists)?;
1362 Ok(true)
1363 } else {
1364 Ok(false)
1365 }
1366 }
1367
1368 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1370 if self.check(&Token::Not) {
1371 self.advance()?; if self.check(&Token::Null) {
1375 self.advance()?; Ok(true)
1377 } else {
1378 Err(ParseError::expected(
1382 vec!["NULL (after NOT)"],
1383 self.peek(),
1384 self.position(),
1385 ))
1386 }
1387 } else {
1388 Ok(false)
1389 }
1390 }
1391
1392 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1394 if self.check(&Token::Primary) {
1395 self.advance()?;
1396 self.expect(Token::Key)?;
1397 Ok(true)
1398 } else {
1399 Ok(false)
1400 }
1401 }
1402}
1403
1404fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1409 if s.len() != 64 {
1410 return Err(format!("expected 64 hex chars, got {}", s.len()));
1411 }
1412 let mut out = [0u8; 32];
1413 let bytes = s.as_bytes();
1414 for i in 0..32 {
1415 let hi = hex_nibble(bytes[i * 2])?;
1416 let lo = hex_nibble(bytes[i * 2 + 1])?;
1417 out[i] = (hi << 4) | lo;
1418 }
1419 Ok(out)
1420}
1421
1422fn hex_nibble(c: u8) -> Result<u8, String> {
1423 match c {
1424 b'0'..=b'9' => Ok(c - b'0'),
1425 b'a'..=b'f' => Ok(c - b'a' + 10),
1426 b'A'..=b'F' => Ok(c - b'A' + 10),
1427 _ => Err(format!("non-hex char: {:?}", c as char)),
1428 }
1429}