affinidi_secrets_resolver/
task.rs1use crate::secrets::Secret;
8use ahash::AHashMap;
9use tokio::{
10 sync::{mpsc, oneshot},
11 task::JoinHandle,
12};
13use tracing::{debug, warn};
14
15pub struct SecretsTask {
16 channel_rx: mpsc::Receiver<SecretTaskCommand>,
17}
18
19pub enum SecretTaskCommand {
21 AddSecret { secret: Secret },
23
24 AddSecrets { secrets: Vec<Secret> },
26
27 RemoveSecret { key_id: String },
29
30 GetSecret {
32 key_id: String,
33 tx: oneshot::Sender<Option<Secret>>,
34 },
35
36 FindSecrets {
38 keys: Vec<String>,
39 tx: oneshot::Sender<Vec<String>>,
40 },
41
42 SecretsStored { tx: oneshot::Sender<usize> },
44
45 Terminate,
47}
48
49impl SecretsTask {
50 pub fn new() -> (Self, mpsc::Sender<SecretTaskCommand>) {
52 let (tx, rx) = mpsc::channel(32);
53
54 (SecretsTask { channel_rx: rx }, tx)
55 }
56
57 pub async fn start(self) -> JoinHandle<()> {
59 tokio::spawn(async move {
60 self.run().await;
61 })
62 }
63
64 async fn run(mut self) {
66 let mut secrets_cache: AHashMap<String, Secret> = AHashMap::new();
67
68 loop {
69 tokio::select! {
70 msg = self.channel_rx.recv() => {
71 if _handle_msg(&mut secrets_cache, msg) {
72 break;
73 }
74 }
75 } }
77
78 debug!("Exiting Secrets Task");
79 }
80}
81
82fn _handle_msg(
83 secrets_cache: &mut AHashMap<String, Secret>,
84 msg: Option<SecretTaskCommand>,
85) -> bool {
86 let mut exit_flag = false;
87 match msg {
88 Some(SecretTaskCommand::AddSecret { secret }) => {
89 secrets_cache.insert(secret.id.clone(), secret);
90 }
91 Some(SecretTaskCommand::AddSecrets { secrets }) => {
92 for secret in secrets {
93 secrets_cache.insert(secret.id.clone(), secret);
94 }
95 }
96 Some(SecretTaskCommand::RemoveSecret { key_id }) => {
97 secrets_cache.remove(&key_id);
98 }
99 Some(SecretTaskCommand::GetSecret { key_id, tx }) => {
100 let _ = tx.send(secrets_cache.get(&key_id).cloned());
101 }
102 Some(SecretTaskCommand::FindSecrets { keys, tx }) => {
103 let _ = tx.send(
104 keys.iter()
105 .filter(|sid| secrets_cache.contains_key(sid.as_str()))
106 .cloned()
107 .collect(),
108 );
109 }
110 Some(SecretTaskCommand::SecretsStored { tx }) => {
111 let _ = tx.send(secrets_cache.len());
112 }
113 Some(SecretTaskCommand::Terminate) => {
114 debug!("Terminating Secrets Task");
115 exit_flag = true;
116 }
117 None => {
118 warn!("Secrets Task channel closed unexpectedly");
119 exit_flag = true;
120 }
121 }
122
123 exit_flag
124}