clap_adapters/
reloading.rs

1//! Provides the [`Reloading`] adapter for reloading files after fs notifications
2
3use std::sync::Arc;
4
5use notify::{RecommendedWatcher, RecursiveMode};
6use tokio::sync::watch;
7use tokio_stream::Stream;
8
9use crate::fs::PathTo;
10use crate::traits::FromReader;
11
12/// Given a [`Path`] from the user, provides a utility that reloads the file
13/// at the path whenever the file is updated
14///
15/// - Use [`Reloading::get`] to get the file contents at a given moment
16/// - Use [`Reloading::receiver`] to get a tokio [`watch::Receiver`]
17///
18/// # Example
19///
20/// ```
21/// # fn example() {
22/// use clap::Parser;
23/// use clap_adapters::prelude::*;
24///
25/// #[derive(Debug, Parser)]
26/// struct Cli {
27///     /// Path to a Json config that's reloaded
28///     #[clap(long)]
29///     config: Reloading<PathTo<JsonOf<serde_json::Value>>>,
30/// }
31///
32/// let cli = Cli::parse_from(["app", "--config=./config.json"]);
33/// let current_config = cli.config.get();
34/// let config_rx = cli.config.receiver();
35/// # }
36/// ```
37///
38/// > **Note**: [`Reloading`] is powered by [`notify`], which has some
39/// [known problems], so check out the caveats if you run into trouble
40///
41/// [`Path`]: std::path::Path
42/// [`watch::Receiver`]: tokio::sync::watch
43/// [known problems]: https://docs.rs/notify/latest/notify/#known-problems
44#[derive(Clone)]
45#[must_use = "Dropping the `Reloading` will cancel the file watch"]
46pub struct Reloading<T> {
47    reload_rx: watch::Receiver<T>,
48    _watcher: Arc<RecommendedWatcher>,
49}
50
51impl<T: Clone> Reloading<T> {
52    /// Get the current value of the inner document
53    pub fn get(&self) -> T {
54        self.reload_rx.borrow().clone()
55    }
56
57    /// Get a receiver channel that yields updated documents after filesystem changes
58    pub fn receiver(&self) -> watch::Receiver<T> {
59        self.reload_rx.clone()
60    }
61}
62
63impl<T: Clone + Send + Sync + 'static> Reloading<T> {
64    /// Get a stream of document changes
65    pub fn stream(&self) -> impl Stream<Item = T> {
66        tokio_stream::wrappers::WatchStream::new(self.reload_rx.clone())
67    }
68}
69
70impl<T: std::fmt::Debug> std::fmt::Debug for Reloading<PathTo<T>> {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_tuple("Reloading")
73            .field(&*self.reload_rx.borrow())
74            .finish()
75    }
76}
77
78impl<T: FromReader + Clone + Send + Sync + 'static> std::str::FromStr for Reloading<PathTo<T>> {
79    type Err = anyhow::Error;
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        use notify::Watcher;
82        let path_to = PathTo::<T>::from_str(s)?;
83        let (reload_tx, reload_rx) = watch::channel(path_to.clone());
84
85        let path = path_to.path.clone();
86        let mut watcher = notify::recommended_watcher(move |result| {
87            if let Err(error) = result {
88                tracing::warn!(
89                    error = format!("{error:#}"),
90                    "Notify triggered with error, skipping"
91                );
92            }
93
94            // Attempt to re-open file and read it into our typed format
95            let data_result = (|| -> anyhow::Result<T> {
96                let file = std::fs::File::open(&path)?;
97                let mut reader = std::io::BufReader::new(file);
98                let data = T::from_reader(&mut reader)?;
99                Ok(data)
100            })();
101
102            let data = match data_result {
103                Ok(data) => data,
104                Err(error) => {
105                    tracing::error!(
106                        error = format!("{error:#}"),
107                        path = %path.display(),
108                        "Failed to hotreload after notify",
109                    );
110                    return;
111                }
112            };
113
114            let updated_path_to = PathTo {
115                path: path.clone(),
116                data,
117            };
118
119            reload_tx.send_replace(updated_path_to);
120        })?;
121        watcher.watch(&path_to.path, RecursiveMode::NonRecursive)?;
122
123        let item = Self {
124            reload_rx,
125            _watcher: Arc::new(watcher),
126        };
127        Ok(item)
128    }
129}