1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
10use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
11
12use laminar_core::streaming::config::{
13 BackpressureStrategy, WaitStrategy, DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE,
14};
15
16use crate::parser::ParseError;
17use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
18
/// Watermark strategy extracted from a `WATERMARK FOR ...` clause on a
/// `CREATE SOURCE` statement.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Name of the column the watermark is derived from.
    pub column: String,
    /// Maximum lateness tolerated for events on this source
    /// (zero when no interval expression was given).
    pub max_out_of_orderness: Duration,
    /// `true` when the watermark uses processing time (`PROCTIME()`)
    /// rather than the event-time column's values.
    pub is_processing_time: bool,
}
32
/// Runtime configuration parsed from a source/sink `WITH (...)` clause.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Capacity of the in-memory buffer (bounded by MIN/MAX_BUFFER_SIZE).
    pub buffer_size: usize,
    /// What to do when the buffer is full.
    pub backpressure: BackpressureStrategy,
    /// How a consumer waits for new data.
    pub wait_strategy: WaitStrategy,
    /// Whether to collect runtime statistics for this source/sink.
    pub track_stats: bool,
}
45
46impl Default for SourceConfigOptions {
47 fn default() -> Self {
48 Self {
49 buffer_size: DEFAULT_BUFFER_SIZE,
50 backpressure: BackpressureStrategy::Block,
51 wait_strategy: WaitStrategy::SpinYield,
52 track_stats: false,
53 }
54 }
55}
56
/// A single column of a source, after SQL-to-Arrow type conversion.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name as declared in the statement.
    pub name: String,
    /// Arrow data type the SQL type was mapped to.
    pub data_type: DataType,
    /// `false` only when the column was declared NOT NULL.
    pub nullable: bool,
}
67
/// Fully-translated `CREATE SOURCE` statement: columns, derived Arrow
/// schema, optional watermark, and runtime configuration.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Converted column definitions, in declaration order.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema built from `columns`.
    pub schema: SchemaRef,
    /// Watermark specification, if a WATERMARK clause was present.
    pub watermark: Option<WatermarkSpec>,
    /// Options parsed from the WITH clause (defaults otherwise).
    pub config: SourceConfigOptions,
}
85
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
93
/// Fully-translated `CREATE SINK` statement.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Name of the table/view the sink reads from.
    pub input: String,
    /// Options parsed from the WITH clause (same option set as sources).
    pub config: SourceConfigOptions,
}
104
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
112
113pub fn translate_create_source(
122 stmt: CreateSourceStatement,
123) -> Result<SourceDefinition, ParseError> {
124 let columns = convert_columns(&stmt.columns)?;
125 translate_create_source_with_columns(stmt, columns)
126}
127
128pub fn translate_create_source_with_columns(
136 stmt: CreateSourceStatement,
137 columns: Vec<ColumnDefinition>,
138) -> Result<SourceDefinition, ParseError> {
139 validate_source_options(&stmt.with_options)?;
140 let config = parse_source_options(&stmt.with_options)?;
141
142 let fields: Vec<Field> = columns
143 .iter()
144 .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
145 .collect();
146 let schema = Arc::new(Schema::new(fields));
147
148 let watermark = if let Some(wm) = stmt.watermark {
149 Some(parse_watermark(&wm, &columns)?)
150 } else {
151 None
152 };
153
154 Ok(SourceDefinition {
155 name: stmt.name.to_string(),
156 columns,
157 schema,
158 watermark,
159 config,
160 })
161}
162
163pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
171 validate_source_options(&stmt.with_options)?;
173
174 let config = parse_source_options(&stmt.with_options)?;
176
177 let input = match stmt.from {
179 SinkFrom::Table(name) => name.to_string(),
180 SinkFrom::Query(_) => {
181 return Err(ParseError::ValidationError(
183 "inline queries not yet supported in CREATE SINK - use a view".to_string(),
184 ));
185 }
186 };
187
188 Ok(SinkDefinition {
189 name: stmt.name.to_string(),
190 input,
191 config,
192 })
193}
194
195fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
197 if options.contains_key("channel") {
199 return Err(ParseError::ValidationError(
200 "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
201 ));
202 }
203
204 if options.contains_key("type") {
206 return Err(ParseError::ValidationError(
207 "the 'type' option is not user-configurable for in-memory streaming sources"
208 .to_string(),
209 ));
210 }
211
212 Ok(())
213}
214
215fn parse_source_options(
217 options: &HashMap<String, String>,
218) -> Result<SourceConfigOptions, ParseError> {
219 let mut config = SourceConfigOptions::default();
220
221 for (key, value) in options {
222 match key.to_lowercase().as_str() {
223 "buffer_size" | "buffersize" => {
224 config.buffer_size = parse_buffer_size(value)?;
225 }
226 "backpressure" => {
227 config.backpressure =
228 BackpressureStrategy::from_str(value).map_err(ParseError::ValidationError)?;
229 }
230 "wait_strategy" | "waitstrategy" => {
231 config.wait_strategy =
232 WaitStrategy::from_str(value).map_err(ParseError::ValidationError)?;
233 }
234 "track_stats" | "trackstats" | "stats" => {
235 config.track_stats = parse_bool(value)?;
236 }
237 _ => {}
241 }
242 }
243
244 Ok(config)
245}
246
247fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
249 let size: usize = value.parse().map_err(|_| {
250 ParseError::ValidationError(format!(
251 "invalid buffer_size: '{}' - must be a number",
252 value
253 ))
254 })?;
255
256 if size < MIN_BUFFER_SIZE {
257 return Err(ParseError::ValidationError(format!(
258 "buffer_size {} is too small - minimum is {}",
259 size, MIN_BUFFER_SIZE
260 )));
261 }
262
263 if size > MAX_BUFFER_SIZE {
264 return Err(ParseError::ValidationError(format!(
265 "buffer_size {} is too large - maximum is {}",
266 size, MAX_BUFFER_SIZE
267 )));
268 }
269
270 Ok(size)
271}
272
273fn parse_bool(value: &str) -> Result<bool, ParseError> {
275 match value.to_lowercase().as_str() {
276 "true" | "yes" | "on" | "1" => Ok(true),
277 "false" | "no" | "off" | "0" => Ok(false),
278 _ => Err(ParseError::ValidationError(format!(
279 "invalid boolean value: '{}' - expected true/false",
280 value
281 ))),
282 }
283}
284
285fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
287 columns.iter().map(convert_column).collect()
288}
289
290fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
292 let data_type = sql_type_to_arrow(&col.data_type)?;
293
294 let nullable = !col
296 .options
297 .iter()
298 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
299
300 Ok(ColumnDefinition {
301 name: col.name.value.clone(),
302 data_type,
303 nullable,
304 })
305}
306
307pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
313 match sql_type {
314 SqlDataType::TinyInt(_) => Ok(DataType::Int8),
316 SqlDataType::SmallInt(_) => Ok(DataType::Int16),
317 SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
318 SqlDataType::BigInt(_) => Ok(DataType::Int64),
319
320 SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
325 SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
326
327 SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
329 #[allow(clippy::cast_possible_truncation)] let (precision, scale) = match info {
331 sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
332 sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
333 sqlparser::ast::ExactNumberInfo::None => (38, 9), };
335 Ok(DataType::Decimal128(precision, scale))
336 }
337
338 SqlDataType::Char(_)
340 | SqlDataType::Character(_)
341 | SqlDataType::Varchar(_)
342 | SqlDataType::CharacterVarying(_)
343 | SqlDataType::Text
344 | SqlDataType::String(_)
345 | SqlDataType::JSON
346 | SqlDataType::JSONB
347 | SqlDataType::Uuid => Ok(DataType::Utf8),
348
349 SqlDataType::Binary(_)
351 | SqlDataType::Varbinary(_)
352 | SqlDataType::Blob(_)
353 | SqlDataType::Bytea => Ok(DataType::Binary),
354
355 SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
357
358 SqlDataType::Date => Ok(DataType::Date32),
360 SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
361 SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
362
363 SqlDataType::Interval { .. } => Ok(DataType::Interval(
365 arrow::datatypes::IntervalUnit::MonthDayNano,
366 )),
367
368 SqlDataType::Array(elem_def) => {
370 let item_type = match elem_def {
371 sqlparser::ast::ArrayElemTypeDef::AngleBracket(t)
372 | sqlparser::ast::ArrayElemTypeDef::SquareBracket(t, _)
373 | sqlparser::ast::ArrayElemTypeDef::Parenthesis(t) => sql_type_to_arrow(t)?,
374 sqlparser::ast::ArrayElemTypeDef::None => {
375 return Err(ParseError::ValidationError(
376 "ARRAY type requires element type, e.g. ARRAY<INT>".into(),
377 ));
378 }
379 };
380 Ok(DataType::List(Arc::new(Field::new(
381 "item", item_type, true,
382 ))))
383 }
384
385 _ => Err(ParseError::ValidationError(format!(
387 "unsupported data type in hand-declared column: {sql_type:?} \
388 — use auto-discovery with an Avro source for complex types"
389 ))),
390 }
391}
392
393fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
395 if let sqlparser::ast::Expr::Function(func) = expr {
396 if let Some(name) = func.name.0.last() {
397 return name.to_string().eq_ignore_ascii_case("proctime");
398 }
399 }
400 false
401}
402
403fn parse_watermark(
405 wm: &WatermarkDef,
406 columns: &[ColumnDefinition],
407) -> Result<WatermarkSpec, ParseError> {
408 let column_name = wm.column.value.clone();
409
410 let col = columns
412 .iter()
413 .find(|c| c.name == column_name)
414 .ok_or_else(|| {
415 ParseError::ValidationError(format!(
416 "watermark column '{}' not found in column list",
417 column_name
418 ))
419 })?;
420
421 if !matches!(
423 col.data_type,
424 DataType::Timestamp(_, _)
425 | DataType::Date32
426 | DataType::Date64
427 | DataType::Int64
428 | DataType::Int32
429 ) {
430 return Err(ParseError::ValidationError(format!(
431 "watermark column '{}' must be a timestamp or integer type, found {:?}",
432 column_name, col.data_type
433 )));
434 }
435
436 if let Some(expr) = &wm.expression {
438 if is_proctime_call(expr) {
439 return Ok(WatermarkSpec {
440 column: column_name,
441 max_out_of_orderness: Duration::ZERO,
442 is_processing_time: true,
443 });
444 }
445 }
446
447 let max_out_of_orderness = match &wm.expression {
450 Some(expr) => parse_watermark_expression(expr),
451 None => Duration::ZERO,
452 };
453
454 Ok(WatermarkSpec {
455 column: column_name,
456 max_out_of_orderness,
457 is_processing_time: false,
458 })
459}
460
461fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
463 use sqlparser::ast::Expr;
464
465 match expr {
466 Expr::BinaryOp { op, right, .. } => match op {
467 sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
468 _ => Duration::ZERO,
469 },
470 Expr::Identifier(_) => Duration::ZERO,
472 _ => Duration::from_secs(1),
474 }
475}
476
477fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
479 use sqlparser::ast::Expr;
480
481 let Expr::Interval(interval) = expr else {
482 return Duration::from_secs(1);
483 };
484
485 let value_str = match interval.value.as_ref() {
487 Expr::Value(v) => {
488 match &v.value {
490 sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
491 sqlparser::ast::Value::Number(n, _) => n.clone(),
492 _ => return Duration::from_secs(1),
493 }
494 }
495 _ => return Duration::from_secs(1),
496 };
497
498 let value: u64 = value_str.parse().unwrap_or(1);
499
500 let unit = interval
502 .leading_field
503 .as_ref()
504 .map_or("second", |u| match u {
505 sqlparser::ast::DateTimeField::Microsecond => "microsecond",
506 sqlparser::ast::DateTimeField::Millisecond => "millisecond",
507 sqlparser::ast::DateTimeField::Minute => "minute",
508 sqlparser::ast::DateTimeField::Hour => "hour",
509 sqlparser::ast::DateTimeField::Day => "day",
510 _ => "second",
511 });
512
513 match unit {
514 "microsecond" | "microseconds" => Duration::from_micros(value),
515 "millisecond" | "milliseconds" => Duration::from_millis(value),
516 "minute" | "minutes" => Duration::from_secs(value * 60),
517 "hour" | "hours" => Duration::from_secs(value * 3600),
518 "day" | "days" => Duration::from_secs(value * 86400),
519 _ => Duration::from_secs(value),
520 }
521}
522
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    // Parses a single statement and translates it, expecting CREATE SOURCE.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // 'channel' is reserved and must be rejected with a pointed message.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    // 'type' is likewise reserved for in-memory sources.
    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // One column per supported scalar SQL type; pins the Arrow mapping.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    // Unknown connector-style keys must be ignored, not rejected.
    #[test]
    fn test_external_connector_options_ignored() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 8192);
    }

    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }

    #[test]
    fn array_of_int_parses() {
        let def = parse_and_translate("CREATE SOURCE events (tags ARRAY<INT>)").unwrap();
        match &def.columns[0].data_type {
            DataType::List(field) => assert_eq!(field.data_type(), &DataType::Int32),
            other => panic!("expected DataType::List, got {other:?}"),
        }
    }

    #[test]
    fn decimal_with_precision_parses() {
        let def = parse_and_translate("CREATE SOURCE events (amount DECIMAL(10, 2))").unwrap();
        assert_eq!(def.columns[0].data_type, DataType::Decimal128(10, 2));
    }

    // Complex types cannot be hand-declared; the error must point users at
    // auto-discovery instead.
    #[test]
    fn hand_declared_map_column_errors_actionably() {
        let err =
            parse_and_translate("CREATE SOURCE events (data MAP(VARCHAR, VARCHAR))").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("use auto-discovery") || msg.contains("unsupported"),
            "expected actionable error for hand-declared MAP, got: {msg}"
        );
    }
}