1mod types;
5pub use types::*;
6
7use std::any::Any;
8use std::num::NonZeroU64;
9use std::str::FromStr;
10use std::sync::{Arc, OnceLock};
11use std::time::Instant;
12
13use crate::libp2p::chain_exchange::TipsetBundle;
14use crate::libp2p::{NetRPCMethods, NetworkMessage, PeerId};
15use crate::rpc::types::ApiTipsetKey;
16use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
17use anyhow::{Context as _, Result};
18use cid::multibase;
19use enumflags2::BitFlags;
20use fvm_ipld_blockstore::Blockstore;
21use itertools::Itertools as _;
22
23pub enum NetAddrsListen {}
24impl RpcMethod<0> for NetAddrsListen {
25 const NAME: &'static str = "Filecoin.NetAddrsListen";
26 const PARAM_NAMES: [&'static str; 0] = [];
27 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
28 const PERMISSION: Permission = Permission::Read;
29 const DESCRIPTION: Option<&'static str> =
30 Some("Returns a list of listening addresses and the peer ID.");
31
32 type Params = ();
33 type Ok = AddrInfo;
34
35 async fn handle(
36 ctx: Ctx<impl Blockstore>,
37 (): Self::Params,
38 _: &http::Extensions,
39 ) -> Result<Self::Ok, ServerError> {
40 let (tx, rx) = flume::bounded(1);
41 let req = NetworkMessage::JSONRPCRequest {
42 method: NetRPCMethods::AddrsListen(tx),
43 };
44
45 ctx.network_send().send_async(req).await?;
46 let (id, addrs) = rx.recv_async().await?;
47
48 Ok(AddrInfo::new(id, addrs))
49 }
50}
51
52pub enum NetPeers {}
53impl RpcMethod<0> for NetPeers {
54 const NAME: &'static str = "Filecoin.NetPeers";
55 const PARAM_NAMES: [&'static str; 0] = [];
56 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
57 const PERMISSION: Permission = Permission::Read;
58 const DESCRIPTION: Option<&'static str> = Some("Returns a list of currently connected peers.");
59
60 type Params = ();
61 type Ok = Vec<AddrInfo>;
62
63 async fn handle(
64 ctx: Ctx<impl Blockstore>,
65 (): Self::Params,
66 _: &http::Extensions,
67 ) -> Result<Self::Ok, ServerError> {
68 let (tx, rx) = flume::bounded(1);
69 let req = NetworkMessage::JSONRPCRequest {
70 method: NetRPCMethods::Peers(tx),
71 };
72
73 ctx.network_send().send_async(req).await?;
74 let peer_addresses = rx.recv_async().await?;
75
76 let connections = peer_addresses
77 .into_iter()
78 .map(|(id, addrs)| AddrInfo::new(id, addrs))
79 .collect();
80
81 Ok(connections)
82 }
83}
84
85pub enum NetFindPeer {}
86impl RpcMethod<1> for NetFindPeer {
87 const NAME: &'static str = "Filecoin.NetFindPeer";
88 const PARAM_NAMES: [&'static str; 1] = ["peer_id"];
89 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
90 const PERMISSION: Permission = Permission::Read;
91
92 type Params = (String,);
93 type Ok = AddrInfo;
94
95 async fn handle(
96 ctx: Ctx<impl Blockstore>,
97 (peer_id,): Self::Params,
98 _: &http::Extensions,
99 ) -> Result<Self::Ok, ServerError> {
100 let peer_id = PeerId::from_str(&peer_id)?;
101 let (tx, rx) = flume::bounded(1);
102 ctx.network_send()
103 .send_async(NetworkMessage::JSONRPCRequest {
104 method: NetRPCMethods::Peer(tx, peer_id),
105 })
106 .await?;
107 let addrs = rx
108 .recv_async()
109 .await?
110 .with_context(|| format!("peer {peer_id} not found"))?;
111 Ok(AddrInfo::new(peer_id, addrs))
112 }
113}
114
115pub enum NetListening {}
116impl RpcMethod<0> for NetListening {
117 const NAME: &'static str = "Filecoin.NetListening";
118 const PARAM_NAMES: [&'static str; 0] = [];
119 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
120 const PERMISSION: Permission = Permission::Read;
121 const NAME_ALIAS: Option<&'static str> = Some("net_listening");
122
123 type Params = ();
124 type Ok = bool;
125
126 async fn handle(
127 _: Ctx<impl Any>,
128 (): Self::Params,
129 _: &http::Extensions,
130 ) -> Result<Self::Ok, ServerError> {
131 Ok(true)
132 }
133}
134
135pub enum NetInfo {}
136impl RpcMethod<0> for NetInfo {
137 const NAME: &'static str = "Forest.NetInfo";
138 const PARAM_NAMES: [&'static str; 0] = [];
139 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
140 const PERMISSION: Permission = Permission::Read;
141
142 type Params = ();
143 type Ok = NetInfoResult;
144
145 async fn handle(
146 ctx: Ctx<impl Blockstore>,
147 (): Self::Params,
148 _: &http::Extensions,
149 ) -> Result<Self::Ok, ServerError> {
150 let (tx, rx) = flume::bounded(1);
151 let req = NetworkMessage::JSONRPCRequest {
152 method: NetRPCMethods::Info(tx),
153 };
154
155 ctx.network_send().send_async(req).await?;
156 Ok(rx.recv_async().await?)
157 }
158}
159
160pub enum NetConnect {}
161impl RpcMethod<1> for NetConnect {
162 const NAME: &'static str = "Filecoin.NetConnect";
163 const PARAM_NAMES: [&'static str; 1] = ["peerAddressInfo"];
164 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
165 const PERMISSION: Permission = Permission::Write;
166 const DESCRIPTION: Option<&'static str> = Some("Connects to a specified peer.");
167
168 type Params = (AddrInfo,);
169 type Ok = ();
170
171 async fn handle(
172 ctx: Ctx<impl Blockstore>,
173 (AddrInfo { id, addrs },): Self::Params,
174 _: &http::Extensions,
175 ) -> Result<Self::Ok, ServerError> {
176 let (_, id) = multibase::decode(format!("{}{}", "z", id))?;
177 let peer_id = PeerId::from_bytes(&id)?;
178
179 let (tx, rx) = flume::bounded(1);
180 let req = NetworkMessage::JSONRPCRequest {
181 method: NetRPCMethods::Connect(tx, peer_id, addrs),
182 };
183
184 ctx.network_send().send_async(req).await?;
185 let success = rx.recv_async().await?;
186
187 if success {
188 Ok(())
189 } else {
190 Err(anyhow::anyhow!("Peer could not be dialed from any address provided").into())
191 }
192 }
193}
194
195pub enum NetDisconnect {}
196impl RpcMethod<1> for NetDisconnect {
197 const NAME: &'static str = "Filecoin.NetDisconnect";
198 const PARAM_NAMES: [&'static str; 1] = ["peerId"];
199 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
200 const PERMISSION: Permission = Permission::Write;
201 const DESCRIPTION: Option<&'static str> = Some("Disconnects from the specified peer.");
202
203 type Params = (String,);
204 type Ok = ();
205
206 async fn handle(
207 ctx: Ctx<impl Blockstore>,
208 (peer_id,): Self::Params,
209 _: &http::Extensions,
210 ) -> Result<Self::Ok, ServerError> {
211 let peer_id = PeerId::from_str(&peer_id)?;
212
213 let (tx, rx) = flume::bounded(1);
214 let req = NetworkMessage::JSONRPCRequest {
215 method: NetRPCMethods::Disconnect(tx, peer_id),
216 };
217
218 ctx.network_send().send_async(req).await?;
219 rx.recv_async().await?;
220
221 Ok(())
222 }
223}
224
225pub enum NetAgentVersion {}
226impl RpcMethod<1> for NetAgentVersion {
227 const NAME: &'static str = "Filecoin.NetAgentVersion";
228 const PARAM_NAMES: [&'static str; 1] = ["peerId"];
229 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
230 const PERMISSION: Permission = Permission::Read;
231 const DESCRIPTION: Option<&'static str> = Some("Returns the agent version string.");
232
233 type Params = (String,);
234 type Ok = String;
235
236 async fn handle(
237 ctx: Ctx<impl Blockstore>,
238 (peer_id,): Self::Params,
239 _: &http::Extensions,
240 ) -> Result<Self::Ok, ServerError> {
241 let peer_id = PeerId::from_str(&peer_id)?;
242 let (tx, rx) = flume::bounded(1);
243 ctx.network_send()
244 .send_async(NetworkMessage::JSONRPCRequest {
245 method: NetRPCMethods::AgentVersion(tx, peer_id),
246 })
247 .await?;
248 Ok(rx.recv_async().await?.context("item not found")?)
249 }
250}
251
252pub enum NetAutoNatStatus {}
253impl RpcMethod<0> for NetAutoNatStatus {
254 const NAME: &'static str = "Filecoin.NetAutoNatStatus";
255 const PARAM_NAMES: [&'static str; 0] = [];
256 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
257 const PERMISSION: Permission = Permission::Read;
258
259 type Params = ();
260 type Ok = NatStatusResult;
261
262 async fn handle(
263 ctx: Ctx<impl Blockstore>,
264 (): Self::Params,
265 _: &http::Extensions,
266 ) -> Result<Self::Ok, ServerError> {
267 let (tx, rx) = flume::bounded(1);
268 let req = NetworkMessage::JSONRPCRequest {
269 method: NetRPCMethods::AutoNATStatus(tx),
270 };
271 ctx.network_send().send_async(req).await?;
272 let nat_status = rx.recv_async().await?;
273 Ok(nat_status.into())
274 }
275}
276
277pub enum NetVersion {}
278impl RpcMethod<0> for NetVersion {
279 const NAME: &'static str = "Filecoin.NetVersion";
280 const PARAM_NAMES: [&'static str; 0] = [];
281 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
282 const PERMISSION: Permission = Permission::Read;
283 const NAME_ALIAS: Option<&'static str> = Some("net_version");
284
285 type Params = ();
286 type Ok = Arc<str>;
287
288 async fn handle(
289 ctx: Ctx<impl Blockstore>,
290 (): Self::Params,
291 _: &http::Extensions,
292 ) -> Result<Self::Ok, ServerError> {
293 static CACHED: OnceLock<Arc<str>> = OnceLock::new();
295 Ok(CACHED
296 .get_or_init(|| Arc::<str>::from(ctx.chain_config().eth_chain_id.to_string()))
297 .clone())
298 }
299}
300
301pub enum NetProtectAdd {}
302impl RpcMethod<1> for NetProtectAdd {
303 const NAME: &'static str = "Filecoin.NetProtectAdd";
304 const PARAM_NAMES: [&'static str; 1] = ["peerIdList"];
305 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
306 const PERMISSION: Permission = Permission::Admin;
307 const DESCRIPTION: Option<&'static str> = Some(
308 "Protects a peer from having its connection(s) pruned in the event the libp2p host reaches its maximum number of peers.",
309 );
310
311 type Params = (Vec<String>,);
312 type Ok = ();
313
314 async fn handle(
319 ctx: Ctx<impl Blockstore>,
320 (peer_ids,): Self::Params,
321 _: &http::Extensions,
322 ) -> Result<Self::Ok, ServerError> {
323 let peer_ids = peer_ids
324 .iter()
325 .map(String::as_str)
326 .map(PeerId::from_str)
327 .try_collect()?;
328 let (tx, rx) = flume::bounded(1);
329 ctx.network_send()
330 .send_async(NetworkMessage::JSONRPCRequest {
331 method: NetRPCMethods::ProtectPeer(tx, peer_ids),
332 })
333 .await?;
334 rx.recv_async().await?;
335 Ok(())
336 }
337}
338
339pub enum NetProtectList {}
340impl RpcMethod<0> for NetProtectList {
341 const NAME: &'static str = "Filecoin.NetProtectList";
342 const PARAM_NAMES: [&'static str; 0] = [];
343 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
344 const PERMISSION: Permission = Permission::Read;
345 const DESCRIPTION: Option<&'static str> = Some("Returns the current list of protected peers.");
346
347 type Params = ();
348 type Ok = Vec<String>;
349 async fn handle(
350 ctx: Ctx<impl Blockstore>,
351 (): Self::Params,
352 _: &http::Extensions,
353 ) -> Result<Self::Ok, ServerError> {
354 let (tx, rx) = flume::bounded(1);
355 ctx.network_send()
356 .send_async(NetworkMessage::JSONRPCRequest {
357 method: NetRPCMethods::ListProtectedPeers(tx),
358 })
359 .await?;
360 let peers = rx.recv_async().await?;
361 Ok(peers.into_iter().map(|p| p.to_string()).collect())
362 }
363}
364
365pub enum NetProtectRemove {}
366impl RpcMethod<1> for NetProtectRemove {
367 const NAME: &'static str = "Filecoin.NetProtectRemove";
368 const PARAM_NAMES: [&'static str; 1] = ["peerIdList"];
369 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
370 const PERMISSION: Permission = Permission::Admin;
371 const DESCRIPTION: Option<&'static str> = Some("Remove a peer from the protected list.");
372
373 type Params = (Vec<String>,);
374 type Ok = ();
375
376 async fn handle(
378 ctx: Ctx<impl Blockstore>,
379 (peer_ids,): Self::Params,
380 _: &http::Extensions,
381 ) -> Result<Self::Ok, ServerError> {
382 let peer_ids = peer_ids
383 .iter()
384 .map(String::as_str)
385 .map(PeerId::from_str)
386 .try_collect()?;
387 let (tx, rx) = flume::bounded(1);
388 ctx.network_send()
389 .send_async(NetworkMessage::JSONRPCRequest {
390 method: NetRPCMethods::UnprotectPeer(tx, peer_ids),
391 })
392 .await?;
393 rx.recv_async().await?;
394 Ok(())
395 }
396}
397
398pub enum NetChainExchange {}
399impl RpcMethod<3> for NetChainExchange {
400 const NAME: &'static str = "Forest.NetChainExchange";
401 const PARAM_NAMES: [&'static str; 3] = ["startTipsetKey", "len", "options"];
402 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
403 const PERMISSION: Permission = Permission::Admin;
404 const DESCRIPTION: Option<&'static str> = Some("Internal API for debugging chain exchange.");
405
406 type Params = (ApiTipsetKey, u64, u64);
407 type Ok = String;
408
409 async fn handle(
410 ctx: Ctx<impl Blockstore>,
411 (tsk, request_len, options): Self::Params,
412 _: &http::Extensions,
413 ) -> Result<Self::Ok, ServerError> {
414 let request_len =
415 NonZeroU64::new(request_len).context("request length must be greater than 0")?;
416 let tsk = tsk
417 .0
418 .unwrap_or_else(|| ctx.chain_store().heaviest_tipset().key().clone());
419 let timer = Instant::now();
420 let result: Vec<TipsetBundle> = ctx
421 .sync_network_context
422 .handle_chain_exchange_request(None, &tsk, request_len, options, |_| true)
423 .await?;
424 Ok(format!(
425 "fetched {} tipsets, took {}",
426 result.len(),
427 humantime::format_duration(timer.elapsed())
428 ))
429 }
430}