streamweave_vec/
vec_consumer.rs

1use async_trait::async_trait;
2use futures::{Stream, StreamExt};
3use std::pin::Pin;
4use streamweave::{Consumer, ConsumerConfig, Input};
5use streamweave_error::ErrorStrategy;
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, StreamError};
7
8/// A consumer that collects items into a `Vec`.
9///
10/// This consumer collects all items from the stream into an internal `Vec`,
11/// preserving the order in which they were received.
12#[derive(Clone)]
13pub struct VecConsumer<T>
14where
15  T: std::fmt::Debug + Clone + Send + Sync + 'static,
16{
17  /// The internal `Vec` where items are collected.
18  pub vec: Vec<T>,
19  /// Configuration for the consumer, including error handling strategy.
20  pub config: ConsumerConfig<T>,
21}
22
23impl<T> Default for VecConsumer<T>
24where
25  T: std::fmt::Debug + Clone + Send + Sync + 'static,
26{
27  fn default() -> Self {
28    Self::new()
29  }
30}
31
32impl<T> VecConsumer<T>
33where
34  T: std::fmt::Debug + Clone + Send + Sync + 'static,
35{
36  /// Creates a new `VecConsumer` with an empty `Vec`.
37  pub fn new() -> Self {
38    Self {
39      vec: Vec::new(),
40      config: ConsumerConfig::default(),
41    }
42  }
43
44  /// Creates a new `VecConsumer` with a pre-allocated `Vec` capacity.
45  ///
46  /// # Arguments
47  ///
48  /// * `capacity` - The initial capacity of the internal `Vec`.
49  pub fn with_capacity(capacity: usize) -> Self {
50    Self {
51      vec: Vec::with_capacity(capacity),
52      config: ConsumerConfig::default(),
53    }
54  }
55
56  /// Sets the error handling strategy for this consumer.
57  ///
58  /// # Arguments
59  ///
60  /// * `strategy` - The error handling strategy to use.
61  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
62    self.config.error_strategy = strategy;
63    self
64  }
65
66  /// Sets the name for this consumer.
67  ///
68  /// # Arguments
69  ///
70  /// * `name` - The name to assign to this consumer.
71  pub fn with_name(mut self, name: String) -> Self {
72    self.config.name = name;
73    self
74  }
75
76  /// Consumes the consumer and returns the collected `Vec`.
77  ///
78  /// # Returns
79  ///
80  /// The `Vec` containing all collected items in order.
81  pub fn into_vec(self) -> Vec<T> {
82    self.vec
83  }
84}
85
86impl<T> Input for VecConsumer<T>
87where
88  T: std::fmt::Debug + Clone + Send + Sync + 'static,
89{
90  type Input = T;
91  type InputStream = Pin<Box<dyn Stream<Item = T> + Send>>;
92}
93
94#[async_trait]
95impl<T> Consumer for VecConsumer<T>
96where
97  T: std::fmt::Debug + Clone + Send + Sync + 'static,
98{
99  type InputPorts = (T,);
100
101  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
102    let consumer_name = self.config.name.clone();
103    println!("📥 [{}] Starting to consume stream", consumer_name);
104    let mut count = 0;
105    while let Some(value) = stream.next().await {
106      count += 1;
107      println!(
108        "   📦 [{}] Consuming item #{}: {:?}",
109        consumer_name, count, value
110      );
111      self.vec.push(value);
112    }
113    println!("✅ [{}] Finished consuming {} items", consumer_name, count);
114  }
115
116  fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
117    self.config = config;
118  }
119
120  fn get_config_impl(&self) -> &ConsumerConfig<T> {
121    &self.config
122  }
123
124  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
125    &mut self.config
126  }
127
128  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
129    match self.config.error_strategy {
130      ErrorStrategy::Stop => ErrorAction::Stop,
131      ErrorStrategy::Skip => ErrorAction::Skip,
132      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
133      _ => ErrorAction::Stop,
134    }
135  }
136
137  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
138    ErrorContext {
139      timestamp: chrono::Utc::now(),
140      item,
141      component_name: self.config.name.clone(),
142      component_type: std::any::type_name::<Self>().to_string(),
143    }
144  }
145
146  fn component_info(&self) -> ComponentInfo {
147    ComponentInfo {
148      name: self.config.name.clone(),
149      type_name: std::any::type_name::<Self>().to_string(),
150    }
151  }
152}