chie_core/
content_router.rs

1//! Content routing optimizer for efficient content discovery and retrieval.
2//!
3//! This module provides intelligent routing algorithms to find and retrieve content
4//! from the optimal peers in the network. It combines peer selection, network topology,
5//! content availability, and caching strategies.
6//!
7//! # Example
8//!
9//! ```
10//! use chie_core::{ContentRouter, RoutingStrategy, PeerContentLocation};
11//!
12//! # async fn example() {
13//! let mut router = ContentRouter::new();
14//!
15//! // Register content locations
16//! router.register_location("QmContent123", PeerContentLocation {
17//!     peer_id: "peer1".to_string(),
18//!     cid: "QmContent123".to_string(),
19//!     availability_score: 0.95,
20//!     last_verified: std::time::SystemTime::now(),
21//!     chunk_count: 100,
22//!     complete: true,
23//! });
24//!
25//! // Find optimal peers for content
26//! let peers = router.find_peers("QmContent123", 3);
27//! println!("Found {} peers with content", peers.len());
28//! # }
29//! ```
30
31use std::collections::HashMap;
32use std::time::{Duration, SystemTime};
33
/// A record that a given peer hosts (all or part of) a piece of content.
#[derive(Clone, Debug)]
pub struct PeerContentLocation {
    /// Identifier of the peer that hosts the content.
    pub peer_id: String,
    /// Content identifier (CID) this record refers to.
    pub cid: String,
    /// Availability score of the peer for this content, from 0.0 to 1.0.
    pub availability_score: f64,
    /// Timestamp of the most recent verification of this location.
    pub last_verified: SystemTime,
    /// How many chunks of the content the peer has available.
    pub chunk_count: u32,
    /// `true` when the peer holds the complete content.
    pub complete: bool,
}
50
/// How the router orders candidate peers when content is looked up.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RoutingStrategy {
    /// Prefer the closest peers (lowest latency).
    Closest,
    /// Prefer the peers with the highest availability.
    MostAvailable,
    /// Spread requests across peers.
    LoadBalanced,
    /// Use multiple sources for redundancy.
    Redundant,
}
63
/// Counters and aggregates describing the router's activity.
#[derive(Clone, Debug, Default)]
pub struct RoutingStats {
    /// Number of routing requests handled.
    pub total_requests: u64,
    /// Number of lookups answered from the cache.
    pub cache_hits: u64,
    /// Number of lookups that were not answered from the cache.
    pub cache_misses: u64,
    /// Mean number of peers per tracked content item.
    pub avg_peers_per_content: f64,
    /// Number of distinct content items tracked.
    pub unique_content: usize,
}
78
79/// Content router for intelligent content discovery.
80pub struct ContentRouter {
81    /// Content ID to locations mapping
82    content_locations: HashMap<String, Vec<PeerContentLocation>>,
83    /// Routing strategy
84    strategy: RoutingStrategy,
85    /// Cache for recent lookups
86    lookup_cache: HashMap<String, Vec<String>>,
87    /// Cache TTL
88    cache_ttl: Duration,
89    /// Statistics
90    stats: RoutingStats,
91    /// Location verification interval
92    verification_interval: Duration,
93}
94
95impl ContentRouter {
96    /// Create a new content router.
97    #[must_use]
98    #[inline]
99    pub fn new() -> Self {
100        Self {
101            content_locations: HashMap::new(),
102            strategy: RoutingStrategy::LoadBalanced,
103            lookup_cache: HashMap::new(),
104            cache_ttl: Duration::from_secs(60),
105            stats: RoutingStats::default(),
106            verification_interval: Duration::from_secs(300),
107        }
108    }
109
110    /// Create a router with a specific strategy.
111    #[must_use]
112    #[inline]
113    pub fn with_strategy(strategy: RoutingStrategy) -> Self {
114        Self {
115            strategy,
116            ..Self::new()
117        }
118    }
119
120    /// Set the routing strategy.
121    #[inline]
122    pub fn set_strategy(&mut self, strategy: RoutingStrategy) {
123        self.strategy = strategy;
124    }
125
126    /// Set cache TTL.
127    #[inline]
128    pub fn set_cache_ttl(&mut self, ttl: Duration) {
129        self.cache_ttl = ttl;
130    }
131
132    /// Register a content location.
133    pub fn register_location(&mut self, cid: &str, location: PeerContentLocation) {
134        let locations = self.content_locations.entry(cid.to_string()).or_default();
135
136        // Update if exists, otherwise add
137        if let Some(existing) = locations.iter_mut().find(|l| l.peer_id == location.peer_id) {
138            *existing = location;
139        } else {
140            locations.push(location);
141        }
142
143        // Invalidate cache for this CID
144        self.lookup_cache.remove(cid);
145    }
146
147    /// Unregister a content location.
148    pub fn unregister_location(&mut self, cid: &str, peer_id: &str) {
149        if let Some(locations) = self.content_locations.get_mut(cid) {
150            locations.retain(|l| l.peer_id != peer_id);
151            if locations.is_empty() {
152                self.content_locations.remove(cid);
153            }
154            self.lookup_cache.remove(cid);
155        }
156    }
157
158    /// Find peers hosting specific content.
159    #[must_use]
160    pub fn find_peers(&mut self, cid: &str, max_peers: usize) -> Vec<String> {
161        self.stats.total_requests += 1;
162
163        // Check cache first
164        if let Some(cached) = self.lookup_cache.get(cid) {
165            self.stats.cache_hits += 1;
166            return cached.iter().take(max_peers).cloned().collect();
167        }
168
169        self.stats.cache_misses += 1;
170
171        // Get locations for this content
172        let locations = match self.content_locations.get(cid) {
173            Some(locs) => locs,
174            None => return Vec::new(),
175        };
176
177        // Filter out stale locations
178        let valid_locations: Vec<_> = locations
179            .iter()
180            .filter(|l| self.is_location_valid(l))
181            .cloned()
182            .collect();
183
184        if valid_locations.is_empty() {
185            return Vec::new();
186        }
187
188        // Apply routing strategy
189        let mut selected = match self.strategy {
190            RoutingStrategy::Closest => self.route_by_closest(valid_locations),
191            RoutingStrategy::MostAvailable => self.route_by_availability(valid_locations),
192            RoutingStrategy::LoadBalanced => self.route_load_balanced(valid_locations),
193            RoutingStrategy::Redundant => self.route_redundant(valid_locations),
194        };
195
196        selected.truncate(max_peers);
197
198        let peer_ids: Vec<String> = selected.iter().map(|l| l.peer_id.clone()).collect();
199
200        // Update cache
201        self.lookup_cache.insert(cid.to_string(), peer_ids.clone());
202
203        peer_ids
204    }
205
206    /// Check if a location is still valid.
207    fn is_location_valid(&self, location: &PeerContentLocation) -> bool {
208        if let Ok(duration) = SystemTime::now().duration_since(location.last_verified) {
209            duration < self.verification_interval
210        } else {
211            false
212        }
213    }
214
215    /// Route by closest peers (would use latency in real implementation).
216    fn route_by_closest(
217        &self,
218        mut locations: Vec<PeerContentLocation>,
219    ) -> Vec<PeerContentLocation> {
220        // Sort by availability score as proxy for "closeness"
221        locations.sort_by(|a, b| {
222            b.availability_score
223                .partial_cmp(&a.availability_score)
224                .unwrap_or(std::cmp::Ordering::Equal)
225        });
226        locations
227    }
228
229    /// Route by most available peers.
230    fn route_by_availability(
231        &self,
232        mut locations: Vec<PeerContentLocation>,
233    ) -> Vec<PeerContentLocation> {
234        locations.sort_by(|a, b| {
235            // Complete content first, then by availability score
236            match (a.complete, b.complete) {
237                (true, false) => std::cmp::Ordering::Less,
238                (false, true) => std::cmp::Ordering::Greater,
239                _ => b
240                    .availability_score
241                    .partial_cmp(&a.availability_score)
242                    .unwrap_or(std::cmp::Ordering::Equal),
243            }
244        });
245        locations
246    }
247
248    /// Route with load balancing.
249    fn route_load_balanced(&self, locations: Vec<PeerContentLocation>) -> Vec<PeerContentLocation> {
250        // Simple round-robin for now
251        // In production, would track actual load per peer
252        locations
253    }
254
255    /// Route with redundancy (multiple sources).
256    fn route_redundant(&self, locations: Vec<PeerContentLocation>) -> Vec<PeerContentLocation> {
257        // Return all valid locations for redundancy
258        locations
259    }
260
261    /// Get all peers hosting a specific content.
262    #[must_use]
263    #[inline]
264    pub fn get_all_peers(&self, cid: &str) -> Vec<String> {
265        self.content_locations
266            .get(cid)
267            .map(|locs| locs.iter().map(|l| l.peer_id.clone()).collect())
268            .unwrap_or_default()
269    }
270
271    /// Get content availability score.
272    #[must_use]
273    #[inline]
274    pub fn get_availability(&self, cid: &str) -> Option<f64> {
275        self.content_locations.get(cid).map(|locs| {
276            if locs.is_empty() {
277                return 0.0;
278            }
279            let total: f64 = locs.iter().map(|l| l.availability_score).sum();
280            total / locs.len() as f64
281        })
282    }
283
284    /// Check if content is available.
285    #[must_use]
286    #[inline]
287    pub fn has_content(&self, cid: &str) -> bool {
288        self.content_locations.contains_key(cid)
289    }
290
291    /// Get number of peers hosting content.
292    #[must_use]
293    #[inline]
294    pub fn peer_count(&self, cid: &str) -> usize {
295        self.content_locations
296            .get(cid)
297            .map(|locs| locs.len())
298            .unwrap_or(0)
299    }
300
301    /// Find content by popularity (most peers).
302    #[must_use]
303    pub fn find_popular_content(&self, limit: usize) -> Vec<String> {
304        let mut content_peers: Vec<_> = self
305            .content_locations
306            .iter()
307            .map(|(cid, locs)| (cid.clone(), locs.len()))
308            .collect();
309
310        content_peers.sort_by(|a, b| b.1.cmp(&a.1));
311
312        content_peers
313            .into_iter()
314            .take(limit)
315            .map(|(cid, _)| cid)
316            .collect()
317    }
318
319    /// Find rare content (fewest peers).
320    #[must_use]
321    pub fn find_rare_content(&self, limit: usize) -> Vec<String> {
322        let mut content_peers: Vec<_> = self
323            .content_locations
324            .iter()
325            .map(|(cid, locs)| (cid.clone(), locs.len()))
326            .collect();
327
328        content_peers.sort_by(|a, b| a.1.cmp(&b.1));
329
330        content_peers
331            .into_iter()
332            .take(limit)
333            .map(|(cid, _)| cid)
334            .collect()
335    }
336
337    /// Get routing statistics.
338    #[must_use]
339    #[inline]
340    pub fn get_statistics(&self) -> RoutingStats {
341        let mut stats = self.stats.clone();
342        stats.unique_content = self.content_locations.len();
343
344        if !self.content_locations.is_empty() {
345            let total_peers: usize = self.content_locations.values().map(|locs| locs.len()).sum();
346            stats.avg_peers_per_content = total_peers as f64 / self.content_locations.len() as f64;
347        }
348
349        stats
350    }
351
352    /// Clear routing cache.
353    #[inline]
354    pub fn clear_cache(&mut self) {
355        self.lookup_cache.clear();
356    }
357
358    /// Remove stale locations.
359    #[must_use]
360    pub fn cleanup_stale_locations(&mut self) -> usize {
361        let mut removed_count = 0;
362        let now = SystemTime::now();
363        let verification_interval = self.verification_interval;
364        let mut cids_to_remove = Vec::new();
365
366        for (cid, locations) in self.content_locations.iter_mut() {
367            let initial_len = locations.len();
368            locations.retain(|l| {
369                if let Ok(duration) = now.duration_since(l.last_verified) {
370                    duration < verification_interval
371                } else {
372                    false
373                }
374            });
375            removed_count += initial_len - locations.len();
376
377            if locations.is_empty() {
378                cids_to_remove.push(cid.clone());
379            }
380        }
381
382        for cid in cids_to_remove {
383            self.content_locations.remove(&cid);
384            self.lookup_cache.remove(&cid);
385        }
386
387        removed_count
388    }
389
390    /// Get total number of tracked content.
391    #[must_use]
392    #[inline]
393    pub fn content_count(&self) -> usize {
394        self.content_locations.len()
395    }
396
397    /// Get total number of locations.
398    #[must_use]
399    #[inline]
400    pub fn location_count(&self) -> usize {
401        self.content_locations.values().map(|locs| locs.len()).sum()
402    }
403
404    /// Find peers with specific content completeness.
405    #[must_use]
406    #[inline]
407    pub fn find_complete_peers(&self, cid: &str) -> Vec<String> {
408        self.content_locations
409            .get(cid)
410            .map(|locs| {
411                locs.iter()
412                    .filter(|l| l.complete)
413                    .map(|l| l.peer_id.clone())
414                    .collect()
415            })
416            .unwrap_or_default()
417    }
418
419    /// Get content with minimum peer count.
420    #[must_use]
421    #[inline]
422    pub fn find_well_distributed_content(&self, min_peers: usize) -> Vec<String> {
423        self.content_locations
424            .iter()
425            .filter(|(_, locs)| locs.len() >= min_peers)
426            .map(|(cid, _)| cid.clone())
427            .collect()
428    }
429
430    /// Suggest content for replication (poorly distributed).
431    #[must_use]
432    #[inline]
433    pub fn suggest_replication_targets(&self, max_peers: usize) -> Vec<String> {
434        self.content_locations
435            .iter()
436            .filter(|(_, locs)| locs.len() <= max_peers)
437            .map(|(cid, _)| cid.clone())
438            .collect()
439    }
440}
441
442impl Default for ContentRouter {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a just-verified location with score 0.9 and 100 chunks.
    fn create_test_location(peer_id: &str, cid: &str, complete: bool) -> PeerContentLocation {
        PeerContentLocation {
            peer_id: String::from(peer_id),
            cid: String::from(cid),
            availability_score: 0.9,
            last_verified: SystemTime::now(),
            chunk_count: 100,
            complete,
        }
    }

    /// Builds a default router and registers one location per `(cid, peer, complete)`
    /// entry, in the given order.
    fn router_with(entries: &[(&str, &str, bool)]) -> ContentRouter {
        let mut router = ContentRouter::new();
        for &(cid, peer, complete) in entries {
            router.register_location(cid, create_test_location(peer, cid, complete));
        }
        router
    }

    /// One CID with three complete peers and a second CID with a single peer.
    fn popular_and_rare_fixture() -> ContentRouter {
        router_with(&[
            ("QmContent1", "peer1", true),
            ("QmContent1", "peer2", true),
            ("QmContent1", "peer3", true),
            ("QmContent2", "peer1", true),
        ])
    }

    #[test]
    fn test_register_and_find_peers() {
        let mut router = router_with(&[("QmTest", "peer1", true), ("QmTest", "peer2", true)]);

        let found = router.find_peers("QmTest", 10);
        assert_eq!(found.len(), 2);
        for expected in ["peer1", "peer2"].iter() {
            assert!(found.iter().any(|p| p == expected));
        }
    }

    #[test]
    fn test_unregister_location() {
        let mut router = router_with(&[("QmTest", "peer1", true), ("QmTest", "peer2", true)]);
        assert_eq!(router.peer_count("QmTest"), 2);

        router.unregister_location("QmTest", "peer1");
        assert_eq!(router.peer_count("QmTest"), 1);

        let remaining = router.find_peers("QmTest", 10);
        assert_eq!(remaining, vec!["peer2".to_string()]);
    }

    #[test]
    fn test_routing_strategies() {
        let mut router = router_with(&[("QmTest", "peer1", true), ("QmTest", "peer2", false)]);

        router.set_strategy(RoutingStrategy::MostAvailable);
        assert_eq!(router.find_peers("QmTest", 1).len(), 1);

        // Drop the cached result before switching strategy.
        router.clear_cache();
        router.set_strategy(RoutingStrategy::Redundant);
        assert_eq!(router.find_peers("QmTest", 10).len(), 2);
    }

    #[test]
    fn test_content_availability() {
        let mut router = ContentRouter::new();

        for &(peer, score) in [("peer1", 0.8), ("peer2", 1.0)].iter() {
            let mut location = create_test_location(peer, "QmTest", true);
            location.availability_score = score;
            router.register_location("QmTest", location);
        }

        let mean = router.get_availability("QmTest").unwrap();
        assert!((mean - 0.9).abs() < 0.01);
    }

    #[test]
    fn test_find_popular_content() {
        let router = popular_and_rare_fixture();

        let popular = router.find_popular_content(1);
        assert_eq!(popular, vec!["QmContent1".to_string()]);
    }

    #[test]
    fn test_find_rare_content() {
        let router = popular_and_rare_fixture();

        let rare = router.find_rare_content(1);
        assert_eq!(rare, vec!["QmContent2".to_string()]);
    }

    #[test]
    fn test_cache_functionality() {
        let mut router = router_with(&[("QmTest", "peer1", true)]);

        // The first lookup has to compute the answer.
        let _ = router.find_peers("QmTest", 10);
        let after_first = router.get_statistics();
        assert_eq!(after_first.cache_misses, 1);
        assert_eq!(after_first.cache_hits, 0);

        // The second lookup is served from the cache.
        let _ = router.find_peers("QmTest", 10);
        assert_eq!(router.get_statistics().cache_hits, 1);
    }

    #[test]
    fn test_clear_cache() {
        let mut router = router_with(&[("QmTest", "peer1", true)]);

        let _ = router.find_peers("QmTest", 10);
        router.clear_cache();
        let _ = router.find_peers("QmTest", 10);

        assert_eq!(router.get_statistics().cache_misses, 2);
    }

    #[test]
    fn test_find_complete_peers() {
        let router = router_with(&[
            ("QmTest", "peer1", true),
            ("QmTest", "peer2", false),
            ("QmTest", "peer3", true),
        ]);

        let complete = router.find_complete_peers("QmTest");
        assert_eq!(complete.len(), 2);
        for expected in ["peer1", "peer3"].iter() {
            assert!(complete.iter().any(|p| p == expected));
        }
    }

    #[test]
    fn test_replication_suggestions() {
        let router = router_with(&[
            ("QmContent1", "peer1", true),
            ("QmContent2", "peer1", true),
            ("QmContent2", "peer2", true),
            ("QmContent2", "peer3", true),
        ]);

        let targets = router.suggest_replication_targets(2);
        assert!(targets.iter().any(|cid| cid == "QmContent1"));
        assert!(targets.iter().all(|cid| cid != "QmContent2"));
    }

    #[test]
    fn test_statistics() {
        let mut router = router_with(&[
            ("QmContent1", "peer1", true),
            ("QmContent1", "peer2", true),
            ("QmContent2", "peer1", true),
        ]);

        let _ = router.find_peers("QmContent1", 10);

        let stats = router.get_statistics();
        assert_eq!(stats.unique_content, 2);
        assert_eq!(stats.total_requests, 1);
        assert!((stats.avg_peers_per_content - 1.5).abs() < 0.01);
    }

    #[test]
    fn test_max_peers_limit() {
        let mut router = router_with(&[
            ("QmTest", "peer1", true),
            ("QmTest", "peer2", true),
            ("QmTest", "peer3", true),
        ]);

        assert_eq!(router.find_peers("QmTest", 2).len(), 2);
    }
}