use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use arkflow_core::input::{register_input_builder, Ack, Input, InputBuilder, NoopAck};
use arkflow_core::{Error, MessageBatch};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
/// Configuration for the file input, deserialized from the pipeline config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileInputConfig {
/// Path of the file to read, passed to `File::open` on connect.
pub path: String,
/// Whether reaching end of file ends the input (`read` returns `Error::EOF`).
/// `None` is treated as `true`. When `false`, `read` waits briefly at end of
/// file and returns a `Process` error instead.
pub close_on_eof: Option<bool>,
/// Whether reading starts at the first byte of the file.
/// `None` is treated as `true`. When `false`, `connect` seeks to the end of
/// the file before any line is read.
pub start_from_beginning: Option<bool>,
}
/// Input that emits one message per line read from a file.
pub struct FileInput {
// Settings this input was created with.
config: FileInputConfig,
// Buffered reader over the open file; `None` until `connect` installs one,
// and reset to `None` by `close`.
reader: Arc<Mutex<Option<BufReader<File>>>>,
// Set to `true` by `connect` and back to `false` by `close`.
connected: AtomicBool,
// Set once a read returns zero bytes; cleared again by `connect`.
eof_reached: AtomicBool,
}
impl FileInput {
    /// Creates a disconnected file input from `config`.
    ///
    /// No file is opened here: the reader slot starts empty and both status
    /// flags start as `false`. As written, this constructor always returns
    /// `Ok`.
    pub fn new(config: FileInputConfig) -> Result<Self, Error> {
        let reader = Arc::new(Mutex::new(None));
        let input = Self {
            config,
            reader,
            connected: AtomicBool::new(false),
            eof_reached: AtomicBool::new(false),
        };
        Ok(input)
    }
}
#[async_trait]
impl Input for FileInput {
async fn connect(&self) -> Result<(), Error> {
let path = Path::new(&self.config.path);
let file = File::open(path).map_err(|e| {
Error::Connection(format!("Unable to open file {}: {}", self.config.path, e))
})?;
let mut reader = BufReader::new(file);
if !self.config.start_from_beginning.unwrap_or(true) {
reader
.seek(SeekFrom::End(0))
.map_err(|e| Error::Process(format!("Unable to seek to end of file: {}", e)))?;
}
let reader_arc = self.reader.clone();
reader_arc.lock().await.replace(reader);
self.connected.store(true, Ordering::SeqCst);
self.eof_reached.store(false, Ordering::SeqCst);
Ok(())
}
async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
let reader_arc = self.reader.clone();
let mut reader_mutex = reader_arc.lock().await;
if !self.connected.load(Ordering::SeqCst) || reader_mutex.is_none() {
return Err(Error::Connection("The input is not connected".to_string()));
}
if self.eof_reached.load(Ordering::SeqCst) && self.config.close_on_eof.unwrap_or(true) {
return Err(Error::EOF);
}
let bytes_read;
let mut line = String::new();
{
let reader_mutex = reader_mutex.as_mut();
if reader_mutex.is_none() {
return Err(Error::Connection("The input is not connected".to_string()));
}
let reader = reader_mutex.unwrap();
bytes_read = reader.read_line(&mut line).map_err(Error::Io)?;
}
if bytes_read == 0 {
self.eof_reached.store(true, Ordering::SeqCst);
if self.config.close_on_eof.unwrap_or(true) {
return Err(Error::EOF);
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
return Err(Error::Process("Wait for new data".to_string()));
}
if line.ends_with('\n') {
line.pop();
if line.ends_with('\r') {
line.pop();
}
}
Ok((MessageBatch::from_string(&line), Arc::new(NoopAck)))
}
async fn close(&self) -> Result<(), Error> {
self.connected.store(false, Ordering::SeqCst);
let reader_arc = self.reader.clone();
let mut reader_mutex = reader_arc.lock().await;
*reader_mutex = None;
Ok(())
}
}
/// Builder that creates [`FileInput`] instances from JSON configuration.
pub(crate) struct FileInputBuilder;

