// laminar_sql/translator/streaming_ddl.rs
1//! SQL DDL to Streaming API translation.
2//!
3//! This module translates parsed SQL CREATE SOURCE/SINK statements into
4//! typed streaming definitions that can be used to configure the runtime.
5//!
6//! ## Supported Syntax
7//!
8//! ```sql
9//! -- In-memory streaming source
10//! CREATE SOURCE trades (
11//!     symbol VARCHAR NOT NULL,
12//!     price DOUBLE NOT NULL,
13//!     quantity BIGINT NOT NULL,
14//!     ts TIMESTAMP NOT NULL,
15//!     WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECONDS
16//! ) WITH (
17//!     buffer_size = 131072,
18//!     backpressure = 'block'
19//! );
20//!
21//! -- In-memory streaming sink
22//! CREATE SINK trade_aggregates AS
23//!     SELECT * FROM trade_stats
24//! WITH (
25//!     buffer_size = 65536
26//! );
27//! ```
28//!
29//! ## Validation
30//!
31//! - Rejects `channel = ...` option (channel type is auto-derived)
32//! - Validates buffer_size is within bounds
33//! - Validates backpressure strategy names
34//! - Validates wait_strategy names
35
36#[allow(clippy::disallowed_types)] // cold path: SQL translation
37use std::collections::HashMap;
38use std::str::FromStr;
39use std::sync::Arc;
40use std::time::Duration;
41
42use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
43use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
44
45use crate::parser::ParseError;
46use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
47
/// Minimum buffer size for streaming channels.
pub const MIN_BUFFER_SIZE: usize = 4;

/// Maximum buffer size for streaming channels.
pub const MAX_BUFFER_SIZE: usize = 1 << 20; // 1M entries

