1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
38use std::str::FromStr;
39use std::sync::Arc;
40use std::time::Duration;
41
42use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
43use sqlparser::ast::{ColumnDef, DataType as SqlDataType};
44
45use laminar_core::streaming::config::{
46 BackpressureStrategy, WaitStrategy, DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE,
47};
48
49use crate::parser::ParseError;
50use crate::parser::{CreateSinkStatement, CreateSourceStatement, SinkFrom, WatermarkDef};
51
/// Watermark settings resolved from the `WATERMARK FOR` clause of a
/// `CREATE SOURCE` statement.
#[derive(Debug, Clone)]
pub struct WatermarkSpec {
    /// Name of the column the watermark is declared on.
    pub column: String,
    /// Delay extracted from the watermark expression (`col - INTERVAL ...`);
    /// `Duration::ZERO` when no expression is given or for `PROCTIME()`.
    pub max_out_of_orderness: Duration,
    /// `true` when the watermark expression is a `PROCTIME()` call.
    pub is_processing_time: bool,
}
65
/// Streaming configuration read from the `WITH (...)` options of a
/// `CREATE SOURCE` / `CREATE SINK` statement.
#[derive(Debug, Clone)]
pub struct SourceConfigOptions {
    /// Buffer size; set by the `buffer_size` / `buffersize` option and
    /// validated against `MIN_BUFFER_SIZE` and `MAX_BUFFER_SIZE`.
    pub buffer_size: usize,
    /// Backpressure strategy, set by the `backpressure` option.
    pub backpressure: BackpressureStrategy,
    /// Wait strategy, set by the `wait_strategy` / `waitstrategy` option.
    pub wait_strategy: WaitStrategy,
    /// Statistics flag, set by the `track_stats` / `trackstats` / `stats` option.
    pub track_stats: bool,
}
78
79impl Default for SourceConfigOptions {
80 fn default() -> Self {
81 Self {
82 buffer_size: DEFAULT_BUFFER_SIZE,
83 backpressure: BackpressureStrategy::Block,
84 wait_strategy: WaitStrategy::SpinYield,
85 track_stats: false,
86 }
87 }
88}
89
/// A single column of a source, with its SQL type already mapped to Arrow.
#[derive(Debug, Clone)]
pub struct ColumnDefinition {
    /// Column name as written in the statement.
    pub name: String,
    /// Arrow data type produced by `sql_type_to_arrow`.
    pub data_type: DataType,
    /// `false` only when the column carries a `NOT NULL` option.
    pub nullable: bool,
}
100
/// Validated result of translating a `CREATE SOURCE` statement.
#[derive(Debug, Clone)]
pub struct SourceDefinition {
    /// Source name taken from the statement.
    pub name: String,
    /// Columns in declaration order.
    pub columns: Vec<ColumnDefinition>,
    /// Arrow schema built from `columns` (same names, types and nullability).
    pub schema: SchemaRef,
    /// Watermark settings, present only when the statement declares a watermark.
    pub watermark: Option<WatermarkSpec>,
    /// Streaming options parsed from the `WITH (...)` clause.
    pub config: SourceConfigOptions,
}
118
/// Conversion from a parsed `CREATE SOURCE` statement; delegates to
/// [`translate_create_source`].
impl TryFrom<CreateSourceStatement> for SourceDefinition {
    type Error = ParseError;

    /// Consumes `stmt` and returns the result of [`translate_create_source`].
    fn try_from(stmt: CreateSourceStatement) -> Result<Self, Self::Error> {
        translate_create_source(stmt)
    }
}
126
/// Validated result of translating a `CREATE SINK` statement.
#[derive(Debug, Clone)]
pub struct SinkDefinition {
    /// Sink name taken from the statement.
    pub name: String,
    /// Name of the table named in the statement's `FROM` clause.
    pub input: String,
    /// Streaming options parsed from the `WITH (...)` clause.
    pub config: SourceConfigOptions,
}
137
/// Conversion from a parsed `CREATE SINK` statement; delegates to
/// [`translate_create_sink`].
impl TryFrom<CreateSinkStatement> for SinkDefinition {
    type Error = ParseError;

