1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
38use std::str::FromStr;
39use std::sync::Arc;
40use std::time::Duration;
41
42use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
43use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
44
45use crate::parser::ParseError;
46use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
47
/// Smallest accepted value for the `buffer_size` WITH option.
pub const MIN_BUFFER_SIZE: usize = 4;

/// Largest accepted value for the `buffer_size` WITH option (2^20 slots).
pub const MAX_BUFFER_SIZE: usize = 1 << 20;

/// Buffer capacity used when no `buffer_size` option is supplied.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;
56
/// Policy applied when a source's buffer is full and a producer attempts
/// to push another element.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block the producer until space frees up (the default).
    #[default]
    Block,
    /// Drop the oldest buffered element to admit the new one.
    DropOldest,
    /// Refuse the new element with an error instead of waiting or dropping.
    Reject,
}
68
69impl std::str::FromStr for BackpressureStrategy {
70 type Err = ParseError;
71
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 match s.to_lowercase().as_str() {
74 "block" | "blocking" => Ok(Self::Block),
75 "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
76 "reject" | "error" => Ok(Self::Reject),
77 _ => Err(ParseError::ValidationError(format!(
78 "invalid backpressure strategy: '{}'. Valid values: block, drop_oldest, reject",
79 s
80 ))),
81 }
82 }
83}
84
/// How a channel endpoint waits when it cannot make progress
/// (buffer empty on the consumer side, full on the producer side).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Busy-spin without yielding the CPU.
    Spin,
    /// Spin briefly, then yield to the scheduler (the default).
    #[default]
    SpinYield,
    /// Park the thread until it is woken.
    Park,
}
96
97impl std::str::FromStr for WaitStrategy {
98 type Err = ParseError;
99
100 fn from_str(s: &str) -> Result<Self, Self::Err> {
101 match s.to_lowercase().as_str() {
102 "spin" => Ok(Self::Spin),
103 "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
104 "park" | "parking" => Ok(Self::Park),
105 _ => Err(ParseError::ValidationError(format!(
106 "invalid wait strategy: '{}'. Valid values: spin, spin_yield, park",
107 s
108 ))),
109 }
110 }
111}
112
/// Watermark configuration extracted from a `WATERMARK FOR ...` clause.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Name of the column the watermark is declared on.
    pub column: String,
    /// Allowed out-of-orderness parsed from the `col - INTERVAL ...`
    /// expression; `Duration::ZERO` when no expression was given.
    pub max_out_of_orderness: Duration,
    /// True when the watermark expression was `PROCTIME()` — processing
    /// time rather than event time.
    pub is_processing_time: bool,
}
126
/// Buffer/channel tuning options parsed from a statement's WITH clause.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer capacity; validated to lie in
    /// [`MIN_BUFFER_SIZE`, `MAX_BUFFER_SIZE`].
    pub buffer_size: usize,
    /// What to do when the buffer is full.
    pub backpressure: BackpressureStrategy,
    /// How to wait when the channel cannot make progress.
    pub wait_strategy: WaitStrategy,
    /// Whether runtime statistics collection is enabled
    /// (`track_stats` option).
    pub track_stats: bool,
}
139
140impl Default for SourceConfigOptions {
141 fn default() -> Self {
142 Self {
143 buffer_size: DEFAULT_BUFFER_SIZE,
144 backpressure: BackpressureStrategy::Block,
145 wait_strategy: WaitStrategy::SpinYield,
146 track_stats: false,
147 }
148 }
149}
150
/// A single column of a source, in Arrow terms.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name as written in the DDL.
    pub name: String,
    /// Arrow type mapped from the declared SQL type.
    pub data_type: DataType,
    /// True unless the column carried a NOT NULL constraint.
    pub nullable: bool,
}
161
/// Fully translated and validated `CREATE SOURCE` statement.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Declared columns, in order.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema built one-to-one from `columns`.
    pub schema: SchemaRef,
    /// Optional watermark specification.
    pub watermark: Option<WatermarkSpec>,
    /// Buffer/channel options from the WITH clause.
    pub config: SourceConfigOptions,
}
179
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
187
/// Fully translated and validated `CREATE SINK` statement.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Name of the table/view feeding this sink.
    pub input: String,
    /// Buffer/channel options from the WITH clause (shared with sources).
    pub config: SourceConfigOptions,
}
198
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
206
207pub fn translate_create_source(
216 stmt: CreateSourceStatement,
217) -> Result<SourceDefinition, ParseError> {
218 validate_source_options(&stmt.with_options)?;
220
221 let config = parse_source_options(&stmt.with_options)?;
223
224 let columns = convert_columns(&stmt.columns)?;
226
227 let fields: Vec<Field> = columns
229 .iter()
230 .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
231 .collect();
232 let schema = Arc::new(Schema::new(fields));
233
234 let watermark = if let Some(wm) = stmt.watermark {
236 Some(parse_watermark(&wm, &columns)?)
237 } else {
238 None
239 };
240
241 Ok(SourceDefinition {
242 name: stmt.name.to_string(),
243 columns,
244 schema,
245 watermark,
246 config,
247 })
248}
249
250pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
258 validate_source_options(&stmt.with_options)?;
260
261 let config = parse_source_options(&stmt.with_options)?;
263
264 let input = match stmt.from {
266 SinkFrom::Table(name) => name.to_string(),
267 SinkFrom::Query(_) => {
268 return Err(ParseError::ValidationError(
270 "inline queries not yet supported in CREATE SINK - use a view".to_string(),
271 ));
272 }
273 };
274
275 Ok(SinkDefinition {
276 name: stmt.name.to_string(),
277 input,
278 config,
279 })
280}
281
282fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
284 if options.contains_key("channel") {
286 return Err(ParseError::ValidationError(
287 "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
288 ));
289 }
290
291 if options.contains_key("type") {
293 return Err(ParseError::ValidationError(
294 "the 'type' option is not user-configurable for in-memory streaming sources"
295 .to_string(),
296 ));
297 }
298
299 Ok(())
300}
301
302fn parse_source_options(
304 options: &HashMap<String, String>,
305) -> Result<SourceConfigOptions, ParseError> {
306 let mut config = SourceConfigOptions::default();
307
308 for (key, value) in options {
309 match key.to_lowercase().as_str() {
310 "buffer_size" | "buffersize" => {
311 config.buffer_size = parse_buffer_size(value)?;
312 }
313 "backpressure" => {
314 config.backpressure = BackpressureStrategy::from_str(value)?;
315 }
316 "wait_strategy" | "waitstrategy" => {
317 config.wait_strategy = WaitStrategy::from_str(value)?;
318 }
319 "track_stats" | "trackstats" | "stats" => {
320 config.track_stats = parse_bool(value)?;
321 }
322 _ => {}
326 }
327 }
328
329 Ok(config)
330}
331
332fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
334 let size: usize = value.parse().map_err(|_| {
335 ParseError::ValidationError(format!(
336 "invalid buffer_size: '{}' - must be a number",
337 value
338 ))
339 })?;
340
341 if size < MIN_BUFFER_SIZE {
342 return Err(ParseError::ValidationError(format!(
343 "buffer_size {} is too small - minimum is {}",
344 size, MIN_BUFFER_SIZE
345 )));
346 }
347
348 if size > MAX_BUFFER_SIZE {
349 return Err(ParseError::ValidationError(format!(
350 "buffer_size {} is too large - maximum is {}",
351 size, MAX_BUFFER_SIZE
352 )));
353 }
354
355 Ok(size)
356}
357
358fn parse_bool(value: &str) -> Result<bool, ParseError> {
360 match value.to_lowercase().as_str() {
361 "true" | "yes" | "on" | "1" => Ok(true),
362 "false" | "no" | "off" | "0" => Ok(false),
363 _ => Err(ParseError::ValidationError(format!(
364 "invalid boolean value: '{}' - expected true/false",
365 value
366 ))),
367 }
368}
369
370fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
372 columns.iter().map(convert_column).collect()
373}
374
375fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
377 let data_type = sql_type_to_arrow(&col.data_type)?;
378
379 let nullable = !col
381 .options
382 .iter()
383 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
384
385 Ok(ColumnDefinition {
386 name: col.name.value.clone(),
387 data_type,
388 nullable,
389 })
390}
391
/// Maps a SQL data type from the parser AST to the closest Arrow type.
///
/// # Errors
///
/// Returns [`ParseError::ValidationError`] for SQL types with no mapping.
pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
    match sql_type {
        // Integer widths map one-to-one.
        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
        SqlDataType::BigInt(_) => Ok(DataType::Int64),

        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),

        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
            // Decimal128 caps precision at 38, so truncating casts are
            // acceptable for any valid SQL precision/scale.
            #[allow(clippy::cast_possible_truncation)]
            let (precision, scale) = match info {
                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
                // No precision given: fall back to (38, 9).
                sqlparser::ast::ExactNumberInfo::None => (38, 9),
            };
            Ok(DataType::Decimal128(precision, scale))
        }

        // All text-like types (including JSON and UUID) are stored as UTF-8.
        SqlDataType::Char(_)
        | SqlDataType::Character(_)
        | SqlDataType::Varchar(_)
        | SqlDataType::CharacterVarying(_)
        | SqlDataType::Text
        | SqlDataType::String(_)
        | SqlDataType::JSON
        | SqlDataType::JSONB
        | SqlDataType::Uuid => Ok(DataType::Utf8),

        SqlDataType::Binary(_)
        | SqlDataType::Varbinary(_)
        | SqlDataType::Blob(_)
        | SqlDataType::Bytea => Ok(DataType::Binary),

        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),

        // Temporal types use microsecond resolution; timestamps are
        // timezone-naive (`None`).
        SqlDataType::Date => Ok(DataType::Date32),
        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),

        SqlDataType::Interval { .. } => Ok(DataType::Interval(
            arrow::datatypes::IntervalUnit::MonthDayNano,
        )),

        _ => Err(ParseError::ValidationError(format!(
            "unsupported data type: {:?}",
            sql_type
        ))),
    }
}
460
461fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
463 if let sqlparser::ast::Expr::Function(func) = expr {
464 if let Some(name) = func.name.0.last() {
465 return name.to_string().eq_ignore_ascii_case("proctime");
466 }
467 }
468 false
469}
470
471fn parse_watermark(
473 wm: &WatermarkDef,
474 columns: &[ColumnDefinition],
475) -> Result<WatermarkSpec, ParseError> {
476 let column_name = wm.column.value.clone();
477
478 let col = columns
480 .iter()
481 .find(|c| c.name == column_name)
482 .ok_or_else(|| {
483 ParseError::ValidationError(format!(
484 "watermark column '{}' not found in column list",
485 column_name
486 ))
487 })?;
488
489 if !matches!(
491 col.data_type,
492 DataType::Timestamp(_, _)
493 | DataType::Date32
494 | DataType::Date64
495 | DataType::Int64
496 | DataType::Int32
497 ) {
498 return Err(ParseError::ValidationError(format!(
499 "watermark column '{}' must be a timestamp or integer type, found {:?}",
500 column_name, col.data_type
501 )));
502 }
503
504 if let Some(expr) = &wm.expression {
506 if is_proctime_call(expr) {
507 return Ok(WatermarkSpec {
508 column: column_name,
509 max_out_of_orderness: Duration::ZERO,
510 is_processing_time: true,
511 });
512 }
513 }
514
515 let max_out_of_orderness = match &wm.expression {
518 Some(expr) => parse_watermark_expression(expr),
519 None => Duration::ZERO,
520 };
521
522 Ok(WatermarkSpec {
523 column: column_name,
524 max_out_of_orderness,
525 is_processing_time: false,
526 })
527}
528
529fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
531 use sqlparser::ast::Expr;
532
533 match expr {
534 Expr::BinaryOp { op, right, .. } => match op {
535 sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
536 _ => Duration::ZERO,
537 },
538 Expr::Identifier(_) => Duration::ZERO,
540 _ => Duration::from_secs(1),
542 }
543}
544
545fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
547 use sqlparser::ast::Expr;
548
549 let Expr::Interval(interval) = expr else {
550 return Duration::from_secs(1);
551 };
552
553 let value_str = match interval.value.as_ref() {
555 Expr::Value(v) => {
556 match &v.value {
558 sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
559 sqlparser::ast::Value::Number(n, _) => n.clone(),
560 _ => return Duration::from_secs(1),
561 }
562 }
563 _ => return Duration::from_secs(1),
564 };
565
566 let value: u64 = value_str.parse().unwrap_or(1);
567
568 let unit = interval
570 .leading_field
571 .as_ref()
572 .map_or("second", |u| match u {
573 sqlparser::ast::DateTimeField::Microsecond => "microsecond",
574 sqlparser::ast::DateTimeField::Millisecond => "millisecond",
575 sqlparser::ast::DateTimeField::Minute => "minute",
576 sqlparser::ast::DateTimeField::Hour => "hour",
577 sqlparser::ast::DateTimeField::Day => "day",
578 _ => "second",
579 });
580
581 match unit {
582 "microsecond" | "microseconds" => Duration::from_micros(value),
583 "millisecond" | "milliseconds" => Duration::from_millis(value),
584 "minute" | "minutes" => Duration::from_secs(value * 60),
585 "hour" | "hours" => Duration::from_secs(value * 3600),
586 "day" | "days" => Duration::from_secs(value * 86400),
587 _ => Duration::from_secs(value),
588 }
589}
590
/// Unit tests for statement translation: option parsing and validation,
/// SQL-to-Arrow type mapping, schema generation, and watermark handling.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates its first statement, which must be a
    /// CREATE SOURCE.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    // --- columns, nullability, and options ---

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // Reserved options must be rejected outright.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    // --- FromStr parsing for the two strategy enums ---

    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // --- SQL type mapping and schema generation ---

    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    // --- watermark validation and lateness parsing ---

    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    // Unknown WITH keys (external connector settings) must not fail
    // translation; recognized keys still apply.
    #[test]
    fn test_external_connector_options_ignored() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 8192);
    }

    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}