1#[cfg(feature = "realtime")]
2use crate::realtime::RealtimeSocket;
3use crate::FPServerError;
4use crate::{base::ServerConfig, secrets::SecretMapping};
5use feature_probe_server_sdk::{
6 EvalDetail, FPConfig, FPUser, FeatureProbe as FPClient, SyncType, Url,
7};
8#[cfg(feature = "unstable")]
9use feature_probe_server_sdk::{Segment, Toggle};
10use parking_lot::RwLock;
11use reqwest::Method;
12use serde_json::Value;
13use std::{collections::HashMap, sync::Arc};
14use tracing::{debug, error, info};
15
16#[derive(Debug, Clone)]
17pub struct SdkRepository {
18 inner: Arc<Inner>,
19}
20
21#[derive(Debug)]
22struct Inner {
23 server_config: ServerConfig,
24 http_client: reqwest::Client,
25 sdk_clients: RwLock<HashMap<String, FPClient>>,
26 secret_mapping: RwLock<SecretMapping>,
27 #[cfg(feature = "realtime")]
28 realtime_socket: RealtimeSocket,
29}
30
31impl SdkRepository {
32 pub fn new(
33 server_config: ServerConfig,
34 #[cfg(feature = "realtime")] realtime_socket: RealtimeSocket,
35 ) -> Self {
36 Self {
37 inner: Arc::new(Inner {
38 server_config,
39 http_client: Default::default(),
40 sdk_clients: Default::default(),
41 secret_mapping: Default::default(),
42 #[cfg(feature = "realtime")]
43 realtime_socket,
44 }),
45 }
46 }
47
48 #[cfg(feature = "unstable")]
49 pub fn update_segments(
50 &self,
51 _segments: HashMap<String, Segment>,
52 ) -> Result<(), FPServerError> {
53 Ok(())
55 }
56
57 #[cfg(feature = "unstable")]
58 pub fn update_toggles(
59 &self,
60 _server_sdk_key: &str,
61 _toggles: HashMap<String, Toggle>,
62 ) -> Result<(), FPServerError> {
63 Ok(())
65 }
66
67 pub fn secret_keys(&self) -> HashMap<String, String> {
68 let secret_mapping = self.inner.secret_mapping.read();
69 secret_mapping.mapping_clone()
70 }
71
72 pub fn sync(&self, client_sdk_key: String, server_sdk_key: String, version: u128) {
73 self.inner.sync(&server_sdk_key);
74 let mut secret_mapping = self.inner.secret_mapping.write();
75 secret_mapping.insert(client_sdk_key, server_sdk_key, version);
76 }
77
78 pub fn sync_with(&self, keys_url: Url) {
79 self.sync_secret_keys(keys_url);
80 let inner = self.inner.clone();
81 tokio::spawn(async move {
82 let mut interval = tokio::time::interval(inner.server_config.refresh_interval);
83 loop {
84 {
85 inner.update_clients();
86 }
87 interval.tick().await;
88 }
89 });
90 }
91
92 fn sync_secret_keys(&self, keys_url: Url) {
93 let inner = self.inner.clone();
94 let mut interval = tokio::time::interval(inner.server_config.refresh_interval);
95 tokio::spawn(async move {
96 loop {
97 let url = keys_url.clone();
98 let request = inner
99 .http_client
100 .request(Method::GET, url)
101 .timeout(inner.server_config.refresh_interval);
102 match request.send().await {
103 Err(e) => error!("sync_secret_keys error: {}", e),
104 Ok(resp) => match resp.text().await {
105 Err(e) => error!("sync_secret_keys: {}", e),
106 Ok(body) => match serde_json::from_str::<SecretMapping>(&body) {
107 Err(e) => error!("sync_secret_keys json error: {}", e),
108 Ok(r) => {
109 debug!("sync_secret_keys success. version: {:?}", r.version(),);
110 inner.update_mapping(r);
111 }
112 },
113 },
114 }
115 interval.tick().await;
116 }
117 });
118 }
119
120 pub fn server_sdk_repo_string(&self, server_sdk_key: &str) -> Result<String, FPServerError> {
121 let secret_mapping = self.inner.secret_mapping.read();
122 if secret_mapping.version() == 0 {
123 return Err(FPServerError::NotReady(server_sdk_key.to_string()));
124 }
125 if !secret_mapping.contains_server_sdk_key(server_sdk_key) {
126 return Err(FPServerError::NotFound(server_sdk_key.to_string()));
127 }
128 match self.inner.repo_string(server_sdk_key) {
129 Ok(repo) => Ok(repo),
130 Err(e) => Err(e),
131 }
132 }
133
134 pub fn client_sdk_eval_string(
135 &self,
136 client_sdk_key: &str,
137 user: &FPUser,
138 ) -> Result<String, FPServerError> {
139 let secret_mapping = self.inner.secret_mapping.read();
140 if secret_mapping.version() == 0 {
141 return Err(FPServerError::NotReady(client_sdk_key.to_string()));
142 }
143 let server_sdk_key = match secret_mapping.server_sdk_key(client_sdk_key) {
144 Some(sdk_key) => sdk_key,
145 None => return Err(FPServerError::NotFound(client_sdk_key.to_string())),
146 };
147 self.inner.all_evaluated_string(server_sdk_key, user)
148 }
149
150 pub fn client_sdk_events_string(&self, client_sdk_key: &str) -> Result<String, FPServerError> {
151 let secret_mapping = self.inner.secret_mapping.read();
152 if secret_mapping.version() == 0 {
153 return Err(FPServerError::NotReady(client_sdk_key.to_string()));
154 }
155 let server_sdk_key = match secret_mapping.server_sdk_key(client_sdk_key) {
156 Some(sdk_key) => sdk_key,
157 None => return Err(FPServerError::NotFound(client_sdk_key.to_string())),
158 };
159 self.inner.all_event_string(server_sdk_key)
160 }
161
162 pub fn client_sync_now(&self, sdk_key: &str, t: SyncType) -> Result<String, FPServerError> {
163 let sdk_clients = self.inner.sdk_clients.write();
164 let client = match sdk_clients.get(sdk_key) {
165 Some(client) => client,
166 None => return Err(FPServerError::NotFound(sdk_key.to_string())),
167 };
168 client.sync_now(t);
169 Ok(sdk_key.to_string())
170 }
171
172 #[cfg(test)]
173 #[cfg(feature = "unstable")]
174 fn sdk_client(&self, sdk_key: &str) -> Option<FPClient> {
175 let sdk_clients = self.inner.sdk_clients.read();
176 sdk_clients.get(sdk_key).cloned()
177 }
178}
179
180impl Inner {
181 pub fn sync(&self, server_sdk_key: &str) {
182 let should_sync = {
183 let sdks = self.sdk_clients.read();
184 !sdks.contains_key(server_sdk_key)
185 };
186
187 if !should_sync {
188 return;
189 }
190
191 let mut mut_sdks = self.sdk_clients.write();
192 let config = FPConfig {
193 server_sdk_key: server_sdk_key.to_owned(),
194 remote_url: Url::parse("http://nouse.com").unwrap(),
195 toggles_url: Some(self.server_config.toggles_url.clone()),
196 refresh_interval: self.server_config.refresh_interval,
197 http_client: Some(self.http_client.clone()),
198 ..Default::default()
199 };
200 info!("{:?} added", server_sdk_key);
201
202 #[cfg(feature = "realtime")]
203 {
204 let mut client = FPClient::new(config);
205 self.setup_notify(server_sdk_key, &mut client);
206 let _ = &mut_sdks.insert(server_sdk_key.to_owned(), client);
207 }
208
209 #[cfg(not(feature = "realtime"))]
210 let _ = &mut_sdks.insert(server_sdk_key.to_owned(), FPClient::new(config));
211 }
212
213 pub fn remove_client(&self, server_sdk_key: &str) {
214 let mut sdks = self.sdk_clients.write();
215 sdks.remove(server_sdk_key);
216 }
217
218 pub fn update_clients(&self) {
219 let secret_mapping = self.secret_mapping.read();
220 let clients = self.sdk_clients.read().clone();
221 if secret_mapping.version() > 0 {
222 let server_sdk_keys = secret_mapping.server_sdk_keys();
223 for server_sdk_key in &server_sdk_keys {
224 self.sync(server_sdk_key);
225 }
226
227 for server_sdk_key in clients.keys() {
228 if !server_sdk_keys.contains(&server_sdk_key) {
229 info!("{:?} removed.", server_sdk_key);
230 self.remove_client(server_sdk_key);
231 }
232 }
233 }
234 }
235
236 pub fn update_mapping(&self, new: SecretMapping) {
237 let version = self.secret_mapping.read().version();
238 if new.version() > version {
239 let mut secret_mapping = self.secret_mapping.write();
240 secret_mapping.update_mapping(new)
241 }
242 }
243
244 #[cfg(feature = "realtime")]
245 fn setup_notify(&self, server_sdk_key: &str, client: &mut FPClient) {
246 let sdk_key = server_sdk_key.to_owned();
247 let realtime_socket = self.realtime_socket.clone();
248 let client_sdk_key = {
249 let mapping = self.secret_mapping.read();
250 mapping.client_sdk_key(server_sdk_key).cloned()
251 };
252
253 client.set_update_callback(Box::new(move |_old, _new, _type| {
254 let server_key = sdk_key.clone();
255 let client_key = client_sdk_key.clone();
256 let socket = realtime_socket.clone();
257 tokio::spawn(async move {
258 socket
259 .notify_sdk(server_key, client_key, "update", serde_json::json!(""))
260 .await;
261 });
262 }));
263 }
264
265 fn repo_string(&self, sdk_key: &str) -> Result<String, FPServerError> {
266 let clients = self.sdk_clients.read();
267 let client = match clients.get(sdk_key) {
268 Some(client) if !client.initialized() => {
269 return Err(FPServerError::NotReady(sdk_key.to_string()))
270 }
271 Some(client) => client,
272 None => return Err(FPServerError::NotReady(sdk_key.to_string())),
273 };
274 let arc_repo = client.repo();
275 let repo = arc_repo.read();
276 serde_json::to_string(&*repo).map_err(|e| FPServerError::JsonError(e.to_string()))
277 }
278
279 fn all_evaluated_string(&self, sdk_key: &str, user: &FPUser) -> Result<String, FPServerError> {
280 let clients = self.sdk_clients.read();
281 let client = match clients.get(sdk_key) {
282 Some(client) if !client.initialized() => {
283 return Err(FPServerError::NotReady(sdk_key.to_string()))
284 }
285 Some(client) => client,
286 None => return Err(FPServerError::NotReady(sdk_key.to_string())),
287 };
288 let arc_repo = client.repo();
289 let repo = arc_repo.read();
290 let map: HashMap<String, EvalDetail<Value>> = repo
291 .toggles
292 .iter()
293 .filter(|(_, t)| t.is_for_client())
294 .map(|(key, toggle)| (key.to_owned(), toggle.eval_detail(user, &repo.segments)))
295 .collect();
296 serde_json::to_string(&map).map_err(|e| FPServerError::JsonError(e.to_string()))
297 }
298
299 fn all_event_string(&self, sdk_key: &str) -> Result<String, FPServerError> {
300 let clients = self.sdk_clients.read();
301 let client = match clients.get(sdk_key) {
302 Some(client) if !client.initialized() => {
303 return Err(FPServerError::NotReady(sdk_key.to_string()))
304 }
305 Some(client) => client,
306 None => return Err(FPServerError::NotReady(sdk_key.to_string())),
307 };
308 let arc_repo = client.repo();
309 let repo = arc_repo.read();
310 serde_json::to_string(&repo.events).map_err(|e| FPServerError::JsonError(e.to_string()))
311 }
312}
313
#[cfg(test)]
mod tests {

