Skip to main content

reddb_server/storage/query/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::super::ast::{
4    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7    PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            metrics_rollup_policies: Vec::new(),
79            context_index_fields,
80            context_index_enabled,
81            timestamps,
82            partition_by: None,
83            tenant_by: None,
84            append_only: false,
85            subscriptions,
86            vault_own_master_key: false,
87        }))
88    }
89
90    /// Parse: DROP TABLE [IF EXISTS] name
91    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
92        self.expect(Token::Drop)?;
93        self.expect(Token::Table)?;
94        self.parse_drop_table_body()
95    }
96
97    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
98    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
99        let if_not_exists = self.match_if_not_exists()?;
100        let name = self.expect_ident()?;
101
102        self.expect(Token::LParen)?;
103        let mut columns = Vec::new();
104        loop {
105            let col = self.parse_column_def()?;
106            columns.push(col);
107            if !self.consume(&Token::Comma)? {
108                break;
109            }
110        }
111        self.expect(Token::RParen)?;
112
113        let mut default_ttl_ms = None;
114        let mut context_index_fields = Vec::new();
115        let mut context_index_enabled = false;
116        let mut timestamps = false;
117        let mut tenant_by: Option<String> = None;
118        let mut append_only = false;
119        let mut subscriptions = Vec::new();
120
121        while self.consume(&Token::With)? {
122            if self.consume_ident_ci("EVENTS")? {
123                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
124                continue;
125            }
126            // Accept both spellings:
127            //   WITH key = value
128            //   WITH (key = value, key = value)
129            // Postgres / ClickHouse use the parenthesised form; the
130            // bare form is our legacy shorthand. The parenthesised
131            // form collects options separated by commas until `)`.
132            let has_parens = self.consume(&Token::LParen)?;
133
134            loop {
135                if self.consume_ident_ci("CONTEXT_INDEX")? {
136                    context_index_enabled = self.parse_bool_assign()?;
137                } else if self.consume_ident_ci("CONTEXT")? {
138                    if !self.consume(&Token::Index)? {
139                        return Err(ParseError::expected(
140                            vec!["INDEX"],
141                            self.peek(),
142                            self.position(),
143                        ));
144                    }
145                    self.expect(Token::On)?;
146                    self.expect(Token::LParen)?;
147                    loop {
148                        context_index_fields.push(self.expect_ident()?);
149                        if !self.consume(&Token::Comma)? {
150                            break;
151                        }
152                    }
153                    self.expect(Token::RParen)?;
154                    context_index_enabled = true;
155                } else if self.consume_ident_ci("TIMESTAMPS")? {
156                    timestamps = self.parse_bool_assign()?;
157                } else if self.consume_ident_ci("APPEND_ONLY")? {
158                    append_only = self.parse_bool_assign()?;
159                } else if self.consume_ident_ci("TENANT_BY")? {
160                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
161                    // and expects a string literal column name.
162                    let _ = self.consume(&Token::Eq)?;
163                    let value = self.parse_literal_value()?;
164                    match value {
165                        Value::Text(col) => tenant_by = Some(col.to_string()),
166                        other => {
167                            return Err(ParseError::new(
168                                format!("WITH tenant_by expects a text literal, got {other:?}"),
169                                self.position(),
170                            ));
171                        }
172                    }
173                } else {
174                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
175                }
176                if has_parens {
177                    if self.consume(&Token::Comma)? {
178                        continue;
179                    }
180                    self.expect(Token::RParen)?;
181                }
182                break;
183            }
184        }
185
186        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
187        let partition_by = if self.consume(&Token::Partition)? {
188            self.expect(Token::By)?;
189            let kind = if self.consume(&Token::Range)? {
190                PartitionKind::Range
191            } else if self.consume(&Token::List)? {
192                PartitionKind::List
193            } else if self.consume(&Token::Hash)? {
194                PartitionKind::Hash
195            } else {
196                return Err(ParseError::expected(
197                    vec!["RANGE", "LIST", "HASH"],
198                    self.peek(),
199                    self.position(),
200                ));
201            };
202            self.expect(Token::LParen)?;
203            let column = self.expect_ident()?;
204            self.expect(Token::RParen)?;
205            Some(PartitionSpec { kind, column })
206        } else {
207            None
208        };
209
210        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
211        // style). Accepted after partition spec / tenant spec / or on
212        // its own. `WITH (append_only = true)` is the other form and
213        // handled above.
214        if !append_only && self.consume_ident_ci("APPEND")? {
215            if !self.consume_ident_ci("ONLY")? {
216                return Err(ParseError::expected(
217                    vec!["ONLY"],
218                    self.peek(),
219                    self.position(),
220                ));
221            }
222            append_only = true;
223        }
224
225        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
226        // trailing clause (after partition spec if both are used).
227        //
228        // Dotted paths let non-table models declare tenancy over their
229        // natural nested structures — `metadata.tenant` for vectors,
230        // `payload.tenant` for queue messages, `tags.cluster` for
231        // timeseries, `properties.org` for graphs. The read-path
232        // resolver already navigates these paths via
233        // `resolve_runtime_document_path`; here we just store the
234        // dotted string and let the policy evaluator do the rest.
235        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
236            self.expect(Token::By)?;
237            self.expect(Token::LParen)?;
238            // Allow keyword-idents (`metadata`, `type`, `data`) as
239            // column names — SQL treats them as bare identifiers in
240            // this context.
241            let mut path = self.expect_ident_or_keyword()?;
242            while self.consume(&Token::Dot)? {
243                let next = self.expect_ident_or_keyword()?;
244                path = format!("{path}.{next}");
245            }
246            self.expect(Token::RParen)?;
247            tenant_by = Some(path);
248        }
249
250        Ok(QueryExpr::CreateTable(CreateTableQuery {
251            collection_model: CollectionModel::Table,
252            name,
253            columns,
254            if_not_exists,
255            default_ttl_ms,
256            metrics_rollup_policies: Vec::new(),
257            context_index_fields,
258            context_index_enabled,
259            timestamps,
260            partition_by,
261            tenant_by,
262            append_only,
263            subscriptions,
264            vault_own_master_key: false,
265        }))
266    }
267
268    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
269    ///
270    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
271    /// difference between the table's current contract and the target CREATE
272    /// TABLE body.
273    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
274        self.expect(Token::Explain)?;
275        self.expect(Token::Alter)?;
276        self.expect(Token::For)?;
277        self.expect(Token::Create)?;
278        self.expect(Token::Table)?;
279
280        let body = self.parse_create_table_body()?;
281        let target = match body {
282            QueryExpr::CreateTable(t) => t,
283            _ => {
284                return Err(ParseError::new(
285                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
286                        .to_string(),
287                    self.position(),
288                ));
289            }
290        };
291
292        let format = if self.consume(&Token::Format)? {
293            if self.consume(&Token::Json)? {
294                ExplainFormat::Json
295            } else if self.consume_ident_ci("SQL")? {
296                ExplainFormat::Sql
297            } else {
298                return Err(ParseError::expected(
299                    vec!["JSON", "SQL"],
300                    self.peek(),
301                    self.position(),
302                ));
303            }
304        } else {
305            ExplainFormat::Sql
306        };
307
308        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
309            target,
310            format,
311        }))
312    }
313
314    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
315    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
316        let if_exists = self.match_if_exists()?;
317        let name = self.parse_drop_collection_name()?;
318        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
319    }
320
321    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
322        let if_exists = self.match_if_exists()?;
323        let name = self.parse_drop_collection_name()?;
324        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
325    }
326
327    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
328        let if_exists = self.match_if_exists()?;
329        let name = self.parse_drop_collection_name()?;
330        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
331    }
332
333    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
334        let if_exists = self.match_if_exists()?;
335        let name = self.parse_drop_collection_name()?;
336        Ok(QueryExpr::DropDocument(DropDocumentQuery {
337            name,
338            if_exists,
339        }))
340    }
341
342    pub fn parse_create_keyed_body(
343        &mut self,
344        model: CollectionModel,
345    ) -> Result<QueryExpr, ParseError> {
346        let if_not_exists = self.match_if_not_exists()?;
347        let name = self.parse_drop_collection_name()?;
348        let vault_own_master_key =
349            if model == CollectionModel::Vault && self.consume(&Token::With)? {
350                if !self.consume_ident_ci("OWN")? {
351                    return Err(ParseError::expected(
352                        vec!["OWN"],
353                        self.peek(),
354                        self.position(),
355                    ));
356                }
357                if !self.consume_ident_ci("MASTER")? {
358                    return Err(ParseError::expected(
359                        vec!["MASTER"],
360                        self.peek(),
361                        self.position(),
362                    ));
363                }
364                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
365                    return Err(ParseError::expected(
366                        vec!["KEY"],
367                        self.peek(),
368                        self.position(),
369                    ));
370                }
371                true
372            } else {
373                false
374            };
375        Ok(QueryExpr::CreateTable(CreateTableQuery {
376            collection_model: model,
377            name,
378            columns: Vec::new(),
379            if_not_exists,
380            default_ttl_ms: None,
381            metrics_rollup_policies: Vec::new(),
382            context_index_fields: Vec::new(),
383            context_index_enabled: false,
384            timestamps: false,
385            partition_by: None,
386            tenant_by: None,
387            append_only: false,
388            subscriptions: Vec::new(),
389            vault_own_master_key,
390        }))
391    }
392
393    pub fn parse_create_collection_model_body(
394        &mut self,
395        model: CollectionModel,
396    ) -> Result<QueryExpr, ParseError> {
397        self.parse_create_keyed_body(model)
398    }
399
400    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
401        let if_not_exists = self.match_if_not_exists()?;
402        let name = self.parse_drop_collection_name()?;
403        if !self.consume_ident_ci("KIND")? {
404            return Err(ParseError::expected(
405                vec!["KIND"],
406                self.peek(),
407                self.position(),
408            ));
409        }
410        let kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
411        let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
412            self.parse_signed_by_list()?
413        } else {
414            Vec::new()
415        };
416        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
417            name,
418            kind,
419            if_not_exists,
420            allowed_signers,
421        }))
422    }
423
424    /// Parse a single `'hex32'` string literal as a 32-byte Ed25519
425    /// pubkey. Used by `ALTER COLLECTION ... ADD|REVOKE SIGNER 'hex'`
426    /// (issue #522).
427    fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
428        let hex = match self.peek().clone() {
429            Token::String(s) => {
430                self.advance()?;
431                s
432            }
433            _ => {
434                return Err(ParseError::expected(
435                    vec!["string literal (ed25519 pubkey hex)"],
436                    self.peek(),
437                    self.position(),
438                ));
439            }
440        };
441        decode_hex_32(&hex).map_err(|msg| {
442            ParseError::new(
443                format!("SIGNER pubkey '{hex}' invalid: {msg}"),
444                self.position(),
445            )
446        })
447    }
448
449    /// Parse `( 'hex32', 'hex32', ... )` — Ed25519 pubkey list. Each entry
450    /// must decode to exactly 32 bytes. Used by both `CREATE COLLECTION ...
451    /// SIGNED_BY (...)` and (in a later iteration) `ALTER COLLECTION` signer
452    /// mutations. Issue #520.
453    fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
454        self.expect(Token::LParen)?;
455        let mut out = Vec::new();
456        loop {
457            let hex = match self.peek().clone() {
458                Token::String(s) => {
459                    self.advance()?;
460                    s
461                }
462                _ => {
463                    return Err(ParseError::expected(
464                        vec!["string literal (ed25519 pubkey hex)"],
465                        self.peek(),
466                        self.position(),
467                    ));
468                }
469            };
470            let bytes = decode_hex_32(&hex).map_err(|msg| {
471                ParseError::new(
472                    format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
473                    self.position(),
474                )
475            })?;
476            out.push(bytes);
477            if !self.consume(&Token::Comma)? {
478                break;
479            }
480        }
481        self.expect(Token::RParen)?;
482        if out.is_empty() {
483            return Err(ParseError::new(
484                "SIGNED_BY list must contain at least one pubkey".to_string(),
485                self.position(),
486            ));
487        }
488        Ok(out)
489    }
490
491    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
492        let if_not_exists = self.match_if_not_exists()?;
493        let name = self.parse_drop_collection_name()?;
494        if !self.consume_ident_ci("DIM")? {
495            return Err(ParseError::expected(
496                vec!["DIM"],
497                self.peek(),
498                self.position(),
499            ));
500        }
501        let dimension = self.parse_integer()?;
502        if dimension <= 0 {
503            return Err(ParseError::new(
504                "VECTOR DIM must be a positive integer".to_string(),
505                self.position(),
506            ));
507        }
508        let metric = if self.consume(&Token::Metric)? {
509            self.parse_distance_metric()?
510        } else {
511            crate::storage::engine::distance::DistanceMetric::Cosine
512        };
513        Ok(QueryExpr::CreateVector(CreateVectorQuery {
514            name,
515            dimension: dimension as usize,
516            metric,
517            if_not_exists,
518        }))
519    }
520
521    pub fn parse_drop_keyed_body(
522        &mut self,
523        model: CollectionModel,
524    ) -> Result<QueryExpr, ParseError> {
525        let if_exists = self.match_if_exists()?;
526        let name = self.parse_drop_collection_name()?;
527        Ok(QueryExpr::DropKv(DropKvQuery {
528            name,
529            if_exists,
530            model,
531        }))
532    }
533
534    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
535        self.parse_drop_keyed_body(CollectionModel::Kv)
536    }
537
538    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
539        self.parse_drop_collection_model_body(None)
540    }
541
542    pub fn parse_drop_collection_model_body(
543        &mut self,
544        model: Option<CollectionModel>,
545    ) -> Result<QueryExpr, ParseError> {
546        let if_exists = self.match_if_exists()?;
547        let name = self.parse_drop_collection_name()?;
548        Ok(QueryExpr::DropCollection(DropCollectionQuery {
549            name,
550            if_exists,
551            model,
552        }))
553    }
554
555    pub fn parse_truncate_body(
556        &mut self,
557        model: Option<CollectionModel>,
558    ) -> Result<QueryExpr, ParseError> {
559        let if_exists = self.match_if_exists()?;
560        let name = self.parse_drop_collection_name()?;
561        Ok(QueryExpr::Truncate(TruncateQuery {
562            name,
563            model,
564            if_exists,
565        }))
566    }
567
568    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
569        let mut name = self.expect_ident()?;
570        while self.consume(&Token::Dot)? {
571            if self.consume(&Token::Star)? {
572                name.push_str(".*");
573                break;
574            }
575            let next = self.expect_ident_or_keyword()?;
576            name = format!("{name}.{next}");
577        }
578        Ok(name)
579    }
580
581    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
582    ///
583    /// Also accepts `ALTER COLLECTION name ADD|REVOKE SIGNER 'hex'`
584    /// (issue #522) — collection-level signer registry mutations share
585    /// the AlterTable AST so the existing executor dispatch path picks
586    /// them up without a new top-level variant.
587    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
588        self.expect(Token::Alter)?;
589        if !self.consume(&Token::Table)?
590            && !self.consume(&Token::Collection)?
591            && !self.consume_ident_ci("COLLECTION")?
592        {
593            return Err(ParseError::expected(
594                vec!["TABLE", "COLLECTION"],
595                self.peek(),
596                self.position(),
597            ));
598        }
599        let name = self.expect_ident()?;
600
601        let mut operations = Vec::new();
602        loop {
603            let op = self.parse_alter_operation(&name)?;
604            operations.push(op);
605            if !self.consume(&Token::Comma)? {
606                break;
607            }
608        }
609
610        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
611    }
612
613    /// Parse a single ALTER TABLE operation
614    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
615        if self.consume(&Token::Add)? {
616            if self.consume_ident_ci("SUBSCRIPTION")? {
617                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
618                let sub_name = self.expect_ident()?;
619                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
620                Ok(AlterOperation::AddSubscription {
621                    name: sub_name,
622                    descriptor,
623                })
624            } else if self.consume_ident_ci("SIGNER")? {
625                // ADD SIGNER 'hex_pubkey' — issue #522.
626                let pubkey = self.parse_single_signer_hex()?;
627                Ok(AlterOperation::AddSigner { pubkey })
628            } else {
629                // ADD COLUMN definition (COLUMN keyword is optional)
630                let _ = self.consume(&Token::Column)?;
631                let col_def = self.parse_column_def()?;
632                Ok(AlterOperation::AddColumn(col_def))
633            }
634        } else if self.consume_ident_ci("REVOKE")? {
635            // REVOKE SIGNER 'hex_pubkey' — issue #522.
636            if !self.consume_ident_ci("SIGNER")? {
637                return Err(ParseError::expected(
638                    vec!["SIGNER"],
639                    self.peek(),
640                    self.position(),
641                ));
642            }
643            let pubkey = self.parse_single_signer_hex()?;
644            Ok(AlterOperation::RevokeSigner { pubkey })
645        } else if self.consume(&Token::Drop)? {
646            if self.consume_ident_ci("SUBSCRIPTION")? {
647                // DROP SUBSCRIPTION name
648                let sub_name = self.expect_ident()?;
649                Ok(AlterOperation::DropSubscription { name: sub_name })
650            } else {
651                // DROP COLUMN name (COLUMN keyword is optional)
652                let _ = self.consume(&Token::Column)?;
653                let col_name = self.expect_ident()?;
654                Ok(AlterOperation::DropColumn(col_name))
655            }
656        } else if self.consume(&Token::Rename)? {
657            // RENAME COLUMN from TO to
658            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
659            let from = self.expect_ident()?;
660            self.expect(Token::To)?;
661            let to = self.expect_ident()?;
662            Ok(AlterOperation::RenameColumn { from, to })
663        } else if self.consume(&Token::Attach)? {
664            // ATTACH PARTITION child FOR VALUES ...
665            self.expect(Token::Partition)?;
666            let child = self.expect_ident()?;
667            self.expect(Token::For)?;
668            // Accept `VALUES` as an ident since the grammar doesn't have it
669            // as a reserved keyword everywhere. Collect the remaining tokens
670            // as a raw bound string for round-trip persistence.
671            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
672                return Err(ParseError::expected(
673                    vec!["VALUES"],
674                    self.peek(),
675                    self.position(),
676                ));
677            }
678            let bound = self.collect_remaining_tokens_as_string()?;
679            Ok(AlterOperation::AttachPartition { child, bound })
680        } else if self.consume(&Token::Detach)? {
681            // DETACH PARTITION child
682            self.expect(Token::Partition)?;
683            let child = self.expect_ident()?;
684            Ok(AlterOperation::DetachPartition { child })
685        } else if self.consume(&Token::Enable)? {
686            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
687            if self.consume_ident_ci("EVENTS")? {
688                Ok(AlterOperation::EnableEvents(
689                    self.parse_subscription_descriptor(table_name.to_string())?,
690                ))
691            } else if self.consume_ident_ci("TENANCY")? {
692                self.expect(Token::On)?;
693                self.expect(Token::LParen)?;
694                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
695                let mut path = self.expect_ident_or_keyword()?;
696                while self.consume(&Token::Dot)? {
697                    let next = self.expect_ident_or_keyword()?;
698                    path = format!("{path}.{next}");
699                }
700                self.expect(Token::RParen)?;
701                Ok(AlterOperation::EnableTenancy { column: path })
702            } else {
703                self.expect(Token::Row)?;
704                self.expect(Token::Level)?;
705                self.expect(Token::Security)?;
706                Ok(AlterOperation::EnableRowLevelSecurity)
707            }
708        } else if self.consume(&Token::Disable)? {
709            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
710            if self.consume_ident_ci("EVENTS")? {
711                Ok(AlterOperation::DisableEvents)
712            } else if self.consume_ident_ci("TENANCY")? {
713                Ok(AlterOperation::DisableTenancy)
714            } else {
715                self.expect(Token::Row)?;
716                self.expect(Token::Level)?;
717                self.expect(Token::Security)?;
718                Ok(AlterOperation::DisableRowLevelSecurity)
719            }
720        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
721            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
722            // SET RETENTION <duration> (issue #580)
723            if self.consume_ident_ci("APPEND_ONLY")? {
724                let on = self.parse_bool_assign()?;
725                Ok(AlterOperation::SetAppendOnly(on))
726            } else if self.consume_ident_ci("VERSIONED")? {
727                let on = self.parse_bool_assign()?;
728                Ok(AlterOperation::SetVersioned(on))
729            } else if self.consume(&Token::Retention)? {
730                // `SET RETENTION <duration>` — reuse the same float+unit
731                // grammar the timeseries CREATE clause uses so `7 DAYS`,
732                // `30 m`, `1 h`, `90 d` all parse identically.
733                let value = self.parse_float()?;
734                let unit = self.parse_duration_unit()?;
735                Ok(AlterOperation::SetRetention {
736                    duration_ms: (value * unit) as u64,
737                })
738            } else {
739                Err(ParseError::expected(
740                    vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
741                    self.peek(),
742                    self.position(),
743                ))
744            }
745        } else if self.consume_ident_ci("UNSET")? {
746            // `UNSET RETENTION` — clears the declarative retention policy.
747            if self.consume(&Token::Retention)? {
748                Ok(AlterOperation::UnsetRetention)
749            } else {
750                Err(ParseError::expected(
751                    vec!["RETENTION"],
752                    self.peek(),
753                    self.position(),
754                ))
755            }
756        } else {
757            Err(ParseError::expected(
758                vec![
759                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
760                    "UNSET",
761                ],
762                self.peek(),
763                self.position(),
764            ))
765        }
766    }
767
768    fn parse_subscription_descriptor(
769        &mut self,
770        source: String,
771    ) -> Result<SubscriptionDescriptor, ParseError> {
772        let mut ops_filter = Vec::new();
773        if self.consume(&Token::LParen)? {
774            loop {
775                let op = if self.consume(&Token::Insert)? {
776                    SubscriptionOperation::Insert
777                } else if self.consume(&Token::Update)? {
778                    SubscriptionOperation::Update
779                } else if self.consume(&Token::Delete)? {
780                    SubscriptionOperation::Delete
781                } else {
782                    return Err(ParseError::expected(
783                        vec!["INSERT", "UPDATE", "DELETE"],
784                        self.peek(),
785                        self.position(),
786                    ));
787                };
788                ops_filter.push(op);
789                if !self.consume(&Token::Comma)? {
790                    break;
791                }
792            }
793            self.expect(Token::RParen)?;
794        }
795
796        let target_queue = if self.consume(&Token::To)? {
797            self.expect_ident()?
798        } else {
799            format!("{source}_events")
800        };
801
802        let mut redact_fields = Vec::new();
803        if self.consume_ident_ci("REDACT")? {
804            self.expect(Token::LParen)?;
805            loop {
806                redact_fields.push(self.parse_dotted_redact_path()?);
807                if !self.consume(&Token::Comma)? {
808                    break;
809                }
810            }
811            self.expect(Token::RParen)?;
812        }
813
814        let where_filter = if self.consume(&Token::Where)? {
815            Some(self.collect_subscription_where_filter()?)
816        } else {
817            None
818        };
819
820        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
821        let all_tenants = if self.consume(&Token::On)? {
822            self.expect(Token::All)?;
823            if !self.consume_ident_ci("TENANTS")? {
824                return Err(ParseError::expected(
825                    vec!["TENANTS"],
826                    self.peek(),
827                    self.position(),
828                ));
829            }
830            true
831        } else {
832            false
833        };
834
835        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
836        if self.consume_ident_ci("REQUIRES")? {
837            self.consume_ident_ci("CAPABILITY")?;
838            // consume the capability string literal token
839            self.advance()?;
840        }
841
842        Ok(SubscriptionDescriptor {
843            name: String::new(),
844            source,
845            target_queue,
846            ops_filter,
847            where_filter,
848            redact_fields,
849            enabled: true,
850            all_tenants,
851        })
852    }
853
854    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
855    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
856        let mut parts = Vec::new();
857        if self.consume(&Token::Star)? {
858            parts.push("*".to_string());
859        } else {
860            parts.push(self.expect_ident_or_keyword()?);
861        }
862        while self.consume(&Token::Dot)? {
863            if self.consume(&Token::Star)? {
864                parts.push("*".to_string());
865            } else {
866                parts.push(self.expect_ident_or_keyword()?);
867            }
868        }
869        Ok(parts.join("."))
870    }
871
872    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
873        let mut parts = Vec::new();
874        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
875            parts.push(self.peek().to_string());
876            self.advance()?;
877        }
878        if parts.is_empty() {
879            return Err(ParseError::expected(
880                vec!["predicate"],
881                self.peek(),
882                self.position(),
883            ));
884        }
885        Ok(parts.join(" "))
886    }
887
888    /// Capture remaining tokens as a display-joined string.
889    ///
890    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
891    /// bound clause into storage without needing a dedicated per-kind AST.
892    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
893        let mut parts: Vec<String> = Vec::new();
894        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
895            parts.push(self.peek().to_string());
896            self.advance()?;
897        }
898        Ok(parts.join(" "))
899    }
900
901    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
902    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
903        let name = self.expect_column_ident()?;
904        let sql_type = self.parse_column_type()?;
905        let data_type = sql_type.to_string();
906
907        let mut def = CreateColumnDef {
908            name,
909            data_type,
910            sql_type: sql_type.clone(),
911            not_null: false,
912            default: None,
913            compress: None,
914            unique: false,
915            primary_key: false,
916            enum_variants: sql_type.enum_variants().unwrap_or_default(),
917            array_element: sql_type.array_element_type(),
918            decimal_precision: sql_type.decimal_precision(),
919        };
920
921        // Parse modifiers in any order
922        loop {
923            if self.match_not_null()? {
924                def.not_null = true;
925            } else if self.consume(&Token::Default)? {
926                self.expect(Token::Eq)?;
927                def.default = Some(self.parse_literal_string_for_ddl()?);
928            } else if self.consume(&Token::Compress)? {
929                self.expect(Token::Colon)?;
930                def.compress = Some(self.parse_integer()? as u8);
931            } else if self.consume(&Token::Unique)? {
932                def.unique = true;
933            } else if self.match_primary_key()? {
934                def.primary_key = true;
935            } else {
936                break;
937            }
938        }
939
940        Ok(def)
941    }
942
943    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
944    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
945        let type_name = self.expect_ident_or_keyword()?;
946        if self.consume(&Token::LParen)? {
947            let inner = self.parse_type_params()?;
948            self.expect(Token::RParen)?;
949            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
950        } else {
951            Ok(SqlTypeName::new(type_name))
952        }
953    }
954
955    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
956    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
957        let mut parts = Vec::new();
958        loop {
959            match self.peek().clone() {
960                Token::String(s) => {
961                    let s = s.clone();
962                    self.advance()?;
963                    parts.push(TypeModifier::StringLiteral(s));
964                }
965                Token::Integer(n) => {
966                    self.advance()?;
967                    parts.push(TypeModifier::Number(n as u32));
968                }
969                _ => {
970                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
971                }
972            }
973            if !self.consume(&Token::Comma)? {
974                break;
975            }
976        }
977        Ok(parts)
978    }
979
980    /// Parse a literal string value for DDL DEFAULT expressions
981    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
982        match self.peek().clone() {
983            Token::String(s) => {
984                let s = s.clone();
985                self.advance()?;
986                Ok(s)
987            }
988            Token::Integer(n) => {
989                self.advance()?;
990                Ok(n.to_string())
991            }
992            Token::Float(n) => {
993                self.advance()?;
994                Ok(n.to_string())
995            }
996            Token::True => {
997                self.advance()?;
998                Ok("true".to_string())
999            }
1000            Token::False => {
1001                self.advance()?;
1002                Ok("false".to_string())
1003            }
1004            Token::Null => {
1005                self.advance()?;
1006                Ok("null".to_string())
1007            }
1008            ref other => Err(ParseError::expected(
1009                vec!["string", "number", "true", "false", "null"],
1010                other,
1011                self.position(),
1012            )),
1013        }
1014    }
1015
1016    fn check_ttl_keyword(&self) -> bool {
1017        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1018    }
1019
1020    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
1021    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
1022    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1023        self.expect(Token::Eq)?;
1024        match self.peek() {
1025            Token::True => {
1026                self.advance()?;
1027                Ok(true)
1028            }
1029            Token::False => {
1030                self.advance()?;
1031                Ok(false)
1032            }
1033            other => Err(ParseError::expected(
1034                vec!["true", "false"],
1035                other,
1036                self.position(),
1037            )),
1038        }
1039    }
1040
1041    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1042        if self.consume_ident_ci(expected)? {
1043            Ok(())
1044        } else {
1045            Err(ParseError::expected(
1046                vec![expected],
1047                self.peek(),
1048                self.position(),
1049            ))
1050        }
1051    }
1052
1053    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1054        let option_name = self.expect_ident_or_keyword()?;
1055        if !option_name.eq_ignore_ascii_case("ttl") {
1056            return Err(ParseError::new(
1057                // F-05: `option_name` is caller-controlled identifier text.
1058                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
1059                // before the message reaches downstream serialization sinks.
1060                format!(
1061                    "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1062                ),
1063                self.position(),
1064            ));
1065        }
1066
1067        let ttl_value = self.parse_float()?;
1068        let ttl_unit = match self.peek() {
1069            Token::Ident(unit) => {
1070                let unit = unit.clone();
1071                self.advance()?;
1072                unit
1073            }
1074            _ => "s".to_string(),
1075        };
1076
1077        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1078            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1079            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1080            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1081            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1082            "d" | "day" | "days" => 86_400_000.0,
1083            other => {
1084                return Err(ParseError::new(
1085                    // F-05: render `other` via `{:?}` so caller-controlled
1086                    // bytes (CR / LF / NUL / quotes) are escaped before
1087                    // reaching downstream serialization sinks.
1088                    format!(
1089                        "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1090                    ),
1091                    self.position(),
1092                ));
1093            }
1094        };
1095
1096        if !ttl_value.is_finite() || ttl_value < 0.0 {
1097            return Err(ParseError::new(
1098                "TTL must be a finite, non-negative duration".to_string(),
1099                self.position(),
1100            ));
1101        }
1102
1103        let ttl_ms = ttl_value * multiplier_ms;
1104        if ttl_ms > u64::MAX as f64 {
1105            return Err(ParseError::new(
1106                "TTL duration is too large".to_string(),
1107                self.position(),
1108            ));
1109        }
1110        if ttl_ms.fract().abs() >= f64::EPSILON {
1111            return Err(ParseError::new(
1112                "TTL duration must resolve to a whole number of milliseconds".to_string(),
1113                self.position(),
1114            ));
1115        }
1116
1117        Ok(Some(ttl_ms as u64))
1118    }
1119
1120    /// Try to match IF NOT EXISTS sequence
1121    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1122        if self.check(&Token::If) {
1123            self.advance()?;
1124            self.expect(Token::Not)?;
1125            self.expect(Token::Exists)?;
1126            Ok(true)
1127        } else {
1128            Ok(false)
1129        }
1130    }
1131
1132    /// Try to match IF EXISTS sequence
1133    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1134        if self.check(&Token::If) {
1135            self.advance()?;
1136            self.expect(Token::Exists)?;
1137            Ok(true)
1138        } else {
1139            Ok(false)
1140        }
1141    }
1142
1143    /// Try to match NOT NULL sequence
1144    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1145        if self.check(&Token::Not) {
1146            // Peek ahead - only consume if followed by NULL
1147            // We need to be careful: save state and try
1148            self.advance()?; // consume NOT
1149            if self.check(&Token::Null) {
1150                self.advance()?; // consume NULL
1151                Ok(true)
1152            } else {
1153                // This is tricky - NOT was consumed but next isn't NULL.
1154                // In column modifier context, NOT should only appear before NULL.
1155                // Return error for clarity.
1156                Err(ParseError::expected(
1157                    vec!["NULL (after NOT)"],
1158                    self.peek(),
1159                    self.position(),
1160                ))
1161            }
1162        } else {
1163            Ok(false)
1164        }
1165    }
1166
1167    /// Try to match PRIMARY KEY sequence
1168    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1169        if self.check(&Token::Primary) {
1170            self.advance()?;
1171            self.expect(Token::Key)?;
1172            Ok(true)
1173        } else {
1174            Ok(false)
1175        }
1176    }
1177}
1178
1179/// Decode a 64-char lowercase/uppercase hex string into a 32-byte array.
1180/// Returns a human-readable error message on length or character violations.
1181/// Used by `SIGNED_BY` clause parsing (issue #520) to surface bad pubkeys
1182/// at parse-time rather than downstream in the engine.
1183fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1184    if s.len() != 64 {
1185        return Err(format!("expected 64 hex chars, got {}", s.len()));
1186    }
1187    let mut out = [0u8; 32];
1188    let bytes = s.as_bytes();
1189    for i in 0..32 {
1190        let hi = hex_nibble(bytes[i * 2])?;
1191        let lo = hex_nibble(bytes[i * 2 + 1])?;
1192        out[i] = (hi << 4) | lo;
1193    }
1194    Ok(out)
1195}
1196
/// Convert one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its 4-bit value.
///
/// # Errors
/// Returns `non-hex char: <char>` (Debug-formatted) for any other byte.
fn hex_nibble(c: u8) -> Result<u8, String> {
    char::from(c)
        .to_digit(16)
        .map(|value| value as u8)
        .ok_or_else(|| format!("non-hex char: {:?}", c as char))
}