// laminar_sql/translator/streaming_ddl.rs
1//! SQL DDL to Streaming API translation.
2//!
3//! This module translates parsed SQL CREATE SOURCE/SINK statements into
4//! typed streaming definitions that can be used to configure the runtime.
5//!
6//! ## Supported Syntax
7//!
8//! ```sql
9//! -- In-memory streaming source
10//! CREATE SOURCE trades (
11//!     symbol VARCHAR NOT NULL,
12//!     price DOUBLE NOT NULL,
13//!     quantity BIGINT NOT NULL,
14//!     ts TIMESTAMP NOT NULL,
15//!     WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECONDS
16//! ) WITH (
17//!     buffer_size = 131072,
18//!     backpressure = 'block'
19//! );
20//!
21//! -- In-memory streaming sink
22//! CREATE SINK trade_aggregates AS
23//!     SELECT * FROM trade_stats
24//! WITH (
25//!     buffer_size = 65536
26//! );
27//! ```
28//!
29//! ## Validation
30//!
31//! - Rejects `channel = ...` option (channel type is auto-derived)
32//! - Validates buffer_size is within bounds
33//! - Validates backpressure strategy names
34//! - Validates wait_strategy names
35
36#[allow(clippy::disallowed_types)] // cold path: SQL translation
37use std::collections::HashMap;
38use std::str::FromStr;
39use std::sync::Arc;
40use std::time::Duration;
41
42use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
43use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
44
45use laminar_core::streaming::config::{
46    BackpressureStrategy, WaitStrategy, DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE,
47};
48
49use crate::parser::ParseError;
50use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
51
/// Watermark specification for a source.
///
/// Produced from a `WATERMARK FOR col [AS expr]` clause after the column
/// has been validated against the source's column list.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Column name for event time.
    pub column: String,
    /// Bounded out-of-orderness duration (zero when no `AS` expression
    /// is given or for processing-time watermarks).
    pub max_out_of_orderness: Duration,
    /// Whether this is a processing-time watermark (`PROCTIME()`).
    ///
    /// When `true`, the runtime should use `ProcessingTimeGenerator`
    /// instead of `BoundedOutOfOrdernessGenerator`.
    pub is_processing_time: bool,
}
65
/// Configuration options for a streaming source.
///
/// Populated from the `WITH (...)` clause; unrecognized keys are ignored
/// for forward compatibility and connector options are handled elsewhere.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer size for the channel (validated against MIN/MAX bounds).
    pub buffer_size: usize,
    /// Backpressure strategy.
    pub backpressure: BackpressureStrategy,
    /// Wait strategy for consumers.
    pub wait_strategy: WaitStrategy,
    /// Whether to track statistics.
    pub track_stats: bool,
}
78
79impl Default for SourceConfigOptions {
80    fn default() -> Self {
81        Self {
82            buffer_size: DEFAULT_BUFFER_SIZE,
83            backpressure: BackpressureStrategy::Block,
84            wait_strategy: WaitStrategy::SpinYield,
85            track_stats: false,
86        }
87    }
88}
89
/// Column definition for a streaming source.
///
/// One entry per SQL column, with the SQL type already mapped to an
/// Arrow type by [`sql_type_to_arrow`].
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Arrow data type.
    pub data_type: DataType,
    /// Whether the column is nullable (i.e. no NOT NULL constraint).
    pub nullable: bool,
}
100
/// A validated streaming source definition.
///
/// This is the output of translating a `CreateSourceStatement` to a typed
/// configuration that can be used to create runtime sources.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema (built from `columns`; one field per column).
    pub schema: SchemaRef,
    /// Watermark specification, if defined.
    pub watermark: Option<WatermarkSpec>,
    /// Configuration options from the WITH clause.
    pub config: SourceConfigOptions,
}
118
119impl TryFrom<CreateSourceStatement> for SourceDefinition {
120    type Error = ParseError;
121
122    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
123        translate_create_source(stmt)
124    }
125}
126
/// A validated streaming sink definition.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Input source or query (currently always a table/view name;
    /// inline queries are rejected during translation).
    pub input: String,
    /// Configuration options from the WITH clause.
    pub config: SourceConfigOptions,
}
137
138impl TryFrom<CreateSinkStatement> for SinkDefinition {
139    type Error = ParseError;
140
141    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
142        translate_create_sink(stmt)
143    }
144}
145
146/// Translates a CREATE SOURCE statement to a typed SourceDefinition.
147///
148/// # Errors
149///
150/// Returns `ParseError::ValidationError` if:
151/// - The `channel` option is specified (not user-configurable)
152/// - An invalid option value is provided
153/// - Column types cannot be converted to Arrow types
154pub fn translate_create_source(
155    stmt: CreateSourceStatement,
156) -> Result<SourceDefinition, ParseError> {
157    // Validate options first - reject 'channel' option
158    validate_source_options(&stmt.with_options)?;
159
160    // Parse configuration options
161    let config = parse_source_options(&stmt.with_options)?;
162
163    // Convert columns to Arrow types
164    let columns = convert_columns(&stmt.columns)?;
165
166    // Build Arrow schema
167    let fields: Vec<Field> = columns
168        .iter()
169        .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
170        .collect();
171    let schema = Arc::new(Schema::new(fields));
172
173    // Parse watermark if present
174    let watermark = if let Some(wm) = stmt.watermark {
175        Some(parse_watermark(&wm, &columns)?)
176    } else {
177        None
178    };
179
180    Ok(SourceDefinition {
181        name: stmt.name.to_string(),
182        columns,
183        schema,
184        watermark,
185        config,
186    })
187}
188
189/// Translates a CREATE SINK statement to a typed SinkDefinition.
190///
191/// # Errors
192///
193/// Returns `ParseError::ValidationError` if:
194/// - The `channel` option is specified (not user-configurable)
195/// - An invalid option value is provided
196pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
197    // Validate options first
198    validate_source_options(&stmt.with_options)?;
199
200    // Parse configuration options
201    let config = parse_source_options(&stmt.with_options)?;
202
203    // Get input name
204    let input = match stmt.from {
205        SinkFrom::Table(name) => name.to_string(),
206        SinkFrom::Query(_) => {
207            // For now, we don't support inline queries - need to create a view first
208            return Err(ParseError::ValidationError(
209                "inline queries not yet supported in CREATE SINK - use a view".to_string(),
210            ));
211        }
212    };
213
214    Ok(SinkDefinition {
215        name: stmt.name.to_string(),
216        input,
217        config,
218    })
219}
220
221/// Validates that source options don't include disallowed keys.
222fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
223    // Reject 'channel' option - channel type is auto-derived
224    if options.contains_key("channel") {
225        return Err(ParseError::ValidationError(
226            "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
227        ));
228    }
229
230    // Reject 'type' option for same reason
231    if options.contains_key("type") {
232        return Err(ParseError::ValidationError(
233            "the 'type' option is not user-configurable for in-memory streaming sources"
234                .to_string(),
235        ));
236    }
237
238    Ok(())
239}
240
241/// Parses source options from WITH clause.
242fn parse_source_options(
243    options: &HashMap<String, String>,
244) -> Result<SourceConfigOptions, ParseError> {
245    let mut config = SourceConfigOptions::default();
246
247    for (key, value) in options {
248        match key.to_lowercase().as_str() {
249            "buffer_size" | "buffersize" => {
250                config.buffer_size = parse_buffer_size(value)?;
251            }
252            "backpressure" => {
253                config.backpressure =
254                    BackpressureStrategy::from_str(value).map_err(ParseError::ValidationError)?;
255            }
256            "wait_strategy" | "waitstrategy" => {
257                config.wait_strategy =
258                    WaitStrategy::from_str(value).map_err(ParseError::ValidationError)?;
259            }
260            "track_stats" | "trackstats" | "stats" => {
261                config.track_stats = parse_bool(value)?;
262            }
263            // Ignore connector-specific and unknown options.
264            // Connector-specific: handled by connector implementations.
265            // Unknown: allow forward compatibility with new options.
266            _ => {}
267        }
268    }
269
270    Ok(config)
271}
272
273/// Parses buffer_size option.
274fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
275    let size: usize = value.parse().map_err(|_| {
276        ParseError::ValidationError(format!(
277            "invalid buffer_size: '{}' - must be a number",
278            value
279        ))
280    })?;
281
282    if size < MIN_BUFFER_SIZE {
283        return Err(ParseError::ValidationError(format!(
284            "buffer_size {} is too small - minimum is {}",
285            size, MIN_BUFFER_SIZE
286        )));
287    }
288
289    if size > MAX_BUFFER_SIZE {
290        return Err(ParseError::ValidationError(format!(
291            "buffer_size {} is too large - maximum is {}",
292            size, MAX_BUFFER_SIZE
293        )));
294    }
295
296    Ok(size)
297}
298
299/// Parses a boolean option.
300fn parse_bool(value: &str) -> Result<bool, ParseError> {
301    match value.to_lowercase().as_str() {
302        "true" | "yes" | "on" | "1" => Ok(true),
303        "false" | "no" | "off" | "0" => Ok(false),
304        _ => Err(ParseError::ValidationError(format!(
305            "invalid boolean value: '{}' - expected true/false",
306            value
307        ))),
308    }
309}
310
311/// Converts SQL column definitions to Arrow types.
312fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
313    columns.iter().map(convert_column).collect()
314}
315
316/// Converts a single SQL column definition to Arrow type.
317fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
318    let data_type = sql_type_to_arrow(&col.data_type)?;
319
320    // Check for NOT NULL constraint
321    let nullable = !col
322        .options
323        .iter()
324        .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
325
326    Ok(ColumnDefinition {
327        name: col.name.value.clone(),
328        data_type,
329        nullable,
330    })
331}
332
333/// Converts SQL data type to Arrow data type.
334///
335/// # Errors
336///
337/// Returns `ParseError::ValidationError` for unsupported SQL data types.
338pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
339    match sql_type {
340        // Integer types
341        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
342        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
343        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
344        SqlDataType::BigInt(_) => Ok(DataType::Int64),
345
346        // Unsigned integer types - wrapped in Unsigned variant
347        // Note: sqlparser wraps unsigned types differently in different versions
348
349        // Floating point types
350        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
351        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
352
353        // Decimal types
354        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
355            #[allow(clippy::cast_possible_truncation)] // Precision/scale are typically small values
356            let (precision, scale) = match info {
357                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
358                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
359                sqlparser::ast::ExactNumberInfo::None => (38, 9), // Default precision/scale
360            };
361            Ok(DataType::Decimal128(precision, scale))
362        }
363
364        // String types (including JSON/UUID stored as strings)
365        SqlDataType::Char(_)
366        | SqlDataType::Character(_)
367        | SqlDataType::Varchar(_)
368        | SqlDataType::CharacterVarying(_)
369        | SqlDataType::Text
370        | SqlDataType::String(_)
371        | SqlDataType::JSON
372        | SqlDataType::JSONB
373        | SqlDataType::Uuid => Ok(DataType::Utf8),
374
375        // Binary types
376        SqlDataType::Binary(_)
377        | SqlDataType::Varbinary(_)
378        | SqlDataType::Blob(_)
379        | SqlDataType::Bytea => Ok(DataType::Binary),
380
381        // Boolean type
382        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
383
384        // Date/time types
385        SqlDataType::Date => Ok(DataType::Date32),
386        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
387        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
388
389        // Interval type
390        SqlDataType::Interval { .. } => Ok(DataType::Interval(
391            arrow::datatypes::IntervalUnit::MonthDayNano,
392        )),
393
394        // Unsupported types
395        _ => Err(ParseError::ValidationError(format!(
396            "unsupported data type: {:?}",
397            sql_type
398        ))),
399    }
400}
401
402/// Checks if an expression is a `PROCTIME()` function call.
403fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
404    if let sqlparser::ast::Expr::Function(func) = expr {
405        if let Some(name) = func.name.0.last() {
406            return name.to_string().eq_ignore_ascii_case("proctime");
407        }
408    }
409    false
410}
411
412/// Parses watermark definition.
413fn parse_watermark(
414    wm: &WatermarkDef,
415    columns: &[ColumnDefinition],
416) -> Result<WatermarkSpec, ParseError> {
417    let column_name = wm.column.value.clone();
418
419    // Verify column exists and is a timestamp type
420    let col = columns
421        .iter()
422        .find(|c| c.name == column_name)
423        .ok_or_else(|| {
424            ParseError::ValidationError(format!(
425                "watermark column '{}' not found in column list",
426                column_name
427            ))
428        })?;
429
430    // Check column is a timestamp or integer type (BIGINT is common for Unix millis)
431    if !matches!(
432        col.data_type,
433        DataType::Timestamp(_, _)
434            | DataType::Date32
435            | DataType::Date64
436            | DataType::Int64
437            | DataType::Int32
438    ) {
439        return Err(ParseError::ValidationError(format!(
440            "watermark column '{}' must be a timestamp or integer type, found {:?}",
441            column_name, col.data_type
442        )));
443    }
444
445    // Check for PROCTIME() watermark expression
446    if let Some(expr) = &wm.expression {
447        if is_proctime_call(expr) {
448            return Ok(WatermarkSpec {
449                column: column_name,
450                max_out_of_orderness: Duration::ZERO,
451                is_processing_time: true,
452            });
453        }
454    }
455
456    // Parse the watermark expression to extract out-of-orderness.
457    // When expression is None (WATERMARK FOR col without AS), use zero delay.
458    let max_out_of_orderness = match &wm.expression {
459        Some(expr) => parse_watermark_expression(expr),
460        None => Duration::ZERO,
461    };
462
463    Ok(WatermarkSpec {
464        column: column_name,
465        max_out_of_orderness,
466        is_processing_time: false,
467    })
468}
469
470/// Parses watermark expression to extract the bounded out-of-orderness.
471fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
472    use sqlparser::ast::Expr;
473
474    match expr {
475        Expr::BinaryOp { op, right, .. } => match op {
476            sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
477            _ => Duration::ZERO,
478        },
479        // If just the column name, assume zero lateness
480        Expr::Identifier(_) => Duration::ZERO,
481        // Default to 1 second for complex expressions
482        _ => Duration::from_secs(1),
483    }
484}
485
486/// Parses an interval expression to a Duration.
487fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
488    use sqlparser::ast::Expr;
489
490    let Expr::Interval(interval) = expr else {
491        return Duration::from_secs(1);
492    };
493
494    // Extract value and unit from interval
495    let value_str = match interval.value.as_ref() {
496        Expr::Value(v) => {
497            // v is ValueWithSpan, access the inner value
498            match &v.value {
499                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
500                sqlparser::ast::Value::Number(n, _) => n.clone(),
501                _ => return Duration::from_secs(1),
502            }
503        }
504        _ => return Duration::from_secs(1),
505    };
506
507    let value: u64 = value_str.parse().unwrap_or(1);
508
509    // Determine unit
510    let unit = interval
511        .leading_field
512        .as_ref()
513        .map_or("second", |u| match u {
514            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
515            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
516            sqlparser::ast::DateTimeField::Minute => "minute",
517            sqlparser::ast::DateTimeField::Hour => "hour",
518            sqlparser::ast::DateTimeField::Day => "day",
519            _ => "second",
520        });
521
522    match unit {
523        "microsecond" | "microseconds" => Duration::from_micros(value),
524        "millisecond" | "milliseconds" => Duration::from_millis(value),
525        "minute" | "minutes" => Duration::from_secs(value * 60),
526        "hour" | "hours" => Duration::from_secs(value * 3600),
527        "day" | "days" => Duration::from_secs(value * 86400),
528        _ => Duration::from_secs(value),
529    }
530}
531
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates the first statement, which must be a
    /// CREATE SOURCE, into a `SourceDefinition`.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // 'channel' is auto-derived; specifying it must fail validation.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        // Too small
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        // Too large
        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        // Valid
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // One column per supported SQL type family; see sql_type_to_arrow.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    #[test]
    fn test_external_connector_options_ignored() {
        // External connector options should be accepted but not affect config
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        // Only buffer_size should affect config
        assert_eq!(def.config.buffer_size, 8192);
    }

    // Bare WATERMARK FOR col (no AS expression) means zero lateness.
    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // BIGINT event-time columns (Unix millis) are allowed for watermarks.
    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}