streamweave_transformers/split-at/
transformer.rs

1use super::split_at_transformer::SplitAtTransformer;
2use async_trait::async_trait;
3use futures::{Stream, StreamExt};
4use std::pin::Pin;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T> Transformer for SplitAtTransformer<T>
10where
11  T: std::fmt::Debug + Clone + Send + Sync + 'static,
12{
13  type InputPorts = (T,);
14  type OutputPorts = ((Vec<T>, Vec<T>),);
15
16  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
17    let index = self.index;
18    Box::pin(futures::stream::unfold(
19      (input, index),
20      |(mut input, index)| async move {
21        let mut items = Vec::new();
22        while let Some(item) = input.next().await {
23          items.push(item);
24        }
25        if items.is_empty() {
26          None
27        } else {
28          // Ensure index is within bounds to prevent panic
29          let safe_index = std::cmp::min(index, items.len());
30          let (first, second) = items.split_at(safe_index);
31          Some((
32            (first.to_vec(), second.to_vec()),
33            (
34              Box::pin(futures::stream::empty()) as Pin<Box<dyn Stream<Item = T> + Send>>,
35              index,
36            ),
37          ))
38        }
39      },
40    ))
41  }
42
43  fn set_config_impl(&mut self, config: TransformerConfig<T>) {
44    self.config = config;
45  }
46
47  fn get_config_impl(&self) -> &TransformerConfig<T> {
48    &self.config
49  }
50
51  fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
52    &mut self.config
53  }
54
55  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
56    match self.config.error_strategy {
57      ErrorStrategy::Stop => ErrorAction::Stop,
58      ErrorStrategy::Skip => ErrorAction::Skip,
59      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
60      _ => ErrorAction::Stop,
61    }
62  }
63
64  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
65    ErrorContext {
66      timestamp: chrono::Utc::now(),
67      item,
68      component_name: self
69        .config
70        .name
71        .clone()
72        .unwrap_or_else(|| "split_at_transformer".to_string()),
73      component_type: std::any::type_name::<Self>().to_string(),
74    }
75  }
76
77  fn component_info(&self) -> ComponentInfo {
78    ComponentInfo {
79      name: self
80        .config
81        .name
82        .clone()
83        .unwrap_or_else(|| "split_at_transformer".to_string()),
84      type_name: std::any::type_name::<Self>().to_string(),
85    }
86  }
87}
88
#[cfg(test)]
mod tests {
  use super::*;
  use futures::stream;
  use proptest::prelude::*;

  /// One emitted item: the elements before the split index and the remainder.
  type Halves<T> = (Vec<T>, Vec<T>);

  /// Streams `items` through `transformer` and collects every emitted pair.
  async fn run<T>(transformer: &mut SplitAtTransformer<T>, items: Vec<T>) -> Vec<Halves<T>>
  where
    T: std::fmt::Debug + Clone + Send + Sync + 'static,
  {
    transformer
      .transform(Box::pin(stream::iter(items)))
      .collect()
      .await
  }

  /// Splits `items` at `index` using a freshly constructed transformer.
  async fn split<T>(index: usize, items: Vec<T>) -> Vec<Halves<T>>
  where
    T: std::fmt::Debug + Clone + Send + Sync + 'static,
  {
    let mut transformer = SplitAtTransformer::new(index);
    run(&mut transformer, items).await
  }

  /// Builds a `StreamError` that has already been retried `retries` times.
  /// Only the retry count matters to `handle_error`; the rest is filler.
  fn error_with_retries(retries: usize) -> StreamError<i32> {
    StreamError {
      source: Box::new(std::io::Error::other("test error")),
      context: ErrorContext {
        timestamp: chrono::Utc::now(),
        item: None,
        component_name: "test".to_string(),
        component_type: "SplitAtTransformer".to_string(),
      },
      component: ComponentInfo {
        name: "test".to_string(),
        type_name: "SplitAtTransformer".to_string(),
      },
      retries,
    }
  }

  // ---------------------------------------------------------------------------
  // Splitting behaviour
  // ---------------------------------------------------------------------------

  /// A split in the middle of the input yields both halves in order.
  #[tokio::test]
  async fn test_split_at_basic() {
    let result = split(2, vec![1, 2, 3, 4, 5]).await;

    assert_eq!(result, vec![(vec![1, 2], vec![3, 4, 5])]);
  }

  /// An empty input stream produces no output item at all.
  #[tokio::test]
  async fn test_split_at_empty_input() {
    let result = split(2, Vec::<i32>::new()).await;

    assert!(result.is_empty());
  }

