streamweave_vec/
vec_producer.rs1use futures::{Stream, stream};
2use std::pin::Pin;
3use streamweave::{Output, Producer, ProducerConfig};
4use streamweave_error::ErrorStrategy;
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, StreamError};
6
7#[derive(Clone)]
12pub struct VecProducer<T>
13where
14 T: std::fmt::Debug + Clone + Send + Sync + 'static,
15{
16 pub data: Vec<T>,
18 pub config: ProducerConfig<T>,
20}
21
22impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> VecProducer<T> {
23 pub fn new(data: Vec<T>) -> Self {
29 Self {
30 data,
31 config: streamweave::ProducerConfig::default(),
32 }
33 }
34
35 pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
41 self.config.error_strategy = strategy;
42 self
43 }
44
45 pub fn with_name(mut self, name: String) -> Self {
51 self.config.name = Some(name);
52 self
53 }
54}
55
56impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Output for VecProducer<T> {
57 type Output = T;
58 type OutputStream = Pin<Box<dyn Stream<Item = T> + Send>>;
59}
60
61impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Producer for VecProducer<T> {
62 type OutputPorts = (T,);
63
64 fn produce(&mut self) -> Self::OutputStream {
65 let producer_name = self.config.name().unwrap_or("vec_producer".to_string());
66 println!("📤 [{}] Producing {} items", producer_name, self.data.len());
67 let stream = stream::iter(self.data.clone());
68 Box::pin(stream)
69 }
70
71 fn set_config_impl(&mut self, config: ProducerConfig<T>) {
72 self.config = config;
73 }
74
75 fn get_config_impl(&self) -> &ProducerConfig<T> {
76 &self.config
77 }
78
79 fn get_config_mut_impl(&mut self) -> &mut ProducerConfig<T> {
80 &mut self.config
81 }
82
83 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
84 match self.config.error_strategy() {
85 ErrorStrategy::Stop => ErrorAction::Stop,
86 ErrorStrategy::Skip => ErrorAction::Skip,
87 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
88 ErrorStrategy::Custom(ref handler) => handler(error),
89 _ => ErrorAction::Stop,
90 }
91 }
92
93 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
94 ErrorContext {
95 timestamp: chrono::Utc::now(),
96 item,
97 component_name: self
98 .config
99 .name
100 .clone()
101 .unwrap_or_else(|| "vec_producer".to_string()),
102 component_type: std::any::type_name::<Self>().to_string(),
103 }
104 }
105
106 fn component_info(&self) -> ComponentInfo {
107 ComponentInfo {
108 name: self
109 .config
110 .name()
111 .unwrap_or_else(|| "vec_producer".to_string()),
112 type_name: std::any::type_name::<Self>().to_string(),
113 }
114 }
115}