1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
38use std::str::FromStr;
39use std::sync::Arc;
40use std::time::Duration;
41
42use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
43use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
44
45use laminar_core::streaming::config::{
46 BackpressureStrategy, WaitStrategy, DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE,
47};
48
49use crate::parser::ParseError;
50use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
51
/// Resolved watermark specification for a streaming source.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Name of the column the watermark is declared on.
    pub column: String,
    /// Maximum allowed out-of-orderness; zero for a bare
    /// `WATERMARK FOR col` or a processing-time watermark.
    pub max_out_of_orderness: Duration,
    /// True when the watermark expression was `PROCTIME()`, selecting
    /// processing-time rather than event-time semantics.
    pub is_processing_time: bool,
}
65
/// Channel/runtime options parsed from a CREATE SOURCE or CREATE SINK
/// WITH clause.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Ring-buffer capacity; validated against MIN_BUFFER_SIZE and
    /// MAX_BUFFER_SIZE.
    pub buffer_size: usize,
    /// Strategy applied when the buffer is full.
    pub backpressure: BackpressureStrategy,
    /// How consumers wait for new data.
    pub wait_strategy: WaitStrategy,
    /// Whether runtime statistics collection is enabled.
    pub track_stats: bool,
}
78
79impl Default for SourceConfigOptions {
80 fn default() -> Self {
81 Self {
82 buffer_size: DEFAULT_BUFFER_SIZE,
83 backpressure: BackpressureStrategy::Block,
84 wait_strategy: WaitStrategy::SpinYield,
85 track_stats: false,
86 }
87 }
88}
89
/// A single declared column of a streaming source, with its Arrow type.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name as written in the DDL.
    pub name: String,
    /// Arrow data type mapped from the SQL type.
    pub data_type: DataType,
    /// True unless the column was declared NOT NULL.
    pub nullable: bool,
}
100
/// Fully translated CREATE SOURCE statement.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name.
    pub name: String,
    /// Declared columns in DDL order.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema built from `columns`.
    pub schema: SchemaRef,
    /// Optional watermark specification from the WATERMARK clause.
    pub watermark: Option<WatermarkSpec>,
    /// Channel configuration from the WITH clause (defaults otherwise).
    pub config: SourceConfigOptions,
}
118
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
126
/// Fully translated CREATE SINK statement.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name.
    pub name: String,
    /// Name of the table or view the sink reads from.
    pub input: String,
    /// Channel configuration from the WITH clause (defaults otherwise).
    pub config: SourceConfigOptions,
}
137
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Delegates to [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
145
146pub fn translate_create_source(
155 stmt: CreateSourceStatement,
156) -> Result<SourceDefinition, ParseError> {
157 validate_source_options(&stmt.with_options)?;
159
160 let config = parse_source_options(&stmt.with_options)?;
162
163 let columns = convert_columns(&stmt.columns)?;
165
166 let fields: Vec<Field> = columns
168 .iter()
169 .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
170 .collect();
171 let schema = Arc::new(Schema::new(fields));
172
173 let watermark = if let Some(wm) = stmt.watermark {
175 Some(parse_watermark(&wm, &columns)?)
176 } else {
177 None
178 };
179
180 Ok(SourceDefinition {
181 name: stmt.name.to_string(),
182 columns,
183 schema,
184 watermark,
185 config,
186 })
187}
188
189pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
197 validate_source_options(&stmt.with_options)?;
199
200 let config = parse_source_options(&stmt.with_options)?;
202
203 let input = match stmt.from {
205 SinkFrom::Table(name) => name.to_string(),
206 SinkFrom::Query(_) => {
207 return Err(ParseError::ValidationError(
209 "inline queries not yet supported in CREATE SINK - use a view".to_string(),
210 ));
211 }
212 };
213
214 Ok(SinkDefinition {
215 name: stmt.name.to_string(),
216 input,
217 config,
218 })
219}
220
221fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
223 if options.contains_key("channel") {
225 return Err(ParseError::ValidationError(
226 "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
227 ));
228 }
229
230 if options.contains_key("type") {
232 return Err(ParseError::ValidationError(
233 "the 'type' option is not user-configurable for in-memory streaming sources"
234 .to_string(),
235 ));
236 }
237
238 Ok(())
239}
240
241fn parse_source_options(
243 options: &HashMap<String, String>,
244) -> Result<SourceConfigOptions, ParseError> {
245 let mut config = SourceConfigOptions::default();
246
247 for (key, value) in options {
248 match key.to_lowercase().as_str() {
249 "buffer_size" | "buffersize" => {
250 config.buffer_size = parse_buffer_size(value)?;
251 }
252 "backpressure" => {
253 config.backpressure =
254 BackpressureStrategy::from_str(value).map_err(ParseError::ValidationError)?;
255 }
256 "wait_strategy" | "waitstrategy" => {
257 config.wait_strategy =
258 WaitStrategy::from_str(value).map_err(ParseError::ValidationError)?;
259 }
260 "track_stats" | "trackstats" | "stats" => {
261 config.track_stats = parse_bool(value)?;
262 }
263 _ => {}
267 }
268 }
269
270 Ok(config)
271}
272
273fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
275 let size: usize = value.parse().map_err(|_| {
276 ParseError::ValidationError(format!(
277 "invalid buffer_size: '{}' - must be a number",
278 value
279 ))
280 })?;
281
282 if size < MIN_BUFFER_SIZE {
283 return Err(ParseError::ValidationError(format!(
284 "buffer_size {} is too small - minimum is {}",
285 size, MIN_BUFFER_SIZE
286 )));
287 }
288
289 if size > MAX_BUFFER_SIZE {
290 return Err(ParseError::ValidationError(format!(
291 "buffer_size {} is too large - maximum is {}",
292 size, MAX_BUFFER_SIZE
293 )));
294 }
295
296 Ok(size)
297}
298
299fn parse_bool(value: &str) -> Result<bool, ParseError> {
301 match value.to_lowercase().as_str() {
302 "true" | "yes" | "on" | "1" => Ok(true),
303 "false" | "no" | "off" | "0" => Ok(false),
304 _ => Err(ParseError::ValidationError(format!(
305 "invalid boolean value: '{}' - expected true/false",
306 value
307 ))),
308 }
309}
310
311fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
313 columns.iter().map(convert_column).collect()
314}
315
316fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
318 let data_type = sql_type_to_arrow(&col.data_type)?;
319
320 let nullable = !col
322 .options
323 .iter()
324 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
325
326 Ok(ColumnDefinition {
327 name: col.name.value.clone(),
328 data_type,
329 nullable,
330 })
331}
332
/// Maps a SQL column type from the parser AST to an Arrow [`DataType`].
///
/// # Errors
///
/// Returns [`ParseError::ValidationError`] for SQL types with no Arrow
/// mapping, or for an ARRAY declared without an element type.
pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
    match sql_type {
        // Integer widths map 1:1 onto Arrow's fixed-width integers.
        SqlDataType::TinyInt(_) => Ok(DataType::Int8),
        SqlDataType::SmallInt(_) => Ok(DataType::Int16),
        SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
        SqlDataType::BigInt(_) => Ok(DataType::Int64),

        SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
        SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),

        SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
            // NOTE(review): the u8/i8 casts below truncate precision/scale
            // above 255/127 — assumes the parser bounds them upstream;
            // TODO confirm.
            #[allow(clippy::cast_possible_truncation)]
            let (precision, scale) = match info {
                sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
                // Precision without scale: SQL defaults the scale to 0.
                sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
                // No precision at all: fall back to (38, 9).
                sqlparser::ast::ExactNumberInfo::None => (38, 9),
            };
            Ok(DataType::Decimal128(precision, scale))
        }

        // All character-like types (including JSON/UUID) are stored as UTF-8.
        SqlDataType::Char(_)
        | SqlDataType::Character(_)
        | SqlDataType::Varchar(_)
        | SqlDataType::CharacterVarying(_)
        | SqlDataType::Text
        | SqlDataType::String(_)
        | SqlDataType::JSON
        | SqlDataType::JSONB
        | SqlDataType::Uuid => Ok(DataType::Utf8),

        SqlDataType::Binary(_)
        | SqlDataType::Varbinary(_)
        | SqlDataType::Blob(_)
        | SqlDataType::Bytea => Ok(DataType::Binary),

        SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),

        // Temporal types use microsecond precision; timestamps carry no
        // time zone.
        SqlDataType::Date => Ok(DataType::Date32),
        SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
        SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),

        SqlDataType::Interval { .. } => Ok(DataType::Interval(
            arrow::datatypes::IntervalUnit::MonthDayNano,
        )),

        // ARRAY<T>, T ARRAY[..], and parenthesized element forms all map to
        // a List of the converted element type.
        SqlDataType::Array(elem_def) => {
            let item_type = match elem_def {
                sqlparser::ast::ArrayElemTypeDef::AngleBracket(t)
                | sqlparser::ast::ArrayElemTypeDef::SquareBracket(t, _)
                | sqlparser::ast::ArrayElemTypeDef::Parenthesis(t) => sql_type_to_arrow(t)?,
                sqlparser::ast::ArrayElemTypeDef::None => {
                    return Err(ParseError::ValidationError(
                        "ARRAY type requires element type, e.g. ARRAY<INT>".into(),
                    ));
                }
            };
            Ok(DataType::List(Arc::new(Field::new(
                "item", item_type, true,
            ))))
        }

        // Arrow maps are a list of non-null "entries" structs with a
        // non-null key and nullable value.
        SqlDataType::Map(key_type, value_type) => {
            let key_dt = sql_type_to_arrow(key_type)?;
            let value_dt = sql_type_to_arrow(value_type)?;
            Ok(DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", key_dt, false),
                        Field::new("value", value_dt, true),
                    ])),
                    false,
                )),
                false,
            ))
        }

        _ => Err(ParseError::ValidationError(format!(
            "unsupported data type: {:?}",
            sql_type
        ))),
    }
}
435
436fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
438 if let sqlparser::ast::Expr::Function(func) = expr {
439 if let Some(name) = func.name.0.last() {
440 return name.to_string().eq_ignore_ascii_case("proctime");
441 }
442 }
443 false
444}
445
446fn parse_watermark(
448 wm: &WatermarkDef,
449 columns: &[ColumnDefinition],
450) -> Result<WatermarkSpec, ParseError> {
451 let column_name = wm.column.value.clone();
452
453 let col = columns
455 .iter()
456 .find(|c| c.name == column_name)
457 .ok_or_else(|| {
458 ParseError::ValidationError(format!(
459 "watermark column '{}' not found in column list",
460 column_name
461 ))
462 })?;
463
464 if !matches!(
466 col.data_type,
467 DataType::Timestamp(_, _)
468 | DataType::Date32
469 | DataType::Date64
470 | DataType::Int64
471 | DataType::Int32
472 ) {
473 return Err(ParseError::ValidationError(format!(
474 "watermark column '{}' must be a timestamp or integer type, found {:?}",
475 column_name, col.data_type
476 )));
477 }
478
479 if let Some(expr) = &wm.expression {
481 if is_proctime_call(expr) {
482 return Ok(WatermarkSpec {
483 column: column_name,
484 max_out_of_orderness: Duration::ZERO,
485 is_processing_time: true,
486 });
487 }
488 }
489
490 let max_out_of_orderness = match &wm.expression {
493 Some(expr) => parse_watermark_expression(expr),
494 None => Duration::ZERO,
495 };
496
497 Ok(WatermarkSpec {
498 column: column_name,
499 max_out_of_orderness,
500 is_processing_time: false,
501 })
502}
503
504fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
506 use sqlparser::ast::Expr;
507
508 match expr {
509 Expr::BinaryOp { op, right, .. } => match op {
510 sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
511 _ => Duration::ZERO,
512 },
513 Expr::Identifier(_) => Duration::ZERO,
515 _ => Duration::from_secs(1),
517 }
518}
519
520fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
522 use sqlparser::ast::Expr;
523
524 let Expr::Interval(interval) = expr else {
525 return Duration::from_secs(1);
526 };
527
528 let value_str = match interval.value.as_ref() {
530 Expr::Value(v) => {
531 match &v.value {
533 sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
534 sqlparser::ast::Value::Number(n, _) => n.clone(),
535 _ => return Duration::from_secs(1),
536 }
537 }
538 _ => return Duration::from_secs(1),
539 };
540
541 let value: u64 = value_str.parse().unwrap_or(1);
542
543 let unit = interval
545 .leading_field
546 .as_ref()
547 .map_or("second", |u| match u {
548 sqlparser::ast::DateTimeField::Microsecond => "microsecond",
549 sqlparser::ast::DateTimeField::Millisecond => "millisecond",
550 sqlparser::ast::DateTimeField::Minute => "minute",
551 sqlparser::ast::DateTimeField::Hour => "hour",
552 sqlparser::ast::DateTimeField::Day => "day",
553 _ => "second",
554 });
555
556 match unit {
557 "microsecond" | "microseconds" => Duration::from_micros(value),
558 "millisecond" | "milliseconds" => Duration::from_millis(value),
559 "minute" | "minutes" => Duration::from_secs(value * 60),
560 "hour" | "hours" => Duration::from_secs(value * 3600),
561 "day" | "days" => Duration::from_secs(value * 86400),
562 _ => Duration::from_secs(value),
563 }
564}
565
/// End-to-end tests that parse SQL text and translate it into source
/// definitions, covering type mapping, WITH options, and watermarks.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    // Parses `sql` and translates its first statement, which must be a
    // CREATE SOURCE.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let statements = parse_streaming_sql(sql)?;
        let stmt = statements
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;
        match stmt {
            StreamingStatement::CreateSource(source) => translate_create_source(*source),
            _ => Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            )),
        }
    }

    // Column names, types, and NOT NULL nullability survive translation.
    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        assert_eq!(def.columns.len(), 2);
        assert_eq!(def.columns[0].name, "id");
        assert_eq!(def.columns[0].data_type, DataType::Int64);
        assert!(!def.columns[0].nullable);
        assert_eq!(def.columns[1].name, "name");
        assert!(def.columns[1].nullable);
    }

    // WITH-clause options are parsed into the config.
    #[test]
    fn test_source_with_options() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 4096);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Reject);
    }

    // A WATERMARK clause with an interval expression is resolved.
    #[test]
    fn test_source_with_watermark() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    // The reserved 'channel' option must be rejected.
    #[test]
    fn test_reject_channel_option() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('channel' = 'mpsc')");

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("channel"));
    }

    // The reserved 'type' option must be rejected.
    #[test]
    fn test_reject_type_option() {
        let result = parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('type' = 'spsc')");

        assert!(result.is_err());
    }

    // buffer_size is validated against MIN/MAX bounds.
    #[test]
    fn test_buffer_size_bounds() {
        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1')");
        assert!(result.is_err());

        let result = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '999999999')",
        );
        assert!(result.is_err());

        let result =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('buffer_size' = '1024')");
        assert!(result.is_ok());
    }

    // BackpressureStrategy string parsing (from laminar_core).
    #[test]
    fn test_backpressure_strategies() {
        assert_eq!(
            BackpressureStrategy::from_str("block").unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            BackpressureStrategy::from_str("drop_oldest").unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            BackpressureStrategy::from_str("reject").unwrap(),
            BackpressureStrategy::Reject
        );
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    // WaitStrategy string parsing (from laminar_core).
    #[test]
    fn test_wait_strategies() {
        assert_eq!(WaitStrategy::from_str("spin").unwrap(), WaitStrategy::Spin);
        assert_eq!(
            WaitStrategy::from_str("spin_yield").unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(WaitStrategy::from_str("park").unwrap(), WaitStrategy::Park);
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    // Each supported SQL type maps to the expected Arrow type.
    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);
        assert_eq!(def.columns[0].data_type, DataType::Int8);
        assert_eq!(def.columns[1].data_type, DataType::Int16);
        assert_eq!(def.columns[2].data_type, DataType::Int32);
        assert_eq!(def.columns[3].data_type, DataType::Int64);
        assert_eq!(def.columns[4].data_type, DataType::Float32);
        assert_eq!(def.columns[5].data_type, DataType::Float64);
        assert_eq!(def.columns[6].data_type, DataType::Decimal128(10, 2));
        assert_eq!(def.columns[7].data_type, DataType::Utf8);
        assert_eq!(def.columns[8].data_type, DataType::Utf8);
        assert_eq!(def.columns[9].data_type, DataType::Boolean);
        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    // The Arrow schema mirrors the column list including nullability.
    #[test]
    fn test_schema_generation() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap();

        let schema = def.schema;
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert!(!schema.field(0).is_nullable());
        assert_eq!(schema.field(1).name(), "name");
        assert!(!schema.field(1).is_nullable());
        assert_eq!(schema.field(2).name(), "value");
        assert!(schema.field(2).is_nullable());
    }

    // Watermarks on undeclared columns are rejected.
    #[test]
    fn test_watermark_column_not_found() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    // Watermarks on non-temporal, non-integer columns are rejected.
    #[test]
    fn test_watermark_wrong_type() {
        let result = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("timestamp or integer type"));
    }

    // MILLISECOND interval units are honored.
    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_millis(100));
    }

    // MINUTE interval units are honored.
    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(300));
    }

    // 'track_stats' option toggles statistics collection.
    #[test]
    fn test_track_stats_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('track_stats' = 'true')")
                .unwrap();

        assert!(def.config.track_stats);
    }

    // 'wait_strategy' option selects the consumer wait strategy.
    #[test]
    fn test_wait_strategy_option() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT) WITH ('wait_strategy' = 'park')")
                .unwrap();

        assert_eq!(def.config.wait_strategy, WaitStrategy::Park);
    }

    // Without a WITH clause, defaults apply.
    #[test]
    fn test_default_config() {
        let def = parse_and_translate("CREATE SOURCE events (id BIGINT)").unwrap();

        assert_eq!(def.config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(def.config.backpressure, BackpressureStrategy::Block);
        assert_eq!(def.config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!def.config.track_stats);
    }

    // Unknown (external connector) options are ignored, not rejected.
    #[test]
    fn test_external_connector_options_ignored() {
        let def = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap();

        assert_eq!(def.config.buffer_size, 8192);
    }

    // A bare WATERMARK FOR col yields a zero out-of-orderness bound.
    #[test]
    fn test_source_watermark_no_expression() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // Integer (epoch-style) columns may carry watermarks too.
    #[test]
    fn test_source_watermark_bigint_column() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // PROCTIME() selects processing-time semantics.
    #[test]
    fn test_watermark_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        )
        .unwrap();

        assert!(def.watermark.is_some());
        let wm = def.watermark.unwrap();
        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    // Interval expressions remain event-time (not processing-time).
    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}