use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::input::{Ack, NoopAck};
use crate::{input::Input, Error, MessageBatch};
/// Configuration for [`FileInput`], a line-oriented file source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileInputConfig {
/// Path of the file that `connect` opens for reading.
pub path: String,
/// Whether `read` reports `Error::Done` once the end of the file is reached.
/// Treated as `true` when unset; when `false`, `read` waits briefly at EOF and
/// returns `Error::Processing` instead.
pub close_on_eof: Option<bool>,
/// Whether reading starts at the beginning of the file.
/// Treated as `true` when unset; when `false`, `connect` seeks to the end of the
/// file before the first read.
pub start_from_beginning: Option<bool>,
}
/// File-backed input that returns one line of the configured file per `read` call.
pub struct FileInput {
/// Copy of the configuration supplied to `new`.
config: FileInputConfig,
/// Buffered reader over the opened file; `None` until `connect` succeeds and
/// again after `close`.
reader: Arc<Mutex<Option<BufReader<File>>>>,
/// Set by `connect`, cleared by `close`; `read` refuses to run while it is `false`.
connected: AtomicBool,
/// Set when a read returns zero bytes; reset by `connect`.
eof_reached: AtomicBool,
}
impl FileInput {
    /// Builds a disconnected input from `config`.
    ///
    /// The configuration is cloned; no file is opened here. The reader slot starts
    /// empty and both status flags start cleared, so `connect` must be called before
    /// `read`.
    ///
    /// # Errors
    ///
    /// This constructor currently always returns `Ok`.
    pub fn new(config: &FileInputConfig) -> Result<Self, Error> {
        let config = config.clone();
        let reader = Arc::new(Mutex::new(None));
        let connected = AtomicBool::new(false);
        let eof_reached = AtomicBool::new(false);

        Ok(Self {
            config,
            reader,
            connected,
            eof_reached,
        })
    }
}
#[async_trait]
impl Input for FileInput {
async fn connect(&self) -> Result<(), Error> {
let path = Path::new(&self.config.path);
let file = File::open(path)
.map_err(|e| Error::Connection(format!("无法打开文件 {}: {}", self.config.path, e)))?;
let mut reader = BufReader::new(file);
if !self.config.start_from_beginning.unwrap_or(true) {
reader
.seek(SeekFrom::End(0))
.map_err(|e| Error::Processing(format!("无法定位到文件末尾: {}", e)))?;
}
let reader_arc = self.reader.clone();
reader_arc.lock().await.replace(reader);
self.connected.store(true, Ordering::SeqCst);
self.eof_reached.store(false, Ordering::SeqCst);
Ok(())
}
async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
let reader_arc = self.reader.clone();
let mut reader_mutex = reader_arc.lock().await;
if !self.connected.load(Ordering::SeqCst) || reader_mutex.is_none() {
return Err(Error::Connection("输入未连接".to_string()));
}
if self.eof_reached.load(Ordering::SeqCst) && self.config.close_on_eof.unwrap_or(true) {
return Err(Error::Done);
}
let bytes_read;
let mut line = String::new();
{
let reader_mutex = reader_mutex.as_mut();
if reader_mutex.is_none() {
return Err(Error::Connection("输入未连接".to_string()));
}
let reader = reader_mutex.unwrap();
bytes_read = reader.read_line(&mut line).map_err(Error::Io)?;
}
if bytes_read == 0 {
self.eof_reached.store(true, Ordering::SeqCst);
if self.config.close_on_eof.unwrap_or(true) {
return Err(Error::Done);
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
return Err(Error::Processing("等待新数据".to_string()));
}
if line.ends_with('\n') {
line.pop();
if line.ends_with('\r') {
line.pop();
}
}
Ok((MessageBatch::from_string(&line), Arc::new(NoopAck)))
}
async fn close(&self) -> Result<(), Error> {
self.connected.store(false, Ordering::SeqCst);
let reader_arc = self.reader.clone();
let mut reader_mutex = reader_arc.lock().await;
*reader_mutex = None;
Ok(())
}
}