geph5-client 0.2.68

Geph5 client
Documentation
use std::time::Duration;

use async_trait::async_trait;
use futures_concurrency::future::FutureGroup;
use futures_util::StreamExt;
use nanorpc::{DynRpcTransport, JrpcRequest, JrpcResponse, RpcTransport};
use smol_timeout2::TimeoutExt as _;

pub struct RaceTransport {
    choices: Vec<DynRpcTransport>,
    selected: smol::lock::Mutex<Option<usize>>,
}

impl RaceTransport {
    pub fn new(choices: Vec<DynRpcTransport>) -> Self {
        Self {
            choices,
            selected: Default::default(),
        }
    }
}

#[async_trait]
impl RpcTransport for RaceTransport {
    type Error = anyhow::Error;

    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
        let mut selected = self.selected.lock().await;
        if let Some(idx) = selected.as_ref().copied() {
            tracing::debug!(method = &req.method, idx, "picked working transport");
            let res = self.choices[idx].call_raw(req).await;
            if res.is_err() {
                *selected = None;
            }
            res
        } else {
            tracing::debug!(
                method = &req.method,
                count = self.choices.len(),
                "racing for best transport"
            );
            let mut err = None;
            let mut future_group = FutureGroup::new();
            for (idx, choice) in self.choices.iter().enumerate() {
                let req = req.clone();
                future_group.insert(Box::pin(async move {
                    (
                        idx,
                        choice
                            .call_raw(req)
                            .timeout(Duration::from_secs(10))
                            .await
                            .ok_or_else(|| anyhow::anyhow!("broker call timed out"))
                            .and_then(|s| s),
                    )
                }));
            }
            while let Some((idx, res)) = future_group.next().await {
                match res {
                    Ok(res) => {
                        *selected = Some(idx);
                        return Ok(res);
                    }
                    Err(e) => err = Some(e),
                }
            }
            Err(err.unwrap())
        }
    }
}