// streamweave_transformers/split/transformer.rs

1use super::split_transformer::SplitTransformer;
2use async_trait::async_trait;
3use futures::stream;
4use futures::{Stream, StreamExt};
5use std::pin::Pin;
6use streamweave::{Transformer, TransformerConfig};
7use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
8
9#[async_trait]
10impl<F, T> Transformer for SplitTransformer<F, T>
11where
12  F: FnMut(&T) -> bool + Send + Clone + 'static,
13  T: std::fmt::Debug + Clone + Send + Sync + 'static,
14{
15  type InputPorts = (T,);
16  type OutputPorts = (Vec<T>,);
17
18  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
19    let predicate = self.predicate.clone();
20
21    Box::pin(futures::stream::unfold(
22      (input, predicate),
23      |(mut input, mut pred)| async move {
24        if let Some(items) = input.next().await {
25          let mut current_chunk = Vec::new();
26          let mut chunks = Vec::new();
27
28          for item in items {
29            if pred(&item) && !current_chunk.is_empty() {
30              chunks.push(std::mem::take(&mut current_chunk));
31            }
32            current_chunk.push(item);
33          }
34
35          if !current_chunk.is_empty() {
36            chunks.push(current_chunk);
37          }
38
39          if chunks.is_empty() {
40            Some((Vec::new(), (input, pred)))
41          } else {
42            let mut iter = chunks.into_iter();
43            let first = iter.next().unwrap();
44            Some((
45              first,
46              (
47                Box::pin(stream::iter(iter).chain(input))
48                  as Pin<Box<dyn Stream<Item = Vec<T>> + Send>>,
49                pred,
50              ),
51            ))
52          }
53        } else {
54          None
55        }
56      },
57    ))
58  }
59
60  fn set_config_impl(&mut self, config: TransformerConfig<Vec<T>>) {
61    self.config = config;
62  }
63
64  fn get_config_impl(&self) -> &TransformerConfig<Vec<T>> {
65    &self.config
66  }
67
68  fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<Vec<T>> {
69    &mut self.config
70  }
71
72  fn handle_error(&self, error: &StreamError<Vec<T>>) -> ErrorAction {
73    match self.config.error_strategy {
74      ErrorStrategy::Stop => ErrorAction::Stop,
75      ErrorStrategy::Skip => ErrorAction::Skip,
76      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
77      _ => ErrorAction::Stop,
78    }
79  }
80
81  fn create_error_context(&self, item: Option<Vec<T>>) -> ErrorContext<Vec<T>> {
82    ErrorContext {
83      timestamp: chrono::Utc::now(),
84      item,
85      component_name: self.component_info().name,
86      component_type: std::any::type_name::<Self>().to_string(),
87    }
88  }
89
90  fn component_info(&self) -> ComponentInfo {
91    ComponentInfo {
92      name: self
93        .config
94        .name
95        .clone()
96        .unwrap_or_else(|| "split_transformer".to_string()),
97      type_name: std::any::type_name::<Self>().to_string(),
98    }
99  }
100}
101
#[cfg(test)]
mod tests {
  use super::*;
  use futures::{StreamExt, stream};

  /// Every even number opens a new chunk and stays at the front of it.
  #[tokio::test]
  async fn test_split_by_even_numbers() {
    let batches = vec![vec![1, 2, 3, 4, 5, 6]];
    let source = Box::pin(stream::iter(batches));
    let mut splitter = SplitTransformer::new(|x: &i32| x % 2 == 0);

    let chunks = splitter
      .transform(source)
      .collect::<Vec<Vec<i32>>>()
      .await;

    let expected: Vec<Vec<i32>> = vec![vec![1], vec![2, 3], vec![4, 5], vec![6]];
    assert_eq!(chunks, expected);
  }

  /// A predicate that never matches leaves the batch as one chunk.
  #[tokio::test]
  async fn test_split_no_splits() {
    let batches = vec![vec![1, 2, 3, 4, 5]];
    let source = Box::pin(stream::iter(batches));
    // The batch contains no negative numbers, so no cut is ever made.
    let mut splitter = SplitTransformer::new(|x: &i32| *x < 0);

    let chunks = splitter
      .transform(source)
      .collect::<Vec<Vec<i32>>>()
      .await;

    let expected: Vec<Vec<i32>> = vec![vec![1, 2, 3, 4, 5]];
    assert_eq!(chunks, expected);
  }

  /// Empty strings act as separators that start the following chunk.
  #[tokio::test]
  async fn test_split_strings() {
    // Turns a slice of literals into the owned strings the transformer handles.
    let owned = |words: &[&str]| -> Vec<String> { words.iter().map(|w| w.to_string()).collect() };

    let batches = vec![owned(&["hello", "", "world", "", "rust"])];
    let source = Box::pin(stream::iter(batches));
    let mut splitter = SplitTransformer::new(|s: &String| s.is_empty());

    let chunks = splitter
      .transform(source)
      .collect::<Vec<Vec<String>>>()
      .await;

    let expected: Vec<Vec<String>> = vec![
      owned(&["hello"]),
      owned(&["", "world"]),
      owned(&["", "rust"]),
    ];
    assert_eq!(chunks, expected);
  }

  /// The builder methods store the error strategy and the name in the config.
  #[tokio::test]
  async fn test_error_handling_strategies() {
    let splitter = SplitTransformer::new(|_: &i32| true)
      .with_error_strategy(ErrorStrategy::<Vec<i32>>::Skip)
      .with_name("test_transformer".to_string());

    let settings = splitter.config();

    let expected_strategy = ErrorStrategy::<Vec<i32>>::Skip;
    assert_eq!(settings.error_strategy(), expected_strategy);

    let expected_name = Some("test_transformer".to_string());
    assert_eq!(settings.name(), expected_name);
  }
}