saorsa_core/adaptive/
retrieval.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Parallel content retrieval system
15//!
16//! This module implements parallel retrieval strategies as specified:
17//! - Kademlia lookup (α=3 parallel)
18//! - Hyperbolic greedy routing
19//! - SOM region broadcast
20//! - First successful response wins
21
22use super::*;
23use crate::adaptive::{
24    ContentType, learning::QLearnCacheManager, routing::AdaptiveRouter, storage::ContentStore,
25};
26use anyhow::Result;
27use futures::future::select_all;
28use std::{
29    collections::HashMap,
30    sync::Arc,
31    time::{Duration, Instant},
32};
33use tokio::{
34    sync::{RwLock, mpsc},
35    time::timeout,
36};
37
38/// Retrieval manager for parallel content fetching
39pub struct RetrievalManager {
40    /// Routing system for finding content
41    router: Arc<AdaptiveRouter>,
42
43    /// Local content store
44    content_store: Arc<ContentStore>,
45
46    /// Cache manager for Q-learning decisions
47    cache_manager: Arc<QLearnCacheManager>,
48
49    /// Retrieval statistics
50    stats: Arc<RwLock<RetrievalStats>>,
51
52    /// Retrieval timeout
53    timeout: Duration,
54}
55
/// Strategy for content retrieval
///
/// The enum carries no data, so it also derives `Eq` and `Hash`; this lets
/// callers use a strategy as a `HashMap`/`HashSet` key or compare it in
/// contexts that require total equality.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RetrievalStrategy {
    /// Run the Kademlia, hyperbolic and SOM strategies concurrently and take
    /// the first successful result
    Parallel,

    /// Kademlia DHT lookup
    Kademlia,

    /// Hyperbolic greedy routing
    Hyperbolic,

    /// SOM region broadcast
    SOMBroadcast,

    /// Sequential fallback: Kademlia, then hyperbolic, then SOM broadcast
    Sequential,
}
74
/// Counters describing the retrievals a manager has handled so far.
///
/// `Default` yields all-zero counters and empty per-strategy maps.
#[derive(Debug, Default, Clone)]
pub struct RetrievalStats {
    /// Number of retrievals that finished, whether they succeeded or failed
    pub total_retrievals: u64,

    /// Number of retrievals that returned content
    pub successful_retrievals: u64,

    /// Number of retrievals that ended in an error
    pub failed_retrievals: u64,

    /// Retrieval count keyed by the name of the strategy that served it
    pub retrievals_by_strategy: HashMap<String, u64>,

    /// Successful retrieval count keyed by strategy name
    pub success_by_strategy: HashMap<String, u64>,

    /// Running mean duration of successful retrievals, in milliseconds
    pub avg_retrieval_time_ms: f64,

