unleash_edge_feature_refresh/
lib.rs

1use std::collections::HashSet;
2use std::{sync::Arc, time::Duration};
3
4pub mod delta_refresh;
5
6use chrono::{TimeDelta, Utc};
7use dashmap::DashMap;
8use etag::EntityTag;
9use reqwest::StatusCode;
10use tracing::{debug, info, warn};
11use unleash_edge_delta::cache_manager::DeltaCacheManager;
12use unleash_edge_feature_cache::FeatureCache;
13use unleash_edge_feature_filters::{FeatureFilterSet, filter_client_features};
14use unleash_edge_http_client::{ClientMetaInformation, UnleashClient};
15use unleash_edge_persistence::EdgePersistence;
16use unleash_edge_types::errors::{EdgeError, FeatureError};
17use unleash_edge_types::tokens::{EdgeToken, cache_key, simplify};
18use unleash_edge_types::{
19    ClientFeaturesRequest, ClientFeaturesResponse, EdgeResult, TokenRefresh, build,
20};
21use unleash_types::client_features::ClientFeatures;
22use unleash_types::client_metrics::{ClientApplication, MetricsMetadata, SdkType};
23use unleash_yggdrasil::{EngineState, UpdateMessage};
24
25use crate::delta_refresh::DeltaRefresher;
26
27pub fn frontend_token_is_covered_by_tokens(
28    frontend_token: &EdgeToken,
29    tokens_to_refresh: Arc<DashMap<String, TokenRefresh>>,
30) -> bool {
31    tokens_to_refresh.iter().any(|client_token| {
32        client_token
33            .token
34            .same_environment_and_broader_or_equal_project_access(frontend_token)
35    })
36}
37
38#[derive(Clone)]
39pub enum HydratorType {
40    Streaming(Arc<DeltaRefresher>),
41    Polling(Arc<FeatureRefresher>),
42}
43
44impl HydratorType {
45    pub async fn hydrate_new_tokens(&self) {
46        match self {
47            HydratorType::Streaming(delta_refresher) => delta_refresher.hydrate_new_tokens().await,
48            HydratorType::Polling(feature_refresher) => {
49                feature_refresher.hydrate_new_tokens().await
50            }
51        }
52    }
53
54    pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option<EntityTag>) {
55        match self {
56            HydratorType::Streaming(delta_refresher) => {
57                delta_refresher
58                    .register_token_for_refresh(token, etag)
59                    .await
60            }
61            HydratorType::Polling(feature_refresher) => {
62                feature_refresher
63                    .register_token_for_refresh(token, etag)
64                    .await
65            }
66        }
67    }
68
69    pub fn tokens_to_refresh(self) -> TokenRefreshSet {
70        match self {
71            HydratorType::Streaming(delta_refresher) => delta_refresher.tokens_to_refresh.clone(),
72            HydratorType::Polling(feature_refresher) => feature_refresher.tokens_to_refresh.clone(),
73        }
74    }
75}
76
77type TokenRefreshSet = Arc<DashMap<String, TokenRefresh>>;
78
79trait TokenRefreshStatus {
80    fn get_tokens_due_for_refresh(&self) -> Vec<TokenRefresh>;
81    fn get_tokens_never_refreshed(&self) -> Vec<TokenRefresh>;
82    fn token_is_subsumed(&self, token: &EdgeToken) -> bool;
83    fn backoff(&self, token: &EdgeToken, refresh_interval: &TimeDelta);
84    fn update_last_refresh(
85        &self,
86        token: &EdgeToken,
87        etag: Option<EntityTag>,
88        feature_count: usize,
89        refresh_interval: &TimeDelta,
90    );
91    fn update_last_check(&self, token: &EdgeToken, refresh_interval: &TimeDelta);
92}
93
94impl TokenRefreshStatus for TokenRefreshSet {
95    fn get_tokens_due_for_refresh(&self) -> Vec<TokenRefresh> {
96        self.iter()
97            .map(|e| e.value().clone())
98            .filter(|token| {
99                token
100                    .next_refresh
101                    .map(|refresh| Utc::now() > refresh)
102                    .unwrap_or(true)
103            })
104            .collect()
105    }
106
107    fn get_tokens_never_refreshed(&self) -> Vec<TokenRefresh> {
108        self.iter()
109            .map(|e| e.value().clone())
110            .filter(|token| token.last_refreshed.is_none() && token.last_check.is_none())
111            .collect()
112    }
113
114    fn token_is_subsumed(&self, token: &EdgeToken) -> bool {
115        self.iter()
116            .filter(|r| r.token.environment == token.environment)
117            .any(|t| t.token.subsumes(token))
118    }
119
120    fn backoff(&self, token: &EdgeToken, refresh_interval: &TimeDelta) {
121        self.alter(&token.token, |_k, old_refresh| {
122            old_refresh.backoff(refresh_interval)
123        });
124    }
125
126    fn update_last_refresh(
127        &self,
128        token: &EdgeToken,
129        etag: Option<EntityTag>,
130        feature_count: usize,
131        refresh_interval: &TimeDelta,
132    ) {
133        self.alter(&token.token, |_k, old_refresh| {
134            old_refresh.successful_refresh(refresh_interval, etag, feature_count)
135        });
136    }
137
138    fn update_last_check(&self, token: &EdgeToken, refresh_interval: &TimeDelta) {
139        self.alter(&token.token, |_k, old_refresh| {
140            old_refresh.successful_check(refresh_interval)
141        });
142    }
143}
144
145#[derive(Clone)]
146pub struct FeatureRefresher {
147    pub unleash_client: Arc<UnleashClient>,
148    pub tokens_to_refresh: TokenRefreshSet,
149    pub features_cache: Arc<FeatureCache>,
150    pub delta_cache_manager: Arc<DeltaCacheManager>,
151    pub engine_cache: Arc<DashMap<String, EngineState>>,
152    pub refresh_interval: chrono::Duration,
153    pub persistence: Option<Arc<dyn EdgePersistence>>,
154    pub client_meta_information: ClientMetaInformation,
155}
156
157impl Default for FeatureRefresher {
158    fn default() -> Self {
159        Self {
160            refresh_interval: chrono::Duration::seconds(15),
161            unleash_client: Default::default(),
162            tokens_to_refresh: Arc::new(DashMap::default()),
163            features_cache: Arc::new(Default::default()),
164            delta_cache_manager: Arc::new(DeltaCacheManager::new()),
165            engine_cache: Default::default(),
166            persistence: None,
167            client_meta_information: Default::default(),
168        }
169    }
170}
171
172fn client_application_from_token_and_name(
173    token: EdgeToken,
174    refresh_interval: i64,
175    client_meta_information: ClientMetaInformation,
176) -> ClientApplication {
177    ClientApplication {
178        app_name: client_meta_information.app_name,
179        connect_via: None,
180        environment: token.environment,
181        projects: Some(token.projects),
182        instance_id: Some(client_meta_information.instance_id),
183        connection_id: Some(client_meta_information.connection_id),
184        interval: refresh_interval as u32,
185        started: Utc::now(),
186        strategies: vec![],
187        metadata: MetricsMetadata {
188            platform_name: None,
189            platform_version: None,
190            sdk_version: Some(format!("unleash-edge:{}", build::PKG_VERSION)),
191            sdk_type: Some(SdkType::Backend),
192            yggdrasil_version: None,
193        },
194    }
195}
196
197pub async fn start_refresh_features_background_task(refresher: Arc<FeatureRefresher>) {
198    loop {
199        tokio::select! {
200            _ = tokio::time::sleep(Duration::from_secs(5)) => {
201                refresher.refresh_features().await;
202            }
203        }
204    }
205}
206
207pub struct FeatureRefreshConfig {
208    features_refresh_interval: chrono::Duration,
209    client_meta_information: ClientMetaInformation,
210}
211
212impl FeatureRefreshConfig {
213    pub fn new(
214        features_refresh_interval: chrono::Duration,
215        client_meta_information: ClientMetaInformation,
216    ) -> Self {
217        Self {
218            features_refresh_interval,
219            client_meta_information,
220        }
221    }
222}
223
224impl FeatureRefresher {
225    pub fn new(
226        unleash_client: Arc<UnleashClient>,
227        features_cache: Arc<FeatureCache>,
228        delta_cache_manager: Arc<DeltaCacheManager>,
229        engines: Arc<DashMap<String, EngineState>>,
230        persistence: Option<Arc<dyn EdgePersistence>>,
231        config: FeatureRefreshConfig,
232    ) -> Self {
233        FeatureRefresher {
234            unleash_client,
235            tokens_to_refresh: Arc::new(DashMap::default()),
236            features_cache,
237            delta_cache_manager,
238            engine_cache: engines,
239            refresh_interval: config.features_refresh_interval,
240            persistence,
241            client_meta_information: config.client_meta_information,
242        }
243    }
244
245    pub fn with_client(client: Arc<UnleashClient>) -> Self {
246        Self {
247            unleash_client: client,
248            ..Default::default()
249        }
250    }
251
252    /// This method no longer returns any data. Its responsibility lies in adding the token to our
253    /// list of tokens to perform refreshes for, as well as calling out to hydrate tokens that we haven't seen before.
254    /// Other tokens will be refreshed due to the scheduled task that refreshes tokens that haven been refreshed in ${refresh_interval} seconds
255    pub async fn register_and_hydrate_token(&self, token: &EdgeToken) {
256        self.register_token_for_refresh(token.clone(), None).await;
257        self.hydrate_new_tokens().await;
258    }
259
260    pub fn features_for_filter(
261        &self,
262        token: EdgeToken,
263        filters: &FeatureFilterSet,
264    ) -> EdgeResult<ClientFeatures> {
265        match self.get_features_by_filter(&token, filters) {
266            Some(features) if self.tokens_to_refresh.token_is_subsumed(&token) => Ok(features),
267            Some(_features) if !self.tokens_to_refresh.token_is_subsumed(&token) => {
268                debug!("Token is not subsumed by any registered tokens. Returning error");
269                Err(EdgeError::InvalidToken)
270            }
271            _ => {
272                debug!("No features set available. Edge isn't ready");
273                Err(EdgeError::InvalidToken)
274            }
275        }
276    }
277
278    /// Registers a token for refresh, the token will be discarded if it can be subsumed by another previously registered token
279    pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option<EntityTag>) {
280        if !self.tokens_to_refresh.contains_key(&token.token) {
281            self.unleash_client
282                .register_as_client(
283                    token.token.clone(),
284                    client_application_from_token_and_name(
285                        token.clone(),
286                        self.refresh_interval.num_seconds(),
287                        self.client_meta_information.clone(),
288                    ),
289                )
290                .await
291                .unwrap_or_default();
292            let mut registered_tokens: Vec<TokenRefresh> =
293                self.tokens_to_refresh.iter().map(|t| t.clone()).collect();
294            registered_tokens.push(TokenRefresh::new(token.clone(), etag));
295            let minimum = simplify(&registered_tokens);
296            let mut keys = HashSet::new();
297            for refreshes in minimum {
298                keys.insert(refreshes.token.token.clone());
299                self.tokens_to_refresh
300                    .insert(refreshes.token.token.clone(), refreshes.clone());
301            }
302            self.tokens_to_refresh.retain(|key, _| keys.contains(key));
303        }
304    }
305
306    pub async fn hydrate_new_tokens(&self) {
307        for hydration in self.tokens_to_refresh.get_tokens_never_refreshed() {
308            self.refresh_single(hydration).await;
309        }
310    }
311
312    pub async fn refresh_features(&self) {
313        for refresh in self.tokens_to_refresh.get_tokens_due_for_refresh() {
314            self.refresh_single(refresh).await;
315        }
316    }
317
318    async fn handle_client_features_updated(
319        &self,
320        refresh_token: &EdgeToken,
321        features: ClientFeatures,
322        etag: Option<EntityTag>,
323    ) {
324        debug!("Got updated client features. Updating features with {etag:?}");
325        let key = cache_key(refresh_token);
326        self.tokens_to_refresh.update_last_refresh(
327            refresh_token,
328            etag,
329            features.features.len(),
330            &self.refresh_interval,
331        );
332        self.features_cache
333            .modify(key.clone(), refresh_token, features.clone());
334        self.engine_cache
335            .entry(key.clone())
336            .and_modify(|engine| {
337                if let Some(f) = self.features_cache.get(&key) {
338                    let mut new_state = EngineState::default();
339                    let warnings = new_state.take_state(UpdateMessage::FullResponse(f.clone()));
340                    if let Some(warnings) = warnings {
341                        warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
342                    };
343                    *engine = new_state;
344
345                }
346            })
347            .or_insert_with(|| {
348                let mut new_state = EngineState::default();
349
350                let warnings = new_state.take_state(UpdateMessage::FullResponse(features));
351                if let Some(warnings) = warnings {
352                    warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
353                };
354                new_state
355            });
356    }
357
358    pub async fn refresh_single(&self, refresh: TokenRefresh) {
359        let features_result = self
360            .unleash_client
361            .get_client_features(ClientFeaturesRequest {
362                api_key: refresh.token.token.clone(),
363                etag: refresh.etag.clone(),
364                interval: Some(self.refresh_interval.num_milliseconds()),
365            })
366            .await;
367        match features_result {
368            Ok(feature_response) => match feature_response {
369                ClientFeaturesResponse::NoUpdate(tag) => {
370                    debug!("No update needed. Will update last check time with {tag}");
371                    self.tokens_to_refresh
372                        .update_last_check(&refresh.token.clone(), &self.refresh_interval);
373                }
374                ClientFeaturesResponse::Updated(features, etag) => {
375                    self.handle_client_features_updated(&refresh.token, features, etag)
376                        .await;
377                }
378            },
379            Err(e) => {
380                match e {
381                    EdgeError::ClientFeaturesFetchError(fe) => {
382                        match fe {
383                            FeatureError::Retriable(status_code) => match status_code {
384                                StatusCode::INTERNAL_SERVER_ERROR
385                                | StatusCode::BAD_GATEWAY
386                                | StatusCode::SERVICE_UNAVAILABLE
387                                | StatusCode::GATEWAY_TIMEOUT => {
388                                    info!(
389                                        "Upstream is having some problems, increasing my waiting period"
390                                    );
391                                    self.tokens_to_refresh
392                                        .backoff(&refresh.token, &self.refresh_interval);
393                                }
394                                StatusCode::TOO_MANY_REQUESTS => {
395                                    info!("Got told that upstream is receiving too many requests");
396                                    self.tokens_to_refresh
397                                        .backoff(&refresh.token, &self.refresh_interval);
398                                }
399                                _ => {
400                                    info!("Couldn't refresh features, but will retry next go")
401                                }
402                            },
403                            FeatureError::AccessDenied => {
404                                info!(
405                                    "Token used to fetch features was Forbidden, will remove from list of refresh tasks"
406                                );
407                                self.tokens_to_refresh.remove(&refresh.token.token);
408                                if !self.tokens_to_refresh.iter().any(|e| {
409                                    e.value().token.environment == refresh.token.environment
410                                }) {
411                                    let cache_key = cache_key(&refresh.token);
412                                    // No tokens left that access the environment of our current refresh. Deleting client features and engine cache
413                                    self.features_cache.remove(&cache_key);
414                                    self.engine_cache.remove(&cache_key);
415                                }
416                            }
417                            FeatureError::NotFound => {
418                                info!(
419                                    "Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"
420                                );
421                                self.tokens_to_refresh
422                                    .backoff(&refresh.token, &self.refresh_interval);
423                            }
424                        }
425                    }
426                    EdgeError::ClientCacheError => {
427                        info!("Couldn't refresh features, but will retry next go")
428                    }
429                    _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
430                }
431            }
432        }
433    }
434
435    fn get_features_by_filter(
436        &self,
437        token: &EdgeToken,
438        filters: &FeatureFilterSet,
439    ) -> Option<ClientFeatures> {
440        self.features_cache
441            .get(&cache_key(token))
442            .map(|client_features| filter_client_features(&client_features, filters))
443    }
444}
445
446#[cfg(test)]
447mod tests {
448    use crate::TokenRefreshStatus;
449
450    use super::FeatureRefresher;
451    use chrono::{Duration, Utc};
452    use dashmap::DashMap;
453    use etag::EntityTag;
454    use reqwest::Url;
455    use std::sync::Arc;
456    use unleash_edge_feature_cache::FeatureCache;
457    use unleash_edge_http_client::{
458        ClientMetaInformation, HttpClientArgs, UnleashClient, new_reqwest_client,
459    };
460    use unleash_edge_types::TokenRefresh;
461    use unleash_edge_types::tokens::EdgeToken;
462
463    fn create_test_client() -> UnleashClient {
464        let http_client = new_reqwest_client(HttpClientArgs {
465            client_meta_information: ClientMetaInformation::test_config(),
466            ..Default::default()
467        })
468        .expect("Failed to create client");
469
470        UnleashClient::from_url_with_backing_client(
471            Url::parse("http://localhost:4242").unwrap(),
472            "Authorization".to_string(),
473            http_client,
474            ClientMetaInformation::test_config(),
475        )
476    }
477
478    #[tokio::test]
479    pub async fn registering_token_for_refresh_works() {
480        let unleash_client = create_test_client();
481        let features_cache = Arc::new(FeatureCache::default());
482        let engine_cache = Arc::new(DashMap::default());
483
484        let duration = Duration::seconds(5);
485        let feature_refresher = FeatureRefresher {
486            unleash_client: Arc::new(unleash_client),
487            features_cache,
488            engine_cache,
489            refresh_interval: duration,
490            ..Default::default()
491        };
492        let token =
493            EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
494        feature_refresher
495            .register_token_for_refresh(token, None)
496            .await;
497
498        assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
499    }
500
501    #[tokio::test]
502    pub async fn registering_multiple_tokens_with_same_environment_reduces_tokens_to_valid_minimal_set()
503     {
504        let unleash_client = create_test_client();
505        let features_cache = Arc::new(FeatureCache::default());
506        let engine_cache = Arc::new(DashMap::default());
507
508        let duration = Duration::seconds(5);
509        let feature_refresher = FeatureRefresher {
510            unleash_client: Arc::new(unleash_client),
511            features_cache,
512            engine_cache,
513            refresh_interval: duration,
514            ..Default::default()
515        };
516        let token1 =
517            EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
518        let token2 =
519            EdgeToken::try_from("*:development.zyxwvutsrqponmlkjihgfedcba".to_string()).unwrap();
520        feature_refresher
521            .register_token_for_refresh(token1, None)
522            .await;
523        feature_refresher
524            .register_token_for_refresh(token2, None)
525            .await;
526
527        assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
528    }
529
530    #[tokio::test]
531    pub async fn registering_multiple_non_overlapping_tokens_will_keep_all() {
532        let unleash_client = create_test_client();
533        let features_cache = Arc::new(FeatureCache::default());
534        let engine_cache = Arc::new(DashMap::default());
535        let duration = Duration::seconds(5);
536        let feature_refresher = FeatureRefresher {
537            unleash_client: Arc::new(unleash_client),
538            features_cache,
539            engine_cache,
540            refresh_interval: duration,
541            ..Default::default()
542        };
543        let project_a_token =
544            EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
545                .unwrap();
546        let project_b_token =
547            EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string())
548                .unwrap();
549        let project_c_token =
550            EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string())
551                .unwrap();
552        feature_refresher
553            .register_token_for_refresh(project_a_token, None)
554            .await;
555        feature_refresher
556            .register_token_for_refresh(project_b_token, None)
557            .await;
558        feature_refresher
559            .register_token_for_refresh(project_c_token, None)
560            .await;
561
562        assert_eq!(feature_refresher.tokens_to_refresh.len(), 3);
563    }
564
565    #[tokio::test]
566    pub async fn registering_wildcard_project_token_only_keeps_the_wildcard() {
567        let unleash_client = create_test_client();
568        let features_cache = Arc::new(FeatureCache::default());
569        let engine_cache = Arc::new(DashMap::default());
570        let duration = Duration::seconds(5);
571        let feature_refresher = FeatureRefresher {
572            unleash_client: Arc::new(unleash_client),
573            features_cache,
574            engine_cache,
575            refresh_interval: duration,
576            ..Default::default()
577        };
578        let project_a_token =
579            EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
580                .unwrap();
581        let project_b_token =
582            EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string())
583                .unwrap();
584        let project_c_token =
585            EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string())
586                .unwrap();
587        let wildcard_token =
588            EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
589
590        feature_refresher
591            .register_token_for_refresh(project_a_token, None)
592            .await;
593        feature_refresher
594            .register_token_for_refresh(project_b_token, None)
595            .await;
596        feature_refresher
597            .register_token_for_refresh(project_c_token, None)
598            .await;
599        feature_refresher
600            .register_token_for_refresh(wildcard_token, None)
601            .await;
602
603        assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
604        assert!(
605            feature_refresher
606                .tokens_to_refresh
607                .contains_key("*:development.abcdefghijklmnopqrstuvwxyz")
608        )
609    }
610
611    #[tokio::test]
612    pub async fn registering_tokens_with_multiple_projects_overwrites_single_tokens() {
613        let unleash_client = create_test_client();
614        let features_cache = Arc::new(FeatureCache::default());
615        let engine_cache = Arc::new(DashMap::default());
616        let duration = Duration::seconds(5);
617        let feature_refresher = FeatureRefresher {
618            unleash_client: Arc::new(unleash_client),
619            features_cache,
620            engine_cache,
621            refresh_interval: duration,
622            ..Default::default()
623        };
624        let project_a_token =
625            EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
626                .unwrap();
627        let project_b_token =
628            EdgeToken::try_from("projectb:development.abcdefghijklmnopqrstuvwxyz".to_string())
629                .unwrap();
630        let project_c_token =
631            EdgeToken::try_from("projectc:development.abcdefghijklmnopqrstuvwxyz".to_string())
632                .unwrap();
633        let mut project_a_and_c_token =
634            EdgeToken::try_from("[]:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
635        project_a_and_c_token.projects = vec!["projecta".into(), "projectc".into()];
636
637        feature_refresher
638            .register_token_for_refresh(project_a_token, None)
639            .await;
640        feature_refresher
641            .register_token_for_refresh(project_b_token, None)
642            .await;
643        feature_refresher
644            .register_token_for_refresh(project_c_token, None)
645            .await;
646        feature_refresher
647            .register_token_for_refresh(project_a_and_c_token, None)
648            .await;
649
650        assert_eq!(feature_refresher.tokens_to_refresh.len(), 2);
651        assert!(
652            feature_refresher
653                .tokens_to_refresh
654                .contains_key("[]:development.abcdefghijklmnopqrstuvwxyz")
655        );
656        assert!(
657            feature_refresher
658                .tokens_to_refresh
659                .contains_key("projectb:development.abcdefghijklmnopqrstuvwxyz")
660        );
661    }
662
663    #[tokio::test]
664    pub async fn registering_a_token_that_is_already_subsumed_does_nothing() {
665        let unleash_client = create_test_client();
666        let features_cache = Arc::new(FeatureCache::default());
667        let engine_cache = Arc::new(DashMap::default());
668
669        let duration = Duration::seconds(5);
670        let feature_refresher = FeatureRefresher {
671            unleash_client: Arc::new(unleash_client),
672            features_cache,
673            engine_cache,
674            refresh_interval: duration,
675            ..Default::default()
676        };
677        let star_token =
678            EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
679        let project_a_token =
680            EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
681                .unwrap();
682
683        feature_refresher
684            .register_token_for_refresh(star_token, None)
685            .await;
686        feature_refresher
687            .register_token_for_refresh(project_a_token, None)
688            .await;
689
690        assert_eq!(feature_refresher.tokens_to_refresh.len(), 1);
691        assert!(
692            feature_refresher
693                .tokens_to_refresh
694                .contains_key("*:development.abcdefghijklmnopqrstuvwxyz")
695        );
696    }
697
698    #[tokio::test]
699    pub async fn simplification_only_happens_in_same_environment() {
700        let unleash_client = create_test_client();
701        let features_cache = Arc::new(FeatureCache::default());
702        let engine_cache = Arc::new(DashMap::default());
703
704        let duration = Duration::seconds(5);
705        let feature_refresher = FeatureRefresher {
706            unleash_client: Arc::new(unleash_client),
707            features_cache,
708            engine_cache,
709            refresh_interval: duration,
710            ..Default::default()
711        };
712        let project_a_token =
713            EdgeToken::try_from("projecta:development.abcdefghijklmnopqrstuvwxyz".to_string())
714                .unwrap();
715        let production_wildcard_token =
716            EdgeToken::try_from("*:production.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
717        feature_refresher
718            .register_token_for_refresh(project_a_token, None)
719            .await;
720        feature_refresher
721            .register_token_for_refresh(production_wildcard_token, None)
722            .await;
723        assert_eq!(feature_refresher.tokens_to_refresh.len(), 2);
724    }
725
726    #[tokio::test]
727    pub async fn is_able_to_only_fetch_for_tokens_due_to_refresh() {
728        let unleash_client = create_test_client();
729        let features_cache = Arc::new(FeatureCache::default());
730        let engine_cache = Arc::new(DashMap::default());
731        let tokens_to_refresh = Arc::new(DashMap::default());
732
733        let duration = Duration::seconds(5);
734        let feature_refresher = FeatureRefresher {
735            unleash_client: Arc::new(unleash_client),
736            features_cache,
737            engine_cache,
738            tokens_to_refresh: tokens_to_refresh.clone(),
739            refresh_interval: duration,
740            ..Default::default()
741        };
742        let no_etag_due_for_refresh_token =
743            EdgeToken::try_from("projecta:development.no_etag_due_for_refresh_token".to_string())
744                .unwrap();
745        let no_etag_so_is_due_for_refresh = TokenRefresh {
746            token: no_etag_due_for_refresh_token,
747            etag: None,
748            next_refresh: None,
749            last_refreshed: None,
750            last_check: None,
751            failure_count: 0,
752            last_feature_count: None,
753        };
754        let etag_and_last_refreshed_token =
755            EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string())
756                .unwrap();
757        let etag_and_last_refreshed_less_than_duration_ago = TokenRefresh {
758            token: etag_and_last_refreshed_token,
759            etag: Some(EntityTag::new(true, "abcde")),
760            next_refresh: Some(Utc::now() + Duration::seconds(10)),
761            last_refreshed: Some(Utc::now()),
762            last_check: Some(Utc::now()),
763            failure_count: 0,
764            last_feature_count: None,
765        };
766        let etag_but_old_token =
767            EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap();
768
769        let ten_seconds_ago = Utc::now() - Duration::seconds(10);
770        let etag_but_last_refreshed_ten_seconds_ago = TokenRefresh {
771            token: etag_but_old_token,
772            etag: Some(EntityTag::new(true, "abcde")),
773            next_refresh: None,
774            last_refreshed: Some(ten_seconds_ago),
775            last_check: Some(ten_seconds_ago),
776            failure_count: 0,
777            last_feature_count: None,
778        };
779        feature_refresher.tokens_to_refresh.insert(
780            etag_but_last_refreshed_ten_seconds_ago.token.token.clone(),
781            etag_but_last_refreshed_ten_seconds_ago.clone(),
782        );
783        feature_refresher.tokens_to_refresh.insert(
784            etag_and_last_refreshed_less_than_duration_ago
785                .token
786                .token
787                .clone(),
788            etag_and_last_refreshed_less_than_duration_ago,
789        );
790        feature_refresher.tokens_to_refresh.insert(
791            no_etag_so_is_due_for_refresh.token.token.clone(),
792            no_etag_so_is_due_for_refresh.clone(),
793        );
794        let tokens_to_refresh = tokens_to_refresh.get_tokens_due_for_refresh();
795        assert_eq!(tokens_to_refresh.len(), 2);
796        assert!(tokens_to_refresh.contains(&etag_but_last_refreshed_ten_seconds_ago));
797        assert!(tokens_to_refresh.contains(&no_etag_so_is_due_for_refresh));
798    }
799}