streamweave_vec/
vec_producer.rs

1use futures::{Stream, stream};
2use std::pin::Pin;
3use streamweave::{Output, Producer, ProducerConfig};
4use streamweave_error::ErrorStrategy;
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, StreamError};
6
7/// A producer that yields items from a Vec.
8///
9/// This producer iterates over all items in the Vec and produces them
10/// in order.
11#[derive(Clone)]
12pub struct VecProducer<T>
13where
14  T: std::fmt::Debug + Clone + Send + Sync + 'static,
15{
16  /// The Vec data to produce from.
17  pub data: Vec<T>,
18  /// Configuration for the producer, including error handling strategy.
19  pub config: ProducerConfig<T>,
20}
21
22impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> VecProducer<T> {
23  /// Creates a new `VecProducer` with the given Vec.
24  ///
25  /// # Arguments
26  ///
27  /// * `data` - The Vec to produce items from.
28  pub fn new(data: Vec<T>) -> Self {
29    Self {
30      data,
31      config: streamweave::ProducerConfig::default(),
32    }
33  }
34
35  /// Sets the error handling strategy for this producer.
36  ///
37  /// # Arguments
38  ///
39  /// * `strategy` - The error handling strategy to use.
40  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
41    self.config.error_strategy = strategy;
42    self
43  }
44
45  /// Sets the name for this producer.
46  ///
47  /// # Arguments
48  ///
49  /// * `name` - The name to assign to this producer.
50  pub fn with_name(mut self, name: String) -> Self {
51    self.config.name = Some(name);
52    self
53  }
54}
55
56impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Output for VecProducer<T> {
57  type Output = T;
58  type OutputStream = Pin<Box<dyn Stream<Item = T> + Send>>;
59}
60
61impl<T: std::fmt::Debug + Clone + Send + Sync + 'static> Producer for VecProducer<T> {
62  type OutputPorts = (T,);
63
64  fn produce(&mut self) -> Self::OutputStream {
65    let producer_name = self.config.name().unwrap_or("vec_producer".to_string());
66    println!("📤 [{}] Producing {} items", producer_name, self.data.len());
67    let stream = stream::iter(self.data.clone());
68    Box::pin(stream)
69  }
70
71  fn set_config_impl(&mut self, config: ProducerConfig<T>) {
72    self.config = config;
73  }
74
75  fn get_config_impl(&self) -> &ProducerConfig<T> {
76    &self.config
77  }
78
79  fn get_config_mut_impl(&mut self) -> &mut ProducerConfig<T> {
80    &mut self.config
81  }
82
83  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
84    match self.config.error_strategy() {
85      ErrorStrategy::Stop => ErrorAction::Stop,
86      ErrorStrategy::Skip => ErrorAction::Skip,
87      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
88      ErrorStrategy::Custom(ref handler) => handler(error),
89      _ => ErrorAction::Stop,
90    }
91  }
92
93  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
94    ErrorContext {
95      timestamp: chrono::Utc::now(),
96      item,
97      component_name: self
98        .config
99        .name
100        .clone()
101        .unwrap_or_else(|| "vec_producer".to_string()),
102      component_type: std::any::type_name::<Self>().to_string(),
103    }
104  }
105
106  fn component_info(&self) -> ComponentInfo {
107    ComponentInfo {
108      name: self
109        .config
110        .name()
111        .unwrap_or_else(|| "vec_producer".to_string()),
112      type_name: std::any::type_name::<Self>().to_string(),
113    }
114  }
115}