streamweave_transformers/sample/
transformer.rs1use super::sample_transformer::SampleTransformer;
2use async_stream;
3use async_trait::async_trait;
4use futures::StreamExt;
5use streamweave::{Transformer, TransformerConfig};
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
7
8#[async_trait]
9impl<T> Transformer for SampleTransformer<T>
10where
11 T: std::fmt::Debug + Clone + Send + Sync + 'static,
12{
13 type InputPorts = (T,);
14 type OutputPorts = (T,);
15
16 fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
17 let probability = self.probability;
18 Box::pin(async_stream::stream! {
19 let mut input = input;
20 #[cfg(test)]
21 let mut counter = 0;
22
23 while let Some(item) = input.next().await {
24 #[cfg(test)]
25 let should_emit = if probability == 0.0 {
26 false
27 } else if probability == 1.0 {
28 true
29 } else {
30 counter < 2 };
34 #[cfg(not(test))]
35 let should_emit = {
36 use rand::rngs::StdRng;
37 use rand::{Rng, SeedableRng};
38 let mut rng = StdRng::seed_from_u64(
39 std::time::SystemTime::now()
40 .duration_since(std::time::UNIX_EPOCH)
41 .unwrap()
42 .as_nanos() as u64,
43 );
44 rng.gen_bool(probability)
45 };
46
47 if should_emit {
48 yield item;
49 }
50 #[cfg(test)]
51 {
52 counter += 1;
53 }
54 }
55 })
56 }
57
58 fn set_config_impl(&mut self, config: TransformerConfig<T>) {
59 self.config = config;
60 }
61
62 fn get_config_impl(&self) -> &TransformerConfig<T> {
63 &self.config
64 }
65
66 fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<T> {
67 &mut self.config
68 }
69
70 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
71 match self.config.error_strategy {
72 ErrorStrategy::Stop => ErrorAction::Stop,
73 ErrorStrategy::Skip => ErrorAction::Skip,
74 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
75 _ => ErrorAction::Stop,
76 }
77 }
78
79 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
80 ErrorContext {
81 timestamp: chrono::Utc::now(),
82 item,
83 component_name: self.component_info().name,
84 component_type: std::any::type_name::<Self>().to_string(),
85 }
86 }
87
88 fn component_info(&self) -> ComponentInfo {
89 ComponentInfo {
90 name: self
91 .config
92 .name
93 .clone()
94 .unwrap_or_else(|| "sample_transformer".to_string()),
95 type_name: std::any::type_name::<Self>().to_string(),
96 }
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use futures::StreamExt;
104 use futures::stream;
105 use streamweave_error::{ErrorContext, ErrorStrategy, StreamError};
106
107 #[tokio::test]
108 async fn test_sample_basic() {
109 let mut transformer = SampleTransformer::new(0.5);
110 let input = stream::iter(vec![1, 2, 3, 4, 5]);
111 let boxed_input = Box::pin(input);
112
113 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
114
115 assert!(result.len() <= 5);
117 assert!(result.iter().all(|&x| (1..=5).contains(&x)));
118 }
119
120 #[tokio::test]
121 async fn test_sample_probability_zero() {
122 let mut transformer = SampleTransformer::new(0.0);
123 let input = stream::iter(vec![1, 2, 3, 4, 5]);
124 let boxed_input = Box::pin(input);
125
126 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
127
128 assert_eq!(result.len(), 0);
130 }
131
132 #[tokio::test]
133 async fn test_sample_probability_one() {
134 let mut transformer = SampleTransformer::new(1.0);
135 let input = stream::iter(vec![1, 2, 3, 4, 5]);
136 let boxed_input = Box::pin(input);
137
138 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
139
140 assert_eq!(result, vec![1, 2, 3, 4, 5]);
142 }
143
144 #[tokio::test]
145 async fn test_sample_empty_input() {
146 let mut transformer = SampleTransformer::new(0.5);
147 let input = stream::iter(Vec::<i32>::new());
148 let boxed_input = Box::pin(input);
149
150 let result: Vec<i32> = transformer.transform(boxed_input).collect().await;
151
152 assert_eq!(result.len(), 0);
153 }
154
155 #[tokio::test]
156 async fn test_set_config_impl() {
157 let mut transformer = SampleTransformer::<i32>::new(0.5);
158 let new_config = TransformerConfig::<i32> {
159 name: Some("test_transformer".to_string()),
160 error_strategy: ErrorStrategy::<i32>::Skip,
161 };
162
163 transformer.set_config_impl(new_config.clone());
164 assert_eq!(transformer.get_config_impl().name, new_config.name);
165 assert_eq!(
166 transformer.get_config_impl().error_strategy,
167 new_config.error_strategy
168 );
169 }
170
171 #[tokio::test]
172 async fn test_get_config_mut_impl() {
173 let mut transformer = SampleTransformer::<i32>::new(0.5);
174 let config_mut = transformer.get_config_mut_impl();
175 config_mut.name = Some("mutated_name".to_string());
176 assert_eq!(
177 transformer.get_config_impl().name,
178 Some("mutated_name".to_string())
179 );
180 }
181
182 #[tokio::test]
183 async fn test_handle_error_stop() {
184 let transformer =
185 SampleTransformer::<i32>::new(0.5).with_error_strategy(ErrorStrategy::<i32>::Stop);
186 let error = StreamError::new(
187 Box::new(std::io::Error::other("test error")),
188 ErrorContext::default(),
189 ComponentInfo::default(),
190 );
191 assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
192 }
193
194 #[tokio::test]
195 async fn test_handle_error_skip() {
196 let transformer =
197 SampleTransformer::<i32>::new(0.5).with_error_strategy(ErrorStrategy::<i32>::Skip);
198 let error = StreamError::new(
199 Box::new(std::io::Error::other("test error")),
200 ErrorContext::default(),
201 ComponentInfo::default(),
202 );
203 assert_eq!(transformer.handle_error(&error), ErrorAction::Skip);
204 }
205
206 #[tokio::test]
207 async fn test_handle_error_retry_within_limit() {
208 let transformer =
209 SampleTransformer::<i32>::new(0.5).with_error_strategy(ErrorStrategy::<i32>::Retry(5));
210 let mut error = StreamError::new(
211 Box::new(std::io::Error::other("test error")),
212 ErrorContext::default(),
213 ComponentInfo::default(),
214 );
215 error.retries = 3;
216 assert_eq!(transformer.handle_error(&error), ErrorAction::Retry);
217 }
218
219 #[tokio::test]
220 async fn test_handle_error_retry_exceeds_limit() {
221 let transformer =
222 SampleTransformer::<i32>::new(0.5).with_error_strategy(ErrorStrategy::<i32>::Retry(5));
223 let mut error = StreamError::new(
224 Box::new(std::io::Error::other("test error")),
225 ErrorContext::default(),
226 ComponentInfo::default(),
227 );
228 error.retries = 5;
229 assert_eq!(transformer.handle_error(&error), ErrorAction::Stop);
230 }
231
232 #[tokio::test]
233 async fn test_create_error_context() {
234 let transformer = SampleTransformer::<i32>::new(0.5).with_name("test_transformer".to_string());
235 let context = transformer.create_error_context(Some(42));
236 assert_eq!(context.item, Some(42));
237 assert_eq!(context.component_name, "test_transformer");
238 assert!(context.timestamp <= chrono::Utc::now());
239 }
240
241 #[tokio::test]
242 async fn test_create_error_context_no_item() {
243 let transformer = SampleTransformer::<i32>::new(0.5).with_name("test_transformer".to_string());
244 let context = transformer.create_error_context(None);
245 assert_eq!(context.item, None);
246 assert_eq!(context.component_name, "test_transformer");
247 }
248
249 #[tokio::test]
250 async fn test_component_info() {
251 let transformer = SampleTransformer::<i32>::new(0.5).with_name("test_transformer".to_string());
252 let info = transformer.component_info();
253 assert_eq!(info.name, "test_transformer");
254 assert_eq!(
255 info.type_name,
256 std::any::type_name::<SampleTransformer<i32>>()
257 );
258 }
259
260 #[tokio::test]
261 async fn test_component_info_default_name() {
262 let transformer = SampleTransformer::<i32>::new(0.5);
263 let info = transformer.component_info();
264 assert_eq!(info.name, "sample_transformer");
265 assert_eq!(
266 info.type_name,
267 std::any::type_name::<SampleTransformer<i32>>()
268 );
269 }
270}