streamweave_transformers/split/
transformer.rs1use super::split_transformer::SplitTransformer;
2use async_trait::async_trait;
3use futures::stream;
4use futures::{Stream, StreamExt};
5use std::pin::Pin;
6use streamweave::{Transformer, TransformerConfig};
7use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
8
9#[async_trait]
10impl<F, T> Transformer for SplitTransformer<F, T>
11where
12 F: FnMut(&T) -> bool + Send + Clone + 'static,
13 T: std::fmt::Debug + Clone + Send + Sync + 'static,
14{
15 type InputPorts = (T,);
16 type OutputPorts = (Vec<T>,);
17
18 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
19 let predicate = self.predicate.clone();
20
21 Box::pin(futures::stream::unfold(
22 (input, predicate),
23 |(mut input, mut pred)| async move {
24 if let Some(items) = input.next().await {
25 let mut current_chunk = Vec::new();
26 let mut chunks = Vec::new();
27
28 for item in items {
29 if pred(&item) && !current_chunk.is_empty() {
30 chunks.push(std::mem::take(&mut current_chunk));
31 }
32 current_chunk.push(item);
33 }
34
35 if !current_chunk.is_empty() {
36 chunks.push(current_chunk);
37 }
38
39 if chunks.is_empty() {
40 Some((Vec::new(), (input, pred)))
41 } else {
42 let mut iter = chunks.into_iter();
43 let first = iter.next().unwrap();
44 Some((
45 first,
46 (
47 Box::pin(stream::iter(iter).chain(input))
48 as Pin<Box<dyn Stream<Item = Vec<T>> + Send>>,
49 pred,
50 ),
51 ))
52 }
53 } else {
54 None
55 }
56 },
57 ))
58 }
59
60 fn set_config_impl(&mut self, config: TransformerConfig<Vec<T>>) {
61 self.config = config;
62 }
63
64 fn get_config_impl(&self) -> &TransformerConfig<Vec<T>> {
65 &self.config
66 }
67
68 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<Vec<T>> {
69 &mut self.config
70 }
71
72 fn handle_error(&self, error: &StreamError<Vec<T>>) -> ErrorAction {
73 match self.config.error_strategy {
74 ErrorStrategy::Stop => ErrorAction::Stop,
75 ErrorStrategy::Skip => ErrorAction::Skip,
76 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
77 _ => ErrorAction::Stop,
78 }
79 }
80
81 fn create_error_context(&self, item: Option<Vec<T>>) -> ErrorContext<Vec<T>> {
82 ErrorContext {
83 timestamp: chrono::Utc::now(),
84 item,
85 component_name: self.component_info().name,
86 component_type: std::any::type_name::<Self>().to_string(),
87 }
88 }
89
90 fn component_info(&self) -> ComponentInfo {
91 ComponentInfo {
92 name: self
93 .config
94 .name
95 .clone()
96 .unwrap_or_else(|| "split_transformer".to_string()),
97 type_name: std::any::type_name::<Self>().to_string(),
98 }
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use futures::{stream, StreamExt};

    /// A new chunk starts in front of every even number.
    #[tokio::test]
    async fn test_split_by_even_numbers() {
        let batches = vec![vec![1, 2, 3, 4, 5, 6]];
        let mut splitter = SplitTransformer::new(|x: &i32| x % 2 == 0);

        let source = Box::pin(stream::iter(batches));
        let output: Vec<Vec<i32>> = splitter.transform(source).collect().await;

        let expected = vec![vec![1], vec![2, 3], vec![4, 5], vec![6]];
        assert_eq!(output, expected);
    }

    /// When the predicate never matches, the batch passes through as one chunk.
    #[tokio::test]
    async fn test_split_no_splits() {
        let batches = vec![vec![1, 2, 3, 4, 5]];
        let mut splitter = SplitTransformer::new(|x: &i32| *x < 0);

        let source = Box::pin(stream::iter(batches));
        let output: Vec<Vec<i32>> = splitter.transform(source).collect().await;

        let expected = vec![vec![1, 2, 3, 4, 5]];
        assert_eq!(output, expected);
    }

    /// Empty strings act as separators and open the chunk that follows them.
    #[tokio::test]
    async fn test_split_strings() {
        let owned = |text: &str| text.to_string();

        let batches = vec![vec![
            owned("hello"),
            owned(""),
            owned("world"),
            owned(""),
            owned("rust"),
        ]];
        let mut splitter = SplitTransformer::new(|s: &String| s.is_empty());

        let source = Box::pin(stream::iter(batches));
        let output: Vec<Vec<String>> = splitter.transform(source).collect().await;

        let expected = vec![
            vec![owned("hello")],
            vec![owned(""), owned("world")],
            vec![owned(""), owned("rust")],
        ];
        assert_eq!(output, expected);
    }

    /// The builder methods store the error strategy and name in the config.
    #[tokio::test]
    async fn test_error_handling_strategies() {
        let splitter = SplitTransformer::new(|_: &i32| true)
            .with_error_strategy(ErrorStrategy::<Vec<i32>>::Skip)
            .with_name("test_transformer".to_string());

        let cfg = splitter.config();

        assert_eq!(cfg.error_strategy(), ErrorStrategy::<Vec<i32>>::Skip);
        assert_eq!(cfg.name(), Some("test_transformer".to_string()));
    }
}