Skip to main content

laminar_sql/parser/
lookup_table.rs

1//! Parser for CREATE/DROP LOOKUP TABLE DDL statements.
2//!
3//! Lookup tables are dimension/reference tables used in enrichment joins.
4//! They can be backed by external connectors (PostgreSQL CDC, Redis, etc.)
5//! with configurable caching and predicate pushdown strategies.
6
7use std::collections::HashMap;
8
9use sqlparser::ast::{ColumnDef, ObjectName};
10use sqlparser::keywords::Keyword;
11use sqlparser::parser::Parser;
12use sqlparser::tokenizer::Token;
13
14use super::tokenizer::{expect_custom_keyword, parse_with_options};
15use super::ParseError;
16
17/// CREATE LOOKUP TABLE statement.
18#[derive(Debug, Clone, PartialEq)]
19pub struct CreateLookupTableStatement {
20    /// Table name.
21    pub name: ObjectName,
22    /// Column definitions.
23    pub columns: Vec<ColumnDef>,
24    /// Primary key column names.
25    pub primary_key: Vec<String>,
26    /// WITH clause options.
27    pub with_options: HashMap<String, String>,
28    /// Whether OR REPLACE was specified.
29    pub or_replace: bool,
30    /// Whether IF NOT EXISTS was specified.
31    pub if_not_exists: bool,
32}
33
34/// Validated lookup table properties from the WITH clause.
35#[derive(Debug, Clone, PartialEq)]
36pub struct LookupTableProperties {
37    /// Connector type.
38    pub connector: ConnectorType,
39    /// Connection string.
40    pub connection: Option<String>,
41    /// Lookup strategy.
42    pub strategy: LookupStrategy,
43    /// In-memory cache size.
44    pub cache_memory: Option<ByteSize>,
45    /// On-disk cache size.
46    pub cache_disk: Option<ByteSize>,
47    /// Cache TTL in seconds.
48    pub cache_ttl: Option<u64>,
49    /// Predicate pushdown mode.
50    pub pushdown_mode: PushdownMode,
51}
52
/// The kind of connector that backs a lookup table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectorType {
    /// Change-data-capture connector for PostgreSQL.
    PostgresCdc,
    /// Change-data-capture connector for MySQL.
    MysqlCdc,
    /// Redis-backed connector.
    Redis,
    /// Parquet files stored on S3.
    S3Parquet,
    /// Static data held in memory.
    Static,
    /// Any other connector, identified by name.
    Custom(String),
}
69
70impl ConnectorType {
71    /// Parse a connector type from a string.
72    ///
73    /// # Errors
74    ///
75    /// Returns `ParseError` if the connector type is empty.
76    pub fn parse(s: &str) -> Result<Self, ParseError> {
77        if s.is_empty() {
78            return Err(ParseError::ValidationError(
79                "connector type cannot be empty".to_string(),
80            ));
81        }
82        Ok(match s.to_lowercase().as_str() {
83            "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
84            "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
85            "redis" => Self::Redis,
86            "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
87            "static" | "memory" => Self::Static,
88            other => Self::Custom(other.to_string()),
89        })
90    }
91}
92
/// How a lookup table's data is distributed/cached.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LookupStrategy {
    /// Every node holds a full copy of the table. This is the default.
    #[default]
    Replicated,
    /// The table is split across nodes by key.
    Partitioned,
    /// Rows are fetched when needed; nothing is pre-loaded.
    OnDemand,
}
104
105impl LookupStrategy {
106    /// Parse a lookup strategy from a string.
107    ///
108    /// # Errors
109    ///
110    /// Returns `ParseError` if the strategy is unknown.
111    pub fn parse(s: &str) -> Result<Self, ParseError> {
112        match s.to_lowercase().as_str() {
113            "replicated" | "full" => Ok(Self::Replicated),
114            "partitioned" | "sharded" => Ok(Self::Partitioned),
115            "on-demand" | "on_demand" | "lazy" => Ok(Self::OnDemand),
116            other => Err(ParseError::ValidationError(format!(
117                "unknown lookup strategy: '{other}' \
118                 (expected: replicated, partitioned, on-demand)"
119            ))),
120        }
121    }
122}
123
/// Controls whether predicates are pushed down to the source.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PushdownMode {
    /// Decide automatically based on connector capabilities. This is the default.
    #[default]
    Auto,
    /// Predicates are always pushed to the source.
    Enabled,
    /// Predicates are never pushed to the source.
    Disabled,
}
135
136impl PushdownMode {
137    /// Parse a pushdown mode from a string.
138    ///
139    /// # Errors
140    ///
141    /// Returns `ParseError` if the mode is unknown.
142    pub fn parse(s: &str) -> Result<Self, ParseError> {
143        match s.to_lowercase().as_str() {
144            "auto" => Ok(Self::Auto),
145            "enabled" | "true" | "on" => Ok(Self::Enabled),
146            "disabled" | "false" | "off" => Ok(Self::Disabled),
147            other => Err(ParseError::ValidationError(format!(
148                "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
149            ))),
150        }
151    }
152}
153
/// A size in bytes, as parsed from text such as "512mb", "1gb" or "10kb".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ByteSize(pub u64);
157
158impl ByteSize {
159    /// Parse a human-readable byte size string.
160    ///
161    /// Supports suffixes: b, kb, mb, gb, tb (case-insensitive).
162    ///
163    /// # Errors
164    ///
165    /// Returns `ParseError` if the string cannot be parsed.
166    pub fn parse(s: &str) -> Result<Self, ParseError> {
167        let s = s.trim().to_lowercase();
168        let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
169            (n, 1024 * 1024 * 1024 * 1024)
170        } else if let Some(n) = s.strip_suffix("gb") {
171            (n, 1024 * 1024 * 1024)
172        } else if let Some(n) = s.strip_suffix("mb") {
173            (n, 1024 * 1024)
174        } else if let Some(n) = s.strip_suffix("kb") {
175            (n, 1024)
176        } else if let Some(n) = s.strip_suffix('b') {
177            (n, 1)
178        } else {
179            // Assume bytes if no suffix
180            (s.as_str(), 1)
181        };
182
183        let num: u64 = num_str
184            .trim()
185            .parse()
186            .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
187
188        Ok(Self(num * multiplier))
189    }
190
191    /// Returns the size in bytes.
192    #[must_use]
193    pub fn as_bytes(&self) -> u64 {
194        self.0
195    }
196}
197
198/// Parse a CREATE LOOKUP TABLE statement.
199///
200/// Syntax:
201/// ```sql
202/// CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] <name> (
203///   <col> <type> [NOT NULL],
204///   ...
205///   PRIMARY KEY (<col>, ...)
206/// ) WITH (
207///   'connector' = 'postgres-cdc',
208///   'connection' = 'postgresql://...',
209///   ...
210/// );
211/// ```
212///
213/// # Errors
214///
215/// Returns `ParseError` if the statement syntax is invalid.
216pub fn parse_create_lookup_table(
217    parser: &mut Parser,
218) -> Result<CreateLookupTableStatement, ParseError> {
219    // CREATE already consumed by the router; consume it here for standalone parsing
220    parser
221        .expect_keyword(Keyword::CREATE)
222        .map_err(ParseError::SqlParseError)?;
223
224    let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
225
226    expect_custom_keyword(parser, "LOOKUP")?;
227
228    parser
229        .expect_keyword(Keyword::TABLE)
230        .map_err(ParseError::SqlParseError)?;
231
232    let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
233
234    let name = parser
235        .parse_object_name(false)
236        .map_err(ParseError::SqlParseError)?;
237
238    // Parse column definitions: ( col1 TYPE, col2 TYPE, ..., PRIMARY KEY (col1, ...) )
239    parser
240        .expect_token(&Token::LParen)
241        .map_err(ParseError::SqlParseError)?;
242
243    let mut columns = Vec::new();
244    let mut primary_key = Vec::new();
245
246    loop {
247        // Check for PRIMARY KEY clause
248        if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
249            parser
250                .expect_token(&Token::LParen)
251                .map_err(ParseError::SqlParseError)?;
252
253            loop {
254                let ident = parser
255                    .parse_identifier()
256                    .map_err(ParseError::SqlParseError)?;
257                primary_key.push(ident.value);
258
259                if !parser.consume_token(&Token::Comma) {
260                    break;
261                }
262            }
263
264            parser
265                .expect_token(&Token::RParen)
266                .map_err(ParseError::SqlParseError)?;
267
268            // Consume optional trailing comma after PRIMARY KEY clause
269            let _ = parser.consume_token(&Token::Comma);
270        } else if parser.consume_token(&Token::RParen) {
271            // End of column definitions
272            break;
273        } else {
274            // Parse column definition
275            let col = parser
276                .parse_column_def()
277                .map_err(ParseError::SqlParseError)?;
278            columns.push(col);
279
280            // Comma or closing paren
281            if !parser.consume_token(&Token::Comma) {
282                parser
283                    .expect_token(&Token::RParen)
284                    .map_err(ParseError::SqlParseError)?;
285                break;
286            }
287        }
288    }
289
290    if columns.is_empty() {
291        return Err(ParseError::StreamingError(
292            "LOOKUP TABLE must have at least one column".to_string(),
293        ));
294    }
295
296    // Parse WITH clause
297    let with_options = parse_with_options(parser)?;
298    if with_options.is_empty() {
299        return Err(ParseError::StreamingError(
300            "LOOKUP TABLE requires a WITH clause".to_string(),
301        ));
302    }
303
304    Ok(CreateLookupTableStatement {
305        name,
306        columns,
307        primary_key,
308        with_options,
309        or_replace,
310        if_not_exists,
311    })
312}
313
314/// Parse a DROP LOOKUP TABLE statement.
315///
316/// Syntax: `DROP LOOKUP TABLE [IF EXISTS] <name>`
317///
318/// # Errors
319///
320/// Returns `ParseError` if the statement syntax is invalid.
321pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
322    parser
323        .expect_keyword(Keyword::DROP)
324        .map_err(ParseError::SqlParseError)?;
325
326    expect_custom_keyword(parser, "LOOKUP")?;
327
328    parser
329        .expect_keyword(Keyword::TABLE)
330        .map_err(ParseError::SqlParseError)?;
331
332    let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
333
334    let name = parser
335        .parse_object_name(false)
336        .map_err(ParseError::SqlParseError)?;
337
338    Ok((name, if_exists))
339}
340
341/// Validate and extract typed properties from raw WITH options.
342///
343/// # Errors
344///
345/// Returns `ParseError` if required properties are missing or invalid.
346pub fn validate_properties<S: ::std::hash::BuildHasher>(
347    options: &HashMap<String, String, S>,
348) -> Result<LookupTableProperties, ParseError> {
349    let connector_str = options.get("connector").ok_or_else(|| {
350        ParseError::ValidationError("missing required property: 'connector'".to_string())
351    })?;
352    let connector = ConnectorType::parse(connector_str)?;
353
354    let connection = options.get("connection").cloned();
355
356    let strategy = match options.get("strategy") {
357        Some(s) => LookupStrategy::parse(s)?,
358        None => LookupStrategy::default(),
359    };
360
361    let cache_memory = options
362        .get("cache.memory")
363        .map(|s| ByteSize::parse(s))
364        .transpose()?;
365
366    let cache_disk = options
367        .get("cache.disk")
368        .map(|s| ByteSize::parse(s))
369        .transpose()?;
370
371    let cache_ttl = options
372        .get("cache.ttl")
373        .map(|s| {
374            s.parse::<u64>()
375                .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
376        })
377        .transpose()?;
378
379    let pushdown_mode = match options.get("pushdown") {
380        Some(s) => PushdownMode::parse(s)?,
381        None => PushdownMode::default(),
382    };
383
384    Ok(LookupTableProperties {
385        connector,
386        connection,
387        strategy,
388        cache_memory,
389        cache_disk,
390        cache_ttl,
391        pushdown_mode,
392    })
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Parse `sql`, require that it holds exactly one statement, and return it.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        let mut remaining = stmts.into_iter();
        remaining.next().unwrap()
    }

    /// Parse `sql` as a single CREATE LOOKUP TABLE statement and run `check` on it.
    fn with_create(sql: &str, check: impl FnOnce(&CreateLookupTableStatement)) {
        match parse_one(sql) {
            StreamingStatement::CreateLookupTable(lt) => {
                let lt: &CreateLookupTableStatement = &lt;
                check(lt);
            }
            other => panic!("Expected CreateLookupTable, got {other:?}"),
        }
    }

    /// Parse `sql` as a single DROP LOOKUP TABLE statement; returns (name, if_exists).
    fn parse_drop(sql: &str) -> (String, bool) {
        match parse_one(sql) {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                (name.to_string(), if_exists)
            }
            other => panic!("Expected DropLookupTable, got {other:?}"),
        }
    }

    #[test]
    fn test_parse_basic_create_lookup_table() {
        with_create(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
            |lt| {
                assert_eq!(lt.name.to_string(), "instruments");
                assert_eq!(lt.columns.len(), 2);
                assert_eq!(lt.primary_key, vec!["symbol"]);
                assert!(!lt.or_replace);
                assert!(!lt.if_not_exists);
                assert_eq!(
                    lt.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            },
        );
    }

    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        with_create(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
            |lt| {
                assert!(lt.or_replace);
                assert!(lt.if_not_exists);
            },
        );
    }

    #[test]
    fn test_parse_with_primary_key() {
        with_create(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
            |lt| {
                assert_eq!(lt.primary_key, vec!["a", "b"]);
                assert_eq!(lt.columns.len(), 3);
            },
        );
    }

    #[test]
    fn test_parse_with_clause_properties() {
        with_create(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
            |lt| {
                let props = validate_properties(&lt.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            },
        );
    }

    #[test]
    fn test_parse_drop_lookup_table() {
        let (name, if_exists) = parse_drop("DROP LOOKUP TABLE instruments");
        assert_eq!(name, "instruments");
        assert!(!if_exists);
    }

    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let (name, if_exists) = parse_drop("DROP LOOKUP TABLE IF EXISTS instruments");
        assert_eq!(name, "instruments");
        assert!(if_exists);
    }

    #[test]
    fn test_byte_size_parsing() {
        let cases: [(&str, u64); 6] = [
            ("512mb", 512 * 1024 * 1024),
            ("1gb", 1024 * 1024 * 1024),
            ("10kb", 10 * 1024),
            ("100b", 100),
            ("1024", 1024),
            ("2tb", 2 * 1024 * 1024 * 1024 * 1024),
        ];
        for &(input, expected) in &cases {
            assert_eq!(ByteSize::parse(input).unwrap(), ByteSize(expected));
        }
    }

    #[test]
    fn test_connector_type_parsing() {
        let cases = [
            ("postgres-cdc", ConnectorType::PostgresCdc),
            ("mysql-cdc", ConnectorType::MysqlCdc),
            ("redis", ConnectorType::Redis),
            ("s3-parquet", ConnectorType::S3Parquet),
            ("static", ConnectorType::Static),
            ("custom-src", ConnectorType::Custom("custom-src".to_string())),
        ];
        for (input, expected) in &cases {
            assert_eq!(&ConnectorType::parse(input).unwrap(), expected);
        }
    }

    #[test]
    fn test_error_missing_columns() {
        let sql = "CREATE LOOKUP TABLE t () WITH ('connector' = 'static')";
        assert!(StreamingParser::parse_sql(sql).is_err());
    }

    #[test]
    fn test_error_missing_with_clause() {
        let sql = "CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))";
        assert!(StreamingParser::parse_sql(sql).is_err());
    }

    #[test]
    fn test_error_invalid_property() {
        let options: HashMap<String, String> = [
            ("connector", "postgres-cdc"),
            ("strategy", "invalid-strategy"),
        ]
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();
        assert!(validate_properties(&options).is_err());
    }
}