saorsa_core/bootstrap/
mod.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Bootstrap Cache System
15//!
16//! Provides decentralized peer discovery through local caching of known contacts.
17//! Eliminates dependency on central bootstrap servers by maintaining a high-quality
18//! cache of up to 30,000 peer contacts with automatic conflict resolution for
19//! multiple concurrent instances.
20
21pub mod cache;
22pub mod contact;
23pub mod discovery;
24pub mod merge;
25
26pub use cache::{BootstrapCache, CacheConfig, CacheError};
27pub use contact::{
28    ContactEntry, QualityCalculator, QualityMetrics, QuicConnectionType, QuicContactInfo,
29    QuicQualityMetrics,
30};
31pub use discovery::{BootstrapConfig, BootstrapDiscovery, ConfigurableBootstrapDiscovery};
32pub use merge::{MergeCoordinator, MergeResult};
33// Use real four-word-networking crate types behind a thin facade
34pub use four_word_networking as fourwords;
35use four_word_networking::FourWordAdaptiveEncoder;
36
37/// Minimal facade around external four-word types
38#[derive(Debug, Clone)]
39pub struct FourWordAddress(pub String);
40
41impl FourWordAddress {
42    pub fn from_string(s: &str) -> Result<Self> {
43        let parts: Vec<&str> = s.split(['.', '-']).collect();
44        if parts.len() != 4 {
45            return Err(P2PError::Bootstrap(
46                crate::error::BootstrapError::InvalidData(
47                    "Four-word address must have exactly 4 words"
48                        .to_string()
49                        .into(),
50                ),
51            ));
52        }
53        Ok(FourWordAddress(parts.join("-")))
54    }
55
56    pub fn validate(&self, _encoder: &WordEncoder) -> bool {
57        let parts: Vec<&str> = self.0.split(['.', '-']).collect();
58        parts.len() == 4 && parts.iter().all(|part| !part.is_empty())
59    }
60}
61
/// Marker type for the word dictionary.
///
/// Carries no data; it is a unit struct kept as part of the facade's public
/// surface.
#[derive(Debug, Clone)]
pub struct WordDictionary;
64
65#[derive(Debug, Clone)]
66pub struct WordEncoder;
67
68impl Default for WordEncoder {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl WordEncoder {
75    pub fn new() -> Self {
76        Self
77    }
78
79    pub fn encode_multiaddr_string(&self, multiaddr: &str) -> Result<FourWordAddress> {
80        // Map multiaddr to IPv4:port if possible, else hash deterministically
81        let socket_addr: std::net::SocketAddr = multiaddr.parse().map_err(|e| {
82            P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
83                format!("{e}").into(),
84            ))
85        })?;
86        self.encode_socket_addr(&socket_addr)
87    }
88
89    pub fn decode_to_socket_addr(&self, words: &FourWordAddress) -> Result<std::net::SocketAddr> {
90        let encoder = FourWordAdaptiveEncoder::new().map_err(|e| {
91            P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
92                format!("Encoder init failed: {e}").into(),
93            ))
94        })?;
95        // Accept hyphens, spaces or dots; normalize then call adaptive decoder
96        let normalized = words.0.replace(' ', "-");
97        let decoded = encoder.decode(&normalized).map_err(|e| {
98            P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
99                format!("Failed to decode four-word address: {e}").into(),
100            ))
101        })?;
102        decoded.parse::<std::net::SocketAddr>().map_err(|_| {
103            P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
104                "Decoded address missing port".to_string().into(),
105            ))
106        })
107    }
108
109    pub fn encode_socket_addr(&self, addr: &std::net::SocketAddr) -> Result<FourWordAddress> {
110        let encoder = FourWordAdaptiveEncoder::new().map_err(|e| {
111            P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
112                format!("Encoder init failed: {e}").into(),
113            ))
114        })?;
115        let encoded = encoder.encode(&addr.to_string()).map_err(|e| {
116            P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
117                format!("{e}").into(),
118            ))
119        })?;
120        Ok(FourWordAddress(encoded.replace(' ', "-")))
121    }
122}
123
124use crate::error::BootstrapError;
125use crate::rate_limit::{JoinRateLimiter, JoinRateLimiterConfig};
126use crate::security::{IPDiversityConfig, IPDiversityEnforcer};
127use crate::{P2PError, PeerId, Result};
128use std::net::{IpAddr, Ipv6Addr};
129use std::path::PathBuf;
130use std::time::Duration;
131
/// Default maximum number of contacts (30,000).
pub const DEFAULT_MAX_CONTACTS: usize = 30_000;
/// Default cache directory, relative to the user's home directory.
pub const DEFAULT_CACHE_DIR: &str = ".cache/p2p_foundation";
/// Default period between merges of instance cache files (30 seconds).
pub const DEFAULT_MERGE_INTERVAL: Duration = Duration::from_secs(30);
/// Default period between stale-contact cleanups (1 hour).
pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Default period between contact quality score updates (5 minutes).
pub const DEFAULT_QUALITY_UPDATE_INTERVAL: Duration = Duration::from_secs(5 * 60);
142
143/// Bootstrap cache initialization and management
144pub struct BootstrapManager {
145    cache: BootstrapCache,
146    merge_coordinator: MergeCoordinator,
147    word_encoder: WordEncoder,
148    /// Join rate limiter for Sybil attack protection
149    join_limiter: JoinRateLimiter,
150    /// IP diversity enforcer for geographic and ASN diversity
151    diversity_enforcer: IPDiversityEnforcer,
152}
153
154impl BootstrapManager {
155    /// Create a new bootstrap manager with default configuration
156    pub async fn new() -> Result<Self> {
157        let cache_dir = home_cache_dir()?;
158        let config = CacheConfig::default();
159
160        let cache = BootstrapCache::new(cache_dir.clone(), config).await?;
161        let merge_coordinator = MergeCoordinator::new(cache_dir)?;
162        let word_encoder = WordEncoder::new();
163        let join_limiter = JoinRateLimiter::new(JoinRateLimiterConfig::default());
164        let diversity_enforcer = IPDiversityEnforcer::new(IPDiversityConfig::default());
165
166        Ok(Self {
167            cache,
168            merge_coordinator,
169            word_encoder,
170            join_limiter,
171            diversity_enforcer,
172        })
173    }
174
175    /// Create a new bootstrap manager with custom configuration
176    pub async fn with_config(config: CacheConfig) -> Result<Self> {
177        let cache_dir = config.cache_dir.clone();
178
179        let cache = BootstrapCache::new(cache_dir.clone(), config).await?;
180        let merge_coordinator = MergeCoordinator::new(cache_dir)?;
181        let word_encoder = WordEncoder::new();
182        let join_limiter = JoinRateLimiter::new(JoinRateLimiterConfig::default());
183        let diversity_enforcer = IPDiversityEnforcer::new(IPDiversityConfig::default());
184
185        Ok(Self {
186            cache,
187            merge_coordinator,
188            word_encoder,
189            join_limiter,
190            diversity_enforcer,
191        })
192    }
193
194    /// Create a new bootstrap manager with custom configuration and rate limiting
195    pub async fn with_rate_limiting(
196        config: CacheConfig,
197        rate_limit_config: JoinRateLimiterConfig,
198    ) -> Result<Self> {
199        let cache_dir = config.cache_dir.clone();
200
201        let cache = BootstrapCache::new(cache_dir.clone(), config).await?;
202        let merge_coordinator = MergeCoordinator::new(cache_dir)?;
203        let word_encoder = WordEncoder::new();
204        let join_limiter = JoinRateLimiter::new(rate_limit_config);
205        let diversity_enforcer = IPDiversityEnforcer::new(IPDiversityConfig::default());
206
207        Ok(Self {
208            cache,
209            merge_coordinator,
210            word_encoder,
211            join_limiter,
212            diversity_enforcer,
213        })
214    }
215
216    /// Create a new bootstrap manager with full custom configuration
217    pub async fn with_full_config(
218        config: CacheConfig,
219        rate_limit_config: JoinRateLimiterConfig,
220        diversity_config: IPDiversityConfig,
221    ) -> Result<Self> {
222        let cache_dir = config.cache_dir.clone();
223
224        let cache = BootstrapCache::new(cache_dir.clone(), config).await?;
225        let merge_coordinator = MergeCoordinator::new(cache_dir)?;
226        let word_encoder = WordEncoder::new();
227        let join_limiter = JoinRateLimiter::new(rate_limit_config);
228        let diversity_enforcer = IPDiversityEnforcer::new(diversity_config);
229
230        Ok(Self {
231            cache,
232            merge_coordinator,
233            word_encoder,
234            join_limiter,
235            diversity_enforcer,
236        })
237    }
238
239    /// Get bootstrap peers for initial connection
240    pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<ContactEntry>> {
241        self.cache.get_bootstrap_peers(count).await
242    }
243
244    /// Add a discovered peer to the cache
245    ///
246    /// This method enforces both rate limiting and IP diversity checks to prevent
247    /// Sybil attacks:
248    ///
249    /// 1. **Rate limiting**: Per-subnet (IPv6 /64, /48 and IPv4 /24) and global limits
250    /// 2. **IP diversity**: Ensures geographic and ASN diversity across the network
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if:
255    /// - The contact has no addresses
256    /// - Join rate limit is exceeded for the IP subnet
257    /// - IP diversity limits are exceeded
258    /// - The cache operation fails
259    pub async fn add_contact(&mut self, contact: ContactEntry) -> Result<()> {
260        // Extract IP address from contact for rate limiting
261        let ip = contact
262            .addresses
263            .first()
264            .map(|addr| addr.ip())
265            .ok_or_else(|| {
266                P2PError::Bootstrap(BootstrapError::InvalidData(
267                    "Contact has no addresses".to_string().into(),
268                ))
269            })?;
270
271        // Check join rate limit (Sybil protection - temporal)
272        self.join_limiter.check_join_allowed(&ip).map_err(|e| {
273            tracing::warn!("Join rate limit exceeded for {}: {}", ip, e);
274            P2PError::Bootstrap(BootstrapError::RateLimited(e.to_string().into()))
275        })?;
276
277        // Convert IP to IPv6 for diversity analysis
278        // IPv4 addresses are mapped to IPv6 (::ffff:a.b.c.d)
279        let ipv6 = ip_to_ipv6(&ip);
280
281        // Analyze IP for diversity enforcement
282        let ip_analysis = self.diversity_enforcer.analyze_ip(ipv6).map_err(|e| {
283            tracing::warn!("IP analysis failed for {}: {}", ip, e);
284            P2PError::Bootstrap(BootstrapError::InvalidData(
285                format!("IP analysis failed: {e}").into(),
286            ))
287        })?;
288
289        // Check IP diversity limits (Sybil protection - geographic/ASN)
290        if !self.diversity_enforcer.can_accept_node(&ip_analysis) {
291            tracing::warn!("IP diversity limit exceeded for {}", ip);
292            return Err(P2PError::Bootstrap(BootstrapError::RateLimited(
293                "IP diversity limits exceeded (too many nodes from same subnet/ASN)"
294                    .to_string()
295                    .into(),
296            )));
297        }
298
299        // Add to diversity tracking
300        if let Err(e) = self.diversity_enforcer.add_node(&ip_analysis) {
301            tracing::warn!("Failed to track IP diversity for {}: {}", ip, e);
302            // Don't fail the add - diversity tracking is best-effort
303        }
304
305        self.cache.add_contact(contact).await
306    }
307
308    /// Add a contact bypassing rate limiting and diversity checks (for internal/trusted sources)
309    ///
310    /// Use this method only for contacts from trusted sources like:
311    /// - Well-known bootstrap nodes
312    /// - Pre-configured seed nodes
313    /// - Admin-approved contacts
314    ///
315    /// # Safety
316    ///
317    /// This method does not enforce rate limiting or diversity checks.
318    /// Only use for trusted sources.
319    pub async fn add_contact_trusted(&mut self, contact: ContactEntry) -> Result<()> {
320        self.cache.add_contact(contact).await
321    }
322
323    /// Update contact performance metrics
324    pub async fn update_contact_metrics(
325        &mut self,
326        peer_id: &PeerId,
327        metrics: QualityMetrics,
328    ) -> Result<()> {
329        self.cache.update_contact_metrics(peer_id, metrics).await
330    }
331
332    /// Start background maintenance tasks
333    pub async fn start_background_tasks(&mut self) -> Result<()> {
334        // Start periodic merge of instance caches
335        let cache_clone = self.cache.clone();
336        let merge_coordinator = self.merge_coordinator.clone();
337
338        tokio::spawn(async move {
339            let mut interval = tokio::time::interval(DEFAULT_MERGE_INTERVAL);
340            loop {
341                interval.tick().await;
342                if let Err(e) = merge_coordinator.merge_instance_caches(&cache_clone).await {
343                    tracing::warn!("Failed to merge instance caches: {}", e);
344                }
345            }
346        });
347
348        // Start quality score updates
349        let cache_clone = self.cache.clone();
350        tokio::spawn(async move {
351            let mut interval = tokio::time::interval(DEFAULT_QUALITY_UPDATE_INTERVAL);
352            loop {
353                interval.tick().await;
354                if let Err(e) = cache_clone.update_quality_scores().await {
355                    tracing::warn!("Failed to update quality scores: {}", e);
356                }
357            }
358        });
359
360        // Start cleanup task
361        let cache_clone = self.cache.clone();
362        tokio::spawn(async move {
363            let mut interval = tokio::time::interval(DEFAULT_CLEANUP_INTERVAL);
364            loop {
365                interval.tick().await;
366                if let Err(e) = cache_clone.cleanup_stale_entries().await {
367                    tracing::warn!("Failed to cleanup stale entries: {}", e);
368                }
369            }
370        });
371
372        Ok(())
373    }
374
375    /// Get cache statistics
376    pub async fn get_stats(&self) -> Result<CacheStats> {
377        self.cache.get_stats().await
378    }
379
380    /// Force a cache merge operation
381    pub async fn force_merge(&self) -> Result<MergeResult> {
382        self.merge_coordinator
383            .merge_instance_caches(&self.cache)
384            .await
385    }
386
387    /// Convert socket address to four-word address
388    pub fn encode_address(&self, socket_addr: &std::net::SocketAddr) -> Result<FourWordAddress> {
389        self.word_encoder
390            .encode_socket_addr(socket_addr)
391            .map_err(|e| {
392                crate::P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
393                    format!("Failed to encode socket address: {e}").into(),
394                ))
395            })
396    }
397
398    /// Convert four-word address to socket address
399    pub fn decode_address(&self, words: &FourWordAddress) -> Result<std::net::SocketAddr> {
400        self.word_encoder.decode_to_socket_addr(words).map_err(|e| {
401            crate::P2PError::Bootstrap(crate::error::BootstrapError::InvalidData(
402                format!("Failed to decode four-word address: {e}").into(),
403            ))
404        })
405    }
406
407    /// Validate four-word address format
408    pub fn validate_words(&self, words: &FourWordAddress) -> Result<()> {
409        if words.validate(&self.word_encoder) {
410            Ok(())
411        } else {
412            Err(crate::P2PError::Bootstrap(
413                crate::error::BootstrapError::InvalidData(
414                    "Invalid four-word address format".to_string().into(),
415                ),
416            ))
417        }
418    }
419
420    /// Get the word encoder for direct access
421    pub fn word_encoder(&self) -> &WordEncoder {
422        &self.word_encoder
423    }
424
425    /// Get well-known bootstrap addresses as four-word addresses
426    pub fn get_well_known_word_addresses(&self) -> Vec<(FourWordAddress, std::net::SocketAddr)> {
427        let well_known_addrs = vec![
428            // Primary bootstrap nodes with well-known addresses
429            std::net::SocketAddr::from(([0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888], 9000)),
430            std::net::SocketAddr::from(([0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8844], 9001)),
431            std::net::SocketAddr::from(([0x2606, 0x4700, 0x4700, 0, 0, 0, 0, 0x1111], 9002)),
432        ];
433
434        well_known_addrs
435            .into_iter()
436            .filter_map(|socket_addr| {
437                if let Ok(words) = self.encode_address(&socket_addr) {
438                    Some((words, socket_addr))
439                } else {
440                    None
441                }
442            })
443            .collect()
444    }
445}
446
447/// Cache statistics for monitoring
448#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
449pub struct CacheStats {
450    /// Total number of contacts in the cache
451    pub total_contacts: usize,
452    /// Number of contacts with high quality scores
453    pub high_quality_contacts: usize,
454    /// Number of contacts with verified IPv6 identity
455    pub verified_contacts: usize,
456    /// Timestamp of the last cache merge operation
457    pub last_merge: chrono::DateTime<chrono::Utc>,
458    /// Timestamp of the last cache cleanup operation
459    pub last_cleanup: chrono::DateTime<chrono::Utc>,
460    /// Cache hit rate for peer discovery operations
461    pub cache_hit_rate: f64,
462    /// Average quality score across all contacts
463    pub average_quality_score: f64,
464
465    // QUIC-specific statistics
466    /// Number of contacts with QUIC networking support
467    pub iroh_contacts: usize,
468    /// Number of contacts with successful NAT traversal (deprecated)
469    pub nat_traversal_contacts: usize,
470    /// Average QUIC connection setup time (milliseconds)
471    pub avg_iroh_setup_time_ms: f64,
472    /// Most successful QUIC connection type
473    pub preferred_iroh_connection_type: Option<String>,
474}
475
/// Express any IP address as an IPv6 address.
///
/// An IPv4 address becomes its IPv4-mapped IPv6 form (`::ffff:a.b.c.d`);
/// an IPv6 address is returned unchanged.
fn ip_to_ipv6(ip: &IpAddr) -> Ipv6Addr {
    match *ip {
        IpAddr::V6(already_v6) => already_v6,
        IpAddr::V4(v4) => v4.to_ipv6_mapped(),
    }
}
486
487/// Get the home cache directory
488fn home_cache_dir() -> Result<PathBuf> {
489    let home = std::env::var("HOME")
490        .or_else(|_| std::env::var("USERPROFILE"))
491        .map_err(|_| {
492            P2PError::Bootstrap(BootstrapError::CacheError(
493                "Unable to determine home directory".to_string().into(),
494            ))
495        })?;
496
497    let cache_dir = PathBuf::from(home).join(DEFAULT_CACHE_DIR);
498
499    // Ensure cache directory exists
500    std::fs::create_dir_all(&cache_dir).map_err(|e| {
501        P2PError::Bootstrap(BootstrapError::CacheError(
502            format!("Failed to create cache directory: {e}").into(),
503        ))
504    })?;
505
506    Ok(cache_dir)
507}
508
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A manager can be built against a throwaway cache directory.
    #[tokio::test]
    async fn test_bootstrap_manager_creation() {
        // Keep the temp dir bound for the whole test so it is not deleted early.
        let scratch = TempDir::new().unwrap();
        let cache_dir = scratch.path().to_path_buf();
        let config = CacheConfig {
            cache_dir,
            max_contacts: 1000,
            ..CacheConfig::default()
        };

        assert!(BootstrapManager::with_config(config).await.is_ok());
    }

    /// The home cache directory resolves and exists on disk afterwards.
    #[tokio::test]
    async fn test_home_cache_dir() {
        let resolved = home_cache_dir();
        assert!(resolved.is_ok());

        let dir = resolved.unwrap();
        assert!(dir.exists());
        assert!(dir.is_dir());
    }
}
536}