1use super::*;
2use crate::keys;
3use simperby_core::utils::get_timestamp;
4
/// Reply payload of the `ping` RPC.
///
/// In this file it is produced by the server-side `ping` handler implemented
/// for `DmsWrapper`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PingResponse {
    /// Public key derived from the responding node's DMS private key.
    pub public_key: PublicKey,
    /// Value returned by `get_timestamp()` on the responder when the reply was built.
    pub timestamp: Timestamp,
    /// Free-form message; the handler in this file always sends `"hello?"`.
    pub msg: String,
}
11
/// Per-peer entry of the report returned by `DistributedMessageSet::get_peer_status`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PeerStatus {
    /// Public key the peer is configured with (copied from the network
    /// configuration, not from the ping reply).
    pub public_key: PublicKey,
    /// The peer's IP combined with the port registered under the DMS port key.
    pub address: std::net::SocketAddr,
    /// Outcome of the ping performed by `get_peer_status`: `"success"`, or
    /// `"failed: <error>"` when the ping did not succeed.
    pub last_ping: String,
    /// Not populated yet: `get_peer_status` always writes `0`.
    pub last_observed_timestamp: Timestamp,
    /// Not populated yet: `get_peer_status` always writes `0`.
    pub last_claimed_local_timestamp: Timestamp,
    /// Not populated yet: `get_peer_status` always writes `"todo"`.
    pub last_msg: String,
}
21
/// RPC surface that DMS nodes expose to one another.
///
/// Every method reports failure as a plain `String`.
///
/// NOTE(review): `#[serde_tc_full]` presumably generates the
/// `DistributedMessageSetRpcInterfaceStub` client type used by the client-side
/// functions in this file — confirm against the `serde_tc` documentation.
#[serde_tc_full]
pub(super) trait DistributedMessageSetRpcInterface: Send + Sync + 'static {
    /// Returns the packets the callee is willing to hand out.
    async fn request_packets(&self) -> Result<Vec<Packet>, String>;

    /// Delivers `packets` to the callee.
    async fn send_packets(&self, packets: Vec<Packet>) -> Result<(), String>;

    /// Liveness/identity probe; the callee answers with a `PingResponse`.
    async fn ping(&self) -> Result<PingResponse, String>;
}
33
/// Server-side holder of the DMS instance that backs the RPC handlers.
pub(super) struct DmsWrapper<S: Storage, M: DmsMessage> {
    /// Shared handle to the DMS. While this is `None`, every RPC handler in
    /// this file fails with `"server terminated"`.
    #[allow(clippy::type_complexity)]
    pub(super) dms: Arc<parking_lot::RwLock<Option<Arc<RwLock<DistributedMessageSet<S, M>>>>>>,
}
40
41#[async_trait]
43impl<S: Storage, M: DmsMessage> DistributedMessageSetRpcInterface for DmsWrapper<S, M> {
44 async fn request_packets(&self) -> Result<Vec<Packet>, String> {
45 let dms = Arc::clone(
46 self.dms
47 .read()
48 .as_ref()
49 .ok_or_else(|| "server terminated".to_owned())?,
50 );
51 let packets = dms
52 .read()
53 .await
54 .retrieve_packets()
55 .await
56 .map_err(|e| e.to_string())?;
57 Ok(packets)
58 }
59
60 async fn send_packets(&self, packets: Vec<Packet>) -> Result<(), String> {
61 let dms = Arc::clone(
62 self.dms
63 .read()
64 .as_ref()
65 .ok_or_else(|| "server terminated".to_owned())?,
66 );
67 for packet in packets {
68 dms.write()
69 .await
70 .receive_packet(packet)
71 .await
72 .map_err(|e| e.to_string())?;
73 }
74 Ok(())
75 }
76
77 async fn ping(&self) -> Result<PingResponse, String> {
78 let dms = Arc::clone(
79 self.dms
80 .read()
81 .as_ref()
82 .ok_or_else(|| "server terminated".to_owned())?,
83 );
84 let public_key = dms.read().await.private_key.public_key();
85 Ok(PingResponse {
86 public_key,
87 timestamp: get_timestamp(),
88 msg: "hello?".to_string(),
89 })
90 }
91}
92
93impl<S: Storage, M: DmsMessage> DistributedMessageSet<S, M> {
94 pub async fn fetch(
97 this: Arc<RwLock<Self>>,
98 network_config: &ClientNetworkConfig,
99 ) -> Result<(), Error> {
100 let mut tasks = Vec::new();
101 for peer in &network_config.peers {
102 let this_ = Arc::clone(&this);
103 let task = async move {
104 let this_read = this_.read().await;
105 let port_key = keys::port_key_dms::<M>();
106 let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
107 format!(
108 "{}:{}/dms",
109 peer.address.ip(),
110 peer.ports
111 .get(&port_key)
112 .ok_or_else(|| eyre!("can't find port key: {}", port_key))?
113 ),
114 reqwest::Client::new(),
115 )));
116 let packets = stub
117 .request_packets()
118 .await
119 .map_err(|e| eyre!("{}", e))?
120 .map_err(|e| eyre!(e))?;
121 drop(this_read);
123 for packet in packets {
124 this_.write().await.receive_packet(packet).await?;
125 }
126 Result::<(), Error>::Ok(())
127 };
128 tasks.push(task);
129 }
130 let results = future::join_all(tasks).await;
131 for (result, peer) in results.into_iter().zip(network_config.peers.iter()) {
132 if let Err(e) = result {
133 log::warn!("failed to fetch from client {:?}: {}", peer, e);
134 }
135 }
136 Ok(())
137 }
138
139 pub async fn broadcast(
145 this: Arc<RwLock<Self>>,
146 network_config: &ClientNetworkConfig,
147 ) -> Result<(), Error> {
148 let mut tasks_and_messages = Vec::new();
149
150 let packets = this.read().await.retrieve_packets().await?;
151 if packets.is_empty() {
152 return Ok(());
153 }
154 for peer in &network_config.peers {
155 let port_key = keys::port_key_dms::<M>();
156 let packets_ = packets.clone();
157 let task = async move {
158 let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
159 format!(
160 "{}:{}/dms",
161 peer.address.ip(),
162 peer.ports
163 .get(&port_key)
164 .ok_or_else(|| eyre!("can't find port key: {}", port_key))?
165 ),
166 reqwest::Client::new(),
167 )));
168 stub.send_packets(packets_.clone())
169 .await
170 .map_err(|e| eyre!(e))?
171 .map_err(|e| eyre!(e))?;
172 Result::<(), Error>::Ok(())
173 };
174 tasks_and_messages.push((task, format!("RPC message add to {}", peer.public_key)));
175 }
176 let (tasks, messages) = tasks_and_messages
177 .into_iter()
178 .unzip::<_, _, Vec<_>, Vec<_>>();
179
180 let results = future::join_all(tasks).await;
181 for (result, msg) in results.into_iter().zip(messages.iter()) {
182 if let Err(e) = result {
183 log::warn!("failure in {}: {}", msg, e);
184 }
185 }
186 Ok(())
187 }
188
189 pub async fn get_peer_status(
190 this: Arc<RwLock<Self>>,
191 network_config: &ClientNetworkConfig,
192 ) -> Result<Vec<PeerStatus>, Error> {
193 let mut tasks = Vec::new();
194 for peer in &network_config.peers {
195 let this_ = Arc::clone(&this);
196 let task = async move {
197 let this_read = this_.read().await;
198 let port_key = keys::port_key_dms::<M>();
199 let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
200 format!(
201 "{}:{}/dms",
202 peer.address.ip(),
203 peer.ports
204 .get(&port_key)
205 .ok_or_else(|| eyre!("can't find port key: {}", port_key))?
206 ),
207 reqwest::Client::new(),
208 )));
209 let ping_response = stub
210 .ping()
211 .await
212 .map_err(|e| eyre!("{}", e))?
213 .map_err(|e| eyre!(e))?;
214 drop(this_read);
216
217 if peer.public_key != ping_response.public_key {
218 return Err(eyre!(
219 "peer public key mismatch: expected {}, got {}",
220 peer.public_key,
221 ping_response.public_key
222 ));
223 }
224 Result::<(), Error>::Ok(())
225 };
226 tasks.push(task);
227 }
228 let results = future::join_all(tasks).await;
229 let mut final_results = Vec::new();
230 let port_key = keys::port_key_dms::<M>();
231
232 for (result, peer) in results.into_iter().zip(network_config.peers.iter()) {
233 let ping = if let Err(e) = result {
234 log::warn!("failed to ping from client {:?}: {}", peer, e);
235 format!("failed: {}", e)
236 } else {
237 "success".to_owned()
238 };
239
240 let port = peer
241 .ports
242 .get(&port_key)
243 .ok_or_else(|| eyre!("can't find port key: {}", port_key))?;
244
245 final_results.push(PeerStatus {
246 public_key: peer.public_key.clone(),
247 address: format!("{}:{}", peer.address.ip(), port)
248 .parse()
249 .expect("valid address"),
250 last_ping: ping,
251 last_observed_timestamp: 0, last_claimed_local_timestamp: 0, last_msg: "todo".to_owned(),
254 });
255 }
256 Ok(final_results)
257 }
258}