1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
9
10use sqlparser::ast::{ColumnDef, ObjectName};
11use sqlparser::keywords::Keyword;
12use sqlparser::parser::Parser;
13use sqlparser::tokenizer::Token;
14
15use super::tokenizer::{expect_custom_keyword, parse_with_options};
16use super::ParseError;
17
/// Parsed form of a `CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] <name> (...) WITH (...)`
/// statement, as produced by `parse_create_lookup_table`.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateLookupTableStatement {
    /// Table name, as parsed by `Parser::parse_object_name`.
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<ColumnDef>,
    /// Column names listed in `PRIMARY KEY (...)`, in the order written;
    /// empty when no primary key was given.
    pub primary_key: Vec<String>,
    /// Raw key/value pairs from the `WITH (...)` clause. They are not
    /// interpreted here; `validate_properties` turns them into typed settings.
    pub with_options: HashMap<String, String>,
    /// `true` when `OR REPLACE` was present.
    pub or_replace: bool,
    /// `true` when `IF NOT EXISTS` was present.
    pub if_not_exists: bool,
}
34
/// Typed view of a lookup table's `WITH (...)` options, built by `validate_properties`.
#[derive(Debug, Clone, PartialEq)]
pub struct LookupTableProperties {
    /// Parsed from the required `connector` option.
    pub connector: ConnectorType,
    /// The `connection` option, copied verbatim; `None` when absent.
    pub connection: Option<String>,
    /// Parsed from the `strategy` option; `LookupStrategy::default()` when absent.
    pub strategy: LookupStrategy,
    /// Parsed from the `cache.memory` option (e.g. `"512mb"`); `None` when absent.
    pub cache_memory: Option<ByteSize>,
    /// Parsed from the `cache.disk` option; `None` when absent.
    pub cache_disk: Option<ByteSize>,
    /// The `cache.ttl` option parsed as an unsigned integer; `None` when absent.
    /// NOTE(review): the time unit is not established in this file — confirm with consumers.
    pub cache_ttl: Option<u64>,
    /// Parsed from the `pushdown` option; `PushdownMode::default()` when absent.
    pub pushdown_mode: PushdownMode,
}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum ConnectorType {
57 Postgres,
59 PostgresCdc,
61 MysqlCdc,
63 Redis,
65 S3Parquet,
67 DeltaLake,
69 Static,
71 Custom(String),
73}
74
75impl ConnectorType {
76 pub fn parse(s: &str) -> Result<Self, ParseError> {
82 if s.is_empty() {
83 return Err(ParseError::ValidationError(
84 "connector type cannot be empty".to_string(),
85 ));
86 }
87 Ok(match s.to_lowercase().as_str() {
88 "postgres" => Self::Postgres,
89 "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
90 "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
91 "redis" => Self::Redis,
92 "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
93 "delta-lake" | "delta_lake" | "delta" | "deltalake" => Self::DeltaLake,
94 "static" | "memory" => Self::Static,
95 other => Self::Custom(other.to_string()),
96 })
97 }
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
102pub enum LookupStrategy {
103 #[default]
105 Replicated,
106 Partitioned,
108 OnDemand,
110}
111
112impl LookupStrategy {
113 pub fn parse(s: &str) -> Result<Self, ParseError> {
119 match s.to_lowercase().as_str() {
120 "replicated" | "full" | "poll" | "snapshot" | "cdc" => Ok(Self::Replicated),
121 "partitioned" | "sharded" => Ok(Self::Partitioned),
122 "on-demand" | "on_demand" | "lazy" | "manual" => Ok(Self::OnDemand),
123 other => Err(ParseError::ValidationError(format!(
124 "unknown lookup strategy: '{other}' \
125 (expected: replicated, partitioned, on-demand)"
126 ))),
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
133pub enum PushdownMode {
134 #[default]
136 Auto,
137 Enabled,
139 Disabled,
141}
142
143impl PushdownMode {
144 pub fn parse(s: &str) -> Result<Self, ParseError> {
150 match s.to_lowercase().as_str() {
151 "auto" => Ok(Self::Auto),
152 "enabled" | "true" | "on" => Ok(Self::Enabled),
153 "disabled" | "false" | "off" => Ok(Self::Disabled),
154 other => Err(ParseError::ValidationError(format!(
155 "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
156 ))),
157 }
158 }
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163pub struct ByteSize(pub u64);
164
165impl ByteSize {
166 pub fn parse(s: &str) -> Result<Self, ParseError> {
174 let s = s.trim().to_lowercase();
175 let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
176 (n, 1024 * 1024 * 1024 * 1024)
177 } else if let Some(n) = s.strip_suffix("gb") {
178 (n, 1024 * 1024 * 1024)
179 } else if let Some(n) = s.strip_suffix("mb") {
180 (n, 1024 * 1024)
181 } else if let Some(n) = s.strip_suffix("kb") {
182 (n, 1024)
183 } else if let Some(n) = s.strip_suffix('b') {
184 (n, 1)
185 } else {
186 (s.as_str(), 1)
188 };
189
190 let num: u64 = num_str
191 .trim()
192 .parse()
193 .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
194
195 Ok(Self(num * multiplier))
196 }
197
198 #[must_use]
200 pub fn as_bytes(&self) -> u64 {
201 self.0
202 }
203}
204
205pub fn parse_create_lookup_table(
224 parser: &mut Parser,
225) -> Result<CreateLookupTableStatement, ParseError> {
226 parser
228 .expect_keyword(Keyword::CREATE)
229 .map_err(ParseError::SqlParseError)?;
230
231 let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
232
233 expect_custom_keyword(parser, "LOOKUP")?;
234
235 parser
236 .expect_keyword(Keyword::TABLE)
237 .map_err(ParseError::SqlParseError)?;
238
239 let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
240
241 let name = parser
242 .parse_object_name(false)
243 .map_err(ParseError::SqlParseError)?;
244
245 parser
247 .expect_token(&Token::LParen)
248 .map_err(ParseError::SqlParseError)?;
249
250 let mut columns = Vec::new();
251 let mut primary_key = Vec::new();
252
253 loop {
254 if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
256 parser
257 .expect_token(&Token::LParen)
258 .map_err(ParseError::SqlParseError)?;
259
260 loop {
261 let ident = parser
262 .parse_identifier()
263 .map_err(ParseError::SqlParseError)?;
264 primary_key.push(ident.value);
265
266 if !parser.consume_token(&Token::Comma) {
267 break;
268 }
269 }
270
271 parser
272 .expect_token(&Token::RParen)
273 .map_err(ParseError::SqlParseError)?;
274
275 let _ = parser.consume_token(&Token::Comma);
277 } else if parser.consume_token(&Token::RParen) {
278 break;
280 } else {
281 let col = parser
283 .parse_column_def()
284 .map_err(ParseError::SqlParseError)?;
285 columns.push(col);
286
287 if !parser.consume_token(&Token::Comma) {
289 parser
290 .expect_token(&Token::RParen)
291 .map_err(ParseError::SqlParseError)?;
292 break;
293 }
294 }
295 }
296
297 if columns.is_empty() {
298 return Err(ParseError::StreamingError(
299 "LOOKUP TABLE must have at least one column".to_string(),
300 ));
301 }
302
303 let with_options = parse_with_options(parser)?;
305 if with_options.is_empty() {
306 return Err(ParseError::StreamingError(
307 "LOOKUP TABLE requires a WITH clause".to_string(),
308 ));
309 }
310
311 Ok(CreateLookupTableStatement {
312 name,
313 columns,
314 primary_key,
315 with_options,
316 or_replace,
317 if_not_exists,
318 })
319}
320
321pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
329 parser
330 .expect_keyword(Keyword::DROP)
331 .map_err(ParseError::SqlParseError)?;
332
333 expect_custom_keyword(parser, "LOOKUP")?;
334
335 parser
336 .expect_keyword(Keyword::TABLE)
337 .map_err(ParseError::SqlParseError)?;
338
339 let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
340
341 let name = parser
342 .parse_object_name(false)
343 .map_err(ParseError::SqlParseError)?;
344
345 Ok((name, if_exists))
346}
347
348pub fn validate_properties<S: ::std::hash::BuildHasher>(
354 options: &HashMap<String, String, S>,
355) -> Result<LookupTableProperties, ParseError> {
356 let connector_str = options.get("connector").ok_or_else(|| {
357 ParseError::ValidationError("missing required property: 'connector'".to_string())
358 })?;
359 let connector = ConnectorType::parse(connector_str)?;
360
361 let connection = options.get("connection").cloned();
362
363 let strategy = match options.get("strategy") {
364 Some(s) => LookupStrategy::parse(s)?,
365 None => LookupStrategy::default(),
366 };
367
368 let cache_memory = options
369 .get("cache.memory")
370 .map(|s| ByteSize::parse(s))
371 .transpose()?;
372
373 let cache_disk = options
374 .get("cache.disk")
375 .map(|s| ByteSize::parse(s))
376 .transpose()?;
377
378 let cache_ttl = options
379 .get("cache.ttl")
380 .map(|s| {
381 s.parse::<u64>()
382 .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
383 })
384 .transpose()?;
385
386 let pushdown_mode = match options.get("pushdown") {
387 Some(s) => PushdownMode::parse(s)?,
388 None => PushdownMode::default(),
389 };
390
391 Ok(LookupTableProperties {
392 connector,
393 connection,
394 strategy,
395 cache_memory,
396 cache_disk,
397 cache_ttl,
398 pushdown_mode,
399 })
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Parses `sql` and returns its only statement, panicking otherwise.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    #[test]
    fn test_parse_basic_create_lookup_table() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(table) => {
                assert_eq!(table.name.to_string(), "instruments");
                assert_eq!(table.columns.len(), 2);
                assert_eq!(table.primary_key, vec!["symbol"]);
                assert!(!table.or_replace);
                assert!(!table.if_not_exists);
                assert_eq!(
                    table.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        let stmt = parse_one(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(table) => {
                assert!(table.or_replace);
                assert!(table.if_not_exists);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(table) => {
                assert_eq!(table.primary_key, vec!["a", "b"]);
                assert_eq!(table.columns.len(), 3);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_without_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (id INT, name VARCHAR) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(table) => {
                assert_eq!(table.columns.len(), 2);
                assert!(table.primary_key.is_empty());
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_clause_properties() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(table) => {
                let props = validate_properties(&table.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table() {
        let stmt = parse_one("DROP LOOKUP TABLE instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_byte_size_parsing() {
        assert_eq!(
            ByteSize::parse("512mb").unwrap(),
            ByteSize(512 * 1024 * 1024)
        );
        assert_eq!(
            ByteSize::parse("1gb").unwrap(),
            ByteSize(1024 * 1024 * 1024)
        );
        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
        assert_eq!(
            ByteSize::parse("2tb").unwrap(),
            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
        );
    }

    #[test]
    fn test_byte_size_case_and_whitespace() {
        assert_eq!(
            ByteSize::parse(" 1 MB ").unwrap(),
            ByteSize(1024 * 1024)
        );
    }

    #[test]
    fn test_byte_size_invalid_input() {
        for bad in ["", "b", "mb", "abc", "-1kb", "1.5gb"] {
            assert!(ByteSize::parse(bad).is_err(), "{bad:?} should be rejected");
        }
    }

    #[test]
    fn test_connector_type_parsing() {
        assert_eq!(
            ConnectorType::parse("postgres-cdc").unwrap(),
            ConnectorType::PostgresCdc
        );
        assert_eq!(
            ConnectorType::parse("mysql-cdc").unwrap(),
            ConnectorType::MysqlCdc
        );
        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
        assert_eq!(
            ConnectorType::parse("s3-parquet").unwrap(),
            ConnectorType::S3Parquet
        );
        assert_eq!(
            ConnectorType::parse("static").unwrap(),
            ConnectorType::Static
        );
        assert_eq!(
            ConnectorType::parse("delta-lake").unwrap(),
            ConnectorType::DeltaLake
        );
        assert_eq!(
            ConnectorType::parse("delta").unwrap(),
            ConnectorType::DeltaLake
        );
        assert_eq!(
            ConnectorType::parse("custom-src").unwrap(),
            ConnectorType::Custom("custom-src".to_string())
        );
    }

    #[test]
    fn test_connector_type_empty_is_error() {
        assert!(ConnectorType::parse("").is_err());
    }

    #[test]
    fn test_lookup_strategy_parsing() {
        assert_eq!(LookupStrategy::parse("FULL").unwrap(), LookupStrategy::Replicated);
        assert_eq!(LookupStrategy::parse("sharded").unwrap(), LookupStrategy::Partitioned);
        assert_eq!(LookupStrategy::parse("on-demand").unwrap(), LookupStrategy::OnDemand);
        assert_eq!(LookupStrategy::parse("lazy").unwrap(), LookupStrategy::OnDemand);
        assert!(LookupStrategy::parse("bogus").is_err());
    }

    #[test]
    fn test_pushdown_mode_parsing() {
        assert_eq!(PushdownMode::parse("AUTO").unwrap(), PushdownMode::Auto);
        assert_eq!(PushdownMode::parse("on").unwrap(), PushdownMode::Enabled);
        assert_eq!(PushdownMode::parse("false").unwrap(), PushdownMode::Disabled);
        assert!(PushdownMode::parse("maybe").is_err());
    }

    #[test]
    fn test_validate_properties_defaults() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "static".to_string());
        let props = validate_properties(&options).unwrap();
        assert_eq!(
            props,
            LookupTableProperties {
                connector: ConnectorType::Static,
                connection: None,
                strategy: LookupStrategy::Replicated,
                cache_memory: None,
                cache_disk: None,
                cache_ttl: None,
                pushdown_mode: PushdownMode::Auto,
            }
        );
    }

    #[test]
    fn test_validate_properties_missing_connector() {
        let options: HashMap<String, String> = HashMap::new();
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_validate_properties_invalid_cache_ttl() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "static".to_string());
        options.insert("cache.ttl".to_string(), "10s".to_string());
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_error_missing_columns() {
        let result =
            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_missing_with_clause() {
        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_invalid_property() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "postgres-cdc".to_string());
        options.insert("strategy".to_string(), "invalid-strategy".to_string());
        let result = validate_properties(&options);
        assert!(result.is_err());
    }
}