  /// Builder methods are reflected in the configuration.
  #[test]
  fn test_error_handling_strategies() {
    let transformer = SplitAtTransformer::new(2)
      .with_error_strategy(ErrorStrategy::<i32>::Skip)
      .with_name("test_transformer".to_string());

    let config = transformer.get_config_impl();
    assert_eq!(config.error_strategy, ErrorStrategy::<i32>::Skip);
    assert_eq!(config.name, Some("test_transformer".to_string()));
  }

  /// `new` stores the index and starts with an unnamed, `Stop` configuration.
  #[test]
  fn test_split_at_transformer_new() {
    let transformer = SplitAtTransformer::<i32>::new(5);

    assert_eq!(transformer.index, 5);
    assert_eq!(transformer.config().name(), None);
    assert!(matches!(
      transformer.config().error_strategy(),
      ErrorStrategy::Stop
    ));
  }

  /// `with_error_strategy` changes the strategy and leaves the index alone.
  #[test]
  fn test_split_at_transformer_with_error_strategy() {
    let transformer = SplitAtTransformer::<i32>::new(3).with_error_strategy(ErrorStrategy::Skip);

    assert_eq!(transformer.index, 3);
    assert!(matches!(
      transformer.config().error_strategy(),
      ErrorStrategy::Skip
    ));
  }

  /// `with_name` sets the name and leaves the index alone.
  #[test]
  fn test_split_at_transformer_with_name() {
    let transformer = SplitAtTransformer::<i32>::new(4).with_name("test_split_at".to_string());

    assert_eq!(transformer.index, 4);
    assert_eq!(
      transformer.config().name(),
      Some("test_split_at".to_string())
    );
  }

  /// Index 0 puts every element in the second half.
  #[tokio::test]
  async fn test_split_at_transformer_split_at_beginning() {
    let result = split(0, vec![1, 2, 3, 4, 5]).await;

    assert_eq!(result, vec![(vec![], vec![1, 2, 3, 4, 5])]);
  }

  /// An index equal to the length puts every element in the first half.
  #[tokio::test]
  async fn test_split_at_transformer_split_at_end() {
    let result = split(5, vec![1, 2, 3, 4, 5]).await;

    assert_eq!(result, vec![(vec![1, 2, 3, 4, 5], vec![])]);
  }

  /// A split strictly inside the input.
  #[tokio::test]
  async fn test_split_at_transformer_split_at_middle() {
    let result = split(3, vec![1, 2, 3, 4, 5]).await;

    assert_eq!(result, vec![(vec![1, 2, 3], vec![4, 5])]);
  }

