simperby_network/dms/rpc.rs

use super::*;
use crate::keys;
use simperby_core::utils::get_timestamp;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PingResponse {
    pub public_key: PublicKey,
    pub timestamp: Timestamp,
    pub msg: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PeerStatus {
    pub public_key: PublicKey,
    pub address: std::net::SocketAddr,
    pub last_ping: String,
    pub last_observed_timestamp: Timestamp,
    pub last_claimed_local_timestamp: Timestamp,
    pub last_msg: String,
}

/// The interface that will be wrapped into an HTTP RPC server for the peers.
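///
/// A client-side call sketch (`ip` and `port` are placeholders; the `/dms` route and
/// the stub construction mirror the ones used in `fetch` below):
///
/// ```ignore
/// let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
///     format!("{}:{}/dms", ip, port),
///     reqwest::Client::new(),
/// )));
/// // The outer `Result` is the transport error; the inner one comes from the remote handler.
/// let packets = stub.request_packets().await;
/// ```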
#[serde_tc_full]
pub(super) trait DistributedMessageSetRpcInterface: Send + Sync + 'static {
    /// Requests the peer to respond with the packets it currently holds.
    async fn request_packets(&self) -> Result<Vec<Packet>, String>;

    /// Sends packets to the peer.
    async fn send_packets(&self, packets: Vec<Packet>) -> Result<(), String>;

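    /// Checks liveness of the peer; the response carries the peer's public key,
    /// its local timestamp, and a short message.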
    async fn ping(&self) -> Result<PingResponse, String>;
}

pub(super) struct DmsWrapper<S: Storage, M: DmsMessage> {
    #[allow(clippy::type_complexity)]
    /// This is an `Option` so that the DMS can be dropped explicitly;
    /// otherwise the RPC server (`axum`) could keep it alive forever.
    pub(super) dms: Arc<parking_lot::RwLock<Option<Arc<RwLock<DistributedMessageSet<S, M>>>>>>,
}
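// A release sketch (assumed usage; the actual server wiring lives outside this file):
// the owner of a `DmsWrapper` clears the inner `Option` so that the DMS is dropped
// even while `axum` keeps the wrapper itself alive, e.g.
//
//     *wrapper.dms.write() = None; // `wrapper: DmsWrapper<S, M>` is a hypothetical handle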

/// Server-side implementation of the RPC interface.
#[async_trait]
impl<S: Storage, M: DmsMessage> DistributedMessageSetRpcInterface for DmsWrapper<S, M> {
    async fn request_packets(&self) -> Result<Vec<Packet>, String> {
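        // Clone the inner `Arc` out of the synchronous `parking_lot` lock so that
        // its guard is not held across the `.await` points below.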
        let dms = Arc::clone(
            self.dms
                .read()
                .as_ref()
                .ok_or_else(|| "server terminated".to_owned())?,
        );
        let packets = dms
            .read()
            .await
            .retrieve_packets()
            .await
            .map_err(|e| e.to_string())?;
        Ok(packets)
    }

    async fn send_packets(&self, packets: Vec<Packet>) -> Result<(), String> {
        let dms = Arc::clone(
            self.dms
                .read()
                .as_ref()
                .ok_or_else(|| "server terminated".to_owned())?,
        );
        for packet in packets {
            dms.write()
                .await
                .receive_packet(packet)
                .await
                .map_err(|e| e.to_string())?;
        }
        Ok(())
    }

    async fn ping(&self) -> Result<PingResponse, String> {
        let dms = Arc::clone(
            self.dms
                .read()
                .as_ref()
                .ok_or_else(|| "server terminated".to_owned())?,
        );
        let public_key = dms.read().await.private_key.public_key();
        Ok(PingResponse {
            public_key,
            timestamp: get_timestamp(),
            msg: "hello?".to_string(),
        })
    }
}

impl<S: Storage, M: DmsMessage> DistributedMessageSet<S, M> {
    /// Fetches unknown messages from the peers using an RPC protocol,
    /// and adds them to the local storage.
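    ///
    /// A minimal call sketch, assuming the caller already holds
    /// `dms: Arc<RwLock<DistributedMessageSet<S, M>>>` and a `ClientNetworkConfig`
    /// (both names are placeholders):
    ///
    /// ```ignore
    /// DistributedMessageSet::fetch(Arc::clone(&dms), &network_config).await?;
    /// ```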
    pub async fn fetch(
        this: Arc<RwLock<Self>>,
        network_config: &ClientNetworkConfig,
    ) -> Result<(), Error> {
        let mut tasks = Vec::new();
        for peer in &network_config.peers {
            let this_ = Arc::clone(&this);
            let task = async move {
                let this_read = this_.read().await;
                let port_key = keys::port_key_dms::<M>();
                let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
                    format!(
                        "{}:{}/dms",
                        peer.address.ip(),
                        peer.ports
                            .get(&port_key)
                            .ok_or_else(|| eyre!("can't find port key: {}", port_key))?
                    ),
                    reqwest::Client::new(),
                )));
                let packets = stub
                    .request_packets()
                    .await
                    .map_err(|e| eyre!("{}", e))?
                    .map_err(|e| eyre!(e))?;
                // Important: drop the lock before `write()`
                drop(this_read);
                for packet in packets {
                    this_.write().await.receive_packet(packet).await?;
                }
                Result::<(), Error>::Ok(())
            };
            tasks.push(task);
        }
        let results = future::join_all(tasks).await;
        for (result, peer) in results.into_iter().zip(network_config.peers.iter()) {
            if let Err(e) = result {
                log::warn!("failed to fetch from peer {:?}: {}", peer, e);
            }
        }
        Ok(())
    }

    /// Tries to broadcast all the messages that this DMS instance has.
    ///
    /// Note: this function could take just `&self` given its simple implementation,
    /// but it keeps `Arc<RwLock<Self>>` in its signature to indicate that it is a
    /// network-involved method (unlike the others).
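    ///
    /// A minimal call sketch, under the same assumptions as `fetch`
    /// (`dms` and `network_config` are placeholders owned by the caller):
    ///
    /// ```ignore
    /// DistributedMessageSet::broadcast(Arc::clone(&dms), &network_config).await?;
    /// ```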
    pub async fn broadcast(
        this: Arc<RwLock<Self>>,
        network_config: &ClientNetworkConfig,
    ) -> Result<(), Error> {
        let mut tasks_and_messages = Vec::new();

        let packets = this.read().await.retrieve_packets().await?;
        if packets.is_empty() {
            return Ok(());
        }
        for peer in &network_config.peers {
            let port_key = keys::port_key_dms::<M>();
            let packets_ = packets.clone();
            let task = async move {
                let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
                    format!(
                        "{}:{}/dms",
                        peer.address.ip(),
                        peer.ports
                            .get(&port_key)
                            .ok_or_else(|| eyre!("can't find port key: {}", port_key))?
                    ),
                    reqwest::Client::new(),
                )));
                stub.send_packets(packets_.clone())
                    .await
                    .map_err(|e| eyre!(e))?
                    .map_err(|e| eyre!(e))?;
                Result::<(), Error>::Ok(())
            };
            tasks_and_messages.push((task, format!("sending packets to {}", peer.public_key)));
        }
        let (tasks, messages) = tasks_and_messages
            .into_iter()
            .unzip::<_, _, Vec<_>, Vec<_>>();

        let results = future::join_all(tasks).await;
        for (result, msg) in results.into_iter().zip(messages.iter()) {
            if let Err(e) = result {
                log::warn!("failure in {}: {}", msg, e);
            }
        }
        Ok(())
    }

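    /// Pings every peer in the network config and reports a [`PeerStatus`] for each,
    /// verifying that the public key each peer claims matches the configured one.
    ///
    /// The timestamp and message fields of [`PeerStatus`] are not filled in yet
    /// (see the `TODO`s below).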
    pub async fn get_peer_status(
        this: Arc<RwLock<Self>>,
        network_config: &ClientNetworkConfig,
    ) -> Result<Vec<PeerStatus>, Error> {
        let mut tasks = Vec::new();
        for peer in &network_config.peers {
            let this_ = Arc::clone(&this);
            let task = async move {
                let this_read = this_.read().await;
                let port_key = keys::port_key_dms::<M>();
                let stub = DistributedMessageSetRpcInterfaceStub::new(Box::new(HttpClient::new(
                    format!(
                        "{}:{}/dms",
                        peer.address.ip(),
                        peer.ports
                            .get(&port_key)
                            .ok_or_else(|| eyre!("can't find port key: {}", port_key))?
                    ),
                    reqwest::Client::new(),
                )));
                let ping_response = stub
                    .ping()
                    .await
                    .map_err(|e| eyre!("{}", e))?
                    .map_err(|e| eyre!(e))?;
                // Unlike in `fetch`, no `write()` follows here; just release the read lock.
                drop(this_read);

                if peer.public_key != ping_response.public_key {
                    return Err(eyre!(
                        "peer public key mismatch: expected {}, got {}",
                        peer.public_key,
                        ping_response.public_key
                    ));
                }
                Result::<(), Error>::Ok(())
            };
            tasks.push(task);
        }
        let results = future::join_all(tasks).await;
        let mut final_results = Vec::new();
        let port_key = keys::port_key_dms::<M>();

        for (result, peer) in results.into_iter().zip(network_config.peers.iter()) {
            let ping = if let Err(e) = result {
                log::warn!("failed to ping peer {:?}: {}", peer, e);
                format!("failed: {}", e)
            } else {
                "success".to_owned()
            };

            let port = peer
                .ports
                .get(&port_key)
                .ok_or_else(|| eyre!("can't find port key: {}", port_key))?;

            final_results.push(PeerStatus {
                public_key: peer.public_key.clone(),
                address: format!("{}:{}", peer.address.ip(), port)
                    .parse()
                    .expect("valid address"),
                last_ping: ping,
                last_observed_timestamp: 0,      // TODO
                last_claimed_local_timestamp: 0, // TODO
                last_msg: "todo".to_owned(),
            });
        }
        Ok(final_results)
    }
}