// abtc_application/download_scheduler.rs

//! Block Download Scheduler
//!
//! Tracks in-flight block requests with timeouts, stale-tip detection,
//! and per-peer download performance. This corresponds to the block download
//! management logic in Bitcoin Core's `net_processing.cpp`.
//!
//! ## Design
//!
//! Each block request is tracked with a timestamp. If a peer doesn't
//! deliver a block within `BLOCK_DOWNLOAD_TIMEOUT` seconds, the request
//! is cancelled and reported to the caller, which can reassign it to
//! another peer and apply a misbehavior penalty to peers that
//! consistently time out.
//!
//! The scheduler also detects a "stale tip" — when no new blocks have
//! been connected for `STALE_TIP_CHECK_INTERVAL` seconds — and reports it
//! so the caller can trigger a re-sync with a different peer.

use std::collections::HashMap;
use std::net::SocketAddr;

// ── Configuration ───────────────────────────────────────────────────

/// Seconds to wait for a requested block before timing out.
///
/// A request is treated as timed out once strictly more than this many
/// seconds have elapsed since it was sent.
const BLOCK_DOWNLOAD_TIMEOUT: u64 = 60;

/// Seconds of no new block before we consider the tip stale.
///
/// The tip is reported stale once strictly more than this many seconds
/// have elapsed since the last recorded block.
const STALE_TIP_CHECK_INTERVAL: u64 = 10 * 60; // 10 minutes

/// Maximum number of blocks a single peer can have in flight.
///
/// A peer that has reached this count is not offered further requests.
const MAX_BLOCKS_PER_PEER: usize = 16;

// ── Types ───────────────────────────────────────────────────────────

/// A single in-flight block request.
///
/// One of these is stored per outstanding block hash; it records which
/// peer was asked and when, so that timeouts can be detected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockRequest {
    /// The hash of the block being requested (32 bytes).
    pub block_hash: [u8; 32],
    /// Peer ID that was assigned this request.
    pub peer_id: u64,
    /// Unix timestamp (seconds) when the request was sent.
    pub requested_at: u64,
    /// Expected block height (for ordering and reorg detection).
    pub height: u32,
}

/// Per-peer download statistics and performance metrics.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerDownloadStats {
    /// Total blocks successfully delivered.
    pub blocks_delivered: u64,
    /// Total blocks that timed out.
    pub blocks_timed_out: u64,
    /// Total bytes downloaded from this peer.
    pub bytes_downloaded: u64,
    /// Average delivery time in milliseconds.
    pub avg_delivery_ms: f64,
    /// Current number of in-flight requests.
    pub in_flight: usize,
    /// Peer's socket address.
    pub addr: SocketAddr,
}

impl PeerDownloadStats {
    /// Create statistics for a peer at `addr` with every counter at zero.
    fn new(addr: SocketAddr) -> Self {
        PeerDownloadStats {
            addr,
            blocks_delivered: 0,
            blocks_timed_out: 0,
            bytes_downloaded: 0,
            avg_delivery_ms: 0.0,
            in_flight: 0,
        }
    }

    /// Delivery success rate (0.0 to 1.0).
    ///
    /// This is `blocks_delivered / (blocks_delivered + blocks_timed_out)`.
    /// A peer with no completed or timed-out requests yet reports `1.0`.
    pub fn success_rate(&self) -> f64 {
        match self.blocks_delivered + self.blocks_timed_out {
            0 => 1.0,
            attempts => self.blocks_delivered as f64 / attempts as f64,
        }
    }

    /// Estimated download speed in bytes/sec.
    ///
    /// Computed as the average delivered block size divided by the average
    /// delivery time. Returns `0` when nothing has been delivered yet or
    /// when the average delivery time is below one millisecond, because no
    /// meaningful rate can be derived in either case.
    pub fn estimated_speed(&self) -> u64 {
        if self.blocks_delivered == 0 || self.avg_delivery_ms < 1.0 {
            return 0;
        }
        // Integer average block size, then scale milliseconds to seconds.
        let avg_block_bytes = self.bytes_downloaded / self.blocks_delivered;
        let avg_delivery_secs = self.avg_delivery_ms / 1000.0;
        (avg_block_bytes as f64 / avg_delivery_secs) as u64
    }
}

