Skip to main content

laminar_sql/translator/
streaming_ddl.rs

1//! SQL DDL to Streaming API translation.
2//!
3//! This module translates parsed SQL CREATE SOURCE/SINK statements into
4//! typed streaming definitions that can be used to configure the runtime.
5//!
6//! ## Supported Syntax
7//!
8//! ```sql
9//! -- In-memory streaming source
10//! CREATE SOURCE trades (
11//!     symbol VARCHAR NOT NULL,
12//!     price DOUBLE NOT NULL,
13//!     quantity BIGINT NOT NULL,
14//!     ts TIMESTAMP NOT NULL,
15//!     WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECONDS
16//! ) WITH (
17//!     buffer_size = 131072,
18//!     backpressure = 'block'
19//! );
20//!
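//! -- In-memory streaming sink
//! -- (note: `translate_create_sink` currently rejects inline queries; define
//! -- the query as a view and reference the view by name instead)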
22//! CREATE SINK trade_aggregates AS
23//!     SELECT * FROM trade_stats
24//! WITH (
25//!     buffer_size = 65536
26//! );
27//! ```
28//!
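//! ## Validation
//!
//! - Rejects the `channel = ...` and `type = ...` options (the channel type is
//!   auto-derived and not user-configurable)
//! - Validates that `buffer_size` is within bounds
//! - Validates backpressure strategy names
//! - Validates wait strategy names
//! - Validates that the watermark column exists and has a timestamp, date, or
//!   integer type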
35
36#[allow(clippy::disallowed_types)] // cold path: SQL translation
37use std::collections::HashMap;
38use std::str::FromStr;
39use std::sync::Arc;
40use std::time::Duration;
41
42use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
43use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
44
45use laminar_core::streaming::config::{
46    BackpressureStrategy, WaitStrategy, DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE,
47};
48
49use crate::parser::ParseError;
50use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
51
/// Watermark specification for a source.
///
/// Produced from a `WATERMARK FOR <column> [AS <expr>]` clause.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatermarkSpec {
    /// Column name for event time.
    pub column: String,
    /// Bounded out-of-orderness duration (zero for processing-time watermarks).
    pub max_out_of_orderness: Duration,
    /// Whether this is a processing-time watermark (`PROCTIME()`).
    ///
    /// When `true`, the runtime should use `ProcessingTimeGenerator`
    /// instead of `BoundedOutOfOrdernessGenerator`.
    pub is_processing_time: bool,
}
60    ///
61    /// When `true`, the runtime should use `ProcessingTimeGenerator`
62    /// instead of `BoundedOutOfOrdernessGenerator`.
63    pub is_processing_time: bool,
64}
65
66/// Configuration options for a streaming source.
67#[derive(Debug, Clone)]
68pub struct SourceConfigOptions {
69    /// Buffer size for the channel.
70    pub buffer_size: usize,
71    /// Backpressure strategy.
72    pub backpressure: BackpressureStrategy,
73    /// Wait strategy for consumers.
74    pub wait_strategy: WaitStrategy,
75    /// Whether to track statistics.
76    pub track_stats: bool,
77}
78
79impl Default for SourceConfigOptions {
80    fn default() -> Self {
81        Self {
82            buffer_size: DEFAULT_BUFFER_SIZE,
83            backpressure: BackpressureStrategy::Block,
84            wait_strategy: WaitStrategy::SpinYield,
85            track_stats: false,
86        }
87    }
88}
89
90/// Column definition for a streaming source.
91#[derive(Debug, Clone)]
92pub struct ColumnDefinition {
93    /// Column name.
94    pub name: String,
95    /// Arrow data type.
96    pub data_type: DataType,
97    /// Whether the column is nullable.
98    pub nullable: bool,
99}
100
101/// A validated streaming source definition.
102///
103/// This is the output of translating a `CreateSourceStatement` to a typed
104/// configuration that can be used to create runtime sources.
105#[derive(Debug, Clone)]
106pub struct SourceDefinition {
107    /// Source name.
108    pub name: String,
109    /// Column definitions.
110    pub columns: Vec<ColumnDefinition>,
111    /// Arrow schema.
112    pub schema: SchemaRef,
113    /// Watermark specification, if defined.
114    pub watermark: Option<WatermarkSpec>,
115    /// Configuration options.
116    pub config: SourceConfigOptions,
117}
118
119impl TryFrom<CreateSourceStatement> for SourceDefinition {
120    type Error = ParseError;
121
122    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
123        translate_create_source(stmt)
124    }
125}
126
127/// A validated streaming sink definition.
128#[derive(Debug, Clone)]
129pub struct SinkDefinition {
130    /// Sink name.
131    pub name: String,
132    /// Input source or query.
133    pub input: String,
134    /// Configuration options.
135    pub config: SourceConfigOptions,
136}
137
138impl TryFrom<CreateSinkStatement> for SinkDefinition {
139    type Error = ParseError;
140
141    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
142        translate_create_sink(stmt)
143    }
144}
145
146/// Translates a CREATE SOURCE statement to a typed SourceDefinition.
147///
148/// # Errors
149///
150/// Returns `ParseError::ValidationError` if:
151/// - The `channel` option is specified (not user-configurable)
152/// - An invalid option value is provided
153/// - Column types cannot be converted to Arrow types
154pub fn translate_create_source(
155    stmt: CreateSourceStatement,
156) -> Result<SourceDefinition, ParseError> {
157    // Validate options first - reject 'channel' option
158    validate_source_options(&stmt.with_options)?;
159
160    // Parse configuration options
161    let config = parse_source_options(&stmt.with_options)?;
162
163    // Convert columns to Arrow types
164    let columns = convert_columns(&stmt.columns)?;
165
166    // Build Arrow schema
167    let fields: Vec<Field> = columns
168        .iter()
169        .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
170        .collect();
171    let schema = Arc::new(Schema::new(fields));
172
173    // Parse watermark if present
174    let watermark = if let Some(wm) = stmt.watermark {
175        Some(parse_watermark(&wm, &columns)?)
176    } else {
177        None
178    };
179
180    Ok(SourceDefinition {
181        name: stmt.name.to_string(),
182        columns,
183        schema,
184        watermark,
185        config,
186    })
187}
188
189/// Translates a CREATE SINK statement to a typed SinkDefinition.
190///
191/// # Errors
192///
193/// Returns `ParseError::ValidationError` if:
194/// - The `channel` option is specified (not user-configurable)
195/// - An invalid option value is provided
196pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
197    // Validate options first
198    validate_source_options(&stmt.with_options)?;
199
200    // Parse configuration options
201    let config = parse_source_options(&stmt.with_options)?;
202
203    // Get input name
204    let input = match stmt.from {
205        SinkFrom::Table(name) => name.to_string(),
206        SinkFrom::Query(_) => {
207            // For now, we don't support inline queries - need to create a view first
208            return Err(ParseError::ValidationError(
209                "inline queries not yet supported in CREATE SINK - use a view".to_string(),
210            ));
211        }
212    };
213
214    Ok(SinkDefinition {
215        name: stmt.name.to_string(),
216        input,
217        config,
218    })
219}
220
221/// Validates that source options don't include disallowed keys.
222fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
223    // Reject 'channel' option - channel type is auto-derived
224    if options.contains_key("channel") {
225        return Err(ParseError::ValidationError(
226            "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
227        ));
228    }
229
230    // Reject 'type' option for same reason
231    if options.contains_key("type") {
232        return Err(ParseError::ValidationError(
233            "the 'type' option is not user-configurable for in-memory streaming sources"
234                .to_string(),
235        ));
236    }
237
238    Ok(())
239}
240
241/// Parses source options from WITH clause.
242fn parse_source_options(
243    options: &HashMap<String, String>,
244) -> Result<SourceConfigOptions, ParseError> {
245    let mut config = SourceConfigOptions::default();
246
247    for (key, value) in options {
248        match key.to_lowercase().as_str() {
249            "buffer_size" | "buffersize" => {
250                config.buffer_size = parse_buffer_size(value)?;
251            }
252            "backpressure" => {
253                config.backpressure =
254                    BackpressureStrategy::from_str(value).map_err(ParseError::ValidationError)?;
255            }
256            "wait_strategy" | "waitstrategy" => {
257                config.wait_strategy =
258                    WaitStrategy::from_str(value).map_err(ParseError::ValidationError)?;
259            }
260            "track_stats" | "trackstats" | "stats" => {
261                config.track_stats = parse_bool(value)?;
262            }
263            // Ignore connector-specific and unknown options.
264            // Connector-specific: handled by connector implementations.
265            // Unknown: allow forward compatibility with new options.
266            _ => {}
267        }
268    }
269
270    Ok(config)
271}
272
273/// Parses buffer_size option.
274fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
275    let size: usize = value.parse().map_err(|_| {
276        ParseError::ValidationError(format!(
277            "invalid buffer_size: '{}' - must be a number",
278            value
279        ))
280    })?;
281
282    if size < MIN_BUFFER_SIZE {
283        return Err(ParseError::ValidationError(format!(
284            "buffer_size {} is too small - minimum is {}",
285            size, MIN_BUFFER_SIZE
286        )));
287    }
288
289    if size > MAX_BUFFER_SIZE {
290        return Err(ParseError::ValidationError(format!(
291            "buffer_size {} is too large - maximum is {}",
292            size, MAX_BUFFER_SIZE
293        )));
294    }
295
296    Ok(size)
297}
298
299/// Parses a boolean option.
300fn parse_bool(value: &str) -> Result<bool, ParseError> {
301    match value.to_lowercase().as_str() {
302        "true" | "yes" | "on" | "1" => Ok(true),
303        "false" | "no" | "off" | "0" => Ok(false),
304        _ => Err(ParseError::ValidationError(format!(
305            "invalid boolean value: '{}' - expected true/false",
306            value
307        ))),
308    }
309}
310
311/// Converts SQL column definitions to Arrow types.
312fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
313    columns.iter().map(convert_column).collect()
314}
315
316/// Converts a single SQL column definition to Arrow type.
317fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
318    let data_type = sql_type_to_arrow(&col.data_type)?;
319
320    // Check for NOT NULL constraint
321    let nullable = !col
322        .options
323        .iter()
324        .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
325
326    Ok(ColumnDefinition {
327        name: col.name.value.clone(),
328        data_type,
329        nullable,
330    })
331}
332
333/// Converts SQL data type to Arrow data type.
334///
335/// # Errors
336///
337/// Returns `ParseError::ValidationError` for unsupported SQL data types.
338pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
339    match sql_type {
340        // Integer types
341        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
342        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
343        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
344        SqlDataType::BigInt(_) => Ok(DataType::Int64),
345
346        // Unsigned integer types - wrapped in Unsigned variant
347        // Note: sqlparser wraps unsigned types differently in different versions
348
349        // Floating point types
350        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
351        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
352
353        // Decimal types
354        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
355            #[allow(clippy::cast_possible_truncation)] // Precision/scale are typically small values
356            let (precision, scale) = match info {
357                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
358                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
359                sqlparser::ast::ExactNumberInfo::None => (38, 9), // Default precision/scale
360            };
361            Ok(DataType::Decimal128(precision, scale))
362        }
363
364        // String types (including JSON/UUID stored as strings)
365        SqlDataType::Char(_)
366        | SqlDataType::Character(_)
367        | SqlDataType::Varchar(_)
368        | SqlDataType::CharacterVarying(_)
369        | SqlDataType::Text
370        | SqlDataType::String(_)
371        | SqlDataType::JSON
372        | SqlDataType::JSONB
373        | SqlDataType::Uuid => Ok(DataType::Utf8),
374
375        // Binary types
376        SqlDataType::Binary(_)
377        | SqlDataType::Varbinary(_)
378        | SqlDataType::Blob(_)
379        | SqlDataType::Bytea => Ok(DataType::Binary),
380
381        // Boolean type
382        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
383
384        // Date/time types
385        SqlDataType::Date => Ok(DataType::Date32),
386        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
387        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
388
389        // Interval type
390        SqlDataType::Interval { .. } => Ok(DataType::Interval(
391            arrow::datatypes::IntervalUnit::MonthDayNano,
392        )),
393
394        // Array type: ARRAY<T>, T[], Array(T)
395        SqlDataType::Array(elem_def) => {
396            let item_type = match elem_def {
397                sqlparser::ast::ArrayElemTypeDef::AngleBracket(t)
398                | sqlparser::ast::ArrayElemTypeDef::SquareBracket(t, _)
399                | sqlparser::ast::ArrayElemTypeDef::Parenthesis(t) => sql_type_to_arrow(t)?,
400                sqlparser::ast::ArrayElemTypeDef::None => {
401                    return Err(ParseError::ValidationError(
402                        "ARRAY type requires element type, e.g. ARRAY<INT>".into(),
403                    ));
404                }
405            };
406            Ok(DataType::List(Arc::new(Field::new(
407                "item", item_type, true,
408            ))))
409        }
410
411        // Map type: MAP(K, V)
412        SqlDataType::Map(key_type, value_type) => {
413            let key_dt = sql_type_to_arrow(key_type)?;
414            let value_dt = sql_type_to_arrow(value_type)?;
415            Ok(DataType::Map(
416                Arc::new(Field::new(
417                    "entries",
418                    DataType::Struct(Fields::from(vec![
419                        Field::new("key", key_dt, false),
420                        Field::new("value", value_dt, true),
421                    ])),
422                    false,
423                )),
424                false,
425            ))
426        }
427
428        // Unsupported types
429        _ => Err(ParseError::ValidationError(format!(
430            "unsupported data type: {:?}",
431            sql_type
432        ))),
433    }
434}
435
436/// Checks if an expression is a `PROCTIME()` function call.
437fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
438    if let sqlparser::ast::Expr::Function(func) = expr {
439        if let Some(name) = func.name.0.last() {
440            return name.to_string().eq_ignore_ascii_case("proctime");
441        }
442    }
443    false
444}
445
446/// Parses watermark definition.
447fn parse_watermark(
448    wm: &WatermarkDef,
449    columns: &[ColumnDefinition],
450) -> Result<WatermarkSpec, ParseError> {
451    let column_name = wm.column.value.clone();
452
453    // Verify column exists and is a timestamp type
454    let col = columns
455        .iter()
456        .find(|c| c.name == column_name)
457        .ok_or_else(|| {
458            ParseError::ValidationError(format!(
459                "watermark column '{}' not found in column list",
460                column_name
461            ))
462        })?;
463
464    // Check column is a timestamp or integer type (BIGINT is common for Unix millis)
465    if !matches!(
466        col.data_type,
467        DataType::Timestamp(_, _)
468            | DataType::Date32
469            | DataType::Date64
470            | DataType::Int64
471            | DataType::Int32
472    ) {
473        return Err(ParseError::ValidationError(format!(
474            "watermark column '{}' must be a timestamp or integer type, found {:?}",
475            column_name, col.data_type
476        )));
477    }
478
479    // Check for PROCTIME() watermark expression
480    if let Some(expr) = &wm.expression {
481        if is_proctime_call(expr) {
482            return Ok(WatermarkSpec {
483                column: column_name,
484                max_out_of_orderness: Duration::ZERO,
485                is_processing_time: true,
486            });
487        }
488    }
489
490    // Parse the watermark expression to extract out-of-orderness.
491    // When expression is None (WATERMARK FOR col without AS), use zero delay.
492    let max_out_of_orderness = match &wm.expression {
493        Some(expr) => parse_watermark_expression(expr),
494        None => Duration::ZERO,
495    };
496
497    Ok(WatermarkSpec {
498        column: column_name,
499        max_out_of_orderness,
500        is_processing_time: false,
501    })
502}
503
504/// Parses watermark expression to extract the bounded out-of-orderness.
505fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
506    use sqlparser::ast::Expr;
507
508    match expr {
509        Expr::BinaryOp { op, right, .. } => match op {
510            sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
511            _ => Duration::ZERO,
512        },
513        // If just the column name, assume zero lateness
514        Expr::Identifier(_) => Duration::ZERO,
515        // Default to 1 second for complex expressions
516        _ => Duration::from_secs(1),
517    }
518}
519
520/// Parses an interval expression to a Duration.
521fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
522    use sqlparser::ast::Expr;
523
524    let Expr::Interval(interval) = expr else {
525        return Duration::from_secs(1);
526    };
527
528    // Extract value and unit from interval
529    let value_str = match interval.value.as_ref() {
530        Expr::Value(v) => {
531            // v is ValueWithSpan, access the inner value
532            match &v.value {
533                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
534                sqlparser::ast::Value::Number(n, _) => n.clone(),
535                _ => return Duration::from_secs(1),
536            }
537        }
538        _ => return Duration::from_secs(1),
539    };
540
541    let value: u64 = value_str.parse().unwrap_or(1);
542
543    // Determine unit
544    let unit = interval
545        .leading_field
546        .as_ref()
547        .map_or("second", |u| match u {
548            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
549            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
550            sqlparser::ast::DateTimeField::Minute => "minute",
551            sqlparser::ast::DateTimeField::Hour => "hour",
552            sqlparser::ast::DateTimeField::Day => "day",
553            _ => "second",
554        });
555
556    match unit {
557        "microsecond" | "microseconds" => Duration::from_micros(value),
558        "millisecond" | "milliseconds" => Duration::from_millis(value),
559        "minute" | "minutes" => Duration::from_secs(value * 60),
560        "hour" | "hours" => Duration::from_secs(value * 3600),
561        "day" | "days" => Duration::from_secs(value * 86400),
562        _ => Duration::from_secs(value),
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569    use crate::parser::{parse_streaming_sql, StreamingStatement};
570
571    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
572        let statements = parse_streaming_sql(sql)?;
573        let stmt = statements
574            .into_iter()
575            .next()
576            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
577        match stmt {
578            StreamingStatement::CreateSource(source) => translate_create_source(*source),
579            _ => Err(ParseError::StreamingError(
580                "Expected CREATE SOURCE".to_string(),
581            )),
582        }
583    }
584
585    #[test]
586    fn test_basic_source() {
587        let def =
588            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();
589
590        assert_eq!(def.name, "events");
591        assert_eq!(def.columns.len(), 2);
592        assert_eq!(def.columns[0].name, "id");
593        assert_eq!(def.columns[0].data_type, DataType::Int64);
594        assert!(!def.columns[0].nullable);
595        assert_eq!(def.columns[1].name, "name");
596        assert!(def.columns[1].nullable);
597    }
598
599    #[test]
600    fn test_source_with_options() {
601        let def = parse_and_translate(
602            "CREATE SOURCE events (id BIGINT) WITH (
603                'buffer_size' = '4096',
604                'backpressure' = 'reject'
605            )",
606        )
607        .unwrap();
608
609        assert_eq!(def.config.buffer_size, 4096);
610        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
611    }
612
613    #[test]
614    fn test_source_with_watermark() {
615        let def = parse_and_translate(
616            "CREATE SOURCE events (
617                id BIGINT,
618                ts TIMESTAMP,
619                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
620            )",
621        )
622        .unwrap();
623
624        assert!(def.watermark.is_some());
625        let wm = def.watermark.unwrap();
626        assert_eq!(wm.column, "ts");
627        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
628    }
629
630    #[test]
631    fn test_reject_channel_option() {
632        let result =
633            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");
634
635        assert!(result.is_err());
636        let err = result.unwrap_err();
637        assert!(err.to_string().contains("channel"));
638    }
639
640    #[test]
641    fn test_reject_type_option() {
642        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");
643
644        assert!(result.is_err());
645    }
646
647    #[test]
648    fn test_buffer_size_bounds() {
649        // Too small
650        let result =
651            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
652        assert!(result.is_err());
653
654        // Too large
655        let result = parse_and_translate(
656            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
657        );
658        assert!(result.is_err());
659
660        // Valid
661        let result =
662            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
663        assert!(result.is_ok());
664    }
665
666    #[test]
667    fn test_backpressure_strategies() {
668        assert_eq!(
669            BackpressureStrategy::from_str("block").unwrap(),
670            BackpressureStrategy::Block
671        );
672        assert_eq!(
673            BackpressureStrategy::from_str("drop_oldest").unwrap(),
674            BackpressureStrategy::DropOldest
675        );
676        assert_eq!(
677            BackpressureStrategy::from_str("reject").unwrap(),
678            BackpressureStrategy::Reject
679        );
680        assert!(BackpressureStrategy::from_str("invalid").is_err());
681    }
682
683    #[test]
684    fn test_wait_strategies() {
685        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
686        assert_eq!(
687            WaitStrategy::from_str("spin_yield").unwrap(),
688            WaitStrategy::SpinYield
689        );
690        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
691        assert!(WaitStrategy::from_str("invalid").is_err());
692    }
693
694    #[test]
695    fn test_sql_type_conversions() {
696        let def = parse_and_translate(
697            "CREATE SOURCE types (
698                a TINYINT,
699                b SMALLINT,
700                c INT,
701                d BIGINT,
702                e FLOAT,
703                f DOUBLE,
704                g DECIMAL(10,2),
705                h VARCHAR(255),
706                i TEXT,
707                j BOOLEAN,
708                k TIMESTAMP,
709                l DATE
710            )",
711        )
712        .unwrap();
713
714        assert_eq!(def.columns.len(), 12);
715        assert_eq!(def.columns[0].data_type, DataType::Int8);
716        assert_eq!(def.columns[1].data_type, DataType::Int16);
717        assert_eq!(def.columns[2].data_type, DataType::Int32);
718        assert_eq!(def.columns[3].data_type, DataType::Int64);
719        assert_eq!(def.columns[4].data_type, DataType::Float32);
720        assert_eq!(def.columns[5].data_type, DataType::Float64);
721        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
722        assert_eq!(def.columns[7].data_type, DataType::Utf8);
723        assert_eq!(def.columns[8].data_type, DataType::Utf8);
724        assert_eq!(def.columns[9].data_type, DataType::Boolean);
725        assert!(matches!(
726            def.columns[10].data_type,
727            DataType::Timestamp(_, _)
728        ));
729        assert_eq!(def.columns[11].data_type, DataType::Date32);
730    }
731
732    #[test]
733    fn test_schema_generation() {
734        let def = parse_and_translate(
735            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
736        )
737        .unwrap();
738
739        let schema = def.schema;
740        assert_eq!(schema.fields().len(), 3);
741        assert_eq!(schema.field(0).name(), "id");
742        assert!(!schema.field(0).is_nullable());
743        assert_eq!(schema.field(1).name(), "name");
744        assert!(!schema.field(1).is_nullable());
745        assert_eq!(schema.field(2).name(), "value");
746        assert!(schema.field(2).is_nullable());
747    }
748
749    #[test]
750    fn test_watermark_column_not_found() {
751        let result = parse_and_translate(
752            "CREATE SOURCE events (
753                id BIGINT,
754                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
755            )",
756        );
757
758        assert!(result.is_err());
759        assert!(result.unwrap_err().to_string().contains("not found"));
760    }
761
762    #[test]
763    fn test_watermark_wrong_type() {
764        let result = parse_and_translate(
765            "CREATE SOURCE events (
766                name VARCHAR,
767                WATERMARK FOR name AS name - INTERVAL '1' SECOND
768            )",
769        );
770
771        assert!(result.is_err());
772        assert!(result
773            .unwrap_err()
774            .to_string()
775            .contains("timestamp or integer type"));
776    }
777
778    #[test]
779    fn test_watermark_milliseconds() {
780        let def = parse_and_translate(
781            "CREATE SOURCE events (
782                ts TIMESTAMP,
783                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
784            )",
785        )
786        .unwrap();
787
788        let wm = def.watermark.unwrap();
789        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
790    }
791
792    #[test]
793    fn test_watermark_minutes() {
794        let def = parse_and_translate(
795            "CREATE SOURCE events (
796                ts TIMESTAMP,
797                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
798            )",
799        )
800        .unwrap();
801
802        let wm = def.watermark.unwrap();
803        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
804    }
805
806    #[test]
807    fn test_track_stats_option() {
808        let def =
809            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
810                .unwrap();
811
812        assert!(def.config.track_stats);
813    }
814
815    #[test]
816    fn test_wait_strategy_option() {
817        let def =
818            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
819                .unwrap();
820
821        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
822    }
823
824    #[test]
825    fn test_default_config() {
826        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();
827
828        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
829        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
830        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
831        assert!(!def.config.track_stats);
832    }
833
834    #[test]
835    fn test_external_connector_options_ignored() {
836        // External connector options should be accepted but not affect config
837        let def = parse_and_translate(
838            "CREATE SOURCE events (id BIGINT) WITH (
839                'connector' = 'kafka',
840                'topic' = 'events',
841                'bootstrap.servers' = 'localhost:9092',
842                'buffer_size' = '8192'
843            )",
844        )
845        .unwrap();
846
847        // Only buffer_size should affect config
848        assert_eq!(def.config.buffer_size, 8192);
849    }
850
851    #[test]
852    fn test_source_watermark_no_expression() {
853        let def = parse_and_translate(
854            "CREATE SOURCE events (
855                ts TIMESTAMP,
856                WATERMARK FOR ts
857            )",
858        )
859        .unwrap();
860
861        assert!(def.watermark.is_some());
862        let wm = def.watermark.unwrap();
863        assert_eq!(wm.column, "ts");
864        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
865    }
866
867    #[test]
868    fn test_source_watermark_bigint_column() {
869        let def = parse_and_translate(
870            "CREATE SOURCE events (
871                ts BIGINT,
872                WATERMARK FOR ts
873            )",
874        )
875        .unwrap();
876
877        assert!(def.watermark.is_some());
878        let wm = def.watermark.unwrap();
879        assert_eq!(wm.column, "ts");
880        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
881    }
882
883    #[test]
884    fn test_watermark_proctime() {
885        let def = parse_and_translate(
886            "CREATE SOURCE events (
887                ts TIMESTAMP,
888                WATERMARK FOR ts AS PROCTIME()
889            )",
890        )
891        .unwrap();
892
893        assert!(def.watermark.is_some());
894        let wm = def.watermark.unwrap();
895        assert_eq!(wm.column, "ts");
896        assert!(wm.is_processing_time);
897        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
898    }
899
900    #[test]
901    fn test_watermark_event_time_not_proctime() {
902        let def = parse_and_translate(
903            "CREATE SOURCE events (
904                ts TIMESTAMP,
905                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
906            )",
907        )
908        .unwrap();
909
910        let wm = def.watermark.unwrap();
911        assert!(!wm.is_processing_time);
912    }
913}