arkflow_plugin/input/
file.rs

//! File input component.
//!
//! Reads a file from the local file system line by line, emitting each line as a message batch.
4
5use std::fs::File;
6use std::io::{BufRead, BufReader, Seek, SeekFrom};
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use tokio::sync::Mutex;
11
12use arkflow_core::input::{register_input_builder, Ack, Input, InputBuilder, NoopAck};
13use arkflow_core::{Error, MessageBatch};
14use async_trait::async_trait;
15use serde::{Deserialize, Serialize};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct FileInputConfig {
19    /// Input file path
20    pub path: String,
21    /// Whether to close after reading is complete
22    pub close_on_eof: Option<bool>,
23    /// Whether to start reading from the beginning of the file (otherwise start from the end)
24    pub start_from_beginning: Option<bool>,
25}
26
27/// File input component
28pub struct FileInput {
29    config: FileInputConfig,
30    reader: Arc<Mutex<Option<BufReader<File>>>>,
31    connected: AtomicBool,
32    eof_reached: AtomicBool,
33}
34
35impl FileInput {
36    /// Create a new file input component
37    pub fn new(config: FileInputConfig) -> Result<Self, Error> {
38        Ok(Self {
39            config,
40            reader: Arc::new(Mutex::new(None)),
41            connected: AtomicBool::new(false),
42            eof_reached: AtomicBool::new(false),
43        })
44    }
45}
46
47#[async_trait]
48impl Input for FileInput {
49    async fn connect(&self) -> Result<(), Error> {
50        let path = Path::new(&self.config.path);
51
52        // Open the file
53        let file = File::open(path).map_err(|e| {
54            Error::Connection(format!("Unable to open file {}: {}", self.config.path, e))
55        })?;
56
57        let mut reader = BufReader::new(file);
58
59        // If it is not read from the beginning, it moves to the end of the file
60        if !self.config.start_from_beginning.unwrap_or(true) {
61            reader
62                .seek(SeekFrom::End(0))
63                .map_err(|e| Error::Process(format!("Unable to seek to end of file: {}", e)))?;
64        }
65
66        let reader_arc = self.reader.clone();
67        reader_arc.lock().await.replace(reader);
68        self.connected.store(true, Ordering::SeqCst);
69        self.eof_reached.store(false, Ordering::SeqCst);
70        Ok(())
71    }
72
73    async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
74        let reader_arc = self.reader.clone();
75        let mut reader_mutex = reader_arc.lock().await;
76        if !self.connected.load(Ordering::SeqCst) || reader_mutex.is_none() {
77            return Err(Error::Connection("The input is not connected".to_string()));
78        }
79
80        if self.eof_reached.load(Ordering::SeqCst) && self.config.close_on_eof.unwrap_or(true) {
81            return Err(Error::EOF);
82        }
83
84        let bytes_read;
85        let mut line = String::new();
86        {
87            let reader_mutex = reader_mutex.as_mut();
88            if reader_mutex.is_none() {
89                return Err(Error::Connection("The input is not connected".to_string()));
90            }
91
92            let reader = reader_mutex.unwrap();
93            bytes_read = reader.read_line(&mut line).map_err(Error::Io)?;
94        }
95
96        if bytes_read == 0 {
97            self.eof_reached.store(true, Ordering::SeqCst);
98
99            if self.config.close_on_eof.unwrap_or(true) {
100                return Err(Error::EOF);
101            }
102
103            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
104            return Err(Error::Process("Wait for new data".to_string()));
105        }
106
107        // Remove the trailing line break
108        if line.ends_with('\n') {
109            line.pop();
110            if line.ends_with('\r') {
111                line.pop();
112            }
113        }
114
115        Ok((MessageBatch::from_string(&line), Arc::new(NoopAck)))
116    }
117
118    async fn close(&self) -> Result<(), Error> {
119        self.connected.store(false, Ordering::SeqCst);
120        let reader_arc = self.reader.clone();
121        let mut reader_mutex = reader_arc.lock().await;
122        *reader_mutex = None;
123        Ok(())
124    }
125}
126
127pub(crate) struct FileInputBuilder;
128impl InputBuilder for FileInputBuilder {
129    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Input>, Error> {
130        if config.is_none() {
131            return Err(Error::Config(
132                "File input configuration is missing".to_string(),
133            ));
134        }
135
136        let config: FileInputConfig = serde_json::from_value(config.clone().unwrap())?;
137        Ok(Arc::new(FileInput::new(config)?))
138    }
139}
140
141pub fn init() {
142    register_input_builder("file", Arc::new(FileInputBuilder));
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_file_input_new() {
        let config = FileInputConfig {
            path: "test.txt".to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config);
        assert!(input.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_connect_file_not_exists() {
        let config = FileInputConfig {
            path: "non_existent_file.txt".to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config).unwrap();
        let result = input.connect().await;
        assert!(matches!(result, Err(Error::Connection(_))));
    }

    #[tokio::test]
    async fn test_file_input_read_without_connect() {
        let config = FileInputConfig {
            path: "test.txt".to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config).unwrap();
        let result = input.read().await;
        assert!(matches!(result, Err(Error::Connection(_))));
    }

    #[tokio::test]
    async fn test_file_input_read_after_close() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "line1").unwrap();
        file.flush().unwrap();

        let config = FileInputConfig {
            path: file_path.to_str().unwrap().to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config).unwrap();
        assert!(input.connect().await.is_ok());
        assert!(input.close().await.is_ok());

        // Once closed, reads must fail until the input is reconnected.
        let result = input.read().await;
        assert!(matches!(result, Err(Error::Connection(_))));
    }

    #[tokio::test]
    async fn test_file_input_read_from_beginning() {
        // Create temporary directory and file
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let file_path_str = file_path.to_str().unwrap();

        // Write test data
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "line1").unwrap();
        writeln!(file, "line2").unwrap();
        writeln!(file, "line3").unwrap();
        file.flush().unwrap();

        // Configure to read from the beginning of the file
        let config = FileInputConfig {
            path: file_path_str.to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config).unwrap();

        assert!(input.connect().await.is_ok());

        for expected in ["line1", "line2", "line3"] {
            let (batch, ack) = input.read().await.unwrap();
            assert_eq!(batch.as_string().unwrap(), vec![expected]);
            ack.ack().await;
        }

        // End of file with close_on_eof enabled: EOF is reported
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        // EOF is sticky until reconnect
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_strips_crlf_and_keeps_last_unterminated_line() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");

        // Windows line ending on the first line, no trailing newline on the last one
        let mut file = File::create(&file_path).unwrap();
        file.write_all(b"first\r\nlast").unwrap();
        file.flush().unwrap();

        let config = FileInputConfig {
            path: file_path.to_str().unwrap().to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config).unwrap();
        assert!(input.connect().await.is_ok());

        let (batch, _) = input.read().await.unwrap();
        assert_eq!(batch.as_string().unwrap(), vec!["first"]);

        // With close_on_eof enabled the final line is emitted even without '\n'
        let (batch, _) = input.read().await.unwrap();
        assert_eq!(batch.as_string().unwrap(), vec!["last"]);

        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_read_from_end() {
        // Create temporary directory and file
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let file_path_str = file_path.to_str().unwrap();

        // Write test data
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "line1").unwrap();
        writeln!(file, "line2").unwrap();
        file.flush().unwrap();

        // Configure to read from the end of the file
        let config = FileInputConfig {
            path: file_path_str.to_string(),
            close_on_eof: Some(true),
            start_from_beginning: Some(false),
        };
        let input = FileInput::new(config).unwrap();

        assert!(input.connect().await.is_ok());

        // Reading from the end: the existing lines are skipped, so EOF is reported
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        // Append new data
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&file_path)
            .unwrap();
        writeln!(file, "line3").unwrap();
        file.flush().unwrap();

        // Reconnecting seeks to the (new) end of the file again, so the line appended
        // while connected is skipped as well and EOF is reported
        assert!(input.close().await.is_ok());
        assert!(input.connect().await.is_ok());

        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_close_on_eof_false() {
        // Create temporary directory and file
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let file_path_str = file_path.to_str().unwrap();

        // Write test data
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "line1").unwrap();
        file.flush().unwrap();

        // Configure to tail the file instead of reporting EOF
        let config = FileInputConfig {
            path: file_path_str.to_string(),
            close_on_eof: Some(false),
            start_from_beginning: Some(true),
        };
        let input = FileInput::new(config).unwrap();

        assert!(input.connect().await.is_ok());

        let (batch, ack) = input.read().await.unwrap();
        assert_eq!(batch.as_string().unwrap(), vec!["line1"]);
        ack.ack().await;

        // End of file while tailing: a Process ("wait for new data") error is returned
        let result = input.read().await;
        assert!(matches!(result, Err(Error::Process(_))));

        // Append new data
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&file_path)
            .unwrap();
        writeln!(file, "line2").unwrap();
        file.flush().unwrap();

        // The newly appended line is picked up by the next read
        let (batch, ack) = input.read().await.unwrap();
        assert_eq!(batch.as_string().unwrap(), vec!["line2"]);
        ack.ack().await;

        assert!(input.close().await.is_ok());
    }
}