1use std::collections::HashMap;
19use std::net::SocketAddr;
20
21const BLOCK_DOWNLOAD_TIMEOUT: u64 = 60;
25
26const STALE_TIP_CHECK_INTERVAL: u64 = 600; const MAX_BLOCKS_PER_PEER: usize = 16;
31
32#[derive(Debug, Clone)]
36pub struct BlockRequest {
37 pub block_hash: [u8; 32],
39 pub peer_id: u64,
41 pub requested_at: u64,
43 pub height: u32,
45}
46
47#[derive(Debug, Clone)]
49pub struct PeerDownloadStats {
50 pub blocks_delivered: u64,
52 pub blocks_timed_out: u64,
54 pub bytes_downloaded: u64,
56 pub avg_delivery_ms: f64,
58 pub in_flight: usize,
60 pub addr: SocketAddr,
62}
63
64impl PeerDownloadStats {
65 fn new(addr: SocketAddr) -> Self {
66 PeerDownloadStats {
67 blocks_delivered: 0,
68 blocks_timed_out: 0,
69 bytes_downloaded: 0,
70 avg_delivery_ms: 0.0,
71 in_flight: 0,
72 addr,
73 }
74 }
75
76 pub fn success_rate(&self) -> f64 {
78 let total = self.blocks_delivered + self.blocks_timed_out;
79 if total == 0 {
80 return 1.0;
81 }
82 self.blocks_delivered as f64 / total as f64
83 }
84
85 pub fn estimated_speed(&self) -> u64 {
87 if self.avg_delivery_ms < 1.0 || self.blocks_delivered == 0 {
88 return 0;
89 }
90 let avg_size = self.bytes_downloaded / self.blocks_delivered;
91 (avg_size as f64 / (self.avg_delivery_ms / 1000.0)) as u64
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum SchedulerAction {
98 RequestBlock {
100 peer_id: u64,
102 block_hash: [u8; 32],
104 },
105 TimeoutPeer {
107 peer_id: u64,
109 block_hash: [u8; 32],
111 },
112 StaleTipDetected,
114}
115
116pub struct DownloadScheduler {
121 in_flight: HashMap<[u8; 32], BlockRequest>,
123 peer_stats: HashMap<u64, PeerDownloadStats>,
125 last_block_time: u64,
127 stale_tip_warned: bool,
129}
130
131impl DownloadScheduler {
132 pub fn new(now: u64) -> Self {
134 DownloadScheduler {
135 in_flight: HashMap::new(),
136 peer_stats: HashMap::new(),
137 last_block_time: now,
138 stale_tip_warned: false,
139 }
140 }
141
142 pub fn register_peer(&mut self, peer_id: u64, addr: SocketAddr) {
144 self.peer_stats
145 .insert(peer_id, PeerDownloadStats::new(addr));
146 }
147
148 pub fn remove_peer(&mut self, peer_id: u64) -> Vec<[u8; 32]> {
151 self.peer_stats.remove(&peer_id);
152 let cancelled: Vec<[u8; 32]> = self
153 .in_flight
154 .iter()
155 .filter(|(_, req)| req.peer_id == peer_id)
156 .map(|(hash, _)| *hash)
157 .collect();
158 for hash in &cancelled {
159 self.in_flight.remove(hash);
160 }
161 cancelled
162 }
163
164 pub fn record_request(&mut self, block_hash: [u8; 32], peer_id: u64, height: u32, now: u64) {
166 self.in_flight.insert(
167 block_hash,
168 BlockRequest {
169 block_hash,
170 peer_id,
171 requested_at: now,
172 height,
173 },
174 );
175 if let Some(stats) = self.peer_stats.get_mut(&peer_id) {
176 stats.in_flight += 1;
177 }
178 }
179
180 pub fn record_delivery(&mut self, block_hash: &[u8; 32], block_size: u64, now: u64) {
182 if let Some(req) = self.in_flight.remove(block_hash) {
183 let delivery_ms = (now.saturating_sub(req.requested_at)) * 1000;
184 if let Some(stats) = self.peer_stats.get_mut(&req.peer_id) {
185 stats.blocks_delivered += 1;
186 stats.bytes_downloaded += block_size;
187 stats.in_flight = stats.in_flight.saturating_sub(1);
188 if stats.blocks_delivered == 1 {
190 stats.avg_delivery_ms = delivery_ms as f64;
191 } else {
192 stats.avg_delivery_ms = stats.avg_delivery_ms * 0.9 + delivery_ms as f64 * 0.1;
193 }
194 }
195 self.last_block_time = now;
196 self.stale_tip_warned = false;
197 }
198 }
199
200 pub fn check_timeouts(&mut self, now: u64) -> Vec<SchedulerAction> {
203 let mut actions = Vec::new();
204
205 let timed_out: Vec<([u8; 32], u64)> = self
207 .in_flight
208 .iter()
209 .filter(|(_, req)| now.saturating_sub(req.requested_at) > BLOCK_DOWNLOAD_TIMEOUT)
210 .map(|(hash, req)| (*hash, req.peer_id))
211 .collect();
212
213 for (hash, peer_id) in timed_out {
214 self.in_flight.remove(&hash);
215 if let Some(stats) = self.peer_stats.get_mut(&peer_id) {
216 stats.blocks_timed_out += 1;
217 stats.in_flight = stats.in_flight.saturating_sub(1);
218 }
219 actions.push(SchedulerAction::TimeoutPeer {
220 peer_id,
221 block_hash: hash,
222 });
223 }
224
225 if !self.stale_tip_warned
227 && now.saturating_sub(self.last_block_time) > STALE_TIP_CHECK_INTERVAL
228 {
229 self.stale_tip_warned = true;
230 actions.push(SchedulerAction::StaleTipDetected);
231 }
232
233 actions
234 }
235
236 pub fn pick_peer(&self, available_peers: &[u64]) -> Option<u64> {
243 available_peers
244 .iter()
245 .filter_map(|&pid| self.peer_stats.get(&pid).map(|s| (pid, s)))
246 .filter(|(_, stats)| stats.in_flight < MAX_BLOCKS_PER_PEER)
247 .max_by(|(_, a), (_, b)| {
248 let score_a = a.success_rate() * 100.0 + a.estimated_speed() as f64 / 1024.0;
250 let score_b = b.success_rate() * 100.0 + b.estimated_speed() as f64 / 1024.0;
251 score_a
252 .partial_cmp(&score_b)
253 .unwrap_or(std::cmp::Ordering::Equal)
254 })
255 .map(|(pid, _)| pid)
256 }
257
258 pub fn in_flight_count(&self) -> usize {
260 self.in_flight.len()
261 }
262
263 pub fn peer_stats(&self, peer_id: u64) -> Option<&PeerDownloadStats> {
265 self.peer_stats.get(&peer_id)
266 }
267
268 pub fn last_block_time(&self) -> u64 {
270 self.last_block_time
271 }
272
273 pub fn timeout_secs(&self) -> u64 {
275 BLOCK_DOWNLOAD_TIMEOUT
276 }
277
278 pub fn stale_tip_secs(&self) -> u64 {
280 STALE_TIP_CHECK_INTERVAL
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287
288 fn addr(port: u16) -> SocketAddr {
289 format!("127.0.0.1:{}", port).parse().unwrap()
290 }
291
292 #[test]
293 fn test_new_scheduler() {
294 let sched = DownloadScheduler::new(1000);
295 assert_eq!(sched.in_flight_count(), 0);
296 assert_eq!(sched.last_block_time(), 1000);
297 }
298
299 #[test]
300 fn test_register_and_remove_peer() {
301 let mut sched = DownloadScheduler::new(1000);
302 sched.register_peer(1, addr(8333));
303 assert!(sched.peer_stats(1).is_some());
304
305 let cancelled = sched.remove_peer(1);
306 assert!(cancelled.is_empty());
307 assert!(sched.peer_stats(1).is_none());
308 }
309
310 #[test]
311 fn test_request_and_delivery() {
312 let mut sched = DownloadScheduler::new(1000);
313 sched.register_peer(1, addr(8333));
314
315 let hash = [0xAB; 32];
316 sched.record_request(hash, 1, 100, 1000);
317 assert_eq!(sched.in_flight_count(), 1);
318 assert_eq!(sched.peer_stats(1).unwrap().in_flight, 1);
319
320 sched.record_delivery(&hash, 500_000, 1005);
321 assert_eq!(sched.in_flight_count(), 0);
322
323 let stats = sched.peer_stats(1).unwrap();
324 assert_eq!(stats.blocks_delivered, 1);
325 assert_eq!(stats.bytes_downloaded, 500_000);
326 assert_eq!(stats.in_flight, 0);
327 assert_eq!(sched.last_block_time(), 1005);
328 }
329
330 #[test]
331 fn test_timeout_detection() {
332 let mut sched = DownloadScheduler::new(1000);
333 sched.register_peer(1, addr(8333));
334
335 let hash = [0xCD; 32];
336 sched.record_request(hash, 1, 100, 1000);
337
338 let actions = sched.check_timeouts(1050);
340 assert!(actions.is_empty());
341
342 let actions = sched.check_timeouts(1061);
344 assert_eq!(actions.len(), 1);
345 assert_eq!(
346 actions[0],
347 SchedulerAction::TimeoutPeer {
348 peer_id: 1,
349 block_hash: hash
350 }
351 );
352
353 assert_eq!(sched.in_flight_count(), 0);
355 assert_eq!(sched.peer_stats(1).unwrap().blocks_timed_out, 1);
356 }
357
358 #[test]
359 fn test_stale_tip_detection() {
360 let mut sched = DownloadScheduler::new(1000);
361
362 let actions = sched.check_timeouts(1500);
364 assert!(actions.is_empty());
365
366 let actions = sched.check_timeouts(1601);
368 assert_eq!(actions.len(), 1);
369 assert_eq!(actions[0], SchedulerAction::StaleTipDetected);
370
371 let actions = sched.check_timeouts(1700);
373 assert!(actions.is_empty());
374 }
375
376 #[test]
377 fn test_stale_tip_resets_on_delivery() {
378 let mut sched = DownloadScheduler::new(1000);
379 sched.register_peer(1, addr(8333));
380
381 let actions = sched.check_timeouts(1601);
383 assert!(actions
384 .iter()
385 .any(|a| *a == SchedulerAction::StaleTipDetected));
386
387 let hash = [0xEF; 32];
389 sched.record_request(hash, 1, 200, 1600);
390 sched.record_delivery(&hash, 1_000_000, 1605);
391
392 let actions = sched.check_timeouts(2210);
394 assert!(actions
395 .iter()
396 .any(|a| *a == SchedulerAction::StaleTipDetected));
397 }
398
399 #[test]
400 fn test_pick_peer_prefers_fewer_inflight() {
401 let mut sched = DownloadScheduler::new(1000);
402 sched.register_peer(1, addr(8333));
403 sched.register_peer(2, addr(8334));
404
405 for i in 0..5u8 {
407 let mut hash = [0u8; 32];
408 hash[0] = i;
409 sched.record_request(hash, 1, i as u32, 1000);
410 }
411
412 let best = sched.pick_peer(&[1, 2]);
414 assert_eq!(best, Some(2));
415 }
416
417 #[test]
418 fn test_pick_peer_respects_max_inflight() {
419 let mut sched = DownloadScheduler::new(1000);
420 sched.register_peer(1, addr(8333));
421
422 for i in 0..MAX_BLOCKS_PER_PEER as u8 {
424 let mut hash = [0u8; 32];
425 hash[0] = i;
426 sched.record_request(hash, 1, i as u32, 1000);
427 }
428
429 let best = sched.pick_peer(&[1]);
431 assert_eq!(best, None);
432 }
433
434 #[test]
435 fn test_remove_peer_cancels_requests() {
436 let mut sched = DownloadScheduler::new(1000);
437 sched.register_peer(1, addr(8333));
438
439 let hash1 = [0xAA; 32];
440 let hash2 = [0xBB; 32];
441 sched.record_request(hash1, 1, 100, 1000);
442 sched.record_request(hash2, 1, 101, 1000);
443
444 let cancelled = sched.remove_peer(1);
445 assert_eq!(cancelled.len(), 2);
446 assert_eq!(sched.in_flight_count(), 0);
447 }
448
449 #[test]
450 fn test_peer_success_rate() {
451 let mut sched = DownloadScheduler::new(1000);
452 sched.register_peer(1, addr(8333));
453
454 for i in 0..4u8 {
456 let mut hash = [0u8; 32];
457 hash[0] = i;
458 sched.record_request(hash, 1, i as u32, 1000 + i as u64);
459 }
460 for i in 0..3u8 {
462 let mut hash = [0u8; 32];
463 hash[0] = i;
464 sched.record_delivery(&hash, 100_000, 1010);
465 }
466 let actions = sched.check_timeouts(1070);
468 assert_eq!(actions.len(), 1);
469
470 let stats = sched.peer_stats(1).unwrap();
471 assert_eq!(stats.blocks_delivered, 3);
472 assert_eq!(stats.blocks_timed_out, 1);
473 assert!((stats.success_rate() - 0.75).abs() < 0.01);
474 }
475}