1use super::split_at_transformer::SplitAtTransformer;
2use async_trait::async_trait;
3use futures::{Stream, StreamExt};
4use std::pin::Pin;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T> Transformer for SplitAtTransformer<T>
10where
11 T: std::fmt::Debug + Clone + Send + Sync + 'static,
12{
13 type InputPorts = (T,);
14 type OutputPorts = ((Vec<T>, Vec<T>),);
15
16 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
17 let index = self.index;
18 Box::pin(futures::stream::unfold(
19 (input, index),
20 |(mut input, index)| async move {
21 let mut items = Vec::new();
22 while let Some(item) = input.next().await {
23 items.push(item);
24 }
25 if items.is_empty() {
26 None
27 } else {
28 let safe_index = std::cmp::min(index, items.len());
30 let (first, second) = items.split_at(safe_index);
31 Some((
32 (first.to_vec(), second.to_vec()),
33 (
34 Box::pin(futures::stream::empty()) as Pin<Box<dyn Stream<Item = T> + Send>>,
35 index,
36 ),
37 ))
38 }
39 },
40 ))
41 }
42
43 fn set_config_impl(&mut self, config: TransformerConfig<T>) {
44 self.config = config;
45 }
46
47 fn get_config_impl(&self) -> &TransformerConfig<T> {
48 &self.config
49 }
50
51 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
52 &mut self.config
53 }
54
55 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
56 match self.config.error_strategy {
57 ErrorStrategy::Stop => ErrorAction::Stop,
58 ErrorStrategy::Skip => ErrorAction::Skip,
59 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
60 _ => ErrorAction::Stop,
61 }
62 }
63
64 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
65 ErrorContext {
66 timestamp: chrono::Utc::now(),
67 item,
68 component_name: self
69 .config
70 .name
71 .clone()
72 .unwrap_or_else(|| "split_at_transformer".to_string()),
73 component_type: std::any::type_name::<Self>().to_string(),
74 }
75 }
76
77 fn component_info(&self) -> ComponentInfo {
78 ComponentInfo {
79 name: self
80 .config
81 .name
82 .clone()
83 .unwrap_or_else(|| "split_at_transformer".to_string()),
84 type_name: std::any::type_name::<Self>().to_string(),
85 }
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use futures::stream;
93 use proptest::prelude::*;
94
95 #[tokio::test]
96 async fn test_split_at_basic() {
97 let mut transformer = SplitAtTransformer::new(2);
98 let input = stream::iter(vec![1, 2, 3, 4, 5]);
99 let boxed_input = Box::pin(input);
100
101 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
102
103 assert_eq!(result, vec![(vec![1, 2], vec![3, 4, 5])]);
104 }
105
106 #[tokio::test]
107 async fn test_split_at_empty_input() {
108 let mut transformer = SplitAtTransformer::new(2);
109 let input = stream::iter(Vec::<i32>::new());
110 let boxed_input = Box::pin(input);
111
112 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
113
114 assert_eq!(result, Vec::new());
115 }
116
117 #[tokio::test]
118 async fn test_error_handling_strategies() {
119 let transformer = SplitAtTransformer::new(2)
120 .with_error_strategy(ErrorStrategy::<i32>::Skip)
121 .with_name("test_transformer".to_string());
122
123 let config = transformer.get_config_impl();
124 assert_eq!(config.error_strategy, ErrorStrategy::<i32>::Skip);
125 assert_eq!(config.name, Some("test_transformer".to_string()));
126 }
127
128 #[test]
129 fn test_split_at_transformer_new() {
130 let transformer = SplitAtTransformer::<i32>::new(5);
131
132 assert_eq!(transformer.index, 5);
133 assert_eq!(transformer.config().name(), None);
134 assert!(matches!(
135 transformer.config().error_strategy(),
136 ErrorStrategy::Stop
137 ));
138 }
139
140 #[test]
141 fn test_split_at_transformer_with_error_strategy() {
142 let transformer = SplitAtTransformer::<i32>::new(3).with_error_strategy(ErrorStrategy::Skip);
143
144 assert_eq!(transformer.index, 3);
145 assert!(matches!(
146 transformer.config().error_strategy(),
147 ErrorStrategy::Skip
148 ));
149 }
150
151 #[test]
152 fn test_split_at_transformer_with_name() {
153 let transformer = SplitAtTransformer::<i32>::new(4).with_name("test_split_at".to_string());
154
155 assert_eq!(transformer.index, 4);
156 assert_eq!(
157 transformer.config().name(),
158 Some("test_split_at".to_string())
159 );
160 }
161
162 #[tokio::test]
163 async fn test_split_at_transformer_split_at_beginning() {
164 let mut transformer = SplitAtTransformer::<i32>::new(0);
165 let input = stream::iter(vec![1, 2, 3, 4, 5]);
166 let boxed_input = Box::pin(input);
167
168 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
169
170 assert_eq!(result, vec![(vec![], vec![1, 2, 3, 4, 5])]);
171 }
172
173 #[tokio::test]
174 async fn test_split_at_transformer_split_at_end() {
175 let mut transformer = SplitAtTransformer::<i32>::new(5);
176 let input = stream::iter(vec![1, 2, 3, 4, 5]);
177 let boxed_input = Box::pin(input);
178
179 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
180
181 assert_eq!(result, vec![(vec![1, 2, 3, 4, 5], vec![])]);
182 }
183
184 #[tokio::test]
185 async fn test_split_at_transformer_split_at_middle() {
186 let mut transformer = SplitAtTransformer::<i32>::new(3);
187 let input = stream::iter(vec![1, 2, 3, 4, 5]);
188 let boxed_input = Box::pin(input);
189
190 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
191
192 assert_eq!(result, vec![(vec![1, 2, 3], vec![4, 5])]);
193 }
194
195 #[tokio::test]
196 async fn test_split_at_transformer_split_beyond_length() {
197 let mut transformer = SplitAtTransformer::<i32>::new(10);
198 let input = stream::iter(vec![1, 2, 3]);
199 let boxed_input = Box::pin(input);
200
201 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
202
203 assert_eq!(result.len(), 1);
206 let (first, second) = &result[0];
207 assert_eq!(first, &vec![1, 2, 3]);
209 assert_eq!(second, &Vec::<i32>::new());
210 }
211
212 #[tokio::test]
213 async fn test_split_at_transformer_single_element() {
214 let mut transformer = SplitAtTransformer::<i32>::new(1);
215 let input = stream::iter(vec![42]);
216 let boxed_input = Box::pin(input);
217
218 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
219
220 assert_eq!(result, vec![(vec![42], vec![])]);
221 }
222
223 #[tokio::test]
224 async fn test_split_at_transformer_two_elements() {
225 let mut transformer = SplitAtTransformer::<i32>::new(1);
226 let input = stream::iter(vec![10, 20]);
227 let boxed_input = Box::pin(input);
228
229 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
230
231 assert_eq!(result, vec![(vec![10], vec![20])]);
232 }
233
234 #[tokio::test]
235 async fn test_split_at_transformer_strings() {
236 let mut transformer = SplitAtTransformer::<String>::new(2);
237 let input = stream::iter(vec![
238 "apple".to_string(),
239 "banana".to_string(),
240 "cherry".to_string(),
241 "date".to_string(),
242 ]);
243 let boxed_input = Box::pin(input);
244
245 let result: Vec<(Vec<String>, Vec<String>)> =
246 transformer.transform(boxed_input).collect().await;
247
248 assert_eq!(
249 result,
250 vec![(
251 vec!["apple".to_string(), "banana".to_string()],
252 vec!["cherry".to_string(), "date".to_string()]
253 )]
254 );
255 }
256
257 #[tokio::test]
258 async fn test_split_at_transformer_different_types() {
259 let mut transformer_u64 = SplitAtTransformer::<u64>::new(2);
261 let input_u64 = stream::iter(vec![1u64, 2u64, 3u64, 4u64]);
262 let boxed_input_u64 = Box::pin(input_u64);
263 let result_u64: Vec<(Vec<u64>, Vec<u64>)> =
264 transformer_u64.transform(boxed_input_u64).collect().await;
265 assert_eq!(result_u64, vec![(vec![1u64, 2u64], vec![3u64, 4u64])]);
266
267 let mut transformer_i64 = SplitAtTransformer::<i64>::new(1);
269 let input_i64 = stream::iter(vec![5i64, 6i64]);
270 let boxed_input_i64 = Box::pin(input_i64);
271 let result_i64: Vec<(Vec<i64>, Vec<i64>)> =
272 transformer_i64.transform(boxed_input_i64).collect().await;
273 assert_eq!(result_i64, vec![(vec![5i64], vec![6i64])]);
274 }
275
276 #[tokio::test]
277 async fn test_split_at_transformer_very_large_input() {
278 let mut transformer = SplitAtTransformer::<i32>::new(500);
279 let input = stream::iter((0..1000).collect::<Vec<i32>>());
280 let boxed_input = Box::pin(input);
281
282 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
283
284 assert_eq!(result.len(), 1);
286 let (first, second) = &result[0];
287 assert_eq!(first.len(), 500);
288 assert_eq!(second.len(), 500);
289 assert_eq!(first[0], 0);
290 assert_eq!(first[499], 499);
291 assert_eq!(second[0], 500);
292 assert_eq!(second[499], 999);
293 }
294
295 #[test]
296 fn test_split_at_transformer_error_handling() {
297 let transformer = SplitAtTransformer::<i32>::new(3);
298
299 let error = StreamError {
300 source: Box::new(std::io::Error::other("test error")),
301 context: ErrorContext {
302 timestamp: chrono::Utc::now(),
303 item: None,
304 component_name: "test".to_string(),
305 component_type: "SplitAtTransformer".to_string(),
306 },
307 component: ComponentInfo {
308 name: "test".to_string(),
309 type_name: "SplitAtTransformer".to_string(),
310 },
311 retries: 0,
312 };
313
314 assert!(matches!(
316 transformer.handle_error(&error),
317 ErrorAction::Stop
318 ));
319
320 let transformer = transformer.with_error_strategy(ErrorStrategy::Skip);
322 assert!(matches!(
323 transformer.handle_error(&error),
324 ErrorAction::Skip
325 ));
326
327 let transformer = transformer.with_error_strategy(ErrorStrategy::Retry(3));
329 assert!(matches!(
330 transformer.handle_error(&error),
331 ErrorAction::Retry
332 ));
333
334 let error = StreamError {
336 source: Box::new(std::io::Error::other("test error")),
337 context: ErrorContext {
338 timestamp: chrono::Utc::now(),
339 item: None,
340 component_name: "test".to_string(),
341 component_type: "SplitAtTransformer".to_string(),
342 },
343 component: ComponentInfo {
344 name: "test".to_string(),
345 type_name: "SplitAtTransformer".to_string(),
346 },
347 retries: 3,
348 };
349 assert!(matches!(
350 transformer.handle_error(&error),
351 ErrorAction::Stop
352 ));
353 }
354
355 #[test]
356 fn test_split_at_transformer_error_context_creation() {
357 let transformer = SplitAtTransformer::<i32>::new(2).with_name("test_split_at".to_string());
358
359 let context = transformer.create_error_context(Some(42));
360 assert_eq!(context.component_name, "test_split_at");
361 assert_eq!(
362 context.component_type,
363 std::any::type_name::<SplitAtTransformer<i32>>()
364 );
365 assert_eq!(context.item, Some(42));
366 }
367
368 #[test]
369 fn test_split_at_transformer_component_info() {
370 let transformer = SplitAtTransformer::<i32>::new(2).with_name("test_split_at".to_string());
371
372 let info = transformer.component_info();
373 assert_eq!(info.name, "test_split_at");
374 assert_eq!(
375 info.type_name,
376 std::any::type_name::<SplitAtTransformer<i32>>()
377 );
378 }
379
380 #[test]
381 fn test_split_at_transformer_default_name() {
382 let transformer = SplitAtTransformer::<i32>::new(2);
383
384 let info = transformer.component_info();
385 assert_eq!(info.name, "split_at_transformer");
386 }
387
388 #[test]
389 fn test_split_at_transformer_config_mut() {
390 let mut transformer = SplitAtTransformer::<i32>::new(2);
391 transformer.config_mut().name = Some("mutated_name".to_string());
392
393 assert_eq!(
394 transformer.config().name(),
395 Some("mutated_name".to_string())
396 );
397 }
398
399 #[tokio::test]
400 async fn test_split_at_transformer_reuse() {
401 let mut transformer = SplitAtTransformer::<i32>::new(2);
402
403 let input1 = stream::iter(vec![1, 2, 3, 4, 5]);
405 let boxed_input1 = Box::pin(input1);
406 let result1: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input1).collect().await;
407 assert_eq!(result1, vec![(vec![1, 2], vec![3, 4, 5])]);
408
409 let input2 = stream::iter(vec![6, 7, 8, 9, 10]);
411 let boxed_input2 = Box::pin(input2);
412 let result2: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input2).collect().await;
413 assert_eq!(result2, vec![(vec![6, 7], vec![8, 9, 10])]);
414 }
415
416 #[tokio::test]
417 async fn test_split_at_transformer_edge_cases() {
418 let mut transformer = SplitAtTransformer::<i32>::new(1);
419
420 let input = stream::iter(vec![-5, -3, -1, 0, 2]);
422 let boxed_input = Box::pin(input);
423 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
424 assert_eq!(result, vec![(vec![-5], vec![-3, -1, 0, 2])]);
425
426 let input = stream::iter(vec![5, -3, 0, -7, 2]);
428 let boxed_input = Box::pin(input);
429 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
430 assert_eq!(result, vec![(vec![5], vec![-3, 0, -7, 2])]);
431 }
432
433 #[tokio::test]
434 async fn test_split_at_transformer_deterministic_behavior() {
435 let mut transformer = SplitAtTransformer::<i32>::new(2);
436
437 let input = stream::iter(vec![1, 2, 3, 4, 5]);
439 let boxed_input = Box::pin(input);
440
441 let result1: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
442 assert_eq!(result1, vec![(vec![1, 2], vec![3, 4, 5])]);
443
444 let input = stream::iter(vec![1, 2, 3, 4, 5]);
446 let boxed_input = Box::pin(input);
447 let result2: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
448 assert_eq!(result2, vec![(vec![1, 2], vec![3, 4, 5])]);
449 }
450
451 proptest! {
453 #[test]
454 fn test_split_at_transformer_properties(
455 name in ".*"
456 ) {
457 let transformer = SplitAtTransformer::<i32>::new(5)
458 .with_name(name.clone());
459
460 assert_eq!(transformer.index, 5);
461 assert_eq!(transformer.config().name(), Some(name));
462 assert!(matches!(
463 transformer.config().error_strategy(),
464 ErrorStrategy::Stop
465 ));
466 }
467
468 #[test]
469 fn test_split_at_transformer_error_strategies(
470 retry_count in 0..10usize
471 ) {
472 let transformer = SplitAtTransformer::<i32>::new(3);
473
474 let error = StreamError {
475 source: Box::new(std::io::Error::other("property test error")),
476 context: ErrorContext {
477 timestamp: chrono::Utc::now(),
478 item: None,
479 component_name: "property_test".to_string(),
480 component_type: "SplitAtTransformer".to_string(),
481 },
482 component: ComponentInfo {
483 name: "property_test".to_string(),
484 type_name: "SplitAtTransformer".to_string(),
485 },
486 retries: retry_count,
487 };
488
489 let transformer_skip = transformer.clone().with_error_strategy(ErrorStrategy::Skip);
491 let transformer_retry = transformer.clone().with_error_strategy(ErrorStrategy::Retry(5));
492
493 assert!(matches!(
494 transformer_skip.handle_error(&error),
495 ErrorAction::Skip
496 ));
497
498 if retry_count < 5 {
499 assert!(matches!(
500 transformer_retry.handle_error(&error),
501 ErrorAction::Retry
502 ));
503 } else {
504 assert!(matches!(
505 transformer_retry.handle_error(&error),
506 ErrorAction::Stop
507 ));
508 }
509 }
510
511 #[test]
512 fn test_split_at_transformer_config_persistence(
513 name in ".*"
514 ) {
515 let transformer = SplitAtTransformer::<i32>::new(4)
516 .with_name(name.clone())
517 .with_error_strategy(ErrorStrategy::Skip);
518
519 assert_eq!(transformer.index, 4);
520 assert_eq!(transformer.config().name(), Some(name));
521 assert!(matches!(
522 transformer.config().error_strategy(),
523 ErrorStrategy::Skip
524 ));
525 }
526
527 #[test]
528 fn test_split_at_transformer_splitting_properties(
529 _values in prop::collection::vec(0..100i32, 0..50)
530 ) {
531 let transformer = SplitAtTransformer::<i32>::new(2);
533 assert_eq!(transformer.index, 2);
534 assert_eq!(transformer.config().name(), None);
535 assert!(matches!(
536 transformer.config().error_strategy(),
537 ErrorStrategy::Stop
538 ));
539 }
540 }
541
542 #[tokio::test]
543 async fn test_split_at_transformer_stream_processing() {
544 let mut transformer = SplitAtTransformer::<i32>::new(3);
545
546 let input = stream::iter(vec![9, 3, 7, 1, 5, 2, 8, 4, 6, 0]);
548 let boxed_input = Box::pin(input);
549 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
550
551 assert_eq!(result.len(), 1);
553 let (first, second) = &result[0];
554 assert_eq!(first, &vec![9, 3, 7]);
555 assert_eq!(second, &vec![1, 5, 2, 8, 4, 6, 0]);
556 }
557
558 #[tokio::test]
559 async fn test_split_at_transformer_nested_types() {
560 type NestedVecResult = Vec<(Vec<Vec<i32>>, Vec<Vec<i32>>)>;
561
562 let mut transformer = SplitAtTransformer::<Vec<i32>>::new(1);
563
564 let input = stream::iter(vec![vec![1, 2], vec![3, 4], vec![5, 6]]);
566 let boxed_input = Box::pin(input);
567 let result: NestedVecResult = transformer.transform(boxed_input).collect().await;
568
569 assert_eq!(result.len(), 1);
571 let (first, second) = &result[0];
572 assert_eq!(first, &vec![vec![1, 2]]);
573 assert_eq!(second, &vec![vec![3, 4], vec![5, 6]]);
574 }
575
576 #[tokio::test]
577 async fn test_split_at_transformer_split_exact_input_length() {
578 let mut transformer = SplitAtTransformer::<i32>::new(5);
579 let input = stream::iter(vec![5, 3, 1, 4, 2]);
580 let boxed_input = Box::pin(input);
581
582 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
583
584 assert_eq!(result.len(), 1);
586 let (first, second) = &result[0];
587 assert_eq!(first.len(), 5);
588 assert_eq!(second.len(), 0);
589 assert_eq!(first, &vec![5, 3, 1, 4, 2]);
590 }
591
592 #[tokio::test]
593 async fn test_split_at_transformer_split_partial() {
594 let mut transformer = SplitAtTransformer::<i32>::new(3);
595 let input = stream::iter(vec![5, 3, 1]);
596 let boxed_input = Box::pin(input);
597
598 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
599
600 assert_eq!(result.len(), 1);
602 let (first, second) = &result[0];
603 assert_eq!(first, &vec![5, 3, 1]);
604 assert_eq!(second, &Vec::<i32>::new());
605 }
606
607 #[tokio::test]
608 async fn test_split_at_transformer_split_with_zeros() {
609 let mut transformer = SplitAtTransformer::<i32>::new(2);
610 let input = stream::iter(vec![0, 5, 0, 3, 0, 1]);
611 let boxed_input = Box::pin(input);
612
613 let result: Vec<(Vec<i32>, Vec<i32>)> = transformer.transform(boxed_input).collect().await;
614
615 assert_eq!(result.len(), 1);
617 let (first, second) = &result[0];
618 assert_eq!(first, &vec![0, 5]);
619 assert_eq!(second, &vec![0, 3, 0, 1]);
620 }
621
622 #[test]
623 fn test_split_at_transformer_set_config_impl() {
624 let mut transformer = SplitAtTransformer::<i32>::new(5);
625 let new_config = TransformerConfig::default()
626 .with_name("test_split_at".to_string())
627 .with_error_strategy(ErrorStrategy::Skip);
628 transformer.set_config_impl(new_config);
629 assert_eq!(transformer.config.name, Some("test_split_at".to_string()));
630 assert!(matches!(
631 transformer.config.error_strategy,
632 ErrorStrategy::Skip
633 ));
634 }
635
636 #[test]
637 fn test_split_at_transformer_get_config_impl() {
638 let transformer = SplitAtTransformer::<i32>::new(5).with_name("test".to_string());
639 let config = transformer.get_config_impl();
640 assert_eq!(config.name, Some("test".to_string()));
641 }
642
643 #[test]
644 fn test_split_at_transformer_get_config_mut_impl() {
645 let mut transformer = SplitAtTransformer::<i32>::new(5);
646 let config = transformer.get_config_mut_impl();
647 config.name = Some("mutated".to_string());
648 assert_eq!(transformer.config.name, Some("mutated".to_string()));
649 }
650
651 #[test]
652 fn test_split_at_transformer_handle_error_stop() {
653 let transformer = SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Stop);
654 let error = StreamError {
655 source: Box::new(std::io::Error::new(std::io::ErrorKind::NotFound, "test")),
656 context: ErrorContext {
657 timestamp: chrono::Utc::now(),
658 item: None,
659 component_name: "test".to_string(),
660 component_type: "test".to_string(),
661 },
662 component: ComponentInfo {
663 name: "test".to_string(),
664 type_name: "test".to_string(),
665 },
666 retries: 0,
667 };
668 assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
669 }
670
671 #[test]
672 fn test_split_at_transformer_handle_error_skip() {
673 let transformer = SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Skip);
674 let error = StreamError {
675 source: Box::new(std::io::Error::new(std::io::ErrorKind::NotFound, "test")),
676 context: ErrorContext {
677 timestamp: chrono::Utc::now(),
678 item: None,
679 component_name: "test".to_string(),
680 component_type: "test".to_string(),
681 },
682 component: ComponentInfo {
683 name: "test".to_string(),
684 type_name: "test".to_string(),
685 },
686 retries: 0,
687 };
688 assert_eq!(transformer.handle_error(&error), ErrorAction::Skip);
689 }
690
691 #[test]
692 fn test_split_at_transformer_handle_error_retry() {
693 let transformer =
694 SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Retry(3));
695 let error = StreamError {
696 source: Box::new(std::io::Error::new(std::io::ErrorKind::NotFound, "test")),
697 context: ErrorContext {
698 timestamp: chrono::Utc::now(),
699 item: None,
700 component_name: "test".to_string(),
701 component_type: "test".to_string(),
702 },
703 component: ComponentInfo {
704 name: "test".to_string(),
705 type_name: "test".to_string(),
706 },
707 retries: 1,
708 };
709 assert_eq!(transformer.handle_error(&error), ErrorAction::Retry);
710 }
711
712 #[test]
713 fn test_split_at_transformer_handle_error_retry_exhausted() {
714 let transformer =
715 SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Retry(3));
716 let error = StreamError {
717 source: Box::new(std::io::Error::new(std::io::ErrorKind::NotFound, "test")),
718 context: ErrorContext {
719 timestamp: chrono::Utc::now(),
720 item: None,
721 component_name: "test".to_string(),
722 component_type: "test".to_string(),
723 },
724 component: ComponentInfo {
725 name: "test".to_string(),
726 type_name: "test".to_string(),
727 },
728 retries: 3,
729 };
730 assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
731 }
732
733 #[test]
734 fn test_split_at_transformer_create_error_context() {
735 let transformer = SplitAtTransformer::<i32>::new(5).with_name("test_split_at".to_string());
736 let context = transformer.create_error_context(Some(42));
737 assert_eq!(context.component_name, "test_split_at");
738 assert_eq!(context.item, Some(42));
739 assert!(context.component_type.contains("SplitAtTransformer"));
740 }
741
742 #[test]
743 fn test_split_at_transformer_create_error_context_no_item() {
744 let transformer = SplitAtTransformer::<i32>::new(5);
745 let context = transformer.create_error_context(None);
746 assert_eq!(context.component_name, "split_at_transformer");
747 assert_eq!(context.item, None);
748 }
749
750 #[test]
751 fn test_split_at_transformer_component_info_default() {
752 let transformer = SplitAtTransformer::<i32>::new(5);
753 let info = transformer.component_info();
754 assert_eq!(info.name, "split_at_transformer");
755 }
756}