use crate::{network_service, platform::Platform, runtime_service};
use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec::Vec};
use core::{fmt, num::NonZeroU32, time::Duration};
use futures::{
channel::{mpsc, oneshot},
lock::Mutex,
prelude::*,
};
use smoldot::{
chain,
executor::host,
libp2p::PeerId,
network::{protocol, service},
trie::{self, prefix_proof, proof_verify},
};
mod parachain;
mod standalone;
pub struct Config<TPlat: Platform> {
pub log_name: String,
pub chain_information: chain::chain_information::ValidChainInformation,
pub block_number_bytes: usize,
pub tasks_executor: Box<dyn FnMut(String, future::BoxFuture<'static, ()>) + Send>,
pub network_service: (Arc<network_service::NetworkService<TPlat>>, usize),
pub network_events_receiver: stream::BoxStream<'static, network_service::Event>,
pub parachain: Option<ConfigParachain<TPlat>>,
}
pub struct ConfigParachain<TPlat: Platform> {
pub relay_chain_sync: Arc<runtime_service::RuntimeService<TPlat>>,
pub relay_chain_block_number_bytes: usize,
pub parachain_id: u32,
}
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct BlocksRequestId(usize);
pub struct SyncService<TPlat: Platform> {
to_background: Mutex<mpsc::Sender<ToBackground>>,
network_service: Arc<network_service::NetworkService<TPlat>>,
network_chain_index: usize,
block_number_bytes: usize,
}
impl<TPlat: Platform> SyncService<TPlat> {
pub async fn new(mut config: Config<TPlat>) -> Self {
let (to_background, from_foreground) = mpsc::channel(16);
let log_target = format!("sync-service-{}", config.log_name);
if let Some(config_parachain) = config.parachain {
(config.tasks_executor)(
log_target.clone(),
Box::pin(parachain::start_parachain(
log_target,
config.chain_information,
config.block_number_bytes,
config_parachain.relay_chain_sync.clone(),
config_parachain.relay_chain_block_number_bytes,
config_parachain.parachain_id,
from_foreground,
config.network_service.1,
config.network_events_receiver,
)),
);
} else {
(config.tasks_executor)(
log_target.clone(),
Box::pin(standalone::start_standalone_chain(
log_target,
config.chain_information,
config.block_number_bytes,
from_foreground,
config.network_service.0.clone(),
config.network_service.1,
config.network_events_receiver,
)),
);
}
SyncService {
to_background: Mutex::new(to_background),
network_service: config.network_service.0,
network_chain_index: config.network_service.1,
block_number_bytes: config.block_number_bytes,
}
}
pub fn block_number_bytes(&self) -> usize {
self.block_number_bytes
}
pub async fn serialize_chain_information(
&self,
) -> Option<chain::chain_information::ValidChainInformation> {
let (send_back, rx) = oneshot::channel();
self.to_background
.lock()
.await
.send(ToBackground::SerializeChainInformation { send_back })
.await
.unwrap();
rx.await.unwrap()
}
pub async fn subscribe_all(&self, buffer_size: usize, runtime_interest: bool) -> SubscribeAll {
let (send_back, rx) = oneshot::channel();
self.to_background
.lock()
.await
.send(ToBackground::SubscribeAll {
send_back,
buffer_size,
runtime_interest,
})
.await
.unwrap();
rx.await.unwrap()
}
pub async fn is_near_head_of_chain_heuristic(&self) -> bool {
let (send_back, rx) = oneshot::channel();
self.to_background
.lock()
.await
.send(ToBackground::IsNearHeadOfChainHeuristic { send_back })
.await
.unwrap();
rx.await.unwrap()
}
pub async fn syncing_peers(
&self,
) -> impl ExactSizeIterator<Item = (PeerId, protocol::Role, u64, [u8; 32])> {
let (send_back, rx) = oneshot::channel();
self.to_background
.lock()
.await
.send(ToBackground::SyncingPeers { send_back })
.await
.unwrap();
rx.await.unwrap().into_iter()
}
pub async fn peers_assumed_know_blocks(
&self,
block_number: u64,
block_hash: &[u8; 32],
) -> impl Iterator<Item = PeerId> {
let (send_back, rx) = oneshot::channel();
self.to_background
.lock()
.await
.send(ToBackground::PeersAssumedKnowBlock {
send_back,
block_number,
block_hash: *block_hash,
})
.await
.unwrap();
rx.await.unwrap().into_iter()
}
pub async fn block_query(
self: Arc<Self>,
block_number: u64,
hash: [u8; 32],
fields: protocol::BlocksRequestFields,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
) -> Result<protocol::BlockData, ()> {
let request_config = protocol::BlocksRequestConfig {
start: protocol::BlocksRequestConfigStart::Hash(hash),
desired_count: NonZeroU32::new(1).unwrap(),
direction: protocol::BlocksRequestDirection::Ascending,
fields: fields.clone(),
};
for target in self
.peers_assumed_know_blocks(block_number, &hash)
.await
.take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
{
let mut result = match self
.network_service
.clone()
.blocks_request(
target,
self.network_chain_index,
request_config.clone(),
timeout_per_request,
)
.await
{
Ok(b) => b,
Err(_) => continue,
};
return Ok(result.remove(0));
}
Err(())
}
pub async fn block_query_unknown_number(
self: Arc<Self>,
hash: [u8; 32],
fields: protocol::BlocksRequestFields,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
) -> Result<protocol::BlockData, ()> {
let request_config = protocol::BlocksRequestConfig {
start: protocol::BlocksRequestConfigStart::Hash(hash),
desired_count: NonZeroU32::new(1).unwrap(),
direction: protocol::BlocksRequestDirection::Ascending,
fields: fields.clone(),
};
for target in self
.network_service
.peers_list()
.await
.take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
{
let mut result = match self
.network_service
.clone()
.blocks_request(
target,
self.network_chain_index,
request_config.clone(),
timeout_per_request,
)
.await
{
Ok(b) => b,
Err(_) => continue,
};
return Ok(result.remove(0));
}
Err(())
}
pub async fn storage_query(
self: Arc<Self>,
block_number: u64,
block_hash: &[u8; 32],
storage_trie_root: &[u8; 32],
requested_keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
) -> Result<Vec<Option<Vec<u8>>>, StorageQueryError> {
let mut outcome_errors =
Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
for target in self
.peers_assumed_know_blocks(block_number, block_hash)
.await
.take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
{
let result = self
.network_service
.clone()
.storage_proof_request(
self.network_chain_index,
target,
protocol::StorageProofRequestConfig {
block_hash: *block_hash,
keys: requested_keys.clone(),
},
timeout_per_request,
)
.await
.map_err(StorageQueryErrorDetail::Network)
.and_then(|outcome| {
let decoded = outcome.decode();
let mut result = Vec::with_capacity(requested_keys.clone().count());
for key in requested_keys.clone() {
result.push(
proof_verify::verify_proof(proof_verify::VerifyProofConfig {
proof: decoded.iter().map(|nv| &nv[..]),
requested_key: key.as_ref(),
trie_root_hash: &storage_trie_root,
})
.map_err(StorageQueryErrorDetail::ProofVerification)?
.map(|v| v.to_owned()),
);
}
debug_assert_eq!(result.len(), result.capacity());
Ok(result)
});
match result {
Ok(values) => return Ok(values),
Err(err) => {
outcome_errors.push(err);
}
}
}
Err(StorageQueryError {
errors: outcome_errors,
})
}
pub async fn storage_prefix_keys_query(
self: Arc<Self>,
block_number: u64,
block_hash: &[u8; 32],
prefix: &[u8],
storage_trie_root: &[u8; 32],
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
) -> Result<Vec<Vec<u8>>, StorageQueryError> {
let mut prefix_scan = prefix_proof::prefix_scan(prefix_proof::Config {
prefix,
trie_root_hash: *storage_trie_root,
});
'main_scan: loop {
let mut outcome_errors =
Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
for target in self
.peers_assumed_know_blocks(block_number, block_hash)
.await
.take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
{
let result = self
.network_service
.clone()
.storage_proof_request(
self.network_chain_index,
target,
protocol::StorageProofRequestConfig {
block_hash: *block_hash,
keys: prefix_scan.requested_keys().map(|nibbles| {
trie::nibbles_to_bytes_extend(nibbles).collect::<Vec<_>>()
}),
},
timeout_per_request,
)
.await
.map_err(StorageQueryErrorDetail::Network);
match result {
Ok(proof) => {
let decoded_proof = proof.decode();
match prefix_scan.resume(decoded_proof.iter().map(|v| &v[..])) {
Ok(prefix_proof::ResumeOutcome::InProgress(scan)) => {
prefix_scan = scan;
continue 'main_scan;
}
Ok(prefix_proof::ResumeOutcome::Success { keys }) => {
return Ok(keys);
}
Err((scan, err)) => {
prefix_scan = scan;
outcome_errors
.push(StorageQueryErrorDetail::ProofVerification(err));
}
}
}
Err(err) => {
outcome_errors.push(err);
}
}
}
return Err(StorageQueryError {
errors: outcome_errors,
});
}
}
pub async fn call_proof_query<'a>(
self: Arc<Self>,
block_number: u64,
config: protocol::CallProofRequestConfig<
'a,
impl Iterator<Item = impl AsRef<[u8]>> + Clone,
>,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
) -> Result<network_service::EncodedMerkleProof, CallProofQueryError> {
let mut outcome_errors =
Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
for target in self
.peers_assumed_know_blocks(block_number, &config.block_hash)
.await
.take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
{
let result = self
.network_service
.clone()
.call_proof_request(
self.network_chain_index,
target,
config.clone(),
timeout_per_request,
)
.await;
match result {
Ok(value) if !value.decode().is_empty() => return Ok(value),
Ok(_) => outcome_errors.push(network_service::CallProofRequestError::Request(
service::CallProofRequestError::Request(
smoldot::libp2p::peers::RequestError::Substream(
smoldot::libp2p::connection::established::RequestError::SubstreamClosed,
),
),
)),
Err(err) => {
outcome_errors.push(err);
}
}
}
Err(CallProofQueryError {
errors: outcome_errors,
})
}
}
#[derive(Debug, Clone)]
pub struct StorageQueryError {
pub errors: Vec<StorageQueryErrorDetail>,
}
impl StorageQueryError {
pub fn is_network_problem(&self) -> bool {
self.errors.iter().all(|err| match err {
StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::Request(
service::StorageProofRequestError::Request(_),
),
)
| StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::NoConnection,
) => true,
StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::Request(
service::StorageProofRequestError::Decode(_),
),
) => false,
StorageQueryErrorDetail::ProofVerification(proof_verify::Error::TrieRootNotFound) => {
true
}
StorageQueryErrorDetail::ProofVerification(_) => false,
})
}
}
impl fmt::Display for StorageQueryError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.errors.is_empty() {
write!(f, "No node available for storage query")
} else {
write!(f, "Storage query errors:")?;
for err in &self.errors {
write!(f, "\n- {}", err)?;
}
Ok(())
}
}
}
#[derive(Debug, derive_more::Display, Clone)]
pub enum StorageQueryErrorDetail {
#[display(fmt = "{}", _0)]
Network(network_service::StorageProofRequestError),
#[display(fmt = "{}", _0)]
ProofVerification(proof_verify::Error),
}
#[derive(Debug, Clone)]
pub struct CallProofQueryError {
pub errors: Vec<network_service::CallProofRequestError>,
}
impl CallProofQueryError {
pub fn is_network_problem(&self) -> bool {
self.errors.iter().all(|err| err.is_network_problem())
}
}
impl fmt::Display for CallProofQueryError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.errors.is_empty() {
write!(f, "No node available for call proof query")
} else {
write!(f, "Call proof query errors:")?;
for err in &self.errors {
write!(f, "\n- {}", err)?;
}
Ok(())
}
}
}
pub struct SubscribeAll {
pub finalized_block_scale_encoded_header: Vec<u8>,
pub finalized_block_runtime: Option<FinalizedBlockRuntime>,
pub non_finalized_blocks_ancestry_order: Vec<BlockNotification>,
pub new_blocks: mpsc::Receiver<Notification>,
}
pub struct FinalizedBlockRuntime {
pub virtual_machine: host::HostVmPrototype,
pub storage_code: Option<Vec<u8>>,
pub storage_heap_pages: Option<Vec<u8>>,
}
#[derive(Debug, Clone)]
pub enum Notification {
Finalized {
hash: [u8; 32],
best_block_hash: [u8; 32],
},
Block(BlockNotification),
BestBlockChanged {
hash: [u8; 32],
},
}
#[derive(Debug, Clone)]
pub struct BlockNotification {
pub is_new_best: bool,
pub scale_encoded_header: Vec<u8>,
pub parent_hash: [u8; 32],
}
enum ToBackground {
IsNearHeadOfChainHeuristic { send_back: oneshot::Sender<bool> },
SubscribeAll {
send_back: oneshot::Sender<SubscribeAll>,
buffer_size: usize,
runtime_interest: bool,
},
PeersAssumedKnowBlock {
send_back: oneshot::Sender<Vec<PeerId>>,
block_number: u64,
block_hash: [u8; 32],
},
SyncingPeers {
send_back: oneshot::Sender<Vec<(PeerId, protocol::Role, u64, [u8; 32])>>,
},
SerializeChainInformation {
send_back: oneshot::Sender<Option<chain::chain_information::ValidChainInformation>>,
},
}