geph4_binder_transport/
lib.rs1mod wiretypes;
2
3use std::{
4 net::IpAddr,
5 sync::{atomic::AtomicUsize, atomic::Ordering, Arc},
6 time::Duration,
7};
8
9use smol_timeout::TimeoutExt;
10pub use wiretypes::*;
11mod http;
12pub use http::*;
13use rand::prelude::*;
14
15#[async_trait::async_trait]
17pub trait BinderClient: Sync + Send {
18 async fn request(&self, request: BinderRequestData) -> BinderResult<BinderResponse>;
20}
21
22pub trait BinderServer: Send + Sync {
24 fn next_request(&self) -> std::io::Result<BinderRequest>;
26}
27
28#[derive(derivative::Derivative)]
30#[derivative(Debug)]
31pub struct BinderRequest {
32 pub request_data: BinderRequestData,
33 probable_ip: Option<IpAddr>,
34 #[derivative(Debug = "ignore")]
35 response_send: Box<dyn FnOnce(BinderResult<BinderResponse>) + Send + Sync>,
36}
37
38impl BinderRequest {
39 pub fn respond(self, response: BinderResult<BinderResponse>) {
41 (self.response_send)(response)
42 }
43
44 pub fn probable_ip(&self) -> Option<IpAddr> {
46 self.probable_ip
47 }
48}
49
50pub struct MultiBinderClient {
52 clients: Vec<Arc<dyn BinderClient>>,
53 index: AtomicUsize,
54}
55
56impl MultiBinderClient {
57 pub fn empty() -> Self {
59 MultiBinderClient {
60 clients: Vec::new(),
61 index: AtomicUsize::new(0),
62 }
63 }
64
65 pub fn add_client(mut self, new: impl BinderClient + 'static) -> Self {
67 self.clients.push(Arc::new(new));
68 self.clients.shuffle(&mut rand::thread_rng());
69 self
70 }
71}
72
73impl MultiBinderClient {
74 async fn request_one(&self, request: BinderRequestData) -> BinderResult<BinderResponse> {
76 let mut timeout = Duration::from_secs(3);
77 loop {
78 let curr_idx = self.index.fetch_add(1, Ordering::Relaxed);
79 let client = &self.clients[curr_idx % self.clients.len()];
80 log::trace!("request_one started");
81 let res = client.request(request.clone()).timeout(timeout).await;
82 if let Some(Ok(res)) = res {
83 log::trace!("request_one succeeded");
84 self.index.fetch_sub(1, Ordering::Relaxed);
85 return Ok(res);
86 } else if let Some(Err(BinderError::Network(other))) = res {
87 log::warn!("MultiBinderClient switching backend due to error {}", other);
88 } else if let Some(e) = res {
89 return e;
90 } else {
91 log::warn!(
92 "MultiBinderClient switching backend due to timeout {:?}",
93 timeout
94 );
95 timeout += Duration::from_secs(1);
96 }
97 }
98 }
99
100 async fn request_multi(&self, request: BinderRequestData) -> BinderResult<BinderResponse> {
102 let (send_res, recv_res) = smol::channel::unbounded();
103 let mut _tasks = vec![];
104 for (idx, client) in self.clients.iter().enumerate() {
105 let client = client.clone();
106 let request = request.clone();
107 let send_res = send_res.clone();
108 _tasks.push(smolscale::spawn(async move {
109 let _ = send_res.send((idx, client.request(request).await)).await;
110 }));
111 }
112 drop(send_res);
113
114 let mut final_res = None;
115 for _ in 0.._tasks.len() {
116 let (idx, res) = recv_res
117 .recv()
118 .await
119 .expect("result channel shouldn't have closed");
120 self.index.store(idx, Ordering::Relaxed);
121 final_res = Some(res);
122 if final_res.as_ref().unwrap().is_ok() {
123 break;
124 } else {
125 log::warn!("request_multi failed: {:?}", final_res);
126 }
127 }
128 final_res.unwrap()
129 }
130}
131
132#[async_trait::async_trait]
133impl BinderClient for MultiBinderClient {
134 async fn request(&self, request: BinderRequestData) -> BinderResult<BinderResponse> {
135 if request.is_idempotent() {
136 self.request_multi(request).await
137 } else {
138 self.request_one(request).await
139 }
140 }
141}