streamweave_stdio/consumers/
consumer.rs

1use super::{stderr_consumer::StderrConsumer, stdout_consumer::StdoutConsumer};
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Consumer, ConsumerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6use tokio::io::AsyncWriteExt;
7
8#[async_trait]
9impl<T> Consumer for StdoutConsumer<T>
10where
11  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
12{
13  type InputPorts = (T,);
14
15  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
16    let mut stdout = tokio::io::stdout();
17    let component_name = self.config.name.clone();
18
19    while let Some(value) = stream.next().await {
20      let output = format!("{}\n", value);
21      match stdout.write_all(output.as_bytes()).await {
22        Ok(_) => {}
23        Err(e) => {
24          tracing::warn!(
25            component = %component_name,
26            error = %e,
27            "Failed to write to stdout, continuing"
28          );
29        }
30      }
31    }
32
33    // Flush stdout to ensure all output is written
34    if let Err(e) = stdout.flush().await {
35      tracing::warn!(
36        component = %component_name,
37        error = %e,
38        "Failed to flush stdout"
39      );
40    }
41  }
42
43  fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
44    self.config = config;
45  }
46
47  fn get_config_impl(&self) -> &ConsumerConfig<T> {
48    &self.config
49  }
50
51  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
52    &mut self.config
53  }
54
55  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
56    match self.config.error_strategy {
57      ErrorStrategy::Stop => ErrorAction::Stop,
58      ErrorStrategy::Skip => ErrorAction::Skip,
59      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
60      ErrorStrategy::Custom(ref handler) => handler(error),
61      _ => ErrorAction::Stop,
62    }
63  }
64
65  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
66    ErrorContext {
67      timestamp: chrono::Utc::now(),
68      item,
69      component_name: self.config.name.clone(),
70      component_type: std::any::type_name::<Self>().to_string(),
71    }
72  }
73
74  fn component_info(&self) -> ComponentInfo {
75    ComponentInfo {
76      name: self.config.name.clone(),
77      type_name: std::any::type_name::<Self>().to_string(),
78    }
79  }
80}
81
82#[async_trait]
83impl<T> Consumer for StderrConsumer<T>
84where
85  T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
86{
87  type InputPorts = (T,);
88
89  async fn consume(&mut self, mut stream: Self::InputStream) -> () {
90    let mut stderr = tokio::io::stderr();
91    let component_name = self.config.name.clone();
92
93    while let Some(value) = stream.next().await {
94      let output = format!("{}\n", value);
95      match stderr.write_all(output.as_bytes()).await {
96        Ok(_) => {}
97        Err(e) => {
98          tracing::warn!(
99            component = %component_name,
100            error = %e,
101            "Failed to write to stderr, continuing"
102          );
103        }
104      }
105    }
106
107    // Flush stderr to ensure all output is written
108    if let Err(e) = stderr.flush().await {
109      tracing::warn!(
110        component = %component_name,
111        error = %e,
112        "Failed to flush stderr"
113      );
114    }
115  }
116
117  fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
118    self.config = config;
119  }
120
121  fn get_config_impl(&self) -> &ConsumerConfig<T> {
122    &self.config
123  }
124
125  fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
126    &mut self.config
127  }
128
129  fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
130    match self.config.error_strategy {
131      ErrorStrategy::Stop => ErrorAction::Stop,
132      ErrorStrategy::Skip => ErrorAction::Skip,
133      ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
134      ErrorStrategy::Custom(ref handler) => handler(error),
135      _ => ErrorAction::Stop,
136    }
137  }
138
139  fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
140    ErrorContext {
141      timestamp: chrono::Utc::now(),
142      item,
143      component_name: self.config.name.clone(),
144      component_type: std::any::type_name::<Self>().to_string(),
145    }
146  }
147
148  fn component_info(&self) -> ComponentInfo {
149    ComponentInfo {
150      name: self.config.name.clone(),
151      type_name: std::any::type_name::<Self>().to_string(),
152    }
153  }
154}