Skip to main content

reddb_rql/parser/
ddl.rs

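//! DDL SQL parser: CREATE / DROP / ALTER TABLE, keyed collections (KV, vault, graph, vector, document), collection signer registries, TRUNCATE, and EXPLAIN ALTER.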
2
3use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
7    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
8    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
9    PartitionSpec, QueryExpr, TruncateQuery,
10};
11use crate::lexer::Token;
12use reddb_types::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use reddb_types::types::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            metrics_rollup_policies: Vec::new(),
79            context_index_fields,
80            context_index_enabled,
81            timestamps,
82            partition_by: None,
83            tenant_by: None,
84            append_only: false,
85            subscriptions,
86            analytics_config: Vec::new(),
87            vault_own_master_key: false,
88            ai_policy: None,
89        }))
90    }
91
92    /// Parse: DROP TABLE [IF EXISTS] name
93    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
94        self.expect(Token::Drop)?;
95        self.expect(Token::Table)?;
96        self.parse_drop_table_body()
97    }
98
99    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
100    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
101        let if_not_exists = self.match_if_not_exists()?;
102        let name = self.expect_ident()?;
103
104        self.expect(Token::LParen)?;
105        let mut columns = Vec::new();
106        loop {
107            let col = self.parse_column_def()?;
108            columns.push(col);
109            if !self.consume(&Token::Comma)? {
110                break;
111            }
112        }
113        self.expect(Token::RParen)?;
114
115        let mut default_ttl_ms = None;
116        let mut context_index_fields = Vec::new();
117        let mut context_index_enabled = false;
118        let mut timestamps = false;
119        let mut tenant_by: Option<String> = None;
120        let mut append_only = false;
121        let mut subscriptions = Vec::new();
122        let mut ai_policy = reddb_types::catalog::AiPolicy::default();
123
124        while self.consume(&Token::With)? {
125            if self.consume_ident_ci("EVENTS")? {
126                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
127                continue;
128            }
129            // Accept both spellings:
130            //   WITH key = value
131            //   WITH (key = value, key = value)
132            // Postgres / ClickHouse use the parenthesised form; the
133            // bare form is our legacy shorthand. The parenthesised
134            // form collects options separated by commas until `)`.
135            let has_parens = self.consume(&Token::LParen)?;
136
137            loop {
138                if self.consume_ident_ci("CONTEXT_INDEX")? {
139                    context_index_enabled = self.parse_bool_assign()?;
140                } else if self.consume_ident_ci("CONTEXT")? {
141                    if !self.consume(&Token::Index)? {
142                        return Err(ParseError::expected(
143                            vec!["INDEX"],
144                            self.peek(),
145                            self.position(),
146                        ));
147                    }
148                    self.expect(Token::On)?;
149                    self.expect(Token::LParen)?;
150                    loop {
151                        context_index_fields.push(self.expect_ident()?);
152                        if !self.consume(&Token::Comma)? {
153                            break;
154                        }
155                    }
156                    self.expect(Token::RParen)?;
157                    context_index_enabled = true;
158                } else if self.consume_ident_ci("TIMESTAMPS")? {
159                    timestamps = self.parse_bool_assign()?;
160                } else if self.consume_ident_ci("EMBED")? {
161                    if ai_policy.embed.is_some() {
162                        return Err(ParseError::new(
163                            "duplicate EMBED clause in AI policy".to_string(),
164                            self.position(),
165                        ));
166                    }
167                    ai_policy.embed = Some(self.parse_ai_embed_policy()?);
168                } else if self.consume_ident_ci("MODERATE")? {
169                    if ai_policy.moderate.is_some() {
170                        return Err(ParseError::new(
171                            "duplicate MODERATE clause in AI policy".to_string(),
172                            self.position(),
173                        ));
174                    }
175                    ai_policy.moderate = Some(self.parse_ai_moderate_policy()?);
176                } else if self.consume_ident_ci("VISION")? {
177                    if ai_policy.vision.is_some() {
178                        return Err(ParseError::new(
179                            "duplicate VISION clause in AI policy".to_string(),
180                            self.position(),
181                        ));
182                    }
183                    ai_policy.vision = Some(self.parse_ai_vision_policy()?);
184                } else if self.consume_ident_ci("APPEND_ONLY")? {
185                    append_only = self.parse_bool_assign()?;
186                } else if self.consume_ident_ci("TENANT_BY")? {
187                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
188                    // and expects a string literal column name.
189                    let _ = self.consume(&Token::Eq)?;
190                    let value = self.parse_literal_value()?;
191                    match value {
192                        Value::Text(col) => tenant_by = Some(col.to_string()),
193                        other => {
194                            return Err(ParseError::new(
195                                format!("WITH tenant_by expects a text literal, got {other:?}"),
196                                self.position(),
197                            ));
198                        }
199                    }
200                } else {
201                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
202                }
203                if has_parens {
204                    if self.consume(&Token::Comma)? {
205                        continue;
206                    }
207                    self.expect(Token::RParen)?;
208                }
209                break;
210            }
211        }
212
213        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
214        let partition_by = if self.consume(&Token::Partition)? {
215            self.expect(Token::By)?;
216            let kind = if self.consume(&Token::Range)? {
217                PartitionKind::Range
218            } else if self.consume(&Token::List)? {
219                PartitionKind::List
220            } else if self.consume(&Token::Hash)? {
221                PartitionKind::Hash
222            } else {
223                return Err(ParseError::expected(
224                    vec!["RANGE", "LIST", "HASH"],
225                    self.peek(),
226                    self.position(),
227                ));
228            };
229            self.expect(Token::LParen)?;
230            let column = self.expect_ident()?;
231            self.expect(Token::RParen)?;
232            Some(PartitionSpec { kind, column })
233        } else {
234            None
235        };
236
237        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
238        // style). Accepted after partition spec / tenant spec / or on
239        // its own. `WITH (append_only = true)` is the other form and
240        // handled above.
241        if !append_only && self.consume_ident_ci("APPEND")? {
242            if !self.consume_ident_ci("ONLY")? {
243                return Err(ParseError::expected(
244                    vec!["ONLY"],
245                    self.peek(),
246                    self.position(),
247                ));
248            }
249            append_only = true;
250        }
251
252        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
253        // trailing clause (after partition spec if both are used).
254        //
255        // Dotted paths let non-table models declare tenancy over their
256        // natural nested structures — `metadata.tenant` for vectors,
257        // `payload.tenant` for queue messages, `tags.cluster` for
258        // timeseries, `properties.org` for graphs. The read-path
259        // resolver already navigates these paths via
260        // `resolve_runtime_document_path`; here we just store the
261        // dotted string and let the policy evaluator do the rest.
262        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
263            self.expect(Token::By)?;
264            self.expect(Token::LParen)?;
265            // Allow keyword-idents (`metadata`, `type`, `data`) as
266            // column names — SQL treats them as bare identifiers in
267            // this context.
268            let mut path = self.expect_ident_or_keyword()?;
269            while self.consume(&Token::Dot)? {
270                let next = self.expect_ident_or_keyword()?;
271                path = format!("{path}.{next}");
272            }
273            self.expect(Token::RParen)?;
274            tenant_by = Some(path);
275        }
276
277        Ok(QueryExpr::CreateTable(CreateTableQuery {
278            collection_model: CollectionModel::Table,
279            name,
280            columns,
281            if_not_exists,
282            default_ttl_ms,
283            metrics_rollup_policies: Vec::new(),
284            context_index_fields,
285            context_index_enabled,
286            timestamps,
287            partition_by,
288            tenant_by,
289            append_only,
290            subscriptions,
291            analytics_config: Vec::new(),
292            vault_own_master_key: false,
293            ai_policy: (!ai_policy.is_empty()).then_some(ai_policy),
294        }))
295    }
296
297    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
298    ///
299    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
300    /// difference between the table's current contract and the target CREATE
301    /// TABLE body.
302    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
303        self.expect(Token::Explain)?;
304        self.expect(Token::Alter)?;
305        self.expect(Token::For)?;
306        self.expect(Token::Create)?;
307        self.expect(Token::Table)?;
308
309        let body = self.parse_create_table_body()?;
310        let target = match body {
311            QueryExpr::CreateTable(t) => t,
312            _ => {
313                return Err(ParseError::new(
314                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
315                        .to_string(),
316                    self.position(),
317                ));
318            }
319        };
320
321        let format = if self.consume(&Token::Format)? {
322            if self.consume(&Token::Json)? {
323                ExplainFormat::Json
324            } else if self.consume_ident_ci("SQL")? {
325                ExplainFormat::Sql
326            } else {
327                return Err(ParseError::expected(
328                    vec!["JSON", "SQL"],
329                    self.peek(),
330                    self.position(),
331                ));
332            }
333        } else {
334            ExplainFormat::Sql
335        };
336
337        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
338            target,
339            format,
340        }))
341    }
342
343    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
344    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
345        let if_exists = self.match_if_exists()?;
346        let name = self.parse_drop_collection_name()?;
347        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
348    }
349
350    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
351        let if_exists = self.match_if_exists()?;
352        let name = self.parse_drop_collection_name()?;
353        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
354    }
355
356    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
357        let if_exists = self.match_if_exists()?;
358        let name = self.parse_drop_collection_name()?;
359        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
360    }
361
362    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
363        let if_exists = self.match_if_exists()?;
364        let name = self.parse_drop_collection_name()?;
365        Ok(QueryExpr::DropDocument(DropDocumentQuery {
366            name,
367            if_exists,
368        }))
369    }
370
371    pub fn parse_create_keyed_body(
372        &mut self,
373        model: CollectionModel,
374    ) -> Result<QueryExpr, ParseError> {
375        let if_not_exists = self.match_if_not_exists()?;
376        let name = self.parse_drop_collection_name()?;
377        let vault_own_master_key =
378            if model == CollectionModel::Vault && self.consume(&Token::With)? {
379                if !self.consume_ident_ci("OWN")? {
380                    return Err(ParseError::expected(
381                        vec!["OWN"],
382                        self.peek(),
383                        self.position(),
384                    ));
385                }
386                if !self.consume_ident_ci("MASTER")? {
387                    return Err(ParseError::expected(
388                        vec!["MASTER"],
389                        self.peek(),
390                        self.position(),
391                    ));
392                }
393                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
394                    return Err(ParseError::expected(
395                        vec!["KEY"],
396                        self.peek(),
397                        self.position(),
398                    ));
399                }
400                true
401            } else {
402                false
403            };
404        // `CREATE GRAPH <name> WITH ANALYTICS (...)` — the analytics opt-in
405        // is graph-only (issue #800). Other keyed models reject the clause so
406        // a misplaced `WITH ANALYTICS` fails loudly instead of being ignored.
407        let analytics_config = if model == CollectionModel::Graph && self.consume(&Token::With)? {
408            if !self.consume_ident_ci("ANALYTICS")? {
409                return Err(ParseError::expected(
410                    vec!["ANALYTICS"],
411                    self.peek(),
412                    self.position(),
413                ));
414            }
415            self.parse_analytics_clause()?
416        } else {
417            Vec::new()
418        };
419        Ok(QueryExpr::CreateTable(CreateTableQuery {
420            collection_model: model,
421            name,
422            columns: Vec::new(),
423            if_not_exists,
424            default_ttl_ms: None,
425            metrics_rollup_policies: Vec::new(),
426            context_index_fields: Vec::new(),
427            context_index_enabled: false,
428            timestamps: false,
429            partition_by: None,
430            tenant_by: None,
431            append_only: false,
432            subscriptions: Vec::new(),
433            analytics_config,
434            vault_own_master_key,
435            ai_policy: None,
436        }))
437    }
438
439    /// Parse the `( <output> [ ( <key> = <value> [, ...] ) ] [, ...] )` body of
440    /// a `WITH ANALYTICS` clause (issue #800). Recognised outputs are
441    /// `communities`, `components`, `centrality`; recognised options are
442    /// `using`, `resolution`, `max_iterations`, `tolerance`. Unknown output
443    /// names and option keys are rejected with a clear, structured error.
444    fn parse_analytics_clause(
445        &mut self,
446    ) -> Result<Vec<reddb_types::catalog::AnalyticsViewDescriptor>, ParseError> {
447        use reddb_types::catalog::{AnalyticsOutput, AnalyticsViewDescriptor};
448
449        self.expect(Token::LParen)?;
450        let mut views: Vec<AnalyticsViewDescriptor> = Vec::new();
451        loop {
452            let output_name = self.parse_analytics_output_name()?;
453            let output = AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
454                ParseError::new(
455                    format!(
456                        "unknown analytics output '{output_name}': expected communities, components, or centrality"
457                    ),
458                    self.position(),
459                )
460            })?;
461            if views.iter().any(|view| view.output == output) {
462                return Err(ParseError::new(
463                    format!("duplicate analytics output '{output_name}'"),
464                    self.position(),
465                ));
466            }
467            let mut view = AnalyticsViewDescriptor {
468                output,
469                algorithm: None,
470                resolution: None,
471                max_iterations: None,
472                tolerance: None,
473            };
474            if self.consume(&Token::LParen)? {
475                loop {
476                    let key = self.parse_analytics_option_key()?;
477                    self.expect(Token::Eq)?;
478                    match key.as_str() {
479                        "using" => {
480                            view.algorithm =
481                                Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
482                        }
483                        "resolution" => view.resolution = Some(self.parse_float()?),
484                        "max_iterations" => view.max_iterations = Some(self.parse_integer()?),
485                        "tolerance" => view.tolerance = Some(self.parse_float()?),
486                        other => {
487                            return Err(ParseError::new(
488                                format!(
489                                    "unknown analytics option '{other}': expected using, resolution, max_iterations, or tolerance"
490                                ),
491                                self.position(),
492                            ))
493                        }
494                    }
495                    if !self.consume(&Token::Comma)? {
496                        break;
497                    }
498                }
499                self.expect(Token::RParen)?;
500            }
501            views.push(view);
502            if !self.consume(&Token::Comma)? {
503                break;
504            }
505        }
506        self.expect(Token::RParen)?;
507        if views.is_empty() {
508            return Err(ParseError::new(
509                "WITH ANALYTICS requires at least one output".to_string(),
510                self.position(),
511            ));
512        }
513        Ok(views)
514    }
515
516    /// Read one analytics output name, normalising the keyword-lexed outputs
517    /// (`components`, `centrality`) back to their lowercase spelling so they
518    /// compare uniformly with the ident-lexed `communities`.
519    fn parse_analytics_output_name(&mut self) -> Result<String, ParseError> {
520        match self.peek() {
521            Token::Components => {
522                self.advance()?;
523                Ok("components".to_string())
524            }
525            Token::Centrality => {
526                self.advance()?;
527                Ok("centrality".to_string())
528            }
529            _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
530        }
531    }
532
533    /// Read one analytics option key, normalising the keyword-lexed keys
534    /// (`using`, `max_iterations`) back to their lowercase spelling.
535    fn parse_analytics_option_key(&mut self) -> Result<String, ParseError> {
536        match self.peek() {
537            Token::Using => {
538                self.advance()?;
539                Ok("using".to_string())
540            }
541            Token::MaxIterations => {
542                self.advance()?;
543                Ok("max_iterations".to_string())
544            }
545            _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
546        }
547    }
548
549    pub fn parse_create_collection_model_body(
550        &mut self,
551        model: CollectionModel,
552    ) -> Result<QueryExpr, ParseError> {
553        self.parse_create_keyed_body(model)
554    }
555
556    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
557        let if_not_exists = self.match_if_not_exists()?;
558        let name = self.parse_drop_collection_name()?;
559        if !self.consume_ident_ci("KIND")? {
560            return Err(ParseError::expected(
561                vec!["KIND"],
562                self.peek(),
563                self.position(),
564            ));
565        }
566        let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
567        while self.consume(&Token::Dot)? {
568            let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
569            kind.push('.');
570            kind.push_str(&part);
571        }
572        let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
573            if !self.consume_ident_ci("DIM")? {
574                return Err(ParseError::expected(
575                    vec!["DIM"],
576                    self.peek(),
577                    self.position(),
578                ));
579            }
580            let dimension = self.parse_integer()?;
581            if dimension <= 0 {
582                return Err(ParseError::new(
583                    "VECTOR DIM must be a positive integer".to_string(),
584                    self.position(),
585                ));
586            }
587            let metric = if self.consume(&Token::Metric)? {
588                self.parse_distance_metric()?
589            } else {
590                reddb_types::distance::DistanceMetric::Cosine
591            };
592            (Some(dimension as usize), Some(metric))
593        } else {
594            (None, None)
595        };
596        let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
597            self.parse_signed_by_list()?
598        } else {
599            Vec::new()
600        };
601        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
602            name,
603            kind,
604            if_not_exists,
605            vector_dimension,
606            vector_metric,
607            allowed_signers,
608        }))
609    }
610
611    /// Parse a single `'hex32'` string literal as a 32-byte Ed25519
612    /// pubkey. Used by `ALTER COLLECTION ... ADD|REVOKE SIGNER 'hex'`
613    /// (issue #522).
614    fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
615        let hex = match self.peek().clone() {
616            Token::String(s) => {
617                self.advance()?;
618                s
619            }
620            _ => {
621                return Err(ParseError::expected(
622                    vec!["string literal (ed25519 pubkey hex)"],
623                    self.peek(),
624                    self.position(),
625                ));
626            }
627        };
628        decode_hex_32(&hex).map_err(|msg| {
629            ParseError::new(
630                format!("SIGNER pubkey '{hex}' invalid: {msg}"),
631                self.position(),
632            )
633        })
634    }
635
636    /// Parse `( 'hex32', 'hex32', ... )` — Ed25519 pubkey list. Each entry
637    /// must decode to exactly 32 bytes. Used by both `CREATE COLLECTION ...
638    /// SIGNED_BY (...)` and (in a later iteration) `ALTER COLLECTION` signer
639    /// mutations. Issue #520.
640    fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
641        self.expect(Token::LParen)?;
642        let mut out = Vec::new();
643        loop {
644            let hex = match self.peek().clone() {
645                Token::String(s) => {
646                    self.advance()?;
647                    s
648                }
649                _ => {
650                    return Err(ParseError::expected(
651                        vec!["string literal (ed25519 pubkey hex)"],
652                        self.peek(),
653                        self.position(),
654                    ));
655                }
656            };
657            let bytes = decode_hex_32(&hex).map_err(|msg| {
658                ParseError::new(
659                    format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
660                    self.position(),
661                )
662            })?;
663            out.push(bytes);
664            if !self.consume(&Token::Comma)? {
665                break;
666            }
667        }
668        self.expect(Token::RParen)?;
669        if out.is_empty() {
670            return Err(ParseError::new(
671                "SIGNED_BY list must contain at least one pubkey".to_string(),
672                self.position(),
673            ));
674        }
675        Ok(out)
676    }
677
678    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
679        let if_not_exists = self.match_if_not_exists()?;
680        let name = self.parse_drop_collection_name()?;
681        if !self.consume_ident_ci("DIM")? {
682            return Err(ParseError::expected(
683                vec!["DIM"],
684                self.peek(),
685                self.position(),
686            ));
687        }
688        let dimension = self.parse_integer()?;
689        if dimension <= 0 {
690            return Err(ParseError::new(
691                "VECTOR DIM must be a positive integer".to_string(),
692                self.position(),
693            ));
694        }
695        let metric = if self.consume(&Token::Metric)? {
696            self.parse_distance_metric()?
697        } else {
698            reddb_types::distance::DistanceMetric::Cosine
699        };
700        Ok(QueryExpr::CreateVector(CreateVectorQuery {
701            name,
702            dimension: dimension as usize,
703            metric,
704            if_not_exists,
705        }))
706    }
707
708    pub fn parse_drop_keyed_body(
709        &mut self,
710        model: CollectionModel,
711    ) -> Result<QueryExpr, ParseError> {
712        let if_exists = self.match_if_exists()?;
713        let name = self.parse_drop_collection_name()?;
714        Ok(QueryExpr::DropKv(DropKvQuery {
715            name,
716            if_exists,
717            model,
718        }))
719    }
720
721    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
722        self.parse_drop_keyed_body(CollectionModel::Kv)
723    }
724
725    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
726        self.parse_drop_collection_model_body(None)
727    }
728
729    pub fn parse_drop_collection_model_body(
730        &mut self,
731        model: Option<CollectionModel>,
732    ) -> Result<QueryExpr, ParseError> {
733        let if_exists = self.match_if_exists()?;
734        let name = self.parse_drop_collection_name()?;
735        Ok(QueryExpr::DropCollection(DropCollectionQuery {
736            name,
737            if_exists,
738            model,
739        }))
740    }
741
742    pub fn parse_truncate_body(
743        &mut self,
744        model: Option<CollectionModel>,
745    ) -> Result<QueryExpr, ParseError> {
746        let if_exists = self.match_if_exists()?;
747        let name = self.parse_drop_collection_name()?;
748        Ok(QueryExpr::Truncate(TruncateQuery {
749            name,
750            model,
751            if_exists,
752        }))
753    }
754
755    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
756        let mut name = self.expect_ident()?;
757        while self.consume(&Token::Dot)? {
758            if self.consume(&Token::Star)? {
759                name.push_str(".*");
760                break;
761            }
762            let next = self.expect_ident_or_keyword()?;
763            name = format!("{name}.{next}");
764        }
765        Ok(name)
766    }
767
768    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
769    ///
770    /// Also accepts `ALTER COLLECTION name ADD|REVOKE SIGNER 'hex'`
771    /// (issue #522) — collection-level signer registry mutations share
772    /// the AlterTable AST so the existing executor dispatch path picks
773    /// them up without a new top-level variant.
774    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
775        self.expect(Token::Alter)?;
776        if !self.consume(&Token::Table)?
777            && !self.consume(&Token::Collection)?
778            && !self.consume_ident_ci("COLLECTION")?
779        {
780            return Err(ParseError::expected(
781                vec!["TABLE", "COLLECTION"],
782                self.peek(),
783                self.position(),
784            ));
785        }
786        let name = self.expect_ident()?;
787
788        let mut operations = Vec::new();
789        loop {
790            let op = self.parse_alter_operation(&name)?;
791            operations.push(op);
792            if !self.consume(&Token::Comma)? {
793                break;
794            }
795        }
796
797        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
798    }
799
800    /// Parse: `ALTER GRAPH <name> ADD ANALYTICS ( <output> [, ...] )`
801    /// and `ALTER GRAPH <name> DROP ANALYTICS <output>` (issue #801).
802    ///
803    /// Lifecycle management of the `WITH ANALYTICS` configuration declared at
804    /// `CREATE GRAPH` time (#800), without recreating the collection. Shares
805    /// the `AlterTable` AST so the existing executor dispatch path picks the
806    /// mutations up; the executor validates the target is a graph and that the
807    /// dropped output is actually enabled.
808    pub fn parse_alter_graph_query(&mut self) -> Result<QueryExpr, ParseError> {
809        self.expect(Token::Alter)?;
810        self.expect(Token::Graph)?;
811        let name = self.expect_ident()?;
812
813        let mut operations = Vec::new();
814        loop {
815            operations.push(self.parse_alter_graph_operation()?);
816            if !self.consume(&Token::Comma)? {
817                break;
818            }
819        }
820
821        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
822    }
823
824    /// Parse a single `ALTER GRAPH` analytics operation: either
825    /// `ADD ANALYTICS ( ... )` or `DROP ANALYTICS <output>`.
826    fn parse_alter_graph_operation(&mut self) -> Result<AlterOperation, ParseError> {
827        if self.consume(&Token::Add)? {
828            if !self.consume_ident_ci("ANALYTICS")? {
829                return Err(ParseError::expected(
830                    vec!["ANALYTICS"],
831                    self.peek(),
832                    self.position(),
833                ));
834            }
835            // Reuse the `WITH ANALYTICS (...)` body grammar verbatim so the
836            // ADD form accepts the exact same outputs and options as CREATE.
837            let views = self.parse_analytics_clause()?;
838            Ok(AlterOperation::AddAnalytics(views))
839        } else if self.consume(&Token::Drop)? {
840            if !self.consume_ident_ci("ANALYTICS")? {
841                return Err(ParseError::expected(
842                    vec!["ANALYTICS"],
843                    self.peek(),
844                    self.position(),
845                ));
846            }
847            let output_name = self.parse_analytics_output_name()?;
848            let output = reddb_types::catalog::AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
849                ParseError::new(
850                    format!(
851                        "unknown analytics output '{output_name}': expected communities, components, or centrality"
852                    ),
853                    self.position(),
854                )
855            })?;
856            Ok(AlterOperation::DropAnalytics(output))
857        } else {
858            Err(ParseError::expected(
859                vec!["ADD", "DROP"],
860                self.peek(),
861                self.position(),
862            ))
863        }
864    }
865
866    /// Parse a single ALTER TABLE operation
867    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
868        if self.consume(&Token::Add)? {
869            if self.consume_ident_ci("SUBSCRIPTION")? {
870                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
871                // #1374 — a subscription name may be a keyword (e.g. `search`);
872                // accept keyword tokens as identifiers here (PG-style).
873                let sub_name = self.expect_ident_or_keyword()?;
874                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
875                Ok(AlterOperation::AddSubscription {
876                    name: sub_name,
877                    descriptor,
878                })
879            } else if self.consume_ident_ci("SIGNER")? {
880                // ADD SIGNER 'hex_pubkey' — issue #522.
881                let pubkey = self.parse_single_signer_hex()?;
882                Ok(AlterOperation::AddSigner { pubkey })
883            } else {
884                // ADD COLUMN definition (COLUMN keyword is optional)
885                let _ = self.consume(&Token::Column)?;
886                let col_def = self.parse_column_def()?;
887                Ok(AlterOperation::AddColumn(col_def))
888            }
889        } else if self.consume_ident_ci("REVOKE")? {
890            // REVOKE SIGNER 'hex_pubkey' — issue #522.
891            if !self.consume_ident_ci("SIGNER")? {
892                return Err(ParseError::expected(
893                    vec!["SIGNER"],
894                    self.peek(),
895                    self.position(),
896                ));
897            }
898            let pubkey = self.parse_single_signer_hex()?;
899            Ok(AlterOperation::RevokeSigner { pubkey })
900        } else if self.consume(&Token::Drop)? {
901            if self.consume_ident_ci("SUBSCRIPTION")? {
902                // DROP SUBSCRIPTION name (may be a keyword, e.g. `search`)
903                let sub_name = self.expect_ident_or_keyword()?;
904                Ok(AlterOperation::DropSubscription { name: sub_name })
905            } else {
906                // DROP COLUMN name (COLUMN keyword is optional)
907                let _ = self.consume(&Token::Column)?;
908                let col_name = self.expect_ident()?;
909                Ok(AlterOperation::DropColumn(col_name))
910            }
911        } else if self.consume(&Token::Rename)? {
912            // RENAME COLUMN from TO to
913            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
914            let from = self.expect_ident()?;
915            self.expect(Token::To)?;
916            let to = self.expect_ident()?;
917            Ok(AlterOperation::RenameColumn { from, to })
918        } else if self.consume(&Token::Attach)? {
919            // ATTACH PARTITION child FOR VALUES ...
920            self.expect(Token::Partition)?;
921            let child = self.expect_ident()?;
922            self.expect(Token::For)?;
923            // Accept `VALUES` as an ident since the grammar doesn't have it
924            // as a reserved keyword everywhere. Collect the remaining tokens
925            // as a raw bound string for round-trip persistence.
926            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
927                return Err(ParseError::expected(
928                    vec!["VALUES"],
929                    self.peek(),
930                    self.position(),
931                ));
932            }
933            let bound = self.collect_remaining_tokens_as_string()?;
934            Ok(AlterOperation::AttachPartition { child, bound })
935        } else if self.consume(&Token::Detach)? {
936            // DETACH PARTITION child
937            self.expect(Token::Partition)?;
938            let child = self.expect_ident()?;
939            Ok(AlterOperation::DetachPartition { child })
940        } else if self.consume(&Token::Enable)? {
941            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
942            if self.consume_ident_ci("EVENTS")? {
943                Ok(AlterOperation::EnableEvents(
944                    self.parse_subscription_descriptor(table_name.to_string())?,
945                ))
946            } else if self.consume_ident_ci("TENANCY")? {
947                self.expect(Token::On)?;
948                self.expect(Token::LParen)?;
949                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
950                let mut path = self.expect_ident_or_keyword()?;
951                while self.consume(&Token::Dot)? {
952                    let next = self.expect_ident_or_keyword()?;
953                    path = format!("{path}.{next}");
954                }
955                self.expect(Token::RParen)?;
956                Ok(AlterOperation::EnableTenancy { column: path })
957            } else {
958                self.expect(Token::Row)?;
959                self.expect(Token::Level)?;
960                self.expect(Token::Security)?;
961                Ok(AlterOperation::EnableRowLevelSecurity)
962            }
963        } else if self.consume(&Token::Disable)? {
964            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
965            if self.consume_ident_ci("EVENTS")? {
966                Ok(AlterOperation::DisableEvents)
967            } else if self.consume_ident_ci("TENANCY")? {
968                Ok(AlterOperation::DisableTenancy)
969            } else {
970                self.expect(Token::Row)?;
971                self.expect(Token::Level)?;
972                self.expect(Token::Security)?;
973                Ok(AlterOperation::DisableRowLevelSecurity)
974            }
975        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
976            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
977            // SET RETENTION <duration> (issue #580)
978            if self.consume_ident_ci("APPEND_ONLY")? {
979                let on = self.parse_bool_assign()?;
980                Ok(AlterOperation::SetAppendOnly(on))
981            } else if self.consume_ident_ci("VERSIONED")? {
982                let on = self.parse_bool_assign()?;
983                Ok(AlterOperation::SetVersioned(on))
984            } else if self.consume(&Token::Retention)? {
985                // `SET RETENTION <duration>` — reuse the same float+unit
986                // grammar the timeseries CREATE clause uses so `7 DAYS`,
987                // `30 m`, `1 h`, `90 d` all parse identically.
988                let value = self.parse_float()?;
989                let unit = self.parse_duration_unit()?;
990                Ok(AlterOperation::SetRetention {
991                    duration_ms: (value * unit) as u64,
992                })
993            } else {
994                Err(ParseError::expected(
995                    vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
996                    self.peek(),
997                    self.position(),
998                ))
999            }
1000        } else if self.consume_ident_ci("UNSET")? {
1001            // `UNSET RETENTION` — clears the declarative retention policy.
1002            if self.consume(&Token::Retention)? {
1003                Ok(AlterOperation::UnsetRetention)
1004            } else {
1005                Err(ParseError::expected(
1006                    vec!["RETENTION"],
1007                    self.peek(),
1008                    self.position(),
1009                ))
1010            }
1011        } else {
1012            Err(ParseError::expected(
1013                vec![
1014                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
1015                    "UNSET",
1016                ],
1017                self.peek(),
1018                self.position(),
1019            ))
1020        }
1021    }
1022
1023    fn parse_subscription_descriptor(
1024        &mut self,
1025        source: String,
1026    ) -> Result<SubscriptionDescriptor, ParseError> {
1027        let mut ops_filter = Vec::new();
1028        if self.consume(&Token::LParen)? {
1029            loop {
1030                let op = if self.consume(&Token::Insert)? {
1031                    SubscriptionOperation::Insert
1032                } else if self.consume(&Token::Update)? {
1033                    SubscriptionOperation::Update
1034                } else if self.consume(&Token::Delete)? {
1035                    SubscriptionOperation::Delete
1036                } else {
1037                    return Err(ParseError::expected(
1038                        vec!["INSERT", "UPDATE", "DELETE"],
1039                        self.peek(),
1040                        self.position(),
1041                    ));
1042                };
1043                ops_filter.push(op);
1044                if !self.consume(&Token::Comma)? {
1045                    break;
1046                }
1047            }
1048            self.expect(Token::RParen)?;
1049        }
1050
1051        let target_queue = if self.consume(&Token::To)? {
1052            self.expect_ident()?
1053        } else {
1054            format!("{source}_events")
1055        };
1056
1057        let mut redact_fields = Vec::new();
1058        if self.consume_ident_ci("REDACT")? {
1059            self.expect(Token::LParen)?;
1060            loop {
1061                redact_fields.push(self.parse_dotted_redact_path()?);
1062                if !self.consume(&Token::Comma)? {
1063                    break;
1064                }
1065            }
1066            self.expect(Token::RParen)?;
1067        }
1068
1069        let where_filter = if self.consume(&Token::Where)? {
1070            Some(self.collect_subscription_where_filter()?)
1071        } else {
1072            None
1073        };
1074
1075        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
1076        let all_tenants = if self.consume(&Token::On)? {
1077            self.expect(Token::All)?;
1078            if !self.consume_ident_ci("TENANTS")? {
1079                return Err(ParseError::expected(
1080                    vec!["TENANTS"],
1081                    self.peek(),
1082                    self.position(),
1083                ));
1084            }
1085            true
1086        } else {
1087            false
1088        };
1089
1090        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
1091        if self.consume_ident_ci("REQUIRES")? {
1092            self.consume_ident_ci("CAPABILITY")?;
1093            // consume the capability string literal token
1094            self.advance()?;
1095        }
1096
1097        Ok(SubscriptionDescriptor {
1098            name: String::new(),
1099            source,
1100            target_queue,
1101            ops_filter,
1102            where_filter,
1103            redact_fields,
1104            enabled: true,
1105            all_tenants,
1106        })
1107    }
1108
1109    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
1110    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
1111        let mut parts = Vec::new();
1112        if self.consume(&Token::Star)? {
1113            parts.push("*".to_string());
1114        } else {
1115            parts.push(self.expect_ident_or_keyword()?);
1116        }
1117        while self.consume(&Token::Dot)? {
1118            if self.consume(&Token::Star)? {
1119                parts.push("*".to_string());
1120            } else {
1121                parts.push(self.expect_ident_or_keyword()?);
1122            }
1123        }
1124        Ok(parts.join("."))
1125    }
1126
1127    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
1128        let mut parts = Vec::new();
1129        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1130            parts.push(self.peek().to_string());
1131            self.advance()?;
1132        }
1133        if parts.is_empty() {
1134            return Err(ParseError::expected(
1135                vec!["predicate"],
1136                self.peek(),
1137                self.position(),
1138            ));
1139        }
1140        Ok(parts.join(" "))
1141    }
1142
1143    /// Capture remaining tokens as a display-joined string.
1144    ///
1145    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
1146    /// bound clause into storage without needing a dedicated per-kind AST.
1147    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
1148        let mut parts: Vec<String> = Vec::new();
1149        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1150            parts.push(self.peek().to_string());
1151            self.advance()?;
1152        }
1153        Ok(parts.join(" "))
1154    }
1155
1156    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
1157    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
1158        let name = self.expect_column_ident()?;
1159        let sql_type = self.parse_column_type()?;
1160        let data_type = sql_type.to_string();
1161
1162        let mut def = CreateColumnDef {
1163            name,
1164            data_type,
1165            sql_type: sql_type.clone(),
1166            not_null: false,
1167            default: None,
1168            compress: None,
1169            unique: false,
1170            primary_key: false,
1171            enum_variants: sql_type.enum_variants().unwrap_or_default(),
1172            array_element: sql_type.array_element_type(),
1173            decimal_precision: sql_type.decimal_precision(),
1174        };
1175
1176        // Parse modifiers in any order
1177        loop {
1178            if self.match_not_null()? {
1179                def.not_null = true;
1180            } else if self.consume(&Token::Default)? {
1181                self.expect(Token::Eq)?;
1182                def.default = Some(self.parse_literal_string_for_ddl()?);
1183            } else if self.consume(&Token::Compress)? {
1184                self.expect(Token::Colon)?;
1185                def.compress = Some(self.parse_integer()? as u8);
1186            } else if self.consume(&Token::Unique)? {
1187                def.unique = true;
1188            } else if self.match_primary_key()? {
1189                def.primary_key = true;
1190            } else {
1191                break;
1192            }
1193        }
1194
1195        Ok(def)
1196    }
1197
1198    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
1199    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
1200        let type_name = self.expect_ident_or_keyword()?;
1201        if self.consume(&Token::LParen)? {
1202            let inner = self.parse_type_params()?;
1203            self.expect(Token::RParen)?;
1204            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
1205        } else {
1206            Ok(SqlTypeName::new(type_name))
1207        }
1208    }
1209
1210    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
1211    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
1212        let mut parts = Vec::new();
1213        loop {
1214            match self.peek().clone() {
1215                Token::String(s) => {
1216                    let s = s.clone();
1217                    self.advance()?;
1218                    parts.push(TypeModifier::StringLiteral(s));
1219                }
1220                Token::Integer(n) => {
1221                    self.advance()?;
1222                    parts.push(TypeModifier::Number(n as u32));
1223                }
1224                _ => {
1225                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1226                }
1227            }
1228            if !self.consume(&Token::Comma)? {
1229                break;
1230            }
1231        }
1232        Ok(parts)
1233    }
1234
1235    /// Parse a literal string value for DDL DEFAULT expressions
1236    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1237        match self.peek().clone() {
1238            Token::String(s) => {
1239                let s = s.clone();
1240                self.advance()?;
1241                Ok(s)
1242            }
1243            Token::Integer(n) => {
1244                self.advance()?;
1245                Ok(n.to_string())
1246            }
1247            Token::Float(n) => {
1248                self.advance()?;
1249                Ok(n.to_string())
1250            }
1251            Token::True => {
1252                self.advance()?;
1253                Ok("true".to_string())
1254            }
1255            Token::False => {
1256                self.advance()?;
1257                Ok("false".to_string())
1258            }
1259            Token::Null => {
1260                self.advance()?;
1261                Ok("null".to_string())
1262            }
1263            ref other => Err(ParseError::expected(
1264                vec!["string", "number", "true", "false", "null"],
1265                other,
1266                self.position(),
1267            )),
1268        }
1269    }
1270
1271    fn check_ttl_keyword(&self) -> bool {
1272        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1273    }
1274
1275    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
1276    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
1277    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1278        self.expect(Token::Eq)?;
1279        match self.peek() {
1280            Token::True => {
1281                self.advance()?;
1282                Ok(true)
1283            }
1284            Token::False => {
1285                self.advance()?;
1286                Ok(false)
1287            }
1288            other => Err(ParseError::expected(
1289                vec!["true", "false"],
1290                other,
1291                self.position(),
1292            )),
1293        }
1294    }
1295
1296    /// Parse a parenthesised list of string literals: `('a', 'b', ...)`.
1297    /// Used for the `fields` / `outputs` AI-policy options. Requires at
1298    /// least one entry.
1299    fn parse_ai_string_list(&mut self) -> Result<Vec<String>, ParseError> {
1300        self.expect(Token::LParen)?;
1301        let mut out = Vec::new();
1302        loop {
1303            out.push(self.parse_string()?);
1304            if !self.consume(&Token::Comma)? {
1305                break;
1306            }
1307        }
1308        self.expect(Token::RParen)?;
1309        Ok(out)
1310    }
1311
1312    /// Parse a bare `true` / `false` token (after the `=` is consumed).
1313    fn parse_ai_bool(&mut self) -> Result<bool, ParseError> {
1314        match self.peek() {
1315            Token::True => {
1316                self.advance()?;
1317                Ok(true)
1318            }
1319            Token::False => {
1320                self.advance()?;
1321                Ok(false)
1322            }
1323            other => Err(ParseError::expected(
1324                vec!["true", "false"],
1325                other,
1326                self.position(),
1327            )),
1328        }
1329    }
1330
1331    /// Parse a single enum-ish word — either a quoted string or a bare
1332    /// identifier/keyword. Used for `degraded` / `on_reject` values.
1333    fn parse_ai_word(&mut self) -> Result<String, ParseError> {
1334        if matches!(self.peek(), Token::String(_)) {
1335            self.parse_string()
1336        } else {
1337            self.expect_ident_or_keyword()
1338        }
1339    }
1340
1341    /// `EMBED (fields = (...), provider = '..', model = '..')`.
1342    fn parse_ai_embed_policy(&mut self) -> Result<reddb_types::catalog::EmbedPolicy, ParseError> {
1343        self.expect(Token::LParen)?;
1344        let mut fields: Vec<String> = Vec::new();
1345        let mut provider: Option<String> = None;
1346        let mut model: Option<String> = None;
1347        loop {
1348            let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1349            self.expect(Token::Eq)?;
1350            match key.as_str() {
1351                "fields" => fields = self.parse_ai_string_list()?,
1352                "provider" => provider = Some(self.parse_string()?),
1353                "model" => model = Some(self.parse_string()?),
1354                other => {
1355                    return Err(ParseError::new(
1356                        format!(
1357                            "unsupported EMBED policy option {other:?}; supported: fields, provider, model"
1358                        ),
1359                        self.position(),
1360                    ));
1361                }
1362            }
1363            if !self.consume(&Token::Comma)? {
1364                break;
1365            }
1366        }
1367        self.expect(Token::RParen)?;
1368        if fields.is_empty() {
1369            return Err(ParseError::new(
1370                "EMBED policy requires fields = ('<col>', ...)".to_string(),
1371                self.position(),
1372            ));
1373        }
1374        let provider = provider.ok_or_else(|| {
1375            ParseError::new(
1376                "EMBED policy requires provider = '<token>'".to_string(),
1377                self.position(),
1378            )
1379        })?;
1380        let model = model.ok_or_else(|| {
1381            ParseError::new(
1382                "EMBED policy requires model = '<name>'".to_string(),
1383                self.position(),
1384            )
1385        })?;
1386        Ok(reddb_types::catalog::EmbedPolicy {
1387            fields,
1388            provider,
1389            model,
1390        })
1391    }
1392
1393    /// `MODERATE (fields = (...), provider, model, sync, degraded, on_reject)`.
1394    fn parse_ai_moderate_policy(
1395        &mut self,
1396    ) -> Result<reddb_types::catalog::ModeratePolicy, ParseError> {
1397        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1398        self.expect(Token::LParen)?;
1399        let mut fields: Vec<String> = Vec::new();
1400        let mut provider: Option<String> = None;
1401        let mut model: Option<String> = None;
1402        let mut sync_gate = false;
1403        let mut degraded_mode = ModerateDegradedMode::default();
1404        let mut reject_action = ModerateRejectAction::default();
1405        let mut hard_delete_on_reject = false;
1406        loop {
1407            let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1408            self.expect(Token::Eq)?;
1409            match key.as_str() {
1410                "fields" => fields = self.parse_ai_string_list()?,
1411                "provider" => provider = Some(self.parse_string()?),
1412                "model" => model = Some(self.parse_string()?),
1413                "sync" | "sync_gate" => sync_gate = self.parse_ai_bool()?,
1414                "hard_delete" | "hard_delete_on_reject" => {
1415                    hard_delete_on_reject = self.parse_ai_bool()?
1416                }
1417                "degraded" | "degraded_mode" => {
1418                    let word = self.parse_ai_word()?;
1419                    degraded_mode = ModerateDegradedMode::from_str(&word).ok_or_else(|| {
1420                        ParseError::new(
1421                            format!(
1422                                "unsupported MODERATE degraded mode {word:?}; supported: open, closed"
1423                            ),
1424                            self.position(),
1425                        )
1426                    })?;
1427                }
1428                "on_reject" | "reject_action" => {
1429                    let word = self.parse_ai_word()?;
1430                    reject_action = ModerateRejectAction::from_str(&word).ok_or_else(|| {
1431                        ParseError::new(
1432                            format!(
1433                                "unsupported MODERATE reject action {word:?}; supported: reject, flag, redact"
1434                            ),
1435                            self.position(),
1436                        )
1437                    })?;
1438                }
1439                other => {
1440                    return Err(ParseError::new(
1441                        format!(
1442                            "unsupported MODERATE policy option {other:?}; supported: fields, provider, model, sync, degraded, on_reject, hard_delete"
1443                        ),
1444                        self.position(),
1445                    ));
1446                }
1447            }
1448            if !self.consume(&Token::Comma)? {
1449                break;
1450            }
1451        }
1452        self.expect(Token::RParen)?;
1453        if fields.is_empty() {
1454            return Err(ParseError::new(
1455                "MODERATE policy requires fields = ('<col>', ...)".to_string(),
1456                self.position(),
1457            ));
1458        }
1459        let provider = provider.ok_or_else(|| {
1460            ParseError::new(
1461                "MODERATE policy requires provider = '<token>'".to_string(),
1462                self.position(),
1463            )
1464        })?;
1465        let model = model.ok_or_else(|| {
1466            ParseError::new(
1467                "MODERATE policy requires model = '<name>'".to_string(),
1468                self.position(),
1469            )
1470        })?;
1471        Ok(reddb_types::catalog::ModeratePolicy {
1472            fields,
1473            provider,
1474            model,
1475            sync_gate,
1476            degraded_mode,
1477            reject_action,
1478            hard_delete_on_reject,
1479        })
1480    }
1481
1482    /// `VISION (image_field = '..', outputs = (...), provider, model)`.
1483    fn parse_ai_vision_policy(&mut self) -> Result<reddb_types::catalog::VisionPolicy, ParseError> {
1484        self.expect(Token::LParen)?;
1485        let mut image_field: Option<String> = None;
1486        let mut output_kinds: Vec<String> = Vec::new();
1487        let mut provider: Option<String> = None;
1488        let mut model: Option<String> = None;
1489        loop {
1490            let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1491            self.expect(Token::Eq)?;
1492            match key.as_str() {
1493                "image_field" => image_field = Some(self.parse_string()?),
1494                "outputs" | "output_kinds" => output_kinds = self.parse_ai_string_list()?,
1495                "provider" => provider = Some(self.parse_string()?),
1496                "model" => model = Some(self.parse_string()?),
1497                other => {
1498                    return Err(ParseError::new(
1499                        format!(
1500                            "unsupported VISION policy option {other:?}; supported: image_field, outputs, provider, model"
1501                        ),
1502                        self.position(),
1503                    ));
1504                }
1505            }
1506            if !self.consume(&Token::Comma)? {
1507                break;
1508            }
1509        }
1510        self.expect(Token::RParen)?;
1511        let image_field = image_field.ok_or_else(|| {
1512            ParseError::new(
1513                "VISION policy requires image_field = '<col>'".to_string(),
1514                self.position(),
1515            )
1516        })?;
1517        if output_kinds.is_empty() {
1518            return Err(ParseError::new(
1519                "VISION policy requires outputs = ('<kind>', ...)".to_string(),
1520                self.position(),
1521            ));
1522        }
1523        let provider = provider.ok_or_else(|| {
1524            ParseError::new(
1525                "VISION policy requires provider = '<token>'".to_string(),
1526                self.position(),
1527            )
1528        })?;
1529        let model = model.ok_or_else(|| {
1530            ParseError::new(
1531                "VISION policy requires model = '<name>'".to_string(),
1532                self.position(),
1533            )
1534        })?;
1535        Ok(reddb_types::catalog::VisionPolicy {
1536            image_field,
1537            output_kinds,
1538            provider,
1539            model,
1540        })
1541    }
1542
1543    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1544        if self.consume_ident_ci(expected)? {
1545            Ok(())
1546        } else {
1547            Err(ParseError::expected(
1548                vec![expected],
1549                self.peek(),
1550                self.position(),
1551            ))
1552        }
1553    }
1554
1555    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1556        let option_name = self.expect_ident_or_keyword()?;
1557        if !option_name.eq_ignore_ascii_case("ttl") {
1558            return Err(ParseError::new(
1559                // F-05: `option_name` is caller-controlled identifier text.
1560                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
1561                // before the message reaches downstream serialization sinks.
1562                format!(
1563                    "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1564                ),
1565                self.position(),
1566            ));
1567        }
1568
1569        let ttl_value = self.parse_float()?;
1570        let ttl_unit = match self.peek() {
1571            Token::Ident(unit) => {
1572                let unit = unit.clone();
1573                self.advance()?;
1574                unit
1575            }
1576            _ => "s".to_string(),
1577        };
1578
1579        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1580            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1581            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1582            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1583            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1584            "d" | "day" | "days" => 86_400_000.0,
1585            other => {
1586                return Err(ParseError::new(
1587                    // F-05: render `other` via `{:?}` so caller-controlled
1588                    // bytes (CR / LF / NUL / quotes) are escaped before
1589                    // reaching downstream serialization sinks.
1590                    format!(
1591                        "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1592                    ),
1593                    self.position(),
1594                ));
1595            }
1596        };
1597
1598        if !ttl_value.is_finite() || ttl_value < 0.0 {
1599            return Err(ParseError::new(
1600                "TTL must be a finite, non-negative duration".to_string(),
1601                self.position(),
1602            ));
1603        }
1604
1605        let ttl_ms = ttl_value * multiplier_ms;
1606        if ttl_ms > u64::MAX as f64 {
1607            return Err(ParseError::new(
1608                "TTL duration is too large".to_string(),
1609                self.position(),
1610            ));
1611        }
1612        if ttl_ms.fract().abs() >= f64::EPSILON {
1613            return Err(ParseError::new(
1614                "TTL duration must resolve to a whole number of milliseconds".to_string(),
1615                self.position(),
1616            ));
1617        }
1618
1619        Ok(Some(ttl_ms as u64))
1620    }
1621
1622    /// Try to match IF NOT EXISTS sequence
1623    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1624        if self.check(&Token::If) {
1625            self.advance()?;
1626            self.expect(Token::Not)?;
1627            self.expect(Token::Exists)?;
1628            Ok(true)
1629        } else {
1630            Ok(false)
1631        }
1632    }
1633
1634    /// Try to match IF EXISTS sequence
1635    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1636        if self.check(&Token::If) {
1637            self.advance()?;
1638            self.expect(Token::Exists)?;
1639            Ok(true)
1640        } else {
1641            Ok(false)
1642        }
1643    }
1644
1645    /// Try to match NOT NULL sequence
1646    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1647        if self.check(&Token::Not) {
1648            // Peek ahead - only consume if followed by NULL
1649            // We need to be careful: save state and try
1650            self.advance()?; // consume NOT
1651            if self.check(&Token::Null) {
1652                self.advance()?; // consume NULL
1653                Ok(true)
1654            } else {
1655                // This is tricky - NOT was consumed but next isn't NULL.
1656                // In column modifier context, NOT should only appear before NULL.
1657                // Return error for clarity.
1658                Err(ParseError::expected(
1659                    vec!["NULL (after NOT)"],
1660                    self.peek(),
1661                    self.position(),
1662                ))
1663            }
1664        } else {
1665            Ok(false)
1666        }
1667    }
1668
1669    /// Try to match PRIMARY KEY sequence
1670    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1671        if self.check(&Token::Primary) {
1672            self.advance()?;
1673            self.expect(Token::Key)?;
1674            Ok(true)
1675        } else {
1676            Ok(false)
1677        }
1678    }
1679}
1680
/// Decode a 64-character hex string (upper- or lowercase digits accepted)
/// into a 32-byte array.
///
/// Used by `SIGNED_BY` clause parsing (issue #520) to surface bad pubkeys
/// at parse-time rather than downstream in the engine.
///
/// # Errors
///
/// Returns a human-readable message in two cases:
/// - the input is not exactly 64 characters long;
/// - the input contains a character that is not a hex digit, in which case
///   the first such character is named.
///
/// Lengths and offending characters are measured in `char`s, not UTF-8
/// bytes, so non-ASCII input produces an accurate message.
fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
    let char_count = s.chars().count();
    if char_count != 64 {
        return Err(format!("expected 64 hex chars, got {char_count}"));
    }
    // Validate per `char` before touching raw bytes. Indexing bytes would
    // report a multi-byte character as one of its UTF-8 bytes (e.g. `é` as
    // `'Ã'`).
    if let Some(bad) = s.chars().find(|c| !c.is_ascii_hexdigit()) {
        return Err(format!("non-hex char: {bad:?}"));
    }
    // Every char is now an ASCII hex digit, so `s` is exactly 64 bytes.
    let mut out = [0u8; 32];
    for (byte, pair) in out.iter_mut().zip(s.as_bytes().chunks_exact(2)) {
        *byte = (hex_nibble(pair[0])? << 4) | hex_nibble(pair[1])?;
    }
    Ok(out)
}

/// Convert one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its 4-bit value.
///
/// # Errors
///
/// Returns `non-hex char: <c>` for any other byte, rendering the byte as
/// the `char` with the same code point.
fn hex_nibble(c: u8) -> Result<u8, String> {
    match c {
        b'0'..=b'9' => Ok(c - b'0'),
        b'a'..=b'f' => Ok(c - b'a' + 10),
        b'A'..=b'F' => Ok(c - b'A' + 10),
        _ => Err(format!("non-hex char: {:?}", c as char)),
    }
}
1707
// Parser-level unit tests for the DDL entry points in this file. Each test
// drives a fresh `Parser` over a literal SQL fragment, then asserts either
// on the resulting `QueryExpr` node or on the text of the returned
// `ParseError`.
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_types::catalog::{AnalyticsOutput, CollectionModel, SubscriptionOperation};

    /// Lex `input` into a fresh `Parser`, panicking with the lexer error if
    /// lexing fails.
    fn parser(input: &str) -> Parser<'_> {
        Parser::new(input).unwrap_or_else(|err| panic!("failed to lex {input:?}: {err:?}"))
    }

    /// A CREATE TABLE body with a parenthesised `WITH (key = value, ...)`
    /// option list plus trailing `PARTITION BY` / `TENANT BY` clauses, and
    /// the error raised when `tenant_by` is not a text literal.
    #[test]
    fn parse_create_table_body_parenthesized_options_and_trailing_clauses() {
        let QueryExpr::CreateTable(table) = parser(
            "IF NOT EXISTS events (id INT, tenant_meta TEXT) \
             WITH (tenant_by = 'tenant_id', append_only = true, timestamps = false) \
             PARTITION BY HASH (id) TENANT BY (tenant_meta.tenant)",
        )
        .parse_create_table_body()
        .expect("create table body") else {
            panic!("Expected CreateTableQuery");
        };

        assert_eq!(table.name, "events");
        assert!(table.if_not_exists);
        assert!(table.append_only);
        assert!(!table.timestamps);
        assert_eq!(table.tenant_by.as_deref(), Some("tenant_id"));
        assert_eq!(
            table
                .partition_by
                .as_ref()
                .map(|spec| (spec.kind, spec.column.as_str())),
            Some((PartitionKind::Hash, "id"))
        );

        // A numeric literal for `tenant_by` is rejected with a typed message.
        let err = parser("bad (id INT) WITH (tenant_by = 42)")
            .parse_create_table_body()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("WITH tenant_by expects a text literal"),
            "{err}"
        );
    }

    /// EMBED, MODERATE and VISION blocks inside one `WITH ( ... )` list each
    /// populate their own section of `ai_policy`.
    #[test]
    fn parse_create_table_ai_policy_round_trips_all_modalities() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        let QueryExpr::CreateTable(table) = parser(
            "posts (id INT, title TEXT, body TEXT, photo TEXT) WITH ( \
               EMBED (fields = ('title', 'body'), provider = 'openai', model = 'text-embedding-3-small'), \
               MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest', sync = true, degraded = closed, on_reject = flag, hard_delete = true), \
               VISION (image_field = 'photo', outputs = ('caption', 'tags'), provider = 'openai', model = 'gpt-4o') \
             )",
        )
        .parse_create_table_body()
        .expect("create table body with ai policy") else {
            panic!("Expected CreateTableQuery");
        };

        let policy = table.ai_policy.expect("ai policy present");

        let embed = policy.embed.expect("embed block");
        assert_eq!(embed.fields, vec!["title".to_string(), "body".to_string()]);
        assert_eq!(embed.provider, "openai");
        assert_eq!(embed.model, "text-embedding-3-small");

        // Short option spellings (`sync`, `degraded`, `on_reject`,
        // `hard_delete`) map onto the corresponding policy fields.
        let moderate = policy.moderate.expect("moderate block");
        assert_eq!(moderate.fields, vec!["body".to_string()]);
        assert!(moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Closed);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Flag);
        assert!(moderate.hard_delete_on_reject);

        let vision = policy.vision.expect("vision block");
        assert_eq!(vision.image_field, "photo");
        assert_eq!(
            vision.output_kinds,
            vec!["caption".to_string(), "tags".to_string()]
        );
        assert_eq!(vision.model, "gpt-4o");
    }

    /// Long-form MODERATE option names (`sync_gate`, `degraded_mode`,
    /// `reject_action`, `hard_delete_on_reject`), followed by one case per
    /// MODERATE error branch.
    #[test]
    fn parse_moderate_policy_aliases_and_error_branches() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        // Alias spellings hit the same match arms as the short forms.
        let QueryExpr::CreateTable(table) = parser(
            "t (id INT, body TEXT) WITH ( \
               MODERATE (fields = ('body'), provider = 'openai', model = 'm', \
                 sync_gate = true, degraded_mode = open, reject_action = redact, \
                 hard_delete_on_reject = true) \
             )",
        )
        .parse_create_table_body()
        .expect("alias spellings parse") else {
            panic!("Expected CreateTableQuery");
        };
        let moderate = table
            .ai_policy
            .expect("policy")
            .moderate
            .expect("moderate block");
        assert!(moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Redact);
        assert!(moderate.hard_delete_on_reject);

        // Each invalid MODERATE option surfaces a descriptive parse error.
        for (sql, needle) in [
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
                "unsupported MODERATE policy option",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', degraded = sideways))",
                "unsupported MODERATE degraded mode",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', on_reject = explode))",
                "unsupported MODERATE reject action",
            ),
            (
                "t (id INT, body TEXT) WITH (MODERATE (provider = 'o', model = 'm'))",
                "MODERATE policy requires fields",
            ),
        ] {
            let err = parser(sql)
                .parse_create_table_body()
                .expect_err("moderate policy error");
            assert!(format!("{err}").contains(needle), "got: {err}");
        }
    }

    /// Unknown-option and missing-required-field errors for EMBED and
    /// VISION, plus the `output_kinds` alias for VISION `outputs`.
    #[test]
    fn parse_embed_and_vision_policy_error_branches() {
        // EMBED + VISION unknown-option and missing-required-field arms each
        // surface a descriptive parse error.
        for (sql, needle) in [
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
                "unsupported EMBED policy option",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (provider = 'o', model = 'm'))",
                "EMBED policy requires fields",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), model = 'm'))",
                "EMBED policy requires provider",
            ),
            (
                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o'))",
                "EMBED policy requires model",
            ),
            (
                "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', provider = 'o', model = 'm', bogus = 1))",
                "unsupported VISION policy option",
            ),
            (
                "t (id INT, photo TEXT) WITH (VISION (provider = 'o', model = 'm'))",
                "VISION policy requires image_field",
            ),
        ] {
            let err = parser(sql)
                .parse_create_table_body()
                .expect_err("ai policy error");
            assert!(format!("{err}").contains(needle), "got: {err}");
        }

        // The `output_kinds` alias for VISION parses like `outputs`.
        let QueryExpr::CreateTable(table) = parser(
            "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', \
               output_kinds = ('caption'), provider = 'o', model = 'm'))",
        )
        .parse_create_table_body()
        .expect("vision output_kinds alias") else {
            panic!("Expected CreateTableQuery");
        };
        let vision = table
            .ai_policy
            .expect("policy")
            .vision
            .expect("vision block");
        assert_eq!(vision.output_kinds, vec!["caption".to_string()]);
    }

    /// MODERATE without the optional options yields the default flags and
    /// modes, and a table with no AI clause has no `ai_policy` at all.
    #[test]
    fn parse_create_table_ai_policy_defaults_and_no_clause() {
        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
        // MODERATE with no sync/degraded/on_reject falls back to defaults.
        let QueryExpr::CreateTable(table) = parser(
            "msgs (id INT, body TEXT) WITH ( \
               MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest') \
             )",
        )
        .parse_create_table_body()
        .expect("create table body") else {
            panic!("Expected CreateTableQuery");
        };
        let moderate = table
            .ai_policy
            .expect("policy")
            .moderate
            .expect("moderate block");
        assert!(!moderate.sync_gate);
        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
        assert_eq!(moderate.reject_action, ModerateRejectAction::Reject);
        assert!(!moderate.hard_delete_on_reject);

        // No AI clause leaves the policy entirely absent.
        let QueryExpr::CreateTable(plain) = parser("plain (id INT)")
            .parse_create_table_body()
            .expect("create table body")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert!(plain.ai_policy.is_none());
    }

    /// Assorted malformed AI-policy clauses, including a repeated EMBED
    /// block, each produce a descriptive error.
    #[test]
    fn parse_create_table_ai_policy_rejects_malformed_clauses() {
        // Missing provider.
        let err = parser("t (id INT) WITH (EMBED (fields = ('body'), model = 'm'))")
            .parse_create_table_body()
            .unwrap_err();
        assert!(
            err.to_string().contains("EMBED policy requires provider"),
            "{err}"
        );

        // Unknown option key inside a clause.
        let err = parser(
            "t (id INT) WITH (VISION (image_field = 'p', outputs = ('caption'), provider = 'openai', model = 'm', bogus = 1))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(
            err.to_string().contains("unsupported VISION policy option"),
            "{err}"
        );

        // Invalid moderation degraded mode.
        let err = parser(
            "t (id INT) WITH (MODERATE (fields = ('body'), provider = 'openai', model = 'm', degraded = maybe))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("unsupported MODERATE degraded mode"),
            "{err}"
        );

        // Duplicate EMBED clause.
        let err = parser(
            "t (id INT) WITH (EMBED (fields = ('a'), provider = 'openai', model = 'm'), EMBED (fields = ('b'), provider = 'openai', model = 'm'))",
        )
        .parse_create_table_body()
        .unwrap_err();
        assert!(err.to_string().contains("duplicate EMBED clause"), "{err}");
    }

    /// Keyed-collection bodies: a vault with `OWN MASTER KEY`, a graph with
    /// an ANALYTICS view, the error for an unknown graph `WITH` option, the
    /// rejection of ANALYTICS on KV, and a KV drop with a dotted wildcard
    /// name.
    #[test]
    fn parse_keyed_bodies_cover_vault_analytics_and_dotted_drop_names() {
        let QueryExpr::CreateTable(vault) =
            parser("IF NOT EXISTS tenant.secrets WITH OWN MASTER KEY")
                .parse_create_keyed_body(CollectionModel::Vault)
                .expect("create vault")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert_eq!(vault.collection_model, CollectionModel::Vault);
        assert_eq!(vault.name, "tenant.secrets");
        assert!(vault.if_not_exists);
        assert!(vault.vault_own_master_key);

        let QueryExpr::CreateTable(graph) = parser(
            "g WITH ANALYTICS (centrality (using = pagerank, max_iterations = 12, tolerance = 0.001))",
        )
        .parse_create_keyed_body(CollectionModel::Graph)
        .expect("create graph")
        else {
            panic!("Expected CreateTableQuery");
        };
        assert_eq!(graph.analytics_config.len(), 1);
        let view = &graph.analytics_config[0];
        assert_eq!(view.output, AnalyticsOutput::Centrality);
        assert_eq!(view.algorithm.as_deref(), Some("pagerank"));
        assert_eq!(view.max_iterations, Some(12));
        assert_eq!(view.tolerance, Some(0.001));

        let err = parser("g WITH OTHER")
            .parse_create_keyed_body(CollectionModel::Graph)
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");

        // Through the full `parse()` entry point, the `WITH ANALYTICS ...`
        // tail after a KV create ends up rejected as trailing input.
        assert!(parser("CREATE KV cache WITH ANALYTICS (components)")
            .parse()
            .unwrap_err()
            .to_string()
            .contains("Unexpected token after query"));

        let QueryExpr::DropKv(drop) = parser("IF EXISTS tenant.cache.*")
            .parse_drop_keyed_body(CollectionModel::Kv)
            .expect("drop kv")
        else {
            panic!("Expected DropKvQuery");
        };
        assert_eq!(drop.name, "tenant.cache.*");
        assert!(drop.if_exists);
        assert_eq!(drop.model, CollectionModel::Kv);
    }

    /// `SIGNED_BY ('<hex>', ...)` decodes each 64-hex-char key (either case)
    /// into a 32-byte signer. Non-string entries and wrong-length hex are
    /// rejected.
    #[test]
    fn parse_collection_signed_by_list_and_errors() {
        let pk_a = "aa".repeat(32);
        let pk_b = "BB".repeat(32);
        let QueryExpr::CreateCollection(collection) =
            parser(&format!("signed KIND graph SIGNED_BY ('{pk_a}', '{pk_b}')"))
                .parse_create_collection_body()
                .expect("create collection")
        else {
            panic!("Expected CreateCollectionQuery");
        };
        assert_eq!(collection.allowed_signers, vec![[0xaau8; 32], [0xBBu8; 32]]);

        let err = parser("signed KIND graph SIGNED_BY (42)")
            .parse_create_collection_body()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("string literal (ed25519 pubkey hex)"),
            "{err}"
        );

        let err = parser("signed KIND graph SIGNED_BY ('deadbeef')")
            .parse_create_collection_body()
            .unwrap_err();
        assert!(err.to_string().contains("expected 64 hex chars"), "{err}");
    }

    /// A single `ALTER COLLECTION` chaining 14 comma-separated operations,
    /// asserted in order: subscriptions, signers, partitions, events,
    /// tenancy, flags and retention.
    #[test]
    fn parse_alter_operations_cover_subscriptions_partitions_tenancy_and_signers() {
        let pk = "11".repeat(32);
        let QueryExpr::AlterTable(alter) = parser(&format!(
            "ALTER COLLECTION audit \
             ADD SUBSCRIPTION pii TO audit_events REDACT (payload.ssn, *.secret) WHERE level = 'warn', \
             DROP SUBSCRIPTION pii, \
             ADD SIGNER '{pk}', \
             REVOKE SIGNER '{pk}', \
             ATTACH PARTITION audit_2026 FOR VALUES FROM (2026) TO (2027), \
             DETACH PARTITION audit_2026, \
             ENABLE EVENTS (INSERT, UPDATE) TO table_events ON ALL TENANTS, \
             DISABLE EVENTS, \
             ENABLE TENANCY ON (metadata.tenant), \
             DISABLE TENANCY, \
             SET APPEND_ONLY = true, \
             SET VERSIONED = false, \
             SET RETENTION 2 h, \
             UNSET RETENTION"
        ))
        .parse_alter_table_query()
        .expect("alter collection")
        else {
            panic!("Expected AlterTableQuery");
        };

        assert_eq!(alter.name, "audit");
        assert_eq!(alter.operations.len(), 14);
        // The WHERE filter is stored as re-rendered text. `level` comes back
        // uppercased, presumably because it lexes as a keyword (TODO
        // confirm).
        match &alter.operations[0] {
            AlterOperation::AddSubscription { name, descriptor } => {
                assert_eq!(name, "pii");
                assert_eq!(descriptor.target_queue, "audit_events");
                assert_eq!(descriptor.redact_fields, vec!["payload.ssn", "*.secret"]);
                assert_eq!(descriptor.where_filter.as_deref(), Some("LEVEL = 'warn'"));
            }
            other => panic!("expected AddSubscription, got {other:?}"),
        }
        assert!(matches!(
            &alter.operations[1],
            AlterOperation::DropSubscription { name } if name == "pii"
        ));
        assert!(matches!(
            &alter.operations[2],
            AlterOperation::AddSigner { pubkey } if *pubkey == [0x11; 32]
        ));
        assert!(matches!(
            &alter.operations[3],
            AlterOperation::RevokeSigner { pubkey } if *pubkey == [0x11; 32]
        ));
        // The partition bound is kept as space-separated token text.
        assert!(matches!(
            &alter.operations[4],
            AlterOperation::AttachPartition { child, bound }
                if child == "audit_2026" && bound == "FROM ( 2026 ) TO ( 2027 )"
        ));
        assert!(matches!(
            &alter.operations[5],
            AlterOperation::DetachPartition { child } if child == "audit_2026"
        ));
        match &alter.operations[6] {
            AlterOperation::EnableEvents(descriptor) => {
                assert_eq!(
                    descriptor.ops_filter,
                    vec![SubscriptionOperation::Insert, SubscriptionOperation::Update]
                );
                assert_eq!(descriptor.target_queue, "table_events");
                assert!(descriptor.all_tenants);
            }
            other => panic!("expected EnableEvents, got {other:?}"),
        }
        assert!(matches!(
            &alter.operations[7],
            AlterOperation::DisableEvents
        ));
        assert!(matches!(
            &alter.operations[8],
            AlterOperation::EnableTenancy { column } if column == "METADATA.tenant"
        ));
        assert!(matches!(
            &alter.operations[9],
            AlterOperation::DisableTenancy
        ));
        assert!(matches!(
            &alter.operations[10],
            AlterOperation::SetAppendOnly(true)
        ));
        assert!(matches!(
            &alter.operations[11],
            AlterOperation::SetVersioned(false)
        ));
        // `2 h` resolves to 2 * 3_600_000 ms.
        assert!(matches!(
            &alter.operations[12],
            AlterOperation::SetRetention { duration_ms } if *duration_ms == 7_200_000
        ));
        assert!(matches!(
            &alter.operations[13],
            AlterOperation::UnsetRetention
        ));
    }

    /// `ALTER GRAPH ... ADD` / `DROP` without the `ANALYTICS` keyword is
    /// rejected with an "expected: ANALYTICS" error.
    #[test]
    fn parse_alter_graph_analytics_keyword_errors() {
        let err = parser("ALTER GRAPH g ADD centrality")
            .parse_alter_graph_query()
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");

        let err = parser("ALTER GRAPH g DROP centrality")
            .parse_alter_graph_query()
            .unwrap_err();
        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");
    }

    /// `decode_hex_32` happy path, length error message and non-hex
    /// character error.
    #[test]
    fn decode_hex_32_reports_length_and_character_errors() {
        assert_eq!(decode_hex_32(&"0f".repeat(32)).unwrap(), [0x0f; 32]);
        assert_eq!(
            decode_hex_32("deadbeef").unwrap_err(),
            "expected 64 hex chars, got 8"
        );
        assert!(decode_hex_32(&"gg".repeat(32))
            .unwrap_err()
            .contains("non-hex char"));
    }
}