1use std::collections::HashMap;
32use std::time::{Duration, SystemTime};
33
34#[derive(Debug, Clone)]
36pub struct PeerContentLocation {
37 pub peer_id: String,
39 pub cid: String,
41 pub availability_score: f64,
43 pub last_verified: SystemTime,
45 pub chunk_count: u32,
47 pub complete: bool,
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum RoutingStrategy {
54 Closest,
56 MostAvailable,
58 LoadBalanced,
60 Redundant,
62}
63
64#[derive(Debug, Clone, Default)]
66pub struct RoutingStats {
67 pub total_requests: u64,
69 pub cache_hits: u64,
71 pub cache_misses: u64,
73 pub avg_peers_per_content: f64,
75 pub unique_content: usize,
77}
78
79pub struct ContentRouter {
81 content_locations: HashMap<String, Vec<PeerContentLocation>>,
83 strategy: RoutingStrategy,
85 lookup_cache: HashMap<String, Vec<String>>,
87 cache_ttl: Duration,
89 stats: RoutingStats,
91 verification_interval: Duration,
93}
94
95impl ContentRouter {
96 #[must_use]
98 #[inline]
99 pub fn new() -> Self {
100 Self {
101 content_locations: HashMap::new(),
102 strategy: RoutingStrategy::LoadBalanced,
103 lookup_cache: HashMap::new(),
104 cache_ttl: Duration::from_secs(60),
105 stats: RoutingStats::default(),
106 verification_interval: Duration::from_secs(300),
107 }
108 }
109
110 #[must_use]
112 #[inline]
113 pub fn with_strategy(strategy: RoutingStrategy) -> Self {
114 Self {
115 strategy,
116 ..Self::new()
117 }
118 }
119
120 #[inline]
122 pub fn set_strategy(&mut self, strategy: RoutingStrategy) {
123 self.strategy = strategy;
124 }
125
126 #[inline]
128 pub fn set_cache_ttl(&mut self, ttl: Duration) {
129 self.cache_ttl = ttl;
130 }
131
132 pub fn register_location(&mut self, cid: &str, location: PeerContentLocation) {
134 let locations = self.content_locations.entry(cid.to_string()).or_default();
135
136 if let Some(existing) = locations.iter_mut().find(|l| l.peer_id == location.peer_id) {
138 *existing = location;
139 } else {
140 locations.push(location);
141 }
142
143 self.lookup_cache.remove(cid);
145 }
146
147 pub fn unregister_location(&mut self, cid: &str, peer_id: &str) {
149 if let Some(locations) = self.content_locations.get_mut(cid) {
150 locations.retain(|l| l.peer_id != peer_id);
151 if locations.is_empty() {
152 self.content_locations.remove(cid);
153 }
154 self.lookup_cache.remove(cid);
155 }
156 }
157
158 #[must_use]
160 pub fn find_peers(&mut self, cid: &str, max_peers: usize) -> Vec<String> {
161 self.stats.total_requests += 1;
162
163 if let Some(cached) = self.lookup_cache.get(cid) {
165 self.stats.cache_hits += 1;
166 return cached.iter().take(max_peers).cloned().collect();
167 }
168
169 self.stats.cache_misses += 1;
170
171 let locations = match self.content_locations.get(cid) {
173 Some(locs) => locs,
174 None => return Vec::new(),
175 };
176
177 let valid_locations: Vec<_> = locations
179 .iter()
180 .filter(|l| self.is_location_valid(l))
181 .cloned()
182 .collect();
183
184 if valid_locations.is_empty() {
185 return Vec::new();
186 }
187
188 let mut selected = match self.strategy {
190 RoutingStrategy::Closest => self.route_by_closest(valid_locations),
191 RoutingStrategy::MostAvailable => self.route_by_availability(valid_locations),
192 RoutingStrategy::LoadBalanced => self.route_load_balanced(valid_locations),
193 RoutingStrategy::Redundant => self.route_redundant(valid_locations),
194 };
195
196 selected.truncate(max_peers);
197
198 let peer_ids: Vec<String> = selected.iter().map(|l| l.peer_id.clone()).collect();
199
200 self.lookup_cache.insert(cid.to_string(), peer_ids.clone());
202
203 peer_ids
204 }
205
206 fn is_location_valid(&self, location: &PeerContentLocation) -> bool {
208 if let Ok(duration) = SystemTime::now().duration_since(location.last_verified) {
209 duration < self.verification_interval
210 } else {
211 false
212 }
213 }
214
215 fn route_by_closest(
217 &self,
218 mut locations: Vec<PeerContentLocation>,
219 ) -> Vec<PeerContentLocation> {
220 locations.sort_by(|a, b| {
222 b.availability_score
223 .partial_cmp(&a.availability_score)
224 .unwrap_or(std::cmp::Ordering::Equal)
225 });
226 locations
227 }
228
229 fn route_by_availability(
231 &self,
232 mut locations: Vec<PeerContentLocation>,
233 ) -> Vec<PeerContentLocation> {
234 locations.sort_by(|a, b| {
235 match (a.complete, b.complete) {
237 (true, false) => std::cmp::Ordering::Less,
238 (false, true) => std::cmp::Ordering::Greater,
239 _ => b
240 .availability_score
241 .partial_cmp(&a.availability_score)
242 .unwrap_or(std::cmp::Ordering::Equal),
243 }
244 });
245 locations
246 }
247
248 fn route_load_balanced(&self, locations: Vec<PeerContentLocation>) -> Vec<PeerContentLocation> {
250 locations
253 }
254
255 fn route_redundant(&self, locations: Vec<PeerContentLocation>) -> Vec<PeerContentLocation> {
257 locations
259 }
260
261 #[must_use]
263 #[inline]
264 pub fn get_all_peers(&self, cid: &str) -> Vec<String> {
265 self.content_locations
266 .get(cid)
267 .map(|locs| locs.iter().map(|l| l.peer_id.clone()).collect())
268 .unwrap_or_default()
269 }
270
271 #[must_use]
273 #[inline]
274 pub fn get_availability(&self, cid: &str) -> Option<f64> {
275 self.content_locations.get(cid).map(|locs| {
276 if locs.is_empty() {
277 return 0.0;
278 }
279 let total: f64 = locs.iter().map(|l| l.availability_score).sum();
280 total / locs.len() as f64
281 })
282 }
283
284 #[must_use]
286 #[inline]
287 pub fn has_content(&self, cid: &str) -> bool {
288 self.content_locations.contains_key(cid)
289 }
290
291 #[must_use]
293 #[inline]
294 pub fn peer_count(&self, cid: &str) -> usize {
295 self.content_locations
296 .get(cid)
297 .map(|locs| locs.len())
298 .unwrap_or(0)
299 }
300
301 #[must_use]
303 pub fn find_popular_content(&self, limit: usize) -> Vec<String> {
304 let mut content_peers: Vec<_> = self
305 .content_locations
306 .iter()
307 .map(|(cid, locs)| (cid.clone(), locs.len()))
308 .collect();
309
310 content_peers.sort_by(|a, b| b.1.cmp(&a.1));
311
312 content_peers
313 .into_iter()
314 .take(limit)
315 .map(|(cid, _)| cid)
316 .collect()
317 }
318
319 #[must_use]
321 pub fn find_rare_content(&self, limit: usize) -> Vec<String> {
322 let mut content_peers: Vec<_> = self
323 .content_locations
324 .iter()
325 .map(|(cid, locs)| (cid.clone(), locs.len()))
326 .collect();
327
328 content_peers.sort_by(|a, b| a.1.cmp(&b.1));
329
330 content_peers
331 .into_iter()
332 .take(limit)
333 .map(|(cid, _)| cid)
334 .collect()
335 }
336
337 #[must_use]
339 #[inline]
340 pub fn get_statistics(&self) -> RoutingStats {
341 let mut stats = self.stats.clone();
342 stats.unique_content = self.content_locations.len();
343
344 if !self.content_locations.is_empty() {
345 let total_peers: usize = self.content_locations.values().map(|locs| locs.len()).sum();
346 stats.avg_peers_per_content = total_peers as f64 / self.content_locations.len() as f64;
347 }
348
349 stats
350 }
351
352 #[inline]
354 pub fn clear_cache(&mut self) {
355 self.lookup_cache.clear();
356 }
357
358 #[must_use]
360 pub fn cleanup_stale_locations(&mut self) -> usize {
361 let mut removed_count = 0;
362 let now = SystemTime::now();
363 let verification_interval = self.verification_interval;
364 let mut cids_to_remove = Vec::new();
365
366 for (cid, locations) in self.content_locations.iter_mut() {
367 let initial_len = locations.len();
368 locations.retain(|l| {
369 if let Ok(duration) = now.duration_since(l.last_verified) {
370 duration < verification_interval
371 } else {
372 false
373 }
374 });
375 removed_count += initial_len - locations.len();
376
377 if locations.is_empty() {
378 cids_to_remove.push(cid.clone());
379 }
380 }
381
382 for cid in cids_to_remove {
383 self.content_locations.remove(&cid);
384 self.lookup_cache.remove(&cid);
385 }
386
387 removed_count
388 }
389
390 #[must_use]
392 #[inline]
393 pub fn content_count(&self) -> usize {
394 self.content_locations.len()
395 }
396
397 #[must_use]
399 #[inline]
400 pub fn location_count(&self) -> usize {
401 self.content_locations.values().map(|locs| locs.len()).sum()
402 }
403
404 #[must_use]
406 #[inline]
407 pub fn find_complete_peers(&self, cid: &str) -> Vec<String> {
408 self.content_locations
409 .get(cid)
410 .map(|locs| {
411 locs.iter()
412 .filter(|l| l.complete)
413 .map(|l| l.peer_id.clone())
414 .collect()
415 })
416 .unwrap_or_default()
417 }
418
419 #[must_use]
421 #[inline]
422 pub fn find_well_distributed_content(&self, min_peers: usize) -> Vec<String> {
423 self.content_locations
424 .iter()
425 .filter(|(_, locs)| locs.len() >= min_peers)
426 .map(|(cid, _)| cid.clone())
427 .collect()
428 }
429
430 #[must_use]
432 #[inline]
433 pub fn suggest_replication_targets(&self, max_peers: usize) -> Vec<String> {
434 self.content_locations
435 .iter()
436 .filter(|(_, locs)| locs.len() <= max_peers)
437 .map(|(cid, _)| cid.clone())
438 .collect()
439 }
440}
441
442impl Default for ContentRouter {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451
452 fn create_test_location(peer_id: &str, cid: &str, complete: bool) -> PeerContentLocation {
453 PeerContentLocation {
454 peer_id: peer_id.to_string(),
455 cid: cid.to_string(),
456 availability_score: 0.9,
457 last_verified: SystemTime::now(),
458 chunk_count: 100,
459 complete,
460 }
461 }
462
463 #[test]
464 fn test_register_and_find_peers() {
465 let mut router = ContentRouter::new();
466
467 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
468 router.register_location("QmTest", create_test_location("peer2", "QmTest", true));
469
470 let peers = router.find_peers("QmTest", 10);
471 assert_eq!(peers.len(), 2);
472 assert!(peers.contains(&"peer1".to_string()));
473 assert!(peers.contains(&"peer2".to_string()));
474 }
475
476 #[test]
477 fn test_unregister_location() {
478 let mut router = ContentRouter::new();
479
480 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
481 router.register_location("QmTest", create_test_location("peer2", "QmTest", true));
482
483 assert_eq!(router.peer_count("QmTest"), 2);
484
485 router.unregister_location("QmTest", "peer1");
486 assert_eq!(router.peer_count("QmTest"), 1);
487
488 let peers = router.find_peers("QmTest", 10);
489 assert_eq!(peers.len(), 1);
490 assert_eq!(peers[0], "peer2");
491 }
492
493 #[test]
494 fn test_routing_strategies() {
495 let mut router = ContentRouter::new();
496
497 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
498 router.register_location("QmTest", create_test_location("peer2", "QmTest", false));
499
500 router.set_strategy(RoutingStrategy::MostAvailable);
501 let peers = router.find_peers("QmTest", 1);
502 assert_eq!(peers.len(), 1);
503
504 router.clear_cache(); router.set_strategy(RoutingStrategy::Redundant);
506 let peers = router.find_peers("QmTest", 10);
507 assert_eq!(peers.len(), 2);
508 }
509
510 #[test]
511 fn test_content_availability() {
512 let mut router = ContentRouter::new();
513
514 let mut loc1 = create_test_location("peer1", "QmTest", true);
515 loc1.availability_score = 0.8;
516 let mut loc2 = create_test_location("peer2", "QmTest", true);
517 loc2.availability_score = 1.0;
518
519 router.register_location("QmTest", loc1);
520 router.register_location("QmTest", loc2);
521
522 let availability = router.get_availability("QmTest").unwrap();
523 assert!((availability - 0.9).abs() < 0.01);
524 }
525
526 #[test]
527 fn test_find_popular_content() {
528 let mut router = ContentRouter::new();
529
530 router.register_location(
531 "QmContent1",
532 create_test_location("peer1", "QmContent1", true),
533 );
534 router.register_location(
535 "QmContent1",
536 create_test_location("peer2", "QmContent1", true),
537 );
538 router.register_location(
539 "QmContent1",
540 create_test_location("peer3", "QmContent1", true),
541 );
542
543 router.register_location(
544 "QmContent2",
545 create_test_location("peer1", "QmContent2", true),
546 );
547
548 let popular = router.find_popular_content(1);
549 assert_eq!(popular.len(), 1);
550 assert_eq!(popular[0], "QmContent1");
551 }
552
553 #[test]
554 fn test_find_rare_content() {
555 let mut router = ContentRouter::new();
556
557 router.register_location(
558 "QmContent1",
559 create_test_location("peer1", "QmContent1", true),
560 );
561 router.register_location(
562 "QmContent1",
563 create_test_location("peer2", "QmContent1", true),
564 );
565 router.register_location(
566 "QmContent1",
567 create_test_location("peer3", "QmContent1", true),
568 );
569
570 router.register_location(
571 "QmContent2",
572 create_test_location("peer1", "QmContent2", true),
573 );
574
575 let rare = router.find_rare_content(1);
576 assert_eq!(rare.len(), 1);
577 assert_eq!(rare[0], "QmContent2");
578 }
579
580 #[test]
581 fn test_cache_functionality() {
582 let mut router = ContentRouter::new();
583
584 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
585
586 let _ = router.find_peers("QmTest", 10);
588 let stats = router.get_statistics();
589 assert_eq!(stats.cache_misses, 1);
590 assert_eq!(stats.cache_hits, 0);
591
592 let _ = router.find_peers("QmTest", 10);
594 let stats = router.get_statistics();
595 assert_eq!(stats.cache_hits, 1);
596 }
597
598 #[test]
599 fn test_clear_cache() {
600 let mut router = ContentRouter::new();
601
602 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
603 let _ = router.find_peers("QmTest", 10);
604
605 router.clear_cache();
606 let _ = router.find_peers("QmTest", 10);
607
608 let stats = router.get_statistics();
609 assert_eq!(stats.cache_misses, 2);
610 }
611
612 #[test]
613 fn test_find_complete_peers() {
614 let mut router = ContentRouter::new();
615
616 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
617 router.register_location("QmTest", create_test_location("peer2", "QmTest", false));
618 router.register_location("QmTest", create_test_location("peer3", "QmTest", true));
619
620 let complete = router.find_complete_peers("QmTest");
621 assert_eq!(complete.len(), 2);
622 assert!(complete.contains(&"peer1".to_string()));
623 assert!(complete.contains(&"peer3".to_string()));
624 }
625
626 #[test]
627 fn test_replication_suggestions() {
628 let mut router = ContentRouter::new();
629
630 router.register_location(
631 "QmContent1",
632 create_test_location("peer1", "QmContent1", true),
633 );
634
635 router.register_location(
636 "QmContent2",
637 create_test_location("peer1", "QmContent2", true),
638 );
639 router.register_location(
640 "QmContent2",
641 create_test_location("peer2", "QmContent2", true),
642 );
643 router.register_location(
644 "QmContent2",
645 create_test_location("peer3", "QmContent2", true),
646 );
647
648 let targets = router.suggest_replication_targets(2);
649 assert!(targets.contains(&"QmContent1".to_string()));
650 assert!(!targets.contains(&"QmContent2".to_string()));
651 }
652
653 #[test]
654 fn test_statistics() {
655 let mut router = ContentRouter::new();
656
657 router.register_location(
658 "QmContent1",
659 create_test_location("peer1", "QmContent1", true),
660 );
661 router.register_location(
662 "QmContent1",
663 create_test_location("peer2", "QmContent1", true),
664 );
665 router.register_location(
666 "QmContent2",
667 create_test_location("peer1", "QmContent2", true),
668 );
669
670 let _ = router.find_peers("QmContent1", 10);
671
672 let stats = router.get_statistics();
673 assert_eq!(stats.unique_content, 2);
674 assert_eq!(stats.total_requests, 1);
675 assert!((stats.avg_peers_per_content - 1.5).abs() < 0.01);
676 }
677
678 #[test]
679 fn test_max_peers_limit() {
680 let mut router = ContentRouter::new();
681
682 router.register_location("QmTest", create_test_location("peer1", "QmTest", true));
683 router.register_location("QmTest", create_test_location("peer2", "QmTest", true));
684 router.register_location("QmTest", create_test_location("peer3", "QmTest", true));
685
686 let peers = router.find_peers("QmTest", 2);
687 assert_eq!(peers.len(), 2);
688 }
689}