    /// Consumes `stmt` and returns the result of [`translate_create_sink`].
    fn try_from(stmt: CreateSinkStatement) -> Result<Self, Self::Error> {
        translate_create_sink(stmt)
    }
}
145
146pub fn translate_create_source(
155 stmt: CreateSourceStatement,
156) -> Result<SourceDefinition, ParseError> {
157 validate_source_options(&stmt.with_options)?;
159
160 let config = parse_source_options(&stmt.with_options)?;
162
163 let columns = convert_columns(&stmt.columns)?;
165
166 let fields: Vec<Field> = columns
168 .iter()
169 .map(|col| Field::new(&col.name, col.data_type.clone(), col.nullable))
170 .collect();
171 let schema = Arc::new(Schema::new(fields));
172
173 let watermark = if let Some(wm) = stmt.watermark {
175 Some(parse_watermark(&wm, &columns)?)
176 } else {
177 None
178 };
179
180 Ok(SourceDefinition {
181 name: stmt.name.to_string(),
182 columns,
183 schema,
184 watermark,
185 config,
186 })
187}
188
189pub fn translate_create_sink(stmt: CreateSinkStatement) -> Result<SinkDefinition, ParseError> {
197 validate_source_options(&stmt.with_options)?;
199
200 let config = parse_source_options(&stmt.with_options)?;
202
203 let input = match stmt.from {
205 SinkFrom::Table(name) => name.to_string(),
206 SinkFrom::Query(_) => {
207 return Err(ParseError::ValidationError(
209 "inline queries not yet supported in CREATE SINK - use a view".to_string(),
210 ));
211 }
212 };
213
214 Ok(SinkDefinition {
215 name: stmt.name.to_string(),
216 input,
217 config,
218 })
219}
220
221fn validate_source_options(options: &HashMap<String, String>) -> Result<(), ParseError> {
223 if options.contains_key("channel") {
225 return Err(ParseError::ValidationError(
226 "the 'channel' option is not user-configurable - channel type is automatically derived from usage patterns".to_string(),
227 ));
228 }
229
230 if options.contains_key("type") {
232 return Err(ParseError::ValidationError(
233 "the 'type' option is not user-configurable for in-memory streaming sources"
234 .to_string(),
235 ));
236 }
237
238 Ok(())
239}
240
241fn parse_source_options(
243 options: &HashMap<String, String>,
244) -> Result<SourceConfigOptions, ParseError> {
245 let mut config = SourceConfigOptions::default();
246
247 for (key, value) in options {
248 match key.to_lowercase().as_str() {
249 "buffer_size" | "buffersize" => {
250 config.buffer_size = parse_buffer_size(value)?;
251 }
252 "backpressure" => {
253 config.backpressure =
254 BackpressureStrategy::from_str(value).map_err(ParseError::ValidationError)?;
255 }
256 "wait_strategy" | "waitstrategy" => {
257 config.wait_strategy =
258 WaitStrategy::from_str(value).map_err(ParseError::ValidationError)?;
259 }
260 "track_stats" | "trackstats" | "stats" => {
261 config.track_stats = parse_bool(value)?;
262 }
263 _ => {}
267 }
268 }
269
270 Ok(config)
271}
272
273fn parse_buffer_size(value: &str) -> Result<usize, ParseError> {
275 let size: usize = value.parse().map_err(|_| {
276 ParseError::ValidationError(format!(
277 "invalid buffer_size: '{}' - must be a number",
278 value
279 ))
280 })?;
281
282 if size < MIN_BUFFER_SIZE {
283 return Err(ParseError::ValidationError(format!(
284 "buffer_size {} is too small - minimum is {}",
285 size, MIN_BUFFER_SIZE
286 )));
287 }
288
289 if size > MAX_BUFFER_SIZE {
290 return Err(ParseError::ValidationError(format!(
291 "buffer_size {} is too large - maximum is {}",
292 size, MAX_BUFFER_SIZE
293 )));
294 }
295
296 Ok(size)
297}
298
299fn parse_bool(value: &str) -> Result<bool, ParseError> {
301 match value.to_lowercase().as_str() {
302 "true" | "yes" | "on" | "1" => Ok(true),
303 "false" | "no" | "off" | "0" => Ok(false),
304 _ => Err(ParseError::ValidationError(format!(
305 "invalid boolean value: '{}' - expected true/false",
306 value
307 ))),
308 }
309}
310
311fn convert_columns(columns: &[ColumnDef]) -> Result<Vec<ColumnDefinition>, ParseError> {
313 columns.iter().map(convert_column).collect()
314}
315
316fn convert_column(col: &ColumnDef) -> Result<ColumnDefinition, ParseError> {
318 let data_type = sql_type_to_arrow(&col.data_type)?;
319
320 let nullable = !col
322 .options
323 .iter()
324 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
325
326 Ok(ColumnDefinition {
327 name: col.name.value.clone(),
328 data_type,
329 nullable,
330 })
331}
332
333pub fn sql_type_to_arrow(sql_type: &SqlDataType) -> Result<DataType, ParseError> {
339 match sql_type {
340 SqlDataType::TinyInt(_) => Ok(DataType::Int8),
342 SqlDataType::SmallInt(_) => Ok(DataType::Int16),
343 SqlDataType::Int(_) | SqlDataType::Integer(_) => Ok(DataType::Int32),
344 SqlDataType::BigInt(_) => Ok(DataType::Int64),
345
346 SqlDataType::Float(_) | SqlDataType::Real => Ok(DataType::Float32),
351 SqlDataType::Double(_) | SqlDataType::DoublePrecision => Ok(DataType::Float64),
352
353 SqlDataType::Decimal(info) | SqlDataType::Numeric(info) => {
355 #[allow(clippy::cast_possible_truncation)] let (precision, scale) = match info {
357 sqlparser::ast::ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
358 sqlparser::ast::ExactNumberInfo::Precision(p) => (*p as u8, 0),
359 sqlparser::ast::ExactNumberInfo::None => (38, 9), };
361 Ok(DataType::Decimal128(precision, scale))
362 }
363
364 SqlDataType::Char(_)
366 | SqlDataType::Character(_)
367 | SqlDataType::Varchar(_)
368 | SqlDataType::CharacterVarying(_)
369 | SqlDataType::Text
370 | SqlDataType::String(_)
371 | SqlDataType::JSON
372 | SqlDataType::JSONB
373 | SqlDataType::Uuid => Ok(DataType::Utf8),
374
375 SqlDataType::Binary(_)
377 | SqlDataType::Varbinary(_)
378 | SqlDataType::Blob(_)
379 | SqlDataType::Bytea => Ok(DataType::Binary),
380
381 SqlDataType::Boolean | SqlDataType::Bool => Ok(DataType::Boolean),
383
384 SqlDataType::Date => Ok(DataType::Date32),
386 SqlDataType::Time(_, _) => Ok(DataType::Time64(TimeUnit::Microsecond)),
387 SqlDataType::Timestamp(_, _) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
388
389 SqlDataType::Interval { .. } => Ok(DataType::Interval(
391 arrow::datatypes::IntervalUnit::MonthDayNano,
392 )),
393
394 _ => Err(ParseError::ValidationError(format!(
396 "unsupported data type: {:?}",
397 sql_type
398 ))),
399 }
400}
401
402fn is_proctime_call(expr: &sqlparser::ast::Expr) -> bool {
404 if let sqlparser::ast::Expr::Function(func) = expr {
405 if let Some(name) = func.name.0.last() {
406 return name.to_string().eq_ignore_ascii_case("proctime");
407 }
408 }
409 false
410}
411
412fn parse_watermark(
414 wm: &WatermarkDef,
415 columns: &[ColumnDefinition],
416) -> Result<WatermarkSpec, ParseError> {
417 let column_name = wm.column.value.clone();
418
419 let col = columns
421 .iter()
422 .find(|c| c.name == column_name)
423 .ok_or_else(|| {
424 ParseError::ValidationError(format!(
425 "watermark column '{}' not found in column list",
426 column_name
427 ))
428 })?;
429
430 if !matches!(
432 col.data_type,
433 DataType::Timestamp(_, _)
434 | DataType::Date32
435 | DataType::Date64
436 | DataType::Int64
437 | DataType::Int32
438 ) {
439 return Err(ParseError::ValidationError(format!(
440 "watermark column '{}' must be a timestamp or integer type, found {:?}",
441 column_name, col.data_type
442 )));
443 }
444
445 if let Some(expr) = &wm.expression {
447 if is_proctime_call(expr) {
448 return Ok(WatermarkSpec {
449 column: column_name,
450 max_out_of_orderness: Duration::ZERO,
451 is_processing_time: true,
452 });
453 }
454 }
455
456 let max_out_of_orderness = match &wm.expression {
459 Some(expr) => parse_watermark_expression(expr),
460 None => Duration::ZERO,
461 };
462
463 Ok(WatermarkSpec {
464 column: column_name,
465 max_out_of_orderness,
466 is_processing_time: false,
467 })
468}
469
470fn parse_watermark_expression(expr: &sqlparser::ast::Expr) -> Duration {
472 use sqlparser::ast::Expr;
473
474 match expr {
475 Expr::BinaryOp { op, right, .. } => match op {
476 sqlparser::ast::BinaryOperator::Minus => parse_interval_expr(right),
477 _ => Duration::ZERO,
478 },
479 Expr::Identifier(_) => Duration::ZERO,
481 _ => Duration::from_secs(1),
483 }
484}
485
486fn parse_interval_expr(expr: &sqlparser::ast::Expr) -> Duration {
488 use sqlparser::ast::Expr;
489
490 let Expr::Interval(interval) = expr else {
491 return Duration::from_secs(1);
492 };
493
494 let value_str = match interval.value.as_ref() {
496 Expr::Value(v) => {
497 match &v.value {
499 sqlparser::ast::Value::SingleQuotedString(s) => s.clone(),
500 sqlparser::ast::Value::Number(n, _) => n.clone(),
501 _ => return Duration::from_secs(1),
502 }
503 }
504 _ => return Duration::from_secs(1),
505 };
506
507 let value: u64 = value_str.parse().unwrap_or(1);
508
509 let unit = interval
511 .leading_field
512 .as_ref()
513 .map_or("second", |u| match u {
514 sqlparser::ast::DateTimeField::Microsecond => "microsecond",
515 sqlparser::ast::DateTimeField::Millisecond => "millisecond",
516 sqlparser::ast::DateTimeField::Minute => "minute",
517 sqlparser::ast::DateTimeField::Hour => "hour",
518 sqlparser::ast::DateTimeField::Day => "day",
519 _ => "second",
520 });
521
522 match unit {
523 "microsecond" | "microseconds" => Duration::from_micros(value),
524 "millisecond" | "milliseconds" => Duration::from_millis(value),
525 "minute" | "minutes" => Duration::from_secs(value * 60),
526 "hour" | "hours" => Duration::from_secs(value * 3600),
527 "day" | "days" => Duration::from_secs(value * 86400),
528 _ => Duration::from_secs(value),
529 }
530}
531
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::{parse_streaming_sql, StreamingStatement};

    /// Parses `sql` and translates its first statement, which must be a
    /// `CREATE SOURCE`.
    fn parse_and_translate(sql: &str) -> Result<SourceDefinition, ParseError> {
        let first = parse_streaming_sql(sql)?
            .into_iter()
            .next()
            .ok_or_else(|| ParseError::StreamingError("No statement found".to_string()))?;

        let StreamingStatement::CreateSource(source) = first else {
            return Err(ParseError::StreamingError(
                "Expected CREATE SOURCE".to_string(),
            ));
        };
        translate_create_source(*source)
    }

    /// Translates `sql` and returns its watermark; panics when translation
    /// fails or no watermark is present.
    fn watermark_of(sql: &str) -> WatermarkSpec {
        let def = parse_and_translate(sql).unwrap();
        assert!(def.watermark.is_some());
        def.watermark.unwrap()
    }

    /// Translates a single-column source whose `WITH` clause is `options`.
    fn with_options(options: &str) -> Result<SourceDefinition, ParseError> {
        parse_and_translate(&format!(
            "CREATE SOURCE events (id BIGINT) WITH ({options})"
        ))
    }

    #[test]
    fn test_basic_source() {
        let def =
            parse_and_translate("CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR)").unwrap();

        assert_eq!(def.name, "events");
        let [id, name] = def.columns.as_slice() else {
            panic!("expected exactly two columns, got {}", def.columns.len());
        };
        assert_eq!(id.name, "id");
        assert_eq!(id.data_type, DataType::Int64);
        assert!(!id.nullable);
        assert_eq!(name.name, "name");
        assert!(name.nullable);
    }

    #[test]
    fn test_source_with_options() {
        let config = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'buffer_size' = '4096',
                'backpressure' = 'reject'
            )",
        )
        .unwrap()
        .config;

        assert_eq!(config.buffer_size, 4096);
        assert_eq!(config.backpressure, BackpressureStrategy::Reject);
    }

    #[test]
    fn test_source_with_watermark() {
        let wm = watermark_of(
            "CREATE SOURCE events (
                id BIGINT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        );

        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::from_secs(5));
    }

    #[test]
    fn test_reject_channel_option() {
        let outcome = with_options("'channel' = 'mpsc'");

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("channel"));
    }

    #[test]
    fn test_reject_type_option() {
        assert!(with_options("'type' = 'spsc'").is_err());
    }

    #[test]
    fn test_buffer_size_bounds() {
        // (value, whether translation should succeed)
        let cases = [("1", false), ("999999999", false), ("1024", true)];

        for (size, should_succeed) in cases {
            let outcome = with_options(&format!("'buffer_size' = '{size}'"));
            assert_eq!(outcome.is_ok(), should_succeed, "buffer_size = {size}");
        }
    }

    #[test]
    fn test_backpressure_strategies() {
        let known = [
            ("block", BackpressureStrategy::Block),
            ("drop_oldest", BackpressureStrategy::DropOldest),
            ("reject", BackpressureStrategy::Reject),
        ];

        for (text, expected) in known {
            assert_eq!(BackpressureStrategy::from_str(text).unwrap(), expected);
        }
        assert!(BackpressureStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_wait_strategies() {
        let known = [
            ("spin", WaitStrategy::Spin),
            ("spin_yield", WaitStrategy::SpinYield),
            ("park", WaitStrategy::Park),
        ];

        for (text, expected) in known {
            assert_eq!(WaitStrategy::from_str(text).unwrap(), expected);
        }
        assert!(WaitStrategy::from_str("invalid").is_err());
    }

    #[test]
    fn test_sql_type_conversions() {
        let def = parse_and_translate(
            "CREATE SOURCE types (
                a TINYINT,
                b SMALLINT,
                c INT,
                d BIGINT,
                e FLOAT,
                f DOUBLE,
                g DECIMAL(10,2),
                h VARCHAR(255),
                i TEXT,
                j BOOLEAN,
                k TIMESTAMP,
                l DATE
            )",
        )
        .unwrap();

        assert_eq!(def.columns.len(), 12);

        // Columns a..=j have an exact expected Arrow type.
        let exact = [
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::Float32,
            DataType::Float64,
            DataType::Decimal128(10, 2),
            DataType::Utf8,
            DataType::Utf8,
            DataType::Boolean,
        ];
        for (index, expected) in exact.iter().enumerate() {
            assert_eq!(&def.columns[index].data_type, expected, "column {index}");
        }

        assert!(matches!(
            def.columns[10].data_type,
            DataType::Timestamp(_, _)
        ));
        assert_eq!(def.columns[11].data_type, DataType::Date32);
    }

    #[test]
    fn test_schema_generation() {
        let schema = parse_and_translate(
            "CREATE SOURCE events (id BIGINT NOT NULL, name VARCHAR NOT NULL, value DOUBLE)",
        )
        .unwrap()
        .schema;

        // (field name, nullable)
        let expected = [("id", false), ("name", false), ("value", true)];

        assert_eq!(schema.fields().len(), expected.len());
        for (index, (name, nullable)) in expected.into_iter().enumerate() {
            let field = schema.field(index);
            assert_eq!(field.name(), name);
            assert_eq!(field.is_nullable(), nullable);
        }
    }

    #[test]
    fn test_watermark_column_not_found() {
        let outcome = parse_and_translate(
            "CREATE SOURCE events (
                id BIGINT,
                WATERMARK FOR nonexistent AS nonexistent - INTERVAL '1' SECOND
            )",
        );

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_watermark_wrong_type() {
        let outcome = parse_and_translate(
            "CREATE SOURCE events (
                name VARCHAR,
                WATERMARK FOR name AS name - INTERVAL '1' SECOND
            )",
        );

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("timestamp or integer type"));
    }

    #[test]
    fn test_watermark_milliseconds() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '100' MILLISECOND
            )",
        )
        .unwrap();

        let delay = def.watermark.unwrap().max_out_of_orderness;
        assert_eq!(delay, Duration::from_millis(100));
    }

    #[test]
    fn test_watermark_minutes() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
            )",
        )
        .unwrap();

        let delay = def.watermark.unwrap().max_out_of_orderness;
        assert_eq!(delay, Duration::from_secs(300));
    }

    #[test]
    fn test_track_stats_option() {
        let config = with_options("'track_stats' = 'true'").unwrap().config;

        assert!(config.track_stats);
    }

    #[test]
    fn test_wait_strategy_option() {
        let config = with_options("'wait_strategy' = 'park'").unwrap().config;

        assert_eq!(config.wait_strategy, WaitStrategy::Park);
    }

    #[test]
    fn test_default_config() {
        let config = parse_and_translate("CREATE SOURCE events (id BIGINT)")
            .unwrap()
            .config;

        assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(config.backpressure, BackpressureStrategy::Block);
        assert_eq!(config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!config.track_stats);
    }

    #[test]
    fn test_external_connector_options_ignored() {
        // Connector-specific keys must not cause an error here.
        let config = parse_and_translate(
            "CREATE SOURCE events (id BIGINT) WITH (
                'connector' = 'kafka',
                'topic' = 'events',
                'bootstrap.servers' = 'localhost:9092',
                'buffer_size' = '8192'
            )",
        )
        .unwrap()
        .config;

        assert_eq!(config.buffer_size, 8192);
    }

    #[test]
    fn test_source_watermark_no_expression() {
        let wm = watermark_of(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts
            )",
        );

        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_source_watermark_bigint_column() {
        let wm = watermark_of(
            "CREATE SOURCE events (
                ts BIGINT,
                WATERMARK FOR ts
            )",
        );

        assert_eq!(wm.column, "ts");
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_proctime() {
        let wm = watermark_of(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS PROCTIME()
            )",
        );

        assert_eq!(wm.column, "ts");
        assert!(wm.is_processing_time);
        assert_eq!(wm.max_out_of_orderness, Duration::ZERO);
    }

    #[test]
    fn test_watermark_event_time_not_proctime() {
        let def = parse_and_translate(
            "CREATE SOURCE events (
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let wm = def.watermark.unwrap();
        assert!(!wm.is_processing_time);
    }
}