streamweave_vec/
vec_consumer.rs1use async_trait::async_trait;
2use futures::{Stream, StreamExt};
3use std::pin::Pin;
4use streamweave::{Consumer, ConsumerConfig, Input};
5use streamweave_error::ErrorStrategy;
6use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, StreamError};
7
8#[derive(Clone)]
13pub struct VecConsumer<T>
14where
15 T: std::fmt::Debug + Clone + Send + Sync + 'static,
16{
17 pub vec: Vec<T>,
19 pub config: ConsumerConfig<T>,
21}
22
23impl<T> Default for VecConsumer<T>
24where
25 T: std::fmt::Debug + Clone + Send + Sync + 'static,
26{
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl<T> VecConsumer<T>
33where
34 T: std::fmt::Debug + Clone + Send + Sync + 'static,
35{
36 pub fn new() -> Self {
38 Self {
39 vec: Vec::new(),
40 config: ConsumerConfig::default(),
41 }
42 }
43
44 pub fn with_capacity(capacity: usize) -> Self {
50 Self {
51 vec: Vec::with_capacity(capacity),
52 config: ConsumerConfig::default(),
53 }
54 }
55
56 pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
62 self.config.error_strategy = strategy;
63 self
64 }
65
66 pub fn with_name(mut self, name: String) -> Self {
72 self.config.name = name;
73 self
74 }
75
76 pub fn into_vec(self) -> Vec<T> {
82 self.vec
83 }
84}
85
86impl<T> Input for VecConsumer<T>
87where
88 T: std::fmt::Debug + Clone + Send + Sync + 'static,
89{
90 type Input = T;
91 type InputStream = Pin<Box<dyn Stream<Item = T> + Send>>;
92}
93
94#[async_trait]
95impl<T> Consumer for VecConsumer<T>
96where
97 T: std::fmt::Debug + Clone + Send + Sync + 'static,
98{
99 type InputPorts = (T,);
100
101 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
102 let consumer_name = self.config.name.clone();
103 println!("📥 [{}] Starting to consume stream", consumer_name);
104 let mut count = 0;
105 while let Some(value) = stream.next().await {
106 count += 1;
107 println!(
108 " 📦 [{}] Consuming item #{}: {:?}",
109 consumer_name, count, value
110 );
111 self.vec.push(value);
112 }
113 println!("✅ [{}] Finished consuming {} items", consumer_name, count);
114 }
115
116 fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
117 self.config = config;
118 }
119
120 fn get_config_impl(&self) -> &ConsumerConfig<T> {
121 &self.config
122 }
123
124 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
125 &mut self.config
126 }
127
128 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
129 match self.config.error_strategy {
130 ErrorStrategy::Stop => ErrorAction::Stop,
131 ErrorStrategy::Skip => ErrorAction::Skip,
132 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
133 _ => ErrorAction::Stop,
134 }
135 }
136
137 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
138 ErrorContext {
139 timestamp: chrono::Utc::now(),
140 item,
141 component_name: self.config.name.clone(),
142 component_type: std::any::type_name::<Self>().to_string(),
143 }
144 }
145
146 fn component_info(&self) -> ComponentInfo {
147 ComponentInfo {
148 name: self.config.name.clone(),
149 type_name: std::any::type_name::<Self>().to_string(),
150 }
151 }
152}