impl InputBuilder for FileInputBuilder {
    /// Builds a file input from `config`.
    ///
    /// Returns `Error::Config` when no configuration is supplied; a value that
    /// does not deserialize into `FileInputConfig` is reported through the
    /// `?` conversion of the `serde_json` error.
    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Input>, Error> {
        let raw = match config {
            Some(value) => value.clone(),
            None => {
                return Err(Error::Config(
                    "File input configuration is missing".to_string(),
                ))
            }
        };
        let parsed: FileInputConfig = serde_json::from_value(raw)?;
        let input = FileInput::new(parsed)?;
        Ok(Arc::new(input))
    }
}
/// Registers the file input builder under the type name `"file"`.
pub fn init() {
    let builder = Arc::new(FileInputBuilder);
    register_input_builder("file", builder);
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;
    use std::path::Path;
    use tempfile::tempdir;

    /// Builds a config with every option set explicitly.
    fn config_for(path: &str, close_on_eof: bool, start_from_beginning: bool) -> FileInputConfig {
        FileInputConfig {
            path: path.to_string(),
            close_on_eof: Some(close_on_eof),
            start_from_beginning: Some(start_from_beginning),
        }
    }

    /// Creates (or truncates) `path` and writes each entry followed by a newline.
    fn write_lines(path: &Path, lines: &[&str]) {
        let mut file = File::create(path).unwrap();
        for line in lines {
            writeln!(file, "{}", line).unwrap();
        }
        file.flush().unwrap();
    }

    /// Appends `line` plus a newline to an existing file.
    fn append_line(path: &Path, line: &str) {
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .append(true)
            .open(path)
            .unwrap();
        writeln!(file, "{}", line).unwrap();
        file.flush().unwrap();
    }

    /// Reads one batch, checks that it holds exactly `expected`, and acks it.
    async fn expect_line(input: &FileInput, expected: &str) {
        let (batch, ack) = input.read().await.unwrap();
        assert_eq!(batch.as_string().unwrap(), vec![expected]);
        ack.ack().await;
    }

    #[tokio::test]
    async fn test_file_input_new() {
        let input = FileInput::new(config_for("test.txt", true, true));
        assert!(input.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_connect_file_not_exists() {
        let input = FileInput::new(config_for("non_existent_file.txt", true, true)).unwrap();

        let result = input.connect().await;
        assert!(result.is_err());
        assert!(
            matches!(result, Err(Error::Connection(_))),
            "Expected Connection error"
        );
    }

    #[tokio::test]
    async fn test_file_input_read_without_connect() {
        let input = FileInput::new(config_for("test.txt", true, true)).unwrap();

        let result = input.read().await;
        assert!(result.is_err());
        assert!(
            matches!(result, Err(Error::Connection(_))),
            "Expected Connection error"
        );
    }

    #[tokio::test]
    async fn test_file_input_read_from_beginning() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        write_lines(&file_path, &["line1", "line2", "line3"]);

        let config = config_for(file_path.to_str().unwrap(), true, true);
        let input = FileInput::new(config).unwrap();
        assert!(input.connect().await.is_ok());

        // Every line comes back in order, then the input reports end of file.
        for expected in ["line1", "line2", "line3"].iter() {
            expect_line(&input, expected).await;
        }
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_read_from_end() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        write_lines(&file_path, &["line1", "line2"]);

        let config = config_for(file_path.to_str().unwrap(), true, false);
        let input = FileInput::new(config).unwrap();
        assert!(input.connect().await.is_ok());

        // Starting at the end of the file means there is nothing to read.
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        append_line(&file_path, "line3");

        // Reconnecting seeks to the new end, so the appended line is skipped too.
        assert!(input.close().await.is_ok());
        assert!(input.connect().await.is_ok());
        let result = input.read().await;
        assert!(matches!(result, Err(Error::EOF)));

        assert!(input.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_file_input_close_on_eof_false() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        write_lines(&file_path, &["line1"]);

        let config = config_for(file_path.to_str().unwrap(), false, true);
        let input = FileInput::new(config).unwrap();
        assert!(input.connect().await.is_ok());

        expect_line(&input, "line1").await;

        // At end of file the input asks the caller to retry instead of closing.
        let result = input.read().await;
        assert!(matches!(result, Err(Error::Process(_))));

        append_line(&file_path, "line2");
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        expect_line(&input, "line2").await;

        assert!(input.close().await.is_ok());
    }
}