//! Node startup and initialization.
//!
//! **Purpose**: Bootstraps the node with all required services and actors.
//! **Main Function**: `start(NodeConfig)` - initializes and runs the node.
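//!
//! # Example
//!
//! A minimal startup sketch, assuming a fully populated [`NodeConfig`]
//! is built elsewhere (the sub-config construction is elided, and the
//! surrounding runtime setup shown here is illustrative, not prescriptive):
//!
//! ```ignore
//! async fn run(config: NodeConfig) -> eyre::Result<()> {
//!     // `start` only returns once the node shuts down or fails.
//!     start(config).await
//! }
//! ```
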
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use actix::Actor;
use calimero_blobstore::config::BlobStoreConfig;
use calimero_blobstore::{BlobManager as BlobStore, FileSystem};
use calimero_context::config::ContextConfig;
use calimero_context::ContextManager;
use calimero_context_client::client::ContextClient;
use calimero_network::NetworkManager;
use calimero_network_primitives::client::NetworkClient;
use calimero_network_primitives::config::NetworkConfig;
use calimero_node_primitives::client::{BlobManager, NodeClient, SyncClient};
use calimero_server::config::ServerConfig;
use calimero_store::config::StoreConfig;
use calimero_store::db::Database;
use calimero_store::Store;
use calimero_store_encryption::EncryptedDatabase;
use calimero_store_rocksdb::RocksDB;
use calimero_utils_actix::LazyRecipient;
use camino::Utf8PathBuf;
use libp2p::gossipsub::IdentTopic;
use libp2p::identity::Keypair;
use prometheus_client::registry::Registry;
use tokio::sync::{broadcast, mpsc};
use tracing::info;

use crate::arbiter_pool::ArbiterPool;
use crate::gc::GarbageCollector;
use crate::network_event_channel::{self, NetworkEventChannelConfig};
use crate::network_event_processor::NetworkEventBridge;
use crate::state_delta_bridge::{start_state_delta_actor, STATE_DELTA_CHANNEL_CAPACITY};
use crate::sync::{SyncConfig, SyncManager};
use crate::sync_session_bridge::{start_sync_session_actor, SYNC_SESSION_CHANNEL_CAPACITY};
use crate::NodeManager;

pub use calimero_node_primitives::NodeMode;

/// Configuration for specialized node functionality (e.g., read-only nodes).
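///
/// A construction sketch with illustrative (assumed) values:
///
/// ```ignore
/// let specialized = SpecializedNodeConfig {
///     invite_topic: "calimero/specialized-invites".to_owned(),
///     accept_mock_tee: false,
/// };
/// ```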
#[derive(Debug, Clone)]
pub struct SpecializedNodeConfig {
    /// Topic name for specialized node invite discovery messages.
    pub invite_topic: String,
    /// Whether to accept mock TEE attestation (testing only).
    pub accept_mock_tee: bool,
}
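
/// Top-level configuration consumed by [`start`]: one config per
/// subsystem (network, sync, storage, context, server), plus the node
/// identity, home directory, and mode.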
#[derive(Debug)]
pub struct NodeConfig {
    /// Node home directory.
    pub home: Utf8PathBuf,
    /// The node's libp2p identity keypair.
    pub identity: Keypair,
    pub network: NetworkConfig,
    pub sync: SyncConfig,
    pub datastore: StoreConfig,
    pub blobstore: BlobStoreConfig,
    pub context: ContextConfig,
    pub server: ServerConfig,
    /// Optional GC interval in seconds (default: 12 hours).
    pub gc_interval_secs: Option<u64>,
    pub mode: NodeMode,
    pub specialized_node: SpecializedNodeConfig,
}
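
/// Initializes storage, networking, actors, and the server, then runs
/// the node until the Actix system shuts down.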
pub async fn start(config: NodeConfig) -> eyre::Result<()> {
    let mut registry = <Registry>::default();

    let peer_id = config.identity.public().to_peer_id();
    info!("Peer ID: {}", peer_id);

    // Open the datastore, optionally wrapped in at-rest encryption.
    let datastore = if let Some(ref key) = config.datastore.encryption_key {
        info!("Opening encrypted datastore");
        let inner_db = RocksDB::open(&config.datastore)?;
        let encrypted_db = EncryptedDatabase::wrap(inner_db, key.clone())?;
        Store::new(Arc::new(encrypted_db))
    } else {
        Store::open::<RocksDB>(&config.datastore)?
    };

    let blob_store = BlobStore::new(datastore.clone(), FileSystem::new(&config.blobstore).await?);
    let blob_manager = BlobManager::new(blob_store.clone());

    // Recipients are created lazily so the clients below can be
    // constructed before their actors are started.
    let node_recipient = LazyRecipient::new();
    let network_recipient = LazyRecipient::new();
    let context_recipient = LazyRecipient::new();

    // Create a dedicated network event channel for reliable message
    // delivery. It is used instead of a LazyRecipient<NetworkEvent> to
    // avoid cross-arbiter message loss.
    let channel_config = NetworkEventChannelConfig {
        channel_size: 1000,     // configurable; absorbs burst patterns
        warning_threshold: 0.8, // log a warning at 80% capacity
        stats_log_interval: Duration::from_secs(30),
    };
    let (network_event_sender, network_event_receiver) =
        network_event_channel::channel(channel_config, &mut registry);

    // Create an arbiter pool for spawning actors across threads.
    let mut arbiter_pool = ArbiterPool::new().await?;

    // Create the NetworkManager with the channel-based dispatcher for
    // reliable event delivery.
    let network_manager = NetworkManager::new(
        &config.network,
        Arc::new(network_event_sender),
        &mut registry,
    )
    .await?;

    let network_client = NetworkClient::new(network_recipient.clone());

    let _ignored = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |ctx| {
        assert!(network_recipient.init(ctx), "failed to initialize network recipient");
        network_manager
    });

    info!(
        topic = %config.specialized_node.invite_topic,
        "Subscribing to specialized node invite topic"
    );

    let _ignored = network_client
        .subscribe(IdentTopic::new(
            config.specialized_node.invite_topic.clone(),
        ))
        .await?;

    // Buffer sizes are tuned for burst handling and concurrency:
    //   256 events       - supports many concurrent WebSocket clients
    //   64 sync requests - absorbs bursts of context joins/syncs
    let (event_sender, _) = broadcast::channel(256);
    let (ctx_sync_tx, ctx_sync_rx) = mpsc::channel(64);
    let (ns_sync_tx, ns_sync_rx) = mpsc::channel(16);
    let (ns_join_tx, ns_join_rx) = mpsc::channel(16);

    let sync_client = SyncClient::new(ctx_sync_tx, ns_sync_tx, ns_join_tx);

    // Channel for the execute path to notify the node about locally-
    // applied deltas so the in-memory DeltaStore stays current without
    // re-scanning the DB on every interval sync. Drained by a task
    // spawned once `node_state` is available (below).
    let (local_delta_tx, mut local_delta_rx) = mpsc::channel(256);

    let node_client = NodeClient::new(
        datastore.clone(),
        blob_manager.clone(),
        network_client.clone(),
        node_recipient.clone(),
        event_sender,
        sync_client,
        config.specialized_node.invite_topic.clone(),
        Some(local_delta_tx),
    );

    let context_client = ContextClient::new(
        datastore.clone(),
        node_client.clone(),
        context_recipient.clone(),
    );

    let context_manager = ContextManager::new(
        datastore.clone(),
        node_client.clone(),
        context_client.clone(),
        Some(&mut registry),
    );

    let _ignored = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |ctx| {
        assert!(context_recipient.init(ctx), "failed to initialize context recipient");
        context_manager
    });

    let node_state = crate::NodeState::new(config.specialized_node.accept_mock_tee, config.mode);

    // Drain locally-applied delta notifications from the execute path
    // and register them in the in-memory DeltaStore. This replaces the
    // per-interval-sync `load_persisted_deltas` rescan that existed
    // solely to catch up on execute-side writes.
    {
        let delta_stores = node_state.delta_stores_handle();
        let _drainer = tokio::spawn(async move {
            while let Some(msg) = local_delta_rx.recv().await {
                // Clone the DeltaStore value out of the DashMap and
                // drop the Ref before awaiting; holding a shard lock
                // across `.await` would block other context shards.
                let store = match delta_stores.get(&msg.context_id) {
                    Some(entry) => entry.value().clone(),
                    None => {
                        // Benign race: execute finished before the
                        // DeltaStore entry was created (e.g. an
                        // isolated single-node test). The startup
                        // `load_persisted_deltas` will pick the row
                        // up when the store is eventually constructed.
                        tracing::debug!(
                            context_id = %msg.context_id,
                            "no DeltaStore for local applied delta, skipping"
                        );
                        continue;
                    }
                };

                let delta = calimero_dag::CausalDelta {
                    id: msg.delta_id,
                    parents: msg.parents,
                    payload: msg.actions,
                    hlc: msg.hlc,
                    expected_root_hash: msg.expected_root_hash,
                    kind: calimero_dag::DeltaKind::Regular,
                };

                match store.add_local_applied_delta(delta).await {
                    Ok(cascaded_events) if !cascaded_events.is_empty() => {
                        // Cascaded children's DB state and dag_heads were
                        // persisted inside add_local_applied_delta; the
                        // events list returned here carries payloads that
                        // still need handler execution. Today the drainer
                        // has no handle to NodeClient / NodeManager, so
                        // we rely on the restart-replay contract (#2185):
                        // records stay `applied: true, events: Some(..)`
                        // until the next `load_persisted_deltas` surfaces
                        // them. Log at info so missed handler runs are
                        // observable while the plumbing is added.
                        tracing::info!(
                            context_id = %msg.context_id,
                            cascaded_count = cascaded_events.len(),
                            "Cascaded events persisted; awaiting restart replay for handler execution"
                        );
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(
                            error = ?e,
                            context_id = %msg.context_id,
                            "failed to register local applied delta in DAG"
                        );
                    }
                }
            }
        });
    }

    // Capture the session parameters up front, since `config.sync` is
    // passed by value to `SyncManager::new` below.
    let sync_max_concurrent = config.sync.max_concurrent;
    let sync_timeout = config.sync.timeout;

    let mut sync_manager = SyncManager::new(
        config.sync,
        node_client.clone(),
        context_client.clone(),
        network_client.clone(),
        node_state.clone(),
        ctx_sync_rx,
        ns_sync_rx,
        ns_join_rx,
    );

    // Spin up the dedicated StateDelta actor on its own Arbiter
    // BEFORE constructing NodeManager so the sender can be threaded
    // through. The arbiter is drawn from `arbiter_pool` (which owns
    // the Actix `System`); see issue #2299.
    let state_delta_arbiter = arbiter_pool.get().await?;
    let state_delta_tx =
        start_state_delta_actor(&state_delta_arbiter, STATE_DELTA_CHANNEL_CAPACITY);

    // Spin up the dedicated SyncSession actor on its own Arbiter
    // (#2316). The actor receives a `SyncManager` clone so it can
    // call `handle_opened_stream` (responder) and
    // `perform_interval_sync` (initiator) without contending with
    // the `NodeManager` arbiter or the `SyncManager::start` select
    // loop. Initiator results are forwarded back to the original
    // `sync_manager` via `session_result_tx`/`session_result_rx`
    // so per-context tracking state still updates.
    let sync_session_arbiter = arbiter_pool.get().await?;

    // Unbounded result channel: a dropped result would leave the
    // per-context `last_sync = None` forever and stall that context
    // (the same failure shape as the C1 dispatch stall). Result messages
    // are small (~32 bytes); bounding adds risk without payoff.
    let (session_result_tx, session_result_rx) = mpsc::unbounded_channel();

    let sync_session_tx = start_sync_session_actor(
        &sync_session_arbiter,
        SYNC_SESSION_CHANNEL_CAPACITY,
        sync_max_concurrent,
        sync_manager.clone(),
        sync_timeout,
        Some(session_result_tx),
    );
    sync_manager.set_session_handles(sync_session_tx.clone(), session_result_rx);

    let node_manager = NodeManager::new(
        blob_store.clone(),
        sync_manager.clone(),
        context_client.clone(),
        node_client.clone(),
        datastore.clone(),
        node_state.clone(),
        state_delta_tx,
        sync_session_tx,
    );

    // Start the NodeManager actor and keep its address for the bridge.
    let node_manager_addr = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |ctx| {
        assert!(node_recipient.init(ctx), "failed to initialize node recipient");
        node_manager
    });

    // Run the network event bridge in a dedicated tokio task. It
    // forwards events from the channel to NodeManager, ensuring
    // reliable delivery by avoiding cross-arbiter message passing.
    let bridge = NetworkEventBridge::new(network_event_receiver, node_manager_addr);
    let bridge_shutdown = bridge.shutdown_handle();
    let bridge_handle = tokio::spawn(bridge.run());

    let server = calimero_server::start(
        config.server.clone(),
        context_client.clone(),
        node_client.clone(),
        datastore.clone(),
        registry,
    );

    // Start the garbage collection actor (default interval: 12 hours).
    let gc_interval = Duration::from_secs(config.gc_interval_secs.unwrap_or(12 * 3600));
    let gc = GarbageCollector::new(datastore.clone(), gc_interval);
    let _ignored = Actor::start_in_arbiter(&arbiter_pool.get().await?, move |_ctx| gc);

    let mut sync = pin!(sync_manager.start());
    let mut server = tokio::spawn(server);
    let mut bridge = bridge_handle;

    info!("Node started successfully");
    loop {
        tokio::select! {
            _ = &mut sync => {},
            res = &mut server => res??,
            res = &mut bridge => {
                // Bridge task completed (channel closed or shutdown signal).
                tracing::warn!("Network event bridge stopped: {:?}", res);
            }
            res = &mut arbiter_pool.system_handle => {
                // Signal bridge shutdown before exiting. The
                // StateDelta arbiter handle (`state_delta_arbiter`)
                // lives until this function returns; the underlying
                // Actix arbiter thread is owned by the System and
                // shuts down with it.
                bridge_shutdown.notify_one();
                break res?;
            }
        }
    }
}