  /// An index past the end is clamped to the input length instead of
  /// panicking: everything lands in the first half and the second is empty.
  #[tokio::test]
  async fn test_split_at_transformer_split_beyond_length() {
    let result = split(10, vec![1, 2, 3]).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first, &vec![1, 2, 3]);
    assert_eq!(second, &Vec::<i32>::new());
  }

  /// A one-element input split at 1 leaves the second half empty.
  #[tokio::test]
  async fn test_split_at_transformer_single_element() {
    let result = split(1, vec![42]).await;

    assert_eq!(result, vec![(vec![42], vec![])]);
  }

  /// A two-element input split at 1 yields one element per half.
  #[tokio::test]
  async fn test_split_at_transformer_two_elements() {
    let result = split(1, vec![10, 20]).await;

    assert_eq!(result, vec![(vec![10], vec![20])]);
  }

  /// Non-`Copy` element types are split just like integers.
  #[tokio::test]
  async fn test_split_at_transformer_strings() {
    let fruit = |names: &[&str]| names.iter().map(|s| s.to_string()).collect::<Vec<String>>();

    let result = split(2, fruit(&["apple", "banana", "cherry", "date"])).await;

    assert_eq!(
      result,
      vec![(fruit(&["apple", "banana"]), fruit(&["cherry", "date"]))]
    );
  }

  /// The transformer is generic over the element type.
  #[tokio::test]
  async fn test_split_at_transformer_different_types() {
    let result_u64 = split(2, vec![1u64, 2u64, 3u64, 4u64]).await;
    assert_eq!(result_u64, vec![(vec![1u64, 2u64], vec![3u64, 4u64])]);

    let result_i64 = split(1, vec![5i64, 6i64]).await;
    assert_eq!(result_i64, vec![(vec![5i64], vec![6i64])]);
  }

  /// A 1000-element input is still emitted as a single pair.
  #[tokio::test]
  async fn test_split_at_transformer_very_large_input() {
    let result = split(500, (0..1000).collect::<Vec<i32>>()).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first.len(), 500);
    assert_eq!(second.len(), 500);
    assert_eq!(first[0], 0);
    assert_eq!(first[499], 499);
    assert_eq!(second[0], 500);
    assert_eq!(second[499], 999);
  }

  // ---------------------------------------------------------------------------
  // Error handling and component metadata
  // ---------------------------------------------------------------------------

  /// Walks one transformer through Stop, Skip, Retry and exhausted Retry.
  #[test]
  fn test_split_at_transformer_error_handling() {
    let fresh = error_with_retries(0);
    let exhausted = error_with_retries(3);

    // Default strategy is Stop.
    let transformer = SplitAtTransformer::<i32>::new(3);
    assert_eq!(transformer.handle_error(&fresh), ErrorAction::Stop);

    let transformer = transformer.with_error_strategy(ErrorStrategy::Skip);
    assert_eq!(transformer.handle_error(&fresh), ErrorAction::Skip);

    let transformer = transformer.with_error_strategy(ErrorStrategy::Retry(3));
    assert_eq!(transformer.handle_error(&fresh), ErrorAction::Retry);

    // Retry budget used up: fall back to Stop.
    assert_eq!(transformer.handle_error(&exhausted), ErrorAction::Stop);
  }

  /// The error context carries the configured name, the exact type name and
  /// the offending item.
  #[test]
  fn test_split_at_transformer_error_context_creation() {
    let transformer = SplitAtTransformer::<i32>::new(2).with_name("test_split_at".to_string());

    let context = transformer.create_error_context(Some(42));
    assert_eq!(context.component_name, "test_split_at");
    assert_eq!(
      context.component_type,
      std::any::type_name::<SplitAtTransformer<i32>>()
    );
    assert_eq!(context.item, Some(42));
  }

  /// Component info carries the configured name and the exact type name.
  #[test]
  fn test_split_at_transformer_component_info() {
    let transformer = SplitAtTransformer::<i32>::new(2).with_name("test_split_at".to_string());

    let info = transformer.component_info();
    assert_eq!(info.name, "test_split_at");
    assert_eq!(
      info.type_name,
      std::any::type_name::<SplitAtTransformer<i32>>()
    );
  }

  /// Without a configured name, component info uses the fallback name.
  #[test]
  fn test_split_at_transformer_default_name() {
    let transformer = SplitAtTransformer::<i32>::new(2);

    let info = transformer.component_info();
    assert_eq!(info.name, "split_at_transformer");
  }

  /// Mutations made through `config_mut` are visible through `config`.
  #[test]
  fn test_split_at_transformer_config_mut() {
    let mut transformer = SplitAtTransformer::<i32>::new(2);
    transformer.config_mut().name = Some("mutated_name".to_string());

    assert_eq!(
      transformer.config().name(),
      Some("mutated_name".to_string())
    );
  }

  // ---------------------------------------------------------------------------
  // Reusing one transformer across several streams
  // ---------------------------------------------------------------------------

  /// One transformer can process several independent streams.
  #[tokio::test]
  async fn test_split_at_transformer_reuse() {
    let mut transformer = SplitAtTransformer::<i32>::new(2);

    let result1 = run(&mut transformer, vec![1, 2, 3, 4, 5]).await;
    assert_eq!(result1, vec![(vec![1, 2], vec![3, 4, 5])]);

    let result2 = run(&mut transformer, vec![6, 7, 8, 9, 10]).await;
    assert_eq!(result2, vec![(vec![6, 7], vec![8, 9, 10])]);
  }

  /// Element values (negative, zero, mixed sign) never influence where the
  /// split happens; only the position in the stream does.
  #[tokio::test]
  async fn test_split_at_transformer_edge_cases() {
    let mut transformer = SplitAtTransformer::<i32>::new(1);

    let result = run(&mut transformer, vec![-5, -3, -1, 0, 2]).await;
    assert_eq!(result, vec![(vec![-5], vec![-3, -1, 0, 2])]);

    let result = run(&mut transformer, vec![5, -3, 0, -7, 2]).await;
    assert_eq!(result, vec![(vec![5], vec![-3, 0, -7, 2])]);
  }

  /// The same input run twice through the same transformer gives the same
  /// output.
  #[tokio::test]
  async fn test_split_at_transformer_deterministic_behavior() {
    let mut transformer = SplitAtTransformer::<i32>::new(2);

    let result1 = run(&mut transformer, vec![1, 2, 3, 4, 5]).await;
    assert_eq!(result1, vec![(vec![1, 2], vec![3, 4, 5])]);

    let result2 = run(&mut transformer, vec![1, 2, 3, 4, 5]).await;
    assert_eq!(result2, vec![(vec![1, 2], vec![3, 4, 5])]);
  }

  // ---------------------------------------------------------------------------
  // Property-based tests using proptest
  // ---------------------------------------------------------------------------

  proptest! {
    /// Any string is accepted as a name and does not disturb other settings.
    #[test]
    fn test_split_at_transformer_properties(
      name in ".*"
    ) {
      let transformer = SplitAtTransformer::<i32>::new(5)
        .with_name(name.clone());

      assert_eq!(transformer.index, 5);
      assert_eq!(transformer.config().name(), Some(name));
      assert!(matches!(
        transformer.config().error_strategy(),
        ErrorStrategy::Stop
      ));
    }

    /// Skip always skips; Retry(5) retries strictly below five attempts.
    #[test]
    fn test_split_at_transformer_error_strategies(
      retry_count in 0..10usize
    ) {
      let transformer = SplitAtTransformer::<i32>::new(3);
      let error = error_with_retries(retry_count);

      let transformer_skip = transformer.clone().with_error_strategy(ErrorStrategy::Skip);
      let transformer_retry = transformer.clone().with_error_strategy(ErrorStrategy::Retry(5));

      assert_eq!(transformer_skip.handle_error(&error), ErrorAction::Skip);

      let expected = if retry_count < 5 {
        ErrorAction::Retry
      } else {
        ErrorAction::Stop
      };
      assert_eq!(transformer_retry.handle_error(&error), expected);
    }

    /// Chained builder calls keep every previously applied setting.
    #[test]
    fn test_split_at_transformer_config_persistence(
      name in ".*"
    ) {
      let transformer = SplitAtTransformer::<i32>::new(4)
        .with_name(name.clone())
        .with_error_strategy(ErrorStrategy::Skip);

      assert_eq!(transformer.index, 4);
      assert_eq!(transformer.config().name(), Some(name));
      assert!(matches!(
        transformer.config().error_strategy(),
        ErrorStrategy::Skip
      ));
    }

    /// For arbitrary inputs and indices: empty input emits nothing; otherwise
    /// exactly one pair is emitted, the first half has `min(index, len)`
    /// elements, and concatenating the halves reproduces the input.
    #[test]
    fn test_split_at_transformer_splitting_properties(
      values in prop::collection::vec(0..100i32, 0..50),
      index in 0..60usize
    ) {
      // proptest bodies are synchronous, so drive the stream on a small
      // single-threaded runtime.
      let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("failed to build a current-thread Tokio runtime");
      let result = runtime.block_on(split(index, values.clone()));

      if values.is_empty() {
        assert!(result.is_empty());
      } else {
        assert_eq!(result.len(), 1);
        let (first, second) = &result[0];
        assert_eq!(first.len(), index.min(values.len()));
        assert_eq!([first.as_slice(), second.as_slice()].concat(), values);
      }
    }
  }

  // ---------------------------------------------------------------------------
  // More splitting scenarios
  // ---------------------------------------------------------------------------

  /// Unsorted input keeps its original order in both halves.
  #[tokio::test]
  async fn test_split_at_transformer_stream_processing() {
    let result = split(3, vec![9, 3, 7, 1, 5, 2, 8, 4, 6, 0]).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first, &vec![9, 3, 7]);
    assert_eq!(second, &vec![1, 5, 2, 8, 4, 6, 0]);
  }

  /// Elements that are themselves vectors are split as opaque items.
  #[tokio::test]
  async fn test_split_at_transformer_nested_types() {
    let result = split(1, vec![vec![1, 2], vec![3, 4], vec![5, 6]]).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first, &vec![vec![1, 2]]);
    assert_eq!(second, &vec![vec![3, 4], vec![5, 6]]);
  }

  /// Index equal to the input length: first half is the whole input.
  #[tokio::test]
  async fn test_split_at_transformer_split_exact_input_length() {
    let result = split(5, vec![5, 3, 1, 4, 2]).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first.len(), 5);
    assert_eq!(second.len(), 0);
    assert_eq!(first, &vec![5, 3, 1, 4, 2]);
  }

  /// Three elements split at 3: nothing is left for the second half.
  #[tokio::test]
  async fn test_split_at_transformer_split_partial() {
    let result = split(3, vec![5, 3, 1]).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first, &vec![5, 3, 1]);
    assert_eq!(second, &Vec::<i32>::new());
  }

  /// Zero-valued elements are ordinary items.
  #[tokio::test]
  async fn test_split_at_transformer_split_with_zeros() {
    let result = split(2, vec![0, 5, 0, 3, 0, 1]).await;

    assert_eq!(result.len(), 1);
    let (first, second) = &result[0];
    assert_eq!(first, &vec![0, 5]);
    assert_eq!(second, &vec![0, 3, 0, 1]);
  }

  // ---------------------------------------------------------------------------
  // Trait-level accessors
  // ---------------------------------------------------------------------------

  /// `set_config_impl` replaces the stored configuration.
  #[test]
  fn test_split_at_transformer_set_config_impl() {
    let mut transformer = SplitAtTransformer::<i32>::new(5);
    let new_config = TransformerConfig::default()
      .with_name("test_split_at".to_string())
      .with_error_strategy(ErrorStrategy::Skip);
    transformer.set_config_impl(new_config);
    assert_eq!(transformer.config.name, Some("test_split_at".to_string()));
    assert!(matches!(
      transformer.config.error_strategy,
      ErrorStrategy::Skip
    ));
  }

  /// `get_config_impl` exposes the stored configuration.
  #[test]
  fn test_split_at_transformer_get_config_impl() {
    let transformer = SplitAtTransformer::<i32>::new(5).with_name("test".to_string());
    let config = transformer.get_config_impl();
    assert_eq!(config.name, Some("test".to_string()));
  }

  /// Writes through `get_config_mut_impl` land in the stored configuration.
  #[test]
  fn test_split_at_transformer_get_config_mut_impl() {
    let mut transformer = SplitAtTransformer::<i32>::new(5);
    let config = transformer.get_config_mut_impl();
    config.name = Some("mutated".to_string());
    assert_eq!(transformer.config.name, Some("mutated".to_string()));
  }

  /// Explicit `Stop` strategy stops.
  #[test]
  fn test_split_at_transformer_handle_error_stop() {
    let transformer = SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Stop);
    assert_eq!(
      transformer.handle_error(&error_with_retries(0)),
      ErrorAction::Stop
    );
  }

  /// `Skip` strategy skips.
  #[test]
  fn test_split_at_transformer_handle_error_skip() {
    let transformer = SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Skip);
    assert_eq!(
      transformer.handle_error(&error_with_retries(0)),
      ErrorAction::Skip
    );
  }

  /// `Retry(3)` retries while fewer than three attempts were made.
  #[test]
  fn test_split_at_transformer_handle_error_retry() {
    let transformer =
      SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Retry(3));
    assert_eq!(
      transformer.handle_error(&error_with_retries(1)),
      ErrorAction::Retry
    );
  }

  /// `Retry(3)` stops once three attempts were made.
  #[test]
  fn test_split_at_transformer_handle_error_retry_exhausted() {
    let transformer =
      SplitAtTransformer::<i32>::new(5).with_error_strategy(ErrorStrategy::Retry(3));
    assert_eq!(
      transformer.handle_error(&error_with_retries(3)),
      ErrorAction::Stop
    );
  }

  /// The error context mentions the transformer's type and keeps the item.
  #[test]
  fn test_split_at_transformer_create_error_context() {
    let transformer = SplitAtTransformer::<i32>::new(5).with_name("test_split_at".to_string());
    let context = transformer.create_error_context(Some(42));
    assert_eq!(context.component_name, "test_split_at");
    assert_eq!(context.item, Some(42));
    assert!(context.component_type.contains("SplitAtTransformer"));
  }

  /// Without a name or item, the context uses the fallback name and `None`.
  #[test]
  fn test_split_at_transformer_create_error_context_no_item() {
    let transformer = SplitAtTransformer::<i32>::new(5);
    let context = transformer.create_error_context(None);
    assert_eq!(context.component_name, "split_at_transformer");
    assert_eq!(context.item, None);
  }

  /// Component info falls back to the default name when none is configured.
  #[test]
  fn test_split_at_transformer_component_info_default() {
    let transformer = SplitAtTransformer::<i32>::new(5);
    let info = transformer.component_info();
    assert_eq!(info.name, "split_at_transformer");
  }
}