1use super::super::ast::{
4 AlterOperation, AlterTableQuery, CreateColumnDef, CreateTableQuery, DropCollectionQuery,
5 DropDocumentQuery, DropGraphQuery, DropKvQuery, DropTableQuery, DropVectorQuery,
6 ExplainAlterQuery, ExplainFormat, PartitionKind, PartitionSpec, QueryExpr, TruncateQuery,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
12use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
13
14impl<'a> Parser<'a> {
15 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
17 self.expect(Token::Create)?;
18 self.expect(Token::Table)?;
19
20 let if_not_exists = self.match_if_not_exists()?;
21 let name = self.expect_ident()?;
22
23 self.expect(Token::LParen)?;
24 let mut columns = Vec::new();
25 loop {
26 let col = self.parse_column_def()?;
27 columns.push(col);
28 if !self.consume(&Token::Comma)? {
29 break;
30 }
31 }
32 self.expect(Token::RParen)?;
33
34 let mut default_ttl_ms = None;
35 let mut context_index_fields = Vec::new();
36 let mut context_index_enabled = false;
37 let mut timestamps = false;
38 let mut subscriptions = Vec::new();
39
40 while self.consume(&Token::With)? {
41 if self.consume_ident_ci("EVENTS")? {
42 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
43 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
44 context_index_enabled = self.parse_bool_assign()?;
45 } else if self.consume_ident_ci("CONTEXT")? {
46 if !self.consume(&Token::Index)? {
48 return Err(ParseError::expected(
49 vec!["INDEX"],
50 self.peek(),
51 self.position(),
52 ));
53 }
54 self.expect(Token::On)?;
55 self.expect(Token::LParen)?;
56 loop {
57 context_index_fields.push(self.expect_ident()?);
58 if !self.consume(&Token::Comma)? {
59 break;
60 }
61 }
62 self.expect(Token::RParen)?;
63 context_index_enabled = true;
64 } else if self.consume_ident_ci("TIMESTAMPS")? {
65 timestamps = self.parse_bool_assign()?;
66 } else {
67 default_ttl_ms = self.parse_create_table_ttl_clause()?;
68 }
69 }
70
71 Ok(QueryExpr::CreateTable(CreateTableQuery {
72 collection_model: CollectionModel::Table,
73 name,
74 columns,
75 if_not_exists,
76 default_ttl_ms,
77 context_index_fields,
78 context_index_enabled,
79 timestamps,
80 partition_by: None,
81 tenant_by: None,
82 append_only: false,
83 subscriptions,
84 vault_own_master_key: false,
85 }))
86 }
87
88 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
90 self.expect(Token::Drop)?;
91 self.expect(Token::Table)?;
92 self.parse_drop_table_body()
93 }
94
95 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
97 let if_not_exists = self.match_if_not_exists()?;
98 let name = self.expect_ident()?;
99
100 self.expect(Token::LParen)?;
101 let mut columns = Vec::new();
102 loop {
103 let col = self.parse_column_def()?;
104 columns.push(col);
105 if !self.consume(&Token::Comma)? {
106 break;
107 }
108 }
109 self.expect(Token::RParen)?;
110
111 let mut default_ttl_ms = None;
112 let mut context_index_fields = Vec::new();
113 let mut context_index_enabled = false;
114 let mut timestamps = false;
115 let mut tenant_by: Option<String> = None;
116 let mut append_only = false;
117 let mut subscriptions = Vec::new();
118
119 while self.consume(&Token::With)? {
120 if self.consume_ident_ci("EVENTS")? {
121 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
122 continue;
123 }
124 let has_parens = self.consume(&Token::LParen)?;
131
132 loop {
133 if self.consume_ident_ci("CONTEXT_INDEX")? {
134 context_index_enabled = self.parse_bool_assign()?;
135 } else if self.consume_ident_ci("CONTEXT")? {
136 if !self.consume(&Token::Index)? {
137 return Err(ParseError::expected(
138 vec!["INDEX"],
139 self.peek(),
140 self.position(),
141 ));
142 }
143 self.expect(Token::On)?;
144 self.expect(Token::LParen)?;
145 loop {
146 context_index_fields.push(self.expect_ident()?);
147 if !self.consume(&Token::Comma)? {
148 break;
149 }
150 }
151 self.expect(Token::RParen)?;
152 context_index_enabled = true;
153 } else if self.consume_ident_ci("TIMESTAMPS")? {
154 timestamps = self.parse_bool_assign()?;
155 } else if self.consume_ident_ci("APPEND_ONLY")? {
156 append_only = self.parse_bool_assign()?;
157 } else if self.consume_ident_ci("TENANT_BY")? {
158 let _ = self.consume(&Token::Eq)?;
161 let value = self.parse_literal_value()?;
162 match value {
163 Value::Text(col) => tenant_by = Some(col.to_string()),
164 other => {
165 return Err(ParseError::new(
166 format!("WITH tenant_by expects a text literal, got {other:?}"),
167 self.position(),
168 ));
169 }
170 }
171 } else {
172 default_ttl_ms = self.parse_create_table_ttl_clause()?;
173 }
174 if has_parens {
175 if self.consume(&Token::Comma)? {
176 continue;
177 }
178 self.expect(Token::RParen)?;
179 }
180 break;
181 }
182 }
183
184 let partition_by = if self.consume(&Token::Partition)? {
186 self.expect(Token::By)?;
187 let kind = if self.consume(&Token::Range)? {
188 PartitionKind::Range
189 } else if self.consume(&Token::List)? {
190 PartitionKind::List
191 } else if self.consume(&Token::Hash)? {
192 PartitionKind::Hash
193 } else {
194 return Err(ParseError::expected(
195 vec!["RANGE", "LIST", "HASH"],
196 self.peek(),
197 self.position(),
198 ));
199 };
200 self.expect(Token::LParen)?;
201 let column = self.expect_ident()?;
202 self.expect(Token::RParen)?;
203 Some(PartitionSpec { kind, column })
204 } else {
205 None
206 };
207
208 if !append_only && self.consume_ident_ci("APPEND")? {
213 if !self.consume_ident_ci("ONLY")? {
214 return Err(ParseError::expected(
215 vec!["ONLY"],
216 self.peek(),
217 self.position(),
218 ));
219 }
220 append_only = true;
221 }
222
223 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
234 self.expect(Token::By)?;
235 self.expect(Token::LParen)?;
236 let mut path = self.expect_ident_or_keyword()?;
240 while self.consume(&Token::Dot)? {
241 let next = self.expect_ident_or_keyword()?;
242 path = format!("{path}.{next}");
243 }
244 self.expect(Token::RParen)?;
245 tenant_by = Some(path);
246 }
247
248 Ok(QueryExpr::CreateTable(CreateTableQuery {
249 collection_model: CollectionModel::Table,
250 name,
251 columns,
252 if_not_exists,
253 default_ttl_ms,
254 context_index_fields,
255 context_index_enabled,
256 timestamps,
257 partition_by,
258 tenant_by,
259 append_only,
260 subscriptions,
261 vault_own_master_key: false,
262 }))
263 }
264
265 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
271 self.expect(Token::Explain)?;
272 self.expect(Token::Alter)?;
273 self.expect(Token::For)?;
274 self.expect(Token::Create)?;
275 self.expect(Token::Table)?;
276
277 let body = self.parse_create_table_body()?;
278 let target = match body {
279 QueryExpr::CreateTable(t) => t,
280 _ => {
281 return Err(ParseError::new(
282 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
283 .to_string(),
284 self.position(),
285 ));
286 }
287 };
288
289 let format = if self.consume(&Token::Format)? {
290 if self.consume(&Token::Json)? {
291 ExplainFormat::Json
292 } else if self.consume_ident_ci("SQL")? {
293 ExplainFormat::Sql
294 } else {
295 return Err(ParseError::expected(
296 vec!["JSON", "SQL"],
297 self.peek(),
298 self.position(),
299 ));
300 }
301 } else {
302 ExplainFormat::Sql
303 };
304
305 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
306 target,
307 format,
308 }))
309 }
310
311 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
313 let if_exists = self.match_if_exists()?;
314 let name = self.parse_drop_collection_name()?;
315 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
316 }
317
318 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
319 let if_exists = self.match_if_exists()?;
320 let name = self.parse_drop_collection_name()?;
321 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
322 }
323
324 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
325 let if_exists = self.match_if_exists()?;
326 let name = self.parse_drop_collection_name()?;
327 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
328 }
329
330 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
331 let if_exists = self.match_if_exists()?;
332 let name = self.parse_drop_collection_name()?;
333 Ok(QueryExpr::DropDocument(DropDocumentQuery {
334 name,
335 if_exists,
336 }))
337 }
338
339 pub fn parse_create_keyed_body(
340 &mut self,
341 model: CollectionModel,
342 ) -> Result<QueryExpr, ParseError> {
343 let if_not_exists = self.match_if_not_exists()?;
344 let name = self.parse_drop_collection_name()?;
345 let vault_own_master_key =
346 if model == CollectionModel::Vault && self.consume(&Token::With)? {
347 if !self.consume_ident_ci("OWN")? {
348 return Err(ParseError::expected(
349 vec!["OWN"],
350 self.peek(),
351 self.position(),
352 ));
353 }
354 if !self.consume_ident_ci("MASTER")? {
355 return Err(ParseError::expected(
356 vec!["MASTER"],
357 self.peek(),
358 self.position(),
359 ));
360 }
361 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
362 return Err(ParseError::expected(
363 vec!["KEY"],
364 self.peek(),
365 self.position(),
366 ));
367 }
368 true
369 } else {
370 false
371 };
372 Ok(QueryExpr::CreateTable(CreateTableQuery {
373 collection_model: model,
374 name,
375 columns: Vec::new(),
376 if_not_exists,
377 default_ttl_ms: None,
378 context_index_fields: Vec::new(),
379 context_index_enabled: false,
380 timestamps: false,
381 partition_by: None,
382 tenant_by: None,
383 append_only: false,
384 subscriptions: Vec::new(),
385 vault_own_master_key,
386 }))
387 }
388
389 pub fn parse_drop_keyed_body(
390 &mut self,
391 model: CollectionModel,
392 ) -> Result<QueryExpr, ParseError> {
393 let if_exists = self.match_if_exists()?;
394 let name = self.parse_drop_collection_name()?;
395 Ok(QueryExpr::DropKv(DropKvQuery {
396 name,
397 if_exists,
398 model,
399 }))
400 }
401
402 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
403 self.parse_drop_keyed_body(CollectionModel::Kv)
404 }
405
406 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
407 let if_exists = self.match_if_exists()?;
408 let name = self.parse_drop_collection_name()?;
409 Ok(QueryExpr::DropCollection(DropCollectionQuery {
410 name,
411 if_exists,
412 }))
413 }
414
415 pub fn parse_truncate_body(
416 &mut self,
417 model: Option<CollectionModel>,
418 ) -> Result<QueryExpr, ParseError> {
419 let if_exists = self.match_if_exists()?;
420 let name = self.parse_drop_collection_name()?;
421 Ok(QueryExpr::Truncate(TruncateQuery {
422 name,
423 model,
424 if_exists,
425 }))
426 }
427
428 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
429 let mut name = self.expect_ident()?;
430 while self.consume(&Token::Dot)? {
431 if self.consume(&Token::Star)? {
432 name.push_str(".*");
433 break;
434 }
435 let next = self.expect_ident_or_keyword()?;
436 name = format!("{name}.{next}");
437 }
438 Ok(name)
439 }
440
441 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
443 self.expect(Token::Alter)?;
444 self.expect(Token::Table)?;
445 let name = self.expect_ident()?;
446
447 let mut operations = Vec::new();
448 loop {
449 let op = self.parse_alter_operation(&name)?;
450 operations.push(op);
451 if !self.consume(&Token::Comma)? {
452 break;
453 }
454 }
455
456 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
457 }
458
459 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
461 if self.consume(&Token::Add)? {
462 if self.consume_ident_ci("SUBSCRIPTION")? {
463 let sub_name = self.expect_ident()?;
465 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
466 Ok(AlterOperation::AddSubscription {
467 name: sub_name,
468 descriptor,
469 })
470 } else {
471 let _ = self.consume(&Token::Column)?;
473 let col_def = self.parse_column_def()?;
474 Ok(AlterOperation::AddColumn(col_def))
475 }
476 } else if self.consume(&Token::Drop)? {
477 if self.consume_ident_ci("SUBSCRIPTION")? {
478 let sub_name = self.expect_ident()?;
480 Ok(AlterOperation::DropSubscription { name: sub_name })
481 } else {
482 let _ = self.consume(&Token::Column)?;
484 let col_name = self.expect_ident()?;
485 Ok(AlterOperation::DropColumn(col_name))
486 }
487 } else if self.consume(&Token::Rename)? {
488 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
491 self.expect(Token::To)?;
492 let to = self.expect_ident()?;
493 Ok(AlterOperation::RenameColumn { from, to })
494 } else if self.consume(&Token::Attach)? {
495 self.expect(Token::Partition)?;
497 let child = self.expect_ident()?;
498 self.expect(Token::For)?;
499 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
503 return Err(ParseError::expected(
504 vec!["VALUES"],
505 self.peek(),
506 self.position(),
507 ));
508 }
509 let bound = self.collect_remaining_tokens_as_string()?;
510 Ok(AlterOperation::AttachPartition { child, bound })
511 } else if self.consume(&Token::Detach)? {
512 self.expect(Token::Partition)?;
514 let child = self.expect_ident()?;
515 Ok(AlterOperation::DetachPartition { child })
516 } else if self.consume(&Token::Enable)? {
517 if self.consume_ident_ci("EVENTS")? {
519 Ok(AlterOperation::EnableEvents(
520 self.parse_subscription_descriptor(table_name.to_string())?,
521 ))
522 } else if self.consume_ident_ci("TENANCY")? {
523 self.expect(Token::On)?;
524 self.expect(Token::LParen)?;
525 let mut path = self.expect_ident_or_keyword()?;
527 while self.consume(&Token::Dot)? {
528 let next = self.expect_ident_or_keyword()?;
529 path = format!("{path}.{next}");
530 }
531 self.expect(Token::RParen)?;
532 Ok(AlterOperation::EnableTenancy { column: path })
533 } else {
534 self.expect(Token::Row)?;
535 self.expect(Token::Level)?;
536 self.expect(Token::Security)?;
537 Ok(AlterOperation::EnableRowLevelSecurity)
538 }
539 } else if self.consume(&Token::Disable)? {
540 if self.consume_ident_ci("EVENTS")? {
542 Ok(AlterOperation::DisableEvents)
543 } else if self.consume_ident_ci("TENANCY")? {
544 Ok(AlterOperation::DisableTenancy)
545 } else {
546 self.expect(Token::Row)?;
547 self.expect(Token::Level)?;
548 self.expect(Token::Security)?;
549 Ok(AlterOperation::DisableRowLevelSecurity)
550 }
551 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
552 if self.consume_ident_ci("APPEND_ONLY")? {
554 let on = self.parse_bool_assign()?;
555 Ok(AlterOperation::SetAppendOnly(on))
556 } else if self.consume_ident_ci("VERSIONED")? {
557 let on = self.parse_bool_assign()?;
558 Ok(AlterOperation::SetVersioned(on))
559 } else {
560 Err(ParseError::expected(
561 vec!["APPEND_ONLY", "VERSIONED"],
562 self.peek(),
563 self.position(),
564 ))
565 }
566 } else {
567 Err(ParseError::expected(
568 vec![
569 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
570 ],
571 self.peek(),
572 self.position(),
573 ))
574 }
575 }
576
577 fn parse_subscription_descriptor(
578 &mut self,
579 source: String,
580 ) -> Result<SubscriptionDescriptor, ParseError> {
581 let mut ops_filter = Vec::new();
582 if self.consume(&Token::LParen)? {
583 loop {
584 let op = if self.consume(&Token::Insert)? {
585 SubscriptionOperation::Insert
586 } else if self.consume(&Token::Update)? {
587 SubscriptionOperation::Update
588 } else if self.consume(&Token::Delete)? {
589 SubscriptionOperation::Delete
590 } else {
591 return Err(ParseError::expected(
592 vec!["INSERT", "UPDATE", "DELETE"],
593 self.peek(),
594 self.position(),
595 ));
596 };
597 ops_filter.push(op);
598 if !self.consume(&Token::Comma)? {
599 break;
600 }
601 }
602 self.expect(Token::RParen)?;
603 }
604
605 let target_queue = if self.consume(&Token::To)? {
606 self.expect_ident()?
607 } else {
608 format!("{source}_events")
609 };
610
611 let mut redact_fields = Vec::new();
612 if self.consume_ident_ci("REDACT")? {
613 self.expect(Token::LParen)?;
614 loop {
615 redact_fields.push(self.parse_dotted_redact_path()?);
616 if !self.consume(&Token::Comma)? {
617 break;
618 }
619 }
620 self.expect(Token::RParen)?;
621 }
622
623 let where_filter = if self.consume(&Token::Where)? {
624 Some(self.collect_subscription_where_filter()?)
625 } else {
626 None
627 };
628
629 let all_tenants = if self.consume(&Token::On)? {
631 self.expect(Token::All)?;
632 if !self.consume_ident_ci("TENANTS")? {
633 return Err(ParseError::expected(
634 vec!["TENANTS"],
635 self.peek(),
636 self.position(),
637 ));
638 }
639 true
640 } else {
641 false
642 };
643
644 if self.consume_ident_ci("REQUIRES")? {
646 self.consume_ident_ci("CAPABILITY")?;
647 self.advance()?;
649 }
650
651 Ok(SubscriptionDescriptor {
652 name: String::new(),
653 source,
654 target_queue,
655 ops_filter,
656 where_filter,
657 redact_fields,
658 enabled: true,
659 all_tenants,
660 })
661 }
662
663 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
665 let mut parts = Vec::new();
666 if self.consume(&Token::Star)? {
667 parts.push("*".to_string());
668 } else {
669 parts.push(self.expect_ident_or_keyword()?);
670 }
671 while self.consume(&Token::Dot)? {
672 if self.consume(&Token::Star)? {
673 parts.push("*".to_string());
674 } else {
675 parts.push(self.expect_ident_or_keyword()?);
676 }
677 }
678 Ok(parts.join("."))
679 }
680
681 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
682 let mut parts = Vec::new();
683 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
684 parts.push(self.peek().to_string());
685 self.advance()?;
686 }
687 if parts.is_empty() {
688 return Err(ParseError::expected(
689 vec!["predicate"],
690 self.peek(),
691 self.position(),
692 ));
693 }
694 Ok(parts.join(" "))
695 }
696
697 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
702 let mut parts: Vec<String> = Vec::new();
703 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
704 parts.push(self.peek().to_string());
705 self.advance()?;
706 }
707 Ok(parts.join(" "))
708 }
709
710 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
712 let name = self.expect_ident()?;
713 let sql_type = self.parse_column_type()?;
714 let data_type = sql_type.to_string();
715
716 let mut def = CreateColumnDef {
717 name,
718 data_type,
719 sql_type: sql_type.clone(),
720 not_null: false,
721 default: None,
722 compress: None,
723 unique: false,
724 primary_key: false,
725 enum_variants: sql_type.enum_variants().unwrap_or_default(),
726 array_element: sql_type.array_element_type(),
727 decimal_precision: sql_type.decimal_precision(),
728 };
729
730 loop {
732 if self.match_not_null()? {
733 def.not_null = true;
734 } else if self.consume(&Token::Default)? {
735 self.expect(Token::Eq)?;
736 def.default = Some(self.parse_literal_string_for_ddl()?);
737 } else if self.consume(&Token::Compress)? {
738 self.expect(Token::Colon)?;
739 def.compress = Some(self.parse_integer()? as u8);
740 } else if self.consume(&Token::Unique)? {
741 def.unique = true;
742 } else if self.match_primary_key()? {
743 def.primary_key = true;
744 } else {
745 break;
746 }
747 }
748
749 Ok(def)
750 }
751
752 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
754 let type_name = self.expect_ident_or_keyword()?;
755 if self.consume(&Token::LParen)? {
756 let inner = self.parse_type_params()?;
757 self.expect(Token::RParen)?;
758 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
759 } else {
760 Ok(SqlTypeName::new(type_name))
761 }
762 }
763
764 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
766 let mut parts = Vec::new();
767 loop {
768 match self.peek().clone() {
769 Token::String(s) => {
770 let s = s.clone();
771 self.advance()?;
772 parts.push(TypeModifier::StringLiteral(s));
773 }
774 Token::Integer(n) => {
775 self.advance()?;
776 parts.push(TypeModifier::Number(n as u32));
777 }
778 _ => {
779 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
780 }
781 }
782 if !self.consume(&Token::Comma)? {
783 break;
784 }
785 }
786 Ok(parts)
787 }
788
789 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
791 match self.peek().clone() {
792 Token::String(s) => {
793 let s = s.clone();
794 self.advance()?;
795 Ok(s)
796 }
797 Token::Integer(n) => {
798 self.advance()?;
799 Ok(n.to_string())
800 }
801 Token::Float(n) => {
802 self.advance()?;
803 Ok(n.to_string())
804 }
805 Token::True => {
806 self.advance()?;
807 Ok("true".to_string())
808 }
809 Token::False => {
810 self.advance()?;
811 Ok("false".to_string())
812 }
813 Token::Null => {
814 self.advance()?;
815 Ok("null".to_string())
816 }
817 ref other => Err(ParseError::expected(
818 vec!["string", "number", "true", "false", "null"],
819 other,
820 self.position(),
821 )),
822 }
823 }
824
825 fn check_ttl_keyword(&self) -> bool {
826 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
827 }
828
829 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
832 self.expect(Token::Eq)?;
833 match self.peek() {
834 Token::True => {
835 self.advance()?;
836 Ok(true)
837 }
838 Token::False => {
839 self.advance()?;
840 Ok(false)
841 }
842 other => Err(ParseError::expected(
843 vec!["true", "false"],
844 other,
845 self.position(),
846 )),
847 }
848 }
849
850 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
851 if self.consume_ident_ci(expected)? {
852 Ok(())
853 } else {
854 Err(ParseError::expected(
855 vec![expected],
856 self.peek(),
857 self.position(),
858 ))
859 }
860 }
861
862 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
863 let option_name = self.expect_ident_or_keyword()?;
864 if !option_name.eq_ignore_ascii_case("ttl") {
865 return Err(ParseError::new(
866 format!("unsupported CREATE TABLE option {option_name:?}, expected TTL"),
870 self.position(),
871 ));
872 }
873
874 let ttl_value = self.parse_float()?;
875 let ttl_unit = match self.peek() {
876 Token::Ident(unit) => {
877 let unit = unit.clone();
878 self.advance()?;
879 unit
880 }
881 _ => "s".to_string(),
882 };
883
884 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
885 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
886 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
887 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
888 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
889 "d" | "day" | "days" => 86_400_000.0,
890 other => {
891 return Err(ParseError::new(
892 format!("unsupported TTL unit {other:?}"),
896 self.position(),
897 ));
898 }
899 };
900
901 if !ttl_value.is_finite() || ttl_value < 0.0 {
902 return Err(ParseError::new(
903 "TTL must be a finite, non-negative duration".to_string(),
904 self.position(),
905 ));
906 }
907
908 let ttl_ms = ttl_value * multiplier_ms;
909 if ttl_ms > u64::MAX as f64 {
910 return Err(ParseError::new(
911 "TTL duration is too large".to_string(),
912 self.position(),
913 ));
914 }
915 if ttl_ms.fract().abs() >= f64::EPSILON {
916 return Err(ParseError::new(
917 "TTL duration must resolve to a whole number of milliseconds".to_string(),
918 self.position(),
919 ));
920 }
921
922 Ok(Some(ttl_ms as u64))
923 }
924
925 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
927 if self.check(&Token::If) {
928 self.advance()?;
929 self.expect(Token::Not)?;
930 self.expect(Token::Exists)?;
931 Ok(true)
932 } else {
933 Ok(false)
934 }
935 }
936
937 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
939 if self.check(&Token::If) {
940 self.advance()?;
941 self.expect(Token::Exists)?;
942 Ok(true)
943 } else {
944 Ok(false)
945 }
946 }
947
948 fn match_not_null(&mut self) -> Result<bool, ParseError> {
950 if self.check(&Token::Not) {
951 self.advance()?; if self.check(&Token::Null) {
955 self.advance()?; Ok(true)
957 } else {
958 Err(ParseError::expected(
962 vec!["NULL (after NOT)"],
963 self.peek(),
964 self.position(),
965 ))
966 }
967 } else {
968 Ok(false)
969 }
970 }
971
972 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
974 if self.check(&Token::Primary) {
975 self.advance()?;
976 self.expect(Token::Key)?;
977 Ok(true)
978 } else {
979 Ok(false)
980 }
981 }
982}