Skip to main content

laminar_sql/parser/
lookup_table.rs

//! Parser for CREATE/DROP LOOKUP TABLE DDL statements.
//!
//! Lookup tables are dimension/reference tables used in enrichment joins.
//! They can be backed by external connectors (PostgreSQL CDC, Redis, etc.)
//! with configurable caching and predicate-pushdown strategies.
6
7#[allow(clippy::disallowed_types)] // cold path: SQL parsing
8use std::collections::HashMap;
9
10use sqlparser::ast::{ColumnDef, ObjectName};
11use sqlparser::keywords::Keyword;
12use sqlparser::parser::Parser;
13use sqlparser::tokenizer::Token;
14
15use super::tokenizer::{expect_custom_keyword, parse_with_options};
16use super::ParseError;
17
18/// CREATE LOOKUP TABLE statement.
19#[derive(Debug, Clone, PartialEq)]
20pub struct CreateLookupTableStatement {
21    /// Table name.
22    pub name: ObjectName,
23    /// Column definitions.
24    pub columns: Vec<ColumnDef>,
25    /// Primary key column names.
26    pub primary_key: Vec<String>,
27    /// WITH clause options.
28    pub with_options: HashMap<String, String>,
29    /// Whether OR REPLACE was specified.
30    pub or_replace: bool,
31    /// Whether IF NOT EXISTS was specified.
32    pub if_not_exists: bool,
33}
34
35/// Validated lookup table properties from the WITH clause.
36#[derive(Debug, Clone, PartialEq)]
37pub struct LookupTableProperties {
38    /// Connector type.
39    pub connector: ConnectorType,
40    /// Connection string.
41    pub connection: Option<String>,
42    /// Lookup strategy.
43    pub strategy: LookupStrategy,
44    /// In-memory cache size.
45    pub cache_memory: Option<ByteSize>,
46    /// On-disk cache size.
47    pub cache_disk: Option<ByteSize>,
48    /// Cache TTL in seconds.
49    pub cache_ttl: Option<u64>,
50    /// Predicate pushdown mode.
51    pub pushdown_mode: PushdownMode,
52}
53
54/// Connector type for lookup tables.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum ConnectorType {
57    /// PostgreSQL CDC connector.
58    PostgresCdc,
59    /// MySQL CDC connector.
60    MysqlCdc,
61    /// Redis connector.
62    Redis,
63    /// S3 Parquet connector.
64    S3Parquet,
65    /// Delta Lake connector.
66    DeltaLake,
67    /// Static in-memory data.
68    Static,
69    /// Custom connector type.
70    Custom(String),
71}
72
73impl ConnectorType {
74    /// Parse a connector type from a string.
75    ///
76    /// # Errors
77    ///
78    /// Returns `ParseError` if the connector type is empty.
79    pub fn parse(s: &str) -> Result<Self, ParseError> {
80        if s.is_empty() {
81            return Err(ParseError::ValidationError(
82                "connector type cannot be empty".to_string(),
83            ));
84        }
85        Ok(match s.to_lowercase().as_str() {
86            "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
87            "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
88            "redis" => Self::Redis,
89            "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
90            "delta-lake" | "delta_lake" | "delta" | "deltalake" => Self::DeltaLake,
91            "static" | "memory" => Self::Static,
92            other => Self::Custom(other.to_string()),
93        })
94    }
95}
96
97/// Lookup strategy for how table data is distributed/cached.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
99pub enum LookupStrategy {
100    /// Full table replicated on every node.
101    #[default]
102    Replicated,
103    /// Table partitioned across nodes by key.
104    Partitioned,
105    /// Rows fetched on demand (no pre-loading).
106    OnDemand,
107}
108
109impl LookupStrategy {
110    /// Parse a lookup strategy from a string.
111    ///
112    /// # Errors
113    ///
114    /// Returns `ParseError` if the strategy is unknown.
115    pub fn parse(s: &str) -> Result<Self, ParseError> {
116        match s.to_lowercase().as_str() {
117            "replicated" | "full" => Ok(Self::Replicated),
118            "partitioned" | "sharded" => Ok(Self::Partitioned),
119            "on-demand" | "on_demand" | "lazy" => Ok(Self::OnDemand),
120            other => Err(ParseError::ValidationError(format!(
121                "unknown lookup strategy: '{other}' \
122                 (expected: replicated, partitioned, on-demand)"
123            ))),
124        }
125    }
126}
127
128/// Predicate pushdown mode.
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
130pub enum PushdownMode {
131    /// Automatically decide based on connector capabilities.
132    #[default]
133    Auto,
134    /// Always push predicates to the source.
135    Enabled,
136    /// Never push predicates to the source.
137    Disabled,
138}
139
140impl PushdownMode {
141    /// Parse a pushdown mode from a string.
142    ///
143    /// # Errors
144    ///
145    /// Returns `ParseError` if the mode is unknown.
146    pub fn parse(s: &str) -> Result<Self, ParseError> {
147        match s.to_lowercase().as_str() {
148            "auto" => Ok(Self::Auto),
149            "enabled" | "true" | "on" => Ok(Self::Enabled),
150            "disabled" | "false" | "off" => Ok(Self::Disabled),
151            other => Err(ParseError::ValidationError(format!(
152                "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
153            ))),
154        }
155    }
156}
157
158/// A parsed byte size (e.g., "512mb", "1gb", "10kb").
159#[derive(Debug, Clone, Copy, PartialEq, Eq)]
160pub struct ByteSize(pub u64);
161
162impl ByteSize {
163    /// Parse a human-readable byte size string.
164    ///
165    /// Supports suffixes: b, kb, mb, gb, tb (case-insensitive).
166    ///
167    /// # Errors
168    ///
169    /// Returns `ParseError` if the string cannot be parsed.
170    pub fn parse(s: &str) -> Result<Self, ParseError> {
171        let s = s.trim().to_lowercase();
172        let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
173            (n, 1024 * 1024 * 1024 * 1024)
174        } else if let Some(n) = s.strip_suffix("gb") {
175            (n, 1024 * 1024 * 1024)
176        } else if let Some(n) = s.strip_suffix("mb") {
177            (n, 1024 * 1024)
178        } else if let Some(n) = s.strip_suffix("kb") {
179            (n, 1024)
180        } else if let Some(n) = s.strip_suffix('b') {
181            (n, 1)
182        } else {
183            // Assume bytes if no suffix
184            (s.as_str(), 1)
185        };
186
187        let num: u64 = num_str
188            .trim()
189            .parse()
190            .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
191
192        Ok(Self(num * multiplier))
193    }
194
195    /// Returns the size in bytes.
196    #[must_use]
197    pub fn as_bytes(&self) -> u64 {
198        self.0
199    }
200}
201
202/// Parse a CREATE LOOKUP TABLE statement.
203///
204/// Syntax:
205/// ```sql
206/// CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] <name> (
207///   <col> <type> [NOT NULL],
208///   ...
209///   PRIMARY KEY (<col>, ...)
210/// ) WITH (
211///   'connector' = 'postgres-cdc',
212///   'connection' = 'postgresql://...',
213///   ...
214/// );
215/// ```
216///
217/// # Errors
218///
219/// Returns `ParseError` if the statement syntax is invalid.
220pub fn parse_create_lookup_table(
221    parser: &mut Parser,
222) -> Result<CreateLookupTableStatement, ParseError> {
223    // CREATE already consumed by the router; consume it here for standalone parsing
224    parser
225        .expect_keyword(Keyword::CREATE)
226        .map_err(ParseError::SqlParseError)?;
227
228    let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
229
230    expect_custom_keyword(parser, "LOOKUP")?;
231
232    parser
233        .expect_keyword(Keyword::TABLE)
234        .map_err(ParseError::SqlParseError)?;
235
236    let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
237
238    let name = parser
239        .parse_object_name(false)
240        .map_err(ParseError::SqlParseError)?;
241
242    // Parse column definitions: ( col1 TYPE, col2 TYPE, ..., PRIMARY KEY (col1, ...) )
243    parser
244        .expect_token(&Token::LParen)
245        .map_err(ParseError::SqlParseError)?;
246
247    let mut columns = Vec::new();
248    let mut primary_key = Vec::new();
249
250    loop {
251        // Check for PRIMARY KEY clause
252        if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
253            parser
254                .expect_token(&Token::LParen)
255                .map_err(ParseError::SqlParseError)?;
256
257            loop {
258                let ident = parser
259                    .parse_identifier()
260                    .map_err(ParseError::SqlParseError)?;
261                primary_key.push(ident.value);
262
263                if !parser.consume_token(&Token::Comma) {
264                    break;
265                }
266            }
267
268            parser
269                .expect_token(&Token::RParen)
270                .map_err(ParseError::SqlParseError)?;
271
272            // Consume optional trailing comma after PRIMARY KEY clause
273            let _ = parser.consume_token(&Token::Comma);
274        } else if parser.consume_token(&Token::RParen) {
275            // End of column definitions
276            break;
277        } else {
278            // Parse column definition
279            let col = parser
280                .parse_column_def()
281                .map_err(ParseError::SqlParseError)?;
282            columns.push(col);
283
284            // Comma or closing paren
285            if !parser.consume_token(&Token::Comma) {
286                parser
287                    .expect_token(&Token::RParen)
288                    .map_err(ParseError::SqlParseError)?;
289                break;
290            }
291        }
292    }
293
294    if columns.is_empty() {
295        return Err(ParseError::StreamingError(
296            "LOOKUP TABLE must have at least one column".to_string(),
297        ));
298    }
299
300    // Parse WITH clause
301    let with_options = parse_with_options(parser)?;
302    if with_options.is_empty() {
303        return Err(ParseError::StreamingError(
304            "LOOKUP TABLE requires a WITH clause".to_string(),
305        ));
306    }
307
308    Ok(CreateLookupTableStatement {
309        name,
310        columns,
311        primary_key,
312        with_options,
313        or_replace,
314        if_not_exists,
315    })
316}
317
318/// Parse a DROP LOOKUP TABLE statement.
319///
320/// Syntax: `DROP LOOKUP TABLE [IF EXISTS] <name>`
321///
322/// # Errors
323///
324/// Returns `ParseError` if the statement syntax is invalid.
325pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
326    parser
327        .expect_keyword(Keyword::DROP)
328        .map_err(ParseError::SqlParseError)?;
329
330    expect_custom_keyword(parser, "LOOKUP")?;
331
332    parser
333        .expect_keyword(Keyword::TABLE)
334        .map_err(ParseError::SqlParseError)?;
335
336    let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
337
338    let name = parser
339        .parse_object_name(false)
340        .map_err(ParseError::SqlParseError)?;
341
342    Ok((name, if_exists))
343}
344
345/// Validate and extract typed properties from raw WITH options.
346///
347/// # Errors
348///
349/// Returns `ParseError` if required properties are missing or invalid.
350pub fn validate_properties<S: ::std::hash::BuildHasher>(
351    options: &HashMap<String, String, S>,
352) -> Result<LookupTableProperties, ParseError> {
353    let connector_str = options.get("connector").ok_or_else(|| {
354        ParseError::ValidationError("missing required property: 'connector'".to_string())
355    })?;
356    let connector = ConnectorType::parse(connector_str)?;
357
358    let connection = options.get("connection").cloned();
359
360    let strategy = match options.get("strategy") {
361        Some(s) => LookupStrategy::parse(s)?,
362        None => LookupStrategy::default(),
363    };
364
365    let cache_memory = options
366        .get("cache.memory")
367        .map(|s| ByteSize::parse(s))
368        .transpose()?;
369
370    let cache_disk = options
371        .get("cache.disk")
372        .map(|s| ByteSize::parse(s))
373        .transpose()?;
374
375    let cache_ttl = options
376        .get("cache.ttl")
377        .map(|s| {
378            s.parse::<u64>()
379                .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
380        })
381        .transpose()?;
382
383    let pushdown_mode = match options.get("pushdown") {
384        Some(s) => PushdownMode::parse(s)?,
385        None => PushdownMode::default(),
386    };
387
388    Ok(LookupTableProperties {
389        connector,
390        connection,
391        strategy,
392        cache_memory,
393        cache_disk,
394        cache_ttl,
395        pushdown_mode,
396    })
397}
398
399#[cfg(test)]
400mod tests {
401    use super::*;
402    use crate::parser::StreamingParser;
403    use crate::parser::StreamingStatement;
404
405    /// Helper to parse SQL and return the first statement.
406    fn parse_one(sql: &str) -> StreamingStatement {
407        let stmts = StreamingParser::parse_sql(sql).unwrap();
408        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
409        stmts.into_iter().next().unwrap()
410    }
411
412    #[test]
413    fn test_parse_basic_create_lookup_table() {
414        let stmt = parse_one(
415            "CREATE LOOKUP TABLE instruments (
416                symbol VARCHAR NOT NULL,
417                name VARCHAR,
418                PRIMARY KEY (symbol)
419            ) WITH (
420                'connector' = 'postgres-cdc',
421                'connection' = 'postgresql://localhost/db'
422            )",
423        );
424        match stmt {
425            StreamingStatement::CreateLookupTable(lt) => {
426                assert_eq!(lt.name.to_string(), "instruments");
427                assert_eq!(lt.columns.len(), 2);
428                assert_eq!(lt.primary_key, vec!["symbol"]);
429                assert!(!lt.or_replace);
430                assert!(!lt.if_not_exists);
431                assert_eq!(
432                    lt.with_options.get("connector"),
433                    Some(&"postgres-cdc".to_string())
434                );
435            }
436            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
437        }
438    }
439
440    #[test]
441    fn test_parse_or_replace_and_if_not_exists() {
442        let stmt = parse_one(
443            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
444                id INT,
445                PRIMARY KEY (id)
446            ) WITH (
447                'connector' = 'static'
448            )",
449        );
450        match stmt {
451            StreamingStatement::CreateLookupTable(lt) => {
452                assert!(lt.or_replace);
453                assert!(lt.if_not_exists);
454            }
455            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
456        }
457    }
458
459    #[test]
460    fn test_parse_with_primary_key() {
461        let stmt = parse_one(
462            "CREATE LOOKUP TABLE t (
463                a INT,
464                b VARCHAR,
465                c FLOAT,
466                PRIMARY KEY (a, b)
467            ) WITH ('connector' = 'static')",
468        );
469        match stmt {
470            StreamingStatement::CreateLookupTable(lt) => {
471                assert_eq!(lt.primary_key, vec!["a", "b"]);
472                assert_eq!(lt.columns.len(), 3);
473            }
474            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
475        }
476    }
477
478    #[test]
479    fn test_parse_with_clause_properties() {
480        let stmt = parse_one(
481            "CREATE LOOKUP TABLE t (
482                id INT,
483                PRIMARY KEY (id)
484            ) WITH (
485                'connector' = 'postgres-cdc',
486                'connection' = 'postgresql://localhost/db',
487                'strategy' = 'replicated',
488                'cache.memory' = '512mb',
489                'pushdown' = 'auto'
490            )",
491        );
492        match stmt {
493            StreamingStatement::CreateLookupTable(lt) => {
494                let props = validate_properties(&lt.with_options).unwrap();
495                assert_eq!(props.connector, ConnectorType::PostgresCdc);
496                assert_eq!(
497                    props.connection.as_deref(),
498                    Some("postgresql://localhost/db")
499                );
500                assert_eq!(props.strategy, LookupStrategy::Replicated);
501                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
502                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
503            }
504            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
505        }
506    }
507
508    #[test]
509    fn test_parse_drop_lookup_table() {
510        let stmt = parse_one("DROP LOOKUP TABLE instruments");
511        match stmt {
512            StreamingStatement::DropLookupTable { name, if_exists } => {
513                assert_eq!(name.to_string(), "instruments");
514                assert!(!if_exists);
515            }
516            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
517        }
518    }
519
520    #[test]
521    fn test_parse_drop_lookup_table_if_exists() {
522        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
523        match stmt {
524            StreamingStatement::DropLookupTable { name, if_exists } => {
525                assert_eq!(name.to_string(), "instruments");
526                assert!(if_exists);
527            }
528            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
529        }
530    }
531
532    #[test]
533    fn test_byte_size_parsing() {
534        assert_eq!(
535            ByteSize::parse("512mb").unwrap(),
536            ByteSize(512 * 1024 * 1024)
537        );
538        assert_eq!(
539            ByteSize::parse("1gb").unwrap(),
540            ByteSize(1024 * 1024 * 1024)
541        );
542        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
543        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
544        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
545        assert_eq!(
546            ByteSize::parse("2tb").unwrap(),
547            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
548        );
549    }
550
551    #[test]
552    fn test_connector_type_parsing() {
553        assert_eq!(
554            ConnectorType::parse("postgres-cdc").unwrap(),
555            ConnectorType::PostgresCdc
556        );
557        assert_eq!(
558            ConnectorType::parse("mysql-cdc").unwrap(),
559            ConnectorType::MysqlCdc
560        );
561        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
562        assert_eq!(
563            ConnectorType::parse("s3-parquet").unwrap(),
564            ConnectorType::S3Parquet
565        );
566        assert_eq!(
567            ConnectorType::parse("static").unwrap(),
568            ConnectorType::Static
569        );
570        assert_eq!(
571            ConnectorType::parse("delta-lake").unwrap(),
572            ConnectorType::DeltaLake
573        );
574        assert_eq!(
575            ConnectorType::parse("delta").unwrap(),
576            ConnectorType::DeltaLake
577        );
578        assert_eq!(
579            ConnectorType::parse("custom-src").unwrap(),
580            ConnectorType::Custom("custom-src".to_string())
581        );
582    }
583
584    #[test]
585    fn test_error_missing_columns() {
586        let result =
587            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
588        assert!(result.is_err());
589    }
590
591    #[test]
592    fn test_error_missing_with_clause() {
593        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
594        assert!(result.is_err());
595    }
596
597    #[test]
598    fn test_error_invalid_property() {
599        let mut options = HashMap::new();
600        options.insert("connector".to_string(), "postgres-cdc".to_string());
601        options.insert("strategy".to_string(), "invalid-strategy".to_string());
602        let result = validate_properties(&options);
603        assert!(result.is_err());
604    }
605}