use ahash::AHashMap;
use secrets::Secret;
use std::{cell::RefCell, time::Duration};
use task::{SecretTaskCommand, SecretsTask};
use tokio::{
sync::{
mpsc::{self, error::TrySendError},
oneshot,
},
task::JoinHandle,
};
use tracing::{debug, warn};
mod crypto;
pub mod errors;
pub mod secrets;
pub mod task;
pub use affinidi_encoding::multicodec;
/// Asynchronous interface for storing and looking up [`Secret`]s by id.
///
/// This file provides two implementations: [`SimpleSecretsResolver`], which
/// keeps the secrets in an in-process map, and [`ThreadedSecretsResolver`],
/// which forwards each operation as a command over a channel.
// The `async_fn_in_trait` lint fires for `async fn` in public traits because
// callers cannot put auto-trait bounds (such as `Send`) on the returned
// futures. It is silenced here; note that `SimpleSecretsResolver` is built on
// a `RefCell`, so its futures could not be `Send` anyway.
#[allow(async_fn_in_trait)]
pub trait SecretsResolver {
/// Stores a single secret. Both implementations in this file delegate to
/// [`SecretsResolver::insert_vec`] with a one-element slice.
async fn insert(&self, secret: Secret);
/// Stores a copy of every secret in `secrets`.
async fn insert_vec(&self, secrets: &[Secret]);
/// Returns the secret stored under `secret_id`, or `None` when it is not
/// available.
async fn get_secret(&self, secret_id: &str) -> Option<Secret>;
/// Returns the ids from `secret_ids` for which a secret is stored.
async fn find_secrets(&self, secret_ids: &[String]) -> Vec<String>;
/// Removes the secret stored under `secret_id`.
///
/// `SimpleSecretsResolver` returns the removed secret (or `None` if there
/// was none); `ThreadedSecretsResolver` always returns `None`.
async fn remove_secret(&self, secret_id: &str) -> Option<Secret>;
/// Returns the number of stored secrets.
async fn len(&self) -> usize;
/// Returns `true` when no secrets are stored.
async fn is_empty(&self) -> bool;
}
/// In-process [`SecretsResolver`] that keeps every secret in a hash map.
///
/// The map sits behind a `RefCell` so it can be mutated through `&self`;
/// as a consequence this type is not `Sync`.
pub struct SimpleSecretsResolver {
// Stored secrets, keyed by each secret's `id`.
known_secrets: RefCell<AHashMap<String, Secret>>,
}
impl SimpleSecretsResolver {
    /// Builds a resolver whose map is pre-populated with copies of
    /// `known_secrets`.
    ///
    /// Population goes through [`SecretsResolver::insert_vec`], so a later
    /// entry with the same id replaces an earlier one.
    pub async fn new(known_secrets: &[Secret]) -> Self {
        let resolver = Self {
            known_secrets: RefCell::new(AHashMap::new()),
        };
        resolver.insert_vec(known_secrets).await;
        resolver
    }
}
impl SecretsResolver for SimpleSecretsResolver {
    /// Stores one secret by delegating to `insert_vec`.
    async fn insert(&self, secret: Secret) {
        let single = [secret];
        self.insert_vec(&single).await;
    }

    /// Copies every secret into the map under its `id`; an existing entry
    /// with the same id is replaced.
    async fn insert_vec(&self, secrets: &[Secret]) {
        // One mutable borrow for the whole batch; nothing in the loop awaits
        // or touches the `RefCell` again.
        let mut store = self.known_secrets.borrow_mut();
        for secret in secrets {
            debug!("Adding secret ({})", secret.id);
            store.insert(secret.id.clone(), secret.clone());
        }
    }

    /// Returns a clone of the secret stored under `secret_id`, if any.
    async fn get_secret(&self, secret_id: &str) -> Option<Secret> {
        let store = self.known_secrets.borrow();
        store.get(secret_id).cloned()
    }

    /// Returns, in input order, the ids from `secret_ids` that are present in
    /// the map.
    async fn find_secrets(&self, secret_ids: &[String]) -> Vec<String> {
        let store = self.known_secrets.borrow();
        secret_ids
            .iter()
            .filter(|id| store.contains_key(id.as_str()))
            .cloned()
            .collect()
    }

    /// Removes and returns the secret stored under `secret_id`, if any.
    async fn remove_secret(&self, secret_id: &str) -> Option<Secret> {
        let mut store = self.known_secrets.borrow_mut();
        store.remove(secret_id)
    }

    /// Number of secrets currently in the map.
    async fn len(&self) -> usize {
        let store = self.known_secrets.borrow();
        store.len()
    }

