1use std::collections::HashMap;
37use std::str::FromStr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
42use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
43
44use crate::parser::ParseError;
45use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
46
/// Smallest channel buffer size a user may configure.
pub const MIN_BUFFER_SIZE: usize = 4;

/// Largest channel buffer size a user may configure (2^20 slots).
pub const MAX_BUFFER_SIZE: usize = 1 << 20;

/// Buffer size applied when no `buffer_size` WITH option is given.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;
55
/// How a source reacts when its buffer is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until space frees up (the default).
    #[default]
    Block,
    /// Evict the oldest buffered element to make room for the new one.
    DropOldest,
    /// Refuse the new element and report an error to the producer.
    Reject,
}
67
68impl std::str::FromStr for BackpressureStrategy {
69 type Err = ParseError;
70
71 fn from_str(s: &str) -> Result<Self, Self::Err> {
72 match s.to_lowercase().as_str() {
73 "block" | "blocking" => Ok(Self::Block),
74 "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
75 "reject" | "error" => Ok(Self::Reject),
76 _ => Err(ParseError::ValidationError(format!(
77 "invalid backpressure strategy: '{}'. Valid values: block, drop_oldest, reject",
78 s
79 ))),
80 }
81 }
82}
83
/// How a consumer waits when the buffer is empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Busy-spin; lowest latency, highest CPU usage.
    Spin,
    /// Spin briefly, then yield the thread (the default).
    #[default]
    SpinYield,
    /// Park the thread until woken.
    Park,
}
95
96impl std::str::FromStr for WaitStrategy {
97 type Err = ParseError;
98
99 fn from_str(s: &str) -> Result<Self, Self::Err> {
100 match s.to_lowercase().as_str() {
101 "spin" => Ok(Self::Spin),
102 "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
103 "park" | "parking" => Ok(Self::Park),
104 _ => Err(ParseError::ValidationError(format!(
105 "invalid wait strategy: '{}'. Valid values: spin, spin_yield, park",
106 s
107 ))),
108 }
109 }
110}
111
/// Resolved watermark specification for a source.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    // Name of the event-time column the watermark tracks.
    pub column: String,
    // Allowed lateness: how far behind the max observed event time the
    // watermark trails. `Duration::ZERO` means strictly ascending timestamps.
    pub max_out_of_orderness: Duration,
}
120
/// Tunable runtime options parsed from the WITH clause of CREATE SOURCE/SINK.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    // Channel capacity in elements; bounded by MIN/MAX_BUFFER_SIZE.
    pub buffer_size: usize,
    // Behavior when the buffer is full.
    pub backpressure: BackpressureStrategy,
    // Behavior when the buffer is empty.
    pub wait_strategy: WaitStrategy,
    // Whether to collect per-channel statistics.
    pub track_stats: bool,
}
133
134impl Default for SourceConfigOptions {
135 fn default() -> Self {
136 Self {
137 buffer_size: DEFAULT_BUFFER_SIZE,
138 backpressure: BackpressureStrategy::Block,
139 wait_strategy: WaitStrategy::SpinYield,
140 track_stats: false,
141 }
142 }
143}
144
/// A single column of a source, with its SQL type mapped to Arrow.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    // Column name as written in the DDL.
    pub name: String,
    // Arrow data type derived via `sql_type_to_arrow`.
    pub data_type: DataType,
    // False only when the column was declared NOT NULL.
    pub nullable: bool,
}
155
/// Fully translated CREATE SOURCE statement, ready for the runtime.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    // Source name from the DDL.
    pub name: String,
    // Column definitions in declaration order.
    pub columns: Vec<ColumnDefinition>,
    // Arrow schema built from `columns` (same order, same nullability).
    pub schema: SchemaRef,
    // Optional watermark spec, if a WATERMARK clause was present.
    pub watermark: Option<WatermarkSpec>,
    // Runtime options from the WITH clause (defaults where omitted).
    pub config: SourceConfigOptions,
}
173
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
181
/// Fully translated CREATE SINK statement.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    // Sink name from the DDL.
    pub name: String,
    // Name of the table/view the sink reads from.
    pub input: String,
    // Runtime options from the WITH clause (same option set as sources).
    pub config: SourceConfigOptions,
}
192
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
200
201pub fn translate_create_source(
210 stmt: CreateSourceStatement,
211) -> Result<SourceDefinition, ParseError> {
212 validate_source_options(&stmt.with_options)?;
214
215 let config = parse_source_options(&stmt.with_options)?;
217
218 let columns = convert_columns(&stmt.columns)?;
220
221 let fields: Vec<Field> = columns
223 .iter()
224 .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
225 .collect();
226 let schema = Arc::new(Schema::new(fields));
227
228 let watermark = if let Some(wm) = stmt.watermark {
230 Some(parse_watermark(&wm, &columns)?)
231 } else {
232 None
233 };
234
235 Ok(SourceDefinition {
236 name: stmt.name.to_string(),
237 columns,
238 schema,
239 watermark,
240 config,
241 })
242}
243
244pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
252 validate_source_options(&stmt.with_options)?;
254
255 let config = parse_source_options(&stmt.with_options)?;
257
258 let input = match stmt.from {
260 SinkFrom::Table(name) => name.to_string(),
261 SinkFrom::Query(_) => {
262 return Err(ParseError::ValidationError(
264 "inline queries not yet supported in CREATE SINK - use a view".to_string(),
265 ));
266 }
267 };
268
269 Ok(SinkDefinition {
270 name: stmt.name.to_string(),
271 input,
272 config,
273 })
274}
275
276fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
278 if options.contains_key("channel") {
280 return Err(ParseError::ValidationError(
281 "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
282 ));
283 }
284
285 if options.contains_key("type") {
287 return Err(ParseError::ValidationError(
288 "the 'type' option is not user-configurable for in-memory streaming sources"
289 .to_string(),
290 ));
291 }
292
293 Ok(())
294}
295
296fn parse_source_options(
298 options: &HashMap<String, String>,
299) -> Result<SourceConfigOptions, ParseError> {
300 let mut config = SourceConfigOptions::default();
301
302 for (key, value) in options {
303 match key.to_lowercase().as_str() {
304 "buffer_size" | "buffersize" => {
305 config.buffer_size = parse_buffer_size(value)?;
306 }
307 "backpressure" => {
308 config.backpressure = BackpressureStrategy::from_str(value)?;
309 }
310 "wait_strategy" | "waitstrategy" => {
311 config.wait_strategy = WaitStrategy::from_str(value)?;
312 }
313 "track_stats" | "trackstats" | "stats" => {
314 config.track_stats = parse_bool(value)?;
315 }
316 _ => {}
320 }
321 }
322
323 Ok(config)
324}
325
326fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
328 let size: usize = value.parse().map_err(|_| {
329 ParseError::ValidationError(format!(
330 "invalid buffer_size: '{}' - must be a number",
331 value
332 ))
333 })?;
334
335 if size < MIN_BUFFER_SIZE {
336 return Err(ParseError::ValidationError(format!(
337 "buffer_size {} is too small - minimum is {}",
338 size, MIN_BUFFER_SIZE
339 )));
340 }
341
342 if size > MAX_BUFFER_SIZE {
343 return Err(ParseError::ValidationError(format!(
344 "buffer_size {} is too large - maximum is {}",
345 size, MAX_BUFFER_SIZE
346 )));
347 }
348
349 Ok(size)
350}
351
352fn parse_bool(value: &str) -> Result<bool, ParseError> {
354 match value.to_lowercase().as_str() {
355 "true" | "yes" | "on" | "1" => Ok(true),
356 "false" | "no" | "off" | "0" => Ok(false),
357 _ => Err(ParseError::ValidationError(format!(
358 "invalid boolean value: '{}' - expected true/false",
359 value
360 ))),
361 }
362}
363
364fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
366 columns.iter().map(convert_column).collect()
367}
368
369fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
371 let data_type = sql_type_to_arrow(&col.data_type)?;
372
373 let nullable = !col
375 .options
376 .iter()
377 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
378
379 Ok(ColumnDefinition {
380 name: col.name.to_string(),
381 data_type,
382 nullable,
383 })
384}
385
386pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
392 match sql_type {
393 SqlDataType::TinyInt(_) => Ok(DataType::Int8),
395 SqlDataType::SmallInt(_) => Ok(DataType::Int16),
396 SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
397 SqlDataType::BigInt(_) => Ok(DataType::Int64),
398
399 SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
404 SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
405
406 SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
408 #[allow(clippy::cast_possible_truncation)] let (precision, scale) = match info {
410 sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
411 sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
412 sqlparser::ast::ExactNumberInfo::None => (38, 9), };
414 Ok(DataType::Decimal128(precision, scale))
415 }
416
417 SqlDataType::Char(_)
419 | SqlDataType::Character(_)
420 | SqlDataType::Varchar(_)
421 | SqlDataType::CharacterVarying(_)
422 | SqlDataType::Text
423 | SqlDataType::String(_)
424 | SqlDataType::JSON
425 | SqlDataType::JSONB
426 | SqlDataType::Uuid => Ok(DataType::Utf8),
427
428 SqlDataType::Binary(_)
430 | SqlDataType::Varbinary(_)
431 | SqlDataType::Blob(_)
432 | SqlDataType::Bytea => Ok(DataType::Binary),
433
434 SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
436
437 SqlDataType::Date => Ok(DataType::Date32),
439 SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
440 SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
441
442 SqlDataType::Interval { .. } => Ok(DataType::Interval(
444 arrow::datatypes::IntervalUnit::MonthDayNano,
445 )),
446
447 _ => Err(ParseError::ValidationError(format!(
449 "unsupported data type: {:?}",
450 sql_type
451 ))),
452 }
453}
454
455fn parse_watermark(
457 wm: &WatermarkDef,
458 columns: &[ColumnDefinition],
459) -> Result<WatermarkSpec, ParseError> {
460 let column_name = wm.column.to_string();
461
462 let col = columns
464 .iter()
465 .find(|c| c.name == column_name)
466 .ok_or_else(|| {
467 ParseError::ValidationError(format!(
468 "watermark column '{}' not found in column list",
469 column_name
470 ))
471 })?;
472
473 if !matches!(
475 col.data_type,
476 DataType::Timestamp(_, _)
477 | DataType::Date32
478 | DataType::Date64
479 | DataType::Int64
480 | DataType::Int32
481 ) {
482 return Err(ParseError::ValidationError(format!(
483 "watermark column '{}' must be a timestamp or integer type, found {:?}",
484 column_name, col.data_type
485 )));
486 }
487
488 let max_out_of_orderness = match &wm.expression {
491 Some(expr) => parse_watermark_expression(expr),
492 None => Duration::ZERO,
493 };
494
495 Ok(WatermarkSpec {
496 column: column_name,
497 max_out_of_orderness,
498 })
499}
500
501fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
503 use sqlparser::ast::Expr;
504
505 match expr {
506 Expr::BinaryOp { op, right, .. } => match op {
507 sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
508 _ => Duration::ZERO,
509 },
510 Expr::Identifier(_) => Duration::ZERO,
512 _ => Duration::from_secs(1),
514 }
515}
516
517fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
519 use sqlparser::ast::Expr;
520
521 let Expr::Interval(interval) = expr else {
522 return Duration::from_secs(1);
523 };
524
525 let value_str = match interval.value.as_ref() {
527 Expr::Value(v) => {
528 match &v.value {
530 sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
531 sqlparser::ast::Value::Number(n, _) => n.clone(),
532 _ => return Duration::from_secs(1),
533 }
534 }
535 _ => return Duration::from_secs(1),
536 };
537
538 let value: u64 = value_str.parse().unwrap_or(1);
539
540 let unit = interval
542 .leading_field
543 .as_ref()
544 .map_or("second", |u| match u {
545 sqlparser::ast::DateTimeField::Microsecond => "microsecond",
546 sqlparser::ast::DateTimeField::Millisecond => "millisecond",
547 sqlparser::ast::DateTimeField::Minute => "minute",
548 sqlparser::ast::DateTimeField::Hour => "hour",
549 sqlparser::ast::DateTimeField::Day => "day",
550 _ => "second",
551 });
552
553 match unit {
554 "microsecond" | "microseconds" => Duration::from_micros(value),
555 "millisecond" | "milliseconds" => Duration::from_millis(value),
556 "minute" | "minutes" => Duration::from_secs(value * 60),
557 "hour" | "hours" => Duration::from_secs(value * 3600),
558 "day" | "days" => Duration::from_secs(value * 86400),
559 _ => Duration::from_secs(value),
560 }
561}
562
#[cfg(test)]
mod tests {
    //! End-to-end tests: SQL text -> parser -> translation layer.
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates its first statement, which must be a
    /// CREATE SOURCE.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    // Columns, types, and NOT NULL are carried through translation.
    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    // WITH options override the config defaults.
    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    // WATERMARK ... AS col - INTERVAL '5' SECOND -> 5s lateness.
    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // Reserved 'channel' option is rejected with a pointed message.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    // Reserved 'type' option is rejected.
    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    // buffer_size must lie within [MIN_BUFFER_SIZE, MAX_BUFFER_SIZE].
    #[test]
    fn test_buffer_size_bounds() {
        // Below the minimum.
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        // Above the maximum.
        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        // In range.
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    // FromStr accepts each canonical strategy name; unknown names error.
    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    // Same coverage for wait strategies.
    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // One column per supported SQL type; checks the Arrow mapping end-to-end.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    // The Arrow schema mirrors column order and nullability.
    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    // Watermark over an undeclared column is a validation error.
    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    // Watermark over a VARCHAR column is rejected by the type check.
    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    // MILLISECOND interval unit is honored.
    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    // MINUTE interval unit is honored (5 min = 300 s).
    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    // 'track_stats' option flips the stats flag.
    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    // 'wait_strategy' option overrides the default.
    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    // Without WITH options, all config fields take their defaults.
    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    // Unrecognized connector-style keys are ignored; recognized keys still apply.
    #[test]
    fn test_external_connector_options_ignored() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 8192);
    }

    // WATERMARK without an expression means zero allowed lateness.
    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // Integer (epoch-style) columns are valid watermark columns.
    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }
}