Skip to main content

affinidi_secrets_resolver/
task.rs

/*!
 * In multi-threaded applications, it is suggested to run a separate task that handles secrets.
 *
 * This removes the need for locks and for copying secrets around.
 */
6
7use crate::secrets::Secret;
8use ahash::AHashMap;
9use tokio::{
10    sync::{mpsc, oneshot},
11    task::JoinHandle,
12};
13use tracing::{debug, warn};
14
15pub struct SecretsTask {
16    channel_rx: mpsc::Receiver<SecretTaskCommand>,
17}
18
19/// Secrets Task Commands
20pub enum SecretTaskCommand {
21    /// Add a Secret
22    AddSecret { secret: Secret },
23
24    /// Add many Secrets
25    AddSecrets { secrets: Vec<Secret> },
26
27    /// Remove a secret by its key ID
28    RemoveSecret { key_id: String },
29
30    /// Get a secret by its name
31    GetSecret {
32        key_id: String,
33        tx: oneshot::Sender<Option<Secret>>,
34    },
35
36    /// Check if a number of Key ID's exist in the Secrets Resolver
37    FindSecrets {
38        keys: Vec<String>,
39        tx: oneshot::Sender<Vec<String>>,
40    },
41
42    /// Number of secrets stored
43    SecretsStored { tx: oneshot::Sender<usize> },
44
45    /// Terminate the Secrets Task
46    Terminate,
47}
48
49impl SecretsTask {
50    /// Create a new SecretsTask
51    pub fn new() -> (Self, mpsc::Sender<SecretTaskCommand>) {
52        let (tx, rx) = mpsc::channel(32);
53
54        (SecretsTask { channel_rx: rx }, tx)
55    }
56
57    /// Start the Secrets Task
58    pub async fn start(self) -> JoinHandle<()> {
59        tokio::spawn(async move {
60            self.run().await;
61        })
62    }
63
64    /// Main loop of the Secrets Task
65    async fn run(mut self) {
66        let mut secrets_cache: AHashMap<String, Secret> = AHashMap::new();
67
68        loop {
69            tokio::select! {
70                    msg = self.channel_rx.recv() => {
71                        if _handle_msg(&mut secrets_cache, msg) {
72                            break;
73                        }
74                }
75            } // End of loop
76        }
77
78        debug!("Exiting Secrets Task");
79    }
80}
81
82fn _handle_msg(
83    secrets_cache: &mut AHashMap<String, Secret>,
84    msg: Option<SecretTaskCommand>,
85) -> bool {
86    let mut exit_flag = false;
87    match msg {
88        Some(SecretTaskCommand::AddSecret { secret }) => {
89            secrets_cache.insert(secret.id.clone(), secret);
90        }
91        Some(SecretTaskCommand::AddSecrets { secrets }) => {
92            for secret in secrets {
93                secrets_cache.insert(secret.id.clone(), secret);
94            }
95        }
96        Some(SecretTaskCommand::RemoveSecret { key_id }) => {
97            secrets_cache.remove(&key_id);
98        }
99        Some(SecretTaskCommand::GetSecret { key_id, tx }) => {
100            let _ = tx.send(secrets_cache.get(&key_id).cloned());
101        }
102        Some(SecretTaskCommand::FindSecrets { keys, tx }) => {
103            let _ = tx.send(
104                keys.iter()
105                    .filter(|sid| secrets_cache.contains_key(sid.as_str()))
106                    .cloned()
107                    .collect(),
108            );
109        }
110        Some(SecretTaskCommand::SecretsStored { tx }) => {
111            let _ = tx.send(secrets_cache.len());
112        }
113        Some(SecretTaskCommand::Terminate) => {
114            debug!("Terminating Secrets Task");
115            exit_flag = true;
116        }
117        None => {
118            warn!("Secrets Task channel closed unexpectedly");
119            exit_flag = true;
120        }
121    }
122
123    exit_flag
124}