1use std::collections::HashMap;
8
9use sqlparser::ast::{ColumnDef, ObjectName};
10use sqlparser::keywords::Keyword;
11use sqlparser::parser::Parser;
12use sqlparser::tokenizer::Token;
13
14use super::tokenizer::{expect_custom_keyword, parse_with_options};
15use super::ParseError;
16
17#[derive(Debug, Clone, PartialEq)]
19pub struct CreateLookupTableStatement {
20 pub name: ObjectName,
22 pub columns: Vec<ColumnDef>,
24 pub primary_key: Vec<String>,
26 pub with_options: HashMap<String, String>,
28 pub or_replace: bool,
30 pub if_not_exists: bool,
32}
33
34#[derive(Debug, Clone, PartialEq)]
36pub struct LookupTableProperties {
37 pub connector: ConnectorType,
39 pub connection: Option<String>,
41 pub strategy: LookupStrategy,
43 pub cache_memory: Option<ByteSize>,
45 pub cache_disk: Option<ByteSize>,
47 pub cache_ttl: Option<u64>,
49 pub pushdown_mode: PushdownMode,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
55pub enum ConnectorType {
56 PostgresCdc,
58 MysqlCdc,
60 Redis,
62 S3Parquet,
64 Static,
66 Custom(String),
68}
69
70impl ConnectorType {
71 pub fn parse(s: &str) -> Result<Self, ParseError> {
77 if s.is_empty() {
78 return Err(ParseError::ValidationError(
79 "connector type cannot be empty".to_string(),
80 ));
81 }
82 Ok(match s.to_lowercase().as_str() {
83 "postgres-cdc" | "postgres_cdc" | "postgresql" => Self::PostgresCdc,
84 "mysql-cdc" | "mysql_cdc" | "mysql" => Self::MysqlCdc,
85 "redis" => Self::Redis,
86 "s3-parquet" | "s3_parquet" | "s3" => Self::S3Parquet,
87 "static" | "memory" => Self::Static,
88 other => Self::Custom(other.to_string()),
89 })
90 }
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
95pub enum LookupStrategy {
96 #[default]
98 Replicated,
99 Partitioned,
101 OnDemand,
103}
104
105impl LookupStrategy {
106 pub fn parse(s: &str) -> Result<Self, ParseError> {
112 match s.to_lowercase().as_str() {
113 "replicated" | "full" => Ok(Self::Replicated),
114 "partitioned" | "sharded" => Ok(Self::Partitioned),
115 "on-demand" | "on_demand" | "lazy" => Ok(Self::OnDemand),
116 other => Err(ParseError::ValidationError(format!(
117 "unknown lookup strategy: '{other}' \
118 (expected: replicated, partitioned, on-demand)"
119 ))),
120 }
121 }
122}
123
124#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
126pub enum PushdownMode {
127 #[default]
129 Auto,
130 Enabled,
132 Disabled,
134}
135
136impl PushdownMode {
137 pub fn parse(s: &str) -> Result<Self, ParseError> {
143 match s.to_lowercase().as_str() {
144 "auto" => Ok(Self::Auto),
145 "enabled" | "true" | "on" => Ok(Self::Enabled),
146 "disabled" | "false" | "off" => Ok(Self::Disabled),
147 other => Err(ParseError::ValidationError(format!(
148 "unknown pushdown mode: '{other}' (expected: auto, enabled, disabled)"
149 ))),
150 }
151 }
152}
153
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub struct ByteSize(pub u64);
157
158impl ByteSize {
159 pub fn parse(s: &str) -> Result<Self, ParseError> {
167 let s = s.trim().to_lowercase();
168 let (num_str, multiplier) = if let Some(n) = s.strip_suffix("tb") {
169 (n, 1024 * 1024 * 1024 * 1024)
170 } else if let Some(n) = s.strip_suffix("gb") {
171 (n, 1024 * 1024 * 1024)
172 } else if let Some(n) = s.strip_suffix("mb") {
173 (n, 1024 * 1024)
174 } else if let Some(n) = s.strip_suffix("kb") {
175 (n, 1024)
176 } else if let Some(n) = s.strip_suffix('b') {
177 (n, 1)
178 } else {
179 (s.as_str(), 1)
181 };
182
183 let num: u64 = num_str
184 .trim()
185 .parse()
186 .map_err(|_| ParseError::ValidationError(format!("invalid byte size: '{s}'")))?;
187
188 Ok(Self(num * multiplier))
189 }
190
191 #[must_use]
193 pub fn as_bytes(&self) -> u64 {
194 self.0
195 }
196}
197
198pub fn parse_create_lookup_table(
217 parser: &mut Parser,
218) -> Result<CreateLookupTableStatement, ParseError> {
219 parser
221 .expect_keyword(Keyword::CREATE)
222 .map_err(ParseError::SqlParseError)?;
223
224 let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
225
226 expect_custom_keyword(parser, "LOOKUP")?;
227
228 parser
229 .expect_keyword(Keyword::TABLE)
230 .map_err(ParseError::SqlParseError)?;
231
232 let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
233
234 let name = parser
235 .parse_object_name(false)
236 .map_err(ParseError::SqlParseError)?;
237
238 parser
240 .expect_token(&Token::LParen)
241 .map_err(ParseError::SqlParseError)?;
242
243 let mut columns = Vec::new();
244 let mut primary_key = Vec::new();
245
246 loop {
247 if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
249 parser
250 .expect_token(&Token::LParen)
251 .map_err(ParseError::SqlParseError)?;
252
253 loop {
254 let ident = parser
255 .parse_identifier()
256 .map_err(ParseError::SqlParseError)?;
257 primary_key.push(ident.value);
258
259 if !parser.consume_token(&Token::Comma) {
260 break;
261 }
262 }
263
264 parser
265 .expect_token(&Token::RParen)
266 .map_err(ParseError::SqlParseError)?;
267
268 let _ = parser.consume_token(&Token::Comma);
270 } else if parser.consume_token(&Token::RParen) {
271 break;
273 } else {
274 let col = parser
276 .parse_column_def()
277 .map_err(ParseError::SqlParseError)?;
278 columns.push(col);
279
280 if !parser.consume_token(&Token::Comma) {
282 parser
283 .expect_token(&Token::RParen)
284 .map_err(ParseError::SqlParseError)?;
285 break;
286 }
287 }
288 }
289
290 if columns.is_empty() {
291 return Err(ParseError::StreamingError(
292 "LOOKUP TABLE must have at least one column".to_string(),
293 ));
294 }
295
296 let with_options = parse_with_options(parser)?;
298 if with_options.is_empty() {
299 return Err(ParseError::StreamingError(
300 "LOOKUP TABLE requires a WITH clause".to_string(),
301 ));
302 }
303
304 Ok(CreateLookupTableStatement {
305 name,
306 columns,
307 primary_key,
308 with_options,
309 or_replace,
310 if_not_exists,
311 })
312}
313
314pub fn parse_drop_lookup_table(parser: &mut Parser) -> Result<(ObjectName, bool), ParseError> {
322 parser
323 .expect_keyword(Keyword::DROP)
324 .map_err(ParseError::SqlParseError)?;
325
326 expect_custom_keyword(parser, "LOOKUP")?;
327
328 parser
329 .expect_keyword(Keyword::TABLE)
330 .map_err(ParseError::SqlParseError)?;
331
332 let if_exists = parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
333
334 let name = parser
335 .parse_object_name(false)
336 .map_err(ParseError::SqlParseError)?;
337
338 Ok((name, if_exists))
339}
340
341pub fn validate_properties<S: ::std::hash::BuildHasher>(
347 options: &HashMap<String, String, S>,
348) -> Result<LookupTableProperties, ParseError> {
349 let connector_str = options.get("connector").ok_or_else(|| {
350 ParseError::ValidationError("missing required property: 'connector'".to_string())
351 })?;
352 let connector = ConnectorType::parse(connector_str)?;
353
354 let connection = options.get("connection").cloned();
355
356 let strategy = match options.get("strategy") {
357 Some(s) => LookupStrategy::parse(s)?,
358 None => LookupStrategy::default(),
359 };
360
361 let cache_memory = options
362 .get("cache.memory")
363 .map(|s| ByteSize::parse(s))
364 .transpose()?;
365
366 let cache_disk = options
367 .get("cache.disk")
368 .map(|s| ByteSize::parse(s))
369 .transpose()?;
370
371 let cache_ttl = options
372 .get("cache.ttl")
373 .map(|s| {
374 s.parse::<u64>()
375 .map_err(|_| ParseError::ValidationError(format!("invalid cache.ttl: '{s}'")))
376 })
377 .transpose()?;
378
379 let pushdown_mode = match options.get("pushdown") {
380 Some(s) => PushdownMode::parse(s)?,
381 None => PushdownMode::default(),
382 };
383
384 Ok(LookupTableProperties {
385 connector,
386 connection,
387 strategy,
388 cache_memory,
389 cache_disk,
390 cache_ttl,
391 pushdown_mode,
392 })
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;
    use crate::parser::StreamingStatement;

    /// Parses `sql` and returns its only statement. Panics if parsing fails or the
    /// input does not contain exactly one statement.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    #[test]
    fn test_parse_basic_create_lookup_table() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE instruments (
                symbol VARCHAR NOT NULL,
                name VARCHAR,
                PRIMARY KEY (symbol)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.name.to_string(), "instruments");
                assert_eq!(lt.columns.len(), 2);
                assert_eq!(lt.primary_key, vec!["symbol"]);
                assert!(!lt.or_replace);
                assert!(!lt.if_not_exists);
                assert_eq!(
                    lt.with_options.get("connector"),
                    Some(&"postgres-cdc".to_string())
                );
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_or_replace_and_if_not_exists() {
        let stmt = parse_one(
            "CREATE OR REPLACE LOOKUP TABLE IF NOT EXISTS dims (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'static'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert!(lt.or_replace);
                assert!(lt.if_not_exists);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_primary_key() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                a INT,
                b VARCHAR,
                c FLOAT,
                PRIMARY KEY (a, b)
            ) WITH ('connector' = 'static')",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                assert_eq!(lt.primary_key, vec!["a", "b"]);
                assert_eq!(lt.columns.len(), 3);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_with_clause_properties() {
        let stmt = parse_one(
            "CREATE LOOKUP TABLE t (
                id INT,
                PRIMARY KEY (id)
            ) WITH (
                'connector' = 'postgres-cdc',
                'connection' = 'postgresql://localhost/db',
                'strategy' = 'replicated',
                'cache.memory' = '512mb',
                'pushdown' = 'auto'
            )",
        );
        match stmt {
            StreamingStatement::CreateLookupTable(lt) => {
                // Previously garbled as `<.with_options` (HTML-entity mangling of `&lt`).
                let props = validate_properties(&lt.with_options).unwrap();
                assert_eq!(props.connector, ConnectorType::PostgresCdc);
                assert_eq!(
                    props.connection.as_deref(),
                    Some("postgresql://localhost/db")
                );
                assert_eq!(props.strategy, LookupStrategy::Replicated);
                assert_eq!(props.cache_memory, Some(ByteSize(512 * 1024 * 1024)));
                assert_eq!(props.pushdown_mode, PushdownMode::Auto);
            }
            _ => panic!("Expected CreateLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table() {
        let stmt = parse_one("DROP LOOKUP TABLE instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_lookup_table_if_exists() {
        let stmt = parse_one("DROP LOOKUP TABLE IF EXISTS instruments");
        match stmt {
            StreamingStatement::DropLookupTable { name, if_exists } => {
                assert_eq!(name.to_string(), "instruments");
                assert!(if_exists);
            }
            _ => panic!("Expected DropLookupTable, got {stmt:?}"),
        }
    }

    #[test]
    fn test_byte_size_parsing() {
        assert_eq!(
            ByteSize::parse("512mb").unwrap(),
            ByteSize(512 * 1024 * 1024)
        );
        assert_eq!(
            ByteSize::parse("1gb").unwrap(),
            ByteSize(1024 * 1024 * 1024)
        );
        assert_eq!(ByteSize::parse("10kb").unwrap(), ByteSize(10 * 1024));
        assert_eq!(ByteSize::parse("100b").unwrap(), ByteSize(100));
        assert_eq!(ByteSize::parse("1024").unwrap(), ByteSize(1024));
        assert_eq!(
            ByteSize::parse("2tb").unwrap(),
            ByteSize(2 * 1024 * 1024 * 1024 * 1024)
        );
    }

    #[test]
    fn test_error_invalid_byte_size() {
        assert!(ByteSize::parse("").is_err());
        assert!(ByteSize::parse("lots").is_err());
        assert!(ByteSize::parse("1.5gb").is_err());
    }

    #[test]
    fn test_connector_type_parsing() {
        assert_eq!(
            ConnectorType::parse("postgres-cdc").unwrap(),
            ConnectorType::PostgresCdc
        );
        assert_eq!(
            ConnectorType::parse("mysql-cdc").unwrap(),
            ConnectorType::MysqlCdc
        );
        assert_eq!(ConnectorType::parse("redis").unwrap(), ConnectorType::Redis);
        assert_eq!(
            ConnectorType::parse("s3-parquet").unwrap(),
            ConnectorType::S3Parquet
        );
        assert_eq!(
            ConnectorType::parse("static").unwrap(),
            ConnectorType::Static
        );
        assert_eq!(
            ConnectorType::parse("custom-src").unwrap(),
            ConnectorType::Custom("custom-src".to_string())
        );
        assert!(ConnectorType::parse("").is_err());
    }

    #[test]
    fn test_strategy_and_pushdown_parsing() {
        assert_eq!(
            LookupStrategy::parse("Sharded").unwrap(),
            LookupStrategy::Partitioned
        );
        assert_eq!(
            LookupStrategy::parse("lazy").unwrap(),
            LookupStrategy::OnDemand
        );
        assert_eq!(PushdownMode::parse("ON").unwrap(), PushdownMode::Enabled);
        assert_eq!(PushdownMode::parse("false").unwrap(), PushdownMode::Disabled);
        assert!(PushdownMode::parse("maybe").is_err());
    }

    #[test]
    fn test_error_missing_columns() {
        let result =
            StreamingParser::parse_sql("CREATE LOOKUP TABLE t () WITH ('connector' = 'static')");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_missing_with_clause() {
        let result = StreamingParser::parse_sql("CREATE LOOKUP TABLE t (id INT, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    #[test]
    fn test_error_invalid_property() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "postgres-cdc".to_string());
        options.insert("strategy".to_string(), "invalid-strategy".to_string());
        let result = validate_properties(&options);
        assert!(result.is_err());
    }

    #[test]
    fn test_error_missing_connector() {
        let options: HashMap<String, String> = HashMap::new();
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_error_invalid_cache_ttl() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "static".to_string());
        options.insert("cache.ttl".to_string(), "ten".to_string());
        assert!(validate_properties(&options).is_err());
    }

    #[test]
    fn test_validate_properties_defaults() {
        let mut options = HashMap::new();
        options.insert("connector".to_string(), "redis".to_string());
        let props = validate_properties(&options).unwrap();
        assert_eq!(props.connector, ConnectorType::Redis);
        assert_eq!(props.connection, None);
        assert_eq!(props.strategy, LookupStrategy::Replicated);
        assert_eq!(props.cache_memory, None);
        assert_eq!(props.cache_disk, None);
        assert_eq!(props.cache_ttl, None);
        assert_eq!(props.pushdown_mode, PushdownMode::Auto);
    }
}