unleash_edge_persistence/
lib.rs1use ahash::HashMap;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7use tracing::{debug, warn};
8use unleash_edge_feature_cache::FeatureCache;
9use unleash_edge_types::tokens::EdgeToken;
10use unleash_edge_types::{EdgeResult, TokenValidationStatus};
11use unleash_types::client_features::ClientFeatures;
12
13pub mod file;
14pub mod redis;
15pub mod s3;
16
17#[async_trait]
18pub trait EdgePersistence: Send + Sync {
19 async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
20 async fn save_tokens(&self, tokens: Vec<EdgeToken>) -> EdgeResult<()>;
21 async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>>;
22 async fn save_features(&self, features: Vec<(String, ClientFeatures)>) -> EdgeResult<()>;
23}
24
25async fn persist(
26 persistence: Arc<dyn EdgePersistence>,
27 token_cache: Arc<DashMap<String, EdgeToken>>,
28 features_cache: Arc<FeatureCache>,
29) {
30 save_known_tokens(&token_cache, &persistence).await;
31 save_features(&features_cache, &persistence).await;
32}
33
34pub fn create_persist_data_task(
35 persistence: Arc<dyn EdgePersistence>,
36 token_cache: Arc<DashMap<String, EdgeToken>>,
37 features_cache: Arc<FeatureCache>,
38) -> Pin<Box<dyn Future<Output = ()> + Send>> {
39 Box::pin(async move {
40 loop {
41 tokio::select! {
42 _ = tokio::time::sleep(Duration::from_secs(60)) => {
43 persist(
44 persistence.clone(),
45 token_cache.clone(),
46 features_cache.clone()
47 ).await;
48 }
49 }
50 }
51 })
52}
53
54pub fn create_once_off_persist(
55 persistence: Arc<dyn EdgePersistence>,
56 token_cache: Arc<DashMap<String, EdgeToken>>,
57 features_cache: Arc<FeatureCache>,
58) -> Pin<Box<dyn Future<Output = ()> + Send>> {
59 let token_cache = token_cache.clone();
60 let features_cache = features_cache.clone();
61 let persistence = persistence.clone();
62 Box::pin(async move { persist(persistence, token_cache, features_cache).await })
63}
64
65async fn save_known_tokens(
66 token_cache: &Arc<DashMap<String, EdgeToken>>,
67 persister: &Arc<dyn EdgePersistence>,
68) {
69 if !token_cache.is_empty() {
70 match persister
71 .save_tokens(
72 token_cache
73 .iter()
74 .filter(|t| t.value().status == TokenValidationStatus::Validated)
75 .map(|e| e.value().clone())
76 .collect(),
77 )
78 .await
79 {
80 Ok(()) => debug!("Persisted tokens"),
81 Err(save_error) => warn!("Could not persist tokens: {save_error:?}"),
82 }
83 } else {
84 debug!("No validated tokens found, skipping tokens persistence");
85 }
86}
87
88async fn save_features(features_cache: &FeatureCache, persister: &Arc<dyn EdgePersistence>) {
89 if !features_cache.is_empty() {
90 match persister
91 .save_features(
92 features_cache
93 .iter()
94 .map(|e| (e.key().clone(), e.value().clone()))
95 .collect(),
96 )
97 .await
98 {
99 Ok(()) => debug!("Persisted features"),
100 Err(save_error) => warn!("Could not persist features: {save_error:?}"),
101 }
102 } else {
103 debug!("No features found, skipping features persistence");
104 }
105}
106
#[cfg(test)]
pub mod tests {
    use super::*;

    /// Persistence stub whose every method panics, so any call that reaches
    /// the backend fails the test.
    struct MockPersistence;

    fn build_mock_persistence() -> Arc<dyn EdgePersistence> {
        Arc::new(MockPersistence)
    }

    #[async_trait]
    impl EdgePersistence for MockPersistence {
        async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
            panic!("Not expected to be called");
        }

        async fn save_tokens(&self, _: Vec<EdgeToken>) -> EdgeResult<()> {
            panic!("Not expected to be called");
        }

        async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>> {
            panic!("Not expected to be called");
        }

        async fn save_features(&self, _: Vec<(String, ClientFeatures)>) -> EdgeResult<()> {
            panic!("Not expected to be called");
        }
    }

    /// An empty feature cache must not reach the persister (the mock would panic).
    #[tokio::test]
    async fn persistence_ignores_empty_feature_sets() {
        let persister = build_mock_persistence();
        let features = FeatureCache::new(DashMap::<String, ClientFeatures>::new());

        save_features(&features, &persister).await;
    }

    /// An empty token cache must not reach the persister (the mock would panic).
    #[tokio::test]
    async fn persistence_ignores_empty_token_sets() {
        let persister = build_mock_persistence();
        let tokens: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::new());

        save_known_tokens(&tokens, &persister).await;
    }
}