streamweave_transformers/map/
transformer.rs

1use super::map_transformer::MapTransformer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Transformer, TransformerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6
7#[async_trait]
8#[allow(dead_code)]
9impl<F, I, O> Transformer for MapTransformer<F, I, O>
10where
11  F: FnMut(I) -> O + Send + Clone + 'static,
12  I: std::fmt::Debug + Clone + Send + Sync + 'static,
13  O: std::fmt::Debug + Clone + Send + Sync + 'static,
14{
15  type InputPorts = (I,);
16  type OutputPorts = (O,);
17
18  fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
19    let f = self.f.clone();
20    Box::pin(input.map(f))
21  }
22
23  fn set_config_impl(&mut self, config: TransformerConfig<I>) {
24    self.config = config;
25  }
26
27  fn get_config_impl(&self) -> &TransformerConfig<I> {
28    &self.config
29  }
30
31  fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<I> {
32    &mut self.config
33  }
34
35  fn handle_error(&self, error: &StreamError<I>) -> ErrorAction {
36    match self.config.error_strategy {
37      ErrorStrategy::Stop => ErrorAction::Stop,
38      ErrorStrategy::Skip => ErrorAction::Skip,
39      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
40      _ => ErrorAction::Stop,
41    }
42  }
43
44  fn create_error_context(&self, item: Option<I>) -> ErrorContext<I> {
45    ErrorContext {
46      timestamp: chrono::Utc::now(),
47      item,
48      component_name: self.component_info().name,
49      component_type: self.component_info().type_name,
50    }
51  }
52
53  fn component_info(&self) -> ComponentInfo {
54    ComponentInfo {
55      name: self
56        .config
57        .name
58        .clone()
59        .unwrap_or_else(|| "map_transformer".to_string()),
60      type_name: std::any::type_name::<Self>().to_string(),
61    }
62  }
63}
64
65#[cfg(test)]
66mod tests {
67  use super::*;
68  use futures::StreamExt;
69  use futures::stream;
70
71  #[tokio::test]
72  async fn test_map_transformer() {
73    let mut transformer = MapTransformer::new(|x: i32| x * 2);
74    let input = stream::iter(vec![1, 2, 3]);
75    let boxed_input = Box::pin(input);
76
77    let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
78
79    assert_eq!(result, vec![2, 4, 6]);
80  }
81
82  #[tokio::test]
83  async fn test_map_transformer_type_conversion() {
84    let mut transformer = MapTransformer::new(|x: i32| x.to_string());
85    let input = stream::iter(vec![1, 2, 3]);
86    let boxed_input = Box::pin(input);
87
88    let result: Vec<String> = transformer.transform(boxed_input).collect().await;
89
90    assert_eq!(result, vec!["1", "2", "3"]);
91  }
92
93  #[tokio::test]
94  async fn test_map_transformer_reuse() {
95    let mut transformer = MapTransformer::new(|x: i32| x * 2);
96
97    // First transform
98    let input1 = stream::iter(vec![1, 2, 3]);
99    let boxed_input1 = Box::pin(input1);
100    let result1: Vec<i32> = transformer.transform(boxed_input1).collect().await;
101    assert_eq!(result1, vec![2, 4, 6]);
102
103    // Second transform
104    let input2 = stream::iter(vec![4, 5, 6]);
105    let boxed_input2 = Box::pin(input2);
106    let result2: Vec<i32> = transformer.transform(boxed_input2).collect().await;
107    assert_eq!(result2, vec![8, 10, 12]);
108  }
109
110  #[tokio::test]
111  async fn test_error_handling_strategies() {
112    let transformer = MapTransformer::new(|x: i32| x * 2)
113      .with_error_strategy(ErrorStrategy::<i32>::Skip)
114      .with_name("test_transformer".to_string());
115
116    let config = transformer.config();
117    assert_eq!(config.error_strategy(), ErrorStrategy::<i32>::Skip);
118    assert_eq!(config.name(), Some("test_transformer".to_string()));
119  }
120}