1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Result};
4
5use crate::{
6 importer::{
7 function::{imported_assign_timestamp_config, imported_operator_config},
8 states::inject_states,
9 },
10 metadata::io::topic::KVSchemaType,
11 util::{
12 sdf_types_map::SdfTypesMap, validation_error::ValidationError,
13 validation_failure::ValidationFailure,
14 },
15 wit::{
16 dataflow::{PackageDefinition, PackageImport, Transforms},
17 operator::{
18 OperatorType, StepInvocation, TumblingWindow, Window, WindowProperties, WindowKind,
19 WatermarkConfig,
20 },
21 states::State,
22 },
23};
24
25use super::transforms::validate_transforms_steps;
26
27impl Window {
28 pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
29 let mut operators = vec![];
30
31 operators.push((self.assign_timestamp.clone(), OperatorType::AssignTimestamp));
32
33 for step in &self.transforms.steps {
34 operators.push((step.inner().clone(), step.clone().into()));
35 }
36
37 if let Some(partition) = &self.partition {
38 let p_op = partition.operators();
39 operators.extend(p_op);
40 }
41
42 if let Some(flush) = &self.flush {
43 operators.push((flush.clone(), OperatorType::WindowAggregate));
44 }
45
46 operators
47 }
48
49 pub(crate) fn add_operator(
50 &mut self,
51 index: Option<usize>,
52 partition: bool,
53 operator_type: OperatorType,
54 function: StepInvocation,
55 ) -> Result<()> {
56 if partition {
57 if let Some(partition) = &mut self.partition {
58 partition.add_operator(index, operator_type, function)
59 } else {
60 Err(anyhow!(
61 "Cannot add operator. Window and parition were specified but window does not have a partition"
62 ))
63 }
64 } else {
65 self.transforms
66 .insert_operator(index, operator_type, function)
67 }
68 }
69
70 pub(crate) fn delete_operator(&mut self, index: Option<usize>, partition: bool) -> Result<()> {
71 if partition {
72 if let Some(partition) = &mut self.partition {
73 partition.delete_operator(index)
74 } else {
75 Err(anyhow!(
76 "Cannot delete operator. Window and parition were specified but window does not have a partition"
77 ))
78 }
79 } else if let Some(index) = index {
80 self.transforms.delete_operator(index)
81 } else {
82 todo!("cannot delete assign timestamp unless it is made optional")
83 }
84 }
85
86 pub(crate) fn import_operator_configs(
87 &mut self,
88 imports: &[PackageImport],
89 packages: &[PackageDefinition],
90 service_states: &mut BTreeMap<String, State>,
91 ) -> Result<()> {
92 if self.assign_timestamp.is_imported(imports) {
93 self.assign_timestamp =
94 imported_assign_timestamp_config(&self.assign_timestamp, imports, packages)?;
95 inject_states(service_states, &self.assign_timestamp.states)?;
96 }
97
98 if let Some(ref mut flush) = self.flush {
99 if flush.is_imported(imports) {
100 return Err(anyhow!(
101 "Importing functions for `Flush` is not yet supported"
102 ));
103 }
104 }
105
106 for step in &mut self.transforms.steps {
107 if step.is_imported(imports) {
108 *step = imported_operator_config(step, imports, packages)?;
109 inject_states(service_states, &step.inner().states)?;
110 }
111 }
112
113 if let Some(ref mut partition) = self.partition {
114 partition.import_operator_configs(imports, packages, service_states)?;
115 }
116
117 Ok(())
118 }
119
120 pub fn output_type(&self, input_type: KVSchemaType) -> Result<KVSchemaType, ValidationError> {
121 let mut expected_type = input_type;
122
123 let failure_message = Err(ValidationError::new(
124 "could not get output type from invalid window",
125 ));
126
127 if let Ok(transforms_output) = self.transforms.output_type(expected_type.clone()) {
128 expected_type = transforms_output;
129 } else {
130 return failure_message;
131 }
132
133 if let Some(partition) = &self.partition {
134 if let Ok(partition_output) = partition.output_type(expected_type.clone()) {
135 expected_type = partition_output;
136 } else {
137 return failure_message;
138 }
139 }
140
141 if let Some(flush) = &self.flush {
142 if let Some(flush_output) = &flush.output {
143 expected_type = flush_output.type_.clone().into();
144 } else {
145 return failure_message;
146 }
147 }
148
149 Ok(expected_type)
150 }
151
152 pub fn validate(
153 &self,
154 types: &SdfTypesMap,
155 expected_input_type: &KVSchemaType,
156 mut input_provider_name: &str,
157 ) -> Result<(), ValidationFailure> {
158 let mut errors = ValidationFailure::new();
159
160 let mut expected_input_type = expected_input_type.clone();
161
162 if let Err(assign_timestamp_error) =
163 self.validate_assign_timestamp(types, &expected_input_type, input_provider_name)
164 {
165 errors.concat(&assign_timestamp_error);
166 }
167
168 if let Err(err) = self.properties.validate() {
169 errors.concat(&err);
170 }
171 if let Err(transforms_error) = validate_transforms_steps(
172 &self.transforms.steps,
173 types,
174 expected_input_type.clone(),
175 input_provider_name.to_string(),
176 ) {
177 errors.concat_with_context("transforms block is invalid:", &transforms_error);
178 }
179
180 if let Ok(output_type) = self.transforms.output_type(expected_input_type.clone()) {
182 expected_input_type = output_type;
183 input_provider_name = "window";
184 } else {
185 return Err(errors);
186 };
187
188 if let Some(partition) = &self.partition {
190 if let Err(partition_error) =
191 partition.validate(types, &expected_input_type, input_provider_name)
192 {
193 errors.concat_with_context("partition is invalid:", &partition_error);
194 }
195 }
196
197 if let Some(flush) = &self.flush {
198 if let Err(flush_error) = flush.validate_window_aggregate(types) {
199 errors.concat_with_context("flush function is invalid:", &flush_error);
200 }
201 }
202
203 if errors.any() {
204 Err(errors)
205 } else {
206 Ok(())
207 }
208 }
209
210 fn validate_assign_timestamp(
211 &self,
212 types: &SdfTypesMap,
213 expected_input_type: &KVSchemaType,
214 input_provider_name: &str,
215 ) -> Result<(), ValidationFailure> {
216 let mut errors = ValidationFailure::new();
217
218 if let Err(assign_timestamp_error) = self.assign_timestamp.validate_assign_timestamp(types)
219 {
220 errors.concat(&assign_timestamp_error);
221 }
222
223 let value_param = if self.assign_timestamp.requires_key_param() {
224 let key_param = self.assign_timestamp.inputs.first();
225
226 if let Some(key_param) = key_param {
227 if let Some(ref expected_key) = expected_input_type.key {
228 if key_param.type_.name != expected_key.name {
229 errors.push_str(&format!(
230 "assign-timestamp function `{}` input type should match `{}` provided by `{}` but found `{}`",
231 self.assign_timestamp.uses,
232 expected_key.name,
233 &input_provider_name,
234 key_param.type_.name
235 ));
236 }
237 }
238 }
239
240 self.assign_timestamp.inputs.get(1)
241 } else {
242 self.assign_timestamp.inputs.first()
243 };
244
245 if let Some(assign_timestamp_input) = value_param {
247 if assign_timestamp_input.type_.name != expected_input_type.value.name {
248 errors.push_str(&format!(
249 "assign-timestamp function `{}` input type should match `{}` provided by `{}` but found `{}`",
250 self.assign_timestamp.uses,
251 expected_input_type.value.name,
252 &input_provider_name,
253 assign_timestamp_input.type_.name
254 ));
255 }
256 }
257
258 if errors.any() {
259 Err(errors)
260 } else {
261 Ok(())
262 }
263 }
264
265 #[cfg(feature = "parser")]
266 pub fn update_inline_operators(&mut self) -> Result<()> {
267 self.assign_timestamp.update_signature_from_code()?;
268
269 for step in &mut self.transforms.steps {
270 step.update_signature_from_code()?;
271 }
272
273 if let Some(partition) = &mut self.partition {
274 partition.update_inline_operators()?;
275 }
276
277 if let Some(flush) = &mut self.flush {
278 flush.update_signature_from_code()?;
279 }
280
281 Ok(())
282 }
283}
284
285impl Default for Window {
286 fn default() -> Self {
287 Self {
288 properties: WindowProperties {
289 kind: WindowKind::Tumbling(TumblingWindow {
290 duration: 0,
291 offset: 0,
292 }),
293 watermark_config: WatermarkConfig::default(),
294 },
295 assign_timestamp: Default::default(),
296 flush: None,
297 transforms: Transforms { steps: vec![] },
298 partition: None,
299 }
300 }
301}
302
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use sdf_common::constants::DATAFLOW_STABLE_VERSION;

    use crate::{
        metadata::io::topic::KVSchemaType,
        util::{sdf_types_map::SdfTypesMap, validation_error::ValidationError},
        wit::{
            dataflow::{PackageDefinition, PackageImport},
            io::TypeRef,
            metadata::{NamedParameter, Parameter, ParameterKind, SdfKeyedStateValue},
            operator::{
                PartitionOperator, StepInvocation, StepState, TransformOperator, Transforms,
                TumblingWindow, Window, WindowProperties, WindowKind, WatermarkConfig,
            },
            package_interface::{FunctionImport, Header, OperatorType},
            states::{SdfKeyedState, State, StateTyped},
        },
    };

    /// Package `my-ns/my-pkg@0.1.0` exporting the map, assign-timestamp and
    /// assign-key functions referenced by the `window()` fixture.
    fn packages() -> Vec<PackageDefinition> {
        vec![PackageDefinition {
            api_version: DATAFLOW_STABLE_VERSION.to_string(),
            meta: Header {
                name: "my-pkg".to_string(),
                namespace: "my-ns".to_string(),
                version: "0.1.0".to_string(),
            },
            functions: vec![map_fn(), assign_timestamp_fn(), assign_key_fn()],
            imports: vec![],
            types: vec![],
            states: vec![],
            dev: None,
        }]
    }

    /// `map-fn`: `u8 -> u8`, declaring one resolved keyed state `map-state`.
    fn map_fn() -> (StepInvocation, OperatorType) {
        (
            StepInvocation {
                uses: "map-fn".to_string(),
                inputs: vec![NamedParameter {
                    name: "map-input".to_string(),
                    type_: TypeRef {
                        name: "u8".to_string(),
                    },
                    optional: false,
                    kind: ParameterKind::Value,
                }],
                output: Some(Parameter {
                    type_: TypeRef {
                        name: "u8".to_string(),
                    }
                    .into(),
                    ..Default::default()
                }),
                states: vec![StepState::Resolved(StateTyped {
                    name: "map-state".to_string(),
                    type_: SdfKeyedState {
                        key: TypeRef {
                            name: "string".to_string(),
                        },
                        value: SdfKeyedStateValue::U32,
                    },
                })],

                ..Default::default()
            },
            OperatorType::Map,
        )
    }

    /// `assign-timestamp-fn`: two inputs (`value: S64`, `event-time: String`),
    /// output `S64`.
    fn assign_timestamp_fn() -> (StepInvocation, OperatorType) {
        (
            StepInvocation {
                uses: "assign-timestamp-fn".to_string(),
                inputs: vec![
                    NamedParameter {
                        name: "value".to_string(),
                        type_: TypeRef {
                            name: "S64".to_string(),
                        },
                        optional: false,
                        kind: ParameterKind::Value,
                    },
                    NamedParameter {
                        name: "event-time".to_string(),
                        type_: TypeRef {
                            name: "String".to_string(),
                        },
                        optional: false,
                        kind: ParameterKind::Value,
                    },
                ],
                output: Some(Parameter {
                    type_: TypeRef {
                        name: "S64".to_string(),
                    }
                    .into(),
                    ..Default::default()
                }),
                ..Default::default()
            },
            OperatorType::AssignTimestamp,
        )
    }

    /// `assign-key-fn`: `U8 -> U8`.
    fn assign_key_fn() -> (StepInvocation, OperatorType) {
        (
            StepInvocation {
                uses: "assign-key-fn".to_string(),
                inputs: vec![NamedParameter {
                    name: "word-count".to_string(),
                    type_: TypeRef {
                        name: "U8".to_string(),
                    },
                    optional: false,
                    kind: ParameterKind::Value,
                }],
                output: Some(Parameter {
                    type_: TypeRef {
                        name: "U8".to_string(),
                    }
                    .into(),
                    ..Default::default()
                }),
                ..Default::default()
            },
            OperatorType::AssignKey,
        )
    }

    /// A 60/10 tumbling window whose operators only carry their `uses` name
    /// (no signatures): two `map-fn` steps, plus a partition keyed by
    /// `assign-key-fn` with two more `map-fn` steps. No flush step.
    fn window() -> Window {
        Window {
            properties: WindowProperties {
                kind: WindowKind::Tumbling(TumblingWindow {
                    duration: 60,
                    offset: 10,
                }),
                watermark_config: WatermarkConfig::default(),
            },
            assign_timestamp: StepInvocation {
                uses: "assign-timestamp-fn".to_string(),
                ..Default::default()
            },
            flush: None,
            transforms: Transforms {
                steps: vec![
                    TransformOperator::Map(StepInvocation {
                        uses: "map-fn".to_string(),
                        ..Default::default()
                    }),
                    TransformOperator::Map(StepInvocation {
                        uses: "map-fn".to_string(),
                        ..Default::default()
                    }),
                ],
            },
            partition: Some(PartitionOperator {
                assign_key: StepInvocation {
                    uses: "assign-key-fn".to_string(),
                    ..Default::default()
                },
                transforms: Transforms {
                    steps: vec![
                        TransformOperator::Map(StepInvocation {
                            uses: "map-fn".to_string(),
                            ..Default::default()
                        }),
                        TransformOperator::Map(StepInvocation {
                            uses: "map-fn".to_string(),
                            ..Default::default()
                        }),
                    ],
                },
                update_state: None,
            }),
        }
    }

    /// Imports of all three `my-pkg` functions, without aliases.
    fn imports() -> Vec<PackageImport> {
        vec![PackageImport {
            metadata: Header {
                name: "my-pkg".to_string(),
                namespace: "my-ns".to_string(),
                version: "0.1.0".to_string(),
            },
            functions: vec![
                FunctionImport {
                    name: "map-fn".to_string(),
                    alias: None,
                },
                FunctionImport {
                    name: "assign-key-fn".to_string(),
                    alias: None,
                },
                FunctionImport {
                    name: "assign-timestamp-fn".to_string(),
                    alias: None,
                },
            ],
            path: Some("path/to/my-pkg".to_string()),
            types: vec![],
            states: vec![],
        }]
    }

    /// Un-keyed `s16` stream type fed into the window under test.
    fn expected_type() -> KVSchemaType {
        (
            None,
            TypeRef {
                name: "s16".to_string(),
            },
        )
            .into()
    }

    #[test]
    fn test_import_operator_configs_merges_operators_signatures() {
        let mut window = window();
        let mut states: BTreeMap<String, State> = Default::default();

        assert!(window.assign_timestamp.inputs.is_empty());
        assert!(window.assign_timestamp.output.is_none());

        let steps = &window.transforms.steps;
        assert!(steps.first().unwrap().inner().inputs.is_empty());
        assert!(steps.first().unwrap().inner().output.is_none());
        assert!(steps.get(1).unwrap().inner().inputs.is_empty());
        assert!(steps.get(1).unwrap().inner().output.is_none());

        let partition = window.partition.as_ref().unwrap();
        assert!(partition.assign_key.inputs.is_empty());
        assert!(partition.assign_key.output.is_none());

        let partition_steps = &partition.transforms.steps;
        assert!(partition_steps.first().unwrap().inner().inputs.is_empty());
        assert!(partition_steps.first().unwrap().inner().output.is_none());
        assert!(partition_steps.get(1).unwrap().inner().inputs.is_empty());
        assert!(partition_steps.get(1).unwrap().inner().output.is_none());

        assert!(states.is_empty());

        window
            .import_operator_configs(&imports(), &packages(), &mut states)
            .unwrap();

        assert_eq!(window.assign_timestamp.inputs.len(), 2);
        assert!(window.assign_timestamp.output.is_some());

        let steps = &window.transforms.steps;
        assert_eq!(steps.first().unwrap().inner().inputs.len(), 1);
        assert!(steps.first().unwrap().inner().output.is_some());
        assert_eq!(steps.get(1).unwrap().inner().inputs.len(), 1);
        assert!(steps.get(1).unwrap().inner().output.is_some());

        let partition = window.partition.as_ref().unwrap();
        assert_eq!(partition.assign_key.inputs.len(), 1);
        assert!(partition.assign_key.output.is_some());

        let partition_steps = &partition.transforms.steps;
        assert_eq!(partition_steps.first().unwrap().inner().inputs.len(), 1);
        assert!(partition_steps.first().unwrap().inner().output.is_some());
        assert_eq!(partition_steps.get(1).unwrap().inner().inputs.len(), 1);
        assert!(partition_steps.get(1).unwrap().inner().output.is_some());

        assert_eq!(states.len(), 1);
    }

    #[test]
    fn test_validate_validates_assign_timestamp_operator() {
        let types = SdfTypesMap::default();
        let mut window = window();
        window.assign_timestamp.output = None;

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid assign timestamp operator");

        assert!(res.errors.contains(&ValidationError::new(
            "assign-timestamp type function `assign-timestamp-fn` requires an output type"
        )));
    }

    #[test]
    fn test_validate_validates_assign_timestamp_operator_input_matches_expected_input() {
        let types = SdfTypesMap::default();
        let mut window = window();
        window.assign_timestamp.inputs = vec![NamedParameter {
            name: "value".to_string(),
            type_: TypeRef {
                name: "u8".to_string(),
            },
            optional: false,
            kind: ParameterKind::Value,
        }];

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for assign timestamp operator with wrong input type");

        assert!(res.errors.contains(&ValidationError::new(
            "assign-timestamp function `assign-timestamp-fn` input type should match `s16` provided by `transforms block` but found `u8`"
        )));
    }

    #[test]
    fn test_validate_validates_transforms() {
        let types = SdfTypesMap::default();
        let mut window = window();

        window.transforms = Transforms {
            steps: vec![TransformOperator::Filter(StepInvocation {
                uses: "filter-fn".to_string(),
                ..Default::default()
            })],
        };

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid filter function");

        assert!(res.errors.contains(&ValidationError::new(
            "transforms block is invalid: filter type function `filter-fn` should have exactly 1 input type, found 0"
        )));
    }

    #[test]
    fn test_validate_validates_partition() {
        let types = SdfTypesMap::default();
        let mut window = window();

        window.transforms = Transforms { steps: vec![] };

        window.partition = Some(PartitionOperator {
            assign_key: StepInvocation {
                uses: "assign-key-fn".to_string(),
                ..Default::default()
            },
            transforms: Transforms {
                steps: vec![TransformOperator::Filter(StepInvocation {
                    uses: "filter-fn".to_string(),
                    inputs: vec![NamedParameter {
                        name: "filter-input".to_string(),
                        type_: TypeRef {
                            name: "u8".to_string(),
                        },
                        optional: false,
                        kind: ParameterKind::Value,
                    }],
                    ..Default::default()
                })],
            },
            update_state: None,
        });

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid partition transforms input");

        let msg = r"partition is invalid: transforms block is invalid: Function `filter-fn` input type was expected to match `s16` type provided by window, but `u8` was found.";

        assert!(res.errors.contains(&ValidationError::new(msg)));
    }

    #[test]
    fn test_validate_validates_flush_as_window_aggregate() {
        let types = SdfTypesMap::default();
        let mut window = window();

        window.flush = Some(StepInvocation {
            uses: "flush-fn".to_string(),
            ..Default::default()
        });
        window.transforms = Transforms { steps: vec![] };

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid flush function");

        assert!(res.errors.contains(&ValidationError::new(
            "flush function is invalid: window-aggregate type function `flush-fn` requires an output type"
        )));
    }

    #[test]
    fn test_window_operators() {
        let window = window();
        let operators = window.operators();

        assert_eq!(operators.len(), 6);
        assert_eq!(operators.first().unwrap().0.uses, "assign-timestamp-fn");
        assert_eq!(operators.get(1).unwrap().0.uses, "map-fn");
        assert_eq!(operators.get(2).unwrap().0.uses, "map-fn");
        assert_eq!(operators.get(3).unwrap().0.uses, "assign-key-fn");
        assert_eq!(operators.get(4).unwrap().0.uses, "map-fn");
        assert_eq!(operators.get(5).unwrap().0.uses, "map-fn");
    }

    #[test]
    fn test_add_window_operator() {
        let mut window = window();

        let (function, operator_type) = map_fn();

        window
            .add_operator(Some(0), false, operator_type, function)
            .expect("adding a window operator should succeed");

        assert_eq!(window.transforms.steps.len(), 3);
    }

    #[test]
    fn test_add_window_partition_operator() {
        let mut window = window();

        let (function, operator_type) = map_fn();

        window
            .add_operator(Some(0), true, operator_type, function)
            .expect("adding a partition operator should succeed");

        let partition = window.partition.expect("partition should exist");

        assert_eq!(partition.transforms.steps.len(), 3);
    }

    #[test]
    fn test_add_window_fails_when_partition_incorrectly_specified() {
        let mut window = window();
        window.partition = None;

        let (function, operator_type) = map_fn();
        let err = window
            .add_operator(Some(0), true, operator_type, function)
            .expect_err("should fail when the window has no partition");

        assert_eq!(
            err.to_string(),
            "Cannot add operator. Window and parition were specified but window does not have a partition"
        );
    }

    #[test]
    fn test_delete_window_operator() {
        let mut window = window();

        window
            .delete_operator(Some(0), false)
            .expect("deleting a window operator should succeed");

        assert_eq!(window.transforms.steps.len(), 1);
    }

    #[test]
    fn test_delete_window_partition_operator() {
        let mut window = window();

        window
            .delete_operator(Some(0), true)
            .expect("deleting a partition operator should succeed");

        let partition = window.partition.expect("partition should exist");

        assert_eq!(partition.transforms.steps.len(), 1);
    }

    #[test]
    fn test_delete_window_fails_when_partition_incorrectly_specified() {
        let mut window = window();
        window.partition = None;

        let err = window
            .delete_operator(Some(0), true)
            .expect_err("should fail when the window has no partition");

        assert_eq!(
            err.to_string(),
            "Cannot delete operator. Window and parition were specified but window does not have a partition"
        );
    }

    #[test]
    fn test_window_idleness_validation() {
        let mut window = window();
        window.properties.watermark_config.idleness = Some(10);

        let err = window
            .validate(&SdfTypesMap::default(), &expected_type(), "window")
            .expect_err("should fail idleness validation");

        assert!(err.errors.contains(&ValidationError::new(
            "idleness 10 should be larger than window duration 60"
        )));
    }
}