sdf_metadata/metadata/operator/transforms.rs

1use anyhow::{Result, anyhow};
2
3use crate::{
4    metadata::io::topic::KVSchemaType,
5    util::{
6        sdf_types_map::SdfTypesMap, validation_error::ValidationError,
7        validation_failure::ValidationFailure,
8    },
9    wit::dataflow::{TransformOperator, Transforms},
10    wit::package_interface::{StepInvocation, OperatorType},
11};
12
13#[allow(clippy::derivable_impls)]
14impl Default for Transforms {
15    fn default() -> Self {
16        Self { steps: vec![] }
17    }
18}
19
20impl Transforms {
21    pub(crate) fn insert_operator(
22        &mut self,
23        index: Option<usize>,
24        operator_type: OperatorType,
25        step_invocation: StepInvocation,
26    ) -> Result<()> {
27        let index = match index {
28            Some(index) => index,
29            None => {
30                return Err(anyhow!(
31                    "Must provide transforms index to insert operator into transforms block"
32                ));
33            }
34        };
35
36        if index > self.steps.len() {
37            return Err(anyhow!(
38                "cannot insert operator into transforms block, index is out of bounds, len = {}",
39                self.steps.len()
40            ));
41        }
42
43        let operator = match TransformOperator::new(operator_type, step_invocation) {
44            Some(operator) => operator,
45            None => {
46                return Err(anyhow!(
47                    "OperatorType {:?} not supported for transforms operator",
48                    operator_type
49                ));
50            }
51        };
52
53        self.steps.insert(index, operator);
54
55        Ok(())
56    }
57
58    pub(crate) fn delete_operator(&mut self, index: usize) -> Result<()> {
59        if index >= self.steps.len() {
60            return Err(anyhow!(
61                "cannot delete operator from transforms block, index is out of bounds, len = {}",
62                self.steps.len()
63            ));
64        }
65
66        self.steps.remove(index);
67
68        Ok(())
69    }
70
71    // gets the output type of the transforms if it is valid, otherwise returns an opaque error
72    pub fn output_type(
73        &self,
74        mut transform_input_type: KVSchemaType,
75    ) -> Result<KVSchemaType, ValidationError> {
76        let failure_message = Err(ValidationError::new(
77            "could not get output type from invalid transforms",
78        ));
79
80        for step in &self.steps {
81            let step_invocation = step.inner();
82
83            let value = if step_invocation.requires_key_param() {
84                if step_invocation.inputs.is_empty() {
85                    return failure_message;
86                }
87
88                step_invocation.inputs.get(1)
89            } else {
90                step_invocation.inputs.first()
91            };
92
93            if let Some(input_type) = value {
94                if input_type.type_.name.replace('-', "_")
95                    != transform_input_type.value.name.replace('-', "_")
96                {
97                    return failure_message;
98                }
99            } else {
100                return failure_message;
101            }
102
103            if let Some(output_type) = &step_invocation.output {
104                if !matches!(&step, TransformOperator::Filter(_)) {
105                    output_type
106                        .type_
107                        .value_type()
108                        .clone_into(&mut transform_input_type.value);
109
110                    if let Some(key_type) = output_type.type_.key_type() {
111                        transform_input_type.key = Some(key_type.clone());
112                    }
113                }
114            }
115        }
116
117        Ok(transform_input_type)
118    }
119}
120
121pub fn validate_transforms_steps(
122    steps: &[TransformOperator],
123    types: &SdfTypesMap,
124    mut expected_type: KVSchemaType,
125    mut input_provider_name: String,
126) -> Result<(), ValidationFailure> {
127    let mut errors = ValidationFailure::new();
128
129    for step in steps {
130        if let Err(function_error) = step.validate(types) {
131            errors.concat(&function_error);
132        }
133        let step_invocation = step.inner();
134
135        let value = if step_invocation.requires_key_param() {
136            if let Some(input_key) = step_invocation.inputs.first() {
137                if let Some(ref key) = expected_type.key {
138                    if input_key.type_.name.replace('-', "_") != key.name.replace('-', "_") {
139                        errors.push_str(&format!(
140                            "in `{}`, key type does not match expected key type. {} != {}",
141                            step_invocation.uses, input_key.type_.name, key.name
142                        ));
143                    }
144                } else {
145                    errors.push_str(
146                        &format!(
147                            "{} function requires a key, but none was found. Make sure that you define the right key in the topic configuration",
148                            step_invocation.uses
149                        )
150                    );
151                }
152            } else {
153                errors.push_str(&format!(
154                    "map type function `{}` should have at least 1 input type, found 0",
155                    step.name()
156                ));
157            }
158            step_invocation.inputs.get(1)
159        } else {
160            step_invocation.inputs.first()
161        };
162
163        if let Some(input_type) = value {
164            if input_type.type_.name.replace('-', "_") != expected_type.value.name.replace('-', "_")
165            {
166                errors.push_str(&format!(
167                    "Function `{}` input type was expected to match `{}` type provided by {}, but `{}` was found.",
168                    step.name(),
169                    expected_type.value.name,
170                    input_provider_name,
171                    input_type.type_.name
172                ));
173            }
174        }
175
176        if let Some(output_type) = &step.inner().output {
177            if !matches!(step, TransformOperator::Filter(_)) {
178                output_type
179                    .type_
180                    .value_type()
181                    .clone_into(&mut expected_type.value);
182                if let Some(key_ty) = output_type.type_.key_type() {
183                    expected_type.key = Some(key_ty.clone());
184                }
185            }
186
187            input_provider_name = format!("function `{}`", step.name());
188        }
189    }
190
191    if errors.any() {
192        Err(errors)
193    } else {
194        Ok(())
195    }
196}
197
#[cfg(test)]
mod test {
    use super::validate_transforms_steps;
    use crate::{
        metadata::io::topic::KVSchemaType,
        util::{sdf_types_map::SdfTypesMap, validation_error::ValidationError},
        wit::{
            dataflow::{TransformOperator, Transforms},
            io::TypeRef,
            metadata::{NamedParameter, Parameter, ParameterKind},
            operator::StepInvocation,
        },
    };

