streamweave_stdio/consumers/
consumer.rs1use super::{stderr_consumer::StderrConsumer, stdout_consumer::StdoutConsumer};
2use async_trait::async_trait;
3use futures::StreamExt;
4use streamweave::{Consumer, ConsumerConfig};
5use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
6use tokio::io::AsyncWriteExt;
7
8#[async_trait]
9impl<T> Consumer for StdoutConsumer<T>
10where
11 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
12{
13 type InputPorts = (T,);
14
15 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
16 let mut stdout = tokio::io::stdout();
17 let component_name = self.config.name.clone();
18
19 while let Some(value) = stream.next().await {
20 let output = format!("{}\n", value);
21 match stdout.write_all(output.as_bytes()).await {
22 Ok(_) => {}
23 Err(e) => {
24 tracing::warn!(
25 component = %component_name,
26 error = %e,
27 "Failed to write to stdout, continuing"
28 );
29 }
30 }
31 }
32
33 if let Err(e) = stdout.flush().await {
35 tracing::warn!(
36 component = %component_name,
37 error = %e,
38 "Failed to flush stdout"
39 );
40 }
41 }
42
43 fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
44 self.config = config;
45 }
46
47 fn get_config_impl(&self) -> &ConsumerConfig<T> {
48 &self.config
49 }
50
51 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
52 &mut self.config
53 }
54
55 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
56 match self.config.error_strategy {
57 ErrorStrategy::Stop => ErrorAction::Stop,
58 ErrorStrategy::Skip => ErrorAction::Skip,
59 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
60 ErrorStrategy::Custom(ref handler) => handler(error),
61 _ => ErrorAction::Stop,
62 }
63 }
64
65 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
66 ErrorContext {
67 timestamp: chrono::Utc::now(),
68 item,
69 component_name: self.config.name.clone(),
70 component_type: std::any::type_name::<Self>().to_string(),
71 }
72 }
73
74 fn component_info(&self) -> ComponentInfo {
75 ComponentInfo {
76 name: self.config.name.clone(),
77 type_name: std::any::type_name::<Self>().to_string(),
78 }
79 }
80}
81
82#[async_trait]
83impl<T> Consumer for StderrConsumer<T>
84where
85 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
86{
87 type InputPorts = (T,);
88
89 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
90 let mut stderr = tokio::io::stderr();
91 let component_name = self.config.name.clone();
92
93 while let Some(value) = stream.next().await {
94 let output = format!("{}\n", value);
95 match stderr.write_all(output.as_bytes()).await {
96 Ok(_) => {}
97 Err(e) => {
98 tracing::warn!(
99 component = %component_name,
100 error = %e,
101 "Failed to write to stderr, continuing"
102 );
103 }
104 }
105 }
106
107 if let Err(e) = stderr.flush().await {
109 tracing::warn!(
110 component = %component_name,
111 error = %e,
112 "Failed to flush stderr"
113 );
114 }
115 }
116
117 fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
118 self.config = config;
119 }
120
121 fn get_config_impl(&self) -> &ConsumerConfig<T> {
122 &self.config
123 }
124
125 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
126 &mut self.config
127 }
128
129 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
130 match self.config.error_strategy {
131 ErrorStrategy::Stop => ErrorAction::Stop,
132 ErrorStrategy::Skip => ErrorAction::Skip,
133 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
134 ErrorStrategy::Custom(ref handler) => handler(error),
135 _ => ErrorAction::Stop,
136 }
137 }
138
139 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
140 ErrorContext {
141 timestamp: chrono::Utc::now(),
142 item,
143 component_name: self.config.name.clone(),
144 component_type: std::any::type_name::<Self>().to_string(),
145 }
146 }
147
148 fn component_info(&self) -> ComponentInfo {
149 ComponentInfo {
150 name: self.config.name.clone(),
151 type_name: std::any::type_name::<Self>().to_string(),
152 }
153 }
154}