#![cfg_attr(not(any(test, feature = "std")), no_std)]
#![forbid(unsafe_code)]
#![deny(rustdoc::broken_intra_doc_links)]
extern crate alloc;
use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec, vec::Vec};
use core::{num::NonZeroU32, pin};
use futures_channel::oneshot;
use futures_util::{future, FutureExt as _};
use hashbrown::{hash_map::Entry, HashMap};
use itertools::Itertools as _;
use smoldot::{
chain::{self, chain_information},
chain_spec, header,
informant::HashDisplay,
libp2p::{connection, multiaddr, peer_id},
};
mod database;
mod json_rpc_service;
mod network_service;
mod runtime_service;
mod sync_service;
mod transactions_service;
mod util;
pub mod platform;
pub use json_rpc_service::HandleRpcError;
pub use peer_id::PeerId;
/// Configuration for a chain to add to the client through [`Client::add_chain`].
#[derive(Debug, Clone)]
pub struct AddChainConfig<'a, TChain, TRelays> {
    /// Opaque user data stored alongside the chain; handed back by
    /// [`Client::remove_chain`] and accessible via [`Client::chain_user_data_mut`].
    pub user_data: TChain,
    /// JSON-encoded chain specification describing the chain.
    pub specification: &'a str,
    /// Previously-saved database content. Decoded on a best-effort basis: content
    /// that is invalid or doesn't match the chain specification is ignored.
    pub database_content: &'a str,
    /// Chains already added to the client among which the relay chain is looked
    /// up, if the chain specification designates this chain as a parachain.
    pub potential_relay_chains: TRelays,
    /// Whether to start a JSON-RPC service for this chain, and its limits.
    pub json_rpc: AddChainConfigJsonRpc,
}
/// See [`AddChainConfig::json_rpc`].
#[derive(Debug, Clone)]
pub enum AddChainConfigJsonRpc {
    /// No JSON-RPC service is started for the chain.
    /// [`AddChainSuccess::json_rpc_responses`] will be `None`.
    Disabled,
    /// A JSON-RPC service is started for the chain.
    Enabled {
        /// Maximum number of JSON-RPC requests that can be queued at any given
        /// time. Passed through to the JSON-RPC service configuration.
        max_pending_requests: NonZeroU32,
        /// Maximum number of active subscriptions. Passed through to the JSON-RPC
        /// service configuration.
        max_subscriptions: u32,
    },
}
/// Opaque identifier of a chain that has been added through [`Client::add_chain`].
// The wrapped `usize` is the index of the chain within `Client::public_api_chains`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChainId(usize);
impl From<usize> for ChainId {
fn from(id: usize) -> ChainId {
ChainId(id)
}
}
impl From<ChainId> for usize {
fn from(chain_id: ChainId) -> usize {
chain_id.0
}
}
/// Smoldot light client. Tracks the chains added by the API user and the
/// underlying running services shared between identical chains.
pub struct Client<TPlat: platform::PlatformRef, TChain = ()> {
    /// Access to the platform (task spawning, client name/version, time, ...).
    platform: TPlat,
    /// Chains as exposed through the public API, indexed by the `usize` inside
    /// [`ChainId`].
    public_api_chains: slab::Slab<PublicApiChain<TChain>>,
    /// Running chains, keyed by [`ChainKey`]. Multiple public API chains that are
    /// identical (same key) share a single entry here; see
    /// [`RunningChain::num_references`].
    chains_by_key: HashMap<ChainKey, RunningChain<TPlat>, fnv::FnvBuildHasher>,
}
/// A chain as seen by the public API of the [`Client`].
struct PublicApiChain<TChain> {
    /// Opaque user data provided through [`AddChainConfig::user_data`].
    user_data: TChain,
    /// Key of the corresponding entry in [`Client::chains_by_key`].
    key: ChainKey,
    /// Identifier (`id` field) of the chain specification. Used to match
    /// parachains against their potential relay chains.
    chain_spec_chain_id: String,
    /// Frontend of the JSON-RPC service of this chain; `None` if JSON-RPC was
    /// disabled when the chain was added.
    json_rpc_frontend: Option<json_rpc_service::Frontend>,
    /// Never used for sending. Dropped when the chain is removed, which notifies
    /// the corresponding [`JsonRpcResponses`] through its receiver.
    _public_api_chain_destroyed_tx: oneshot::Sender<()>,
}
/// Identifies a chain. Two added chains that produce the same key are considered
/// identical and share their running services.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ChainKey {
    /// Hash of the genesis block of the chain.
    genesis_block_hash: [u8; 32],
    /// If this chain is a parachain: key of the relay chain plus the parachain id.
    relay_chain: Option<(Box<ChainKey>, u32)>,
    /// `fork_id` field of the chain specification, if any.
    fork_id: Option<String>,
}
/// Services of a running chain, shared between all the public API chains that have
/// an identical [`ChainKey`].
struct RunningChain<TPlat: platform::PlatformRef> {
    /// The services themselves. Initialization happens in a background task; the
    /// shared remote handle lets every interested party wait for it to finish.
    services: future::MaybeDone<future::Shared<future::RemoteHandle<ChainServices<TPlat>>>>,
    /// Name of the chain used in log messages. Unique among running chains.
    log_name: String,
    /// Number of entries in [`Client::public_api_chains`] that refer to this
    /// chain. The chain is shut down when this reaches zero.
    num_references: NonZeroU32,
}
/// Collection of the services that make up a running chain.
struct ChainServices<TPlat: platform::PlatformRef> {
    network_service: Arc<network_service::NetworkService<TPlat>>,
    /// Identity of the local node on the peer-to-peer network, derived from the
    /// networking noise key generated for this chain.
    network_identity: peer_id::PeerId,
    sync_service: Arc<sync_service::SyncService<TPlat>>,
    runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
    transactions_service: Arc<transactions_service::TransactionsService<TPlat>>,
}
impl<TPlat: platform::PlatformRef> Clone for ChainServices<TPlat> {
fn clone(&self) -> Self {
ChainServices {
network_service: self.network_service.clone(),
network_identity: self.network_identity.clone(),
sync_service: self.sync_service.clone(),
runtime_service: self.runtime_service.clone(),
transactions_service: self.transactions_service.clone(),
}
}
}
/// Success value returned by [`Client::add_chain`].
pub struct AddChainSuccess {
    /// Identifier of the newly-added chain.
    pub chain_id: ChainId,
    /// Stream of JSON-RPC responses. `Some` if and only if
    /// [`AddChainConfig::json_rpc`] was [`AddChainConfigJsonRpc::Enabled`].
    pub json_rpc_responses: Option<JsonRpcResponses>,
}
/// Stream of JSON-RPC responses of a chain.
/// See [`AddChainSuccess::json_rpc_responses`].
pub struct JsonRpcResponses {
    /// Frontend of the JSON-RPC service. Set to `None` once the chain has been
    /// destroyed, after which no more responses are produced.
    inner: Option<json_rpc_service::Frontend>,
    /// Receiver whose sender is held by the corresponding [`PublicApiChain`];
    /// the sender being dropped signals that the chain was removed.
    public_api_chain_destroyed_rx: oneshot::Receiver<()>,
}
impl JsonRpcResponses {
    /// Returns the next JSON-RPC response generated for this chain.
    ///
    /// Waits until a response is available. Returns `None` once the chain has
    /// been removed from the client; every subsequent call then returns `None`
    /// immediately.
    pub async fn next(&mut self) -> Option<String> {
        if let Some(frontend) = self.inner.as_mut() {
            let response_fut = pin::pin!(frontend.next_json_rpc_response());
            // Race the next response against the chain-destroyed notification.
            match future::select(response_fut, &mut self.public_api_chain_destroyed_rx).await {
                future::Either::Left((response, _)) => return Some(response),
                future::Either::Right((_result, _)) => {
                    // The sender is never used, only dropped; receiving can only
                    // ever produce a cancellation error.
                    debug_assert!(_result.is_err());
                }
            }
        }
        // Reached when the chain has been destroyed (or the frontend was already
        // gone). Drop the frontend so later calls return `None` right away.
        self.inner = None;
        None
    }
}
impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
/// Initializes the smoldot client with the given platform implementation.
pub fn new(platform: TPlat) -> Self {
    // Pre-sizing only; this is not a limit, more chains can always be added.
    const EXPECTED_CHAINS: usize = 8;
    Client {
        platform,
        public_api_chains: slab::Slab::with_capacity(EXPECTED_CHAINS),
        chains_by_key: HashMap::with_capacity_and_hasher(EXPECTED_CHAINS, Default::default()),
    }
}
/// Adds a new chain to the list of chains smoldot tries to synchronize.
///
/// Decodes the chain specification, determines the starting point of the sync
/// (genesis, checkpoint, or saved database content, whichever is valid and most
/// recent), resolves the relay chain among `potential_relay_chains` if the chain
/// is a parachain, then either reuses the services of an already-identical chain
/// or spawns background tasks that initialize new ones. Optionally starts a
/// JSON-RPC service.
pub fn add_chain(
    &mut self,
    config: AddChainConfig<'_, TChain, impl Iterator<Item = ChainId>>,
) -> Result<AddChainSuccess, AddChainError> {
    // Decode the chain specification.
    let chain_spec = match chain_spec::ChainSpec::from_json_bytes(config.specification) {
        Ok(cs) => cs,
        Err(err) => {
            return Err(AddChainError::ChainSpecParseError(err));
        }
    };
    // Pick the chain information (starting point of the sync), the genesis block
    // header, and the initially-known network nodes. The three tuple elements
    // matched on are: chain information built from the genesis storage, chain
    // information built from the checkpoint ("light sync state"), and the decoded
    // database content. Each can independently be missing or invalid.
    let (chain_information, genesis_block_header, checkpoint_nodes) = {
        match (
            chain_spec.as_chain_information().map(|(ci, _)| ci),
            chain_spec.light_sync_state().map(|s| {
                chain::chain_information::ValidChainInformation::try_from(
                    s.as_chain_information(),
                )
            }),
            database::decode_database(
                config.database_content,
                chain_spec.block_number_bytes().into(),
            ),
        ) {
            // Database content is valid for this genesis and newer than the
            // checkpoint (if any): start from the database.
            (Ok(genesis_ci), checkpoint, Ok(database_content))
                if database_content.genesis_block_hash
                    == genesis_ci
                        .as_ref()
                        .finalized_block_header
                        .hash(chain_spec.block_number_bytes().into())
                    && checkpoint.as_ref().and_then(|r| r.as_ref().ok()).map_or(
                        true,
                        |cp| {
                            cp.as_ref().finalized_block_header.number
                                < database_content
                                    .chain_information
                                    .as_ref()
                                    .finalized_block_header
                                    .number
                        },
                    ) =>
            {
                let genesis_header = genesis_ci.as_ref().finalized_block_header.clone();
                (
                    database_content.chain_information,
                    genesis_header.into(),
                    database_content.known_nodes,
                )
            }
            // Genesis storage is incomplete but the spec provides the genesis
            // trie root, and the database is decodable and newer than the
            // checkpoint: reconstruct the genesis header, then prefer the
            // database if it matches, falling back to the checkpoint.
            (
                Err(chain_spec::FromGenesisStorageError::UnknownStorageItems),
                checkpoint,
                Ok(database_content),
            ) if checkpoint
                .as_ref()
                .and_then(|r| r.as_ref().ok())
                .map_or(true, |cp| {
                    cp.as_ref().finalized_block_header.number
                        < database_content
                            .chain_information
                            .as_ref()
                            .finalized_block_header
                            .number
                }) =>
            {
                let genesis_header = header::Header {
                    parent_hash: [0; 32],
                    number: 0,
                    state_root: *chain_spec.genesis_storage().into_trie_root_hash().unwrap(),
                    extrinsics_root: smoldot::trie::empty_trie_merkle_value(),
                    digest: header::DigestRef::empty().into(),
                };
                if database_content.genesis_block_hash
                    == genesis_header.hash(chain_spec.block_number_bytes().into())
                {
                    (
                        database_content.chain_information,
                        genesis_header,
                        database_content.known_nodes,
                    )
                } else if let Some(Ok(checkpoint)) = checkpoint {
                    (checkpoint, genesis_header, database_content.known_nodes)
                } else {
                    return Err(AddChainError::ChainSpecNeitherGenesisStorageNorCheckpoint);
                }
            }
            // Neither a usable genesis storage nor a checkpoint: can't start.
            (Err(chain_spec::FromGenesisStorageError::UnknownStorageItems), None, _) => {
                return Err(AddChainError::ChainSpecNeitherGenesisStorageNorCheckpoint);
            }
            // Incomplete genesis storage but a valid checkpoint: start from the
            // checkpoint with a reconstructed genesis header.
            (
                Err(chain_spec::FromGenesisStorageError::UnknownStorageItems),
                Some(Ok(checkpoint)),
                _,
            ) => {
                let genesis_header = header::Header {
                    parent_hash: [0; 32],
                    number: 0,
                    state_root: *chain_spec.genesis_storage().into_trie_root_hash().unwrap(),
                    extrinsics_root: smoldot::trie::empty_trie_merkle_value(),
                    digest: header::DigestRef::empty().into(),
                };
                (checkpoint, genesis_header, Default::default())
            }
            // Any other genesis storage error is fatal.
            (Err(err), _, _) => return Err(AddChainError::InvalidGenesisStorage(err)),
            // A checkpoint that is present but invalid is fatal.
            (_, Some(Err(err)), _) => {
                return Err(AddChainError::InvalidCheckpoint(err));
            }
            // Valid genesis plus a valid checkpoint (and no usable database):
            // start from the checkpoint.
            (Ok(genesis_ci), Some(Ok(checkpoint)), _) => {
                let genesis_header = genesis_ci.as_ref().finalized_block_header.clone();
                (checkpoint, genesis_header.into(), Default::default())
            }
            // Valid genesis only: start from the genesis.
            (Ok(genesis_ci), None, _) => {
                let genesis_header =
                    header::Header::from(genesis_ci.as_ref().finalized_block_header.clone());
                (genesis_ci, genesis_header, Default::default())
            }
        }
    };
    // If the chain specification designates a relay chain, find it among the
    // `potential_relay_chains` by comparing chain spec identifiers. Exactly one
    // candidate must match.
    let relay_chain_id = if let Some((relay_chain_id, _para_id)) = chain_spec.relay_chain() {
        let chain = config
            .potential_relay_chains
            .filter(|c| {
                self.public_api_chains
                    .get(c.0)
                    .map_or(false, |chain| chain.chain_spec_chain_id == relay_chain_id)
            })
            .exactly_one();
        match chain {
            Ok(c) => Some(c),
            Err(mut iter) => {
                // `exactly_one()` failed: either zero or more than one candidate
                // matched; the error value iterates over the matches.
                return Err(if iter.next().is_none() {
                    AddChainError::NoRelayChainFound
                } else {
                    debug_assert!(iter.next().is_some());
                    AddChainError::MultipleRelayChains
                });
            }
        }
    } else {
        None
    };
    // Parse the bootnodes of the chain specification, separating valid entries
    // from invalid ones (kept in a sanitized, loggable form).
    let (bootstrap_nodes, invalid_bootstrap_nodes_sanitized) = {
        let mut valid_list = Vec::with_capacity(chain_spec.boot_nodes().len());
        let mut invalid_list = Vec::with_capacity(0);
        for node in chain_spec.boot_nodes() {
            match node {
                chain_spec::Bootnode::Parsed { multiaddr, peer_id } => {
                    if let Ok(multiaddr) = multiaddr.parse::<multiaddr::Multiaddr>() {
                        let peer_id = peer_id::PeerId::from_bytes(peer_id).unwrap();
                        valid_list.push((peer_id, vec![multiaddr]));
                    } else {
                        invalid_list.push(multiaddr)
                    }
                }
                chain_spec::Bootnode::UnrecognizedFormat(unparsed) => invalid_list.push(
                    // Keep only ASCII characters so the value is safe to log.
                    unparsed
                        .chars()
                        .filter(|c| c.is_ascii())
                        .collect::<String>(),
                ),
            }
        }
        (valid_list, invalid_list)
    };
    let chain_spec_chain_id = chain_spec.id().to_owned();
    let genesis_block_hash = genesis_block_header.hash(chain_spec.block_number_bytes().into());
    let genesis_block_state_root = genesis_block_header.state_root;
    // Key uniquely identifying this chain; two chains with the same key will
    // share their services.
    let new_chain_key = ChainKey {
        genesis_block_hash,
        relay_chain: relay_chain_id.map(|ck| {
            (
                Box::new(self.public_api_chains.get(ck.0).unwrap().key.clone()),
                chain_spec.relay_chain().unwrap().1,
            )
        }),
        fork_id: chain_spec.fork_id().map(|f| f.to_owned()),
    };
    // If this is a parachain, grab a future to the relay chain's services (and
    // its log name) so that parachain initialization can wait for the relay
    // chain to be ready.
    let relay_chain_ready_future: Option<(future::MaybeDone<future::Shared<_>>, String)> =
        relay_chain_id.map(|relay_chain| {
            let relay_chain = &self
                .chains_by_key
                .get(&self.public_api_chains.get(relay_chain.0).unwrap().key)
                .unwrap();
            let future = match &relay_chain.services {
                future::MaybeDone::Done(d) => future::MaybeDone::Done(d.clone()),
                future::MaybeDone::Future(d) => future::MaybeDone::Future(d.clone()),
                future::MaybeDone::Gone => unreachable!(),
            };
            (future, relay_chain.log_name.clone())
        });
    // Build the log name for this chain: the ASCII-graphic characters of the
    // chain spec id, plus a numeric suffix if the name is already in use.
    let log_name = {
        let base = chain_spec
            .id()
            .chars()
            .filter(|c| c.is_ascii_graphic())
            .collect::<String>();
        let mut suffix = None;
        loop {
            let attempt = if let Some(suffix) = suffix {
                format!("{base}-{suffix}")
            } else {
                base.clone()
            };
            if !self.chains_by_key.values().any(|c| *c.log_name == attempt) {
                break attempt;
            }
            match &mut suffix {
                Some(v) => *v += 1,
                v @ None => *v = Some(1),
            }
        }
    };
    // Either reuse the services of an already-running identical chain, or spawn
    // a background task that initializes new services.
    let (services_init, log_name) = match self.chains_by_key.entry(new_chain_key.clone()) {
        Entry::Occupied(mut entry) => {
            // Identical chain already exists: just bump its reference count.
            entry.get_mut().num_references = entry.get().num_references.checked_add(1).unwrap();
            let entry = entry.into_mut();
            (&mut entry.services, &entry.log_name)
        }
        Entry::Vacant(entry) => {
            // Each chain gets its own networking key, from which its network
            // identity is derived.
            let network_noise_key = connection::NoiseKey::new(&rand::random());
            let network_identify_agent_version = format!(
                "{} {}",
                self.platform.client_name(),
                self.platform.client_version()
            );
            // The services are initialized in a background task; the
            // `RemoteHandle` yields them once ready.
            let running_chain_init_future: future::RemoteHandle<ChainServices<TPlat>> = {
                let platform = self.platform.clone();
                let chain_spec = chain_spec.clone();
                let log_name = log_name.clone();
                let future = async move {
                    // Parachains must wait for their relay chain services first.
                    let relay_chain =
                        if let Some((mut relay_chain_ready_future, relay_chain_log_name)) =
                            relay_chain_ready_future
                        {
                            (&mut relay_chain_ready_future).await;
                            let running_relay_chain =
                                pin::Pin::new(&mut relay_chain_ready_future)
                                    .take_output()
                                    .unwrap();
                            Some((running_relay_chain, relay_chain_log_name))
                        } else {
                            None
                        };
                    let chain_name = chain_spec.name().to_owned();
                    let relay_chain_para_id = chain_spec.relay_chain().map(|(_, id)| id);
                    let starting_block_number =
                        chain_information.as_ref().finalized_block_header.number;
                    let starting_block_hash = chain_information
                        .as_ref()
                        .finalized_block_header
                        .hash(chain_spec.block_number_bytes().into());
                    let has_bad_blocks = chain_spec.bad_blocks_hashes().count() != 0;
                    let running_chain = start_services(
                        log_name.clone(),
                        &platform,
                        chain_information,
                        genesis_block_header
                            .scale_encoding_vec(chain_spec.block_number_bytes().into()),
                        chain_spec,
                        relay_chain.as_ref().map(|(r, _)| r),
                        network_identify_agent_version,
                        network_noise_key,
                    )
                    .await;
                    // Log a summary of the freshly-initialized chain.
                    if let Some((_, relay_chain_log_name)) = relay_chain.as_ref() {
                        log::info!(
                            target: "smoldot",
                            "Parachain initialization complete for {}. Name: {:?}. Genesis \
                            hash: {}. State root hash: 0x{}. Network identity: {}. Relay \
                            chain: {} (id: {})",
                            log_name,
                            chain_name,
                            HashDisplay(&genesis_block_hash),
                            hex::encode(genesis_block_state_root),
                            running_chain.network_identity,
                            relay_chain_log_name,
                            relay_chain_para_id.unwrap(),
                        );
                    } else {
                        log::info!(
                            target: "smoldot",
                            "Chain initialization complete for {}. Name: {:?}. Genesis \
                            hash: {}. State root hash: 0x{}. Network identity: {}. Chain \
                            specification or database starting at: {} (#{})",
                            log_name,
                            chain_name,
                            HashDisplay(&genesis_block_hash),
                            hex::encode(genesis_block_state_root),
                            running_chain.network_identity,
                            HashDisplay(&starting_block_hash),
                            starting_block_number
                        );
                    }
                    if has_bad_blocks {
                        log::warn!(
                            target: "smoldot",
                            "Chain specification of {} contains a list of bad blocks. Bad \
                            blocks are not implemented in the light client. An appropriate \
                            way to silence this warning is to remove the bad blocks from the \
                            chain specification, which can safely be done:\n\
                            - For relay chains: if the chain specification contains a \
                            checkpoint and that the bad blocks have a block number inferior \
                            to this checkpoint.\n\
                            - For parachains: if the bad blocks have a block number inferior \
                            to the current parachain finalized block.", log_name
                        );
                    }
                    running_chain
                };
                let (background_future, output_future) = future.remote_handle();
                self.platform
                    .spawn_task("services-initialization".into(), background_future.boxed());
                output_future
            };
            let entry = entry.insert(RunningChain {
                services: future::maybe_done(running_chain_init_future.shared()),
                log_name,
                num_references: NonZeroU32::new(1).unwrap(),
            });
            (&mut entry.services, &entry.log_name)
        }
    };
    if !invalid_bootstrap_nodes_sanitized.is_empty() {
        log::warn!(
            target: "smoldot",
            "Failed to parse some of the bootnodes of {}. \
            These bootnodes have been ignored. List: {}",
            log_name, invalid_bootstrap_nodes_sanitized.join(", ")
        );
    }
    if bootstrap_nodes.is_empty() {
        log::warn!(
            target: "smoldot",
            "Newly-added chain {} has an empty list of bootnodes. Smoldot will likely fail \
            to connect to its peer-to-peer network.",
            log_name
        );
    }
    let public_api_chains_entry = self.public_api_chains.vacant_entry();
    let new_chain_id = ChainId(public_api_chains_entry.key());
    // Spawn a task that, once the services are initialized, seeds the network
    // service with the database nodes and the bootnodes.
    self.platform
        .spawn_task("network-service-add-initial-topology".into(), {
            let mut running_chain_init = match services_init {
                future::MaybeDone::Done(d) => future::MaybeDone::Done(d.clone()),
                future::MaybeDone::Future(d) => future::MaybeDone::Future(d.clone()),
                future::MaybeDone::Gone => unreachable!(),
            };
            let platform = self.platform.clone();
            async move {
                (&mut running_chain_init).await;
                let running_chain = pin::Pin::new(&mut running_chain_init)
                    .take_output()
                    .unwrap();
                running_chain
                    .network_service
                    .discover(&platform.now(), 0, checkpoint_nodes, false)
                    .await;
                running_chain
                    .network_service
                    .discover(&platform.now(), 0, bootstrap_nodes, true)
                    .await;
            }
            .boxed()
        });
    // Create and start the JSON-RPC service, if requested.
    let json_rpc_frontend = if let AddChainConfigJsonRpc::Enabled {
        max_pending_requests,
        max_subscriptions,
    } = config.json_rpc
    {
        let mut running_chain_init = match services_init {
            future::MaybeDone::Done(d) => future::MaybeDone::Done(d.clone()),
            future::MaybeDone::Future(d) => future::MaybeDone::Future(d.clone()),
            future::MaybeDone::Gone => unreachable!(),
        };
        let (frontend, service_starter) = json_rpc_service::service(json_rpc_service::Config {
            log_name: log_name.clone(),
            max_pending_requests,
            max_subscriptions,
            max_parallel_requests: NonZeroU32::new(24).unwrap(),
            max_parallel_subscription_updates: NonZeroU32::new(8).unwrap(),
        });
        let system_name = self.platform.client_name().into_owned();
        let system_version = self.platform.client_version().into_owned();
        let platform = self.platform.clone();
        let init_future = async move {
            // The JSON-RPC service can only start once the chain services exist.
            (&mut running_chain_init).await;
            let running_chain = pin::Pin::new(&mut running_chain_init)
                .take_output()
                .unwrap();
            service_starter.start(json_rpc_service::StartConfig {
                platform,
                sync_service: running_chain.sync_service,
                network_service: (running_chain.network_service, 0),
                transactions_service: running_chain.transactions_service,
                runtime_service: running_chain.runtime_service,
                chain_spec: &chain_spec,
                peer_id: &running_chain.network_identity,
                system_name,
                system_version,
                genesis_block_hash,
                genesis_block_state_root,
            })
        };
        self.platform
            .spawn_task("json-rpc-service-init".into(), init_future.boxed());
        Some(frontend)
    } else {
        None
    };
    // Register the chain in the public API list. The sender half of the channel
    // is kept in the chain entry; dropping it on removal notifies the
    // `JsonRpcResponses` handed out below.
    let (public_api_chain_destroyed_tx, public_api_chain_destroyed_rx) = oneshot::channel();
    public_api_chains_entry.insert(PublicApiChain {
        user_data: config.user_data,
        key: new_chain_key,
        chain_spec_chain_id,
        json_rpc_frontend: json_rpc_frontend.clone(),
        _public_api_chain_destroyed_tx: public_api_chain_destroyed_tx,
    });
    Ok(AddChainSuccess {
        chain_id: new_chain_id,
        json_rpc_responses: json_rpc_frontend.map(|f| JsonRpcResponses {
            inner: Some(f),
            public_api_chain_destroyed_rx,
        }),
    })
}
/// Removes the chain from smoldot and returns the user data that was stored
/// alongside it.
///
/// # Panics
///
/// Panics if `id` is invalid.
#[must_use]
pub fn remove_chain(&mut self, id: ChainId) -> TChain {
    let removed_chain = self.public_api_chains.remove(id.0);
    // Decrement the reference count of the underlying running chain, shutting it
    // down entirely if this was the last public API chain using it.
    let running_chain = self.chains_by_key.get_mut(&removed_chain.key).unwrap();
    match NonZeroU32::new(running_chain.num_references.get() - 1) {
        None => {
            log::info!(target: "smoldot", "Shutting down chain {}", running_chain.log_name);
            self.chains_by_key.remove(&removed_chain.key);
        }
        Some(decremented) => running_chain.num_references = decremented,
    }
    self.public_api_chains.shrink_to_fit();
    removed_chain.user_data
}
/// Returns a mutable reference to the user data stored for this chain.
///
/// # Panics
///
/// Panics if `chain_id` is invalid.
pub fn chain_user_data_mut(&mut self, chain_id: ChainId) -> &mut TChain {
    let chain = self.public_api_chains.get_mut(chain_id.0).unwrap();
    &mut chain.user_data
}
/// Queues a JSON-RPC request towards the given chain.
///
/// The response will be delivered through the corresponding [`JsonRpcResponses`].
pub fn json_rpc_request(
    &mut self,
    json_rpc_request: impl Into<String>,
    chain_id: ChainId,
) -> Result<(), HandleRpcError> {
    // Convert eagerly, then defer to the monomorphic inner function to keep the
    // generic shim small.
    let request = json_rpc_request.into();
    self.json_rpc_request_inner(request, chain_id)
}
/// Monomorphic implementation of [`Client::json_rpc_request`].
///
/// # Panics
///
/// Panics if `chain_id` is invalid, or if the chain was added with
/// [`AddChainConfigJsonRpc::Disabled`].
fn json_rpc_request_inner(
    &mut self,
    json_rpc_request: String,
    chain_id: ChainId,
) -> Result<(), HandleRpcError> {
    // An invalid `chain_id` is a contract violation by the caller.
    let public_chain = self.public_api_chains.get_mut(chain_id.0).unwrap();
    // A `None` frontend means JSON-RPC was disabled when the chain was added.
    // Panic with an explanatory message (rather than a bare `panic!()`) so the
    // API misuse is diagnosable from the panic output.
    let json_rpc_sender = match public_chain.json_rpc_frontend {
        Some(ref mut frontend) => frontend,
        None => panic!("JSON-RPC requests sent to a chain whose JSON-RPC service is disabled"),
    };
    json_rpc_sender.queue_rpc_request(json_rpc_request)
}
}
/// Error potentially returned by [`Client::add_chain`].
#[derive(Debug, derive_more::Display)]
pub enum AddChainError {
    /// The JSON chain specification couldn't be decoded.
    #[display(fmt = "Failed to decode chain specification: {_0}")]
    ChainSpecParseError(chain_spec::ParseError),
    /// The chain specification contains neither a complete genesis storage nor a
    /// checkpoint, so there is no starting point for the sync.
    #[display(fmt = "Either a checkpoint or the genesis storage must be provided")]
    ChainSpecNeitherGenesisStorageNorCheckpoint,
    /// The checkpoint ("light sync state") of the chain specification is invalid.
    #[display(fmt = "Invalid checkpoint in chain specification: {_0}")]
    InvalidCheckpoint(chain_information::ValidityError),
    /// The genesis storage of the chain specification couldn't be turned into
    /// chain information.
    #[display(fmt = "Failed to build genesis chain information: {_0}")]
    InvalidGenesisStorage(chain_spec::FromGenesisStorageError),
    /// None of the [`AddChainConfig::potential_relay_chains`] matches the relay
    /// chain designated by the chain specification.
    #[display(fmt = "Couldn't find relevant relay chain")]
    NoRelayChainFound,
    /// More than one of the [`AddChainConfig::potential_relay_chains`] matches
    /// the relay chain designated by the chain specification.
    #[display(fmt = "Multiple relevant relay chains found")]
    MultipleRelayChains,
}
/// Starts all the services of the chain and waits for them to be initialized.
///
/// `relay_chain` must be `Some` if and only if the chain is a parachain, in which
/// case it contains the already-running services of the relay chain.
async fn start_services<TPlat: platform::PlatformRef>(
    log_name: String,
    platform: &TPlat,
    chain_information: chain::chain_information::ValidChainInformation,
    genesis_block_scale_encoded_header: Vec<u8>,
    chain_spec: chain_spec::ChainSpec,
    relay_chain: Option<&ChainServices<TPlat>>,
    network_identify_agent_version: String,
    network_noise_key: connection::NoiseKey,
) -> ChainServices<TPlat> {
    // The local network identity is derived from the noise key.
    let network_identity =
        peer_id::PublicKey::Ed25519(*network_noise_key.libp2p_public_ed25519_key()).into_peer_id();

    // The network service is started first; the other services communicate with
    // the peer-to-peer network through it.
    let (network_service, mut network_event_receivers) =
        network_service::NetworkService::new(network_service::Config {
            platform: platform.clone(),
            num_events_receivers: 1,
            identify_agent_version: network_identify_agent_version,
            noise_key: network_noise_key,
            chains: vec![network_service::ConfigChain {
                log_name: log_name.clone(),
                has_grandpa_protocol: matches!(
                    chain_information.as_ref().finality,
                    chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
                ),
                genesis_block_hash: header::hash_from_scale_encoded_header(
                    &genesis_block_scale_encoded_header,
                ),
                finalized_block_height: chain_information.as_ref().finalized_block_header.number,
                // The finalized block is used as the initial best block as well.
                best_block: (
                    chain_information.as_ref().finalized_block_header.number,
                    chain_information
                        .as_ref()
                        .finalized_block_header
                        .hash(chain_spec.block_number_bytes().into()),
                ),
                fork_id: chain_spec.fork_id().map(|n| n.to_owned()),
                block_number_bytes: usize::from(chain_spec.block_number_bytes()),
            }],
        })
        .await;

    // The sync service is configured differently for parachains and standalone
    // chains: the only difference is the `parachain` field, which for parachains
    // references the relay chain's services.
    let sync_service = Arc::new(
        sync_service::SyncService::new(sync_service::Config {
            platform: platform.clone(),
            log_name: log_name.clone(),
            chain_information: chain_information.clone(),
            block_number_bytes: usize::from(chain_spec.block_number_bytes()),
            network_service: (network_service.clone(), 0),
            network_events_receiver: network_event_receivers.pop().unwrap(),
            parachain: relay_chain.map(|relay_chain| sync_service::ConfigParachain {
                parachain_id: chain_spec.relay_chain().unwrap().1,
                relay_chain_sync: relay_chain.runtime_service.clone(),
                relay_chain_block_number_bytes: relay_chain.sync_service.block_number_bytes(),
            }),
        })
        .await,
    );

    // The runtime service is built identically in both cases; it follows the
    // chain through the sync service.
    let runtime_service = Arc::new(
        runtime_service::RuntimeService::new(runtime_service::Config {
            log_name: log_name.clone(),
            platform: platform.clone(),
            sync_service: sync_service.clone(),
            genesis_block_scale_encoded_header,
        })
        .await,
    );

    // Service in charge of submitting transactions and tracking their status.
    let transactions_service = Arc::new(
        transactions_service::TransactionsService::new(transactions_service::Config {
            log_name,
            platform: platform.clone(),
            sync_service: sync_service.clone(),
            runtime_service: runtime_service.clone(),
            network_service: (network_service.clone(), 0),
            max_pending_transactions: NonZeroU32::new(64).unwrap(),
            max_concurrent_downloads: NonZeroU32::new(3).unwrap(),
            max_concurrent_validations: NonZeroU32::new(2).unwrap(),
        })
        .await,
    );

    ChainServices {
        network_service,
        network_identity,
        runtime_service,
        sync_service,
        transactions_service,
    }
}