feattle_sync/
background_sync.rs

1use feattle_core::{BoxError, Feattles};
2use std::sync::{Arc, Weak};
3use std::time::Duration;
4use tokio::task::JoinHandle;
5use tokio::time::sleep;
6
7/// Spawn a tokio task to poll [`Feattles::reload()`] continuously
8///
9/// A feattles instance will only ask the persistence layer for the current values when the
10/// [`Feattles::reload()`] method is called. This type would do so regularly for you, until the
11/// [`Feattles`] instance is dropped.
12///
13/// # Example
14/// ```
15/// # #[tokio::main]
16/// # async fn main() {
17/// use feattle_core::{feattles, Feattles};
18/// use feattle_sync::BackgroundSync;
19/// use feattle_core::persist::NoPersistence;
20/// use std::sync::Arc;
21///
22/// feattles! {
23///     struct MyToggles {
24///         a: bool,
25///     }
26/// }
27///
28/// // `NoPersistence` here is just a mock for the sake of the example
29/// let toggles = Arc::new(MyToggles::new(Arc::new(NoPersistence)));
30///
31/// BackgroundSync::new(&toggles).start().await;
32/// # }
33/// ```
34#[derive(Debug)]
35pub struct BackgroundSync<F> {
36    ok_interval: Duration,
37    err_interval: Duration,
38    feattles: Weak<F>,
39}
40
41impl<F> BackgroundSync<F> {
42    /// Create a new poller for the given feattles instance. It will call [`Arc::downgrade()`] to
43    /// detect when the value is dropped.
44    pub fn new(feattles: &Arc<F>) -> Self {
45        BackgroundSync {
46            ok_interval: Duration::from_secs(30),
47            err_interval: Duration::from_secs(60),
48            feattles: Arc::downgrade(feattles),
49        }
50    }
51
52    /// Set both [`Self::ok_interval`] and [`Self::err_interval`]
53    pub fn interval(&mut self, value: Duration) -> &mut Self {
54        self.ok_interval = value;
55        self.err_interval = value;
56        self
57    }
58
59    /// After a successful reload, will wait for this long before starting the next one. By default
60    /// this is 30 seconds.
61    pub fn ok_interval(&mut self, value: Duration) -> &mut Self {
62        self.ok_interval = value;
63        self
64    }
65
66    /// After a failed reload, will wait for this long before starting the next one. By default
67    /// this is 60 seconds.
68    pub fn err_interval(&mut self, value: Duration) -> &mut Self {
69        self.err_interval = value;
70        self
71    }
72}
73
74impl<F: Feattles + Sync + Send + 'static> BackgroundSync<F> {
75    /// Spawn a new tokio task, returning its handle. Usually you do not want to anything with the
76    /// returned handle, since the task will run by itself until the feattles instance gets dropped.
77    ///
78    /// Operational logs are generated with the crate [`log`].
79    #[deprecated = "use `start_sync()` that will try a first update right away"]
80    pub fn spawn(self) -> JoinHandle<()> {
81        tokio::spawn(async move {
82            while let Some(feattles) = self.feattles.upgrade() {
83                match feattles.reload().await {
84                    Ok(()) => {
85                        log::debug!("Feattles updated");
86                        sleep(self.ok_interval).await;
87                    }
88                    Err(err) => {
89                        log::warn!("Failed to sync Feattles: {:?}", err);
90                        sleep(self.err_interval).await;
91                    }
92                }
93            }
94
95            log::info!("Stop background sync since Feattles got dropped")
96        })
97    }
98
99    /// Start the sync operation by executing an update right now and then spawning a new tokio
100    /// task.
101    ///
102    /// This call will block until the first update returns. If it fails, the obtained error will be
103    /// returned.
104    ///
105    /// Note that the return type is `Option<_>` and not `Result<_>`, to avoid confusion: even if
106    /// the first update fails, the sync process will continue in the background.
107    ///
108    /// The tokio task will run by itself until the feattles instance gets dropped.
109    ///
110    /// Operational logs are generated with the crate [`log`].
111    pub async fn start(self) -> Option<BoxError> {
112        let feattles = self.feattles.upgrade()?;
113
114        let first_error = feattles.reload().await.err();
115        let first_sleep = match &first_error {
116            Some(err) => {
117                log::warn!("Failed to sync Feattles: {:?}", err);
118                self.err_interval
119            }
120            None => {
121                log::debug!("Feattles updated");
122                self.ok_interval
123            }
124        };
125
126        tokio::spawn(async move {
127            sleep(first_sleep).await;
128
129            while let Some(feattles) = self.feattles.upgrade() {
130                match feattles.reload().await {
131                    Ok(()) => {
132                        log::debug!("Feattles updated");
133                        sleep(self.ok_interval).await;
134                    }
135                    Err(err) => {
136                        log::warn!("Failed to sync Feattles: {:?}", err);
137                        sleep(self.err_interval).await;
138                    }
139                }
140            }
141
142            log::info!("Stop background sync since Feattles got dropped")
143        });
144
145        first_error
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152    use async_trait::async_trait;
153    use feattle_core::persist::{CurrentValues, Persist, ValueHistory};
154    use feattle_core::{feattles, BoxError, Feattles};
155    use parking_lot::Mutex;
156    use tokio::time;
157    use tokio::time::Instant;
158
159    #[derive(Debug, thiserror::Error)]
160    #[error("Some error")]
161    struct SomeError;
162
163    #[derive(Clone)]
164    struct MockPersistence {
165        call_instants: Arc<Mutex<Vec<Instant>>>,
166    }
167
168    impl MockPersistence {
169        fn new() -> Self {
170            MockPersistence {
171                call_instants: Arc::new(Mutex::new(vec![Instant::now()])),
172            }
173        }
174
175        fn call_intervals(&self) -> Vec<Duration> {
176            self.call_instants
177                .lock()
178                .windows(2)
179                .map(|instants| instants[1] - instants[0])
180                .collect()
181        }
182    }
183
184    #[async_trait]
185    impl Persist for MockPersistence {
186        async fn save_current(&self, _value: &CurrentValues) -> Result<(), BoxError> {
187            unimplemented!()
188        }
189        async fn load_current(&self) -> Result<Option<CurrentValues>, BoxError> {
190            let mut call_instants = self.call_instants.lock();
191            call_instants.push(Instant::now());
192            if call_instants.len() == 3 {
193                // Second call returns an error
194                Err(Box::new(SomeError))
195            } else {
196                Ok(None)
197            }
198        }
199        async fn save_history(&self, _key: &str, _value: &ValueHistory) -> Result<(), BoxError> {
200            unimplemented!()
201        }
202        async fn load_history(&self, _key: &str) -> Result<Option<ValueHistory>, BoxError> {
203            unimplemented!()
204        }
205    }
206
207    #[tokio::test]
208    async fn test() {
209        feattles! {
210            struct MyToggles { }
211        }
212
213        time::pause();
214
215        let persistence = Arc::new(MockPersistence::new());
216        let toggles = Arc::new(MyToggles::new(persistence.clone()));
217        BackgroundSync::new(&toggles).start().await;
218
219        // First update: success
220        // Second update after 30s: fails
221        // Third update after 60s: success
222        // Forth update after 30s
223        loop {
224            let call_intervals = persistence.call_intervals();
225            if call_intervals.len() == 4 {
226                assert_eq!(call_intervals[0].as_secs_f32().round() as i32, 0);
227                assert_eq!(call_intervals[1].as_secs_f32().round() as i32, 30);
228                assert_eq!(call_intervals[2].as_secs_f32().round() as i32, 60);
229                assert_eq!(call_intervals[3].as_secs_f32().round() as i32, 30);
230                break;
231            }
232            tokio::task::yield_now().await;
233            time::sleep(Duration::from_millis(100)).await;
234        }
235
236        // No more updates
237        drop(toggles);
238        for _ in 0..5 {
239            tokio::task::yield_now().await;
240        }
241        assert_eq!(persistence.call_intervals().len(), 4);
242    }
243}