soil_network/sync/
warp_request_handler.rs1use codec::Decode;
10use futures::{channel::oneshot, stream::StreamExt};
11use log::debug;
12
13use crate::{
14 sync::strategy::warp::{EncodedProof, WarpProofRequest, WarpSyncProvider},
15 LOG_TARGET,
16};
17use soil_network::{
18 config::ProtocolId,
19 request_responses::{IncomingRequest, OutgoingResponse},
20 NetworkBackend, MAX_RESPONSE_SIZE,
21};
22use subsoil::runtime::traits::Block as BlockT;
23
24use std::{sync::Arc, time::Duration};
25
26const MAX_WARP_REQUEST_QUEUE: usize = 20;
28
29pub fn generate_request_response_config<
32 Hash: AsRef<[u8]>,
33 B: BlockT,
34 N: NetworkBackend<B, <B as BlockT>::Hash>,
35>(
36 protocol_id: ProtocolId,
37 genesis_hash: Hash,
38 fork_id: Option<&str>,
39 inbound_queue: async_channel::Sender<IncomingRequest>,
40) -> N::RequestResponseProtocolConfig {
41 N::request_response_config(
42 generate_protocol_name(genesis_hash, fork_id).into(),
43 std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
44 32,
45 MAX_RESPONSE_SIZE,
46 Duration::from_secs(10),
47 Some(inbound_queue),
48 )
49}
50
51fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
53 let genesis_hash = genesis_hash.as_ref();
54 if let Some(fork_id) = fork_id {
55 format!("/{}/{}/sync/warp", array_bytes::bytes2hex("", genesis_hash), fork_id)
56 } else {
57 format!("/{}/sync/warp", array_bytes::bytes2hex("", genesis_hash))
58 }
59}
60
61fn generate_legacy_protocol_name(protocol_id: ProtocolId) -> String {
63 format!("/{}/sync/warp", protocol_id.as_ref())
64}
65
66pub struct RequestHandler<TBlock: BlockT> {
68 backend: Arc<dyn WarpSyncProvider<TBlock>>,
69 request_receiver: async_channel::Receiver<IncomingRequest>,
70}
71
72impl<TBlock: BlockT> RequestHandler<TBlock> {
73 pub fn new<Hash: AsRef<[u8]>, N: NetworkBackend<TBlock, <TBlock as BlockT>::Hash>>(
75 protocol_id: ProtocolId,
76 genesis_hash: Hash,
77 fork_id: Option<&str>,
78 backend: Arc<dyn WarpSyncProvider<TBlock>>,
79 ) -> (Self, N::RequestResponseProtocolConfig) {
80 let (tx, request_receiver) = async_channel::bounded(MAX_WARP_REQUEST_QUEUE);
81
82 let request_response_config = generate_request_response_config::<_, TBlock, N>(
83 protocol_id,
84 genesis_hash,
85 fork_id,
86 tx,
87 );
88
89 (Self { backend, request_receiver }, request_response_config)
90 }
91
92 fn handle_request(
93 &self,
94 payload: Vec<u8>,
95 pending_response: oneshot::Sender<OutgoingResponse>,
96 ) -> Result<(), HandleRequestError> {
97 let request = WarpProofRequest::<TBlock>::decode(&mut &payload[..])?;
98
99 let EncodedProof(proof) = self
100 .backend
101 .generate(request.begin)
102 .map_err(HandleRequestError::InvalidRequest)?;
103
104 pending_response
105 .send(OutgoingResponse {
106 result: Ok(proof),
107 reputation_changes: Vec::new(),
108 sent_feedback: None,
109 })
110 .map_err(|_| HandleRequestError::SendResponse)
111 }
112
113 pub async fn run(mut self) {
115 while let Some(request) = self.request_receiver.next().await {
116 let IncomingRequest { peer, payload, pending_response } = request;
117
118 match self.handle_request(payload, pending_response) {
119 Ok(()) => {
120 debug!(target: LOG_TARGET, "Handled grandpa warp sync request from {}.", peer)
121 },
122 Err(e) => debug!(
123 target: LOG_TARGET,
124 "Failed to handle grandpa warp sync request from {}: {}",
125 peer, e,
126 ),
127 }
128 }
129 }
130}
131
132#[derive(Debug, thiserror::Error)]
133enum HandleRequestError {
134 #[error("Failed to decode request: {0}.")]
135 DecodeProto(#[from] prost::DecodeError),
136
137 #[error("Failed to encode response: {0}.")]
138 EncodeProto(#[from] prost::EncodeError),
139
140 #[error("Failed to decode block hash: {0}.")]
141 DecodeScale(#[from] codec::Error),
142
143 #[error(transparent)]
144 Client(#[from] soil_client::blockchain::Error),
145
146 #[error("Invalid request {0}.")]
147 InvalidRequest(#[from] Box<dyn std::error::Error + Send + Sync>),
148
149 #[error("Failed to send response.")]
150 SendResponse,
151}