Skip to main content

moloch_net/
sync.rs

1//! Chain synchronization protocol for Moloch.
2//!
3//! Supports multiple sync modes:
4//! - Fast sync: Download blocks and verify MMR
5//! - Snap sync: Download state snapshot
6//! - Catch-up sync: Fill gaps in the chain
7//! - Warp sync: Skip to a recent checkpoint
8//!
9//! The sync manager coordinates with peers to efficiently sync the chain.
10
11use std::collections::{HashMap, VecDeque};
12use std::time::{Duration, Instant};
13
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::discovery::PeerInfo;
20use crate::protocol::{
21    generate_message_id, BlocksMessage, GetBlocksMessage, GetHeadersMessage, GetSnapshotMessage,
22    HeadersMessage, Message, MessageId, PeerId, SnapshotMessage,
23};
24use moloch_core::block::{Block, BlockHash, BlockHeader};
25use moloch_core::crypto::Hash;
26
27/// Synchronization mode.
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
29#[serde(rename_all = "snake_case")]
30pub enum SyncMode {
31    /// Full sync from genesis.
32    Full,
33    /// Fast sync: download blocks and verify MMR proofs.
34    #[default]
35    Fast,
36    /// Snap sync: download recent state snapshot.
37    Snap,
38    /// Catch-up: fill gaps and stay current.
39    CatchUp,
40    /// Warp sync: skip to recent checkpoint.
41    Warp,
42}
43
44impl std::fmt::Display for SyncMode {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            SyncMode::Full => write!(f, "full"),
48            SyncMode::Fast => write!(f, "fast"),
49            SyncMode::Snap => write!(f, "snap"),
50            SyncMode::CatchUp => write!(f, "catch-up"),
51            SyncMode::Warp => write!(f, "warp"),
52        }
53    }
54}
55
56/// Current sync status.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct SyncStatus {
59    /// Current sync mode.
60    pub mode: SyncMode,
61    /// Current sync state.
62    pub state: SyncState,
63    /// Local chain height.
64    pub local_height: Option<u64>,
65    /// Target chain height (highest seen).
66    pub target_height: Option<u64>,
67    /// Blocks per second (recent rate).
68    pub blocks_per_second: f64,
69    /// Estimated time to sync.
70    pub eta_seconds: Option<u64>,
71    /// Number of peers syncing with.
72    pub sync_peers: usize,
73    /// When sync started.
74    pub started_at: Option<DateTime<Utc>>,
75    /// Progress percentage.
76    pub progress: f64,
77}
78
79impl Default for SyncStatus {
80    fn default() -> Self {
81        Self {
82            mode: SyncMode::default(),
83            state: SyncState::Idle,
84            local_height: None,
85            target_height: None,
86            blocks_per_second: 0.0,
87            eta_seconds: None,
88            sync_peers: 0,
89            started_at: None,
90            progress: 0.0,
91        }
92    }
93}
94
95impl SyncStatus {
96    /// Check if sync is in progress.
97    pub fn is_syncing(&self) -> bool {
98        matches!(
99            self.state,
100            SyncState::Downloading | SyncState::Verifying | SyncState::Applying
101        )
102    }
103
104    /// Check if sync is complete.
105    pub fn is_synced(&self) -> bool {
106        self.state == SyncState::Synced
107    }
108
109    /// Calculate progress percentage.
110    pub fn calculate_progress(&mut self) {
111        match (self.local_height, self.target_height) {
112            (Some(local), Some(target)) if target > 0 => {
113                self.progress = (local as f64 / target as f64) * 100.0;
114            }
115            _ => self.progress = 0.0,
116        }
117    }
118}
119
120/// State of the sync process.
121#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub enum SyncState {
124    /// Not syncing.
125    Idle,
126    /// Finding peers.
127    FindingPeers,
128    /// Downloading headers.
129    DownloadingHeaders,
130    /// Downloading blocks.
131    Downloading,
132    /// Verifying downloaded data.
133    Verifying,
134    /// Applying blocks to chain.
135    Applying,
136    /// Fully synced.
137    Synced,
138    /// Sync failed.
139    Failed,
140    /// Sync paused.
141    Paused,
142}
143
144impl std::fmt::Display for SyncState {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        match self {
147            SyncState::Idle => write!(f, "idle"),
148            SyncState::FindingPeers => write!(f, "finding_peers"),
149            SyncState::DownloadingHeaders => write!(f, "downloading_headers"),
150            SyncState::Downloading => write!(f, "downloading"),
151            SyncState::Verifying => write!(f, "verifying"),
152            SyncState::Applying => write!(f, "applying"),
153            SyncState::Synced => write!(f, "synced"),
154            SyncState::Failed => write!(f, "failed"),
155            SyncState::Paused => write!(f, "paused"),
156        }
157    }
158}
159
160/// Configuration for sync manager.
161#[derive(Debug, Clone)]
162pub struct SyncConfig {
163    /// Sync mode to use.
164    pub mode: SyncMode,
165    /// Maximum blocks per request.
166    pub batch_size: u32,
167    /// Maximum concurrent requests.
168    pub max_concurrent_requests: usize,
169    /// Request timeout.
170    pub request_timeout: Duration,
171    /// Maximum retries per request.
172    pub max_retries: u32,
173    /// Minimum peers required to sync.
174    pub min_peers: usize,
175    /// How far behind before starting sync.
176    pub sync_threshold: u64,
177    /// Checkpoint to sync to (for warp sync).
178    pub checkpoint: Option<Checkpoint>,
179    /// Enable header-first sync.
180    pub header_first: bool,
181}
182
183impl Default for SyncConfig {
184    fn default() -> Self {
185        Self {
186            mode: SyncMode::Fast,
187            batch_size: 100,
188            max_concurrent_requests: 4,
189            request_timeout: Duration::from_secs(30),
190            max_retries: 3,
191            min_peers: 1,
192            sync_threshold: 10,
193            checkpoint: None,
194            header_first: true,
195        }
196    }
197}
198
199/// A sync checkpoint (trusted state).
200#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct Checkpoint {
202    /// Block height.
203    pub height: u64,
204    /// Block hash at this height.
205    pub hash: BlockHash,
206    /// MMR root at this height.
207    pub mmr_root: Hash,
208    /// When this checkpoint was created.
209    pub created_at: DateTime<Utc>,
210}
211
212/// A pending sync request.
213#[derive(Debug)]
214#[allow(dead_code)]
215struct PendingRequest {
216    /// Request ID.
217    id: MessageId,
218    /// Peer the request was sent to.
219    peer: PeerId,
220    /// Request type.
221    kind: RequestKind,
222    /// When the request was sent.
223    sent_at: Instant,
224    /// Retry count.
225    retries: u32,
226}
227
228/// Type of sync request.
229#[derive(Debug, Clone)]
230#[allow(dead_code)]
231enum RequestKind {
232    Headers { start: u64, count: u32 },
233    Blocks { start: u64, count: u32 },
234    Snapshot { height: Option<u64> },
235}
236
237/// A block range being synced.
238#[derive(Debug, Clone)]
239struct SyncRange {
240    /// Start height (inclusive).
241    start: u64,
242    /// End height (exclusive).
243    end: u64,
244    /// Peer assigned to this range.
245    peer: Option<PeerId>,
246    /// Request ID if in progress.
247    request_id: Option<MessageId>,
248    /// Number of retries.
249    retries: u32,
250}
251
252/// Sync manager coordinates chain synchronization.
253#[derive(Debug)]
254pub struct SyncManager {
255    /// Configuration.
256    config: SyncConfig,
257    /// Current status.
258    status: RwLock<SyncStatus>,
259    /// Pending requests.
260    pending_requests: RwLock<HashMap<MessageId, PendingRequest>>,
261    /// Ranges being synced.
262    sync_ranges: RwLock<VecDeque<SyncRange>>,
263    /// Downloaded blocks (waiting to be applied).
264    block_buffer: RwLock<HashMap<u64, Block>>,
265    /// Downloaded headers (for header-first sync).
266    header_buffer: RwLock<HashMap<u64, BlockHeader>>,
267    /// Peers and their reported heights.
268    peer_heights: RwLock<HashMap<PeerId, u64>>,
269    /// Blocks successfully synced.
270    synced_count: std::sync::atomic::AtomicU64,
271    /// Sync start time (for rate calculation).
272    sync_start: RwLock<Option<Instant>>,
273}
274
275impl SyncManager {
276    /// Create a new sync manager.
277    pub fn new(config: SyncConfig) -> Self {
278        Self {
279            config,
280            status: RwLock::new(SyncStatus::default()),
281            pending_requests: RwLock::new(HashMap::new()),
282            sync_ranges: RwLock::new(VecDeque::new()),
283            block_buffer: RwLock::new(HashMap::new()),
284            header_buffer: RwLock::new(HashMap::new()),
285            peer_heights: RwLock::new(HashMap::new()),
286            synced_count: std::sync::atomic::AtomicU64::new(0),
287            sync_start: RwLock::new(None),
288        }
289    }
290
291    /// Get the configuration.
292    pub fn config(&self) -> &SyncConfig {
293        &self.config
294    }
295
296    /// Get the current sync status.
297    pub async fn status(&self) -> SyncStatus {
298        let mut status = self.status.read().await.clone();
299        status.calculate_progress();
300        self.update_rate(&mut status).await;
301        status
302    }
303
304    /// Update sync rate statistics.
305    async fn update_rate(&self, status: &mut SyncStatus) {
306        let sync_start = self.sync_start.read().await;
307        if let Some(start) = *sync_start {
308            let elapsed = start.elapsed().as_secs_f64();
309            if elapsed > 0.0 {
310                let synced = self.synced_count.load(std::sync::atomic::Ordering::Relaxed);
311                status.blocks_per_second = synced as f64 / elapsed;
312
313                // Calculate ETA
314                if let (Some(local), Some(target)) = (status.local_height, status.target_height) {
315                    if status.blocks_per_second > 0.0 && target > local {
316                        let remaining = target - local;
317                        status.eta_seconds =
318                            Some((remaining as f64 / status.blocks_per_second) as u64);
319                    }
320                }
321            }
322        }
323    }
324
325    /// Check if we need to sync.
326    pub async fn needs_sync(&self, local_height: Option<u64>) -> bool {
327        let peer_heights = self.peer_heights.read().await;
328
329        if peer_heights.is_empty() {
330            return false;
331        }
332
333        let max_peer_height = peer_heights.values().copied().max().unwrap_or(0);
334        let local = local_height.unwrap_or(0);
335
336        max_peer_height > local + self.config.sync_threshold
337    }
338
339    /// Update a peer's reported height.
340    pub async fn update_peer_height(&self, peer: PeerId, height: u64) {
341        let mut heights = self.peer_heights.write().await;
342        heights.insert(peer, height);
343
344        // Update target height
345        let max_height = heights.values().copied().max().unwrap_or(0);
346        let mut status = self.status.write().await;
347        status.target_height = Some(max_height);
348    }
349
350    /// Remove a peer (on disconnect).
351    pub async fn remove_peer(&self, peer: &PeerId) {
352        self.peer_heights.write().await.remove(peer);
353
354        // Cancel pending requests from this peer
355        let mut pending = self.pending_requests.write().await;
356        let to_remove: Vec<_> = pending
357            .iter()
358            .filter(|(_, req)| &req.peer == peer)
359            .map(|(id, _)| *id)
360            .collect();
361
362        for id in to_remove {
363            pending.remove(&id);
364        }
365
366        // Mark ranges assigned to this peer as unassigned
367        let mut ranges = self.sync_ranges.write().await;
368        for range in ranges.iter_mut() {
369            if range.peer.as_ref() == Some(peer) {
370                range.peer = None;
371                range.request_id = None;
372            }
373        }
374    }
375
376    /// Start syncing from a specific height.
377    pub async fn start_sync(&self, from_height: u64, to_height: u64) {
378        info!("Starting sync from {} to {}", from_height, to_height);
379
380        let mut status = self.status.write().await;
381        status.state = SyncState::Downloading;
382        status.local_height = Some(from_height);
383        status.target_height = Some(to_height);
384        status.started_at = Some(Utc::now());
385        drop(status);
386
387        // Create sync ranges
388        let mut ranges = self.sync_ranges.write().await;
389        ranges.clear();
390
391        let batch_size = self.config.batch_size as u64;
392        let mut start = from_height;
393
394        while start < to_height {
395            let end = (start + batch_size).min(to_height);
396            ranges.push_back(SyncRange {
397                start,
398                end,
399                peer: None,
400                request_id: None,
401                retries: 0,
402            });
403            start = end;
404        }
405
406        *self.sync_start.write().await = Some(Instant::now());
407        self.synced_count
408            .store(0, std::sync::atomic::Ordering::SeqCst);
409    }
410
411    /// Pause syncing.
412    pub async fn pause_sync(&self) {
413        let mut status = self.status.write().await;
414        if status.is_syncing() {
415            status.state = SyncState::Paused;
416        }
417    }
418
419    /// Resume syncing.
420    pub async fn resume_sync(&self) {
421        let mut status = self.status.write().await;
422        if status.state == SyncState::Paused {
423            status.state = SyncState::Downloading;
424        }
425    }
426
427    /// Get the next sync request to send.
428    ///
429    /// Returns (peer_id, message) if there's work to do.
430    pub async fn next_request(&self, available_peers: &[PeerInfo]) -> Option<(PeerId, Message)> {
431        let status = self.status.read().await;
432        if !status.is_syncing() {
433            return None;
434        }
435        drop(status);
436
437        // Check if we're at max concurrent requests
438        let pending = self.pending_requests.read().await;
439        if pending.len() >= self.config.max_concurrent_requests {
440            return None;
441        }
442        drop(pending);
443
444        // Find a range that needs work
445        let mut ranges = self.sync_ranges.write().await;
446
447        for range in ranges.iter_mut() {
448            if range.peer.is_some() {
449                continue; // Already assigned
450            }
451
452            // Find a peer that has this range
453            let heights = self.peer_heights.read().await;
454            let suitable_peer = available_peers
455                .iter()
456                .find(|p| heights.get(&p.id).map(|h| *h >= range.end).unwrap_or(false));
457
458            if let Some(peer) = suitable_peer {
459                let message_id = generate_message_id();
460
461                let message = if self.config.header_first {
462                    Message::GetHeaders(GetHeadersMessage {
463                        id: message_id,
464                        start_height: range.start,
465                        count: (range.end - range.start) as u32,
466                    })
467                } else {
468                    Message::GetBlocks(GetBlocksMessage {
469                        id: message_id,
470                        start_height: range.start,
471                        count: (range.end - range.start) as u32,
472                    })
473                };
474
475                range.peer = Some(peer.id.clone());
476                range.request_id = Some(message_id);
477
478                // Track pending request
479                let mut pending = self.pending_requests.write().await;
480                pending.insert(
481                    message_id,
482                    PendingRequest {
483                        id: message_id,
484                        peer: peer.id.clone(),
485                        kind: if self.config.header_first {
486                            RequestKind::Headers {
487                                start: range.start,
488                                count: (range.end - range.start) as u32,
489                            }
490                        } else {
491                            RequestKind::Blocks {
492                                start: range.start,
493                                count: (range.end - range.start) as u32,
494                            }
495                        },
496                        sent_at: Instant::now(),
497                        retries: range.retries,
498                    },
499                );
500
501                return Some((peer.id.clone(), message));
502            }
503        }
504
505        None
506    }
507
508    /// Handle a received blocks response.
509    pub async fn handle_blocks(&self, response: BlocksMessage) -> Result<Vec<Block>, SyncError> {
510        // Remove from pending
511        let mut pending = self.pending_requests.write().await;
512        let request = pending.remove(&response.request_id);
513        drop(pending);
514
515        if request.is_none() {
516            return Err(SyncError::UnexpectedResponse(response.request_id));
517        }
518
519        // Store blocks in buffer
520        let mut buffer = self.block_buffer.write().await;
521        let mut received = Vec::new();
522
523        for block in response.blocks {
524            let height = block.header.height;
525            buffer.insert(height, block.clone());
526            received.push(block);
527        }
528
529        // Update synced count
530        self.synced_count
531            .fetch_add(received.len() as u64, std::sync::atomic::Ordering::SeqCst);
532
533        Ok(received)
534    }
535
536    /// Handle a received headers response.
537    pub async fn handle_headers(
538        &self,
539        response: HeadersMessage,
540    ) -> Result<Vec<BlockHeader>, SyncError> {
541        // Remove from pending
542        let mut pending = self.pending_requests.write().await;
543        let request = pending.remove(&response.request_id);
544        drop(pending);
545
546        if request.is_none() {
547            return Err(SyncError::UnexpectedResponse(response.request_id));
548        }
549
550        // Store headers in buffer
551        let mut buffer = self.header_buffer.write().await;
552        let mut received = Vec::new();
553
554        for header in response.headers {
555            let height = header.height;
556            buffer.insert(height, header.clone());
557            received.push(header);
558        }
559
560        Ok(received)
561    }
562
563    /// Handle a received snapshot response.
564    pub async fn handle_snapshot(&self, response: SnapshotMessage) -> Result<(), SyncError> {
565        let mut pending = self.pending_requests.write().await;
566        let request = pending.remove(&response.request_id);
567        drop(pending);
568
569        if request.is_none() {
570            return Err(SyncError::UnexpectedResponse(response.request_id));
571        }
572
573        // Update local height to snapshot height
574        let mut status = self.status.write().await;
575        status.local_height = Some(response.height);
576
577        info!(
578            "Received snapshot at height {} with {} events",
579            response.height, response.event_count
580        );
581
582        Ok(())
583    }
584
585    /// Get blocks ready to apply (in order).
586    pub async fn get_ready_blocks(&self, current_height: u64) -> Vec<Block> {
587        let mut buffer = self.block_buffer.write().await;
588        let mut ready = Vec::new();
589
590        let mut next_height = current_height + 1;
591        while let Some(block) = buffer.remove(&next_height) {
592            ready.push(block);
593            next_height += 1;
594        }
595
596        ready
597    }
598
599    /// Mark a range as complete.
600    pub async fn complete_range(&self, start: u64, end: u64) {
601        let mut ranges = self.sync_ranges.write().await;
602        ranges.retain(|r| !(r.start == start && r.end == end));
603
604        // Check if all ranges are complete
605        if ranges.is_empty() {
606            drop(ranges);
607            let mut status = self.status.write().await;
608            status.state = SyncState::Synced;
609            info!("Sync complete");
610        }
611    }
612
613    /// Handle a request timeout.
614    pub async fn handle_timeout(&self, request_id: MessageId) {
615        let mut pending = self.pending_requests.write().await;
616
617        if let Some(request) = pending.remove(&request_id) {
618            warn!("Request {} to {} timed out", request_id, request.peer);
619
620            // Mark the range as unassigned for retry
621            let mut ranges = self.sync_ranges.write().await;
622            for range in ranges.iter_mut() {
623                if range.request_id == Some(request_id) {
624                    range.peer = None;
625                    range.request_id = None;
626                    range.retries += 1;
627
628                    if range.retries > self.config.max_retries {
629                        warn!("Range {}-{} exceeded max retries", range.start, range.end);
630                    }
631                    break;
632                }
633            }
634        }
635    }
636
637    /// Get timed out requests.
638    pub async fn get_timed_out_requests(&self) -> Vec<MessageId> {
639        let pending = self.pending_requests.read().await;
640        let now = Instant::now();
641
642        pending
643            .iter()
644            .filter(|(_, req)| now.duration_since(req.sent_at) > self.config.request_timeout)
645            .map(|(id, _)| *id)
646            .collect()
647    }
648
649    /// Request a snapshot from a peer.
650    pub fn create_snapshot_request(&self, height: Option<u64>) -> (MessageId, Message) {
651        let id = generate_message_id();
652        let msg = Message::GetSnapshot(GetSnapshotMessage { id, height });
653        (id, msg)
654    }
655
656    /// Request blocks from a peer.
657    pub fn create_blocks_request(&self, start: u64, count: u32) -> (MessageId, Message) {
658        let id = generate_message_id();
659        let msg = Message::GetBlocks(GetBlocksMessage {
660            id,
661            start_height: start,
662            count,
663        });
664        (id, msg)
665    }
666
667    /// Request headers from a peer.
668    pub fn create_headers_request(&self, start: u64, count: u32) -> (MessageId, Message) {
669        let id = generate_message_id();
670        let msg = Message::GetHeaders(GetHeadersMessage {
671            id,
672            start_height: start,
673            count,
674        });
675        (id, msg)
676    }
677
678    /// Get sync statistics.
679    pub async fn stats(&self) -> SyncStats {
680        let status = self.status.read().await;
681        let pending = self.pending_requests.read().await;
682        let ranges = self.sync_ranges.read().await;
683        let block_buffer = self.block_buffer.read().await;
684        let header_buffer = self.header_buffer.read().await;
685        let peer_heights = self.peer_heights.read().await;
686
687        SyncStats {
688            state: status.state,
689            mode: status.mode,
690            local_height: status.local_height,
691            target_height: status.target_height,
692            pending_requests: pending.len(),
693            remaining_ranges: ranges.len(),
694            buffered_blocks: block_buffer.len(),
695            buffered_headers: header_buffer.len(),
696            known_peers: peer_heights.len(),
697            blocks_synced: self.synced_count.load(std::sync::atomic::Ordering::Relaxed),
698            blocks_per_second: status.blocks_per_second,
699        }
700    }
701}
702
703/// Sync statistics.
704#[derive(Debug, Clone, Serialize, Deserialize)]
705pub struct SyncStats {
706    /// Current sync state.
707    pub state: SyncState,
708    /// Sync mode.
709    pub mode: SyncMode,
710    /// Local chain height.
711    pub local_height: Option<u64>,
712    /// Target chain height.
713    pub target_height: Option<u64>,
714    /// Number of pending requests.
715    pub pending_requests: usize,
716    /// Number of remaining ranges to sync.
717    pub remaining_ranges: usize,
718    /// Number of buffered blocks.
719    pub buffered_blocks: usize,
720    /// Number of buffered headers.
721    pub buffered_headers: usize,
722    /// Number of known peer heights.
723    pub known_peers: usize,
724    /// Total blocks synced.
725    pub blocks_synced: u64,
726    /// Blocks per second.
727    pub blocks_per_second: f64,
728}
729
730/// Sync errors.
731#[derive(Debug, thiserror::Error)]
732pub enum SyncError {
733    #[error("unexpected response for request {0}")]
734    UnexpectedResponse(MessageId),
735
736    #[error("request timed out: {0}")]
737    Timeout(MessageId),
738
739    #[error("not enough peers: have {have}, need {need}")]
740    NotEnoughPeers { have: usize, need: usize },
741
742    #[error("invalid block at height {0}")]
743    InvalidBlock(u64),
744
745    #[error("chain mismatch at height {0}")]
746    ChainMismatch(u64),
747
748    #[error("sync cancelled")]
749    Cancelled,
750
751    #[error("peer error: {0}")]
752    PeerError(String),
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758    use moloch_core::crypto::SecretKey;
759
760    fn test_peer_id() -> PeerId {
761        crate::protocol::PeerId::new(SecretKey::generate().public_key())
762    }
763
764    fn test_peer_info(height: u64) -> PeerInfo {
765        use crate::discovery::{DiscoverySource, PeerMetadata, PeerScore, PeerState};
766
767        PeerInfo {
768            id: test_peer_id(),
769            addresses: vec!["127.0.0.1:8000".parse().unwrap()],
770            state: PeerState::Connected,
771            score: PeerScore::default(),
772            first_seen: Utc::now(),
773            last_seen: Some(Utc::now()),
774            connection_successes: 1,
775            connection_failures: 0,
776            source: DiscoverySource::Static,
777            metadata: PeerMetadata {
778                height: Some(height),
779                ..Default::default()
780            },
781        }
782    }
783
784    #[test]
785    fn test_sync_mode_display() {
786        assert_eq!(format!("{}", SyncMode::Fast), "fast");
787        assert_eq!(format!("{}", SyncMode::Snap), "snap");
788    }
789
790    #[test]
791    fn test_sync_state_display() {
792        assert_eq!(format!("{}", SyncState::Downloading), "downloading");
793        assert_eq!(format!("{}", SyncState::Synced), "synced");
794    }
795
796    #[test]
797    fn test_sync_status_default() {
798        let status = SyncStatus::default();
799        assert!(!status.is_syncing());
800        assert!(!status.is_synced());
801        assert_eq!(status.progress, 0.0);
802    }
803
804    #[test]
805    fn test_sync_status_progress() {
806        let mut status = SyncStatus {
807            local_height: Some(50),
808            target_height: Some(100),
809            ..Default::default()
810        };
811
812        status.calculate_progress();
813        assert_eq!(status.progress, 50.0);
814    }
815
816    #[tokio::test]
817    async fn test_sync_manager_creation() {
818        let config = SyncConfig::default();
819        let manager = SyncManager::new(config);
820
821        let status = manager.status().await;
822        assert_eq!(status.state, SyncState::Idle);
823    }
824
825    #[tokio::test]
826    async fn test_sync_manager_peer_heights() {
827        let config = SyncConfig::default();
828        let manager = SyncManager::new(config);
829
830        let peer1 = test_peer_id();
831        let peer2 = test_peer_id();
832
833        manager.update_peer_height(peer1.clone(), 100).await;
834        manager.update_peer_height(peer2.clone(), 200).await;
835
836        let status = manager.status().await;
837        assert_eq!(status.target_height, Some(200));
838
839        // Remove peer
840        manager.remove_peer(&peer2).await;
841        let status = manager.status().await;
842        assert_eq!(status.target_height, Some(200)); // Still 200 (target doesn't decrease automatically)
843    }
844
845    #[tokio::test]
846    async fn test_sync_manager_needs_sync() {
847        let config = SyncConfig {
848            sync_threshold: 10,
849            ..Default::default()
850        };
851        let manager = SyncManager::new(config);
852
853        // No peers = no sync needed
854        assert!(!manager.needs_sync(Some(50)).await);
855
856        // Add peer with higher height
857        let peer = test_peer_id();
858        manager.update_peer_height(peer, 100).await;
859
860        // Should sync (100 - 50 > 10)
861        assert!(manager.needs_sync(Some(50)).await);
862
863        // Shouldn't sync if close
864        assert!(!manager.needs_sync(Some(95)).await);
865    }
866
867    #[tokio::test]
868    async fn test_sync_manager_start_sync() {
869        let config = SyncConfig::default();
870        let manager = SyncManager::new(config);
871
872        manager.start_sync(0, 1000).await;
873
874        let status = manager.status().await;
875        assert_eq!(status.state, SyncState::Downloading);
876        assert_eq!(status.local_height, Some(0));
877        assert_eq!(status.target_height, Some(1000));
878
879        // Check ranges were created
880        let ranges = manager.sync_ranges.read().await;
881        assert!(!ranges.is_empty());
882    }
883
884    #[tokio::test]
885    async fn test_sync_manager_pause_resume() {
886        let config = SyncConfig::default();
887        let manager = SyncManager::new(config);
888
889        manager.start_sync(0, 100).await;
890
891        // Pause
892        manager.pause_sync().await;
893        let status = manager.status().await;
894        assert_eq!(status.state, SyncState::Paused);
895
896        // Resume
897        manager.resume_sync().await;
898        let status = manager.status().await;
899        assert_eq!(status.state, SyncState::Downloading);
900    }
901
902    #[tokio::test]
903    async fn test_sync_manager_next_request() {
904        let config = SyncConfig::default();
905        let manager = SyncManager::new(config);
906
907        // No sync started = no request
908        let request = manager.next_request(&[]).await;
909        assert!(request.is_none());
910
911        // Start sync
912        manager.start_sync(0, 100).await;
913
914        // No available peers = no request
915        let request = manager.next_request(&[]).await;
916        assert!(request.is_none());
917
918        // With a suitable peer
919        let peer = test_peer_info(100);
920        manager.update_peer_height(peer.id.clone(), 100).await;
921
922        let request = manager.next_request(&[peer]).await;
923        assert!(request.is_some());
924    }
925
926    #[tokio::test]
927    async fn test_sync_manager_handle_blocks() {
928        let config = SyncConfig::default();
929        let manager = SyncManager::new(config);
930
931        // Handle unexpected response
932        let response = BlocksMessage {
933            request_id: 999,
934            blocks: vec![],
935            has_more: false,
936        };
937        let result = manager.handle_blocks(response).await;
938        assert!(matches!(result, Err(SyncError::UnexpectedResponse(999))));
939    }
940
941    #[tokio::test]
942    async fn test_sync_manager_create_requests() {
943        let config = SyncConfig::default();
944        let manager = SyncManager::new(config);
945
946        let (id1, msg1) = manager.create_blocks_request(0, 100);
947        assert!(matches!(msg1, Message::GetBlocks(_)));
948
949        let (id2, msg2) = manager.create_headers_request(100, 50);
950        assert!(matches!(msg2, Message::GetHeaders(_)));
951
952        let (id3, msg3) = manager.create_snapshot_request(Some(500));
953        assert!(matches!(msg3, Message::GetSnapshot(_)));
954
955        // IDs should be unique
956        assert_ne!(id1, id2);
957        assert_ne!(id2, id3);
958    }
959
960    #[tokio::test]
961    async fn test_sync_manager_stats() {
962        let config = SyncConfig::default();
963        let manager = SyncManager::new(config);
964
965        manager.start_sync(0, 100).await;
966
967        let stats = manager.stats().await;
968        assert_eq!(stats.state, SyncState::Downloading);
969        assert!(stats.remaining_ranges > 0);
970        assert_eq!(stats.buffered_blocks, 0);
971    }
972
973    #[tokio::test]
974    async fn test_sync_manager_timeout() {
975        let config = SyncConfig::default();
976        let manager = SyncManager::new(config);
977
978        // Add a fake pending request
979        let request_id = generate_message_id();
980        {
981            let mut pending = manager.pending_requests.write().await;
982            pending.insert(
983                request_id,
984                PendingRequest {
985                    id: request_id,
986                    peer: test_peer_id(),
987                    kind: RequestKind::Blocks {
988                        start: 0,
989                        count: 100,
990                    },
991                    sent_at: Instant::now() - Duration::from_secs(60),
992                    retries: 0,
993                },
994            );
995        }
996
997        // Should detect timeout
998        let timed_out = manager.get_timed_out_requests().await;
999        assert_eq!(timed_out.len(), 1);
1000        assert_eq!(timed_out[0], request_id);
1001
1002        // Handle timeout
1003        manager.handle_timeout(request_id).await;
1004        let pending = manager.pending_requests.read().await;
1005        assert!(!pending.contains_key(&request_id));
1006    }
1007
1008    #[tokio::test]
1009    async fn test_sync_manager_complete_range() {
1010        let config = SyncConfig::default();
1011        let manager = SyncManager::new(config);
1012
1013        manager.start_sync(0, 100).await;
1014
1015        let initial_count = manager.sync_ranges.read().await.len();
1016        assert!(initial_count > 0);
1017
1018        // Complete the first range
1019        manager.complete_range(0, 100).await;
1020
1021        let status = manager.status().await;
1022        assert_eq!(status.state, SyncState::Synced);
1023    }
1024
1025    #[test]
1026    fn test_checkpoint() {
1027        let checkpoint = Checkpoint {
1028            height: 1000,
1029            hash: moloch_core::block::BlockHash(moloch_core::crypto::hash(b"block")),
1030            mmr_root: moloch_core::crypto::hash(b"mmr"),
1031            created_at: Utc::now(),
1032        };
1033
1034        assert_eq!(checkpoint.height, 1000);
1035    }
1036}