feature_probe_server_sdk/
sync.rs1use crate::FPError;
2use crate::Repository;
3use headers::HeaderValue;
4use parking_lot::{Mutex, RwLock};
5use reqwest::{header::AUTHORIZATION, Client, Method};
6use std::{sync::mpsc::sync_channel, time::Instant};
7use std::{sync::Arc, time::Duration};
8use tracing::trace;
9use tracing::{debug, error};
10use url::Url;
11
12pub type UpdateCallback = Box<dyn Fn(Repository, Repository, SyncType) + Send>;
13
14#[derive(Debug, Clone)]
15pub struct Synchronizer {
16 inner: Arc<Inner>,
17}
18
19#[derive(Debug)]
20pub enum SyncType {
21 Realtime,
22 Polling,
23}
24
25struct Inner {
26 toggles_url: Url,
27 refresh_interval: Duration,
28 auth: HeaderValue,
29 client: Client,
30 repo: Arc<RwLock<Repository>>,
31 is_init: Arc<RwLock<bool>>,
32 update_callback: Arc<Mutex<Option<UpdateCallback>>>,
33}
34
35impl std::fmt::Debug for Inner {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_tuple("SynchronizerInner")
38 .field(&self.toggles_url)
39 .field(&self.refresh_interval)
40 .field(&self.repo)
41 .field(&self.is_init)
42 .finish()
43 }
44}
45
46impl Synchronizer {
48 pub fn new(
49 toggles_url: Url,
50 refresh_interval: Duration,
51 auth: HeaderValue,
52 client: Client,
53 repo: Arc<RwLock<Repository>>,
54 ) -> Self {
55 Self {
56 inner: Arc::new(Inner {
57 toggles_url,
58 refresh_interval,
59 auth,
60 client,
61 repo,
62 is_init: Default::default(),
63 update_callback: Arc::new(Mutex::new(None)),
64 }),
65 }
66 }
67
68 pub fn initialized(&self) -> bool {
69 let lock = self.inner.is_init.read();
70 *lock
71 }
72
73 pub fn start_sync(&self, start_wait: Option<Duration>, should_stop: Arc<RwLock<bool>>) {
74 let inner = self.inner.clone();
75 let (tx, rx) = sync_channel(1);
76 let start = Instant::now();
77 let mut is_send = false;
78 let interval_duration = inner.refresh_interval;
79 let is_timeout = Self::init_timeout_fn(start_wait, interval_duration, start);
80
81 tokio::spawn(async move {
82 let mut interval = tokio::time::interval(inner.refresh_interval);
83 loop {
84 let result = inner.sync_now(SyncType::Polling).await;
85
86 if let Some(r) = Self::should_send(result, &is_timeout, is_send) {
87 is_send = true;
88 let _ = tx.try_send(r);
89 }
90
91 if *should_stop.read() {
92 break;
93 }
94 interval.tick().await;
95 }
96 });
97
98 if start_wait.is_some() {
99 let _ = rx.recv();
100 }
101 }
102
103 pub fn set_update_callback(&mut self, update_callback: UpdateCallback) {
104 let mut lock = self.inner.update_callback.lock();
105 *lock = Some(update_callback);
106 }
107
108 pub fn version(&self) -> Option<u128> {
109 let repo = self.inner.repo.read();
110 repo.version
111 }
112
113 #[cfg(test)]
114 pub fn repository(&self) -> Arc<RwLock<Repository>> {
115 self.inner.repo.clone()
116 }
117
118 #[cfg(test)]
119 fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
120 self.inner.notify_update(old_repo, new_repo, t)
121 }
122
123 fn init_timeout_fn(
124 start_wait: Option<Duration>,
125 interval: Duration,
126 start: Instant,
127 ) -> Option<Box<dyn Fn() -> bool + Send>> {
128 match start_wait {
129 Some(timeout) => Some(Box::new(move || start.elapsed() + interval > timeout)),
130 None => None,
131 }
132 }
133
134 fn should_send(
135 result: Result<(), FPError>,
136 is_timeout: &Option<Box<dyn Fn() -> bool + Send>>,
137 is_send: bool,
138 ) -> Option<Result<(), FPError>> {
139 if let Some(is_timeout) = is_timeout {
140 match result {
141 Ok(_) if !is_send => {
142 return Some(Ok(()));
143 }
144 Err(e) if !is_send && is_timeout() => {
145 error!("sync error: {}", e);
146 return Some(Err(e));
147 }
148 Err(e) => error!("sync error: {}", e),
149 _ => {}
150 }
151 }
152 None
153 }
154
155 pub fn sync_now(&self, t: SyncType) {
156 let slf = self.clone();
157 tokio::spawn(async move { slf.inner.sync_now(t).await });
158 }
159}
160
161impl Inner {
162 pub async fn sync_now(&self, t: SyncType) -> Result<(), FPError> {
163 use http::header::USER_AGENT;
164
165 trace!("sync_now {:?} {:?}", self.auth, t);
166 let mut request = self
167 .client
168 .request(Method::GET, self.toggles_url.clone())
169 .header(AUTHORIZATION, self.auth.clone())
170 .header(USER_AGENT, &*crate::USER_AGENT)
171 .timeout(self.refresh_interval);
172
173 {
174 let repo = self.repo.read();
175 if let Some(version) = &repo.version {
176 request = request.query(&[("version", &version.to_string())]);
177 }
178 } match request.send().await {
182 Err(e) => Err(FPError::HttpError(e.to_string())),
183 Ok(resp) => match resp.text().await {
184 Err(e) => Err(FPError::HttpError(e.to_string())),
185 Ok(body) => match serde_json::from_str::<Repository>(&body) {
186 Err(e) => Err(FPError::JsonError(body, e)),
187 Ok(r) => {
188 debug!("sync success {:?}", r);
191 let mut repo = self.repo.write();
192 if r.version > repo.version {
193 let old = (*repo).clone();
194 let new = r.clone();
195 *repo = r;
196 self.notify_update(old, new, t);
197 }
198 let mut is_init = self.is_init.write();
199 *is_init = true;
200 Ok(())
201 }
202 },
203 },
204 }
205 }
206
207 fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
208 let lock = self.update_callback.lock();
209 if let Some(cb) = &*lock {
210 cb(old_repo, new_repo, t)
211 }
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218 use crate::SdkAuthorization;
219 use axum::{routing::get, Json, Router, TypedHeader};
220 use headers::UserAgent;
221 use std::{fs, net::SocketAddr, path::PathBuf, sync::mpsc::channel};
222
223 #[test]
224 fn test_update_callback() {
225 let mut syncer = build_synchronizer(9000);
226 let (tx, rx) = channel();
227
228 syncer.set_update_callback(Box::new(move |_old, _new, _| tx.send(()).unwrap()));
229 let old = Repository::default();
230 let new = Repository::default();
231 syncer.notify_update(old, new, SyncType::Polling);
232
233 assert!(rx.try_recv().is_ok())
234 }
235
236 #[tokio::test]
237 async fn test_init_timeout_fn() {
238 let now = Instant::now();
239 let now = now - Duration::from_millis(10);
240
241 let is_timeout_fn = Synchronizer::init_timeout_fn(None, Duration::from_millis(1), now);
242 assert!(is_timeout_fn.is_none());
243
244 let is_timeout_fn = Synchronizer::init_timeout_fn(
245 Some(Duration::from_millis(20)),
246 Duration::from_millis(1),
247 now,
248 );
249 assert!(!is_timeout_fn.unwrap()());
250
251 let is_timeout_fn = Synchronizer::init_timeout_fn(
252 Some(Duration::from_millis(5)),
253 Duration::from_millis(1),
254 now,
255 );
256 assert!(is_timeout_fn.unwrap()());
257 }
258
259 #[test]
260 fn test_should_send() {
261 let is_timeout_fn = None;
262 let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, false);
263 assert!(r.is_none(), "no need send because not set timeout");
264
265 let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
266 let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, false);
267 assert!(r.is_some(), "need send because not timeout, and return Ok");
268 let r = r.unwrap();
269 assert!(r.is_ok());
270
271 let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
272 let r = Synchronizer::should_send(Ok(()), &is_timeout_fn, true);
273 assert!(
274 r.is_none(),
275 "no need send because not timeout, and return error, wait next loop"
276 );
277
278 let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| false));
279 let is_send = true;
280 let r = Synchronizer::should_send(
281 Err(FPError::InternalError("unknown".to_owned())),
282 &is_timeout_fn,
283 is_send,
284 );
285 assert!(r.is_none(), "no need send because already send before");
286
287 let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| true));
288 let r = Synchronizer::should_send(
289 Err(FPError::InternalError("unknown".to_owned())),
290 &is_timeout_fn,
291 is_send,
292 );
293 assert!(r.is_none(), "no need send because already send before");
294
295 let is_send = false;
296 let is_timeout_fn: Option<Box<dyn Fn() -> bool + Send>> = Some(Box::new(|| true));
297 let r = Synchronizer::should_send(
298 Err(FPError::InternalError("unknown".to_owned())),
299 &is_timeout_fn,
300 is_send,
301 );
302 assert!(r.is_some(), "need send because already timeout");
303 let r = r.unwrap();
304 assert!(r.is_err());
305 }
306
307 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
308 async fn test_sync() {
309 let port = 9009;
312 setup_mock_api(port).await;
313 let syncer = build_synchronizer(port);
314 let should_stop = Arc::new(RwLock::new(false));
315 syncer.start_sync(Some(Duration::from_secs(5)), should_stop);
316
317 let repo = syncer.repository();
318 let repo = repo.read();
319 assert!(!repo.toggles.is_empty());
320 assert!(syncer.initialized());
321 }
322
323 fn build_synchronizer(port: u16) -> Synchronizer {
324 let toggles_url =
325 Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
326 let refresh_interval = Duration::from_secs(10);
327 let auth = SdkAuthorization("sdk-key".to_owned()).encode();
328 Synchronizer {
329 inner: Arc::new(Inner {
330 toggles_url,
331 refresh_interval,
332 auth,
333 client: Default::default(),
334 repo: Default::default(),
335 is_init: Default::default(),
336 update_callback: Default::default(),
337 }),
338 }
339 }
340
341 async fn setup_mock_api(port: u16) {
342 let app = Router::new().route("/api/server-sdk/toggles", get(server_sdk_toggles));
343 let addr = SocketAddr::from(([0, 0, 0, 0], port));
344 tokio::spawn(async move {
345 let _ = axum::Server::bind(&addr)
346 .serve(app.into_make_service())
347 .await;
348 });
349 tokio::time::sleep(Duration::from_millis(100)).await;
350 }
351
352 async fn server_sdk_toggles(
353 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
354 TypedHeader(user_agent): TypedHeader<UserAgent>,
355 ) -> Json<Repository> {
356 assert_eq!(sdk_key, "sdk-key");
357 assert!(!user_agent.to_string().is_empty());
358 let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
359 path.push("resources/fixtures/repo.json");
360 let json_str = fs::read_to_string(path).unwrap();
361 let repo = serde_json::from_str::<Repository>(&json_str).unwrap();
362 repo.into()
363 }
364}