use std::{
fmt::{self, Display},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use backend::start_backend;
use log::{error, info};
use thiserror::Error;
use tokio::{
select,
sync::{mpsc, Notify},
};
mod backend;
#[cfg(all(feature = "inotify", target_family = "unix"))]
mod inotify;
/// Configuration for watching a single file and turning its contents into values of type `T`.
///
/// `T` is the value produced by `parser`; `E` is the error that parser may return.
pub struct FileWatcherConfig<T, E> {
    /// Label for the watched file, interpolated into this module's log messages.
    pub log_name: String,
    /// Path of the file to read.
    pub file: PathBuf,
    /// Converts the raw bytes read from `file` into a `T`, or reports why it could not.
    pub parser: Arc<dyn Fn(Vec<u8>) -> Result<T, E> + Send + Sync>,
    /// Delay between attempts when reading or parsing `file` fails.
    pub retry_interval: Duration,
}
/// Reasons an attempt to load the watched file can fail.
///
/// `E` is the error type returned by the user-supplied parser. Every variant displays as the
/// wrapped error's own message.
#[derive(Error, Debug)]
enum FileWatcherError<E: Display> {
    /// An I/O error; `?` converts `std::io::Error` into this variant.
    #[error("{0}")]
    Io(#[from] std::io::Error),
    /// An error from the `notify` crate; only compiled in with the `notify` feature.
    #[cfg(feature = "notify")]
    #[error("{0}")]
    Notify(#[from] notify::Error),
    /// The parser rejected the file contents.
    #[error("{0}")]
    Parse(E),
}
/// Settings and wake-up handle passed to `start_backend`.
pub(crate) struct WatcherContext {
    /// Path to watch. `run` joins a relative path onto the current directory when that
    /// directory can be determined.
    pub(crate) file: PathBuf,
    /// Label for the watched file, copied from the configuration.
    pub(crate) log_name: String,
    /// Retry delay, copied from the configuration.
    pub(crate) retry_interval: Duration,
    /// Handle the reader task waits on before re-reading the file.
    /// NOTE(review): presumably the backend signals this when the file changes — confirm in `backend`.
    pub(crate) notify: Arc<Notify>,
}
/// Error type for parsers that cannot fail.
///
/// The enum has no variants, so no value of it can ever exist. It only fills the error
/// parameter of a configuration whose parser always succeeds.
pub enum Infallible {}

impl fmt::Display for Infallible {
    fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The type is uninhabited, so an empty match is exhaustive: the compiler proves this
        // body can never run instead of relying on a runtime panic.
        match *self {}
    }
}
impl FileWatcherConfig<Vec<u8>, Infallible> {
    /// Creates a configuration that delivers the raw bytes of `file` unchanged.
    ///
    /// `log_name` is the label used for the file in log messages. The retry interval starts
    /// at one second.
    pub fn new(file: impl AsRef<Path>, log_name: impl AsRef<str>) -> Self {
        let watched_path = file.as_ref().to_owned();
        let label = log_name.as_ref().to_owned();
        Self {
            log_name: label,
            file: watched_path,
            // Identity parser: hand the bytes through untouched.
            parser: Arc::new(|bytes| Ok(bytes)),
            retry_interval: Duration::from_secs(1),
        }
    }
}
impl<T: Send + 'static, E: Display + Send + 'static> FileWatcherConfig<T, E> {
    /// Replaces the parser, changing the produced value and error types to `T2` and `E2`.
    ///
    /// The file path, log name and retry interval are carried over unchanged.
    pub fn with_parser<T2: Send + 'static, E2: Display + Send + 'static>(
        self,
        func: impl Fn(Vec<u8>) -> Result<T2, E2> + Send + Sync + 'static,
    ) -> FileWatcherConfig<T2, E2> {
        FileWatcherConfig {
            log_name: self.log_name,
            file: self.file,
            parser: Arc::new(func),
            retry_interval: self.retry_interval,
        }
    }

    /// Sets the delay between attempts after a failed read or parse.
    pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
        self.retry_interval = retry_interval;
        self
    }

    /// Spawns the watcher task and returns the receiving end of its channel.
    ///
    /// The first value received is the parsed initial contents of the file; afterwards one
    /// value is sent for each notification that leads to a successful read. Dropping the
    /// receiver stops the task.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, which panics when called outside a Tokio runtime.
    pub fn start(self) -> mpsc::Receiver<T> {
        let (sender, receiver) = mpsc::channel(3);
        tokio::spawn(self.run(sender));
        receiver
    }

    /// Body of the watcher task.
    ///
    /// Reads and sends the initial file contents, starts the backend, then re-reads and sends
    /// the file every time the shared `Notify` fires. Failed reads are logged and retried
    /// after `retry_interval`. The task ends as soon as the receiver is dropped, including
    /// while it is waiting to retry.
    async fn run(self, sender: mpsc::Sender<T>) {
        let target = loop {
            match self.read_target().await {
                Ok(x) => break x,
                Err(e) => {
                    error!(
                        "failed to read initial {}: {e} @ '{}', retrying in {:.1} second(s)",
                        self.log_name,
                        self.file.display(),
                        self.retry_interval.as_secs_f64(),
                    );
                    if !self.wait_before_retry(&sender).await {
                        return;
                    }
                }
            }
        };
        if sender.send(target).await.is_err() {
            return;
        }

        // The backend gets an absolute path when the current directory is known; reads
        // below keep using the path exactly as configured.
        let mut file = self.file.clone();
        if file.is_relative() {
            if let Ok(cwd) = std::env::current_dir() {
                file = cwd.join(file);
            }
        }
        let notify = Arc::new(Notify::new());
        let watcher_context = WatcherContext {
            file,
            log_name: self.log_name.clone(),
            retry_interval: self.retry_interval,
            notify: notify.clone(),
        };
        start_backend::<E>(watcher_context).await;

        loop {
            select! {
                _ = notify.notified() => {
                    let target = loop {
                        match self.read_target().await {
                            Ok(x) => break x,
                            Err(e) => {
                                error!(
                                    "failed to read {} update: {e} @ {}, retrying in {:.1} second(s)",
                                    self.log_name,
                                    self.file.display(),
                                    self.retry_interval.as_secs_f64(),
                                );
                                if !self.wait_before_retry(&sender).await {
                                    return;
                                }
                                // Enabling a fresh `Notified` takes any permit stored while
                                // we slept; the retry that follows re-reads the file anyway,
                                // so that notification would only cause a redundant read.
                                let notify = notify.notified();
                                futures::pin_mut!(notify);
                                notify.enable();
                            }
                        }
                    };
                    if sender.send(target).await.is_err() {
                        return;
                    }
                },
                _ = sender.closed() => {
                    return;
                }
            }
        }
    }

    /// Waits out the retry interval before another read attempt.
    ///
    /// Returns `true` once the interval has elapsed, or `false` if the receiver was dropped
    /// first, in which case the caller should stop instead of retrying.
    async fn wait_before_retry(&self, sender: &mpsc::Sender<T>) -> bool {
        select! {
            _ = tokio::time::sleep(self.retry_interval) => true,
            _ = sender.closed() => false,
        }
    }

    /// Reads the whole file and runs the configured parser over its bytes.
    ///
    /// # Errors
    ///
    /// Returns `FileWatcherError::Io` when the file cannot be read and
    /// `FileWatcherError::Parse` when the parser rejects the contents.
    async fn read_target(&self) -> Result<T, FileWatcherError<E>> {
        info!(
            "reading updated {} '{}'",
            self.log_name,
            self.file.display()
        );
        let raw = tokio::fs::read(&self.file).await?;
        (self.parser)(raw).map_err(FileWatcherError::Parse)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Manual smoke test: watches `./test.yaml` and prints a line for every value received.
    ///
    /// NOTE(review): as far as this file shows, the watcher task only stops when the receiver
    /// is dropped, so this loop does not end on its own — confirm whether the test is meant
    /// to be run by hand only.
    #[tokio::test]
    async fn test_file_zone() {
        // Take the log filter from the environment, with "info" as the fallback.
        env_logger::Builder::new()
            .parse_env(env_logger::Env::default().default_filter_or("info"))
            .init();
        let mut receiver = FileWatcherConfig::new("./test.yaml", "config").start();
        // `recv` yields `None` only once the sending side is gone.
        while let Some(_update) = receiver.recv().await {
            println!("updated!");
        }
    }
}