use eyre::bail;
use libp2p::PeerId;
use tokio::time::{self, Instant};
#[derive(Debug, Clone, Copy)]
pub(crate) enum SyncProtocol {
None,
DagCatchup,
SnapshotSync,
}
impl From<&calimero_node_primitives::sync::SyncProtocol> for SyncProtocol {
fn from(p: &calimero_node_primitives::sync::SyncProtocol) -> Self {
use calimero_node_primitives::sync::SyncProtocol as P;
match p {
P::None => Self::None,
P::DeltaSync { .. } => Self::DagCatchup,
P::Snapshot { .. }
| P::HashComparison { .. }
| P::BloomFilter { .. }
| P::SubtreePrefetch { .. }
| P::LevelWise { .. } => Self::SnapshotSync,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct SyncState {
last_sync: Option<Instant>,
last_peer: Option<PeerId>,
failure_count: u32,
last_error: Option<String>,
pub success_count: u64,
last_protocol: Option<SyncProtocol>,
}
impl SyncState {
pub(crate) fn new() -> Self {
Self {
last_sync: None,
last_peer: None,
failure_count: 0,
last_error: None,
success_count: 0,
last_protocol: None,
}
}
pub(crate) fn start(&mut self) {
self.last_sync = None; }
pub(crate) fn on_success(&mut self, peer: PeerId, protocol: SyncProtocol) {
self.last_sync = Some(Instant::now());
self.last_peer = Some(peer);
self.failure_count = 0;
self.last_error = None;
self.success_count += 1;
self.last_protocol = Some(protocol);
}
pub(crate) fn on_failure(&mut self, error: String) {
self.last_sync = Some(Instant::now()); self.failure_count += 1;
self.last_error = Some(error);
}
pub(crate) fn backoff_delay(&self) -> time::Duration {
let backoff_secs = 2u64.pow(self.failure_count.min(8));
time::Duration::from_secs(backoff_secs.min(300))
}
pub(crate) fn last_sync(&self) -> Option<Instant> {
self.last_sync
}
pub(crate) fn failure_count(&self) -> u32 {
self.failure_count
}
pub(crate) fn take_last_sync(&mut self) -> Option<Instant> {
self.last_sync.take()
}
}
impl Default for SyncState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Default)]
pub(crate) struct Sequencer {
current: u64,
}
impl Sequencer {
pub(crate) fn next(&mut self) -> u64 {
let id = self.current;
self.current += 1;
id
}
pub(crate) fn expect(&mut self, expected: u64) -> eyre::Result<()> {
if self.current != expected {
bail!("sequence error: expected {}, at {}", expected, self.current);
}
self.current += 1;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use calimero_node_primitives::sync::SyncProtocol as PrimitivesSyncProtocol;
#[test]
fn test_from_primitives_none() {
let primitive = PrimitivesSyncProtocol::None;
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::None));
}
#[test]
fn test_from_primitives_delta_sync() {
let primitive = PrimitivesSyncProtocol::DeltaSync {
missing_delta_ids: vec![[1; 32], [2; 32]],
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::DagCatchup));
}
#[test]
fn test_from_primitives_snapshot() {
let primitive = PrimitivesSyncProtocol::Snapshot {
compressed: true,
verified: true,
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
#[test]
fn test_from_primitives_hash_comparison() {
let primitive = PrimitivesSyncProtocol::HashComparison {
root_hash: [3; 32],
divergent_subtrees: vec![],
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
#[test]
fn test_from_primitives_bloom_filter() {
let primitive = PrimitivesSyncProtocol::BloomFilter {
filter_size: 1000,
false_positive_rate: 0.01,
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
#[test]
fn test_from_primitives_subtree_prefetch() {
let primitive = PrimitivesSyncProtocol::SubtreePrefetch {
subtree_roots: vec![[4; 32]],
};
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
#[test]
fn test_from_primitives_levelwise() {
let primitive = PrimitivesSyncProtocol::LevelWise { max_depth: 2 };
let tracking: SyncProtocol = (&primitive).into();
assert!(matches!(tracking, SyncProtocol::SnapshotSync));
}
#[test]
fn test_from_primitives_all_variants_covered() {
let variants: Vec<PrimitivesSyncProtocol> = vec![
PrimitivesSyncProtocol::None,
PrimitivesSyncProtocol::DeltaSync {
missing_delta_ids: vec![],
},
PrimitivesSyncProtocol::HashComparison {
root_hash: [0; 32],
divergent_subtrees: vec![],
},
PrimitivesSyncProtocol::Snapshot {
compressed: false,
verified: false,
},
PrimitivesSyncProtocol::BloomFilter {
filter_size: 0,
false_positive_rate: 0.0,
},
PrimitivesSyncProtocol::SubtreePrefetch {
subtree_roots: vec![],
},
PrimitivesSyncProtocol::LevelWise { max_depth: 0 },
];
for variant in &variants {
let _tracking: SyncProtocol = variant.into();
}
assert_eq!(variants.len(), 7, "Expected 7 SyncProtocol variants");
}
}