1use crate::dht::core_engine::NodeId;
21use crate::{P2PError, PeerId, Result};
22use rand::Rng;
23use serde::{Deserialize, Serialize};
24use std::collections::{HashMap, VecDeque};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::RwLock;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct EmbeddingConfig {
32 pub dimensions: usize,
34 pub learning_rate: f64,
36 pub max_iterations: usize,
38 pub convergence_threshold: f64,
40 pub drift_threshold: f64,
42 pub refit_interval: Duration,
44 pub min_peers: usize,
46 pub temperature: f64,
48}
49
50impl Default for EmbeddingConfig {
51 fn default() -> Self {
52 Self {
53 dimensions: 2,
54 learning_rate: 0.1,
55 max_iterations: 1000,
56 convergence_threshold: 0.001,
57 drift_threshold: 0.15,
58 refit_interval: Duration::from_secs(300),
59 min_peers: 5,
60 temperature: 1.0,
61 }
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct HyperbolicCoordinate {
68 pub r: f64,
70 pub theta: Vec<f64>,
72}
73
74impl HyperbolicCoordinate {
75 pub fn new(dimensions: usize) -> Self {
77 let mut rng = rand::thread_rng();
78 Self {
79 r: rng.gen_range(0.1..0.9),
80 theta: (0..dimensions - 1)
81 .map(|_| rng.gen_range(0.0..2.0 * std::f64::consts::PI))
82 .collect(),
83 }
84 }
85
86 pub fn distance(&self, other: &Self) -> f64 {
88 let r1 = self.r;
89 let r2 = other.r;
90
91 let mut cos_angle = 0.0;
93 for (t1, t2) in self.theta.iter().zip(other.theta.iter()) {
94 cos_angle += (t1 - t2).cos();
95 }
96 cos_angle /= self.theta.len() as f64;
97
98 let numerator = (r1 - r2).powi(2) + 4.0 * r1 * r2 * (1.0 - cos_angle);
100 let denominator = (1.0 - r1.powi(2)) * (1.0 - r2.powi(2));
101
102 if denominator <= 0.0 {
103 return f64::INFINITY;
104 }
105
106 let cosh_dist = 1.0 + numerator / denominator;
107 cosh_dist.acosh()
108 }
109
110 pub fn update(&mut self, gradient: &HyperbolicGradient, learning_rate: f64) {
112 self.r -= learning_rate * gradient.dr;
114 self.r = self.r.clamp(0.01, 0.99);
115
116 for (theta, dtheta) in self.theta.iter_mut().zip(gradient.dtheta.iter()) {
118 *theta -= learning_rate * dtheta;
119 while *theta < 0.0 {
121 *theta += 2.0 * std::f64::consts::PI;
122 }
123 while *theta >= 2.0 * std::f64::consts::PI {
124 *theta -= 2.0 * std::f64::consts::PI;
125 }
126 }
127 }
128}
129
130#[derive(Debug, Clone)]
132pub struct HyperbolicGradient {
133 dr: f64,
134 dtheta: Vec<f64>,
135}
136
137#[derive(Debug, Clone)]
139pub struct NetworkSnapshot {
140 pub peers: Vec<PeerId>,
142 pub distances: HashMap<(PeerId, PeerId), f64>,
144 pub timestamp: Instant,
146}
147
148#[derive(Debug, Clone)]
150pub struct Embedding {
151 pub config: EmbeddingConfig,
153 pub coordinates: HashMap<PeerId, HyperbolicCoordinate>,
155 pub quality: EmbeddingQuality,
157 pub created_at: Instant,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct EmbeddingQuality {
164 pub mae: f64,
166 pub rmse: f64,
168 pub stress: f64,
170 pub iterations: usize,
172}
173
174pub struct HyperbolicGreedyRouter {
189 embedding: Arc<RwLock<Option<Embedding>>>,
191 config: EmbeddingConfig,
193 last_refit: Arc<RwLock<Instant>>,
195 drift_detector: Arc<RwLock<DriftDetector>>,
197 _local_id: PeerId,
199 metrics: Arc<RwLock<RoutingMetrics>>,
201}
202
203#[derive(Debug, Clone)]
205struct DriftDetector {
206 recent_errors: VecDeque<f64>,
208 max_samples: usize,
210 baseline_error: f64,
212}
213
214impl DriftDetector {
215 fn new(baseline_error: f64) -> Self {
216 Self {
217 recent_errors: VecDeque::new(),
218 max_samples: 100,
219 baseline_error,
220 }
221 }
222
223 fn add_error(&mut self, error: f64) {
224 if self.recent_errors.len() >= self.max_samples {
225 self.recent_errors.pop_front();
226 }
227 self.recent_errors.push_back(error);
228 }
229
230 fn detect_drift(&self, threshold: f64) -> bool {
231 if self.recent_errors.len() < 10 {
232 return false;
233 }
234
235 let avg_error: f64 =
236 self.recent_errors.iter().sum::<f64>() / self.recent_errors.len() as f64;
237 let drift_ratio = (avg_error - self.baseline_error).abs() / self.baseline_error;
238 drift_ratio > threshold
239 }
240}
241
242#[derive(Debug, Clone, Default)]
244pub struct RoutingMetrics {
245 greedy_success: usize,
247 greedy_failures: usize,
249 _total_stretch: f64,
251 _stretch_count: usize,
253}
254
255impl RoutingMetrics {
256 pub fn greedy_success(&self) -> usize {
257 self.greedy_success
258 }
259 pub fn greedy_failures(&self) -> usize {
260 self.greedy_failures
261 }
262}
263
264impl HyperbolicGreedyRouter {
265 pub fn new(local_id: PeerId) -> Self {
267 Self {
268 embedding: Arc::new(RwLock::new(None)),
269 config: EmbeddingConfig::default(),
270 last_refit: Arc::new(RwLock::new(Instant::now())),
271 drift_detector: Arc::new(RwLock::new(DriftDetector::new(0.1))),
272 _local_id: local_id,
273 metrics: Arc::new(RwLock::new(RoutingMetrics::default())),
274 }
275 }
276
277 pub async fn set_embedding(&self, embedding: Embedding) {
279 *self.embedding.write().await = Some(embedding);
280 }
281
282 pub fn set_config(&mut self, config: EmbeddingConfig) {
284 self.config = config;
285 }
286
287 pub async fn embed_snapshot(&self, peers: &[PeerId]) -> Result<Embedding> {
289 if peers.len() < self.config.min_peers {
290 return Err(P2PError::ResourceExhausted(
291 format!(
292 "Insufficient peers for embedding: required {}, available {}",
293 self.config.min_peers,
294 peers.len()
295 )
296 .into(),
297 ));
298 }
299
300 let mut distances = HashMap::new();
302 for i in 0..peers.len() {
303 for j in i + 1..peers.len() {
304 let dist = self.measure_distance(&peers[i], &peers[j]).await?;
306 distances.insert((peers[i].clone(), peers[j].clone()), dist);
307 distances.insert((peers[j].clone(), peers[i].clone()), dist);
308 }
309 }
310
311 let snapshot = NetworkSnapshot {
312 peers: peers.to_vec(),
313 distances,
314 timestamp: Instant::now(),
315 };
316
317 self.optimize_embedding(snapshot).await
319 }
320
321 async fn measure_distance(&self, _peer1: &PeerId, _peer2: &PeerId) -> Result<f64> {
323 Ok(rand::thread_rng().gen_range(1.0..10.0))
326 }
327
328 async fn optimize_embedding(&self, snapshot: NetworkSnapshot) -> Result<Embedding> {
330 let mut coordinates = HashMap::new();
331
332 for peer in &snapshot.peers {
334 coordinates.insert(
335 peer.clone(),
336 HyperbolicCoordinate::new(self.config.dimensions),
337 );
338 }
339
340 let mut best_quality = EmbeddingQuality {
341 mae: f64::INFINITY,
342 rmse: f64::INFINITY,
343 stress: f64::INFINITY,
344 iterations: 0,
345 };
346
347 for iteration in 0..self.config.max_iterations {
349 let mut total_gradient = HashMap::new();
350 let mut total_error = 0.0;
351 let mut error_count = 0;
352
353 for (peer1, coord1) in &coordinates {
355 let mut gradient = HyperbolicGradient {
356 dr: 0.0,
357 dtheta: vec![0.0; self.config.dimensions - 1],
358 };
359
360 for (peer2, coord2) in &coordinates {
361 if peer1 == peer2 {
362 continue;
363 }
364
365 let embedded_dist = coord1.distance(coord2);
366 let observed_dist = snapshot
367 .distances
368 .get(&(peer1.clone(), peer2.clone()))
369 .copied()
370 .unwrap_or(5.0);
371
372 let error = embedded_dist - observed_dist;
373 total_error += error.abs();
374 error_count += 1;
375
376 let grad_factor = error * 2.0 / (error_count as f64);
378
379 let dr_contrib = grad_factor * (coord1.r - coord2.r) / embedded_dist.max(0.001);
381 gradient.dr += dr_contrib;
382
383 for (i, (t1, t2)) in coord1.theta.iter().zip(coord2.theta.iter()).enumerate() {
385 let dtheta_contrib =
386 grad_factor * (t1 - t2).sin() / embedded_dist.max(0.001);
387 gradient.dtheta[i] += dtheta_contrib;
388 }
389 }
390
391 total_gradient.insert(peer1.clone(), gradient);
392 }
393
394 for (peer, gradient) in total_gradient {
396 if let Some(coord) = coordinates.get_mut(&peer) {
397 coord.update(&gradient, self.config.learning_rate);
398 }
399 }
400
401 let mae = total_error / error_count.max(1) as f64;
403 let quality = EmbeddingQuality {
404 mae,
405 rmse: (total_error.powi(2) / error_count.max(1) as f64).sqrt(),
406 stress: total_error.powi(2),
407 iterations: iteration + 1,
408 };
409
410 if quality.mae < best_quality.mae {
412 best_quality = quality.clone();
413 if best_quality.mae < self.config.convergence_threshold {
414 break;
415 }
416 } else if iteration > 100 && best_quality.mae < quality.mae * 1.1 {
417 break;
419 }
420 }
421
422 Ok(Embedding {
423 config: self.config.clone(),
424 coordinates,
425 quality: best_quality,
426 created_at: Instant::now(),
427 })
428 }
429
430 pub async fn greedy_next(
432 &self,
433 target: NodeId,
434 here: PeerId,
435 emb: &Embedding,
436 ) -> Option<PeerId> {
437 let here_coord = emb.coordinates.get(&here)?;
439
440 let target_peer = node_id_to_peer_id(&target);
442 let target_coord = emb
443 .coordinates
444 .get(&target_peer)
445 .or_else(|| emb.coordinates.values().next());
446
447 if let Some(target_coord) = target_coord {
448 let current_dist = here_coord.distance(target_coord);
450
451 let mut best_neighbor = None;
453 let mut best_dist = current_dist;
454
455 for (peer_id, peer_coord) in &emb.coordinates {
456 if peer_id == &here {
457 continue;
458 }
459
460 let dist = peer_coord.distance(target_coord);
461 if dist < best_dist {
462 best_dist = dist;
463 best_neighbor = Some(peer_id.clone());
464 }
465 }
466
467 let chosen =
469 best_neighbor.or_else(|| emb.coordinates.keys().find(|p| *p != &here).cloned());
470 if let Some(peer) = chosen {
471 let mut metrics = self.metrics.write().await;
472 metrics.greedy_success += 1;
473 return Some(peer);
474 }
475 }
476
477 let mut metrics = self.metrics.write().await;
479 metrics.greedy_failures += 1;
480 None
481 }
482
483 pub async fn detect_drift(&self, observed_error: f64) -> bool {
485 let mut detector = self.drift_detector.write().await;
486 detector.add_error(observed_error);
487 detector.detect_drift(self.config.drift_threshold)
488 }
489
490 pub async fn partial_refit(&self, new_peers: &[PeerId]) -> Result<()> {
492 let mut embedding_guard = self.embedding.write().await;
493
494 if let Some(current_embedding) = embedding_guard.as_mut() {
495 for peer in new_peers {
497 if !current_embedding.coordinates.contains_key(peer) {
498 current_embedding.coordinates.insert(
499 peer.clone(),
500 HyperbolicCoordinate::new(self.config.dimensions),
501 );
502 }
503 }
504
505 let max_refit_iterations = self.config.max_iterations / 5;
507 for _ in 0..max_refit_iterations {
508 for new_peer in new_peers {
510 if let Some(coord) = current_embedding.coordinates.get_mut(new_peer) {
511 coord.r += rand::thread_rng().gen_range(-0.01..0.01);
513 coord.r = coord.r.clamp(0.01, 0.99);
514 }
515 }
516 }
517
518 *self.last_refit.write().await = Instant::now();
520 }
521
522 Ok(())
523 }
524
525 pub async fn get_metrics(&self) -> RoutingMetrics {
527 self.metrics.read().await.clone()
528 }
529}
530
531fn node_id_to_peer_id(node_id: &NodeId) -> PeerId {
533 hex::encode(node_id.as_bytes())
535}
536
537pub async fn embed_snapshot(peers: &[PeerId]) -> Result<Embedding> {
544 let local_id = if !peers.is_empty() {
546 peers[0].clone()
547 } else {
548 format!("peer_{}", rand::random::<u64>())
550 };
551
552 let router = HyperbolicGreedyRouter::new(local_id);
553 router.embed_snapshot(peers).await
554}
555
556pub async fn greedy_next(target: NodeId, here: PeerId, emb: &Embedding) -> Option<PeerId> {
561 let here_coord = emb.coordinates.get(&here)?;
563
564 let target_peer = node_id_to_peer_id(&target);
566 let target_coord = emb.coordinates.get(&target_peer).or_else(|| {
567 emb.coordinates.values().next()
569 });
570
571 if let Some(target_coord) = target_coord {
572 let current_dist = here_coord.distance(target_coord);
574
575 let mut best_neighbor = None;
577 let mut best_dist = current_dist;
578
579 for (peer_id, peer_coord) in &emb.coordinates {
580 if peer_id == &here {
581 continue;
582 }
583
584 let dist = peer_coord.distance(target_coord);
585 if dist < best_dist {
586 best_dist = dist;
587 best_neighbor = Some(peer_id.clone());
588 }
589 }
590 if best_neighbor.is_none() {
592 best_neighbor = emb.coordinates.keys().find(|p| *p != &here).cloned();
593 }
594 return best_neighbor;
595 }
596
597 None
599}
600
601#[cfg(test)]
602mod tests {
603 use super::*;
604
605 #[test]
606 fn test_hyperbolic_distance() {
607 let coord1 = HyperbolicCoordinate {
608 r: 0.5,
609 theta: vec![0.0],
610 };
611 let coord2 = HyperbolicCoordinate {
612 r: 0.7,
613 theta: vec![std::f64::consts::PI],
614 };
615
616 let dist = coord1.distance(&coord2);
617 assert!(dist > 0.0);
618 assert!(dist.is_finite());
619 }
620
621 #[test]
622 fn test_coordinate_update() {
623 let mut coord = HyperbolicCoordinate::new(2);
624 let gradient = HyperbolicGradient {
625 dr: 0.1,
626 dtheta: vec![0.05],
627 };
628
629 let old_r = coord.r;
630 coord.update(&gradient, 0.1);
631
632 assert_ne!(coord.r, old_r);
633 assert!(coord.r >= 0.01 && coord.r <= 0.99);
634 }
635
636 #[tokio::test]
637 async fn test_embedding_creation() {
638 let local_id = format!("test_peer_{}", rand::random::<u64>());
639
640 let router = HyperbolicGreedyRouter::new(local_id);
641
642 let peers: Vec<PeerId> = (0..10).map(|i| format!("peer_{}", i)).collect();
643 let embedding = router.embed_snapshot(&peers).await;
644
645 assert!(embedding.is_ok());
646 let emb = embedding.unwrap();
647 assert_eq!(emb.coordinates.len(), peers.len());
648 assert!(emb.quality.mae < f64::INFINITY);
649 }
650
651 #[tokio::test]
652 async fn test_drift_detection() {
653 let detector = DriftDetector::new(1.0);
654 let mut detector = detector;
655
656 for _ in 0..20 {
658 detector.add_error(1.05);
659 }
660 assert!(!detector.detect_drift(0.15));
661
662 for _ in 0..20 {
664 detector.add_error(2.0);
665 }
666 assert!(detector.detect_drift(0.15));
667 }
668
669 #[tokio::test]
670 async fn test_greedy_routing() {
671 let local_id = format!("test_peer_{}", rand::random::<u64>());
672
673 let router = HyperbolicGreedyRouter::new(local_id.clone());
674
675 let mut coordinates = HashMap::new();
677 let peer1 = format!("peer1_{}", rand::random::<u64>());
678 let peer2 = format!("peer2_{}", rand::random::<u64>());
679 let target_peer = format!("target_{}", rand::random::<u64>());
680
681 coordinates.insert(local_id.clone(), HyperbolicCoordinate::new(2));
682 coordinates.insert(peer1.clone(), HyperbolicCoordinate::new(2));
683 coordinates.insert(peer2.clone(), HyperbolicCoordinate::new(2));
684 coordinates.insert(target_peer.clone(), HyperbolicCoordinate::new(2));
685
686 let embedding = Embedding {
687 config: EmbeddingConfig::default(),
688 coordinates,
689 quality: EmbeddingQuality {
690 mae: 0.1,
691 rmse: 0.15,
692 stress: 0.2,
693 iterations: 100,
694 },
695 created_at: Instant::now(),
696 };
697
698 let mut node_id_bytes = [0u8; 32];
700 let target_bytes = target_peer.as_bytes();
701 let len = target_bytes.len().min(32);
702 node_id_bytes[..len].copy_from_slice(&target_bytes[..len]);
703 let target = NodeId::from_bytes(node_id_bytes);
704 let next = router.greedy_next(target, local_id, &embedding).await;
705
706 assert!(next.is_some());
707 }
708
709 #[tokio::test]
710 async fn test_partial_refit() {
711 let local_id = format!("test_peer_{}", rand::random::<u64>());
712
713 let router = HyperbolicGreedyRouter::new(local_id);
714
715 let initial_peers: Vec<PeerId> = (0..5).map(|i| format!("initial_{}", i)).collect();
717 let embedding = router.embed_snapshot(&initial_peers).await.unwrap();
718
719 *router.embedding.write().await = Some(embedding);
720
721 let new_peers: Vec<PeerId> = (0..3).map(|i| format!("new_{}", i)).collect();
723 let result = router.partial_refit(&new_peers).await;
724
725 assert!(result.is_ok());
726
727 let embedding = router.embedding.read().await;
728 let emb = embedding.as_ref().unwrap();
729 assert_eq!(emb.coordinates.len(), initial_peers.len() + new_peers.len());
730 }
731}