1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
use std::pin::pin;
use std::time::Duration;
use actix::{AsyncContext, WrapFuture};
use calimero_context_client::group::ListAllGroupsRequest;
use futures_util::StreamExt;
use tracing::{debug, error, warn};
use super::NodeManager;
use crate::constants;
impl NodeManager {
pub(super) fn setup_startup_subscriptions(&self, ctx: &mut actix::Context<Self>) {
let node_client = self.clients.node.clone();
let contexts = self.clients.context.get_context_ids(None);
let _handle = ctx.spawn(
async move {
let mut contexts = pin!(contexts);
while let Some(context_id) = contexts.next().await {
let Ok(context_id) = context_id else {
error!("Failed to get context ID");
continue;
};
if let Err(err) = node_client.subscribe(&context_id).await {
error!(%context_id, %err, "Failed to subscribe to context");
}
}
}
.into_actor(self),
);
let node_client = self.clients.node.clone();
let context_client = self.clients.context.clone();
let _handle = ctx.spawn(
async move {
match context_client
.list_all_groups(ListAllGroupsRequest {
offset: 0,
limit: usize::MAX,
})
.await
{
Ok(groups) => {
for group in groups {
let ns_bytes = group.group_id.to_bytes();
if let Err(err) = node_client.subscribe_namespace(ns_bytes).await {
error!(?group.group_id, %err, "Failed to subscribe to group topic");
continue;
}
// Pull governance ops directly from a live peer.
// This catches any ops we missed while offline.
// sync_namespace_from_peer returns silently if no
// mesh peers are up yet; the heartbeat interval
// will retry on subsequent ticks.
if let Err(err) = node_client.sync_namespace(ns_bytes).await {
warn!(?group.group_id, %err, "Failed to queue namespace governance sync at startup");
}
}
}
Err(err) => {
error!(%err, "Failed to list groups for startup subscription");
}
}
}
.into_actor(self),
);
}
pub(super) fn setup_maintenance_intervals(&self, ctx: &mut actix::Context<Self>) {
let _handle = ctx.run_interval(
Duration::from_secs(constants::OLD_BLOBS_EVICTION_FREQUENCY_S),
|act, _ctx| {
act.state.evict_old_blobs();
},
);
let _handle = ctx.run_interval(
Duration::from_secs(constants::PENDING_DELTAS_CLEANUP_FREQUENCY_S),
|act, ctx| {
let max_age = Duration::from_secs(constants::PENDING_DELTA_MAX_AGE_S);
let delta_stores = act.state.delta_stores_handle();
let _ignored = ctx.spawn(
async move {
for entry in delta_stores.iter() {
let context_id = *entry.key();
let delta_store = entry.value();
let evicted = delta_store.cleanup_stale(max_age).await;
if evicted > 0 {
warn!(
%context_id,
evicted_count = evicted,
"Evicted stale pending deltas (timed out after 5 min)"
);
}
let stats = delta_store.pending_stats().await;
if stats.count > 0 {
debug!(
%context_id,
pending_count = stats.count,
oldest_age_secs = stats.oldest_age_secs,
missing_parents = stats.total_missing_parents,
"Pending delta statistics"
);
if stats.count > constants::PENDING_DELTA_SNAPSHOT_THRESHOLD {
warn!(
%context_id,
pending_count = stats.count,
threshold = constants::PENDING_DELTA_SNAPSHOT_THRESHOLD,
"Too many pending deltas - state sync will recover on next periodic sync"
);
}
}
}
}
.into_actor(act),
);
},
);
}
pub(super) fn setup_hash_heartbeat_interval(&self, ctx: &mut actix::Context<Self>) {
let _handle = ctx.run_interval(
Duration::from_secs(constants::HASH_HEARTBEAT_FREQUENCY_S),
|act, ctx| {
let context_client = act.clients.context.clone();
let node_client = act.clients.node.clone();
let _ignored = ctx.spawn(
async move {
let contexts = context_client.get_context_ids(None);
let mut contexts_stream = pin!(contexts);
while let Some(context_id_result) = contexts_stream.next().await {
let Ok(context_id) = context_id_result else {
continue;
};
let Ok(Some(context)) = context_client.get_context(&context_id) else {
continue;
};
if context.root_hash.is_zero() {
debug!(%context_id, "Skipping heartbeat broadcast: Node uninitialized");
continue;
}
if let Err(err) = node_client
.broadcast_heartbeat(
&context_id,
context.root_hash,
context.dag_heads.clone(),
)
.await
{
debug!(
%context_id,
error = %err,
"Failed to broadcast hash heartbeat"
);
}
}
}
.into_actor(act),
);
},
);
}
}