// laminar_sql/translator/streaming_ddl.rs
1//! SQL DDL to streaming API translation.
2
3#[allow(clippy::disallowed_types)] // cold path: SQL translation
4use std::collections::HashMap;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
10use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
11
12use laminar_core::streaming::config::{
13    BackpressureStrategy, WaitStrategy, DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE,
14};
15
16use crate::parser::ParseError;
17use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
18
/// Watermark specification for a source.
///
/// Produced when translating a `WATERMARK FOR col [AS expr]` clause in
/// `CREATE SOURCE`.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Column name for event time.
    pub column: String,
    /// Bounded out-of-orderness duration.
    ///
    /// Zero when the clause has no `AS` expression or uses `PROCTIME()`.
    pub max_out_of_orderness: Duration,
    /// Whether this is a processing-time watermark (`PROCTIME()`).
    ///
    /// When `true`, the runtime should use `ProcessingTimeGenerator`
    /// instead of `BoundedOutOfOrdernessGenerator`.
    pub is_processing_time: bool,
}
32
/// Configuration options for a streaming source.
///
/// Parsed from the `WITH (...)` clause of `CREATE SOURCE`/`CREATE SINK`;
/// unspecified keys fall back to the `Default` impl below.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer size for the channel.
    /// Validated to lie within `MIN_BUFFER_SIZE..=MAX_BUFFER_SIZE`.
    pub buffer_size: usize,
    /// Backpressure strategy.
    pub backpressure: BackpressureStrategy,
    /// Wait strategy for consumers.
    pub wait_strategy: WaitStrategy,
    /// Whether to track statistics.
    pub track_stats: bool,
}
45
46impl Default for SourceConfigOptions {
47    fn default() -> Self {
48        Self {
49            buffer_size: DEFAULT_BUFFER_SIZE,
50            backpressure: BackpressureStrategy::Block,
51            wait_strategy: WaitStrategy::SpinYield,
52            track_stats: false,
53        }
54    }
55}
56
/// Column definition for a streaming source.
///
/// The SQL-to-Arrow conversion happens in [`sql_type_to_arrow`].
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Arrow data type.
    pub data_type: DataType,
    /// Whether the column is nullable (`false` when declared `NOT NULL`).
    pub nullable: bool,
}
67
/// A validated streaming source definition.
///
/// This is the output of translating a `CreateSourceStatement` to a typed
/// configuration that can be used to create runtime sources.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema built from `columns`.
    pub schema: SchemaRef,
    /// Watermark specification, if defined.
    pub watermark: Option<WatermarkSpec>,
    /// Configuration options from the `WITH` clause.
    pub config: SourceConfigOptions,
}
85
// Thin conversion wrapper so callers can write `stmt.try_into()` instead of
// calling `translate_create_source` directly.
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
93
/// A validated streaming sink definition.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Input source or query.
    pub input: String,
    /// Configuration options (sinks reuse the source option set).
    pub config: SourceConfigOptions,
}
104
// Thin conversion wrapper so callers can write `stmt.try_into()` instead of
// calling `translate_create_sink` directly.
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
112
113/// Translates a CREATE SOURCE statement to a typed SourceDefinition.
114///
115/// # Errors
116///
117/// Returns `ParseError::ValidationError` if:
118/// - The `channel` option is specified (not user-configurable)
119/// - An invalid option value is provided
120/// - Column types cannot be converted to Arrow types
121pub fn translate_create_source(
122    stmt: CreateSourceStatement,
123) -> Result<SourceDefinition, ParseError> {
124    let columns = convert_columns(&stmt.columns)?;
125    translate_create_source_with_columns(stmt, columns)
126}
127
128/// Translate a CREATE SOURCE statement using an already-resolved column
129/// list. Used by the DDL layer after `discover_schema` so `WATERMARK FOR`
130/// validates against the discovered columns rather than the SQL text.
131///
132/// # Errors
133///
134/// Returns `ParseError` from option validation or watermark parsing.
135pub fn translate_create_source_with_columns(
136    stmt: CreateSourceStatement,
137    columns: Vec<ColumnDefinition>,
138) -> Result<SourceDefinition, ParseError> {
139    validate_source_options(&stmt.with_options)?;
140    let config = parse_source_options(&stmt.with_options)?;
141
142    let fields: Vec<Field> = columns
143        .iter()
144        .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
145        .collect();
146    let schema = Arc::new(Schema::new(fields));
147
148    let watermark = if let Some(wm) = stmt.watermark {
149        Some(parse_watermark(&wm, &columns)?)
150    } else {
151        None
152    };
153
154    Ok(SourceDefinition {
155        name: stmt.name.to_string(),
156        columns,
157        schema,
158        watermark,
159        config,
160    })
161}
162
163/// Translates a CREATE SINK statement to a typed SinkDefinition.
164///
165/// # Errors
166///
167/// Returns `ParseError::ValidationError` if:
168/// - The `channel` option is specified (not user-configurable)
169/// - An invalid option value is provided
170pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
171    // Validate options first
172    validate_source_options(&stmt.with_options)?;
173
174    // Parse configuration options
175    let config = parse_source_options(&stmt.with_options)?;
176
177    // Get input name
178    let input = match stmt.from {
179        SinkFrom::Table(name) => name.to_string(),
180        SinkFrom::Query(_) => {
181            // For now, we don't support inline queries - need to create a view first
182            return Err(ParseError::ValidationError(
183                "inline queries not yet supported in CREATE SINK - use a view".to_string(),
184            ));
185        }
186    };
187
188    Ok(SinkDefinition {
189        name: stmt.name.to_string(),
190        input,
191        config,
192    })
193}
194
195/// Validates that source options don't include disallowed keys.
196fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
197    // Reject 'channel' option - channel type is auto-derived
198    if options.contains_key("channel") {
199        return Err(ParseError::ValidationError(
200            "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
201        ));
202    }
203
204    // Reject 'type' option for same reason
205    if options.contains_key("type") {
206        return Err(ParseError::ValidationError(
207            "the 'type' option is not user-configurable for in-memory streaming sources"
208                .to_string(),
209        ));
210    }
211
212    Ok(())
213}
214
215/// Parses source options from WITH clause.
216fn parse_source_options(
217    options: &HashMap<String, String>,
218) -> Result<SourceConfigOptions, ParseError> {
219    let mut config = SourceConfigOptions::default();
220
221    for (key, value) in options {
222        match key.to_lowercase().as_str() {
223            "buffer_size" | "buffersize" => {
224                config.buffer_size = parse_buffer_size(value)?;
225            }
226            "backpressure" => {
227                config.backpressure =
228                    BackpressureStrategy::from_str(value).map_err(ParseError::ValidationError)?;
229            }
230            "wait_strategy" | "waitstrategy" => {
231                config.wait_strategy =
232                    WaitStrategy::from_str(value).map_err(ParseError::ValidationError)?;
233            }
234            "track_stats" | "trackstats" | "stats" => {
235                config.track_stats = parse_bool(value)?;
236            }
237            // Ignore connector-specific and unknown options.
238            // Connector-specific: handled by connector implementations.
239            // Unknown: allow forward compatibility with new options.
240            _ => {}
241        }
242    }
243
244    Ok(config)
245}
246
247/// Parses buffer_size option.
248fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
249    let size: usize = value.parse().map_err(|_| {
250        ParseError::ValidationError(format!(
251            "invalid buffer_size: '{}' - must be a number",
252            value
253        ))
254    })?;
255
256    if size < MIN_BUFFER_SIZE {
257        return Err(ParseError::ValidationError(format!(
258            "buffer_size {} is too small - minimum is {}",
259            size, MIN_BUFFER_SIZE
260        )));
261    }
262
263    if size > MAX_BUFFER_SIZE {
264        return Err(ParseError::ValidationError(format!(
265            "buffer_size {} is too large - maximum is {}",
266            size, MAX_BUFFER_SIZE
267        )));
268    }
269
270    Ok(size)
271}
272
273/// Parses a boolean option.
274fn parse_bool(value: &str) -> Result<bool, ParseError> {
275    match value.to_lowercase().as_str() {
276        "true" | "yes" | "on" | "1" => Ok(true),
277        "false" | "no" | "off" | "0" => Ok(false),
278        _ => Err(ParseError::ValidationError(format!(
279            "invalid boolean value: '{}' - expected true/false",
280            value
281        ))),
282    }
283}
284
285/// Converts SQL column definitions to Arrow types.
286fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
287    columns.iter().map(convert_column).collect()
288}
289
290/// Converts a single SQL column definition to Arrow type.
291fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
292    let data_type = sql_type_to_arrow(&col.data_type)?;
293
294    // Check for NOT NULL constraint
295    let nullable = !col
296        .options
297        .iter()
298        .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
299
300    Ok(ColumnDefinition {
301        name: col.name.value.clone(),
302        data_type,
303        nullable,
304    })
305}
306
/// Converts SQL data type to Arrow data type.
///
/// Covers scalar SQL types plus `ARRAY<T>`; complex types (MAP, STRUCT,
/// nested records) are rejected with a pointer to schema auto-discovery.
///
/// # Errors
///
/// Returns `ParseError::ValidationError` for unsupported SQL data types.
pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
    match sql_type {
        // Integer types
        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
        SqlDataType::BigInt(_) => Ok(DataType::Int64),

        // Unsigned integer types - wrapped in Unsigned variant
        // Note: sqlparser wraps unsigned types differently in different versions

        // Floating point types
        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),

        // Decimal types map to Decimal128 with the declared precision/scale.
        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
            #[allow(clippy::cast_possible_truncation)] // Precision/scale are typically small values
            let (precision, scale) = match info {
                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
                sqlparser::ast::ExactNumberInfo::None => (38, 9), // Default precision/scale
            };
            Ok(DataType::Decimal128(precision, scale))
        }

        // String types (including JSON/UUID stored as strings)
        SqlDataType::Char(_)
        | SqlDataType::Character(_)
        | SqlDataType::Varchar(_)
        | SqlDataType::CharacterVarying(_)
        | SqlDataType::Text
        | SqlDataType::String(_)
        | SqlDataType::JSON
        | SqlDataType::JSONB
        | SqlDataType::Uuid => Ok(DataType::Utf8),

        // Binary types
        SqlDataType::Binary(_)
        | SqlDataType::Varbinary(_)
        | SqlDataType::Blob(_)
        | SqlDataType::Bytea => Ok(DataType::Binary),

        // Boolean type
        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),

        // Date/time types (microsecond resolution, no timezone on TIMESTAMP)
        SqlDataType::Date => Ok(DataType::Date32),
        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),

        // Interval type
        SqlDataType::Interval { .. } => Ok(DataType::Interval(
            arrow::datatypes::IntervalUnit::MonthDayNano,
        )),

        // Array type: ARRAY<T>, T[], Array(T)
        SqlDataType::Array(elem_def) => {
            // All three declaration syntaxes carry the element type the same way.
            let item_type = match elem_def {
                sqlparser::ast::ArrayElemTypeDef::AngleBracket(t)
                | sqlparser::ast::ArrayElemTypeDef::SquareBracket(t, _)
                | sqlparser::ast::ArrayElemTypeDef::Parenthesis(t) => sql_type_to_arrow(t)?,
                sqlparser::ast::ArrayElemTypeDef::None => {
                    return Err(ParseError::ValidationError(
                        "ARRAY type requires element type, e.g. ARRAY<INT>".into(),
                    ));
                }
            };
            // List elements are always nullable; "item" is Arrow's conventional field name.
            Ok(DataType::List(Arc::new(Field::new(
                "item", item_type, true,
            ))))
        }

        // Complex types (MAP, STRUCT, nested records) — use auto-discovery instead.
        _ => Err(ParseError::ValidationError(format!(
            "unsupported data type in hand-declared column: {sql_type:?} \
             — use auto-discovery with an Avro source for complex types"
        ))),
    }
}
392
393/// Checks if an expression is a `PROCTIME()` function call.
394fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
395    if let sqlparser::ast::Expr::Function(func) = expr {
396        if let Some(name) = func.name.0.last() {
397            return name.to_string().eq_ignore_ascii_case("proctime");
398        }
399    }
400    false
401}
402
403/// Parses watermark definition.
404fn parse_watermark(
405    wm: &WatermarkDef,
406    columns: &[ColumnDefinition],
407) -> Result<WatermarkSpec, ParseError> {
408    let column_name = wm.column.value.clone();
409
410    // Verify column exists and is a timestamp type
411    let col = columns
412        .iter()
413        .find(|c| c.name == column_name)
414        .ok_or_else(|| {
415            ParseError::ValidationError(format!(
416                "watermark column '{}' not found in column list",
417                column_name
418            ))
419        })?;
420
421    // Check column is a timestamp or integer type (BIGINT is common for Unix millis)
422    if !matches!(
423        col.data_type,
424        DataType::Timestamp(_, _)
425            | DataType::Date32
426            | DataType::Date64
427            | DataType::Int64
428            | DataType::Int32
429    ) {
430        return Err(ParseError::ValidationError(format!(
431            "watermark column '{}' must be a timestamp or integer type, found {:?}",
432            column_name, col.data_type
433        )));
434    }
435
436    // Check for PROCTIME() watermark expression
437    if let Some(expr) = &wm.expression {
438        if is_proctime_call(expr) {
439            return Ok(WatermarkSpec {
440                column: column_name,
441                max_out_of_orderness: Duration::ZERO,
442                is_processing_time: true,
443            });
444        }
445    }
446
447    // Parse the watermark expression to extract out-of-orderness.
448    // When expression is None (WATERMARK FOR col without AS), use zero delay.
449    let max_out_of_orderness = match &wm.expression {
450        Some(expr) => parse_watermark_expression(expr),
451        None => Duration::ZERO,
452    };
453
454    Ok(WatermarkSpec {
455        column: column_name,
456        max_out_of_orderness,
457        is_processing_time: false,
458    })
459}
460
461/// Parses watermark expression to extract the bounded out-of-orderness.
462fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
463    use sqlparser::ast::Expr;
464
465    match expr {
466        Expr::BinaryOp { op, right, .. } => match op {
467            sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
468            _ => Duration::ZERO,
469        },
470        // If just the column name, assume zero lateness
471        Expr::Identifier(_) => Duration::ZERO,
472        // Default to 1 second for complex expressions
473        _ => Duration::from_secs(1),
474    }
475}
476
477/// Parses an interval expression to a Duration.
478fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
479    use sqlparser::ast::Expr;
480
481    let Expr::Interval(interval) = expr else {
482        return Duration::from_secs(1);
483    };
484
485    // Extract value and unit from interval
486    let value_str = match interval.value.as_ref() {
487        Expr::Value(v) => {
488            // v is ValueWithSpan, access the inner value
489            match &v.value {
490                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
491                sqlparser::ast::Value::Number(n, _) => n.clone(),
492                _ => return Duration::from_secs(1),
493            }
494        }
495        _ => return Duration::from_secs(1),
496    };
497
498    let value: u64 = value_str.parse().unwrap_or(1);
499
500    // Determine unit
501    let unit = interval
502        .leading_field
503        .as_ref()
504        .map_or("second", |u| match u {
505            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
506            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
507            sqlparser::ast::DateTimeField::Minute => "minute",
508            sqlparser::ast::DateTimeField::Hour => "hour",
509            sqlparser::ast::DateTimeField::Day => "day",
510            _ => "second",
511        });
512
513    match unit {
514        "microsecond" | "microseconds" => Duration::from_micros(value),
515        "millisecond" | "milliseconds" => Duration::from_millis(value),
516        "minute" | "minutes" => Duration::from_secs(value * 60),
517        "hour" | "hours" => Duration::from_secs(value * 3600),
518        "day" | "days" => Duration::from_secs(value * 86400),
519        _ => Duration::from_secs(value),
520    }
521}
522
#[cfg(test)]
mod tests {
    //! Unit tests: translation round-trips, option validation, SQL→Arrow
    //! type mapping, and watermark parsing.

    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates its first statement, which must be
    /// `CREATE SOURCE`.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // 'channel' is reserved — see validate_source_options.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    // 'type' is reserved — see validate_source_options.
    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        // Too small
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        // Too large
        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        // Valid
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // One column per supported scalar SQL type — see sql_type_to_arrow.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    #[test]
    fn test_external_connector_options_ignored() {
        // External connector options should be accepted but not affect config
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        // Only buffer_size should affect config
        assert_eq!(def.config.buffer_size, 8192);
    }

    // WATERMARK FOR col without AS ⇒ zero out-of-orderness.
    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // BIGINT watermark columns are allowed (Unix-millis convention).
    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }

    #[test]
    fn array_of_int_parses() {
        let def = parse_and_translate("CREATE SOURCE events (tags ARRAY<INT>)").unwrap();
        match &def.columns[0].data_type {
            DataType::List(field) => assert_eq!(field.data_type(), &DataType::Int32),
            other => panic!("expected DataType::List, got {other:?}"),
        }
    }

    #[test]
    fn decimal_with_precision_parses() {
        let def = parse_and_translate("CREATE SOURCE events (amount DECIMAL(10, 2))").unwrap();
        assert_eq!(def.columns[0].data_type, DataType::Decimal128(10, 2));
    }

    /// Hand-declared MAP columns point users at auto-discovery.
    #[test]
    fn hand_declared_map_column_errors_actionably() {
        let err =
            parse_and_translate("CREATE SOURCE events (data MAP(VARCHAR, VARCHAR))").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("use auto-discovery") || msg.contains("unsupported"),
            "expected actionable error for hand-declared MAP, got: {msg}"
        );
    }
}