/// Action returned by the scheduler after checking download state.
///
/// The scheduler never performs network I/O itself; it hands these values
/// back to the caller, which is expected to carry them out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchedulerAction {
    /// Request this block from the specified peer.
    RequestBlock {
        /// Peer ID to request from.
        peer_id: u64,
        /// Block hash to request.
        block_hash: [u8; 32],
    },
    /// A block request timed out — penalize the peer.
    TimeoutPeer {
        /// Peer ID that missed the deadline.
        peer_id: u64,
        /// Block hash that timed out.
        block_hash: [u8; 32],
    },
    /// The chain tip appears stale — trigger re-sync with any available peer.
    StaleTipDetected,
}

116/// The block download scheduler.
117///
118/// Tracks in-flight block requests with timeouts and maintains per-peer statistics.
119/// Detects stale tips and helps select the best peer for future requests.
120pub struct DownloadScheduler {
121    /// Currently in-flight requests, keyed by block hash.
122    in_flight: HashMap<[u8; 32], BlockRequest>,
123    /// Per-peer download statistics and performance metrics.
124    peer_stats: HashMap<u64, PeerDownloadStats>,
125    /// Unix timestamp of the last block successfully connected.
126    last_block_time: u64,
127    /// Whether a stale-tip warning has already been emitted (avoid spam).
128    stale_tip_warned: bool,
129}
130
131impl DownloadScheduler {
132    /// Create a new download scheduler.
133    pub fn new(now: u64) -> Self {
134        DownloadScheduler {
135            in_flight: HashMap::new(),
136            peer_stats: HashMap::new(),
137            last_block_time: now,
138            stale_tip_warned: false,
139        }
140    }
141
142    /// Register a peer so we can track its stats.
143    pub fn register_peer(&mut self, peer_id: u64, addr: SocketAddr) {
144        self.peer_stats
145            .insert(peer_id, PeerDownloadStats::new(addr));
146    }
147
148    /// Remove a peer and cancel all its in-flight requests.
149    /// Returns the block hashes that need to be re-queued.
150    pub fn remove_peer(&mut self, peer_id: u64) -> Vec<[u8; 32]> {
151        self.peer_stats.remove(&peer_id);
152        let cancelled: Vec<[u8; 32]> = self
153            .in_flight
154            .iter()
155            .filter(|(_, req)| req.peer_id == peer_id)
156            .map(|(hash, _)| *hash)
157            .collect();
158        for hash in &cancelled {
159            self.in_flight.remove(hash);
160        }
161        cancelled
162    }
163
164    /// Record a block request being sent.
165    pub fn record_request(&mut self, block_hash: [u8; 32], peer_id: u64, height: u32, now: u64) {
166        self.in_flight.insert(
167            block_hash,
168            BlockRequest {
169                block_hash,
170                peer_id,
171                requested_at: now,
172                height,
173            },
174        );
175        if let Some(stats) = self.peer_stats.get_mut(&peer_id) {
176            stats.in_flight += 1;
177        }
178    }
179
180    /// Record a successfully received block.
181    pub fn record_delivery(&mut self, block_hash: &[u8; 32], block_size: u64, now: u64) {
182        if let Some(req) = self.in_flight.remove(block_hash) {
183            let delivery_ms = (now.saturating_sub(req.requested_at)) * 1000;
184            if let Some(stats) = self.peer_stats.get_mut(&req.peer_id) {
185                stats.blocks_delivered += 1;
186                stats.bytes_downloaded += block_size;
187                stats.in_flight = stats.in_flight.saturating_sub(1);
188                // Exponential moving average of delivery time
189                if stats.blocks_delivered == 1 {
190                    stats.avg_delivery_ms = delivery_ms as f64;
191                } else {
192                    stats.avg_delivery_ms = stats.avg_delivery_ms * 0.9 + delivery_ms as f64 * 0.1;
193                }
194            }
195            self.last_block_time = now;
196            self.stale_tip_warned = false;
197        }
198    }
199
200    /// Check for timed-out requests and stale tip.
201    /// Returns actions that the caller should execute.
202    pub fn check_timeouts(&mut self, now: u64) -> Vec<SchedulerAction> {
203        let mut actions = Vec::new();
204
205        // Check for timed-out block requests
206        let timed_out: Vec<([u8; 32], u64)> = self
207            .in_flight
208            .iter()
209            .filter(|(_, req)| now.saturating_sub(req.requested_at) > BLOCK_DOWNLOAD_TIMEOUT)
210            .map(|(hash, req)| (*hash, req.peer_id))
211            .collect();
212
213        for (hash, peer_id) in timed_out {
214            self.in_flight.remove(&hash);
215            if let Some(stats) = self.peer_stats.get_mut(&peer_id) {
216                stats.blocks_timed_out += 1;
217                stats.in_flight = stats.in_flight.saturating_sub(1);
218            }
219            actions.push(SchedulerAction::TimeoutPeer {
220                peer_id,
221                block_hash: hash,
222            });
223        }
224
225        // Check for stale tip
226        if !self.stale_tip_warned
227            && now.saturating_sub(self.last_block_time) > STALE_TIP_CHECK_INTERVAL
228        {
229            self.stale_tip_warned = true;
230            actions.push(SchedulerAction::StaleTipDetected);
231        }
232
233        actions
234    }
235
236    /// Pick the best peer to request a block from.
237    ///
238    /// Prefers peers with:
239    /// 1. Fewer in-flight requests (below MAX_BLOCKS_PER_PEER)
240    /// 2. Higher success rate
241    /// 3. Higher download speed
242    pub fn pick_peer(&self, available_peers: &[u64]) -> Option<u64> {
243        available_peers
244            .iter()
245            .filter_map(|&pid| self.peer_stats.get(&pid).map(|s| (pid, s)))
246            .filter(|(_, stats)| stats.in_flight < MAX_BLOCKS_PER_PEER)
247            .max_by(|(_, a), (_, b)| {
248                // Score: success_rate * 100 + speed/1024
249                let score_a = a.success_rate() * 100.0 + a.estimated_speed() as f64 / 1024.0;
250                let score_b = b.success_rate() * 100.0 + b.estimated_speed() as f64 / 1024.0;
251                score_a
252                    .partial_cmp(&score_b)
253                    .unwrap_or(std::cmp::Ordering::Equal)
254            })
255            .map(|(pid, _)| pid)
256    }
257
258    /// Get the number of in-flight requests.
259    pub fn in_flight_count(&self) -> usize {
260        self.in_flight.len()
261    }
262
263    /// Get download stats for a specific peer.
264    pub fn peer_stats(&self, peer_id: u64) -> Option<&PeerDownloadStats> {
265        self.peer_stats.get(&peer_id)
266    }
267
268    /// Get the last block connection time.
269    pub fn last_block_time(&self) -> u64 {
270        self.last_block_time
271    }
272
273    /// Get the download timeout threshold.
274    pub fn timeout_secs(&self) -> u64 {
275        BLOCK_DOWNLOAD_TIMEOUT
276    }
277
278    /// Get the stale tip threshold.
279    pub fn stale_tip_secs(&self) -> u64 {
280        STALE_TIP_CHECK_INTERVAL
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Loopback socket address on `port`, used as a stand-in peer address.
    fn addr(port: u16) -> SocketAddr {
        format!("127.0.0.1:{}", port).parse().unwrap()
    }

    /// A block hash that is all zeroes except for its first byte.
    fn hash_tagged(tag: u8) -> [u8; 32] {
        let mut hash = [0u8; 32];
        hash[0] = tag;
        hash
    }

    /// A fresh scheduler starts empty and remembers its creation time.
    #[test]
    fn test_new_scheduler() {
        let sched = DownloadScheduler::new(1000);
        assert_eq!(sched.in_flight_count(), 0);
        assert_eq!(sched.last_block_time(), 1000);
    }

    /// Registering makes stats visible; removing makes them disappear.
    #[test]
    fn test_register_and_remove_peer() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));
        assert!(sched.peer_stats(1).is_some());

        let cancelled = sched.remove_peer(1);
        assert!(cancelled.is_empty());
        assert!(sched.peer_stats(1).is_none());
    }

    /// A request followed by its delivery updates all counters.
    #[test]
    fn test_request_and_delivery() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));

        let hash = [0xAB; 32];
        sched.record_request(hash, 1, 100, 1000);
        assert_eq!(sched.in_flight_count(), 1);
        assert_eq!(sched.peer_stats(1).unwrap().in_flight, 1);

        sched.record_delivery(&hash, 500_000, 1005);
        assert_eq!(sched.in_flight_count(), 0);

        let stats = sched.peer_stats(1).unwrap();
        assert_eq!(stats.blocks_delivered, 1);
        assert_eq!(stats.bytes_downloaded, 500_000);
        assert_eq!(stats.in_flight, 0);
        assert_eq!(sched.last_block_time(), 1005);
    }

    /// A request is reported only once it is older than the timeout.
    #[test]
    fn test_timeout_detection() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));

        let hash = [0xCD; 32];
        sched.record_request(hash, 1, 100, 1000);

        // Not timed out yet
        assert!(sched.check_timeouts(1050).is_empty());

        // Now timed out (> 60 seconds)
        let actions = sched.check_timeouts(1061);
        assert_eq!(actions.len(), 1);
        assert_eq!(
            actions[0],
            SchedulerAction::TimeoutPeer {
                peer_id: 1,
                block_hash: hash
            }
        );

        // Request should be removed from in-flight
        assert_eq!(sched.in_flight_count(), 0);
        assert_eq!(sched.peer_stats(1).unwrap().blocks_timed_out, 1);
    }

    /// The stale-tip action fires once after the interval and then stays quiet.
    #[test]
    fn test_stale_tip_detection() {
        let mut sched = DownloadScheduler::new(1000);

        // Not stale yet
        assert!(sched.check_timeouts(1500).is_empty());

        // Stale tip (> 600 seconds)
        let actions = sched.check_timeouts(1601);
        assert_eq!(actions.len(), 1);
        assert_eq!(actions[0], SchedulerAction::StaleTipDetected);

        // Should not warn again (stale_tip_warned = true)
        assert!(sched.check_timeouts(1700).is_empty());
    }

    /// A delivery re-arms the stale-tip warning.
    #[test]
    fn test_stale_tip_resets_on_delivery() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));

        // Trigger stale
        let actions = sched.check_timeouts(1601);
        assert!(actions.contains(&SchedulerAction::StaleTipDetected));

        // Deliver a block → resets stale warning
        let hash = [0xEF; 32];
        sched.record_request(hash, 1, 200, 1600);
        sched.record_delivery(&hash, 1_000_000, 1605);

        // Should be able to trigger stale again later
        let actions = sched.check_timeouts(2210);
        assert!(actions.contains(&SchedulerAction::StaleTipDetected));
    }

    /// With equal track records, the idle peer is chosen over the busy one.
    #[test]
    fn test_pick_peer_prefers_fewer_inflight() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));
        sched.register_peer(2, addr(8334));

        // Give peer 1 some in-flight requests
        for i in 0..5u8 {
            sched.record_request(hash_tagged(i), 1, i as u32, 1000);
        }

        // Peer 2 has no in-flight, so should be preferred
        assert_eq!(sched.pick_peer(&[1, 2]), Some(2));
    }

    /// A peer at its in-flight limit is never offered.
    #[test]
    fn test_pick_peer_respects_max_inflight() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));

        // Fill peer 1 to max
        for i in 0..MAX_BLOCKS_PER_PEER as u8 {
            sched.record_request(hash_tagged(i), 1, i as u32, 1000);
        }

        // No peer available
        assert_eq!(sched.pick_peer(&[1]), None);
    }

    /// Removing a peer hands back every block it still owed us.
    #[test]
    fn test_remove_peer_cancels_requests() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));

        sched.record_request([0xAA; 32], 1, 100, 1000);
        sched.record_request([0xBB; 32], 1, 101, 1000);

        let cancelled = sched.remove_peer(1);
        assert_eq!(cancelled.len(), 2);
        assert_eq!(sched.in_flight_count(), 0);
    }

    /// Three deliveries and one timeout give a 75% success rate.
    #[test]
    fn test_peer_success_rate() {
        let mut sched = DownloadScheduler::new(1000);
        sched.register_peer(1, addr(8333));

        // 3 deliveries, 1 timeout → 75% success
        for i in 0..4u8 {
            sched.record_request(hash_tagged(i), 1, i as u32, 1000 + i as u64);
        }
        // Deliver first 3
        for i in 0..3u8 {
            sched.record_delivery(&hash_tagged(i), 100_000, 1010);
        }
        // Timeout the 4th
        let actions = sched.check_timeouts(1070);
        assert_eq!(actions.len(), 1);

        let stats = sched.peer_stats(1).unwrap();
        assert_eq!(stats.blocks_delivered, 3);
        assert_eq!(stats.blocks_timed_out, 1);
        assert!((stats.success_rate() - 0.75).abs() < 0.01);
    }
}