1use super::super::ast::{
4 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7 PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 metrics_rollup_policies: Vec::new(),
79 context_index_fields,
80 context_index_enabled,
81 timestamps,
82 partition_by: None,
83 tenant_by: None,
84 append_only: false,
85 subscriptions,
86 vault_own_master_key: false,
87 }))
88 }
89
90 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
92 self.expect(Token::Drop)?;
93 self.expect(Token::Table)?;
94 self.parse_drop_table_body()
95 }
96
97 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
99 let if_not_exists = self.match_if_not_exists()?;
100 let name = self.expect_ident()?;
101
102 self.expect(Token::LParen)?;
103 let mut columns = Vec::new();
104 loop {
105 let col = self.parse_column_def()?;
106 columns.push(col);
107 if !self.consume(&Token::Comma)? {
108 break;
109 }
110 }
111 self.expect(Token::RParen)?;
112
113 let mut default_ttl_ms = None;
114 let mut context_index_fields = Vec::new();
115 let mut context_index_enabled = false;
116 let mut timestamps = false;
117 let mut tenant_by: Option<String> = None;
118 let mut append_only = false;
119 let mut subscriptions = Vec::new();
120
121 while self.consume(&Token::With)? {
122 if self.consume_ident_ci("EVENTS")? {
123 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
124 continue;
125 }
126 let has_parens = self.consume(&Token::LParen)?;
133
134 loop {
135 if self.consume_ident_ci("CONTEXT_INDEX")? {
136 context_index_enabled = self.parse_bool_assign()?;
137 } else if self.consume_ident_ci("CONTEXT")? {
138 if !self.consume(&Token::Index)? {
139 return Err(ParseError::expected(
140 vec!["INDEX"],
141 self.peek(),
142 self.position(),
143 ));
144 }
145 self.expect(Token::On)?;
146 self.expect(Token::LParen)?;
147 loop {
148 context_index_fields.push(self.expect_ident()?);
149 if !self.consume(&Token::Comma)? {
150 break;
151 }
152 }
153 self.expect(Token::RParen)?;
154 context_index_enabled = true;
155 } else if self.consume_ident_ci("TIMESTAMPS")? {
156 timestamps = self.parse_bool_assign()?;
157 } else if self.consume_ident_ci("APPEND_ONLY")? {
158 append_only = self.parse_bool_assign()?;
159 } else if self.consume_ident_ci("TENANT_BY")? {
160 let _ = self.consume(&Token::Eq)?;
163 let value = self.parse_literal_value()?;
164 match value {
165 Value::Text(col) => tenant_by = Some(col.to_string()),
166 other => {
167 return Err(ParseError::new(
168 format!("WITH tenant_by expects a text literal, got {other:?}"),
169 self.position(),
170 ));
171 }
172 }
173 } else {
174 default_ttl_ms = self.parse_create_table_ttl_clause()?;
175 }
176 if has_parens {
177 if self.consume(&Token::Comma)? {
178 continue;
179 }
180 self.expect(Token::RParen)?;
181 }
182 break;
183 }
184 }
185
186 let partition_by = if self.consume(&Token::Partition)? {
188 self.expect(Token::By)?;
189 let kind = if self.consume(&Token::Range)? {
190 PartitionKind::Range
191 } else if self.consume(&Token::List)? {
192 PartitionKind::List
193 } else if self.consume(&Token::Hash)? {
194 PartitionKind::Hash
195 } else {
196 return Err(ParseError::expected(
197 vec!["RANGE", "LIST", "HASH"],
198 self.peek(),
199 self.position(),
200 ));
201 };
202 self.expect(Token::LParen)?;
203 let column = self.expect_ident()?;
204 self.expect(Token::RParen)?;
205 Some(PartitionSpec { kind, column })
206 } else {
207 None
208 };
209
210 if !append_only && self.consume_ident_ci("APPEND")? {
215 if !self.consume_ident_ci("ONLY")? {
216 return Err(ParseError::expected(
217 vec!["ONLY"],
218 self.peek(),
219 self.position(),
220 ));
221 }
222 append_only = true;
223 }
224
225 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
236 self.expect(Token::By)?;
237 self.expect(Token::LParen)?;
238 let mut path = self.expect_ident_or_keyword()?;
242 while self.consume(&Token::Dot)? {
243 let next = self.expect_ident_or_keyword()?;
244 path = format!("{path}.{next}");
245 }
246 self.expect(Token::RParen)?;
247 tenant_by = Some(path);
248 }
249
250 Ok(QueryExpr::CreateTable(CreateTableQuery {
251 collection_model: CollectionModel::Table,
252 name,
253 columns,
254 if_not_exists,
255 default_ttl_ms,
256 metrics_rollup_policies: Vec::new(),
257 context_index_fields,
258 context_index_enabled,
259 timestamps,
260 partition_by,
261 tenant_by,
262 append_only,
263 subscriptions,
264 vault_own_master_key: false,
265 }))
266 }
267
268 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
274 self.expect(Token::Explain)?;
275 self.expect(Token::Alter)?;
276 self.expect(Token::For)?;
277 self.expect(Token::Create)?;
278 self.expect(Token::Table)?;
279
280 let body = self.parse_create_table_body()?;
281 let target = match body {
282 QueryExpr::CreateTable(t) => t,
283 _ => {
284 return Err(ParseError::new(
285 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
286 .to_string(),
287 self.position(),
288 ));
289 }
290 };
291
292 let format = if self.consume(&Token::Format)? {
293 if self.consume(&Token::Json)? {
294 ExplainFormat::Json
295 } else if self.consume_ident_ci("SQL")? {
296 ExplainFormat::Sql
297 } else {
298 return Err(ParseError::expected(
299 vec!["JSON", "SQL"],
300 self.peek(),
301 self.position(),
302 ));
303 }
304 } else {
305 ExplainFormat::Sql
306 };
307
308 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
309 target,
310 format,
311 }))
312 }
313
314 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
316 let if_exists = self.match_if_exists()?;
317 let name = self.parse_drop_collection_name()?;
318 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
319 }
320
321 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
322 let if_exists = self.match_if_exists()?;
323 let name = self.parse_drop_collection_name()?;
324 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
325 }
326
327 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
328 let if_exists = self.match_if_exists()?;
329 let name = self.parse_drop_collection_name()?;
330 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
331 }
332
333 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
334 let if_exists = self.match_if_exists()?;
335 let name = self.parse_drop_collection_name()?;
336 Ok(QueryExpr::DropDocument(DropDocumentQuery {
337 name,
338 if_exists,
339 }))
340 }
341
342 pub fn parse_create_keyed_body(
343 &mut self,
344 model: CollectionModel,
345 ) -> Result<QueryExpr, ParseError> {
346 let if_not_exists = self.match_if_not_exists()?;
347 let name = self.parse_drop_collection_name()?;
348 let vault_own_master_key =
349 if model == CollectionModel::Vault && self.consume(&Token::With)? {
350 if !self.consume_ident_ci("OWN")? {
351 return Err(ParseError::expected(
352 vec!["OWN"],
353 self.peek(),
354 self.position(),
355 ));
356 }
357 if !self.consume_ident_ci("MASTER")? {
358 return Err(ParseError::expected(
359 vec!["MASTER"],
360 self.peek(),
361 self.position(),
362 ));
363 }
364 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
365 return Err(ParseError::expected(
366 vec!["KEY"],
367 self.peek(),
368 self.position(),
369 ));
370 }
371 true
372 } else {
373 false
374 };
375 Ok(QueryExpr::CreateTable(CreateTableQuery {
376 collection_model: model,
377 name,
378 columns: Vec::new(),
379 if_not_exists,
380 default_ttl_ms: None,
381 metrics_rollup_policies: Vec::new(),
382 context_index_fields: Vec::new(),
383 context_index_enabled: false,
384 timestamps: false,
385 partition_by: None,
386 tenant_by: None,
387 append_only: false,
388 subscriptions: Vec::new(),
389 vault_own_master_key,
390 }))
391 }
392
393 pub fn parse_create_collection_model_body(
394 &mut self,
395 model: CollectionModel,
396 ) -> Result<QueryExpr, ParseError> {
397 self.parse_create_keyed_body(model)
398 }
399
400 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
401 let if_not_exists = self.match_if_not_exists()?;
402 let name = self.parse_drop_collection_name()?;
403 if !self.consume_ident_ci("KIND")? {
404 return Err(ParseError::expected(
405 vec!["KIND"],
406 self.peek(),
407 self.position(),
408 ));
409 }
410 let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
411 while self.consume(&Token::Dot)? {
412 let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
413 kind.push('.');
414 kind.push_str(&part);
415 }
416 let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
417 if !self.consume_ident_ci("DIM")? {
418 return Err(ParseError::expected(
419 vec!["DIM"],
420 self.peek(),
421 self.position(),
422 ));
423 }
424 let dimension = self.parse_integer()?;
425 if dimension <= 0 {
426 return Err(ParseError::new(
427 "VECTOR DIM must be a positive integer".to_string(),
428 self.position(),
429 ));
430 }
431 let metric = if self.consume(&Token::Metric)? {
432 self.parse_distance_metric()?
433 } else {
434 crate::storage::engine::distance::DistanceMetric::Cosine
435 };
436 (Some(dimension as usize), Some(metric))
437 } else {
438 (None, None)
439 };
440 let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
441 self.parse_signed_by_list()?
442 } else {
443 Vec::new()
444 };
445 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
446 name,
447 kind,
448 if_not_exists,
449 vector_dimension,
450 vector_metric,
451 allowed_signers,
452 }))
453 }
454
455 fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
459 let hex = match self.peek().clone() {
460 Token::String(s) => {
461 self.advance()?;
462 s
463 }
464 _ => {
465 return Err(ParseError::expected(
466 vec!["string literal (ed25519 pubkey hex)"],
467 self.peek(),
468 self.position(),
469 ));
470 }
471 };
472 decode_hex_32(&hex).map_err(|msg| {
473 ParseError::new(
474 format!("SIGNER pubkey '{hex}' invalid: {msg}"),
475 self.position(),
476 )
477 })
478 }
479
480 fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
485 self.expect(Token::LParen)?;
486 let mut out = Vec::new();
487 loop {
488 let hex = match self.peek().clone() {
489 Token::String(s) => {
490 self.advance()?;
491 s
492 }
493 _ => {
494 return Err(ParseError::expected(
495 vec!["string literal (ed25519 pubkey hex)"],
496 self.peek(),
497 self.position(),
498 ));
499 }
500 };
501 let bytes = decode_hex_32(&hex).map_err(|msg| {
502 ParseError::new(
503 format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
504 self.position(),
505 )
506 })?;
507 out.push(bytes);
508 if !self.consume(&Token::Comma)? {
509 break;
510 }
511 }
512 self.expect(Token::RParen)?;
513 if out.is_empty() {
514 return Err(ParseError::new(
515 "SIGNED_BY list must contain at least one pubkey".to_string(),
516 self.position(),
517 ));
518 }
519 Ok(out)
520 }
521
522 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
523 let if_not_exists = self.match_if_not_exists()?;
524 let name = self.parse_drop_collection_name()?;
525 if !self.consume_ident_ci("DIM")? {
526 return Err(ParseError::expected(
527 vec!["DIM"],
528 self.peek(),
529 self.position(),
530 ));
531 }
532 let dimension = self.parse_integer()?;
533 if dimension <= 0 {
534 return Err(ParseError::new(
535 "VECTOR DIM must be a positive integer".to_string(),
536 self.position(),
537 ));
538 }
539 let metric = if self.consume(&Token::Metric)? {
540 self.parse_distance_metric()?
541 } else {
542 crate::storage::engine::distance::DistanceMetric::Cosine
543 };
544 Ok(QueryExpr::CreateVector(CreateVectorQuery {
545 name,
546 dimension: dimension as usize,
547 metric,
548 if_not_exists,
549 }))
550 }
551
552 pub fn parse_drop_keyed_body(
553 &mut self,
554 model: CollectionModel,
555 ) -> Result<QueryExpr, ParseError> {
556 let if_exists = self.match_if_exists()?;
557 let name = self.parse_drop_collection_name()?;
558 Ok(QueryExpr::DropKv(DropKvQuery {
559 name,
560 if_exists,
561 model,
562 }))
563 }
564
565 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
566 self.parse_drop_keyed_body(CollectionModel::Kv)
567 }
568
569 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
570 self.parse_drop_collection_model_body(None)
571 }
572
573 pub fn parse_drop_collection_model_body(
574 &mut self,
575 model: Option<CollectionModel>,
576 ) -> Result<QueryExpr, ParseError> {
577 let if_exists = self.match_if_exists()?;
578 let name = self.parse_drop_collection_name()?;
579 Ok(QueryExpr::DropCollection(DropCollectionQuery {
580 name,
581 if_exists,
582 model,
583 }))
584 }
585
586 pub fn parse_truncate_body(
587 &mut self,
588 model: Option<CollectionModel>,
589 ) -> Result<QueryExpr, ParseError> {
590 let if_exists = self.match_if_exists()?;
591 let name = self.parse_drop_collection_name()?;
592 Ok(QueryExpr::Truncate(TruncateQuery {
593 name,
594 model,
595 if_exists,
596 }))
597 }
598
599 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
600 let mut name = self.expect_ident()?;
601 while self.consume(&Token::Dot)? {
602 if self.consume(&Token::Star)? {
603 name.push_str(".*");
604 break;
605 }
606 let next = self.expect_ident_or_keyword()?;
607 name = format!("{name}.{next}");
608 }
609 Ok(name)
610 }
611
612 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
619 self.expect(Token::Alter)?;
620 if !self.consume(&Token::Table)?
621 && !self.consume(&Token::Collection)?
622 && !self.consume_ident_ci("COLLECTION")?
623 {
624 return Err(ParseError::expected(
625 vec!["TABLE", "COLLECTION"],
626 self.peek(),
627 self.position(),
628 ));
629 }
630 let name = self.expect_ident()?;
631
632 let mut operations = Vec::new();
633 loop {
634 let op = self.parse_alter_operation(&name)?;
635 operations.push(op);
636 if !self.consume(&Token::Comma)? {
637 break;
638 }
639 }
640
641 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
642 }
643
644 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
646 if self.consume(&Token::Add)? {
647 if self.consume_ident_ci("SUBSCRIPTION")? {
648 let sub_name = self.expect_ident()?;
650 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
651 Ok(AlterOperation::AddSubscription {
652 name: sub_name,
653 descriptor,
654 })
655 } else if self.consume_ident_ci("SIGNER")? {
656 let pubkey = self.parse_single_signer_hex()?;
658 Ok(AlterOperation::AddSigner { pubkey })
659 } else {
660 let _ = self.consume(&Token::Column)?;
662 let col_def = self.parse_column_def()?;
663 Ok(AlterOperation::AddColumn(col_def))
664 }
665 } else if self.consume_ident_ci("REVOKE")? {
666 if !self.consume_ident_ci("SIGNER")? {
668 return Err(ParseError::expected(
669 vec!["SIGNER"],
670 self.peek(),
671 self.position(),
672 ));
673 }
674 let pubkey = self.parse_single_signer_hex()?;
675 Ok(AlterOperation::RevokeSigner { pubkey })
676 } else if self.consume(&Token::Drop)? {
677 if self.consume_ident_ci("SUBSCRIPTION")? {
678 let sub_name = self.expect_ident()?;
680 Ok(AlterOperation::DropSubscription { name: sub_name })
681 } else {
682 let _ = self.consume(&Token::Column)?;
684 let col_name = self.expect_ident()?;
685 Ok(AlterOperation::DropColumn(col_name))
686 }
687 } else if self.consume(&Token::Rename)? {
688 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
691 self.expect(Token::To)?;
692 let to = self.expect_ident()?;
693 Ok(AlterOperation::RenameColumn { from, to })
694 } else if self.consume(&Token::Attach)? {
695 self.expect(Token::Partition)?;
697 let child = self.expect_ident()?;
698 self.expect(Token::For)?;
699 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
703 return Err(ParseError::expected(
704 vec!["VALUES"],
705 self.peek(),
706 self.position(),
707 ));
708 }
709 let bound = self.collect_remaining_tokens_as_string()?;
710 Ok(AlterOperation::AttachPartition { child, bound })
711 } else if self.consume(&Token::Detach)? {
712 self.expect(Token::Partition)?;
714 let child = self.expect_ident()?;
715 Ok(AlterOperation::DetachPartition { child })
716 } else if self.consume(&Token::Enable)? {
717 if self.consume_ident_ci("EVENTS")? {
719 Ok(AlterOperation::EnableEvents(
720 self.parse_subscription_descriptor(table_name.to_string())?,
721 ))
722 } else if self.consume_ident_ci("TENANCY")? {
723 self.expect(Token::On)?;
724 self.expect(Token::LParen)?;
725 let mut path = self.expect_ident_or_keyword()?;
727 while self.consume(&Token::Dot)? {
728 let next = self.expect_ident_or_keyword()?;
729 path = format!("{path}.{next}");
730 }
731 self.expect(Token::RParen)?;
732 Ok(AlterOperation::EnableTenancy { column: path })
733 } else {
734 self.expect(Token::Row)?;
735 self.expect(Token::Level)?;
736 self.expect(Token::Security)?;
737 Ok(AlterOperation::EnableRowLevelSecurity)
738 }
739 } else if self.consume(&Token::Disable)? {
740 if self.consume_ident_ci("EVENTS")? {
742 Ok(AlterOperation::DisableEvents)
743 } else if self.consume_ident_ci("TENANCY")? {
744 Ok(AlterOperation::DisableTenancy)
745 } else {
746 self.expect(Token::Row)?;
747 self.expect(Token::Level)?;
748 self.expect(Token::Security)?;
749 Ok(AlterOperation::DisableRowLevelSecurity)
750 }
751 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
752 if self.consume_ident_ci("APPEND_ONLY")? {
755 let on = self.parse_bool_assign()?;
756 Ok(AlterOperation::SetAppendOnly(on))
757 } else if self.consume_ident_ci("VERSIONED")? {
758 let on = self.parse_bool_assign()?;
759 Ok(AlterOperation::SetVersioned(on))
760 } else if self.consume(&Token::Retention)? {
761 let value = self.parse_float()?;
765 let unit = self.parse_duration_unit()?;
766 Ok(AlterOperation::SetRetention {
767 duration_ms: (value * unit) as u64,
768 })
769 } else {
770 Err(ParseError::expected(
771 vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
772 self.peek(),
773 self.position(),
774 ))
775 }
776 } else if self.consume_ident_ci("UNSET")? {
777 if self.consume(&Token::Retention)? {
779 Ok(AlterOperation::UnsetRetention)
780 } else {
781 Err(ParseError::expected(
782 vec!["RETENTION"],
783 self.peek(),
784 self.position(),
785 ))
786 }
787 } else {
788 Err(ParseError::expected(
789 vec![
790 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
791 "UNSET",
792 ],
793 self.peek(),
794 self.position(),
795 ))
796 }
797 }
798
799 fn parse_subscription_descriptor(
800 &mut self,
801 source: String,
802 ) -> Result<SubscriptionDescriptor, ParseError> {
803 let mut ops_filter = Vec::new();
804 if self.consume(&Token::LParen)? {
805 loop {
806 let op = if self.consume(&Token::Insert)? {
807 SubscriptionOperation::Insert
808 } else if self.consume(&Token::Update)? {
809 SubscriptionOperation::Update
810 } else if self.consume(&Token::Delete)? {
811 SubscriptionOperation::Delete
812 } else {
813 return Err(ParseError::expected(
814 vec!["INSERT", "UPDATE", "DELETE"],
815 self.peek(),
816 self.position(),
817 ));
818 };
819 ops_filter.push(op);
820 if !self.consume(&Token::Comma)? {
821 break;
822 }
823 }
824 self.expect(Token::RParen)?;
825 }
826
827 let target_queue = if self.consume(&Token::To)? {
828 self.expect_ident()?
829 } else {
830 format!("{source}_events")
831 };
832
833 let mut redact_fields = Vec::new();
834 if self.consume_ident_ci("REDACT")? {
835 self.expect(Token::LParen)?;
836 loop {
837 redact_fields.push(self.parse_dotted_redact_path()?);
838 if !self.consume(&Token::Comma)? {
839 break;
840 }
841 }
842 self.expect(Token::RParen)?;
843 }
844
845 let where_filter = if self.consume(&Token::Where)? {
846 Some(self.collect_subscription_where_filter()?)
847 } else {
848 None
849 };
850
851 let all_tenants = if self.consume(&Token::On)? {
853 self.expect(Token::All)?;
854 if !self.consume_ident_ci("TENANTS")? {
855 return Err(ParseError::expected(
856 vec!["TENANTS"],
857 self.peek(),
858 self.position(),
859 ));
860 }
861 true
862 } else {
863 false
864 };
865
866 if self.consume_ident_ci("REQUIRES")? {
868 self.consume_ident_ci("CAPABILITY")?;
869 self.advance()?;
871 }
872
873 Ok(SubscriptionDescriptor {
874 name: String::new(),
875 source,
876 target_queue,
877 ops_filter,
878 where_filter,
879 redact_fields,
880 enabled: true,
881 all_tenants,
882 })
883 }
884
885 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
887 let mut parts = Vec::new();
888 if self.consume(&Token::Star)? {
889 parts.push("*".to_string());
890 } else {
891 parts.push(self.expect_ident_or_keyword()?);
892 }
893 while self.consume(&Token::Dot)? {
894 if self.consume(&Token::Star)? {
895 parts.push("*".to_string());
896 } else {
897 parts.push(self.expect_ident_or_keyword()?);
898 }
899 }
900 Ok(parts.join("."))
901 }
902
903 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
904 let mut parts = Vec::new();
905 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
906 parts.push(self.peek().to_string());
907 self.advance()?;
908 }
909 if parts.is_empty() {
910 return Err(ParseError::expected(
911 vec!["predicate"],
912 self.peek(),
913 self.position(),
914 ));
915 }
916 Ok(parts.join(" "))
917 }
918
919 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
924 let mut parts: Vec<String> = Vec::new();
925 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
926 parts.push(self.peek().to_string());
927 self.advance()?;
928 }
929 Ok(parts.join(" "))
930 }
931
932 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
934 let name = self.expect_column_ident()?;
935 let sql_type = self.parse_column_type()?;
936 let data_type = sql_type.to_string();
937
938 let mut def = CreateColumnDef {
939 name,
940 data_type,
941 sql_type: sql_type.clone(),
942 not_null: false,
943 default: None,
944 compress: None,
945 unique: false,
946 primary_key: false,
947 enum_variants: sql_type.enum_variants().unwrap_or_default(),
948 array_element: sql_type.array_element_type(),
949 decimal_precision: sql_type.decimal_precision(),
950 };
951
952 loop {
954 if self.match_not_null()? {
955 def.not_null = true;
956 } else if self.consume(&Token::Default)? {
957 self.expect(Token::Eq)?;
958 def.default = Some(self.parse_literal_string_for_ddl()?);
959 } else if self.consume(&Token::Compress)? {
960 self.expect(Token::Colon)?;
961 def.compress = Some(self.parse_integer()? as u8);
962 } else if self.consume(&Token::Unique)? {
963 def.unique = true;
964 } else if self.match_primary_key()? {
965 def.primary_key = true;
966 } else {
967 break;
968 }
969 }
970
971 Ok(def)
972 }
973
974 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
976 let type_name = self.expect_ident_or_keyword()?;
977 if self.consume(&Token::LParen)? {
978 let inner = self.parse_type_params()?;
979 self.expect(Token::RParen)?;
980 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
981 } else {
982 Ok(SqlTypeName::new(type_name))
983 }
984 }
985
986 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
988 let mut parts = Vec::new();
989 loop {
990 match self.peek().clone() {
991 Token::String(s) => {
992 let s = s.clone();
993 self.advance()?;
994 parts.push(TypeModifier::StringLiteral(s));
995 }
996 Token::Integer(n) => {
997 self.advance()?;
998 parts.push(TypeModifier::Number(n as u32));
999 }
1000 _ => {
1001 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1002 }
1003 }
1004 if !self.consume(&Token::Comma)? {
1005 break;
1006 }
1007 }
1008 Ok(parts)
1009 }
1010
1011 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1013 match self.peek().clone() {
1014 Token::String(s) => {
1015 let s = s.clone();
1016 self.advance()?;
1017 Ok(s)
1018 }
1019 Token::Integer(n) => {
1020 self.advance()?;
1021 Ok(n.to_string())
1022 }
1023 Token::Float(n) => {
1024 self.advance()?;
1025 Ok(n.to_string())
1026 }
1027 Token::True => {
1028 self.advance()?;
1029 Ok("true".to_string())
1030 }
1031 Token::False => {
1032 self.advance()?;
1033 Ok("false".to_string())
1034 }
1035 Token::Null => {
1036 self.advance()?;
1037 Ok("null".to_string())
1038 }
1039 ref other => Err(ParseError::expected(
1040 vec!["string", "number", "true", "false", "null"],
1041 other,
1042 self.position(),
1043 )),
1044 }
1045 }
1046
1047 fn check_ttl_keyword(&self) -> bool {
1048 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1049 }
1050
1051 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1054 self.expect(Token::Eq)?;
1055 match self.peek() {
1056 Token::True => {
1057 self.advance()?;
1058 Ok(true)
1059 }
1060 Token::False => {
1061 self.advance()?;
1062 Ok(false)
1063 }
1064 other => Err(ParseError::expected(
1065 vec!["true", "false"],
1066 other,
1067 self.position(),
1068 )),
1069 }
1070 }
1071
1072 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1073 if self.consume_ident_ci(expected)? {
1074 Ok(())
1075 } else {
1076 Err(ParseError::expected(
1077 vec![expected],
1078 self.peek(),
1079 self.position(),
1080 ))
1081 }
1082 }
1083
1084 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1085 let option_name = self.expect_ident_or_keyword()?;
1086 if !option_name.eq_ignore_ascii_case("ttl") {
1087 return Err(ParseError::new(
1088 format!(
1092 "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1093 ),
1094 self.position(),
1095 ));
1096 }
1097
1098 let ttl_value = self.parse_float()?;
1099 let ttl_unit = match self.peek() {
1100 Token::Ident(unit) => {
1101 let unit = unit.clone();
1102 self.advance()?;
1103 unit
1104 }
1105 _ => "s".to_string(),
1106 };
1107
1108 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1109 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1110 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1111 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1112 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1113 "d" | "day" | "days" => 86_400_000.0,
1114 other => {
1115 return Err(ParseError::new(
1116 format!(
1120 "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1121 ),
1122 self.position(),
1123 ));
1124 }
1125 };
1126
1127 if !ttl_value.is_finite() || ttl_value < 0.0 {
1128 return Err(ParseError::new(
1129 "TTL must be a finite, non-negative duration".to_string(),
1130 self.position(),
1131 ));
1132 }
1133
1134 let ttl_ms = ttl_value * multiplier_ms;
1135 if ttl_ms > u64::MAX as f64 {
1136 return Err(ParseError::new(
1137 "TTL duration is too large".to_string(),
1138 self.position(),
1139 ));
1140 }
1141 if ttl_ms.fract().abs() >= f64::EPSILON {
1142 return Err(ParseError::new(
1143 "TTL duration must resolve to a whole number of milliseconds".to_string(),
1144 self.position(),
1145 ));
1146 }
1147
1148 Ok(Some(ttl_ms as u64))
1149 }
1150
1151 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1153 if self.check(&Token::If) {
1154 self.advance()?;
1155 self.expect(Token::Not)?;
1156 self.expect(Token::Exists)?;
1157 Ok(true)
1158 } else {
1159 Ok(false)
1160 }
1161 }
1162
1163 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1165 if self.check(&Token::If) {
1166 self.advance()?;
1167 self.expect(Token::Exists)?;
1168 Ok(true)
1169 } else {
1170 Ok(false)
1171 }
1172 }
1173
1174 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1176 if self.check(&Token::Not) {
1177 self.advance()?; if self.check(&Token::Null) {
1181 self.advance()?; Ok(true)
1183 } else {
1184 Err(ParseError::expected(
1188 vec!["NULL (after NOT)"],
1189 self.peek(),
1190 self.position(),
1191 ))
1192 }
1193 } else {
1194 Ok(false)
1195 }
1196 }
1197
1198 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1200 if self.check(&Token::Primary) {
1201 self.advance()?;
1202 self.expect(Token::Key)?;
1203 Ok(true)
1204 } else {
1205 Ok(false)
1206 }
1207 }
1208}
1209
1210fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1215 if s.len() != 64 {
1216 return Err(format!("expected 64 hex chars, got {}", s.len()));
1217 }
1218 let mut out = [0u8; 32];
1219 let bytes = s.as_bytes();
1220 for i in 0..32 {
1221 let hi = hex_nibble(bytes[i * 2])?;
1222 let lo = hex_nibble(bytes[i * 2 + 1])?;
1223 out[i] = (hi << 4) | lo;
1224 }
1225 Ok(out)
1226}
1227
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value 0..=15.
/// Any other byte yields an error message naming the character.
fn hex_nibble(c: u8) -> Result<u8, String> {
    let ch = c as char;
    ch.to_digit(16)
        .map(|digit| digit as u8)
        .ok_or_else(|| format!("non-hex char: {:?}", ch))
}