Skip to main content

reddb_rql/parser/
ddl.rs

1//! DDL SQL Parser: CREATE TABLE, DROP TABLE, ALTER TABLE
2
3use super::error::ParseError;
4use super::Parser;
5use crate::ast::{
6    AlterOperation, AlterTableQuery, CreateCollectionQuery, CreateColumnDef, CreateTableQuery,
7    CreateVectorQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropKvQuery,
8    DropTableQuery, DropVectorQuery, ExplainAlterQuery, ExplainFormat, PartitionKind,
9    PartitionSpec, QueryExpr, TruncateQuery,
10};
11use crate::lexer::Token;
12use reddb_types::catalog::{CollectionModel, SubscriptionDescriptor, SubscriptionOperation};
13use reddb_types::types::{SqlTypeName, TypeModifier, Value};
14
15impl<'a> Parser<'a> {
16    /// Parse: CREATE TABLE [IF NOT EXISTS] name (col1 TYPE [modifiers], ...)
17    pub fn parse_create_table_query(&mut self) -> Result<QueryExpr, ParseError> {
18        self.expect(Token::Create)?;
19        self.expect(Token::Table)?;
20
21        let if_not_exists = self.match_if_not_exists()?;
22        let name = self.expect_ident()?;
23
24        self.expect(Token::LParen)?;
25        let mut columns = Vec::new();
26        loop {
27            let col = self.parse_column_def()?;
28            columns.push(col);
29            if !self.consume(&Token::Comma)? {
30                break;
31            }
32        }
33        self.expect(Token::RParen)?;
34
35        let mut default_ttl_ms = None;
36        let mut context_index_fields = Vec::new();
37        let mut context_index_enabled = false;
38        let mut timestamps = false;
39        let mut subscriptions = Vec::new();
40
41        while self.consume(&Token::With)? {
42            if self.consume_ident_ci("EVENTS")? {
43                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
44            } else if self.consume_ident_ci("CONTEXT_INDEX")? {
45                context_index_enabled = self.parse_bool_assign()?;
46            } else if self.consume_ident_ci("CONTEXT")? {
47                // Consume INDEX token (reserved keyword)
48                if !self.consume(&Token::Index)? {
49                    return Err(ParseError::expected(
50                        vec!["INDEX"],
51                        self.peek(),
52                        self.position(),
53                    ));
54                }
55                self.expect(Token::On)?;
56                self.expect(Token::LParen)?;
57                loop {
58                    context_index_fields.push(self.expect_ident()?);
59                    if !self.consume(&Token::Comma)? {
60                        break;
61                    }
62                }
63                self.expect(Token::RParen)?;
64                context_index_enabled = true;
65            } else if self.consume_ident_ci("TIMESTAMPS")? {
66                timestamps = self.parse_bool_assign()?;
67            } else {
68                default_ttl_ms = self.parse_create_table_ttl_clause()?;
69            }
70        }
71
72        Ok(QueryExpr::CreateTable(CreateTableQuery {
73            collection_model: CollectionModel::Table,
74            name,
75            columns,
76            if_not_exists,
77            default_ttl_ms,
78            metrics_rollup_policies: Vec::new(),
79            context_index_fields,
80            context_index_enabled,
81            timestamps,
82            partition_by: None,
83            tenant_by: None,
84            append_only: false,
85            subscriptions,
86            analytics_config: Vec::new(),
87            vault_own_master_key: false,
88            ai_policy: None,
89        }))
90    }
91
92    /// Parse: DROP TABLE [IF EXISTS] name
93    pub fn parse_drop_table_query(&mut self) -> Result<QueryExpr, ParseError> {
94        self.expect(Token::Drop)?;
95        self.expect(Token::Table)?;
96        self.parse_drop_table_body()
97    }
98
99    /// Parse the body of CREATE TABLE after CREATE TABLE has been consumed
100    pub fn parse_create_table_body(&mut self) -> Result<QueryExpr, ParseError> {
101        let if_not_exists = self.match_if_not_exists()?;
102        let name = self.expect_ident()?;
103
104        self.expect(Token::LParen)?;
105        let mut columns = Vec::new();
106        loop {
107            let col = self.parse_column_def()?;
108            columns.push(col);
109            if !self.consume(&Token::Comma)? {
110                break;
111            }
112        }
113        self.expect(Token::RParen)?;
114
115        let mut default_ttl_ms = None;
116        let mut context_index_fields = Vec::new();
117        let mut context_index_enabled = false;
118        let mut timestamps = false;
119        let mut tenant_by: Option<String> = None;
120        let mut append_only = false;
121        let mut subscriptions = Vec::new();
122        let mut ai_policy = reddb_types::catalog::AiPolicy::default();
123
124        while self.consume(&Token::With)? {
125            if self.consume_ident_ci("EVENTS")? {
126                subscriptions.push(self.parse_subscription_descriptor(name.clone())?);
127                continue;
128            }
129            // Accept both spellings:
130            //   WITH key = value
131            //   WITH (key = value, key = value)
132            // Postgres / ClickHouse use the parenthesised form; the
133            // bare form is our legacy shorthand. The parenthesised
134            // form collects options separated by commas until `)`.
135            let has_parens = self.consume(&Token::LParen)?;
136
137            loop {
138                if self.consume_ident_ci("CONTEXT_INDEX")? {
139                    context_index_enabled = self.parse_bool_assign()?;
140                } else if self.consume_ident_ci("CONTEXT")? {
141                    if !self.consume(&Token::Index)? {
142                        return Err(ParseError::expected(
143                            vec!["INDEX"],
144                            self.peek(),
145                            self.position(),
146                        ));
147                    }
148                    self.expect(Token::On)?;
149                    self.expect(Token::LParen)?;
150                    loop {
151                        context_index_fields.push(self.expect_ident()?);
152                        if !self.consume(&Token::Comma)? {
153                            break;
154                        }
155                    }
156                    self.expect(Token::RParen)?;
157                    context_index_enabled = true;
158                } else if self.consume_ident_ci("TIMESTAMPS")? {
159                    timestamps = self.parse_bool_assign()?;
160                } else if self.consume_ident_ci("EMBED")? {
161                    if ai_policy.embed.is_some() {
162                        return Err(ParseError::new(
163                            "duplicate EMBED clause in AI policy".to_string(),
164                            self.position(),
165                        ));
166                    }
167                    ai_policy.embed = Some(self.parse_ai_embed_policy()?);
168                } else if self.consume_ident_ci("MODERATE")? {
169                    if ai_policy.moderate.is_some() {
170                        return Err(ParseError::new(
171                            "duplicate MODERATE clause in AI policy".to_string(),
172                            self.position(),
173                        ));
174                    }
175                    ai_policy.moderate = Some(self.parse_ai_moderate_policy()?);
176                } else if self.consume_ident_ci("VISION")? {
177                    if ai_policy.vision.is_some() {
178                        return Err(ParseError::new(
179                            "duplicate VISION clause in AI policy".to_string(),
180                            self.position(),
181                        ));
182                    }
183                    ai_policy.vision = Some(self.parse_ai_vision_policy()?);
184                } else if self.consume_ident_ci("APPEND_ONLY")? {
185                    append_only = self.parse_bool_assign()?;
186                } else if self.consume_ident_ci("TENANT_BY")? {
187                    // `WITH (tenant_by = 'col')` form — accepts `=` optional
188                    // and expects a string literal column name.
189                    let _ = self.consume(&Token::Eq)?;
190                    let value = self.parse_literal_value()?;
191                    match value {
192                        Value::Text(col) => tenant_by = Some(col.to_string()),
193                        other => {
194                            return Err(ParseError::new(
195                                format!("WITH tenant_by expects a text literal, got {other:?}"),
196                                self.position(),
197                            ));
198                        }
199                    }
200                } else {
201                    default_ttl_ms = self.parse_create_table_ttl_clause()?;
202                }
203                if has_parens {
204                    if self.consume(&Token::Comma)? {
205                        continue;
206                    }
207                    self.expect(Token::RParen)?;
208                }
209                break;
210            }
211        }
212
213        // Optional `PARTITION BY RANGE|LIST|HASH (col)` clause (Phase 2.2).
214        let partition_by = if self.consume(&Token::Partition)? {
215            self.expect(Token::By)?;
216            let kind = if self.consume(&Token::Range)? {
217                PartitionKind::Range
218            } else if self.consume(&Token::List)? {
219                PartitionKind::List
220            } else if self.consume(&Token::Hash)? {
221                PartitionKind::Hash
222            } else {
223                return Err(ParseError::expected(
224                    vec!["RANGE", "LIST", "HASH"],
225                    self.peek(),
226                    self.position(),
227                ));
228            };
229            self.expect(Token::LParen)?;
230            let column = self.expect_ident()?;
231            self.expect(Token::RParen)?;
232            Some(PartitionSpec { kind, column })
233        } else {
234            None
235        };
236
237        // Shorthand: trailing `APPEND ONLY` keyword pair (PG / ClickHouse
238        // style). Accepted after partition spec / tenant spec / or on
239        // its own. `WITH (append_only = true)` is the other form and
240        // handled above.
241        if !append_only && self.consume_ident_ci("APPEND")? {
242            if !self.consume_ident_ci("ONLY")? {
243                return Err(ParseError::expected(
244                    vec!["ONLY"],
245                    self.peek(),
246                    self.position(),
247                ));
248            }
249            append_only = true;
250        }
251
252        // Shorthand: `TENANT BY (col)` or `TENANT BY (root.sub.path)`
253        // trailing clause (after partition spec if both are used).
254        //
255        // Dotted paths let non-table models declare tenancy over their
256        // natural nested structures — `metadata.tenant` for vectors,
257        // `payload.tenant` for queue messages, `tags.cluster` for
258        // timeseries, `properties.org` for graphs. The read-path
259        // resolver already navigates these paths via
260        // `resolve_runtime_document_path`; here we just store the
261        // dotted string and let the policy evaluator do the rest.
262        if tenant_by.is_none() && self.consume_ident_ci("TENANT")? {
263            self.expect(Token::By)?;
264            self.expect(Token::LParen)?;
265            // Allow keyword-idents (`metadata`, `type`, `data`) as
266            // column names — SQL treats them as bare identifiers in
267            // this context.
268            let mut path = self.expect_ident_or_keyword()?;
269            while self.consume(&Token::Dot)? {
270                let next = self.expect_ident_or_keyword()?;
271                path = format!("{path}.{next}");
272            }
273            self.expect(Token::RParen)?;
274            tenant_by = Some(path);
275        }
276
277        Ok(QueryExpr::CreateTable(CreateTableQuery {
278            collection_model: CollectionModel::Table,
279            name,
280            columns,
281            if_not_exists,
282            default_ttl_ms,
283            metrics_rollup_policies: Vec::new(),
284            context_index_fields,
285            context_index_enabled,
286            timestamps,
287            partition_by,
288            tenant_by,
289            append_only,
290            subscriptions,
291            analytics_config: Vec::new(),
292            vault_own_master_key: false,
293            ai_policy: (!ai_policy.is_empty()).then_some(ai_policy),
294        }))
295    }
296
297    /// Parse: EXPLAIN ALTER FOR CREATE TABLE name (...) [FORMAT JSON|SQL]
298    ///
299    /// Pure read: does not execute DDL. Returns a schema-diff rendering of the
300    /// difference between the table's current contract and the target CREATE
301    /// TABLE body.
302    pub fn parse_explain_alter_query(&mut self) -> Result<QueryExpr, ParseError> {
303        self.expect(Token::Explain)?;
304        self.expect(Token::Alter)?;
305        self.expect(Token::For)?;
306        self.expect(Token::Create)?;
307        self.expect(Token::Table)?;
308
309        let body = self.parse_create_table_body()?;
310        let target = match body {
311            QueryExpr::CreateTable(t) => t,
312            _ => {
313                return Err(ParseError::new(
314                    "EXPLAIN ALTER FOR CREATE TABLE body must be a CREATE TABLE statement"
315                        .to_string(),
316                    self.position(),
317                ));
318            }
319        };
320
321        let format = if self.consume(&Token::Format)? {
322            if self.consume(&Token::Json)? {
323                ExplainFormat::Json
324            } else if self.consume_ident_ci("SQL")? {
325                ExplainFormat::Sql
326            } else {
327                return Err(ParseError::expected(
328                    vec!["JSON", "SQL"],
329                    self.peek(),
330                    self.position(),
331                ));
332            }
333        } else {
334            ExplainFormat::Sql
335        };
336
337        Ok(QueryExpr::ExplainAlter(ExplainAlterQuery {
338            target,
339            format,
340        }))
341    }
342
343    /// Parse the body of DROP TABLE after DROP TABLE has been consumed
344    pub fn parse_drop_table_body(&mut self) -> Result<QueryExpr, ParseError> {
345        let if_exists = self.match_if_exists()?;
346        let name = self.parse_drop_collection_name()?;
347        Ok(QueryExpr::DropTable(DropTableQuery { name, if_exists }))
348    }
349
350    pub fn parse_drop_graph_body(&mut self) -> Result<QueryExpr, ParseError> {
351        let if_exists = self.match_if_exists()?;
352        let name = self.parse_drop_collection_name()?;
353        Ok(QueryExpr::DropGraph(DropGraphQuery { name, if_exists }))
354    }
355
356    pub fn parse_drop_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
357        let if_exists = self.match_if_exists()?;
358        let name = self.parse_drop_collection_name()?;
359        Ok(QueryExpr::DropVector(DropVectorQuery { name, if_exists }))
360    }
361
362    pub fn parse_drop_document_body(&mut self) -> Result<QueryExpr, ParseError> {
363        let if_exists = self.match_if_exists()?;
364        let name = self.parse_drop_collection_name()?;
365        Ok(QueryExpr::DropDocument(DropDocumentQuery {
366            name,
367            if_exists,
368        }))
369    }
370
371    pub fn parse_create_keyed_body(
372        &mut self,
373        model: CollectionModel,
374    ) -> Result<QueryExpr, ParseError> {
375        let if_not_exists = self.match_if_not_exists()?;
376        let name = self.parse_drop_collection_name()?;
377        let vault_own_master_key =
378            if model == CollectionModel::Vault && self.consume(&Token::With)? {
379                if !self.consume_ident_ci("OWN")? {
380                    return Err(ParseError::expected(
381                        vec!["OWN"],
382                        self.peek(),
383                        self.position(),
384                    ));
385                }
386                if !self.consume_ident_ci("MASTER")? {
387                    return Err(ParseError::expected(
388                        vec!["MASTER"],
389                        self.peek(),
390                        self.position(),
391                    ));
392                }
393                if !self.consume(&Token::Key)? && !self.consume_ident_ci("KEY")? {
394                    return Err(ParseError::expected(
395                        vec!["KEY"],
396                        self.peek(),
397                        self.position(),
398                    ));
399                }
400                true
401            } else {
402                false
403            };
404        // `CREATE GRAPH <name> WITH ANALYTICS (...)` — the analytics opt-in
405        // is graph-only (issue #800). Other keyed models reject the clause so
406        // a misplaced `WITH ANALYTICS` fails loudly instead of being ignored.
407        let analytics_config = if model == CollectionModel::Graph && self.consume(&Token::With)? {
408            if !self.consume_ident_ci("ANALYTICS")? {
409                return Err(ParseError::expected(
410                    vec!["ANALYTICS"],
411                    self.peek(),
412                    self.position(),
413                ));
414            }
415            self.parse_analytics_clause()?
416        } else {
417            Vec::new()
418        };
419        Ok(QueryExpr::CreateTable(CreateTableQuery {
420            collection_model: model,
421            name,
422            columns: Vec::new(),
423            if_not_exists,
424            default_ttl_ms: None,
425            metrics_rollup_policies: Vec::new(),
426            context_index_fields: Vec::new(),
427            context_index_enabled: false,
428            timestamps: false,
429            partition_by: None,
430            tenant_by: None,
431            append_only: false,
432            subscriptions: Vec::new(),
433            analytics_config,
434            vault_own_master_key,
435            ai_policy: None,
436        }))
437    }
438
439    /// Parse the `( <output> [ ( <key> = <value> [, ...] ) ] [, ...] )` body of
440    /// a `WITH ANALYTICS` clause (issue #800). Recognised outputs are
441    /// `communities`, `components`, `centrality`; recognised options are
442    /// `using`, `resolution`, `max_iterations`, `tolerance`. Unknown output
443    /// names and option keys are rejected with a clear, structured error.
444    fn parse_analytics_clause(
445        &mut self,
446    ) -> Result<Vec<reddb_types::catalog::AnalyticsViewDescriptor>, ParseError> {
447        use reddb_types::catalog::{AnalyticsOutput, AnalyticsViewDescriptor};
448
449        self.expect(Token::LParen)?;
450        let mut views: Vec<AnalyticsViewDescriptor> = Vec::new();
451        loop {
452            let output_name = self.parse_analytics_output_name()?;
453            let output = AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
454                ParseError::new(
455                    format!(
456                        "unknown analytics output '{output_name}': expected communities, components, or centrality"
457                    ),
458                    self.position(),
459                )
460            })?;
461            if views.iter().any(|view| view.output == output) {
462                return Err(ParseError::new(
463                    format!("duplicate analytics output '{output_name}'"),
464                    self.position(),
465                ));
466            }
467            let mut view = AnalyticsViewDescriptor {
468                output,
469                algorithm: None,
470                resolution: None,
471                max_iterations: None,
472                tolerance: None,
473            };
474            if self.consume(&Token::LParen)? {
475                loop {
476                    let key = self.parse_analytics_option_key()?;
477                    self.expect(Token::Eq)?;
478                    match key.as_str() {
479                        "using" => {
480                            view.algorithm =
481                                Some(self.expect_ident_or_keyword()?.to_ascii_lowercase());
482                        }
483                        "resolution" => view.resolution = Some(self.parse_float()?),
484                        "max_iterations" => view.max_iterations = Some(self.parse_integer()?),
485                        "tolerance" => view.tolerance = Some(self.parse_float()?),
486                        other => {
487                            return Err(ParseError::new(
488                                format!(
489                                    "unknown analytics option '{other}': expected using, resolution, max_iterations, or tolerance"
490                                ),
491                                self.position(),
492                            ))
493                        }
494                    }
495                    if !self.consume(&Token::Comma)? {
496                        break;
497                    }
498                }
499                self.expect(Token::RParen)?;
500            }
501            views.push(view);
502            if !self.consume(&Token::Comma)? {
503                break;
504            }
505        }
506        self.expect(Token::RParen)?;
507        if views.is_empty() {
508            return Err(ParseError::new(
509                "WITH ANALYTICS requires at least one output".to_string(),
510                self.position(),
511            ));
512        }
513        Ok(views)
514    }
515
516    /// Read one analytics output name, normalising the keyword-lexed outputs
517    /// (`components`, `centrality`) back to their lowercase spelling so they
518    /// compare uniformly with the ident-lexed `communities`.
519    fn parse_analytics_output_name(&mut self) -> Result<String, ParseError> {
520        match self.peek() {
521            Token::Components => {
522                self.advance()?;
523                Ok("components".to_string())
524            }
525            Token::Centrality => {
526                self.advance()?;
527                Ok("centrality".to_string())
528            }
529            _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
530        }
531    }
532
533    /// Read one analytics option key, normalising the keyword-lexed keys
534    /// (`using`, `max_iterations`) back to their lowercase spelling.
535    fn parse_analytics_option_key(&mut self) -> Result<String, ParseError> {
536        match self.peek() {
537            Token::Using => {
538                self.advance()?;
539                Ok("using".to_string())
540            }
541            Token::MaxIterations => {
542                self.advance()?;
543                Ok("max_iterations".to_string())
544            }
545            _ => Ok(self.expect_ident()?.to_ascii_lowercase()),
546        }
547    }
548
549    pub fn parse_create_collection_model_body(
550        &mut self,
551        model: CollectionModel,
552    ) -> Result<QueryExpr, ParseError> {
553        self.parse_create_keyed_body(model)
554    }
555
556    pub fn parse_create_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
557        let if_not_exists = self.match_if_not_exists()?;
558        let name = self.parse_drop_collection_name()?;
559        if !self.consume_ident_ci("KIND")? {
560            return Err(ParseError::expected(
561                vec!["KIND"],
562                self.peek(),
563                self.position(),
564            ));
565        }
566        let mut kind = self.expect_ident_or_keyword()?.to_ascii_lowercase();
567        while self.consume(&Token::Dot)? {
568            let part = self.expect_ident_or_keyword()?.to_ascii_lowercase();
569            kind.push('.');
570            kind.push_str(&part);
571        }
572        let (vector_dimension, vector_metric) = if kind == "vector.turbo" {
573            if !self.consume_ident_ci("DIM")? {
574                return Err(ParseError::expected(
575                    vec!["DIM"],
576                    self.peek(),
577                    self.position(),
578                ));
579            }
580            let dimension = self.parse_integer()?;
581            if dimension <= 0 {
582                return Err(ParseError::new(
583                    "VECTOR DIM must be a positive integer".to_string(),
584                    self.position(),
585                ));
586            }
587            let metric = if self.consume(&Token::Metric)? {
588                self.parse_distance_metric()?
589            } else {
590                reddb_types::distance::DistanceMetric::Cosine
591            };
592            (Some(dimension as usize), Some(metric))
593        } else {
594            (None, None)
595        };
596        let allowed_signers = if self.consume_ident_ci("SIGNED_BY")? {
597            self.parse_signed_by_list()?
598        } else {
599            Vec::new()
600        };
601        Ok(QueryExpr::CreateCollection(CreateCollectionQuery {
602            name,
603            kind,
604            if_not_exists,
605            vector_dimension,
606            vector_metric,
607            allowed_signers,
608        }))
609    }
610
611    /// Parse a single `'hex32'` string literal as a 32-byte Ed25519
612    /// pubkey. Used by `ALTER COLLECTION ... ADD|REVOKE SIGNER 'hex'`
613    /// (issue #522).
614    fn parse_single_signer_hex(&mut self) -> Result<[u8; 32], ParseError> {
615        let hex = match self.peek().clone() {
616            Token::String(s) => {
617                self.advance()?;
618                s
619            }
620            _ => {
621                return Err(ParseError::expected(
622                    vec!["string literal (ed25519 pubkey hex)"],
623                    self.peek(),
624                    self.position(),
625                ));
626            }
627        };
628        decode_hex_32(&hex).map_err(|msg| {
629            ParseError::new(
630                format!("SIGNER pubkey '{hex}' invalid: {msg}"),
631                self.position(),
632            )
633        })
634    }
635
636    /// Parse `( 'hex32', 'hex32', ... )` — Ed25519 pubkey list. Each entry
637    /// must decode to exactly 32 bytes. Used by both `CREATE COLLECTION ...
638    /// SIGNED_BY (...)` and (in a later iteration) `ALTER COLLECTION` signer
639    /// mutations. Issue #520.
640    fn parse_signed_by_list(&mut self) -> Result<Vec<[u8; 32]>, ParseError> {
641        self.expect(Token::LParen)?;
642        let mut out = Vec::new();
643        loop {
644            let hex = match self.peek().clone() {
645                Token::String(s) => {
646                    self.advance()?;
647                    s
648                }
649                _ => {
650                    return Err(ParseError::expected(
651                        vec!["string literal (ed25519 pubkey hex)"],
652                        self.peek(),
653                        self.position(),
654                    ));
655                }
656            };
657            let bytes = decode_hex_32(&hex).map_err(|msg| {
658                ParseError::new(
659                    format!("SIGNED_BY pubkey '{hex}' invalid: {msg}"),
660                    self.position(),
661                )
662            })?;
663            out.push(bytes);
664            if !self.consume(&Token::Comma)? {
665                break;
666            }
667        }
668        self.expect(Token::RParen)?;
669        if out.is_empty() {
670            return Err(ParseError::new(
671                "SIGNED_BY list must contain at least one pubkey".to_string(),
672                self.position(),
673            ));
674        }
675        Ok(out)
676    }
677
678    pub fn parse_create_vector_body(&mut self) -> Result<QueryExpr, ParseError> {
679        let if_not_exists = self.match_if_not_exists()?;
680        let name = self.parse_drop_collection_name()?;
681        if !self.consume_ident_ci("DIM")? {
682            return Err(ParseError::expected(
683                vec!["DIM"],
684                self.peek(),
685                self.position(),
686            ));
687        }
688        let dimension = self.parse_integer()?;
689        if dimension <= 0 {
690            return Err(ParseError::new(
691                "VECTOR DIM must be a positive integer".to_string(),
692                self.position(),
693            ));
694        }
695        let metric = if self.consume(&Token::Metric)? {
696            self.parse_distance_metric()?
697        } else {
698            reddb_types::distance::DistanceMetric::Cosine
699        };
700        Ok(QueryExpr::CreateVector(CreateVectorQuery {
701            name,
702            dimension: dimension as usize,
703            metric,
704            if_not_exists,
705        }))
706    }
707
708    pub fn parse_drop_keyed_body(
709        &mut self,
710        model: CollectionModel,
711    ) -> Result<QueryExpr, ParseError> {
712        let if_exists = self.match_if_exists()?;
713        let name = self.parse_drop_collection_name()?;
714        Ok(QueryExpr::DropKv(DropKvQuery {
715            name,
716            if_exists,
717            model,
718        }))
719    }
720
721    pub fn parse_drop_kv_body(&mut self) -> Result<QueryExpr, ParseError> {
722        self.parse_drop_keyed_body(CollectionModel::Kv)
723    }
724
725    pub fn parse_drop_collection_body(&mut self) -> Result<QueryExpr, ParseError> {
726        self.parse_drop_collection_model_body(None)
727    }
728
729    pub fn parse_drop_collection_model_body(
730        &mut self,
731        model: Option<CollectionModel>,
732    ) -> Result<QueryExpr, ParseError> {
733        let if_exists = self.match_if_exists()?;
734        let name = self.parse_drop_collection_name()?;
735        Ok(QueryExpr::DropCollection(DropCollectionQuery {
736            name,
737            if_exists,
738            model,
739        }))
740    }
741
742    pub fn parse_truncate_body(
743        &mut self,
744        model: Option<CollectionModel>,
745    ) -> Result<QueryExpr, ParseError> {
746        let if_exists = self.match_if_exists()?;
747        let name = self.parse_drop_collection_name()?;
748        Ok(QueryExpr::Truncate(TruncateQuery {
749            name,
750            model,
751            if_exists,
752        }))
753    }
754
755    pub(crate) fn parse_drop_collection_name(&mut self) -> Result<String, ParseError> {
756        let mut name = self.expect_ident()?;
757        while self.consume(&Token::Dot)? {
758            if self.consume(&Token::Star)? {
759                name.push_str(".*");
760                break;
761            }
762            let next = self.expect_ident_or_keyword()?;
763            name = format!("{name}.{next}");
764        }
765        Ok(name)
766    }
767
768    /// Parse: ALTER TABLE name ADD/DROP/RENAME COLUMN ...
769    ///
770    /// Also accepts `ALTER COLLECTION name ADD|REVOKE SIGNER 'hex'`
771    /// (issue #522) — collection-level signer registry mutations share
772    /// the AlterTable AST so the existing executor dispatch path picks
773    /// them up without a new top-level variant.
774    pub fn parse_alter_table_query(&mut self) -> Result<QueryExpr, ParseError> {
775        self.expect(Token::Alter)?;
776        if !self.consume(&Token::Table)?
777            && !self.consume(&Token::Collection)?
778            && !self.consume_ident_ci("COLLECTION")?
779        {
780            return Err(ParseError::expected(
781                vec!["TABLE", "COLLECTION"],
782                self.peek(),
783                self.position(),
784            ));
785        }
786        let name = self.expect_ident()?;
787
788        let mut operations = Vec::new();
789        loop {
790            let op = self.parse_alter_operation(&name)?;
791            operations.push(op);
792            if !self.consume(&Token::Comma)? {
793                break;
794            }
795        }
796
797        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
798    }
799
800    /// Parse: `ALTER GRAPH <name> ADD ANALYTICS ( <output> [, ...] )`
801    /// and `ALTER GRAPH <name> DROP ANALYTICS <output>` (issue #801).
802    ///
803    /// Lifecycle management of the `WITH ANALYTICS` configuration declared at
804    /// `CREATE GRAPH` time (#800), without recreating the collection. Shares
805    /// the `AlterTable` AST so the existing executor dispatch path picks the
806    /// mutations up; the executor validates the target is a graph and that the
807    /// dropped output is actually enabled.
808    pub fn parse_alter_graph_query(&mut self) -> Result<QueryExpr, ParseError> {
809        self.expect(Token::Alter)?;
810        self.expect(Token::Graph)?;
811        let name = self.expect_ident()?;
812
813        let mut operations = Vec::new();
814        loop {
815            operations.push(self.parse_alter_graph_operation()?);
816            if !self.consume(&Token::Comma)? {
817                break;
818            }
819        }
820
821        Ok(QueryExpr::AlterTable(AlterTableQuery { name, operations }))
822    }
823
824    /// Parse a single `ALTER GRAPH` analytics operation: either
825    /// `ADD ANALYTICS ( ... )` or `DROP ANALYTICS <output>`.
826    fn parse_alter_graph_operation(&mut self) -> Result<AlterOperation, ParseError> {
827        if self.consume(&Token::Add)? {
828            if !self.consume_ident_ci("ANALYTICS")? {
829                return Err(ParseError::expected(
830                    vec!["ANALYTICS"],
831                    self.peek(),
832                    self.position(),
833                ));
834            }
835            // Reuse the `WITH ANALYTICS (...)` body grammar verbatim so the
836            // ADD form accepts the exact same outputs and options as CREATE.
837            let views = self.parse_analytics_clause()?;
838            Ok(AlterOperation::AddAnalytics(views))
839        } else if self.consume(&Token::Drop)? {
840            if !self.consume_ident_ci("ANALYTICS")? {
841                return Err(ParseError::expected(
842                    vec!["ANALYTICS"],
843                    self.peek(),
844                    self.position(),
845                ));
846            }
847            let output_name = self.parse_analytics_output_name()?;
848            let output = reddb_types::catalog::AnalyticsOutput::from_str(&output_name).ok_or_else(|| {
849                ParseError::new(
850                    format!(
851                        "unknown analytics output '{output_name}': expected communities, components, or centrality"
852                    ),
853                    self.position(),
854                )
855            })?;
856            Ok(AlterOperation::DropAnalytics(output))
857        } else {
858            Err(ParseError::expected(
859                vec!["ADD", "DROP"],
860                self.peek(),
861                self.position(),
862            ))
863        }
864    }
865
866    /// Parse a single ALTER TABLE operation
867    fn parse_alter_operation(&mut self, table_name: &str) -> Result<AlterOperation, ParseError> {
868        if self.consume(&Token::Add)? {
869            if self.consume_ident_ci("SUBSCRIPTION")? {
870                // ADD SUBSCRIPTION name TO queue [REDACT (...)] [WHERE ...]
871                let sub_name = self.expect_ident()?;
872                let descriptor = self.parse_subscription_descriptor(table_name.to_string())?;
873                Ok(AlterOperation::AddSubscription {
874                    name: sub_name,
875                    descriptor,
876                })
877            } else if self.consume_ident_ci("SIGNER")? {
878                // ADD SIGNER 'hex_pubkey' — issue #522.
879                let pubkey = self.parse_single_signer_hex()?;
880                Ok(AlterOperation::AddSigner { pubkey })
881            } else {
882                // ADD COLUMN definition (COLUMN keyword is optional)
883                let _ = self.consume(&Token::Column)?;
884                let col_def = self.parse_column_def()?;
885                Ok(AlterOperation::AddColumn(col_def))
886            }
887        } else if self.consume_ident_ci("REVOKE")? {
888            // REVOKE SIGNER 'hex_pubkey' — issue #522.
889            if !self.consume_ident_ci("SIGNER")? {
890                return Err(ParseError::expected(
891                    vec!["SIGNER"],
892                    self.peek(),
893                    self.position(),
894                ));
895            }
896            let pubkey = self.parse_single_signer_hex()?;
897            Ok(AlterOperation::RevokeSigner { pubkey })
898        } else if self.consume(&Token::Drop)? {
899            if self.consume_ident_ci("SUBSCRIPTION")? {
900                // DROP SUBSCRIPTION name
901                let sub_name = self.expect_ident()?;
902                Ok(AlterOperation::DropSubscription { name: sub_name })
903            } else {
904                // DROP COLUMN name (COLUMN keyword is optional)
905                let _ = self.consume(&Token::Column)?;
906                let col_name = self.expect_ident()?;
907                Ok(AlterOperation::DropColumn(col_name))
908            }
909        } else if self.consume(&Token::Rename)? {
910            // RENAME COLUMN from TO to
911            let _ = self.consume(&Token::Column)?; // COLUMN keyword is optional
912            let from = self.expect_ident()?;
913            self.expect(Token::To)?;
914            let to = self.expect_ident()?;
915            Ok(AlterOperation::RenameColumn { from, to })
916        } else if self.consume(&Token::Attach)? {
917            // ATTACH PARTITION child FOR VALUES ...
918            self.expect(Token::Partition)?;
919            let child = self.expect_ident()?;
920            self.expect(Token::For)?;
921            // Accept `VALUES` as an ident since the grammar doesn't have it
922            // as a reserved keyword everywhere. Collect the remaining tokens
923            // as a raw bound string for round-trip persistence.
924            if !self.consume_ident_ci("VALUES")? && !self.consume(&Token::Values)? {
925                return Err(ParseError::expected(
926                    vec!["VALUES"],
927                    self.peek(),
928                    self.position(),
929                ));
930            }
931            let bound = self.collect_remaining_tokens_as_string()?;
932            Ok(AlterOperation::AttachPartition { child, bound })
933        } else if self.consume(&Token::Detach)? {
934            // DETACH PARTITION child
935            self.expect(Token::Partition)?;
936            let child = self.expect_ident()?;
937            Ok(AlterOperation::DetachPartition { child })
938        } else if self.consume(&Token::Enable)? {
939            // ENABLE EVENTS | ENABLE ROW LEVEL SECURITY | ENABLE TENANCY ON (col)
940            if self.consume_ident_ci("EVENTS")? {
941                Ok(AlterOperation::EnableEvents(
942                    self.parse_subscription_descriptor(table_name.to_string())?,
943                ))
944            } else if self.consume_ident_ci("TENANCY")? {
945                self.expect(Token::On)?;
946                self.expect(Token::LParen)?;
947                // Dotted paths allowed (`metadata.tenant`, `payload.org`).
948                let mut path = self.expect_ident_or_keyword()?;
949                while self.consume(&Token::Dot)? {
950                    let next = self.expect_ident_or_keyword()?;
951                    path = format!("{path}.{next}");
952                }
953                self.expect(Token::RParen)?;
954                Ok(AlterOperation::EnableTenancy { column: path })
955            } else {
956                self.expect(Token::Row)?;
957                self.expect(Token::Level)?;
958                self.expect(Token::Security)?;
959                Ok(AlterOperation::EnableRowLevelSecurity)
960            }
961        } else if self.consume(&Token::Disable)? {
962            // DISABLE EVENTS | DISABLE ROW LEVEL SECURITY | DISABLE TENANCY
963            if self.consume_ident_ci("EVENTS")? {
964                Ok(AlterOperation::DisableEvents)
965            } else if self.consume_ident_ci("TENANCY")? {
966                Ok(AlterOperation::DisableTenancy)
967            } else {
968                self.expect(Token::Row)?;
969                self.expect(Token::Level)?;
970                self.expect(Token::Security)?;
971                Ok(AlterOperation::DisableRowLevelSecurity)
972            }
973        } else if self.consume(&Token::Set)? || self.consume_ident_ci("SET")? {
974            // SET APPEND_ONLY = true|false | SET VERSIONED = true|false
975            // SET RETENTION <duration> (issue #580)
976            if self.consume_ident_ci("APPEND_ONLY")? {
977                let on = self.parse_bool_assign()?;
978                Ok(AlterOperation::SetAppendOnly(on))
979            } else if self.consume_ident_ci("VERSIONED")? {
980                let on = self.parse_bool_assign()?;
981                Ok(AlterOperation::SetVersioned(on))
982            } else if self.consume(&Token::Retention)? {
983                // `SET RETENTION <duration>` — reuse the same float+unit
984                // grammar the timeseries CREATE clause uses so `7 DAYS`,
985                // `30 m`, `1 h`, `90 d` all parse identically.
986                let value = self.parse_float()?;
987                let unit = self.parse_duration_unit()?;
988                Ok(AlterOperation::SetRetention {
989                    duration_ms: (value * unit) as u64,
990                })
991            } else {
992                Err(ParseError::expected(
993                    vec!["APPEND_ONLY", "VERSIONED", "RETENTION"],
994                    self.peek(),
995                    self.position(),
996                ))
997            }
998        } else if self.consume_ident_ci("UNSET")? {
999            // `UNSET RETENTION` — clears the declarative retention policy.
1000            if self.consume(&Token::Retention)? {
1001                Ok(AlterOperation::UnsetRetention)
1002            } else {
1003                Err(ParseError::expected(
1004                    vec!["RETENTION"],
1005                    self.peek(),
1006                    self.position(),
1007                ))
1008            }
1009        } else {
1010            Err(ParseError::expected(
1011                vec![
1012                    "ADD", "DROP", "RENAME", "ATTACH", "DETACH", "ENABLE", "DISABLE", "SET",
1013                    "UNSET",
1014                ],
1015                self.peek(),
1016                self.position(),
1017            ))
1018        }
1019    }
1020
1021    fn parse_subscription_descriptor(
1022        &mut self,
1023        source: String,
1024    ) -> Result<SubscriptionDescriptor, ParseError> {
1025        let mut ops_filter = Vec::new();
1026        if self.consume(&Token::LParen)? {
1027            loop {
1028                let op = if self.consume(&Token::Insert)? {
1029                    SubscriptionOperation::Insert
1030                } else if self.consume(&Token::Update)? {
1031                    SubscriptionOperation::Update
1032                } else if self.consume(&Token::Delete)? {
1033                    SubscriptionOperation::Delete
1034                } else {
1035                    return Err(ParseError::expected(
1036                        vec!["INSERT", "UPDATE", "DELETE"],
1037                        self.peek(),
1038                        self.position(),
1039                    ));
1040                };
1041                ops_filter.push(op);
1042                if !self.consume(&Token::Comma)? {
1043                    break;
1044                }
1045            }
1046            self.expect(Token::RParen)?;
1047        }
1048
1049        let target_queue = if self.consume(&Token::To)? {
1050            self.expect_ident()?
1051        } else {
1052            format!("{source}_events")
1053        };
1054
1055        let mut redact_fields = Vec::new();
1056        if self.consume_ident_ci("REDACT")? {
1057            self.expect(Token::LParen)?;
1058            loop {
1059                redact_fields.push(self.parse_dotted_redact_path()?);
1060                if !self.consume(&Token::Comma)? {
1061                    break;
1062                }
1063            }
1064            self.expect(Token::RParen)?;
1065        }
1066
1067        let where_filter = if self.consume(&Token::Where)? {
1068            Some(self.collect_subscription_where_filter()?)
1069        } else {
1070            None
1071        };
1072
1073        // ON ALL TENANTS: opt-in cluster-wide subscription (requires capability check at execution)
1074        let all_tenants = if self.consume(&Token::On)? {
1075            self.expect(Token::All)?;
1076            if !self.consume_ident_ci("TENANTS")? {
1077                return Err(ParseError::expected(
1078                    vec!["TENANTS"],
1079                    self.peek(),
1080                    self.position(),
1081                ));
1082            }
1083            true
1084        } else {
1085            false
1086        };
1087
1088        // REQUIRES CAPABILITY '...' — parsed and discarded; enforcement is at execution time
1089        if self.consume_ident_ci("REQUIRES")? {
1090            self.consume_ident_ci("CAPABILITY")?;
1091            // consume the capability string literal token
1092            self.advance()?;
1093        }
1094
1095        Ok(SubscriptionDescriptor {
1096            name: String::new(),
1097            source,
1098            target_queue,
1099            ops_filter,
1100            where_filter,
1101            redact_fields,
1102            enabled: true,
1103            all_tenants,
1104        })
1105    }
1106
1107    /// Parse a dotted redact path: `field`, `obj.field`, `obj.*.field`, etc.
1108    fn parse_dotted_redact_path(&mut self) -> Result<String, ParseError> {
1109        let mut parts = Vec::new();
1110        if self.consume(&Token::Star)? {
1111            parts.push("*".to_string());
1112        } else {
1113            parts.push(self.expect_ident_or_keyword()?);
1114        }
1115        while self.consume(&Token::Dot)? {
1116            if self.consume(&Token::Star)? {
1117                parts.push("*".to_string());
1118            } else {
1119                parts.push(self.expect_ident_or_keyword()?);
1120            }
1121        }
1122        Ok(parts.join("."))
1123    }
1124
1125    fn collect_subscription_where_filter(&mut self) -> Result<String, ParseError> {
1126        let mut parts = Vec::new();
1127        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1128            parts.push(self.peek().to_string());
1129            self.advance()?;
1130        }
1131        if parts.is_empty() {
1132            return Err(ParseError::expected(
1133                vec!["predicate"],
1134                self.peek(),
1135                self.position(),
1136            ));
1137        }
1138        Ok(parts.join(" "))
1139    }
1140
1141    /// Capture remaining tokens as a display-joined string.
1142    ///
1143    /// Used by `ATTACH PARTITION ... FOR VALUES <bound>` to round-trip the
1144    /// bound clause into storage without needing a dedicated per-kind AST.
1145    fn collect_remaining_tokens_as_string(&mut self) -> Result<String, ParseError> {
1146        let mut parts: Vec<String> = Vec::new();
1147        while !self.check(&Token::Eof) && !self.check(&Token::Comma) {
1148            parts.push(self.peek().to_string());
1149            self.advance()?;
1150        }
1151        Ok(parts.join(" "))
1152    }
1153
1154    /// Parse a single column definition: name TYPE [NOT NULL] [DEFAULT=val] [COMPRESS:N] [UNIQUE] [PRIMARY KEY]
1155    fn parse_column_def(&mut self) -> Result<CreateColumnDef, ParseError> {
1156        let name = self.expect_column_ident()?;
1157        let sql_type = self.parse_column_type()?;
1158        let data_type = sql_type.to_string();
1159
1160        let mut def = CreateColumnDef {
1161            name,
1162            data_type,
1163            sql_type: sql_type.clone(),
1164            not_null: false,
1165            default: None,
1166            compress: None,
1167            unique: false,
1168            primary_key: false,
1169            enum_variants: sql_type.enum_variants().unwrap_or_default(),
1170            array_element: sql_type.array_element_type(),
1171            decimal_precision: sql_type.decimal_precision(),
1172        };
1173
1174        // Parse modifiers in any order
1175        loop {
1176            if self.match_not_null()? {
1177                def.not_null = true;
1178            } else if self.consume(&Token::Default)? {
1179                self.expect(Token::Eq)?;
1180                def.default = Some(self.parse_literal_string_for_ddl()?);
1181            } else if self.consume(&Token::Compress)? {
1182                self.expect(Token::Colon)?;
1183                def.compress = Some(self.parse_integer()? as u8);
1184            } else if self.consume(&Token::Unique)? {
1185                def.unique = true;
1186            } else if self.match_primary_key()? {
1187                def.primary_key = true;
1188            } else {
1189                break;
1190            }
1191        }
1192
1193        Ok(def)
1194    }
1195
1196    /// Parse column type: TEXT, INTEGER, EMAIL, ENUM('a','b','c'), ARRAY(TEXT), DECIMAL(2)
1197    fn parse_column_type(&mut self) -> Result<SqlTypeName, ParseError> {
1198        let type_name = self.expect_ident_or_keyword()?;
1199        if self.consume(&Token::LParen)? {
1200            let inner = self.parse_type_params()?;
1201            self.expect(Token::RParen)?;
1202            Ok(SqlTypeName::new(type_name).with_modifiers(inner))
1203        } else {
1204            Ok(SqlTypeName::new(type_name))
1205        }
1206    }
1207
1208    /// Parse type parameters inside parentheses: 'a','b' or TEXT or 2
1209    fn parse_type_params(&mut self) -> Result<Vec<TypeModifier>, ParseError> {
1210        let mut parts = Vec::new();
1211        loop {
1212            match self.peek().clone() {
1213                Token::String(s) => {
1214                    let s = s.clone();
1215                    self.advance()?;
1216                    parts.push(TypeModifier::StringLiteral(s));
1217                }
1218                Token::Integer(n) => {
1219                    self.advance()?;
1220                    parts.push(TypeModifier::Number(n as u32));
1221                }
1222                _ => {
1223                    parts.push(TypeModifier::Type(Box::new(self.parse_column_type()?)));
1224                }
1225            }
1226            if !self.consume(&Token::Comma)? {
1227                break;
1228            }
1229        }
1230        Ok(parts)
1231    }
1232
1233    /// Parse a literal string value for DDL DEFAULT expressions
1234    fn parse_literal_string_for_ddl(&mut self) -> Result<String, ParseError> {
1235        match self.peek().clone() {
1236            Token::String(s) => {
1237                let s = s.clone();
1238                self.advance()?;
1239                Ok(s)
1240            }
1241            Token::Integer(n) => {
1242                self.advance()?;
1243                Ok(n.to_string())
1244            }
1245            Token::Float(n) => {
1246                self.advance()?;
1247                Ok(n.to_string())
1248            }
1249            Token::True => {
1250                self.advance()?;
1251                Ok("true".to_string())
1252            }
1253            Token::False => {
1254                self.advance()?;
1255                Ok("false".to_string())
1256            }
1257            Token::Null => {
1258                self.advance()?;
1259                Ok("null".to_string())
1260            }
1261            ref other => Err(ParseError::expected(
1262                vec!["string", "number", "true", "false", "null"],
1263                other,
1264                self.position(),
1265            )),
1266        }
1267    }
1268
1269    fn check_ttl_keyword(&self) -> bool {
1270        matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ttl"))
1271    }
1272
1273    /// Parse `= true` / `= false` after a `WITH <option>` keyword.
1274    /// Used for boolean table options like `WITH TIMESTAMPS = true`.
1275    fn parse_bool_assign(&mut self) -> Result<bool, ParseError> {
1276        self.expect(Token::Eq)?;
1277        match self.peek() {
1278            Token::True => {
1279                self.advance()?;
1280                Ok(true)
1281            }
1282            Token::False => {
1283                self.advance()?;
1284                Ok(false)
1285            }
1286            other => Err(ParseError::expected(
1287                vec!["true", "false"],
1288                other,
1289                self.position(),
1290            )),
1291        }
1292    }
1293
1294    /// Parse a parenthesised list of string literals: `('a', 'b', ...)`.
1295    /// Used for the `fields` / `outputs` AI-policy options. Requires at
1296    /// least one entry.
1297    fn parse_ai_string_list(&mut self) -> Result<Vec<String>, ParseError> {
1298        self.expect(Token::LParen)?;
1299        let mut out = Vec::new();
1300        loop {
1301            out.push(self.parse_string()?);
1302            if !self.consume(&Token::Comma)? {
1303                break;
1304            }
1305        }
1306        self.expect(Token::RParen)?;
1307        Ok(out)
1308    }
1309
1310    /// Parse a bare `true` / `false` token (after the `=` is consumed).
1311    fn parse_ai_bool(&mut self) -> Result<bool, ParseError> {
1312        match self.peek() {
1313            Token::True => {
1314                self.advance()?;
1315                Ok(true)
1316            }
1317            Token::False => {
1318                self.advance()?;
1319                Ok(false)
1320            }
1321            other => Err(ParseError::expected(
1322                vec!["true", "false"],
1323                other,
1324                self.position(),
1325            )),
1326        }
1327    }
1328
1329    /// Parse a single enum-ish word — either a quoted string or a bare
1330    /// identifier/keyword. Used for `degraded` / `on_reject` values.
1331    fn parse_ai_word(&mut self) -> Result<String, ParseError> {
1332        if matches!(self.peek(), Token::String(_)) {
1333            self.parse_string()
1334        } else {
1335            self.expect_ident_or_keyword()
1336        }
1337    }
1338
1339    /// `EMBED (fields = (...), provider = '..', model = '..')`.
1340    fn parse_ai_embed_policy(&mut self) -> Result<reddb_types::catalog::EmbedPolicy, ParseError> {
1341        self.expect(Token::LParen)?;
1342        let mut fields: Vec<String> = Vec::new();
1343        let mut provider: Option<String> = None;
1344        let mut model: Option<String> = None;
1345        loop {
1346            let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1347            self.expect(Token::Eq)?;
1348            match key.as_str() {
1349                "fields" => fields = self.parse_ai_string_list()?,
1350                "provider" => provider = Some(self.parse_string()?),
1351                "model" => model = Some(self.parse_string()?),
1352                other => {
1353                    return Err(ParseError::new(
1354                        format!(
1355                            "unsupported EMBED policy option {other:?}; supported: fields, provider, model"
1356                        ),
1357                        self.position(),
1358                    ));
1359                }
1360            }
1361            if !self.consume(&Token::Comma)? {
1362                break;
1363            }
1364        }
1365        self.expect(Token::RParen)?;
1366        if fields.is_empty() {
1367            return Err(ParseError::new(
1368                "EMBED policy requires fields = ('<col>', ...)".to_string(),
1369                self.position(),
1370            ));
1371        }
1372        let provider = provider.ok_or_else(|| {
1373            ParseError::new(
1374                "EMBED policy requires provider = '<token>'".to_string(),
1375                self.position(),
1376            )
1377        })?;
1378        let model = model.ok_or_else(|| {
1379            ParseError::new(
1380                "EMBED policy requires model = '<name>'".to_string(),
1381                self.position(),
1382            )
1383        })?;
1384        Ok(reddb_types::catalog::EmbedPolicy {
1385            fields,
1386            provider,
1387            model,
1388        })
1389    }
1390
1391    /// `MODERATE (fields = (...), provider, model, sync, degraded, on_reject)`.
1392    fn parse_ai_moderate_policy(
1393        &mut self,
1394    ) -> Result<reddb_types::catalog::ModeratePolicy, ParseError> {
1395        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1396        self.expect(Token::LParen)?;
1397        let mut fields: Vec<String> = Vec::new();
1398        let mut provider: Option<String> = None;
1399        let mut model: Option<String> = None;
1400        let mut sync_gate = false;
1401        let mut degraded_mode = ModerateDegradedMode::default();
1402        let mut reject_action = ModerateRejectAction::default();
1403        let mut hard_delete_on_reject = false;
1404        loop {
1405            let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1406            self.expect(Token::Eq)?;
1407            match key.as_str() {
1408                "fields" => fields = self.parse_ai_string_list()?,
1409                "provider" => provider = Some(self.parse_string()?),
1410                "model" => model = Some(self.parse_string()?),
1411                "sync" | "sync_gate" => sync_gate = self.parse_ai_bool()?,
1412                "hard_delete" | "hard_delete_on_reject" => {
1413                    hard_delete_on_reject = self.parse_ai_bool()?
1414                }
1415                "degraded" | "degraded_mode" => {
1416                    let word = self.parse_ai_word()?;
1417                    degraded_mode = ModerateDegradedMode::from_str(&word).ok_or_else(|| {
1418                        ParseError::new(
1419                            format!(
1420                                "unsupported MODERATE degraded mode {word:?}; supported: open, closed"
1421                            ),
1422                            self.position(),
1423                        )
1424                    })?;
1425                }
1426                "on_reject" | "reject_action" => {
1427                    let word = self.parse_ai_word()?;
1428                    reject_action = ModerateRejectAction::from_str(&word).ok_or_else(|| {
1429                        ParseError::new(
1430                            format!(
1431                                "unsupported MODERATE reject action {word:?}; supported: reject, flag, redact"
1432                            ),
1433                            self.position(),
1434                        )
1435                    })?;
1436                }
1437                other => {
1438                    return Err(ParseError::new(
1439                        format!(
1440                            "unsupported MODERATE policy option {other:?}; supported: fields, provider, model, sync, degraded, on_reject, hard_delete"
1441                        ),
1442                        self.position(),
1443                    ));
1444                }
1445            }
1446            if !self.consume(&Token::Comma)? {
1447                break;
1448            }
1449        }
1450        self.expect(Token::RParen)?;
1451        if fields.is_empty() {
1452            return Err(ParseError::new(
1453                "MODERATE policy requires fields = ('<col>', ...)".to_string(),
1454                self.position(),
1455            ));
1456        }
1457        let provider = provider.ok_or_else(|| {
1458            ParseError::new(
1459                "MODERATE policy requires provider = '<token>'".to_string(),
1460                self.position(),
1461            )
1462        })?;
1463        let model = model.ok_or_else(|| {
1464            ParseError::new(
1465                "MODERATE policy requires model = '<name>'".to_string(),
1466                self.position(),
1467            )
1468        })?;
1469        Ok(reddb_types::catalog::ModeratePolicy {
1470            fields,
1471            provider,
1472            model,
1473            sync_gate,
1474            degraded_mode,
1475            reject_action,
1476            hard_delete_on_reject,
1477        })
1478    }
1479
1480    /// `VISION (image_field = '..', outputs = (...), provider, model)`.
1481    fn parse_ai_vision_policy(&mut self) -> Result<reddb_types::catalog::VisionPolicy, ParseError> {
1482        self.expect(Token::LParen)?;
1483        let mut image_field: Option<String> = None;
1484        let mut output_kinds: Vec<String> = Vec::new();
1485        let mut provider: Option<String> = None;
1486        let mut model: Option<String> = None;
1487        loop {
1488            let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
1489            self.expect(Token::Eq)?;
1490            match key.as_str() {
1491                "image_field" => image_field = Some(self.parse_string()?),
1492                "outputs" | "output_kinds" => output_kinds = self.parse_ai_string_list()?,
1493                "provider" => provider = Some(self.parse_string()?),
1494                "model" => model = Some(self.parse_string()?),
1495                other => {
1496                    return Err(ParseError::new(
1497                        format!(
1498                            "unsupported VISION policy option {other:?}; supported: image_field, outputs, provider, model"
1499                        ),
1500                        self.position(),
1501                    ));
1502                }
1503            }
1504            if !self.consume(&Token::Comma)? {
1505                break;
1506            }
1507        }
1508        self.expect(Token::RParen)?;
1509        let image_field = image_field.ok_or_else(|| {
1510            ParseError::new(
1511                "VISION policy requires image_field = '<col>'".to_string(),
1512                self.position(),
1513            )
1514        })?;
1515        if output_kinds.is_empty() {
1516            return Err(ParseError::new(
1517                "VISION policy requires outputs = ('<kind>', ...)".to_string(),
1518                self.position(),
1519            ));
1520        }
1521        let provider = provider.ok_or_else(|| {
1522            ParseError::new(
1523                "VISION policy requires provider = '<token>'".to_string(),
1524                self.position(),
1525            )
1526        })?;
1527        let model = model.ok_or_else(|| {
1528            ParseError::new(
1529                "VISION policy requires model = '<name>'".to_string(),
1530                self.position(),
1531            )
1532        })?;
1533        Ok(reddb_types::catalog::VisionPolicy {
1534            image_field,
1535            output_kinds,
1536            provider,
1537            model,
1538        })
1539    }
1540
1541    fn expect_ident_ci_ddl(&mut self, expected: &str) -> Result<(), ParseError> {
1542        if self.consume_ident_ci(expected)? {
1543            Ok(())
1544        } else {
1545            Err(ParseError::expected(
1546                vec![expected],
1547                self.peek(),
1548                self.position(),
1549            ))
1550        }
1551    }
1552
1553    fn parse_create_table_ttl_clause(&mut self) -> Result<Option<u64>, ParseError> {
1554        let option_name = self.expect_ident_or_keyword()?;
1555        if !option_name.eq_ignore_ascii_case("ttl") {
1556            return Err(ParseError::new(
1557                // F-05: `option_name` is caller-controlled identifier text.
1558                // Render via `{:?}` so embedded CR/LF/NUL/quotes are escaped
1559                // before the message reaches downstream serialization sinks.
1560                format!(
1561                    "unsupported CREATE TABLE option {option_name:?}; supported options: TTL <duration> [ms|s|m|h|d] (e.g. `WITH TTL 30 m`)"
1562                ),
1563                self.position(),
1564            ));
1565        }
1566
1567        let ttl_value = self.parse_float()?;
1568        let ttl_unit = match self.peek() {
1569            Token::Ident(unit) => {
1570                let unit = unit.clone();
1571                self.advance()?;
1572                unit
1573            }
1574            _ => "s".to_string(),
1575        };
1576
1577        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
1578            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
1579            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
1580            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
1581            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
1582            "d" | "day" | "days" => 86_400_000.0,
1583            other => {
1584                return Err(ParseError::new(
1585                    // F-05: render `other` via `{:?}` so caller-controlled
1586                    // bytes (CR / LF / NUL / quotes) are escaped before
1587                    // reaching downstream serialization sinks.
1588                    format!(
1589                        "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
1590                    ),
1591                    self.position(),
1592                ));
1593            }
1594        };
1595
1596        if !ttl_value.is_finite() || ttl_value < 0.0 {
1597            return Err(ParseError::new(
1598                "TTL must be a finite, non-negative duration".to_string(),
1599                self.position(),
1600            ));
1601        }
1602
1603        let ttl_ms = ttl_value * multiplier_ms;
1604        if ttl_ms > u64::MAX as f64 {
1605            return Err(ParseError::new(
1606                "TTL duration is too large".to_string(),
1607                self.position(),
1608            ));
1609        }
1610        if ttl_ms.fract().abs() >= f64::EPSILON {
1611            return Err(ParseError::new(
1612                "TTL duration must resolve to a whole number of milliseconds".to_string(),
1613                self.position(),
1614            ));
1615        }
1616
1617        Ok(Some(ttl_ms as u64))
1618    }
1619
1620    /// Try to match IF NOT EXISTS sequence
1621    pub(crate) fn match_if_not_exists(&mut self) -> Result<bool, ParseError> {
1622        if self.check(&Token::If) {
1623            self.advance()?;
1624            self.expect(Token::Not)?;
1625            self.expect(Token::Exists)?;
1626            Ok(true)
1627        } else {
1628            Ok(false)
1629        }
1630    }
1631
1632    /// Try to match IF EXISTS sequence
1633    pub(crate) fn match_if_exists(&mut self) -> Result<bool, ParseError> {
1634        if self.check(&Token::If) {
1635            self.advance()?;
1636            self.expect(Token::Exists)?;
1637            Ok(true)
1638        } else {
1639            Ok(false)
1640        }
1641    }
1642
1643    /// Try to match NOT NULL sequence
1644    fn match_not_null(&mut self) -> Result<bool, ParseError> {
1645        if self.check(&Token::Not) {
1646            // Peek ahead - only consume if followed by NULL
1647            // We need to be careful: save state and try
1648            self.advance()?; // consume NOT
1649            if self.check(&Token::Null) {
1650                self.advance()?; // consume NULL
1651                Ok(true)
1652            } else {
1653                // This is tricky - NOT was consumed but next isn't NULL.
1654                // In column modifier context, NOT should only appear before NULL.
1655                // Return error for clarity.
1656                Err(ParseError::expected(
1657                    vec!["NULL (after NOT)"],
1658                    self.peek(),
1659                    self.position(),
1660                ))
1661            }
1662        } else {
1663            Ok(false)
1664        }
1665    }
1666
1667    /// Try to match PRIMARY KEY sequence
1668    fn match_primary_key(&mut self) -> Result<bool, ParseError> {
1669        if self.check(&Token::Primary) {
1670            self.advance()?;
1671            self.expect(Token::Key)?;
1672            Ok(true)
1673        } else {
1674            Ok(false)
1675        }
1676    }
1677}
1678
/// Decode a 64-character hex string (either case) into a 32-byte array.
///
/// Used by `SIGNED_BY` clause parsing (issue #520) to surface bad pubkeys
/// at parse-time rather than downstream in the engine.
///
/// # Errors
///
/// Returns a human-readable message when the input is not exactly 64
/// characters long, or when it contains a character that is not an ASCII hex
/// digit. Both checks operate on characters rather than raw bytes so the
/// reported length and offending character stay accurate for non-ASCII input.
fn decode_hex_32(s: &str) -> Result<[u8; 32], String> {
    let char_count = s.chars().count();
    if char_count != 64 {
        return Err(format!("expected 64 hex chars, got {char_count}"));
    }
    // Report the real offending character; inspecting individual UTF-8 bytes
    // would print a meaningless fragment of a multi-byte character.
    if let Some(bad) = s.chars().find(|c| !c.is_ascii_hexdigit()) {
        return Err(format!("non-hex char: {bad:?}"));
    }

    // Past this point the input is exactly 64 ASCII bytes, i.e. 32 pairs.
    let mut out = [0u8; 32];
    for (slot, pair) in out.iter_mut().zip(s.as_bytes().chunks_exact(2)) {
        *slot = (hex_nibble(pair[0])? << 4) | hex_nibble(pair[1])?;
    }
    Ok(out)
}

