unleash_edge_persistence/
lib.rs

1use ahash::HashMap;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7use tracing::{debug, warn};
8use unleash_edge_feature_cache::FeatureCache;
9use unleash_edge_types::tokens::EdgeToken;
10use unleash_edge_types::{EdgeResult, TokenValidationStatus};
11use unleash_types::client_features::ClientFeatures;
12
13pub mod file;
14pub mod redis;
15pub mod s3;
16
17#[async_trait]
18pub trait EdgePersistence: Send + Sync {
19    async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
20    async fn save_tokens(&self, tokens: Vec<EdgeToken>) -> EdgeResult<()>;
21    async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>>;
22    async fn save_features(&self, features: Vec<(String, ClientFeatures)>) -> EdgeResult<()>;
23}
24
25async fn persist(
26    persistence: Arc<dyn EdgePersistence>,
27    token_cache: Arc<DashMap<String, EdgeToken>>,
28    features_cache: Arc<FeatureCache>,
29) {
30    save_known_tokens(&token_cache, &persistence).await;
31    save_features(&features_cache, &persistence).await;
32}
33
34pub fn create_persist_data_task(
35    persistence: Arc<dyn EdgePersistence>,
36    token_cache: Arc<DashMap<String, EdgeToken>>,
37    features_cache: Arc<FeatureCache>,
38) -> Pin<Box<dyn Future<Output = ()> + Send>> {
39    Box::pin(async move {
40        loop {
41            tokio::select! {
42                _ = tokio::time::sleep(Duration::from_secs(60)) => {
43                    persist(
44                        persistence.clone(),
45                        token_cache.clone(),
46                        features_cache.clone()
47                    ).await;
48                }
49            }
50        }
51    })
52}
53
54pub fn create_once_off_persist(
55    persistence: Arc<dyn EdgePersistence>,
56    token_cache: Arc<DashMap<String, EdgeToken>>,
57    features_cache: Arc<FeatureCache>,
58) -> Pin<Box<dyn Future<Output = ()> + Send>> {
59    let token_cache = token_cache.clone();
60    let features_cache = features_cache.clone();
61    let persistence = persistence.clone();
62    Box::pin(async move { persist(persistence, token_cache, features_cache).await })
63}
64
65async fn save_known_tokens(
66    token_cache: &Arc<DashMap<String, EdgeToken>>,
67    persister: &Arc<dyn EdgePersistence>,
68) {
69    if !token_cache.is_empty() {
70        match persister
71            .save_tokens(
72                token_cache
73                    .iter()
74                    .filter(|t| t.value().status == TokenValidationStatus::Validated)
75                    .map(|e| e.value().clone())
76                    .collect(),
77            )
78            .await
79        {
80            Ok(()) => debug!("Persisted tokens"),
81            Err(save_error) => warn!("Could not persist tokens: {save_error:?}"),
82        }
83    } else {
84        debug!("No validated tokens found, skipping tokens persistence");
85    }
86}
87
88async fn save_features(features_cache: &FeatureCache, persister: &Arc<dyn EdgePersistence>) {
89    if !features_cache.is_empty() {
90        match persister
91            .save_features(
92                features_cache
93                    .iter()
94                    .map(|e| (e.key().clone(), e.value().clone()))
95                    .collect(),
96            )
97            .await
98        {
99            Ok(()) => debug!("Persisted features"),
100            Err(save_error) => warn!("Could not persist features: {save_error:?}"),
101        }
102    } else {
103        debug!("No features found, skipping features persistence");
104    }
105}
106
107#[cfg(test)]
108pub mod tests {
109    use super::*;
110
111    struct MockPersistence {}
112
113    fn build_mock_persistence() -> Arc<dyn EdgePersistence> {
114        Arc::new(MockPersistence {})
115    }
116
117    #[async_trait]
118    impl EdgePersistence for MockPersistence {
119        async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
120            panic!("Not expected to be called");
121        }
122
123        async fn save_tokens(&self, _: Vec<EdgeToken>) -> EdgeResult<()> {
124            panic!("Not expected to be called");
125        }
126
127        async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>> {
128            panic!("Not expected to be called");
129        }
130
131        async fn save_features(&self, _: Vec<(String, ClientFeatures)>) -> EdgeResult<()> {
132            panic!("Not expected to be called");
133        }
134    }
135
136    #[tokio::test]
137    async fn persistence_ignores_empty_feature_sets() {
138        let cache: DashMap<String, ClientFeatures> = DashMap::new();
139        let persister = build_mock_persistence();
140
141        save_features(&Arc::new(FeatureCache::new(cache)), &persister.clone()).await;
142    }
143
144    #[tokio::test]
145    async fn persistence_ignores_empty_token_sets() {
146        let cache: DashMap<String, EdgeToken> = DashMap::new();
147        let persister = build_mock_persistence();
148
149        save_known_tokens(&Arc::new(cache), &persister.clone()).await;
150    }
151}