Skip to main content

reddb_server/storage/query/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::super::ast::{
4    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
5    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
6    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
7    PartitionSpec, QueryExpr, TruncateQuery,
8};
9use super::super::lexer::Token;
10use super::error::ParseError;
11use super::Parser;
12use crate::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use crate::storage::schema::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            metrics_rollup_policies: Vec::new(),
79            context_index_fields,
80            context_index_enabled,
81            timestamps,
82            partition_by: None,
83            tenant_by: None,
84            append_only: false,
85            subscriptions,
86            analytics_config: Vec::new(),
87            vault_own_master_key: false,
88        }))
89    }
90
91    /// Parse: DROP TABLE [IF EXISTS] name
92    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
93        self.expect(Token::Drop)?;
94        self.expect(Token::Table)?;
95        self.parse_drop_table_body()
96    }
97
98    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
99    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
100        let if_not_exists = self.match_if_not_exists()?;
101        let name = self.expect_ident()?;
102
103        self.expect(Token::LParen)?;
104        let mut columns = Vec::new();
105        loop {
106            let col = self.parse_column_def()?;
107            columns.push(col);
108            if !self.consume(&Token::Comma)? {
109                break;
110            }
111        }
112        self.expect(Token::RParen)?;
113
114        let mut default_ttl_ms = None;
115        let mut context_index_fields = Vec::new();
116        let mut context_index_enabled = false;
117        let mut timestamps = false;
118        let mut tenant_by: Option<String> = None;
119        let mut append_only = false;
120        let mut subscriptions = Vec::new();
121
122        while self.consume(&Token::With)? {
123            if self.consume_ident_ci("EVENTS")? {
124                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
125                continue;
126            }
127            // Accept both spellings:
128            //   WITH key = value
129            //   WITH (key = value, key = value)
130            // Postgres / ClickHouse use the parenthesised form; the
131            // bare form is our legacy shorthand. The parenthesised
132            // form collects options separated by commas until `)`.
133            let has_parens = self.consume(&Token::LParen)?;
134
135            loop {
136                if self.consume_ident_ci("CONTEXT_INDEX")? {
137                    context_index_enabled = self.parse_bool_assign()?;
138                } else if self.consume_ident_ci("CONTEXT")? {
139                    if !self.consume(&Token::Index)? {
140                        return Err(ParseError::expected(
141                            vec!["INDEX"],
142                            self.peek(),
143                            self.position(),
144                        ));
145                    }
146                    self.expect(Token::On)?;
147                    self.expect(Token::LParen)?;
148                    loop {
149                        context_index_fields.push(self.expect_ident()?);
150                        if !self.consume(&Token::Comma)? {
151                            break;
152                        }
153                    }
154                    self.expect(Token::RParen)?;
155                    context_index_enabled = true;
156                } else if self.consume_ident_ci("TIMESTAMPS")? {
157                    timestamps = self.parse_bool_assign()?;
158                } else if self.consume_ident_ci("APPEND_ONLY")? {
159                    append_only = self.parse_bool_assign()?;
160                } else if self.consume_ident_ci("TENANT_BY")? {
161                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
162                    // and expects a string literal column name.
163                    let _ = self.consume(&Token::Eq)?;
164                    let value = self.parse_literal_value()?;
165                    match value {
166                        Value::Text(col) => tenant_by = Some(col.to_string()),
167                        other => {
168                            return Err(ParseError::new(
169                                format!("WITH tenant_by expects a text literal, got {other:?}"),
170                                self.position(),
171                            ));
172                        }
173                    }
174                } else {
175                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
176                }
177                if has_parens {
178                    if self.consume(&Token::Comma)? {
179                        continue;
180                    }
181                    self.expect(Token::RParen)?;
182                }
183                break;
184            }
185        }
186
187        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
188        let partition_by = if self.consume(&Token::Partition)? {
189            self.expect(Token::By)?;
190            let kind = if self.consume(&Token::Range)? {
191                PartitionKind::Range
192            } else if self.consume(&Token::List)? {
193                PartitionKind::List
194            } else if self.consume(&Token::Hash)? {
195                PartitionKind::Hash
196            } else {
197                return Err(ParseError::expected(
198                    vec!["RANGE", "LIST", "HASH"],
199                    self.peek(),
200                    self.position(),
201                ));
202            };
203            self.expect(Token::LParen)?;
204            let column = self.expect_ident()?;
205            self.expect(Token::RParen)?;
206            Some(PartitionSpec { kind, column })
207        } else {
208            None
209        };
210
211        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
212        // style). Accepted after partition spec / tenant spec / or on
213        // its own. `WITH (append_only = true)` is the other form and
214        // handled above.
215        if !append_only && self.consume_ident_ci("APPEND")? {
216            if !self.consume_ident_ci("ONLY")? {
217                return Err(ParseError::expected(
218                    vec!["ONLY"],
219                    self.peek(),
220                    self.position(),
221                ));
222            }
223            append_only = true;
224        }
225
226        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
227        // trailing clause (after partition spec if both are used).
228        //
229        // Dotted paths let non-table models declare tenancy over their
230        // natural nested structures — `metadata.tenant` for vectors,
231        // `payload.tenant` for queue messages, `tags.cluster` for
232        // timeseries, `properties.org` for graphs. The read-path
233        // resolver already navigates these paths via
234        // `resolve_runtime_document_path`; here we just store the
235        // dotted string and let the policy evaluator do the rest.
236        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
237            self.expect(Token::By)?;
238            self.expect(Token::LParen)?;
239            // Allow keyword-idents (`metadata`, `type`, `data`) as
240            // column names — SQL treats them as bare identifiers in
241            // this context.
242            let mut path = self.expect_ident_or_keyword()?;
243            while self.consume(&Token::Dot)? {
244                let next = self.expect_ident_or_keyword()?;
245                path = format!("{path}.{next}");
246            }
247            self.expect(Token::RParen)?;
248            tenant_by = Some(path);
249        }
250
251        Ok(QueryExpr::CreateTable(CreateTableQuery {
252            collection_model: CollectionModel::Table,
253            name,
254            columns,
255            if_not_exists,
256            default_ttl_ms,
257            metrics_rollup_policies: Vec::new(),
258            context_index_fields,
259            context_index_enabled,
260            timestamps,
261            partition_by,
262            tenant_by,
263            append_only,
264            subscriptions,
265            analytics_config: Vec::new(),
266            vault_own_master_key: false,
267        }))
268    }
269
270    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
271    ///
272    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
273    /// difference between the table's current contract and the target CREATE
274    /// TABLE body.
275    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
276        self.expect(Token::Explain)?;
277        self.expect(Token::Alter)?;
278        self.expect(Token::For)?;
279        self.expect(Token::Create)?;
280        self.expect(Token::Table)?;
281
282        let body = self.parse_create_table_body()?;
283        let target = match body {
284            QueryExpr::CreateTable(t) => t,
285            _ => {
286                return Err(ParseError::new(
287                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
288                        .to_string(),
289                    self.position(),
290                ));
291            }
292        };
293
294        let format = if self.consume(&Token::Format)? {
295            if self.consume(&Token::Json)? {
296                ExplainFormat::Json
297            } else if self.consume_ident_ci("SQL")? {
298                ExplainFormat::Sql
299            } else {
300                return Err(ParseError::expected(
301                    vec!["JSON", "SQL"],
302                    self.peek(),
303                    self.position(),
304                ));
305            }
306        } else {
307            ExplainFormat::Sql
308        };
309
310        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
311            target,
312            format,
313        }))
314    }
315
316    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
317    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
318        let if_exists = self.match_if_exists()?;
319        let name = self.parse_drop_collection_name()?;
320        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
321    }
322
323    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
324        let if_exists = self.match_if_exists()?;
325        let name = self.parse_drop_collection_name()?;
326        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
327    }
328
329    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
330        let if_exists = self.match_if_exists()?;
331        let name = self.parse_drop_collection_name()?;
332        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
333    }
334
335    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
336        let if_exists = self.match_if_exists()?;
337        let name = self.parse_drop_collection_name()?;
338        Ok(QueryExpr::DropDocument(DropDocumentQuery {
339            name,
340            if_exists,
341        }))
342    }
343
344    pub fn parse_create_keyed_body(
345        &mut self,
346        model: CollectionModel,
347    ) -> Result<QueryExpr, ParseError> {
348        let if_not_exists = self.match_if_not_exists()?;
349        let name = self.parse_drop_collection_name()?;
350        let vault_own_master_key =
351            if model == CollectionModel::Vault && self.consume(&Token::With)? {
352                if !self.consume_ident_ci("OWN")? {
353                    return Err(ParseError::expected(
354                        vec!["OWN"],
355                        self.peek(),
356                        self.position(),
357                    ));
358                }
359                if !self.consume_ident_ci("MASTER")? {
360                    return Err(ParseError::expected(
361                        vec!["MASTER"],
362                        self.peek(),
363                        self.position(),
364                    ));
365                }
366                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
367                    return Err(ParseError::expected(
368                        vec!["KEY"],
369                        self.peek(),
370                        self.position(),
371                    ));
372                }
373                true
374            } else {
375                false
376            };
377        // `CREATE GRAPH <name> WITH ANALYTICS (...)` — the analytics opt-in
378        // is graph-only (issue #800). Other keyed models reject the clause so
379        // a misplaced `WITH ANALYTICS` fails loudly instead of being ignored.
380        let analytics_config = if model == CollectionModel::Graph && self.consume(&Token::With)? {
381            if !self.consume_ident_ci("ANALYTICS")? {
382                return Err(ParseError::expected(
383                    vec!["ANALYTICS"],
384                    self.peek(),
385                    self.position(),
386                ));
387            }
388            self.parse_analytics_clause()?
389        } else {
390            Vec::new()
391        };
392        Ok(QueryExpr::CreateTable(CreateTableQuery {
393            collection_model: model,
394            name,
395            columns: Vec::new(),
396            if_not_exists,
397            default_ttl_ms: None,
398            metrics_rollup_policies: Vec::new(),
399            context_index_fields: Vec::new(),
400            context_index_enabled: false,
401            timestamps: false,
402            partition_by: None,
403            tenant_by: None,
404            append_only: false,
405            subscriptions: Vec::new(),
406            analytics_config,
407            vault_own_master_key,
408        }))
409    }
410
411    /// Parse the `( <output> [ ( <key> = <value> [, ...] ) ] [, ...] )` body of
412    /// a `WITH ANALYTICS` clause (issue #800). Recognised outputs are
413    /// `communities`, `components`, `centrality`; recognised options are
414    /// `using`, `resolution`, `max_iterations`, `tolerance`. Unknown output
415    /// names and option keys are rejected with a clear, structured error.
416    fn parse_analytics_clause(
417        &mut self,
418    ) -> Result<Vec<crate::catalog::AnalyticsViewDescriptor>, ParseError> {
419        use crate::catalog::{AnalyticsOutput, AnalyticsViewDescriptor};
420
421        self.expect(Token::LParen)?;
422        let mut views: Vec<AnalyticsViewDescriptor> = Vec::new();
423        loop {
424            let output_name = self.parse_analytics_output_name()?;
425            let output = AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
426                ParseError::new(
427                    format!(
428                        "unknown analytics output '{output_name}': expected communities, components, or centrality"
429                    ),
430                    self.position(),
431                )
432            })?;
433            if views.iter().any(|view| view.output == output) {
434                return Err(ParseError::new(
435                    format!("duplicate analytics output '{output_name}'"),
436                    self.position(),
437                ));
438            }
439            let mut view = AnalyticsViewDescriptor {
440                output,
441                algorithm: None,
442                resolution: None,
443                max_iterations: None,
444                tolerance: None,
445            };
446            if self.consume(&Token::LParen)? {
447                loop {
448                    let key = self.parse_analytics_option_key()?;
449                    self.expect(Token::Eq)?;
450                    match key.as_str() {
451                        "using" => {
452                            view.algorithm =
453                                Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
454                        }
455                        "resolution" => view.resolution = Some(self.parse_float()?),
456                        "max_iterations" => view.max_iterations = Some(self.parse_integer()?),
457                        "tolerance" => view.tolerance = Some(self.parse_float()?),
458                        other => {
459                            return Err(ParseError::new(
460                                format!(
461                                    "unknown analytics option '{other}': expected using, resolution, max_iterations, or tolerance"
462                                ),
463                                self.position(),
464                            ))
465                        }
466                    }
467                    if !self.consume(&Token::Comma)? {
468                        break;
469                    }
470                }
471                self.expect(Token::RParen)?;
472            }
473            views.push(view);
474            if !self.consume(&Token::Comma)? {
475                break;
476            }
477        }
478        self.expect(Token::RParen)?;
479        if views.is_empty() {
480            return Err(ParseError::new(
481                "WITH ANALYTICS requires at least one output".to_string(),
482                self.position(),
483            ));
484        }
485        Ok(views)
486    }
487
488    /// Read one analytics output name, normalising the keyword-lexed outputs
489    /// (`components`, `centrality`) back to their lowercase spelling so they
490    /// compare uniformly with the ident-lexed `communities`.
491    fn parse_analytics_output_name(&mut self) -> Result<String, ParseError> {
492        match self.peek() {
493            Token::Components => {
494                self.advance()?;
495                Ok("components".to_string())
496            }
497            Token::Centrality => {
498                self.advance()?;
499                Ok("centrality".to_string())
500            }
501            _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
502        }
503    }
504
505    /// Read one analytics option key, normalising the keyword-lexed keys
506    /// (`using`, `max_iterations`) back to their lowercase spelling.
507    fn parse_analytics_option_key(&mut self) -> Result<String, ParseError> {
508        match self.peek() {
509            Token::Using => {
510                self.advance()?;
511                Ok("using".to_string())
512            }
513            Token::MaxIterations => {
514                self.advance()?;
515                Ok("max_iterations".to_string())
516            }
517            _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
518        }
519    }
520
521    pub fn parse_create_collection_model_body(
522        &mut self,
523        model: CollectionModel,
524    ) -> Result<QueryExpr, ParseError> {
525        self.parse_create_keyed_body(model)
526    }
527
528    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
529        let if_not_exists = self.match_if_not_exists()?;
530        let name = self.parse_drop_collection_name()?;
531        if !self.consume_ident_ci("KIND")? {
532            return Err(ParseError::expected(
533                vec!["KIND"],
534                self.peek(),
535                self.position(),
536            ));
537        }
538        let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
539        while self.consume(&Token::Dot)? {
540            let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
541            kind.push('.');
542            kind.push_str(&part);
543        }
544        let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
545            if !self.consume_ident_ci("DIM")? {
546                return Err(ParseError::expected(
547                    vec!["DIM"],
548                    self.peek(),
549                    self.position(),
550                ));
551            }
552            let dimension = self.parse_integer()?;
553            if dimension <= 0 {
554                return Err(ParseError::new(
555                    "VECTOR DIM must be a positive integer".to_string(),
556                    self.position(),
557                ));
558            }
559            let metric = if self.consume(&Token::Metric)? {
560                self.parse_distance_metric()?
561            } else {
562                crate::storage::engine::distance::DistanceMetric::Cosine
563            };
564            (Some(dimension as usize), Some(metric))
565        } else {
566            (None, None)
567        };
568        let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
569            self.parse_signed_by_list()?
570        } else {
571            Vec::new()
572        };
573        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
574            name,
575            kind,
576            if_not_exists,
577            vector_dimension,
578            vector_metric,
579            allowed_signers,
580        }))
581    }
582
583    /// Parse a single `'hex32'` string literal as a 32-byte Ed25519
584    /// pubkey. Used by `ALTER COLLECTION ... ADD|REVOKE SIGNER 'hex'`
585    /// (issue #522).
586    fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
587        let hex = match self.peek().clone() {
588            Token::String(s) => {
589                self.advance()?;
590                s
591            }
592            _ => {
593                return Err(ParseError::expected(
594                    vec!["string literal (ed25519 pubkey hex)"],
595                    self.peek(),
596                    self.position(),
597                ));
598            }
599        };
600        decode_hex_32(&hex).map_err(|msg| {
601            ParseError::new(
602                format!("SIGNER pubkey '{hex}' invalid: {msg}"),
603                self.position(),
604            )
605        })
606    }
607
608    /// Parse `( 'hex32', 'hex32', ... )` — Ed25519 pubkey list. Each entry
609    /// must decode to exactly 32 bytes. Used by both `CREATE COLLECTION ...
610    /// SIGNED_BY (...)` and (in a later iteration) `ALTER COLLECTION` signer
611    /// mutations. Issue #520.
612    fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
613        self.expect(Token::LParen)?;
614        let mut out = Vec::new();
615        loop {
616            let hex = match self.peek().clone() {
617                Token::String(s) => {
618                    self.advance()?;
619                    s
620                }
621                _ => {
622                    return Err(ParseError::expected(
623                        vec!["string literal (ed25519 pubkey hex)"],
624                        self.peek(),
625                        self.position(),
626                    ));
627                }
628            };
629            let bytes = decode_hex_32(&hex).map_err(|msg| {
630                ParseError::new(
631                    format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
632                    self.position(),
633                )
634            })?;
635            out.push(bytes);
636            if !self.consume(&Token::Comma)? {
637                break;
638            }
639        }
640        self.expect(Token::RParen)?;
641        if out.is_empty() {
642            return Err(ParseError::new(
643                "SIGNED_BY list must contain at least one pubkey".to_string(),
644                self.position(),
645            ));
646        }
647        Ok(out)
648    }
649
650    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
651        let if_not_exists = self.match_if_not_exists()?;
652        let name = self.parse_drop_collection_name()?;
653        if !self.consume_ident_ci("DIM")? {
654            return Err(ParseError::expected(
655                vec!["DIM"],
656                self.peek(),
657                self.position(),
658            ));
659        }
660        let dimension = self.parse_integer()?;
661        if dimension <= 0 {
662            return Err(ParseError::new(
663                "VECTOR DIM must be a positive integer".to_string(),
664                self.position(),
665            ));
666        }
667        let metric = if self.consume(&Token::Metric)? {
668            self.parse_distance_metric()?
669        } else {
670            crate::storage::engine::distance::DistanceMetric::Cosine
671        };
672        Ok(QueryExpr::CreateVector(CreateVectorQuery {
673            name,
674            dimension: dimension as usize,
675            metric,
676            if_not_exists,
677        }))
678    }
679
680    pub fn parse_drop_keyed_body(
681        &mut self,
682        model: CollectionModel,
683    ) -> Result<QueryExpr, ParseError> {
684        let if_exists = self.match_if_exists()?;
685        let name = self.parse_drop_collection_name()?;
686        Ok(QueryExpr::DropKv(DropKvQuery {
687            name,
688            if_exists,
689            model,
690        }))
691    }
692
693    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
694        self.parse_drop_keyed_body(CollectionModel::Kv)
695    }
696
697    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
698        self.parse_drop_collection_model_body(None)
699    }
700
701    pub fn parse_drop_collection_model_body(
702        &mut self,
703        model: Option<CollectionModel>,
704    ) -> Result<QueryExpr, ParseError> {
705        let if_exists = self.match_if_exists()?;
706        let name = self.parse_drop_collection_name()?;
707        Ok(QueryExpr::DropCollection(DropCollectionQuery {
708            name,
709            if_exists,
710            model,
711        }))
712    }
713
714    pub fn parse_truncate_body(
715        &mut self,
716        model: Option<CollectionModel>,
717    ) -> Result<QueryExpr, ParseError> {
718        let if_exists = self.match_if_exists()?;
719        let name = self.parse_drop_collection_name()?;
720        Ok(QueryExpr::Truncate(TruncateQuery {
721            name,
722            model,
723            if_exists,
724        }))
725    }
726
727    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
728        let mut name = self.expect_ident()?;
729        while self.consume(&Token::Dot)? {
730            if self.consume(&Token::Star)? {
731                name.push_str(".*");
732                break;
733            }
734            let next = self.expect_ident_or_keyword()?;
735            name = format!("{name}.{next}");
736        }
737        Ok(name)
738    }
739
740    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
741    ///
742    /// Also accepts `ALTER COLLECTION name ADD|REVOKE SIGNER 'hex'`
743    /// (issue #522) — collection-level signer registry mutations share
744    /// the AlterTable AST so the existing executor dispatch path picks
745    /// them up without a new top-level variant.
746    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
747        self.expect(Token::Alter)?;
748        if !self.consume(&Token::Table)?
749            && !self.consume(&Token::Collection)?
750            && !self.consume_ident_ci("COLLECTION")?
751        {
752            return Err(ParseError::expected(
753                vec!["TABLE", "COLLECTION"],
754                self.peek(),
755                self.position(),
756            ));
757        }
758        let name = self.expect_ident()?;
759
760        let mut operations = Vec::new();
761        loop {
762            let op = self.parse_alter_operation(&name)?;
763            operations.push(op);
764            if !self.consume(&Token::Comma)? {
765                break;
766            }
767        }
768
769        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
770    }
771
772    /// Parse: `ALTER GRAPH <name> ADD ANALYTICS ( <output> [, ...] )`
773    /// and `ALTER GRAPH <name> DROP ANALYTICS <output>` (issue #801).
774    ///
775    /// Lifecycle management of the `WITH ANALYTICS` configuration declared at
776    /// `CREATE GRAPH` time (#800), without recreating the collection. Shares
777    /// the `AlterTable` AST so the existing executor dispatch path picks the
778    /// mutations up; the executor validates the target is a graph and that the
779    /// dropped output is actually enabled.
780    pub fn parse_alter_graph_query(&mut self) -> Result<QueryExpr, ParseError> {
781        self.expect(Token::Alter)?;
782        self.expect(Token::Graph)?;
783        let name = self.expect_ident()?;
784
785        let mut operations = Vec::new();
786        loop {
787            operations.push(self.parse_alter_graph_operation()?);
788            if !self.consume(&Token::Comma)? {
789                break;
790            }
791        }
792
793        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
794    }
795
796    /// Parse a single `ALTER GRAPH` analytics operation: either
797    /// `ADD ANALYTICS ( ... )` or `DROP ANALYTICS <output>`.
798    fn parse_alter_graph_operation(&mut self) -> Result<AlterOperation, ParseError> {
799        if self.consume(&Token::Add)? {
800            if !self.consume_ident_ci("ANALYTICS")? {
801                return Err(ParseError::expected(
802                    vec!["ANALYTICS"],
803                    self.peek(),
804                    self.position(),
805                ));
806            }
807            // Reuse the `WITH ANALYTICS (...)` body grammar verbatim so the
808            // ADD form accepts the exact same outputs and options as CREATE.
809            let views = self.parse_analytics_clause()?;
810            Ok(AlterOperation::AddAnalytics(views))
811        } else if self.consume(&Token::Drop)? {
812            if !self.consume_ident_ci("ANALYTICS")? {
813                return Err(ParseError::expected(
814                    vec!["ANALYTICS"],
815                    self.peek(),
816                    self.position(),
817                ));
818            }
819            let output_name = self.parse_analytics_output_name()?;
820            let output = crate::catalog::AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
821                ParseError::new(
822                    format!(
823                        "unknown analytics output '{output_name}': expected communities, components, or centrality"
824                    ),
825                    self.position(),
826                )
827            })?;
828            Ok(AlterOperation::DropAnalytics(output))
829        } else {
830            Err(ParseError::expected(
831                vec!["ADD", "DROP"],
832                self.peek(),
833                self.position(),
834            ))
835        }
836    }
837
838    /// Parse a single ALTER TABLE operation
839    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
840        if self.consume(&Token::Add)? {
841            if self.consume_ident_ci("SUBSCRIPTION")? {
842                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
843                let sub_name = self.expect_ident()?;
844                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
845                Ok(AlterOperation::AddSubscription {
846                    name: sub_name,
847                    descriptor,
848                })
849            } else if self.consume_ident_ci("SIGNER")? {
850                // ADD SIGNER 'hex_pubkey' — issue #522.
851                let pubkey = self.parse_single_signer_hex()?;
852                Ok(AlterOperation::AddSigner { pubkey })
853            } else {
854                // ADD COLUMN definition (COLUMN keyword is optional)
855                let _ = self.consume(&Token::Column)?;
856                let col_def = self.parse_column_def()?;
857                Ok(AlterOperation::AddColumn(col_def))
858            }
859        } else if self.consume_ident_ci("REVOKE")? {
860            // REVOKE SIGNER 'hex_pubkey' — issue #522.
861            if !self.consume_ident_ci("SIGNER")? {
862                return Err(ParseError::expected(
863                    vec!["SIGNER"],
864                    self.peek(),
865                    self.position(),
866                ));
867            }
868            let pubkey = self.parse_single_signer_hex()?;
869            Ok(AlterOperation::RevokeSigner { pubkey })
870        } else if self.consume(&Token::Drop)? {
871            if self.consume_ident_ci("SUBSCRIPTION")? {
872                // DROP SUBSCRIPTION name
873                let sub_name = self.expect_ident()?;
874                Ok(AlterOperation::DropSubscription { name: sub_name })
875            } else {
876                // DROP COLUMN name (COLUMN keyword is optional)
877                let _ = self.consume(&Token::Column)?;
878                let col_name = self.expect_ident()?;
879                Ok(AlterOperation::DropColumn(col_name))
880            }
881        } else if self.consume(&Token::Rename)? {
882            // RENAME COLUMN from TO to
883            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
884            let from = self.expect_ident()?;
885            self.expect(Token::To)?;
886            let to = self.expect_ident()?;
887            Ok(AlterOperation::RenameColumn { from, to })
888        } else if self.consume(&Token::Attach)? {
889            // ATTACH PARTITION child FOR VALUES ...
890            self.expect(Token::Partition)?;
891            let child = self.expect_ident()?;
892            self.expect(Token::For)?;
893            // Accept `VALUES` as an ident since the grammar doesn't have it
894            // as a reserved keyword everywhere. Collect the remaining tokens
895            // as a raw bound string for round-trip persistence.
896            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
897                return Err(ParseError::expected(
898                    vec!["VALUES"],
899                    self.peek(),
900                    self.position(),
901                ));
902            }
903            let bound = self.collect_remaining_tokens_as_string()?;
904            Ok(AlterOperation::AttachPartition { child, bound })
905        } else if self.consume(&Token::Detach)? {
906            // DETACH PARTITION child
907            self.expect(Token::Partition)?;
908            let child = self.expect_ident()?;
909            Ok(AlterOperation::DetachPartition { child })
910        } else if self.consume(&Token::Enable)? {
911            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
912            if self.consume_ident_ci("EVENTS")? {
913                Ok(AlterOperation::EnableEvents(
914                    self.parse_subscription_descriptor(table_name.to_string())?,
915                ))
916            } else if self.consume_ident_ci("TENANCY")? {
917                self.expect(Token::On)?;
918                self.expect(Token::LParen)?;
919                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
920                let mut path = self.expect_ident_or_keyword()?;
921                while self.consume(&Token::Dot)? {
922                    let next = self.expect_ident_or_keyword()?;
923                    path = format!("{path}.{next}");
924                }
925                self.expect(Token::RParen)?;
926                Ok(AlterOperation::EnableTenancy { column: path })
927            } else {
928                self.expect(Token::Row)?;
929                self.expect(Token::Level)?;
930                self.expect(Token::Security)?;
931                Ok(AlterOperation::EnableRowLevelSecurity)
932            }
933        } else if self.consume(&Token::Disable)? {
934            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
935            if self.consume_ident_ci("EVENTS")? {
936                Ok(AlterOperation::DisableEvents)
937            } else if self.consume_ident_ci("TENANCY")? {
938                Ok(AlterOperation::DisableTenancy)
939            } else {
940                self.expect(Token::Row)?;
941                self.expect(Token::Level)?;
942                self.expect(Token::Security)?;
943                Ok(AlterOperation::DisableRowLevelSecurity)
944            }
945        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
946            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
947            // SET RETENTION <duration> (issue #580)
948            if self.consume_ident_ci("APPEND_ONLY")? {
949                let on = self.parse_bool_assign()?;
950                Ok(AlterOperation::SetAppendOnly(on))
951            } else if self.consume_ident_ci("VERSIONED")? {
952                let on = self.parse_bool_assign()?;
953                Ok(AlterOperation::SetVersioned(on))
954            } else if self.consume(&Token::Retention)? {
955                // `SET RETENTION <duration>` — reuse the same float+unit
956                // grammar the timeseries CREATE clause uses so `7 DAYS`,
957                // `30 m`, `1 h`, `90 d` all parse identically.
958                let value = self.parse_float()?;
959                let unit = self.parse_duration_unit()?;
960                Ok(AlterOperation::SetRetention {
961                    duration_ms: (value * unit) as u64,
962                })
963            } else {
964                Err(ParseError::expected(
965                    vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
966                    self.peek(),
967                    self.position(),
968                ))
969            }
970        } else if self.consume_ident_ci("UNSET")? {
971            // `UNSET RETENTION` — clears the declarative retention policy.
972            if self.consume(&Token::Retention)? {
973                Ok(AlterOperation::UnsetRetention)
974            } else {
975                Err(ParseError::expected(
976                    vec!["RETENTION"],
977                    self.peek(),
978                    self.position(),
979                ))
980            }
981        } else {
982            Err(ParseError::expected(
983                vec![
984                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
985                    "UNSET",
986                ],
987                self.peek(),
988                self.position(),
989            ))
990        }
991    }
992
993    fn parse_subscription_descriptor(
994        &mut self,
995        source: String,
996    ) -> Result<SubscriptionDescriptor, ParseError> {
997        let mut ops_filter = Vec::new();
998        if self.consume(&Token::LParen)? {
999            loop {
1000                let op = if self.consume(&Token::Insert)? {
1001                    SubscriptionOperation::Insert
1002                } else if self.consume(&Token::Update)? {
1003                    SubscriptionOperation::Update
1004                } else if self.consume(&Token::Delete)? {
1005                    SubscriptionOperation::Delete
1006                } else {
1007                    return Err(ParseError::expected(
1008                        vec!["INSERT", "UPDATE", "DELETE"],
1009                        self.peek(),
1010                        self.position(),
1011                    ));
1012                };
1013                ops_filter.push(op);
1014                if !self.consume(&Token::Comma)? {
1015                    break;
1016                }
1017            }
1018            self.expect(Token::RParen)?;
1019        }
1020
1021        let target_queue = if self.consume(&Token::To)? {
1022            self.expect_ident()?
1023        } else {
1024            format!("{source}_events")
1025        };
1026
1027        let mut redact_fields = Vec::new();
1028        if self.consume_ident_ci("REDACT")? {
1029            self.expect(Token::LParen)?;
1030            loop {
1031                redact_fields.push(self.parse_dotted_redact_path()?);
1032                if !self.consume(&Token::Comma)? {
1033                    break;
1034                }
1035            }
1036            self.expect(Token::RParen)?;
1037        }
1038
1039        let where_filter = if self.consume(&Token::Where)? {
1040            Some(self.collect_subscription_where_filter()?)
1041        } else {
1042            None
1043        };
1044
1045        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
1046        let all_tenants = if self.consume(&Token::On)? {
1047            self.expect(Token::All)?;
1048            if !self.consume_ident_ci("TENANTS")? {
1049                return Err(ParseError::expected(
1050                    vec!["TENANTS"],
1051                    self.peek(),
1052                    self.position(),
1053                ));
1054            }
1055            true
1056        } else {
1057            false
1058        };
1059
1060        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
1061        if self.consume_ident_ci("REQUIRES")? {
1062            self.consume_ident_ci("CAPABILITY")?;
1063            // consume the capability string literal token
1064            self.advance()?;
1065        }
1066
1067        Ok(SubscriptionDescriptor {
1068            name: String::new(),
1069            source,
1070            target_queue,
1071            ops_filter,
1072            where_filter,
1073            redact_fields,
1074            enabled: true,
1075            all_tenants,
1076        })
1077    }
1078
1079    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
1080    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
1081        let mut parts = Vec::new();
1082        if self.consume(&Token::Star)? {
1083            parts.push("*".to_string());
1084        } else {
1085            parts.push(self.expect_ident_or_keyword()?);
1086        }
1087        while self.consume(&Token::Dot)? {
1088            if self.consume(&Token::Star)? {
1089                parts.push("*".to_string());
1090            } else {
1091                parts.push(self.expect_ident_or_keyword()?);
1092            }
1093        }
1094        Ok(parts.join("."))
1095    }
1096
1097    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
1098        let mut parts = Vec::new();
1099        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1100            parts.push(self.peek().to_string());
1101            self.advance()?;
1102        }
1103        if parts.is_empty() {
1104            return Err(ParseError::expected(
1105                vec!["predicate"],
1106                self.peek(),
1107                self.position(),
1108            ));
1109        }
1110        Ok(parts.join(" "))
1111    }
1112
1113    /// Capture remaining tokens as a display-joined string.
1114    ///
1115    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
1116    /// bound clause into storage without needing a dedicated per-kind AST.
1117    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
1118        let mut parts: Vec<String> = Vec::new();
1119        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1120            parts.push(self.peek().to_string());
1121            self.advance()?;
1122        }
1123        Ok(parts.join(" "))
1124    }
1125
1126    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
1127    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
1128        let name = self.expect_column_ident()?;
1129        let sql_type = self.parse_column_type()?;
1130        let data_type = sql_type.to_string();
1131
1132        let mut def = CreateColumnDef {
1133            name,
1134            data_type,
1135            sql_type: sql_type.clone(),
1136            not_null: false,
1137            default: None,
1138            compress: None,
1139            unique: false,
1140            primary_key: false,
1141            enum_variants: sql_type.enum_variants().unwrap_or_default(),
1142            array_element: sql_type.array_element_type(),
1143            decimal_precision: sql_type.decimal_precision(),
1144        };
1145
1146        // Parse modifiers in any order
1147        loop {
1148            if self.match_not_null()? {
1149                def.not_null = true;
1150            } else if self.consume(&Token::Default)? {
1151                self.expect(Token::Eq)?;
1152                def.default = Some(self.parse_literal_string_for_ddl()?);
1153            } else if self.consume(&Token::Compress)? {
1154                self.expect(Token::Colon)?;
1155                def.compress = Some(self.parse_integer()? as u8);
1156            } else if self.consume(&Token::Unique)? {
1157                def.unique = true;
1158            } else if self.match_primary_key()? {
1159                def.primary_key = true;
1160            } else {
1161                break;
1162            }
1163        }
1164
1165        Ok(def)
1166    }
1167
1168    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
1169    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
1170        let type_name = self.expect_ident_or_keyword()?;
1171        if self.consume(&Token::LParen)? {
1172            let inner = self.parse_type_params()?;
1173            self.expect(Token::RParen)?;
1174            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
1175        } else {
1176            Ok(SqlTypeName::new(type_name))
1177        }
1178    }
1179
1180    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
1181    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
1182        let mut parts = Vec::new();
1183        loop {
1184            match self.peek().clone() {
1185                Token::String(s) => {
1186                    let s = s.clone();
1187                    self.advance()?;
1188                    parts.push(TypeModifier::StringLiteral(s));
1189                }
1190                Token::Integer(n) => {
1191                    self.advance()?;
1192                    parts.push(TypeModifier::Number(n as u32));
1193                }
1194                _ => {
1195                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1196                }
1197            }
1198            if !self.consume(&Token::Comma)? {
1199                break;
1200            }
1201        }
1202        Ok(parts)
1203    }
1204
1205    /// Parse a literal string value for DDL DEFAULT expressions
1206    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1207        match self.peek().clone() {
1208            Token::String(s) => {
1209                let s = s.clone();
1210                self.advance()?;
1211                Ok(s)
1212            }
1213            Token::Integer(n) => {
1214                self.advance()?;
1215                Ok(n.to_string())
1216            }
1217            Token::Float(n) => {
1218                self.advance()?;
1219                Ok(n.to_string())
1220            }
1221            Token::True => {
1222                self.advance()?;
1223                Ok("true".to_string())
1224            }
1225            Token::False => {
1226                self.advance()?;
1227                Ok("false".to_string())
1228            }
1229            Token::Null => {
1230                self.advance()?;
1231                Ok("null".to_string())
1232            }
1233            ref other => Err(ParseError::expected(
1234                vec!["string", "number", "true", "false", "null"],
1235                other,
1236                self.position(),
1237            )),
1238        }
1239    }
1240
1241    fn check_ttl_keyword(&self) -> bool {
1242        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1243    }
1244
1245    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
1246    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
1247    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1248        self.expect(Token::Eq)?;
1249        match self.peek() {
1250            Token::True => {
1251                self.advance()?;
1252                Ok(true)
1253            }
1254            Token::False => {
1255                self.advance()?;
1256                Ok(false)
1257            }
1258            other => Err(ParseError::expected(
1259                vec!["true", "false"],
1260                other,
1261                self.position(),
1262            )),
1263        }
1264    }
1265
1266    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1267        if self.consume_ident_ci(expected)? {
1268            Ok(())
1269        } else {
1270            Err(ParseError::expected(
1271                vec![expected],
1272                self.peek(),
1273                self.position(),
1274            ))
1275        }
1276    }
1277
1278    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1279        let option_name = self.expect_ident_or_keyword()?;
1280        if !option_name.eq_ignore_ascii_case("ttl") {
1281            return Err(ParseError::new(
1282                // F-05: `option_name` is caller-controlled identifier text.
1283                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
1284                // before the message reaches downstream serialization sinks.
1285                format!(
1286                    "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1287                ),
1288                self.position(),
1289            ));
1290        }
1291
1292        let ttl_value = self.parse_float()?;
1293        let ttl_unit = match self.peek() {
1294            Token::Ident(unit) => {
1295                let unit = unit.clone();
1296                self.advance()?;
1297                unit
1298            }
1299            _ => "s".to_string(),
1300        };
1301
1302        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1303            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1304            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1305            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1306            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1307            "d" | "day" | "days" => 86_400_000.0,
1308            other => {
1309                return Err(ParseError::new(
1310                    // F-05: render `other` via `{:?}` so caller-controlled
1311                    // bytes (CR / LF / NUL / quotes) are escaped before
1312                    // reaching downstream serialization sinks.
1313                    format!(
1314                        "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1315                    ),
1316                    self.position(),
1317                ));
1318            }
1319        };
1320
1321        if !ttl_value.is_finite() || ttl_value < 0.0 {
1322            return Err(ParseError::new(
1323                "TTL must be a finite, non-negative duration".to_string(),
1324                self.position(),
1325            ));
1326        }
1327
1328        let ttl_ms = ttl_value * multiplier_ms;
1329        if ttl_ms > u64::MAX as f64 {
1330            return Err(ParseError::new(
1331                "TTL duration is too large".to_string(),
1332                self.position(),
1333            ));
1334        }
1335        if ttl_ms.fract().abs() >= f64::EPSILON {
1336            return Err(ParseError::new(
1337                "TTL duration must resolve to a whole number of milliseconds".to_string(),
1338                self.position(),
1339            ));
1340        }
1341
1342        Ok(Some(ttl_ms as u64))
1343    }
1344
1345    /// Try to match IF NOT EXISTS sequence
1346    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1347        if self.check(&Token::If) {
1348            self.advance()?;
1349            self.expect(Token::Not)?;
1350            self.expect(Token::Exists)?;
1351            Ok(true)
1352        } else {
1353            Ok(false)
1354        }
1355    }
1356
1357    /// Try to match IF EXISTS sequence
1358    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1359        if self.check(&Token::If) {
1360            self.advance()?;
1361            self.expect(Token::Exists)?;
1362            Ok(true)
1363        } else {
1364            Ok(false)
1365        }
1366    }
1367
1368    /// Try to match NOT NULL sequence
1369    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1370        if self.check(&Token::Not) {
1371            // Peek ahead - only consume if followed by NULL
1372            // We need to be careful: save state and try
1373            self.advance()?; // consume NOT
1374            if self.check(&Token::Null) {
1375                self.advance()?; // consume NULL
1376                Ok(true)
1377            } else {
1378                // This is tricky - NOT was consumed but next isn't NULL.
1379                // In column modifier context, NOT should only appear before NULL.
1380                // Return error for clarity.
1381                Err(ParseError::expected(
1382                    vec!["NULL (after NOT)"],
1383                    self.peek(),
1384                    self.position(),
1385                ))
1386            }
1387        } else {
1388            Ok(false)
1389        }
1390    }
1391
1392    /// Try to match PRIMARY KEY sequence
1393    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1394        if self.check(&Token::Primary) {
1395            self.advance()?;
1396            self.expect(Token::Key)?;
1397            Ok(true)
1398        } else {
1399            Ok(false)
1400        }
1401    }
1402}
1403
1404/// Decode a 64-char lowercase/uppercase hex string into a 32-byte array.
1405/// Returns a human-readable error message on length or character violations.
1406/// Used by `SIGNED_BY` clause parsing (issue #520) to surface bad pubkeys
1407/// at parse-time rather than downstream in the engine.
1408fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
1409    if s.len() != 64 {
1410        return Err(format!("expected 64 hex chars, got {}", s.len()));
1411    }
1412    let mut out = [0u8; 32];
1413    let bytes = s.as_bytes();
1414    for i in 0..32 {
1415        let hi = hex_nibble(bytes[i * 2])?;
1416        let lo = hex_nibble(bytes[i * 2 + 1])?;
1417        out[i] = (hi << 4) | lo;
1418    }
1419    Ok(out)
1420}
1421
/// Convert one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value 0-15.
///
/// Any other byte produces an error message naming the offending character.
fn hex_nibble(c: u8) -> Result<u8, String> {
    let symbol = c as char;
    symbol
        .to_digit(16)
        // A base-16 digit is always below 16, so narrowing to u8 is lossless.
        .map(|digit| digit as u8)
        .ok_or_else(|| format!("non-hex char: {:?}", symbol))
}