1use std::collections::{HashMap, VecDeque};
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::discovery::PeerInfo;
20use crate::protocol::{
21 generate_message_id, BlocksMessage, GetBlocksMessage, GetHeadersMessage, GetSnapshotMessage,
22 HeadersMessage, Message, MessageId, PeerId, SnapshotMessage,
23};
24use moloch_core::block::{Block, BlockHash, BlockHeader};
25use moloch_core::crypto::Hash;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
29#[serde(rename_all = "snake_case")]
30pub enum SyncMode {
31 Full,
33 #[default]
35 Fast,
36 Snap,
38 CatchUp,
40 Warp,
42}
43
44impl std::fmt::Display for SyncMode {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 SyncMode::Full => write!(f, "full"),
48 SyncMode::Fast => write!(f, "fast"),
49 SyncMode::Snap => write!(f, "snap"),
50 SyncMode::CatchUp => write!(f, "catch-up"),
51 SyncMode::Warp => write!(f, "warp"),
52 }
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct SyncStatus {
59 pub mode: SyncMode,
61 pub state: SyncState,
63 pub local_height: Option<u64>,
65 pub target_height: Option<u64>,
67 pub blocks_per_second: f64,
69 pub eta_seconds: Option<u64>,
71 pub sync_peers: usize,
73 pub started_at: Option<DateTime<Utc>>,
75 pub progress: f64,
77}
78
79impl Default for SyncStatus {
80 fn default() -> Self {
81 Self {
82 mode: SyncMode::default(),
83 state: SyncState::Idle,
84 local_height: None,
85 target_height: None,
86 blocks_per_second: 0.0,
87 eta_seconds: None,
88 sync_peers: 0,
89 started_at: None,
90 progress: 0.0,
91 }
92 }
93}
94
95impl SyncStatus {
96 pub fn is_syncing(&self) -> bool {
98 matches!(
99 self.state,
100 SyncState::Downloading | SyncState::Verifying | SyncState::Applying
101 )
102 }
103
104 pub fn is_synced(&self) -> bool {
106 self.state == SyncState::Synced
107 }
108
109 pub fn calculate_progress(&mut self) {
111 match (self.local_height, self.target_height) {
112 (Some(local), Some(target)) if target > 0 => {
113 self.progress = (local as f64 / target as f64) * 100.0;
114 }
115 _ => self.progress = 0.0,
116 }
117 }
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub enum SyncState {
124 Idle,
126 FindingPeers,
128 DownloadingHeaders,
130 Downloading,
132 Verifying,
134 Applying,
136 Synced,
138 Failed,
140 Paused,
142}
143
144impl std::fmt::Display for SyncState {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 match self {
147 SyncState::Idle => write!(f, "idle"),
148 SyncState::FindingPeers => write!(f, "finding_peers"),
149 SyncState::DownloadingHeaders => write!(f, "downloading_headers"),
150 SyncState::Downloading => write!(f, "downloading"),
151 SyncState::Verifying => write!(f, "verifying"),
152 SyncState::Applying => write!(f, "applying"),
153 SyncState::Synced => write!(f, "synced"),
154 SyncState::Failed => write!(f, "failed"),
155 SyncState::Paused => write!(f, "paused"),
156 }
157 }
158}
159
160#[derive(Debug, Clone)]
162pub struct SyncConfig {
163 pub mode: SyncMode,
165 pub batch_size: u32,
167 pub max_concurrent_requests: usize,
169 pub request_timeout: Duration,
171 pub max_retries: u32,
173 pub min_peers: usize,
175 pub sync_threshold: u64,
177 pub checkpoint: Option<Checkpoint>,
179 pub header_first: bool,
181}
182
183impl Default for SyncConfig {
184 fn default() -> Self {
185 Self {
186 mode: SyncMode::Fast,
187 batch_size: 100,
188 max_concurrent_requests: 4,
189 request_timeout: Duration::from_secs(30),
190 max_retries: 3,
191 min_peers: 1,
192 sync_threshold: 10,
193 checkpoint: None,
194 header_first: true,
195 }
196 }
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct Checkpoint {
202 pub height: u64,
204 pub hash: BlockHash,
206 pub mmr_root: Hash,
208 pub created_at: DateTime<Utc>,
210}
211
212#[derive(Debug)]
214#[allow(dead_code)]
215struct PendingRequest {
216 id: MessageId,
218 peer: PeerId,
220 kind: RequestKind,
222 sent_at: Instant,
224 retries: u32,
226}
227
/// The payload a pending request asked for.
#[derive(Debug, Clone)]
// The payload fields are recorded but not read back in this file.
#[allow(dead_code)]
enum RequestKind {
    /// `count` headers starting at height `start`.
    Headers { start: u64, count: u32 },
    /// `count` blocks starting at height `start`.
    Blocks { start: u64, count: u32 },
    /// A snapshot, optionally at a specific height.
    Snapshot { height: Option<u64> },
}
236
237#[derive(Debug, Clone)]
239struct SyncRange {
240 start: u64,
242 end: u64,
244 peer: Option<PeerId>,
246 request_id: Option<MessageId>,
248 retries: u32,
250}
251
252#[derive(Debug)]
254pub struct SyncManager {
255 config: SyncConfig,
257 status: RwLock<SyncStatus>,
259 pending_requests: RwLock<HashMap<MessageId, PendingRequest>>,
261 sync_ranges: RwLock<VecDeque<SyncRange>>,
263 block_buffer: RwLock<HashMap<u64, Block>>,
265 header_buffer: RwLock<HashMap<u64, BlockHeader>>,
267 peer_heights: RwLock<HashMap<PeerId, u64>>,
269 synced_count: std::sync::atomic::AtomicU64,
271 sync_start: RwLock<Option<Instant>>,
273}
274
275impl SyncManager {
276 pub fn new(config: SyncConfig) -> Self {
278 Self {
279 config,
280 status: RwLock::new(SyncStatus::default()),
281 pending_requests: RwLock::new(HashMap::new()),
282 sync_ranges: RwLock::new(VecDeque::new()),
283 block_buffer: RwLock::new(HashMap::new()),
284 header_buffer: RwLock::new(HashMap::new()),
285 peer_heights: RwLock::new(HashMap::new()),
286 synced_count: std::sync::atomic::AtomicU64::new(0),
287 sync_start: RwLock::new(None),
288 }
289 }
290
291 pub fn config(&self) -> &SyncConfig {
293 &self.config
294 }
295
296 pub async fn status(&self) -> SyncStatus {
298 let mut status = self.status.read().await.clone();
299 status.calculate_progress();
300 self.update_rate(&mut status).await;
301 status
302 }
303
304 async fn update_rate(&self, status: &mut SyncStatus) {
306 let sync_start = self.sync_start.read().await;
307 if let Some(start) = *sync_start {
308 let elapsed = start.elapsed().as_secs_f64();
309 if elapsed > 0.0 {
310 let synced = self.synced_count.load(std::sync::atomic::Ordering::Relaxed);
311 status.blocks_per_second = synced as f64 / elapsed;
312
313 if let (Some(local), Some(target)) = (status.local_height, status.target_height) {
315 if status.blocks_per_second > 0.0 && target > local {
316 let remaining = target - local;
317 status.eta_seconds =
318 Some((remaining as f64 / status.blocks_per_second) as u64);
319 }
320 }
321 }
322 }
323 }
324
325 pub async fn needs_sync(&self, local_height: Option<u64>) -> bool {
327 let peer_heights = self.peer_heights.read().await;
328
329 if peer_heights.is_empty() {
330 return false;
331 }
332
333 let max_peer_height = peer_heights.values().copied().max().unwrap_or(0);
334 let local = local_height.unwrap_or(0);
335
336 max_peer_height > local + self.config.sync_threshold
337 }
338
339 pub async fn update_peer_height(&self, peer: PeerId, height: u64) {
341 let mut heights = self.peer_heights.write().await;
342 heights.insert(peer, height);
343
344 let max_height = heights.values().copied().max().unwrap_or(0);
346 let mut status = self.status.write().await;
347 status.target_height = Some(max_height);
348 }
349
350 pub async fn remove_peer(&self, peer: &PeerId) {
352 self.peer_heights.write().await.remove(peer);
353
354 let mut pending = self.pending_requests.write().await;
356 let to_remove: Vec<_> = pending
357 .iter()
358 .filter(|(_, req)| &req.peer == peer)
359 .map(|(id, _)| *id)
360 .collect();
361
362 for id in to_remove {
363 pending.remove(&id);
364 }
365
366 let mut ranges = self.sync_ranges.write().await;
368 for range in ranges.iter_mut() {
369 if range.peer.as_ref() == Some(peer) {
370 range.peer = None;
371 range.request_id = None;
372 }
373 }
374 }
375
376 pub async fn start_sync(&self, from_height: u64, to_height: u64) {
378 info!("Starting sync from {} to {}", from_height, to_height);
379
380 let mut status = self.status.write().await;
381 status.state = SyncState::Downloading;
382 status.local_height = Some(from_height);
383 status.target_height = Some(to_height);
384 status.started_at = Some(Utc::now());
385 drop(status);
386
387 let mut ranges = self.sync_ranges.write().await;
389 ranges.clear();
390
391 let batch_size = self.config.batch_size as u64;
392 let mut start = from_height;
393
394 while start < to_height {
395 let end = (start + batch_size).min(to_height);
396 ranges.push_back(SyncRange {
397 start,
398 end,
399 peer: None,
400 request_id: None,
401 retries: 0,
402 });
403 start = end;
404 }
405
406 *self.sync_start.write().await = Some(Instant::now());
407 self.synced_count
408 .store(0, std::sync::atomic::Ordering::SeqCst);
409 }
410
411 pub async fn pause_sync(&self) {
413 let mut status = self.status.write().await;
414 if status.is_syncing() {
415 status.state = SyncState::Paused;
416 }
417 }
418
419 pub async fn resume_sync(&self) {
421 let mut status = self.status.write().await;
422 if status.state == SyncState::Paused {
423 status.state = SyncState::Downloading;
424 }
425 }
426
427 pub async fn next_request(&self, available_peers: &[PeerInfo]) -> Option<(PeerId, Message)> {
431 let status = self.status.read().await;
432 if !status.is_syncing() {
433 return None;
434 }
435 drop(status);
436
437 let pending = self.pending_requests.read().await;
439 if pending.len() >= self.config.max_concurrent_requests {
440 return None;
441 }
442 drop(pending);
443
444 let mut ranges = self.sync_ranges.write().await;
446
447 for range in ranges.iter_mut() {
448 if range.peer.is_some() {
449 continue; }
451
452 let heights = self.peer_heights.read().await;
454 let suitable_peer = available_peers
455 .iter()
456 .find(|p| heights.get(&p.id).map(|h| *h >= range.end).unwrap_or(false));
457
458 if let Some(peer) = suitable_peer {
459 let message_id = generate_message_id();
460
461 let message = if self.config.header_first {
462 Message::GetHeaders(GetHeadersMessage {
463 id: message_id,
464 start_height: range.start,
465 count: (range.end - range.start) as u32,
466 })
467 } else {
468 Message::GetBlocks(GetBlocksMessage {
469 id: message_id,
470 start_height: range.start,
471 count: (range.end - range.start) as u32,
472 })
473 };
474
475 range.peer = Some(peer.id.clone());
476 range.request_id = Some(message_id);
477
478 let mut pending = self.pending_requests.write().await;
480 pending.insert(
481 message_id,
482 PendingRequest {
483 id: message_id,
484 peer: peer.id.clone(),
485 kind: if self.config.header_first {
486 RequestKind::Headers {
487 start: range.start,
488 count: (range.end - range.start) as u32,
489 }
490 } else {
491 RequestKind::Blocks {
492 start: range.start,
493 count: (range.end - range.start) as u32,
494 }
495 },
496 sent_at: Instant::now(),
497 retries: range.retries,
498 },
499 );
500
501 return Some((peer.id.clone(), message));
502 }
503 }
504
505 None
506 }
507
508 pub async fn handle_blocks(&self, response: BlocksMessage) -> Result<Vec<Block>, SyncError> {
510 let mut pending = self.pending_requests.write().await;
512 let request = pending.remove(&response.request_id);
513 drop(pending);
514
515 if request.is_none() {
516 return Err(SyncError::UnexpectedResponse(response.request_id));
517 }
518
519 let mut buffer = self.block_buffer.write().await;
521 let mut received = Vec::new();
522
523 for block in response.blocks {
524 let height = block.header.height;
525 buffer.insert(height, block.clone());
526 received.push(block);
527 }
528
529 self.synced_count
531 .fetch_add(received.len() as u64, std::sync::atomic::Ordering::SeqCst);
532
533 Ok(received)
534 }
535
536 pub async fn handle_headers(
538 &self,
539 response: HeadersMessage,
540 ) -> Result<Vec<BlockHeader>, SyncError> {
541 let mut pending = self.pending_requests.write().await;
543 let request = pending.remove(&response.request_id);
544 drop(pending);
545
546 if request.is_none() {
547 return Err(SyncError::UnexpectedResponse(response.request_id));
548 }
549
550 let mut buffer = self.header_buffer.write().await;
552 let mut received = Vec::new();
553
554 for header in response.headers {
555 let height = header.height;
556 buffer.insert(height, header.clone());
557 received.push(header);
558 }
559
560 Ok(received)
561 }
562
563 pub async fn handle_snapshot(&self, response: SnapshotMessage) -> Result<(), SyncError> {
565 let mut pending = self.pending_requests.write().await;
566 let request = pending.remove(&response.request_id);
567 drop(pending);
568
569 if request.is_none() {
570 return Err(SyncError::UnexpectedResponse(response.request_id));
571 }
572
573 let mut status = self.status.write().await;
575 status.local_height = Some(response.height);
576
577 info!(
578 "Received snapshot at height {} with {} events",
579 response.height, response.event_count
580 );
581
582 Ok(())
583 }
584
585 pub async fn get_ready_blocks(&self, current_height: u64) -> Vec<Block> {
587 let mut buffer = self.block_buffer.write().await;
588 let mut ready = Vec::new();
589
590 let mut next_height = current_height + 1;
591 while let Some(block) = buffer.remove(&next_height) {
592 ready.push(block);
593 next_height += 1;
594 }
595
596 ready
597 }
598
599 pub async fn complete_range(&self, start: u64, end: u64) {
601 let mut ranges = self.sync_ranges.write().await;
602 ranges.retain(|r| !(r.start == start && r.end == end));
603
604 if ranges.is_empty() {
606 drop(ranges);
607 let mut status = self.status.write().await;
608 status.state = SyncState::Synced;
609 info!("Sync complete");
610 }
611 }
612
613 pub async fn handle_timeout(&self, request_id: MessageId) {
615 let mut pending = self.pending_requests.write().await;
616
617 if let Some(request) = pending.remove(&request_id) {
618 warn!("Request {} to {} timed out", request_id, request.peer);
619
620 let mut ranges = self.sync_ranges.write().await;
622 for range in ranges.iter_mut() {
623 if range.request_id == Some(request_id) {
624 range.peer = None;
625 range.request_id = None;
626 range.retries += 1;
627
628 if range.retries > self.config.max_retries {
629 warn!("Range {}-{} exceeded max retries", range.start, range.end);
630 }
631 break;
632 }
633 }
634 }
635 }
636
637 pub async fn get_timed_out_requests(&self) -> Vec<MessageId> {
639 let pending = self.pending_requests.read().await;
640 let now = Instant::now();
641
642 pending
643 .iter()
644 .filter(|(_, req)| now.duration_since(req.sent_at) > self.config.request_timeout)
645 .map(|(id, _)| *id)
646 .collect()
647 }
648
649 pub fn create_snapshot_request(&self, height: Option<u64>) -> (MessageId, Message) {
651 let id = generate_message_id();
652 let msg = Message::GetSnapshot(GetSnapshotMessage { id, height });
653 (id, msg)
654 }
655
656 pub fn create_blocks_request(&self, start: u64, count: u32) -> (MessageId, Message) {
658 let id = generate_message_id();
659 let msg = Message::GetBlocks(GetBlocksMessage {
660 id,
661 start_height: start,
662 count,
663 });
664 (id, msg)
665 }
666
667 pub fn create_headers_request(&self, start: u64, count: u32) -> (MessageId, Message) {
669 let id = generate_message_id();
670 let msg = Message::GetHeaders(GetHeadersMessage {
671 id,
672 start_height: start,
673 count,
674 });
675 (id, msg)
676 }
677
678 pub async fn stats(&self) -> SyncStats {
680 let status = self.status.read().await;
681 let pending = self.pending_requests.read().await;
682 let ranges = self.sync_ranges.read().await;
683 let block_buffer = self.block_buffer.read().await;
684 let header_buffer = self.header_buffer.read().await;
685 let peer_heights = self.peer_heights.read().await;
686
687 SyncStats {
688 state: status.state,
689 mode: status.mode,
690 local_height: status.local_height,
691 target_height: status.target_height,
692 pending_requests: pending.len(),
693 remaining_ranges: ranges.len(),
694 buffered_blocks: block_buffer.len(),
695 buffered_headers: header_buffer.len(),
696 known_peers: peer_heights.len(),
697 blocks_synced: self.synced_count.load(std::sync::atomic::Ordering::Relaxed),
698 blocks_per_second: status.blocks_per_second,
699 }
700 }
701}
702
703#[derive(Debug, Clone, Serialize, Deserialize)]
705pub struct SyncStats {
706 pub state: SyncState,
708 pub mode: SyncMode,
710 pub local_height: Option<u64>,
712 pub target_height: Option<u64>,
714 pub pending_requests: usize,
716 pub remaining_ranges: usize,
718 pub buffered_blocks: usize,
720 pub buffered_headers: usize,
722 pub known_peers: usize,
724 pub blocks_synced: u64,
726 pub blocks_per_second: f64,
728}
729
730#[derive(Debug, thiserror::Error)]
732pub enum SyncError {
733 #[error("unexpected response for request {0}")]
734 UnexpectedResponse(MessageId),
735
736 #[error("request timed out: {0}")]
737 Timeout(MessageId),
738
739 #[error("not enough peers: have {have}, need {need}")]
740 NotEnoughPeers { have: usize, need: usize },
741
742 #[error("invalid block at height {0}")]
743 InvalidBlock(u64),
744
745 #[error("chain mismatch at height {0}")]
746 ChainMismatch(u64),
747
748 #[error("sync cancelled")]
749 Cancelled,
750
751 #[error("peer error: {0}")]
752 PeerError(String),
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    use moloch_core::crypto::SecretKey;

    /// Builds a peer id from a freshly generated key.
    fn test_peer_id() -> PeerId {
        let secret = SecretKey::generate();
        crate::protocol::PeerId::new(secret.public_key())
    }

    /// Builds a connected, statically discovered peer advertising `height`.
    fn test_peer_info(height: u64) -> PeerInfo {
        use crate::discovery::{DiscoverySource, PeerMetadata, PeerScore, PeerState};

        let metadata = PeerMetadata {
            height: Some(height),
            ..Default::default()
        };

        PeerInfo {
            id: test_peer_id(),
            addresses: vec!["127.0.0.1:8000".parse().unwrap()],
            state: PeerState::Connected,
            score: PeerScore::default(),
            first_seen: Utc::now(),
            last_seen: Some(Utc::now()),
            connection_successes: 1,
            connection_failures: 0,
            source: DiscoverySource::Static,
            metadata,
        }
    }

    /// Creates a manager with the default configuration.
    fn default_manager() -> SyncManager {
        SyncManager::new(SyncConfig::default())
    }

    #[test]
    fn test_sync_mode_display() {
        assert_eq!(format!("{}", SyncMode::Fast), "fast");
        assert_eq!(format!("{}", SyncMode::Snap), "snap");
    }

    #[test]
    fn test_sync_state_display() {
        assert_eq!(format!("{}", SyncState::Downloading), "downloading");
        assert_eq!(format!("{}", SyncState::Synced), "synced");
    }

    #[test]
    fn test_sync_status_default() {
        let fresh = SyncStatus::default();
        assert!(!fresh.is_syncing());
        assert!(!fresh.is_synced());
        assert_eq!(fresh.progress, 0.0);
    }

    #[test]
    fn test_sync_status_progress() {
        let mut halfway = SyncStatus {
            local_height: Some(50),
            target_height: Some(100),
            ..Default::default()
        };

        halfway.calculate_progress();
        assert_eq!(halfway.progress, 50.0);
    }

    #[tokio::test]
    async fn test_sync_manager_creation() {
        let manager = default_manager();

        // A new manager has not started anything yet.
        assert_eq!(manager.status().await.state, SyncState::Idle);
    }

    #[tokio::test]
    async fn test_sync_manager_peer_heights() {
        let manager = default_manager();

        let low_peer = test_peer_id();
        let high_peer = test_peer_id();

        manager.update_peer_height(low_peer.clone(), 100).await;
        manager.update_peer_height(high_peer.clone(), 200).await;

        // The target follows the highest reported peer height.
        assert_eq!(manager.status().await.target_height, Some(200));

        // Removing the highest peer does not lower the target.
        manager.remove_peer(&high_peer).await;
        assert_eq!(manager.status().await.target_height, Some(200));
    }

    #[tokio::test]
    async fn test_sync_manager_needs_sync() {
        let manager = SyncManager::new(SyncConfig {
            sync_threshold: 10,
            ..Default::default()
        });

        // Without peers there is nothing to sync from.
        assert!(!manager.needs_sync(Some(50)).await);

        manager.update_peer_height(test_peer_id(), 100).await;

        // 50 heights behind exceeds the threshold of 10.
        assert!(manager.needs_sync(Some(50)).await);

        // 5 heights behind is within the threshold.
        assert!(!manager.needs_sync(Some(95)).await);
    }

    #[tokio::test]
    async fn test_sync_manager_start_sync() {
        let manager = default_manager();

        manager.start_sync(0, 1000).await;

        let snapshot = manager.status().await;
        assert_eq!(snapshot.state, SyncState::Downloading);
        assert_eq!(snapshot.local_height, Some(0));
        assert_eq!(snapshot.target_height, Some(1000));

        // Starting a sync must have queued at least one range.
        assert!(!manager.sync_ranges.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_sync_manager_pause_resume() {
        let manager = default_manager();

        manager.start_sync(0, 100).await;

        manager.pause_sync().await;
        assert_eq!(manager.status().await.state, SyncState::Paused);

        manager.resume_sync().await;
        assert_eq!(manager.status().await.state, SyncState::Downloading);
    }

    #[tokio::test]
    async fn test_sync_manager_next_request() {
        let manager = default_manager();

        // Not syncing yet: no request.
        assert!(manager.next_request(&[]).await.is_none());

        manager.start_sync(0, 100).await;

        // Syncing, but no peers were offered: still no request.
        assert!(manager.next_request(&[]).await.is_none());

        // A peer at the target height can serve the range.
        let candidate = test_peer_info(100);
        manager.update_peer_height(candidate.id.clone(), 100).await;

        assert!(manager.next_request(&[candidate]).await.is_some());
    }

    #[tokio::test]
    async fn test_sync_manager_handle_blocks() {
        let manager = default_manager();

        // No request with this id was ever issued.
        let unsolicited = BlocksMessage {
            request_id: 999,
            blocks: vec![],
            has_more: false,
        };

        let outcome = manager.handle_blocks(unsolicited).await;
        assert!(matches!(outcome, Err(SyncError::UnexpectedResponse(999))));
    }

    #[tokio::test]
    async fn test_sync_manager_create_requests() {
        let manager = default_manager();

        let (blocks_id, blocks_msg) = manager.create_blocks_request(0, 100);
        assert!(matches!(blocks_msg, Message::GetBlocks(_)));

        let (headers_id, headers_msg) = manager.create_headers_request(100, 50);
        assert!(matches!(headers_msg, Message::GetHeaders(_)));

        let (snapshot_id, snapshot_msg) = manager.create_snapshot_request(Some(500));
        assert!(matches!(snapshot_msg, Message::GetSnapshot(_)));

        // Consecutive requests get distinct ids.
        assert_ne!(blocks_id, headers_id);
        assert_ne!(headers_id, snapshot_id);
    }

    #[tokio::test]
    async fn test_sync_manager_stats() {
        let manager = default_manager();

        manager.start_sync(0, 100).await;

        let stats = manager.stats().await;
        assert_eq!(stats.state, SyncState::Downloading);
        assert!(stats.remaining_ranges > 0);
        assert_eq!(stats.buffered_blocks, 0);
    }

    #[tokio::test]
    async fn test_sync_manager_timeout() {
        let manager = default_manager();

        let request_id = generate_message_id();

        // Plant a request that was "sent" 60 seconds ago, past the 30 second default timeout.
        {
            let stale = PendingRequest {
                id: request_id,
                peer: test_peer_id(),
                kind: RequestKind::Blocks {
                    start: 0,
                    count: 100,
                },
                sent_at: Instant::now() - Duration::from_secs(60),
                retries: 0,
            };
            manager
                .pending_requests
                .write()
                .await
                .insert(request_id, stale);
        }

        let timed_out = manager.get_timed_out_requests().await;
        assert_eq!(timed_out.len(), 1);
        assert_eq!(timed_out[0], request_id);

        // Handling the timeout removes the request from the pending set.
        manager.handle_timeout(request_id).await;
        let pending = manager.pending_requests.read().await;
        assert!(!pending.contains_key(&request_id));
    }

    #[tokio::test]
    async fn test_sync_manager_complete_range() {
        let manager = default_manager();

        manager.start_sync(0, 100).await;

        let queued = manager.sync_ranges.read().await.len();
        assert!(queued > 0);

        // With the default batch size the whole span is a single range.
        manager.complete_range(0, 100).await;

        assert_eq!(manager.status().await.state, SyncState::Synced);
    }

    #[test]
    fn test_checkpoint() {
        let block_hash = moloch_core::block::BlockHash(moloch_core::crypto::hash(b"block"));
        let checkpoint = Checkpoint {
            height: 1000,
            hash: block_hash,
            mmr_root: moloch_core::crypto::hash(b"mmr"),
            created_at: Utc::now(),
        };

        assert_eq!(checkpoint.height, 1000);
    }
}