1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
9
10use sqlparser::ast::{ColumnDef, ObjectName};
11use sqlparser::keywords::Keyword;
12use sqlparser::parser::Parser;
13use sqlparser::tokenizer::Token;
14
15use super::tokenizer::{expect_custom_keyword, parse_with_options};
16use super::ParseError;
17
/// Parsed form of `CREATE [OR REPLACE] LOOKUP TABLE [IF NOT EXISTS] <name> (...) WITH (...)`,
/// as produced by `parse_create_lookup_table`.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateLookupTableStatement {
    /// Table name as returned by `Parser::parse_object_name`.
    pub name: ObjectName,
    /// Column definitions in the order they appear in the statement.
    pub columns: Vec<ColumnDef>,
    /// Identifier values listed in `PRIMARY KEY (...)`; empty when no such clause was given.
    pub primary_key: Vec<String>,
    /// Options from the `WITH (...)` clause as returned by `parse_with_options`;
    /// use `validate_properties` to turn them into typed properties.
    pub with_options: HashMap<String, String>,
    /// Whether `OR REPLACE` was present.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` was present.
    pub if_not_exists: bool,
}
34
/// Typed view of a lookup table's `WITH (...)` options, built by `validate_properties`.
#[derive(Debug, Clone, PartialEq)]
pub struct LookupTableProperties {
    /// Parsed from the required `connector` option.
    pub connector: ConnectorType,
    /// The `connection` option copied verbatim; its contents are not validated here.
    pub connection: Option<String>,
    /// Parsed from `strategy`; `LookupStrategy::Replicated` (the default) when omitted.
    pub strategy: LookupStrategy,
    /// Parsed from `cache.memory` (e.g. `512mb`), if present.
    pub cache_memory: Option<ByteSize>,
    /// Parsed from `cache.disk` (e.g. `10gb`), if present.
    pub cache_disk: Option<ByteSize>,
    /// Parsed from `cache.ttl` as a plain unsigned integer, if present.
    /// NOTE(review): the time unit is not defined in this module (seconds?) — confirm with consumers.
    pub cache_ttl: Option<u64>,
    /// Parsed from `pushdown`; `PushdownMode::Auto` (the default) when omitted.
    pub pushdown_mode: PushdownMode,
}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum ConnectorType {
57 PostgresCdc,
59 MysqlCdc,
61 Redis,
63 S3Parquet,
65 Static,
67 Custom(String),
69}
70
71impl ConnectorType {
72 pub fn parse(s: &str) -> Result<Self, ParseError> {
78 if s.is_empty() {
79 return Err(ParseError::ValidationError(
80 "connector type cannot be empty".to_string(),
81 ));
82 }
83 Ok(match s.to_lowercase().as_str() {
84 "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
85 "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
86 "redis" => Self::Redis,
87 "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
88 "static" | "memory" => Self::Static,
89 other => Self::Custom(other.to_string()),
90 })
91 }
92}
93
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
96pub enum LookupStrategy {
97 #[default]
99 Replicated,
100 Partitioned,
102 OnDemand,
104}
105
106impl LookupStrategy {
107 pub fn parse(s: &str) -> Result<Self, ParseError> {
113 match s.to_lowercase().as_str() {
114 "replicated" | "full" => Ok(Self::Replicated),
115 "partitioned" | "sharded" => Ok(Self::Partitioned),
116 "on-demand" | "on_demand" | "lazy" => Ok(Self::OnDemand),
117 other => Err(ParseError::ValidationError(format!(
118 "unknown lookup strategy: '{other}' \
119 (expected: replicated, partitioned, on-demand)"
120 ))),
121 }
122 }
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
127pub enum PushdownMode {
128 #[default]
130 Auto,
131 Enabled,
133 Disabled,
135}
136
137impl PushdownMode {
138 pub fn parse(s: &str) -> Result<Self, ParseError> {
144 match s.to_lowercase().as_str() {
145 "auto" => Ok(Self::Auto),
146 "enabled" | "true" | "on" => Ok(Self::Enabled),
147 "disabled" | "false" | "off" => Ok(Self::Disabled),
148 other => Err(ParseError::ValidationError(format!(
149 "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
150 ))),
151 }
152 }
153}
154
155#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub struct ByteSize(pub u64);
158
159impl ByteSize {
160 pub fn parse(s: &str) -> Result<Self, ParseError> {
168 let s = s.trim().to_lowercase();
169 let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
170 (n, 1024 * 1024 * 1024 * 1024)
171 } else if let Some(n) = s.strip_suffix("gb") {
172 (n, 1024 * 1024 * 1024)
173 } else if let Some(n) = s.strip_suffix("mb") {
174 (n, 1024 * 1024)
175 } else if let Some(n) = s.strip_suffix("kb") {
176 (n, 1024)
177 } else if let Some(n) = s.strip_suffix('b') {
178 (n, 1)
179 } else {
180 (s.as_str(), 1)
182 };
183
184 let num: u64 = num_str
185 .trim()
186 .parse()
187 .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
188
189 Ok(Self(num * multiplier))
190 }
191
192 #[must_use]
194 pub fn as_bytes(&self) -> u64 {
195 self.0
196 }
197}
198
199pub fn parse_create_lookup_table(
218 parser: &mut Parser,
219) -> Result<CreateLookupTableStatement, ParseError> {
220 parser
222 .expect_keyword(Keyword::CREATE)
223 .map_err(ParseError::SqlParseError)?;
224
225 let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
226
227 expect_custom_keyword(parser, "LOOKUP")?;
228
229 parser
230 .expect_keyword(Keyword::TABLE)
231 .map_err(ParseError::SqlParseError)?;
232
233 let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
234
235 let name = parser
236 .parse_object_name(false)
237 .map_err(ParseError::SqlParseError)?;
238
239 parser
241 .expect_token(&Token::LParen)
242 .map_err(ParseError::SqlParseError)?;
243
244 let mut columns = Vec::new();
245 let mut primary_key = Vec::new();
246
247 loop {
248 if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
250 parser
251 .expect_token(&Token::LParen)
252 .map_err(ParseError::SqlParseError)?;
253
254 loop {
255 let ident = parser
256 .parse_identifier()
257 .map_err(ParseError::SqlParseError)?;
258 primary_key.push(ident.value);
259
260 if !parser.consume_token(&Token::Comma) {
261 break;
262 }
263 }
264
265 parser
266 .expect_token(&Token::RParen)
267 .map_err(ParseError::SqlParseError)?;
268
269 let _ = parser.consume_token(&Token::Comma);
271 } else if parser.consume_token(&Token::RParen) {
272 break;
274 } else {
275 let col = parser
277 .parse_column_def()
278 .map_err(ParseError::SqlParseError)?;
279 columns.push(col);
280
281 if !parser.consume_token(&Token::Comma) {
283 parser
284 .expect_token(&Token::RParen)
285 .map_err(ParseError::SqlParseError)?;
286 break;
287 }
288 }
289 }
290
291 if columns.is_empty() {
292 return Err(ParseError::StreamingError(
293 "LOOKUP TABLE must have at least one column".to_string(),
294 ));
295 }
296
297 let with_options = parse_with_options(parser)?;
299 if with_options.is_empty() {
300 return Err(ParseError::StreamingError(
301 "LOOKUP TABLE requires a WITH clause".to_string(),
302 ));
303 }
304
305 Ok(CreateLookupTableStatement {
306 name,
307 columns,
308 primary_key,
309 with_options,
310 or_replace,
311 if_not_exists,
312 })
313}
314
315pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
323 parser
324 .expect_keyword(Keyword::DROP)
325 .map_err(ParseError::SqlParseError)?;
326
327 expect_custom_keyword(parser, "LOOKUP")?;
328
329 parser
330 .expect_keyword(Keyword::TABLE)
331 .map_err(ParseError::SqlParseError)?;
332
333 let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
334
335 let name = parser
336 .parse_object_name(false)
337 .map_err(ParseError::SqlParseError)?;
338
339 Ok((name, if_exists))
340}
341
342pub fn validate_properties<S: ::std::hash::BuildHasher>(
348 options: &HashMap<String, String, S>,
349) -> Result<LookupTableProperties, ParseError> {
350 let connector_str = options.get("connector").ok_or_else(|| {
351 ParseError::ValidationError("missing required property: 'connector'".to_string())
352 })?;
353 let connector = ConnectorType::parse(connector_str)?;
354
355 let connection = options.get("connection").cloned();
356
357 let strategy = match options.get("strategy") {
358 Some(s) => LookupStrategy::parse(s)?,
359 None => LookupStrategy::default(),
360 };
361
362 let cache_memory = options
363 .get("cache.memory")
364 .map(|s| ByteSize::parse(s))
365 .transpose()?;
366
367 let cache_disk = options
368 .get("cache.disk")
369 .map(|s| ByteSize::parse(s))
370 .transpose()?;
371
372 let cache_ttl = options
373 .get("cache.ttl")
374 .map(|s| {
375 s.parse::<u64>()
376 .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
377 })
378 .transpose()?;
379
380 let pushdown_mode = match options.get("pushdown") {
381 Some(s) => PushdownMode::parse(s)?,
382 None => PushdownMode::default(),
383 };
384
385 Ok(LookupTableProperties {
386 connector,
387 connection,
388 strategy,
389 cache_memory,
390 cache_disk,
391 cache_ttl,
392 pushdown_mode,
393 })
394}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Parses `sql` and returns its single statement, panicking otherwise.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    #[test]
    fn test_parse_basic_create_lookup_table() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.name.to_string(), "instruments");
                assert_eq!(lt.columns.len(), 2);
                assert_eq!(lt.primary_key, vec!["symbol"]);
                assert!(!lt.or_replace);
                assert!(!lt.if_not_exists);
                assert_eq!(
                    lt.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        let stmt = parse_one(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert!(lt.or_replace);
                assert!(lt.if_not_exists);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.primary_key, vec!["a", "b"]);
                assert_eq!(lt.columns.len(), 3);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_clause_properties() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                let props = validate_properties(&lt.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table() {
        let stmt = parse_one("DROP LOOKUP TABLE instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_byte_size_parsing() {
        assert_eq!(
            ByteSize::parse("512mb").unwrap(),
            ByteSize(512 * 1024 * 1024)
        );
        assert_eq!(
            ByteSize::parse("1gb").unwrap(),
            ByteSize(1024 * 1024 * 1024)
        );
        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
        assert_eq!(
            ByteSize::parse("2tb").unwrap(),
            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
        );
    }

    /// Malformed sizes must be rejected rather than defaulted.
    #[test]
    fn test_byte_size_invalid() {
        assert!(ByteSize::parse("").is_err());
        assert!(ByteSize::parse("mb").is_err());
        assert!(ByteSize::parse("-1kb").is_err());
        assert!(ByteSize::parse("abc").is_err());
        assert!(ByteSize::parse("1.5gb").is_err());
    }

    #[test]
    fn test_connector_type_parsing() {
        assert_eq!(
            ConnectorType::parse("postgres-cdc").unwrap(),
            ConnectorType::PostgresCdc
        );
        assert_eq!(
            ConnectorType::parse("mysql-cdc").unwrap(),
            ConnectorType::MysqlCdc
        );
        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
        assert_eq!(
            ConnectorType::parse("s3-parquet").unwrap(),
            ConnectorType::S3Parquet
        );
        assert_eq!(
            ConnectorType::parse("static").unwrap(),
            ConnectorType::Static
        );
        assert_eq!(
            ConnectorType::parse("custom-src").unwrap(),
            ConnectorType::Custom("custom-src".to_string())
        );
    }

    #[test]
    fn test_lookup_strategy_aliases() {
        assert_eq!(LookupStrategy::parse("FULL").unwrap(), LookupStrategy::Replicated);
        assert_eq!(LookupStrategy::parse("sharded").unwrap(), LookupStrategy::Partitioned);
        assert_eq!(LookupStrategy::parse("on_demand").unwrap(), LookupStrategy::OnDemand);
        assert_eq!(LookupStrategy::parse("lazy").unwrap(), LookupStrategy::OnDemand);
    }

    #[test]
    fn test_pushdown_mode_parsing() {
        assert_eq!(PushdownMode::parse("AUTO").unwrap(), PushdownMode::Auto);
        assert_eq!(PushdownMode::parse("on").unwrap(), PushdownMode::Enabled);
        assert_eq!(PushdownMode::parse("true").unwrap(), PushdownMode::Enabled);
        assert_eq!(PushdownMode::parse("false").unwrap(), PushdownMode::Disabled);
        assert_eq!(PushdownMode::parse("off").unwrap(), PushdownMode::Disabled);
        assert!(PushdownMode::parse("maybe").is_err());
    }

    #[test]
    fn test_error_missing_columns() {
        let result =
            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_missing_with_clause() {
        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_invalid_property() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "postgres-cdc".to_string());
        options.insert("strategy".to_string(), "invalid-strategy".to_string());
        let result = validate_properties(&options);
        assert!(result.is_err());
    }

    /// Only `connector` is required; everything else falls back to defaults or `None`.
    #[test]
    fn test_validate_properties_defaults() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "redis".to_string());
        let props = validate_properties(&options).unwrap();
        assert_eq!(props.connector, ConnectorType::Redis);
        assert_eq!(props.connection, None);
        assert_eq!(props.strategy, LookupStrategy::Replicated);
        assert_eq!(props.cache_memory, None);
        assert_eq!(props.cache_disk, None);
        assert_eq!(props.cache_ttl, None);
        assert_eq!(props.pushdown_mode, PushdownMode::Auto);
    }

    #[test]
    fn test_error_missing_connector() {
        let options: HashMap<String, String> = HashMap::new();
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_error_invalid_cache_ttl() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "static".to_string());
        options.insert("cache.ttl".to_string(), "soon".to_string());
        assert!(validate_properties(&options).is_err());
    }
}