affinidi_secrets_resolver/
lib.rs1use ahash::AHashMap;
15use secrets::Secret;
16use std::{cell::RefCell, time::Duration};
17use task::{SecretTaskCommand, SecretsTask};
18use tokio::{
19 sync::{
20 mpsc::{self, error::TrySendError},
21 oneshot,
22 },
23 task::JoinHandle,
24};
25use tracing::{debug, warn};
26
27mod crypto;
29
30pub mod errors;
31pub mod secrets;
32pub mod task;
33
34pub use affinidi_encoding::multicodec;
36
37#[allow(async_fn_in_trait)]
39pub trait SecretsResolver {
40 async fn insert(&self, secret: Secret);
42
43 async fn insert_vec(&self, secrets: &[Secret]);
45
46 async fn get_secret(&self, secret_id: &str) -> Option<Secret>;
48
49 async fn find_secrets(&self, secret_ids: &[String]) -> Vec<String>;
56
57 async fn remove_secret(&self, secret_id: &str) -> Option<Secret>;
59
60 async fn len(&self) -> usize;
62
63 async fn is_empty(&self) -> bool;
65}
66
67pub struct SimpleSecretsResolver {
71 known_secrets: RefCell<AHashMap<String, Secret>>,
72}
73
74impl SimpleSecretsResolver {
75 pub async fn new(known_secrets: &[Secret]) -> Self {
89 let secrets = SimpleSecretsResolver {
90 known_secrets: RefCell::new(AHashMap::new()),
91 };
92
93 secrets.insert_vec(known_secrets).await;
94
95 secrets
96 }
97}
98
99impl SecretsResolver for SimpleSecretsResolver {
100 async fn insert(&self, secret: Secret) {
101 self.insert_vec(&[secret]).await;
102 }
103
104 async fn insert_vec(&self, secrets: &[Secret]) {
105 for secret in secrets {
106 debug!("Adding secret ({})", secret.id);
107 self.known_secrets
108 .borrow_mut()
109 .insert(secret.id.to_owned(), secret.to_owned());
110 }
111 }
112
113 async fn get_secret(&self, secret_id: &str) -> Option<Secret> {
114 self.known_secrets.borrow().get(secret_id).cloned()
115 }
116
117 async fn find_secrets(&self, secret_ids: &[String]) -> Vec<String> {
118 secret_ids
119 .iter()
120 .filter_map(|sid| {
121 if self.known_secrets.borrow().contains_key(sid) {
122 Some(sid.to_string())
123 } else {
124 None
125 }
126 })
127 .collect()
128 }
129
130 async fn remove_secret(&self, secret_id: &str) -> Option<Secret> {
131 self.known_secrets.borrow_mut().remove(secret_id)
132 }
133
134 async fn len(&self) -> usize {
135 self.known_secrets.borrow().len()
136 }
137
138 async fn is_empty(&self) -> bool {
139 self.known_secrets.borrow().is_empty()
140 }
141}
142
143#[derive(Clone)]
150pub struct ThreadedSecretsResolver {
151 tx: mpsc::Sender<SecretTaskCommand>,
152}
153
154impl ThreadedSecretsResolver {
155 pub async fn new(
156 secrets_task_tx: Option<mpsc::Sender<SecretTaskCommand>>,
157 ) -> (Self, Option<JoinHandle<()>>) {
158 if let Some(tx) = secrets_task_tx {
159 (ThreadedSecretsResolver { tx }, None)
160 } else {
161 let (task, tx) = SecretsTask::new();
162 (ThreadedSecretsResolver { tx }, Some(task.start().await))
163 }
164 }
165
166 pub async fn stop(&self) {
168 let _ = self.tx.send(SecretTaskCommand::Terminate).await;
169 }
170}
171
172impl SecretsResolver for ThreadedSecretsResolver {
173 async fn insert(&self, secret: Secret) {
174 self.insert_vec(&[secret]).await;
175 }
176
177 async fn insert_vec(&self, secrets: &[Secret]) {
178 for secret in secrets {
179 debug!("Adding secret ({})", secret.id);
180 match self.tx.try_send(SecretTaskCommand::AddSecret {
181 secret: secret.to_owned(),
182 }) {
183 Ok(_) => (),
184 Err(TrySendError::Closed(_)) => {
185 warn!("Secrets Task has been closed");
186 }
187 Err(TrySendError::Full(_)) => {
188 warn!("Secrets Task channel is full");
189 }
190 }
191 }
192 }
193
194 async fn get_secret(&self, secret_id: &str) -> Option<Secret> {
195 let (tx, rx) = oneshot::channel();
196 match self.tx.try_send(SecretTaskCommand::GetSecret {
197 key_id: secret_id.to_string(),
198 tx,
199 }) {
200 Ok(_) => (),
201 Err(TrySendError::Closed(_)) => {
202 warn!("Secrets Task has been closed");
203 return None;
204 }
205 Err(TrySendError::Full(_)) => {
206 warn!("Secrets Task channel is full");
207 return None;
208 }
209 }
210
211 let timeout = tokio::time::sleep(Duration::from_secs(1));
212 tokio::pin!(timeout);
213
214 tokio::select! {
215 _ = &mut timeout => None,
216 rx = rx => rx.unwrap_or(None)
217 }
218 }
219
220 async fn find_secrets(&self, secret_ids: &[String]) -> Vec<String> {
221 let (tx, rx) = oneshot::channel();
222 match self.tx.try_send(SecretTaskCommand::FindSecrets {
223 keys: secret_ids.to_vec(),
224 tx,
225 }) {
226 Ok(_) => (),
227 Err(TrySendError::Closed(_)) => {
228 warn!("Secrets Task has been closed");
229 return vec![];
230 }
231 Err(TrySendError::Full(_)) => {
232 warn!("Secrets Task channel is full");
233 return vec![];
234 }
235 }
236
237 let timeout = tokio::time::sleep(Duration::from_secs(1));
238 tokio::pin!(timeout);
239
240 tokio::select! {
241 _ = &mut timeout => vec![],
242 rx = rx => rx.unwrap_or(vec![])
243 }
244 }
245
246 async fn remove_secret(&self, secret_id: &str) -> Option<Secret> {
248 match self.tx.try_send(SecretTaskCommand::RemoveSecret {
249 key_id: secret_id.to_string(),
250 }) {
251 Ok(_) => (),
252 Err(TrySendError::Closed(_)) => {
253 warn!("Secrets Task has been closed");
254 }
255 Err(TrySendError::Full(_)) => {
256 warn!("Secrets Task channel is full");
257 }
258 }
259
260 None
261 }
262
263 async fn len(&self) -> usize {
264 let (tx, rx) = oneshot::channel();
265 match self.tx.try_send(SecretTaskCommand::SecretsStored { tx }) {
266 Ok(_) => (),
267 Err(TrySendError::Closed(_)) => {
268 warn!("Secrets Task has been closed");
269 return 0;
270 }
271 Err(TrySendError::Full(_)) => {
272 warn!("Secrets Task channel is full");
273 return 0;
274 }
275 }
276
277 let timeout = tokio::time::sleep(Duration::from_secs(1));
278 tokio::pin!(timeout);
279
280 tokio::select! {
281 _ = &mut timeout => 0,
282 rx = rx => {
283 rx.unwrap_or(0)
284 }
285 }
286 }
287
288 async fn is_empty(&self) -> bool {
289 let (tx, rx) = oneshot::channel();
290 match self.tx.try_send(SecretTaskCommand::SecretsStored { tx }) {
291 Ok(_) => (),
292 Err(TrySendError::Closed(_)) => {
293 warn!("Secrets Task has been closed");
294 return true;
295 }
296 Err(TrySendError::Full(_)) => {
297 warn!("Secrets Task channel is full");
298 return true;
299 }
300 }
301
302 let timeout = tokio::time::sleep(Duration::from_secs(1));
303 tokio::pin!(timeout);
304
305 tokio::select! {
306 _ = &mut timeout => true,
307 rx = rx => {
308 match rx {
309 Ok(length) => length == 0,
310 Err(_) => true,
311 }
312 }
313 }
314 }
315}