/// Default buffer size for streaming channels.
///
/// Applied when a CREATE SOURCE/SINK statement omits the `buffer_size` option.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;
56
/// Backpressure strategy for streaming channels.
///
/// Parsed case-insensitively from the `backpressure` WITH option via
/// [`std::str::FromStr`]; defaults to [`BackpressureStrategy::Block`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block until space is available.
    #[default]
    Block,
    /// Drop oldest item to make room.
    DropOldest,
    /// Reject and return error immediately.
    Reject,
}
68
69impl std::str::FromStr for BackpressureStrategy {
70    type Err = ParseError;
71
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        match s.to_lowercase().as_str() {
74            "block" | "blocking" => Ok(Self::Block),
75            "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
76            "reject" | "error" => Ok(Self::Reject),
77            _ => Err(ParseError::ValidationError(format!(
78                "invalid backpressure strategy: '{}'. Valid values: block, drop_oldest, reject",
79                s
80            ))),
81        }
82    }
83}
84
/// Wait strategy for streaming consumers.
///
/// Parsed case-insensitively from the `wait_strategy` WITH option via
/// [`std::str::FromStr`]; defaults to [`WaitStrategy::SpinYield`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Spin-loop without yielding (lowest latency, highest CPU).
    Spin,
    /// Spin with occasional yields (balanced).
    #[default]
    SpinYield,
    /// Park the thread (lowest CPU, higher latency).
    Park,
}
96
97impl std::str::FromStr for WaitStrategy {
98    type Err = ParseError;
99
100    fn from_str(s: &str) -> Result<Self, Self::Err> {
101        match s.to_lowercase().as_str() {
102            "spin" => Ok(Self::Spin),
103            "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
104            "park" | "parking" => Ok(Self::Park),
105            _ => Err(ParseError::ValidationError(format!(
106                "invalid wait strategy: '{}'. Valid values: spin, spin_yield, park",
107                s
108            ))),
109        }
110    }
111}
112
/// Watermark specification for a source.
///
/// Produced by translating a `WATERMARK FOR col [AS expr]` clause.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Column name for event time.
    pub column: String,
    /// Bounded out-of-orderness duration.
    ///
    /// Zero when no `AS` expression was given or for processing-time
    /// watermarks.
    pub max_out_of_orderness: Duration,
    /// Whether this is a processing-time watermark (`PROCTIME()`).
    ///
    /// When `true`, the runtime should use `ProcessingTimeGenerator`
    /// instead of `BoundedOutOfOrdernessGenerator`.
    pub is_processing_time: bool,
}
126
/// Configuration options for a streaming source.
///
/// Parsed from the WITH clause; also reused for sinks (see
/// `SinkDefinition`). Unspecified options take the `Default` values.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer size for the channel (bounded by `MIN_BUFFER_SIZE`/`MAX_BUFFER_SIZE`).
    pub buffer_size: usize,
    /// Backpressure strategy.
    pub backpressure: BackpressureStrategy,
    /// Wait strategy for consumers.
    pub wait_strategy: WaitStrategy,
    /// Whether to track statistics.
    pub track_stats: bool,
}
139
140impl Default for SourceConfigOptions {
141    fn default() -> Self {
142        Self {
143            buffer_size: DEFAULT_BUFFER_SIZE,
144            backpressure: BackpressureStrategy::Block,
145            wait_strategy: WaitStrategy::SpinYield,
146            track_stats: false,
147        }
148    }
149}
150
/// Column definition for a streaming source.
///
/// One entry per column of the CREATE SOURCE statement, with the SQL type
/// already converted to its Arrow equivalent.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Arrow data type.
    pub data_type: DataType,
    /// Whether the column is nullable (no `NOT NULL` constraint).
    pub nullable: bool,
}
161
/// A validated streaming source definition.
///
/// This is the output of translating a `CreateSourceStatement` to a typed
/// configuration that can be used to create runtime sources.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema (built from `columns`).
    pub schema: SchemaRef,
    /// Watermark specification, if defined.
    pub watermark: Option<WatermarkSpec>,
    /// Configuration options.
    pub config: SourceConfigOptions,
}
179
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
187
/// A validated streaming sink definition.
///
/// Output of translating a `CreateSinkStatement`; shares the channel
/// configuration type with sources.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Input source or query (currently always a table/view name).
    pub input: String,
    /// Configuration options.
    pub config: SourceConfigOptions,
}
198
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
206
207/// Translates a CREATE SOURCE statement to a typed SourceDefinition.
208///
209/// # Errors
210///
211/// Returns `ParseError::ValidationError` if:
212/// - The `channel` option is specified (not user-configurable)
213/// - An invalid option value is provided
214/// - Column types cannot be converted to Arrow types
215pub fn translate_create_source(
216    stmt: CreateSourceStatement,
217) -> Result<SourceDefinition, ParseError> {
218    // Validate options first - reject 'channel' option
219    validate_source_options(&stmt.with_options)?;
220
221    // Parse configuration options
222    let config = parse_source_options(&stmt.with_options)?;
223
224    // Convert columns to Arrow types
225    let columns = convert_columns(&stmt.columns)?;
226
227    // Build Arrow schema
228    let fields: Vec<Field> = columns
229        .iter()
230        .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
231        .collect();
232    let schema = Arc::new(Schema::new(fields));
233
234    // Parse watermark if present
235    let watermark = if let Some(wm) = stmt.watermark {
236        Some(parse_watermark(&wm, &columns)?)
237    } else {
238        None
239    };
240
241    Ok(SourceDefinition {
242        name: stmt.name.to_string(),
243        columns,
244        schema,
245        watermark,
246        config,
247    })
248}
249
250/// Translates a CREATE SINK statement to a typed SinkDefinition.
251///
252/// # Errors
253///
254/// Returns `ParseError::ValidationError` if:
255/// - The `channel` option is specified (not user-configurable)
256/// - An invalid option value is provided
257pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
258    // Validate options first
259    validate_source_options(&stmt.with_options)?;
260
261    // Parse configuration options
262    let config = parse_source_options(&stmt.with_options)?;
263
264    // Get input name
265    let input = match stmt.from {
266        SinkFrom::Table(name) => name.to_string(),
267        SinkFrom::Query(_) => {
268            // For now, we don't support inline queries - need to create a view first
269            return Err(ParseError::ValidationError(
270                "inline queries not yet supported in CREATE SINK - use a view".to_string(),
271            ));
272        }
273    };
274
275    Ok(SinkDefinition {
276        name: stmt.name.to_string(),
277        input,
278        config,
279    })
280}
281
282/// Validates that source options don't include disallowed keys.
283fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
284    // Reject 'channel' option - channel type is auto-derived
285    if options.contains_key("channel") {
286        return Err(ParseError::ValidationError(
287            "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
288        ));
289    }
290
291    // Reject 'type' option for same reason
292    if options.contains_key("type") {
293        return Err(ParseError::ValidationError(
294            "the 'type' option is not user-configurable for in-memory streaming sources"
295                .to_string(),
296        ));
297    }
298
299    Ok(())
300}
301
302/// Parses source options from WITH clause.
303fn parse_source_options(
304    options: &HashMap<String, String>,
305) -> Result<SourceConfigOptions, ParseError> {
306    let mut config = SourceConfigOptions::default();
307
308    for (key, value) in options {
309        match key.to_lowercase().as_str() {
310            "buffer_size" | "buffersize" => {
311                config.buffer_size = parse_buffer_size(value)?;
312            }
313            "backpressure" => {
314                config.backpressure = BackpressureStrategy::from_str(value)?;
315            }
316            "wait_strategy" | "waitstrategy" => {
317                config.wait_strategy = WaitStrategy::from_str(value)?;
318            }
319            "track_stats" | "trackstats" | "stats" => {
320                config.track_stats = parse_bool(value)?;
321            }
322            // Ignore connector-specific and unknown options.
323            // Connector-specific: handled by connector implementations.
324            // Unknown: allow forward compatibility with new options.
325            _ => {}
326        }
327    }
328
329    Ok(config)
330}
331
332/// Parses buffer_size option.
333fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
334    let size: usize = value.parse().map_err(|_| {
335        ParseError::ValidationError(format!(
336            "invalid buffer_size: '{}' - must be a number",
337            value
338        ))
339    })?;
340
341    if size < MIN_BUFFER_SIZE {
342        return Err(ParseError::ValidationError(format!(
343            "buffer_size {} is too small - minimum is {}",
344            size, MIN_BUFFER_SIZE
345        )));
346    }
347
348    if size > MAX_BUFFER_SIZE {
349        return Err(ParseError::ValidationError(format!(
350            "buffer_size {} is too large - maximum is {}",
351            size, MAX_BUFFER_SIZE
352        )));
353    }
354
355    Ok(size)
356}
357
358/// Parses a boolean option.
359fn parse_bool(value: &str) -> Result<bool, ParseError> {
360    match value.to_lowercase().as_str() {
361        "true" | "yes" | "on" | "1" => Ok(true),
362        "false" | "no" | "off" | "0" => Ok(false),
363        _ => Err(ParseError::ValidationError(format!(
364            "invalid boolean value: '{}' - expected true/false",
365            value
366        ))),
367    }
368}
369
370/// Converts SQL column definitions to Arrow types.
371fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
372    columns.iter().map(convert_column).collect()
373}
374
375/// Converts a single SQL column definition to Arrow type.
376fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
377    let data_type = sql_type_to_arrow(&col.data_type)?;
378
379    // Check for NOT NULL constraint
380    let nullable = !col
381        .options
382        .iter()
383        .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
384
385    Ok(ColumnDefinition {
386        name: col.name.value.clone(),
387        data_type,
388        nullable,
389    })
390}
391
/// Converts SQL data type to Arrow data type.
///
/// JSON/JSONB/UUID are stored as UTF-8 strings; DECIMAL/NUMERIC map to
/// `Decimal128` with default precision/scale (38, 9) when unspecified.
///
/// # Errors
///
/// Returns `ParseError::ValidationError` for unsupported SQL data types.
pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
    match sql_type {
        // Integer types
        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
        SqlDataType::BigInt(_) => Ok(DataType::Int64),

        // Unsigned integer types - wrapped in Unsigned variant
        // Note: sqlparser wraps unsigned types differently in different versions

        // Floating point types
        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),

        // Decimal types
        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
            #[allow(clippy::cast_possible_truncation)] // Precision/scale are typically small values
            let (precision, scale) = match info {
                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
                sqlparser::ast::ExactNumberInfo::None => (38, 9), // Default precision/scale
            };
            Ok(DataType::Decimal128(precision, scale))
        }

        // String types (including JSON/UUID stored as strings)
        SqlDataType::Char(_)
        | SqlDataType::Character(_)
        | SqlDataType::Varchar(_)
        | SqlDataType::CharacterVarying(_)
        | SqlDataType::Text
        | SqlDataType::String(_)
        | SqlDataType::JSON
        | SqlDataType::JSONB
        | SqlDataType::Uuid => Ok(DataType::Utf8),

        // Binary types
        SqlDataType::Binary(_)
        | SqlDataType::Varbinary(_)
        | SqlDataType::Blob(_)
        | SqlDataType::Bytea => Ok(DataType::Binary),

        // Boolean type
        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),

        // Date/time types (TIME and TIMESTAMP use microsecond resolution)
        SqlDataType::Date => Ok(DataType::Date32),
        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),

        // Interval type
        SqlDataType::Interval { .. } => Ok(DataType::Interval(
            arrow::datatypes::IntervalUnit::MonthDayNano,
        )),

        // Unsupported types
        _ => Err(ParseError::ValidationError(format!(
            "unsupported data type: {:?}",
            sql_type
        ))),
    }
}
460
461/// Checks if an expression is a `PROCTIME()` function call.
462fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
463    if let sqlparser::ast::Expr::Function(func) = expr {
464        if let Some(name) = func.name.0.last() {
465            return name.to_string().eq_ignore_ascii_case("proctime");
466        }
467    }
468    false
469}
470
471/// Parses watermark definition.
472fn parse_watermark(
473    wm: &WatermarkDef,
474    columns: &[ColumnDefinition],
475) -> Result<WatermarkSpec, ParseError> {
476    let column_name = wm.column.value.clone();
477
478    // Verify column exists and is a timestamp type
479    let col = columns
480        .iter()
481        .find(|c| c.name == column_name)
482        .ok_or_else(|| {
483            ParseError::ValidationError(format!(
484                "watermark column '{}' not found in column list",
485                column_name
486            ))
487        })?;
488
489    // Check column is a timestamp or integer type (BIGINT is common for Unix millis)
490    if !matches!(
491        col.data_type,
492        DataType::Timestamp(_, _)
493            | DataType::Date32
494            | DataType::Date64
495            | DataType::Int64
496            | DataType::Int32
497    ) {
498        return Err(ParseError::ValidationError(format!(
499            "watermark column '{}' must be a timestamp or integer type, found {:?}",
500            column_name, col.data_type
501        )));
502    }
503
504    // Check for PROCTIME() watermark expression
505    if let Some(expr) = &wm.expression {
506        if is_proctime_call(expr) {
507            return Ok(WatermarkSpec {
508                column: column_name,
509                max_out_of_orderness: Duration::ZERO,
510                is_processing_time: true,
511            });
512        }
513    }
514
515    // Parse the watermark expression to extract out-of-orderness.
516    // When expression is None (WATERMARK FOR col without AS), use zero delay.
517    let max_out_of_orderness = match &wm.expression {
518        Some(expr) => parse_watermark_expression(expr),
519        None => Duration::ZERO,
520    };
521
522    Ok(WatermarkSpec {
523        column: column_name,
524        max_out_of_orderness,
525        is_processing_time: false,
526    })
527}
528
529/// Parses watermark expression to extract the bounded out-of-orderness.
530fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
531    use sqlparser::ast::Expr;
532
533    match expr {
534        Expr::BinaryOp { op, right, .. } => match op {
535            sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
536            _ => Duration::ZERO,
537        },
538        // If just the column name, assume zero lateness
539        Expr::Identifier(_) => Duration::ZERO,
540        // Default to 1 second for complex expressions
541        _ => Duration::from_secs(1),
542    }
543}
544
545/// Parses an interval expression to a Duration.
546fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
547    use sqlparser::ast::Expr;
548
549    let Expr::Interval(interval) = expr else {
550        return Duration::from_secs(1);
551    };
552
553    // Extract value and unit from interval
554    let value_str = match interval.value.as_ref() {
555        Expr::Value(v) => {
556            // v is ValueWithSpan, access the inner value
557            match &v.value {
558                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
559                sqlparser::ast::Value::Number(n, _) => n.clone(),
560                _ => return Duration::from_secs(1),
561            }
562        }
563        _ => return Duration::from_secs(1),
564    };
565
566    let value: u64 = value_str.parse().unwrap_or(1);
567
568    // Determine unit
569    let unit = interval
570        .leading_field
571        .as_ref()
572        .map_or("second", |u| match u {
573            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
574            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
575            sqlparser::ast::DateTimeField::Minute => "minute",
576            sqlparser::ast::DateTimeField::Hour => "hour",
577            sqlparser::ast::DateTimeField::Day => "day",
578            _ => "second",
579        });
580
581    match unit {
582        "microsecond" | "microseconds" => Duration::from_micros(value),
583        "millisecond" | "milliseconds" => Duration::from_millis(value),
584        "minute" | "minutes" => Duration::from_secs(value * 60),
585        "hour" | "hours" => Duration::from_secs(value * 3600),
586        "day" | "days" => Duration::from_secs(value * 86400),
587        _ => Duration::from_secs(value),
588    }
589}
590
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates its first statement, which must be a
    /// CREATE SOURCE, into a `SourceDefinition`.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    // --- column / schema translation ---

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    // --- WITH-clause option handling ---

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // Reserved options must be rejected with a descriptive error.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        // Too small
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        // Too large
        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        // Valid
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    // --- FromStr parsing of strategy enums ---

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    // --- watermark validation and parsing ---

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    #[test]
    fn test_external_connector_options_ignored() {
        // External connector options should be accepted but not affect config
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        // Only buffer_size should affect config
        assert_eq!(def.config.buffer_size, 8192);
    }

    #[test]
    fn test_source_watermark_no_expression() {
        // Bare `WATERMARK FOR col` means zero out-of-orderness.
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_source_watermark_bigint_column() {
        // BIGINT event-time columns (Unix millis) are accepted.
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}