use std::{collections::HashMap, time::Duration, u64};
use pezkuwi_primitives::MAX_CODE_SIZE;
use pezsc_network::{NetworkBackend, MAX_RESPONSE_SIZE};
use pezsp_runtime::traits::Block;
use strum::{EnumIter, IntoEnumIterator};
pub use pezsc_network::{config as network, config::RequestResponseConfig, ProtocolName};
pub mod incoming;
pub mod outgoing;
pub use incoming::{IncomingRequest, IncomingRequestReceiver};
pub use outgoing::{OutgoingRequest, OutgoingResult, Recipient, Requests, ResponseSender};
pub mod v1;
pub mod v2;
/// All request/response protocols handled by this module.
///
/// Each variant is mapped to a genesis-hash-based protocol name by
/// `ReqProtocolNames::generate_name`, and the `V1` variants additionally carry a
/// fixed legacy name (see `Protocol::get_legacy_name`). The derived `EnumIter`
/// is what `ReqProtocolNames::new` uses to visit every variant.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter)]
pub enum Protocol {
/// Chunk fetching, version 1. Name suffix `/req_chunk/1`, legacy name `/pezkuwi/req_chunk/1`.
ChunkFetchingV1,
/// Collation fetching, version 1. Name suffix `/req_collation/1`, legacy name
/// `/pezkuwi/req_collation/1`.
CollationFetchingV1,
/// Collation fetching, version 2. Name suffix `/req_collation/2`, no legacy name.
CollationFetchingV2,
/// PoV fetching, version 1. Name suffix `/req_pov/1`, legacy name `/pezkuwi/req_pov/1`.
PoVFetchingV1,
/// Available data fetching, version 1. Name suffix `/req_available_data/1`, legacy name
/// `/pezkuwi/req_available_data/1`.
AvailableDataFetchingV1,
/// Dispute sending, version 1. Name suffix `/send_dispute/1`, legacy name
/// `/pezkuwi/send_dispute/1`.
DisputeSendingV1,
/// Attested candidate requests, version 2. Name suffix `/req_attested_candidate/2`, no
/// legacy name.
AttestedCandidateV2,
/// Chunk fetching, version 2. Name suffix `/req_chunk/2`, no legacy name. Shares its
/// configuration values and channel size with `ChunkFetchingV1`.
ChunkFetchingV2,
}
/// Bandwidth figure used when sizing the `AttestedCandidateV2` request channel:
/// 50 * 1024 * 1024 bytes. The formula in `Protocol::get_channel_size` multiplies it by a
/// timeout in milliseconds and divides by 1000, i.e. it treats this value as bytes per second.
const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024;
// Not referenced by any code visible in this file, hence the lint exemption.
/// General request timeout of 3 seconds.
#[allow(dead_code)]
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
/// Request timeout of 1 second.
///
/// NOTE(review): the name suggests this is meant for peers we are already connected to —
/// confirm against the callers.
const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1);
/// Timeout used for the `ChunkFetchingV1` and `ChunkFetchingV2` protocol configurations.
/// Currently identical to `DEFAULT_REQUEST_TIMEOUT_CONNECTED`.
pub const CHUNK_REQUEST_TIMEOUT: Duration = DEFAULT_REQUEST_TIMEOUT_CONNECTED;
/// Timeout (2000 ms) used for the collation fetching, PoV fetching and available data
/// fetching protocol configurations.
const POV_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_millis(2000);
/// Timeout (2500 ms) for `AttestedCandidateV2` requests. Also an input to the channel size
/// computed for that protocol in `Protocol::get_channel_size`.
const ATTESTED_CANDIDATE_TIMEOUT: Duration = Duration::from_millis(2500);
/// Number of `AttestedCandidateV2` requests that is subtracted from the computed inbound
/// channel size in `Protocol::get_channel_size`.
///
/// NOTE(review): presumably this is the number of such requests the consumer serves in
/// parallel — confirm against the subsystem using this constant.
pub const MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS: u32 = 5;
/// Maximum response size for the chunk, collation, PoV and available data fetching
/// protocols. Equal to `MAX_RESPONSE_SIZE` from `pezsc_network`.
const POV_RESPONSE_SIZE: u64 = MAX_RESPONSE_SIZE;
/// Maximum response size for `AttestedCandidateV2`: `MAX_CODE_SIZE` plus 100_000.
///
/// NOTE(review): the extra 100_000 presumably leaves room for data beyond the code itself
/// — confirm the intended breakdown.
const ATTESTED_CANDIDATE_RESPONSE_SIZE: u64 = MAX_CODE_SIZE as u64 + 100_000;
/// Timeout (12 seconds) used for the `DisputeSendingV1` protocol configuration.
pub const DISPUTE_REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
impl Protocol {
    /// Builds the network configuration for this protocol without an inbound queue.
    ///
    /// `None` is handed to the backend as the inbound sender, so no receiver for incoming
    /// requests is created. Use `get_config` when incoming requests must be received.
    pub fn get_outbound_only_config<B: Block, N: NetworkBackend<B, <B as Block>::Hash>>(
        self,
        req_protocol_names: &ReqProtocolNames,
    ) -> N::RequestResponseProtocolConfig {
        self.create_config::<B, N>(req_protocol_names, None)
    }

