streamweave_vec/consumers/
consumer.rs

1use super::vec_consumer::VecConsumer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Consumer, ConsumerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6
7#[async_trait]
8impl<T> Consumer for VecConsumer<T>
9where
10  T: std::fmt::Debug + Clone + Send + Sync + 'static,
11{
12  type InputPorts = (T,);
13
14  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
15    let consumer_name = self.config.name.clone();
16    println!("📥 [{}] Starting to consume stream", consumer_name);
17    let mut count = 0;
18    while let Some(value) = stream.next().await {
19      count += 1;
20      println!(
21        "   📦 [{}] Consuming item #{}: {:?}",
22        consumer_name, count, value
23      );
24      self.vec.push(value);
25    }
26    println!("✅ [{}] Finished consuming {} items", consumer_name, count);
27  }
28
29  fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
30    self.config = config;
31  }
32
33  fn get_config_impl(&self) -> &ConsumerConfig<T> {
34    &self.config
35  }
36
37  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
38    &mut self.config
39  }
40
41  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
42    match self.config.error_strategy {
43      ErrorStrategy::Stop => ErrorAction::Stop,
44      ErrorStrategy::Skip => ErrorAction::Skip,
45      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
46      _ => ErrorAction::Stop,
47    }
48  }
49
50  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
51    ErrorContext {
52      timestamp: chrono::Utc::now(),
53      item,
54      component_name: self.config.name.clone(),
55      component_type: std::any::type_name::<Self>().to_string(),
56    }
57  }
58
59  fn component_info(&self) -> ComponentInfo {
60    ComponentInfo {
61      name: self.config.name.clone(),
62      type_name: std::any::type_name::<Self>().to_string(),
63    }
64  }
65}
66
67#[cfg(test)]
68mod tests {
69  use super::*;
70  use futures::stream;
71  use proptest::prelude::*;
72  use proptest::proptest;
73  use streamweave_error::ErrorStrategy;
74
75  async fn test_vec_consumer_basic_async(input_data: Vec<i32>) {
76    let mut consumer = VecConsumer::new();
77    let input = stream::iter(input_data.clone());
78    let boxed_input = Box::pin(input);
79
80    consumer.consume(boxed_input).await;
81    let vec = consumer.into_vec();
82    assert_eq!(vec, input_data);
83  }
84
85  async fn test_vec_consumer_with_capacity_async(input_data: Vec<i32>, capacity: usize) {
86    let mut consumer = VecConsumer::with_capacity(capacity);
87    let input = stream::iter(input_data.clone());
88    let boxed_input = Box::pin(input);
89
90    consumer.consume(boxed_input).await;
91    assert_eq!(consumer.into_vec(), input_data);
92  }
93
94  proptest! {
95    #[test]
96    fn test_vec_consumer_basic(
97      input_data in prop::collection::vec(any::<i32>(), 0..30)
98    ) {
99      let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
100      rt.block_on(test_vec_consumer_basic_async(input_data));
101    }
102
103    #[test]
104    fn test_vec_consumer_empty_input(_ in prop::num::u8::ANY) {
105      let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
106      rt.block_on(async {
107        let mut consumer = VecConsumer::new();
108        let input = stream::iter(Vec::<i32>::new());
109        let boxed_input = Box::pin(input);
110
111        consumer.consume(boxed_input).await;
112        let vec = consumer.into_vec();
113        assert!(vec.is_empty());
114      });
115    }
116
117    #[test]
118    fn test_vec_consumer_with_capacity(
119      input_data in prop::collection::vec(any::<i32>(), 0..30),
120      capacity in 0usize..1000usize
121    ) {
122      let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
123      rt.block_on(test_vec_consumer_with_capacity_async(input_data, capacity));
124    }
125
126    #[test]
127    fn test_error_handling_strategies(
128      name in prop::string::string_regex("[a-zA-Z0-9_]+").unwrap()
129    ) {
130      let consumer = VecConsumer::<i32>::new()
131        .with_error_strategy(ErrorStrategy::<i32>::Skip)
132        .with_name(name.clone());
133
134      prop_assert_eq!(
135        &consumer.config().error_strategy,
136        &ErrorStrategy::<i32>::Skip
137      );
138      prop_assert_eq!(consumer.config().name.as_str(), name.as_str());
139    }
140  }
141}