Skip to main content

reddb_server/storage/query/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::super::ast::{
4    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7    PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            context_index_fields,
79            context_index_enabled,
80            timestamps,
81            partition_by: None,
82            tenant_by: None,
83            append_only: false,
84            subscriptions,
85            vault_own_master_key: false,
86        }))
87    }
88
89    /// Parse: DROP TABLE [IF EXISTS] name
90    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
91        self.expect(Token::Drop)?;
92        self.expect(Token::Table)?;
93        self.parse_drop_table_body()
94    }
95
96    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
97    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
98        let if_not_exists = self.match_if_not_exists()?;
99        let name = self.expect_ident()?;
100
101        self.expect(Token::LParen)?;
102        let mut columns = Vec::new();
103        loop {
104            let col = self.parse_column_def()?;
105            columns.push(col);
106            if !self.consume(&Token::Comma)? {
107                break;
108            }
109        }
110        self.expect(Token::RParen)?;
111
112        let mut default_ttl_ms = None;
113        let mut context_index_fields = Vec::new();
114        let mut context_index_enabled = false;
115        let mut timestamps = false;
116        let mut tenant_by: Option<String> = None;
117        let mut append_only = false;
118        let mut subscriptions = Vec::new();
119
120        while self.consume(&Token::With)? {
121            if self.consume_ident_ci("EVENTS")? {
122                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
123                continue;
124            }
125            // Accept both spellings:
126            //   WITH key = value
127            //   WITH (key = value, key = value)
128            // Postgres / ClickHouse use the parenthesised form; the
129            // bare form is our legacy shorthand. The parenthesised
130            // form collects options separated by commas until `)`.
131            let has_parens = self.consume(&Token::LParen)?;
132
133            loop {
134                if self.consume_ident_ci("CONTEXT_INDEX")? {
135                    context_index_enabled = self.parse_bool_assign()?;
136                } else if self.consume_ident_ci("CONTEXT")? {
137                    if !self.consume(&Token::Index)? {
138                        return Err(ParseError::expected(
139                            vec!["INDEX"],
140                            self.peek(),
141                            self.position(),
142                        ));
143                    }
144                    self.expect(Token::On)?;
145                    self.expect(Token::LParen)?;
146                    loop {
147                        context_index_fields.push(self.expect_ident()?);
148                        if !self.consume(&Token::Comma)? {
149                            break;
150                        }
151                    }
152                    self.expect(Token::RParen)?;
153                    context_index_enabled = true;
154                } else if self.consume_ident_ci("TIMESTAMPS")? {
155                    timestamps = self.parse_bool_assign()?;
156                } else if self.consume_ident_ci("APPEND_ONLY")? {
157                    append_only = self.parse_bool_assign()?;
158                } else if self.consume_ident_ci("TENANT_BY")? {
159                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
160                    // and expects a string literal column name.
161                    let _ = self.consume(&Token::Eq)?;
162                    let value = self.parse_literal_value()?;
163                    match value {
164                        Value::Text(col) => tenant_by = Some(col.to_string()),
165                        other => {
166                            return Err(ParseError::new(
167                                format!("WITH tenant_by expects a text literal, got {other:?}"),
168                                self.position(),
169                            ));
170                        }
171                    }
172                } else {
173                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
174                }
175                if has_parens {
176                    if self.consume(&Token::Comma)? {
177                        continue;
178                    }
179                    self.expect(Token::RParen)?;
180                }
181                break;
182            }
183        }
184
185        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
186        let partition_by = if self.consume(&Token::Partition)? {
187            self.expect(Token::By)?;
188            let kind = if self.consume(&Token::Range)? {
189                PartitionKind::Range
190            } else if self.consume(&Token::List)? {
191                PartitionKind::List
192            } else if self.consume(&Token::Hash)? {
193                PartitionKind::Hash
194            } else {
195                return Err(ParseError::expected(
196                    vec!["RANGE", "LIST", "HASH"],
197                    self.peek(),
198                    self.position(),
199                ));
200            };
201            self.expect(Token::LParen)?;
202            let column = self.expect_ident()?;
203            self.expect(Token::RParen)?;
204            Some(PartitionSpec { kind, column })
205        } else {
206            None
207        };
208
209        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
210        // style). Accepted after partition spec / tenant spec / or on
211        // its own. `WITH (append_only = true)` is the other form and
212        // handled above.
213        if !append_only && self.consume_ident_ci("APPEND")? {
214            if !self.consume_ident_ci("ONLY")? {
215                return Err(ParseError::expected(
216                    vec!["ONLY"],
217                    self.peek(),
218                    self.position(),
219                ));
220            }
221            append_only = true;
222        }
223
224        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
225        // trailing clause (after partition spec if both are used).
226        //
227        // Dotted paths let non-table models declare tenancy over their
228        // natural nested structures — `metadata.tenant` for vectors,
229        // `payload.tenant` for queue messages, `tags.cluster` for
230        // timeseries, `properties.org` for graphs. The read-path
231        // resolver already navigates these paths via
232        // `resolve_runtime_document_path`; here we just store the
233        // dotted string and let the policy evaluator do the rest.
234        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
235            self.expect(Token::By)?;
236            self.expect(Token::LParen)?;
237            // Allow keyword-idents (`metadata`, `type`, `data`) as
238            // column names — SQL treats them as bare identifiers in
239            // this context.
240            let mut path = self.expect_ident_or_keyword()?;
241            while self.consume(&Token::Dot)? {
242                let next = self.expect_ident_or_keyword()?;
243                path = format!("{path}.{next}");
244            }
245            self.expect(Token::RParen)?;
246            tenant_by = Some(path);
247        }
248
249        Ok(QueryExpr::CreateTable(CreateTableQuery {
250            collection_model: CollectionModel::Table,
251            name,
252            columns,
253            if_not_exists,
254            default_ttl_ms,
255            context_index_fields,
256            context_index_enabled,
257            timestamps,
258            partition_by,
259            tenant_by,
260            append_only,
261            subscriptions,
262            vault_own_master_key: false,
263        }))
264    }
265
266    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
267    ///
268    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
269    /// difference between the table's current contract and the target CREATE
270    /// TABLE body.
271    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
272        self.expect(Token::Explain)?;
273        self.expect(Token::Alter)?;
274        self.expect(Token::For)?;
275        self.expect(Token::Create)?;
276        self.expect(Token::Table)?;
277
278        let body = self.parse_create_table_body()?;
279        let target = match body {
280            QueryExpr::CreateTable(t) => t,
281            _ => {
282                return Err(ParseError::new(
283                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
284                        .to_string(),
285                    self.position(),
286                ));
287            }
288        };
289
290        let format = if self.consume(&Token::Format)? {
291            if self.consume(&Token::Json)? {
292                ExplainFormat::Json
293            } else if self.consume_ident_ci("SQL")? {
294                ExplainFormat::Sql
295            } else {
296                return Err(ParseError::expected(
297                    vec!["JSON", "SQL"],
298                    self.peek(),
299                    self.position(),
300                ));
301            }
302        } else {
303            ExplainFormat::Sql
304        };
305
306        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
307            target,
308            format,
309        }))
310    }
311
312    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
313    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
314        let if_exists = self.match_if_exists()?;
315        let name = self.parse_drop_collection_name()?;
316        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
317    }
318
319    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
320        let if_exists = self.match_if_exists()?;
321        let name = self.parse_drop_collection_name()?;
322        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
323    }
324
325    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
326        let if_exists = self.match_if_exists()?;
327        let name = self.parse_drop_collection_name()?;
328        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
329    }
330
331    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
332        let if_exists = self.match_if_exists()?;
333        let name = self.parse_drop_collection_name()?;
334        Ok(QueryExpr::DropDocument(DropDocumentQuery {
335            name,
336            if_exists,
337        }))
338    }
339
340    pub fn parse_create_keyed_body(
341        &mut self,
342        model: CollectionModel,
343    ) -> Result<QueryExpr, ParseError> {
344        let if_not_exists = self.match_if_not_exists()?;
345        let name = self.parse_drop_collection_name()?;
346        let vault_own_master_key =
347            if model == CollectionModel::Vault && self.consume(&Token::With)? {
348                if !self.consume_ident_ci("OWN")? {
349                    return Err(ParseError::expected(
350                        vec!["OWN"],
351                        self.peek(),
352                        self.position(),
353                    ));
354                }
355                if !self.consume_ident_ci("MASTER")? {
356                    return Err(ParseError::expected(
357                        vec!["MASTER"],
358                        self.peek(),
359                        self.position(),
360                    ));
361                }
362                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
363                    return Err(ParseError::expected(
364                        vec!["KEY"],
365                        self.peek(),
366                        self.position(),
367                    ));
368                }
369                true
370            } else {
371                false
372            };
373        Ok(QueryExpr::CreateTable(CreateTableQuery {
374            collection_model: model,
375            name,
376            columns: Vec::new(),
377            if_not_exists,
378            default_ttl_ms: None,
379            context_index_fields: Vec::new(),
380            context_index_enabled: false,
381            timestamps: false,
382            partition_by: None,
383            tenant_by: None,
384            append_only: false,
385            subscriptions: Vec::new(),
386            vault_own_master_key,
387        }))
388    }
389
390    pub fn parse_create_collection_model_body(
391        &mut self,
392        model: CollectionModel,
393    ) -> Result<QueryExpr, ParseError> {
394        self.parse_create_keyed_body(model)
395    }
396
397    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
398        let if_not_exists = self.match_if_not_exists()?;
399        let name = self.parse_drop_collection_name()?;
400        if !self.consume_ident_ci("KIND")? {
401            return Err(ParseError::expected(
402                vec!["KIND"],
403                self.peek(),
404                self.position(),
405            ));
406        }
407        let kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
408        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
409            name,
410            kind,
411            if_not_exists,
412        }))
413    }
414
415    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
416        let if_not_exists = self.match_if_not_exists()?;
417        let name = self.parse_drop_collection_name()?;
418        if !self.consume_ident_ci("DIM")? {
419            return Err(ParseError::expected(
420                vec!["DIM"],
421                self.peek(),
422                self.position(),
423            ));
424        }
425        let dimension = self.parse_integer()?;
426        if dimension <= 0 {
427            return Err(ParseError::new(
428                "VECTOR DIM must be a positive integer".to_string(),
429                self.position(),
430            ));
431        }
432        let metric = if self.consume(&Token::Metric)? {
433            self.parse_distance_metric()?
434        } else {
435            crate::storage::engine::distance::DistanceMetric::Cosine
436        };
437        Ok(QueryExpr::CreateVector(CreateVectorQuery {
438            name,
439            dimension: dimension as usize,
440            metric,
441            if_not_exists,
442        }))
443    }
444
445    pub fn parse_drop_keyed_body(
446        &mut self,
447        model: CollectionModel,
448    ) -> Result<QueryExpr, ParseError> {
449        let if_exists = self.match_if_exists()?;
450        let name = self.parse_drop_collection_name()?;
451        Ok(QueryExpr::DropKv(DropKvQuery {
452            name,
453            if_exists,
454            model,
455        }))
456    }
457
458    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
459        self.parse_drop_keyed_body(CollectionModel::Kv)
460    }
461
462    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
463        let if_exists = self.match_if_exists()?;
464        let name = self.parse_drop_collection_name()?;
465        Ok(QueryExpr::DropCollection(DropCollectionQuery {
466            name,
467            if_exists,
468        }))
469    }
470
471    pub fn parse_truncate_body(
472        &mut self,
473        model: Option<CollectionModel>,
474    ) -> Result<QueryExpr, ParseError> {
475        let if_exists = self.match_if_exists()?;
476        let name = self.parse_drop_collection_name()?;
477        Ok(QueryExpr::Truncate(TruncateQuery {
478            name,
479            model,
480            if_exists,
481        }))
482    }
483
484    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
485        let mut name = self.expect_ident()?;
486        while self.consume(&Token::Dot)? {
487            if self.consume(&Token::Star)? {
488                name.push_str(".*");
489                break;
490            }
491            let next = self.expect_ident_or_keyword()?;
492            name = format!("{name}.{next}");
493        }
494        Ok(name)
495    }
496
497    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
498    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
499        self.expect(Token::Alter)?;
500        self.expect(Token::Table)?;
501        let name = self.expect_ident()?;
502
503        let mut operations = Vec::new();
504        loop {
505            let op = self.parse_alter_operation(&name)?;
506            operations.push(op);
507            if !self.consume(&Token::Comma)? {
508                break;
509            }
510        }
511
512        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
513    }
514
515    /// Parse a single ALTER TABLE operation
516    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
517        if self.consume(&Token::Add)? {
518            if self.consume_ident_ci("SUBSCRIPTION")? {
519                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
520                let sub_name = self.expect_ident()?;
521                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
522                Ok(AlterOperation::AddSubscription {
523                    name: sub_name,
524                    descriptor,
525                })
526            } else {
527                // ADD COLUMN definition (COLUMN keyword is optional)
528                let _ = self.consume(&Token::Column)?;
529                let col_def = self.parse_column_def()?;
530                Ok(AlterOperation::AddColumn(col_def))
531            }
532        } else if self.consume(&Token::Drop)? {
533            if self.consume_ident_ci("SUBSCRIPTION")? {
534                // DROP SUBSCRIPTION name
535                let sub_name = self.expect_ident()?;
536                Ok(AlterOperation::DropSubscription { name: sub_name })
537            } else {
538                // DROP COLUMN name (COLUMN keyword is optional)
539                let _ = self.consume(&Token::Column)?;
540                let col_name = self.expect_ident()?;
541                Ok(AlterOperation::DropColumn(col_name))
542            }
543        } else if self.consume(&Token::Rename)? {
544            // RENAME COLUMN from TO to
545            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
546            let from = self.expect_ident()?;
547            self.expect(Token::To)?;
548            let to = self.expect_ident()?;
549            Ok(AlterOperation::RenameColumn { from, to })
550        } else if self.consume(&Token::Attach)? {
551            // ATTACH PARTITION child FOR VALUES ...
552            self.expect(Token::Partition)?;
553            let child = self.expect_ident()?;
554            self.expect(Token::For)?;
555            // Accept `VALUES` as an ident since the grammar doesn't have it
556            // as a reserved keyword everywhere. Collect the remaining tokens
557            // as a raw bound string for round-trip persistence.
558            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
559                return Err(ParseError::expected(
560                    vec!["VALUES"],
561                    self.peek(),
562                    self.position(),
563                ));
564            }
565            let bound = self.collect_remaining_tokens_as_string()?;
566            Ok(AlterOperation::AttachPartition { child, bound })
567        } else if self.consume(&Token::Detach)? {
568            // DETACH PARTITION child
569            self.expect(Token::Partition)?;
570            let child = self.expect_ident()?;
571            Ok(AlterOperation::DetachPartition { child })
572        } else if self.consume(&Token::Enable)? {
573            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
574            if self.consume_ident_ci("EVENTS")? {
575                Ok(AlterOperation::EnableEvents(
576                    self.parse_subscription_descriptor(table_name.to_string())?,
577                ))
578            } else if self.consume_ident_ci("TENANCY")? {
579                self.expect(Token::On)?;
580                self.expect(Token::LParen)?;
581                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
582                let mut path = self.expect_ident_or_keyword()?;
583                while self.consume(&Token::Dot)? {
584                    let next = self.expect_ident_or_keyword()?;
585                    path = format!("{path}.{next}");
586                }
587                self.expect(Token::RParen)?;
588                Ok(AlterOperation::EnableTenancy { column: path })
589            } else {
590                self.expect(Token::Row)?;
591                self.expect(Token::Level)?;
592                self.expect(Token::Security)?;
593                Ok(AlterOperation::EnableRowLevelSecurity)
594            }
595        } else if self.consume(&Token::Disable)? {
596            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
597            if self.consume_ident_ci("EVENTS")? {
598                Ok(AlterOperation::DisableEvents)
599            } else if self.consume_ident_ci("TENANCY")? {
600                Ok(AlterOperation::DisableTenancy)
601            } else {
602                self.expect(Token::Row)?;
603                self.expect(Token::Level)?;
604                self.expect(Token::Security)?;
605                Ok(AlterOperation::DisableRowLevelSecurity)
606            }
607        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
608            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
609            if self.consume_ident_ci("APPEND_ONLY")? {
610                let on = self.parse_bool_assign()?;
611                Ok(AlterOperation::SetAppendOnly(on))
612            } else if self.consume_ident_ci("VERSIONED")? {
613                let on = self.parse_bool_assign()?;
614                Ok(AlterOperation::SetVersioned(on))
615            } else {
616                Err(ParseError::expected(
617                    vec!["APPEND_ONLY", "VERSIONED"],
618                    self.peek(),
619                    self.position(),
620                ))
621            }
622        } else {
623            Err(ParseError::expected(
624                vec![
625                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
626                ],
627                self.peek(),
628                self.position(),
629            ))
630        }
631    }
632
633    fn parse_subscription_descriptor(
634        &mut self,
635        source: String,
636    ) -> Result<SubscriptionDescriptor, ParseError> {
637        let mut ops_filter = Vec::new();
638        if self.consume(&Token::LParen)? {
639            loop {
640                let op = if self.consume(&Token::Insert)? {
641                    SubscriptionOperation::Insert
642                } else if self.consume(&Token::Update)? {
643                    SubscriptionOperation::Update
644                } else if self.consume(&Token::Delete)? {
645                    SubscriptionOperation::Delete
646                } else {
647                    return Err(ParseError::expected(
648                        vec!["INSERT", "UPDATE", "DELETE"],
649                        self.peek(),
650                        self.position(),
651                    ));
652                };
653                ops_filter.push(op);
654                if !self.consume(&Token::Comma)? {
655                    break;
656                }
657            }
658            self.expect(Token::RParen)?;
659        }
660
661        let target_queue = if self.consume(&Token::To)? {
662            self.expect_ident()?
663        } else {
664            format!("{source}_events")
665        };
666
667        let mut redact_fields = Vec::new();
668        if self.consume_ident_ci("REDACT")? {
669            self.expect(Token::LParen)?;
670            loop {
671                redact_fields.push(self.parse_dotted_redact_path()?);
672                if !self.consume(&Token::Comma)? {
673                    break;
674                }
675            }
676            self.expect(Token::RParen)?;
677        }
678
679        let where_filter = if self.consume(&Token::Where)? {
680            Some(self.collect_subscription_where_filter()?)
681        } else {
682            None
683        };
684
685        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
686        let all_tenants = if self.consume(&Token::On)? {
687            self.expect(Token::All)?;
688            if !self.consume_ident_ci("TENANTS")? {
689                return Err(ParseError::expected(
690                    vec!["TENANTS"],
691                    self.peek(),
692                    self.position(),
693                ));
694            }
695            true
696        } else {
697            false
698        };
699
700        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
701        if self.consume_ident_ci("REQUIRES")? {
702            self.consume_ident_ci("CAPABILITY")?;
703            // consume the capability string literal token
704            self.advance()?;
705        }
706
707        Ok(SubscriptionDescriptor {
708            name: String::new(),
709            source,
710            target_queue,
711            ops_filter,
712            where_filter,
713            redact_fields,
714            enabled: true,
715            all_tenants,
716        })
717    }
718
719    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
720    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
721        let mut parts = Vec::new();
722        if self.consume(&Token::Star)? {
723            parts.push("*".to_string());
724        } else {
725            parts.push(self.expect_ident_or_keyword()?);
726        }
727        while self.consume(&Token::Dot)? {
728            if self.consume(&Token::Star)? {
729                parts.push("*".to_string());
730            } else {
731                parts.push(self.expect_ident_or_keyword()?);
732            }
733        }
734        Ok(parts.join("."))
735    }
736
737    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
738        let mut parts = Vec::new();
739        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
740            parts.push(self.peek().to_string());
741            self.advance()?;
742        }
743        if parts.is_empty() {
744            return Err(ParseError::expected(
745                vec!["predicate"],
746                self.peek(),
747                self.position(),
748            ));
749        }
750        Ok(parts.join(" "))
751    }
752
753    /// Capture remaining tokens as a display-joined string.
754    ///
755    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
756    /// bound clause into storage without needing a dedicated per-kind AST.
757    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
758        let mut parts: Vec<String> = Vec::new();
759        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
760            parts.push(self.peek().to_string());
761            self.advance()?;
762        }
763        Ok(parts.join(" "))
764    }
765
766    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
767    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
768        let name = self.expect_column_ident()?;
769        let sql_type = self.parse_column_type()?;
770        let data_type = sql_type.to_string();
771
772        let mut def = CreateColumnDef {
773            name,
774            data_type,
775            sql_type: sql_type.clone(),
776            not_null: false,
777            default: None,
778            compress: None,
779            unique: false,
780            primary_key: false,
781            enum_variants: sql_type.enum_variants().unwrap_or_default(),
782            array_element: sql_type.array_element_type(),
783            decimal_precision: sql_type.decimal_precision(),
784        };
785
786        // Parse modifiers in any order
787        loop {
788            if self.match_not_null()? {
789                def.not_null = true;
790            } else if self.consume(&Token::Default)? {
791                self.expect(Token::Eq)?;
792                def.default = Some(self.parse_literal_string_for_ddl()?);
793            } else if self.consume(&Token::Compress)? {
794                self.expect(Token::Colon)?;
795                def.compress = Some(self.parse_integer()? as u8);
796            } else if self.consume(&Token::Unique)? {
797                def.unique = true;
798            } else if self.match_primary_key()? {
799                def.primary_key = true;
800            } else {
801                break;
802            }
803        }
804
805        Ok(def)
806    }
807
808    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
809    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
810        let type_name = self.expect_ident_or_keyword()?;
811        if self.consume(&Token::LParen)? {
812            let inner = self.parse_type_params()?;
813            self.expect(Token::RParen)?;
814            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
815        } else {
816            Ok(SqlTypeName::new(type_name))
817        }
818    }
819
820    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
821    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
822        let mut parts = Vec::new();
823        loop {
824            match self.peek().clone() {
825                Token::String(s) => {
826                    let s = s.clone();
827                    self.advance()?;
828                    parts.push(TypeModifier::StringLiteral(s));
829                }
830                Token::Integer(n) => {
831                    self.advance()?;
832                    parts.push(TypeModifier::Number(n as u32));
833                }
834                _ => {
835                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
836                }
837            }
838            if !self.consume(&Token::Comma)? {
839                break;
840            }
841        }
842        Ok(parts)
843    }
844
845    /// Parse a literal string value for DDL DEFAULT expressions
846    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
847        match self.peek().clone() {
848            Token::String(s) => {
849                let s = s.clone();
850                self.advance()?;
851                Ok(s)
852            }
853            Token::Integer(n) => {
854                self.advance()?;
855                Ok(n.to_string())
856            }
857            Token::Float(n) => {
858                self.advance()?;
859                Ok(n.to_string())
860            }
861            Token::True => {
862                self.advance()?;
863                Ok("true".to_string())
864            }
865            Token::False => {
866                self.advance()?;
867                Ok("false".to_string())
868            }
869            Token::Null => {
870                self.advance()?;
871                Ok("null".to_string())
872            }
873            ref other => Err(ParseError::expected(
874                vec!["string", "number", "true", "false", "null"],
875                other,
876                self.position(),
877            )),
878        }
879    }
880
881    fn check_ttl_keyword(&self) -> bool {
882        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
883    }
884
885    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
886    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
887    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
888        self.expect(Token::Eq)?;
889        match self.peek() {
890            Token::True => {
891                self.advance()?;
892                Ok(true)
893            }
894            Token::False => {
895                self.advance()?;
896                Ok(false)
897            }
898            other => Err(ParseError::expected(
899                vec!["true", "false"],
900                other,
901                self.position(),
902            )),
903        }
904    }
905
906    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
907        if self.consume_ident_ci(expected)? {
908            Ok(())
909        } else {
910            Err(ParseError::expected(
911                vec![expected],
912                self.peek(),
913                self.position(),
914            ))
915        }
916    }
917
918    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
919        let option_name = self.expect_ident_or_keyword()?;
920        if !option_name.eq_ignore_ascii_case("ttl") {
921            return Err(ParseError::new(
922                // F-05: `option_name` is caller-controlled identifier text.
923                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
924                // before the message reaches downstream serialization sinks.
925                format!("unsupported CREATE TABLE option {option_name:?}, expected TTL"),
926                self.position(),
927            ));
928        }
929
930        let ttl_value = self.parse_float()?;
931        let ttl_unit = match self.peek() {
932            Token::Ident(unit) => {
933                let unit = unit.clone();
934                self.advance()?;
935                unit
936            }
937            _ => "s".to_string(),
938        };
939
940        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
941            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
942            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
943            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
944            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
945            "d" | "day" | "days" => 86_400_000.0,
946            other => {
947                return Err(ParseError::new(
948                    // F-05: render `other` via `{:?}` so caller-controlled
949                    // bytes (CR / LF / NUL / quotes) are escaped before
950                    // reaching downstream serialization sinks.
951                    format!("unsupported TTL unit {other:?}"),
952                    self.position(),
953                ));
954            }
955        };
956
957        if !ttl_value.is_finite() || ttl_value < 0.0 {
958            return Err(ParseError::new(
959                "TTL must be a finite, non-negative duration".to_string(),
960                self.position(),
961            ));
962        }
963
964        let ttl_ms = ttl_value * multiplier_ms;
965        if ttl_ms > u64::MAX as f64 {
966            return Err(ParseError::new(
967                "TTL duration is too large".to_string(),
968                self.position(),
969            ));
970        }
971        if ttl_ms.fract().abs() >= f64::EPSILON {
972            return Err(ParseError::new(
973                "TTL duration must resolve to a whole number of milliseconds".to_string(),
974                self.position(),
975            ));
976        }
977
978        Ok(Some(ttl_ms as u64))
979    }
980
981    /// Try to match IF NOT EXISTS sequence
982    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
983        if self.check(&Token::If) {
984            self.advance()?;
985            self.expect(Token::Not)?;
986            self.expect(Token::Exists)?;
987            Ok(true)
988        } else {
989            Ok(false)
990        }
991    }
992
993    /// Try to match IF EXISTS sequence
994    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
995        if self.check(&Token::If) {
996            self.advance()?;
997            self.expect(Token::Exists)?;
998            Ok(true)
999        } else {
1000            Ok(false)
1001        }
1002    }
1003
1004    /// Try to match NOT NULL sequence
1005    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1006        if self.check(&Token::Not) {
1007            // Peek ahead - only consume if followed by NULL
1008            // We need to be careful: save state and try
1009            self.advance()?; // consume NOT
1010            if self.check(&Token::Null) {
1011                self.advance()?; // consume NULL
1012                Ok(true)
1013            } else {
1014                // This is tricky - NOT was consumed but next isn't NULL.
1015                // In column modifier context, NOT should only appear before NULL.
1016                // Return error for clarity.
1017                Err(ParseError::expected(
1018                    vec!["NULL (after NOT)"],
1019                    self.peek(),
1020                    self.position(),
1021                ))
1022            }
1023        } else {
1024            Ok(false)
1025        }
1026    }
1027
1028    /// Try to match PRIMARY KEY sequence
1029    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1030        if self.check(&Token::Primary) {
1031            self.advance()?;
1032            self.expect(Token::Key)?;
1033            Ok(true)
1034        } else {
1035            Ok(false)
1036        }
1037    }
1038}