use std::pin::pin;
use std::time::Duration;
use actix::{AsyncContext, WrapFuture};
use calimero_context_client::group::ListAllGroupsRequest;
use futures_util::StreamExt;
use tracing::{debug, error, warn};
use super::NodeManager;
use crate::constants;
impl NodeManager {
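/// Runs once at startup: re-subscribes this node to the gossip topic of every
/// locally known context and every group namespace, and queues a governance
/// sync per group so ops missed while offline can be recovered.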
pub(super) fn setup_startup_subscriptions(&self, ctx: &mut actix::Context<Self>) {
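// Task 1: subscribe to the topic of each known context so gossip for those
// contexts resumes as soon as the node is back online.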
let node_client = self.clients.node.clone();
let contexts = self.clients.context.get_context_ids(None);
let _handle = ctx.spawn(
async move {
let mut contexts = pin!(contexts);
while let Some(context_id) = contexts.next().await {
let context_id = match context_id {
Ok(context_id) => context_id,
Err(err) => {
error!(%err, "Failed to get context ID");
continue;
}
};
if let Err(err) = node_client.subscribe(&context_id).await {
error!(%context_id, %err, "Failed to subscribe to context");
}
}
}
.into_actor(self),
);
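// Task 2: subscribe to each group's namespace topic and pull any governance
// ops that arrived while this node was offline.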
let node_client = self.clients.node.clone();
let context_client = self.clients.context.clone();
let _handle = ctx.spawn(
async move {
match context_client
.list_all_groups(ListAllGroupsRequest {
offset: 0,
limit: usize::MAX,
})
.await
{
Ok(groups) => {
for group in groups {
let ns_bytes = group.group_id.to_bytes();
if let Err(err) = node_client.subscribe_namespace(ns_bytes).await {
error!(?group.group_id, %err, "Failed to subscribe to group topic");
continue;
}
// Pull governance ops directly from a live peer.
// This catches any ops we missed while offline.
// The sync returns silently if no mesh peers are up
// yet; the heartbeat interval will retry on
// subsequent ticks.
if let Err(err) = node_client.sync_namespace(ns_bytes).await {
warn!(?group.group_id, %err, "Failed to queue namespace governance sync at startup");
}
}
}
Err(err) => {
error!(%err, "Failed to list groups for startup subscription");
}
}
}
.into_actor(self),
);
}
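/// Registers the recurring maintenance tasks: old-blob eviction, a gossipsub
/// mesh-size snapshot for observability, and cleanup of stale pending deltas.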
pub(super) fn setup_maintenance_intervals(&self, ctx: &mut actix::Context<Self>) {
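// Periodically evict old blobs from local state.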
let _handle = ctx.run_interval(
Duration::from_secs(constants::OLD_BLOBS_EVICTION_FREQUENCY_S),
|act, _ctx| {
act.state.evict_old_blobs();
},
);
// Periodic gossipsub mesh-peer-count snapshot. Logs one entry per
// subscribed topic so CI / operators can see the actual mesh size,
// independent of the libp2p-gossipsub internal `Updating mesh,
// new mesh: {…}` heartbeat log (which reports additions, not
// current state, and is easy to misread as "mesh is empty" when
// the mesh has simply already been populated).
let _handle = ctx.run_interval(
Duration::from_secs(constants::MESH_STATS_LOG_FREQUENCY_S),
|act, ctx| {
let network_client = act.clients.node.network_client().clone();
let _ignored = ctx.spawn(
async move {
let stats = network_client.mesh_stats().await;
if stats.is_empty() {
debug!("gossipsub mesh: no subscribed topics");
return;
}
let total: usize = stats.iter().map(|(_, n)| *n).sum();
let topics = stats.len();
for (topic, count) in &stats {
debug!(%topic, mesh_peers = count, "gossipsub mesh size");
}
debug!(topics, total_mesh_peers = total, "gossipsub mesh summary");
}
.into_actor(act),
);
},
);
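// Periodically drop pending deltas whose parents never arrived, and log a
// per-context backlog snapshot so operators can spot contexts falling behind.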
let _handle = ctx.run_interval(
Duration::from_secs(constants::PENDING_DELTAS_CLEANUP_FREQUENCY_S),
|act, ctx| {
let max_age = Duration::from_secs(constants::PENDING_DELTA_MAX_AGE_S);
let delta_stores = act.state.delta_stores_handle();
let _ignored = ctx.spawn(
async move {
for entry in delta_stores.iter() {
let context_id = *entry.key();
let delta_store = entry.value();
let evicted = delta_store.cleanup_stale(max_age).await;
if evicted > 0 {
warn!(
%context_id,
evicted_count = evicted,
max_age_secs = max_age.as_secs(),
"Evicted stale pending deltas (exceeded max pending age)"
);
}
let stats = delta_store.pending_stats().await;
if stats.count > 0 {
debug!(
%context_id,
pending_count = stats.count,
oldest_age_secs = stats.oldest_age_secs,
missing_parents = stats.total_missing_parents,
"Pending delta statistics"
);
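// A backlog above the snapshot threshold usually means a long run of
// missing parents; warn and rely on the periodic state sync to catch up.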
if stats.count > constants::PENDING_DELTA_SNAPSHOT_THRESHOLD {
warn!(
%context_id,
pending_count = stats.count,
threshold = constants::PENDING_DELTA_SNAPSHOT_THRESHOLD,
"Too many pending deltas - state sync will recover on next periodic sync"
);
}
}
}
}
.into_actor(act),
);
},
);
}
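/// Periodically broadcasts each context's root hash and DAG heads so peers
/// can compare state and trigger a sync when they diverge.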
pub(super) fn setup_hash_heartbeat_interval(&self, ctx: &mut actix::Context<Self>) {
let _handle = ctx.run_interval(
Duration::from_secs(constants::HASH_HEARTBEAT_FREQUENCY_S),
|act, ctx| {
let context_client = act.clients.context.clone();
let node_client = act.clients.node.clone();
let _ignored = ctx.spawn(
async move {
let contexts = context_client.get_context_ids(None);
let mut contexts_stream = pin!(contexts);
while let Some(context_id_result) = contexts_stream.next().await {
let Ok(context_id) = context_id_result else {
continue;
};
let Ok(Some(context)) = context_client.get_context(&context_id) else {
continue;
};
if context.root_hash.is_zero() {
debug!(%context_id, "Skipping heartbeat broadcast: Node uninitialized");
continue;
}
if let Err(err) = node_client
.broadcast_heartbeat(
&context_id,
context.root_hash,
context.dag_heads.clone(),
)
.await
{
debug!(
%context_id,
error = %err,
"Failed to broadcast hash heartbeat"
);
}
}
}
.into_actor(act),
);
},
);
}
}