1use std::{
2 fmt::{self, Display},
3 path::{Path, PathBuf},
4 sync::Arc,
5 time::Duration,
6};
7
8use backend::start_backend;
9use log::{error, info};
10use thiserror::Error;
11use tokio::{
12 select,
13 sync::{mpsc, Notify},
14};
15
16mod backend;
17#[cfg(all(feature = "inotify", target_family = "unix"))]
18mod inotify;
19
/// Configuration for watching a single file and delivering its parsed contents.
///
/// `T` is the value produced by the parser and `E` is the parser's error type.
pub struct FileWatcherConfig<T, E> {
    /// Human-readable label for the watched file, used in log messages.
    pub log_name: String,
    /// Path of the file to read and watch.
    pub file: PathBuf,
    /// Converts the raw bytes of the file into a `T`, or reports why it could not.
    pub parser: Arc<dyn Fn(Vec<u8>) -> Result<T, E> + Send + Sync>,
    /// Delay between attempts after a failed read or parse.
    pub retry_interval: Duration,
}
33
34#[derive(Error, Debug)]
35enum FileWatcherError<E: Display> {
36 #[error("{0}")]
37 Io(#[from] std::io::Error),
38 #[cfg(feature = "notify")]
39 #[error("{0}")]
40 Notify(#[from] notify::Error),
41 #[error("{0}")]
42 Parse(E),
43}
44
45pub(crate) struct WatcherContext {
46 pub(crate) file: PathBuf,
47 pub(crate) log_name: String,
48 pub(crate) retry_interval: Duration,
49 pub(crate) notify: Arc<Notify>,
50}
51
/// Error type for parsers that cannot fail.
///
/// The enum has no variants, so no value of this type can ever exist.
pub enum Infallible {}

impl fmt::Display for Infallible {
    fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
        // With no variants, the empty match is exhaustive: this body can never run.
        match *self {}
    }
}
60
61impl FileWatcherConfig<Vec<u8>, Infallible> {
62 pub fn new(file: impl AsRef<Path>, log_name: impl AsRef<str>) -> Self {
63 Self {
64 file: file.as_ref().to_path_buf(),
65 log_name: log_name.as_ref().to_string(),
66 parser: Arc::new(|x| Ok(x)),
67 retry_interval: Duration::from_secs(1),
68 }
69 }
70}
71
72impl<T: Send + 'static, E: Display + Send + 'static> FileWatcherConfig<T, E> {
73 pub fn with_parser<T2: Send + 'static, E2: Display + Send + 'static>(
75 self,
76 func: impl Fn(Vec<u8>) -> Result<T2, E2> + Send + Sync + 'static,
77 ) -> FileWatcherConfig<T2, E2> {
78 FileWatcherConfig {
79 log_name: self.log_name,
80 file: self.file,
81 parser: Arc::new(func),
82 retry_interval: self.retry_interval,
83 }
84 }
85
86 pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
88 self.retry_interval = retry_interval;
89 self
90 }
91
92 pub fn start(self) -> mpsc::Receiver<T> {
94 let (sender, receiver) = mpsc::channel(3);
95 tokio::spawn(self.run(sender));
96 receiver
97 }
98
99 async fn run(self, sender: mpsc::Sender<T>) {
100 let target = loop {
101 match self.read_target().await {
102 Ok(x) => break x,
103 Err(e) => {
104 error!(
105 "failed to read initial {}: {e} @ '{}', retrying in {:.1} second(s)",
106 self.log_name,
107 self.file.display(),
108 self.retry_interval.as_secs_f64(),
109 );
110 tokio::time::sleep(self.retry_interval).await;
111 }
112 }
113 };
114 if sender.send(target).await.is_err() {
115 return;
116 }
117 let mut file = self.file.clone();
118 if file.is_relative() {
119 if let Ok(cwd) = std::env::current_dir() {
120 file = cwd.join(file);
121 }
122 }
123 let notify = Arc::new(Notify::new());
124 let watcher_context = WatcherContext {
125 file,
126 log_name: self.log_name.clone(),
127 retry_interval: self.retry_interval,
128 notify: notify.clone(),
129 };
130 start_backend::<E>(watcher_context).await;
131 loop {
132 select! {
133 _ = notify.notified() => {
134 let target = loop {
135 match self.read_target().await {
136 Ok(x) => break x,
137 Err(e) => {
138 error!("failed to read {} update: {e} @ {}, retrying in {:.1} second(s)", self.log_name, self.file.display(), self.retry_interval.as_secs_f64());
139 tokio::time::sleep(self.retry_interval).await;
140 let notify = notify.notified();
142 futures::pin_mut!(notify);
143 notify.enable();
144 }
145 }
146 };
147 if sender.send(target).await.is_err() {
148 return;
149 }
150 },
151 _ = sender.closed() => {
152 return;
153 }
154 }
155 }
156 }
157
158 async fn read_target(&self) -> Result<T, FileWatcherError<E>> {
159 info!(
160 "reading updated {} '{}'",
161 self.log_name,
162 self.file.display()
163 );
164 let raw = tokio::fs::read(&self.file).await?;
165 (self.parser)(raw).map_err(FileWatcherError::Parse)
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Manual smoke test: watches `./test.yaml` and prints a line per update.
    ///
    /// The watcher task only stops once the receiver is dropped, and this test
    /// holds the receiver, so the loop never finishes on its own. It is ignored
    /// by default so `cargo test` does not hang; run it explicitly with
    /// `cargo test -- --ignored` and interrupt it when done.
    #[tokio::test]
    #[ignore = "manual smoke test: watches ./test.yaml until interrupted"]
    async fn test_file_zone() {
        // `try_init` rather than `init`: a logger that is already installed
        // should not make this test panic.
        let _ = env_logger::Builder::new()
            .parse_env(env_logger::Env::default().default_filter_or("info"))
            .try_init();
        let mut receiver = FileWatcherConfig::new("./test.yaml", "config").start();
        while let Some(_update) = receiver.recv().await {
            println!("updated!");
        }
    }
}
183}