streamweave_vec/consumers/
consumer.rs1use super::vec_consumer::VecConsumer;
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Consumer, ConsumerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6
7#[async_trait]
8impl<T> Consumer for VecConsumer<T>
9where
10 T: std::fmt::Debug + Clone + Send + Sync + 'static,
11{
12 type InputPorts = (T,);
13
14 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
15 let consumer_name = self.config.name.clone();
16 println!("📥 [{}] Starting to consume stream", consumer_name);
17 let mut count = 0;
18 while let Some(value) = stream.next().await {
19 count += 1;
20 println!(
21 " 📦 [{}] Consuming item #{}: {:?}",
22 consumer_name, count, value
23 );
24 self.vec.push(value);
25 }
26 println!("✅ [{}] Finished consuming {} items", consumer_name, count);
27 }
28
29 fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
30 self.config = config;
31 }
32
33 fn get_config_impl(&self) -> &ConsumerConfig<T> {
34 &self.config
35 }
36
37 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
38 &mut self.config
39 }
40
41 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
42 match self.config.error_strategy {
43 ErrorStrategy::Stop => ErrorAction::Stop,
44 ErrorStrategy::Skip => ErrorAction::Skip,
45 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
46 _ => ErrorAction::Stop,
47 }
48 }
49
50 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
51 ErrorContext {
52 timestamp: chrono::Utc::now(),
53 item,
54 component_name: self.config.name.clone(),
55 component_type: std::any::type_name::<Self>().to_string(),
56 }
57 }
58
59 fn component_info(&self) -> ComponentInfo {
60 ComponentInfo {
61 name: self.config.name.clone(),
62 type_name: std::any::type_name::<Self>().to_string(),
63 }
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use super::*;
70 use futures::stream;
71 use proptest::prelude::*;
72 use proptest::proptest;
73 use streamweave_error::ErrorStrategy;
74
75 async fn test_vec_consumer_basic_async(input_data: Vec<i32>) {
76 let mut consumer = VecConsumer::new();
77 let input = stream::iter(input_data.clone());
78 let boxed_input = Box::pin(input);
79
80 consumer.consume(boxed_input).await;
81 let vec = consumer.into_vec();
82 assert_eq!(vec, input_data);
83 }
84
85 async fn test_vec_consumer_with_capacity_async(input_data: Vec<i32>, capacity: usize) {
86 let mut consumer = VecConsumer::with_capacity(capacity);
87 let input = stream::iter(input_data.clone());
88 let boxed_input = Box::pin(input);
89
90 consumer.consume(boxed_input).await;
91 assert_eq!(consumer.into_vec(), input_data);
92 }
93
94 proptest! {
95 #[test]
96 fn test_vec_consumer_basic(
97 input_data in prop::collection::vec(any::<i32>(), 0..30)
98 ) {
99 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
100 rt.block_on(test_vec_consumer_basic_async(input_data));
101 }
102
103 #[test]
104 fn test_vec_consumer_empty_input(_ in prop::num::u8::ANY) {
105 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
106 rt.block_on(async {
107 let mut consumer = VecConsumer::new();
108 let input = stream::iter(Vec::<i32>::new());
109 let boxed_input = Box::pin(input);
110
111 consumer.consume(boxed_input).await;
112 let vec = consumer.into_vec();
113 assert!(vec.is_empty());
114 });
115 }
116
117 #[test]
118 fn test_vec_consumer_with_capacity(
119 input_data in prop::collection::vec(any::<i32>(), 0..30),
120 capacity in 0usize..1000usize
121 ) {
122 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
123 rt.block_on(test_vec_consumer_with_capacity_async(input_data, capacity));
124 }
125
126 #[test]
127 fn test_error_handling_strategies(
128 name in prop::string::string_regex("[a-zA-Z0-9_]+").unwrap()
129 ) {
130 let consumer = VecConsumer::<i32>::new()
131 .with_error_strategy(ErrorStrategy::<i32>::Skip)
132 .with_name(name.clone());
133
134 prop_assert_eq!(
135 &consumer.config().error_strategy,
136 &ErrorStrategy::<i32>::Skip
137 );
138 prop_assert_eq!(consumer.config().name.as_str(), name.as_str());
139 }
140 }
141}