1use super::zip_transformer::ZipTransformer;
2use async_stream;
3use async_trait::async_trait;
4use futures::StreamExt;
5use streamweave::Transformer;
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Transformer for ZipTransformer<T> {
10 type InputPorts = (Vec<T>,);
11 type OutputPorts = (Vec<T>,);
12
13 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
14 Box::pin(async_stream::stream! {
15 let mut input = input;
16 let mut buffers: Vec<Vec<T>> = Vec::new();
17
18 while let Some(items) = input.next().await {
19 buffers.push(items);
20 }
21
22 if buffers.is_empty() {
24 return;
25 }
26
27 let max_len = buffers.iter().map(|v| v.len()).max().unwrap_or(0);
29
30 for i in 0..max_len {
32 let mut result = Vec::new();
33 for buffer in &buffers {
34 if let Some(item) = buffer.get(i) {
35 result.push(item.clone());
36 }
37 }
38 if !result.is_empty() {
39 yield result;
40 }
41 }
42 })
43 }
44
45 fn set_config_impl(&mut self, config: streamweave::TransformerConfig<Vec<T>>) {
46 self.config = config;
47 }
48
49 fn get_config_impl(&self) -> &streamweave::TransformerConfig<Vec<T>> {
50 &self.config
51 }
52
53 fn get_config_mut_impl(&mut self) -> &mut streamweave::TransformerConfig<Vec<T>> {
54 &mut self.config
55 }
56
57 fn handle_error(&self, error: &StreamError<Vec<T>>) -> ErrorAction {
58 match self.config.error_strategy {
59 ErrorStrategy::Stop => ErrorAction::Stop,
60 ErrorStrategy::Skip => ErrorAction::Skip,
61 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
62 _ => ErrorAction::Stop,
63 }
64 }
65
66 fn create_error_context(&self, item: Option<Vec<T>>) -> ErrorContext<Vec<T>> {
67 ErrorContext {
68 timestamp: chrono::Utc::now(),
69 item,
70 component_name: self
71 .config
72 .name
73 .clone()
74 .unwrap_or_else(|| "zip_transformer".to_string()),
75 component_type: std::any::type_name::<Self>().to_string(),
76 }
77 }
78
79 fn component_info(&self) -> ComponentInfo {
80 ComponentInfo {
81 name: self
82 .config
83 .name
84 .clone()
85 .unwrap_or_else(|| "zip_transformer".to_string()),
86 type_name: std::any::type_name::<Self>().to_string(),
87 }
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use super::*;
94 use futures::stream;
95 use proptest::prelude::*;
96
97 #[tokio::test]
98 async fn test_zip_basic() {
99 let mut transformer = ZipTransformer::new();
100 let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
101 let boxed_input = Box::pin(input);
102
103 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
104
105 assert_eq!(result, vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]]);
106 }
107
108 #[tokio::test]
109 async fn test_zip_empty_input() {
110 let mut transformer = ZipTransformer::new();
111 let input = stream::iter(Vec::<Vec<i32>>::new());
112 let boxed_input = Box::pin(input);
113
114 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
115
116 assert_eq!(result, Vec::<Vec<i32>>::new());
117 }
118
119 #[tokio::test]
120 async fn test_error_handling_strategies() {
121 let transformer = ZipTransformer::new()
122 .with_error_strategy(ErrorStrategy::<Vec<i32>>::Skip)
123 .with_name("test_transformer".to_string());
124
125 let config = transformer.get_config_impl();
126 assert_eq!(config.error_strategy, ErrorStrategy::<Vec<i32>>::Skip);
127 assert_eq!(config.name, Some("test_transformer".to_string()));
128 }
129
130 #[test]
131 fn test_zip_transformer_new() {
132 let transformer = ZipTransformer::<i32>::new();
133
134 assert_eq!(transformer.config().name(), None);
135 assert!(matches!(
136 transformer.config().error_strategy(),
137 ErrorStrategy::Stop
138 ));
139 }
140
141 #[test]
142 fn test_zip_transformer_with_error_strategy() {
143 let transformer = ZipTransformer::<i32>::new().with_error_strategy(ErrorStrategy::Skip);
144
145 assert!(matches!(
146 transformer.config().error_strategy(),
147 ErrorStrategy::Skip
148 ));
149 }
150
151 #[test]
152 fn test_zip_transformer_with_name() {
153 let transformer = ZipTransformer::<i32>::new().with_name("test_zip".to_string());
154
155 assert_eq!(transformer.config().name(), Some("test_zip".to_string()));
156 }
157
158 #[tokio::test]
159 async fn test_zip_transformer_different_lengths() {
160 let mut transformer = ZipTransformer::<i32>::new();
161 let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5], vec![7, 8, 9, 10]]);
162 let boxed_input = Box::pin(input);
163
164 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
165
166 assert_eq!(
169 result,
170 vec![
171 vec![1, 4, 7], vec![2, 5, 8], vec![3, 9], vec![10] ]
176 );
177 }
178
179 #[tokio::test]
180 async fn test_zip_transformer_single_vector() {
181 let mut transformer = ZipTransformer::<i32>::new();
182 let input = stream::iter(vec![vec![1, 2, 3]]);
183 let boxed_input = Box::pin(input);
184
185 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
186
187 assert_eq!(result, vec![vec![1], vec![2], vec![3]]);
188 }
189
190 #[tokio::test]
191 async fn test_zip_transformer_empty_vectors() {
192 let mut transformer = ZipTransformer::<i32>::new();
193 let input = stream::iter(vec![vec![], vec![], vec![]]);
194 let boxed_input = Box::pin(input);
195
196 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
197
198 assert_eq!(result, Vec::<Vec<i32>>::new());
199 }
200
201 #[tokio::test]
202 async fn test_zip_transformer_mixed_empty_vectors() {
203 let mut transformer = ZipTransformer::<i32>::new();
204 let input = stream::iter(vec![vec![1, 2], vec![], vec![3, 4]]);
205 let boxed_input = Box::pin(input);
206
207 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
208
209 assert_eq!(
212 result,
213 vec![
214 vec![1, 3], vec![2, 4] ]
217 );
218 }
219
220 #[tokio::test]
221 async fn test_zip_transformer_strings() {
222 let mut transformer = ZipTransformer::<String>::new();
223 let input = stream::iter(vec![
224 vec!["a".to_string(), "b".to_string()],
225 vec!["c".to_string(), "d".to_string()],
226 vec!["e".to_string(), "f".to_string()],
227 ]);
228 let boxed_input = Box::pin(input);
229
230 let result: Vec<Vec<String>> = transformer.transform(boxed_input).collect().await;
231
232 assert_eq!(
233 result,
234 vec![
235 vec!["a".to_string(), "c".to_string(), "e".to_string()],
236 vec!["b".to_string(), "d".to_string(), "f".to_string()]
237 ]
238 );
239 }
240
241 #[tokio::test]
242 async fn test_zip_transformer_different_types() {
243 let mut transformer_u64 = ZipTransformer::<u64>::new();
245 let input_u64 = stream::iter(vec![vec![1u64, 2u64], vec![3u64, 4u64]]);
246 let boxed_input_u64 = Box::pin(input_u64);
247 let result_u64: Vec<Vec<u64>> = transformer_u64.transform(boxed_input_u64).collect().await;
248 assert_eq!(result_u64, vec![vec![1, 3], vec![2, 4]]);
249
250 let mut transformer_f64 = ZipTransformer::<f64>::new();
252 let input_f64 = stream::iter(vec![vec![1.5f64, 2.5f64], vec![3.5f64, 4.5f64]]);
253 let boxed_input_f64 = Box::pin(input_f64);
254 let result_f64: Vec<Vec<f64>> = transformer_f64.transform(boxed_input_f64).collect().await;
255 assert_eq!(result_f64, vec![vec![1.5, 3.5], vec![2.5, 4.5]]);
256 }
257
258 #[tokio::test]
259 async fn test_zip_transformer_very_long_vectors() {
260 let mut transformer = ZipTransformer::<i32>::new();
261 let input = stream::iter(vec![
262 (0..1000).collect::<Vec<i32>>(),
263 (1000..2000).collect::<Vec<i32>>(),
264 (2000..3000).collect::<Vec<i32>>(),
265 ]);
266 let boxed_input = Box::pin(input);
267
268 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
269
270 assert_eq!(result.len(), 1000);
272
273 assert_eq!(result[0], vec![0, 1000, 2000]);
275 assert_eq!(result[999], vec![999, 1999, 2999]);
276 }
277
278 #[test]
279 fn test_zip_transformer_error_handling() {
280 let transformer = ZipTransformer::<i32>::new();
281
282 let error = StreamError {
283 source: Box::new(std::io::Error::other("test error")),
284 context: ErrorContext {
285 timestamp: chrono::Utc::now(),
286 item: None,
287 component_name: "test".to_string(),
288 component_type: "ZipTransformer".to_string(),
289 },
290 component: ComponentInfo {
291 name: "test".to_string(),
292 type_name: "ZipTransformer".to_string(),
293 },
294 retries: 0,
295 };
296
297 assert!(matches!(
299 transformer.handle_error(&error),
300 ErrorAction::Stop
301 ));
302
303 let transformer = transformer.with_error_strategy(ErrorStrategy::Skip);
305 assert!(matches!(
306 transformer.handle_error(&error),
307 ErrorAction::Skip
308 ));
309
310 let transformer = transformer.with_error_strategy(ErrorStrategy::Retry(3));
312 assert!(matches!(
313 transformer.handle_error(&error),
314 ErrorAction::Retry
315 ));
316
317 let error = StreamError {
319 source: Box::new(std::io::Error::other("test error")),
320 context: ErrorContext {
321 timestamp: chrono::Utc::now(),
322 item: None,
323 component_name: "test".to_string(),
324 component_type: "ZipTransformer".to_string(),
325 },
326 component: ComponentInfo {
327 name: "test".to_string(),
328 type_name: "ZipTransformer".to_string(),
329 },
330 retries: 3,
331 };
332 assert!(matches!(
333 transformer.handle_error(&error),
334 ErrorAction::Stop
335 ));
336 }
337
338 #[test]
339 fn test_zip_transformer_error_context_creation() {
340 let transformer = ZipTransformer::<i32>::new().with_name("test_zip".to_string());
341
342 let context = transformer.create_error_context(Some(vec![1, 2, 3]));
343 assert_eq!(context.component_name, "test_zip");
344 assert_eq!(
345 context.component_type,
346 std::any::type_name::<ZipTransformer<i32>>()
347 );
348 assert_eq!(context.item, Some(vec![1, 2, 3]));
349 }
350
351 #[test]
352 fn test_zip_transformer_component_info() {
353 let transformer = ZipTransformer::<i32>::new().with_name("test_zip".to_string());
354
355 let info = transformer.component_info();
356 assert_eq!(info.name, "test_zip");
357 assert_eq!(info.type_name, std::any::type_name::<ZipTransformer<i32>>());
358 }
359
360 #[test]
361 fn test_zip_transformer_default_name() {
362 let transformer = ZipTransformer::<i32>::new();
363
364 let info = transformer.component_info();
365 assert_eq!(info.name, "zip_transformer");
366 }
367
368 #[test]
369 fn test_zip_transformer_config_mut() {
370 let mut transformer = ZipTransformer::<i32>::new();
371 transformer.config_mut().name = Some("mutated_name".to_string());
372
373 assert_eq!(
374 transformer.config().name(),
375 Some("mutated_name".to_string())
376 );
377 }
378
379 #[tokio::test]
380 async fn test_zip_transformer_reuse() {
381 let mut transformer = ZipTransformer::<i32>::new();
382
383 let input1 = stream::iter(vec![vec![1, 2], vec![3, 4]]);
385 let boxed_input1 = Box::pin(input1);
386 let result1: Vec<Vec<i32>> = transformer.transform(boxed_input1).collect().await;
387 assert_eq!(result1, vec![vec![1, 3], vec![2, 4]]);
388
389 let input2 = stream::iter(vec![vec![5, 6], vec![7, 8]]);
391 let boxed_input2 = Box::pin(input2);
392 let result2: Vec<Vec<i32>> = transformer.transform(boxed_input2).collect().await;
393 assert_eq!(result2, vec![vec![5, 7], vec![6, 8]]);
394 }
395
396 #[tokio::test]
397 async fn test_zip_transformer_edge_cases() {
398 let mut transformer = ZipTransformer::<i32>::new();
399
400 let input = stream::iter(vec![(0..100).collect::<Vec<i32>>(), vec![100, 101]]);
402 let boxed_input = Box::pin(input);
403
404 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
405
406 assert_eq!(result.len(), 100);
408 assert_eq!(result[0], vec![0, 100]);
409 assert_eq!(result[1], vec![1, 101]);
410 assert_eq!(result[2], vec![2]);
412 assert_eq!(result[99], vec![99]);
413 }
414
415 #[tokio::test]
416 async fn test_zip_transformer_deterministic_ordering() {
417 let mut transformer = ZipTransformer::<i32>::new();
418
419 let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
421 let boxed_input = Box::pin(input);
422
423 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
424
425 assert_eq!(
427 result,
428 vec![
429 vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9] ]
433 );
434 }
435
436 proptest! {
438 #[test]
439 fn test_zip_transformer_properties(
440 name in ".*"
441 ) {
442 let transformer = ZipTransformer::<i32>::new()
443 .with_name(name.clone());
444
445 assert_eq!(transformer.config().name(), Some(name));
446 assert!(matches!(
447 transformer.config().error_strategy(),
448 ErrorStrategy::Stop
449 ));
450 }
451
452 #[test]
453 fn test_zip_transformer_error_strategies(
454 retry_count in 0..10usize
455 ) {
456 let transformer = ZipTransformer::<i32>::new();
457
458 let error = StreamError {
459 source: Box::new(std::io::Error::other("property test error")),
460 context: ErrorContext {
461 timestamp: chrono::Utc::now(),
462 item: None,
463 component_name: "property_test".to_string(),
464 component_type: "ZipTransformer".to_string(),
465 },
466 component: ComponentInfo {
467 name: "property_test".to_string(),
468 type_name: "ZipTransformer".to_string(),
469 },
470 retries: retry_count,
471 };
472
473 let transformer_skip = transformer.clone().with_error_strategy(ErrorStrategy::Skip);
475 let transformer_retry = transformer.clone().with_error_strategy(ErrorStrategy::Retry(5));
476
477 assert!(matches!(
478 transformer_skip.handle_error(&error),
479 ErrorAction::Skip
480 ));
481
482 if retry_count < 5 {
483 assert!(matches!(
484 transformer_retry.handle_error(&error),
485 ErrorAction::Retry
486 ));
487 } else {
488 assert!(matches!(
489 transformer_retry.handle_error(&error),
490 ErrorAction::Stop
491 ));
492 }
493 }
494
495 #[test]
496 fn test_zip_transformer_config_persistence(
497 name in ".*"
498 ) {
499 let transformer = ZipTransformer::<i32>::new()
500 .with_name(name.clone())
501 .with_error_strategy(ErrorStrategy::Skip);
502
503 assert_eq!(transformer.config().name(), Some(name));
504 assert!(matches!(
505 transformer.config().error_strategy(),
506 ErrorStrategy::Skip
507 ));
508 }
509
510 #[test]
511 fn test_zip_transformer_vector_processing(
512 _vector_count in 0..10usize,
513 _elements_per_vector in 0..10usize
514 ) {
515 let transformer = ZipTransformer::<i32>::new();
517 assert_eq!(transformer.config().name(), None);
518 assert!(matches!(
519 transformer.config().error_strategy(),
520 ErrorStrategy::Stop
521 ));
522 }
523 }
524
525 #[tokio::test]
526 async fn test_zip_transformer_stream_processing() {
527 let mut transformer = ZipTransformer::<i32>::new();
528
529 let input = stream::iter(vec![vec![1, 2, 3], vec![4, 5], vec![6, 7, 8, 9]]);
531 let boxed_input = Box::pin(input);
532 let result: Vec<Vec<i32>> = transformer.transform(boxed_input).collect().await;
533
534 assert_eq!(result.len(), 4);
536 assert_eq!(result[0], vec![1, 4, 6]);
537 assert_eq!(result[1], vec![2, 5, 7]);
538 assert_eq!(result[2], vec![3, 8]); assert_eq!(result[3], vec![9]); }
541
542 #[tokio::test]
543 async fn test_zip_transformer_nested_vectors() {
544 let mut transformer = ZipTransformer::<Vec<i32>>::new();
545
546 let input = stream::iter(vec![
548 vec![vec![1, 2], vec![3, 4]],
549 vec![vec![5, 6], vec![7, 8]],
550 vec![vec![9, 10], vec![11, 12]],
551 ]);
552 let boxed_input = Box::pin(input);
553 let result: Vec<Vec<Vec<i32>>> = transformer.transform(boxed_input).collect().await;
554
555 assert_eq!(
556 result,
557 vec![
558 vec![vec![1, 2], vec![5, 6], vec![9, 10]],
559 vec![vec![3, 4], vec![7, 8], vec![11, 12]]
560 ]
561 );
562 }
563}