1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
//! Sync state tracking and sequence validation.
//!
//! **Observability**: Tracks sync history for diagnostics and metrics.
//! **Robustness**: Implements exponential backoff on failures.
use eyre::bail;
use libp2p::PeerId;
use tokio::time::{self, Instant};
/// Sync protocol type for tracking which protocol was used (internal metrics).
///
/// This is a simplified enum for internal tracking and metrics, mapping from
/// the full [`calimero_node_primitives::sync::SyncProtocol`] enum.
///
/// Note: With DAG-based sync, we don't have active sync protocols.
/// State propagates automatically via gossipsub BroadcastMessage::StateDelta.
///
/// `PartialEq`/`Eq` are derived so recorded values can be compared directly
/// (`==`, `assert_eq!`) instead of going through `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SyncProtocol {
    /// No active sync - DAG uses gossipsub broadcast
    None,
    /// DAG catchup via heads request (for newly joined nodes)
    DagCatchup,
    /// Full snapshot sync (used when delta sync is not possible)
    SnapshotSync,
}
impl From<&calimero_node_primitives::sync::SyncProtocol> for SyncProtocol {
    /// Collapses the 7-variant primitives `SyncProtocol` into the 3-variant
    /// tracking enum used for metrics.
    ///
    /// The match deliberately has no wildcard arm, so a variant added to the
    /// primitives enum must be categorised here before the crate compiles.
    fn from(protocol: &calimero_node_primitives::sync::SyncProtocol) -> Self {
        use calimero_node_primitives::sync::SyncProtocol as Primitive;

        match protocol {
            // Everything that is not plain delta exchange is tracked under the
            // snapshot category.
            Primitive::Snapshot { .. }
            | Primitive::HashComparison { .. }
            | Primitive::BloomFilter { .. }
            | Primitive::SubtreePrefetch { .. }
            | Primitive::LevelWise { .. } => Self::SnapshotSync,
            Primitive::DeltaSync { .. } => Self::DagCatchup,
            Primitive::None => Self::None,
        }
    }
}
/// Tracks sync state and history for a context.
///
/// Maintains sync history, failure tracking, and implements exponential backoff
/// for contexts that repeatedly fail to sync.
#[derive(Debug, Clone)]
pub(crate) struct SyncState {
/// Time the most recent sync attempt finished. Stamped on success, on
/// failure, and on a not-materialized response; `None` means a sync is in
/// progress or no attempt has completed yet.
last_sync: Option<Instant>,
/// Last peer we successfully synced with
last_peer: Option<PeerId>,
/// Consecutive sync failures (resets on success)
failure_count: u32,
/// Last sync error message (for diagnostics); cleared on success
last_error: Option<String>,
/// Total successful syncs (lifetime counter)
pub success_count: u64,
/// Protocol category recorded by the last successful sync
/// (`None`, `DagCatchup`, or `SnapshotSync`)
last_protocol: Option<SyncProtocol>,
}
impl SyncState {
    /// Create new sync state (never synced).
    pub(crate) fn new() -> Self {
        Self {
            last_sync: None,
            last_peer: None,
            failure_count: 0,
            last_error: None,
            success_count: 0,
            last_protocol: None,
        }
    }

    /// Mark sync as started (prevents concurrent syncs).
    ///
    /// Clears `last_sync`; `None` is the in-progress marker.
    pub(crate) fn start(&mut self) {
        self.last_sync = None;
    }

    /// Mark sync as successful.
    ///
    /// Stamps `last_sync`, records the peer and protocol, resets the
    /// consecutive-failure streak and last error, and bumps the lifetime
    /// success counter.
    pub(crate) fn on_success(&mut self, peer: PeerId, protocol: SyncProtocol) {
        self.last_sync = Some(Instant::now());
        self.last_peer = Some(peer);
        self.failure_count = 0;
        self.last_error = None;
        // Saturate rather than overflow: this is a diagnostics counter and
        // must never be the reason a sync path panics.
        self.success_count = self.success_count.saturating_add(1);
        self.last_protocol = Some(protocol);
    }

    /// Mark sync as failed.
    ///
    /// Stamps `last_sync` (the attempt is no longer in progress), extends the
    /// consecutive-failure streak, and stores `error` for diagnostics.
    pub(crate) fn on_failure(&mut self, error: String) {
        self.last_sync = Some(Instant::now());
        // Saturating: `backoff_delay` clamps the exponent anyway, so pinning
        // the streak at `u32::MAX` loses nothing and avoids an overflow panic.
        self.failure_count = self.failure_count.saturating_add(1);
        self.last_error = Some(error);
    }

    /// Mark a sync attempt as completed without progress AND without
    /// counting as a failure. Used when the peer responded that they
    /// don't have the context yet (e.g. `PeerNotMaterialized`) — the
    /// attempt is over but applying exponential backoff would punish
    /// the context for picking the wrong peer, starving legitimate
    /// sync against other peers behind 2s/4s/…/256s delays.
    ///
    /// Clears the in-progress marker (`last_sync` becomes `Some(_)`)
    /// so the next dispatch tick can re-attempt against a different
    /// peer without waiting for the wedge watchdog (which would
    /// otherwise stall the context for `session_wedge_grace`).
    /// `failure_count` and `last_error` are intentionally untouched.
    pub(crate) fn on_not_materialized(&mut self) {
        self.last_sync = Some(Instant::now());
    }