    use super::*;
    use crate::FPServerError::{NotFound, NotReady};
    use axum::{routing::get, Json, Router, TypedHeader};
    #[cfg(feature = "unstable")]
    use feature_probe_server_sdk::FPUser;
    use feature_probe_server_sdk::{Repository, SdkAuthorization};
    #[cfg(feature = "unstable")]
    use serde_json::json;
    use std::{fs, net::SocketAddr, path::PathBuf, time::Duration};

    // Each test binds its own port: the tests run in parallel and both the
    // mock API (`port`) and the realtime socket (`port + 100`) need a free one.

    /// A statically configured key pair serves the fixture repository, and
    /// applying a newer mapping swaps the registered clients.
    #[tokio::test]
    async fn test_repo_sync() {
        let port = 9590;
        setup_mock_api(port);
        let client_sdk_key = "client-sdk-key".to_owned();
        let server_sdk_key = "server-sdk-key".to_owned();
        let client_sdk_key2 = "client-sdk-key2".to_owned();
        let server_sdk_key2 = "server-sdk-key2".to_owned();
        let repository = setup_repository(port, &client_sdk_key, &server_sdk_key).await;

        let repo_string = repository.server_sdk_repo_string(&server_sdk_key);
        assert!(repo_string.is_ok());
        let r = serde_json::from_str::<Repository>(&repo_string.unwrap()).unwrap();
        assert_eq!(r, repo_from_test_file());

        let secret_keys = repository.secret_keys();
        assert_eq!(secret_keys.len(), 1);
        assert_eq!(secret_keys.get(&client_sdk_key), Some(&server_sdk_key));

        let mut mapping = HashMap::new();
        mapping.insert(client_sdk_key2.to_string(), server_sdk_key2.to_string());
        let new = SecretMapping::new(2, mapping);
        let clients = { (repository.inner.sdk_clients.read()).clone() };
        assert!(clients.contains_key(&server_sdk_key));
        repository.inner.update_mapping(new);
        let secret_mapping = { (repository.inner.secret_mapping.read()).clone() };
        let secret = &secret_mapping.server_sdk_key(&client_sdk_key2);
        assert_eq!(secret_mapping.version(), 2);
        assert_eq!(secret.unwrap(), &server_sdk_key2.to_string());

        repository.inner.update_clients();
        let clients = { (repository.inner.sdk_clients.read()).clone() };
        assert!(!clients.contains_key(&server_sdk_key));
        assert!(clients.contains_key(&server_sdk_key2));

        let sdk_key = repository.client_sync_now(&server_sdk_key2, SyncType::Polling);
        assert!(sdk_key.is_ok());
    }

