streamweave_transformers/zip/
transformer.rs

1use super::zip_transformer::ZipTransformer;
2use async_stream;
3use async_trait::async_trait;
4use futures::StreamExt;
5use streamweave::Transformer;
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for ZipTransformer<T> {
10  type InputPorts = (Vec<T>,);
11  type OutputPorts = (Vec<T>,);
12
13  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
14    Box::pin(async_stream::stream! {
15      let mut input = input;
16      let mut buffers: Vec<Vec<T>> = Vec::new();
17
18      while let Some(items) = input.next().await {
19        buffers.push(items);
20      }
21
22      // Early return if no input
23      if buffers.is_empty() {
24        return;
25      }
26
27      // Get the length of the longest vector
28      let max_len = buffers.iter().map(|v| v.len()).max().unwrap_or(0);
29
30      // Yield transposed vectors
31      for i in 0..max_len {
32        let mut result = Vec::new();
33        for buffer in &buffers {
34          if let Some(item) = buffer.get(i) {
35            result.push(item.clone());
36          }
37        }
38        if !result.is_empty() {
39          yield result;
40        }
41      }
42    })
43  }
44
45  fn set_config_impl(&mut self, config: streamweave::TransformerConfig<Vec<T>>) {
46    self.config = config;
47  }
48
49  fn get_config_impl(&self) -> &streamweave::TransformerConfig<Vec<T>> {
50    &self.config
51  }
52
53  fn get_config_mut_impl(&mut self) -> &mut streamweave::TransformerConfig<Vec<T>> {
54    &mut self.config
55  }
56
57  fn handle_error(&self, error: &StreamError<Vec<T>>) -> ErrorAction {
58    match self.config.error_strategy {
59      ErrorStrategy::Stop => ErrorAction::Stop,
60      ErrorStrategy::Skip => ErrorAction::Skip,
61      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
62      _ => ErrorAction::Stop,
63    }
64  }
65
66  fn create_error_context(&self, item: Option<Vec<T>>) -> ErrorContext<Vec<T>> {
67    ErrorContext {
68      timestamp: chrono::Utc::now(),
69      item,
70      component_name: self
71        .config
72        .name
73        .clone()
74        .unwrap_or_else(|| "zip_transformer".to_string()),
75      component_type: std::any::type_name::<Self>().to_string(),
76    }
77  }
78
79  fn component_info(&self) -> ComponentInfo {
80    ComponentInfo {
81      name: self
82        .config
83        .name
84        .clone()
85        .unwrap_or_else(|| "zip_transformer".to_string()),
86      type_name: std::any::type_name::<Self>().to_string(),
87    }
88  }
89}
90
#[cfg(test)]
mod tests {
  use super::*;
  use futures::stream;
  use proptest::prelude::*;

  /// Builds a `StreamError` fixture that records `retries` previous attempts.
  ///
  /// Only `retries` matters to `handle_error`; the remaining fields are filler.
  fn test_error(retries: usize) -> StreamError<Vec<i32>> {
    StreamError {
      source: Box::new(std::io::Error::other("test error")),
      context: ErrorContext {
        timestamp: chrono::Utc::now(),
        item: None,
        component_name: "test".to_string(),
        component_type: "ZipTransformer".to_string(),
      },
      component: ComponentInfo {
        name: "test".to_string(),
        type_name: "ZipTransformer".to_string(),
      },
      retries,
    }
  }

