really_notify/
lib.rs

1use std::{
2    fmt::{self, Display},
3    path::{Path, PathBuf},
4    sync::Arc,
5    time::Duration,
6};
7
8use backend::start_backend;
9use log::{error, info};
10use thiserror::Error;
11use tokio::{
12    select,
13    sync::{mpsc, Notify},
14};
15
16mod backend;
17#[cfg(all(feature = "inotify", target_family = "unix"))]
18mod inotify;
19
20/// `really-notify` primary input.
21/// [`T`] is the target parse type, i.e. your serde-deserializable `Config` struct.
22/// [`E`] is the generic error type that your parser can fail with.
23pub struct FileWatcherConfig<T, E> {
24    /// Cosmetic, used for logs to be consistent with application terminology
25    pub log_name: String,
26    /// Path to the file you are interested in changes of. Do your worse with symlinks here.
27    pub file: PathBuf,
28    /// Parser function to transform a modified target file into our desired output. If you just want raw bytes, you can pass it through, or not set this at all.
29    pub parser: Arc<dyn Fn(Vec<u8>) -> Result<T, E> + Send + Sync>,
30    /// Defaults to one second, how often to attempt reparsing/error recovery.
31    pub retry_interval: Duration,
32}
33
34#[derive(Error, Debug)]
35enum FileWatcherError<E: Display> {
36    #[error("{0}")]
37    Io(#[from] std::io::Error),
38    #[cfg(feature = "notify")]
39    #[error("{0}")]
40    Notify(#[from] notify::Error),
41    #[error("{0}")]
42    Parse(E),
43}
44
45pub(crate) struct WatcherContext {
46    pub(crate) file: PathBuf,
47    pub(crate) log_name: String,
48    pub(crate) retry_interval: Duration,
49    pub(crate) notify: Arc<Notify>,
50}
51
52/// Impossible to fail converting a Vec<u8> to a Vec<u8>
53pub enum Infallible {}
54
55impl fmt::Display for Infallible {
56    fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
57        unreachable!()
58    }
59}
60
61impl FileWatcherConfig<Vec<u8>, Infallible> {
62    pub fn new(file: impl AsRef<Path>, log_name: impl AsRef<str>) -> Self {
63        Self {
64            file: file.as_ref().to_path_buf(),
65            log_name: log_name.as_ref().to_string(),
66            parser: Arc::new(|x| Ok(x)),
67            retry_interval: Duration::from_secs(1),
68        }
69    }
70}
71
72impl<T: Send + 'static, E: Display + Send + 'static> FileWatcherConfig<T, E> {
73    /// Set a new parser and adjust the FileWatcherConfig type parameters as needed.
74    pub fn with_parser<T2: Send + 'static, E2: Display + Send + 'static>(
75        self,
76        func: impl Fn(Vec<u8>) -> Result<T2, E2> + Send + Sync + 'static,
77    ) -> FileWatcherConfig<T2, E2> {
78        FileWatcherConfig {
79            log_name: self.log_name,
80            file: self.file,
81            parser: Arc::new(func),
82            retry_interval: self.retry_interval,
83        }
84    }
85
86    /// Set an alternative retry_interval
87    pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
88        self.retry_interval = retry_interval;
89        self
90    }
91
92    /// Run the watcher. Dropping/closing this receiver will cause an immediate cleanup.
93    pub fn start(self) -> mpsc::Receiver<T> {
94        let (sender, receiver) = mpsc::channel(3);
95        tokio::spawn(self.run(sender));
96        receiver
97    }
98
99    async fn run(self, sender: mpsc::Sender<T>) {
100        let target = loop {
101            match self.read_target().await {
102                Ok(x) => break x,
103                Err(e) => {
104                    error!(
105                        "failed to read initial {}: {e} @ '{}', retrying in {:.1} second(s)",
106                        self.log_name,
107                        self.file.display(),
108                        self.retry_interval.as_secs_f64(),
109                    );
110                    tokio::time::sleep(self.retry_interval).await;
111                }
112            }
113        };
114        if sender.send(target).await.is_err() {
115            return;
116        }
117        let mut file = self.file.clone();
118        if file.is_relative() {
119            if let Ok(cwd) = std::env::current_dir() {
120                file = cwd.join(file);
121            }
122        }
123        let notify = Arc::new(Notify::new());
124        let watcher_context = WatcherContext {
125            file,
126            log_name: self.log_name.clone(),
127            retry_interval: self.retry_interval,
128            notify: notify.clone(),
129        };
130        start_backend::<E>(watcher_context).await;
131        loop {
132            select! {
133                _ = notify.notified() => {
134                    let target = loop {
135                        match self.read_target().await {
136                            Ok(x) => break x,
137                            Err(e) => {
138                                error!("failed to read {} update: {e} @ {}, retrying in {:.1} second(s)", self.log_name, self.file.display(), self.retry_interval.as_secs_f64());
139                                tokio::time::sleep(self.retry_interval).await;
140                                // toss out any pending notification, since we will already try again
141                                let notify = notify.notified();
142                                futures::pin_mut!(notify);
143                                notify.enable();
144                            }
145                        }
146                    };
147                    if sender.send(target).await.is_err() {
148                        return;
149                    }
150                },
151                _ = sender.closed() => {
152                    return;
153                }
154            }
155        }
156    }
157
158    async fn read_target(&self) -> Result<T, FileWatcherError<E>> {
159        info!(
160            "reading updated {} '{}'",
161            self.log_name,
162            self.file.display()
163        );
164        let raw = tokio::fs::read(&self.file).await?;
165        (self.parser)(raw).map_err(FileWatcherError::Parse)
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172
173    #[tokio::test]
174    async fn test_file_zone() {
175        env_logger::Builder::new()
176            .parse_env(env_logger::Env::default().default_filter_or("info"))
177            .init();
178        let mut receiver = FileWatcherConfig::new("./test.yaml", "config").start();
179        while let Some(_update) = receiver.recv().await {
180            println!("updated!");
181        }
182    }
183}