1use collator_overseer::NewMinimalNode;
19
20use cumulus_client_bootnodes::bootnode_request_response_config;
21use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
22use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterface, Url};
23use network::build_collator_network;
24use polkadot_network_bridge::{peer_sets_info, IsAuthority};
25use polkadot_node_network_protocol::{
26 peer_set::{PeerSet, PeerSetProtocolNames},
27 request_response::{
28 v1, v2, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames,
29 },
30};
31
32use polkadot_core_primitives::{Block as RelayBlock, Hash as RelayHash};
33use polkadot_node_subsystem_util::metrics::prometheus::Registry;
34use polkadot_primitives::CollatorPair;
35use polkadot_service::{overseer::OverseerGenArgs, IsParachainNode};
36
37use sc_authority_discovery::Service as AuthorityDiscoveryService;
38use sc_network::{
39 config::FullNetworkConfiguration, request_responses::IncomingRequest as GenericIncomingRequest,
40 service::traits::NetworkService, Event, NetworkBackend, NetworkEventStream,
41};
42use sc_service::{config::PrometheusConfig, Configuration, TaskManager};
43use sp_runtime::{app_crypto::Pair, traits::Block as BlockT};
44
45use futures::{FutureExt, StreamExt};
46use std::sync::Arc;
47
48mod blockchain_rpc_client;
49mod collator_overseer;
50mod network;
51
52pub use blockchain_rpc_client::BlockChainRpcClient;
53
54const LOG_TARGET: &str = "minimal-relaychain-node";
55
56fn build_authority_discovery_service<Block: BlockT>(
57 task_manager: &TaskManager,
58 client: Arc<BlockChainRpcClient>,
59 config: &Configuration,
60 network: Arc<dyn NetworkService>,
61 prometheus_registry: Option<Registry>,
62) -> AuthorityDiscoveryService {
63 let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
64 let auth_disc_public_addresses = config.network.public_addresses.clone();
65 let authority_discovery_role = sc_authority_discovery::Role::Discover;
66 let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move {
67 match e {
68 Event::Dht(e) => Some(e),
69 _ => None,
70 }
71 });
72 let net_config_path = config.network.net_config_path.clone();
73 let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
74 sc_authority_discovery::WorkerConfig {
75 publish_non_global_ips: auth_disc_publish_non_global_ips,
76 public_addresses: auth_disc_public_addresses,
77 strict_record_validation: true,
79 persisted_cache_directory: net_config_path,
80 ..Default::default()
81 },
82 client,
83 Arc::new(network.clone()),
84 Box::pin(dht_event_stream),
85 authority_discovery_role,
86 prometheus_registry,
87 task_manager.spawn_handle(),
88 );
89
90 task_manager.spawn_handle().spawn(
91 "authority-discovery-worker",
92 Some("authority-discovery"),
93 worker.run(),
94 );
95 service
96}
97
98async fn build_interface(
99 polkadot_config: Configuration,
100 task_manager: &mut TaskManager,
101 client: RelayChainRpcClient,
102) -> RelayChainResult<(
103 Arc<dyn RelayChainInterface + 'static>,
104 Option<CollatorPair>,
105 Arc<dyn NetworkService>,
106 async_channel::Receiver<GenericIncomingRequest>,
107)> {
108 let collator_pair = CollatorPair::generate().0;
109 let blockchain_rpc_client = Arc::new(BlockChainRpcClient::new(client.clone()));
110 let collator_node = match polkadot_config.network.network_backend {
111 sc_network::config::NetworkBackendType::Libp2p => {
112 new_minimal_relay_chain::<RelayBlock, sc_network::NetworkWorker<RelayBlock, RelayHash>>(
113 polkadot_config,
114 collator_pair.clone(),
115 blockchain_rpc_client,
116 )
117 .await?
118 },
119 sc_network::config::NetworkBackendType::Litep2p => {
120 new_minimal_relay_chain::<RelayBlock, sc_network::Litep2pNetworkBackend>(
121 polkadot_config,
122 collator_pair.clone(),
123 blockchain_rpc_client,
124 )
125 .await?
126 },
127 };
128 task_manager.add_child(collator_node.task_manager);
129 Ok((
130 Arc::new(RelayChainRpcInterface::new(client, collator_node.overseer_handle)),
131 Some(collator_pair),
132 collator_node.network_service,
133 collator_node.paranode_rx,
134 ))
135}
136
137pub async fn build_minimal_relay_chain_node_with_rpc(
138 relay_chain_config: Configuration,
139 parachain_prometheus_registry: Option<&Registry>,
140 task_manager: &mut TaskManager,
141 relay_chain_url: Vec<Url>,
142) -> RelayChainResult<(
143 Arc<dyn RelayChainInterface + 'static>,
144 Option<CollatorPair>,
145 Arc<dyn NetworkService>,
146 async_channel::Receiver<GenericIncomingRequest>,
147)> {
148 let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker(
149 relay_chain_url,
150 task_manager,
151 parachain_prometheus_registry,
152 )
153 .await?;
154
155 build_interface(relay_chain_config, task_manager, client).await
156}
157
158#[sc_tracing::logging::prefix_logs_with("Relaychain")]
170async fn new_minimal_relay_chain<Block: BlockT, Network: NetworkBackend<RelayBlock, RelayHash>>(
171 config: Configuration,
172 collator_pair: CollatorPair,
173 relay_chain_rpc_client: Arc<BlockChainRpcClient>,
174) -> Result<NewMinimalNode, RelayChainError> {
175 let role = config.role;
176 let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
177 &config.network,
178 config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
179 );
180 let metrics = Network::register_notification_metrics(
181 config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
182 );
183 let peer_store_handle = net_config.peer_store_handle();
184
185 let prometheus_registry = config.prometheus_registry();
186 let task_manager = TaskManager::new(config.tokio_handle.clone(), prometheus_registry)?;
187
188 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
189 task_manager.spawn_handle().spawn(
190 "prometheus-endpoint",
191 None,
192 prometheus_endpoint::init_prometheus(port, registry).map(drop),
193 );
194 }
195
196 let genesis_hash = relay_chain_rpc_client.block_get_hash(Some(0)).await?.unwrap_or_default();
197 let peerset_protocol_names =
198 PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
199 let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };
200 let notification_services = peer_sets_info::<_, Network>(
201 is_authority,
202 &peerset_protocol_names,
203 metrics.clone(),
204 Arc::clone(&peer_store_handle),
205 )
206 .into_iter()
207 .map(|(config, (peerset, service))| {
208 net_config.add_notification_protocol(config);
209 (peerset, service)
210 })
211 .collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();
212
213 let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
214 let (collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver) =
215 build_request_response_protocol_receivers(&request_protocol_names, &mut net_config);
216
217 let (cfg, paranode_rx) = bootnode_request_response_config::<_, _, Network>(
218 genesis_hash,
219 config.chain_spec.fork_id(),
220 );
221 net_config.add_request_response_protocol(cfg);
222
223 let best_header = relay_chain_rpc_client
224 .chain_get_header(None)
225 .await?
226 .ok_or_else(|| RelayChainError::RpcCallError("Unable to fetch best header".to_string()))?;
227 let (network, sync_service) = build_collator_network::<Network>(
228 &config,
229 net_config,
230 task_manager.spawn_handle(),
231 genesis_hash,
232 best_header,
233 metrics,
234 )
235 .map_err(|e| RelayChainError::Application(Box::new(e)))?;
236
237 let authority_discovery_service = build_authority_discovery_service::<Block>(
238 &task_manager,
239 relay_chain_rpc_client.clone(),
240 &config,
241 network.clone(),
242 prometheus_registry.cloned(),
243 );
244
245 let overseer_args = OverseerGenArgs {
246 runtime_client: relay_chain_rpc_client.clone(),
247 network_service: network.clone(),
248 sync_service,
249 authority_discovery_service,
250 collation_req_v1_receiver,
251 collation_req_v2_receiver,
252 available_data_req_receiver,
253 registry: prometheus_registry,
254 spawner: task_manager.spawn_handle(),
255 is_parachain_node: IsParachainNode::Collator(collator_pair),
256 overseer_message_channel_capacity_override: None,
257 req_protocol_names: request_protocol_names,
258 peerset_protocol_names,
259 notification_services,
260 };
261
262 let overseer_handle =
263 collator_overseer::spawn_overseer(overseer_args, &task_manager, relay_chain_rpc_client)?;
264
265 Ok(NewMinimalNode { task_manager, overseer_handle, network_service: network, paranode_rx })
266}
267
268fn build_request_response_protocol_receivers<
269 Block: BlockT,
270 Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
271>(
272 request_protocol_names: &ReqProtocolNames,
273 config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
274) -> (
275 IncomingRequestReceiver<v1::CollationFetchingRequest>,
276 IncomingRequestReceiver<v2::CollationFetchingRequest>,
277 IncomingRequestReceiver<v1::AvailableDataFetchingRequest>,
278) {
279 let (collation_req_v1_receiver, cfg) =
280 IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
281 config.add_request_response_protocol(cfg);
282 let (collation_req_v2_receiver, cfg) =
283 IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
284 config.add_request_response_protocol(cfg);
285 let (available_data_req_receiver, cfg) =
286 IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
287 config.add_request_response_protocol(cfg);
288 let cfg =
289 Protocol::ChunkFetchingV1.get_outbound_only_config::<_, Network>(request_protocol_names);
290 config.add_request_response_protocol(cfg);
291 let cfg =
292 Protocol::ChunkFetchingV2.get_outbound_only_config::<_, Network>(request_protocol_names);
293 config.add_request_response_protocol(cfg);
294 (collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver)
295}