simperby_network/
peers.rs

1use super::*;
2use async_trait::async_trait;
3use eyre::{eyre, Result};
4use serde_tc::http::*;
5use serde_tc::{serde_tc_full, StubCall};
6use simperby_core::serde_spb;
7use simperby_core::BlockHeader;
8use simperby_core::FinalizationInfo;
9use std::collections::BTreeMap;
10use std::sync::Arc;
11use tokio::fs::File;
12use tokio::io::AsyncReadExt;
13use tokio::io::AsyncWriteExt;
14use tokio::sync::RwLock;
15
/// File-backed store for the list of known peers.
///
/// The whole list is kept in a single file and (de)serialized with `serde_spb`.
#[derive(Debug)]
struct PeerStorage {
    /// Location of the file that holds the serialized peer list.
    path: String,
}
20
21impl PeerStorage {
22    pub async fn new(path: &str) -> Result<Self> {
23        Ok(Self {
24            path: path.to_owned(),
25        })
26    }
27
28    pub async fn write(&mut self, peers: Vec<Peer>) -> Result<()> {
29        let _ = tokio::fs::remove_file(&self.path).await;
30        let mut file = File::create(&self.path).await?;
31        file.write_all(serde_spb::to_string(&peers)?.as_bytes())
32            .await?;
33        file.flush().await?;
34        Ok(())
35    }
36
37    pub async fn read(&self) -> Result<Vec<Peer>> {
38        let mut file = File::open(&self.path).await?;
39        let peers: Vec<Peer> = serde_spb::from_str(&{
40            let mut buf = String::new();
41            file.read_to_string(&mut buf).await?;
42            buf
43        })?;
44        Ok(peers)
45    }
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
49pub struct PingResponse {
50    pub last_finalized_block_header: BlockHeader,
51    pub public_key: PublicKey,
52    pub timestamp: Timestamp,
53    pub msg: String,
54}
55
56#[serde_tc_full]
57pub(super) trait PeerRpcInterface: Send + Sync + 'static {
58    /// Requests to response some packets.
59    async fn ping(&self) -> Result<PingResponse, String>;
60    /// Requests to response the port map of this node.
61    async fn port_map(&self) -> Result<BTreeMap<String, u16>, String>;
62}
63
64pub struct PeerRpcImpl {
65    peers: Arc<RwLock<Peers>>,
66    port_map: BTreeMap<String, u16>,
67}
68
69/// Server-side implementation of the RPC interface.
70#[async_trait]
71impl PeerRpcInterface for PeerRpcImpl {
72    async fn ping(&self) -> Result<PingResponse, String> {
73        let peers = self.peers.read().await;
74        Ok(PingResponse {
75            public_key: peers.private_key.public_key(),
76            timestamp: simperby_core::utils::get_timestamp(),
77            msg: "hello?".to_string(),
78            last_finalized_block_header: peers.lfi.header.clone(),
79        })
80    }
81
82    async fn port_map(&self) -> Result<BTreeMap<String, u16>, String> {
83        Ok(self.port_map.clone())
84    }
85}
86
87#[derive(Debug)]
88pub struct Peers {
89    storage: PeerStorage,
90    lfi: FinalizationInfo,
91    private_key: PrivateKey,
92}
93
94impl Peers {
95    pub async fn new(path: &str, lfi: FinalizationInfo, private_key: PrivateKey) -> Result<Self> {
96        let storage = PeerStorage::new(path).await?;
97        Ok(Self {
98            storage,
99            lfi,
100            private_key,
101        })
102    }
103
104    pub async fn update_block(&mut self, lfi: FinalizationInfo) -> Result<()> {
105        let peers = self.storage.read().await?;
106        self.storage.write(vec![]).await?;
107        for peer in peers {
108            self.add_peer(peer.name, peer.address).await?;
109        }
110        self.lfi = lfi;
111        Ok(())
112    }
113
114    /// Adds a peer to the list of known peers. This will try to connect to the peer and ask information.
115    ///
116    /// - `name` - the name of the peer as it is known in the reserved state.
117    /// - `addr` - the address of the peer. The port must be the one of the peer discovery RPC.
118    pub async fn add_peer(&mut self, name: MemberName, addr: SocketAddrV4) -> Result<()> {
119        let peer = Peer {
120            public_key: self
121                .lfi
122                .reserved_state
123                .query_public_key(&name)
124                .ok_or_else(|| eyre!("peer does not exist: {}", name))?,
125            name,
126            address: addr,
127            ports: Default::default(),
128            message: "".to_owned(),
129            recently_seen_timestamp: 0,
130        };
131        let mut peers = self.storage.read().await?;
132        peers.push(peer);
133        self.storage.write(peers).await?;
134        Ok(())
135    }
136
137    /// Removes a peer in the list of known peers.
138    pub async fn remove_peer(&mut self, name: MemberName) -> Result<()> {
139        let mut peers = self.storage.read().await?;
140        let index = peers
141            .iter()
142            .position(|peer| peer.name == name)
143            .ok_or_else(|| eyre!("peer does not exist: {}", name))?;
144        peers.remove(index);
145        self.storage.write(peers).await?;
146        Ok(())
147    }
148
149    /// Performs the actual peer update (including discovery) and applies to the storage.
150    pub async fn update(&mut self) -> Result<()> {
151        let peers = self.storage.read().await?;
152        let mut new_peers = Vec::new();
153
154        for peer in peers {
155            let stub = PeerRpcInterfaceStub::new(Box::new(HttpClient::new(
156                format!("{}:{}/peer", peer.address.ip(), peer.address.port()),
157                reqwest::Client::new(),
158            )));
159            stub.ping()
160                .await
161                .map_err(|e| eyre!("failed to ping peer {}: {}", peer.name, e))?
162                .map_err(|e| eyre!("failed to ping peer {}: {}", peer.name, e))?;
163            let ports = stub
164                .port_map()
165                .await
166                .map_err(|e| eyre!("failed to get port map {}: {}", peer.name, e))?
167                .map_err(|e| eyre!("failed to get port map {}: {}", peer.name, e))?;
168
169            let mut new_peer = peer.clone();
170            new_peer.ports = ports;
171            new_peers.push(new_peer);
172        }
173        self.storage.write(new_peers).await?;
174        Ok(())
175    }
176
177    pub async fn list_peers(&self) -> Result<Vec<Peer>> {
178        self.storage.read().await
179    }
180
181    pub async fn serve(
182        this: Arc<RwLock<Peers>>,
183        port_map: BTreeMap<String, u16>,
184        server_network_config: ServerNetworkConfig,
185    ) -> Result<(), Error> {
186        run_server(
187            server_network_config.port,
188            [(
189                "peer".to_owned(),
190                create_http_object(Arc::new(PeerRpcImpl {
191                    peers: Arc::clone(&this),
192                    port_map,
193                }) as Arc<dyn PeerRpcInterface>),
194            )]
195            .iter()
196            .cloned()
197            .collect(),
198        )
199        .await;
200        std::future::pending::<()>().await;
201        Ok(())
202    }
203}