/// Convert one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its 4-bit value.
///
/// # Errors
///
/// Returns a message naming the byte (rendered as a `char`) when it is not a
/// hex digit.
fn hex_nibble(c: u8) -> Result<u8, String> {
    match c {
        b'0'..=b'9' => Ok(c - b'0'),
        b'a'..=b'f' => Ok(c - b'a' + 10),
        b'A'..=b'F' => Ok(c - b'A' + 10),
        _ => Err(format!("non-hex char: {:?}", c as char)),
    }
}
1705
1706#[cfg(test)]
1707mod tests {
1708    use super::*;
1709    use reddb_types::catalog::{AnalyticsOutput, CollectionModel, SubscriptionOperation};
1710
1711    fn parser(input: &str) -> Parser<'_> {
1712        Parser::new(input).unwrap_or_else(|err| panic!("failed to lex {input:?}: {err:?}"))
1713    }
1714
1715    #[test]
1716    fn parse_create_table_body_parenthesized_options_and_trailing_clauses() {
1717        let QueryExpr::CreateTable(table) = parser(
1718            "IF NOT EXISTS events (id INT, tenant_meta TEXT) \
1719             WITH (tenant_by = 'tenant_id', append_only = true, timestamps = false) \
1720             PARTITION BY HASH (id) TENANT BY (tenant_meta.tenant)",
1721        )
1722        .parse_create_table_body()
1723        .expect("create table body") else {
1724            panic!("Expected CreateTableQuery");
1725        };
1726
1727        assert_eq!(table.name, "events");
1728        assert!(table.if_not_exists);
1729        assert!(table.append_only);
1730        assert!(!table.timestamps);
1731        assert_eq!(table.tenant_by.as_deref(), Some("tenant_id"));
1732        assert_eq!(
1733            table
1734                .partition_by
1735                .as_ref()
1736                .map(|spec| (spec.kind, spec.column.as_str())),
1737            Some((PartitionKind::Hash, "id"))
1738        );
1739
1740        let err = parser("bad (id INT) WITH (tenant_by = 42)")
1741            .parse_create_table_body()
1742            .unwrap_err();
1743        assert!(
1744            err.to_string()
1745                .contains("WITH tenant_by expects a text literal"),
1746            "{err}"
1747        );
1748    }
1749
1750    #[test]
1751    fn parse_create_table_ai_policy_round_trips_all_modalities() {
1752        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1753        let QueryExpr::CreateTable(table) = parser(
1754            "posts (id INT, title TEXT, body TEXT, photo TEXT) WITH ( \
1755               EMBED (fields = ('title', 'body'), provider = 'openai', model = 'text-embedding-3-small'), \
1756               MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest', sync = true, degraded = closed, on_reject = flag, hard_delete = true), \
1757               VISION (image_field = 'photo', outputs = ('caption', 'tags'), provider = 'openai', model = 'gpt-4o') \
1758             )",
1759        )
1760        .parse_create_table_body()
1761        .expect("create table body with ai policy") else {
1762            panic!("Expected CreateTableQuery");
1763        };
1764
1765        let policy = table.ai_policy.expect("ai policy present");
1766
1767        let embed = policy.embed.expect("embed block");
1768        assert_eq!(embed.fields, vec!["title".to_string(), "body".to_string()]);
1769        assert_eq!(embed.provider, "openai");
1770        assert_eq!(embed.model, "text-embedding-3-small");
1771
1772        let moderate = policy.moderate.expect("moderate block");
1773        assert_eq!(moderate.fields, vec!["body".to_string()]);
1774        assert!(moderate.sync_gate);
1775        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Closed);
1776        assert_eq!(moderate.reject_action, ModerateRejectAction::Flag);
1777        assert!(moderate.hard_delete_on_reject);
1778
1779        let vision = policy.vision.expect("vision block");
1780        assert_eq!(vision.image_field, "photo");
1781        assert_eq!(
1782            vision.output_kinds,
1783            vec!["caption".to_string(), "tags".to_string()]
1784        );
1785        assert_eq!(vision.model, "gpt-4o");
1786    }
1787
1788    #[test]
1789    fn parse_moderate_policy_aliases_and_error_branches() {
1790        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1791        // Alias spellings hit the same match arms as the short forms.
1792        let QueryExpr::CreateTable(table) = parser(
1793            "t (id INT, body TEXT) WITH ( \
1794               MODERATE (fields = ('body'), provider = 'openai', model = 'm', \
1795                 sync_gate = true, degraded_mode = open, reject_action = redact, \
1796                 hard_delete_on_reject = true) \
1797             )",
1798        )
1799        .parse_create_table_body()
1800        .expect("alias spellings parse") else {
1801            panic!("Expected CreateTableQuery");
1802        };
1803        let moderate = table
1804            .ai_policy
1805            .expect("policy")
1806            .moderate
1807            .expect("moderate block");
1808        assert!(moderate.sync_gate);
1809        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
1810        assert_eq!(moderate.reject_action, ModerateRejectAction::Redact);
1811        assert!(moderate.hard_delete_on_reject);
1812
1813        // Each invalid MODERATE option surfaces a descriptive parse error.
1814        for (sql, needle) in [
1815            (
1816                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
1817                "unsupported MODERATE policy option",
1818            ),
1819            (
1820                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', degraded = sideways))",
1821                "unsupported MODERATE degraded mode",
1822            ),
1823            (
1824                "t (id INT, body TEXT) WITH (MODERATE (fields = ('body'), provider = 'o', model = 'm', on_reject = explode))",
1825                "unsupported MODERATE reject action",
1826            ),
1827            (
1828                "t (id INT, body TEXT) WITH (MODERATE (provider = 'o', model = 'm'))",
1829                "MODERATE policy requires fields",
1830            ),
1831        ] {
1832            let err = parser(sql)
1833                .parse_create_table_body()
1834                .expect_err("moderate policy error");
1835            assert!(format!("{err}").contains(needle), "got: {err}");
1836        }
1837    }
1838
1839    #[test]
1840    fn parse_embed_and_vision_policy_error_branches() {
1841        // EMBED + VISION unknown-option and missing-required-field arms each
1842        // surface a descriptive parse error.
1843        for (sql, needle) in [
1844            (
1845                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o', model = 'm', bogus = 1))",
1846                "unsupported EMBED policy option",
1847            ),
1848            (
1849                "t (id INT, body TEXT) WITH (EMBED (provider = 'o', model = 'm'))",
1850                "EMBED policy requires fields",
1851            ),
1852            (
1853                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), model = 'm'))",
1854                "EMBED policy requires provider",
1855            ),
1856            (
1857                "t (id INT, body TEXT) WITH (EMBED (fields = ('body'), provider = 'o'))",
1858                "EMBED policy requires model",
1859            ),
1860            (
1861                "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', provider = 'o', model = 'm', bogus = 1))",
1862                "unsupported VISION policy option",
1863            ),
1864            (
1865                "t (id INT, photo TEXT) WITH (VISION (provider = 'o', model = 'm'))",
1866                "VISION policy requires image_field",
1867            ),
1868        ] {
1869            let err = parser(sql)
1870                .parse_create_table_body()
1871                .expect_err("ai policy error");
1872            assert!(format!("{err}").contains(needle), "got: {err}");
1873        }
1874
1875        // The `output_kinds` alias for VISION parses like `outputs`.
1876        let QueryExpr::CreateTable(table) = parser(
1877            "t (id INT, photo TEXT) WITH (VISION (image_field = 'photo', \
1878               output_kinds = ('caption'), provider = 'o', model = 'm'))",
1879        )
1880        .parse_create_table_body()
1881        .expect("vision output_kinds alias") else {
1882            panic!("Expected CreateTableQuery");
1883        };
1884        let vision = table
1885            .ai_policy
1886            .expect("policy")
1887            .vision
1888            .expect("vision block");
1889        assert_eq!(vision.output_kinds, vec!["caption".to_string()]);
1890    }
1891
1892    #[test]
1893    fn parse_create_table_ai_policy_defaults_and_no_clause() {
1894        use reddb_types::catalog::{ModerateDegradedMode, ModerateRejectAction};
1895        // MODERATE with no sync/degraded/on_reject falls back to defaults.
1896        let QueryExpr::CreateTable(table) = parser(
1897            "msgs (id INT, body TEXT) WITH ( \
1898               MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest') \
1899             )",
1900        )
1901        .parse_create_table_body()
1902        .expect("create table body") else {
1903            panic!("Expected CreateTableQuery");
1904        };
1905        let moderate = table
1906            .ai_policy
1907            .expect("policy")
1908            .moderate
1909            .expect("moderate block");
1910        assert!(!moderate.sync_gate);
1911        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Open);
1912        assert_eq!(moderate.reject_action, ModerateRejectAction::Reject);
1913        assert!(!moderate.hard_delete_on_reject);
1914
1915        // No AI clause leaves the policy entirely absent.
1916        let QueryExpr::CreateTable(plain) = parser("plain (id INT)")
1917            .parse_create_table_body()
1918            .expect("create table body")
1919        else {
1920            panic!("Expected CreateTableQuery");
1921        };
1922        assert!(plain.ai_policy.is_none());
1923    }
1924
1925    #[test]
1926    fn parse_create_table_ai_policy_rejects_malformed_clauses() {
1927        // Missing provider.
1928        let err = parser("t (id INT) WITH (EMBED (fields = ('body'), model = 'm'))")
1929            .parse_create_table_body()
1930            .unwrap_err();
1931        assert!(
1932            err.to_string().contains("EMBED policy requires provider"),
1933            "{err}"
1934        );
1935
1936        // Unknown option key inside a clause.
1937        let err = parser(
1938            "t (id INT) WITH (VISION (image_field = 'p', outputs = ('caption'), provider = 'openai', model = 'm', bogus = 1))",
1939        )
1940        .parse_create_table_body()
1941        .unwrap_err();
1942        assert!(
1943            err.to_string().contains("unsupported VISION policy option"),
1944            "{err}"
1945        );
1946
1947        // Invalid moderation degraded mode.
1948        let err = parser(
1949            "t (id INT) WITH (MODERATE (fields = ('body'), provider = 'openai', model = 'm', degraded = maybe))",
1950        )
1951        .parse_create_table_body()
1952        .unwrap_err();
1953        assert!(
1954            err.to_string()
1955                .contains("unsupported MODERATE degraded mode"),
1956            "{err}"
1957        );
1958
1959        // Duplicate EMBED clause.
1960        let err = parser(
1961            "t (id INT) WITH (EMBED (fields = ('a'), provider = 'openai', model = 'm'), EMBED (fields = ('b'), provider = 'openai', model = 'm'))",
1962        )
1963        .parse_create_table_body()
1964        .unwrap_err();
1965        assert!(err.to_string().contains("duplicate EMBED clause"), "{err}");
1966    }
1967
1968    #[test]
1969    fn parse_keyed_bodies_cover_vault_analytics_and_dotted_drop_names() {
1970        let QueryExpr::CreateTable(vault) =
1971            parser("IF NOT EXISTS tenant.secrets WITH OWN MASTER KEY")
1972                .parse_create_keyed_body(CollectionModel::Vault)
1973                .expect("create vault")
1974        else {
1975            panic!("Expected CreateTableQuery");
1976        };
1977        assert_eq!(vault.collection_model, CollectionModel::Vault);
1978        assert_eq!(vault.name, "tenant.secrets");
1979        assert!(vault.if_not_exists);
1980        assert!(vault.vault_own_master_key);
1981
1982        let QueryExpr::CreateTable(graph) = parser(
1983            "g WITH ANALYTICS (centrality (using = pagerank, max_iterations = 12, tolerance = 0.001))",
1984        )
1985        .parse_create_keyed_body(CollectionModel::Graph)
1986        .expect("create graph")
1987        else {
1988            panic!("Expected CreateTableQuery");
1989        };
1990        assert_eq!(graph.analytics_config.len(), 1);
1991        let view = &graph.analytics_config[0];
1992        assert_eq!(view.output, AnalyticsOutput::Centrality);
1993        assert_eq!(view.algorithm.as_deref(), Some("pagerank"));
1994        assert_eq!(view.max_iterations, Some(12));
1995        assert_eq!(view.tolerance, Some(0.001));
1996
1997        let err = parser("g WITH OTHER")
1998            .parse_create_keyed_body(CollectionModel::Graph)
1999            .unwrap_err();
2000        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");
2001
2002        assert!(parser("CREATE KV cache WITH ANALYTICS (components)")
2003            .parse()
2004            .unwrap_err()
2005            .to_string()
2006            .contains("Unexpected token after query"));
2007
2008        let QueryExpr::DropKv(drop) = parser("IF EXISTS tenant.cache.*")
2009            .parse_drop_keyed_body(CollectionModel::Kv)
2010            .expect("drop kv")
2011        else {
2012            panic!("Expected DropKvQuery");
2013        };
2014        assert_eq!(drop.name, "tenant.cache.*");
2015        assert!(drop.if_exists);
2016        assert_eq!(drop.model, CollectionModel::Kv);
2017    }
2018
2019    #[test]
2020    fn parse_collection_signed_by_list_and_errors() {
2021        let pk_a = "aa".repeat(32);
2022        let pk_b = "BB".repeat(32);
2023        let QueryExpr::CreateCollection(collection) =
2024            parser(&format!("signed KIND graph SIGNED_BY ('{pk_a}', '{pk_b}')"))
2025                .parse_create_collection_body()
2026                .expect("create collection")
2027        else {
2028            panic!("Expected CreateCollectionQuery");
2029        };
2030        assert_eq!(collection.allowed_signers, vec![[0xaau8; 32], [0xBBu8; 32]]);
2031
2032        let err = parser("signed KIND graph SIGNED_BY (42)")
2033            .parse_create_collection_body()
2034            .unwrap_err();
2035        assert!(
2036            err.to_string()
2037                .contains("string literal (ed25519 pubkey hex)"),
2038            "{err}"
2039        );
2040
2041        let err = parser("signed KIND graph SIGNED_BY ('deadbeef')")
2042            .parse_create_collection_body()
2043            .unwrap_err();
2044        assert!(err.to_string().contains("expected 64 hex chars"), "{err}");
2045    }
2046
2047    #[test]
2048    fn parse_alter_operations_cover_subscriptions_partitions_tenancy_and_signers() {
2049        let pk = "11".repeat(32);
2050        let QueryExpr::AlterTable(alter) = parser(&format!(
2051            "ALTER COLLECTION audit \
2052             ADD SUBSCRIPTION pii TO audit_events REDACT (payload.ssn, *.secret) WHERE level = 'warn', \
2053             DROP SUBSCRIPTION pii, \
2054             ADD SIGNER '{pk}', \
2055             REVOKE SIGNER '{pk}', \
2056             ATTACH PARTITION audit_2026 FOR VALUES FROM (2026) TO (2027), \
2057             DETACH PARTITION audit_2026, \
2058             ENABLE EVENTS (INSERT, UPDATE) TO table_events ON ALL TENANTS, \
2059             DISABLE EVENTS, \
2060             ENABLE TENANCY ON (metadata.tenant), \
2061             DISABLE TENANCY, \
2062             SET APPEND_ONLY = true, \
2063             SET VERSIONED = false, \
2064             SET RETENTION 2 h, \
2065             UNSET RETENTION"
2066        ))
2067        .parse_alter_table_query()
2068        .expect("alter collection")
2069        else {
2070            panic!("Expected AlterTableQuery");
2071        };
2072
2073        assert_eq!(alter.name, "audit");
2074        assert_eq!(alter.operations.len(), 14);
2075        match &alter.operations[0] {
2076            AlterOperation::AddSubscription { name, descriptor } => {
2077                assert_eq!(name, "pii");
2078                assert_eq!(descriptor.target_queue, "audit_events");
2079                assert_eq!(descriptor.redact_fields, vec!["payload.ssn", "*.secret"]);
2080                assert_eq!(descriptor.where_filter.as_deref(), Some("LEVEL = 'warn'"));
2081            }
2082            other => panic!("expected AddSubscription, got {other:?}"),
2083        }
2084        assert!(matches!(
2085            &alter.operations[1],
2086            AlterOperation::DropSubscription { name } if name == "pii"
2087        ));
2088        assert!(matches!(
2089            &alter.operations[2],
2090            AlterOperation::AddSigner { pubkey } if *pubkey == [0x11; 32]
2091        ));
2092        assert!(matches!(
2093            &alter.operations[3],
2094            AlterOperation::RevokeSigner { pubkey } if *pubkey == [0x11; 32]
2095        ));
2096        assert!(matches!(
2097            &alter.operations[4],
2098            AlterOperation::AttachPartition { child, bound }
2099                if child == "audit_2026" && bound == "FROM ( 2026 ) TO ( 2027 )"
2100        ));
2101        assert!(matches!(
2102            &alter.operations[5],
2103            AlterOperation::DetachPartition { child } if child == "audit_2026"
2104        ));
2105        match &alter.operations[6] {
2106            AlterOperation::EnableEvents(descriptor) => {
2107                assert_eq!(
2108                    descriptor.ops_filter,
2109                    vec![SubscriptionOperation::Insert, SubscriptionOperation::Update]
2110                );
2111                assert_eq!(descriptor.target_queue, "table_events");
2112                assert!(descriptor.all_tenants);
2113            }
2114            other => panic!("expected EnableEvents, got {other:?}"),
2115        }
2116        assert!(matches!(
2117            &alter.operations[7],
2118            AlterOperation::DisableEvents
2119        ));
2120        assert!(matches!(
2121            &alter.operations[8],
2122            AlterOperation::EnableTenancy { column } if column == "METADATA.tenant"
2123        ));
2124        assert!(matches!(
2125            &alter.operations[9],
2126            AlterOperation::DisableTenancy
2127        ));
2128        assert!(matches!(
2129            &alter.operations[10],
2130            AlterOperation::SetAppendOnly(true)
2131        ));
2132        assert!(matches!(
2133            &alter.operations[11],
2134            AlterOperation::SetVersioned(false)
2135        ));
2136        assert!(matches!(
2137            &alter.operations[12],
2138            AlterOperation::SetRetention { duration_ms } if *duration_ms == 7_200_000
2139        ));
2140        assert!(matches!(
2141            &alter.operations[13],
2142            AlterOperation::UnsetRetention
2143        ));
2144    }
2145
2146    #[test]
2147    fn parse_alter_graph_analytics_keyword_errors() {
2148        let err = parser("ALTER GRAPH g ADD centrality")
2149            .parse_alter_graph_query()
2150            .unwrap_err();
2151        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");
2152
2153        let err = parser("ALTER GRAPH g DROP centrality")
2154            .parse_alter_graph_query()
2155            .unwrap_err();
2156        assert!(err.to_string().contains("expected: ANALYTICS"), "{err}");
2157    }
2158
2159    #[test]
2160    fn decode_hex_32_reports_length_and_character_errors() {
2161        assert_eq!(decode_hex_32(&"0f".repeat(32)).unwrap(), [0x0f; 32]);
2162        assert_eq!(
2163            decode_hex_32("deadbeef").unwrap_err(),
2164            "expected 64 hex chars, got 8"
2165        );
2166        assert!(decode_hex_32(&"gg".repeat(32))
2167            .unwrap_err()
2168            .contains("non-hex char"));
2169    }
2170}