arkflow_plugin/output/
file.rs

//! File output component
//!
//! Writes processed message batches to a file on the local filesystem.
4
5use std::fs::{File, OpenOptions};
6use std::io::Write;
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10
11use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
12use arkflow_core::{Error, MessageBatch};
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use tokio::sync::Mutex;
16
17/// File output configuration
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct FileOutputConfig {
20    /// Output file path
21    pub path: String,
22    /// Whether to add a newline after each message
23    pub append_newline: Option<bool>,
24    /// Whether to append to the end of the file (instead of overwriting)
25    pub append: Option<bool>,
26}
27
28/// File output component
29pub struct FileOutput {
30    config: FileOutputConfig,
31    writer: Arc<Mutex<Option<File>>>,
32    connected: AtomicBool,
33}
34
35impl FileOutput {
36    /// Create a new file output component
37    pub fn new(config: FileOutputConfig) -> Result<Self, Error> {
38        Ok(Self {
39            config: config.clone(),
40            writer: Arc::new(Mutex::new(None)),
41            connected: AtomicBool::new(false),
42        })
43    }
44}
45
46#[async_trait]
47impl Output for FileOutput {
48    async fn connect(&self) -> Result<(), Error> {
49        let path = Path::new(&self.config.path);
50
51        // Make sure the directory exists
52        if let Some(parent) = path.parent() {
53            if !parent.exists() {
54                std::fs::create_dir_all(parent).map_err(Error::Io)?
55            }
56        }
57        let append = self.config.append.unwrap_or(true);
58        // Open the file
59        let file = OpenOptions::new()
60            .write(true)
61            .create(true)
62            .append(append)
63            .truncate(!append)
64            .open(path)
65            .map_err(Error::Io)?;
66        let writer_arc = self.writer.clone();
67        let mut writer_arc_guard = writer_arc.lock().await;
68        writer_arc_guard.replace(file);
69        self.connected.store(true, Ordering::SeqCst);
70        Ok(())
71    }
72
73    async fn write(&self, msg: &MessageBatch) -> Result<(), Error> {
74        let writer_arc = self.writer.clone();
75        let writer_arc_guard = writer_arc.lock().await;
76        if !self.connected.load(Ordering::SeqCst) || writer_arc_guard.is_none() {
77            return Err(Error::Connection("The output is not connected".to_string()));
78        }
79
80        let content = msg.as_string()?;
81        let writer = writer_arc_guard.as_ref();
82        let mut file =
83            writer.ok_or(Error::Connection("The output is not connected".to_string()))?;
84
85        for x in content {
86            if self.config.append_newline.unwrap_or(true) {
87                writeln!(file, "{}", x).map_err(Error::Io)?
88            } else {
89                write!(file, "{}", x).map_err(Error::Io)?
90            }
91        }
92
93        file.flush().map_err(Error::Io)?;
94        Ok(())
95    }
96
97    async fn close(&self) -> Result<(), Error> {
98        self.connected.store(false, Ordering::SeqCst);
99        let writer_arc = self.writer.clone();
100        let mut writer_arc_mutex_guard = writer_arc.lock().await;
101        *writer_arc_mutex_guard = None;
102        Ok(())
103    }
104}
105
106pub(crate) struct FileOutputBuilder;
107impl OutputBuilder for FileOutputBuilder {
108    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
109        if config.is_none() {
110            return Err(Error::Config(
111                "File output configuration is missing".to_string(),
112            ));
113        }
114
115        let config: FileOutputConfig = serde_json::from_value(config.clone().unwrap())?;
116        Ok(Arc::new(FileOutput::new(config)?))
117    }
118}
119
120pub fn init() {
121    register_output_builder("file", Arc::new(FileOutputBuilder));
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    /// Build a config pointing at `path` with explicit flag values.
    fn config_for(path: &std::path::Path, append_newline: bool, append: bool) -> FileOutputConfig {
        FileOutputConfig {
            path: path.to_str().unwrap().to_string(),
            append_newline: Some(append_newline),
            append: Some(append),
        }
    }

    /// Test creating new file and writing content
    #[tokio::test]
    async fn test_create_and_write_to_file() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");

        let output = FileOutput::new(config_for(&file_path, true, false)).unwrap();
        output.connect().await.unwrap();

        let msg = MessageBatch::from_string("test content");
        output.write(&msg).await.unwrap();
        output.close().await.unwrap();

        let content = read_to_string(&file_path).await.unwrap();
        assert_eq!(content, "test content\n");
    }

    /// Test appending multiple writes within one connection
    #[tokio::test]
    async fn test_append_mode() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("append.txt");

        let output = FileOutput::new(config_for(&file_path, false, true)).unwrap();
        output.connect().await.unwrap();

        output
            .write(&MessageBatch::from_string("first"))
            .await
            .unwrap();
        output
            .write(&MessageBatch::from_string("second"))
            .await
            .unwrap();
        output.close().await.unwrap();

        let content = read_to_string(&file_path).await.unwrap();
        assert_eq!(content, "firstsecond");
    }

    /// Test that append mode keeps content that existed before connecting
    #[tokio::test]
    async fn test_append_preserves_existing_content() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("existing.txt");
        std::fs::write(&file_path, "existing\n").unwrap();

        let output = FileOutput::new(config_for(&file_path, true, true)).unwrap();
        output.connect().await.unwrap();
        output
            .write(&MessageBatch::from_string("new"))
            .await
            .unwrap();
        output.close().await.unwrap();

        let content = read_to_string(&file_path).await.unwrap();
        assert_eq!(content, "existing\nnew\n");
    }

    /// Test that overwrite mode truncates content that existed before connecting
    #[tokio::test]
    async fn test_overwrite_truncates_existing_file() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("overwrite.txt");
        std::fs::write(&file_path, "stale content that must disappear").unwrap();

        let output = FileOutput::new(config_for(&file_path, false, false)).unwrap();
        output.connect().await.unwrap();
        output
            .write(&MessageBatch::from_string("fresh"))
            .await
            .unwrap();
        output.close().await.unwrap();

        let content = read_to_string(&file_path).await.unwrap();
        assert_eq!(content, "fresh");
    }

    /// Test newline configuration handling
    #[tokio::test]
    async fn test_newline_configuration() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("newline.txt");

        let output = FileOutput::new(config_for(&file_path, false, false)).unwrap();
        output.connect().await.unwrap();
        output
            .write(&MessageBatch::from_string("no_newline"))
            .await
            .unwrap();
        output.close().await.unwrap();

        let content = read_to_string(&file_path).await.unwrap();
        assert_eq!(content, "no_newline");
    }

    /// Test that missing parent directories are created on connect
    #[tokio::test]
    async fn test_creates_missing_parent_directories() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("nested").join("deeper").join("out.txt");

        let output = FileOutput::new(config_for(&file_path, true, false)).unwrap();
        output.connect().await.unwrap();
        output.write(&MessageBatch::from_string("x")).await.unwrap();
        output.close().await.unwrap();

        let content = read_to_string(&file_path).await.unwrap();
        assert_eq!(content, "x\n");
    }

    /// Test that writing before connect and after close is rejected
    #[tokio::test]
    async fn test_write_requires_connection() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("unconnected.txt");

        let output = FileOutput::new(config_for(&file_path, true, false)).unwrap();
        let msg = MessageBatch::from_string("ignored");

        let before = output.write(&msg).await;
        assert!(matches!(before, Err(Error::Connection(_))));

        output.connect().await.unwrap();
        output.close().await.unwrap();

        let after = output.write(&msg).await;
        assert!(matches!(after, Err(Error::Connection(_))));
    }

    /// Test error handling for invalid directory
    ///
    /// The parent "directory" is a regular file, which cannot be turned into a directory
    /// even with elevated privileges. This keeps the test reliable when run as root and
    /// avoids touching paths outside the temporary directory.
    #[tokio::test]
    async fn test_invalid_directory() {
        let dir = tempdir().unwrap();
        let blocker = dir.path().join("not_a_dir");
        std::fs::write(&blocker, "").unwrap();
        let file_path = blocker.join("test.txt");

        let output = FileOutput::new(config_for(&file_path, true, false)).unwrap();
        let result = output.connect().await;
        assert!(matches!(result, Err(Error::Io(_))));
    }
}