fedimint-api-client 0.5.1

fedimint-api-client provides common code used by client.
Documentation
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use fedimint_core::task::jit::JitTryAnyhow;
use fedimint_core::time::now;
use fedimint_core::util::{backoff_util, SafeUrl};
use fedimint_core::PeerId;
use fedimint_logging::LOG_CLIENT_NET_API;
use tokio::sync::{Mutex, RwLock};
use tracing::debug;

use super::net::Connector;
use super::JsonRpcClient;

// TODO(tvolk131): Merge this with `FederationPeerClient`.
#[derive(Debug)]
pub struct FederationPeer<C> {
    pub url: SafeUrl,
    pub peer_id: PeerId,
    pub api_secret: Option<String>,
    pub client: RwLock<FederationPeerClient<C>>,
    pub connector: Connector,
}

impl<C> FederationPeer<C>
where
    C: JsonRpcClient + 'static,
{
    pub fn new(
        connector: Connector,
        url: SafeUrl,
        peer_id: PeerId,
        api_secret: Option<String>,
    ) -> Self {
        let client = RwLock::new(FederationPeerClient::new(
            connector,
            peer_id,
            url.clone(),
            api_secret.clone(),
        ));

        Self {
            url,
            peer_id,
            api_secret,
            client,
            connector,
        }
    }
}

/// The client in [`FederationPeer`], that takes care of reconnecting by
/// starting background Jit task
#[derive(Debug)]
pub struct FederationPeerClient<C> {
    pub client: JitTryAnyhow<C>,
    connection_state: Arc<tokio::sync::Mutex<FederationPeerClientConnectionState>>,
}

impl<C> FederationPeerClient<C>
where
    C: JsonRpcClient + 'static,
{
    fn new(
        connector: Connector,
        peer_id: PeerId,
        url: SafeUrl,
        api_secret: Option<String>,
    ) -> Self {
        let connection_state = Arc::new(tokio::sync::Mutex::new(
            FederationPeerClientConnectionState::new(),
        ));

        Self {
            client: Self::new_jit_client(
                connector,
                peer_id,
                url,
                api_secret,
                connection_state.clone(),
            ),
            connection_state,
        }
    }

    fn new_jit_client(
        connector: Connector,
        peer_id: PeerId,
        url: SafeUrl,
        api_secret: Option<String>,
        connection_state: Arc<Mutex<FederationPeerClientConnectionState>>,
    ) -> JitTryAnyhow<C> {
        JitTryAnyhow::new_try(move || async move {
            Self::wait(&peer_id, &url, &connection_state).await;

            let res = match connector {
                Connector::Tcp => C::connect(&url, api_secret).await,
                #[cfg(all(feature = "tor", not(target_family = "wasm")))]
                Connector::Tor => C::connect_with_tor(&url, api_secret).await,
                #[cfg(all(feature = "tor", target_family = "wasm"))]
                Connector::Tor => unimplemented!(),
            };

            match &res {
                Ok(_) => {
                    connection_state.lock().await.reset();
                    debug!(
                            target: LOG_CLIENT_NET_API,
                            peer_id = %peer_id,
                            url = %url,
                            "Connected to peer");
                }
                Err(err) => {
                    debug!(
                            target: LOG_CLIENT_NET_API,
                            peer_id = %peer_id,
                            url = %url,
                            %err, "Unable to connect to peer");
                }
            }
            Ok(res?)
        })
    }

    pub fn reconnect(
        &mut self,
        connector: Connector,
        peer_id: PeerId,
        url: SafeUrl,
        api_secret: Option<String>,
    ) {
        self.client = Self::new_jit_client(
            connector,
            peer_id,
            url,
            api_secret,
            self.connection_state.clone(),
        );
    }

    async fn wait(
        peer_id: &PeerId,
        url: &SafeUrl,
        connection_state: &Arc<Mutex<FederationPeerClientConnectionState>>,
    ) {
        let mut connection_state_guard = connection_state.lock().await;

        if connection_state_guard.last_connection_attempt.is_none() {
            debug!(
                target: LOG_CLIENT_NET_API,
                peer_id = %peer_id,
                url = %url,
                "Connecting to peer...");
        } else {
            debug!(
                target: LOG_CLIENT_NET_API,
                peer_id = %peer_id,
                url = %url,
                "Retrying connecting to peer...");
        }

        connection_state_guard.wait().await;
    }
}

/// Connection state shared/preserved between [`FederationPeerClient`] and the
/// Jit tasks it spawns.
#[derive(Debug)]
struct FederationPeerClientConnectionState {
    /// Last time a connection attempt was made, or `None` if no attempt has
    /// been made yet.
    last_connection_attempt: Option<SystemTime>,
    connection_backoff: backoff_util::FibonacciBackoff,
}

impl FederationPeerClientConnectionState {
    const MIN_BACKOFF: Duration = Duration::from_millis(100);
    const MAX_BACKOFF: Duration = Duration::from_secs(15);

    fn new() -> Self {
        Self {
            last_connection_attempt: None,
            connection_backoff: backoff_util::custom_backoff(
                Self::MIN_BACKOFF,
                Self::MAX_BACKOFF,
                None,
            ),
        }
    }

    /// Wait (if needed) before reconnection attempt based on number of previous
    /// attempts and update reconnection stats.
    async fn wait(&mut self) {
        let now_ts = now();
        let desired_timeout = self.connection_backoff.next().unwrap_or(Self::MAX_BACKOFF);

        let sleep_duration = match self.last_connection_attempt {
            Some(last) => {
                let since_last_connect = now_ts.duration_since(last).unwrap_or_default();
                desired_timeout.saturating_sub(since_last_connect)
            }
            None => Duration::ZERO,
        };

        if Duration::ZERO < sleep_duration {
            debug!(
                target: LOG_CLIENT_NET_API,
                duration_ms=sleep_duration.as_millis(),
                "Waiting before reconnecting");
            fedimint_core::runtime::sleep(sleep_duration).await;
        }

        self.last_connection_attempt = Some(now_ts);
    }

    fn reset(&mut self) {
        *self = Self::new();
    }
}