arkflow_plugin/output/
file.rs1use std::fs::{File, OpenOptions};
6use std::io::Write;
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10
11use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
12use arkflow_core::{Error, MessageBatch};
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use tokio::sync::Mutex;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct FileOutputConfig {
20 pub path: String,
22 pub append_newline: Option<bool>,
24 pub append: Option<bool>,
26}
27
28pub struct FileOutput {
30 config: FileOutputConfig,
31 writer: Arc<Mutex<Option<File>>>,
32 connected: AtomicBool,
33}
34
35impl FileOutput {
36 pub fn new(config: FileOutputConfig) -> Result<Self, Error> {
38 Ok(Self {
39 config: config.clone(),
40 writer: Arc::new(Mutex::new(None)),
41 connected: AtomicBool::new(false),
42 })
43 }
44}
45
46#[async_trait]
47impl Output for FileOutput {
48 async fn connect(&self) -> Result<(), Error> {
49 let path = Path::new(&self.config.path);
50
51 if let Some(parent) = path.parent() {
53 if !parent.exists() {
54 std::fs::create_dir_all(parent).map_err(Error::Io)?
55 }
56 }
57 let append = self.config.append.unwrap_or(true);
58 let file = OpenOptions::new()
60 .write(true)
61 .create(true)
62 .append(append)
63 .truncate(!append)
64 .open(path)
65 .map_err(Error::Io)?;
66 let writer_arc = self.writer.clone();
67 let mut writer_arc_guard = writer_arc.lock().await;
68 writer_arc_guard.replace(file);
69 self.connected.store(true, Ordering::SeqCst);
70 Ok(())
71 }
72
73 async fn write(&self, msg: &MessageBatch) -> Result<(), Error> {
74 let writer_arc = self.writer.clone();
75 let writer_arc_guard = writer_arc.lock().await;
76 if !self.connected.load(Ordering::SeqCst) || writer_arc_guard.is_none() {
77 return Err(Error::Connection("The output is not connected".to_string()));
78 }
79
80 let content = msg.as_string()?;
81 let writer = writer_arc_guard.as_ref();
82 let mut file =
83 writer.ok_or(Error::Connection("The output is not connected".to_string()))?;
84
85 for x in content {
86 if self.config.append_newline.unwrap_or(true) {
87 writeln!(file, "{}", x).map_err(Error::Io)?
88 } else {
89 write!(file, "{}", x).map_err(Error::Io)?
90 }
91 }
92
93 file.flush().map_err(Error::Io)?;
94 Ok(())
95 }
96
97 async fn close(&self) -> Result<(), Error> {
98 self.connected.store(false, Ordering::SeqCst);
99 let writer_arc = self.writer.clone();
100 let mut writer_arc_mutex_guard = writer_arc.lock().await;
101 *writer_arc_mutex_guard = None;
102 Ok(())
103 }
104}
105
106pub(crate) struct FileOutputBuilder;
107impl OutputBuilder for FileOutputBuilder {
108 fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
109 if config.is_none() {
110 return Err(Error::Config(
111 "File output configuration is missing".to_string(),
112 ));
113 }
114
115 let config: FileOutputConfig = serde_json::from_value(config.clone().unwrap())?;
116 Ok(Arc::new(FileOutput::new(config)?))
117 }
118}
119
120pub fn init() {
121 register_output_builder("file", Arc::new(FileOutputBuilder));
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use tempfile::tempdir;
128 use tokio::fs::read_to_string;
129
130 #[tokio::test]
132 async fn test_create_and_write_to_file() {
133 let dir = tempdir().unwrap();
134 let file_path = dir.path().join("test.txt");
135
136 let config = FileOutputConfig {
137 path: file_path.to_str().unwrap().to_string(),
138 append_newline: Some(true),
139 append: Some(false),
140 };
141
142 let output = FileOutput::new(config).unwrap();
143 output.connect().await.unwrap();
144
145 let msg = MessageBatch::from_string("test content");
146 output.write(&msg).await.unwrap();
147 output.close().await.unwrap();
148
149 let content = read_to_string(&file_path).await.unwrap();
150 assert_eq!(content, "test content\n");
151 }
152
153 #[tokio::test]
155 async fn test_append_mode() {
156 let dir = tempdir().unwrap();
157 let file_path = dir.path().join("append.txt");
158
159 let config = FileOutputConfig {
160 path: file_path.to_str().unwrap().to_string(),
161 append_newline: Some(false),
162 append: Some(true),
163 };
164
165 let output = FileOutput::new(config).unwrap();
166 output.connect().await.unwrap();
167
168 output
170 .write(&MessageBatch::from_string("first"))
171 .await
172 .unwrap();
173 output
175 .write(&MessageBatch::from_string("second"))
176 .await
177 .unwrap();
178 output.close().await.unwrap();
179
180 let content = read_to_string(&file_path).await.unwrap();
181 assert_eq!(content, "firstsecond");
182 }
183
184 #[tokio::test]
186 async fn test_newline_configuration() {
187 let dir = tempdir().unwrap();
188 let file_path = dir.path().join("newline.txt");
189
190 let config = FileOutputConfig {
191 path: file_path.to_str().unwrap().to_string(),
192 append_newline: Some(false),
193 append: Some(false),
194 };
195
196 let output = FileOutput::new(config).unwrap();
197 output.connect().await.unwrap();
198 output
199 .write(&MessageBatch::from_string("no_newline"))
200 .await
201 .unwrap();
202 output.close().await.unwrap();
203
204 let content = read_to_string(&file_path).await.unwrap();
205 assert_eq!(content, "no_newline");
206 }
207
208 #[tokio::test]
210 async fn test_invalid_directory() {
211 let config = FileOutputConfig {
212 path: "/invalid/path/test.txt".to_string(),
213 append_newline: Some(true),
214 append: Some(false),
215 };
216
217 let output = FileOutput::new(config).unwrap();
218 let result = output.connect().await;
219 assert!(matches!(result, Err(Error::Io(_))));
220 }
221}