Skip to main content

saorsa_core/
peer_record.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! # Peer DHT Record System
15//!
16//! This module implements the core data structures for peer discovery and connection
17//! establishment in the P2P network. It provides secure, scalable, and reliable
18//! peer record management with comprehensive validation and serialization.
19//!
20//! ## Security Features
21//! - ML-DSA (post-quantum) signature verification for all records
22//! - Monotonic counter system prevents replay attacks
23//! - Size limits prevent memory exhaustion attacks
24//! - Canonical serialization prevents signature bypass
25//!
26//! ## Performance Optimizations
//! - Efficient binary serialization with postcard
28//! - Signature verification caching
29//! - Minimal memory allocations
30//! - Batch processing support
31
32use crate::error::SecurityError;
33use crate::quantum_crypto::ant_quic_integration::{MlDsaPublicKey, MlDsaSecretKey, MlDsaSignature};
34use crate::{NetworkAddress, P2PError, Result};
35use blake3::Hash;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::time::{Duration, SystemTime, UNIX_EPOCH};
40use uuid::Uuid;
41
42/// Maximum size for a serialized DHT record to prevent memory exhaustion attacks
43pub const MAX_DHT_RECORD_SIZE: usize = 64 * 1024; // 64KB
44
45/// Maximum number of endpoints per peer to prevent resource exhaustion
46pub const MAX_ENDPOINTS_PER_PEER: usize = 16;
47
48/// Maximum TTL for DHT records (24 hours)
49pub const MAX_TTL_SECONDS: u32 = 24 * 60 * 60;
50
51/// Default TTL for DHT records (5 minutes)
52pub const DEFAULT_TTL_SECONDS: u32 = 5 * 60;
53
/// Unique identifier for a user in the P2P network.
///
/// Wraps a 32-byte value. [`UserId::from_public_key`] derives it as the BLAKE3
/// hash of the user's public key, while [`UserId::from_bytes`] accepts any
/// 32 bytes without checking where they came from.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UserId {
    /// BLAKE3 hash of the user's public key
    pub hash: [u8; 32],
}
61
62impl UserId {
63    /// Create a new UserId from a public key
64    pub fn from_public_key(public_key: &MlDsaPublicKey) -> Self {
65        let hash = blake3::hash(public_key.as_bytes());
66        Self { hash: hash.into() }
67    }
68
69    /// Create a UserId from raw bytes
70    pub fn from_bytes(bytes: [u8; 32]) -> Self {
71        Self { hash: bytes }
72    }
73
74    /// Get the raw bytes of this UserId
75    pub fn as_bytes(&self) -> &[u8; 32] {
76        &self.hash
77    }
78}
79
80impl fmt::Display for UserId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        write!(f, "{}", hex::encode(&self.hash[..8]))
83    }
84}
85
/// Unique identifier for a peer endpoint/device.
///
/// A thin wrapper around a [`Uuid`]; [`EndpointId::new`] fills it with a
/// freshly generated version-4 UUID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EndpointId {
    /// UUID v4 for device identification
    pub uuid: Uuid,
}
92
93impl EndpointId {
94    /// Generate a new random endpoint ID
95    pub fn new() -> Self {
96        Self {
97            uuid: Uuid::new_v4(),
98        }
99    }
100
101    /// Create from existing UUID
102    pub fn from_uuid(uuid: Uuid) -> Self {
103        Self { uuid }
104    }
105}
106
107impl Default for EndpointId {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl fmt::Display for EndpointId {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        write!(f, "{}", self.uuid)
116    }
117}
118
/// Node identifier for coordinator nodes.
///
/// A plain `String` alias; this file places no format constraints on it.
pub type NodeId = String;
121
/// NAT type classification based on IETF draft-seemann-quic-nat-traversal.
///
/// See [`NatType::supports_hole_punching`] and
/// [`NatType::hole_punching_difficulty`] for how each variant is treated.
// NOTE(review): variant order is presumably part of the serialized form
// (serde derive) — confirm before reordering or inserting variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum NatType {
    /// No NAT - public IP address
    NoNat,
    /// Full Cone NAT - best case for hole punching
    FullCone,
    /// Restricted Cone NAT - IP address restricted
    RestrictedCone,
    /// Port Restricted NAT - IP address and port restricted
    PortRestricted,
    /// Symmetric NAT - worst case for hole punching
    Symmetric,
    /// Unknown NAT type - requires further detection
    Unknown,
}
138
139impl NatType {
140    /// Check if this NAT type supports hole punching
141    pub fn supports_hole_punching(&self) -> bool {
142        matches!(
143            self,
144            NatType::NoNat | NatType::FullCone | NatType::RestrictedCone | NatType::PortRestricted
145        )
146    }
147
148    /// Get the difficulty score for hole punching (0 = impossible, 100 = easy)
149    pub fn hole_punching_difficulty(&self) -> u8 {
150        match self {
151            NatType::NoNat => 100,
152            NatType::FullCone => 90,
153            NatType::RestrictedCone => 70,
154            NatType::PortRestricted => 50,
155            NatType::Symmetric => 10,
156            NatType::Unknown => 0,
157        }
158    }
159}
160
161impl fmt::Display for NatType {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        match self {
164            NatType::NoNat => write!(f, "No NAT"),
165            NatType::FullCone => write!(f, "Full Cone"),
166            NatType::RestrictedCone => write!(f, "Restricted Cone"),
167            NatType::PortRestricted => write!(f, "Port Restricted"),
168            NatType::Symmetric => write!(f, "Symmetric"),
169            NatType::Unknown => write!(f, "Unknown"),
170        }
171    }
172}
173
/// Peer endpoint information for connection establishment.
///
/// The full endpoint list of a record is serialized with postcard and included
/// in the bytes that get signed (see `PeerDHTRecord::create_signable_message`),
/// so changing field order or types here changes what is signed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerEndpoint {
    /// Unique identifier for this endpoint/device
    pub endpoint_id: EndpointId,

    /// External network address as observed by the network
    pub external_address: NetworkAddress,

    /// Detected NAT type for this endpoint
    pub nat_type: NatType,

    /// Coordinator nodes this peer is connected to
    pub coordinator_nodes: Vec<NodeId>,

    /// Optional device/client information
    pub device_info: Option<String>,

    /// Timestamp when this endpoint was last updated, in whole seconds since
    /// the Unix epoch (set by `PeerEndpoint::new` and `PeerEndpoint::refresh`)
    pub last_updated: u64,
}
195
196impl PeerEndpoint {
197    /// Create a new peer endpoint
198    pub fn new(
199        endpoint_id: EndpointId,
200        external_address: NetworkAddress,
201        nat_type: NatType,
202        coordinator_nodes: Vec<NodeId>,
203        device_info: Option<String>,
204    ) -> Self {
205        Self {
206            endpoint_id,
207            external_address,
208            nat_type,
209            coordinator_nodes,
210            device_info,
211            last_updated: current_timestamp(),
212        }
213    }
214
215    /// Check if this endpoint is stale based on last update time
216    pub fn is_stale(&self, max_age: Duration) -> bool {
217        let age = current_timestamp().saturating_sub(self.last_updated);
218        age > max_age.as_secs()
219    }
220
221    /// Update the last_updated timestamp
222    pub fn refresh(&mut self) {
223        self.last_updated = current_timestamp();
224    }
225}
226
/// DHT record for peer discovery with security and validation.
///
/// A record is created unsigned by `PeerDHTRecord::new` (which installs an
/// all-zero placeholder signature), then signed with `PeerDHTRecord::sign` and
/// checked with `PeerDHTRecord::verify_signature`.
///
/// All fields are public, so values can be changed after construction without
/// passing through the validation performed in `new`.
#[derive(Clone)]
pub struct PeerDHTRecord {
    /// Record format version for compatibility
    pub version: u8,

