// laminar_sql/translator/streaming_ddl.rs
1//! SQL DDL to Streaming API translation.
2//!
3//! This module translates parsed SQL CREATE SOURCE/SINK statements into
4//! typed streaming definitions that can be used to configure the runtime.
5//!
6//! ## Supported Syntax
7//!
8//! ```sql
9//! -- In-memory streaming source
10//! CREATE SOURCE trades (
11//!     symbol VARCHAR NOT NULL,
12//!     price DOUBLE NOT NULL,
13//!     quantity BIGINT NOT NULL,
14//!     ts TIMESTAMP NOT NULL,
15//!     WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECONDS
16//! ) WITH (
17//!     buffer_size = 131072,
18//!     backpressure = 'block'
19//! );
20//!
21//! -- In-memory streaming sink
22//! CREATE SINK trade_aggregates AS
23//!     SELECT * FROM trade_stats
24//! WITH (
25//!     buffer_size = 65536
26//! );
27//! ```
28//!
29//! ## Validation
30//!
31//! - Rejects `channel = ...` option (channel type is auto-derived)
32//! - Validates buffer_size is within bounds
33//! - Validates backpressure strategy names
34//! - Validates wait_strategy names
35
36use std::collections::HashMap;
37use std::str::FromStr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
42use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
43
44use crate::parser::ParseError;
45use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
46
47/// Minimum buffer size for streaming channels.
48pub const MIN_BUFFER_SIZE: usize = 4;
49
50/// Maximum buffer size for streaming channels.
51pub const MAX_BUFFER_SIZE: usize = 1 << 20; // 1M entries
52
53/// Default buffer size for streaming channels.
54pub const DEFAULT_BUFFER_SIZE: usize = 2048;
55
/// Backpressure strategy for streaming channels.
///
/// Selected via the `backpressure` WITH option; names are parsed
/// case-insensitively by the `FromStr` impl below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block until space is available.
    #[default]
    Block,
    /// Drop oldest item to make room.
    DropOldest,
    /// Reject and return error immediately.
    Reject,
}
67
68impl std::str::FromStr for BackpressureStrategy {
69    type Err = ParseError;
70
71    fn from_str(s: &str) -> Result<Self, Self::Err> {
72        match s.to_lowercase().as_str() {
73            "block" | "blocking" => Ok(Self::Block),
74            "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
75            "reject" | "error" => Ok(Self::Reject),
76            _ => Err(ParseError::ValidationError(format!(
77                "invalid backpressure strategy: '{}'. Valid values: block, drop_oldest, reject",
78                s
79            ))),
80        }
81    }
82}
83
/// Wait strategy for streaming consumers.
///
/// Selected via the `wait_strategy` WITH option; names are parsed
/// case-insensitively by the `FromStr` impl below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Spin-loop without yielding (lowest latency, highest CPU).
    Spin,
    /// Spin with occasional yields (balanced).
    #[default]
    SpinYield,
    /// Park the thread (lowest CPU, higher latency).
    Park,
}
95
96impl std::str::FromStr for WaitStrategy {
97    type Err = ParseError;
98
99    fn from_str(s: &str) -> Result<Self, Self::Err> {
100        match s.to_lowercase().as_str() {
101            "spin" => Ok(Self::Spin),
102            "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
103            "park" | "parking" => Ok(Self::Park),
104            _ => Err(ParseError::ValidationError(format!(
105                "invalid wait strategy: '{}'. Valid values: spin, spin_yield, park",
106                s
107            ))),
108        }
109    }
110}
111
/// Watermark specification for a source.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Column name for event time.
    pub column: String,
    /// Bounded out-of-orderness duration.
    ///
    /// Zero when the watermark was declared without an `AS` expression
    /// or uses `PROCTIME()`.
    pub max_out_of_orderness: Duration,
    /// Whether this is a processing-time watermark (`PROCTIME()`).
    ///
    /// When `true`, the runtime should use `ProcessingTimeGenerator`
    /// instead of `BoundedOutOfOrdernessGenerator`.
    pub is_processing_time: bool,
}
125
/// Configuration options for a streaming source.
///
/// Also reused by sinks; all fields fall back to the `Default` impl when
/// the corresponding WITH option is absent.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer size for the channel.
    ///
    /// Validated to lie within `[MIN_BUFFER_SIZE, MAX_BUFFER_SIZE]`.
    pub buffer_size: usize,
    /// Backpressure strategy.
    pub backpressure: BackpressureStrategy,
    /// Wait strategy for consumers.
    pub wait_strategy: WaitStrategy,
    /// Whether to track statistics.
    pub track_stats: bool,
}
138
139impl Default for SourceConfigOptions {
140    fn default() -> Self {
141        Self {
142            buffer_size: DEFAULT_BUFFER_SIZE,
143            backpressure: BackpressureStrategy::Block,
144            wait_strategy: WaitStrategy::SpinYield,
145            track_stats: false,
146        }
147    }
148}
149
/// Column definition for a streaming source.
///
/// Produced by converting a SQL `ColumnDef` (see `convert_column`).
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Arrow data type.
    pub data_type: DataType,
    /// Whether the column is nullable (i.e. no `NOT NULL` constraint).
    pub nullable: bool,
}
160
/// A validated streaming source definition.
///
/// This is the output of translating a `CreateSourceStatement` to a typed
/// configuration that can be used to create runtime sources.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema, derived from `columns`.
    pub schema: SchemaRef,
    /// Watermark specification, if defined.
    pub watermark: Option<WatermarkSpec>,
    /// Configuration options.
    pub config: SourceConfigOptions,
}
178
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
186
/// A validated streaming sink definition.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Input source or query.
    ///
    /// Currently always a named table/view; inline queries are rejected
    /// during translation.
    pub input: String,
    /// Configuration options (shared with sources).
    pub config: SourceConfigOptions,
}
197
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
205
206/// Translates a CREATE SOURCE statement to a typed SourceDefinition.
207///
208/// # Errors
209///
210/// Returns `ParseError::ValidationError` if:
211/// - The `channel` option is specified (not user-configurable)
212/// - An invalid option value is provided
213/// - Column types cannot be converted to Arrow types
214pub fn translate_create_source(
215    stmt: CreateSourceStatement,
216) -> Result<SourceDefinition, ParseError> {
217    // Validate options first - reject 'channel' option
218    validate_source_options(&stmt.with_options)?;
219
220    // Parse configuration options
221    let config = parse_source_options(&stmt.with_options)?;
222
223    // Convert columns to Arrow types
224    let columns = convert_columns(&stmt.columns)?;
225
226    // Build Arrow schema
227    let fields: Vec<Field> = columns
228        .iter()
229        .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
230        .collect();
231    let schema = Arc::new(Schema::new(fields));
232
233    // Parse watermark if present
234    let watermark = if let Some(wm) = stmt.watermark {
235        Some(parse_watermark(&wm, &columns)?)
236    } else {
237        None
238    };
239
240    Ok(SourceDefinition {
241        name: stmt.name.to_string(),
242        columns,
243        schema,
244        watermark,
245        config,
246    })
247}
248
249/// Translates a CREATE SINK statement to a typed SinkDefinition.
250///
251/// # Errors
252///
253/// Returns `ParseError::ValidationError` if:
254/// - The `channel` option is specified (not user-configurable)
255/// - An invalid option value is provided
256pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
257    // Validate options first
258    validate_source_options(&stmt.with_options)?;
259
260    // Parse configuration options
261    let config = parse_source_options(&stmt.with_options)?;
262
263    // Get input name
264    let input = match stmt.from {
265        SinkFrom::Table(name) => name.to_string(),
266        SinkFrom::Query(_) => {
267            // For now, we don't support inline queries - need to create a view first
268            return Err(ParseError::ValidationError(
269                "inline queries not yet supported in CREATE SINK - use a view".to_string(),
270            ));
271        }
272    };
273
274    Ok(SinkDefinition {
275        name: stmt.name.to_string(),
276        input,
277        config,
278    })
279}
280
281/// Validates that source options don't include disallowed keys.
282fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
283    // Reject 'channel' option - channel type is auto-derived
284    if options.contains_key("channel") {
285        return Err(ParseError::ValidationError(
286            "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
287        ));
288    }
289
290    // Reject 'type' option for same reason
291    if options.contains_key("type") {
292        return Err(ParseError::ValidationError(
293            "the 'type' option is not user-configurable for in-memory streaming sources"
294                .to_string(),
295        ));
296    }
297
298    Ok(())
299}
300
301/// Parses source options from WITH clause.
302fn parse_source_options(
303    options: &HashMap<String, String>,
304) -> Result<SourceConfigOptions, ParseError> {
305    let mut config = SourceConfigOptions::default();
306
307    for (key, value) in options {
308        match key.to_lowercase().as_str() {
309            "buffer_size" | "buffersize" => {
310                config.buffer_size = parse_buffer_size(value)?;
311            }
312            "backpressure" => {
313                config.backpressure = BackpressureStrategy::from_str(value)?;
314            }
315            "wait_strategy" | "waitstrategy" => {
316                config.wait_strategy = WaitStrategy::from_str(value)?;
317            }
318            "track_stats" | "trackstats" | "stats" => {
319                config.track_stats = parse_bool(value)?;
320            }
321            // Ignore connector-specific and unknown options.
322            // Connector-specific: handled by connector implementations.
323            // Unknown: allow forward compatibility with new options.
324            _ => {}
325        }
326    }
327
328    Ok(config)
329}
330
331/// Parses buffer_size option.
332fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
333    let size: usize = value.parse().map_err(|_| {
334        ParseError::ValidationError(format!(
335            "invalid buffer_size: '{}' - must be a number",
336            value
337        ))
338    })?;
339
340    if size < MIN_BUFFER_SIZE {
341        return Err(ParseError::ValidationError(format!(
342            "buffer_size {} is too small - minimum is {}",
343            size, MIN_BUFFER_SIZE
344        )));
345    }
346
347    if size > MAX_BUFFER_SIZE {
348        return Err(ParseError::ValidationError(format!(
349            "buffer_size {} is too large - maximum is {}",
350            size, MAX_BUFFER_SIZE
351        )));
352    }
353
354    Ok(size)
355}
356
357/// Parses a boolean option.
358fn parse_bool(value: &str) -> Result<bool, ParseError> {
359    match value.to_lowercase().as_str() {
360        "true" | "yes" | "on" | "1" => Ok(true),
361        "false" | "no" | "off" | "0" => Ok(false),
362        _ => Err(ParseError::ValidationError(format!(
363            "invalid boolean value: '{}' - expected true/false",
364            value
365        ))),
366    }
367}
368
369/// Converts SQL column definitions to Arrow types.
370fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
371    columns.iter().map(convert_column).collect()
372}
373
374/// Converts a single SQL column definition to Arrow type.
375fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
376    let data_type = sql_type_to_arrow(&col.data_type)?;
377
378    // Check for NOT NULL constraint
379    let nullable = !col
380        .options
381        .iter()
382        .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
383
384    Ok(ColumnDefinition {
385        name: col.name.to_string(),
386        data_type,
387        nullable,
388    })
389}
390
/// Converts SQL data type to Arrow data type.
///
/// # Errors
///
/// Returns `ParseError::ValidationError` for unsupported SQL data types.
pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
    match sql_type {
        // Integer types map to same-width signed Arrow integers.
        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
        SqlDataType::BigInt(_) => Ok(DataType::Int64),

        // Unsigned integer types - wrapped in Unsigned variant
        // Note: sqlparser wraps unsigned types differently in different versions

        // Floating point types
        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),

        // Decimal types map to Decimal128 with the declared precision/scale.
        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
            #[allow(clippy::cast_possible_truncation)] // Precision/scale are typically small values
            // NOTE(review): `*p as u8` silently truncates a declared precision > 255,
            // and Decimal128 itself caps precision at 38 - consider validating
            // the declared bounds instead of casting. TODO confirm desired behavior.
            let (precision, scale) = match info {
                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
                sqlparser::ast::ExactNumberInfo::None => (38, 9), // Default precision/scale
            };
            Ok(DataType::Decimal128(precision, scale))
        }

        // String types (including JSON/UUID stored as strings)
        SqlDataType::Char(_)
        | SqlDataType::Character(_)
        | SqlDataType::Varchar(_)
        | SqlDataType::CharacterVarying(_)
        | SqlDataType::Text
        | SqlDataType::String(_)
        | SqlDataType::JSON
        | SqlDataType::JSONB
        | SqlDataType::Uuid => Ok(DataType::Utf8),

        // Binary types
        SqlDataType::Binary(_)
        | SqlDataType::Varbinary(_)
        | SqlDataType::Blob(_)
        | SqlDataType::Bytea => Ok(DataType::Binary),

        // Boolean type
        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),

        // Date/time types; TIME and TIMESTAMP use microsecond resolution,
        // and TIMESTAMP is timezone-naive (None).
        SqlDataType::Date => Ok(DataType::Date32),
        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),

        // Interval type
        SqlDataType::Interval { .. } => Ok(DataType::Interval(
            arrow::datatypes::IntervalUnit::MonthDayNano,
        )),

        // Unsupported types
        _ => Err(ParseError::ValidationError(format!(
            "unsupported data type: {:?}",
            sql_type
        ))),
    }
}
459
460/// Checks if an expression is a `PROCTIME()` function call.
461fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
462    if let sqlparser::ast::Expr::Function(func) = expr {
463        if let Some(name) = func.name.0.last() {
464            return name.to_string().eq_ignore_ascii_case("proctime");
465        }
466    }
467    false
468}
469
470/// Parses watermark definition.
471fn parse_watermark(
472    wm: &WatermarkDef,
473    columns: &[ColumnDefinition],
474) -> Result<WatermarkSpec, ParseError> {
475    let column_name = wm.column.to_string();
476
477    // Verify column exists and is a timestamp type
478    let col = columns
479        .iter()
480        .find(|c| c.name == column_name)
481        .ok_or_else(|| {
482            ParseError::ValidationError(format!(
483                "watermark column '{}' not found in column list",
484                column_name
485            ))
486        })?;
487
488    // Check column is a timestamp or integer type (BIGINT is common for Unix millis)
489    if !matches!(
490        col.data_type,
491        DataType::Timestamp(_, _)
492            | DataType::Date32
493            | DataType::Date64
494            | DataType::Int64
495            | DataType::Int32
496    ) {
497        return Err(ParseError::ValidationError(format!(
498            "watermark column '{}' must be a timestamp or integer type, found {:?}",
499            column_name, col.data_type
500        )));
501    }
502
503    // Check for PROCTIME() watermark expression
504    if let Some(expr) = &wm.expression {
505        if is_proctime_call(expr) {
506            return Ok(WatermarkSpec {
507                column: column_name,
508                max_out_of_orderness: Duration::ZERO,
509                is_processing_time: true,
510            });
511        }
512    }
513
514    // Parse the watermark expression to extract out-of-orderness.
515    // When expression is None (WATERMARK FOR col without AS), use zero delay.
516    let max_out_of_orderness = match &wm.expression {
517        Some(expr) => parse_watermark_expression(expr),
518        None => Duration::ZERO,
519    };
520
521    Ok(WatermarkSpec {
522        column: column_name,
523        max_out_of_orderness,
524        is_processing_time: false,
525    })
526}
527
528/// Parses watermark expression to extract the bounded out-of-orderness.
529fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
530    use sqlparser::ast::Expr;
531
532    match expr {
533        Expr::BinaryOp { op, right, .. } => match op {
534            sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
535            _ => Duration::ZERO,
536        },
537        // If just the column name, assume zero lateness
538        Expr::Identifier(_) => Duration::ZERO,
539        // Default to 1 second for complex expressions
540        _ => Duration::from_secs(1),
541    }
542}
543
544/// Parses an interval expression to a Duration.
545fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
546    use sqlparser::ast::Expr;
547
548    let Expr::Interval(interval) = expr else {
549        return Duration::from_secs(1);
550    };
551
552    // Extract value and unit from interval
553    let value_str = match interval.value.as_ref() {
554        Expr::Value(v) => {
555            // v is ValueWithSpan, access the inner value
556            match &v.value {
557                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
558                sqlparser::ast::Value::Number(n, _) => n.clone(),
559                _ => return Duration::from_secs(1),
560            }
561        }
562        _ => return Duration::from_secs(1),
563    };
564
565    let value: u64 = value_str.parse().unwrap_or(1);
566
567    // Determine unit
568    let unit = interval
569        .leading_field
570        .as_ref()
571        .map_or("second", |u| match u {
572            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
573            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
574            sqlparser::ast::DateTimeField::Minute => "minute",
575            sqlparser::ast::DateTimeField::Hour => "hour",
576            sqlparser::ast::DateTimeField::Day => "day",
577            _ => "second",
578        });
579
580    match unit {
581        "microsecond" | "microseconds" => Duration::from_micros(value),
582        "millisecond" | "milliseconds" => Duration::from_millis(value),
583        "minute" | "minutes" => Duration::from_secs(value * 60),
584        "hour" | "hours" => Duration::from_secs(value * 3600),
585        "day" | "days" => Duration::from_secs(value * 86400),
586        _ => Duration::from_secs(value),
587    }
588}
589
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates its first statement, which must be a
    /// `CREATE SOURCE`; any other statement kind is reported as a test error.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        // Too small
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        // Too large
        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        // Valid
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    #[test]
    fn test_external_connector_options_ignored() {
        // External connector options should be accepted but not affect config
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        // Only buffer_size should affect config
        assert_eq!(def.config.buffer_size, 8192);
    }

    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}