Skip to main content

reddb_server/storage/query/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::super::ast::{
4    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7    PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            metrics_rollup_policies: Vec::new(),
79            context_index_fields,
80            context_index_enabled,
81            timestamps,
82            partition_by: None,
83            tenant_by: None,
84            append_only: false,
85            subscriptions,
86            vault_own_master_key: false,
87        }))
88    }
89
90    /// Parse: DROP TABLE [IF EXISTS] name
91    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
92        self.expect(Token::Drop)?;
93        self.expect(Token::Table)?;
94        self.parse_drop_table_body()
95    }
96
97    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
98    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
99        let if_not_exists = self.match_if_not_exists()?;
100        let name = self.expect_ident()?;
101
102        self.expect(Token::LParen)?;
103        let mut columns = Vec::new();
104        loop {
105            let col = self.parse_column_def()?;
106            columns.push(col);
107            if !self.consume(&Token::Comma)? {
108                break;
109            }
110        }
111        self.expect(Token::RParen)?;
112
113        let mut default_ttl_ms = None;
114        let mut context_index_fields = Vec::new();
115        let mut context_index_enabled = false;
116        let mut timestamps = false;
117        let mut tenant_by: Option<String> = None;
118        let mut append_only = false;
119        let mut subscriptions = Vec::new();
120
121        while self.consume(&Token::With)? {
122            if self.consume_ident_ci("EVENTS")? {
123                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
124                continue;
125            }
126            // Accept both spellings:
127            //   WITH key = value
128            //   WITH (key = value, key = value)
129            // Postgres / ClickHouse use the parenthesised form; the
130            // bare form is our legacy shorthand. The parenthesised
131            // form collects options separated by commas until `)`.
132            let has_parens = self.consume(&Token::LParen)?;
133
134            loop {
135                if self.consume_ident_ci("CONTEXT_INDEX")? {
136                    context_index_enabled = self.parse_bool_assign()?;
137                } else if self.consume_ident_ci("CONTEXT")? {
138                    if !self.consume(&Token::Index)? {
139                        return Err(ParseError::expected(
140                            vec!["INDEX"],
141                            self.peek(),
142                            self.position(),
143                        ));
144                    }
145                    self.expect(Token::On)?;
146                    self.expect(Token::LParen)?;
147                    loop {
148                        context_index_fields.push(self.expect_ident()?);
149                        if !self.consume(&Token::Comma)? {
150                            break;
151                        }
152                    }
153                    self.expect(Token::RParen)?;
154                    context_index_enabled = true;
155                } else if self.consume_ident_ci("TIMESTAMPS")? {
156                    timestamps = self.parse_bool_assign()?;
157                } else if self.consume_ident_ci("APPEND_ONLY")? {
158                    append_only = self.parse_bool_assign()?;
159                } else if self.consume_ident_ci("TENANT_BY")? {
160                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
161                    // and expects a string literal column name.
162                    let _ = self.consume(&Token::Eq)?;
163                    let value = self.parse_literal_value()?;
164                    match value {
165                        Value::Text(col) => tenant_by = Some(col.to_string()),
166                        other => {
167                            return Err(ParseError::new(
168                                format!("WITH tenant_by expects a text literal, got {other:?}"),
169                                self.position(),
170                            ));
171                        }
172                    }
173                } else {
174                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
175                }
176                if has_parens {
177                    if self.consume(&Token::Comma)? {
178                        continue;
179                    }
180                    self.expect(Token::RParen)?;
181                }
182                break;
183            }
184        }
185
186        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
187        let partition_by = if self.consume(&Token::Partition)? {
188            self.expect(Token::By)?;
189            let kind = if self.consume(&Token::Range)? {
190                PartitionKind::Range
191            } else if self.consume(&Token::List)? {
192                PartitionKind::List
193            } else if self.consume(&Token::Hash)? {
194                PartitionKind::Hash
195            } else {
196                return Err(ParseError::expected(
197                    vec!["RANGE", "LIST", "HASH"],
198                    self.peek(),
199                    self.position(),
200                ));
201            };
202            self.expect(Token::LParen)?;
203            let column = self.expect_ident()?;
204            self.expect(Token::RParen)?;
205            Some(PartitionSpec { kind, column })
206        } else {
207            None
208        };
209
210        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
211        // style). Accepted after partition spec / tenant spec / or on
212        // its own. `WITH (append_only = true)` is the other form and
213        // handled above.
214        if !append_only && self.consume_ident_ci("APPEND")? {
215            if !self.consume_ident_ci("ONLY")? {
216                return Err(ParseError::expected(
217                    vec!["ONLY"],
218                    self.peek(),
219                    self.position(),
220                ));
221            }
222            append_only = true;
223        }
224
225        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
226        // trailing clause (after partition spec if both are used).
227        //
228        // Dotted paths let non-table models declare tenancy over their
229        // natural nested structures — `metadata.tenant` for vectors,
230        // `payload.tenant` for queue messages, `tags.cluster` for
231        // timeseries, `properties.org` for graphs. The read-path
232        // resolver already navigates these paths via
233        // `resolve_runtime_document_path`; here we just store the
234        // dotted string and let the policy evaluator do the rest.
235        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
236            self.expect(Token::By)?;
237            self.expect(Token::LParen)?;
238            // Allow keyword-idents (`metadata`, `type`, `data`) as
239            // column names — SQL treats them as bare identifiers in
240            // this context.
241            let mut path = self.expect_ident_or_keyword()?;
242            while self.consume(&Token::Dot)? {
243                let next = self.expect_ident_or_keyword()?;
244                path = format!("{path}.{next}");
245            }
246            self.expect(Token::RParen)?;
247            tenant_by = Some(path);
248        }
249
250        Ok(QueryExpr::CreateTable(CreateTableQuery {
251            collection_model: CollectionModel::Table,
252            name,
253            columns,
254            if_not_exists,
255            default_ttl_ms,
256            metrics_rollup_policies: Vec::new(),
257            context_index_fields,
258            context_index_enabled,
259            timestamps,
260            partition_by,
261            tenant_by,
262            append_only,
263            subscriptions,
264            vault_own_master_key: false,
265        }))
266    }
267
268    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
269    ///
270    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
271    /// difference between the table's current contract and the target CREATE
272    /// TABLE body.
273    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
274        self.expect(Token::Explain)?;
275        self.expect(Token::Alter)?;
276        self.expect(Token::For)?;
277        self.expect(Token::Create)?;
278        self.expect(Token::Table)?;
279
280        let body = self.parse_create_table_body()?;
281        let target = match body {
282            QueryExpr::CreateTable(t) => t,
283            _ => {
284                return Err(ParseError::new(
285                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
286                        .to_string(),
287                    self.position(),
288                ));
289            }
290        };
291
292        let format = if self.consume(&Token::Format)? {
293            if self.consume(&Token::Json)? {
294                ExplainFormat::Json
295            } else if self.consume_ident_ci("SQL")? {
296                ExplainFormat::Sql
297            } else {
298                return Err(ParseError::expected(
299                    vec!["JSON", "SQL"],
300                    self.peek(),
301                    self.position(),
302                ));
303            }
304        } else {
305            ExplainFormat::Sql
306        };
307
308        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
309            target,
310            format,
311        }))
312    }
313
314    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
315    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
316        let if_exists = self.match_if_exists()?;
317        let name = self.parse_drop_collection_name()?;
318        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
319    }
320
321    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
322        let if_exists = self.match_if_exists()?;
323        let name = self.parse_drop_collection_name()?;
324        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
325    }
326
327    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
328        let if_exists = self.match_if_exists()?;
329        let name = self.parse_drop_collection_name()?;
330        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
331    }
332
333    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
334        let if_exists = self.match_if_exists()?;
335        let name = self.parse_drop_collection_name()?;
336        Ok(QueryExpr::DropDocument(DropDocumentQuery {
337            name,
338            if_exists,
339        }))
340    }
341
342    pub fn parse_create_keyed_body(
343        &mut self,
344        model: CollectionModel,
345    ) -> Result<QueryExpr, ParseError> {
346        let if_not_exists = self.match_if_not_exists()?;
347        let name = self.parse_drop_collection_name()?;
348        let vault_own_master_key =
349            if model == CollectionModel::Vault && self.consume(&Token::With)? {
350                if !self.consume_ident_ci("OWN")? {
351                    return Err(ParseError::expected(
352                        vec!["OWN"],
353                        self.peek(),
354                        self.position(),
355                    ));
356                }
357                if !self.consume_ident_ci("MASTER")? {
358                    return Err(ParseError::expected(
359                        vec!["MASTER"],
360                        self.peek(),
361                        self.position(),
362                    ));
363                }
364                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
365                    return Err(ParseError::expected(
366                        vec!["KEY"],
367                        self.peek(),
368                        self.position(),
369                    ));
370                }
371                true
372            } else {
373                false
374            };
375        Ok(QueryExpr::CreateTable(CreateTableQuery {
376            collection_model: model,
377            name,
378            columns: Vec::new(),
379            if_not_exists,
380            default_ttl_ms: None,
381            metrics_rollup_policies: Vec::new(),
382            context_index_fields: Vec::new(),
383            context_index_enabled: false,
384            timestamps: false,
385            partition_by: None,
386            tenant_by: None,
387            append_only: false,
388            subscriptions: Vec::new(),
389            vault_own_master_key,
390        }))
391    }
392
393    pub fn parse_create_collection_model_body(
394        &mut self,
395        model: CollectionModel,
396    ) -> Result<QueryExpr, ParseError> {
397        self.parse_create_keyed_body(model)
398    }
399
400    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
401        let if_not_exists = self.match_if_not_exists()?;
402        let name = self.parse_drop_collection_name()?;
403        if !self.consume_ident_ci("KIND")? {
404            return Err(ParseError::expected(
405                vec!["KIND"],
406                self.peek(),
407                self.position(),
408            ));
409        }
410        let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
411        while self.consume(&Token::Dot)? {
412            let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
413            kind.push('.');
414            kind.push_str(&part);
415        }
416        let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
417            if !self.consume_ident_ci("DIM")? {
418                return Err(ParseError::expected(
419                    vec!["DIM"],
420                    self.peek(),
421                    self.position(),
422                ));
423            }
424            let dimension = self.parse_integer()?;
425            if dimension <= 0 {
426                return Err(ParseError::new(
427                    "VECTOR DIM must be a positive integer".to_string(),
428                    self.position(),
429                ));
430            }
431            let metric = if self.consume(&Token::Metric)? {
432                self.parse_distance_metric()?
433            } else {
434                crate::storage::engine::distance::DistanceMetric::Cosine
435            };
436            (Some(dimension as usize), Some(metric))
437        } else {
438            (None, None)
439        };
440        let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
441            self.parse_signed_by_list()?
442        } else {
443            Vec::new()
444        };
445        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
446            name,
447            kind,
448            if_not_exists,
449            vector_dimension,
450            vector_metric,
451            allowed_signers,
452        }))
453    }
454
455    /// Parse a single `'hex32'` string literal as a 32-byte Ed25519
456    /// pubkey. Used by `ALTER COLLECTION ... ADD|REVOKE SIGNER 'hex'`
457    /// (issue #522).
458    fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
459        let hex = match self.peek().clone() {
460            Token::String(s) => {
461                self.advance()?;
462                s
463            }
464            _ => {
465                return Err(ParseError::expected(
466                    vec!["string literal (ed25519 pubkey hex)"],
467                    self.peek(),
468                    self.position(),
469                ));
470            }
471        };
472        decode_hex_32(&hex).map_err(|msg| {
473            ParseError::new(
474                format!("SIGNER pubkey '{hex}' invalid: {msg}"),
475                self.position(),
476            )
477        })
478    }
479
480    /// Parse `( 'hex32', 'hex32', ... )` — Ed25519 pubkey list. Each entry
481    /// must decode to exactly 32 bytes. Used by both `CREATE COLLECTION ...
482    /// SIGNED_BY (...)` and (in a later iteration) `ALTER COLLECTION` signer
483    /// mutations. Issue #520.
484    fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
485        self.expect(Token::LParen)?;
486        let mut out = Vec::new();
487        loop {
488            let hex = match self.peek().clone() {
489                Token::String(s) => {
490                    self.advance()?;
491                    s
492                }
493                _ => {
494                    return Err(ParseError::expected(
495                        vec!["string literal (ed25519 pubkey hex)"],
496                        self.peek(),
497                        self.position(),
498                    ));
499                }
500            };
501            let bytes = decode_hex_32(&hex).map_err(|msg| {
502                ParseError::new(
503                    format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
504                    self.position(),
505                )
506            })?;
507            out.push(bytes);
508            if !self.consume(&Token::Comma)? {
509                break;
510            }
511        }
512        self.expect(Token::RParen)?;
513        if out.is_empty() {
514            return Err(ParseError::new(
515                "SIGNED_BY list must contain at least one pubkey".to_string(),
516                self.position(),
517            ));
518        }
519        Ok(out)
520    }
521
522    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
523        let if_not_exists = self.match_if_not_exists()?;
524        let name = self.parse_drop_collection_name()?;
525        if !self.consume_ident_ci("DIM")? {
526            return Err(ParseError::expected(
527                vec!["DIM"],
528                self.peek(),
529                self.position(),
530            ));
531        }
532        let dimension = self.parse_integer()?;
533        if dimension <= 0 {
534            return Err(ParseError::new(
535                "VECTOR DIM must be a positive integer".to_string(),
536                self.position(),
537            ));
538        }
539        let metric = if self.consume(&Token::Metric)? {
540            self.parse_distance_metric()?
541        } else {
542            crate::storage::engine::distance::DistanceMetric::Cosine
543        };
544        Ok(QueryExpr::CreateVector(CreateVectorQuery {
545            name,
546            dimension: dimension as usize,
547            metric,
548            if_not_exists,
549        }))
550    }
551
552    pub fn parse_drop_keyed_body(
553        &mut self,
554        model: CollectionModel,
555    ) -> Result<QueryExpr, ParseError> {
556        let if_exists = self.match_if_exists()?;
557        let name = self.parse_drop_collection_name()?;
558        Ok(QueryExpr::DropKv(DropKvQuery {
559            name,
560            if_exists,
561            model,
562        }))
563    }
564
565    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
566        self.parse_drop_keyed_body(CollectionModel::Kv)
567    }
568
569    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
570        self.parse_drop_collection_model_body(None)
571    }
572
573    pub fn parse_drop_collection_model_body(
574        &mut self,
575        model: Option<CollectionModel>,
576    ) -> Result<QueryExpr, ParseError> {
577        let if_exists = self.match_if_exists()?;
578        let name = self.parse_drop_collection_name()?;
579        Ok(QueryExpr::DropCollection(DropCollectionQuery {
580            name,
581            if_exists,
582            model,
583        }))
584    }
585
586    pub fn parse_truncate_body(
587        &mut self,
588        model: Option<CollectionModel>,
589    ) -> Result<QueryExpr, ParseError> {
590        let if_exists = self.match_if_exists()?;
591        let name = self.parse_drop_collection_name()?;
592        Ok(QueryExpr::Truncate(TruncateQuery {
593            name,
594            model,
595            if_exists,
596        }))
597    }
598
599    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
600        let mut name = self.expect_ident()?;
601        while self.consume(&Token::Dot)? {
602            if self.consume(&Token::Star)? {
603                name.push_str(".*");
604                break;
605            }
606            let next = self.expect_ident_or_keyword()?;
607            name = format!("{name}.{next}");
608        }
609        Ok(name)
610    }
611
612    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
613    ///
614    /// Also accepts `ALTER COLLECTION name ADD|REVOKE SIGNER 'hex'`
615    /// (issue #522) — collection-level signer registry mutations share
616    /// the AlterTable AST so the existing executor dispatch path picks
617    /// them up without a new top-level variant.
618    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
619        self.expect(Token::Alter)?;
620        if !self.consume(&Token::Table)?
621            && !self.consume(&Token::Collection)?
622            && !self.consume_ident_ci("COLLECTION")?
623        {
624            return Err(ParseError::expected(
625                vec!["TABLE", "COLLECTION"],
626                self.peek(),
627                self.position(),
628            ));
629        }
630        let name = self.expect_ident()?;
631
632        let mut operations = Vec::new();
633        loop {
634            let op = self.parse_alter_operation(&name)?;
635            operations.push(op);
636            if !self.consume(&Token::Comma)? {
637                break;
638            }
639        }
640
641        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
642    }
643
644    /// Parse a single ALTER TABLE operation
645    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
646        if self.consume(&Token::Add)? {
647            if self.consume_ident_ci("SUBSCRIPTION")? {
648                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
649                let sub_name = self.expect_ident()?;
650                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
651                Ok(AlterOperation::AddSubscription {
652                    name: sub_name,
653                    descriptor,
654                })
655            } else if self.consume_ident_ci("SIGNER")? {
656                // ADD SIGNER 'hex_pubkey' — issue #522.
657                let pubkey = self.parse_single_signer_hex()?;
658                Ok(AlterOperation::AddSigner { pubkey })
659            } else {
660                // ADD COLUMN definition (COLUMN keyword is optional)
661                let _ = self.consume(&Token::Column)?;
662                let col_def = self.parse_column_def()?;
663                Ok(AlterOperation::AddColumn(col_def))
664            }
665        } else if self.consume_ident_ci("REVOKE")? {
666            // REVOKE SIGNER 'hex_pubkey' — issue #522.
667            if !self.consume_ident_ci("SIGNER")? {
668                return Err(ParseError::expected(
669                    vec!["SIGNER"],
670                    self.peek(),
671                    self.position(),
672                ));
673            }
674            let pubkey = self.parse_single_signer_hex()?;
675            Ok(AlterOperation::RevokeSigner { pubkey })
676        } else if self.consume(&Token::Drop)? {
677            if self.consume_ident_ci("SUBSCRIPTION")? {
678                // DROP SUBSCRIPTION name
679                let sub_name = self.expect_ident()?;
680                Ok(AlterOperation::DropSubscription { name: sub_name })
681            } else {
682                // DROP COLUMN name (COLUMN keyword is optional)
683                let _ = self.consume(&Token::Column)?;
684                let col_name = self.expect_ident()?;
685                Ok(AlterOperation::DropColumn(col_name))
686            }
687        } else if self.consume(&Token::Rename)? {
688            // RENAME COLUMN from TO to
689            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
690            let from = self.expect_ident()?;
691            self.expect(Token::To)?;
692            let to = self.expect_ident()?;
693            Ok(AlterOperation::RenameColumn { from, to })
694        } else if self.consume(&Token::Attach)? {
695            // ATTACH PARTITION child FOR VALUES ...
696            self.expect(Token::Partition)?;
697            let child = self.expect_ident()?;
698            self.expect(Token::For)?;
699            // Accept `VALUES` as an ident since the grammar doesn't have it
700            // as a reserved keyword everywhere. Collect the remaining tokens
701            // as a raw bound string for round-trip persistence.
702            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
703                return Err(ParseError::expected(
704                    vec!["VALUES"],
705                    self.peek(),
706                    self.position(),
707                ));
708            }
709            let bound = self.collect_remaining_tokens_as_string()?;
710            Ok(AlterOperation::AttachPartition { child, bound })
711        } else if self.consume(&Token::Detach)? {
712            // DETACH PARTITION child
713            self.expect(Token::Partition)?;
714            let child = self.expect_ident()?;
715            Ok(AlterOperation::DetachPartition { child })
716        } else if self.consume(&Token::Enable)? {
717            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
718            if self.consume_ident_ci("EVENTS")? {
719                Ok(AlterOperation::EnableEvents(
720                    self.parse_subscription_descriptor(table_name.to_string())?,
721                ))
722            } else if self.consume_ident_ci("TENANCY")? {
723                self.expect(Token::On)?;
724                self.expect(Token::LParen)?;
725                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
726                let mut path = self.expect_ident_or_keyword()?;
727                while self.consume(&Token::Dot)? {
728                    let next = self.expect_ident_or_keyword()?;
729                    path = format!("{path}.{next}");
730                }
731                self.expect(Token::RParen)?;
732                Ok(AlterOperation::EnableTenancy { column: path })
733            } else {
734                self.expect(Token::Row)?;
735                self.expect(Token::Level)?;
736                self.expect(Token::Security)?;
737                Ok(AlterOperation::EnableRowLevelSecurity)
738            }
739        } else if self.consume(&Token::Disable)? {
740            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
741            if self.consume_ident_ci("EVENTS")? {
742                Ok(AlterOperation::DisableEvents)
743            } else if self.consume_ident_ci("TENANCY")? {
744                Ok(AlterOperation::DisableTenancy)
745            } else {
746                self.expect(Token::Row)?;
747                self.expect(Token::Level)?;
748                self.expect(Token::Security)?;
749                Ok(AlterOperation::DisableRowLevelSecurity)
750            }
751        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
752            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
753            // SET RETENTION <duration> (issue #580)
754            if self.consume_ident_ci("APPEND_ONLY")? {
755                let on = self.parse_bool_assign()?;
756                Ok(AlterOperation::SetAppendOnly(on))
757            } else if self.consume_ident_ci("VERSIONED")? {
758                let on = self.parse_bool_assign()?;
759                Ok(AlterOperation::SetVersioned(on))
760            } else if self.consume(&Token::Retention)? {
761                // `SET RETENTION <duration>` — reuse the same float+unit
762                // grammar the timeseries CREATE clause uses so `7 DAYS`,
763                // `30 m`, `1 h`, `90 d` all parse identically.
764                let value = self.parse_float()?;
765                let unit = self.parse_duration_unit()?;
766                Ok(AlterOperation::SetRetention {
767                    duration_ms: (value * unit) as u64,
768                })
769            } else {
770                Err(ParseError::expected(
771                    vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
772                    self.peek(),
773                    self.position(),
774                ))
775            }
776        } else if self.consume_ident_ci("UNSET")? {
777            // `UNSET RETENTION` — clears the declarative retention policy.
778            if self.consume(&Token::Retention)? {
779                Ok(AlterOperation::UnsetRetention)
780            } else {
781                Err(ParseError::expected(
782                    vec!["RETENTION"],
783                    self.peek(),
784                    self.position(),
785                ))
786            }
787        } else {
788            Err(ParseError::expected(
789                vec![
790                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
791                    "UNSET",
792                ],
793                self.peek(),
794                self.position(),
795            ))
796        }
797    }
798
799    fn parse_subscription_descriptor(
800        &mut self,
801        source: String,
802    ) -> Result<SubscriptionDescriptor, ParseError> {
803        let mut ops_filter = Vec::new();
804        if self.consume(&Token::LParen)? {
805            loop {
806                let op = if self.consume(&Token::Insert)? {
807                    SubscriptionOperation::Insert
808                } else if self.consume(&Token::Update)? {
809                    SubscriptionOperation::Update
810                } else if self.consume(&Token::Delete)? {
811                    SubscriptionOperation::Delete
812                } else {
813                    return Err(ParseError::expected(
814                        vec!["INSERT", "UPDATE", "DELETE"],
815                        self.peek(),
816                        self.position(),
817                    ));
818                };
819                ops_filter.push(op);
820                if !self.consume(&Token::Comma)? {
821                    break;
822                }
823            }
824            self.expect(Token::RParen)?;
825        }
826
827        let target_queue = if self.consume(&Token::To)? {
828            self.expect_ident()?
829        } else {
830            format!("{source}_events")
831        };
832
833        let mut redact_fields = Vec::new();
834        if self.consume_ident_ci("REDACT")? {
835            self.expect(Token::LParen)?;
836            loop {
837                redact_fields.push(self.parse_dotted_redact_path()?);
838                if !self.consume(&Token::Comma)? {
839                    break;
840                }
841            }
842            self.expect(Token::RParen)?;
843        }
844
845        let where_filter = if self.consume(&Token::Where)? {
846            Some(self.collect_subscription_where_filter()?)
847        } else {
848            None
849        };
850
851        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
852        let all_tenants = if self.consume(&Token::On)? {
853            self.expect(Token::All)?;
854            if !self.consume_ident_ci("TENANTS")? {
855                return Err(ParseError::expected(
856                    vec!["TENANTS"],
857                    self.peek(),
858                    self.position(),
859                ));
860            }
861            true
862        } else {
863            false
864        };
865
866        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
867        if self.consume_ident_ci("REQUIRES")? {
868            self.consume_ident_ci("CAPABILITY")?;
869            // consume the capability string literal token
870            self.advance()?;
871        }
872
873        Ok(SubscriptionDescriptor {
874            name: String::new(),
875            source,
876            target_queue,
877            ops_filter,
878            where_filter,
879            redact_fields,
880            enabled: true,
881            all_tenants,
882        })
883    }
884
885    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
886    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
887        let mut parts = Vec::new();
888        if self.consume(&Token::Star)? {
889            parts.push("*".to_string());
890        } else {
891            parts.push(self.expect_ident_or_keyword()?);
892        }
893        while self.consume(&Token::Dot)? {
894            if self.consume(&Token::Star)? {
895                parts.push("*".to_string());
896            } else {
897                parts.push(self.expect_ident_or_keyword()?);
898            }
899        }
900        Ok(parts.join("."))
901    }
902
903    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
904        let mut parts = Vec::new();
905        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
906            parts.push(self.peek().to_string());
907            self.advance()?;
908        }
909        if parts.is_empty() {
910            return Err(ParseError::expected(
911                vec!["predicate"],
912                self.peek(),
913                self.position(),
914            ));
915        }
916        Ok(parts.join(" "))
917    }
918
919    /// Capture remaining tokens as a display-joined string.
920    ///
921    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
922    /// bound clause into storage without needing a dedicated per-kind AST.
923    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
924        let mut parts: Vec<String> = Vec::new();
925        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
926            parts.push(self.peek().to_string());
927            self.advance()?;
928        }
929        Ok(parts.join(" "))
930    }
931
932    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
933    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
934        let name = self.expect_column_ident()?;
935        let sql_type = self.parse_column_type()?;
936        let data_type = sql_type.to_string();
937
938        let mut def = CreateColumnDef {
939            name,
940            data_type,
941            sql_type: sql_type.clone(),
942            not_null: false,
943            default: None,
944            compress: None,
945            unique: false,
946            primary_key: false,
947            enum_variants: sql_type.enum_variants().unwrap_or_default(),
948            array_element: sql_type.array_element_type(),
949            decimal_precision: sql_type.decimal_precision(),
950        };
951
952        // Parse modifiers in any order
953        loop {
954            if self.match_not_null()? {
955                def.not_null = true;
956            } else if self.consume(&Token::Default)? {
957                self.expect(Token::Eq)?;
958                def.default = Some(self.parse_literal_string_for_ddl()?);
959            } else if self.consume(&Token::Compress)? {
960                self.expect(Token::Colon)?;
961                def.compress = Some(self.parse_integer()? as u8);
962            } else if self.consume(&Token::Unique)? {
963                def.unique = true;
964            } else if self.match_primary_key()? {
965                def.primary_key = true;
966            } else {
967                break;
968            }
969        }
970
971        Ok(def)
972    }
973
974    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
975    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
976        let type_name = self.expect_ident_or_keyword()?;
977        if self.consume(&Token::LParen)? {
978            let inner = self.parse_type_params()?;
979            self.expect(Token::RParen)?;
980            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
981        } else {
982            Ok(SqlTypeName::new(type_name))
983        }
984    }
985
986    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
987    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
988        let mut parts = Vec::new();
989        loop {
990            match self.peek().clone() {
991                Token::String(s) => {
992                    let s = s.clone();
993                    self.advance()?;
994                    parts.push(TypeModifier::StringLiteral(s));
995                }
996                Token::Integer(n) => {
997                    self.advance()?;
998                    parts.push(TypeModifier::Number(n as u32));
999                }
1000                _ => {
1001                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1002                }
1003            }
1004            if !self.consume(&Token::Comma)? {
1005                break;
1006            }
1007        }
1008        Ok(parts)
1009    }
1010
1011    /// Parse a literal string value for DDL DEFAULT expressions
1012    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1013        match self.peek().clone() {
1014            Token::String(s) => {
1015                let s = s.clone();
1016                self.advance()?;
1017                Ok(s)
1018            }
1019            Token::Integer(n) => {
1020                self.advance()?;
1021                Ok(n.to_string())
1022            }
1023            Token::Float(n) => {
1024                self.advance()?;
1025                Ok(n.to_string())
1026            }
1027            Token::True => {
1028                self.advance()?;
1029                Ok("true".to_string())
1030            }
1031            Token::False => {
1032                self.advance()?;
1033                Ok("false".to_string())
1034            }
1035            Token::Null => {
1036                self.advance()?;
1037                Ok("null".to_string())
1038            }
1039            ref other => Err(ParseError::expected(
1040                vec!["string", "number", "true", "false", "null"],
1041                other,
1042                self.position(),
1043            )),
1044        }
1045    }
1046
1047    fn check_ttl_keyword(&self) -> bool {
1048        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1049    }
1050
1051    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
1052    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
1053    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1054        self.expect(Token::Eq)?;
1055        match self.peek() {
1056            Token::True => {
1057                self.advance()?;
1058                Ok(true)
1059            }
1060            Token::False => {
1061                self.advance()?;
1062                Ok(false)
1063            }
1064            other => Err(ParseError::expected(
1065                vec!["true", "false"],
1066                other,
1067                self.position(),
1068            )),
1069        }
1070    }
1071
1072    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1073        if self.consume_ident_ci(expected)? {
1074            Ok(())
1075        } else {
1076            Err(ParseError::expected(
1077                vec![expected],
1078                self.peek(),
1079                self.position(),
1080            ))
1081        }
1082    }
1083
1084    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1085        let option_name = self.expect_ident_or_keyword()?;
1086        if !option_name.eq_ignore_ascii_case("ttl") {
1087            return Err(ParseError::new(
1088                // F-05: `option_name` is caller-controlled identifier text.
1089                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
1090                // before the message reaches downstream serialization sinks.
1091                format!(
1092                    "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1093                ),
1094                self.position(),
1095            ));
1096        }
1097
1098        let ttl_value = self.parse_float()?;
1099        let ttl_unit = match self.peek() {
1100            Token::Ident(unit) => {
1101                let unit = unit.clone();
1102                self.advance()?;
1103                unit
1104            }
1105            _ => "s".to_string(),
1106        };
1107
1108        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1109            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1110            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1111            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1112            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1113            "d" | "day" | "days" => 86_400_000.0,
1114            other => {
1115                return Err(ParseError::new(
1116                    // F-05: render `other` via `{:?}` so caller-controlled
1117                    // bytes (CR / LF / NUL / quotes) are escaped before
1118                    // reaching downstream serialization sinks.
1119                    format!(
1120                        "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1121                    ),
1122                    self.position(),
1123                ));
1124            }
1125        };
1126
1127        if !ttl_value.is_finite() || ttl_value < 0.0 {
1128            return Err(ParseError::new(
1129                "TTL must be a finite, non-negative duration".to_string(),
1130                self.position(),
1131            ));
1132        }
1133
1134        let ttl_ms = ttl_value * multiplier_ms;
1135        if ttl_ms > u64::MAX as f64 {
1136            return Err(ParseError::new(
1137                "TTL duration is too large".to_string(),
1138                self.position(),
1139            ));
1140        }
1141        if ttl_ms.fract().abs() >= f64::EPSILON {
1142            return Err(ParseError::new(
1143                "TTL duration must resolve to a whole number of milliseconds".to_string(),
1144                self.position(),
1145            ));
1146        }
1147
1148        Ok(Some(ttl_ms as u64))
1149    }
1150
1151    /// Try to match IF NOT EXISTS sequence
1152    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1153        if self.check(&Token::If) {
1154            self.advance()?;
1155            self.expect(Token::Not)?;
1156            self.expect(Token::Exists)?;
1157            Ok(true)
1158        } else {
1159            Ok(false)
1160        }
1161    }
1162
1163    /// Try to match IF EXISTS sequence
1164    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1165        if self.check(&Token::If) {
1166            self.advance()?;
1167            self.expect(Token::Exists)?;
1168            Ok(true)
1169        } else {
1170            Ok(false)
1171        }
1172    }
1173
1174    /// Try to match NOT NULL sequence
1175    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1176        if self.check(&Token::Not) {
1177            // Peek ahead - only consume if followed by NULL
1178            // We need to be careful: save state and try
1179            self.advance()?; // consume NOT
1180            if self.check(&Token::Null) {
1181                self.advance()?; // consume NULL
1182                Ok(true)
1183            } else {
1184                // This is tricky - NOT was consumed but next isn't NULL.
1185                // In column modifier context, NOT should only appear before NULL.
1186                // Return error for clarity.
1187                Err(ParseError::expected(
1188                    vec!["NULL (after NOT)"],
1189                    self.peek(),
1190                    self.position(),
1191                ))
1192            }
1193        } else {
1194            Ok(false)
1195        }
1196    }
1197
1198    /// Try to match PRIMARY KEY sequence
1199    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1200        if self.check(&Token::Primary) {
1201            self.advance()?;
1202            self.expect(Token::Key)?;
1203            Ok(true)
1204        } else {
1205            Ok(false)
1206        }
1207    }
1208}
1209
1210/// Decode a 64-char lowercase/uppercase hex string into a 32-byte array.
1211/// Returns a human-readable error message on length or character violations.
1212/// Used by `SIGNED_BY` clause parsing (issue #520) to surface bad pubkeys
1213/// at parse-time rather than downstream in the engine.
1214fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1215    if s.len() != 64 {
1216        return Err(format!("expected 64 hex chars, got {}", s.len()));
1217    }
1218    let mut out = [0u8; 32];
1219    let bytes = s.as_bytes();
1220    for i in 0..32 {
1221        let hi = hex_nibble(bytes[i * 2])?;
1222        let lo = hex_nibble(bytes[i * 2 + 1])?;
1223        out[i] = (hi << 4) | lo;
1224    }
1225    Ok(out)
1226}
1227
1228fn hex_nibble(c: u8) -> Result<u8, String> {
1229    match c {
1230        b'0'..=b'9' => Ok(c - b'0'),
1231        b'a'..=b'f' => Ok(c - b'a' + 10),
1232        b'A'..=b'F' => Ok(c - b'A' + 10),
1233        _ => Err(format!("non-hex char: {:?}", c as char)),
1234    }
1235}