// streamweave_transformers/batch/batch_transformer.rs
use std::marker::PhantomData;
use streamweave::TransformerConfig;
use streamweave_error::{ComponentInfo, ErrorContext, ErrorStrategy, StreamError};
4
5pub struct BatchTransformer<T>
10where
11 T: std::fmt::Debug + Clone + Send + Sync + 'static,
12{
13 pub size: usize,
15 pub config: TransformerConfig<T>,
17 pub _phantom: PhantomData<T>,
19}
20
21impl<T> BatchTransformer<T>
22where
23 T: std::fmt::Debug + Clone + Send + Sync + 'static,
24{
25 pub fn new(size: usize) -> Result<Self, Box<StreamError<T>>> {
35 if size == 0 {
36 return Err(Box::new(StreamError::new(
37 Box::new(std::io::Error::new(
38 std::io::ErrorKind::InvalidInput,
39 "Batch size must be greater than zero",
40 )),
41 ErrorContext {
42 timestamp: chrono::Utc::now(),
43 item: None,
44 component_name: "batch_transformer".to_string(),
45 component_type: std::any::type_name::<Self>().to_string(),
46 },
47 ComponentInfo {
48 name: "batch_transformer".to_string(),
49 type_name: std::any::type_name::<Self>().to_string(),
50 },
51 )));
52 }
53 Ok(Self {
54 size,
55 config: TransformerConfig::default(),
56 _phantom: PhantomData,
57 })
58 }
59
60 pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
66 self.config.error_strategy = strategy;
67 self
68 }
69
70 pub fn with_name(mut self, name: String) -> Self {
76 self.config.name = Some(name);
77 self
78 }
79}