    /// Builds the network configuration for this protocol together with the receiving end
    /// of its inbound request queue.
    ///
    /// The queue is a bounded `async_channel` whose capacity comes from
    /// `get_channel_size`; its sending end is passed to the backend configuration.
    pub fn get_config<B: Block, N: NetworkBackend<B, <B as Block>::Hash>>(
        self,
        req_protocol_names: &ReqProtocolNames,
    ) -> (async_channel::Receiver<network::IncomingRequest>, N::RequestResponseProtocolConfig) {
        let (sender, receiver) = async_channel::bounded(self.get_channel_size());
        (receiver, self.create_config::<B, N>(req_protocol_names, Some(sender)))
    }

    /// Assembles the backend-specific configuration for this protocol.
    ///
    /// All protocols share the same maximum request size (1_000); only the maximum
    /// response size and the request timeout are protocol specific. `tx` is forwarded
    /// unchanged as the inbound queue (`None` means outbound only).
    fn create_config<B: Block, N: NetworkBackend<B, <B as Block>::Hash>>(
        self,
        req_protocol_names: &ReqProtocolNames,
        tx: Option<async_channel::Sender<network::IncomingRequest>>,
    ) -> N::RequestResponseProtocolConfig {
        // Select the two per-protocol knobs first, then issue a single backend call.
        let (max_response_size, request_timeout) = match self {
            Protocol::ChunkFetchingV1 | Protocol::ChunkFetchingV2 =>
                (POV_RESPONSE_SIZE, CHUNK_REQUEST_TIMEOUT),
            Protocol::CollationFetchingV1 |
            Protocol::CollationFetchingV2 |
            Protocol::PoVFetchingV1 |
            Protocol::AvailableDataFetchingV1 =>
                (POV_RESPONSE_SIZE, POV_REQUEST_TIMEOUT_CONNECTED),
            Protocol::DisputeSendingV1 => (100, DISPUTE_REQUEST_TIMEOUT),
            Protocol::AttestedCandidateV2 =>
                (ATTESTED_CANDIDATE_RESPONSE_SIZE, ATTESTED_CANDIDATE_TIMEOUT),
        };
        // At most one fallback name: the fixed legacy name, if this protocol has one.
        let fallback_names = self.get_legacy_name().into_iter().map(Into::into).collect();

        N::request_response_config(
            req_protocol_names.get_name(self),
            fallback_names,
            1_000,
            max_response_size,
            request_timeout,
            tx,
        )
    }

    /// Capacity of the bounded inbound request channel created by `get_config`.
    fn get_channel_size(self) -> usize {
        match self {
            Protocol::CollationFetchingV1 |
            Protocol::CollationFetchingV2 |
            Protocol::PoVFetchingV1 => 10,
            Protocol::ChunkFetchingV1 |
            Protocol::ChunkFetchingV2 |
            Protocol::AvailableDataFetchingV1 |
            Protocol::DisputeSendingV1 => 100,
            Protocol::AttestedCandidateV2 => {
                // 7/10 of `MIN_BANDWIDTH_BYTES` is budgeted for this protocol.
                let usable_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10;
                let timeout_ms = ATTESTED_CANDIDATE_TIMEOUT.as_millis() as u64;
                let code_size = MAX_CODE_SIZE as u64;
                // Bytes transferable within one timeout (the `1000` converts milliseconds
                // to seconds), expressed as a count of `MAX_CODE_SIZE`-sized payloads.
                let payloads_per_timeout = timeout_ms * usable_bandwidth / (1000 * code_size);
                // Reduce by the parallel request count, saturating at zero.
                let size = payloads_per_timeout
                    .saturating_sub(MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as u64);
                debug_assert!(
                    size > 0,
                    "We should have a channel size greater zero, otherwise we won't accept any requests."
                );
                size as usize
            },
        }
    }