    /// Unique user identifier
    // NOTE(review): nothing in this file checks that `user_id` equals the hash
    // of `public_key` — confirm whether callers enforce that binding.
    pub user_id: UserId,

    /// User's public key for signature verification
    pub public_key: MlDsaPublicKey,

    /// Monotonic counter to prevent replay attacks
    // NOTE(review): this file never compares sequence numbers; monotonicity is
    // presumably enforced by whoever stores/accepts records — verify.
    pub sequence_number: u64,

    /// Optional display name
    pub name: Option<String>,

    /// Network endpoints for this peer
    pub endpoints: Vec<PeerEndpoint>,

    /// Time-to-live in seconds
    pub ttl: u32,

    /// Timestamp when this record was created (seconds since the Unix epoch)
    pub timestamp: u64,

    /// ML-DSA signature over all above fields
    pub signature: MlDsaSignature,
}
257
258impl PeerDHTRecord {
    /// Current record format version; `new` stamps this into the `version` field.
    pub const CURRENT_VERSION: u8 = 1;
261
262    /// Create a new unsigned DHT record
263    pub fn new(
264        user_id: UserId,
265        public_key: MlDsaPublicKey,
266        sequence_number: u64,
267        name: Option<String>,
268        endpoints: Vec<PeerEndpoint>,
269        ttl: u32,
270    ) -> Result<Self> {
271        // Validate inputs
272        Self::validate_inputs(&name, &endpoints, ttl)?;
273
274        Ok(Self {
275            version: Self::CURRENT_VERSION,
276            user_id,
277            public_key,
278            sequence_number,
279            name,
280            endpoints,
281            timestamp: current_timestamp(),
282            ttl,
283            signature: {
284                // Create a placeholder signature - in practice this would be properly signed
285                let sig_bytes = [0u8; 3309];
286                MlDsaSignature(Box::new(sig_bytes))
287            },
288        })
289    }
290
291    /// Validate record inputs
292    fn validate_inputs(name: &Option<String>, endpoints: &[PeerEndpoint], ttl: u32) -> Result<()> {
293        // Validate name length
294        if let Some(name) = name {
295            if name.len() > 255 {
296                return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
297                    field: "name".to_string().into(),
298                    reason: format!("Name too long (max 255), got {} chars", name.len()).into(),
299                }));
300            }
301            if name.is_empty() {
302                return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
303                    field: "name".to_string().into(),
304                    reason: "Name cannot be empty".to_string().into(),
305                }));
306            }
307        }
308
309        // Validate endpoints
310        if endpoints.is_empty() {
311            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
312                field: "endpoints".to_string().into(),
313                reason: "At least one endpoint required".to_string().into(),
314            }));
315        }
316        if endpoints.len() > MAX_ENDPOINTS_PER_PEER {
317            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
318                field: "endpoints".to_string().into(),
319                reason: format!(
320                    "Too many endpoints ({}, max {})",
321                    endpoints.len(),
322                    MAX_ENDPOINTS_PER_PEER
323                )
324                .into(),
325            }));
326        }
327
328        // Validate TTL
329        if ttl == 0 {
330            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
331                field: "ttl".to_string().into(),
332                reason: "TTL cannot be zero".to_string().into(),
333            }));
334        }
335        if ttl > MAX_TTL_SECONDS {
336            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
337                field: "ttl".to_string().into(),
338                reason: format!("TTL too large ({}, max {})", ttl, MAX_TTL_SECONDS).into(),
339            }));
340        }
341
342        Ok(())
343    }
344
345    /// Create the canonical message for signing
346    pub fn create_signable_message(&self) -> Result<Vec<u8>> {
347        let mut message = Vec::new();
348
349        // Version
350        message.push(self.version);
351
352        // User ID
353        message.extend_from_slice(&self.user_id.hash);
354
355        // Public key
356        message.extend_from_slice(self.public_key.as_bytes());
357
358        // Sequence number (big endian)
359        message.extend_from_slice(&self.sequence_number.to_be_bytes());
360
361        // Name (length-prefixed)
362        if let Some(ref name) = self.name {
363            let name_bytes = name.as_bytes();
364            message.extend_from_slice(&(name_bytes.len() as u32).to_be_bytes());
365            message.extend_from_slice(name_bytes);
366        } else {
367            message.extend_from_slice(&0u32.to_be_bytes());
368        }
369
370        // Endpoints (serialized deterministically)
371        let endpoints_data = postcard::to_stdvec(&self.endpoints).map_err(|e| {
372            P2PError::Storage(crate::error::StorageError::Database(
373                format!("Failed to serialize endpoints: {}", e).into(),
374            ))
375        })?;
376        message.extend_from_slice(&(endpoints_data.len() as u32).to_be_bytes());
377        message.extend_from_slice(&endpoints_data);
378
379        // Timestamp
380        message.extend_from_slice(&self.timestamp.to_be_bytes());
381
382        // TTL
383        message.extend_from_slice(&self.ttl.to_be_bytes());
384
385        Ok(message)
386    }
387
388    /// Sign the record with the given private key
389    pub fn sign(&mut self, signing_key: &MlDsaSecretKey) -> Result<()> {
390        let message = self.create_signable_message()?;
391        self.signature =
392            crate::quantum_crypto::ml_dsa_sign(signing_key, &message).map_err(|e| {
393                P2PError::Security(SecurityError::SignatureVerificationFailed(
394                    format!("ML-DSA signing failed: {:?}", e).into(),
395                ))
396            })?;
397        Ok(())
398    }
399
400    /// Verify the record signature
401    pub fn verify_signature(&self) -> Result<()> {
402        let message = self.create_signable_message()?;
403        let ok = crate::quantum_crypto::ml_dsa_verify(&self.public_key, &message, &self.signature)
404            .map_err(|e| {
405                P2PError::Security(SecurityError::SignatureVerificationFailed(
406                    format!("ML-DSA verify error: {:?}", e).into(),
407                ))
408            })?;
409        if ok {
410            Ok(())
411        } else {
412            Err(P2PError::Security(
413                SecurityError::SignatureVerificationFailed(
414                    "Failed to verify signature".to_string().into(),
415                ),
416            ))
417        }
418    }
419
420    /// Check if the record has expired
421    pub fn is_expired(&self) -> bool {
422        let age = current_timestamp().saturating_sub(self.timestamp);
423        age > self.ttl as u64
424    }
425
426    /// Get the remaining TTL in seconds
427    pub fn remaining_ttl(&self) -> u32 {
428        let age = current_timestamp().saturating_sub(self.timestamp);
429        if age >= self.ttl as u64 {
430            0
431        } else {
432            self.ttl - age as u32
433        }
434    }
435
436    /// Get a hash of this record for deduplication
437    pub fn content_hash(&self) -> Hash {
438        let mut hasher = blake3::Hasher::new();
439        hasher.update(&self.user_id.hash);
440        hasher.update(&self.sequence_number.to_be_bytes());
441        hasher.update(&self.timestamp.to_be_bytes());
442        hasher.finalize()
443    }
444}
445
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
453
/// Signature verification cache for performance optimization.
///
/// Remembers the outcome (valid / invalid) of earlier verifications so that
/// repeated checks can skip the ML-DSA verification.
pub struct SignatureCache {
    // Verification outcome per cache key: `true` = verified, `false` = rejected.
    cache: HashMap<Hash, bool>,
    // Entry count at which `verify_cached` evicts an entry before inserting.
    max_size: usize,
}
459
460impl SignatureCache {
461    /// Create a new signature cache
462    pub fn new(max_size: usize) -> Self {
463        Self {
464            cache: HashMap::new(),
465            max_size,
466        }
467    }
468
469    /// Verify signature with caching
470    pub fn verify_cached(&mut self, record: &PeerDHTRecord) -> Result<()> {
471        let hash = record.content_hash();
472
473        // Check cache first
474        if let Some(&result) = self.cache.get(&hash) {
475            return if result {
476                Ok(())
477            } else {
478                Err(P2PError::Security(
479                    SecurityError::SignatureVerificationFailed(
480                        "Invalid signature in cache".to_string().into(),
481                    ),
482                ))
483            };
484        }
485
486        // Verify signature
487        let result = record.verify_signature();
488        let success = result.is_ok();
489
490        // Cache the result
491        if self.cache.len() >= self.max_size {
492            // Simple eviction: remove oldest entry
493            if let Some(key) = self.cache.keys().next().cloned() {
494                self.cache.remove(&key);
495            }
496        }
497        self.cache.insert(hash, success);
498
499        result
500    }
501
502    /// Clear the cache
503    pub fn clear(&mut self) {
504        self.cache.clear();
505    }
506}
507
#[cfg(test)]
mod tests {
    use super::*;
    // Tests run against real ML-DSA (post-quantum) keys.

