1use super::super::ast::{
4 AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5 CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6 DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7 PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16 pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18 self.expect(Token::Create)?;
19 self.expect(Token::Table)?;
20
21 let if_not_exists = self.match_if_not_exists()?;
22 let name = self.expect_ident()?;
23
24 self.expect(Token::LParen)?;
25 let mut columns = Vec::new();
26 loop {
27 let col = self.parse_column_def()?;
28 columns.push(col);
29 if !self.consume(&Token::Comma)? {
30 break;
31 }
32 }
33 self.expect(Token::RParen)?;
34
35 let mut default_ttl_ms = None;
36 let mut context_index_fields = Vec::new();
37 let mut context_index_enabled = false;
38 let mut timestamps = false;
39 let mut subscriptions = Vec::new();
40
41 while self.consume(&Token::With)? {
42 if self.consume_ident_ci("EVENTS")? {
43 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44 } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45 context_index_enabled = self.parse_bool_assign()?;
46 } else if self.consume_ident_ci("CONTEXT")? {
47 if !self.consume(&Token::Index)? {
49 return Err(ParseError::expected(
50 vec!["INDEX"],
51 self.peek(),
52 self.position(),
53 ));
54 }
55 self.expect(Token::On)?;
56 self.expect(Token::LParen)?;
57 loop {
58 context_index_fields.push(self.expect_ident()?);
59 if !self.consume(&Token::Comma)? {
60 break;
61 }
62 }
63 self.expect(Token::RParen)?;
64 context_index_enabled = true;
65 } else if self.consume_ident_ci("TIMESTAMPS")? {
66 timestamps = self.parse_bool_assign()?;
67 } else {
68 default_ttl_ms = self.parse_create_table_ttl_clause()?;
69 }
70 }
71
72 Ok(QueryExpr::CreateTable(CreateTableQuery {
73 collection_model: CollectionModel::Table,
74 name,
75 columns,
76 if_not_exists,
77 default_ttl_ms,
78 metrics_rollup_policies: Vec::new(),
79 context_index_fields,
80 context_index_enabled,
81 timestamps,
82 partition_by: None,
83 tenant_by: None,
84 append_only: false,
85 subscriptions,
86 vault_own_master_key: false,
87 }))
88 }
89
90 pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
92 self.expect(Token::Drop)?;
93 self.expect(Token::Table)?;
94 self.parse_drop_table_body()
95 }
96
97 pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
99 let if_not_exists = self.match_if_not_exists()?;
100 let name = self.expect_ident()?;
101
102 self.expect(Token::LParen)?;
103 let mut columns = Vec::new();
104 loop {
105 let col = self.parse_column_def()?;
106 columns.push(col);
107 if !self.consume(&Token::Comma)? {
108 break;
109 }
110 }
111 self.expect(Token::RParen)?;
112
113 let mut default_ttl_ms = None;
114 let mut context_index_fields = Vec::new();
115 let mut context_index_enabled = false;
116 let mut timestamps = false;
117 let mut tenant_by: Option<String> = None;
118 let mut append_only = false;
119 let mut subscriptions = Vec::new();
120
121 while self.consume(&Token::With)? {
122 if self.consume_ident_ci("EVENTS")? {
123 subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
124 continue;
125 }
126 let has_parens = self.consume(&Token::LParen)?;
133
134 loop {
135 if self.consume_ident_ci("CONTEXT_INDEX")? {
136 context_index_enabled = self.parse_bool_assign()?;
137 } else if self.consume_ident_ci("CONTEXT")? {
138 if !self.consume(&Token::Index)? {
139 return Err(ParseError::expected(
140 vec!["INDEX"],
141 self.peek(),
142 self.position(),
143 ));
144 }
145 self.expect(Token::On)?;
146 self.expect(Token::LParen)?;
147 loop {
148 context_index_fields.push(self.expect_ident()?);
149 if !self.consume(&Token::Comma)? {
150 break;
151 }
152 }
153 self.expect(Token::RParen)?;
154 context_index_enabled = true;
155 } else if self.consume_ident_ci("TIMESTAMPS")? {
156 timestamps = self.parse_bool_assign()?;
157 } else if self.consume_ident_ci("APPEND_ONLY")? {
158 append_only = self.parse_bool_assign()?;
159 } else if self.consume_ident_ci("TENANT_BY")? {
160 let _ = self.consume(&Token::Eq)?;
163 let value = self.parse_literal_value()?;
164 match value {
165 Value::Text(col) => tenant_by = Some(col.to_string()),
166 other => {
167 return Err(ParseError::new(
168 format!("WITH tenant_by expects a text literal, got {other:?}"),
169 self.position(),
170 ));
171 }
172 }
173 } else {
174 default_ttl_ms = self.parse_create_table_ttl_clause()?;
175 }
176 if has_parens {
177 if self.consume(&Token::Comma)? {
178 continue;
179 }
180 self.expect(Token::RParen)?;
181 }
182 break;
183 }
184 }
185
186 let partition_by = if self.consume(&Token::Partition)? {
188 self.expect(Token::By)?;
189 let kind = if self.consume(&Token::Range)? {
190 PartitionKind::Range
191 } else if self.consume(&Token::List)? {
192 PartitionKind::List
193 } else if self.consume(&Token::Hash)? {
194 PartitionKind::Hash
195 } else {
196 return Err(ParseError::expected(
197 vec!["RANGE", "LIST", "HASH"],
198 self.peek(),
199 self.position(),
200 ));
201 };
202 self.expect(Token::LParen)?;
203 let column = self.expect_ident()?;
204 self.expect(Token::RParen)?;
205 Some(PartitionSpec { kind, column })
206 } else {
207 None
208 };
209
210 if !append_only && self.consume_ident_ci("APPEND")? {
215 if !self.consume_ident_ci("ONLY")? {
216 return Err(ParseError::expected(
217 vec!["ONLY"],
218 self.peek(),
219 self.position(),
220 ));
221 }
222 append_only = true;
223 }
224
225 if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
236 self.expect(Token::By)?;
237 self.expect(Token::LParen)?;
238 let mut path = self.expect_ident_or_keyword()?;
242 while self.consume(&Token::Dot)? {
243 let next = self.expect_ident_or_keyword()?;
244 path = format!("{path}.{next}");
245 }
246 self.expect(Token::RParen)?;
247 tenant_by = Some(path);
248 }
249
250 Ok(QueryExpr::CreateTable(CreateTableQuery {
251 collection_model: CollectionModel::Table,
252 name,
253 columns,
254 if_not_exists,
255 default_ttl_ms,
256 metrics_rollup_policies: Vec::new(),
257 context_index_fields,
258 context_index_enabled,
259 timestamps,
260 partition_by,
261 tenant_by,
262 append_only,
263 subscriptions,
264 vault_own_master_key: false,
265 }))
266 }
267
268 pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
274 self.expect(Token::Explain)?;
275 self.expect(Token::Alter)?;
276 self.expect(Token::For)?;
277 self.expect(Token::Create)?;
278 self.expect(Token::Table)?;
279
280 let body = self.parse_create_table_body()?;
281 let target = match body {
282 QueryExpr::CreateTable(t) => t,
283 _ => {
284 return Err(ParseError::new(
285 "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
286 .to_string(),
287 self.position(),
288 ));
289 }
290 };
291
292 let format = if self.consume(&Token::Format)? {
293 if self.consume(&Token::Json)? {
294 ExplainFormat::Json
295 } else if self.consume_ident_ci("SQL")? {
296 ExplainFormat::Sql
297 } else {
298 return Err(ParseError::expected(
299 vec!["JSON", "SQL"],
300 self.peek(),
301 self.position(),
302 ));
303 }
304 } else {
305 ExplainFormat::Sql
306 };
307
308 Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
309 target,
310 format,
311 }))
312 }
313
314 pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
316 let if_exists = self.match_if_exists()?;
317 let name = self.parse_drop_collection_name()?;
318 Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
319 }
320
321 pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
322 let if_exists = self.match_if_exists()?;
323 let name = self.parse_drop_collection_name()?;
324 Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
325 }
326
327 pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
328 let if_exists = self.match_if_exists()?;
329 let name = self.parse_drop_collection_name()?;
330 Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
331 }
332
333 pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
334 let if_exists = self.match_if_exists()?;
335 let name = self.parse_drop_collection_name()?;
336 Ok(QueryExpr::DropDocument(DropDocumentQuery {
337 name,
338 if_exists,
339 }))
340 }
341
342 pub fn parse_create_keyed_body(
343 &mut self,
344 model: CollectionModel,
345 ) -> Result<QueryExpr, ParseError> {
346 let if_not_exists = self.match_if_not_exists()?;
347 let name = self.parse_drop_collection_name()?;
348 let vault_own_master_key =
349 if model == CollectionModel::Vault && self.consume(&Token::With)? {
350 if !self.consume_ident_ci("OWN")? {
351 return Err(ParseError::expected(
352 vec!["OWN"],
353 self.peek(),
354 self.position(),
355 ));
356 }
357 if !self.consume_ident_ci("MASTER")? {
358 return Err(ParseError::expected(
359 vec!["MASTER"],
360 self.peek(),
361 self.position(),
362 ));
363 }
364 if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
365 return Err(ParseError::expected(
366 vec!["KEY"],
367 self.peek(),
368 self.position(),
369 ));
370 }
371 true
372 } else {
373 false
374 };
375 Ok(QueryExpr::CreateTable(CreateTableQuery {
376 collection_model: model,
377 name,
378 columns: Vec::new(),
379 if_not_exists,
380 default_ttl_ms: None,
381 metrics_rollup_policies: Vec::new(),
382 context_index_fields: Vec::new(),
383 context_index_enabled: false,
384 timestamps: false,
385 partition_by: None,
386 tenant_by: None,
387 append_only: false,
388 subscriptions: Vec::new(),
389 vault_own_master_key,
390 }))
391 }
392
393 pub fn parse_create_collection_model_body(
394 &mut self,
395 model: CollectionModel,
396 ) -> Result<QueryExpr, ParseError> {
397 self.parse_create_keyed_body(model)
398 }
399
400 pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
401 let if_not_exists = self.match_if_not_exists()?;
402 let name = self.parse_drop_collection_name()?;
403 if !self.consume_ident_ci("KIND")? {
404 return Err(ParseError::expected(
405 vec!["KIND"],
406 self.peek(),
407 self.position(),
408 ));
409 }
410 let kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
411 Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
412 name,
413 kind,
414 if_not_exists,
415 }))
416 }
417
418 pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
419 let if_not_exists = self.match_if_not_exists()?;
420 let name = self.parse_drop_collection_name()?;
421 if !self.consume_ident_ci("DIM")? {
422 return Err(ParseError::expected(
423 vec!["DIM"],
424 self.peek(),
425 self.position(),
426 ));
427 }
428 let dimension = self.parse_integer()?;
429 if dimension <= 0 {
430 return Err(ParseError::new(
431 "VECTOR DIM must be a positive integer".to_string(),
432 self.position(),
433 ));
434 }
435 let metric = if self.consume(&Token::Metric)? {
436 self.parse_distance_metric()?
437 } else {
438 crate::storage::engine::distance::DistanceMetric::Cosine
439 };
440 Ok(QueryExpr::CreateVector(CreateVectorQuery {
441 name,
442 dimension: dimension as usize,
443 metric,
444 if_not_exists,
445 }))
446 }
447
448 pub fn parse_drop_keyed_body(
449 &mut self,
450 model: CollectionModel,
451 ) -> Result<QueryExpr, ParseError> {
452 let if_exists = self.match_if_exists()?;
453 let name = self.parse_drop_collection_name()?;
454 Ok(QueryExpr::DropKv(DropKvQuery {
455 name,
456 if_exists,
457 model,
458 }))
459 }
460
461 pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
462 self.parse_drop_keyed_body(CollectionModel::Kv)
463 }
464
465 pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
466 self.parse_drop_collection_model_body(None)
467 }
468
469 pub fn parse_drop_collection_model_body(
470 &mut self,
471 model: Option<CollectionModel>,
472 ) -> Result<QueryExpr, ParseError> {
473 let if_exists = self.match_if_exists()?;
474 let name = self.parse_drop_collection_name()?;
475 Ok(QueryExpr::DropCollection(DropCollectionQuery {
476 name,
477 if_exists,
478 model,
479 }))
480 }
481
482 pub fn parse_truncate_body(
483 &mut self,
484 model: Option<CollectionModel>,
485 ) -> Result<QueryExpr, ParseError> {
486 let if_exists = self.match_if_exists()?;
487 let name = self.parse_drop_collection_name()?;
488 Ok(QueryExpr::Truncate(TruncateQuery {
489 name,
490 model,
491 if_exists,
492 }))
493 }
494
495 pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
496 let mut name = self.expect_ident()?;
497 while self.consume(&Token::Dot)? {
498 if self.consume(&Token::Star)? {
499 name.push_str(".*");
500 break;
501 }
502 let next = self.expect_ident_or_keyword()?;
503 name = format!("{name}.{next}");
504 }
505 Ok(name)
506 }
507
508 pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
510 self.expect(Token::Alter)?;
511 self.expect(Token::Table)?;
512 let name = self.expect_ident()?;
513
514 let mut operations = Vec::new();
515 loop {
516 let op = self.parse_alter_operation(&name)?;
517 operations.push(op);
518 if !self.consume(&Token::Comma)? {
519 break;
520 }
521 }
522
523 Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
524 }
525
526 fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
528 if self.consume(&Token::Add)? {
529 if self.consume_ident_ci("SUBSCRIPTION")? {
530 let sub_name = self.expect_ident()?;
532 let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
533 Ok(AlterOperation::AddSubscription {
534 name: sub_name,
535 descriptor,
536 })
537 } else {
538 let _ = self.consume(&Token::Column)?;
540 let col_def = self.parse_column_def()?;
541 Ok(AlterOperation::AddColumn(col_def))
542 }
543 } else if self.consume(&Token::Drop)? {
544 if self.consume_ident_ci("SUBSCRIPTION")? {
545 let sub_name = self.expect_ident()?;
547 Ok(AlterOperation::DropSubscription { name: sub_name })
548 } else {
549 let _ = self.consume(&Token::Column)?;
551 let col_name = self.expect_ident()?;
552 Ok(AlterOperation::DropColumn(col_name))
553 }
554 } else if self.consume(&Token::Rename)? {
555 let _ = self.consume(&Token::Column)?; let from = self.expect_ident()?;
558 self.expect(Token::To)?;
559 let to = self.expect_ident()?;
560 Ok(AlterOperation::RenameColumn { from, to })
561 } else if self.consume(&Token::Attach)? {
562 self.expect(Token::Partition)?;
564 let child = self.expect_ident()?;
565 self.expect(Token::For)?;
566 if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
570 return Err(ParseError::expected(
571 vec!["VALUES"],
572 self.peek(),
573 self.position(),
574 ));
575 }
576 let bound = self.collect_remaining_tokens_as_string()?;
577 Ok(AlterOperation::AttachPartition { child, bound })
578 } else if self.consume(&Token::Detach)? {
579 self.expect(Token::Partition)?;
581 let child = self.expect_ident()?;
582 Ok(AlterOperation::DetachPartition { child })
583 } else if self.consume(&Token::Enable)? {
584 if self.consume_ident_ci("EVENTS")? {
586 Ok(AlterOperation::EnableEvents(
587 self.parse_subscription_descriptor(table_name.to_string())?,
588 ))
589 } else if self.consume_ident_ci("TENANCY")? {
590 self.expect(Token::On)?;
591 self.expect(Token::LParen)?;
592 let mut path = self.expect_ident_or_keyword()?;
594 while self.consume(&Token::Dot)? {
595 let next = self.expect_ident_or_keyword()?;
596 path = format!("{path}.{next}");
597 }
598 self.expect(Token::RParen)?;
599 Ok(AlterOperation::EnableTenancy { column: path })
600 } else {
601 self.expect(Token::Row)?;
602 self.expect(Token::Level)?;
603 self.expect(Token::Security)?;
604 Ok(AlterOperation::EnableRowLevelSecurity)
605 }
606 } else if self.consume(&Token::Disable)? {
607 if self.consume_ident_ci("EVENTS")? {
609 Ok(AlterOperation::DisableEvents)
610 } else if self.consume_ident_ci("TENANCY")? {
611 Ok(AlterOperation::DisableTenancy)
612 } else {
613 self.expect(Token::Row)?;
614 self.expect(Token::Level)?;
615 self.expect(Token::Security)?;
616 Ok(AlterOperation::DisableRowLevelSecurity)
617 }
618 } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
619 if self.consume_ident_ci("APPEND_ONLY")? {
621 let on = self.parse_bool_assign()?;
622 Ok(AlterOperation::SetAppendOnly(on))
623 } else if self.consume_ident_ci("VERSIONED")? {
624 let on = self.parse_bool_assign()?;
625 Ok(AlterOperation::SetVersioned(on))
626 } else {
627 Err(ParseError::expected(
628 vec!["APPEND_ONLY", "VERSIONED"],
629 self.peek(),
630 self.position(),
631 ))
632 }
633 } else {
634 Err(ParseError::expected(
635 vec![
636 "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
637 ],
638 self.peek(),
639 self.position(),
640 ))
641 }
642 }
643
644 fn parse_subscription_descriptor(
645 &mut self,
646 source: String,
647 ) -> Result<SubscriptionDescriptor, ParseError> {
648 let mut ops_filter = Vec::new();
649 if self.consume(&Token::LParen)? {
650 loop {
651 let op = if self.consume(&Token::Insert)? {
652 SubscriptionOperation::Insert
653 } else if self.consume(&Token::Update)? {
654 SubscriptionOperation::Update
655 } else if self.consume(&Token::Delete)? {
656 SubscriptionOperation::Delete
657 } else {
658 return Err(ParseError::expected(
659 vec!["INSERT", "UPDATE", "DELETE"],
660 self.peek(),
661 self.position(),
662 ));
663 };
664 ops_filter.push(op);
665 if !self.consume(&Token::Comma)? {
666 break;
667 }
668 }
669 self.expect(Token::RParen)?;
670 }
671
672 let target_queue = if self.consume(&Token::To)? {
673 self.expect_ident()?
674 } else {
675 format!("{source}_events")
676 };
677
678 let mut redact_fields = Vec::new();
679 if self.consume_ident_ci("REDACT")? {
680 self.expect(Token::LParen)?;
681 loop {
682 redact_fields.push(self.parse_dotted_redact_path()?);
683 if !self.consume(&Token::Comma)? {
684 break;
685 }
686 }
687 self.expect(Token::RParen)?;
688 }
689
690 let where_filter = if self.consume(&Token::Where)? {
691 Some(self.collect_subscription_where_filter()?)
692 } else {
693 None
694 };
695
696 let all_tenants = if self.consume(&Token::On)? {
698 self.expect(Token::All)?;
699 if !self.consume_ident_ci("TENANTS")? {
700 return Err(ParseError::expected(
701 vec!["TENANTS"],
702 self.peek(),
703 self.position(),
704 ));
705 }
706 true
707 } else {
708 false
709 };
710
711 if self.consume_ident_ci("REQUIRES")? {
713 self.consume_ident_ci("CAPABILITY")?;
714 self.advance()?;
716 }
717
718 Ok(SubscriptionDescriptor {
719 name: String::new(),
720 source,
721 target_queue,
722 ops_filter,
723 where_filter,
724 redact_fields,
725 enabled: true,
726 all_tenants,
727 })
728 }
729
730 fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
732 let mut parts = Vec::new();
733 if self.consume(&Token::Star)? {
734 parts.push("*".to_string());
735 } else {
736 parts.push(self.expect_ident_or_keyword()?);
737 }
738 while self.consume(&Token::Dot)? {
739 if self.consume(&Token::Star)? {
740 parts.push("*".to_string());
741 } else {
742 parts.push(self.expect_ident_or_keyword()?);
743 }
744 }
745 Ok(parts.join("."))
746 }
747
748 fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
749 let mut parts = Vec::new();
750 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
751 parts.push(self.peek().to_string());
752 self.advance()?;
753 }
754 if parts.is_empty() {
755 return Err(ParseError::expected(
756 vec!["predicate"],
757 self.peek(),
758 self.position(),
759 ));
760 }
761 Ok(parts.join(" "))
762 }
763
764 fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
769 let mut parts: Vec<String> = Vec::new();
770 while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
771 parts.push(self.peek().to_string());
772 self.advance()?;
773 }
774 Ok(parts.join(" "))
775 }
776
777 fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
779 let name = self.expect_column_ident()?;
780 let sql_type = self.parse_column_type()?;
781 let data_type = sql_type.to_string();
782
783 let mut def = CreateColumnDef {
784 name,
785 data_type,
786 sql_type: sql_type.clone(),
787 not_null: false,
788 default: None,
789 compress: None,
790 unique: false,
791 primary_key: false,
792 enum_variants: sql_type.enum_variants().unwrap_or_default(),
793 array_element: sql_type.array_element_type(),
794 decimal_precision: sql_type.decimal_precision(),
795 };
796
797 loop {
799 if self.match_not_null()? {
800 def.not_null = true;
801 } else if self.consume(&Token::Default)? {
802 self.expect(Token::Eq)?;
803 def.default = Some(self.parse_literal_string_for_ddl()?);
804 } else if self.consume(&Token::Compress)? {
805 self.expect(Token::Colon)?;
806 def.compress = Some(self.parse_integer()? as u8);
807 } else if self.consume(&Token::Unique)? {
808 def.unique = true;
809 } else if self.match_primary_key()? {
810 def.primary_key = true;
811 } else {
812 break;
813 }
814 }
815
816 Ok(def)
817 }
818
819 fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
821 let type_name = self.expect_ident_or_keyword()?;
822 if self.consume(&Token::LParen)? {
823 let inner = self.parse_type_params()?;
824 self.expect(Token::RParen)?;
825 Ok(SqlTypeName::new(type_name).with_modifiers(inner))
826 } else {
827 Ok(SqlTypeName::new(type_name))
828 }
829 }
830
831 fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
833 let mut parts = Vec::new();
834 loop {
835 match self.peek().clone() {
836 Token::String(s) => {
837 let s = s.clone();
838 self.advance()?;
839 parts.push(TypeModifier::StringLiteral(s));
840 }
841 Token::Integer(n) => {
842 self.advance()?;
843 parts.push(TypeModifier::Number(n as u32));
844 }
845 _ => {
846 parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
847 }
848 }
849 if !self.consume(&Token::Comma)? {
850 break;
851 }
852 }
853 Ok(parts)
854 }
855
856 fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
858 match self.peek().clone() {
859 Token::String(s) => {
860 let s = s.clone();
861 self.advance()?;
862 Ok(s)
863 }
864 Token::Integer(n) => {
865 self.advance()?;
866 Ok(n.to_string())
867 }
868 Token::Float(n) => {
869 self.advance()?;
870 Ok(n.to_string())
871 }
872 Token::True => {
873 self.advance()?;
874 Ok("true".to_string())
875 }
876 Token::False => {
877 self.advance()?;
878 Ok("false".to_string())
879 }
880 Token::Null => {
881 self.advance()?;
882 Ok("null".to_string())
883 }
884 ref other => Err(ParseError::expected(
885 vec!["string", "number", "true", "false", "null"],
886 other,
887 self.position(),
888 )),
889 }
890 }
891
892 fn check_ttl_keyword(&self) -> bool {
893 matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
894 }
895
896 fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
899 self.expect(Token::Eq)?;
900 match self.peek() {
901 Token::True => {
902 self.advance()?;
903 Ok(true)
904 }
905 Token::False => {
906 self.advance()?;
907 Ok(false)
908 }
909 other => Err(ParseError::expected(
910 vec!["true", "false"],
911 other,
912 self.position(),
913 )),
914 }
915 }
916
917 fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
918 if self.consume_ident_ci(expected)? {
919 Ok(())
920 } else {
921 Err(ParseError::expected(
922 vec![expected],
923 self.peek(),
924 self.position(),
925 ))
926 }
927 }
928
929 fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
930 let option_name = self.expect_ident_or_keyword()?;
931 if !option_name.eq_ignore_ascii_case("ttl") {
932 return Err(ParseError::new(
933 format!("unsupported CREATE TABLE option {option_name:?}, expected TTL"),
937 self.position(),
938 ));
939 }
940
941 let ttl_value = self.parse_float()?;
942 let ttl_unit = match self.peek() {
943 Token::Ident(unit) => {
944 let unit = unit.clone();
945 self.advance()?;
946 unit
947 }
948 _ => "s".to_string(),
949 };
950
951 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
952 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
953 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
954 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
955 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
956 "d" | "day" | "days" => 86_400_000.0,
957 other => {
958 return Err(ParseError::new(
959 format!("unsupported TTL unit {other:?}"),
963 self.position(),
964 ));
965 }
966 };
967
968 if !ttl_value.is_finite() || ttl_value < 0.0 {
969 return Err(ParseError::new(
970 "TTL must be a finite, non-negative duration".to_string(),
971 self.position(),
972 ));
973 }
974
975 let ttl_ms = ttl_value * multiplier_ms;
976 if ttl_ms > u64::MAX as f64 {
977 return Err(ParseError::new(
978 "TTL duration is too large".to_string(),
979 self.position(),
980 ));
981 }
982 if ttl_ms.fract().abs() >= f64::EPSILON {
983 return Err(ParseError::new(
984 "TTL duration must resolve to a whole number of milliseconds".to_string(),
985 self.position(),
986 ));
987 }
988
989 Ok(Some(ttl_ms as u64))
990 }
991
992 pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
994 if self.check(&Token::If) {
995 self.advance()?;
996 self.expect(Token::Not)?;
997 self.expect(Token::Exists)?;
998 Ok(true)
999 } else {
1000 Ok(false)
1001 }
1002 }
1003
1004 pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1006 if self.check(&Token::If) {
1007 self.advance()?;
1008 self.expect(Token::Exists)?;
1009 Ok(true)
1010 } else {
1011 Ok(false)
1012 }
1013 }
1014
1015 fn match_not_null(&mut self) -> Result<bool, ParseError> {
1017 if self.check(&Token::Not) {
1018 self.advance()?; if self.check(&Token::Null) {
1022 self.advance()?; Ok(true)
1024 } else {
1025 Err(ParseError::expected(
1029 vec!["NULL (after NOT)"],
1030 self.peek(),
1031 self.position(),
1032 ))
1033 }
1034 } else {
1035 Ok(false)
1036 }
1037 }
1038
1039 fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1041 if self.check(&Token::Primary) {
1042 self.advance()?;
1043 self.expect(Token::Key)?;
1044 Ok(true)
1045 } else {
1046 Ok(false)
1047 }
1048 }
1049}