// sdf_metadata/metadata/operator/window.rs

use std::collections::BTreeMap;

use anyhow::{anyhow, Result};

use crate::{
    importer::{
        function::{imported_assign_timestamp_config, imported_operator_config},
        states::inject_states,
    },
    metadata::io::topic::KVSchemaType,
    util::{
        sdf_types_map::SdfTypesMap, validation_error::ValidationError,
        validation_failure::ValidationFailure,
    },
    wit::{
        dataflow::{PackageDefinition, PackageImport, Transforms},
        operator::{
            OperatorType, StepInvocation, TumblingWindow, Window, WindowProperties, WindowKind,
            WatermarkConfig,
        },
        states::State,
    },
};

use super::transforms::validate_transforms_steps;

27impl Window {
28    pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
29        let mut operators = vec![];
30
31        operators.push((self.assign_timestamp.clone(), OperatorType::AssignTimestamp));
32
33        for step in &self.transforms.steps {
34            operators.push((step.inner().clone(), step.clone().into()));
35        }
36
37        if let Some(partition) = &self.partition {
38            let p_op = partition.operators();
39            operators.extend(p_op);
40        }
41
42        if let Some(flush) = &self.flush {
43            operators.push((flush.clone(), OperatorType::WindowAggregate));
44        }
45
46        operators
47    }
48
49    pub(crate) fn add_operator(
50        &mut self,
51        index: Option<usize>,
52        partition: bool,
53        operator_type: OperatorType,
54        function: StepInvocation,
55    ) -> Result<()> {
56        if partition {
57            if let Some(partition) = &mut self.partition {
58                partition.add_operator(index, operator_type, function)
59            } else {
60                Err(anyhow!(
61                    "Cannot add operator. Window and parition were specified but window does not have a partition"
62                ))
63            }
64        } else {
65            self.transforms
66                .insert_operator(index, operator_type, function)
67        }
68    }
69
70    pub(crate) fn delete_operator(&mut self, index: Option<usize>, partition: bool) -> Result<()> {
71        if partition {
72            if let Some(partition) = &mut self.partition {
73                partition.delete_operator(index)
74            } else {
75                Err(anyhow!(
76                    "Cannot delete operator. Window and parition were specified but window does not have a partition"
77                ))
78            }
79        } else if let Some(index) = index {
80            self.transforms.delete_operator(index)
81        } else {
82            todo!("cannot delete assign timestamp unless it is made optional")
83        }
84    }
85
86    pub(crate) fn import_operator_configs(
87        &mut self,
88        imports: &[PackageImport],
89        packages: &[PackageDefinition],
90        service_states: &mut BTreeMap<String, State>,
91    ) -> Result<()> {
92        if self.assign_timestamp.is_imported(imports) {
93            self.assign_timestamp =
94                imported_assign_timestamp_config(&self.assign_timestamp, imports, packages)?;
95            inject_states(service_states, &self.assign_timestamp.states)?;
96        }
97
98        if let Some(ref mut flush) = self.flush {
99            if flush.is_imported(imports) {
100                return Err(anyhow!(
101                    "Importing functions for `Flush` is not yet supported"
102                ));
103            }
104        }
105
106        for step in &mut self.transforms.steps {
107            if step.is_imported(imports) {
108                *step = imported_operator_config(step, imports, packages)?;
109                inject_states(service_states, &step.inner().states)?;
110            }
111        }
112
113        if let Some(ref mut partition) = self.partition {
114            partition.import_operator_configs(imports, packages, service_states)?;
115        }
116
117        Ok(())
118    }
119
120    pub fn output_type(&self, input_type: KVSchemaType) -> Result<KVSchemaType, ValidationError> {
121        let mut expected_type = input_type;
122
123        let failure_message = Err(ValidationError::new(
124            "could not get output type from invalid window",
125        ));
126
127        if let Ok(transforms_output) = self.transforms.output_type(expected_type.clone()) {
128            expected_type = transforms_output;
129        } else {
130            return failure_message;
131        }
132
133        if let Some(partition) = &self.partition {
134            if let Ok(partition_output) = partition.output_type(expected_type.clone()) {
135                expected_type = partition_output;
136            } else {
137                return failure_message;
138            }
139        }
140
141        if let Some(flush) = &self.flush {
142            if let Some(flush_output) = &flush.output {
143                expected_type = flush_output.type_.clone().into();
144            } else {
145                return failure_message;
146            }
147        }
148
149        Ok(expected_type)
150    }
151
152    pub fn validate(
153        &self,
154        types: &SdfTypesMap,
155        expected_input_type: &KVSchemaType,
156        mut input_provider_name: &str,
157    ) -> Result<(), ValidationFailure> {
158        let mut errors = ValidationFailure::new();
159
160        let mut expected_input_type = expected_input_type.clone();
161
162        if let Err(assign_timestamp_error) =
163            self.validate_assign_timestamp(types, &expected_input_type, input_provider_name)
164        {
165            errors.concat(&assign_timestamp_error);
166        }
167
168        if let Err(err) = self.properties.validate() {
169            errors.concat(&err);
170        }
171        if let Err(transforms_error) = validate_transforms_steps(
172            &self.transforms.steps,
173            types,
174            expected_input_type.clone(),
175            input_provider_name.to_string(),
176        ) {
177            errors.concat_with_context("transforms block is invalid:", &transforms_error);
178        }
179
180        // update expected type after validating transforms
181        if let Ok(output_type) = self.transforms.output_type(expected_input_type.clone()) {
182            expected_input_type = output_type;
183            input_provider_name = "window";
184        } else {
185            return Err(errors);
186        };
187
188        // validate partition
189        if let Some(partition) = &self.partition {
190            if let Err(partition_error) =
191                partition.validate(types, &expected_input_type, input_provider_name)
192            {
193                errors.concat_with_context("partition is invalid:", &partition_error);
194            }
195        }
196
197        if let Some(flush) = &self.flush {
198            if let Err(flush_error) = flush.validate_window_aggregate(types) {
199                errors.concat_with_context("flush function is invalid:", &flush_error);
200            }
201        }
202
203        if errors.any() {
204            Err(errors)
205        } else {
206            Ok(())
207        }
208    }
209
210    fn validate_assign_timestamp(
211        &self,
212        types: &SdfTypesMap,
213        expected_input_type: &KVSchemaType,
214        input_provider_name: &str,
215    ) -> Result<(), ValidationFailure> {
216        let mut errors = ValidationFailure::new();
217
218        if let Err(assign_timestamp_error) = self.assign_timestamp.validate_assign_timestamp(types)
219        {
220            errors.concat(&assign_timestamp_error);
221        }
222
223        let value_param = if self.assign_timestamp.requires_key_param() {
224            let key_param = self.assign_timestamp.inputs.first();
225
226            if let Some(key_param) = key_param {
227                if let Some(ref expected_key) = expected_input_type.key {
228                    if key_param.type_.name != expected_key.name {
229                        errors.push_str(&format!(
230                            "assign-timestamp function `{}` input type should match `{}` provided by `{}` but found `{}`",
231                            self.assign_timestamp.uses,
232                            expected_key.name,
233                            &input_provider_name,
234                            key_param.type_.name
235                        ));
236                    }
237                }
238            }
239
240            self.assign_timestamp.inputs.get(1)
241        } else {
242            self.assign_timestamp.inputs.first()
243        };
244
245        //assert assign timestamps first input matches the expected input type
246        if let Some(assign_timestamp_input) = value_param {
247            if assign_timestamp_input.type_.name != expected_input_type.value.name {
248                errors.push_str(&format!(
249                    "assign-timestamp function `{}` input type should match `{}` provided by `{}` but found `{}`",
250                    self.assign_timestamp.uses,
251                    expected_input_type.value.name,
252                    &input_provider_name,
253                    assign_timestamp_input.type_.name
254                ));
255            }
256        }
257
258        if errors.any() {
259            Err(errors)
260        } else {
261            Ok(())
262        }
263    }
264
265    #[cfg(feature = "parser")]
266    pub fn update_inline_operators(&mut self) -> Result<()> {
267        self.assign_timestamp.update_signature_from_code()?;
268
269        for step in &mut self.transforms.steps {
270            step.update_signature_from_code()?;
271        }
272
273        if let Some(partition) = &mut self.partition {
274            partition.update_inline_operators()?;
275        }
276
277        if let Some(flush) = &mut self.flush {
278            flush.update_signature_from_code()?;
279        }
280
281        Ok(())
282    }
283}
284
285impl Default for Window {
286    fn default() -> Self {
287        Self {
288            properties: WindowProperties {
289                kind: WindowKind::Tumbling(TumblingWindow {
290                    duration: 0,
291                    offset: 0,
292                }),
293                watermark_config: WatermarkConfig::default(),
294            },
295            assign_timestamp: Default::default(),
296            flush: None,
297            transforms: Transforms { steps: vec![] },
298            partition: None,
299        }
300    }
301}
302
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use sdf_common::constants::DATAFLOW_STABLE_VERSION;

    use crate::{
        metadata::io::topic::KVSchemaType,
        util::{sdf_types_map::SdfTypesMap, validation_error::ValidationError},
        wit::{
            dataflow::{PackageDefinition, PackageImport},
            io::TypeRef,
            metadata::{NamedParameter, Parameter, ParameterKind, SdfKeyedStateValue},
            operator::{
                PartitionOperator, StepInvocation, StepState, TransformOperator, Transforms,
                TumblingWindow, Window, WindowProperties, WindowKind, WatermarkConfig,
            },
            package_interface::{FunctionImport, Header, OperatorType},
            states::{SdfKeyedState, State, StateTyped},
        },
    };

    /// Package `my-ns/my-pkg@0.1.0`, which defines the three functions the window
    /// fixture imports.
    fn packages() -> Vec<PackageDefinition> {
        vec![PackageDefinition {
            api_version: DATAFLOW_STABLE_VERSION.to_string(),
            meta: Header {
                name: "my-pkg".to_string(),
                namespace: "my-ns".to_string(),
                version: "0.1.0".to_string(),
            },
            functions: vec![map_fn(), assign_timestamp_fn(), assign_key_fn()],
            imports: vec![],
            types: vec![],
            states: vec![],
            dev: None,
        }]
    }

    /// `map-fn`: `u8 -> u8`, with one keyed state named `map-state`.
    fn map_fn() -> (StepInvocation, OperatorType) {
        (
            StepInvocation {
                uses: "map-fn".to_string(),
                inputs: vec![NamedParameter {
                    name: "map-input".to_string(),
                    type_: TypeRef {
                        name: "u8".to_string(),
                    },
                    optional: false,
                    kind: ParameterKind::Value,
                }],
                output: Some(Parameter {
                    type_: TypeRef {
                        name: "u8".to_string(),
                    }
                    .into(),
                    ..Default::default()
                }),
                states: vec![StepState::Resolved(StateTyped {
                    name: "map-state".to_string(),
                    type_: SdfKeyedState {
                        key: TypeRef {
                            name: "string".to_string(),
                        },
                        value: SdfKeyedStateValue::U32,
                    },
                })],

                ..Default::default()
            },
            OperatorType::Map,
        )
    }

    /// `assign-timestamp-fn`: `(S64, String) -> S64`.
    fn assign_timestamp_fn() -> (StepInvocation, OperatorType) {
        (
            StepInvocation {
                uses: "assign-timestamp-fn".to_string(),
                inputs: vec![
                    NamedParameter {
                        name: "value".to_string(),
                        type_: TypeRef {
                            name: "S64".to_string(),
                        },
                        optional: false,
                        kind: ParameterKind::Value,
                    },
                    NamedParameter {
                        name: "event-time".to_string(),
                        type_: TypeRef {
                            name: "String".to_string(),
                        },
                        optional: false,
                        kind: ParameterKind::Value,
                    },
                ],
                output: Some(Parameter {
                    type_: TypeRef {
                        name: "S64".to_string(),
                    }
                    .into(),
                    ..Default::default()
                }),
                ..Default::default()
            },
            OperatorType::AssignTimestamp,
        )
    }

    /// `assign-key-fn`: `U8 -> U8`.
    fn assign_key_fn() -> (StepInvocation, OperatorType) {
        (
            StepInvocation {
                uses: "assign-key-fn".to_string(),
                inputs: vec![NamedParameter {
                    name: "word-count".to_string(),
                    type_: TypeRef {
                        name: "U8".to_string(),
                    },
                    optional: false,
                    kind: ParameterKind::Value,
                }],
                output: Some(Parameter {
                    type_: TypeRef {
                        name: "U8".to_string(),
                    }
                    .into(),
                    ..Default::default()
                }),
                ..Default::default()
            },
            OperatorType::AssignKey,
        )
    }

    /// A 60/10 tumbling window whose operators are referenced by name only.
    ///
    /// The operators have no inputs, outputs or states; those are filled in by importing.
    /// The window has two `map-fn` transforms and a partition with an `assign-key-fn` plus
    /// two more `map-fn` transforms.
    fn window() -> Window {
        Window {
            properties: WindowProperties {
                kind: WindowKind::Tumbling(TumblingWindow {
                    duration: 60,
                    offset: 10,
                }),
                watermark_config: WatermarkConfig::default(),
            },
            assign_timestamp: StepInvocation {
                uses: "assign-timestamp-fn".to_string(),
                ..Default::default()
            },
            flush: None,
            transforms: Transforms {
                steps: vec![
                    TransformOperator::Map(StepInvocation {
                        uses: "map-fn".to_string(),
                        ..Default::default()
                    }),
                    TransformOperator::Map(StepInvocation {
                        uses: "map-fn".to_string(),
                        ..Default::default()
                    }),
                ],
            },
            partition: Some(PartitionOperator {
                assign_key: StepInvocation {
                    uses: "assign-key-fn".to_string(),
                    ..Default::default()
                },
                transforms: {
                    Transforms {
                        steps: vec![
                            TransformOperator::Map(StepInvocation {
                                uses: "map-fn".to_string(),
                                ..Default::default()
                            }),
                            TransformOperator::Map(StepInvocation {
                                uses: "map-fn".to_string(),
                                ..Default::default()
                            }),
                        ],
                    }
                },
                update_state: None,
            }),
        }
    }

    /// An import of all three functions from `my-ns/my-pkg@0.1.0`.
    fn imports() -> Vec<PackageImport> {
        vec![PackageImport {
            metadata: Header {
                name: "my-pkg".to_string(),
                namespace: "my-ns".to_string(),
                version: "0.1.0".to_string(),
            },
            functions: vec![
                FunctionImport {
                    name: "map-fn".to_string(),
                    alias: None,
                },
                FunctionImport {
                    name: "assign-key-fn".to_string(),
                    alias: None,
                },
                FunctionImport {
                    name: "assign-timestamp-fn".to_string(),
                    alias: None,
                },
            ],
            path: Some("path/to/my-pkg".to_string()),
            types: vec![],
            states: vec![],
        }]
    }

    /// An unkeyed `s16` stream type.
    fn expected_type() -> KVSchemaType {
        (
            None,
            TypeRef {
                name: "s16".to_string(),
            },
        )
            .into()
    }

    #[test]
    fn test_import_operator_configs_merges_operators_signatures() {
        let mut window = window();
        let mut states: BTreeMap<String, State> = Default::default();

        assert!(window.assign_timestamp.inputs.is_empty());
        assert!(window.assign_timestamp.output.is_none());

        let steps = &window.transforms.steps;
        assert!(steps.first().unwrap().inner().inputs.is_empty());
        assert!(steps.first().unwrap().inner().output.is_none());
        assert!(steps.get(1).unwrap().inner().inputs.is_empty());
        assert!(steps.get(1).unwrap().inner().output.is_none());

        let partition = window.partition.as_ref().unwrap();
        assert!(partition.assign_key.inputs.is_empty());
        assert!(partition.assign_key.output.is_none());

        let partition_steps = &partition.transforms.steps;
        assert!(partition_steps.first().unwrap().inner().inputs.is_empty());
        assert!(partition_steps.first().unwrap().inner().output.is_none());
        assert!(partition_steps.get(1).unwrap().inner().inputs.is_empty());
        assert!(partition_steps.get(1).unwrap().inner().output.is_none());

        assert!(states.is_empty());

        window
            .import_operator_configs(&imports(), &packages(), &mut states)
            .unwrap();

        assert_eq!(window.assign_timestamp.inputs.len(), 2);
        assert!(window.assign_timestamp.output.is_some());

        let steps = &window.transforms.steps;
        assert_eq!(steps.first().unwrap().inner().inputs.len(), 1);
        assert!(steps.first().unwrap().inner().output.is_some());
        assert_eq!(steps.get(1).unwrap().inner().inputs.len(), 1);
        assert!(steps.get(1).unwrap().inner().output.is_some());

        let partition = window.partition.as_ref().unwrap();
        assert_eq!(partition.assign_key.inputs.len(), 1);
        assert!(partition.assign_key.output.is_some());

        let partition_steps = &partition.transforms.steps;
        assert_eq!(partition_steps.first().unwrap().inner().inputs.len(), 1);
        assert!(partition_steps.first().unwrap().inner().output.is_some());
        assert_eq!(partition_steps.get(1).unwrap().inner().inputs.len(), 1);
        assert!(partition_steps.get(1).unwrap().inner().output.is_some());

        // Every `map-fn` import declares the same `map-state`, so only one entry is expected.
        assert_eq!(states.len(), 1);
    }

    #[test]
    fn test_validate_validates_assign_timestamp_operator() {
        let types = SdfTypesMap::default();
        let mut window = window();
        window.assign_timestamp.output = None;

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid assign timestamp operator");

        assert!(res.errors.contains(&ValidationError::new(
            "assign-timestamp type function `assign-timestamp-fn` requires an output type"
        )));
    }

    #[test]
    fn test_validate_validates_assign_timestamp_operator_input_matches_expected_input() {
        let types = SdfTypesMap::default();
        let mut window = window();
        window.assign_timestamp.inputs = vec![NamedParameter {
            name: "value".to_string(),
            type_: TypeRef {
                name: "u8".to_string(),
            },
            optional: false,
            kind: ParameterKind::Value,
        }];

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for assign timestamp operator with wrong input type");

        assert!(res.errors.contains(&ValidationError::new(
            "assign-timestamp function `assign-timestamp-fn` input type should match `s16` provided by `transforms block` but found `u8`"
        )));
    }

    #[test]
    fn test_validate_validates_transforms() {
        let types = SdfTypesMap::default();
        let mut window = window();

        window.transforms = Transforms {
            steps: vec![TransformOperator::Filter(StepInvocation {
                uses: "filter-fn".to_string(),
                ..Default::default()
            })],
        };

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid filter function");

        assert!(res.errors.contains(&ValidationError::new(
            "transforms block is invalid: filter type function `filter-fn` should have exactly 1 input type, found 0"
        )));
    }

    #[test]
    fn test_validate_validates_partition() {
        let types = SdfTypesMap::default();
        let mut window = window();

        window.transforms = Transforms { steps: vec![] };

        window.partition = Some(PartitionOperator {
            assign_key: StepInvocation {
                uses: "assign-key-fn".to_string(),
                ..Default::default()
            },
            transforms: Transforms {
                steps: vec![TransformOperator::Filter(StepInvocation {
                    uses: "filter-fn".to_string(),
                    inputs: vec![NamedParameter {
                        name: "filter-input".to_string(),
                        type_: TypeRef {
                            name: "u8".to_string(),
                        },
                        optional: false,
                        kind: ParameterKind::Value,
                    }],
                    ..Default::default()
                })],
            },
            update_state: None,
        });

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid partition transforms input");

        let msg = r"partition is invalid: transforms block is invalid: Function `filter-fn` input type was expected to match `s16` type provided by window, but `u8` was found.";

        assert!(res.errors.contains(&ValidationError::new(msg)));
    }

    #[test]
    fn test_validate_validates_flush_as_window_aggregate() {
        let types = SdfTypesMap::default();
        let mut window = window();

        window.flush = Some(StepInvocation {
            uses: "flush-fn".to_string(),
            ..Default::default()
        });
        window.transforms = Transforms { steps: vec![] };

        let res = window
            .validate(&types, &expected_type(), "transforms block")
            .expect_err("should fail for invalid flush function");

        assert!(res.errors.contains(&ValidationError::new(
            "flush function is invalid: window-aggregate type function `flush-fn` requires an output type"
        )));
    }

    #[test]
    fn test_window_operators() {
        let window = window();
        let operators = window.operators();

        assert_eq!(operators.len(), 6);
        assert_eq!(operators.first().unwrap().0.uses, "assign-timestamp-fn");
        assert_eq!(operators.get(1).unwrap().0.uses, "map-fn");
        assert_eq!(operators.get(2).unwrap().0.uses, "map-fn");
        assert_eq!(operators.get(3).unwrap().0.uses, "assign-key-fn");
        assert_eq!(operators.get(4).unwrap().0.uses, "map-fn");
        assert_eq!(operators.get(5).unwrap().0.uses, "map-fn");
    }

    #[test]
    fn test_add_window_operator() {
        let mut window = window();

        let (function, operator_type) = map_fn();

        let res = window.add_operator(Some(0), false, operator_type, function);

        assert!(res.is_ok());
        assert_eq!(window.transforms.steps.len(), 3);
    }

    #[test]
    fn test_add_window_partition_operator() {
        let mut window = window();

        let (function, operator_type) = map_fn();

        let res = window.add_operator(Some(0), true, operator_type, function);

        let partition = window.partition.expect("partition should exist");

        assert!(res.is_ok());
        assert_eq!(partition.transforms.steps.len(), 3);
    }

    #[test]
    fn test_add_window_fails_when_partition_incorrectly_specified() {
        let mut window = window();
        window.partition = None;

        let (function, operator_type) = map_fn();
        let res = window.add_operator(Some(0), true, operator_type, function);

        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
            "Cannot add operator. Window and parition were specified but window does not have a partition"
        );
    }

    #[test]
    fn test_delete_window_operator() {
        let mut window = window();

        let res = window.delete_operator(Some(0), false);

        assert!(res.is_ok());
        assert_eq!(window.transforms.steps.len(), 1);
    }

    #[test]
    fn test_delete_window_partition_operator() {
        let mut window = window();

        let res = window.delete_operator(Some(0), true);

        let partition = window.partition.expect("partition should exist");

        assert!(res.is_ok());
        assert_eq!(partition.transforms.steps.len(), 1);
    }

    #[test]
    fn test_delete_window_fails_when_partition_incorrectly_specified() {
        let mut window = window();
        window.partition = None;

        let res = window.delete_operator(Some(0), true);

        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
            "Cannot delete operator. Window and parition were specified but window does not have a partition"
        );
    }

    #[test]
    fn test_window_idleness_validation() {
        let mut window = window();
        window.properties.watermark_config.idleness = Some(10);

        let res = window.validate(&SdfTypesMap::default(), &expected_type(), "window");

        let err = res.unwrap_err();
        assert!(err.errors.contains(&ValidationError::new(
            "idleness 10 should be larger than window duration 60"
        )));
    }
}