    /// Keys polled from the mock secret-keys endpoint populate the mapping
    /// and make the repository and events readable.
    #[tokio::test]
    async fn test_repo_sync2() {
        let port = 9591;
        setup_mock_api(port);
        let client_sdk_key = "client-sdk-key".to_owned();
        let server_sdk_key = "server-sdk-key".to_owned();
        let non_sdk_key = "non-exist-sdk-key".to_owned();
        let repository = setup_repository2(port).await;

        let repo_string_err = repository.server_sdk_repo_string(&non_sdk_key);
        assert_eq!(repo_string_err.err(), Some(NotFound(non_sdk_key)));
        let events_string = repository.client_sdk_events_string(&client_sdk_key);
        assert!(events_string.is_ok());
        let repo_string = repository.server_sdk_repo_string(&server_sdk_key);
        assert!(repo_string.is_ok());
        let r = serde_json::from_str::<Repository>(&repo_string.unwrap()).unwrap();
        assert!(r == repo_from_test_file());
        let secret_keys = repository.secret_keys();
        let secret_keys_version = repository.inner.secret_mapping.read().version();
        assert!(secret_keys_version == 1);
        assert!(secret_keys.len() == 1);
        assert!(secret_keys.get(&client_sdk_key) == Some(&server_sdk_key));
    }

