1use std::collections::HashSet;
2use std::{sync::Arc, time::Duration};
3
4pub mod delta_refresh;
5
6use chrono::{TimeDelta, Utc};
7use dashmap::DashMap;
8use etag::EntityTag;
9use reqwest::StatusCode;
10use tracing::{debug, info, warn};
11use unleash_edge_delta::cache_manager::DeltaCacheManager;
12use unleash_edge_feature_cache::FeatureCache;
13use unleash_edge_feature_filters::{FeatureFilterSet, filter_client_features};
14use unleash_edge_http_client::{ClientMetaInformation, UnleashClient};
15use unleash_edge_persistence::EdgePersistence;
16use unleash_edge_types::errors::{EdgeError, FeatureError};
17use unleash_edge_types::tokens::{EdgeToken, cache_key, simplify};
18use unleash_edge_types::{
19 ClientFeaturesRequest, ClientFeaturesResponse, EdgeResult, TokenRefresh, build,
20};
21use unleash_types::client_features::ClientFeatures;
22use unleash_types::client_metrics::{ClientApplication, MetricsMetadata, SdkType};
23use unleash_yggdrasil::{EngineState, UpdateMessage};
24
25use crate::delta_refresh::DeltaRefresher;
26
27pub fn frontend_token_is_covered_by_tokens(
28 frontend_token: &EdgeToken,
29 tokens_to_refresh: Arc<DashMap<String, TokenRefresh>>,
30) -> bool {
31 tokens_to_refresh.iter().any(|client_token| {
32 client_token
33 .token
34 .same_environment_and_broader_or_equal_project_access(frontend_token)
35 })
36}
37
38#[derive(Clone)]
39pub enum HydratorType {
40 Streaming(Arc<DeltaRefresher>),
41 Polling(Arc<FeatureRefresher>),
42}
43
44impl HydratorType {
45 pub async fn hydrate_new_tokens(&self) {
46 match self {
47 HydratorType::Streaming(delta_refresher) => delta_refresher.hydrate_new_tokens().await,
48 HydratorType::Polling(feature_refresher) => {
49 feature_refresher.hydrate_new_tokens().await
50 }
51 }
52 }
53
54 pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option<EntityTag>) {
55 match self {
56 HydratorType::Streaming(delta_refresher) => {
57 delta_refresher
58 .register_token_for_refresh(token, etag)
59 .await
60 }
61 HydratorType::Polling(feature_refresher) => {
62 feature_refresher
63 .register_token_for_refresh(token, etag)
64 .await
65 }
66 }
67 }
68
69 pub fn tokens_to_refresh(self) -> TokenRefreshSet {
70 match self {
71 HydratorType::Streaming(delta_refresher) => delta_refresher.tokens_to_refresh.clone(),
72 HydratorType::Polling(feature_refresher) => feature_refresher.tokens_to_refresh.clone(),
73 }
74 }
75}
76
77type TokenRefreshSet = Arc<DashMap<String, TokenRefresh>>;
78
79trait TokenRefreshStatus {
80 fn get_tokens_due_for_refresh(&self) -> Vec<TokenRefresh>;
81 fn get_tokens_never_refreshed(&self) -> Vec<TokenRefresh>;
82 fn token_is_subsumed(&self, token: &EdgeToken) -> bool;
83 fn backoff(&self, token: &EdgeToken, refresh_interval: &TimeDelta);
84 fn update_last_refresh(
85 &self,
86 token: &EdgeToken,
87 etag: Option<EntityTag>,
88 feature_count: usize,
89 refresh_interval: &TimeDelta,
90 );
91 fn update_last_check(&self, token: &EdgeToken, refresh_interval: &TimeDelta);
92}
93
94impl TokenRefreshStatus for TokenRefreshSet {
95 fn get_tokens_due_for_refresh(&self) -> Vec<TokenRefresh> {
96 self.iter()
97 .map(|e| e.value().clone())
98 .filter(|token| {
99 token
100 .next_refresh
101 .map(|refresh| Utc::now() > refresh)
102 .unwrap_or(true)
103 })
104 .collect()
105 }
106
107 fn get_tokens_never_refreshed(&self) -> Vec<TokenRefresh> {
108 self.iter()
109 .map(|e| e.value().clone())
110 .filter(|token| token.last_refreshed.is_none() && token.last_check.is_none())
111 .collect()
112 }
113
114 fn token_is_subsumed(&self, token: &EdgeToken) -> bool {
115 self.iter()
116 .filter(|r| r.token.environment == token.environment)
117 .any(|t| t.token.subsumes(token))
118 }
119
120 fn backoff(&self, token: &EdgeToken, refresh_interval: &TimeDelta) {
121 self.alter(&token.token, |_k, old_refresh| {
122 old_refresh.backoff(refresh_interval)
123 });
124 }
125
126 fn update_last_refresh(
127 &self,
128 token: &EdgeToken,
129 etag: Option<EntityTag>,
130 feature_count: usize,
131 refresh_interval: &TimeDelta,
132 ) {
133 self.alter(&token.token, |_k, old_refresh| {
134 old_refresh.successful_refresh(refresh_interval, etag, feature_count)
135 });
136 }
137
138 fn update_last_check(&self, token: &EdgeToken, refresh_interval: &TimeDelta) {
139 self.alter(&token.token, |_k, old_refresh| {
140 old_refresh.successful_check(refresh_interval)
141 });
142 }
143}
144
145#[derive(Clone)]
146pub struct FeatureRefresher {
147 pub unleash_client: Arc<UnleashClient>,
148 pub tokens_to_refresh: TokenRefreshSet,
149 pub features_cache: Arc<FeatureCache>,
150 pub delta_cache_manager: Arc<DeltaCacheManager>,
151 pub engine_cache: Arc<DashMap<String, EngineState>>,
152 pub refresh_interval: chrono::Duration,
153 pub persistence: Option<Arc<dyn EdgePersistence>>,
154 pub client_meta_information: ClientMetaInformation,
155}
156
157impl Default for FeatureRefresher {
158 fn default() -> Self {
159 Self {
160 refresh_interval: chrono::Duration::seconds(15),
161 unleash_client: Default::default(),
162 tokens_to_refresh: Arc::new(DashMap::default()),
163 features_cache: Arc::new(Default::default()),
164 delta_cache_manager: Arc::new(DeltaCacheManager::new()),
165 engine_cache: Default::default(),
166 persistence: None,
167 client_meta_information: Default::default(),
168 }
169 }
170}
171
172fn client_application_from_token_and_name(
173 token: EdgeToken,
174 refresh_interval: i64,
175 client_meta_information: ClientMetaInformation,
176) -> ClientApplication {
177 ClientApplication {
178 app_name: client_meta_information.app_name,
179 connect_via: None,
180 environment: token.environment,
181 projects: Some(token.projects),
182 instance_id: Some(client_meta_information.instance_id),
183 connection_id: Some(client_meta_information.connection_id),
184 interval: refresh_interval as u32,
185 started: Utc::now(),
186 strategies: vec![],
187 metadata: MetricsMetadata {
188 platform_name: None,
189 platform_version: None,
190 sdk_version: Some(format!("unleash-edge:{}", build::PKG_VERSION)),
191 sdk_type: Some(SdkType::Backend),
192 yggdrasil_version: None,
193 },
194 }
195}
196
197pub async fn start_refresh_features_background_task(refresher: Arc<FeatureRefresher>) {
198 loop {
199 tokio::select! {
200 _ = tokio::time::sleep(Duration::from_secs(5)) => {
201 refresher.refresh_features().await;
202 }
203 }
204 }
205}
206
207pub struct FeatureRefreshConfig {
208 features_refresh_interval: chrono::Duration,
209 client_meta_information: ClientMetaInformation,
210}
211
212impl FeatureRefreshConfig {
213 pub fn new(
214 features_refresh_interval: chrono::Duration,
215 client_meta_information: ClientMetaInformation,
216 ) -> Self {
217 Self {
218 features_refresh_interval,
219 client_meta_information,
220 }
221 }
222}
223
224impl FeatureRefresher {
225 pub fn new(
226 unleash_client: Arc<UnleashClient>,
227 features_cache: Arc<FeatureCache>,
228 delta_cache_manager: Arc<DeltaCacheManager>,
229 engines: Arc<DashMap<String, EngineState>>,
230 persistence: Option<Arc<dyn EdgePersistence>>,
231 config: FeatureRefreshConfig,
232 ) -> Self {
233 FeatureRefresher {
234 unleash_client,
235 tokens_to_refresh: Arc::new(DashMap::default()),
236 features_cache,
237 delta_cache_manager,
238 engine_cache: engines,
239 refresh_interval: config.features_refresh_interval,
240 persistence,
241 client_meta_information: config.client_meta_information,
242 }
243 }
244
245 pub fn with_client(client: Arc<UnleashClient>) -> Self {
246 Self {
247 unleash_client: client,
248 ..Default::default()
249 }
250 }
251
252 pub async fn register_and_hydrate_token(&self, token: &EdgeToken) {
256 self.register_token_for_refresh(token.clone(), None).await;
257 self.hydrate_new_tokens().await;
258 }
259
260 pub fn features_for_filter(
261 &self,
262 token: EdgeToken,
263 filters: &FeatureFilterSet,
264 ) -> EdgeResult<ClientFeatures> {
265 match self.get_features_by_filter(&token, filters) {
266 Some(features) if self.tokens_to_refresh.token_is_subsumed(&token) => Ok(features),
267 Some(_features) if !self.tokens_to_refresh.token_is_subsumed(&token) => {
268 debug!("Token is not subsumed by any registered tokens. Returning error");
269 Err(EdgeError::InvalidToken)
270 }
271 _ => {
272 debug!("No features set available. Edge isn't ready");
273 Err(EdgeError::InvalidToken)
274 }
275 }
276 }
277
278 pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option<EntityTag>) {
280 if !self.tokens_to_refresh.contains_key(&token.token) {
281 self.unleash_client
282 .register_as_client(
283 token.token.clone(),
284 client_application_from_token_and_name(
285 token.clone(),
286 self.refresh_interval.num_seconds(),
287 self.client_meta_information.clone(),
288 ),
289 )
290 .await
291 .unwrap_or_default();
292 let mut registered_tokens: Vec<TokenRefresh> =
293 self.tokens_to_refresh.iter().map(|t| t.clone()).collect();
294 registered_tokens.push(TokenRefresh::new(token.clone(), etag));
295 let minimum = simplify(®istered_tokens);
296 let mut keys = HashSet::new();
297 for refreshes in minimum {
298 keys.insert(refreshes.token.token.clone());
299 self.tokens_to_refresh
300 .insert(refreshes.token.token.clone(), refreshes.clone());
301 }
302 self.tokens_to_refresh.retain(|key, _| keys.contains(key));
303 }
304 }
305
306 pub async fn hydrate_new_tokens(&self) {
307 for hydration in self.tokens_to_refresh.get_tokens_never_refreshed() {
308 self.refresh_single(hydration).await;
309 }
310 }
311
312 pub async fn refresh_features(&self) {
313 for refresh in self.tokens_to_refresh.get_tokens_due_for_refresh() {
314 self.refresh_single(refresh).await;
315 }
316 }
317
318 async fn handle_client_features_updated(
319 &self,
320 refresh_token: &EdgeToken,
321 features: ClientFeatures,
322 etag: Option<EntityTag>,
323 ) {
324 debug!("Got updated client features. Updating features with {etag:?}");
325 let key = cache_key(refresh_token);
326 self.tokens_to_refresh.update_last_refresh(
327 refresh_token,
328 etag,
329 features.features.len(),
330 &self.refresh_interval,
331 );
332 self.features_cache
333 .modify(key.clone(), refresh_token, features.clone());
334 self.engine_cache
335 .entry(key.clone())
336 .and_modify(|engine| {
337 if let Some(f) = self.features_cache.get(&key) {
338 let mut new_state = EngineState::default();
339 let warnings = new_state.take_state(UpdateMessage::FullResponse(f.clone()));
340 if let Some(warnings) = warnings {
341 warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
342 };
343 *engine = new_state;
344
345 }
346 })
347 .or_insert_with(|| {
348 let mut new_state = EngineState::default();
349
350 let warnings = new_state.take_state(UpdateMessage::FullResponse(features));
351 if let Some(warnings) = warnings {
352 warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
353 };
354 new_state
355 });
356 }
357
358 pub async fn refresh_single(&self, refresh: TokenRefresh) {
359 let features_result = self
360 .unleash_client
361 .get_client_features(ClientFeaturesRequest {
362 api_key: refresh.token.token.clone(),
363 etag: refresh.etag.clone(),
364 interval: Some(self.refresh_interval.num_milliseconds()),
365 })
366 .await;
367 match features_result {
368 Ok(feature_response) => match feature_response {
369 ClientFeaturesResponse::NoUpdate(tag) => {
370 debug!("No update needed. Will update last check time with {tag}");
371 self.tokens_to_refresh
372 .update_last_check(&refresh.token.clone(), &self.refresh_interval);
373 }
374 ClientFeaturesResponse::Updated(features, etag) => {
375 self.handle_client_features_updated(&refresh.token, features, etag)
376 .await;
377 }
378 },
379 Err(e) => {
380 match e {
381 EdgeError::ClientFeaturesFetchError(fe) => {
382 match fe {
383 FeatureError::Retriable(status_code) => match status_code {
384 StatusCode::INTERNAL_SERVER_ERROR
385 | StatusCode::BAD_GATEWAY
386 | StatusCode::SERVICE_UNAVAILABLE
387 | StatusCode::GATEWAY_TIMEOUT => {
388 info!(
389 "Upstream is having some problems, increasing my waiting period"
390 );
391 self.tokens_to_refresh
392 .backoff(&refresh.token, &self.refresh_interval);
393 }
394 StatusCode::TOO_MANY_REQUESTS => {
395 info!("Got told that upstream is receiving too many requests");
396 self.tokens_to_refresh
397 .backoff(&refresh.token, &self.refresh_interval);
398 }
399 _ => {
400 info!("Couldn't refresh features, but will retry next go")
401 }
402 },
403 FeatureError::AccessDenied => {
404 info!(
405 "Token used to fetch features was Forbidden, will remove from list of refresh tasks"
406 );
407 self.tokens_to_refresh.remove(&refresh.token.token);
408 if !self.tokens_to_refresh.iter().any(|e| {
409 e.value().token.environment == refresh.token.environment
410 }) {
411 let cache_key = cache_key(&refresh.token);
412 self.features_cache.remove(&cache_key);
414 self.engine_cache.remove(&cache_key);
415 }
416 }
417 FeatureError::NotFound => {
418 info!(
419 "Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"
420 );
421 self.tokens_to_refresh
422 .backoff(&refresh.token, &self.refresh_interval);
423 }
424 }
425 }
426 EdgeError::ClientCacheError => {
427 info!("Couldn't refresh features, but will retry next go")
428 }
429 _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
430 }
431 }
432 }
433 }
434
435 fn get_features_by_filter(
436 &self,
437 token: &EdgeToken,
438 filters: &FeatureFilterSet,
439 ) -> Option<ClientFeatures> {
440 self.features_cache
441 .get(&cache_key(token))
442 .map(|client_features| filter_client_features(&client_features, filters))
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use crate::TokenRefreshStatus;
449
450 use super::FeatureRefresher;
451 use chrono::{Duration, Utc};
452 use dashmap::DashMap;
453 use etag::EntityTag;
454 use reqwest::Url;
455 use std::sync::Arc;
456 use unleash_edge_feature_cache::FeatureCache;
457 use unleash_edge_http_client::{
458 ClientMetaInformation, HttpClientArgs, UnleashClient, new_reqwest_client,
459 };
460 use unleash_edge_types::TokenRefresh;
461 use unleash_edge_types::tokens::EdgeToken;
462
463 fn create_test_client() -> UnleashClient {
464 let http_client = new_reqwest_client(HttpClientArgs {
465 client_meta_information: ClientMetaInformation::test_config(),
466 ..Default::default()
467 })
468 .expect("Failed to create client");
469
470 UnleashClient::from_url_with_backing_client(
471 Url::parse("http://localhost:4242").unwrap(),
472 "Authorization".to_string(),
473 http_client,
474 ClientMetaInformation::test_config(),
475 )
476 }
477
478 #[tokio::test]
479 pub async fn registering_token_for_refresh_works() {
480 let unleash_client = create_test_client();
481 let features_cache = Arc::new(FeatureCache::default());
482 let engine_cache = Arc::new(DashMap::default());
483
484 let duration = Duration::seconds(5);
485 let feature_refresher = FeatureRefresher {
486 unleash_client: Arc::new(unleash_client),
487 features_cache,
488 engine_cache,
489 refresh_interval: duration,
490 ..Default::default()
491 };
492 let token =
493 EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
494 feature_refresher
495 .register_token_for_refresh(token, None)
496 .await;
497
498 assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
499 }
500
501 #[tokio::test]
502 pub async fn registering_multiple_tokens_with_same_environment_reduces_tokens_to_valid_minimal_set()
503 {
504 let unleash_client = create_test_client();
505 let features_cache = Arc::new(FeatureCache::default());
506 let engine_cache = Arc::new(DashMap::default());
507
508 let duration = Duration::seconds(5);
509 let feature_refresher = FeatureRefresher {
510 unleash_client: Arc::new(unleash_client),
511 features_cache,
512 engine_cache,
513 refresh_interval: duration,
514 ..Default::default()
515 };
516 let token1 =
517 EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
518 let token2 =
519 EdgeToken::try_from("*:development.zyxwvutsrqponmlkjihgfedcba".to_string()).unwrap();
520 feature_refresher
521 .register_token_for_refresh(token1, None)
522 .await;
523 feature_refresher
524 .register_token_for_refresh(token2, None)
525 .await;
526
527 assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
528 }
529
530 #[tokio::test]
531 pub async fn registering_multiple_non_overlapping_tokens_will_keep_all() {
532 let unleash_client = create_test_client();
533 let features_cache = Arc::new(FeatureCache::default());
534 let engine_cache = Arc::new(DashMap::default());
535 let duration = Duration::seconds(5);
536 let feature_refresher = FeatureRefresher {
537 unleash_client: Arc::new(unleash_client),
538 features_cache,
539 engine_cache,
540 refresh_interval: duration,
541 ..Default::default()
542 };
543 let project_a_token =
544 EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
545 .unwrap();
546 let project_b_token =
547 EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string())
548 .unwrap();
549 let project_c_token =
550 EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string())
551 .unwrap();
552 feature_refresher
553 .register_token_for_refresh(project_a_token, None)
554 .await;
555 feature_refresher
556 .register_token_for_refresh(project_b_token, None)
557 .await;
558 feature_refresher
559 .register_token_for_refresh(project_c_token, None)
560 .await;
561
562 assert_eq!(feature_refresher.tokens_to_refresh.len(), 3);
563 }
564
565 #[tokio::test]
566 pub async fn registering_wildcard_project_token_only_keeps_the_wildcard() {
567 let unleash_client = create_test_client();
568 let features_cache = Arc::new(FeatureCache::default());
569 let engine_cache = Arc::new(DashMap::default());
570 let duration = Duration::seconds(5);
571 let feature_refresher = FeatureRefresher {
572 unleash_client: Arc::new(unleash_client),
573 features_cache,
574 engine_cache,
575 refresh_interval: duration,
576 ..Default::default()
577 };
578 let project_a_token =
579 EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
580 .unwrap();
581 let project_b_token =
582 EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string())
583 .unwrap();
584 let project_c_token =
585 EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string())
586 .unwrap();
587 let wildcard_token =
588 EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
589
590 feature_refresher
591 .register_token_for_refresh(project_a_token, None)
592 .await;
593 feature_refresher
594 .register_token_for_refresh(project_b_token, None)
595 .await;
596 feature_refresher
597 .register_token_for_refresh(project_c_token, None)
598 .await;
599 feature_refresher
600 .register_token_for_refresh(wildcard_token, None)
601 .await;
602
603 assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
604 assert!(
605 feature_refresher
606 .tokens_to_refresh
607 .contains_key("*:development.abcdefghijklmnopqrstuvwxyz")
608 )
609 }
610
611 #[tokio::test]
612 pub async fn registering_tokens_with_multiple_projects_overwrites_single_tokens() {
613 let unleash_client = create_test_client();
614 let features_cache = Arc::new(FeatureCache::default());
615 let engine_cache = Arc::new(DashMap::default());
616 let duration = Duration::seconds(5);
617 let feature_refresher = FeatureRefresher {
618 unleash_client: Arc::new(unleash_client),
619 features_cache,
620 engine_cache,
621 refresh_interval: duration,
622 ..Default::default()
623 };
624 let project_a_token =
625 EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
626 .unwrap();
627 let project_b_token =
628 EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string())
629 .unwrap();
630 let project_c_token =
631 EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string())
632 .unwrap();
633 let mut project_a_and_c_token =
634 EdgeToken::try_from("[]:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
635 project_a_and_c_token.projects = vec!["projecta".into(), "projectc".into()];
636
637 feature_refresher
638 .register_token_for_refresh(project_a_token, None)
639 .await;
640 feature_refresher
641 .register_token_for_refresh(project_b_token, None)
642 .await;
643 feature_refresher
644 .register_token_for_refresh(project_c_token, None)
645 .await;
646 feature_refresher
647 .register_token_for_refresh(project_a_and_c_token, None)
648 .await;
649
650 assert_eq!(feature_refresher.tokens_to_refresh.len(), 2);
651 assert!(
652 feature_refresher
653 .tokens_to_refresh
654 .contains_key("[]:development.abcdefghijklmnopqrstuvwxyz")
655 );
656 assert!(
657 feature_refresher
658 .tokens_to_refresh
659 .contains_key("projectb:development.abcdefghijklmnopqrstuvwxyz")
660 );
661 }
662
663 #[tokio::test]
664 pub async fn registering_a_token_that_is_already_subsumed_does_nothing() {
665 let unleash_client = create_test_client();
666 let features_cache = Arc::new(FeatureCache::default());
667 let engine_cache = Arc::new(DashMap::default());
668
669 let duration = Duration::seconds(5);
670 let feature_refresher = FeatureRefresher {
671 unleash_client: Arc::new(unleash_client),
672 features_cache,
673 engine_cache,
674 refresh_interval: duration,
675 ..Default::default()
676 };
677 let star_token =
678 EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
679 let project_a_token =
680 EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
681 .unwrap();
682
683 feature_refresher
684 .register_token_for_refresh(star_token, None)
685 .await;
686 feature_refresher
687 .register_token_for_refresh(project_a_token, None)
688 .await;
689
690 assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
691 assert!(
692 feature_refresher
693 .tokens_to_refresh
694 .contains_key("*:development.abcdefghijklmnopqrstuvwxyz")
695 );
696 }
697
698 #[tokio::test]
699 pub async fn simplification_only_happens_in_same_environment() {
700 let unleash_client = create_test_client();
701 let features_cache = Arc::new(FeatureCache::default());
702 let engine_cache = Arc::new(DashMap::default());
703
704 let duration = Duration::seconds(5);
705 let feature_refresher = FeatureRefresher {
706 unleash_client: Arc::new(unleash_client),
707 features_cache,
708 engine_cache,
709 refresh_interval: duration,
710 ..Default::default()
711 };
712 let project_a_token =
713 EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
714 .unwrap();
715 let production_wildcard_token =
716 EdgeToken::try_from("*:production.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
717 feature_refresher
718 .register_token_for_refresh(project_a_token, None)
719 .await;
720 feature_refresher
721 .register_token_for_refresh(production_wildcard_token, None)
722 .await;
723 assert_eq!(feature_refresher.tokens_to_refresh.len(), 2);
724 }
725
726 #[tokio::test]
727 pub async fn is_able_to_only_fetch_for_tokens_due_to_refresh() {
728 let unleash_client = create_test_client();
729 let features_cache = Arc::new(FeatureCache::default());
730 let engine_cache = Arc::new(DashMap::default());
731 let tokens_to_refresh = Arc::new(DashMap::default());
732
733 let duration = Duration::seconds(5);
734 let feature_refresher = FeatureRefresher {
735 unleash_client: Arc::new(unleash_client),
736 features_cache,
737 engine_cache,
738 tokens_to_refresh: tokens_to_refresh.clone(),
739 refresh_interval: duration,
740 ..Default::default()
741 };
742 let no_etag_due_for_refresh_token =
743 EdgeToken::try_from("projecta:development.no_etag_due_for_refresh_token".to_string())
744 .unwrap();
745 let no_etag_so_is_due_for_refresh = TokenRefresh {
746 token: no_etag_due_for_refresh_token,
747 etag: None,
748 next_refresh: None,
749 last_refreshed: None,
750 last_check: None,
751 failure_count: 0,
752 last_feature_count: None,
753 };
754 let etag_and_last_refreshed_token =
755 EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string())
756 .unwrap();
757 let etag_and_last_refreshed_less_than_duration_ago = TokenRefresh {
758 token: etag_and_last_refreshed_token,
759 etag: Some(EntityTag::new(true, "abcde")),
760 next_refresh: Some(Utc::now() + Duration::seconds(10)),
761 last_refreshed: Some(Utc::now()),
762 last_check: Some(Utc::now()),
763 failure_count: 0,
764 last_feature_count: None,
765 };
766 let etag_but_old_token =
767 EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap();
768
769 let ten_seconds_ago = Utc::now() - Duration::seconds(10);
770 let etag_but_last_refreshed_ten_seconds_ago = TokenRefresh {
771 token: etag_but_old_token,
772 etag: Some(EntityTag::new(true, "abcde")),
773 next_refresh: None,
774 last_refreshed: Some(ten_seconds_ago),
775 last_check: Some(ten_seconds_ago),
776 failure_count: 0,
777 last_feature_count: None,
778 };
779 feature_refresher.tokens_to_refresh.insert(
780 etag_but_last_refreshed_ten_seconds_ago.token.token.clone(),
781 etag_but_last_refreshed_ten_seconds_ago.clone(),
782 );
783 feature_refresher.tokens_to_refresh.insert(
784 etag_and_last_refreshed_less_than_duration_ago
785 .token
786 .token
787 .clone(),
788 etag_and_last_refreshed_less_than_duration_ago,
789 );
790 feature_refresher.tokens_to_refresh.insert(
791 no_etag_so_is_due_for_refresh.token.token.clone(),
792 no_etag_so_is_due_for_refresh.clone(),
793 );
794 let tokens_to_refresh = tokens_to_refresh.get_tokens_due_for_refresh();
795 assert_eq!(tokens_to_refresh.len(), 2);
796 assert!(tokens_to_refresh.contains(&etag_but_last_refreshed_ten_seconds_ago));
797 assert!(tokens_to_refresh.contains(&no_etag_so_is_due_for_refresh));
798 }
799}