// laminar_sql/translator/streaming_ddl.rs
//! SQL DDL to Streaming API translation.
//!
//! This module translates parsed SQL CREATE SOURCE/SINK statements into
//! typed streaming definitions that can be used to configure the runtime.
//!
//! ## Supported Syntax
//!
//! ```sql
//! -- In-memory streaming source
//! CREATE SOURCE trades (
//!     symbol VARCHAR NOT NULL,
//!     price DOUBLE NOT NULL,
//!     quantity BIGINT NOT NULL,
//!     ts TIMESTAMP NOT NULL,
//!     WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECONDS
//! ) WITH (
//!     buffer_size = 131072,
//!     backpressure = 'block'
//! );
//!
//! -- In-memory streaming sink
//! CREATE SINK trade_aggregates AS
//!     SELECT * FROM trade_stats
//! WITH (
//!     buffer_size = 65536
//! );
//! ```
//!
//! ## Validation
//!
//! - Rejects `channel = ...` option (channel type is auto-derived)
//! - Validates buffer_size is within bounds
//! - Validates backpressure strategy names
//! - Validates wait_strategy names
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use sqlparser::ast::{ColumnDef, DataType as SqlDataType};

use crate::parser::{
    CreateSinkStatement, CreateSourceStatement, ParseError, SinkFrom, WatermarkDef,
};
46
/// Smallest permitted buffer size for a streaming channel.
pub const MIN_BUFFER_SIZE: usize = 4;

/// Largest permitted buffer size for a streaming channel (1 Mi entries).
pub const MAX_BUFFER_SIZE: usize = 1 << 20;

