streamweave_transformers/map/
transformer.rs1use super::map_transformer::MapTransformer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Transformer, TransformerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6
7#[async_trait]
8#[allow(dead_code)]
9impl<F, I, O> Transformer for MapTransformer<F, I, O>
10where
11 F: FnMut(I) -> O + Send + Clone + 'static,
12 I: std::fmt::Debug + Clone + Send + Sync + 'static,
13 O: std::fmt::Debug + Clone + Send + Sync + 'static,
14{
15 type InputPorts = (I,);
16 type OutputPorts = (O,);
17
18 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
19 let f = self.f.clone();
20 Box::pin(input.map(f))
21 }
22
23 fn set_config_impl(&mut self, config: TransformerConfig<I>) {
24 self.config = config;
25 }
26
27 fn get_config_impl(&self) -> &TransformerConfig<I> {
28 &self.config
29 }
30
31 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<I> {
32 &mut self.config
33 }
34
35 fn handle_error(&self, error: &StreamError<I>) -> ErrorAction {
36 match self.config.error_strategy {
37 ErrorStrategy::Stop => ErrorAction::Stop,
38 ErrorStrategy::Skip => ErrorAction::Skip,
39 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
40 _ => ErrorAction::Stop,
41 }
42 }
43
44 fn create_error_context(&self, item: Option<I>) -> ErrorContext<I> {
45 ErrorContext {
46 timestamp: chrono::Utc::now(),
47 item,
48 component_name: self.component_info().name,
49 component_type: self.component_info().type_name,
50 }
51 }
52
53 fn component_info(&self) -> ComponentInfo {
54 ComponentInfo {
55 name: self
56 .config
57 .name
58 .clone()
59 .unwrap_or_else(|| "map_transformer".to_string()),
60 type_name: std::any::type_name::<Self>().to_string(),
61 }
62 }
63}
64
65#[cfg(test)]
66mod tests {
67 use super::*;
68 use futures::StreamExt;
69 use futures::stream;
70
71 #[tokio::test]
72 async fn test_map_transformer() {
73 let mut transformer = MapTransformer::new(|x: i32| x * 2);
74 let input = stream::iter(vec![1, 2, 3]);
75 let boxed_input = Box::pin(input);
76
77 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
78
79 assert_eq!(result, vec![2, 4, 6]);
80 }
81
82 #[tokio::test]
83 async fn test_map_transformer_type_conversion() {
84 let mut transformer = MapTransformer::new(|x: i32| x.to_string());
85 let input = stream::iter(vec![1, 2, 3]);
86 let boxed_input = Box::pin(input);
87
88 let result: Vec<String> = transformer.transform(boxed_input).collect().await;
89
90 assert_eq!(result, vec!["1", "2", "3"]);
91 }
92
93 #[tokio::test]
94 async fn test_map_transformer_reuse() {
95 let mut transformer = MapTransformer::new(|x: i32| x * 2);
96
97 let input1 = stream::iter(vec![1, 2, 3]);
99 let boxed_input1 = Box::pin(input1);
100 let result1: Vec<i32> = transformer.transform(boxed_input1).collect().await;
101 assert_eq!(result1, vec![2, 4, 6]);
102
103 let input2 = stream::iter(vec![4, 5, 6]);
105 let boxed_input2 = Box::pin(input2);
106 let result2: Vec<i32> = transformer.transform(boxed_input2).collect().await;
107 assert_eq!(result2, vec![8, 10, 12]);
108 }
109
110 #[tokio::test]
111 async fn test_error_handling_strategies() {
112 let transformer = MapTransformer::new(|x: i32| x * 2)
113 .with_error_strategy(ErrorStrategy::<i32>::Skip)
114 .with_name("test_transformer".to_string());
115
116 let config = transformer.config();
117 assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
118 assert_eq!(config.name(), Some("test_transformer".to_string()));
119 }
120}