use std::any::TypeId;
use std::time::Duration;
use blockstore::Blockstore;
use libp2p::Multiaddr;
use libp2p::identity::Keypair;
use tracing::{info, warn};
use crate::blockstore::InMemoryBlockstore;
use crate::events::EventSubscriber;
use crate::network::Network;
use crate::node::{Node, NodeConfig, Result};
use crate::store::{InMemoryStore, Store, StoreError};
#[cfg(target_arch = "wasm32")]
use crate::utils::resolve_bootnode_addresses;
/// Number of seconds in one hour.
const HOUR: u64 = 3_600;
/// Number of seconds in one day.
const DAY: u64 = HOUR * 24;

/// Sampling window placed into every node configuration: seven days.
pub const SAMPLING_WINDOW: Duration = Duration::from_secs(DAY * 7);

/// Pruning window used when none is set explicitly and no in-memory store is
/// detected: seven days plus one hour.
pub const DEFAULT_PRUNING_WINDOW: Duration = Duration::from_secs(DAY * 7 + HOUR);

/// Pruning window used when none is set explicitly and an in-memory store or
/// blockstore is detected: zero.
pub const DEFAULT_PRUNING_WINDOW_IN_MEMORY: Duration = Duration::ZERO;
/// Builder that collects the settings needed to configure and start a [`Node`].
///
/// `B` is the blockstore type and `S` is the store type handed to the node.
pub struct NodeBuilder<B, S>
where
B: Blockstore + 'static,
S: Store + 'static,
{
/// Blockstore instance moved into the node configuration.
blockstore: B,
/// Store instance moved into the node configuration; it is also asked for
/// the identity keypair when `keypair` is `None`.
store: S,
/// Explicitly chosen libp2p keypair, if any.
keypair: Option<Keypair>,
/// Network to connect to; building the configuration fails while this is `None`.
network: Option<Network>,
/// Bootnode addresses; when empty, the network's canonical bootnodes are used.
bootnodes: Vec<Multiaddr>,
/// Addresses forwarded to the configuration as the listen addresses.
listen: Vec<Multiaddr>,
/// Sync batch size; a value of 512 is used when unset.
sync_batch_size: Option<u64>,
/// Pruning window; when unset, a default is picked depending on whether
/// in-memory stores are in use.
pruning_window: Option<Duration>,
}
/// Errors that can be produced while building a node configuration from a
/// [`NodeBuilder`].
#[derive(Debug, thiserror::Error)]
pub enum NodeBuilderError {
/// No network was set on the builder before the configuration was built.
#[error("Network is not specified")]
NetworkNotSpecified,
/// Bootnodes were available but resolving them yielded no addresses.
/// In this file it is only returned from the `wasm32`-specific resolution step.
#[error("Could not resolve any of the bootnode addresses")]
FailedResolvingBootnodes,
/// Wraps a libp2p identity decoding error.
#[error(transparent)]
IdentityDecodingError(#[from] libp2p::identity::DecodingError),
/// Wraps an error returned by the store (e.g. while fetching the identity).
#[error(transparent)]
StoreError(#[from] StoreError),
}
impl NodeBuilder<InMemoryBlockstore, InMemoryStore> {
    /// Creates a builder backed by a fresh in-memory blockstore and in-memory
    /// store, with every other option left unset.
    pub fn new() -> Self {
        let blockstore = InMemoryBlockstore::new();
        let store = InMemoryStore::new();

        Self {
            blockstore,
            store,
            keypair: None,
            network: None,
            bootnodes: vec![],
            listen: vec![],
            sync_batch_size: None,
            pruning_window: None,
        }
    }
}
impl Default for NodeBuilder<InMemoryBlockstore, InMemoryStore> {
fn default() -> Self {
NodeBuilder::new()
}
}
impl<B, S> NodeBuilder<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    /// Builds the configuration and starts the node.
    ///
    /// The event subscriber produced alongside the node is discarded; use
    /// [`NodeBuilder::start_subscribed`] to keep it.
    pub async fn start(self) -> Result<Node<B, S>> {
        self.start_subscribed().await.map(|(node, _)| node)
    }

    /// Builds the configuration and starts the node, returning it together
    /// with its [`EventSubscriber`].
    pub async fn start_subscribed(self) -> Result<(Node<B, S>, EventSubscriber)> {
        let node_config = self.build_config().await?;
        Node::start(node_config).await
    }

    /// Replaces the blockstore, changing the builder's blockstore type.
    ///
    /// All other settings are carried over unchanged.
    pub fn blockstore<B2>(self, blockstore: B2) -> NodeBuilder<B2, S>
    where
        B2: Blockstore + 'static,
    {
        // Destructure so that every field has to be carried over explicitly;
        // the previous blockstore is left behind and dropped with `self`.
        let NodeBuilder {
            blockstore: _,
            store,
            keypair,
            network,
            bootnodes,
            listen,
            sync_batch_size,
            pruning_window,
        } = self;

        NodeBuilder {
            blockstore,
            store,
            keypair,
            network,
            bootnodes,
            listen,
            sync_batch_size,
            pruning_window,
        }
    }

    /// Replaces the store, changing the builder's store type.
    ///
    /// All other settings are carried over unchanged.
    pub fn store<S2>(self, store: S2) -> NodeBuilder<B, S2>
    where
        S2: Store + 'static,
    {
        // Same approach as `blockstore`: the previous store stays in `self`.
        let NodeBuilder {
            blockstore,
            store: _,
            keypair,
            network,
            bootnodes,
            listen,
            sync_batch_size,
            pruning_window,
        } = self;

        NodeBuilder {
            blockstore,
            store,
            keypair,
            network,
            bootnodes,
            listen,
            sync_batch_size,
            pruning_window,
        }
    }

    /// Sets the network. This is required: building the configuration fails
    /// with [`NodeBuilderError::NetworkNotSpecified`] when it was never set.
    pub fn network(mut self, network: Network) -> Self {
        self.network = Some(network);
        self
    }

    /// Sets the libp2p keypair. When it is not set, the keypair is obtained
    /// from the store's `get_identity` while building the configuration.
    pub fn keypair(mut self, keypair: Keypair) -> Self {
        self.keypair = Some(keypair);
        self
    }

    /// Replaces the bootnode list with `addrs`.
    ///
    /// If the list is empty when the configuration is built, the network's
    /// canonical bootnodes are used instead.
    pub fn bootnodes<I>(mut self, addrs: I) -> Self
    where
        I: IntoIterator<Item = Multiaddr>,
    {
        self.bootnodes = addrs.into_iter().collect();
        self
    }

    /// Replaces the list of listen addresses with `addrs`.
    pub fn listen<I>(mut self, addrs: I) -> Self
    where
        I: IntoIterator<Item = Multiaddr>,
    {
        self.listen = addrs.into_iter().collect();
        self
    }

    /// Sets the sync batch size. When it is not set, 512 is used.
    pub fn sync_batch_size(mut self, batch_size: u64) -> Self {
        self.sync_batch_size = Some(batch_size);
        self
    }

    /// Sets the pruning window.
    ///
    /// When it is not set, [`DEFAULT_PRUNING_WINDOW_IN_MEMORY`] is used if
    /// either the store or the blockstore is the in-memory implementation,
    /// and [`DEFAULT_PRUNING_WINDOW`] otherwise.
    pub fn pruning_window(mut self, dur: Duration) -> Self {
        self.pruning_window = Some(dur);
        self
    }

    /// Turns the collected settings into a [`NodeConfig`].
    ///
    /// # Errors
    ///
    /// - [`NodeBuilderError::NetworkNotSpecified`] if no network was set.
    /// - [`NodeBuilderError::FailedResolvingBootnodes`] on `wasm32` targets
    ///   when a non-empty bootnode list resolves to nothing.
    /// - [`NodeBuilderError::StoreError`] if no keypair was set and the store
    ///   fails to provide an identity.
    async fn build_config(self) -> Result<NodeConfig<B, S>, NodeBuilderError> {
        let network = match self.network {
            Some(network) => network,
            None => return Err(NodeBuilderError::NetworkNotSpecified),
        };

        // Fall back to the network's canonical bootnodes when none were given.
        let mut bootnodes = self.bootnodes;
        if bootnodes.is_empty() {
            bootnodes = network.canonical_bootnodes().collect();
        }

        let unreachable_node = bootnodes.is_empty() && self.listen.is_empty();
        if unreachable_node {
            warn!(
                "Node has empty bootnodes and listening addresses. It will never connect to another peer."
            );
        }

        #[cfg(target_arch = "wasm32")]
        let bootnodes = {
            let had_bootnodes = !bootnodes.is_empty();
            let resolved = resolve_bootnode_addresses(bootnodes).await;

            // Only an error if there was something to resolve in the first place.
            if had_bootnodes && resolved.is_empty() {
                return Err(NodeBuilderError::FailedResolvingBootnodes);
            }

            resolved
        };

        // Either component being the in-memory implementation selects the
        // in-memory default for the pruning window.
        let store_in_memory = TypeId::of::<S>() == TypeId::of::<InMemoryStore>();
        let blockstore_in_memory = TypeId::of::<B>() == TypeId::of::<InMemoryBlockstore>();

        let pruning_window = match self.pruning_window {
            Some(window) => window,
            None if store_in_memory || blockstore_in_memory => DEFAULT_PRUNING_WINDOW_IN_MEMORY,
            None => DEFAULT_PRUNING_WINDOW,
        };

        info!("Sampling window: {SAMPLING_WINDOW:?}, Pruning window: {pruning_window:?}");

        // An explicit keypair wins; otherwise ask the store for the identity.
        let p2p_local_keypair = match self.keypair {
            Some(keypair) => keypair,
            None => self.store.get_identity().await?,
        };

        let network_id = network.id().to_owned();
        let sync_batch_size = self.sync_batch_size.unwrap_or(512);

        Ok(NodeConfig {
            blockstore: self.blockstore,
            store: self.store,
            network_id,
            p2p_local_keypair,
            p2p_bootnodes: bootnodes,
            p2p_listen_on: self.listen,
            sync_batch_size,
            sampling_window: SAMPLING_WINDOW,
            pruning_window,
        })
    }
}