  /// Equal-length vectors are transposed row by row.
  #[tokio::test]
  async fn test_zip_basic() {
    let mut transformer = ZipTransformer::new();
    let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(result, vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]]);
  }

  /// An empty input stream yields an empty output stream.
  #[tokio::test]
  async fn test_zip_empty_input() {
    let mut transformer = ZipTransformer::new();
    let input = stream::iter(Vec::<Vec<i32>>::new());
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(result, Vec::<Vec<i32>>::new());
  }

  /// The builder methods store the error strategy and name in the config.
  // Nothing is awaited here, so a plain synchronous test is sufficient.
  #[test]
  fn test_error_handling_strategies() {
    let transformer = ZipTransformer::new()
      .with_error_strategy(ErrorStrategy::<Vec<i32>>::Skip)
      .with_name("test_transformer".to_string());

    let config = transformer.get_config_impl();
    assert_eq!(config.error_strategy, ErrorStrategy::<Vec<i32>>::Skip);
    assert_eq!(config.name, Some("test_transformer".to_string()));
  }

  /// A fresh transformer has no name and uses the `Stop` strategy.
  #[test]
  fn test_zip_transformer_new() {
    let transformer = ZipTransformer::<i32>::new();

    assert_eq!(transformer.config().name(), None);
    assert!(matches!(
      transformer.config().error_strategy(),
      ErrorStrategy::Stop
    ));
  }

  /// `with_error_strategy` overrides the default strategy.
  #[test]
  fn test_zip_transformer_with_error_strategy() {
    let transformer = ZipTransformer::<i32>::new().with_error_strategy(ErrorStrategy::Skip);

    assert!(matches!(
      transformer.config().error_strategy(),
      ErrorStrategy::Skip
    ));
  }

  /// `with_name` stores the given name.
  #[test]
  fn test_zip_transformer_with_name() {
    let transformer = ZipTransformer::<i32>::new().with_name("test_zip".to_string());

    assert_eq!(transformer.config().name(), Some("test_zip".to_string()));
  }

  /// Vectors of different lengths produce progressively shorter rows.
  #[tokio::test]
  async fn test_zip_transformer_different_lengths() {
    let mut transformer = ZipTransformer::<i32>::new();
    let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5], vec![7, 8, 9, 10]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    // The transformer does not stop at the shortest vector: it keeps going up to
    // the longest one and emits partial rows once shorter vectors run out.
    assert_eq!(
      result,
      vec![
        vec![1, 4, 7], // First elements from each vector
        vec![2, 5, 8], // Second elements from each vector
        vec![3, 9],    // Third elements (only from vectors that have them)
        vec![10]       // Fourth element (only from the longest vector)
      ]
    );
  }

  /// A single input vector is split into one-element rows.
  #[tokio::test]
  async fn test_zip_transformer_single_vector() {
    let mut transformer = ZipTransformer::<i32>::new();
    let input = stream::iter(vec![vec![1, 2, 3]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(result, vec![vec![1], vec![2], vec![3]]);
  }

  /// Input consisting only of empty vectors yields no rows at all.
  #[tokio::test]
  async fn test_zip_transformer_empty_vectors() {
    let mut transformer = ZipTransformer::<i32>::new();
    let input = stream::iter(vec![vec![], vec![], vec![]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(result, Vec::<Vec<i32>>::new());
  }

  /// Empty vectors mixed with non-empty ones contribute nothing to any row.
  #[tokio::test]
  async fn test_zip_transformer_mixed_empty_vectors() {
    let mut transformer = ZipTransformer::<i32>::new();
    let input = stream::iter(vec![vec![1, 2], vec![], vec![3, 4]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(
      result,
      vec![
        vec![1, 3], // First elements from non-empty vectors
        vec![2, 4]  // Second elements from non-empty vectors
      ]
    );
  }

  /// Non-`Copy` element types are transposed the same way.
  #[tokio::test]
  async fn test_zip_transformer_strings() {
    let mut transformer = ZipTransformer::<String>::new();
    let input = stream::iter(vec![
      vec!["a".to_string(), "b".to_string()],
      vec!["c".to_string(), "d".to_string()],
      vec!["e".to_string(), "f".to_string()],
    ]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<String>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(
      result,
      vec![
        vec!["a".to_string(), "c".to_string(), "e".to_string()],
        vec!["b".to_string(), "d".to_string(), "f".to_string()]
      ]
    );
  }

  /// The transformer works for several numeric element types.
  #[tokio::test]
  async fn test_zip_transformer_different_types() {
    // Test with u64
    let mut transformer_u64 = ZipTransformer::<u64>::new();
    let input_u64 = stream::iter(vec![vec![1u64, 2u64], vec![3u64, 4u64]]);
    let boxed_input_u64 = Box::pin(input_u64);
    let result_u64: Vec<Vec<u64>> = transformer_u64.transform(boxed_input_u64).collect().await;
    assert_eq!(result_u64, vec![vec![1, 3], vec![2, 4]]);

    // Test with f64
    let mut transformer_f64 = ZipTransformer::<f64>::new();
    let input_f64 = stream::iter(vec![vec![1.5f64, 2.5f64], vec![3.5f64, 4.5f64]]);
    let boxed_input_f64 = Box::pin(input_f64);
    let result_f64: Vec<Vec<f64>> = transformer_f64.transform(boxed_input_f64).collect().await;
    assert_eq!(result_f64, vec![vec![1.5, 3.5], vec![2.5, 4.5]]);
  }

  /// Long vectors produce one row per index.
  #[tokio::test]
  async fn test_zip_transformer_very_long_vectors() {
    let mut transformer = ZipTransformer::<i32>::new();
    let input = stream::iter(vec![
      (0..1000).collect::<Vec<i32>>(),
      (1000..2000).collect::<Vec<i32>>(),
      (2000..3000).collect::<Vec<i32>>(),
    ]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    // Should have 1000 zipped vectors
    assert_eq!(result.len(), 1000);

    // Check first and last elements
    assert_eq!(result[0], vec![0, 1000, 2000]);
    assert_eq!(result[999], vec![999, 1999, 2999]);
  }

  /// `handle_error` follows the configured strategy, including retry exhaustion.
  #[test]
  fn test_zip_transformer_error_handling() {
    let transformer = ZipTransformer::<i32>::new();
    let error = test_error(0);

    // Test default error strategy (Stop)
    assert!(matches!(
      transformer.handle_error(&error),
      ErrorAction::Stop
    ));

    // Test Skip strategy
    let transformer = transformer.with_error_strategy(ErrorStrategy::Skip);
    assert!(matches!(
      transformer.handle_error(&error),
      ErrorAction::Skip
    ));

    // Test Retry strategy
    let transformer = transformer.with_error_strategy(ErrorStrategy::Retry(3));
    assert!(matches!(
      transformer.handle_error(&error),
      ErrorAction::Retry
    ));

    // Test Retry strategy exhausted: the retry count has reached the limit
    let error = test_error(3);
    assert!(matches!(
      transformer.handle_error(&error),
      ErrorAction::Stop
    ));
  }

  /// The error context carries the configured name, the type name and the item.
  #[test]
  fn test_zip_transformer_error_context_creation() {
    let transformer = ZipTransformer::<i32>::new().with_name("test_zip".to_string());

    let context = transformer.create_error_context(Some(vec![1, 2, 3]));
    assert_eq!(context.component_name, "test_zip");
    assert_eq!(
      context.component_type,
      std::any::type_name::<ZipTransformer<i32>>()
    );
    assert_eq!(context.item, Some(vec![1, 2, 3]));
  }

  /// `component_info` reports the configured name and the type name.
  #[test]
  fn test_zip_transformer_component_info() {
    let transformer = ZipTransformer::<i32>::new().with_name("test_zip".to_string());

    let info = transformer.component_info();
    assert_eq!(info.name, "test_zip");
    assert_eq!(info.type_name, std::any::type_name::<ZipTransformer<i32>>());
  }

  /// Without a configured name, `component_info` falls back to the default name.
  #[test]
  fn test_zip_transformer_default_name() {
    let transformer = ZipTransformer::<i32>::new();

    let info = transformer.component_info();
    assert_eq!(info.name, "zip_transformer");
  }

  /// Changes made through `config_mut` are visible through `config`.
  #[test]
  fn test_zip_transformer_config_mut() {
    let mut transformer = ZipTransformer::<i32>::new();
    transformer.config_mut().name = Some("mutated_name".to_string());

    assert_eq!(
      transformer.config().name(),
      Some("mutated_name".to_string())
    );
  }

  /// One transformer instance can process several independent streams.
  #[tokio::test]
  async fn test_zip_transformer_reuse() {
    let mut transformer = ZipTransformer::<i32>::new();

    // First use
    let input1 = stream::iter(vec![vec![1, 2], vec![3, 4]]);
    let boxed_input1 = Box::pin(input1);
    let result1: Vec<Vec<i32>> = transformer.transform(boxed_input1).collect().await;
    assert_eq!(result1, vec![vec![1, 3], vec![2, 4]]);

    // Second use
    let input2 = stream::iter(vec![vec![5, 6], vec![7, 8]]);
    let boxed_input2 = Box::pin(input2);
    let result2: Vec<Vec<i32>> = transformer.transform(boxed_input2).collect().await;
    assert_eq!(result2, vec![vec![5, 7], vec![6, 8]]);
  }

  /// A very long vector next to a short one yields mostly single-element rows.
  #[tokio::test]
  async fn test_zip_transformer_edge_cases() {
    let mut transformer = ZipTransformer::<i32>::new();

    // Test with one very long vector and one short vector
    let input = stream::iter(vec![(0..100).collect::<Vec<i32>>(), vec![100, 101]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    // Should have 100 zipped vectors (the length of the longest vector)
    assert_eq!(result.len(), 100);
    assert_eq!(result[0], vec![0, 100]);
    assert_eq!(result[1], vec![1, 101]);
    // After index 1, the second vector has no more elements
    assert_eq!(result[2], vec![2]);
    assert_eq!(result[99], vec![99]);
  }

  /// Elements within a row keep the arrival order of their source vectors.
  #[tokio::test]
  async fn test_zip_transformer_deterministic_ordering() {
    let mut transformer = ZipTransformer::<i32>::new();

    // Test that zipping maintains order
    let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    let boxed_input = Box::pin(input);

    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    // Should maintain the order of vectors in the input
    assert_eq!(
      result,
      vec![
        vec![1, 4, 7], // First elements from each vector
        vec![2, 5, 8], // Second elements from each vector
        vec![3, 6, 9]  // Third elements from each vector
      ]
    );
  }

  // Property-based tests using proptest
  proptest! {
    // Any name can be stored, and setting it leaves the default strategy alone.
    #[test]
    fn test_zip_transformer_properties(
      name in ".*"
    ) {
      let transformer = ZipTransformer::<i32>::new()
        .with_name(name.clone());

      assert_eq!(transformer.config().name(), Some(name));
      assert!(matches!(
        transformer.config().error_strategy(),
        ErrorStrategy::Stop
      ));
    }

    // Skip always skips; Retry(5) retries only while fewer than 5 retries happened.
    #[test]
    fn test_zip_transformer_error_strategies(
      retry_count in 0..10usize
    ) {
      let transformer = ZipTransformer::<i32>::new();
      let error = test_error(retry_count);

      // Test different error strategies
      let transformer_skip = transformer.clone().with_error_strategy(ErrorStrategy::Skip);
      let transformer_retry = transformer.clone().with_error_strategy(ErrorStrategy::Retry(5));

      assert!(matches!(
        transformer_skip.handle_error(&error),
        ErrorAction::Skip
      ));

      if retry_count < 5 {
        assert!(matches!(
          transformer_retry.handle_error(&error),
          ErrorAction::Retry
        ));
      } else {
        assert!(matches!(
          transformer_retry.handle_error(&error),
          ErrorAction::Stop
        ));
      }
    }

    // Name and strategy set through the builder are both kept.
    #[test]
    fn test_zip_transformer_config_persistence(
      name in ".*"
    ) {
      let transformer = ZipTransformer::<i32>::new()
        .with_name(name.clone())
        .with_error_strategy(ErrorStrategy::Skip);

      assert_eq!(transformer.config().name(), Some(name));
      assert!(matches!(
        transformer.config().error_strategy(),
        ErrorStrategy::Skip
      ));
    }

    // Zipping a rectangular matrix yields its transpose.
    #[test]
    fn test_zip_transformer_vector_processing(
      vector_count in 0..10usize,
      elements_per_vector in 0..10usize
    ) {
      // Vector `v` holds the values `v * elements_per_vector + e`, so every cell
      // is unique and a misplaced element cannot go unnoticed.
      let vectors: Vec<Vec<usize>> = (0..vector_count)
        .map(|v| {
          (0..elements_per_vector)
            .map(|e| v * elements_per_vector + e)
            .collect()
        })
        .collect();

      // Without any input vector no row is emitted at all; otherwise row `e`
      // gathers element `e` of every vector, in vector order.
      let expected: Vec<Vec<usize>> = if vector_count == 0 {
        Vec::new()
      } else {
        (0..elements_per_vector)
          .map(|e| {
            (0..vector_count)
              .map(|v| v * elements_per_vector + e)
              .collect()
          })
          .collect()
      };

      // proptest bodies are synchronous, so drive the stream on a local runtime.
      let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("building a current-thread runtime should not fail");
      let mut transformer = ZipTransformer::<usize>::new();
      let boxed_input = Box::pin(stream::iter(vectors));
      let result: Vec<Vec<usize>> =
        runtime.block_on(transformer.transform(boxed_input).collect());

      assert_eq!(result, expected);
    }
  }

  /// Ragged input is processed up to the length of the longest vector.
  #[tokio::test]
  async fn test_zip_transformer_stream_processing() {
    let mut transformer = ZipTransformer::<i32>::new();

    // Process a stream with various vector patterns
    let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5], vec![6, 7, 8, 9]]);
    let boxed_input = Box::pin(input);
    let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;

    // Should have 4 zipped vectors (the length of the longest vector)
    assert_eq!(result.len(), 4);
    assert_eq!(result[0], vec![1, 4, 6]);
    assert_eq!(result[1], vec![2, 5, 7]);
    assert_eq!(result[2], vec![3, 8]); // Second vector has no third element
    assert_eq!(result[3], vec![9]); // Only the longest vector has a fourth element
  }

  /// Elements that are themselves vectors are moved around as opaque values.
  #[tokio::test]
  async fn test_zip_transformer_nested_vectors() {
    let mut transformer = ZipTransformer::<Vec<i32>>::new();

    // Test zipping vectors of vectors
    let input = stream::iter(vec![
      vec![vec![1, 2], vec![3, 4]],
      vec![vec![5, 6], vec![7, 8]],
      vec![vec![9, 10], vec![11, 12]],
    ]);
    let boxed_input = Box::pin(input);
    let result: Vec<Vec<Vec<i32>>> = transformer.transform(boxed_input).collect().await;

    assert_eq!(
      result,
      vec![
        vec![vec![1, 2], vec![5, 6], vec![9, 10]],
        vec![vec![3, 4], vec![7, 8], vec![11, 12]]
      ]
    );
  }
}