    /// A mapping left at version 0 makes lookups report `NotReady`.
    #[tokio::test]
    async fn test_not_ready_repo_sync() {
        let port = 9592;
        setup_mock_api(port);
        let client_sdk_key = "client-sdk-key".to_owned();
        let server_sdk_key = "server-sdk-key".to_owned();
        let repository = setup_not_ready_repository(port, &client_sdk_key, &server_sdk_key).await;

        let repo_string_err = repository.server_sdk_repo_string(&server_sdk_key);
        assert_eq!(repo_string_err.err(), Some(NotReady(server_sdk_key)));
    }

    /// The (currently stubbed) toggle update accepts the fixture payload.
    #[cfg(feature = "unstable")]
    #[tokio::test]
    async fn test_update_toggles() {
        // Previously 9592, which collided with `test_not_ready_repo_sync`.
        let port = 9593;
        setup_mock_api(port);

        let server_sdk_key = "sdk-key1".to_owned();
        let client_sdk_key = "client-sdk-key".to_owned();
        let repository = setup_repository(port, &client_sdk_key, &server_sdk_key).await;
        let client = repository.sdk_client(&server_sdk_key);
        assert!(client.is_some());

        let client = client.unwrap();
        let user = FPUser::new().with("city", "4");
        let default: HashMap<String, String> = HashMap::default();
        let v = client.json_value("json_toggle", &user, json!(default));
        assert!(v.get("variation_1").is_some());

        let mut map = update_toggles_from_file();
        let update_toggles = map.remove(&server_sdk_key);
        assert!(update_toggles.is_some());

        let update_toggles = update_toggles.unwrap();
        let result = repository.update_toggles(&server_sdk_key, update_toggles);
        assert!(result.is_ok());
    }