/// Buffer size applied when the WITH clause does not specify one.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;
55
/// Strategy applied when a streaming channel has no free capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Wait until space becomes available (the default).
    #[default]
    Block,
    /// Evict the oldest buffered item to make room for the new one.
    DropOldest,
    /// Fail fast: return an error to the producer immediately.
    Reject,
}
67
68impl std::str::FromStr for BackpressureStrategy {
69    type Err = ParseError;
70
71    fn from_str(s: &str) -> Result<Self, Self::Err> {
72        match s.to_lowercase().as_str() {
73            "block" | "blocking" => Ok(Self::Block),
74            "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
75            "reject" | "error" => Ok(Self::Reject),
76            _ => Err(ParseError::ValidationError(format!(
77                "invalid backpressure strategy: '{}'. Valid values: block, drop_oldest, reject",
78                s
79            ))),
80        }
81    }
82}
83
/// How a streaming consumer waits for new data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Busy-spin without yielding: lowest latency, highest CPU use.
    Spin,
    /// Spin with occasional yields: balanced latency/CPU (the default).
    #[default]
    SpinYield,
    /// Park the thread: lowest CPU use, highest latency.
    Park,
}
95
96impl std::str::FromStr for WaitStrategy {
97    type Err = ParseError;
98
99    fn from_str(s: &str) -> Result<Self, Self::Err> {
100        match s.to_lowercase().as_str() {
101            "spin" => Ok(Self::Spin),
102            "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
103            "park" | "parking" => Ok(Self::Park),
104            _ => Err(ParseError::ValidationError(format!(
105                "invalid wait strategy: '{}'. Valid values: spin, spin_yield, park",
106                s
107            ))),
108        }
109    }
110}
111
/// Watermark specification for a source.
///
/// Produced by `parse_watermark` from a `WATERMARK FOR col [AS expr]` clause.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Name of the event-time column (must exist in the source's columns).
    pub column: String,
    /// Bounded out-of-orderness duration extracted from the watermark
    /// expression; `Duration::ZERO` when no expression was given.
    pub max_out_of_orderness: Duration,
}
120
/// Configuration options for a streaming source.
///
/// Populated from the WITH clause by `parse_source_options`; also reused for
/// sinks (see [`SinkDefinition`]).
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer size for the channel (bounded by MIN_BUFFER_SIZE/MAX_BUFFER_SIZE).
    pub buffer_size: usize,
    /// Backpressure strategy applied when the channel is full.
    pub backpressure: BackpressureStrategy,
    /// Wait strategy for consumers.
    pub wait_strategy: WaitStrategy,
    /// Whether to track channel statistics.
    pub track_stats: bool,
}
133
134impl Default for SourceConfigOptions {
135    fn default() -> Self {
136        Self {
137            buffer_size: DEFAULT_BUFFER_SIZE,
138            backpressure: BackpressureStrategy::Block,
139            wait_strategy: WaitStrategy::SpinYield,
140            track_stats: false,
141        }
142    }
143}
144
/// Column definition for a streaming source.
///
/// The SQL column definition after conversion to an Arrow type.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name as written in the CREATE SOURCE statement.
    pub name: String,
    /// Arrow data type converted via `sql_type_to_arrow`.
    pub data_type: DataType,
    /// Whether the column is nullable (false when NOT NULL was specified).
    pub nullable: bool,
}
155
/// A validated streaming source definition.
///
/// This is the output of translating a `CreateSourceStatement` to a typed
/// configuration that can be used to create runtime sources.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Column definitions, in declaration order.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema built from `columns` (same order and nullability).
    pub schema: SchemaRef,
    /// Watermark specification, if a WATERMARK clause was present.
    pub watermark: Option<WatermarkSpec>,
    /// Configuration options from the WITH clause (defaults otherwise).
    pub config: SourceConfigOptions,
}
173
174impl TryFrom<CreateSourceStatement> for SourceDefinition {
175    type Error = ParseError;
176
177    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
178        translate_create_source(stmt)
179    }
180}
181
/// A validated streaming sink definition.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Name of the input table/view feeding this sink.
    pub input: String,
    /// Configuration options (shares the source option set).
    pub config: SourceConfigOptions,
}
192
193impl TryFrom<CreateSinkStatement> for SinkDefinition {
194    type Error = ParseError;
195
196    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
197        translate_create_sink(stmt)
198    }
199}
200
201/// Translates a CREATE SOURCE statement to a typed SourceDefinition.
202///
203/// # Errors
204///
205/// Returns `ParseError::ValidationError` if:
206/// - The `channel` option is specified (not user-configurable)
207/// - An invalid option value is provided
208/// - Column types cannot be converted to Arrow types
209pub fn translate_create_source(
210    stmt: CreateSourceStatement,
211) -> Result<SourceDefinition, ParseError> {
212    // Validate options first - reject 'channel' option
213    validate_source_options(&stmt.with_options)?;
214
215    // Parse configuration options
216    let config = parse_source_options(&stmt.with_options)?;
217
218    // Convert columns to Arrow types
219    let columns = convert_columns(&stmt.columns)?;
220
221    // Build Arrow schema
222    let fields: Vec<Field> = columns
223        .iter()
224        .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
225        .collect();
226    let schema = Arc::new(Schema::new(fields));
227
228    // Parse watermark if present
229    let watermark = if let Some(wm) = stmt.watermark {
230        Some(parse_watermark(&wm, &columns)?)
231    } else {
232        None
233    };
234
235    Ok(SourceDefinition {
236        name: stmt.name.to_string(),
237        columns,
238        schema,
239        watermark,
240        config,
241    })
242}
243
244/// Translates a CREATE SINK statement to a typed SinkDefinition.
245///
246/// # Errors
247///
248/// Returns `ParseError::ValidationError` if:
249/// - The `channel` option is specified (not user-configurable)
250/// - An invalid option value is provided
251pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
252    // Validate options first
253    validate_source_options(&stmt.with_options)?;
254
255    // Parse configuration options
256    let config = parse_source_options(&stmt.with_options)?;
257
258    // Get input name
259    let input = match stmt.from {
260        SinkFrom::Table(name) => name.to_string(),
261        SinkFrom::Query(_) => {
262            // For now, we don't support inline queries - need to create a view first
263            return Err(ParseError::ValidationError(
264                "inline queries not yet supported in CREATE SINK - use a view".to_string(),
265            ));
266        }
267    };
268
269    Ok(SinkDefinition {
270        name: stmt.name.to_string(),
271        input,
272        config,
273    })
274}
275
276/// Validates that source options don't include disallowed keys.
277fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
278    // Reject 'channel' option - channel type is auto-derived
279    if options.contains_key("channel") {
280        return Err(ParseError::ValidationError(
281            "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
282        ));
283    }
284
285    // Reject 'type' option for same reason
286    if options.contains_key("type") {
287        return Err(ParseError::ValidationError(
288            "the 'type' option is not user-configurable for in-memory streaming sources"
289                .to_string(),
290        ));
291    }
292
293    Ok(())
294}
295
296/// Parses source options from WITH clause.
297fn parse_source_options(
298    options: &HashMap<String, String>,
299) -> Result<SourceConfigOptions, ParseError> {
300    let mut config = SourceConfigOptions::default();
301
302    for (key, value) in options {
303        match key.to_lowercase().as_str() {
304            "buffer_size" | "buffersize" => {
305                config.buffer_size = parse_buffer_size(value)?;
306            }
307            "backpressure" => {
308                config.backpressure = BackpressureStrategy::from_str(value)?;
309            }
310            "wait_strategy" | "waitstrategy" => {
311                config.wait_strategy = WaitStrategy::from_str(value)?;
312            }
313            "track_stats" | "trackstats" | "stats" => {
314                config.track_stats = parse_bool(value)?;
315            }
316            // Ignore connector-specific and unknown options.
317            // Connector-specific: handled by connector implementations.
318            // Unknown: allow forward compatibility with new options.
319            _ => {}
320        }
321    }
322
323    Ok(config)
324}
325
326/// Parses buffer_size option.
327fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
328    let size: usize = value.parse().map_err(|_| {
329        ParseError::ValidationError(format!(
330            "invalid buffer_size: '{}' - must be a number",
331            value
332        ))
333    })?;
334
335    if size < MIN_BUFFER_SIZE {
336        return Err(ParseError::ValidationError(format!(
337            "buffer_size {} is too small - minimum is {}",
338            size, MIN_BUFFER_SIZE
339        )));
340    }
341
342    if size > MAX_BUFFER_SIZE {
343        return Err(ParseError::ValidationError(format!(
344            "buffer_size {} is too large - maximum is {}",
345            size, MAX_BUFFER_SIZE
346        )));
347    }
348
349    Ok(size)
350}
351
352/// Parses a boolean option.
353fn parse_bool(value: &str) -> Result<bool, ParseError> {
354    match value.to_lowercase().as_str() {
355        "true" | "yes" | "on" | "1" => Ok(true),
356        "false" | "no" | "off" | "0" => Ok(false),
357        _ => Err(ParseError::ValidationError(format!(
358            "invalid boolean value: '{}' - expected true/false",
359            value
360        ))),
361    }
362}
363
364/// Converts SQL column definitions to Arrow types.
365fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
366    columns.iter().map(convert_column).collect()
367}
368
369/// Converts a single SQL column definition to Arrow type.
370fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
371    let data_type = sql_type_to_arrow(&col.data_type)?;
372
373    // Check for NOT NULL constraint
374    let nullable = !col
375        .options
376        .iter()
377        .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
378
379    Ok(ColumnDefinition {
380        name: col.name.to_string(),
381        data_type,
382        nullable,
383    })
384}
385
386/// Converts SQL data type to Arrow data type.
387///
388/// # Errors
389///
390/// Returns `ParseError::ValidationError` for unsupported SQL data types.
391pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
392    match sql_type {
393        // Integer types
394        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
395        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
396        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
397        SqlDataType::BigInt(_) => Ok(DataType::Int64),
398
399        // Unsigned integer types - wrapped in Unsigned variant
400        // Note: sqlparser wraps unsigned types differently in different versions
401
402        // Floating point types
403        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
404        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
405
406        // Decimal types
407        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
408            #[allow(clippy::cast_possible_truncation)] // Precision/scale are typically small values
409            let (precision, scale) = match info {
410                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
411                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
412                sqlparser::ast::ExactNumberInfo::None => (38, 9), // Default precision/scale
413            };
414            Ok(DataType::Decimal128(precision, scale))
415        }
416
417        // String types (including JSON/UUID stored as strings)
418        SqlDataType::Char(_)
419        | SqlDataType::Character(_)
420        | SqlDataType::Varchar(_)
421        | SqlDataType::CharacterVarying(_)
422        | SqlDataType::Text
423        | SqlDataType::String(_)
424        | SqlDataType::JSON
425        | SqlDataType::JSONB
426        | SqlDataType::Uuid => Ok(DataType::Utf8),
427
428        // Binary types
429        SqlDataType::Binary(_)
430        | SqlDataType::Varbinary(_)
431        | SqlDataType::Blob(_)
432        | SqlDataType::Bytea => Ok(DataType::Binary),
433
434        // Boolean type
435        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
436
437        // Date/time types
438        SqlDataType::Date => Ok(DataType::Date32),
439        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
440        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
441
442        // Interval type
443        SqlDataType::Interval { .. } => Ok(DataType::Interval(
444            arrow::datatypes::IntervalUnit::MonthDayNano,
445        )),
446
447        // Unsupported types
448        _ => Err(ParseError::ValidationError(format!(
449            "unsupported data type: {:?}",
450            sql_type
451        ))),
452    }
453}
454
455/// Parses watermark definition.
456fn parse_watermark(
457    wm: &WatermarkDef,
458    columns: &[ColumnDefinition],
459) -> Result<WatermarkSpec, ParseError> {
460    let column_name = wm.column.to_string();
461
462    // Verify column exists and is a timestamp type
463    let col = columns
464        .iter()
465        .find(|c| c.name == column_name)
466        .ok_or_else(|| {
467            ParseError::ValidationError(format!(
468                "watermark column '{}' not found in column list",
469                column_name
470            ))
471        })?;
472
473    // Check column is a timestamp or integer type (BIGINT is common for Unix millis)
474    if !matches!(
475        col.data_type,
476        DataType::Timestamp(_, _)
477            | DataType::Date32
478            | DataType::Date64
479            | DataType::Int64
480            | DataType::Int32
481    ) {
482        return Err(ParseError::ValidationError(format!(
483            "watermark column '{}' must be a timestamp or integer type, found {:?}",
484            column_name, col.data_type
485        )));
486    }
487
488    // Parse the watermark expression to extract out-of-orderness.
489    // When expression is None (WATERMARK FOR col without AS), use zero delay.
490    let max_out_of_orderness = match &wm.expression {
491        Some(expr) => parse_watermark_expression(expr),
492        None => Duration::ZERO,
493    };
494
495    Ok(WatermarkSpec {
496        column: column_name,
497        max_out_of_orderness,
498    })
499}
500
501/// Parses watermark expression to extract the bounded out-of-orderness.
502fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
503    use sqlparser::ast::Expr;
504
505    match expr {
506        Expr::BinaryOp { op, right, .. } => match op {
507            sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
508            _ => Duration::ZERO,
509        },
510        // If just the column name, assume zero lateness
511        Expr::Identifier(_) => Duration::ZERO,
512        // Default to 1 second for complex expressions
513        _ => Duration::from_secs(1),
514    }
515}
516
517/// Parses an interval expression to a Duration.
518fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
519    use sqlparser::ast::Expr;
520
521    let Expr::Interval(interval) = expr else {
522        return Duration::from_secs(1);
523    };
524
525    // Extract value and unit from interval
526    let value_str = match interval.value.as_ref() {
527        Expr::Value(v) => {
528            // v is ValueWithSpan, access the inner value
529            match &v.value {
530                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
531                sqlparser::ast::Value::Number(n, _) => n.clone(),
532                _ => return Duration::from_secs(1),
533            }
534        }
535        _ => return Duration::from_secs(1),
536    };
537
538    let value: u64 = value_str.parse().unwrap_or(1);
539
540    // Determine unit
541    let unit = interval
542        .leading_field
543        .as_ref()
544        .map_or("second", |u| match u {
545            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
546            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
547            sqlparser::ast::DateTimeField::Minute => "minute",
548            sqlparser::ast::DateTimeField::Hour => "hour",
549            sqlparser::ast::DateTimeField::Day => "day",
550            _ => "second",
551        });
552
553    match unit {
554        "microsecond" | "microseconds" => Duration::from_micros(value),
555        "millisecond" | "milliseconds" => Duration::from_millis(value),
556        "minute" | "minutes" => Duration::from_secs(value * 60),
557        "hour" | "hours" => Duration::from_secs(value * 3600),
558        "day" | "days" => Duration::from_secs(value * 86400),
559        _ => Duration::from_secs(value),
560    }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    // Helper: parses `sql`, takes the first statement, and translates it;
    // fails if the statement is not CREATE SOURCE.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        // NOT NULL must translate to a non-nullable column.
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // The 'channel' option is reserved and must be rejected with a
    // message that names the offending key.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        // Too small
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        // Too large
        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        // Valid
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // Covers one representative of each supported SQL -> Arrow mapping.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    // Omitting the WITH clause must yield all documented defaults.
    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    #[test]
    fn test_external_connector_options_ignored() {
        // External connector options should be accepted but not affect config
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        // Only buffer_size should affect config
        assert_eq!(def.config.buffer_size, 8192);
    }

    // WATERMARK FOR col without AS means zero allowed lateness.
    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // BIGINT event-time columns (Unix millis) are accepted for watermarks.
    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }
}