1use std::collections::HashMap;
37use std::str::FromStr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
42use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
43
44use crate::parser::ParseError;
45use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
46
/// Smallest accepted value for the `buffer_size` WITH option.
pub const MIN_BUFFER_SIZE: usize = 4;

/// Largest accepted value for the `buffer_size` WITH option (1 MiB entries).
pub const MAX_BUFFER_SIZE: usize = 1 << 20;

/// Buffer size used when the `buffer_size` option is not specified.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;
55
/// What a source does when its buffer is full and a producer pushes more data.
///
/// Parsed from the `backpressure` WITH option (see the `FromStr` impl below).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until space becomes available (the default).
    #[default]
    Block,
    /// Drop the oldest buffered element to make room for the new one.
    DropOldest,
    /// Reject the incoming element, surfacing an error to the producer.
    Reject,
}
67
68impl std::str::FromStr for BackpressureStrategy {
69 type Err = ParseError;
70
71 fn from_str(s: &str) -> Result<Self, Self::Err> {
72 match s.to_lowercase().as_str() {
73 "block" | "blocking" => Ok(Self::Block),
74 "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
75 "reject" | "error" => Ok(Self::Reject),
76 _ => Err(ParseError::ValidationError(format!(
77 "invalid backpressure strategy: '{}'. Valid values: block, drop_oldest, reject",
78 s
79 ))),
80 }
81 }
82}
83
/// How a consumer waits for new data when the buffer is empty.
///
/// Parsed from the `wait_strategy` WITH option (see the `FromStr` impl below).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Busy-spin without yielding.
    Spin,
    /// Spin, then yield to the scheduler (the default).
    #[default]
    SpinYield,
    /// Park the thread until data arrives.
    Park,
}
95
96impl std::str::FromStr for WaitStrategy {
97 type Err = ParseError;
98
99 fn from_str(s: &str) -> Result<Self, Self::Err> {
100 match s.to_lowercase().as_str() {
101 "spin" => Ok(Self::Spin),
102 "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
103 "park" | "parking" => Ok(Self::Park),
104 _ => Err(ParseError::ValidationError(format!(
105 "invalid wait strategy: '{}'. Valid values: spin, spin_yield, park",
106 s
107 ))),
108 }
109 }
110}
111
/// Resolved watermark configuration for a source.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Name of the column the watermark is tracked on.
    pub column: String,
    /// Allowed lateness, parsed from a `col - INTERVAL '…'` expression
    /// (zero when no expression or a bare column reference is given).
    pub max_out_of_orderness: Duration,
    /// True when the watermark expression is `PROCTIME()` (processing time).
    pub is_processing_time: bool,
}
125
/// Tunable options for a source or sink, populated from the WITH clause.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer capacity (`buffer_size` option); validated against
    /// `MIN_BUFFER_SIZE` / `MAX_BUFFER_SIZE`.
    pub buffer_size: usize,
    /// Behavior when the buffer is full (`backpressure` option).
    pub backpressure: BackpressureStrategy,
    /// How consumers wait for data (`wait_strategy` option).
    pub wait_strategy: WaitStrategy,
    /// Whether runtime statistics are collected (`track_stats` option).
    pub track_stats: bool,
}
138
139impl Default for SourceConfigOptions {
140 fn default() -> Self {
141 Self {
142 buffer_size: DEFAULT_BUFFER_SIZE,
143 backpressure: BackpressureStrategy::Block,
144 wait_strategy: WaitStrategy::SpinYield,
145 track_stats: false,
146 }
147 }
148}
149
/// One column of a source, with its SQL type mapped to Arrow.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name as written in the statement.
    pub name: String,
    /// Arrow data type (see `sql_type_to_arrow`).
    pub data_type: DataType,
    /// False only when the column was declared NOT NULL.
    pub nullable: bool,
}
160
/// Fully translated and validated `CREATE SOURCE` statement.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name as written in the statement.
    pub name: String,
    /// Column definitions in declaration order.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema derived from `columns`.
    pub schema: SchemaRef,
    /// Optional watermark configuration.
    pub watermark: Option<WatermarkSpec>,
    /// Options parsed from the WITH clause.
    pub config: SourceConfigOptions,
}
178
// Conversion sugar: `SourceDefinition::try_from(stmt)` delegates to
// `translate_create_source`.
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
186
/// Fully translated `CREATE SINK` statement.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name as written in the statement.
    pub name: String,
    /// Name of the table/view the sink reads from.
    pub input: String,
    /// Options parsed from the WITH clause (same shape as for sources).
    pub config: SourceConfigOptions,
}
197
// Conversion sugar: `SinkDefinition::try_from(stmt)` delegates to
// `translate_create_sink`.
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
205
206pub fn translate_create_source(
215 stmt: CreateSourceStatement,
216) -> Result<SourceDefinition, ParseError> {
217 validate_source_options(&stmt.with_options)?;
219
220 let config = parse_source_options(&stmt.with_options)?;
222
223 let columns = convert_columns(&stmt.columns)?;
225
226 let fields: Vec<Field> = columns
228 .iter()
229 .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
230 .collect();
231 let schema = Arc::new(Schema::new(fields));
232
233 let watermark = if let Some(wm) = stmt.watermark {
235 Some(parse_watermark(&wm, &columns)?)
236 } else {
237 None
238 };
239
240 Ok(SourceDefinition {
241 name: stmt.name.to_string(),
242 columns,
243 schema,
244 watermark,
245 config,
246 })
247}
248
249pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
257 validate_source_options(&stmt.with_options)?;
259
260 let config = parse_source_options(&stmt.with_options)?;
262
263 let input = match stmt.from {
265 SinkFrom::Table(name) => name.to_string(),
266 SinkFrom::Query(_) => {
267 return Err(ParseError::ValidationError(
269 "inline queries not yet supported in CREATE SINK - use a view".to_string(),
270 ));
271 }
272 };
273
274 Ok(SinkDefinition {
275 name: stmt.name.to_string(),
276 input,
277 config,
278 })
279}
280
281fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
283 if options.contains_key("channel") {
285 return Err(ParseError::ValidationError(
286 "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
287 ));
288 }
289
290 if options.contains_key("type") {
292 return Err(ParseError::ValidationError(
293 "the 'type' option is not user-configurable for in-memory streaming sources"
294 .to_string(),
295 ));
296 }
297
298 Ok(())
299}
300
301fn parse_source_options(
303 options: &HashMap<String, String>,
304) -> Result<SourceConfigOptions, ParseError> {
305 let mut config = SourceConfigOptions::default();
306
307 for (key, value) in options {
308 match key.to_lowercase().as_str() {
309 "buffer_size" | "buffersize" => {
310 config.buffer_size = parse_buffer_size(value)?;
311 }
312 "backpressure" => {
313 config.backpressure = BackpressureStrategy::from_str(value)?;
314 }
315 "wait_strategy" | "waitstrategy" => {
316 config.wait_strategy = WaitStrategy::from_str(value)?;
317 }
318 "track_stats" | "trackstats" | "stats" => {
319 config.track_stats = parse_bool(value)?;
320 }
321 _ => {}
325 }
326 }
327
328 Ok(config)
329}
330
331fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
333 let size: usize = value.parse().map_err(|_| {
334 ParseError::ValidationError(format!(
335 "invalid buffer_size: '{}' - must be a number",
336 value
337 ))
338 })?;
339
340 if size < MIN_BUFFER_SIZE {
341 return Err(ParseError::ValidationError(format!(
342 "buffer_size {} is too small - minimum is {}",
343 size, MIN_BUFFER_SIZE
344 )));
345 }
346
347 if size > MAX_BUFFER_SIZE {
348 return Err(ParseError::ValidationError(format!(
349 "buffer_size {} is too large - maximum is {}",
350 size, MAX_BUFFER_SIZE
351 )));
352 }
353
354 Ok(size)
355}
356
357fn parse_bool(value: &str) -> Result<bool, ParseError> {
359 match value.to_lowercase().as_str() {
360 "true" | "yes" | "on" | "1" => Ok(true),
361 "false" | "no" | "off" | "0" => Ok(false),
362 _ => Err(ParseError::ValidationError(format!(
363 "invalid boolean value: '{}' - expected true/false",
364 value
365 ))),
366 }
367}
368
369fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
371 columns.iter().map(convert_column).collect()
372}
373
374fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
376 let data_type = sql_type_to_arrow(&col.data_type)?;
377
378 let nullable = !col
380 .options
381 .iter()
382 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
383
384 Ok(ColumnDefinition {
385 name: col.name.to_string(),
386 data_type,
387 nullable,
388 })
389}
390
391pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
397 match sql_type {
398 SqlDataType::TinyInt(_) => Ok(DataType::Int8),
400 SqlDataType::SmallInt(_) => Ok(DataType::Int16),
401 SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
402 SqlDataType::BigInt(_) => Ok(DataType::Int64),
403
404 SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
409 SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
410
411 SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
413 #[allow(clippy::cast_possible_truncation)] let (precision, scale) = match info {
415 sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
416 sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
417 sqlparser::ast::ExactNumberInfo::None => (38, 9), };
419 Ok(DataType::Decimal128(precision, scale))
420 }
421
422 SqlDataType::Char(_)
424 | SqlDataType::Character(_)
425 | SqlDataType::Varchar(_)
426 | SqlDataType::CharacterVarying(_)
427 | SqlDataType::Text
428 | SqlDataType::String(_)
429 | SqlDataType::JSON
430 | SqlDataType::JSONB
431 | SqlDataType::Uuid => Ok(DataType::Utf8),
432
433 SqlDataType::Binary(_)
435 | SqlDataType::Varbinary(_)
436 | SqlDataType::Blob(_)
437 | SqlDataType::Bytea => Ok(DataType::Binary),
438
439 SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
441
442 SqlDataType::Date => Ok(DataType::Date32),
444 SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
445 SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
446
447 SqlDataType::Interval { .. } => Ok(DataType::Interval(
449 arrow::datatypes::IntervalUnit::MonthDayNano,
450 )),
451
452 _ => Err(ParseError::ValidationError(format!(
454 "unsupported data type: {:?}",
455 sql_type
456 ))),
457 }
458}
459
460fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
462 if let sqlparser::ast::Expr::Function(func) = expr {
463 if let Some(name) = func.name.0.last() {
464 return name.to_string().eq_ignore_ascii_case("proctime");
465 }
466 }
467 false
468}
469
470fn parse_watermark(
472 wm: &WatermarkDef,
473 columns: &[ColumnDefinition],
474) -> Result<WatermarkSpec, ParseError> {
475 let column_name = wm.column.to_string();
476
477 let col = columns
479 .iter()
480 .find(|c| c.name == column_name)
481 .ok_or_else(|| {
482 ParseError::ValidationError(format!(
483 "watermark column '{}' not found in column list",
484 column_name
485 ))
486 })?;
487
488 if !matches!(
490 col.data_type,
491 DataType::Timestamp(_, _)
492 | DataType::Date32
493 | DataType::Date64
494 | DataType::Int64
495 | DataType::Int32
496 ) {
497 return Err(ParseError::ValidationError(format!(
498 "watermark column '{}' must be a timestamp or integer type, found {:?}",
499 column_name, col.data_type
500 )));
501 }
502
503 if let Some(expr) = &wm.expression {
505 if is_proctime_call(expr) {
506 return Ok(WatermarkSpec {
507 column: column_name,
508 max_out_of_orderness: Duration::ZERO,
509 is_processing_time: true,
510 });
511 }
512 }
513
514 let max_out_of_orderness = match &wm.expression {
517 Some(expr) => parse_watermark_expression(expr),
518 None => Duration::ZERO,
519 };
520
521 Ok(WatermarkSpec {
522 column: column_name,
523 max_out_of_orderness,
524 is_processing_time: false,
525 })
526}
527
528fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
530 use sqlparser::ast::Expr;
531
532 match expr {
533 Expr::BinaryOp { op, right, .. } => match op {
534 sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
535 _ => Duration::ZERO,
536 },
537 Expr::Identifier(_) => Duration::ZERO,
539 _ => Duration::from_secs(1),
541 }
542}
543
/// Converts an `INTERVAL` expression into a [`Duration`].
///
/// Any shape this function does not recognize falls back to a conservative
/// one-second default rather than failing the statement.
fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
    use sqlparser::ast::Expr;

    let Expr::Interval(interval) = expr else {
        return Duration::from_secs(1);
    };

    // The magnitude may be written as '5' (quoted string) or 5 (number).
    let value_str = match interval.value.as_ref() {
        Expr::Value(v) => {
            match &v.value {
                sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
                sqlparser::ast::Value::Number(n, _) => n.clone(),
                _ => return Duration::from_secs(1),
            }
        }
        _ => return Duration::from_secs(1),
    };

    // Unparseable magnitudes default to 1 unit.
    let value: u64 = value_str.parse().unwrap_or(1);

    // A missing leading field (e.g. INTERVAL '5') is treated as seconds;
    // unrecognized fields also fall back to seconds.
    let unit = interval
        .leading_field
        .as_ref()
        .map_or("second", |u| match u {
            sqlparser::ast::DateTimeField::Microsecond => "microsecond",
            sqlparser::ast::DateTimeField::Millisecond => "millisecond",
            sqlparser::ast::DateTimeField::Minute => "minute",
            sqlparser::ast::DateTimeField::Hour => "hour",
            sqlparser::ast::DateTimeField::Day => "day",
            _ => "second",
        });

    match unit {
        "microsecond" | "microseconds" => Duration::from_micros(value),
        "millisecond" | "milliseconds" => Duration::from_millis(value),
        "minute" | "minutes" => Duration::from_secs(value * 60),
        "hour" | "hours" => Duration::from_secs(value * 3600),
        "day" | "days" => Duration::from_secs(value * 86400),
        _ => Duration::from_secs(value),
    }
}
589
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    // Parses `sql` and translates its first statement, which must be a
    // CREATE SOURCE.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    // Columns, their Arrow types, and NOT NULL nullability are translated.
    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    // WITH options override the defaults.
    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    // A WATERMARK clause with an interval yields that out-of-orderness.
    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // The reserved 'channel' option is rejected with a descriptive error.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    // The reserved 'type' option is rejected.
    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    // buffer_size below MIN or above MAX fails; an in-range value succeeds.
    #[test]
    fn test_buffer_size_bounds() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    // FromStr for BackpressureStrategy: valid names and the error path.
    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    // FromStr for WaitStrategy: valid names and the error path.
    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // One column per supported SQL type; each maps to the expected Arrow type.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    // The generated Arrow schema mirrors names and nullability.
    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    // A watermark on an undeclared column is rejected.
    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    // A watermark on a non-time, non-integer column is rejected.
    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    // MILLISECOND interval units are honored.
    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    // MINUTE interval units are honored.
    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    // 'track_stats' = 'true' enables stats collection.
    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    // 'wait_strategy' = 'park' is applied to the config.
    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    // With no WITH clause, all defaults apply.
    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    // Unknown keys (external connector settings) are ignored, while known
    // keys in the same clause are still applied.
    #[test]
    fn test_external_connector_options_ignored() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 8192);
    }

    // WATERMARK FOR col with no expression yields zero out-of-orderness.
    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // BIGINT columns are acceptable watermark carriers.
    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // PROCTIME() selects processing-time semantics.
    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // An interval expression does NOT set the processing-time flag.
    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}