streamweave_stdio/
stdout_consumer.rs

1use async_trait::async_trait;
2use futures::StreamExt;
3use streamweave::{Consumer, ConsumerConfig, Input};
4use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
5use tokio::io::AsyncWriteExt;
6
7/// A consumer that writes items to standard output (stdout).
8///
9/// This consumer writes each item from the stream to stdout, one per line.
10/// It's useful for building command-line tools that output to stdout.
11///
12/// # Example
13///
14/// ```rust,no_run
15/// use streamweave_stdio::StdoutConsumer;
16/// use streamweave::Pipeline;
17///
18/// let consumer = StdoutConsumer::new();
19/// let pipeline = Pipeline::new()
20///     .producer(/* ... */)
21///     .transformer(/* ... */)
22///     .consumer(consumer);
23/// ```
24pub struct StdoutConsumer<T>
25where
26  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
27{
28  /// Configuration for the consumer, including error handling strategy.
29  pub config: ConsumerConfig<T>,
30}
31
32impl<T> StdoutConsumer<T>
33where
34  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
35{
36  /// Creates a new `StdoutConsumer` with default configuration.
37  pub fn new() -> Self {
38    Self {
39      config: ConsumerConfig::default(),
40    }
41  }
42
43  /// Sets the error handling strategy for this consumer.
44  ///
45  /// # Arguments
46  ///
47  /// * `strategy` - The error handling strategy to use.
48  pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
49    self.config.error_strategy = strategy;
50    self
51  }
52
53  /// Sets the name for this consumer.
54  ///
55  /// # Arguments
56  ///
57  /// * `name` - The name to assign to this consumer.
58  pub fn with_name(mut self, name: String) -> Self {
59    self.config.name = name;
60    self
61  }
62}
63
64impl<T> Default for StdoutConsumer<T>
65where
66  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
67{
68  fn default() -> Self {
69    Self::new()
70  }
71}
72
73// Trait implementations for StdoutConsumer
74
75impl<T> Input for StdoutConsumer<T>
76where
77  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
78{
79  type Input = T;
80  type InputStream = futures::stream::BoxStream<'static, T>;
81}
82
83#[async_trait]
84impl<T> Consumer for StdoutConsumer<T>
85where
86  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
87{
88  type InputPorts = (T,);
89
90  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
91    let mut stdout = tokio::io::stdout();
92    let component_name = self.config.name.clone();
93
94    while let Some(value) = stream.next().await {
95      let output = format!("{}\n", value);
96      match stdout.write_all(output.as_bytes()).await {
97        Ok(_) => {}
98        Err(e) => {
99          tracing::warn!(
100            component = %component_name,
101            error = %e,
102            "Failed to write to stdout, continuing"
103          );
104        }
105      }
106    }
107
108    // Flush stdout to ensure all output is written
109    if let Err(e) = stdout.flush().await {
110      tracing::warn!(
111        component = %component_name,
112        error = %e,
113        "Failed to flush stdout"
114      );
115    }
116  }
117
118  fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
119    self.config = config;
120  }
121
122  fn get_config_impl(&self) -> &ConsumerConfig<T> {
123    &self.config
124  }
125
126  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
127    &mut self.config
128  }
129
130  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
131    match self.config.error_strategy {
132      ErrorStrategy::Stop => ErrorAction::Stop,
133      ErrorStrategy::Skip => ErrorAction::Skip,
134      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
135      ErrorStrategy::Custom(ref handler) => handler(error),
136      _ => ErrorAction::Stop,
137    }
138  }
139
140  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
141    ErrorContext {
142      timestamp: chrono::Utc::now(),
143      item,
144      component_name: self.config.name.clone(),
145      component_type: std::any::type_name::<Self>().to_string(),
146    }
147  }
148
149  fn component_info(&self) -> ComponentInfo {
150    ComponentInfo {
151      name: self.config.name.clone(),
152      type_name: std::any::type_name::<Self>().to_string(),
153    }
154  }
155}