1use super::*;
23use crate::adaptive::{
24 ContentType, learning::QLearnCacheManager, routing::AdaptiveRouter, storage::ContentStore,
25};
26use anyhow::Result;
27use futures::future::select_all;
28use std::{
29 collections::HashMap,
30 sync::Arc,
31 time::{Duration, Instant},
32};
33use tokio::{
34 sync::{RwLock, mpsc},
35 time::timeout,
36};
37
38pub struct RetrievalManager {
40 router: Arc<AdaptiveRouter>,
42
43 content_store: Arc<ContentStore>,
45
46 cache_manager: Arc<QLearnCacheManager>,
48
49 stats: Arc<RwLock<RetrievalStats>>,
51
52 timeout: Duration,
54}
55
/// Selects how `RetrievalManager::retrieve` looks for content that is in
/// neither the local store nor the cache.
///
/// The enum carries no data, so it also derives `Eq` and `Hash`; this allows
/// it to be used directly as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RetrievalStrategy {
    /// Run the Kademlia, hyperbolic and SOM lookups concurrently and take the
    /// first one that succeeds.
    Parallel,

    /// Query the nodes returned by the router's "Kademlia" strategy.
    Kademlia,

    /// Walk the path returned by the router's "Hyperbolic" strategy.
    Hyperbolic,

    /// Query the nodes returned by the router's "SOM" strategy.
    SOMBroadcast,

    /// Try Kademlia, then hyperbolic, then SOM, stopping at the first success.
    Sequential,
}
74
/// Counters describing the retrievals a `RetrievalManager` has performed.
///
/// Strategy keys are the labels used by the manager: "local", "cache",
/// "Kademlia", "Hyperbolic" and "SOM".
#[derive(Debug, Default, Clone)]
pub struct RetrievalStats {
    /// Every retrieval attempt, successful or not.
    pub total_retrievals: u64,

    /// Attempts that produced content.
    pub successful_retrievals: u64,

    /// Attempts for which the chosen strategy returned an error.
    pub failed_retrievals: u64,

    /// Per-label retrieval count. It is only incremented on success, so it
    /// currently mirrors `success_by_strategy`.
    pub retrievals_by_strategy: HashMap<String, u64>,

    /// Per-label count of successful retrievals.
    pub success_by_strategy: HashMap<String, u64>,

    /// Running mean duration of successful retrievals, in milliseconds.
    pub avg_retrieval_time_ms: f64,