    /// Generate a fresh keypair, handed back as `(secret, public)`.
    fn create_test_keypair() -> (MlDsaSecretKey, MlDsaPublicKey) {
        let (public_key, secret_key) = crate::quantum_crypto::generate_ml_dsa_keypair().unwrap();
        (secret_key, public_key)
    }

    /// Build a Full Cone endpoint with a fixed address, one coordinator and a device label.
    fn create_test_endpoint() -> PeerEndpoint {
        let address = "192.168.1.1:8080".parse::<NetworkAddress>().unwrap();
        let coordinators = vec!["coordinator1".to_string()];
        let device = Some("test-device".to_string());
        PeerEndpoint::new(
            EndpointId::new(),
            address,
            NatType::FullCone,
            coordinators,
            device,
        )
    }

    /// Build a valid single-endpoint record and sign it with a fresh keypair.
    fn create_signed_test_record() -> PeerDHTRecord {
        let (secret_key, public_key) = create_test_keypair();
        let user_id = UserId::from_public_key(&public_key);

        let mut record = PeerDHTRecord::new(
            user_id,
            public_key,
            1,
            Some("test-user".to_string()),
            vec![create_test_endpoint()],
            DEFAULT_TTL_SECONDS,
        )
        .unwrap();
        record.sign(&secret_key).unwrap();
        record
    }

