1use anyhow::{Result, anyhow};
2
3use crate::{
4 metadata::io::topic::KVSchemaType,
5 util::{
6 sdf_types_map::SdfTypesMap, validation_error::ValidationError,
7 validation_failure::ValidationFailure,
8 },
9 wit::dataflow::{TransformOperator, Transforms},
10 wit::package_interface::{StepInvocation, OperatorType},
11};
12
13#[allow(clippy::derivable_impls)]
14impl Default for Transforms {
15 fn default() -> Self {
16 Self { steps: vec![] }
17 }
18}
19
20impl Transforms {
21 pub(crate) fn insert_operator(
22 &mut self,
23 index: Option<usize>,
24 operator_type: OperatorType,
25 step_invocation: StepInvocation,
26 ) -> Result<()> {
27 let index = match index {
28 Some(index) => index,
29 None => {
30 return Err(anyhow!(
31 "Must provide transforms index to insert operator into transforms block"
32 ));
33 }
34 };
35
36 if index > self.steps.len() {
37 return Err(anyhow!(
38 "cannot insert operator into transforms block, index is out of bounds, len = {}",
39 self.steps.len()
40 ));
41 }
42
43 let operator = match TransformOperator::new(operator_type, step_invocation) {
44 Some(operator) => operator,
45 None => {
46 return Err(anyhow!(
47 "OperatorType {:?} not supported for transforms operator",
48 operator_type
49 ));
50 }
51 };
52
53 self.steps.insert(index, operator);
54
55 Ok(())
56 }
57
58 pub(crate) fn delete_operator(&mut self, index: usize) -> Result<()> {
59 if index >= self.steps.len() {
60 return Err(anyhow!(
61 "cannot delete operator from transforms block, index is out of bounds, len = {}",
62 self.steps.len()
63 ));
64 }
65
66 self.steps.remove(index);
67
68 Ok(())
69 }
70
71 pub fn output_type(
73 &self,
74 mut transform_input_type: KVSchemaType,
75 ) -> Result<KVSchemaType, ValidationError> {
76 let failure_message = Err(ValidationError::new(
77 "could not get output type from invalid transforms",
78 ));
79
80 for step in &self.steps {
81 let step_invocation = step.inner();
82
83 let value = if step_invocation.requires_key_param() {
84 if step_invocation.inputs.is_empty() {
85 return failure_message;
86 }
87
88 step_invocation.inputs.get(1)
89 } else {
90 step_invocation.inputs.first()
91 };
92
93 if let Some(input_type) = value {
94 if input_type.type_.name.replace('-', "_")
95 != transform_input_type.value.name.replace('-', "_")
96 {
97 return failure_message;
98 }
99 } else {
100 return failure_message;
101 }
102
103 if let Some(output_type) = &step_invocation.output {
104 if !matches!(&step, TransformOperator::Filter(_)) {
105 output_type
106 .type_
107 .value_type()
108 .clone_into(&mut transform_input_type.value);
109
110 if let Some(key_type) = output_type.type_.key_type() {
111 transform_input_type.key = Some(key_type.clone());
112 }
113 }
114 }
115 }
116
117 Ok(transform_input_type)
118 }
119}
120
121pub fn validate_transforms_steps(
122 steps: &[TransformOperator],
123 types: &SdfTypesMap,
124 mut expected_type: KVSchemaType,
125 mut input_provider_name: String,
126) -> Result<(), ValidationFailure> {
127 let mut errors = ValidationFailure::new();
128
129 for step in steps {
130 if let Err(function_error) = step.validate(types) {
131 errors.concat(&function_error);
132 }
133 let step_invocation = step.inner();
134
135 let value = if step_invocation.requires_key_param() {
136 if let Some(input_key) = step_invocation.inputs.first() {
137 if let Some(ref key) = expected_type.key {
138 if input_key.type_.name.replace('-', "_") != key.name.replace('-', "_") {
139 errors.push_str(&format!(
140 "in `{}`, key type does not match expected key type. {} != {}",
141 step_invocation.uses, input_key.type_.name, key.name
142 ));
143 }
144 } else {
145 errors.push_str(
146 &format!(
147 "{} function requires a key, but none was found. Make sure that you define the right key in the topic configuration",
148 step_invocation.uses
149 )
150 );
151 }
152 } else {
153 errors.push_str(&format!(
154 "map type function `{}` should have at least 1 input type, found 0",
155 step.name()
156 ));
157 }
158 step_invocation.inputs.get(1)
159 } else {
160 step_invocation.inputs.first()
161 };
162
163 if let Some(input_type) = value {
164 if input_type.type_.name.replace('-', "_") != expected_type.value.name.replace('-', "_")
165 {
166 errors.push_str(&format!(
167 "Function `{}` input type was expected to match `{}` type provided by {}, but `{}` was found.",
168 step.name(),
169 expected_type.value.name,
170 input_provider_name,
171 input_type.type_.name
172 ));
173 }
174 }
175
176 if let Some(output_type) = &step.inner().output {
177 if !matches!(step, TransformOperator::Filter(_)) {
178 output_type
179 .type_
180 .value_type()
181 .clone_into(&mut expected_type.value);
182 if let Some(key_ty) = output_type.type_.key_type() {
183 expected_type.key = Some(key_ty.clone());
184 }
185 }
186
187 input_provider_name = format!("function `{}`", step.name());
188 }
189 }
190
191 if errors.any() {
192 Err(errors)
193 } else {
194 Ok(())
195 }
196}
197
#[cfg(test)]
mod test {
    use super::validate_transforms_steps;
    use crate::{
        util::{sdf_types_map::SdfTypesMap, validation_error::ValidationError},
        wit::{
            dataflow::{TransformOperator, Transforms},
            io::TypeRef,
            metadata::{NamedParameter, Parameter, ParameterKind},
            operator::StepInvocation,
        },
    };

