Skip to main content

laminar_sql/parser/
lookup_table.rs

//! Parser for `CREATE LOOKUP TABLE` / `DROP LOOKUP TABLE` DDL statements.
//!
//! Lookup tables are dimension/reference tables used in enrichment joins.
//! They can be backed by external connectors (PostgreSQL CDC, Redis, etc.)
//! with configurable caching and predicate pushdown strategies.

7#[allow(clippy::disallowed_types)] // cold path: SQL parsing
8use std::collections::HashMap;
9
10use sqlparser::ast::{ColumnDef, ObjectName};
11use sqlparser::keywords::Keyword;
12use sqlparser::parser::Parser;
13use sqlparser::tokenizer::Token;
14
15use super::tokenizer::{expect_custom_keyword, parse_with_options};
16use super::ParseError;
17
/// Parsed form of a `CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS]` statement.
///
/// Produced by [`parse_create_lookup_table`]. The `WITH` options are kept as
/// raw strings; [`validate_properties`] turns them into typed
/// [`LookupTableProperties`].
#[derive(Debug, Clone, PartialEq)]
pub struct CreateLookupTableStatement {
    /// Table name as returned by `parse_object_name` (may have several dotted parts).
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<ColumnDef>,
    /// Primary key column names, in the order listed in `PRIMARY KEY (...)`.
    /// Empty when the statement has no `PRIMARY KEY` clause.
    pub primary_key: Vec<String>,
    /// Raw `WITH (...)` options as key/value strings. Never empty for a
    /// statement returned by `parse_create_lookup_table`.
    pub with_options: HashMap<String, String>,
    /// `true` when `OR REPLACE` was specified.
    pub or_replace: bool,
    /// `true` when `IF NOT EXISTS` was specified.
    pub if_not_exists: bool,
}
34
/// Typed lookup-table properties extracted from a `WITH` clause by
/// [`validate_properties`].
#[derive(Debug, Clone, PartialEq)]
pub struct LookupTableProperties {
    /// Connector type, from the required `connector` option.
    pub connector: ConnectorType,
    /// Connection string from the `connection` option, copied verbatim.
    pub connection: Option<String>,
    /// Lookup strategy from the `strategy` option; [`LookupStrategy::Replicated`] when absent.
    pub strategy: LookupStrategy,
    /// In-memory cache size, from the `cache.memory` option.
    pub cache_memory: Option<ByteSize>,
    /// On-disk cache size, from the `cache.disk` option.
    pub cache_disk: Option<ByteSize>,
    /// Cache TTL in seconds, from the `cache.ttl` option (a whole number).
    pub cache_ttl: Option<u64>,
    /// Predicate pushdown mode from the `pushdown` option; [`PushdownMode::Auto`] when absent.
    pub pushdown_mode: PushdownMode,
}
53
54/// Connector type for lookup tables.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum ConnectorType {
57    /// PostgreSQL standalone connector (poll-based snapshot, no CDC).
58    Postgres,
59    /// PostgreSQL CDC connector.
60    PostgresCdc,
61    /// MySQL CDC connector.
62    MysqlCdc,
63    /// Redis connector.
64    Redis,
65    /// S3 Parquet connector.
66    S3Parquet,
67    /// Delta Lake connector.
68    DeltaLake,
69    /// Static in-memory data.
70    Static,
71    /// Custom connector type.
72    Custom(String),
73}
74
75impl ConnectorType {
76    /// Parse a connector type from a string.
77    ///
78    /// # Errors
79    ///
80    /// Returns `ParseError` if the connector type is empty.
81    pub fn parse(s: &str) -> Result<Self, ParseError> {
82        if s.is_empty() {
83            return Err(ParseError::ValidationError(
84                "connector type cannot be empty".to_string(),
85            ));
86        }
87        Ok(match s.to_lowercase().as_str() {
88            "postgres" => Self::Postgres,
89            "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
90            "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
91            "redis" => Self::Redis,
92            "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
93            "delta-lake" | "delta_lake" | "delta" | "deltalake" => Self::DeltaLake,
94            "static" | "memory" => Self::Static,
95            other => Self::Custom(other.to_string()),
96        })
97    }
98}
99
100/// Lookup strategy for how table data is distributed/cached.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
102pub enum LookupStrategy {
103    /// Full table replicated on every node.
104    #[default]
105    Replicated,
106    /// Table partitioned across nodes by key.
107    Partitioned,
108    /// Rows fetched on demand (no pre-loading).
109    OnDemand,
110}
111
112impl LookupStrategy {
113    /// Parse a lookup strategy from a string.
114    ///
115    /// # Errors
116    ///
117    /// Returns `ParseError` if the strategy is unknown.
118    pub fn parse(s: &str) -> Result<Self, ParseError> {
119        match s.to_lowercase().as_str() {
120            "replicated" | "full" | "poll" | "snapshot" | "cdc" => Ok(Self::Replicated),
121            "partitioned" | "sharded" => Ok(Self::Partitioned),
122            "on-demand" | "on_demand" | "lazy" | "manual" => Ok(Self::OnDemand),
123            other => Err(ParseError::ValidationError(format!(
124                "unknown lookup strategy: '{other}' \
125                 (expected: replicated, partitioned, on-demand)"
126            ))),
127        }
128    }
129}
130
131/// Predicate pushdown mode.
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
133pub enum PushdownMode {
134    /// Automatically decide based on connector capabilities.
135    #[default]
136    Auto,
137    /// Always push predicates to the source.
138    Enabled,
139    /// Never push predicates to the source.
140    Disabled,
141}
142
143impl PushdownMode {
144    /// Parse a pushdown mode from a string.
145    ///
146    /// # Errors
147    ///
148    /// Returns `ParseError` if the mode is unknown.
149    pub fn parse(s: &str) -> Result<Self, ParseError> {
150        match s.to_lowercase().as_str() {
151            "auto" => Ok(Self::Auto),
152            "enabled" | "true" | "on" => Ok(Self::Enabled),
153            "disabled" | "false" | "off" => Ok(Self::Disabled),
154            other => Err(ParseError::ValidationError(format!(
155                "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
156            ))),
157        }
158    }
159}
160
161/// A parsed byte size (e.g., "512mb", "1gb", "10kb").
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163pub struct ByteSize(pub u64);
164
165impl ByteSize {
166    /// Parse a human-readable byte size string.
167    ///
168    /// Supports suffixes: b, kb, mb, gb, tb (case-insensitive).
169    ///
170    /// # Errors
171    ///
172    /// Returns `ParseError` if the string cannot be parsed.
173    pub fn parse(s: &str) -> Result<Self, ParseError> {
174        let s = s.trim().to_lowercase();
175        let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
176            (n, 1024 * 1024 * 1024 * 1024)
177        } else if let Some(n) = s.strip_suffix("gb") {
178            (n, 1024 * 1024 * 1024)
179        } else if let Some(n) = s.strip_suffix("mb") {
180            (n, 1024 * 1024)
181        } else if let Some(n) = s.strip_suffix("kb") {
182            (n, 1024)
183        } else if let Some(n) = s.strip_suffix('b') {
184            (n, 1)
185        } else {
186            // Assume bytes if no suffix
187            (s.as_str(), 1)
188        };
189
190        let num: u64 = num_str
191            .trim()
192            .parse()
193            .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
194
195        Ok(Self(num * multiplier))
196    }
197
198    /// Returns the size in bytes.
199    #[must_use]
200    pub fn as_bytes(&self) -> u64 {
201        self.0
202    }
203}
204
205/// Parse a CREATE LOOKUP TABLE statement.
206///
207/// Syntax:
208/// ```sql
209/// CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] <name> (
210///   <col> <type> [NOT NULL],
211///   ...
212///   PRIMARY KEY (<col>, ...)
213/// ) WITH (
214///   'connector' = 'postgres-cdc',
215///   'connection' = 'postgresql://...',
216///   ...
217/// );
218/// ```
219///
220/// # Errors
221///
222/// Returns `ParseError` if the statement syntax is invalid.
223pub fn parse_create_lookup_table(
224    parser: &mut Parser,
225) -> Result<CreateLookupTableStatement, ParseError> {
226    // CREATE already consumed by the router; consume it here for standalone parsing
227    parser
228        .expect_keyword(Keyword::CREATE)
229        .map_err(ParseError::SqlParseError)?;
230
231    let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
232
233    expect_custom_keyword(parser, "LOOKUP")?;
234
235    parser
236        .expect_keyword(Keyword::TABLE)
237        .map_err(ParseError::SqlParseError)?;
238
239    let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
240
241    let name = parser
242        .parse_object_name(false)
243        .map_err(ParseError::SqlParseError)?;
244
245    // Parse column definitions: ( col1 TYPE, col2 TYPE, ..., PRIMARY KEY (col1, ...) )
246    parser
247        .expect_token(&Token::LParen)
248        .map_err(ParseError::SqlParseError)?;
249
250    let mut columns = Vec::new();
251    let mut primary_key = Vec::new();
252
253    loop {
254        // Check for PRIMARY KEY clause
255        if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
256            parser
257                .expect_token(&Token::LParen)
258                .map_err(ParseError::SqlParseError)?;
259
260            loop {
261                let ident = parser
262                    .parse_identifier()
263                    .map_err(ParseError::SqlParseError)?;
264                primary_key.push(ident.value);
265
266                if !parser.consume_token(&Token::Comma) {
267                    break;
268                }
269            }
270
271            parser
272                .expect_token(&Token::RParen)
273                .map_err(ParseError::SqlParseError)?;
274
275            // Consume optional trailing comma after PRIMARY KEY clause
276            let _ = parser.consume_token(&Token::Comma);
277        } else if parser.consume_token(&Token::RParen) {
278            // End of column definitions
279            break;
280        } else {
281            // Parse column definition
282            let col = parser
283                .parse_column_def()
284                .map_err(ParseError::SqlParseError)?;
285            columns.push(col);
286
287            // Comma or closing paren
288            if !parser.consume_token(&Token::Comma) {
289                parser
290                    .expect_token(&Token::RParen)
291                    .map_err(ParseError::SqlParseError)?;
292                break;
293            }
294        }
295    }
296
297    if columns.is_empty() {
298        return Err(ParseError::StreamingError(
299            "LOOKUP TABLE must have at least one column".to_string(),
300        ));
301    }
302
303    // Parse WITH clause
304    let with_options = parse_with_options(parser)?;
305    if with_options.is_empty() {
306        return Err(ParseError::StreamingError(
307            "LOOKUP TABLE requires a WITH clause".to_string(),
308        ));
309    }
310
311    Ok(CreateLookupTableStatement {
312        name,
313        columns,
314        primary_key,
315        with_options,
316        or_replace,
317        if_not_exists,
318    })
319}
320
321/// Parse a DROP LOOKUP TABLE statement.
322///
323/// Syntax: `DROP LOOKUP TABLE [IF EXISTS] <name>`
324///
325/// # Errors
326///
327/// Returns `ParseError` if the statement syntax is invalid.
328pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
329    parser
330        .expect_keyword(Keyword::DROP)
331        .map_err(ParseError::SqlParseError)?;
332
333    expect_custom_keyword(parser, "LOOKUP")?;
334
335    parser
336        .expect_keyword(Keyword::TABLE)
337        .map_err(ParseError::SqlParseError)?;
338
339    let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
340
341    let name = parser
342        .parse_object_name(false)
343        .map_err(ParseError::SqlParseError)?;
344
345    Ok((name, if_exists))
346}
347
348/// Validate and extract typed properties from raw WITH options.
349///
350/// # Errors
351///
352/// Returns `ParseError` if required properties are missing or invalid.
353pub fn validate_properties<S: ::std::hash::BuildHasher>(
354    options: &HashMap<String, String, S>,
355) -> Result<LookupTableProperties, ParseError> {
356    let connector_str = options.get("connector").ok_or_else(|| {
357        ParseError::ValidationError("missing required property: 'connector'".to_string())
358    })?;
359    let connector = ConnectorType::parse(connector_str)?;
360
361    let connection = options.get("connection").cloned();
362
363    let strategy = match options.get("strategy") {
364        Some(s) => LookupStrategy::parse(s)?,
365        None => LookupStrategy::default(),
366    };
367
368    let cache_memory = options
369        .get("cache.memory")
370        .map(|s| ByteSize::parse(s))
371        .transpose()?;
372
373    let cache_disk = options
374        .get("cache.disk")
375        .map(|s| ByteSize::parse(s))
376        .transpose()?;
377
378    let cache_ttl = options
379        .get("cache.ttl")
380        .map(|s| {
381            s.parse::<u64>()
382                .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
383        })
384        .transpose()?;
385
386    let pushdown_mode = match options.get("pushdown") {
387        Some(s) => PushdownMode::parse(s)?,
388        None => PushdownMode::default(),
389    };
390
391    Ok(LookupTableProperties {
392        connector,
393        connection,
394        strategy,
395        cache_memory,
396        cache_disk,
397        cache_ttl,
398        pushdown_mode,
399    })
400}
401
// Unit tests for lookup-table DDL parsing and WITH-clause validation.
//
// Statement-level tests go through `StreamingParser::parse_sql`, which
// presumably routes `CREATE/DROP LOOKUP TABLE` to the parsers in this file.
// The router itself is not visible here.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Helper to parse SQL and return the first statement.
    ///
    /// Panics if parsing fails or if the input does not yield exactly one
    /// statement.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    /// Minimal statement: two columns, a single-column primary key, raw WITH
    /// options, and neither `OR REPLACE` nor `IF NOT EXISTS`.
    #[test]
    fn test_parse_basic_create_lookup_table() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.name.to_string(), "instruments");
                assert_eq!(lt.columns.len(), 2);
                assert_eq!(lt.primary_key, vec!["symbol"]);
                assert!(!lt.or_replace);
                assert!(!lt.if_not_exists);
                assert_eq!(
                    lt.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// Both optional modifiers are recorded when present together.
    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        let stmt = parse_one(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert!(lt.or_replace);
                assert!(lt.if_not_exists);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// A composite primary key keeps its declared column order, and PK
    /// entries are not counted as columns.
    #[test]
    fn test_parse_with_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.primary_key, vec!["a", "b"]);
                assert_eq!(lt.columns.len(), 3);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// Raw WITH options round-trip through `validate_properties` into typed
    /// values (connector alias, byte size, strategy, pushdown mode).
    #[test]
    fn test_parse_with_clause_properties() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                let props = validate_properties(&lt.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    /// `DROP LOOKUP TABLE` without `IF EXISTS` reports `if_exists == false`.
    #[test]
    fn test_parse_drop_lookup_table() {
        let stmt = parse_one("DROP LOOKUP TABLE instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    /// `DROP LOOKUP TABLE IF EXISTS` reports `if_exists == true`.
    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    /// Every supported suffix (binary multiples) plus the suffix-less form.
    #[test]
    fn test_byte_size_parsing() {
        assert_eq!(
            ByteSize::parse("512mb").unwrap(),
            ByteSize(512 * 1024 * 1024)
        );
        assert_eq!(
            ByteSize::parse("1gb").unwrap(),
            ByteSize(1024 * 1024 * 1024)
        );
        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
        assert_eq!(
            ByteSize::parse("2tb").unwrap(),
            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
        );
    }

    /// Canonical names and aliases resolve to the expected variants; unknown
    /// names fall back to `Custom`.
    #[test]
    fn test_connector_type_parsing() {
        assert_eq!(
            ConnectorType::parse("postgres-cdc").unwrap(),
            ConnectorType::PostgresCdc
        );
        assert_eq!(
            ConnectorType::parse("mysql-cdc").unwrap(),
            ConnectorType::MysqlCdc
        );
        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
        assert_eq!(
            ConnectorType::parse("s3-parquet").unwrap(),
            ConnectorType::S3Parquet
        );
        assert_eq!(
            ConnectorType::parse("static").unwrap(),
            ConnectorType::Static
        );
        assert_eq!(
            ConnectorType::parse("delta-lake").unwrap(),
            ConnectorType::DeltaLake
        );
        assert_eq!(
            ConnectorType::parse("delta").unwrap(),
            ConnectorType::DeltaLake
        );
        assert_eq!(
            ConnectorType::parse("custom-src").unwrap(),
            ConnectorType::Custom("custom-src".to_string())
        );
    }

    /// An empty column list `()` is rejected.
    #[test]
    fn test_error_missing_columns() {
        let result =
            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
        assert!(result.is_err());
    }

    /// A statement with no WITH clause is rejected.
    #[test]
    fn test_error_missing_with_clause() {
        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    /// An unknown `strategy` value makes `validate_properties` fail even when
    /// the connector is valid.
    #[test]
    fn test_error_invalid_property() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "postgres-cdc".to_string());
        options.insert("strategy".to_string(), "invalid-strategy".to_string());
        let result = validate_properties(&options);
        assert!(result.is_err());
    }
}