1use super::super::ast::{
4 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7 PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 context_index_fields,
79 context_index_enabled,
80 timestamps,
81 partition_by: None,
82 tenant_by: None,
83 append_only: false,
84 subscriptions,
85 vault_own_master_key: false,
86 }))
87 }
88
89 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
91 self.expect(Token::Drop)?;
92 self.expect(Token::Table)?;
93 self.parse_drop_table_body()
94 }
95
96 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
98 let if_not_exists = self.match_if_not_exists()?;
99 let name = self.expect_ident()?;
100
101 self.expect(Token::LParen)?;
102 let mut columns = Vec::new();
103 loop {
104 let col = self.parse_column_def()?;
105 columns.push(col);
106 if !self.consume(&Token::Comma)? {
107 break;
108 }
109 }
110 self.expect(Token::RParen)?;
111
112 let mut default_ttl_ms = None;
113 let mut context_index_fields = Vec::new();
114 let mut context_index_enabled = false;
115 let mut timestamps = false;
116 let mut tenant_by: Option<String> = None;
117 let mut append_only = false;
118 let mut subscriptions = Vec::new();
119
120 while self.consume(&Token::With)? {
121 if self.consume_ident_ci("EVENTS")? {
122 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
123 continue;
124 }
125 let has_parens = self.consume(&Token::LParen)?;
132
133 loop {
134 if self.consume_ident_ci("CONTEXT_INDEX")? {
135 context_index_enabled = self.parse_bool_assign()?;
136 } else if self.consume_ident_ci("CONTEXT")? {
137 if !self.consume(&Token::Index)? {
138 return Err(ParseError::expected(
139 vec!["INDEX"],
140 self.peek(),
141 self.position(),
142 ));
143 }
144 self.expect(Token::On)?;
145 self.expect(Token::LParen)?;
146 loop {
147 context_index_fields.push(self.expect_ident()?);
148 if !self.consume(&Token::Comma)? {
149 break;
150 }
151 }
152 self.expect(Token::RParen)?;
153 context_index_enabled = true;
154 } else if self.consume_ident_ci("TIMESTAMPS")? {
155 timestamps = self.parse_bool_assign()?;
156 } else if self.consume_ident_ci("APPEND_ONLY")? {
157 append_only = self.parse_bool_assign()?;
158 } else if self.consume_ident_ci("TENANT_BY")? {
159 let _ = self.consume(&Token::Eq)?;
162 let value = self.parse_literal_value()?;
163 match value {
164 Value::Text(col) => tenant_by = Some(col.to_string()),
165 other => {
166 return Err(ParseError::new(
167 format!("WITH tenant_by expects a text literal, got {other:?}"),
168 self.position(),
169 ));
170 }
171 }
172 } else {
173 default_ttl_ms = self.parse_create_table_ttl_clause()?;
174 }
175 if has_parens {
176 if self.consume(&Token::Comma)? {
177 continue;
178 }
179 self.expect(Token::RParen)?;
180 }
181 break;
182 }
183 }
184
185 let partition_by = if self.consume(&Token::Partition)? {
187 self.expect(Token::By)?;
188 let kind = if self.consume(&Token::Range)? {
189 PartitionKind::Range
190 } else if self.consume(&Token::List)? {
191 PartitionKind::List
192 } else if self.consume(&Token::Hash)? {
193 PartitionKind::Hash
194 } else {
195 return Err(ParseError::expected(
196 vec!["RANGE", "LIST", "HASH"],
197 self.peek(),
198 self.position(),
199 ));
200 };
201 self.expect(Token::LParen)?;
202 let column = self.expect_ident()?;
203 self.expect(Token::RParen)?;
204 Some(PartitionSpec { kind, column })
205 } else {
206 None
207 };
208
209 if !append_only && self.consume_ident_ci("APPEND")? {
214 if !self.consume_ident_ci("ONLY")? {
215 return Err(ParseError::expected(
216 vec!["ONLY"],
217 self.peek(),
218 self.position(),
219 ));
220 }
221 append_only = true;
222 }
223
224 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
235 self.expect(Token::By)?;
236 self.expect(Token::LParen)?;
237 let mut path = self.expect_ident_or_keyword()?;
241 while self.consume(&Token::Dot)? {
242 let next = self.expect_ident_or_keyword()?;
243 path = format!("{path}.{next}");
244 }
245 self.expect(Token::RParen)?;
246 tenant_by = Some(path);
247 }
248
249 Ok(QueryExpr::CreateTable(CreateTableQuery {
250 collection_model: CollectionModel::Table,
251 name,
252 columns,
253 if_not_exists,
254 default_ttl_ms,
255 context_index_fields,
256 context_index_enabled,
257 timestamps,
258 partition_by,
259 tenant_by,
260 append_only,
261 subscriptions,
262 vault_own_master_key: false,
263 }))
264 }
265
266 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
272 self.expect(Token::Explain)?;
273 self.expect(Token::Alter)?;
274 self.expect(Token::For)?;
275 self.expect(Token::Create)?;
276 self.expect(Token::Table)?;
277
278 let body = self.parse_create_table_body()?;
279 let target = match body {
280 QueryExpr::CreateTable(t) => t,
281 _ => {
282 return Err(ParseError::new(
283 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
284 .to_string(),
285 self.position(),
286 ));
287 }
288 };
289
290 let format = if self.consume(&Token::Format)? {
291 if self.consume(&Token::Json)? {
292 ExplainFormat::Json
293 } else if self.consume_ident_ci("SQL")? {
294 ExplainFormat::Sql
295 } else {
296 return Err(ParseError::expected(
297 vec!["JSON", "SQL"],
298 self.peek(),
299 self.position(),
300 ));
301 }
302 } else {
303 ExplainFormat::Sql
304 };
305
306 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
307 target,
308 format,
309 }))
310 }
311
312 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
314 let if_exists = self.match_if_exists()?;
315 let name = self.parse_drop_collection_name()?;
316 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
317 }
318
319 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
320 let if_exists = self.match_if_exists()?;
321 let name = self.parse_drop_collection_name()?;
322 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
323 }
324
325 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
326 let if_exists = self.match_if_exists()?;
327 let name = self.parse_drop_collection_name()?;
328 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
329 }
330
331 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
332 let if_exists = self.match_if_exists()?;
333 let name = self.parse_drop_collection_name()?;
334 Ok(QueryExpr::DropDocument(DropDocumentQuery {
335 name,
336 if_exists,
337 }))
338 }
339
340 pub fn parse_create_keyed_body(
341 &mut self,
342 model: CollectionModel,
343 ) -> Result<QueryExpr, ParseError> {
344 let if_not_exists = self.match_if_not_exists()?;
345 let name = self.parse_drop_collection_name()?;
346 let vault_own_master_key =
347 if model == CollectionModel::Vault && self.consume(&Token::With)? {
348 if !self.consume_ident_ci("OWN")? {
349 return Err(ParseError::expected(
350 vec!["OWN"],
351 self.peek(),
352 self.position(),
353 ));
354 }
355 if !self.consume_ident_ci("MASTER")? {
356 return Err(ParseError::expected(
357 vec!["MASTER"],
358 self.peek(),
359 self.position(),
360 ));
361 }
362 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
363 return Err(ParseError::expected(
364 vec!["KEY"],
365 self.peek(),
366 self.position(),
367 ));
368 }
369 true
370 } else {
371 false
372 };
373 Ok(QueryExpr::CreateTable(CreateTableQuery {
374 collection_model: model,
375 name,
376 columns: Vec::new(),
377 if_not_exists,
378 default_ttl_ms: None,
379 context_index_fields: Vec::new(),
380 context_index_enabled: false,
381 timestamps: false,
382 partition_by: None,
383 tenant_by: None,
384 append_only: false,
385 subscriptions: Vec::new(),
386 vault_own_master_key,
387 }))
388 }
389
390 pub fn parse_create_collection_model_body(
391 &mut self,
392 model: CollectionModel,
393 ) -> Result<QueryExpr, ParseError> {
394 self.parse_create_keyed_body(model)
395 }
396
397 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
398 let if_not_exists = self.match_if_not_exists()?;
399 let name = self.parse_drop_collection_name()?;
400 if !self.consume_ident_ci("KIND")? {
401 return Err(ParseError::expected(
402 vec!["KIND"],
403 self.peek(),
404 self.position(),
405 ));
406 }
407 let kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
408 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
409 name,
410 kind,
411 if_not_exists,
412 }))
413 }
414
415 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
416 let if_not_exists = self.match_if_not_exists()?;
417 let name = self.parse_drop_collection_name()?;
418 if !self.consume_ident_ci("DIM")? {
419 return Err(ParseError::expected(
420 vec!["DIM"],
421 self.peek(),
422 self.position(),
423 ));
424 }
425 let dimension = self.parse_integer()?;
426 if dimension <= 0 {
427 return Err(ParseError::new(
428 "VECTOR DIM must be a positive integer".to_string(),
429 self.position(),
430 ));
431 }
432 let metric = if self.consume(&Token::Metric)? {
433 self.parse_distance_metric()?
434 } else {
435 crate::storage::engine::distance::DistanceMetric::Cosine
436 };
437 Ok(QueryExpr::CreateVector(CreateVectorQuery {
438 name,
439 dimension: dimension as usize,
440 metric,
441 if_not_exists,
442 }))
443 }
444
445 pub fn parse_drop_keyed_body(
446 &mut self,
447 model: CollectionModel,
448 ) -> Result<QueryExpr, ParseError> {
449 let if_exists = self.match_if_exists()?;
450 let name = self.parse_drop_collection_name()?;
451 Ok(QueryExpr::DropKv(DropKvQuery {
452 name,
453 if_exists,
454 model,
455 }))
456 }
457
458 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
459 self.parse_drop_keyed_body(CollectionModel::Kv)
460 }
461
462 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
463 let if_exists = self.match_if_exists()?;
464 let name = self.parse_drop_collection_name()?;
465 Ok(QueryExpr::DropCollection(DropCollectionQuery {
466 name,
467 if_exists,
468 }))
469 }
470
471 pub fn parse_truncate_body(
472 &mut self,
473 model: Option<CollectionModel>,
474 ) -> Result<QueryExpr, ParseError> {
475 let if_exists = self.match_if_exists()?;
476 let name = self.parse_drop_collection_name()?;
477 Ok(QueryExpr::Truncate(TruncateQuery {
478 name,
479 model,
480 if_exists,
481 }))
482 }
483
484 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
485 let mut name = self.expect_ident()?;
486 while self.consume(&Token::Dot)? {
487 if self.consume(&Token::Star)? {
488 name.push_str(".*");
489 break;
490 }
491 let next = self.expect_ident_or_keyword()?;
492 name = format!("{name}.{next}");
493 }
494 Ok(name)
495 }
496
497 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
499 self.expect(Token::Alter)?;
500 self.expect(Token::Table)?;
501 let name = self.expect_ident()?;
502
503 let mut operations = Vec::new();
504 loop {
505 let op = self.parse_alter_operation(&name)?;
506 operations.push(op);
507 if !self.consume(&Token::Comma)? {
508 break;
509 }
510 }
511
512 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
513 }
514
515 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
517 if self.consume(&Token::Add)? {
518 if self.consume_ident_ci("SUBSCRIPTION")? {
519 let sub_name = self.expect_ident()?;
521 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
522 Ok(AlterOperation::AddSubscription {
523 name: sub_name,
524 descriptor,
525 })
526 } else {
527 let _ = self.consume(&Token::Column)?;
529 let col_def = self.parse_column_def()?;
530 Ok(AlterOperation::AddColumn(col_def))
531 }
532 } else if self.consume(&Token::Drop)? {
533 if self.consume_ident_ci("SUBSCRIPTION")? {
534 let sub_name = self.expect_ident()?;
536 Ok(AlterOperation::DropSubscription { name: sub_name })
537 } else {
538 let _ = self.consume(&Token::Column)?;
540 let col_name = self.expect_ident()?;
541 Ok(AlterOperation::DropColumn(col_name))
542 }
543 } else if self.consume(&Token::Rename)? {
544 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
547 self.expect(Token::To)?;
548 let to = self.expect_ident()?;
549 Ok(AlterOperation::RenameColumn { from, to })
550 } else if self.consume(&Token::Attach)? {
551 self.expect(Token::Partition)?;
553 let child = self.expect_ident()?;
554 self.expect(Token::For)?;
555 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
559 return Err(ParseError::expected(
560 vec!["VALUES"],
561 self.peek(),
562 self.position(),
563 ));
564 }
565 let bound = self.collect_remaining_tokens_as_string()?;
566 Ok(AlterOperation::AttachPartition { child, bound })
567 } else if self.consume(&Token::Detach)? {
568 self.expect(Token::Partition)?;
570 let child = self.expect_ident()?;
571 Ok(AlterOperation::DetachPartition { child })
572 } else if self.consume(&Token::Enable)? {
573 if self.consume_ident_ci("EVENTS")? {
575 Ok(AlterOperation::EnableEvents(
576 self.parse_subscription_descriptor(table_name.to_string())?,
577 ))
578 } else if self.consume_ident_ci("TENANCY")? {
579 self.expect(Token::On)?;
580 self.expect(Token::LParen)?;
581 let mut path = self.expect_ident_or_keyword()?;
583 while self.consume(&Token::Dot)? {
584 let next = self.expect_ident_or_keyword()?;
585 path = format!("{path}.{next}");
586 }
587 self.expect(Token::RParen)?;
588 Ok(AlterOperation::EnableTenancy { column: path })
589 } else {
590 self.expect(Token::Row)?;
591 self.expect(Token::Level)?;
592 self.expect(Token::Security)?;
593 Ok(AlterOperation::EnableRowLevelSecurity)
594 }
595 } else if self.consume(&Token::Disable)? {
596 if self.consume_ident_ci("EVENTS")? {
598 Ok(AlterOperation::DisableEvents)
599 } else if self.consume_ident_ci("TENANCY")? {
600 Ok(AlterOperation::DisableTenancy)
601 } else {
602 self.expect(Token::Row)?;
603 self.expect(Token::Level)?;
604 self.expect(Token::Security)?;
605 Ok(AlterOperation::DisableRowLevelSecurity)
606 }
607 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
608 if self.consume_ident_ci("APPEND_ONLY")? {
610 let on = self.parse_bool_assign()?;
611 Ok(AlterOperation::SetAppendOnly(on))
612 } else if self.consume_ident_ci("VERSIONED")? {
613 let on = self.parse_bool_assign()?;
614 Ok(AlterOperation::SetVersioned(on))
615 } else {
616 Err(ParseError::expected(
617 vec!["APPEND_ONLY", "VERSIONED"],
618 self.peek(),
619 self.position(),
620 ))
621 }
622 } else {
623 Err(ParseError::expected(
624 vec![
625 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
626 ],
627 self.peek(),
628 self.position(),
629 ))
630 }
631 }
632
633 fn parse_subscription_descriptor(
634 &mut self,
635 source: String,
636 ) -> Result<SubscriptionDescriptor, ParseError> {
637 let mut ops_filter = Vec::new();
638 if self.consume(&Token::LParen)? {
639 loop {
640 let op = if self.consume(&Token::Insert)? {
641 SubscriptionOperation::Insert
642 } else if self.consume(&Token::Update)? {
643 SubscriptionOperation::Update
644 } else if self.consume(&Token::Delete)? {
645 SubscriptionOperation::Delete
646 } else {
647 return Err(ParseError::expected(
648 vec!["INSERT", "UPDATE", "DELETE"],
649 self.peek(),
650 self.position(),
651 ));
652 };
653 ops_filter.push(op);
654 if !self.consume(&Token::Comma)? {
655 break;
656 }
657 }
658 self.expect(Token::RParen)?;
659 }
660
661 let target_queue = if self.consume(&Token::To)? {
662 self.expect_ident()?
663 } else {
664 format!("{source}_events")
665 };
666
667 let mut redact_fields = Vec::new();
668 if self.consume_ident_ci("REDACT")? {
669 self.expect(Token::LParen)?;
670 loop {
671 redact_fields.push(self.parse_dotted_redact_path()?);
672 if !self.consume(&Token::Comma)? {
673 break;
674 }
675 }
676 self.expect(Token::RParen)?;
677 }
678
679 let where_filter = if self.consume(&Token::Where)? {
680 Some(self.collect_subscription_where_filter()?)
681 } else {
682 None
683 };
684
685 let all_tenants = if self.consume(&Token::On)? {
687 self.expect(Token::All)?;
688 if !self.consume_ident_ci("TENANTS")? {
689 return Err(ParseError::expected(
690 vec!["TENANTS"],
691 self.peek(),
692 self.position(),
693 ));
694 }
695 true
696 } else {
697 false
698 };
699
700 if self.consume_ident_ci("REQUIRES")? {
702 self.consume_ident_ci("CAPABILITY")?;
703 self.advance()?;
705 }
706
707 Ok(SubscriptionDescriptor {
708 name: String::new(),
709 source,
710 target_queue,
711 ops_filter,
712 where_filter,
713 redact_fields,
714 enabled: true,
715 all_tenants,
716 })
717 }
718
719 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
721 let mut parts = Vec::new();
722 if self.consume(&Token::Star)? {
723 parts.push("*".to_string());
724 } else {
725 parts.push(self.expect_ident_or_keyword()?);
726 }
727 while self.consume(&Token::Dot)? {
728 if self.consume(&Token::Star)? {
729 parts.push("*".to_string());
730 } else {
731 parts.push(self.expect_ident_or_keyword()?);
732 }
733 }
734 Ok(parts.join("."))
735 }
736
737 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
738 let mut parts = Vec::new();
739 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
740 parts.push(self.peek().to_string());
741 self.advance()?;
742 }
743 if parts.is_empty() {
744 return Err(ParseError::expected(
745 vec!["predicate"],
746 self.peek(),
747 self.position(),
748 ));
749 }
750 Ok(parts.join(" "))
751 }
752
753 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
758 let mut parts: Vec<String> = Vec::new();
759 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
760 parts.push(self.peek().to_string());
761 self.advance()?;
762 }
763 Ok(parts.join(" "))
764 }
765
766 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
768 let name = self.expect_column_ident()?;
769 let sql_type = self.parse_column_type()?;
770 let data_type = sql_type.to_string();
771
772 let mut def = CreateColumnDef {
773 name,
774 data_type,
775 sql_type: sql_type.clone(),
776 not_null: false,
777 default: None,
778 compress: None,
779 unique: false,
780 primary_key: false,
781 enum_variants: sql_type.enum_variants().unwrap_or_default(),
782 array_element: sql_type.array_element_type(),
783 decimal_precision: sql_type.decimal_precision(),
784 };
785
786 loop {
788 if self.match_not_null()? {
789 def.not_null = true;
790 } else if self.consume(&Token::Default)? {
791 self.expect(Token::Eq)?;
792 def.default = Some(self.parse_literal_string_for_ddl()?);
793 } else if self.consume(&Token::Compress)? {
794 self.expect(Token::Colon)?;
795 def.compress = Some(self.parse_integer()? as u8);
796 } else if self.consume(&Token::Unique)? {
797 def.unique = true;
798 } else if self.match_primary_key()? {
799 def.primary_key = true;
800 } else {
801 break;
802 }
803 }
804
805 Ok(def)
806 }
807
808 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
810 let type_name = self.expect_ident_or_keyword()?;
811 if self.consume(&Token::LParen)? {
812 let inner = self.parse_type_params()?;
813 self.expect(Token::RParen)?;
814 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
815 } else {
816 Ok(SqlTypeName::new(type_name))
817 }
818 }
819
820 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
822 let mut parts = Vec::new();
823 loop {
824 match self.peek().clone() {
825 Token::String(s) => {
826 let s = s.clone();
827 self.advance()?;
828 parts.push(TypeModifier::StringLiteral(s));
829 }
830 Token::Integer(n) => {
831 self.advance()?;
832 parts.push(TypeModifier::Number(n as u32));
833 }
834 _ => {
835 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
836 }
837 }
838 if !self.consume(&Token::Comma)? {
839 break;
840 }
841 }
842 Ok(parts)
843 }
844
845 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
847 match self.peek().clone() {
848 Token::String(s) => {
849 let s = s.clone();
850 self.advance()?;
851 Ok(s)
852 }
853 Token::Integer(n) => {
854 self.advance()?;
855 Ok(n.to_string())
856 }
857 Token::Float(n) => {
858 self.advance()?;
859 Ok(n.to_string())
860 }
861 Token::True => {
862 self.advance()?;
863 Ok("true".to_string())
864 }
865 Token::False => {
866 self.advance()?;
867 Ok("false".to_string())
868 }
869 Token::Null => {
870 self.advance()?;
871 Ok("null".to_string())
872 }
873 ref other => Err(ParseError::expected(
874 vec!["string", "number", "true", "false", "null"],
875 other,
876 self.position(),
877 )),
878 }
879 }
880
881 fn check_ttl_keyword(&self) -> bool {
882 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
883 }
884
885 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
888 self.expect(Token::Eq)?;
889 match self.peek() {
890 Token::True => {
891 self.advance()?;
892 Ok(true)
893 }
894 Token::False => {
895 self.advance()?;
896 Ok(false)
897 }
898 other => Err(ParseError::expected(
899 vec!["true", "false"],
900 other,
901 self.position(),
902 )),
903 }
904 }
905
906 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
907 if self.consume_ident_ci(expected)? {
908 Ok(())
909 } else {
910 Err(ParseError::expected(
911 vec![expected],
912 self.peek(),
913 self.position(),
914 ))
915 }
916 }
917
918 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
919 let option_name = self.expect_ident_or_keyword()?;
920 if !option_name.eq_ignore_ascii_case("ttl") {
921 return Err(ParseError::new(
922 format!("unsupported CREATE TABLE option {option_name:?}, expected TTL"),
926 self.position(),
927 ));
928 }
929
930 let ttl_value = self.parse_float()?;
931 let ttl_unit = match self.peek() {
932 Token::Ident(unit) => {
933 let unit = unit.clone();
934 self.advance()?;
935 unit
936 }
937 _ => "s".to_string(),
938 };
939
940 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
941 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
942 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
943 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
944 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
945 "d" | "day" | "days" => 86_400_000.0,
946 other => {
947 return Err(ParseError::new(
948 format!("unsupported TTL unit {other:?}"),
952 self.position(),
953 ));
954 }
955 };
956
957 if !ttl_value.is_finite() || ttl_value < 0.0 {
958 return Err(ParseError::new(
959 "TTL must be a finite, non-negative duration".to_string(),
960 self.position(),
961 ));
962 }
963
964 let ttl_ms = ttl_value * multiplier_ms;
965 if ttl_ms > u64::MAX as f64 {
966 return Err(ParseError::new(
967 "TTL duration is too large".to_string(),
968 self.position(),
969 ));
970 }
971 if ttl_ms.fract().abs() >= f64::EPSILON {
972 return Err(ParseError::new(
973 "TTL duration must resolve to a whole number of milliseconds".to_string(),
974 self.position(),
975 ));
976 }
977
978 Ok(Some(ttl_ms as u64))
979 }
980
981 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
983 if self.check(&Token::If) {
984 self.advance()?;
985 self.expect(Token::Not)?;
986 self.expect(Token::Exists)?;
987 Ok(true)
988 } else {
989 Ok(false)
990 }
991 }
992
993 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
995 if self.check(&Token::If) {
996 self.advance()?;
997 self.expect(Token::Exists)?;
998 Ok(true)
999 } else {
1000 Ok(false)
1001 }
1002 }
1003
1004 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1006 if self.check(&Token::Not) {
1007 self.advance()?; if self.check(&Token::Null) {
1011 self.advance()?; Ok(true)
1013 } else {
1014 Err(ParseError::expected(
1018 vec!["NULL (after NOT)"],
1019 self.peek(),
1020 self.position(),
1021 ))
1022 }
1023 } else {
1024 Ok(false)
1025 }
1026 }
1027
1028 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1030 if self.check(&Token::Primary) {
1031 self.advance()?;
1032 self.expect(Token::Key)?;
1033 Ok(true)
1034 } else {
1035 Ok(false)
1036 }
1037 }
1038}