streamweave_transformers/split-at/
split_at_transformer.rs

1use std::marker::PhantomData;
2use streamweave::TransformerConfig;
3use streamweave_error::ErrorStrategy;
4
5/// A transformer that splits a stream of items at a specific index.
6///
7/// This transformer collects items from the input stream and splits them
8/// into two groups: items before the index and items at or after the index.
9#[derive(Clone)]
10pub struct SplitAtTransformer<T>
11where
12  T: std::fmt::Debug + Clone + Send + Sync + 'static,
13{
14  /// The index at which to split the stream.
15  pub index: usize,
16  /// Configuration for the transformer, including error handling strategy.
17  pub config: TransformerConfig<T>,
18  /// Phantom data to track the type parameter.
19  pub _phantom: std::marker::PhantomData<T>,
20}
21
22impl<T> SplitAtTransformer<T>
23where
24  T: std::fmt::Debug + Clone + Send + Sync + 'static,
25{
26  /// Creates a new `SplitAtTransformer` with the given index.
27  ///
28  /// # Arguments
29  ///
30  /// * `index` - The index at which to split the stream.
31  pub fn new(index: usize) -> Self {
32    Self {
33      index,
34      config: TransformerConfig::<T>::default(),
35      _phantom: PhantomData,
36    }
37  }
38
39  /// Sets the error handling strategy for this transformer.
40  ///
41  /// # Arguments
42  ///
43  /// * `strategy` - The error handling strategy to use.
44  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
45    self.config.error_strategy = strategy;
46    self
47  }
48
49  /// Sets the name for this transformer.
50  ///
51  /// # Arguments
52  ///
53  /// * `name` - The name to assign to this transformer.
54  pub fn with_name(mut self, name: String) -> Self {
55    self.config.name = Some(name);
56    self
57  }
58}
59
/// Unit tests for the `SplitAtTransformer` constructor and builder methods.
#[cfg(test)]
mod tests {
  use super::*;

  /// `new` stores the requested split index.
  #[test]
  fn test_split_at_transformer_new() {
    let transformer = SplitAtTransformer::<i32>::new(5);
    assert_eq!(transformer.index, 5);
  }

  /// `with_error_strategy` stores the given strategy in the configuration.
  #[test]
  fn test_split_at_transformer_with_error_strategy() {
    let transformer =
      SplitAtTransformer::<i32>::new(3).with_error_strategy(ErrorStrategy::<i32>::Skip);
    assert!(matches!(
      transformer.config.error_strategy,
      ErrorStrategy::Skip
    ));
  }

  /// `with_name` stores the given name in the configuration.
  #[test]
  fn test_split_at_transformer_with_name() {
    let transformer = SplitAtTransformer::<i32>::new(3).with_name("test_split_at".to_string());
    assert_eq!(transformer.config.name, Some("test_split_at".to_string()));
  }

  /// Cloning preserves the index, the name, and the error strategy.
  #[test]
  fn test_split_at_transformer_clone() {
    let transformer1 = SplitAtTransformer::<i32>::new(10)
      .with_error_strategy(ErrorStrategy::<i32>::Retry(5))
      .with_name("test_split_at".to_string());
    let transformer2 = transformer1.clone();

    assert_eq!(transformer1.index, transformer2.index);
    assert_eq!(transformer1.config.name, transformer2.config.name);
    // The strategy configured above must survive the clone as well; without
    // this check a clone that dropped the error strategy would go unnoticed.
    assert!(matches!(
      transformer2.config.error_strategy,
      ErrorStrategy::Retry(5)
    ));
  }

  /// Builder calls can be chained and each setting is retained.
  #[test]
  fn test_split_at_transformer_chaining() {
    let transformer = SplitAtTransformer::<i32>::new(10)
      .with_error_strategy(ErrorStrategy::<i32>::Retry(3))
      .with_name("chained_split_at".to_string());

    assert_eq!(transformer.index, 10);
    assert!(matches!(
      transformer.config.error_strategy,
      ErrorStrategy::Retry(3)
    ));
    assert_eq!(
      transformer.config.name,
      Some("chained_split_at".to_string())
    );
  }
}