    /// Fixed, genesis-independent legacy name of this protocol, if it has one.
    ///
    /// Only the `V1` protocols have such a name; every `V2` protocol returns `None`.
    const fn get_legacy_name(self) -> Option<&'static str> {
        let legacy = match self {
            Protocol::AttestedCandidateV2 |
            Protocol::CollationFetchingV2 |
            Protocol::ChunkFetchingV2 => return None,
            Protocol::ChunkFetchingV1 => "/pezkuwi/req_chunk/1",
            Protocol::CollationFetchingV1 => "/pezkuwi/req_collation/1",
            Protocol::PoVFetchingV1 => "/pezkuwi/req_pov/1",
            Protocol::AvailableDataFetchingV1 => "/pezkuwi/req_available_data/1",
            Protocol::DisputeSendingV1 => "/pezkuwi/send_dispute/1",
        };
        Some(legacy)
    }
}
/// Associates a request type with its response type and with a `Protocol` variant.
pub trait IsRequest {
/// The response type belonging to this request.
type Response;
/// The `Protocol` variant this request type is associated with.
const PROTOCOL: Protocol;
}
/// Lookup table from every `Protocol` variant to its full `ProtocolName`.
///
/// Constructed by `ReqProtocolNames::new` from a genesis hash and an optional fork id.
#[derive(Clone)]
pub struct ReqProtocolNames {
/// One entry per `Protocol` variant; filled in `new` and read by `get_name`.
names: HashMap<Protocol, ProtocolName>,
}
impl ReqProtocolNames {
    /// Creates the name table for the chain identified by `genesis_hash` and, optionally,
    /// `fork_id`.
    ///
    /// Every variant yielded by `Protocol::iter()` receives an entry, which is what makes
    /// the lookup in `get_name` infallible.
    pub fn new<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> Self {
        let names = Protocol::iter()
            .map(|protocol| (protocol, Self::generate_name(protocol, &genesis_hash, fork_id)))
            .collect::<HashMap<_, _>>();
        Self { names }
    }

    /// Returns a clone of the name registered for `protocol`.
    ///
    /// # Panics
    ///
    /// Panics if `protocol` has no entry; `new` inserts one for every variant.
    pub fn get_name(&self, protocol: Protocol) -> ProtocolName {
        self.names
            .get(&protocol)
            .cloned()
            .expect("All `Protocol` enum variants are added above via `strum`; qed")
    }

    /// Builds the full name of `protocol`.
    ///
    /// The result is `/<hex genesis hash>[/<fork id>]<suffix>`, where `<suffix>` is the
    /// per-protocol short name (for example `/req_chunk/1`) and the fork id segment is
    /// present only when `fork_id` is `Some`.
    fn generate_name<Hash: AsRef<[u8]>>(
        protocol: Protocol,
        genesis_hash: &Hash,
        fork_id: Option<&str>,
    ) -> ProtocolName {
        let suffix = match protocol {
            Protocol::ChunkFetchingV1 => "/req_chunk/1",
            Protocol::ChunkFetchingV2 => "/req_chunk/2",
            Protocol::CollationFetchingV1 => "/req_collation/1",
            Protocol::CollationFetchingV2 => "/req_collation/2",
            Protocol::PoVFetchingV1 => "/req_pov/1",
            Protocol::AvailableDataFetchingV1 => "/req_available_data/1",
            Protocol::DisputeSendingV1 => "/send_dispute/1",
            Protocol::AttestedCandidateV2 => "/req_attested_candidate/2",
        };
        let genesis_hex = hex::encode(genesis_hash);
        let full_name = match fork_id {
            Some(fork_id) => format!("/{}/{}{}", genesis_hex, fork_id, suffix),
            None => format!("/{}{}", genesis_hex, suffix),
        };
        full_name.into()
    }
}