    /// Builds a `TypeRef` that names `name`.
    fn type_ref(name: &str) -> TypeRef {
        TypeRef {
            name: name.to_string(),
        }
    }

    /// Builds a required value parameter called `input` of type `type_name`.
    fn value_input(type_name: &str) -> NamedParameter {
        NamedParameter {
            name: "input".to_string(),
            type_: type_ref(type_name),
            optional: false,
            kind: ParameterKind::Value,
        }
    }

    /// Builds an output parameter of type `type_name`, with every other field defaulted.
    fn output_param(type_name: &str) -> Parameter {
        Parameter {
            type_: type_ref(type_name).into(),
            ..Default::default()
        }
    }

    /// Schema fed into the transforms in these tests: `bytes` key, `string` value.
    fn bytes_key_string_value() -> KVSchemaType {
        (Some(type_ref("bytes")), type_ref("string")).into()
    }

    /// Two-step transforms block used by the `delete_operator` tests.
    fn two_step_transforms() -> Transforms {
        Transforms {
            steps: vec![
                TransformOperator::FilterMap(StepInvocation {
                    uses: "listing_map_job".to_string(),
                    ..Default::default()
                }),
                TransformOperator::Map(StepInvocation {
                    uses: "job_map_prospect".to_string(),
                    ..Default::default()
                }),
            ],
        }
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_without_input_type() {
        let types = SdfTypesMap::default();
        let steps = vec![TransformOperator::Map(StepInvocation {
            uses: "my-function".to_string(),
            inputs: vec![],
            ..Default::default()
        })];

        let res = validate_transforms_steps(
            &steps,
            &types,
            bytes_key_string_value(),
            "topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid step");

        assert!(res.errors.contains(&ValidationError::new(
            "map type function `my-function` should have exactly 1 input type, found 0"
        )));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_when_first_input_type_does_not_match_passed_in_type(
    ) {
        let types = SdfTypesMap::default();
        let steps = vec![TransformOperator::Map(StepInvocation {
            uses: "my-function".to_string(),
            inputs: vec![value_input("u8")],
            ..Default::default()
        })];

        let res = validate_transforms_steps(
            &steps,
            &types,
            bytes_key_string_value(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid step");

        assert!(res.errors.contains(&ValidationError::new("Function `my-function` input type was expected to match `string` type provided by Topic `my-topic`, but `u8` was found.")));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_when_input_type_does_not_match_last_output_type(
    ) {
        let types = SdfTypesMap::default();
        let steps = vec![
            TransformOperator::Map(StepInvocation {
                uses: "my-function".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_param("u8")),
                ..Default::default()
            }),
            TransformOperator::Map(StepInvocation {
                uses: "my-other-function".to_string(),
                inputs: vec![value_input("string")],
                ..Default::default()
            }),
        ];

        let res = validate_transforms_steps(
            &steps,
            &types,
            bytes_key_string_value(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid input type");

        assert!(
            res.errors.contains(
                &ValidationError::new("Function `my-other-function` input type was expected to match `u8` type provided by function `my-function`, but `string` was found.")
            )
        );
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_when_output_type_does_not_exist() {
        let types = SdfTypesMap::default();
        let steps = vec![TransformOperator::Map(StepInvocation {
            uses: "my-function".to_string(),
            inputs: vec![value_input("string")],
            output: Some(output_param("foobar")),
            ..Default::default()
        })];

        let res = validate_transforms_steps(
            &steps,
            &types,
            bytes_key_string_value(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid output type");

        assert!(res.errors.contains(&ValidationError::new(
            "function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types"
        )));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_functions_with_invalid_signatures() {
        let types = SdfTypesMap::default();
        let steps = vec![TransformOperator::Filter(StepInvocation {
            uses: "my-function".to_string(),
            output: Some(Parameter {
                type_: type_ref("u8").into(),
                optional: true,
            }),
            ..Default::default()
        })];

        let res = validate_transforms_steps(
            &steps,
            &types,
            bytes_key_string_value(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid signature for function");

        assert!(res.errors.contains(&ValidationError::new(
            "filter type function `my-function` requires an output type of `bool`, but found `u8`"
        )));
    }

    #[test]
    fn test_validate_transforms_steps_ignores_filter_when_validating_next_input_type() {
        let types = SdfTypesMap::default();
        let steps = vec![
            TransformOperator::Filter(StepInvocation {
                uses: "my-filter".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_param("bool")),
                ..Default::default()
            }),
            TransformOperator::Map(StepInvocation {
                uses: "my-map".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_param("string")),
                ..Default::default()
            }),
        ];

        validate_transforms_steps(
            &steps,
            &types,
            bytes_key_string_value(),
            "Topic `my-topic`".to_string(),
        )
        .expect("should pass for valid transforms");
    }

    #[test]
    fn test_output_type_ignores_filter() {
        let transforms = Transforms {
            steps: vec![TransformOperator::Filter(StepInvocation {
                uses: "my-filter".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_param("bool")),
                ..Default::default()
            })],
        };

        let res = transforms
            .output_type(bytes_key_string_value())
            .expect("should pass for valid transforms");

        assert_eq!(res.value.name, "string".to_string());
    }

    #[test]
    fn test_delete_operator() {
        let mut transforms = two_step_transforms();

        let res = transforms.delete_operator(0);

        assert!(res.is_ok());
        assert_eq!(transforms.steps.len(), 1);
    }

    #[test]
    fn test_delete_operator_errors_on_index_out_of_bounds() {
        let mut transforms = two_step_transforms();

        let res = transforms.delete_operator(2);

        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
            "cannot delete operator from transforms block, index is out of bounds, len = 2"
        );
    }

    #[test]
    fn test_delete_operator_errors_on_empty_transforms() {
        let mut transforms = Transforms::default();

        let res = transforms.delete_operator(0);

        assert_eq!(
            res.unwrap_err().to_string(),
            "cannot delete operator from transforms block, index is out of bounds, len = 0"
        );
    }
}