streamweave_transformers/batch/
batch_transformer.rs

1use std::marker::PhantomData;
2use streamweave::TransformerConfig;
3use streamweave_error::{ComponentInfo, ErrorContext, ErrorStrategy, StreamError};
4
5/// A transformer that groups items in a stream into batches of a specified size.
6///
7/// This transformer collects incoming items until the specified `size` is reached,
8/// then emits them as a `Vec<T>`.
9pub struct BatchTransformer<T>
10where
11  T: std::fmt::Debug + Clone + Send + Sync + 'static,
12{
13  /// The number of items to include in each batch.
14  pub size: usize,
15  /// Configuration for the transformer, including error handling strategy.
16  pub config: TransformerConfig<T>,
17  /// Phantom data to track the input type parameter.
18  pub _phantom: PhantomData<T>,
19}
20
21impl<T> BatchTransformer<T>
22where
23  T: std::fmt::Debug + Clone + Send + Sync + 'static,
24{
25  /// Creates a new `BatchTransformer` with the given batch size.
26  ///
27  /// # Arguments
28  ///
29  /// * `size` - The number of items to include in each batch. Must be greater than zero.
30  ///
31  /// # Returns
32  ///
33  /// A `Result` containing the `BatchTransformer` if `size` is valid, or an error if `size` is zero.
34  pub fn new(size: usize) -> Result<Self, Box<StreamError<T>>> {
35    if size == 0 {
36      return Err(Box::new(StreamError::new(
37        Box::new(std::io::Error::new(
38          std::io::ErrorKind::InvalidInput,
39          "Batch size must be greater than zero",
40        )),
41        ErrorContext {
42          timestamp: chrono::Utc::now(),
43          item: None,
44          component_name: "batch_transformer".to_string(),
45          component_type: std::any::type_name::<Self>().to_string(),
46        },
47        ComponentInfo {
48          name: "batch_transformer".to_string(),
49          type_name: std::any::type_name::<Self>().to_string(),
50        },
51      )));
52    }
53    Ok(Self {
54      size,
55      config: TransformerConfig::default(),
56      _phantom: PhantomData,
57    })
58  }
59
60  /// Sets the error handling strategy for this transformer.
61  ///
62  /// # Arguments
63  ///
64  /// * `strategy` - The error handling strategy to use.
65  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
66    self.config.error_strategy = strategy;
67    self
68  }
69
70  /// Sets the name for this transformer.
71  ///
72  /// # Arguments
73  ///
74  /// * `name` - The name to assign to this transformer.
75  pub fn with_name(mut self, name: String) -> Self {
76    self.config.name = Some(name);
77    self
78  }
79}