    /// Builds a `ServerConfig` whose toggles and events URLs point at the mock
    /// API on `port`. Realtime fields are set only when that feature is
    /// enabled, identically for every test setup.
    fn mock_server_config(
        port: u16,
        client_sdk_key: Option<&str>,
        server_sdk_key: Option<&str>,
        keys_url: Option<Url>,
        refresh_interval: Duration,
    ) -> ServerConfig {
        let toggles_url =
            Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap();
        let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap();
        ServerConfig {
            toggles_url,
            events_url,
            analysis_url: None,
            refresh_interval,
            client_sdk_key: client_sdk_key.map(str::to_owned),
            server_sdk_key: server_sdk_key.map(str::to_owned),
            keys_url,
            server_port: port,
            #[cfg(feature = "realtime")]
            realtime_port: port + 100,
            #[cfg(feature = "realtime")]
            realtime_path: "/server/realtime".to_owned(),
        }
    }

    /// Creates a repository for `config`, starting the realtime socket first
    /// when that feature is enabled.
    fn build_repository(config: ServerConfig) -> SdkRepository {
        #[cfg(feature = "realtime")]
        let rs = RealtimeSocket::serve(config.realtime_port, &config.realtime_path);

        SdkRepository::new(
            config,
            #[cfg(feature = "realtime")]
            rs,
        )
    }

    /// Repository with one static key pair registered at mapping version 1.
    async fn setup_repository(
        port: u16,
        client_sdk_key: &str,
        server_sdk_key: &str,
    ) -> SdkRepository {
        let config = mock_server_config(
            port,
            Some(client_sdk_key),
            Some(server_sdk_key),
            None,
            Duration::from_secs(1),
        );
        let repo = build_repository(config);
        repo.sync(client_sdk_key.to_owned(), server_sdk_key.to_owned(), 1);
        // Give the freshly created client time to load the fixture repository.
        tokio::time::sleep(Duration::from_millis(100)).await;
        repo
    }

    /// Same as `setup_repository`, but the pair is registered at version 0,
    /// which the repository treats as "not ready".
    async fn setup_not_ready_repository(
        port: u16,
        client_sdk_key: &str,
        server_sdk_key: &str,
    ) -> SdkRepository {
        let config = mock_server_config(
            port,
            Some(client_sdk_key),
            Some(server_sdk_key),
            None,
            Duration::from_secs(1),
        );
        let repo = build_repository(config);
        repo.sync(client_sdk_key.to_owned(), server_sdk_key.to_owned(), 0);
        tokio::time::sleep(Duration::from_millis(100)).await;
        repo
    }

    /// Repository that discovers its keys by polling the mock secret-keys URL.
    async fn setup_repository2(port: u16) -> SdkRepository {
        let keys_url = Url::parse(&format!("http://127.0.0.1:{}/api/secret-keys", port)).unwrap();
        let config = mock_server_config(
            port,
            None,
            None,
            Some(keys_url.clone()),
            Duration::from_millis(100),
        );
        let repo = build_repository(config);
        repo.sync_with(keys_url);
        // Allow a few refresh periods for the keys and then the toggles to load.
        tokio::time::sleep(Duration::from_millis(300)).await;
        repo
    }

    /// Mock toggles endpoint: returns the fixture repository for any SDK key.
    async fn server_sdk_toggles(
        TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
    ) -> Json<Repository> {
        repo_from_test_file().into()
    }

    /// Mock secret-keys endpoint: a version-1 mapping with a single pair.
    async fn secret_keys() -> String {
        r#" { "version": 1, "mapping": { "client-sdk-key": "server-sdk-key" } }"#.to_owned()
    }

    /// Serves the mock secret-keys and toggles endpoints on `port` in a
    /// background task.
    fn setup_mock_api(port: u16) {
        let app = Router::new()
            .route("/api/secret-keys", get(secret_keys))
            .route("/api/server-sdk/toggles", get(server_sdk_toggles));
        let addr = SocketAddr::from(([0, 0, 0, 0], port));
        tokio::spawn(async move {
            let _ = axum::Server::bind(&addr)
                .serve(app.into_make_service())
                .await;
        });
    }

    /// Loads the repository fixture from `resources/fixtures/repo.json`.
    fn repo_from_test_file() -> Repository {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("resources/fixtures/repo.json");
        let json_str = fs::read_to_string(path).unwrap();
        serde_json::from_str::<Repository>(&json_str).unwrap()
    }

    /// Loads per-server-key toggle updates from
    /// `resources/fixtures/toggles_update.json`.
    #[cfg(feature = "unstable")]
    fn update_toggles_from_file() -> HashMap<String, HashMap<String, Toggle>> {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("resources/fixtures/toggles_update.json");
        let json_str = fs::read_to_string(path).unwrap();
        serde_json::from_str::<HashMap<String, HashMap<String, Toggle>>>(&json_str).unwrap()
    }
}