streamweave_stdio/
stdout_consumer.rs1use async_trait::async_trait;
2use futures::StreamExt;
3use streamweave::{Consumer, ConsumerConfig, Input};
4use streamweave_error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
5use tokio::io::AsyncWriteExt;
6
7pub struct StdoutConsumer<T>
25where
26 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
27{
28 pub config: ConsumerConfig<T>,
30}
31
32impl<T> StdoutConsumer<T>
33where
34 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
35{
36 pub fn new() -> Self {
38 Self {
39 config: ConsumerConfig::default(),
40 }
41 }
42
43 pub fn with_error_strategy(mut self, strategy: ErrorStrategy<T>) -> Self {
49 self.config.error_strategy = strategy;
50 self
51 }
52
53 pub fn with_name(mut self, name: String) -> Self {
59 self.config.name = name;
60 self
61 }
62}
63
64impl<T> Default for StdoutConsumer<T>
65where
66 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
67{
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl<T> Input for StdoutConsumer<T>
76where
77 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
78{
79 type Input = T;
80 type InputStream = futures::stream::BoxStream<'static, T>;
81}
82
83#[async_trait]
84impl<T> Consumer for StdoutConsumer<T>
85where
86 T: std::fmt::Debug + Clone + Send + Sync + std::fmt::Display + 'static,
87{
88 type InputPorts = (T,);
89
90 async fn consume(&mut self, mut stream: Self::InputStream) -> () {
91 let mut stdout = tokio::io::stdout();
92 let component_name = self.config.name.clone();
93
94 while let Some(value) = stream.next().await {
95 let output = format!("{}\n", value);
96 match stdout.write_all(output.as_bytes()).await {
97 Ok(_) => {}
98 Err(e) => {
99 tracing::warn!(
100 component = %component_name,
101 error = %e,
102 "Failed to write to stdout, continuing"
103 );
104 }
105 }
106 }
107
108 if let Err(e) = stdout.flush().await {
110 tracing::warn!(
111 component = %component_name,
112 error = %e,
113 "Failed to flush stdout"
114 );
115 }
116 }
117
118 fn set_config_impl(&mut self, config: ConsumerConfig<T>) {
119 self.config = config;
120 }
121
122 fn get_config_impl(&self) -> &ConsumerConfig<T> {
123 &self.config
124 }
125
126 fn get_config_mut_impl(&mut self) -> &mut ConsumerConfig<T> {
127 &mut self.config
128 }
129
130 fn handle_error(&self, error: &StreamError<T>) -> ErrorAction {
131 match self.config.error_strategy {
132 ErrorStrategy::Stop => ErrorAction::Stop,
133 ErrorStrategy::Skip => ErrorAction::Skip,
134 ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
135 ErrorStrategy::Custom(ref handler) => handler(error),
136 _ => ErrorAction::Stop,
137 }
138 }
139
140 fn create_error_context(&self, item: Option<T>) -> ErrorContext<T> {
141 ErrorContext {
142 timestamp: chrono::Utc::now(),
143 item,
144 component_name: self.config.name.clone(),
145 component_type: std::any::type_name::<Self>().to_string(),
146 }
147 }
148
149 fn component_info(&self) -> ComponentInfo {
150 ComponentInfo {
151 name: self.config.name.clone(),
152 type_name: std::any::type_name::<Self>().to_string(),
153 }
154 }
155}