use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use actix::Actor;
use calimero_blobstore::config::BlobStoreConfig;
use calimero_blobstore::{BlobManager, FileSystem};
use calimero_context::config::ContextConfig;
use calimero_context::ContextManager;
use calimero_context_config::client::Client as ExternalClient;
use calimero_context_primitives::client::ContextClient;
use calimero_network::NetworkManager;
use calimero_network_primitives::client::NetworkClient;
use calimero_network_primitives::config::NetworkConfig;
use calimero_node_primitives::client::NodeClient;
use calimero_server::config::ServerConfig;
use calimero_store::config::StoreConfig;
use calimero_store::db::Database;
use calimero_store::Store;
use calimero_store_encryption::EncryptedDatabase;
use calimero_store_rocksdb::RocksDB;
use calimero_utils_actix::LazyRecipient;
use camino::Utf8PathBuf;
use libp2p::gossipsub::IdentTopic;
use libp2p::identity::Keypair;
use prometheus_client::registry::Registry;
use tokio::sync::{broadcast, mpsc};
use tracing::info;
use crate::arbiter_pool::ArbiterPool;
use crate::gc::GarbageCollector;
use crate::network_event_channel::{self, NetworkEventChannelConfig};
use crate::network_event_processor::NetworkEventBridge;
use crate::sync::{SyncConfig, SyncManager};
use crate::NodeManager;
pub use calimero_node_primitives::NodeMode;
#[derive(Debug, Clone)]
pub struct SpecializedNodeConfig {
pub invite_topic: String,
pub accept_mock_tee: bool,
}
#[derive(Debug)]
pub struct NodeConfig {
pub home: Utf8PathBuf,
pub identity: Keypair,
pub network: NetworkConfig,
pub sync: SyncConfig,
pub datastore: StoreConfig,
pub blobstore: BlobStoreConfig,
pub context: ContextConfig,
pub server: ServerConfig,
pub gc_interval_secs: Option<u64>, pub mode: NodeMode,
pub specialized_node: SpecializedNodeConfig,
}
pub async fn start(config: NodeConfig) -> eyre::Result<()> {
let mut registry = <Registry>::default();
let peer_id = config.identity.public().to_peer_id();
info!("Peer ID: {}", peer_id);
let datastore = if let Some(ref key) = config.datastore.encryption_key {
info!("Opening encrypted datastore");
let inner_db = RocksDB::open(&config.datastore)?;
let encrypted_db = EncryptedDatabase::wrap(inner_db, key.clone())?;
Store::new(std::sync::Arc::new(encrypted_db))
} else {
Store::open::<RocksDB>(&config.datastore)?
};
let blobstore = BlobManager::new(datastore.clone(), FileSystem::new(&config.blobstore).await?);
let node_recipient = LazyRecipient::new();
let network_recipient = LazyRecipient::new();
let context_recipient = LazyRecipient::new();
let channel_config = NetworkEventChannelConfig {
channel_size: 1000, warning_threshold: 0.8, stats_log_interval: Duration::from_secs(30),
};
let (network_event_sender, network_event_receiver) =
network_event_channel::channel(channel_config, &mut registry);
let mut arbiter_pool = ArbiterPool::new().await?;
let network_manager = NetworkManager::new(
&config.network,
Arc::new(network_event_sender),
&mut registry,
)
.await?;
let network_client = NetworkClient::new(network_recipient.clone());
let _ignored = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |ctx| {
assert!(network_recipient.init(ctx), "failed to initialize");
network_manager
});
info!(
topic = %config.specialized_node.invite_topic,
"Subscribing to specialized node invite topic"
);
let _ignored = network_client
.subscribe(IdentTopic::new(
config.specialized_node.invite_topic.clone(),
))
.await?;
let (event_sender, _) = broadcast::channel(256);
let (ctx_sync_tx, ctx_sync_rx) = mpsc::channel(64);
let node_client = NodeClient::new(
datastore.clone(),
blobstore.clone(),
network_client.clone(),
node_recipient.clone(),
event_sender,
ctx_sync_tx,
config.specialized_node.invite_topic.clone(),
);
let external_client = ExternalClient::from_config(&config.context.client);
let context_client = ContextClient::new(
datastore.clone(),
node_client.clone(),
external_client,
context_recipient.clone(),
);
let context_manager = ContextManager::new(
datastore.clone(),
node_client.clone(),
context_client.clone(),
config.context.client.clone(),
Some(&mut registry),
);
let _ignored = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |ctx| {
assert!(context_recipient.init(ctx), "failed to initialize");
context_manager
});
let node_state = crate::NodeState::new(config.specialized_node.accept_mock_tee, config.mode);
let sync_manager = SyncManager::new(
config.sync,
node_client.clone(),
context_client.clone(),
network_client.clone(),
node_state.clone(),
ctx_sync_rx,
);
let node_manager = NodeManager::new(
blobstore.clone(),
sync_manager.clone(),
context_client.clone(),
node_client.clone(),
node_state.clone(),
);
let node_manager_addr = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |ctx| {
assert!(node_recipient.init(ctx), "failed to initialize");
node_manager
});
let bridge = NetworkEventBridge::new(network_event_receiver, node_manager_addr);
let bridge_shutdown = bridge.shutdown_handle();
let bridge_handle = tokio::spawn(bridge.run());
let server = calimero_server::start(
config.server.clone(),
context_client.clone(),
node_client.clone(),
datastore.clone(),
registry,
);
let gc_interval = Duration::from_secs(
config.gc_interval_secs.unwrap_or(12 * 3600), );
let gc = GarbageCollector::new(datastore.clone(), gc_interval);
let _ignored = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |_ctx| gc);
let mut sync = pin!(sync_manager.start());
let mut server = tokio::spawn(server);
let mut bridge = bridge_handle;
info!("Node started successfully");
loop {
tokio::select! {
_ = &mut sync => {},
res = &mut server => res??,
res = &mut bridge => {
tracing::warn!("Network event bridge stopped: {:?}", res);
}
res = &mut arbiter_pool.system_handle => {
bridge_shutdown.notify_one();
break res?;
}
}
}
}