simperby_network/
peers.rs1use super::*;
2use async_trait::async_trait;
3use eyre::{eyre, Result};
4use serde_tc::http::*;
5use serde_tc::{serde_tc_full, StubCall};
6use simperby_core::serde_spb;
7use simperby_core::BlockHeader;
8use simperby_core::FinalizationInfo;
9use std::collections::BTreeMap;
10use std::sync::Arc;
11use tokio::fs::File;
12use tokio::io::AsyncReadExt;
13use tokio::io::AsyncWriteExt;
14use tokio::sync::RwLock;
15
16#[derive(Debug)]
17struct PeerStorage {
18 path: String,
19}
20
21impl PeerStorage {
22 pub async fn new(path: &str) -> Result<Self> {
23 Ok(Self {
24 path: path.to_owned(),
25 })
26 }
27
28 pub async fn write(&mut self, peers: Vec<Peer>) -> Result<()> {
29 let _ = tokio::fs::remove_file(&self.path).await;
30 let mut file = File::create(&self.path).await?;
31 file.write_all(serde_spb::to_string(&peers)?.as_bytes())
32 .await?;
33 file.flush().await?;
34 Ok(())
35 }
36
37 pub async fn read(&self) -> Result<Vec<Peer>> {
38 let mut file = File::open(&self.path).await?;
39 let peers: Vec<Peer> = serde_spb::from_str(&{
40 let mut buf = String::new();
41 file.read_to_string(&mut buf).await?;
42 buf
43 })?;
44 Ok(peers)
45 }
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
49pub struct PingResponse {
50 pub last_finalized_block_header: BlockHeader,
51 pub public_key: PublicKey,
52 pub timestamp: Timestamp,
53 pub msg: String,
54}
55
56#[serde_tc_full]
57pub(super) trait PeerRpcInterface: Send + Sync + 'static {
58 async fn ping(&self) -> Result<PingResponse, String>;
60 async fn port_map(&self) -> Result<BTreeMap<String, u16>, String>;
62}
63
64pub struct PeerRpcImpl {
65 peers: Arc<RwLock<Peers>>,
66 port_map: BTreeMap<String, u16>,
67}
68
69#[async_trait]
71impl PeerRpcInterface for PeerRpcImpl {
72 async fn ping(&self) -> Result<PingResponse, String> {
73 let peers = self.peers.read().await;
74 Ok(PingResponse {
75 public_key: peers.private_key.public_key(),
76 timestamp: simperby_core::utils::get_timestamp(),
77 msg: "hello?".to_string(),
78 last_finalized_block_header: peers.lfi.header.clone(),
79 })
80 }
81
82 async fn port_map(&self) -> Result<BTreeMap<String, u16>, String> {
83 Ok(self.port_map.clone())
84 }
85}
86
87#[derive(Debug)]
88pub struct Peers {
89 storage: PeerStorage,
90 lfi: FinalizationInfo,
91 private_key: PrivateKey,
92}
93
94impl Peers {
95 pub async fn new(path: &str, lfi: FinalizationInfo, private_key: PrivateKey) -> Result<Self> {
96 let storage = PeerStorage::new(path).await?;
97 Ok(Self {
98 storage,
99 lfi,
100 private_key,
101 })
102 }
103
104 pub async fn update_block(&mut self, lfi: FinalizationInfo) -> Result<()> {
105 let peers = self.storage.read().await?;
106 self.storage.write(vec![]).await?;
107 for peer in peers {
108 self.add_peer(peer.name, peer.address).await?;
109 }
110 self.lfi = lfi;
111 Ok(())
112 }
113
114 pub async fn add_peer(&mut self, name: MemberName, addr: SocketAddrV4) -> Result<()> {
119 let peer = Peer {
120 public_key: self
121 .lfi
122 .reserved_state
123 .query_public_key(&name)
124 .ok_or_else(|| eyre!("peer does not exist: {}", name))?,
125 name,
126 address: addr,
127 ports: Default::default(),
128 message: "".to_owned(),
129 recently_seen_timestamp: 0,
130 };
131 let mut peers = self.storage.read().await?;
132 peers.push(peer);
133 self.storage.write(peers).await?;
134 Ok(())
135 }
136
137 pub async fn remove_peer(&mut self, name: MemberName) -> Result<()> {
139 let mut peers = self.storage.read().await?;
140 let index = peers
141 .iter()
142 .position(|peer| peer.name == name)
143 .ok_or_else(|| eyre!("peer does not exist: {}", name))?;
144 peers.remove(index);
145 self.storage.write(peers).await?;
146 Ok(())
147 }
148
149 pub async fn update(&mut self) -> Result<()> {
151 let peers = self.storage.read().await?;
152 let mut new_peers = Vec::new();
153
154 for peer in peers {
155 let stub = PeerRpcInterfaceStub::new(Box::new(HttpClient::new(
156 format!("{}:{}/peer", peer.address.ip(), peer.address.port()),
157 reqwest::Client::new(),
158 )));
159 stub.ping()
160 .await
161 .map_err(|e| eyre!("failed to ping peer {}: {}", peer.name, e))?
162 .map_err(|e| eyre!("failed to ping peer {}: {}", peer.name, e))?;
163 let ports = stub
164 .port_map()
165 .await
166 .map_err(|e| eyre!("failed to get port map {}: {}", peer.name, e))?
167 .map_err(|e| eyre!("failed to get port map {}: {}", peer.name, e))?;
168
169 let mut new_peer = peer.clone();
170 new_peer.ports = ports;
171 new_peers.push(new_peer);
172 }
173 self.storage.write(new_peers).await?;
174 Ok(())
175 }
176
177 pub async fn list_peers(&self) -> Result<Vec<Peer>> {
178 self.storage.read().await
179 }
180
181 pub async fn serve(
182 this: Arc<RwLock<Peers>>,
183 port_map: BTreeMap<String, u16>,
184 server_network_config: ServerNetworkConfig,
185 ) -> Result<(), Error> {
186 run_server(
187 server_network_config.port,
188 [(
189 "peer".to_owned(),
190 create_http_object(Arc::new(PeerRpcImpl {
191 peers: Arc::clone(&this),
192 port_map,
193 }) as Arc<dyn PeerRpcInterface>),
194 )]
195 .iter()
196 .cloned()
197 .collect(),
198 )
199 .await;
200 std::future::pending::<()>().await;
201 Ok(())
202 }
203}