arkflow_plugin/output/stdout.rs

//! Standard output component
//!
//! Writes processed message batches to standard output.
4
5use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
6use arkflow_core::{Bytes, Content, Error, MessageBatch};
7use async_trait::async_trait;
8use datafusion::arrow;
9use datafusion::arrow::array::RecordBatch;
10use serde::{Deserialize, Serialize};
11use std::io::{self, Stdout, Write};
12use std::string::String;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
16/// Standard output configuration
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct StdoutOutputConfig {
19    /// Whether to add a line break after each message
20    pub append_newline: Option<bool>,
21}
22
23/// Standard output components
24struct StdoutOutput<T> {
25    config: StdoutOutputConfig,
26    writer: Mutex<T>,
27}
28
29impl<T: StdWriter> StdoutOutput<T> {
30    /// Create a new standard output component
31    pub fn new(config: StdoutOutputConfig, writer: T) -> Result<Self, Error> {
32        Ok(Self {
33            config,
34            writer: Mutex::new(writer),
35        })
36    }
37}
38
39#[async_trait]
40impl<T> Output for StdoutOutput<T>
41where
42    T: StdWriter,
43{
44    async fn connect(&self) -> Result<(), Error> {
45        Ok(())
46    }
47
48    async fn write(&self, batch: &MessageBatch) -> Result<(), Error> {
49        match &batch.content {
50            Content::Arrow(v) => self.arrow_stdout(&v).await,
51            Content::Binary(v) => self.binary_stdout(&v).await,
52        }
53    }
54
55    async fn close(&self) -> Result<(), Error> {
56        Ok(())
57    }
58}
59impl<T: StdWriter> StdoutOutput<T> {
60    async fn arrow_stdout(&self, message_batch: &RecordBatch) -> Result<(), Error> {
61        let mut writer_std = self.writer.lock().await;
62
63        // Use Arrow's JSON serialization functionality
64        let mut buf = Vec::new();
65        let mut writer = arrow::json::ArrayWriter::new(&mut buf);
66        writer
67            .write(message_batch)
68            .map_err(|e| Error::Process(format!("Arrow JSON serialization error: {}", e)))?;
69        writer
70            .finish()
71            .map_err(|e| Error::Process(format!("Arrow JSON serialization error: {}", e)))?;
72        let s = String::from_utf8_lossy(&buf);
73
74        if self.config.append_newline.unwrap_or(true) {
75            writeln!(writer_std, "{}", s).map_err(Error::Io)?
76        } else {
77            write!(writer_std, "{}", s).map_err(Error::Io)?
78        }
79
80        writer_std.flush().map_err(Error::Io)?;
81        Ok(())
82    }
83    async fn binary_stdout(&self, msg: &[Bytes]) -> Result<(), Error> {
84        let mut writer_std = self.writer.lock().await;
85        for x in msg {
86            if self.config.append_newline.unwrap_or(true) {
87                writeln!(writer_std, "{}", String::from_utf8_lossy(&x)).map_err(Error::Io)?
88            } else {
89                write!(writer_std, "{}", String::from_utf8_lossy(&x)).map_err(Error::Io)?
90            }
91        }
92        Ok(())
93    }
94}
95
96pub(crate) struct StdoutOutputBuilder;
97impl OutputBuilder for StdoutOutputBuilder {
98    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
99        if config.is_none() {
100            return Err(Error::Config(
101                "Stdout output configuration is missing".to_string(),
102            ));
103        }
104        let config: StdoutOutputConfig = serde_json::from_value(config.clone().unwrap())?;
105        Ok(Arc::new(StdoutOutput::new(config, io::stdout())?))
106    }
107}
108
109pub fn init() {
110    register_output_builder("stdout", Arc::new(StdoutOutputBuilder));
111}
112
/// Synchronous writer that the stdout output can send its data to.
///
/// The `Send + Sync` bounds let the writer live behind the output's mutex in a
/// shared, thread-safe output object.
trait StdWriter: Write + Send + Sync {}

/// The process's real standard output handle.
impl StdWriter for io::Stdout {}
116
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::array::{ArrayRef, Int32Array};
    use std::io::Cursor;

    /// In-memory writer that captures everything written to it.
    struct MockWriter(Cursor<Vec<u8>>);

    impl MockWriter {
        fn new() -> Self {
            Self(Cursor::new(Vec::new()))
        }

        /// Returns everything written so far, decoded lossily as UTF-8.
        fn get_output(&self) -> String {
            String::from_utf8_lossy(self.0.get_ref()).to_string()
        }
    }

    impl Write for MockWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.write(buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.0.flush()
        }
    }

    impl StdWriter for MockWriter {}

    /// Builds an output with the given newline setting, backed by a fresh `MockWriter`.
    fn mock_output(append_newline: Option<bool>) -> StdoutOutput<MockWriter> {
        StdoutOutput::new(StdoutOutputConfig { append_newline }, MockWriter::new()).unwrap()
    }

    /// Returns everything the output's mock writer has captured.
    async fn captured(output: &StdoutOutput<MockWriter>) -> String {
        output.writer.lock().await.get_output()
    }

    /// Test basic functionality of StdoutOutput
    #[tokio::test]
    async fn test_basic_functionality() {
        let output = mock_output(Some(true));

        assert!(output.connect().await.is_ok());

        let msg = MessageBatch::from_string("test message");
        assert!(output.write(&msg).await.is_ok());

        assert!(output.close().await.is_ok());
    }

    /// Test handling of different data types (Arrow and Binary)
    #[tokio::test]
    async fn test_data_type_handling() {
        // Binary data is written as-is, followed by a newline.
        let output = mock_output(Some(true));
        output
            .write(&MessageBatch::from_string("binary test"))
            .await
            .unwrap();
        assert_eq!(captured(&output).await, "binary test\n");

        // Arrow data is rendered as a JSON array with one object per row.
        let output = mock_output(Some(true));
        let batch = RecordBatch::try_from_iter(vec![(
            "a",
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        )])
        .unwrap();
        output.arrow_stdout(&batch).await.unwrap();
        assert_eq!(captured(&output).await, "[{\"a\":1},{\"a\":2}]\n");
    }

    /// Test newline configuration behavior
    #[tokio::test]
    async fn test_newline_config() {
        // Newline enabled
        let output = mock_output(Some(true));
        output.write(&MessageBatch::from_string("test")).await.unwrap();
        assert_eq!(captured(&output).await, "test\n");

        // Newline disabled
        let output = mock_output(Some(false));
        output.write(&MessageBatch::from_string("test")).await.unwrap();
        assert_eq!(captured(&output).await, "test");
    }

    /// An unset `append_newline` behaves like `Some(true)`.
    #[tokio::test]
    async fn test_newline_defaults_to_true() {
        let output = mock_output(None);
        output.write(&MessageBatch::from_string("test")).await.unwrap();
        assert_eq!(captured(&output).await, "test\n");
    }

    /// Test output content verification
    #[tokio::test]
    async fn test_output_content() {
        let output = mock_output(Some(true));

        for text in ["first", "second", "third"] {
            output.write(&MessageBatch::from_string(text)).await.unwrap();
        }

        // Messages must appear in order, each terminated by a newline.
        assert_eq!(captured(&output).await, "first\nsecond\nthird\n");
    }
}