    #[test]
    fn test_user_id_generation() {
        let (_, public_key) = create_test_keypair();

        // Deriving twice from the same key must give the same ID.
        let first = UserId::from_public_key(&public_key);
        let second = UserId::from_public_key(&public_key);
        assert_eq!(first, second);
    }

    #[test]
    fn test_nat_type_hole_punching() {
        let punchable = [
            NatType::NoNat,
            NatType::FullCone,
            NatType::RestrictedCone,
            NatType::PortRestricted,
        ];
        for nat in punchable {
            assert!(nat.supports_hole_punching());
        }

        for nat in [NatType::Symmetric, NatType::Unknown] {
            assert!(!nat.supports_hole_punching());
        }
    }

    #[test]
    fn test_peer_endpoint_creation() {
        let endpoint = create_test_endpoint();
        assert!(!endpoint.is_stale(Duration::from_secs(60)));

        // Back-date a copy by 2 minutes: it must be stale for a 60 s limit.
        let old_endpoint = PeerEndpoint {
            last_updated: current_timestamp() - 120,
            ..endpoint.clone()
        };
        assert!(old_endpoint.is_stale(Duration::from_secs(60)));
    }

    #[test]
    fn test_dht_record_creation_and_signing() {
        let record = create_signed_test_record();

        // The signature produced by `sign` must verify.
        assert!(record.verify_signature().is_ok());

        // A freshly created record is still within its TTL.
        assert!(!record.is_expired());
        assert!(record.remaining_ttl() > 0);
    }

