Skip to main content

reddb_server/storage/query/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::super::ast::{
4    AlterOperation, AlterTableQuery, CreateColumnDef, CreateTableQuery, DropCollectionQuery,
5    DropDocumentQuery, DropGraphQuery, DropKvQuery, DropTableQuery, DropVectorQuery,
6    ExplainAlterQuery, ExplainFormat, PartitionKind, PartitionSpec, QueryExpr, TruncateQuery,
7};
8use super::super::lexer::Token;
9use super::error::ParseError;
10use super::Parser;
11use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
12use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
13
14impl<'a> Parser<'a> {
15    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
16    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
17        self.expect(Token::Create)?;
18        self.expect(Token::Table)?;
19
20        let if_not_exists = self.match_if_not_exists()?;
21        let name = self.expect_ident()?;
22
23        self.expect(Token::LParen)?;
24        let mut columns = Vec::new();
25        loop {
26            let col = self.parse_column_def()?;
27            columns.push(col);
28            if !self.consume(&Token::Comma)? {
29                break;
30            }
31        }
32        self.expect(Token::RParen)?;
33
34        let mut default_ttl_ms = None;
35        let mut context_index_fields = Vec::new();
36        let mut context_index_enabled = false;
37        let mut timestamps = false;
38        let mut subscriptions = Vec::new();
39
40        while self.consume(&Token::With)? {
41            if self.consume_ident_ci("EVENTS")? {
42                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
43            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
44                context_index_enabled = self.parse_bool_assign()?;
45            } else if self.consume_ident_ci("CONTEXT")? {
46                // Consume INDEX token (reserved keyword)
47                if !self.consume(&Token::Index)? {
48                    return Err(ParseError::expected(
49                        vec!["INDEX"],
50                        self.peek(),
51                        self.position(),
52                    ));
53                }
54                self.expect(Token::On)?;
55                self.expect(Token::LParen)?;
56                loop {
57                    context_index_fields.push(self.expect_ident()?);
58                    if !self.consume(&Token::Comma)? {
59                        break;
60                    }
61                }
62                self.expect(Token::RParen)?;
63                context_index_enabled = true;
64            } else if self.consume_ident_ci("TIMESTAMPS")? {
65                timestamps = self.parse_bool_assign()?;
66            } else {
67                default_ttl_ms = self.parse_create_table_ttl_clause()?;
68            }
69        }
70
71        Ok(QueryExpr::CreateTable(CreateTableQuery {
72            collection_model: CollectionModel::Table,
73            name,
74            columns,
75            if_not_exists,
76            default_ttl_ms,
77            context_index_fields,
78            context_index_enabled,
79            timestamps,
80            partition_by: None,
81            tenant_by: None,
82            append_only: false,
83            subscriptions,
84            vault_own_master_key: false,
85        }))
86    }
87
88    /// Parse: DROP TABLE [IF EXISTS] name
89    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
90        self.expect(Token::Drop)?;
91        self.expect(Token::Table)?;
92        self.parse_drop_table_body()
93    }
94
95    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
96    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
97        let if_not_exists = self.match_if_not_exists()?;
98        let name = self.expect_ident()?;
99
100        self.expect(Token::LParen)?;
101        let mut columns = Vec::new();
102        loop {
103            let col = self.parse_column_def()?;
104            columns.push(col);
105            if !self.consume(&Token::Comma)? {
106                break;
107            }
108        }
109        self.expect(Token::RParen)?;
110
111        let mut default_ttl_ms = None;
112        let mut context_index_fields = Vec::new();
113        let mut context_index_enabled = false;
114        let mut timestamps = false;
115        let mut tenant_by: Option<String> = None;
116        let mut append_only = false;
117        let mut subscriptions = Vec::new();
118
119        while self.consume(&Token::With)? {
120            if self.consume_ident_ci("EVENTS")? {
121                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
122                continue;
123            }
124            // Accept both spellings:
125            //   WITH key = value
126            //   WITH (key = value, key = value)
127            // Postgres / ClickHouse use the parenthesised form; the
128            // bare form is our legacy shorthand. The parenthesised
129            // form collects options separated by commas until `)`.
130            let has_parens = self.consume(&Token::LParen)?;
131
132            loop {
133                if self.consume_ident_ci("CONTEXT_INDEX")? {
134                    context_index_enabled = self.parse_bool_assign()?;
135                } else if self.consume_ident_ci("CONTEXT")? {
136                    if !self.consume(&Token::Index)? {
137                        return Err(ParseError::expected(
138                            vec!["INDEX"],
139                            self.peek(),
140                            self.position(),
141                        ));
142                    }
143                    self.expect(Token::On)?;
144                    self.expect(Token::LParen)?;
145                    loop {
146                        context_index_fields.push(self.expect_ident()?);
147                        if !self.consume(&Token::Comma)? {
148                            break;
149                        }
150                    }
151                    self.expect(Token::RParen)?;
152                    context_index_enabled = true;
153                } else if self.consume_ident_ci("TIMESTAMPS")? {
154                    timestamps = self.parse_bool_assign()?;
155                } else if self.consume_ident_ci("APPEND_ONLY")? {
156                    append_only = self.parse_bool_assign()?;
157                } else if self.consume_ident_ci("TENANT_BY")? {
158                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
159                    // and expects a string literal column name.
160                    let _ = self.consume(&Token::Eq)?;
161                    let value = self.parse_literal_value()?;
162                    match value {
163                        Value::Text(col) => tenant_by = Some(col.to_string()),
164                        other => {
165                            return Err(ParseError::new(
166                                format!("WITH tenant_by expects a text literal, got {other:?}"),
167                                self.position(),
168                            ));
169                        }
170                    }
171                } else {
172                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
173                }
174                if has_parens {
175                    if self.consume(&Token::Comma)? {
176                        continue;
177                    }
178                    self.expect(Token::RParen)?;
179                }
180                break;
181            }
182        }
183
184        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
185        let partition_by = if self.consume(&Token::Partition)? {
186            self.expect(Token::By)?;
187            let kind = if self.consume(&Token::Range)? {
188                PartitionKind::Range
189            } else if self.consume(&Token::List)? {
190                PartitionKind::List
191            } else if self.consume(&Token::Hash)? {
192                PartitionKind::Hash
193            } else {
194                return Err(ParseError::expected(
195                    vec!["RANGE", "LIST", "HASH"],
196                    self.peek(),
197                    self.position(),
198                ));
199            };
200            self.expect(Token::LParen)?;
201            let column = self.expect_ident()?;
202            self.expect(Token::RParen)?;
203            Some(PartitionSpec { kind, column })
204        } else {
205            None
206        };
207
208        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
209        // style). Accepted after partition spec / tenant spec / or on
210        // its own. `WITH (append_only = true)` is the other form and
211        // handled above.
212        if !append_only && self.consume_ident_ci("APPEND")? {
213            if !self.consume_ident_ci("ONLY")? {
214                return Err(ParseError::expected(
215                    vec!["ONLY"],
216                    self.peek(),
217                    self.position(),
218                ));
219            }
220            append_only = true;
221        }
222
223        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
224        // trailing clause (after partition spec if both are used).
225        //
226        // Dotted paths let non-table models declare tenancy over their
227        // natural nested structures — `metadata.tenant` for vectors,
228        // `payload.tenant` for queue messages, `tags.cluster` for
229        // timeseries, `properties.org` for graphs. The read-path
230        // resolver already navigates these paths via
231        // `resolve_runtime_document_path`; here we just store the
232        // dotted string and let the policy evaluator do the rest.
233        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
234            self.expect(Token::By)?;
235            self.expect(Token::LParen)?;
236            // Allow keyword-idents (`metadata`, `type`, `data`) as
237            // column names — SQL treats them as bare identifiers in
238            // this context.
239            let mut path = self.expect_ident_or_keyword()?;
240            while self.consume(&Token::Dot)? {
241                let next = self.expect_ident_or_keyword()?;
242                path = format!("{path}.{next}");
243            }
244            self.expect(Token::RParen)?;
245            tenant_by = Some(path);
246        }
247
248        Ok(QueryExpr::CreateTable(CreateTableQuery {
249            collection_model: CollectionModel::Table,
250            name,
251            columns,
252            if_not_exists,
253            default_ttl_ms,
254            context_index_fields,
255            context_index_enabled,
256            timestamps,
257            partition_by,
258            tenant_by,
259            append_only,
260            subscriptions,
261            vault_own_master_key: false,
262        }))
263    }
264
265    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
266    ///
267    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
268    /// difference between the table's current contract and the target CREATE
269    /// TABLE body.
270    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
271        self.expect(Token::Explain)?;
272        self.expect(Token::Alter)?;
273        self.expect(Token::For)?;
274        self.expect(Token::Create)?;
275        self.expect(Token::Table)?;
276
277        let body = self.parse_create_table_body()?;
278        let target = match body {
279            QueryExpr::CreateTable(t) => t,
280            _ => {
281                return Err(ParseError::new(
282                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
283                        .to_string(),
284                    self.position(),
285                ));
286            }
287        };
288
289        let format = if self.consume(&Token::Format)? {
290            if self.consume(&Token::Json)? {
291                ExplainFormat::Json
292            } else if self.consume_ident_ci("SQL")? {
293                ExplainFormat::Sql
294            } else {
295                return Err(ParseError::expected(
296                    vec!["JSON", "SQL"],
297                    self.peek(),
298                    self.position(),
299                ));
300            }
301        } else {
302            ExplainFormat::Sql
303        };
304
305        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
306            target,
307            format,
308        }))
309    }
310
311    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
312    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
313        let if_exists = self.match_if_exists()?;
314        let name = self.parse_drop_collection_name()?;
315        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
316    }
317
318    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
319        let if_exists = self.match_if_exists()?;
320        let name = self.parse_drop_collection_name()?;
321        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
322    }
323
324    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
325        let if_exists = self.match_if_exists()?;
326        let name = self.parse_drop_collection_name()?;
327        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
328    }
329
330    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
331        let if_exists = self.match_if_exists()?;
332        let name = self.parse_drop_collection_name()?;
333        Ok(QueryExpr::DropDocument(DropDocumentQuery {
334            name,
335            if_exists,
336        }))
337    }
338
339    pub fn parse_create_keyed_body(
340        &mut self,
341        model: CollectionModel,
342    ) -> Result<QueryExpr, ParseError> {
343        let if_not_exists = self.match_if_not_exists()?;
344        let name = self.parse_drop_collection_name()?;
345        let vault_own_master_key =
346            if model == CollectionModel::Vault && self.consume(&Token::With)? {
347                if !self.consume_ident_ci("OWN")? {
348                    return Err(ParseError::expected(
349                        vec!["OWN"],
350                        self.peek(),
351                        self.position(),
352                    ));
353                }
354                if !self.consume_ident_ci("MASTER")? {
355                    return Err(ParseError::expected(
356                        vec!["MASTER"],
357                        self.peek(),
358                        self.position(),
359                    ));
360                }
361                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
362                    return Err(ParseError::expected(
363                        vec!["KEY"],
364                        self.peek(),
365                        self.position(),
366                    ));
367                }
368                true
369            } else {
370                false
371            };
372        Ok(QueryExpr::CreateTable(CreateTableQuery {
373            collection_model: model,
374            name,
375            columns: Vec::new(),
376            if_not_exists,
377            default_ttl_ms: None,
378            context_index_fields: Vec::new(),
379            context_index_enabled: false,
380            timestamps: false,
381            partition_by: None,
382            tenant_by: None,
383            append_only: false,
384            subscriptions: Vec::new(),
385            vault_own_master_key,
386        }))
387    }
388
389    pub fn parse_drop_keyed_body(
390        &mut self,
391        model: CollectionModel,
392    ) -> Result<QueryExpr, ParseError> {
393        let if_exists = self.match_if_exists()?;
394        let name = self.parse_drop_collection_name()?;
395        Ok(QueryExpr::DropKv(DropKvQuery {
396            name,
397            if_exists,
398            model,
399        }))
400    }
401
402    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
403        self.parse_drop_keyed_body(CollectionModel::Kv)
404    }
405
406    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
407        let if_exists = self.match_if_exists()?;
408        let name = self.parse_drop_collection_name()?;
409        Ok(QueryExpr::DropCollection(DropCollectionQuery {
410            name,
411            if_exists,
412        }))
413    }
414
415    pub fn parse_truncate_body(
416        &mut self,
417        model: Option<CollectionModel>,
418    ) -> Result<QueryExpr, ParseError> {
419        let if_exists = self.match_if_exists()?;
420        let name = self.parse_drop_collection_name()?;
421        Ok(QueryExpr::Truncate(TruncateQuery {
422            name,
423            model,
424            if_exists,
425        }))
426    }
427
428    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
429        let mut name = self.expect_ident()?;
430        while self.consume(&Token::Dot)? {
431            if self.consume(&Token::Star)? {
432                name.push_str(".*");
433                break;
434            }
435            let next = self.expect_ident_or_keyword()?;
436            name = format!("{name}.{next}");
437        }
438        Ok(name)
439    }
440
441    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
442    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
443        self.expect(Token::Alter)?;
444        self.expect(Token::Table)?;
445        let name = self.expect_ident()?;
446
447        let mut operations = Vec::new();
448        loop {
449            let op = self.parse_alter_operation(&name)?;
450            operations.push(op);
451            if !self.consume(&Token::Comma)? {
452                break;
453            }
454        }
455
456        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
457    }
458
459    /// Parse a single ALTER TABLE operation
460    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
461        if self.consume(&Token::Add)? {
462            if self.consume_ident_ci("SUBSCRIPTION")? {
463                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
464                let sub_name = self.expect_ident()?;
465                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
466                Ok(AlterOperation::AddSubscription {
467                    name: sub_name,
468                    descriptor,
469                })
470            } else {
471                // ADD COLUMN definition (COLUMN keyword is optional)
472                let _ = self.consume(&Token::Column)?;
473                let col_def = self.parse_column_def()?;
474                Ok(AlterOperation::AddColumn(col_def))
475            }
476        } else if self.consume(&Token::Drop)? {
477            if self.consume_ident_ci("SUBSCRIPTION")? {
478                // DROP SUBSCRIPTION name
479                let sub_name = self.expect_ident()?;
480                Ok(AlterOperation::DropSubscription { name: sub_name })
481            } else {
482                // DROP COLUMN name (COLUMN keyword is optional)
483                let _ = self.consume(&Token::Column)?;
484                let col_name = self.expect_ident()?;
485                Ok(AlterOperation::DropColumn(col_name))
486            }
487        } else if self.consume(&Token::Rename)? {
488            // RENAME COLUMN from TO to
489            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
490            let from = self.expect_ident()?;
491            self.expect(Token::To)?;
492            let to = self.expect_ident()?;
493            Ok(AlterOperation::RenameColumn { from, to })
494        } else if self.consume(&Token::Attach)? {
495            // ATTACH PARTITION child FOR VALUES ...
496            self.expect(Token::Partition)?;
497            let child = self.expect_ident()?;
498            self.expect(Token::For)?;
499            // Accept `VALUES` as an ident since the grammar doesn't have it
500            // as a reserved keyword everywhere. Collect the remaining tokens
501            // as a raw bound string for round-trip persistence.
502            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
503                return Err(ParseError::expected(
504                    vec!["VALUES"],
505                    self.peek(),
506                    self.position(),
507                ));
508            }
509            let bound = self.collect_remaining_tokens_as_string()?;
510            Ok(AlterOperation::AttachPartition { child, bound })
511        } else if self.consume(&Token::Detach)? {
512            // DETACH PARTITION child
513            self.expect(Token::Partition)?;
514            let child = self.expect_ident()?;
515            Ok(AlterOperation::DetachPartition { child })
516        } else if self.consume(&Token::Enable)? {
517            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
518            if self.consume_ident_ci("EVENTS")? {
519                Ok(AlterOperation::EnableEvents(
520                    self.parse_subscription_descriptor(table_name.to_string())?,
521                ))
522            } else if self.consume_ident_ci("TENANCY")? {
523                self.expect(Token::On)?;
524                self.expect(Token::LParen)?;
525                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
526                let mut path = self.expect_ident_or_keyword()?;
527                while self.consume(&Token::Dot)? {
528                    let next = self.expect_ident_or_keyword()?;
529                    path = format!("{path}.{next}");
530                }
531                self.expect(Token::RParen)?;
532                Ok(AlterOperation::EnableTenancy { column: path })
533            } else {
534                self.expect(Token::Row)?;
535                self.expect(Token::Level)?;
536                self.expect(Token::Security)?;
537                Ok(AlterOperation::EnableRowLevelSecurity)
538            }
539        } else if self.consume(&Token::Disable)? {
540            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
541            if self.consume_ident_ci("EVENTS")? {
542                Ok(AlterOperation::DisableEvents)
543            } else if self.consume_ident_ci("TENANCY")? {
544                Ok(AlterOperation::DisableTenancy)
545            } else {
546                self.expect(Token::Row)?;
547                self.expect(Token::Level)?;
548                self.expect(Token::Security)?;
549                Ok(AlterOperation::DisableRowLevelSecurity)
550            }
551        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
552            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
553            if self.consume_ident_ci("APPEND_ONLY")? {
554                let on = self.parse_bool_assign()?;
555                Ok(AlterOperation::SetAppendOnly(on))
556            } else if self.consume_ident_ci("VERSIONED")? {
557                let on = self.parse_bool_assign()?;
558                Ok(AlterOperation::SetVersioned(on))
559            } else {
560                Err(ParseError::expected(
561                    vec!["APPEND_ONLY", "VERSIONED"],
562                    self.peek(),
563                    self.position(),
564                ))
565            }
566        } else {
567            Err(ParseError::expected(
568                vec![
569                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
570                ],
571                self.peek(),
572                self.position(),
573            ))
574        }
575    }
576
577    fn parse_subscription_descriptor(
578        &mut self,
579        source: String,
580    ) -> Result<SubscriptionDescriptor, ParseError> {
581        let mut ops_filter = Vec::new();
582        if self.consume(&Token::LParen)? {
583            loop {
584                let op = if self.consume(&Token::Insert)? {
585                    SubscriptionOperation::Insert
586                } else if self.consume(&Token::Update)? {
587                    SubscriptionOperation::Update
588                } else if self.consume(&Token::Delete)? {
589                    SubscriptionOperation::Delete
590                } else {
591                    return Err(ParseError::expected(
592                        vec!["INSERT", "UPDATE", "DELETE"],
593                        self.peek(),
594                        self.position(),
595                    ));
596                };
597                ops_filter.push(op);
598                if !self.consume(&Token::Comma)? {
599                    break;
600                }
601            }
602            self.expect(Token::RParen)?;
603        }
604
605        let target_queue = if self.consume(&Token::To)? {
606            self.expect_ident()?
607        } else {
608            format!("{source}_events")
609        };
610
611        let mut redact_fields = Vec::new();
612        if self.consume_ident_ci("REDACT")? {
613            self.expect(Token::LParen)?;
614            loop {
615                redact_fields.push(self.parse_dotted_redact_path()?);
616                if !self.consume(&Token::Comma)? {
617                    break;
618                }
619            }
620            self.expect(Token::RParen)?;
621        }
622
623        let where_filter = if self.consume(&Token::Where)? {
624            Some(self.collect_subscription_where_filter()?)
625        } else {
626            None
627        };
628
629        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
630        let all_tenants = if self.consume(&Token::On)? {
631            self.expect(Token::All)?;
632            if !self.consume_ident_ci("TENANTS")? {
633                return Err(ParseError::expected(
634                    vec!["TENANTS"],
635                    self.peek(),
636                    self.position(),
637                ));
638            }
639            true
640        } else {
641            false
642        };
643
644        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
645        if self.consume_ident_ci("REQUIRES")? {
646            self.consume_ident_ci("CAPABILITY")?;
647            // consume the capability string literal token
648            self.advance()?;
649        }
650
651        Ok(SubscriptionDescriptor {
652            name: String::new(),
653            source,
654            target_queue,
655            ops_filter,
656            where_filter,
657            redact_fields,
658            enabled: true,
659            all_tenants,
660        })
661    }
662
663    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
664    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
665        let mut parts = Vec::new();
666        if self.consume(&Token::Star)? {
667            parts.push("*".to_string());
668        } else {
669            parts.push(self.expect_ident_or_keyword()?);
670        }
671        while self.consume(&Token::Dot)? {
672            if self.consume(&Token::Star)? {
673                parts.push("*".to_string());
674            } else {
675                parts.push(self.expect_ident_or_keyword()?);
676            }
677        }
678        Ok(parts.join("."))
679    }
680
681    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
682        let mut parts = Vec::new();
683        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
684            parts.push(self.peek().to_string());
685            self.advance()?;
686        }
687        if parts.is_empty() {
688            return Err(ParseError::expected(
689                vec!["predicate"],
690                self.peek(),
691                self.position(),
692            ));
693        }
694        Ok(parts.join(" "))
695    }
696
697    /// Capture remaining tokens as a display-joined string.
698    ///
699    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
700    /// bound clause into storage without needing a dedicated per-kind AST.
701    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
702        let mut parts: Vec<String> = Vec::new();
703        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
704            parts.push(self.peek().to_string());
705            self.advance()?;
706        }
707        Ok(parts.join(" "))
708    }
709
710    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
711    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
712        let name = self.expect_ident()?;
713        let sql_type = self.parse_column_type()?;
714        let data_type = sql_type.to_string();
715
716        let mut def = CreateColumnDef {
717            name,
718            data_type,
719            sql_type: sql_type.clone(),
720            not_null: false,
721            default: None,
722            compress: None,
723            unique: false,
724            primary_key: false,
725            enum_variants: sql_type.enum_variants().unwrap_or_default(),
726            array_element: sql_type.array_element_type(),
727            decimal_precision: sql_type.decimal_precision(),
728        };
729
730        // Parse modifiers in any order
731        loop {
732            if self.match_not_null()? {
733                def.not_null = true;
734            } else if self.consume(&Token::Default)? {
735                self.expect(Token::Eq)?;
736                def.default = Some(self.parse_literal_string_for_ddl()?);
737            } else if self.consume(&Token::Compress)? {
738                self.expect(Token::Colon)?;
739                def.compress = Some(self.parse_integer()? as u8);
740            } else if self.consume(&Token::Unique)? {
741                def.unique = true;
742            } else if self.match_primary_key()? {
743                def.primary_key = true;
744            } else {
745                break;
746            }
747        }
748
749        Ok(def)
750    }
751
752    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
753    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
754        let type_name = self.expect_ident_or_keyword()?;
755        if self.consume(&Token::LParen)? {
756            let inner = self.parse_type_params()?;
757            self.expect(Token::RParen)?;
758            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
759        } else {
760            Ok(SqlTypeName::new(type_name))
761        }
762    }
763
764    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
765    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
766        let mut parts = Vec::new();
767        loop {
768            match self.peek().clone() {
769                Token::String(s) => {
770                    let s = s.clone();
771                    self.advance()?;
772                    parts.push(TypeModifier::StringLiteral(s));
773                }
774                Token::Integer(n) => {
775                    self.advance()?;
776                    parts.push(TypeModifier::Number(n as u32));
777                }
778                _ => {
779                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
780                }
781            }
782            if !self.consume(&Token::Comma)? {
783                break;
784            }
785        }
786        Ok(parts)
787    }
788
789    /// Parse a literal string value for DDL DEFAULT expressions
790    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
791        match self.peek().clone() {
792            Token::String(s) => {
793                let s = s.clone();
794                self.advance()?;
795                Ok(s)
796            }
797            Token::Integer(n) => {
798                self.advance()?;
799                Ok(n.to_string())
800            }
801            Token::Float(n) => {
802                self.advance()?;
803                Ok(n.to_string())
804            }
805            Token::True => {
806                self.advance()?;
807                Ok("true".to_string())
808            }
809            Token::False => {
810                self.advance()?;
811                Ok("false".to_string())
812            }
813            Token::Null => {
814                self.advance()?;
815                Ok("null".to_string())
816            }
817            ref other => Err(ParseError::expected(
818                vec!["string", "number", "true", "false", "null"],
819                other,
820                self.position(),
821            )),
822        }
823    }
824
825    fn check_ttl_keyword(&self) -> bool {
826        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
827    }
828
829    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
830    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
831    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
832        self.expect(Token::Eq)?;
833        match self.peek() {
834            Token::True => {
835                self.advance()?;
836                Ok(true)
837            }
838            Token::False => {
839                self.advance()?;
840                Ok(false)
841            }
842            other => Err(ParseError::expected(
843                vec!["true", "false"],
844                other,
845                self.position(),
846            )),
847        }
848    }
849
850    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
851        if self.consume_ident_ci(expected)? {
852            Ok(())
853        } else {
854            Err(ParseError::expected(
855                vec![expected],
856                self.peek(),
857                self.position(),
858            ))
859        }
860    }
861
862    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
863        let option_name = self.expect_ident_or_keyword()?;
864        if !option_name.eq_ignore_ascii_case("ttl") {
865            return Err(ParseError::new(
866                // F-05: `option_name` is caller-controlled identifier text.
867                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
868                // before the message reaches downstream serialization sinks.
869                format!("unsupported CREATE TABLE option {option_name:?}, expected TTL"),
870                self.position(),
871            ));
872        }
873
874        let ttl_value = self.parse_float()?;
875        let ttl_unit = match self.peek() {
876            Token::Ident(unit) => {
877                let unit = unit.clone();
878                self.advance()?;
879                unit
880            }
881            _ => "s".to_string(),
882        };
883
884        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
885            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
886            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
887            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
888            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
889            "d" | "day" | "days" => 86_400_000.0,
890            other => {
891                return Err(ParseError::new(
892                    // F-05: render `other` via `{:?}` so caller-controlled
893                    // bytes (CR / LF / NUL / quotes) are escaped before
894                    // reaching downstream serialization sinks.
895                    format!("unsupported TTL unit {other:?}"),
896                    self.position(),
897                ));
898            }
899        };
900
901        if !ttl_value.is_finite() || ttl_value < 0.0 {
902            return Err(ParseError::new(
903                "TTL must be a finite, non-negative duration".to_string(),
904                self.position(),
905            ));
906        }
907
908        let ttl_ms = ttl_value * multiplier_ms;
909        if ttl_ms > u64::MAX as f64 {
910            return Err(ParseError::new(
911                "TTL duration is too large".to_string(),
912                self.position(),
913            ));
914        }
915        if ttl_ms.fract().abs() >= f64::EPSILON {
916            return Err(ParseError::new(
917                "TTL duration must resolve to a whole number of milliseconds".to_string(),
918                self.position(),
919            ));
920        }
921
922        Ok(Some(ttl_ms as u64))
923    }
924
925    /// Try to match IF NOT EXISTS sequence
926    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
927        if self.check(&Token::If) {
928            self.advance()?;
929            self.expect(Token::Not)?;
930            self.expect(Token::Exists)?;
931            Ok(true)
932        } else {
933            Ok(false)
934        }
935    }
936
937    /// Try to match IF EXISTS sequence
938    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
939        if self.check(&Token::If) {
940            self.advance()?;
941            self.expect(Token::Exists)?;
942            Ok(true)
943        } else {
944            Ok(false)
945        }
946    }
947
948    /// Try to match NOT NULL sequence
949    fn match_not_null(&mut self) -> Result<bool, ParseError> {
950        if self.check(&Token::Not) {
951            // Peek ahead - only consume if followed by NULL
952            // We need to be careful: save state and try
953            self.advance()?; // consume NOT
954            if self.check(&Token::Null) {
955                self.advance()?; // consume NULL
956                Ok(true)
957            } else {
958                // This is tricky - NOT was consumed but next isn't NULL.
959                // In column modifier context, NOT should only appear before NULL.
960                // Return error for clarity.
961                Err(ParseError::expected(
962                    vec!["NULL (after NOT)"],
963                    self.peek(),
964                    self.position(),
965                ))
966            }
967        } else {
968            Ok(false)
969        }
970    }
971
972    /// Try to match PRIMARY KEY sequence
973    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
974        if self.check(&Token::Primary) {
975            self.advance()?;
976            self.expect(Token::Key)?;
977            Ok(true)
978        } else {
979            Ok(false)
980        }
981    }
982}