    /// Number of retrievals answered from the cache
    pub cache_hits: u64,
}
99
100/// Result of a retrieval attempt
101#[derive(Debug)]
102struct RetrievalResult {
103    /// Retrieved content
104    content: Vec<u8>,
105
106    /// Strategy that succeeded
107    strategy: String,
108
109    /// Time taken
110    duration: Duration,
111
112    /// Node that provided content
113    source_node: NodeId,
114}
115
116impl RetrievalManager {
117    /// Create a new retrieval manager
118    pub fn new(
119        router: Arc<AdaptiveRouter>,
120        content_store: Arc<ContentStore>,
121        cache_manager: Arc<QLearnCacheManager>,
122    ) -> Self {
123        Self {
124            router,
125            content_store,
126            cache_manager,
127            stats: Arc::new(RwLock::new(RetrievalStats::default())),
128            timeout: Duration::from_secs(5),
129        }
130    }
131
132    /// Retrieve content using specified strategy
133    pub async fn retrieve(
134        &self,
135        content_hash: &ContentHash,
136        strategy: RetrievalStrategy,
137    ) -> Result<Vec<u8>> {
138        let start_time = Instant::now();
139
140        // Check local store first
141        if let Some(content) = self.content_store.retrieve(content_hash).await? {
142            self.update_stats_success("local", start_time.elapsed())
143                .await;
144            return Ok(content);
145        }
146
147        // Check cache
148        if let Some(content) = self.cache_manager.get(content_hash).await {
149            let mut stats = self.stats.write().await;
150            stats.cache_hits += 1;
151            drop(stats);
152            self.update_stats_success("cache", start_time.elapsed())
153                .await;
154            return Ok(content);
155        }
156
157        // Perform retrieval based on strategy
158        let result = match strategy {
159            RetrievalStrategy::Parallel => self.parallel_retrieve(content_hash).await,
160            RetrievalStrategy::Kademlia => self.kademlia_retrieve(content_hash).await,
161            RetrievalStrategy::Hyperbolic => self.hyperbolic_retrieve(content_hash).await,
162            RetrievalStrategy::SOMBroadcast => self.som_broadcast_retrieve(content_hash).await,
163            RetrievalStrategy::Sequential => self.sequential_retrieve(content_hash).await,
164        };
165
166        match result {
167            Ok(retrieval_result) => {
168                // Update statistics
169                self.update_stats_success(&retrieval_result.strategy, retrieval_result.duration)
170                    .await;
171
172                // Cache the content based on Q-learning decision
173                self.cache_manager
174                    .decide_caching(
175                        *content_hash,
176                        retrieval_result.content.clone(),
177                        ContentType::DataRetrieval,
178                    )
179                    .await?;
180
181                // Update routing statistics
182                self.router
183                    .update_statistics(
184                        &retrieval_result.source_node,
185                        true,
186                        retrieval_result.duration.as_millis() as u64,
187                    )
188                    .await;
189
190                Ok(retrieval_result.content)
191            }
192            Err(e) => {
193                self.update_stats_failure().await;
194                Err(e)
195            }
196        }
197    }
198
199    /// Parallel retrieval using all strategies
200    async fn parallel_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
201        let (tx, mut rx) = mpsc::channel(3);
202
203        // Launch all strategies in parallel
204        let kademlia_handle = {
205            let hash = *content_hash;
206            let tx = tx.clone();
207            let manager = self.clone_for_task();
208            tokio::spawn(async move {
209                if let Ok(result) = manager.kademlia_retrieve(&hash).await {
210                    let _ = tx.send(result).await;
211                }
212            })
213        };
214
215        let hyperbolic_handle = {
216            let hash = *content_hash;
217            let tx = tx.clone();
218            let manager = self.clone_for_task();
219            tokio::spawn(async move {
220                if let Ok(result) = manager.hyperbolic_retrieve(&hash).await {
221                    let _ = tx.send(result).await;
222                }
223            })
224        };
225
226        let som_handle = {
227            let hash = *content_hash;
228            let tx = tx.clone();
229            let manager = self.clone_for_task();
230            tokio::spawn(async move {
231                if let Ok(result) = manager.som_broadcast_retrieve(&hash).await {
232                    let _ = tx.send(result).await;
233                }
234            })
235        };
236
237        // Drop original sender so channel closes when all tasks complete
238        drop(tx);
239
240        // Wait for first successful result
241        match timeout(self.timeout, rx.recv()).await {
242            Ok(Some(result)) => {
243                // Cancel other tasks
244                kademlia_handle.abort();
245                hyperbolic_handle.abort();
246                som_handle.abort();
247
248                Ok(result)
249            }
250            Ok(None) => Err(anyhow::anyhow!("All retrieval strategies failed")),
251            Err(_) => Err(anyhow::anyhow!("Retrieval timeout")),
252        }
253    }
254
255    /// Kademlia-based retrieval
256    async fn kademlia_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
257        let start_time = Instant::now();
258
259        // Get Kademlia routing strategy
260        let strategies = self.router.get_all_strategies();
261        let kademlia_strategy = strategies
262            .get("Kademlia")
263            .ok_or_else(|| anyhow::anyhow!("Kademlia strategy not available"))?;
264
265        // Find nodes storing this content (α=3 parallel as per spec)
266        let nodes = kademlia_strategy
267            .find_closest_nodes(content_hash, 3)
268            .await?;
269
270        // Query nodes in parallel
271        let mut futures = Vec::new();
272        for node in nodes {
273            let future = Box::pin(self.query_node_for_content(node, *content_hash));
274            futures.push(future);
275        }
276
277        // Wait for first successful response
278        if !futures.is_empty() {
279            let (result, _index, _remaining) = select_all(futures).await;
280            if let Ok((content, source_node)) = result {
281                return Ok(RetrievalResult {
282                    content,
283                    strategy: "Kademlia".to_string(),
284                    duration: start_time.elapsed(),
285                    source_node,
286                });
287            }
288        }
289
290        Err(anyhow::anyhow!("Kademlia retrieval failed"))
291    }
292
293    /// Hyperbolic greedy routing retrieval
294    async fn hyperbolic_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
295        let start_time = Instant::now();
296
297        // Get hyperbolic routing strategy
298        let strategies = self.router.get_all_strategies();
299        let hyperbolic_strategy = strategies
300            .get("Hyperbolic")
301            .ok_or_else(|| anyhow::anyhow!("Hyperbolic strategy not available"))?;
302
303        // Find path to content using greedy routing
304        let path = hyperbolic_strategy
305            .find_path(&NodeId {
306                hash: content_hash.0,
307            })
308            .await?;
309
310        // Query nodes along the path
311        for node in path {
312            if let Ok((content, source_node)) = self
313                .query_node_for_content(node.clone(), *content_hash)
314                .await
315            {
316                return Ok(RetrievalResult {
317                    content,
318                    strategy: "Hyperbolic".to_string(),
319                    duration: start_time.elapsed(),
320                    source_node,
321                });
322            }
323        }
324
325        Err(anyhow::anyhow!("Hyperbolic retrieval failed"))
326    }
327
328    /// SOM broadcast retrieval
329    async fn som_broadcast_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
330        let start_time = Instant::now();
331
332        // Get SOM routing strategy
333        let strategies = self.router.get_all_strategies();
334        let som_strategy = strategies
335            .get("SOM")
336            .ok_or_else(|| anyhow::anyhow!("SOM strategy not available"))?;
337
338        // Find nodes in the content's SOM region
339        let nodes = som_strategy.find_closest_nodes(content_hash, 10).await?;
340
341        // Broadcast to all nodes in parallel
342        let mut futures = Vec::new();
343        for node in nodes {
344            let future = Box::pin(self.query_node_for_content(node, *content_hash));
345            futures.push(future);
346        }
347
348        // Wait for first successful response
349        if !futures.is_empty() {
350            let (result, _index, _remaining) = select_all(futures).await;
351            if let Ok((content, source_node)) = result {
352                return Ok(RetrievalResult {
353                    content,
354                    strategy: "SOM".to_string(),
355                    duration: start_time.elapsed(),
356                    source_node,
357                });
358            }
359        }
360
361        Err(anyhow::anyhow!("SOM broadcast retrieval failed"))
362    }
363
364    /// Sequential retrieval (fallback mode)
365    async fn sequential_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
366        // Try strategies in order
367        if let Ok(result) = self.kademlia_retrieve(content_hash).await {
368            return Ok(result);
369        }
370
371        if let Ok(result) = self.hyperbolic_retrieve(content_hash).await {
372            return Ok(result);
373        }
374
375        if let Ok(result) = self.som_broadcast_retrieve(content_hash).await {
376            return Ok(result);
377        }
378
379        Err(anyhow::anyhow!("All retrieval strategies failed"))
380    }
381
382    /// Query a specific node for content
383    async fn query_node_for_content(
384        &self,
385        node: NodeId,
386        content_hash: ContentHash,
387    ) -> Result<(Vec<u8>, NodeId)> {
388        // In real implementation, this would:
389        // 1. Send GET_CONTENT message to node
390        // 2. Wait for response
391        // 3. Verify content hash matches
392        // 4. Return content
393
394        // For now, simulate with random success
395        if rand::random::<f64>() > 0.7 {
396            // Simulate content retrieval
397            let content = format!("Content for hash {content_hash:?}").into_bytes();
398            Ok((content, node))
399        } else {
400            Err(anyhow::anyhow!("Node does not have content"))
401        }
402    }
403
404    /// Update statistics for successful retrieval
405    async fn update_stats_success(&self, strategy: &str, duration: Duration) {
406        let mut stats = self.stats.write().await;
407        stats.total_retrievals += 1;
408        stats.successful_retrievals += 1;
409
410        *stats
411            .retrievals_by_strategy
412            .entry(strategy.to_string())
413            .or_insert(0) += 1;
414        *stats
415            .success_by_strategy
416            .entry(strategy.to_string())
417            .or_insert(0) += 1;
418
419        // Update average retrieval time
420        let current_avg = stats.avg_retrieval_time_ms;
421        let current_count = stats.successful_retrievals as f64;
422        stats.avg_retrieval_time_ms =
423            (current_avg * (current_count - 1.0) + duration.as_millis() as f64) / current_count;
424    }
425
426    /// Update statistics for failed retrieval
427    async fn update_stats_failure(&self) {
428        let mut stats = self.stats.write().await;
429        stats.total_retrievals += 1;
430        stats.failed_retrievals += 1;
431    }
432
433    /// Clone manager for spawning tasks
434    fn clone_for_task(&self) -> Self {
435        Self {
436            router: self.router.clone(),
437            content_store: self.content_store.clone(),
438            cache_manager: self.cache_manager.clone(),
439            stats: self.stats.clone(),
440            timeout: self.timeout,
441        }
442    }
443
444    /// Get retrieval statistics
445    pub async fn get_stats(&self) -> RetrievalStats {
446        self.stats.read().await.clone()
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use super::*;
453    use crate::adaptive::{
454        hyperbolic::HyperbolicSpace,
455        som::{GridSize, SelfOrganizingMap, SomConfig},
456        trust::MockTrustProvider,
457    };
458    use tempfile::TempDir;
459
460    async fn create_test_retrieval_manager() -> RetrievalManager {
461        let trust_provider = Arc::new(MockTrustProvider::new());
462        let hyperbolic = Arc::new(HyperbolicSpace::new());
463        let som_config = SomConfig {
464            initial_learning_rate: 0.3,
465            initial_radius: 5.0,
466            iterations: 100,
467            grid_size: GridSize::Fixed(10, 10),
468        };
469        let som = Arc::new(SelfOrganizingMap::new(som_config));
470        let router = Arc::new(AdaptiveRouter::new(trust_provider, hyperbolic, som));
471
472        let temp_dir = TempDir::new().unwrap();
473        let storage_config = StorageConfig {
474            db_path: temp_dir.path().to_str().unwrap().to_string(),
475            ..Default::default()
476        };
477        let content_store = Arc::new(ContentStore::new(storage_config).await.unwrap());
478        let cache_manager = Arc::new(QLearnCacheManager::new(1024 * 1024));
479
480        RetrievalManager::new(router, content_store, cache_manager)
481    }
482
483    #[tokio::test]
484    async fn test_local_retrieval() {
485        let manager = create_test_retrieval_manager().await;
486        let content = b"Test content".to_vec();
487        let hash = ContentStore::calculate_hash(&content);
488
489        // Store content locally
490        let metadata = crate::adaptive::storage::ContentMetadata {
491            size: content.len(),
492            content_type: ContentType::DataRetrieval,
493            created_at: std::time::SystemTime::now()
494                .duration_since(std::time::UNIX_EPOCH)
495                .unwrap()
496                .as_secs(),
497            chunk_count: None,
498            replication_factor: 8,
499        };
500        manager
501            .content_store
502            .store(content.clone(), metadata)
503            .await
504            .unwrap();
505
506        // Retrieve should find it locally
507        let retrieved = manager
508            .retrieve(&hash, RetrievalStrategy::Parallel)
509            .await
510            .unwrap();
511        assert_eq!(retrieved, content);
512
513        // Check stats
514        let stats = manager.get_stats().await;
515        assert_eq!(stats.successful_retrievals, 1);
516        assert_eq!(stats.success_by_strategy.get("local"), Some(&1));
517    }
518
519    #[tokio::test]
520    async fn test_cache_retrieval() {
521        let manager = create_test_retrieval_manager().await;
522        let content = b"Cached content".to_vec();
523        let hash = ContentStore::calculate_hash(&content);
524
525        // Add to cache
526        manager
527            .cache_manager
528            .insert(hash.clone(), content.clone())
529            .await;
530
531        // Retrieve should find it in cache
532        let retrieved = manager
533            .retrieve(&hash, RetrievalStrategy::Parallel)
534            .await
535            .unwrap();
536        assert_eq!(retrieved, content);
537
538        // Check stats
539        let stats = manager.get_stats().await;
540        assert_eq!(stats.cache_hits, 1);
541    }
542
543    #[tokio::test]
544    async fn test_parallel_strategy() {
545        let manager = create_test_retrieval_manager().await;
546        let hash = ContentHash([42u8; 32]);
547
548        // Try parallel retrieval (will fail in test environment)
549        let result = manager.retrieve(&hash, RetrievalStrategy::Parallel).await;
550
551        // In test environment, this will likely fail
552        assert!(result.is_err());
553
554        // Check that attempts were made
555        let stats = manager.get_stats().await;
556        assert_eq!(stats.total_retrievals, 1);
557        assert_eq!(stats.failed_retrievals, 1);
558    }
559
560    #[tokio::test]
561    async fn test_strategy_selection() {
562        let manager = create_test_retrieval_manager().await;
563        let hash = ContentHash([42u8; 32]);
564
565        // Test different strategies
566        for strategy in [
567            RetrievalStrategy::Kademlia,
568            RetrievalStrategy::Hyperbolic,
569            RetrievalStrategy::SOMBroadcast,
570            RetrievalStrategy::Sequential,
571        ] {
572            let _ = manager.retrieve(&hash, strategy.clone()).await;
573        }
574
575        // Check that all strategies were attempted
576        let stats = manager.get_stats().await;
577        assert_eq!(stats.total_retrievals, 4);
578    }
579}