    /// Builds a reference to the type called `name`.
    fn type_ref(name: &str) -> TypeRef {
        TypeRef {
            name: name.to_string(),
        }
    }

    /// Builds a required value parameter named `input` of type `type_name`.
    fn value_input(type_name: &str) -> NamedParameter {
        NamedParameter {
            name: "input".to_string(),
            type_: type_ref(type_name),
            optional: false,
            kind: ParameterKind::Value,
        }
    }

    /// Builds an output parameter of type `type_name`; all other fields keep
    /// their default values.
    fn output_of(type_name: &str) -> Parameter {
        Parameter {
            type_: type_ref(type_name).into(),
            ..Default::default()
        }
    }

    /// Key/value pair used as the starting type in every test: a `bytes` key
    /// and a `string` value. Callers convert it with `.into()`.
    fn bytes_key_string_value() -> (Option<TypeRef>, TypeRef) {
        (Some(type_ref("bytes")), type_ref("string"))
    }

    /// Transforms block with two steps, shared by the `delete_operator` tests.
    fn two_step_transforms() -> Transforms {
        Transforms {
            steps: vec![
                TransformOperator::FilterMap(StepInvocation {
                    uses: "listing_map_job".to_string(),
                    ..Default::default()
                }),
                TransformOperator::Map(StepInvocation {
                    uses: "job_map_prospect".to_string(),
                    ..Default::default()
                }),
            ],
        }
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_without_input_type() {
        let steps = vec![TransformOperator::Map(StepInvocation {
            uses: "my-function".to_string(),
            inputs: Vec::new(),
            ..Default::default()
        })];

        let failure = validate_transforms_steps(
            &steps,
            &SdfTypesMap::default(),
            bytes_key_string_value().into(),
            "topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid step");

        let expected = ValidationError::new(
            "map type function `my-function` should have exactly 1 input type, found 0",
        );
        assert!(failure.errors.contains(&expected));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_when_first_input_type_does_not_match_passed_in_type(
    ) {
        let steps = vec![TransformOperator::Map(StepInvocation {
            uses: "my-function".to_string(),
            inputs: vec![value_input("u8")],
            ..Default::default()
        })];

        let failure = validate_transforms_steps(
            &steps,
            &SdfTypesMap::default(),
            bytes_key_string_value().into(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid step");

        let expected = ValidationError::new("Function `my-function` input type was expected to match `string` type provided by Topic `my-topic`, but `u8` was found.");
        assert!(failure.errors.contains(&expected));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_when_input_type_does_not_match_last_output_type(
    ) {
        // The first step turns `string` into `u8`, so the second step's
        // `string` input no longer matches.
        let steps = vec![
            TransformOperator::Map(StepInvocation {
                uses: "my-function".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_of("u8")),
                ..Default::default()
            }),
            TransformOperator::Map(StepInvocation {
                uses: "my-other-function".to_string(),
                inputs: vec![value_input("string")],
                ..Default::default()
            }),
        ];

        let failure = validate_transforms_steps(
            &steps,
            &SdfTypesMap::default(),
            bytes_key_string_value().into(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid input type");

        let expected = ValidationError::new("Function `my-other-function` input type was expected to match `u8` type provided by function `my-function`, but `string` was found.");
        assert!(failure.errors.contains(&expected));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_operators_when_output_type_does_not_exist() {
        let steps = vec![TransformOperator::Map(StepInvocation {
            uses: "my-function".to_string(),
            inputs: vec![value_input("string")],
            output: Some(output_of("foobar")),
            ..Default::default()
        })];

        let failure = validate_transforms_steps(
            &steps,
            &SdfTypesMap::default(),
            bytes_key_string_value().into(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid output type");

        let expected = ValidationError::new(
            "function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types",
        );
        assert!(failure.errors.contains(&expected));
    }

    #[test]
    fn test_validate_transforms_steps_rejects_functions_with_invalid_signatures() {
        let steps = vec![TransformOperator::Filter(StepInvocation {
            uses: "my-function".to_string(),
            output: Some(Parameter {
                type_: type_ref("u8").into(),
                optional: true,
            }),
            ..Default::default()
        })];

        let failure = validate_transforms_steps(
            &steps,
            &SdfTypesMap::default(),
            bytes_key_string_value().into(),
            "Topic `my-topic`".to_string(),
        )
        .expect_err("should error for invalid signature for function");

        let expected = ValidationError::new(
            "filter type function `my-function` requires an output type of `bool`, but found `u8`",
        );
        assert!(failure.errors.contains(&expected));
    }

    #[test]
    fn test_validate_transforms_steps_ignores_filter_when_validating_next_input_type() {
        // The filter outputs `bool`, yet the map that follows must still be
        // checked against the filter's `string` input.
        let steps = vec![
            TransformOperator::Filter(StepInvocation {
                uses: "my-filter".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_of("bool")),
                ..Default::default()
            }),
            TransformOperator::Map(StepInvocation {
                uses: "my-map".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_of("string")),
                ..Default::default()
            }),
        ];

        validate_transforms_steps(
            &steps,
            &SdfTypesMap::default(),
            bytes_key_string_value().into(),
            "Topic `my-topic`".to_string(),
        )
        .expect("should pass for valid transforms");
    }

    #[test]
    fn test_output_type_ignores_filter() {
        let transforms = Transforms {
            steps: vec![TransformOperator::Filter(StepInvocation {
                uses: "my-filter".to_string(),
                inputs: vec![value_input("string")],
                output: Some(output_of("bool")),
                ..Default::default()
            })],
        };

        let resulting_type = transforms
            .output_type(bytes_key_string_value().into())
            .expect("should pass for valid transforms");

        assert_eq!(resulting_type.value.name, "string".to_string());
    }

    #[test]
    fn test_delete_operator() {
        let mut transforms = two_step_transforms();

        let outcome = transforms.delete_operator(0);

        assert!(outcome.is_ok());
        assert_eq!(transforms.steps.len(), 1);
    }

    #[test]
    fn test_delete_operator_errors_on_index_out_of_bounds() {
        let mut transforms = two_step_transforms();

        // Index 2 is one past the last valid position of a two-step block.
        let outcome = transforms.delete_operator(2);

        assert!(outcome.is_err());
        assert_eq!(
            outcome.unwrap_err().to_string(),
            "cannot delete operator from transforms block, index is out of bounds, len = 2"
        );
    }
}