use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use anyhow::Context;
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use geph4_protocol::binder::client::E2eeHttpTransport;
use itertools::Itertools;
use nanorpc::{DynRpcTransport, RpcTransport};
use smol_timeout::TimeoutExt;
pub fn parse_fronts(
binder_lpk: [u8; 32],
fronts: impl IntoIterator<Item = (String, String)>,
) -> DynRpcTransport {
let alternatives = fronts
.into_iter()
.map(|(endpoint, real_host)| {
DynRpcTransport::new(E2eeHttpTransport::new(
binder_lpk,
endpoint,
vec![("host".to_string(), real_host)],
))
})
.collect_vec();
let unified = MultiRpcTransport(alternatives);
DynRpcTransport::new(unified)
}
struct MultiRpcTransport(Vec<DynRpcTransport>);
#[async_trait]
impl RpcTransport for MultiRpcTransport {
type Error = anyhow::Error;
async fn call_raw(
&self,
req: nanorpc::JrpcRequest,
) -> Result<nanorpc::JrpcResponse, Self::Error> {
let mut backoff = ExponentialBackoffBuilder::new().build();
loop {
static IDX: AtomicUsize = AtomicUsize::new(0);
let idx = IDX.load(Ordering::Relaxed) % self.0.len();
let random_element = &self.0[idx];
log::debug!("selecting binder front {idx}");
let req = req.clone();
let vv = async {
anyhow::Ok(
random_element
.call_raw(req)
.timeout(Duration::from_secs(3))
.await
.context("timeout on one of the transports")??,
)
};
match vv.await {
Ok(v) => return Ok(v),
Err(err) => {
log::warn!("binder front {idx} failed: {:?}", err);
IDX.fetch_add(1, Ordering::Relaxed);
if let Some(next) = backoff.next_backoff() {
smol::Timer::after(next).await;
} else {
return Err(err);
}
}
}
}
}
}