1use super::super::ast::{
4 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7 PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 metrics_rollup_policies: Vec::new(),
79 context_index_fields,
80 context_index_enabled,
81 timestamps,
82 partition_by: None,
83 tenant_by: None,
84 append_only: false,
85 subscriptions,
86 vault_own_master_key: false,
87 }))
88 }
89
90 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
92 self.expect(Token::Drop)?;
93 self.expect(Token::Table)?;
94 self.parse_drop_table_body()
95 }
96
97 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
99 let if_not_exists = self.match_if_not_exists()?;
100 let name = self.expect_ident()?;
101
102 self.expect(Token::LParen)?;
103 let mut columns = Vec::new();
104 loop {
105 let col = self.parse_column_def()?;
106 columns.push(col);
107 if !self.consume(&Token::Comma)? {
108 break;
109 }
110 }
111 self.expect(Token::RParen)?;
112
113 let mut default_ttl_ms = None;
114 let mut context_index_fields = Vec::new();
115 let mut context_index_enabled = false;
116 let mut timestamps = false;
117 let mut tenant_by: Option<String> = None;
118 let mut append_only = false;
119 let mut subscriptions = Vec::new();
120
121 while self.consume(&Token::With)? {
122 if self.consume_ident_ci("EVENTS")? {
123 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
124 continue;
125 }
126 let has_parens = self.consume(&Token::LParen)?;
133
134 loop {
135 if self.consume_ident_ci("CONTEXT_INDEX")? {
136 context_index_enabled = self.parse_bool_assign()?;
137 } else if self.consume_ident_ci("CONTEXT")? {
138 if !self.consume(&Token::Index)? {
139 return Err(ParseError::expected(
140 vec!["INDEX"],
141 self.peek(),
142 self.position(),
143 ));
144 }
145 self.expect(Token::On)?;
146 self.expect(Token::LParen)?;
147 loop {
148 context_index_fields.push(self.expect_ident()?);
149 if !self.consume(&Token::Comma)? {
150 break;
151 }
152 }
153 self.expect(Token::RParen)?;
154 context_index_enabled = true;
155 } else if self.consume_ident_ci("TIMESTAMPS")? {
156 timestamps = self.parse_bool_assign()?;
157 } else if self.consume_ident_ci("APPEND_ONLY")? {
158 append_only = self.parse_bool_assign()?;
159 } else if self.consume_ident_ci("TENANT_BY")? {
160 let _ = self.consume(&Token::Eq)?;
163 let value = self.parse_literal_value()?;
164 match value {
165 Value::Text(col) => tenant_by = Some(col.to_string()),
166 other => {
167 return Err(ParseError::new(
168 format!("WITH tenant_by expects a text literal, got {other:?}"),
169 self.position(),
170 ));
171 }
172 }
173 } else {
174 default_ttl_ms = self.parse_create_table_ttl_clause()?;
175 }
176 if has_parens {
177 if self.consume(&Token::Comma)? {
178 continue;
179 }
180 self.expect(Token::RParen)?;
181 }
182 break;
183 }
184 }
185
186 let partition_by = if self.consume(&Token::Partition)? {
188 self.expect(Token::By)?;
189 let kind = if self.consume(&Token::Range)? {
190 PartitionKind::Range
191 } else if self.consume(&Token::List)? {
192 PartitionKind::List
193 } else if self.consume(&Token::Hash)? {
194 PartitionKind::Hash
195 } else {
196 return Err(ParseError::expected(
197 vec!["RANGE", "LIST", "HASH"],
198 self.peek(),
199 self.position(),
200 ));
201 };
202 self.expect(Token::LParen)?;
203 let column = self.expect_ident()?;
204 self.expect(Token::RParen)?;
205 Some(PartitionSpec { kind, column })
206 } else {
207 None
208 };
209
210 if !append_only && self.consume_ident_ci("APPEND")? {
215 if !self.consume_ident_ci("ONLY")? {
216 return Err(ParseError::expected(
217 vec!["ONLY"],
218 self.peek(),
219 self.position(),
220 ));
221 }
222 append_only = true;
223 }
224
225 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
236 self.expect(Token::By)?;
237 self.expect(Token::LParen)?;
238 let mut path = self.expect_ident_or_keyword()?;
242 while self.consume(&Token::Dot)? {
243 let next = self.expect_ident_or_keyword()?;
244 path = format!("{path}.{next}");
245 }
246 self.expect(Token::RParen)?;
247 tenant_by = Some(path);
248 }
249
250 Ok(QueryExpr::CreateTable(CreateTableQuery {
251 collection_model: CollectionModel::Table,
252 name,
253 columns,
254 if_not_exists,
255 default_ttl_ms,
256 metrics_rollup_policies: Vec::new(),
257 context_index_fields,
258 context_index_enabled,
259 timestamps,
260 partition_by,
261 tenant_by,
262 append_only,
263 subscriptions,
264 vault_own_master_key: false,
265 }))
266 }
267
268 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
274 self.expect(Token::Explain)?;
275 self.expect(Token::Alter)?;
276 self.expect(Token::For)?;
277 self.expect(Token::Create)?;
278 self.expect(Token::Table)?;
279
280 let body = self.parse_create_table_body()?;
281 let target = match body {
282 QueryExpr::CreateTable(t) => t,
283 _ => {
284 return Err(ParseError::new(
285 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
286 .to_string(),
287 self.position(),
288 ));
289 }
290 };
291
292 let format = if self.consume(&Token::Format)? {
293 if self.consume(&Token::Json)? {
294 ExplainFormat::Json
295 } else if self.consume_ident_ci("SQL")? {
296 ExplainFormat::Sql
297 } else {
298 return Err(ParseError::expected(
299 vec!["JSON", "SQL"],
300 self.peek(),
301 self.position(),
302 ));
303 }
304 } else {
305 ExplainFormat::Sql
306 };
307
308 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
309 target,
310 format,
311 }))
312 }
313
314 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
316 let if_exists = self.match_if_exists()?;
317 let name = self.parse_drop_collection_name()?;
318 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
319 }
320
321 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
322 let if_exists = self.match_if_exists()?;
323 let name = self.parse_drop_collection_name()?;
324 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
325 }
326
327 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
328 let if_exists = self.match_if_exists()?;
329 let name = self.parse_drop_collection_name()?;
330 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
331 }
332
333 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
334 let if_exists = self.match_if_exists()?;
335 let name = self.parse_drop_collection_name()?;
336 Ok(QueryExpr::DropDocument(DropDocumentQuery {
337 name,
338 if_exists,
339 }))
340 }
341
342 pub fn parse_create_keyed_body(
343 &mut self,
344 model: CollectionModel,
345 ) -> Result<QueryExpr, ParseError> {
346 let if_not_exists = self.match_if_not_exists()?;
347 let name = self.parse_drop_collection_name()?;
348 let vault_own_master_key =
349 if model == CollectionModel::Vault && self.consume(&Token::With)? {
350 if !self.consume_ident_ci("OWN")? {
351 return Err(ParseError::expected(
352 vec!["OWN"],
353 self.peek(),
354 self.position(),
355 ));
356 }
357 if !self.consume_ident_ci("MASTER")? {
358 return Err(ParseError::expected(
359 vec!["MASTER"],
360 self.peek(),
361 self.position(),
362 ));
363 }
364 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
365 return Err(ParseError::expected(
366 vec!["KEY"],
367 self.peek(),
368 self.position(),
369 ));
370 }
371 true
372 } else {
373 false
374 };
375 Ok(QueryExpr::CreateTable(CreateTableQuery {
376 collection_model: model,
377 name,
378 columns: Vec::new(),
379 if_not_exists,
380 default_ttl_ms: None,
381 metrics_rollup_policies: Vec::new(),
382 context_index_fields: Vec::new(),
383 context_index_enabled: false,
384 timestamps: false,
385 partition_by: None,
386 tenant_by: None,
387 append_only: false,
388 subscriptions: Vec::new(),
389 vault_own_master_key,
390 }))
391 }
392
393 pub fn parse_create_collection_model_body(
394 &mut self,
395 model: CollectionModel,
396 ) -> Result<QueryExpr, ParseError> {
397 self.parse_create_keyed_body(model)
398 }
399
400 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
401 let if_not_exists = self.match_if_not_exists()?;
402 let name = self.parse_drop_collection_name()?;
403 if !self.consume_ident_ci("KIND")? {
404 return Err(ParseError::expected(
405 vec!["KIND"],
406 self.peek(),
407 self.position(),
408 ));
409 }
410 let kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
411 let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
412 self.parse_signed_by_list()?
413 } else {
414 Vec::new()
415 };
416 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
417 name,
418 kind,
419 if_not_exists,
420 allowed_signers,
421 }))
422 }
423
424 fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
428 let hex = match self.peek().clone() {
429 Token::String(s) => {
430 self.advance()?;
431 s
432 }
433 _ => {
434 return Err(ParseError::expected(
435 vec!["string literal (ed25519 pubkey hex)"],
436 self.peek(),
437 self.position(),
438 ));
439 }
440 };
441 decode_hex_32(&hex).map_err(|msg| {
442 ParseError::new(
443 format!("SIGNER pubkey '{hex}' invalid: {msg}"),
444 self.position(),
445 )
446 })
447 }
448
449 fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
454 self.expect(Token::LParen)?;
455 let mut out = Vec::new();
456 loop {
457 let hex = match self.peek().clone() {
458 Token::String(s) => {
459 self.advance()?;
460 s
461 }
462 _ => {
463 return Err(ParseError::expected(
464 vec!["string literal (ed25519 pubkey hex)"],
465 self.peek(),
466 self.position(),
467 ));
468 }
469 };
470 let bytes = decode_hex_32(&hex).map_err(|msg| {
471 ParseError::new(
472 format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
473 self.position(),
474 )
475 })?;
476 out.push(bytes);
477 if !self.consume(&Token::Comma)? {
478 break;
479 }
480 }
481 self.expect(Token::RParen)?;
482 if out.is_empty() {
483 return Err(ParseError::new(
484 "SIGNED_BY list must contain at least one pubkey".to_string(),
485 self.position(),
486 ));
487 }
488 Ok(out)
489 }
490
491 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
492 let if_not_exists = self.match_if_not_exists()?;
493 let name = self.parse_drop_collection_name()?;
494 if !self.consume_ident_ci("DIM")? {
495 return Err(ParseError::expected(
496 vec!["DIM"],
497 self.peek(),
498 self.position(),
499 ));
500 }
501 let dimension = self.parse_integer()?;
502 if dimension <= 0 {
503 return Err(ParseError::new(
504 "VECTOR DIM must be a positive integer".to_string(),
505 self.position(),
506 ));
507 }
508 let metric = if self.consume(&Token::Metric)? {
509 self.parse_distance_metric()?
510 } else {
511 crate::storage::engine::distance::DistanceMetric::Cosine
512 };
513 Ok(QueryExpr::CreateVector(CreateVectorQuery {
514 name,
515 dimension: dimension as usize,
516 metric,
517 if_not_exists,
518 }))
519 }
520
521 pub fn parse_drop_keyed_body(
522 &mut self,
523 model: CollectionModel,
524 ) -> Result<QueryExpr, ParseError> {
525 let if_exists = self.match_if_exists()?;
526 let name = self.parse_drop_collection_name()?;
527 Ok(QueryExpr::DropKv(DropKvQuery {
528 name,
529 if_exists,
530 model,
531 }))
532 }
533
534 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
535 self.parse_drop_keyed_body(CollectionModel::Kv)
536 }
537
538 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
539 self.parse_drop_collection_model_body(None)
540 }
541
542 pub fn parse_drop_collection_model_body(
543 &mut self,
544 model: Option<CollectionModel>,
545 ) -> Result<QueryExpr, ParseError> {
546 let if_exists = self.match_if_exists()?;
547 let name = self.parse_drop_collection_name()?;
548 Ok(QueryExpr::DropCollection(DropCollectionQuery {
549 name,
550 if_exists,
551 model,
552 }))
553 }
554
555 pub fn parse_truncate_body(
556 &mut self,
557 model: Option<CollectionModel>,
558 ) -> Result<QueryExpr, ParseError> {
559 let if_exists = self.match_if_exists()?;
560 let name = self.parse_drop_collection_name()?;
561 Ok(QueryExpr::Truncate(TruncateQuery {
562 name,
563 model,
564 if_exists,
565 }))
566 }
567
568 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
569 let mut name = self.expect_ident()?;
570 while self.consume(&Token::Dot)? {
571 if self.consume(&Token::Star)? {
572 name.push_str(".*");
573 break;
574 }
575 let next = self.expect_ident_or_keyword()?;
576 name = format!("{name}.{next}");
577 }
578 Ok(name)
579 }
580
581 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
588 self.expect(Token::Alter)?;
589 if !self.consume(&Token::Table)?
590 && !self.consume(&Token::Collection)?
591 && !self.consume_ident_ci("COLLECTION")?
592 {
593 return Err(ParseError::expected(
594 vec!["TABLE", "COLLECTION"],
595 self.peek(),
596 self.position(),
597 ));
598 }
599 let name = self.expect_ident()?;
600
601 let mut operations = Vec::new();
602 loop {
603 let op = self.parse_alter_operation(&name)?;
604 operations.push(op);
605 if !self.consume(&Token::Comma)? {
606 break;
607 }
608 }
609
610 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
611 }
612
613 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
615 if self.consume(&Token::Add)? {
616 if self.consume_ident_ci("SUBSCRIPTION")? {
617 let sub_name = self.expect_ident()?;
619 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
620 Ok(AlterOperation::AddSubscription {
621 name: sub_name,
622 descriptor,
623 })
624 } else if self.consume_ident_ci("SIGNER")? {
625 let pubkey = self.parse_single_signer_hex()?;
627 Ok(AlterOperation::AddSigner { pubkey })
628 } else {
629 let _ = self.consume(&Token::Column)?;
631 let col_def = self.parse_column_def()?;
632 Ok(AlterOperation::AddColumn(col_def))
633 }
634 } else if self.consume_ident_ci("REVOKE")? {
635 if !self.consume_ident_ci("SIGNER")? {
637 return Err(ParseError::expected(
638 vec!["SIGNER"],
639 self.peek(),
640 self.position(),
641 ));
642 }
643 let pubkey = self.parse_single_signer_hex()?;
644 Ok(AlterOperation::RevokeSigner { pubkey })
645 } else if self.consume(&Token::Drop)? {
646 if self.consume_ident_ci("SUBSCRIPTION")? {
647 let sub_name = self.expect_ident()?;
649 Ok(AlterOperation::DropSubscription { name: sub_name })
650 } else {
651 let _ = self.consume(&Token::Column)?;
653 let col_name = self.expect_ident()?;
654 Ok(AlterOperation::DropColumn(col_name))
655 }
656 } else if self.consume(&Token::Rename)? {
657 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
660 self.expect(Token::To)?;
661 let to = self.expect_ident()?;
662 Ok(AlterOperation::RenameColumn { from, to })
663 } else if self.consume(&Token::Attach)? {
664 self.expect(Token::Partition)?;
666 let child = self.expect_ident()?;
667 self.expect(Token::For)?;
668 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
672 return Err(ParseError::expected(
673 vec!["VALUES"],
674 self.peek(),
675 self.position(),
676 ));
677 }
678 let bound = self.collect_remaining_tokens_as_string()?;
679 Ok(AlterOperation::AttachPartition { child, bound })
680 } else if self.consume(&Token::Detach)? {
681 self.expect(Token::Partition)?;
683 let child = self.expect_ident()?;
684 Ok(AlterOperation::DetachPartition { child })
685 } else if self.consume(&Token::Enable)? {
686 if self.consume_ident_ci("EVENTS")? {
688 Ok(AlterOperation::EnableEvents(
689 self.parse_subscription_descriptor(table_name.to_string())?,
690 ))
691 } else if self.consume_ident_ci("TENANCY")? {
692 self.expect(Token::On)?;
693 self.expect(Token::LParen)?;
694 let mut path = self.expect_ident_or_keyword()?;
696 while self.consume(&Token::Dot)? {
697 let next = self.expect_ident_or_keyword()?;
698 path = format!("{path}.{next}");
699 }
700 self.expect(Token::RParen)?;
701 Ok(AlterOperation::EnableTenancy { column: path })
702 } else {
703 self.expect(Token::Row)?;
704 self.expect(Token::Level)?;
705 self.expect(Token::Security)?;
706 Ok(AlterOperation::EnableRowLevelSecurity)
707 }
708 } else if self.consume(&Token::Disable)? {
709 if self.consume_ident_ci("EVENTS")? {
711 Ok(AlterOperation::DisableEvents)
712 } else if self.consume_ident_ci("TENANCY")? {
713 Ok(AlterOperation::DisableTenancy)
714 } else {
715 self.expect(Token::Row)?;
716 self.expect(Token::Level)?;
717 self.expect(Token::Security)?;
718 Ok(AlterOperation::DisableRowLevelSecurity)
719 }
720 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
721 if self.consume_ident_ci("APPEND_ONLY")? {
724 let on = self.parse_bool_assign()?;
725 Ok(AlterOperation::SetAppendOnly(on))
726 } else if self.consume_ident_ci("VERSIONED")? {
727 let on = self.parse_bool_assign()?;
728 Ok(AlterOperation::SetVersioned(on))
729 } else if self.consume(&Token::Retention)? {
730 let value = self.parse_float()?;
734 let unit = self.parse_duration_unit()?;
735 Ok(AlterOperation::SetRetention {
736 duration_ms: (value * unit) as u64,
737 })
738 } else {
739 Err(ParseError::expected(
740 vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
741 self.peek(),
742 self.position(),
743 ))
744 }
745 } else if self.consume_ident_ci("UNSET")? {
746 if self.consume(&Token::Retention)? {
748 Ok(AlterOperation::UnsetRetention)
749 } else {
750 Err(ParseError::expected(
751 vec!["RETENTION"],
752 self.peek(),
753 self.position(),
754 ))
755 }
756 } else {
757 Err(ParseError::expected(
758 vec![
759 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
760 "UNSET",
761 ],
762 self.peek(),
763 self.position(),
764 ))
765 }
766 }
767
768 fn parse_subscription_descriptor(
769 &mut self,
770 source: String,
771 ) -> Result<SubscriptionDescriptor, ParseError> {
772 let mut ops_filter = Vec::new();
773 if self.consume(&Token::LParen)? {
774 loop {
775 let op = if self.consume(&Token::Insert)? {
776 SubscriptionOperation::Insert
777 } else if self.consume(&Token::Update)? {
778 SubscriptionOperation::Update
779 } else if self.consume(&Token::Delete)? {
780 SubscriptionOperation::Delete
781 } else {
782 return Err(ParseError::expected(
783 vec!["INSERT", "UPDATE", "DELETE"],
784 self.peek(),
785 self.position(),
786 ));
787 };
788 ops_filter.push(op);
789 if !self.consume(&Token::Comma)? {
790 break;
791 }
792 }
793 self.expect(Token::RParen)?;
794 }
795
796 let target_queue = if self.consume(&Token::To)? {
797 self.expect_ident()?
798 } else {
799 format!("{source}_events")
800 };
801
802 let mut redact_fields = Vec::new();
803 if self.consume_ident_ci("REDACT")? {
804 self.expect(Token::LParen)?;
805 loop {
806 redact_fields.push(self.parse_dotted_redact_path()?);
807 if !self.consume(&Token::Comma)? {
808 break;
809 }
810 }
811 self.expect(Token::RParen)?;
812 }
813
814 let where_filter = if self.consume(&Token::Where)? {
815 Some(self.collect_subscription_where_filter()?)
816 } else {
817 None
818 };
819
820 let all_tenants = if self.consume(&Token::On)? {
822 self.expect(Token::All)?;
823 if !self.consume_ident_ci("TENANTS")? {
824 return Err(ParseError::expected(
825 vec!["TENANTS"],
826 self.peek(),
827 self.position(),
828 ));
829 }
830 true
831 } else {
832 false
833 };
834
835 if self.consume_ident_ci("REQUIRES")? {
837 self.consume_ident_ci("CAPABILITY")?;
838 self.advance()?;
840 }
841
842 Ok(SubscriptionDescriptor {
843 name: String::new(),
844 source,
845 target_queue,
846 ops_filter,
847 where_filter,
848 redact_fields,
849 enabled: true,
850 all_tenants,
851 })
852 }
853
854 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
856 let mut parts = Vec::new();
857 if self.consume(&Token::Star)? {
858 parts.push("*".to_string());
859 } else {
860 parts.push(self.expect_ident_or_keyword()?);
861 }
862 while self.consume(&Token::Dot)? {
863 if self.consume(&Token::Star)? {
864 parts.push("*".to_string());
865 } else {
866 parts.push(self.expect_ident_or_keyword()?);
867 }
868 }
869 Ok(parts.join("."))
870 }
871
872 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
873 let mut parts = Vec::new();
874 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
875 parts.push(self.peek().to_string());
876 self.advance()?;
877 }
878 if parts.is_empty() {
879 return Err(ParseError::expected(
880 vec!["predicate"],
881 self.peek(),
882 self.position(),
883 ));
884 }
885 Ok(parts.join(" "))
886 }
887
888 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
893 let mut parts: Vec<String> = Vec::new();
894 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
895 parts.push(self.peek().to_string());
896 self.advance()?;
897 }
898 Ok(parts.join(" "))
899 }
900
901 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
903 let name = self.expect_column_ident()?;
904 let sql_type = self.parse_column_type()?;
905 let data_type = sql_type.to_string();
906
907 let mut def = CreateColumnDef {
908 name,
909 data_type,
910 sql_type: sql_type.clone(),
911 not_null: false,
912 default: None,
913 compress: None,
914 unique: false,
915 primary_key: false,
916 enum_variants: sql_type.enum_variants().unwrap_or_default(),
917 array_element: sql_type.array_element_type(),
918 decimal_precision: sql_type.decimal_precision(),
919 };
920
921 loop {
923 if self.match_not_null()? {
924 def.not_null = true;
925 } else if self.consume(&Token::Default)? {
926 self.expect(Token::Eq)?;
927 def.default = Some(self.parse_literal_string_for_ddl()?);
928 } else if self.consume(&Token::Compress)? {
929 self.expect(Token::Colon)?;
930 def.compress = Some(self.parse_integer()? as u8);
931 } else if self.consume(&Token::Unique)? {
932 def.unique = true;
933 } else if self.match_primary_key()? {
934 def.primary_key = true;
935 } else {
936 break;
937 }
938 }
939
940 Ok(def)
941 }
942
943 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
945 let type_name = self.expect_ident_or_keyword()?;
946 if self.consume(&Token::LParen)? {
947 let inner = self.parse_type_params()?;
948 self.expect(Token::RParen)?;
949 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
950 } else {
951 Ok(SqlTypeName::new(type_name))
952 }
953 }
954
955 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
957 let mut parts = Vec::new();
958 loop {
959 match self.peek().clone() {
960 Token::String(s) => {
961 let s = s.clone();
962 self.advance()?;
963 parts.push(TypeModifier::StringLiteral(s));
964 }
965 Token::Integer(n) => {
966 self.advance()?;
967 parts.push(TypeModifier::Number(n as u32));
968 }
969 _ => {
970 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
971 }
972 }
973 if !self.consume(&Token::Comma)? {
974 break;
975 }
976 }
977 Ok(parts)
978 }
979
980 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
982 match self.peek().clone() {
983 Token::String(s) => {
984 let s = s.clone();
985 self.advance()?;
986 Ok(s)
987 }
988 Token::Integer(n) => {
989 self.advance()?;
990 Ok(n.to_string())
991 }
992 Token::Float(n) => {
993 self.advance()?;
994 Ok(n.to_string())
995 }
996 Token::True => {
997 self.advance()?;
998 Ok("true".to_string())
999 }
1000 Token::False => {
1001 self.advance()?;
1002 Ok("false".to_string())
1003 }
1004 Token::Null => {
1005 self.advance()?;
1006 Ok("null".to_string())
1007 }
1008 ref other => Err(ParseError::expected(
1009 vec!["string", "number", "true", "false", "null"],
1010 other,
1011 self.position(),
1012 )),
1013 }
1014 }
1015
1016 fn check_ttl_keyword(&self) -> bool {
1017 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1018 }
1019
1020 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1023 self.expect(Token::Eq)?;
1024 match self.peek() {
1025 Token::True => {
1026 self.advance()?;
1027 Ok(true)
1028 }
1029 Token::False => {
1030 self.advance()?;
1031 Ok(false)
1032 }
1033 other => Err(ParseError::expected(
1034 vec!["true", "false"],
1035 other,
1036 self.position(),
1037 )),
1038 }
1039 }
1040
1041 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1042 if self.consume_ident_ci(expected)? {
1043 Ok(())
1044 } else {
1045 Err(ParseError::expected(
1046 vec![expected],
1047 self.peek(),
1048 self.position(),
1049 ))
1050 }
1051 }
1052
1053 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1054 let option_name = self.expect_ident_or_keyword()?;
1055 if !option_name.eq_ignore_ascii_case("ttl") {
1056 return Err(ParseError::new(
1057 format!(
1061 "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1062 ),
1063 self.position(),
1064 ));
1065 }
1066
1067 let ttl_value = self.parse_float()?;
1068 let ttl_unit = match self.peek() {
1069 Token::Ident(unit) => {
1070 let unit = unit.clone();
1071 self.advance()?;
1072 unit
1073 }
1074 _ => "s".to_string(),
1075 };
1076
1077 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1078 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1079 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1080 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1081 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1082 "d" | "day" | "days" => 86_400_000.0,
1083 other => {
1084 return Err(ParseError::new(
1085 format!(
1089 "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1090 ),
1091 self.position(),
1092 ));
1093 }
1094 };
1095
1096 if !ttl_value.is_finite() || ttl_value < 0.0 {
1097 return Err(ParseError::new(
1098 "TTL must be a finite, non-negative duration".to_string(),
1099 self.position(),
1100 ));
1101 }
1102
1103 let ttl_ms = ttl_value * multiplier_ms;
1104 if ttl_ms > u64::MAX as f64 {
1105 return Err(ParseError::new(
1106 "TTL duration is too large".to_string(),
1107 self.position(),
1108 ));
1109 }
1110 if ttl_ms.fract().abs() >= f64::EPSILON {
1111 return Err(ParseError::new(
1112 "TTL duration must resolve to a whole number of milliseconds".to_string(),
1113 self.position(),
1114 ));
1115 }
1116
1117 Ok(Some(ttl_ms as u64))
1118 }
1119
1120 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1122 if self.check(&Token::If) {
1123 self.advance()?;
1124 self.expect(Token::Not)?;
1125 self.expect(Token::Exists)?;
1126 Ok(true)
1127 } else {
1128 Ok(false)
1129 }
1130 }
1131
1132 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1134 if self.check(&Token::If) {
1135 self.advance()?;
1136 self.expect(Token::Exists)?;
1137 Ok(true)
1138 } else {
1139 Ok(false)
1140 }
1141 }
1142
1143 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1145 if self.check(&Token::Not) {
1146 self.advance()?; if self.check(&Token::Null) {
1150 self.advance()?; Ok(true)
1152 } else {
1153 Err(ParseError::expected(
1157 vec!["NULL (after NOT)"],
1158 self.peek(),
1159 self.position(),
1160 ))
1161 }
1162 } else {
1163 Ok(false)
1164 }
1165 }
1166
1167 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1169 if self.check(&Token::Primary) {
1170 self.advance()?;
1171 self.expect(Token::Key)?;
1172 Ok(true)
1173 } else {
1174 Ok(false)
1175 }
1176 }
1177}
1178
1179fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1184 if s.len() != 64 {
1185 return Err(format!("expected 64 hex chars, got {}", s.len()));
1186 }
1187 let mut out = [0u8; 32];
1188 let bytes = s.as_bytes();
1189 for i in 0..32 {
1190 let hi = hex_nibble(bytes[i * 2])?;
1191 let lo = hex_nibble(bytes[i * 2 + 1])?;
1192 out[i] = (hi << 4) | lo;
1193 }
1194 Ok(out)
1195}
1196
1197fn hex_nibble(c: u8) -> Result<u8, String> {
1198 match c {
1199 b'0'..=b'9' => Ok(c - b'0'),
1200 b'a'..=b'f' => Ok(c - b'a' + 10),
1201 b'A'..=b'F' => Ok(c - b'A' + 10),
1202 _ => Err(format!("non-hex char: {:?}", c as char)),
1203 }
1204}