    #[test]
    fn test_signature_cache() {
        let record = create_signed_test_record();
        let mut cache = SignatureCache::new(100);

        // First call computes the verification, second is served from cache.
        assert!(cache.verify_cached(&record).is_ok());
        assert!(cache.verify_cached(&record).is_ok());
    }

    #[test]
    fn test_validation_limits() {
        let (_, public_key) = create_test_keypair();
        let user_id = UserId::from_public_key(&public_key);

        // Only the name, endpoint list and TTL vary between the cases below.
        let attempt = |name: String, endpoints: Vec<PeerEndpoint>, ttl: u32| {
            PeerDHTRecord::new(
                user_id.clone(),
                public_key.clone(),
                1,
                Some(name),
                endpoints,
                ttl,
            )
        };

        // Name one byte over the 255 limit.
        let too_long_name = attempt(
            "a".repeat(256),
            vec![create_test_endpoint()],
            DEFAULT_TTL_SECONDS,
        );
        assert!(too_long_name.is_err());

        // One endpoint more than allowed.
        let too_many_endpoints = attempt(
            "test".to_string(),
            vec![create_test_endpoint(); MAX_ENDPOINTS_PER_PEER + 1],
            DEFAULT_TTL_SECONDS,
        );
        assert!(too_many_endpoints.is_err());

        // TTL one second over the maximum.
        let ttl_too_large = attempt(
            "test".to_string(),
            vec![create_test_endpoint()],
            MAX_TTL_SECONDS + 1,
        );
        assert!(ttl_too_large.is_err());
    }
}