arkflow_plugin/input/
file.rs1use std::fs::File;
6use std::io::{BufRead, BufReader, Seek, SeekFrom};
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use tokio::sync::Mutex;
11
12use arkflow_core::input::{register_input_builder, Ack, Input, InputBuilder, NoopAck};
13use arkflow_core::{Error, MessageBatch};
14use async_trait::async_trait;
15use serde::{Deserialize, Serialize};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct FileInputConfig {
19 pub path: String,
21 pub close_on_eof: Option<bool>,
23 pub start_from_beginning: Option<bool>,
25}
26
27pub struct FileInput {
29 config: FileInputConfig,
30 reader: Arc<Mutex<Option<BufReader<File>>>>,
31 connected: AtomicBool,
32 eof_reached: AtomicBool,
33}
34
35impl FileInput {
36 pub fn new(config: FileInputConfig) -> Result<Self, Error> {
38 Ok(Self {
39 config,
40 reader: Arc::new(Mutex::new(None)),
41 connected: AtomicBool::new(false),
42 eof_reached: AtomicBool::new(false),
43 })
44 }
45}
46
47#[async_trait]
48impl Input for FileInput {
49 async fn connect(&self) -> Result<(), Error> {
50 let path = Path::new(&self.config.path);
51
52 let file = File::open(path).map_err(|e| {
54 Error::Connection(format!("Unable to open file {}: {}", self.config.path, e))
55 })?;
56
57 let mut reader = BufReader::new(file);
58
59 if !self.config.start_from_beginning.unwrap_or(true) {
61 reader
62 .seek(SeekFrom::End(0))
63 .map_err(|e| Error::Process(format!("Unable to seek to end of file: {}", e)))?;
64 }
65
66 let reader_arc = self.reader.clone();
67 reader_arc.lock().await.replace(reader);
68 self.connected.store(true, Ordering::SeqCst);
69 self.eof_reached.store(false, Ordering::SeqCst);
70 Ok(())
71 }
72
73 async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
74 let reader_arc = self.reader.clone();
75 let mut reader_mutex = reader_arc.lock().await;
76 if !self.connected.load(Ordering::SeqCst) || reader_mutex.is_none() {
77 return Err(Error::Connection("The input is not connected".to_string()));
78 }
79
80 if self.eof_reached.load(Ordering::SeqCst) && self.config.close_on_eof.unwrap_or(true) {
81 return Err(Error::EOF);
82 }
83
84 let bytes_read;
85 let mut line = String::new();
86 {
87 let reader_mutex = reader_mutex.as_mut();
88 if reader_mutex.is_none() {
89 return Err(Error::Connection("The input is not connected".to_string()));
90 }
91
92 let reader = reader_mutex.unwrap();
93 bytes_read = reader.read_line(&mut line).map_err(Error::Io)?;
94 }
95
96 if bytes_read == 0 {
97 self.eof_reached.store(true, Ordering::SeqCst);
98
99 if self.config.close_on_eof.unwrap_or(true) {
100 return Err(Error::EOF);
101 }
102
103 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
104 return Err(Error::Process("Wait for new data".to_string()));
105 }
106
107 if line.ends_with('\n') {
109 line.pop();
110 if line.ends_with('\r') {
111 line.pop();
112 }
113 }
114
115 Ok((MessageBatch::from_string(&line), Arc::new(NoopAck)))
116 }
117
118 async fn close(&self) -> Result<(), Error> {
119 self.connected.store(false, Ordering::SeqCst);
120 let reader_arc = self.reader.clone();
121 let mut reader_mutex = reader_arc.lock().await;
122 *reader_mutex = None;
123 Ok(())
124 }
125}
126
127pub(crate) struct FileInputBuilder;
128impl InputBuilder for FileInputBuilder {
129 fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Input>, Error> {
130 if config.is_none() {
131 return Err(Error::Config(
132 "File input configuration is missing".to_string(),
133 ));
134 }
135
136 let config: FileInputConfig = serde_json::from_value(config.clone().unwrap())?;
137 Ok(Arc::new(FileInput::new(config)?))
138 }
139}
140
141pub fn init() {
142 register_input_builder("file", Arc::new(FileInputBuilder));
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    /// Builds a configuration with every option set explicitly.
    fn config_for(
        path: &str,
        close_on_eof: bool,
        start_from_beginning: bool,
    ) -> FileInputConfig {
        FileInputConfig {
            path: path.to_string(),
            close_on_eof: Some(close_on_eof),
            start_from_beginning: Some(start_from_beginning),
        }
    }

    /// Creates (or truncates) the file at `path` and writes each entry as its own line.
    fn create_with_lines(path: &std::path::Path, lines: &[&str]) {
        let mut file = File::create(path).unwrap();
        for line in lines {
            writeln!(file, "{}", line).unwrap();
        }
        file.flush().unwrap();
    }

    /// Appends `line`, followed by a newline, to the existing file at `path`.
    fn append_line(path: &std::path::Path, line: &str) {
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .append(true)
            .open(path)
            .unwrap();
        writeln!(file, "{}", line).unwrap();
        file.flush().unwrap();
    }

    /// Reads one message, checks that it is exactly `expected`, and acknowledges it.
    async fn expect_line(input: &FileInput, expected: &str) {
        let (batch, ack) = input.read().await.unwrap();
        assert_eq!(batch.as_string().unwrap(), vec![expected]);
        ack.ack().await;
    }

    /// Construction succeeds without touching the file system.
    #[tokio::test]
    async fn test_file_input_new() {
        let input = FileInput::new(config_for("test.txt", true, true));
        assert!(input.is_ok());
    }

    /// Connecting to a missing file reports a connection error.
    #[tokio::test]
    async fn test_file_input_connect_file_not_exists() {
        let input = FileInput::new(config_for("non_existent_file.txt", true, true)).unwrap();
        let result = input.connect().await;
        assert!(result.is_err());
        if !matches!(result, Err(Error::Connection(_))) {
            panic!("Expected Connection error");
        }
    }

    /// Reading before connecting reports a connection error.
    #[tokio::test]
    async fn test_file_input_read_without_connect() {
        let input = FileInput::new(config_for("test.txt", true, true)).unwrap();
        let result = input.read().await;
        assert!(result.is_err());
        if !matches!(result, Err(Error::Connection(_))) {
            panic!("Expected Connection error");
        }
    }

    /// All lines are returned in order, followed by EOF.
    #[tokio::test]
    async fn test_file_input_read_from_beginning() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let file_path_str = file_path.to_str().unwrap();
        create_with_lines(&file_path, &["line1", "line2", "line3"]);

        let input = FileInput::new(config_for(file_path_str, true, true)).unwrap();
        assert!(input.connect().await.is_ok());

        expect_line(&input, "line1").await;
        expect_line(&input, "line2").await;
        expect_line(&input, "line3").await;

        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    /// Starting from the end skips existing content, also after a reconnect.
    #[tokio::test]
    async fn test_file_input_read_from_end() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let file_path_str = file_path.to_str().unwrap();
        create_with_lines(&file_path, &["line1", "line2"]);

        let input = FileInput::new(config_for(file_path_str, true, false)).unwrap();
        assert!(input.connect().await.is_ok());

        // Positioned at the end of the file, so the first read is already EOF.
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        append_line(&file_path, "line3");

        // Reconnecting seeks to the new end, so the appended line is skipped as well.
        assert!(input.close().await.is_ok());
        assert!(input.connect().await.is_ok());

        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    /// With `close_on_eof` disabled the input keeps waiting and picks up appended lines.
    #[tokio::test]
    async fn test_file_input_close_on_eof_false() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let file_path_str = file_path.to_str().unwrap();
        create_with_lines(&file_path, &["line1"]);

        let input = FileInput::new(config_for(file_path_str, false, true)).unwrap();
        assert!(input.connect().await.is_ok());

        expect_line(&input, "line1").await;

        // End of file is reported as a process error rather than EOF.
        let result = input.read().await;
        assert!(matches!(result, Err(Error::Process(_))));

        append_line(&file_path, "line2");

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        expect_line(&input, "line2").await;

        assert!(input.close().await.is_ok());
    }
}