1mod types;
5
6use crate::blocks::{Block, FullTipset, GossipBlock};
7use crate::libp2p::{IdentTopic, NetworkMessage, PUBSUB_BLOCK_STR};
8use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
9use anyhow::{Context as _, anyhow};
10use cid::Cid;
11use enumflags2::BitFlags;
12use fvm_ipld_blockstore::Blockstore;
13use fvm_ipld_encoding::to_vec;
14pub use types::*;
15
16use crate::chain;
17use crate::chain_sync::{NodeSyncStatus, SyncStatusReport, TipsetValidator};
18
19pub enum SyncCheckBad {}
20impl RpcMethod<1> for SyncCheckBad {
21 const NAME: &'static str = "Filecoin.SyncCheckBad";
22 const PARAM_NAMES: [&'static str; 1] = ["cid"];
23 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
24 const PERMISSION: Permission = Permission::Read;
25
26 type Params = (Cid,);
27 type Ok = String;
28
29 async fn handle(
30 ctx: Ctx<impl Blockstore>,
31 (cid,): Self::Params,
32 _: &http::Extensions,
33 ) -> Result<Self::Ok, ServerError> {
34 Ok(ctx
35 .bad_blocks
36 .as_ref()
37 .context("bad block cache is disabled")?
38 .peek(&cid)
39 .map(|_| "bad".to_string())
40 .unwrap_or_default())
41 }
42}
43
44pub enum SyncMarkBad {}
45impl RpcMethod<1> for SyncMarkBad {
46 const NAME: &'static str = "Filecoin.SyncMarkBad";
47 const PARAM_NAMES: [&'static str; 1] = ["cid"];
48 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
49 const PERMISSION: Permission = Permission::Admin;
50
51 type Params = (Cid,);
52 type Ok = ();
53
54 async fn handle(
55 ctx: Ctx<impl Blockstore>,
56 (cid,): Self::Params,
57 _: &http::Extensions,
58 ) -> Result<Self::Ok, ServerError> {
59 ctx.bad_blocks
60 .as_ref()
61 .context("bad block cache is disabled")?
62 .push(cid);
63 Ok(())
64 }
65}
66
67pub enum SyncSnapshotProgress {}
68impl RpcMethod<0> for SyncSnapshotProgress {
69 const NAME: &'static str = "Forest.SyncSnapshotProgress";
70 const PARAM_NAMES: [&'static str; 0] = [];
71 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
72 const PERMISSION: Permission = Permission::Read;
73 const DESCRIPTION: Option<&'static str> =
74 Some("Returns the snapshot download progress. Return Null if the tracking isn't started");
75
76 type Params = ();
77 type Ok = SnapshotProgressState;
78
79 async fn handle(
80 ctx: Ctx<impl Blockstore>,
81 (): Self::Params,
82 _: &http::Extensions,
83 ) -> Result<Self::Ok, ServerError> {
84 Ok(ctx.get_snapshot_progress_tracker())
85 }
86}
87
88pub enum SyncStatus {}
89impl RpcMethod<0> for SyncStatus {
90 const NAME: &'static str = "Forest.SyncStatus";
91 const PARAM_NAMES: [&'static str; 0] = [];
92 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
93 const PERMISSION: Permission = Permission::Read;
94 const DESCRIPTION: Option<&'static str> = Some("Returns the current sync status of the node.");
95
96 type Params = ();
97 type Ok = SyncStatusReport;
98
99 async fn handle(
100 ctx: Ctx<impl Blockstore>,
101 (): Self::Params,
102 _: &http::Extensions,
103 ) -> Result<Self::Ok, ServerError> {
104 let sync_status = ctx.sync_status.as_ref().read().clone();
105 Ok(sync_status)
106 }
107}
108
109pub enum SyncSubmitBlock {}
110impl RpcMethod<1> for SyncSubmitBlock {
111 const NAME: &'static str = "Filecoin.SyncSubmitBlock";
112 const PARAM_NAMES: [&'static str; 1] = ["block"];
113 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
114 const PERMISSION: Permission = Permission::Write;
115 const DESCRIPTION: Option<&'static str> = Some("Submits a newly created block to the network.");
116
117 type Params = (GossipBlock,);
118 type Ok = ();
119
120 async fn handle(
123 ctx: Ctx<impl Blockstore>,
124 (block_msg,): Self::Params,
125 _: &http::Extensions,
126 ) -> Result<Self::Ok, ServerError> {
127 if !matches!(ctx.sync_status.read().status, NodeSyncStatus::Synced) {
128 Err(anyhow!("the node isn't in 'follow' mode"))?
129 }
130 let genesis_network_name = ctx.chain_config().network.genesis_name();
131 let encoded_message = to_vec(&block_msg)?;
132 let pubsub_block_str = format!("{PUBSUB_BLOCK_STR}/{genesis_network_name}");
133 let (bls_messages, secp_messages) =
134 chain::store::block_messages(ctx.store(), &block_msg.header)?;
135 let block = Block {
136 header: block_msg.header.clone(),
137 bls_messages,
138 secp_messages,
139 };
140 let ts = FullTipset::from(block);
141 let genesis_ts = ctx.chain_store().genesis_tipset();
142
143 TipsetValidator(&ts)
144 .validate(
145 ctx.chain_store(),
146 ctx.bad_blocks.as_ref().map(AsRef::as_ref),
147 &genesis_ts,
148 ctx.chain_config().block_delay_secs,
149 )
150 .context("failed to validate the tipset")?;
151
152 ctx.tipset_send
153 .try_send(ts)
154 .context("tipset queue is full")?;
155
156 ctx.network_send().send(NetworkMessage::PubsubMessage {
157 topic: IdentTopic::new(pubsub_block_str),
158 message: encoded_message,
159 })?;
160 Ok(())
161 }
162}
163
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::blocks::{CachingBlockHeader, RawBlockHeader, Tipset};
    use crate::chain::ChainStore;
    use crate::chain_sync::network_context::SyncNetworkContext;
    use crate::db::MemoryDB;
    use crate::key_management::{KeyStore, KeyStoreConfig};
    use crate::libp2p::{NetworkMessage, PeerManager};
    use crate::message_pool::{MessagePool, MpoolRpcProvider};
    use crate::networks::ChainConfig;
    use crate::rpc::RPCState;
    use crate::rpc::eth::filter::EthEventHandler;
    use crate::shim::address::Address;
    use crate::state_manager::StateManager;
    use crate::utils::encoding::from_slice_with_fallback;
    use parking_lot::RwLock;
    use tokio::sync::mpsc;
    use tokio::task::JoinSet;

    /// Hex dump of the serialized block header that is decoded into a `CachingBlockHeader`
    /// and installed as the heaviest tipset of the test chain store.
    const HEAD_HEADER_HEX: &str = "904300e80781586082cb7477a801f55c1f2ea5e5d1167661feea60a39f697e1099af132682b81cc5047beacf5b6e80d5f52b9fd90323fb8510a5396416dd076c13c85619e176558582744053a3faef6764829aa02132a1571a76aabdc498a638ea0054d3bb57f41d82015860812d2396cc4592cdf7f829374b01ffd03c5469a4b0a9acc5ccc642797aa0a5498b97b28d90820fedc6f79ff0a6005f5c15dbaca3b8a45720af7ed53000555667207a0ccb50073cd24510995abd4c4e45c1e9e114905018b2da9454190499941e818201582012dd0a6a7d0e222a97926da03adb5a7768d31cc7c5c2bd6828e14a7d25fa3a608182004b76616c69642070726f6f6681d82a5827000171a0e4022030f89a8b0373ad69079dbcbc5addfe9b34dce932189786e50d3eb432ede3ba9c43000f0001d82a5827000171a0e4022052238c7d15c100c1b9ebf849541810c9e3c2d86e826512c6c416d2318fcd496dd82a5827000171a0e40220e5658b3d18cd06e1db9015b4b0ec55c123a24d5be1ea24d83938c5b8397b4f2fd82a5827000171a0e4022018d351341c302a21786b585708c9873565a0d07c42521d4aaf52da3ff6f2e461586102c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001a5f2c5439586102b5cd48724dce0fec8799d77fd6c5113276e7f470c8391faa0b5a6033a3eaf357d635705c36abe10309d73592727289680515afd9d424793ba4796b052682d21b03c5c8a37d94827fecc59cdc5750e198fdf20dee012f4d627c6665132298ab95004500053724e0";

    /// Builds an `RPCState` backed by a `MemoryDB`, plus the receiving half of the network
    /// message channel that the state sends on.
    fn ctx() -> (Arc<RPCState<MemoryDB>>, flume::Receiver<NetworkMessage>) {
        let (network_send, network_rx) = flume::bounded(5);
        // The receiving half of the tipset channel is intentionally not kept.
        let (tipset_send, _) = flume::bounded(5);
        let mut services = JoinSet::new();

        // Chain store over a single in-memory database with a minimal genesis header.
        let db = Arc::new(MemoryDB::default());
        let genesis_header = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            timestamp: 7777,
            ..Default::default()
        });
        let chain_store = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db,
                Arc::new(ChainConfig::default()),
                genesis_header,
            )
            .unwrap(),
        );
        let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());

        // Install the fixture header as the heaviest tipset and store its bytes under each
        // CID of the tipset key.
        let head_bytes = hex::decode(HEAD_HEADER_HEX).unwrap();
        let head_header = from_slice_with_fallback::<CachingBlockHeader>(&head_bytes).unwrap();
        let head = Tipset::from(head_header);
        chain_store.set_heaviest_tipset(head.clone()).unwrap();
        for cid in head.key().to_cids() {
            chain_store
                .blockstore()
                .put_keyed(&cid, &head_bytes)
                .unwrap();
        }

        let mpool = MessagePool::new(
            MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
            network_send.clone(),
            Default::default(),
            state_manager.chain_config().clone(),
            &mut services,
        )
        .unwrap();
        let start_time = chrono::Utc::now();

        let sync_network_context = SyncNetworkContext::new(
            network_send,
            Arc::new(PeerManager::default()),
            state_manager.blockstore_owned(),
        );
        let state = Arc::new(RPCState {
            state_manager,
            keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())),
            mpool: Arc::new(mpool),
            bad_blocks: Some(Default::default()),
            msgs_in_tipset: Default::default(),
            sync_status: Arc::new(RwLock::new(SyncStatusReport::default())),
            eth_event_handler: Arc::new(EthEventHandler::new()),
            sync_network_context,
            start_time,
            shutdown: mpsc::channel(1).0,
            tipset_send,
            snapshot_progress_tracker: Default::default(),
        });
        (state, network_rx)
    }

    /// A CID is reported as not bad until `SyncMarkBad` has been called for it.
    #[tokio::test]
    async fn set_check_bad() {
        let (state, _) = ctx();
        let extensions = Default::default();

        let cid = "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
            .parse::<Cid>()
            .unwrap();

        // Nothing has been marked yet, so the check yields the empty string.
        let before = SyncCheckBad::handle(state.clone(), (cid,), &extensions)
            .await
            .unwrap();
        assert_eq!(before, "");

        // Mark the block as bad, then check again to see the new verdict.
        SyncMarkBad::handle(state.clone(), (cid,), &extensions)
            .await
            .unwrap();

        let after = SyncCheckBad::handle(state.clone(), (cid,), &extensions)
            .await
            .unwrap();
        assert_eq!(after, "bad");
    }

    /// `SyncStatus` mirrors the shared report both before and after it is mutated.
    #[tokio::test]
    async fn sync_status_test() {
        let (state, _) = ctx();
        let extensions = Default::default();

        let shared_status = state.sync_status.clone();

        let reported = SyncStatus::handle(state.clone(), (), &extensions)
            .await
            .unwrap();
        let expected = shared_status.read().clone();
        assert_eq!(reported, expected);

        // Mutate the shared report through the cloned handle.
        {
            let mut report = shared_status.write();
            report.status = NodeSyncStatus::Syncing;
            report.current_head_epoch = 4;
        }

        let reported = SyncStatus::handle(state.clone(), (), &extensions)
            .await
            .unwrap();
        let expected = shared_status.read().clone();
        assert_eq!(reported, expected);
    }
}