feature_probe_server/
repo.rs

1#[cfg(feature = "realtime")]
2use crate::realtime::RealtimeSocket;
3use crate::FPServerError;
4use crate::{base::ServerConfig, secrets::SecretMapping};
5use feature_probe_server_sdk::{
6    EvalDetail, FPConfig, FPUser, FeatureProbe as FPClient, SyncType, Url,
7};
8#[cfg(feature = "unstable")]
9use feature_probe_server_sdk::{Segment, Toggle};
10use parking_lot::RwLock;
11use reqwest::Method;
12use serde_json::Value;
13use std::{collections::HashMap, sync::Arc};
14use tracing::{debug, error, info};
15
16#[derive(Debug, Clone)]
17pub struct SdkRepository {
18    inner: Arc<Inner>,
19}
20
21#[derive(Debug)]
22struct Inner {
23    server_config: ServerConfig,
24    http_client: reqwest::Client,
25    sdk_clients: RwLock<HashMap<String, FPClient>>,
26    secret_mapping: RwLock<SecretMapping>,
27    #[cfg(feature = "realtime")]
28    realtime_socket: RealtimeSocket,
29}
30
31impl SdkRepository {
32    pub fn new(
33        server_config: ServerConfig,
34        #[cfg(feature = "realtime")] realtime_socket: RealtimeSocket,
35    ) -> Self {
36        Self {
37            inner: Arc::new(Inner {
38                server_config,
39                http_client: Default::default(),
40                sdk_clients: Default::default(),
41                secret_mapping: Default::default(),
42                #[cfg(feature = "realtime")]
43                realtime_socket,
44            }),
45        }
46    }
47
48    #[cfg(feature = "unstable")]
49    pub fn update_segments(
50        &self,
51        _segments: HashMap<String, Segment>,
52    ) -> Result<(), FPServerError> {
53        // TODO:
54        Ok(())
55    }
56
57    #[cfg(feature = "unstable")]
58    pub fn update_toggles(
59        &self,
60        _server_sdk_key: &str,
61        _toggles: HashMap<String, Toggle>,
62    ) -> Result<(), FPServerError> {
63        // TODO:
64        Ok(())
65    }
66
67    pub fn secret_keys(&self) -> HashMap<String, String> {
68        let secret_mapping = self.inner.secret_mapping.read();
69        secret_mapping.mapping_clone()
70    }
71
72    pub fn sync(&self, client_sdk_key: String, server_sdk_key: String, version: u128) {
73        self.inner.sync(&server_sdk_key);
74        let mut secret_mapping = self.inner.secret_mapping.write();
75        secret_mapping.insert(client_sdk_key, server_sdk_key, version);
76    }
77
78    pub fn sync_with(&self, keys_url: Url) {
79        self.sync_secret_keys(keys_url);
80        let inner = self.inner.clone();
81        tokio::spawn(async move {
82            let mut interval = tokio::time::interval(inner.server_config.refresh_interval);
83            loop {
84                {
85                    inner.update_clients();
86                }
87                interval.tick().await;
88            }
89        });
90    }
91
92    fn sync_secret_keys(&self, keys_url: Url) {
93        let inner = self.inner.clone();
94        let mut interval = tokio::time::interval(inner.server_config.refresh_interval);
95        tokio::spawn(async move {
96            loop {
97                let url = keys_url.clone();
98                let request = inner
99                    .http_client
100                    .request(Method::GET, url)
101                    .timeout(inner.server_config.refresh_interval);
102                match request.send().await {
103                    Err(e) => error!("sync_secret_keys error: {}", e),
104                    Ok(resp) => match resp.text().await {
105                        Err(e) => error!("sync_secret_keys: {}", e),
106                        Ok(body) => match serde_json::from_str::<SecretMapping>(&body) {
107                            Err(e) => error!("sync_secret_keys json error: {}", e),
108                            Ok(r) => {
109                                debug!("sync_secret_keys success. version: {:?}", r.version(),);
110                                inner.update_mapping(r);
111                            }
112                        },
113                    },
114                }
115                interval.tick().await;
116            }
117        });
118    }
119
120    pub fn server_sdk_repo_string(&self, server_sdk_key: &str) -> Result<String, FPServerError> {
121        let secret_mapping = self.inner.secret_mapping.read();
122        if secret_mapping.version() == 0 {
123            return Err(FPServerError::NotReady(server_sdk_key.to_string()));
124        }
125        if !secret_mapping.contains_server_sdk_key(server_sdk_key) {
126            return Err(FPServerError::NotFound(server_sdk_key.to_string()));
127        }
128        match self.inner.repo_string(server_sdk_key) {
129            Ok(repo) => Ok(repo),
130            Err(e) => Err(e),
131        }
132    }
133
134    pub fn client_sdk_eval_string(
135        &self,
136        client_sdk_key: &str,
137        user: &FPUser,
138    ) -> Result<String, FPServerError> {
139        let secret_mapping = self.inner.secret_mapping.read();
140        if secret_mapping.version() == 0 {
141            return Err(FPServerError::NotReady(client_sdk_key.to_string()));
142        }
143        let server_sdk_key = match secret_mapping.server_sdk_key(client_sdk_key) {
144            Some(sdk_key) => sdk_key,
145            None => return Err(FPServerError::NotFound(client_sdk_key.to_string())),
146        };
147        self.inner.all_evaluated_string(server_sdk_key, user)
148    }
149
150    pub fn client_sdk_events_string(&self, client_sdk_key: &str) -> Result<String, FPServerError> {
151        let secret_mapping = self.inner.secret_mapping.read();
152        if secret_mapping.version() == 0 {
153            return Err(FPServerError::NotReady(client_sdk_key.to_string()));
154        }
155        let server_sdk_key = match secret_mapping.server_sdk_key(client_sdk_key) {
156            Some(sdk_key) => sdk_key,
157            None => return Err(FPServerError::NotFound(client_sdk_key.to_string())),
158        };
159        self.inner.all_event_string(server_sdk_key)
160    }
161
162    pub fn client_sync_now(&self, sdk_key: &str, t: SyncType) -> Result<String, FPServerError> {
163        let sdk_clients = self.inner.sdk_clients.write();
164        let client = match sdk_clients.get(sdk_key) {
165            Some(client) => client,
166            None => return Err(FPServerError::NotFound(sdk_key.to_string())),
167        };
168        client.sync_now(t);
169        Ok(sdk_key.to_string())
170    }
171
172    #[cfg(test)]
173    #[cfg(feature = "unstable")]
174    fn sdk_client(&self, sdk_key: &str) -> Option<FPClient> {
175        let sdk_clients = self.inner.sdk_clients.read();
176        sdk_clients.get(sdk_key).cloned()
177    }
178}
179
180impl Inner {
181    pub fn sync(&self, server_sdk_key: &str) {
182        let should_sync = {
183            let sdks = self.sdk_clients.read();
184            !sdks.contains_key(server_sdk_key)
185        };
186
187        if !should_sync {
188            return;
189        }
190
191        let mut mut_sdks = self.sdk_clients.write();
192        let config = FPConfig {
193            server_sdk_key: server_sdk_key.to_owned(),
194            remote_url: Url::parse("http://nouse.com").unwrap(),
195            toggles_url: Some(self.server_config.toggles_url.clone()),
196            refresh_interval: self.server_config.refresh_interval,
197            http_client: Some(self.http_client.clone()),
198            ..Default::default()
199        };
200        info!("{:?} added", server_sdk_key);
201
202        #[cfg(feature = "realtime")]
203        {
204            let mut client = FPClient::new(config);
205            self.setup_notify(server_sdk_key, &mut client);
206            let _ = &mut_sdks.insert(server_sdk_key.to_owned(), client);
207        }
208
209        #[cfg(not(feature = "realtime"))]
210        let _ = &mut_sdks.insert(server_sdk_key.to_owned(), FPClient::new(config));
211    }
212
213    pub fn remove_client(&self, server_sdk_key: &str) {
214        let mut sdks = self.sdk_clients.write();
215        sdks.remove(server_sdk_key);
216    }
217
218    pub fn update_clients(&self) {
219        let secret_mapping = self.secret_mapping.read();
220        let clients = self.sdk_clients.read().clone();
221        if secret_mapping.version() > 0 {
222            let server_sdk_keys = secret_mapping.server_sdk_keys();
223            for server_sdk_key in &server_sdk_keys {
224                self.sync(server_sdk_key);
225            }
226
227            for server_sdk_key in clients.keys() {
228                if !server_sdk_keys.contains(&server_sdk_key) {
229                    info!("{:?} removed.", server_sdk_key);
230                    self.remove_client(server_sdk_key);
231                }
232            }
233        }
234    }
235
236    pub fn update_mapping(&self, new: SecretMapping) {
237        let version = self.secret_mapping.read().version();
238        if new.version() > version {
239            let mut secret_mapping = self.secret_mapping.write();
240            secret_mapping.update_mapping(new)
241        }
242    }
243
244    #[cfg(feature = "realtime")]
245    fn setup_notify(&self, server_sdk_key: &str, client: &mut FPClient) {
246        let sdk_key = server_sdk_key.to_owned();
247        let realtime_socket = self.realtime_socket.clone();
248        let client_sdk_key = {
249            let mapping = self.secret_mapping.read();
250            mapping.client_sdk_key(server_sdk_key).cloned()
251        };
252
253        client.set_update_callback(Box::new(move |_old, _new, _type| {
254            let server_key = sdk_key.clone();
255            let client_key = client_sdk_key.clone();
256            let socket = realtime_socket.clone();
257            tokio::spawn(async move {
258                socket
259                    .notify_sdk(server_key, client_key, "update", serde_json::json!(""))
260                    .await;
261            });
262        }));
263    }
264
265    fn repo_string(&self, sdk_key: &str) -> Result<String, FPServerError> {
266        let clients = self.sdk_clients.read();
267        let client = match clients.get(sdk_key) {
268            Some(client) if !client.initialized() => {
269                return Err(FPServerError::NotReady(sdk_key.to_string()))
270            }
271            Some(client) => client,
272            None => return Err(FPServerError::NotReady(sdk_key.to_string())),
273        };
274        let arc_repo = client.repo();
275        let repo = arc_repo.read();
276        serde_json::to_string(&*repo).map_err(|e| FPServerError::JsonError(e.to_string()))
277    }
278
279    fn all_evaluated_string(&self, sdk_key: &str, user: &FPUser) -> Result<String, FPServerError> {
280        let clients = self.sdk_clients.read();
281        let client = match clients.get(sdk_key) {
282            Some(client) if !client.initialized() => {
283                return Err(FPServerError::NotReady(sdk_key.to_string()))
284            }
285            Some(client) => client,
286            None => return Err(FPServerError::NotReady(sdk_key.to_string())),
287        };
288        let arc_repo = client.repo();
289        let repo = arc_repo.read();
290        let map: HashMap<String, EvalDetail<Value>> = repo
291            .toggles
292            .iter()
293            .filter(|(_, t)| t.is_for_client())
294            .map(|(key, toggle)| (key.to_owned(), toggle.eval_detail(user, &repo.segments)))
295            .collect();
296        serde_json::to_string(&map).map_err(|e| FPServerError::JsonError(e.to_string()))
297    }
298
299    fn all_event_string(&self, sdk_key: &str) -> Result<String, FPServerError> {
300        let clients = self.sdk_clients.read();
301        let client = match clients.get(sdk_key) {
302            Some(client) if !client.initialized() => {
303                return Err(FPServerError::NotReady(sdk_key.to_string()))
304            }
305            Some(client) => client,
306            None => return Err(FPServerError::NotReady(sdk_key.to_string())),
307        };
308        let arc_repo = client.repo();
309        let repo = arc_repo.read();
310        serde_json::to_string(&repo.events).map_err(|e| FPServerError::JsonError(e.to_string()))
311    }
312}
313
314#[cfg(test)]
315mod tests {
316
317    use super::*;
318    use crate::FPServerError::{NotFound, NotReady};
319    use axum::{routing::get, Json, Router, TypedHeader};
320    #[cfg(feature = "unstable")]
321    use feature_probe_server_sdk::FPUser;
322    use feature_probe_server_sdk::{Repository, SdkAuthorization};
323    #[cfg(feature = "unstable")]
324    use serde_json::json;
325    use std::{fs, net::SocketAddr, path::PathBuf, time::Duration};
326
327    #[tokio::test]
328    async fn test_repo_sync() {
329        let port = 9590;
330        setup_mock_api(port);
331        let client_sdk_key = "client-sdk-key".to_owned();
332        let server_sdk_key = "server-sdk-key".to_owned();
333        let client_sdk_key2 = "client-sdk-key2".to_owned();
334        let server_sdk_key2 = "server-sdk-key2".to_owned();
335        let repository = setup_repository(port, &client_sdk_key, &server_sdk_key).await;
336
337        let repo_string = repository.server_sdk_repo_string(&server_sdk_key);
338        assert!(repo_string.is_ok());
339        let r = serde_json::from_str::<Repository>(&repo_string.unwrap()).unwrap();
340        assert_eq!(r, repo_from_test_file());
341
342        let secret_keys = repository.secret_keys();
343        assert_eq!(secret_keys.len(), 1);
344        assert_eq!(secret_keys.get(&client_sdk_key), Some(&server_sdk_key));
345
346        // test mapping sync
347
348        let mut mapping = HashMap::new();
349        mapping.insert(client_sdk_key2.to_string(), server_sdk_key2.to_string());
350        let new = SecretMapping::new(2, mapping);
351        let clients = { (repository.inner.sdk_clients.read()).clone() };
352        assert!(clients.contains_key(&server_sdk_key));
353        repository.inner.update_mapping(new);
354        let secret_mapping = { (repository.inner.secret_mapping.read()).clone() };
355        let secret = &secret_mapping.server_sdk_key(&client_sdk_key2);
356        assert_eq!(secret_mapping.version(), 2);
357        assert_eq!(secret.unwrap(), &server_sdk_key2.to_string());
358
359        // test clients sync
360        repository.inner.update_clients();
361        let clients = { (repository.inner.sdk_clients.read()).clone() };
362        assert!(!clients.contains_key(&server_sdk_key));
363        assert!(clients.contains_key(&server_sdk_key2));
364
365        let sdk_key = repository.client_sync_now(&server_sdk_key2, SyncType::Polling);
366        assert!(sdk_key.is_ok());
367    }
368
369    #[tokio::test]
370    async fn test_repo_sync2() {
371        let port = 9591;
372        setup_mock_api(port);
373        let client_sdk_key = "client-sdk-key".to_owned();
374        let server_sdk_key = "server-sdk-key".to_owned();
375        let non_sdk_key = "non-exist-sdk-key".to_owned();
376        let repository = setup_repository2(port).await;
377
378        let repo_string_err = repository.server_sdk_repo_string(&non_sdk_key);
379        assert_eq!(repo_string_err.err(), Some(NotFound(non_sdk_key)));
380        let events_string = repository.client_sdk_events_string(&client_sdk_key);
381        assert!(events_string.is_ok());
382        let repo_string = repository.server_sdk_repo_string(&server_sdk_key);
383        assert!(repo_string.is_ok());
384        let r = serde_json::from_str::<Repository>(&repo_string.unwrap()).unwrap();
385        assert!(r == repo_from_test_file());
386        let secret_keys = repository.secret_keys();
387        let secret_keys_version = repository.inner.secret_mapping.read().version();
388        assert!(secret_keys_version == 1);
389        assert!(secret_keys.len() == 1);
390        assert!(secret_keys.get(&client_sdk_key) == Some(&server_sdk_key));
391    }
392
393    #[tokio::test]
394    async fn test_not_ready_repo_sync() {
395        let port = 9592;
396        setup_mock_api(port);
397        let client_sdk_key = "client-sdk-key".to_owned();
398        let server_sdk_key = "server-sdk-key".to_owned();
399        let repository = setup_not_ready_repository(port, &client_sdk_key, &server_sdk_key).await;
400
401        let repo_string_err = repository.server_sdk_repo_string(&server_sdk_key);
402        assert_eq!(repo_string_err.err(), Some(NotReady(server_sdk_key)));
403    }
404
405    #[cfg(feature = "unstable")]
406    #[tokio::test]
407    async fn test_update_toggles() {
408        let port = 9592;
409        setup_mock_api(port);
410
411        let server_sdk_key = "sdk-key1".to_owned();
412        let client_sdk_key = "client-sdk-key".to_owned();
413        let repository = setup_repository(port, &client_sdk_key, &server_sdk_key).await;
414        let client = repository.sdk_client(&server_sdk_key);
415        assert!(client.is_some());
416
417        let client = client.unwrap();
418        let user = FPUser::new().with("city", "4");
419        let default: HashMap<String, String> = HashMap::default();
420        let v = client.json_value("json_toggle", &user, json!(default));
421        assert!(v.get("variation_1").is_some());
422
423        let mut map = update_toggles_from_file();
424        let update_toggles = map.remove(&server_sdk_key);
425        assert!(update_toggles.is_some());
426
427        let update_toggles = update_toggles.unwrap();
428        let result = repository.update_toggles(&server_sdk_key, update_toggles);
429        assert!(result.is_ok());
430    }
431
432    async fn setup_repository(
433        port: u16,
434        client_sdk_key: &str,
435        server_sdk_key: &str,
436    ) -> SdkRepository {
437        let toggles_url =
438            Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
439        let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap();
440        let analysis_url = None;
441        let config = ServerConfig {
442            toggles_url,
443            events_url,
444            analysis_url,
445            refresh_interval: Duration::from_secs(1),
446            client_sdk_key: Some(client_sdk_key.to_owned()),
447            server_sdk_key: Some(server_sdk_key.to_owned()),
448            keys_url: None,
449            server_port: port,
450            #[cfg(feature = "realtime")]
451            realtime_port: port + 100,
452            #[cfg(feature = "realtime")]
453            realtime_path: "/server/realtime".to_owned(),
454        };
455
456        #[cfg(feature = "realtime")]
457        let rs = RealtimeSocket::serve(config.realtime_port, &config.realtime_path);
458
459        let repo = SdkRepository::new(
460            config,
461            #[cfg(feature = "realtime")]
462            rs,
463        );
464        repo.sync(client_sdk_key.to_owned(), server_sdk_key.to_owned(), 1);
465        tokio::time::sleep(Duration::from_millis(100)).await;
466        repo
467    }
468
469    async fn setup_not_ready_repository(
470        port: u16,
471        client_sdk_key: &str,
472        server_sdk_key: &str,
473    ) -> SdkRepository {
474        let toggles_url =
475            Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
476        let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap();
477        let analysis_url = None;
478        let config = ServerConfig {
479            toggles_url,
480            events_url,
481            analysis_url,
482            refresh_interval: Duration::from_secs(1),
483            client_sdk_key: Some(client_sdk_key.to_owned()),
484            server_sdk_key: Some(server_sdk_key.to_owned()),
485            keys_url: None,
486            server_port: port,
487            #[cfg(feature = "realtime")]
488            realtime_port: port + 100,
489            #[cfg(feature = "realtime")]
490            realtime_path: "/server/realtime".to_owned(),
491        };
492
493        #[cfg(feature = "realtime")]
494        let rs = RealtimeSocket::serve(config.realtime_port, &config.realtime_path);
495
496        let repo = SdkRepository::new(
497            config,
498            #[cfg(feature = "realtime")]
499            rs,
500        );
501        repo.sync(client_sdk_key.to_owned(), server_sdk_key.to_owned(), 0);
502        tokio::time::sleep(Duration::from_millis(100)).await;
503        repo
504    }
505
506    async fn setup_repository2(port: u16) -> SdkRepository {
507        let toggles_url =
508            Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
509        let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap();
510        let keys_url = Url::parse(&format!("http://127.0.0.1:{}/api/secret-keys", port)).unwrap();
511        let analysis_url = None;
512        let config = ServerConfig {
513            toggles_url,
514            events_url,
515            analysis_url,
516            refresh_interval: Duration::from_millis(100),
517            client_sdk_key: None,
518            server_sdk_key: None,
519            keys_url: Some(keys_url.clone()),
520            server_port: port,
521            #[cfg(feature = "realtime")]
522            realtime_port: port + 100,
523            realtime_path: "/server/realtime".to_owned(),
524        };
525
526        #[cfg(feature = "realtime")]
527        let rs = RealtimeSocket::serve(config.realtime_port, &config.realtime_path);
528
529        let repo = SdkRepository::new(
530            config,
531            #[cfg(feature = "realtime")]
532            rs,
533        );
534        repo.sync_with(keys_url);
535        tokio::time::sleep(Duration::from_millis(300)).await;
536        repo
537    }
538
539    async fn server_sdk_toggles(
540        TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
541    ) -> Json<Repository> {
542        repo_from_test_file().into()
543    }
544
545    async fn secret_keys() -> String {
546        r#" { "version": 1, "mapping": { "client-sdk-key": "server-sdk-key" } }"#.to_owned()
547    }
548
549    fn setup_mock_api(port: u16) {
550        let app = Router::new()
551            .route("/api/secret-keys", get(secret_keys))
552            .route("/api/server-sdk/toggles", get(server_sdk_toggles));
553        let addr = SocketAddr::from(([0, 0, 0, 0], port));
554        tokio::spawn(async move {
555            let _ = axum::Server::bind(&addr)
556                .serve(app.into_make_service())
557                .await;
558        });
559    }
560
561    fn repo_from_test_file() -> Repository {
562        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
563        path.push("resources/fixtures/repo.json");
564        let json_str = fs::read_to_string(path).unwrap();
565        serde_json::from_str::<Repository>(&json_str).unwrap()
566    }
567
568    #[cfg(feature = "unstable")]
569    fn update_toggles_from_file() -> HashMap<String, HashMap<String, Toggle>> {
570        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
571        path.push("resources/fixtures/toggles_update.json");
572        let json_str = fs::read_to_string(path).unwrap();
573        serde_json::from_str::<HashMap<String, HashMap<String, Toggle>>>(&json_str).unwrap()
574    }
575}