authly-client 0.0.6

A Rust client for Authly
Documentation
use std::{sync::Arc, time::Duration};

use authly_common::proto::service::{self as proto};
use tonic::Streaming;

use crate::{
    access_control,
    connection::{make_connection, ConnectionParams},
    error, ClientState, Error,
};

pub async fn spawn_background_worker(
    state: Arc<ClientState>,
    reconfigured_tx: tokio::sync::watch::Sender<Arc<ConnectionParams>>,
    closed_rx: tokio::sync::watch::Receiver<()>,
) -> Result<(), Error> {
    let msg_stream = init_message_stream(&state).await?;
    tokio::spawn(background_worker(
        state,
        reconfigured_tx,
        closed_rx,
        msg_stream,
    ));

    Ok(())
}

async fn background_worker(
    state: Arc<ClientState>,
    reconfigured_tx: tokio::sync::watch::Sender<Arc<ConnectionParams>>,
    mut closed_rx: tokio::sync::watch::Receiver<()>,
    mut msg_stream: Streaming<proto::ServiceMessage>,
) {
    loop {
        tokio::select! {
            msg_result = msg_stream.message() => {
                handle_message_result(&state, msg_result, &mut msg_stream, &reconfigured_tx).await;
            }
            _ = closed_rx.changed() => {
                tracing::info!("Authly channel closed");
                return;
            }
        }
    }
}

async fn handle_message_result(
    state: &ClientState,
    msg_result: Result<Option<proto::ServiceMessage>, tonic::Status>,
    msg_stream: &mut Streaming<proto::ServiceMessage>,
    reconfigured_tx: &tokio::sync::watch::Sender<Arc<ConnectionParams>>,
) {
    match msg_result {
        Ok(Some(msg)) => {
            if let Some(kind) = msg.service_message_kind {
                handle_message_kind(state, kind, msg_stream, reconfigured_tx).await;
            }
        }
        Ok(None) => {
            reconfigure_loop(state, msg_stream, reconfigured_tx).await;
        }
        Err(_error) => {
            reconfigure_loop(state, msg_stream, reconfigured_tx).await;
        }
    }
}

async fn handle_message_kind(
    state: &ClientState,
    msg_kind: proto::service_message::ServiceMessageKind,
    msg_stream: &mut Streaming<proto::ServiceMessage>,
    reconfigured_tx: &tokio::sync::watch::Sender<Arc<ConnectionParams>>,
) {
    tracing::info!(?msg_kind, "Received Authly message");

    match msg_kind {
        proto::service_message::ServiceMessageKind::ReloadCa(_) => {
            reconfigure_loop(state, msg_stream, reconfigured_tx).await;
        }
        proto::service_message::ServiceMessageKind::ReloadCache(_) => {
            reload_local_cache(state).await;
        }
        proto::service_message::ServiceMessageKind::Ping(_) => {
            let _result = state
                .conn
                .load()
                .authly_service
                .clone()
                .pong(tonic::Request::new(proto::Empty {}))
                .await;
        }
    }
}

async fn reconfigure_loop(
    state: &ClientState,
    msg_stream: &mut Streaming<proto::ServiceMessage>,
    reconfigured_tx: &tokio::sync::watch::Sender<Arc<ConnectionParams>>,
) {
    loop {
        match try_reconfigure(state, msg_stream, reconfigured_tx).await {
            Ok(()) => return,
            Err(err) => {
                tracing::error!(?err, "background reconfigure error");

                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    }
}

async fn try_reconfigure(
    state: &ClientState,
    msg_stream: &mut Streaming<proto::ServiceMessage>,
    reconfigured_tx: &tokio::sync::watch::Sender<Arc<ConnectionParams>>,
) -> Result<(), Error> {
    let params = state.reconfigure.new_connection_params().await?;
    let connection = Arc::new(make_connection(params.clone()).await?);

    state.conn.store(connection.clone());

    *msg_stream = init_message_stream(state).await?;
    reload_local_cache(state).await;

    if let Err(err) = reconfigured_tx.send(params) {
        tracing::error!(?err, "Could not publish reconfigured connection params");
    }

    Ok(())
}

async fn init_message_stream(
    state: &ClientState,
) -> Result<Streaming<proto::ServiceMessage>, Error> {
    let mut current_service = state.conn.load().authly_service.clone();
    let response = current_service
        .messages(tonic::Request::new(proto::Empty {}))
        .await
        .map_err(error::tonic)?;

    Ok(response.into_inner())
}

async fn reload_local_cache(state: &ClientState) {
    match access_control::get_resource_property_mapping(state.conn.load().authly_service.clone())
        .await
    {
        Ok(property_mapping) => {
            state.resource_property_mapping.store(property_mapping);
        }
        Err(err) => {
            tracing::error!(?err, "failed to reload resource property mapping");
        }
    }
}