use crate::influxdb::lease_issuer::node_service::InfluxDBTokenLessorNodeServiceTrait;
use crate::nodes::InMemoryNode;
use ockam::{compat::time::now, Address, Mailboxes};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{api::Error, AllowAll, DenyAll};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use std::cmp::max;
use std::sync::{Arc, RwLock as SyncRwLock, Weak};
/// Handle to a token that may be refreshed in the background.
///
/// Cloning the handle is cheap: every clone shares the same underlying token
/// slot through the `Arc`, so a token written through one handle is visible
/// to all of them.
#[derive(Clone)]
pub struct TokenLeaseRefresher {
/// Shared slot holding the current token; `None` until a token has been stored.
token: Arc<SyncRwLock<Option<String>>>,
}
impl TokenLeaseRefresher {
    /// Creates a refresher that always serves the given `token`.
    ///
    /// No background task is spawned, so the token is never replaced.
    pub fn new_with_fixed_token(token: String) -> TokenLeaseRefresher {
        let token = Arc::new(SyncRwLock::new(Some(token)));
        Self { token }
    }

    /// Creates a refresher and spawns a background task that keeps the token
    /// up to date by running `refresh_loop`.
    ///
    /// The token slot starts out empty (`get_token` returns `None`) until the
    /// background task stores the first token it obtains.
    ///
    /// The background task runs on a detached context with its own randomly
    /// addressed mailbox, and only holds a `Weak` reference to the node
    /// manager. If the loop terminates with an error, the error is logged and
    /// the task ends; the last stored token (if any) stays in place.
    ///
    /// # Errors
    ///
    /// Returns an error if the detached context cannot be created.
    pub fn new(
        ctx: &Context,
        node_manager: Weak<InMemoryNode>,
        lease_issuer_route: MultiAddr,
    ) -> Result<TokenLeaseRefresher, Error> {
        let token = Arc::new(SyncRwLock::new(None));
        let mailboxes = Mailboxes::primary(
            Address::random_tagged("LeaseRetriever"),
            Arc::new(DenyAll),
            Arc::new(AllowAll),
        );
        let new_ctx = ctx.new_detached_with_mailboxes(mailboxes)?;
        let token_clone = token.clone();
        ockam_node::spawn(async move {
            // `node_manager` is moved into this task, so it can be handed over
            // directly; no extra clone of the `Weak` is needed.
            if let Err(err) =
                refresh_loop(token_clone, new_ctx, node_manager, lease_issuer_route).await
            {
                error!("Token refresher terminated with error: {:}", err);
            }
        });
        Ok(Self { token })
    }

    /// Returns a copy of the current token, or `None` if no token has been
    /// stored yet.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the token has been poisoned.
    pub fn get_token(&self) -> Option<String> {
        self.token.read().unwrap().clone()
    }
}
/// Repeatedly requests a token from the lease issuer and stores it in `token`.
///
/// Each iteration upgrades `node_manager`, asks it to create a token over
/// `lease_issuer_route`, and then sleeps before the next attempt:
///
/// - on success the new token replaces the stored one, and the loop waits for
///   half of the token's remaining lifetime, but never less than 5 seconds;
/// - on failure the stored token is left untouched, and the loop retries
///   after 15 seconds.
///
/// # Errors
///
/// Returns an error when `node_manager` can no longer be upgraded (the node
/// manager was dropped) or when the current time cannot be read. Failures to
/// obtain a token are logged and retried rather than returned.
async fn refresh_loop(
    token: Arc<SyncRwLock<Option<String>>>,
    ctx: Context,
    node_manager: Weak<InMemoryNode>,
    lease_issuer_route: MultiAddr,
) -> ockam_core::Result<()> {
    loop {
        debug!("refreshing token");
        let node = node_manager.upgrade().ok_or_else(|| {
            ockam_core::Error::new(Origin::Node, Kind::Internal, "node manager was shut down")
        })?;
        let token_result = node.create_token(&ctx, &lease_issuer_route).await;
        // Release the strong reference before the (potentially very long)
        // sleep below. Keeping it alive across the sleep would make this task
        // hold the node manager alive almost permanently, defeating the `Weak`
        // handle and preventing the shutdown check above from ever firing.
        drop(node);

        let now_t = now()?;
        let wait_secs = match token_result {
            Ok(new_token) => {
                // A token that is already expired (or clock skew between this
                // node and the issuer) must not underflow: treat it as having
                // no lifetime left so the minimum wait below applies.
                // NOTE(review): assumes `expires_at` is in the same unit as
                // `now()` (seconds, per the log message below) — confirm.
                let remaining = (new_token.expires_at as u64).saturating_sub(now_t);
                debug!("Auth Token obtained expires at {}", new_token.expires_at);
                // The write guard is a temporary dropped at the end of this
                // statement, so the lock is never held across an await.
                *token.write().unwrap() = Some(new_token.token);
                max(remaining / 2, 5)
            }
            Err(err) => {
                warn!("Error retrieving token {:}", err);
                15
            }
        };
        debug!("waiting for {} seconds before refreshing token", wait_secs);
        ctx.sleep_long_until(now_t + wait_secs).await;
    }
}