#![allow(clippy::needless_return)]
use crate::config::register_plugin;
use crate::config::ItemType;
use crate::config::{ConfigSpec, ExecutionType};
use crate::Message;
use crate::{new_callback_chan, CallbackChan, Status};
use crate::{Closer, Error, Input};
use async_trait::async_trait;
use fiddler_macros::fiddler_registration_func;
use flume::{bounded, Receiver, Sender};
use serde::Deserialize;
use serde_yaml::Value;
use std::fs::{self, read_to_string, File};
use std::io::{prelude::*, BufReader, SeekFrom};
use std::path::Path;
use tokio::time::{sleep, Duration};
use tracing::{debug, error, trace, warn};
/// Number of consecutive checks that may find the tailed file missing before
/// tailing gives up and reports an `InputError` (the count resets whenever the
/// file is found again).
const MAX_FILE_NOT_FOUND_RETRIES: u32 = 5;
/// Seconds to wait between successive checks for a missing tailed file.
const FILE_NOT_FOUND_RETRY_DELAY_SECS: u64 = 5;
/// How the configured file is turned into messages (the YAML `codec` value).
#[derive(Default, Deserialize)]
enum CodecType {
    /// One message per line, followed by end of input.
    #[default]
    Lines,
    /// The entire file as a single message, followed by end of input.
    ToEnd,
    /// Follow the file as it grows, one message per line; the processed byte
    /// offset is persisted to `position_filename`.
    Tail,
}
enum ReaderType {
Lines(std::io::Lines<BufReader<File>>),
ToEnd(File),
Tail(String, u64, Sender<u64>),
}
#[derive(Deserialize, Default)]
struct FileReaderConfig {
filename: String,
codec: CodecType,
position_filename: Option<String>,
}
/// The `file` input plugin handle returned by `create_file`.
///
/// Items are produced by a background reader task and handed over through
/// `receiver`.
pub struct FileReader {
    // Receiving half of the channel created in `create_file` (`bounded(0)`).
    receiver: Receiver<Result<(Message, Option<CallbackChan>), Error>>,
}
async fn read_file(
reader: ReaderType,
sender: Sender<Result<(Message, Option<CallbackChan>), Error>>,
) -> Result<(), Error> {
match reader {
ReaderType::Lines(li) => {
for line in li {
match line {
Ok(line) => {
sender
.send_async(Ok((
Message {
bytes: line.into_bytes(),
..Default::default()
},
None,
)))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
}
Err(e) => {
sender
.send_async(Err(Error::InputError(format!("{e}"))))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
return Ok(());
}
}
}
sender
.send_async(Err(Error::EndOfInput))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
return Ok(());
}
ReaderType::ToEnd(mut f) => {
let mut contents = String::new();
match f.read_to_string(&mut contents) {
Ok(_) => {}
Err(e) => {
sender
.send_async(Err(Error::InputError(format!("{e}"))))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
return Ok(());
}
};
sender
.send_async(Ok((
Message {
bytes: contents.into_bytes(),
..Default::default()
},
None,
)))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
sender
.send_async(Err(Error::EndOfInput))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
return Ok(());
}
ReaderType::Tail(filename, pos, sync) => {
let mut file = File::open(&filename)
.map_err(|e| Error::InputError(format!("{}: {}", filename, e)))?;
let mut current_pos = pos;
let mut file_not_found_count: u32 = 0;
let metadata = file
.metadata()
.map_err(|e| Error::InputError(format!("{}: {}", filename, e)))?;
if metadata.len() < pos {
current_pos = 0;
sync.send_async(current_pos)
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
};
let _ = file
.seek(SeekFrom::Start(current_pos))
.map_err(|e| Error::InputError(format!("{}: {}", filename, e)))?;
let mut reader = BufReader::new(file);
loop {
if !Path::new(&filename).exists() {
file_not_found_count += 1;
warn!(
filename = filename,
retry_count = file_not_found_count,
max_retries = MAX_FILE_NOT_FOUND_RETRIES,
"File no longer exists, waiting for it to reappear"
);
if file_not_found_count >= MAX_FILE_NOT_FOUND_RETRIES {
error!(
filename = filename,
"File not found after max retries, stopping tail"
);
sender
.send_async(Err(Error::InputError(format!(
"File no longer exists after {} retries: {}",
MAX_FILE_NOT_FOUND_RETRIES, filename
))))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
return Ok(());
}
sleep(Duration::from_secs(FILE_NOT_FOUND_RETRY_DELAY_SECS)).await;
continue;
}
file_not_found_count = 0;
let mut line = String::new();
let len = reader
.read_line(&mut line)
.map_err(|e| Error::InputError(format!("{}: {}", filename.clone(), e)))?;
current_pos += len as u64;
let s = sync.clone();
if len == 0 {
sleep(Duration::from_secs(2)).await;
continue;
};
if line.ends_with('\n') {
let _ = line.pop();
if line.ends_with('\r') {
let _ = line.pop();
}
};
let (tx, rx) = new_callback_chan();
tokio::spawn(async move {
if let Ok(Status::Processed) = rx.await {
if let Err(e) = s.send_async(current_pos).await {
debug!(error = %e, "Failed to send position update, channel may be closed");
}
};
});
sender
.send_async(Ok((
Message {
bytes: line.into_bytes(),
..Default::default()
},
Some(tx),
)))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{e}")))?;
}
}
}
}
impl Closer for FileReader {}
#[async_trait]
impl Input for FileReader {
async fn read(&mut self) -> Result<(Message, Option<CallbackChan>), Error> {
match self.receiver.recv_async().await {
Ok(i) => i,
Err(_e) => Err(Error::EndOfInput),
}
}
}
#[fiddler_registration_func]
fn create_file(conf: Value) -> Result<ExecutionType, Error> {
let c: FileReaderConfig = serde_yaml::from_value(conf.clone())?;
if let CodecType::Tail = c.codec {
if c.position_filename.is_none() {
return Err(Error::ConfigFailedValidation(
"position_filename must be included with tail type".into(),
));
}
}
let mut file = File::open(c.filename.clone())
.map_err(|e| Error::InputError(format!("{}: {}", c.filename.clone(), e)))?;
let inner = match c.codec {
CodecType::Lines => {
let reader = BufReader::new(file);
ReaderType::Lines(reader.lines())
}
CodecType::ToEnd => ReaderType::ToEnd(file),
CodecType::Tail => {
let (sync_sender, receiver) = bounded(0);
let position_file_name = c
.position_filename
.clone()
.ok_or(Error::InputError("position file must be included".into()))?;
let position = read_to_string(position_file_name.clone()).unwrap_or("0".into());
let mut current_position = position.parse::<u64>().unwrap_or(0);
let metadata = file
.metadata()
.map_err(|e| Error::InputError(format!("{}: {}", c.filename.clone(), e)))?;
if metadata.len() < current_position {
current_position = 0;
fs::write(position_file_name.clone(), format!("{current_position}"))
.map_err(|e| Error::InputError(format!("{}: {}", c.filename, e)))?;
};
let _ = file
.seek(SeekFrom::Start(current_position))
.map_err(|e| Error::InputError(format!("{}: {}", c.filename.clone(), e)))?;
let passed_position_reader = current_position;
let filename = c.filename.clone();
let (timer_tx, timer) = bounded(0);
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
if let Err(_e) = timer_tx.send_async(()).await {
debug!("timer channel closed, exiting");
return;
};
}
});
tokio::spawn(async move {
let mut last_known_position = current_position;
loop {
tokio::select! {
Ok(_) = timer.recv_async() => {
if current_position != last_known_position {
trace!("writing position to disk");
if let Err(e) = fs::write(
position_file_name.clone(),
format!("{current_position}"),
) {
error!(error = %e, filename = %filename, "Failed to write position file");
} else {
last_known_position = current_position;
}
}
},
m = receiver.recv_async() => {
match m {
Ok(msg) => {
if msg == 0 {
current_position = 0;
trace!("writing position to disk");
if let Err(e) = fs::write(
position_file_name.clone(),
format!("{current_position}"),
) {
error!(error = %e, filename = %filename, "Failed to write position file");
} else {
last_known_position = current_position;
}
};
if msg > current_position {
current_position = msg;
};
}
Err(e) => {
error!(error = %e, "closing file state handler");
return;
}
}
},
}
}
});
ReaderType::Tail(c.filename.clone(), passed_position_reader, sync_sender)
}
};
let (sender, receiver) = bounded(0);
tokio::task::spawn_blocking(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("file_reader")
.worker_threads(1)
.build()
.map_err(|e| Error::ExecutionError(format!("Failed to create tokio runtime: {e}")))?;
runtime.block_on(read_file(inner, sender))
});
Ok(ExecutionType::Input(Box::new(FileReader { receiver })))
}
/// Registers the `file` input plugin together with its configuration schema.
///
/// The schema properties mirror `FileReaderConfig`: `filename`, `codec`, and
/// `position_filename`. The last one was previously declared as
/// `position_file`, a key the deserializer never reads.
pub(super) fn register_file() -> Result<(), Error> {
    let config = "type: object
properties:
  filename:
    type: string
  codec:
    type: string
  position_filename:
    type: string";
    let conf_spec = ConfigSpec::from_schema(config)?;
    register_plugin("file".into(), ItemType::Input, conf_spec, create_file)
}
#[cfg(test)]
mod test {
    use super::*;

    /// Registering the `file` input, including parsing its schema, must succeed.
    #[test]
    fn register_plugin() {
        register_file().expect("registering the file input plugin should succeed");
    }
}