Skip to main content

reddb_server/storage/query/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::super::ast::{
4    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7    PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            metrics_rollup_policies: Vec::new(),
79            context_index_fields,
80            context_index_enabled,
81            timestamps,
82            partition_by: None,
83            tenant_by: None,
84            append_only: false,
85            subscriptions,
86            vault_own_master_key: false,
87        }))
88    }
89
90    /// Parse: DROP TABLE [IF EXISTS] name
91    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
92        self.expect(Token::Drop)?;
93        self.expect(Token::Table)?;
94        self.parse_drop_table_body()
95    }
96
97    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
98    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
99        let if_not_exists = self.match_if_not_exists()?;
100        let name = self.expect_ident()?;
101
102        self.expect(Token::LParen)?;
103        let mut columns = Vec::new();
104        loop {
105            let col = self.parse_column_def()?;
106            columns.push(col);
107            if !self.consume(&Token::Comma)? {
108                break;
109            }
110        }
111        self.expect(Token::RParen)?;
112
113        let mut default_ttl_ms = None;
114        let mut context_index_fields = Vec::new();
115        let mut context_index_enabled = false;
116        let mut timestamps = false;
117        let mut tenant_by: Option<String> = None;
118        let mut append_only = false;
119        let mut subscriptions = Vec::new();
120
121        while self.consume(&Token::With)? {
122            if self.consume_ident_ci("EVENTS")? {
123                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
124                continue;
125            }
126            // Accept both spellings:
127            //   WITH key = value
128            //   WITH (key = value, key = value)
129            // Postgres / ClickHouse use the parenthesised form; the
130            // bare form is our legacy shorthand. The parenthesised
131            // form collects options separated by commas until `)`.
132            let has_parens = self.consume(&Token::LParen)?;
133
134            loop {
135                if self.consume_ident_ci("CONTEXT_INDEX")? {
136                    context_index_enabled = self.parse_bool_assign()?;
137                } else if self.consume_ident_ci("CONTEXT")? {
138                    if !self.consume(&Token::Index)? {
139                        return Err(ParseError::expected(
140                            vec!["INDEX"],
141                            self.peek(),
142                            self.position(),
143                        ));
144                    }
145                    self.expect(Token::On)?;
146                    self.expect(Token::LParen)?;
147                    loop {
148                        context_index_fields.push(self.expect_ident()?);
149                        if !self.consume(&Token::Comma)? {
150                            break;
151                        }
152                    }
153                    self.expect(Token::RParen)?;
154                    context_index_enabled = true;
155                } else if self.consume_ident_ci("TIMESTAMPS")? {
156                    timestamps = self.parse_bool_assign()?;
157                } else if self.consume_ident_ci("APPEND_ONLY")? {
158                    append_only = self.parse_bool_assign()?;
159                } else if self.consume_ident_ci("TENANT_BY")? {
160                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
161                    // and expects a string literal column name.
162                    let _ = self.consume(&Token::Eq)?;
163                    let value = self.parse_literal_value()?;
164                    match value {
165                        Value::Text(col) => tenant_by = Some(col.to_string()),
166                        other => {
167                            return Err(ParseError::new(
168                                format!("WITH tenant_by expects a text literal, got {other:?}"),
169                                self.position(),
170                            ));
171                        }
172                    }
173                } else {
174                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
175                }
176                if has_parens {
177                    if self.consume(&Token::Comma)? {
178                        continue;
179                    }
180                    self.expect(Token::RParen)?;
181                }
182                break;
183            }
184        }
185
186        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
187        let partition_by = if self.consume(&Token::Partition)? {
188            self.expect(Token::By)?;
189            let kind = if self.consume(&Token::Range)? {
190                PartitionKind::Range
191            } else if self.consume(&Token::List)? {
192                PartitionKind::List
193            } else if self.consume(&Token::Hash)? {
194                PartitionKind::Hash
195            } else {
196                return Err(ParseError::expected(
197                    vec!["RANGE", "LIST", "HASH"],
198                    self.peek(),
199                    self.position(),
200                ));
201            };
202            self.expect(Token::LParen)?;
203            let column = self.expect_ident()?;
204            self.expect(Token::RParen)?;
205            Some(PartitionSpec { kind, column })
206        } else {
207            None
208        };
209
210        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
211        // style). Accepted after partition spec / tenant spec / or on
212        // its own. `WITH (append_only = true)` is the other form and
213        // handled above.
214        if !append_only && self.consume_ident_ci("APPEND")? {
215            if !self.consume_ident_ci("ONLY")? {
216                return Err(ParseError::expected(
217                    vec!["ONLY"],
218                    self.peek(),
219                    self.position(),
220                ));
221            }
222            append_only = true;
223        }
224
225        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
226        // trailing clause (after partition spec if both are used).
227        //
228        // Dotted paths let non-table models declare tenancy over their
229        // natural nested structures — `metadata.tenant` for vectors,
230        // `payload.tenant` for queue messages, `tags.cluster` for
231        // timeseries, `properties.org` for graphs. The read-path
232        // resolver already navigates these paths via
233        // `resolve_runtime_document_path`; here we just store the
234        // dotted string and let the policy evaluator do the rest.
235        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
236            self.expect(Token::By)?;
237            self.expect(Token::LParen)?;
238            // Allow keyword-idents (`metadata`, `type`, `data`) as
239            // column names — SQL treats them as bare identifiers in
240            // this context.
241            let mut path = self.expect_ident_or_keyword()?;
242            while self.consume(&Token::Dot)? {
243                let next = self.expect_ident_or_keyword()?;
244                path = format!("{path}.{next}");
245            }
246            self.expect(Token::RParen)?;
247            tenant_by = Some(path);
248        }
249
250        Ok(QueryExpr::CreateTable(CreateTableQuery {
251            collection_model: CollectionModel::Table,
252            name,
253            columns,
254            if_not_exists,
255            default_ttl_ms,
256            metrics_rollup_policies: Vec::new(),
257            context_index_fields,
258            context_index_enabled,
259            timestamps,
260            partition_by,
261            tenant_by,
262            append_only,
263            subscriptions,
264            vault_own_master_key: false,
265        }))
266    }
267
268    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
269    ///
270    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
271    /// difference between the table's current contract and the target CREATE
272    /// TABLE body.
273    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
274        self.expect(Token::Explain)?;
275        self.expect(Token::Alter)?;
276        self.expect(Token::For)?;
277        self.expect(Token::Create)?;
278        self.expect(Token::Table)?;
279
280        let body = self.parse_create_table_body()?;
281        let target = match body {
282            QueryExpr::CreateTable(t) => t,
283            _ => {
284                return Err(ParseError::new(
285                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
286                        .to_string(),
287                    self.position(),
288                ));
289            }
290        };
291
292        let format = if self.consume(&Token::Format)? {
293            if self.consume(&Token::Json)? {
294                ExplainFormat::Json
295            } else if self.consume_ident_ci("SQL")? {
296                ExplainFormat::Sql
297            } else {
298                return Err(ParseError::expected(
299                    vec!["JSON", "SQL"],
300                    self.peek(),
301                    self.position(),
302                ));
303            }
304        } else {
305            ExplainFormat::Sql
306        };
307
308        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
309            target,
310            format,
311        }))
312    }
313
314    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
315    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
316        let if_exists = self.match_if_exists()?;
317        let name = self.parse_drop_collection_name()?;
318        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
319    }
320
321    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
322        let if_exists = self.match_if_exists()?;
323        let name = self.parse_drop_collection_name()?;
324        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
325    }
326
327    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
328        let if_exists = self.match_if_exists()?;
329        let name = self.parse_drop_collection_name()?;
330        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
331    }
332
333    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
334        let if_exists = self.match_if_exists()?;
335        let name = self.parse_drop_collection_name()?;
336        Ok(QueryExpr::DropDocument(DropDocumentQuery {
337            name,
338            if_exists,
339        }))
340    }
341
342    pub fn parse_create_keyed_body(
343        &mut self,
344        model: CollectionModel,
345    ) -> Result<QueryExpr, ParseError> {
346        let if_not_exists = self.match_if_not_exists()?;
347        let name = self.parse_drop_collection_name()?;
348        let vault_own_master_key =
349            if model == CollectionModel::Vault && self.consume(&Token::With)? {
350                if !self.consume_ident_ci("OWN")? {
351                    return Err(ParseError::expected(
352                        vec!["OWN"],
353                        self.peek(),
354                        self.position(),
355                    ));
356                }
357                if !self.consume_ident_ci("MASTER")? {
358                    return Err(ParseError::expected(
359                        vec!["MASTER"],
360                        self.peek(),
361                        self.position(),
362                    ));
363                }
364                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
365                    return Err(ParseError::expected(
366                        vec!["KEY"],
367                        self.peek(),
368                        self.position(),
369                    ));
370                }
371                true
372            } else {
373                false
374            };
375        Ok(QueryExpr::CreateTable(CreateTableQuery {
376            collection_model: model,
377            name,
378            columns: Vec::new(),
379            if_not_exists,
380            default_ttl_ms: None,
381            metrics_rollup_policies: Vec::new(),
382            context_index_fields: Vec::new(),
383            context_index_enabled: false,
384            timestamps: false,
385            partition_by: None,
386            tenant_by: None,
387            append_only: false,
388            subscriptions: Vec::new(),
389            vault_own_master_key,
390        }))
391    }
392
393    pub fn parse_create_collection_model_body(
394        &mut self,
395        model: CollectionModel,
396    ) -> Result<QueryExpr, ParseError> {
397        self.parse_create_keyed_body(model)
398    }
399
400    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
401        let if_not_exists = self.match_if_not_exists()?;
402        let name = self.parse_drop_collection_name()?;
403        if !self.consume_ident_ci("KIND")? {
404            return Err(ParseError::expected(
405                vec!["KIND"],
406                self.peek(),
407                self.position(),
408            ));
409        }
410        let kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
411        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
412            name,
413            kind,
414            if_not_exists,
415        }))
416    }
417
418    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
419        let if_not_exists = self.match_if_not_exists()?;
420        let name = self.parse_drop_collection_name()?;
421        if !self.consume_ident_ci("DIM")? {
422            return Err(ParseError::expected(
423                vec!["DIM"],
424                self.peek(),
425                self.position(),
426            ));
427        }
428        let dimension = self.parse_integer()?;
429        if dimension <= 0 {
430            return Err(ParseError::new(
431                "VECTOR DIM must be a positive integer".to_string(),
432                self.position(),
433            ));
434        }
435        let metric = if self.consume(&Token::Metric)? {
436            self.parse_distance_metric()?
437        } else {
438            crate::storage::engine::distance::DistanceMetric::Cosine
439        };
440        Ok(QueryExpr::CreateVector(CreateVectorQuery {
441            name,
442            dimension: dimension as usize,
443            metric,
444            if_not_exists,
445        }))
446    }
447
448    pub fn parse_drop_keyed_body(
449        &mut self,
450        model: CollectionModel,
451    ) -> Result<QueryExpr, ParseError> {
452        let if_exists = self.match_if_exists()?;
453        let name = self.parse_drop_collection_name()?;
454        Ok(QueryExpr::DropKv(DropKvQuery {
455            name,
456            if_exists,
457            model,
458        }))
459    }
460
461    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
462        self.parse_drop_keyed_body(CollectionModel::Kv)
463    }
464
465    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
466        self.parse_drop_collection_model_body(None)
467    }
468
469    pub fn parse_drop_collection_model_body(
470        &mut self,
471        model: Option<CollectionModel>,
472    ) -> Result<QueryExpr, ParseError> {
473        let if_exists = self.match_if_exists()?;
474        let name = self.parse_drop_collection_name()?;
475        Ok(QueryExpr::DropCollection(DropCollectionQuery {
476            name,
477            if_exists,
478            model,
479        }))
480    }
481
482    pub fn parse_truncate_body(
483        &mut self,
484        model: Option<CollectionModel>,
485    ) -> Result<QueryExpr, ParseError> {
486        let if_exists = self.match_if_exists()?;
487        let name = self.parse_drop_collection_name()?;
488        Ok(QueryExpr::Truncate(TruncateQuery {
489            name,
490            model,
491            if_exists,
492        }))
493    }
494
495    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
496        let mut name = self.expect_ident()?;
497        while self.consume(&Token::Dot)? {
498            if self.consume(&Token::Star)? {
499                name.push_str(".*");
500                break;
501            }
502            let next = self.expect_ident_or_keyword()?;
503            name = format!("{name}.{next}");
504        }
505        Ok(name)
506    }
507
508    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
509    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
510        self.expect(Token::Alter)?;
511        self.expect(Token::Table)?;
512        let name = self.expect_ident()?;
513
514        let mut operations = Vec::new();
515        loop {
516            let op = self.parse_alter_operation(&name)?;
517            operations.push(op);
518            if !self.consume(&Token::Comma)? {
519                break;
520            }
521        }
522
523        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
524    }
525
526    /// Parse a single ALTER TABLE operation
527    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
528        if self.consume(&Token::Add)? {
529            if self.consume_ident_ci("SUBSCRIPTION")? {
530                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
531                let sub_name = self.expect_ident()?;
532                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
533                Ok(AlterOperation::AddSubscription {
534                    name: sub_name,
535                    descriptor,
536                })
537            } else {
538                // ADD COLUMN definition (COLUMN keyword is optional)
539                let _ = self.consume(&Token::Column)?;
540                let col_def = self.parse_column_def()?;
541                Ok(AlterOperation::AddColumn(col_def))
542            }
543        } else if self.consume(&Token::Drop)? {
544            if self.consume_ident_ci("SUBSCRIPTION")? {
545                // DROP SUBSCRIPTION name
546                let sub_name = self.expect_ident()?;
547                Ok(AlterOperation::DropSubscription { name: sub_name })
548            } else {
549                // DROP COLUMN name (COLUMN keyword is optional)
550                let _ = self.consume(&Token::Column)?;
551                let col_name = self.expect_ident()?;
552                Ok(AlterOperation::DropColumn(col_name))
553            }
554        } else if self.consume(&Token::Rename)? {
555            // RENAME COLUMN from TO to
556            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
557            let from = self.expect_ident()?;
558            self.expect(Token::To)?;
559            let to = self.expect_ident()?;
560            Ok(AlterOperation::RenameColumn { from, to })
561        } else if self.consume(&Token::Attach)? {
562            // ATTACH PARTITION child FOR VALUES ...
563            self.expect(Token::Partition)?;
564            let child = self.expect_ident()?;
565            self.expect(Token::For)?;
566            // Accept `VALUES` as an ident since the grammar doesn't have it
567            // as a reserved keyword everywhere. Collect the remaining tokens
568            // as a raw bound string for round-trip persistence.
569            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
570                return Err(ParseError::expected(
571                    vec!["VALUES"],
572                    self.peek(),
573                    self.position(),
574                ));
575            }
576            let bound = self.collect_remaining_tokens_as_string()?;
577            Ok(AlterOperation::AttachPartition { child, bound })
578        } else if self.consume(&Token::Detach)? {
579            // DETACH PARTITION child
580            self.expect(Token::Partition)?;
581            let child = self.expect_ident()?;
582            Ok(AlterOperation::DetachPartition { child })
583        } else if self.consume(&Token::Enable)? {
584            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
585            if self.consume_ident_ci("EVENTS")? {
586                Ok(AlterOperation::EnableEvents(
587                    self.parse_subscription_descriptor(table_name.to_string())?,
588                ))
589            } else if self.consume_ident_ci("TENANCY")? {
590                self.expect(Token::On)?;
591                self.expect(Token::LParen)?;
592                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
593                let mut path = self.expect_ident_or_keyword()?;
594                while self.consume(&Token::Dot)? {
595                    let next = self.expect_ident_or_keyword()?;
596                    path = format!("{path}.{next}");
597                }
598                self.expect(Token::RParen)?;
599                Ok(AlterOperation::EnableTenancy { column: path })
600            } else {
601                self.expect(Token::Row)?;
602                self.expect(Token::Level)?;
603                self.expect(Token::Security)?;
604                Ok(AlterOperation::EnableRowLevelSecurity)
605            }
606        } else if self.consume(&Token::Disable)? {
607            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
608            if self.consume_ident_ci("EVENTS")? {
609                Ok(AlterOperation::DisableEvents)
610            } else if self.consume_ident_ci("TENANCY")? {
611                Ok(AlterOperation::DisableTenancy)
612            } else {
613                self.expect(Token::Row)?;
614                self.expect(Token::Level)?;
615                self.expect(Token::Security)?;
616                Ok(AlterOperation::DisableRowLevelSecurity)
617            }
618        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
619            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
620            if self.consume_ident_ci("APPEND_ONLY")? {
621                let on = self.parse_bool_assign()?;
622                Ok(AlterOperation::SetAppendOnly(on))
623            } else if self.consume_ident_ci("VERSIONED")? {
624                let on = self.parse_bool_assign()?;
625                Ok(AlterOperation::SetVersioned(on))
626            } else {
627                Err(ParseError::expected(
628                    vec!["APPEND_ONLY", "VERSIONED"],
629                    self.peek(),
630                    self.position(),
631                ))
632            }
633        } else {
634            Err(ParseError::expected(
635                vec![
636                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
637                ],
638                self.peek(),
639                self.position(),
640            ))
641        }
642    }
643
644    fn parse_subscription_descriptor(
645        &mut self,
646        source: String,
647    ) -> Result<SubscriptionDescriptor, ParseError> {
648        let mut ops_filter = Vec::new();
649        if self.consume(&Token::LParen)? {
650            loop {
651                let op = if self.consume(&Token::Insert)? {
652                    SubscriptionOperation::Insert
653                } else if self.consume(&Token::Update)? {
654                    SubscriptionOperation::Update
655                } else if self.consume(&Token::Delete)? {
656                    SubscriptionOperation::Delete
657                } else {
658                    return Err(ParseError::expected(
659                        vec!["INSERT", "UPDATE", "DELETE"],
660                        self.peek(),
661                        self.position(),
662                    ));
663                };
664                ops_filter.push(op);
665                if !self.consume(&Token::Comma)? {
666                    break;
667                }
668            }
669            self.expect(Token::RParen)?;
670        }
671
672        let target_queue = if self.consume(&Token::To)? {
673            self.expect_ident()?
674        } else {
675            format!("{source}_events")
676        };
677
678        let mut redact_fields = Vec::new();
679        if self.consume_ident_ci("REDACT")? {
680            self.expect(Token::LParen)?;
681            loop {
682                redact_fields.push(self.parse_dotted_redact_path()?);
683                if !self.consume(&Token::Comma)? {
684                    break;
685                }
686            }
687            self.expect(Token::RParen)?;
688        }
689
690        let where_filter = if self.consume(&Token::Where)? {
691            Some(self.collect_subscription_where_filter()?)
692        } else {
693            None
694        };
695
696        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
697        let all_tenants = if self.consume(&Token::On)? {
698            self.expect(Token::All)?;
699            if !self.consume_ident_ci("TENANTS")? {
700                return Err(ParseError::expected(
701                    vec!["TENANTS"],
702                    self.peek(),
703                    self.position(),
704                ));
705            }
706            true
707        } else {
708            false
709        };
710
711        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
712        if self.consume_ident_ci("REQUIRES")? {
713            self.consume_ident_ci("CAPABILITY")?;
714            // consume the capability string literal token
715            self.advance()?;
716        }
717
718        Ok(SubscriptionDescriptor {
719            name: String::new(),
720            source,
721            target_queue,
722            ops_filter,
723            where_filter,
724            redact_fields,
725            enabled: true,
726            all_tenants,
727        })
728    }
729
730    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
731    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
732        let mut parts = Vec::new();
733        if self.consume(&Token::Star)? {
734            parts.push("*".to_string());
735        } else {
736            parts.push(self.expect_ident_or_keyword()?);
737        }
738        while self.consume(&Token::Dot)? {
739            if self.consume(&Token::Star)? {
740                parts.push("*".to_string());
741            } else {
742                parts.push(self.expect_ident_or_keyword()?);
743            }
744        }
745        Ok(parts.join("."))
746    }
747
748    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
749        let mut parts = Vec::new();
750        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
751            parts.push(self.peek().to_string());
752            self.advance()?;
753        }
754        if parts.is_empty() {
755            return Err(ParseError::expected(
756                vec!["predicate"],
757                self.peek(),
758                self.position(),
759            ));
760        }
761        Ok(parts.join(" "))
762    }
763
764    /// Capture remaining tokens as a display-joined string.
765    ///
766    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
767    /// bound clause into storage without needing a dedicated per-kind AST.
768    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
769        let mut parts: Vec<String> = Vec::new();
770        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
771            parts.push(self.peek().to_string());
772            self.advance()?;
773        }
774        Ok(parts.join(" "))
775    }
776
777    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
778    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
779        let name = self.expect_column_ident()?;
780        let sql_type = self.parse_column_type()?;
781        let data_type = sql_type.to_string();
782
783        let mut def = CreateColumnDef {
784            name,
785            data_type,
786            sql_type: sql_type.clone(),
787            not_null: false,
788            default: None,
789            compress: None,
790            unique: false,
791            primary_key: false,
792            enum_variants: sql_type.enum_variants().unwrap_or_default(),
793            array_element: sql_type.array_element_type(),
794            decimal_precision: sql_type.decimal_precision(),
795        };
796
797        // Parse modifiers in any order
798        loop {
799            if self.match_not_null()? {
800                def.not_null = true;
801            } else if self.consume(&Token::Default)? {
802                self.expect(Token::Eq)?;
803                def.default = Some(self.parse_literal_string_for_ddl()?);
804            } else if self.consume(&Token::Compress)? {
805                self.expect(Token::Colon)?;
806                def.compress = Some(self.parse_integer()? as u8);
807            } else if self.consume(&Token::Unique)? {
808                def.unique = true;
809            } else if self.match_primary_key()? {
810                def.primary_key = true;
811            } else {
812                break;
813            }
814        }
815
816        Ok(def)
817    }
818
819    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
820    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
821        let type_name = self.expect_ident_or_keyword()?;
822        if self.consume(&Token::LParen)? {
823            let inner = self.parse_type_params()?;
824            self.expect(Token::RParen)?;
825            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
826        } else {
827            Ok(SqlTypeName::new(type_name))
828        }
829    }
830
831    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
832    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
833        let mut parts = Vec::new();
834        loop {
835            match self.peek().clone() {
836                Token::String(s) => {
837                    let s = s.clone();
838                    self.advance()?;
839                    parts.push(TypeModifier::StringLiteral(s));
840                }
841                Token::Integer(n) => {
842                    self.advance()?;
843                    parts.push(TypeModifier::Number(n as u32));
844                }
845                _ => {
846                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
847                }
848            }
849            if !self.consume(&Token::Comma)? {
850                break;
851            }
852        }
853        Ok(parts)
854    }
855
856    /// Parse a literal string value for DDL DEFAULT expressions
857    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
858        match self.peek().clone() {
859            Token::String(s) => {
860                let s = s.clone();
861                self.advance()?;
862                Ok(s)
863            }
864            Token::Integer(n) => {
865                self.advance()?;
866                Ok(n.to_string())
867            }
868            Token::Float(n) => {
869                self.advance()?;
870                Ok(n.to_string())
871            }
872            Token::True => {
873                self.advance()?;
874                Ok("true".to_string())
875            }
876            Token::False => {
877                self.advance()?;
878                Ok("false".to_string())
879            }
880            Token::Null => {
881                self.advance()?;
882                Ok("null".to_string())
883            }
884            ref other => Err(ParseError::expected(
885                vec!["string", "number", "true", "false", "null"],
886                other,
887                self.position(),
888            )),
889        }
890    }
891
892    fn check_ttl_keyword(&self) -> bool {
893        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
894    }
895
896    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
897    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
898    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
899        self.expect(Token::Eq)?;
900        match self.peek() {
901            Token::True => {
902                self.advance()?;
903                Ok(true)
904            }
905            Token::False => {
906                self.advance()?;
907                Ok(false)
908            }
909            other => Err(ParseError::expected(
910                vec!["true", "false"],
911                other,
912                self.position(),
913            )),
914        }
915    }
916
917    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
918        if self.consume_ident_ci(expected)? {
919            Ok(())
920        } else {
921            Err(ParseError::expected(
922                vec![expected],
923                self.peek(),
924                self.position(),
925            ))
926        }
927    }
928
929    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
930        let option_name = self.expect_ident_or_keyword()?;
931        if !option_name.eq_ignore_ascii_case("ttl") {
932            return Err(ParseError::new(
933                // F-05: `option_name` is caller-controlled identifier text.
934                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
935                // before the message reaches downstream serialization sinks.
936                format!("unsupported CREATE TABLE option {option_name:?}, expected TTL"),
937                self.position(),
938            ));
939        }
940
941        let ttl_value = self.parse_float()?;
942        let ttl_unit = match self.peek() {
943            Token::Ident(unit) => {
944                let unit = unit.clone();
945                self.advance()?;
946                unit
947            }
948            _ => "s".to_string(),
949        };
950
951        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
952            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
953            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
954            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
955            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
956            "d" | "day" | "days" => 86_400_000.0,
957            other => {
958                return Err(ParseError::new(
959                    // F-05: render `other` via `{:?}` so caller-controlled
960                    // bytes (CR / LF / NUL / quotes) are escaped before
961                    // reaching downstream serialization sinks.
962                    format!("unsupported TTL unit {other:?}"),
963                    self.position(),
964                ));
965            }
966        };
967
968        if !ttl_value.is_finite() || ttl_value < 0.0 {
969            return Err(ParseError::new(
970                "TTL must be a finite, non-negative duration".to_string(),
971                self.position(),
972            ));
973        }
974
975        let ttl_ms = ttl_value * multiplier_ms;
976        if ttl_ms > u64::MAX as f64 {
977            return Err(ParseError::new(
978                "TTL duration is too large".to_string(),
979                self.position(),
980            ));
981        }
982        if ttl_ms.fract().abs() >= f64::EPSILON {
983            return Err(ParseError::new(
984                "TTL duration must resolve to a whole number of milliseconds".to_string(),
985                self.position(),
986            ));
987        }
988
989        Ok(Some(ttl_ms as u64))
990    }
991
992    /// Try to match IF NOT EXISTS sequence
993    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
994        if self.check(&Token::If) {
995            self.advance()?;
996            self.expect(Token::Not)?;
997            self.expect(Token::Exists)?;
998            Ok(true)
999        } else {
1000            Ok(false)
1001        }
1002    }
1003
1004    /// Try to match IF EXISTS sequence
1005    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1006        if self.check(&Token::If) {
1007            self.advance()?;
1008            self.expect(Token::Exists)?;
1009            Ok(true)
1010        } else {
1011            Ok(false)
1012        }
1013    }
1014
1015    /// Try to match NOT NULL sequence
1016    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1017        if self.check(&Token::Not) {
1018            // Peek ahead - only consume if followed by NULL
1019            // We need to be careful: save state and try
1020            self.advance()?; // consume NOT
1021            if self.check(&Token::Null) {
1022                self.advance()?; // consume NULL
1023                Ok(true)
1024            } else {
1025                // This is tricky - NOT was consumed but next isn't NULL.
1026                // In column modifier context, NOT should only appear before NULL.
1027                // Return error for clarity.
1028                Err(ParseError::expected(
1029                    vec!["NULL (after NOT)"],
1030                    self.peek(),
1031                    self.position(),
1032                ))
1033            }
1034        } else {
1035            Ok(false)
1036        }
1037    }
1038
1039    /// Try to match PRIMARY KEY sequence
1040    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1041        if self.check(&Token::Primary) {
1042            self.advance()?;
1043            self.expect(Token::Key)?;
1044            Ok(true)
1045        } else {
1046            Ok(false)
1047        }
1048    }
1049}