    /// Number of retrievals answered from the cache.
    pub cache_hits: u64,
}
99
100#[derive(Debug)]
102struct RetrievalResult {
103 content: Vec<u8>,
105
106 strategy: String,
108
109 duration: Duration,
111
112 source_node: NodeId,
114}
115
116impl RetrievalManager {
117 pub fn new(
119 router: Arc<AdaptiveRouter>,
120 content_store: Arc<ContentStore>,
121 cache_manager: Arc<QLearnCacheManager>,
122 ) -> Self {
123 Self {
124 router,
125 content_store,
126 cache_manager,
127 stats: Arc::new(RwLock::new(RetrievalStats::default())),
128 timeout: Duration::from_secs(5),
129 }
130 }
131
132 pub async fn retrieve(
134 &self,
135 content_hash: &ContentHash,
136 strategy: RetrievalStrategy,
137 ) -> Result<Vec<u8>> {
138 let start_time = Instant::now();
139
140 if let Some(content) = self.content_store.retrieve(content_hash).await? {
142 self.update_stats_success("local", start_time.elapsed())
143 .await;
144 return Ok(content);
145 }
146
147 if let Some(content) = self.cache_manager.get(content_hash).await {
149 let mut stats = self.stats.write().await;
150 stats.cache_hits += 1;
151 drop(stats);
152 self.update_stats_success("cache", start_time.elapsed())
153 .await;
154 return Ok(content);
155 }
156
157 let result = match strategy {
159 RetrievalStrategy::Parallel => self.parallel_retrieve(content_hash).await,
160 RetrievalStrategy::Kademlia => self.kademlia_retrieve(content_hash).await,
161 RetrievalStrategy::Hyperbolic => self.hyperbolic_retrieve(content_hash).await,
162 RetrievalStrategy::SOMBroadcast => self.som_broadcast_retrieve(content_hash).await,
163 RetrievalStrategy::Sequential => self.sequential_retrieve(content_hash).await,
164 };
165
166 match result {
167 Ok(retrieval_result) => {
168 self.update_stats_success(&retrieval_result.strategy, retrieval_result.duration)
170 .await;
171
172 self.cache_manager
174 .decide_caching(
175 *content_hash,
176 retrieval_result.content.clone(),
177 ContentType::DataRetrieval,
178 )
179 .await?;
180
181 self.router
183 .update_statistics(
184 &retrieval_result.source_node,
185 true,
186 retrieval_result.duration.as_millis() as u64,
187 )
188 .await;
189
190 Ok(retrieval_result.content)
191 }
192 Err(e) => {
193 self.update_stats_failure().await;
194 Err(e)
195 }
196 }
197 }
198
199 async fn parallel_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
201 let (tx, mut rx) = mpsc::channel(3);
202
203 let kademlia_handle = {
205 let hash = *content_hash;
206 let tx = tx.clone();
207 let manager = self.clone_for_task();
208 tokio::spawn(async move {
209 if let Ok(result) = manager.kademlia_retrieve(&hash).await {
210 let _ = tx.send(result).await;
211 }
212 })
213 };
214
215 let hyperbolic_handle = {
216 let hash = *content_hash;
217 let tx = tx.clone();
218 let manager = self.clone_for_task();
219 tokio::spawn(async move {
220 if let Ok(result) = manager.hyperbolic_retrieve(&hash).await {
221 let _ = tx.send(result).await;
222 }
223 })
224 };
225
226 let som_handle = {
227 let hash = *content_hash;
228 let tx = tx.clone();
229 let manager = self.clone_for_task();
230 tokio::spawn(async move {
231 if let Ok(result) = manager.som_broadcast_retrieve(&hash).await {
232 let _ = tx.send(result).await;
233 }
234 })
235 };
236
237 drop(tx);
239
240 match timeout(self.timeout, rx.recv()).await {
242 Ok(Some(result)) => {
243 kademlia_handle.abort();
245 hyperbolic_handle.abort();
246 som_handle.abort();
247
248 Ok(result)
249 }
250 Ok(None) => Err(anyhow::anyhow!("All retrieval strategies failed")),
251 Err(_) => Err(anyhow::anyhow!("Retrieval timeout")),
252 }
253 }
254
255 async fn kademlia_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
257 let start_time = Instant::now();
258
259 let strategies = self.router.get_all_strategies();
261 let kademlia_strategy = strategies
262 .get("Kademlia")
263 .ok_or_else(|| anyhow::anyhow!("Kademlia strategy not available"))?;
264
265 let nodes = kademlia_strategy
267 .find_closest_nodes(content_hash, 3)
268 .await?;
269
270 let mut futures = Vec::new();
272 for node in nodes {
273 let future = Box::pin(self.query_node_for_content(node, *content_hash));
274 futures.push(future);
275 }
276
277 if !futures.is_empty() {
279 let (result, _index, _remaining) = select_all(futures).await;
280 if let Ok((content, source_node)) = result {
281 return Ok(RetrievalResult {
282 content,
283 strategy: "Kademlia".to_string(),
284 duration: start_time.elapsed(),
285 source_node,
286 });
287 }
288 }
289
290 Err(anyhow::anyhow!("Kademlia retrieval failed"))
291 }
292
293 async fn hyperbolic_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
295 let start_time = Instant::now();
296
297 let strategies = self.router.get_all_strategies();
299 let hyperbolic_strategy = strategies
300 .get("Hyperbolic")
301 .ok_or_else(|| anyhow::anyhow!("Hyperbolic strategy not available"))?;
302
303 let path = hyperbolic_strategy
305 .find_path(&NodeId {
306 hash: content_hash.0,
307 })
308 .await?;
309
310 for node in path {
312 if let Ok((content, source_node)) = self
313 .query_node_for_content(node.clone(), *content_hash)
314 .await
315 {
316 return Ok(RetrievalResult {
317 content,
318 strategy: "Hyperbolic".to_string(),
319 duration: start_time.elapsed(),
320 source_node,
321 });
322 }
323 }
324
325 Err(anyhow::anyhow!("Hyperbolic retrieval failed"))
326 }
327
328 async fn som_broadcast_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
330 let start_time = Instant::now();
331
332 let strategies = self.router.get_all_strategies();
334 let som_strategy = strategies
335 .get("SOM")
336 .ok_or_else(|| anyhow::anyhow!("SOM strategy not available"))?;
337
338 let nodes = som_strategy.find_closest_nodes(content_hash, 10).await?;
340
341 let mut futures = Vec::new();
343 for node in nodes {
344 let future = Box::pin(self.query_node_for_content(node, *content_hash));
345 futures.push(future);
346 }
347
348 if !futures.is_empty() {
350 let (result, _index, _remaining) = select_all(futures).await;
351 if let Ok((content, source_node)) = result {
352 return Ok(RetrievalResult {
353 content,
354 strategy: "SOM".to_string(),
355 duration: start_time.elapsed(),
356 source_node,
357 });
358 }
359 }
360
361 Err(anyhow::anyhow!("SOM broadcast retrieval failed"))
362 }
363
364 async fn sequential_retrieve(&self, content_hash: &ContentHash) -> Result<RetrievalResult> {
366 if let Ok(result) = self.kademlia_retrieve(content_hash).await {
368 return Ok(result);
369 }
370
371 if let Ok(result) = self.hyperbolic_retrieve(content_hash).await {
372 return Ok(result);
373 }
374
375 if let Ok(result) = self.som_broadcast_retrieve(content_hash).await {
376 return Ok(result);
377 }
378
379 Err(anyhow::anyhow!("All retrieval strategies failed"))
380 }
381
382 async fn query_node_for_content(
384 &self,
385 node: NodeId,
386 content_hash: ContentHash,
387 ) -> Result<(Vec<u8>, NodeId)> {
388 if rand::random::<f64>() > 0.7 {
396 let content = format!("Content for hash {content_hash:?}").into_bytes();
398 Ok((content, node))
399 } else {
400 Err(anyhow::anyhow!("Node does not have content"))
401 }
402 }
403
404 async fn update_stats_success(&self, strategy: &str, duration: Duration) {
406 let mut stats = self.stats.write().await;
407 stats.total_retrievals += 1;
408 stats.successful_retrievals += 1;
409
410 *stats
411 .retrievals_by_strategy
412 .entry(strategy.to_string())
413 .or_insert(0) += 1;
414 *stats
415 .success_by_strategy
416 .entry(strategy.to_string())
417 .or_insert(0) += 1;
418
419 let current_avg = stats.avg_retrieval_time_ms;
421 let current_count = stats.successful_retrievals as f64;
422 stats.avg_retrieval_time_ms =
423 (current_avg * (current_count - 1.0) + duration.as_millis() as f64) / current_count;
424 }
425
426 async fn update_stats_failure(&self) {
428 let mut stats = self.stats.write().await;
429 stats.total_retrievals += 1;
430 stats.failed_retrievals += 1;
431 }
432
433 fn clone_for_task(&self) -> Self {
435 Self {
436 router: self.router.clone(),
437 content_store: self.content_store.clone(),
438 cache_manager: self.cache_manager.clone(),
439 stats: self.stats.clone(),
440 timeout: self.timeout,
441 }
442 }
443
444 pub async fn get_stats(&self) -> RetrievalStats {
446 self.stats.read().await.clone()
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use crate::adaptive::{
454 hyperbolic::HyperbolicSpace,
455 som::{GridSize, SelfOrganizingMap, SomConfig},
456 trust::MockTrustProvider,
457 };
458 use tempfile::TempDir;
459
460 async fn create_test_retrieval_manager() -> RetrievalManager {
461 let trust_provider = Arc::new(MockTrustProvider::new());
462 let hyperbolic = Arc::new(HyperbolicSpace::new());
463 let som_config = SomConfig {
464 initial_learning_rate: 0.3,
465 initial_radius: 5.0,
466 iterations: 100,
467 grid_size: GridSize::Fixed(10, 10),
468 };
469 let som = Arc::new(SelfOrganizingMap::new(som_config));
470 let router = Arc::new(AdaptiveRouter::new(trust_provider, hyperbolic, som));
471
472 let temp_dir = TempDir::new().unwrap();
473 let storage_config = StorageConfig {
474 db_path: temp_dir.path().to_str().unwrap().to_string(),
475 ..Default::default()
476 };
477 let content_store = Arc::new(ContentStore::new(storage_config).await.unwrap());
478 let cache_manager = Arc::new(QLearnCacheManager::new(1024 * 1024));
479
480 RetrievalManager::new(router, content_store, cache_manager)
481 }
482
483 #[tokio::test]
484 async fn test_local_retrieval() {
485 let manager = create_test_retrieval_manager().await;
486 let content = b"Test content".to_vec();
487 let hash = ContentStore::calculate_hash(&content);
488
489 let metadata = crate::adaptive::storage::ContentMetadata {
491 size: content.len(),
492 content_type: ContentType::DataRetrieval,
493 created_at: std::time::SystemTime::now()
494 .duration_since(std::time::UNIX_EPOCH)
495 .unwrap()
496 .as_secs(),
497 chunk_count: None,
498 replication_factor: 8,
499 };
500 manager
501 .content_store
502 .store(content.clone(), metadata)
503 .await
504 .unwrap();
505
506 let retrieved = manager
508 .retrieve(&hash, RetrievalStrategy::Parallel)
509 .await
510 .unwrap();
511 assert_eq!(retrieved, content);
512
513 let stats = manager.get_stats().await;
515 assert_eq!(stats.successful_retrievals, 1);
516 assert_eq!(stats.success_by_strategy.get("local"), Some(&1));
517 }
518
519 #[tokio::test]
520 async fn test_cache_retrieval() {
521 let manager = create_test_retrieval_manager().await;
522 let content = b"Cached content".to_vec();
523 let hash = ContentStore::calculate_hash(&content);
524
525 manager
527 .cache_manager
528 .insert(hash.clone(), content.clone())
529 .await;
530
531 let retrieved = manager
533 .retrieve(&hash, RetrievalStrategy::Parallel)
534 .await
535 .unwrap();
536 assert_eq!(retrieved, content);
537
538 let stats = manager.get_stats().await;
540 assert_eq!(stats.cache_hits, 1);
541 }
542
543 #[tokio::test]
544 async fn test_parallel_strategy() {
545 let manager = create_test_retrieval_manager().await;
546 let hash = ContentHash([42u8; 32]);
547
548 let result = manager.retrieve(&hash, RetrievalStrategy::Parallel).await;
550
551 assert!(result.is_err());
553
554 let stats = manager.get_stats().await;
556 assert_eq!(stats.total_retrievals, 1);
557 assert_eq!(stats.failed_retrievals, 1);
558 }
559
560 #[tokio::test]
561 async fn test_strategy_selection() {
562 let manager = create_test_retrieval_manager().await;
563 let hash = ContentHash([42u8; 32]);
564
565 for strategy in [
567 RetrievalStrategy::Kademlia,
568 RetrievalStrategy::Hyperbolic,
569 RetrievalStrategy::SOMBroadcast,
570 RetrievalStrategy::Sequential,
571 ] {
572 let _ = manager.retrieve(&hash, strategy.clone()).await;
573 }
574
575 let stats = manager.get_stats().await;
577 assert_eq!(stats.total_retrievals, 4);
578 }
579}