feattle_sync/
background_sync.rs1use feattle_core::{BoxError, Feattles};
2use std::sync::{Arc, Weak};
3use std::time::Duration;
4use tokio::task::JoinHandle;
5use tokio::time::sleep;
6
/// Settings for keeping a feattles instance of type `F` synchronized in the background.
///
/// The instance is referenced through a [`Weak`] pointer only, so this value by itself does not
/// keep the instance alive.
#[derive(Debug)]
pub struct BackgroundSync<F> {
    /// How long to wait before the next reload after a successful one.
    ok_interval: Duration,
    /// How long to wait before the next reload after a failed one.
    err_interval: Duration,
    /// Non-owning handle to the instance that gets reloaded.
    feattles: Weak<F>,
}
40
41impl<F> BackgroundSync<F> {
42 pub fn new(feattles: &Arc<F>) -> Self {
45 BackgroundSync {
46 ok_interval: Duration::from_secs(30),
47 err_interval: Duration::from_secs(60),
48 feattles: Arc::downgrade(feattles),
49 }
50 }
51
52 pub fn interval(&mut self, value: Duration) -> &mut Self {
54 self.ok_interval = value;
55 self.err_interval = value;
56 self
57 }
58
59 pub fn ok_interval(&mut self, value: Duration) -> &mut Self {
62 self.ok_interval = value;
63 self
64 }
65
66 pub fn err_interval(&mut self, value: Duration) -> &mut Self {
69 self.err_interval = value;
70 self
71 }
72}
73
74impl<F: Feattles + Sync + Send + 'static> BackgroundSync<F> {
75 #[deprecated = "use `start_sync()` that will try a first update right away"]
80 pub fn spawn(self) -> JoinHandle<()> {
81 tokio::spawn(async move {
82 while let Some(feattles) = self.feattles.upgrade() {
83 match feattles.reload().await {
84 Ok(()) => {
85 log::debug!("Feattles updated");
86 sleep(self.ok_interval).await;
87 }
88 Err(err) => {
89 log::warn!("Failed to sync Feattles: {:?}", err);
90 sleep(self.err_interval).await;
91 }
92 }
93 }
94
95 log::info!("Stop background sync since Feattles got dropped")
96 })
97 }
98
99 pub async fn start(self) -> Option<BoxError> {
112 let feattles = self.feattles.upgrade()?;
113
114 let first_error = feattles.reload().await.err();
115 let first_sleep = match &first_error {
116 Some(err) => {
117 log::warn!("Failed to sync Feattles: {:?}", err);
118 self.err_interval
119 }
120 None => {
121 log::debug!("Feattles updated");
122 self.ok_interval
123 }
124 };
125
126 tokio::spawn(async move {
127 sleep(first_sleep).await;
128
129 while let Some(feattles) = self.feattles.upgrade() {
130 match feattles.reload().await {
131 Ok(()) => {
132 log::debug!("Feattles updated");
133 sleep(self.ok_interval).await;
134 }
135 Err(err) => {
136 log::warn!("Failed to sync Feattles: {:?}", err);
137 sleep(self.err_interval).await;
138 }
139 }
140 }
141
142 log::info!("Stop background sync since Feattles got dropped")
143 });
144
145 first_error
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use feattle_core::persist::{CurrentValues, Persist, ValueHistory};
    use feattle_core::{feattles, BoxError, Feattles};
    use parking_lot::Mutex;
    use tokio::time;
    use tokio::time::Instant;

    /// Error handed out by the mock to simulate a failed load.
    #[derive(Debug, thiserror::Error)]
    #[error("Some error")]
    struct SomeError;

    /// Persistence double that records the instant of every `load_current` call.
    #[derive(Clone)]
    struct MockPersistence {
        call_instants: Arc<Mutex<Vec<Instant>>>,
    }

    impl MockPersistence {
        /// Creates the mock, seeding the record with the creation instant so the first
        /// interval measures the delay until the first load.
        fn new() -> Self {
            let seeded = vec![Instant::now()];
            Self {
                call_instants: Arc::new(Mutex::new(seeded)),
            }
        }

        /// Returns the time elapsed between each pair of consecutive recorded instants.
        fn call_intervals(&self) -> Vec<Duration> {
            let instants = self.call_instants.lock();
            instants
                .iter()
                .zip(instants.iter().skip(1))
                .map(|(earlier, later)| *later - *earlier)
                .collect()
        }
    }

    #[async_trait]
    impl Persist for MockPersistence {
        async fn save_current(&self, _value: &CurrentValues) -> Result<(), BoxError> {
            unimplemented!()
        }

        async fn load_current(&self) -> Result<Option<CurrentValues>, BoxError> {
            // Record the call and release the lock before deciding the outcome.
            let recorded = {
                let mut instants = self.call_instants.lock();
                instants.push(Instant::now());
                instants.len()
            };

            // The record starts with one seeded entry, so a length of three means this is the
            // second load: that is the one that fails.
            if recorded != 3 {
                Ok(None)
            } else {
                Err(Box::new(SomeError))
            }
        }

        async fn save_history(&self, _key: &str, _value: &ValueHistory) -> Result<(), BoxError> {
            unimplemented!()
        }

        async fn load_history(&self, _key: &str) -> Result<Option<ValueHistory>, BoxError> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test() {
        feattles! {
            struct MyToggles { }
        }

        time::pause();

        let persistence = Arc::new(MockPersistence::new());
        let toggles = Arc::new(MyToggles::new(persistence.clone()));
        BackgroundSync::new(&toggles).start().await;

        loop {
            let rounded_secs: Vec<i32> = persistence
                .call_intervals()
                .iter()
                .map(|gap| gap.as_secs_f32().round() as i32)
                .collect();

            if rounded_secs.len() == 4 {
                // Immediate first load, then the ok interval, the error interval after the
                // failing second load, and the ok interval again.
                assert_eq!(rounded_secs, vec![0, 30, 60, 30]);
                break;
            }

            // Let the background task make progress, then move the clock forward a little.
            tokio::task::yield_now().await;
            time::sleep(Duration::from_millis(100)).await;
        }

        // After the instance is released no further load may be recorded.
        drop(toggles);
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }
        assert_eq!(persistence.call_intervals().len(), 4);
    }
}