arkflow_plugin/output/
stdout.rs1use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
6use arkflow_core::{Bytes, Content, Error, MessageBatch};
7use async_trait::async_trait;
8use datafusion::arrow;
9use datafusion::arrow::array::RecordBatch;
10use serde::{Deserialize, Serialize};
11use std::io::{self, Stdout, Write};
12use std::string::String;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct StdoutOutputConfig {
19 pub append_newline: Option<bool>,
21}
22
23struct StdoutOutput<T> {
25 config: StdoutOutputConfig,
26 writer: Mutex<T>,
27}
28
29impl<T: StdWriter> StdoutOutput<T> {
30 pub fn new(config: StdoutOutputConfig, writer: T) -> Result<Self, Error> {
32 Ok(Self {
33 config,
34 writer: Mutex::new(writer),
35 })
36 }
37}
38
39#[async_trait]
40impl<T> Output for StdoutOutput<T>
41where
42 T: StdWriter,
43{
44 async fn connect(&self) -> Result<(), Error> {
45 Ok(())
46 }
47
48 async fn write(&self, batch: &MessageBatch) -> Result<(), Error> {
49 match &batch.content {
50 Content::Arrow(v) => self.arrow_stdout(&v).await,
51 Content::Binary(v) => self.binary_stdout(&v).await,
52 }
53 }
54
55 async fn close(&self) -> Result<(), Error> {
56 Ok(())
57 }
58}
59impl<T: StdWriter> StdoutOutput<T> {
60 async fn arrow_stdout(&self, message_batch: &RecordBatch) -> Result<(), Error> {
61 let mut writer_std = self.writer.lock().await;
62
63 let mut buf = Vec::new();
65 let mut writer = arrow::json::ArrayWriter::new(&mut buf);
66 writer
67 .write(message_batch)
68 .map_err(|e| Error::Process(format!("Arrow JSON serialization error: {}", e)))?;
69 writer
70 .finish()
71 .map_err(|e| Error::Process(format!("Arrow JSON serialization error: {}", e)))?;
72 let s = String::from_utf8_lossy(&buf);
73
74 if self.config.append_newline.unwrap_or(true) {
75 writeln!(writer_std, "{}", s).map_err(Error::Io)?
76 } else {
77 write!(writer_std, "{}", s).map_err(Error::Io)?
78 }
79
80 writer_std.flush().map_err(Error::Io)?;
81 Ok(())
82 }
83 async fn binary_stdout(&self, msg: &[Bytes]) -> Result<(), Error> {
84 let mut writer_std = self.writer.lock().await;
85 for x in msg {
86 if self.config.append_newline.unwrap_or(true) {
87 writeln!(writer_std, "{}", String::from_utf8_lossy(&x)).map_err(Error::Io)?
88 } else {
89 write!(writer_std, "{}", String::from_utf8_lossy(&x)).map_err(Error::Io)?
90 }
91 }
92 Ok(())
93 }
94}
95
96pub(crate) struct StdoutOutputBuilder;
97impl OutputBuilder for StdoutOutputBuilder {
98 fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
99 if config.is_none() {
100 return Err(Error::Config(
101 "Stdout output configuration is missing".to_string(),
102 ));
103 }
104 let config: StdoutOutputConfig = serde_json::from_value(config.clone().unwrap())?;
105 Ok(Arc::new(StdoutOutput::new(config, io::stdout())?))
106 }
107}
108
109pub fn init() {
110 register_output_builder("stdout", Arc::new(StdoutOutputBuilder));
111}
112
/// Sinks the stdout output can target: any [`Write`] implementor that is also
/// `Send + Sync`.
///
/// Abstracting over the sink lets tests plug in an in-memory writer.
trait StdWriter: Send + Sync + Write {}

impl StdWriter for Stdout {}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// In-memory sink that records every byte written to it.
    struct MockWriter(Cursor<Vec<u8>>);

    impl MockWriter {
        fn new() -> Self {
            MockWriter(Cursor::new(Vec::new()))
        }

        /// Everything written so far, decoded lossily as UTF-8.
        fn get_output(&self) -> String {
            String::from_utf8_lossy(self.0.get_ref()).into_owned()
        }
    }

    impl Write for MockWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.write(buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.0.flush()
        }
    }

    impl StdWriter for MockWriter {}

    /// Builds an output backed by a fresh `MockWriter`.
    fn mock_output(append_newline: bool) -> StdoutOutput<MockWriter> {
        let config = StdoutOutputConfig {
            append_newline: Some(append_newline),
        };
        StdoutOutput::new(config, MockWriter::new()).unwrap()
    }

    #[tokio::test]
    async fn test_basic_functionality() {
        let output = mock_output(true);

        assert!(output.connect().await.is_ok());
        assert!(output
            .write(&MessageBatch::from_string("test message"))
            .await
            .is_ok());
        assert!(output.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_data_type_handling() {
        let output = mock_output(true);
        let outcome = output.write(&MessageBatch::from_string("binary test")).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_newline_config() {
        {
            let with_newline = mock_output(true);
            with_newline
                .write(&MessageBatch::from_string("test"))
                .await
                .unwrap();
            assert_eq!(with_newline.writer.lock().await.get_output(), "test\n");
        }

        {
            let without_newline = mock_output(false);
            without_newline
                .write(&MessageBatch::from_string("test"))
                .await
                .unwrap();
            assert_eq!(without_newline.writer.lock().await.get_output(), "test");
        }
    }

    #[tokio::test]
    async fn test_output_content() {
        let output = mock_output(true);

        for word in ["first", "second", "third"] {
            output
                .write(&MessageBatch::from_string(word))
                .await
                .unwrap();
        }

        let captured = output.writer.lock().await.get_output();
        for expected in ["first\n", "second\n", "third\n"] {
            assert!(captured.contains(expected));
        }
    }
}