ockam_api 0.93.0

Ockam's request-response API
use crate::influxdb::influxdb_api_client::InfluxDBApi;
use crate::influxdb::lease_issuer::node_service::InfluxDBTokenLessorState;
use crate::influxdb::lease_token::LeaseToken;
use crate::ApiError;
use ockam_core::{async_trait, Processor};
use ockam_node::Context;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Background processor that works on the shared InfluxDB token lessor state.
///
/// Its `Processor` implementation loads the tokens listed by InfluxDB on initialization and
/// then, on every processing pass, revokes the tracked tokens that have expired.
pub(crate) struct InfluxDBTokenLessorProcessor {
    /// Lessor state shared behind an async read/write lock; this processor reads its
    /// `influxdb_api_client` and reads/writes its `active_tokens`.
    state: Arc<RwLock<InfluxDBTokenLessorState>>,
}

impl InfluxDBTokenLessorProcessor {
    /// Creates a processor operating on the given shared lessor state.
    pub(crate) fn new(state: Arc<RwLock<InfluxDBTokenLessorState>>) -> Self {
        Self { state }
    }

    /// Lists the tokens known to InfluxDB and converts them into [`LeaseToken`]s.
    ///
    /// Tokens whose conversion fails, or converts to `None`, are silently skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if the request to InfluxDB fails, or if its response cannot be
    /// turned into a token list ("Failed to list tokens").
    async fn list_tokens(&self) -> ockam_core::Result<Vec<LeaseToken>> {
        debug!("Listing all tokens");
        let influxdb_tokens = {
            // The read guard only lives for the duration of the request.
            let state = self.state.read().await;
            state
                .influxdb_api_client
                .list_tokens()
                .await?
                .into_response()
                .map_err(|_| ApiError::core("Failed to list tokens"))?
                .tokens
        };
        let lease_tokens: Vec<LeaseToken> = influxdb_tokens
            .into_iter()
            .filter_map(|token| {
                let lease_token: Result<Option<LeaseToken>, _> = token.try_into();
                lease_token.ok().flatten()
            })
            .collect();
        info!("Found {} tokens", lease_tokens.len());
        Ok(lease_tokens)
    }

    /// Revokes every tracked token that has expired and stops tracking the ones that were
    /// successfully revoked.
    ///
    /// A token whose expiration state cannot be determined is treated as expired. Tokens
    /// whose revocation fails stay in `active_tokens`, so they are retried on the next call.
    async fn revoke_outstanding_tokens(&self) -> ockam_core::Result<()> {
        let revoked_ids = {
            let state = self.state.read().await;
            let influxdb_api_client = &state.influxdb_api_client;
            let mut revoked_ids = vec![];
            // `BinaryHeap::iter` visits elements in arbitrary order, so every token must be
            // inspected: stopping at the first unexpired token could leave expired tokens
            // behind it unrevoked.
            for token in state.active_tokens.iter() {
                if !token.0.is_expired().unwrap_or(true) {
                    continue;
                }
                let token_id = &token.0.id;
                if influxdb_api_client.revoke_token(token_id).await.is_ok() {
                    revoked_ids.push(token_id.clone());
                }
            }
            revoked_ids
        };
        // Nothing was revoked: avoid taking the write lock and blocking other users of the state.
        if revoked_ids.is_empty() {
            return Ok(());
        }
        let mut state = self.state.write().await;
        state
            .active_tokens
            .retain(|token| !revoked_ids.contains(&token.0.id));
        Ok(())
    }
}

#[async_trait]
impl Processor for InfluxDBTokenLessorProcessor {
    type Context = Context;

    async fn initialize(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> {
        let mut max_retries = 5;
        loop {
            match self.list_tokens().await {
                Ok(tokens) => {
                    let mut state = self.state.write().await;
                    state.active_tokens =
                        BinaryHeap::from(tokens.into_iter().map(Reverse).collect::<Vec<_>>());
                    break;
                }
                Err(_) => {
                    max_retries -= 1;
                    if max_retries == 0 {
                        error!("Failed to initialize InfluxDBTokenLessorProcessor");
                        break;
                    }
                }
            }
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
        Ok(())
    }

    async fn shutdown(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> {
        debug!("Shutting down InfluxDBTokenLessorProcessor");
        Ok(())
    }

    async fn process(&mut self, _context: &mut Self::Context) -> ockam_core::Result<bool> {
        if let Err(err) = self.revoke_outstanding_tokens().await {
            error!("Failed to revoke outstanding tokens: {err}");
        }
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(true)
    }
}