feature_probe_server_sdk/
sync.rs

1use crate::FPError;
2use crate::Repository;
3use headers::HeaderValue;
4use parking_lot::{Mutex, RwLock};
5use reqwest::{header::AUTHORIZATION, Client, Method};
6use std::{sync::mpsc::sync_channel, time::Instant};
7use std::{sync::Arc, time::Duration};
8use tracing::trace;
9use tracing::{debug, error};
10use url::Url;
11
12pub type UpdateCallback = Box<dyn Fn(Repository, Repository, SyncType) + Send>;
13
14#[derive(Debug, Clone)]
15pub struct Synchronizer {
16    inner: Arc<Inner>,
17}
18
19#[derive(Debug)]
20pub enum SyncType {
21    Realtime,
22    Polling,
23}
24
25struct Inner {
26    toggles_url: Url,
27    refresh_interval: Duration,
28    auth: HeaderValue,
29    client: Client,
30    repo: Arc<RwLock<Repository>>,
31    is_init: Arc<RwLock<bool>>,
32    update_callback: Arc<Mutex<Option<UpdateCallback>>>,
33}
34
35impl std::fmt::Debug for Inner {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_tuple("SynchronizerInner")
38            .field(&self.toggles_url)
39            .field(&self.refresh_interval)
40            .field(&self.repo)
41            .field(&self.is_init)
42            .finish()
43    }
44}
45
46//TODO: graceful shutdown
47impl Synchronizer {
48    pub fn new(
49        toggles_url: Url,
50        refresh_interval: Duration,
51        auth: HeaderValue,
52        client: Client,
53        repo: Arc<RwLock<Repository>>,
54    ) -> Self {
55        Self {
56            inner: Arc::new(Inner {
57                toggles_url,
58                refresh_interval,
59                auth,
60                client,
61                repo,
62                is_init: Default::default(),
63                update_callback: Arc::new(Mutex::new(None)),
64            }),
65        }
66    }
67
68    pub fn initialized(&self) -> bool {
69        let lock = self.inner.is_init.read();
70        *lock
71    }
72
73    pub fn start_sync(&self, start_wait: Option<Duration>, should_stop: Arc<RwLock<bool>>) {
74        let inner = self.inner.clone();
75        let (tx, rx) = sync_channel(1);
76        let start = Instant::now();
77        let mut is_send = false;
78        let interval_duration = inner.refresh_interval;
79        let is_timeout = Self::init_timeout_fn(start_wait, interval_duration, start);
80
81        tokio::spawn(async move {
82            let mut interval = tokio::time::interval(inner.refresh_interval);
83            loop {
84                let result = inner.sync_now(SyncType::Polling).await;
85
86                if let Some(r) = Self::should_send(result, &is_timeout, is_send) {
87                    is_send = true;
88                    let _ = tx.try_send(r);
89                }
90
91                if *should_stop.read() {
92                    break;
93                }
94                interval.tick().await;
95            }
96        });
97
98        if start_wait.is_some() {
99            let _ = rx.recv();
100        }
101    }
102
103    pub fn set_update_callback(&mut self, update_callback: UpdateCallback) {
104        let mut lock = self.inner.update_callback.lock();
105        *lock = Some(update_callback);
106    }
107
108    pub fn version(&self) -> Option<u128> {
109        let repo = self.inner.repo.read();
110        repo.version
111    }
112
113    #[cfg(test)]
114    pub fn repository(&self) -> Arc<RwLock<Repository>> {
115        self.inner.repo.clone()
116    }
117
118    #[cfg(test)]
119    fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
120        self.inner.notify_update(old_repo, new_repo, t)
121    }
122
123    fn init_timeout_fn(
124        start_wait: Option<Duration>,
125        interval: Duration,
126        start: Instant,
127    ) -> Option<Box<dyn Fn() -> bool + Send>> {
128        match start_wait {
129            Some(timeout) => Some(Box::new(move || start.elapsed() + interval > timeout)),
130            None => None,
131        }
132    }
133
134    fn should_send(
135        result: Result<(), FPError>,
136        is_timeout: &Option<Box<dyn Fn() -> bool + Send>>,
137        is_send: bool,
138    ) -> Option<Result<(), FPError>> {
139        if let Some(is_timeout) = is_timeout {
140            match result {
141                Ok(_) if !is_send => {
142                    return Some(Ok(()));
143                }
144                Err(e) if !is_send && is_timeout() => {
145                    error!("sync error: {}", e);
146                    return Some(Err(e));
147                }
148                Err(e) => error!("sync error: {}", e),
149                _ => {}
150            }
151        }
152        None
153    }
154
155    pub fn sync_now(&self, t: SyncType) {
156        let slf = self.clone();
157        tokio::spawn(async move { slf.inner.sync_now(t).await });
158    }
159}
160
161impl Inner {
162    pub async fn sync_now(&self, t: SyncType) -> Result<(), FPError> {
163        use http::header::USER_AGENT;
164
165        trace!("sync_now {:?} {:?}", self.auth, t);
166        let mut request = self
167            .client
168            .request(Method::GET, self.toggles_url.clone())
169            .header(AUTHORIZATION, self.auth.clone())
170            .header(USER_AGENT, &*crate::USER_AGENT)
171            .timeout(self.refresh_interval);
172
173        {
174            let repo = self.repo.read();
175            if let Some(version) = &repo.version {
176                request = request.query(&[("version", &version.to_string())]);
177            }
178        } // drop repo lock
179
180        //TODO: report failure
181        match request.send().await {
182            Err(e) => Err(FPError::HttpError(e.to_string())),
183            Ok(resp) => match resp.text().await {
184                Err(e) => Err(FPError::HttpError(e.to_string())),
185                Ok(body) => match serde_json::from_str::<Repository>(&body) {
186                    Err(e) => Err(FPError::JsonError(body, e)),
187                    Ok(r) => {
188                        // TODO: validate repo
189                        // TODO: diff change, notify subscriber
190                        debug!("sync success {:?}", r);
191                        let mut repo = self.repo.write();
192                        if r.version > repo.version {
193                            let old = (*repo).clone();
194                            let new = r.clone();
195                            *repo = r;
196                            self.notify_update(old, new, t);
197                        }
198                        let mut is_init = self.is_init.write();
199                        *is_init = true;
200                        Ok(())
201                    }
202                },
203            },
204        }
205    }
206
207    fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
208        let lock = self.update_callback.lock();
209        if let Some(cb) = &*lock {
210            cb(old_repo, new_repo, t)
211        }
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use crate::SdkAuthorization;
219    use axum::{routing::get, Json, Router, TypedHeader};
220    use headers::UserAgent;
221    use std::{fs, net::SocketAddr, path::PathBuf, sync::mpsc::channel};
222
223    #[test]
224    fn test_update_callback() {
225        let mut syncer = build_synchronizer(9000);
226        let (tx, rx) = channel();
227
228        syncer.set_update_callback(Box::new(move |_old, _new, _| tx.send(()).unwrap()));
229        let old = Repository::default();
230        let new = Repository::default();
231        syncer.notify_update(old, new, SyncType::Polling);
232
233        assert!(rx.try_recv().is_ok())
234    }
235
236    #[tokio::test]
237    async fn test_init_timeout_fn() {
238        let now = Instant::now();
239        let now = now - Duration::from_millis(10);
240
241        let is_timeout_fn = Synchronizer::init_timeout_fn(None, Duration::from_millis(1), now);
242        assert!(is_timeout_fn.is_none());
243
244        let is_timeout_fn = Synchronizer::init_timeout_fn(
245            Some(Duration::from_millis(20)),
246            Duration::from_millis(1),
247            now,
248        );
249        assert!(!is_timeout_fn.unwrap()());
250
251        let is_timeout_fn = Synchronizer::init_timeout_fn(
252            Some(Duration::from_millis(5)),
253            Duration::from_millis(1),
254            now,
255        );
256        assert!(is_timeout_fn.unwrap()());
257    }
258
259    #[test]
260    fn test_should_send() {
261        let is_timeout_fn = None;
262        let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, false);
263        assert!(r.is_none(), "no need send because not set timeout");
264
265        let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
266        let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, false);
267        assert!(r.is_some(), "need send because not timeout, and return Ok");
268        let r = r.unwrap();
269        assert!(r.is_ok());
270
271        let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
272        let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, true);
273        assert!(
274            r.is_none(),
275            "no need send because not timeout, and return error, wait next loop"
276        );
277
278        let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
279        let is_send = true;
280        let r = Synchronizer::should_send(
281            Err(FPError::InternalError("unknown".to_owned())),
282            &is_timeout_fn,
283            is_send,
284        );
285        assert!(r.is_none(), "no need send because already send before");
286
287        let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| true));
288        let r = Synchronizer::should_send(
289            Err(FPError::InternalError("unknown".to_owned())),
290            &is_timeout_fn,
291            is_send,
292        );
293        assert!(r.is_none(), "no need send because already send before");
294
295        let is_send = false;
296        let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| true));
297        let r = Synchronizer::should_send(
298            Err(FPError::InternalError("unknown".to_owned())),
299            &is_timeout_fn,
300            is_send,
301        );
302        assert!(r.is_some(), "need send because already timeout");
303        let r = r.unwrap();
304        assert!(r.is_err());
305    }
306
307    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
308    async fn test_sync() {
309        // let _ = tracing_subscriber::fmt().init();
310
311        let port = 9009;
312        setup_mock_api(port).await;
313        let syncer = build_synchronizer(port);
314        let should_stop = Arc::new(RwLock::new(false));
315        syncer.start_sync(Some(Duration::from_secs(5)), should_stop);
316
317        let repo = syncer.repository();
318        let repo = repo.read();
319        assert!(!repo.toggles.is_empty());
320        assert!(syncer.initialized());
321    }
322
323    fn build_synchronizer(port: u16) -> Synchronizer {
324        let toggles_url =
325            Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
326        let refresh_interval = Duration::from_secs(10);
327        let auth = SdkAuthorization("sdk-key".to_owned()).encode();
328        Synchronizer {
329            inner: Arc::new(Inner {
330                toggles_url,
331                refresh_interval,
332                auth,
333                client: Default::default(),
334                repo: Default::default(),
335                is_init: Default::default(),
336                update_callback: Default::default(),
337            }),
338        }
339    }
340
341    async fn setup_mock_api(port: u16) {
342        let app = Router::new().route("/api/server-sdk/toggles", get(server_sdk_toggles));
343        let addr = SocketAddr::from(([0, 0, 0, 0], port));
344        tokio::spawn(async move {
345            let _ = axum::Server::bind(&addr)
346                .serve(app.into_make_service())
347                .await;
348        });
349        tokio::time::sleep(Duration::from_millis(100)).await;
350    }
351
352    async fn server_sdk_toggles(
353        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
354        TypedHeader(user_agent): TypedHeader<UserAgent>,
355    ) -> Json<Repository> {
356        assert_eq!(sdk_key, "sdk-key");
357        assert!(!user_agent.to_string().is_empty());
358        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
359        path.push("resources/fixtures/repo.json");
360        let json_str = fs::read_to_string(path).unwrap();
361        let repo = serde_json::from_str::<Repository>(&json_str).unwrap();
362        repo.into()
363    }
364}