    /// Calculate exponential backoff delay based on failure count.
    ///
    /// The delay is `2^failure_count` seconds with the exponent clamped to 8:
    /// 1s with no failures, then 2s, 4s, … up to a ceiling of 256s.
    pub(crate) fn backoff_delay(&self) -> time::Duration {
        // Largest exponent applied; 2^8 = 256 seconds is the effective cap.
        const MAX_BACKOFF_EXPONENT: u32 = 8;

        let exponent = self.failure_count.min(MAX_BACKOFF_EXPONENT);
        time::Duration::from_secs(1u64 << exponent)
    }

    /// Get last sync time (`None` while a sync is in progress or before the
    /// first attempt completes).
    pub(crate) fn last_sync(&self) -> Option<Instant> {
        self.last_sync
    }

    /// Get the current consecutive-failure count.
    pub(crate) fn failure_count(&self) -> u32 {
        self.failure_count
    }

    /// Take last_sync value (for marking sync start while keeping old time).
    ///
    /// Leaves `last_sync` as `None` (the in-progress marker) and returns the
    /// previous value to the caller.
    pub(crate) fn take_last_sync(&mut self) -> Option<Instant> {
        self.last_sync.take()
    }
}
impl Default for SyncState {
fn default() -> Self {
Self::new()
}
}
/// Sequence ID generator and validator for protocol messages.
///
/// Ensures messages are processed in order during sync protocols.
/// Prevents message reordering attacks and protocol confusion.
///
/// The derived `Default` starts the counter at 0.
#[derive(Debug, Default)]
pub(crate) struct Sequencer {
/// Next sequence ID: the value `next` hands out and the value `expect`
/// requires before advancing.
current: u64,
}
impl Sequencer {
    /// Hand out the current sequence ID, then move the counter forward by one.
    pub(crate) fn next(&mut self) -> u64 {
        let issued = self.current;
        self.current = issued + 1;
        issued
    }

    /// Check that `expected` is the ID the counter currently sits at and, if
    /// so, advance past it.
    ///
    /// # Errors
    ///
    /// Fails when `expected` differs from the counter's current value, which
    /// signals out-of-order messages or a protocol violation. The counter is
    /// left unchanged in that case.
    pub(crate) fn expect(&mut self, expected: u64) -> eyre::Result<()> {
        if self.current == expected {
            self.current += 1;
            return Ok(());
        }

        bail!("sequence error: expected {}, at {}", expected, self.current);
    }
}
// Unit tests for the primitives -> tracking `SyncProtocol` conversion.
#[cfg(test)]
mod tests {
use super::*;
use calimero_node_primitives::sync::SyncProtocol as PrimitivesSyncProtocol;
/// Test that SyncProtocol::None maps to tracking::SyncProtocol::None
#[test]
fn test_from_primitives_none() {
let primitive = PrimitivesSyncProtocol::None;
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::None));
}
/// Test that DeltaSync maps to DagCatchup
#[test]
fn test_from_primitives_delta_sync() {
let primitive = PrimitivesSyncProtocol::DeltaSync {
missing_delta_ids: vec![[1; 32], [2; 32]],
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::DagCatchup));
}
/// Test that Snapshot maps to SnapshotSync
#[test]
fn test_from_primitives_snapshot() {
let primitive = PrimitivesSyncProtocol::Snapshot {
compressed: true,
verified: true,
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
/// Test that HashComparison maps to SnapshotSync (fallback category)
#[test]
fn test_from_primitives_hash_comparison() {
let primitive = PrimitivesSyncProtocol::HashComparison {
root_hash: [3; 32],
divergent_subtrees: vec![],
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
/// Test that BloomFilter maps to SnapshotSync (fallback category)
#[test]
fn test_from_primitives_bloom_filter() {
let primitive = PrimitivesSyncProtocol::BloomFilter {
filter_size: 1000,
false_positive_rate: 0.01,
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
/// Test that SubtreePrefetch maps to SnapshotSync (fallback category)
#[test]
fn test_from_primitives_subtree_prefetch() {
let primitive = PrimitivesSyncProtocol::SubtreePrefetch {
subtree_roots: vec![[4; 32]],
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
/// Test that LevelWise maps to SnapshotSync (fallback category)
#[test]
fn test_from_primitives_levelwise() {
let primitive = PrimitivesSyncProtocol::LevelWise { max_depth: 2 };
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
/// Smoke test: one value of each of the 7 primitives variants known today
/// converts without panicking.
#[test]
fn test_from_primitives_all_variants_covered() {
// This list is maintained by hand, so a variant added to
// PrimitivesSyncProtocol does NOT break this test. The compile-time
// exhaustiveness guarantee comes from the wildcard-free `match` in the
// `From` impl; add any new variant to this list as well.
let variants: Vec<PrimitivesSyncProtocol> = vec![
PrimitivesSyncProtocol::None,
PrimitivesSyncProtocol::DeltaSync {
missing_delta_ids: vec![],
},
PrimitivesSyncProtocol::HashComparison {
root_hash: [0; 32],
divergent_subtrees: vec![],
},
PrimitivesSyncProtocol::Snapshot {
compressed: false,
verified: false,
},
PrimitivesSyncProtocol::BloomFilter {
filter_size: 0,
false_positive_rate: 0.0,
},
PrimitivesSyncProtocol::SubtreePrefetch {
subtree_roots: vec![],
},
PrimitivesSyncProtocol::LevelWise { max_depth: 0 },
];
// All variants should convert without panic
for variant in &variants {
let _tracking: SyncProtocol = variant.into();
}
// Guards the hand-written list above against accidental edits.
assert_eq!(variants.len(), 7, "Expected 7 SyncProtocol variants");
}
}