geph4_binder_transport/
lib.rs

1mod wiretypes;
2
3use std::{
4    net::IpAddr,
5    sync::{atomic::AtomicUsize, atomic::Ordering, Arc},
6    time::Duration,
7};
8
9use smol_timeout::TimeoutExt;
10pub use wiretypes::*;
11mod http;
12pub use http::*;
13use rand::prelude::*;
14
15/// Trait that all binder clients implement.
16#[async_trait::async_trait]
17pub trait BinderClient: Sync + Send {
18    /// Send a request to the network with a certain timeout.
19    async fn request(&self, request: BinderRequestData) -> BinderResult<BinderResponse>;
20}
21
22/// Trait that all binder transport servers implement.
23pub trait BinderServer: Send + Sync {
24    /// Receive a request from the network.
25    fn next_request(&self) -> std::io::Result<BinderRequest>;
26}
27
28/// A binder request
29#[derive(derivative::Derivative)]
30#[derivative(Debug)]
31pub struct BinderRequest {
32    pub request_data: BinderRequestData,
33    probable_ip: Option<IpAddr>,
34    #[derivative(Debug = "ignore")]
35    response_send: Box<dyn FnOnce(BinderResult<BinderResponse>) + Send + Sync>,
36}
37
38impl BinderRequest {
39    /// Respond to the request.
40    pub fn respond(self, response: BinderResult<BinderResponse>) {
41        (self.response_send)(response)
42    }
43
44    /// The probable IP address of the request.
45    pub fn probable_ip(&self) -> Option<IpAddr> {
46        self.probable_ip
47    }
48}
49
50/// A BinderClient implementation wrapping multiple BinderClients.
51pub struct MultiBinderClient {
52    clients: Vec<Arc<dyn BinderClient>>,
53    index: AtomicUsize,
54}
55
56impl MultiBinderClient {
57    /// Create a MBC that doesn't have any BinderClients.
58    pub fn empty() -> Self {
59        MultiBinderClient {
60            clients: Vec::new(),
61            index: AtomicUsize::new(0),
62        }
63    }
64
65    /// Add a new MBC, returning the MBC produced.
66    pub fn add_client(mut self, new: impl BinderClient + 'static) -> Self {
67        self.clients.push(Arc::new(new));
68        self.clients.shuffle(&mut rand::thread_rng());
69        self
70    }
71}
72
73impl MultiBinderClient {
74    // does the request on ONE binder
75    async fn request_one(&self, request: BinderRequestData) -> BinderResult<BinderResponse> {
76        let mut timeout = Duration::from_secs(3);
77        loop {
78            let curr_idx = self.index.fetch_add(1, Ordering::Relaxed);
79            let client = &self.clients[curr_idx % self.clients.len()];
80            log::trace!("request_one started");
81            let res = client.request(request.clone()).timeout(timeout).await;
82            if let Some(Ok(res)) = res {
83                log::trace!("request_one succeeded");
84                self.index.fetch_sub(1, Ordering::Relaxed);
85                return Ok(res);
86            } else if let Some(Err(BinderError::Network(other))) = res {
87                log::warn!("MultiBinderClient switching backend due to error {}", other);
88            } else if let Some(e) = res {
89                return e;
90            } else {
91                log::warn!(
92                    "MultiBinderClient switching backend due to timeout {:?}",
93                    timeout
94                );
95                timeout += Duration::from_secs(1);
96            }
97        }
98    }
99
100    // does the request on all binders
101    async fn request_multi(&self, request: BinderRequestData) -> BinderResult<BinderResponse> {
102        let (send_res, recv_res) = smol::channel::unbounded();
103        let mut _tasks = vec![];
104        for (idx, client) in self.clients.iter().enumerate() {
105            let client = client.clone();
106            let request = request.clone();
107            let send_res = send_res.clone();
108            _tasks.push(smolscale::spawn(async move {
109                let _ = send_res.send((idx, client.request(request).await)).await;
110            }));
111        }
112        drop(send_res);
113
114        let mut final_res = None;
115        for _ in 0.._tasks.len() {
116            let (idx, res) = recv_res
117                .recv()
118                .await
119                .expect("result channel shouldn't have closed");
120            self.index.store(idx, Ordering::Relaxed);
121            final_res = Some(res);
122            if final_res.as_ref().unwrap().is_ok() {
123                break;
124            } else {
125                log::warn!("request_multi failed: {:?}", final_res);
126            }
127        }
128        final_res.unwrap()
129    }
130}
131
132#[async_trait::async_trait]
133impl BinderClient for MultiBinderClient {
134    async fn request(&self, request: BinderRequestData) -> BinderResult<BinderResponse> {
135        if request.is_idempotent() {
136            self.request_multi(request).await
137        } else {
138            self.request_one(request).await
139        }
140    }
141}