    /// `true` when the map holds no secrets.
    async fn is_empty(&self) -> bool {
        let store = self.known_secrets.borrow();
        store.is_empty()
    }
}
/// [`SecretsResolver`] handle that stores nothing itself: every operation is
/// sent as a [`SecretTaskCommand`] over an mpsc channel.
///
/// Cloning the handle clones the channel sender, so all clones feed the same
/// receiver.
#[derive(Clone)]
pub struct ThreadedSecretsResolver {
// Sending half of the command channel. The receiving half is presumably
// owned by a `SecretsTask` — confirm in the `task` module.
tx: mpsc::Sender<SecretTaskCommand>,
}
impl ThreadedSecretsResolver {
    /// Creates a resolver handle.
    ///
    /// * With `Some(sender)`, the handle reuses that existing command channel
    ///   and no task is started, so the returned join handle is `None`.
    /// * With `None`, a new `SecretsTask` is created and started; the returned
    ///   `JoinHandle` is the one produced by `SecretsTask::start`.
    pub async fn new(
        secrets_task_tx: Option<mpsc::Sender<SecretTaskCommand>>,
    ) -> (Self, Option<JoinHandle<()>>) {
        match secrets_task_tx {
            Some(tx) => (Self { tx }, None),
            None => {
                let (task, tx) = SecretsTask::new();
                let handle = task.start().await;
                (Self { tx }, Some(handle))
            }
        }
    }

    /// Sends `SecretTaskCommand::Terminate` on the command channel, waiting
    /// for channel capacity if necessary.
    ///
    /// A send failure (the receiver is already gone) is deliberately ignored.
    pub async fn stop(&self) {
        let _send_result = self.tx.send(SecretTaskCommand::Terminate).await;
    }
}
impl SecretsResolver for ThreadedSecretsResolver {
async fn insert(&self, secret: Secret) {
self.insert_vec(&[secret]).await;
}
async fn insert_vec(&self, secrets: &[Secret]) {
for secret in secrets {
debug!("Adding secret ({})", secret.id);
match self.tx.try_send(SecretTaskCommand::AddSecret {
secret: secret.to_owned(),
}) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => {
warn!("Secrets Task has been closed");
}
Err(TrySendError::Full(_)) => {
warn!("Secrets Task channel is full");
}
}
}
}
async fn get_secret(&self, secret_id: &str) -> Option<Secret> {
let (tx, rx) = oneshot::channel();
match self.tx.try_send(SecretTaskCommand::GetSecret {
key_id: secret_id.to_string(),
tx,
}) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => {
warn!("Secrets Task has been closed");
return None;
}
Err(TrySendError::Full(_)) => {
warn!("Secrets Task channel is full");
return None;
}
}
let timeout = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(timeout);
tokio::select! {
_ = &mut timeout => None,
rx = rx => rx.unwrap_or(None)
}
}
async fn find_secrets(&self, secret_ids: &[String]) -> Vec<String> {
let (tx, rx) = oneshot::channel();
match self.tx.try_send(SecretTaskCommand::FindSecrets {
keys: secret_ids.to_vec(),
tx,
}) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => {
warn!("Secrets Task has been closed");
return vec![];
}
Err(TrySendError::Full(_)) => {
warn!("Secrets Task channel is full");
return vec![];
}
}
let timeout = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(timeout);
tokio::select! {
_ = &mut timeout => vec![],
rx = rx => rx.unwrap_or(vec![])
}
}
async fn remove_secret(&self, secret_id: &str) -> Option<Secret> {
match self.tx.try_send(SecretTaskCommand::RemoveSecret {
key_id: secret_id.to_string(),
}) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => {
warn!("Secrets Task has been closed");
}
Err(TrySendError::Full(_)) => {
warn!("Secrets Task channel is full");
}
}
None
}
async fn len(&self) -> usize {
let (tx, rx) = oneshot::channel();
match self.tx.try_send(SecretTaskCommand::SecretsStored { tx }) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => {
warn!("Secrets Task has been closed");
return 0;
}
Err(TrySendError::Full(_)) => {
warn!("Secrets Task channel is full");
return 0;
}
}
let timeout = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(timeout);
tokio::select! {
_ = &mut timeout => 0,
rx = rx => {
rx.unwrap_or(0)
}
}
}
async fn is_empty(&self) -> bool {
let (tx, rx) = oneshot::channel();
match self.tx.try_send(SecretTaskCommand::SecretsStored { tx }) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => {
warn!("Secrets Task has been closed");
return true;
}
Err(TrySendError::Full(_)) => {
warn!("Secrets Task channel is full");
return true;
}
}
let timeout = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(timeout);
tokio::select! {
_ = &mut timeout => true,
rx = rx => {
match rx {
Ok(length) => length == 0,
Err(_) => true,
}
}
}
}
}