1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
9
10use sqlparser::ast::{ColumnDef, ObjectName};
11use sqlparser::keywords::Keyword;
12use sqlparser::parser::Parser;
13use sqlparser::tokenizer::Token;
14
15use super::tokenizer::{expect_custom_keyword, parse_with_options};
16use super::ParseError;
17
/// Parsed form of a `CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] name (...) WITH (...)`
/// statement, as produced by [`parse_create_lookup_table`].
#[derive(Debug, Clone, PartialEq)]
pub struct CreateLookupTableStatement {
    /// Table name as written after `LOOKUP TABLE [IF NOT EXISTS]`.
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<ColumnDef>,
    /// Column names listed in the table-level `PRIMARY KEY (...)` clause, in
    /// the order written; empty when the statement has no such clause. Only
    /// the table-level clause populates this field.
    pub primary_key: Vec<String>,
    /// Raw key/value pairs from the `WITH (...)` clause. They are not
    /// interpreted here; see [`validate_properties`].
    pub with_options: HashMap<String, String>,
    /// `true` when `OR REPLACE` followed `CREATE`.
    pub or_replace: bool,
    /// `true` when `IF NOT EXISTS` followed `LOOKUP TABLE`.
    pub if_not_exists: bool,
}
34
/// Typed view of a lookup table's `WITH (...)` options, produced by
/// [`validate_properties`].
#[derive(Debug, Clone, PartialEq)]
pub struct LookupTableProperties {
    /// Source system, from the required `connector` property.
    pub connector: ConnectorType,
    /// Raw `connection` property (e.g. a connection URL), if present; not validated.
    pub connection: Option<String>,
    /// From `strategy`; [`LookupStrategy::Replicated`] when absent.
    pub strategy: LookupStrategy,
    /// From `cache.memory` (e.g. `512mb`), if present.
    pub cache_memory: Option<ByteSize>,
    /// From `cache.disk` (e.g. `2gb`), if present.
    pub cache_disk: Option<ByteSize>,
    /// From `cache.ttl`, parsed as a plain integer. NOTE(review): the unit is
    /// not defined in this module — presumably seconds; confirm with the consumer.
    pub cache_ttl: Option<u64>,
    /// From `pushdown`; [`PushdownMode::Auto`] when absent.
    pub pushdown_mode: PushdownMode,
}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum ConnectorType {
57 PostgresCdc,
59 MysqlCdc,
61 Redis,
63 S3Parquet,
65 DeltaLake,
67 Static,
69 Custom(String),
71}
72
73impl ConnectorType {
74 pub fn parse(s: &str) -> Result<Self, ParseError> {
80 if s.is_empty() {
81 return Err(ParseError::ValidationError(
82 "connector type cannot be empty".to_string(),
83 ));
84 }
85 Ok(match s.to_lowercase().as_str() {
86 "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
87 "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
88 "redis" => Self::Redis,
89 "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
90 "delta-lake" | "delta_lake" | "delta" | "deltalake" => Self::DeltaLake,
91 "static" | "memory" => Self::Static,
92 other => Self::Custom(other.to_string()),
93 })
94 }
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
99pub enum LookupStrategy {
100 #[default]
102 Replicated,
103 Partitioned,
105 OnDemand,
107}
108
109impl LookupStrategy {
110 pub fn parse(s: &str) -> Result<Self, ParseError> {
116 match s.to_lowercase().as_str() {
117 "replicated" | "full" => Ok(Self::Replicated),
118 "partitioned" | "sharded" => Ok(Self::Partitioned),
119 "on-demand" | "on_demand" | "lazy" => Ok(Self::OnDemand),
120 other => Err(ParseError::ValidationError(format!(
121 "unknown lookup strategy: '{other}' \
122 (expected: replicated, partitioned, on-demand)"
123 ))),
124 }
125 }
126}
127
128#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
130pub enum PushdownMode {
131 #[default]
133 Auto,
134 Enabled,
136 Disabled,
138}
139
140impl PushdownMode {
141 pub fn parse(s: &str) -> Result<Self, ParseError> {
147 match s.to_lowercase().as_str() {
148 "auto" => Ok(Self::Auto),
149 "enabled" | "true" | "on" => Ok(Self::Enabled),
150 "disabled" | "false" | "off" => Ok(Self::Disabled),
151 other => Err(ParseError::ValidationError(format!(
152 "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
153 ))),
154 }
155 }
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
160pub struct ByteSize(pub u64);
161
162impl ByteSize {
163 pub fn parse(s: &str) -> Result<Self, ParseError> {
171 let s = s.trim().to_lowercase();
172 let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
173 (n, 1024 * 1024 * 1024 * 1024)
174 } else if let Some(n) = s.strip_suffix("gb") {
175 (n, 1024 * 1024 * 1024)
176 } else if let Some(n) = s.strip_suffix("mb") {
177 (n, 1024 * 1024)
178 } else if let Some(n) = s.strip_suffix("kb") {
179 (n, 1024)
180 } else if let Some(n) = s.strip_suffix('b') {
181 (n, 1)
182 } else {
183 (s.as_str(), 1)
185 };
186
187 let num: u64 = num_str
188 .trim()
189 .parse()
190 .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
191
192 Ok(Self(num * multiplier))
193 }
194
195 #[must_use]
197 pub fn as_bytes(&self) -> u64 {
198 self.0
199 }
200}
201
202pub fn parse_create_lookup_table(
221 parser: &mut Parser,
222) -> Result<CreateLookupTableStatement, ParseError> {
223 parser
225 .expect_keyword(Keyword::CREATE)
226 .map_err(ParseError::SqlParseError)?;
227
228 let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
229
230 expect_custom_keyword(parser, "LOOKUP")?;
231
232 parser
233 .expect_keyword(Keyword::TABLE)
234 .map_err(ParseError::SqlParseError)?;
235
236 let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
237
238 let name = parser
239 .parse_object_name(false)
240 .map_err(ParseError::SqlParseError)?;
241
242 parser
244 .expect_token(&Token::LParen)
245 .map_err(ParseError::SqlParseError)?;
246
247 let mut columns = Vec::new();
248 let mut primary_key = Vec::new();
249
250 loop {
251 if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
253 parser
254 .expect_token(&Token::LParen)
255 .map_err(ParseError::SqlParseError)?;
256
257 loop {
258 let ident = parser
259 .parse_identifier()
260 .map_err(ParseError::SqlParseError)?;
261 primary_key.push(ident.value);
262
263 if !parser.consume_token(&Token::Comma) {
264 break;
265 }
266 }
267
268 parser
269 .expect_token(&Token::RParen)
270 .map_err(ParseError::SqlParseError)?;
271
272 let _ = parser.consume_token(&Token::Comma);
274 } else if parser.consume_token(&Token::RParen) {
275 break;
277 } else {
278 let col = parser
280 .parse_column_def()
281 .map_err(ParseError::SqlParseError)?;
282 columns.push(col);
283
284 if !parser.consume_token(&Token::Comma) {
286 parser
287 .expect_token(&Token::RParen)
288 .map_err(ParseError::SqlParseError)?;
289 break;
290 }
291 }
292 }
293
294 if columns.is_empty() {
295 return Err(ParseError::StreamingError(
296 "LOOKUP TABLE must have at least one column".to_string(),
297 ));
298 }
299
300 let with_options = parse_with_options(parser)?;
302 if with_options.is_empty() {
303 return Err(ParseError::StreamingError(
304 "LOOKUP TABLE requires a WITH clause".to_string(),
305 ));
306 }
307
308 Ok(CreateLookupTableStatement {
309 name,
310 columns,
311 primary_key,
312 with_options,
313 or_replace,
314 if_not_exists,
315 })
316}
317
318pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
326 parser
327 .expect_keyword(Keyword::DROP)
328 .map_err(ParseError::SqlParseError)?;
329
330 expect_custom_keyword(parser, "LOOKUP")?;
331
332 parser
333 .expect_keyword(Keyword::TABLE)
334 .map_err(ParseError::SqlParseError)?;
335
336 let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
337
338 let name = parser
339 .parse_object_name(false)
340 .map_err(ParseError::SqlParseError)?;
341
342 Ok((name, if_exists))
343}
344
345pub fn validate_properties<S: ::std::hash::BuildHasher>(
351 options: &HashMap<String, String, S>,
352) -> Result<LookupTableProperties, ParseError> {
353 let connector_str = options.get("connector").ok_or_else(|| {
354 ParseError::ValidationError("missing required property: 'connector'".to_string())
355 })?;
356 let connector = ConnectorType::parse(connector_str)?;
357
358 let connection = options.get("connection").cloned();
359
360 let strategy = match options.get("strategy") {
361 Some(s) => LookupStrategy::parse(s)?,
362 None => LookupStrategy::default(),
363 };
364
365 let cache_memory = options
366 .get("cache.memory")
367 .map(|s| ByteSize::parse(s))
368 .transpose()?;
369
370 let cache_disk = options
371 .get("cache.disk")
372 .map(|s| ByteSize::parse(s))
373 .transpose()?;
374
375 let cache_ttl = options
376 .get("cache.ttl")
377 .map(|s| {
378 s.parse::<u64>()
379 .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
380 })
381 .transpose()?;
382
383 let pushdown_mode = match options.get("pushdown") {
384 Some(s) => PushdownMode::parse(s)?,
385 None => PushdownMode::default(),
386 };
387
388 Ok(LookupTableProperties {
389 connector,
390 connection,
391 strategy,
392 cache_memory,
393 cache_disk,
394 cache_ttl,
395 pushdown_mode,
396 })
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Parses `sql` and asserts it yields exactly one statement.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    #[test]
    fn test_parse_basic_create_lookup_table() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.name.to_string(), "instruments");
                assert_eq!(lt.columns.len(), 2);
                assert_eq!(lt.primary_key, vec!["symbol"]);
                assert!(!lt.or_replace);
                assert!(!lt.if_not_exists);
                assert_eq!(
                    lt.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        let stmt = parse_one(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert!(lt.or_replace);
                assert!(lt.if_not_exists);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.primary_key, vec!["a", "b"]);
                assert_eq!(lt.columns.len(), 3);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_clause_properties() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                let props = validate_properties(&lt.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table() {
        let stmt = parse_one("DROP LOOKUP TABLE instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_byte_size_parsing() {
        assert_eq!(
            ByteSize::parse("512mb").unwrap(),
            ByteSize(512 * 1024 * 1024)
        );
        assert_eq!(
            ByteSize::parse("1gb").unwrap(),
            ByteSize(1024 * 1024 * 1024)
        );
        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
        assert_eq!(
            ByteSize::parse("2tb").unwrap(),
            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
        );
    }

    #[test]
    fn test_connector_type_parsing() {
        assert_eq!(
            ConnectorType::parse("postgres-cdc").unwrap(),
            ConnectorType::PostgresCdc
        );
        assert_eq!(
            ConnectorType::parse("mysql-cdc").unwrap(),
            ConnectorType::MysqlCdc
        );
        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
        assert_eq!(
            ConnectorType::parse("s3-parquet").unwrap(),
            ConnectorType::S3Parquet
        );
        assert_eq!(
            ConnectorType::parse("static").unwrap(),
            ConnectorType::Static
        );
        assert_eq!(
            ConnectorType::parse("delta-lake").unwrap(),
            ConnectorType::DeltaLake
        );
        assert_eq!(
            ConnectorType::parse("delta").unwrap(),
            ConnectorType::DeltaLake
        );
        assert_eq!(
            ConnectorType::parse("custom-src").unwrap(),
            ConnectorType::Custom("custom-src".to_string())
        );
    }

    #[test]
    fn test_connector_type_empty_is_error() {
        assert!(ConnectorType::parse("").is_err());
    }

    #[test]
    fn test_lookup_strategy_parsing() {
        for s in ["replicated", "FULL"] {
            assert_eq!(LookupStrategy::parse(s).unwrap(), LookupStrategy::Replicated);
        }
        for s in ["partitioned", "Sharded"] {
            assert_eq!(LookupStrategy::parse(s).unwrap(), LookupStrategy::Partitioned);
        }
        for s in ["on-demand", "on_demand", "lazy"] {
            assert_eq!(LookupStrategy::parse(s).unwrap(), LookupStrategy::OnDemand);
        }
        assert!(LookupStrategy::parse("bogus").is_err());
    }

    #[test]
    fn test_pushdown_mode_parsing() {
        assert_eq!(PushdownMode::parse("auto").unwrap(), PushdownMode::Auto);
        for s in ["enabled", "true", "ON"] {
            assert_eq!(PushdownMode::parse(s).unwrap(), PushdownMode::Enabled);
        }
        for s in ["disabled", "false", "Off"] {
            assert_eq!(PushdownMode::parse(s).unwrap(), PushdownMode::Disabled);
        }
        assert!(PushdownMode::parse("maybe").is_err());
    }

    #[test]
    fn test_validate_properties_defaults() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "redis".to_string());
        let props = validate_properties(&options).unwrap();
        assert_eq!(props.connector, ConnectorType::Redis);
        assert_eq!(props.connection, None);
        assert_eq!(props.strategy, LookupStrategy::Replicated);
        assert_eq!(props.cache_memory, None);
        assert_eq!(props.cache_disk, None);
        assert_eq!(props.cache_ttl, None);
        assert_eq!(props.pushdown_mode, PushdownMode::Auto);
    }

    #[test]
    fn test_validate_properties_cache_settings() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "static".to_string());
        options.insert("cache.disk".to_string(), "2gb".to_string());
        options.insert("cache.ttl".to_string(), "60".to_string());
        options.insert("pushdown".to_string(), "off".to_string());
        let props = validate_properties(&options).unwrap();
        assert_eq!(props.cache_disk, Some(ByteSize(2 * 1024 * 1024 * 1024)));
        assert_eq!(props.cache_ttl, Some(60));
        assert_eq!(props.pushdown_mode, PushdownMode::Disabled);
    }

    #[test]
    fn test_validate_properties_missing_connector() {
        let options: HashMap<String, String> = HashMap::new();
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_validate_properties_invalid_ttl() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "static".to_string());
        options.insert("cache.ttl".to_string(), "soon".to_string());
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_error_missing_columns() {
        let result =
            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_missing_with_clause() {
        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_invalid_property() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "postgres-cdc".to_string());
        options.insert("strategy".to_string(), "invalid-strategy".to_string());
        let result = validate_properties(&options);
        assert!(result.is_err());
    }
}