clap_adapters/
periodic.rs

1//! Provides the [`Periodic`] adapter for loading files at a regular interval
2
3use std::time::Duration;
4
5use tokio::sync::watch;
6use tokio_stream::Stream;
7
8use crate::{prelude::FromReader, PathTo};
9
10/// Given a [`Path`] from the user, provides a utility that reloads the file
11/// at the path at a fixed interval
12///
13/// - Use [`Periodic::get`] to get the file contents at a given moment
14/// - Use [`Periodic::receiver`] to get a tokio [`watch::Receiver`]
15///
16/// # Example
17///
18/// ```no_run
19/// use clap::Parser;
20/// use clap_adapters::prelude::*;
21///
22/// #[derive(Debug, Parser)]
23/// struct Cli {
24///     /// Path to a Json config to be reloaded every 24 hours
25///     #[clap(long)]
26///     daily_config: Periodic<PathTo<JsonOf<serde_json::Value>>, Hours<24>>,
27///
28///     /// Path to a Json config to be reloaded every minute
29///     #[clap(long)]
30///     minutely_config: Periodic<PathTo<YamlOf<serde_json::Value>>>, // Minutes<1> is the default period
31///
32///     /// Path to a Json config to be reloaded every second
33///     #[clap(long)]
34///     secondly_config: Periodic<PathTo<TomlOf<serde_json::Value>>, Seconds<1>>,
35/// }
36///
37/// #[tokio::main]
38/// async fn main() {
39///     let cli = Cli::parse_from([
40///         "app",
41///         "--daily_config=./daily_config.json",
42///         "--minutely-config=./minutely_config.yaml",
43///         "--secondly-config=./secondly_config.toml",
44///     ]);
45///    
46///     let current_config = cli.daily_config.get();
47///     let current_config = cli.minutely_config.get();
48///     let current_config = cli.secondly_config.get();
49///    
50///     let daily_config_rx = cli.daily_config.receiver();
51///     let minutely_config_rx = cli.minutely_config.receiver();
52///     let secondly_config_rx = cli.secondly_config.receiver();
53/// }
54/// ```
55///
56/// > *Note*: [`Periodic`] requires a tokio runtime to be active before calling
57/// > any of the `clap::Parser` functions
58///
59/// [`Path`]: std::path::Path
60/// [`watch::Receiver`]: tokio::sync::watch
61#[derive(Clone)]
62#[must_use = "Dropping the `Periodic` will cancel the file watch"]
63pub struct Periodic<T, P: Time = Minutes<1>> {
64    reload_rx: watch::Receiver<T>,
65    period: std::marker::PhantomData<P>,
66}
67
68impl<T: Clone, P: Time> Periodic<T, P> {
69    /// Get the current value of the inner document
70    pub fn get(&self) -> T {
71        self.reload_rx.borrow().clone()
72    }
73
74    /// Get a receiver channel that yields updated documents after filesystem changes
75    pub fn receiver(&self) -> watch::Receiver<T> {
76        self.reload_rx.clone()
77    }
78}
79
80impl<T, P: Time> Periodic<T, P>
81where
82    T: Clone + Send + Sync + 'static,
83{
84    /// Get a stream of document changes
85    pub fn stream(&self) -> impl Stream<Item = T> {
86        tokio_stream::wrappers::WatchStream::new(self.reload_rx.clone())
87    }
88}
89
90impl<T, P: Time> std::fmt::Debug for Periodic<PathTo<T>, P>
91where
92    T: std::fmt::Debug,
93{
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_tuple("Periodic")
96            .field(&*self.reload_rx.borrow())
97            .finish()
98    }
99}
100
101impl<T, P: Time> std::str::FromStr for Periodic<PathTo<T>, P>
102where
103    T: FromReader + Clone + Send + Sync + 'static,
104{
105    type Err = anyhow::Error;
106    fn from_str(s: &str) -> Result<Self, Self::Err> {
107        let path_to = PathTo::<T>::from_str(s)?;
108        let (reload_tx, reload_rx) = watch::channel(path_to.clone());
109
110        tokio::spawn(async move {
111            let path = path_to.path;
112            let reload_tx = reload_tx;
113
114            loop {
115                let data_result = (|| -> anyhow::Result<T> {
116                    let file = std::fs::File::open(&path)?;
117                    let mut reader = std::io::BufReader::new(file);
118                    let data = T::from_reader(&mut reader)?;
119                    Ok(data)
120                })();
121
122                let data = match data_result {
123                    Ok(data) => data,
124                    Err(error) => {
125                        tracing::error!(
126                            error = format!("{error:#}"),
127                            path = %path.display(),
128                            "Failed to reload after time period",
129                        );
130                        continue;
131                    }
132                };
133
134                let updated_path_to = PathTo {
135                    path: path.clone(),
136                    data,
137                };
138
139                reload_tx.send_replace(updated_path_to);
140                tokio::time::sleep(P::PERIOD).await;
141            }
142        });
143
144        let item = Self {
145            reload_rx,
146            period: std::marker::PhantomData,
147        };
148        Ok(item)
149    }
150}
151
/// Implemented by marker types that resolve to a [`Duration`] at compile time
pub trait Time {
    /// The duration between periodic file reloads
    const PERIOD: Duration;
}

/// Marker type selecting a reload period of `N` seconds
#[derive(Debug, Clone, Copy)]
pub enum Seconds<const N: u64> {}

impl<const N: u64> Time for Seconds<N> {
    const PERIOD: Duration = Duration::from_secs(N);
}

/// Marker type selecting a reload period of `N` minutes
#[derive(Debug, Clone, Copy)]
pub enum Minutes<const N: u64> {}

impl<const N: u64> Time for Minutes<N> {
    // 60 seconds per minute
    const PERIOD: Duration = Duration::from_secs(N * 60);
}

/// Marker type selecting a reload period of `N` hours
#[derive(Debug, Clone, Copy)]
pub enum Hours<const N: u64> {}

impl<const N: u64> Time for Hours<N> {
    // 60 minutes per hour, 60 seconds per minute
    const PERIOD: Duration = Duration::from_secs(N * 60 * 60);
}