saorsa_core/
peer_record.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
//! # Peer DHT Record System
//!
//! This module implements the core data structures for peer discovery and connection
//! establishment in the P2P network. It provides secure, scalable, and reliable
//! peer record management with comprehensive validation and serialization.
//!
//! ## Security Features
//! - Ed25519 signature verification for all records
//! - Monotonic counter system prevents replay attacks
//! - Size limits prevent memory exhaustion attacks
//! - Canonical serialization prevents signature bypass
//!
//! ## Performance Optimizations
//! - Efficient binary serialization with bincode
//! - Signature verification caching
//! - Minimal memory allocations
//! - Batch processing support
31
32use crate::error::{SecurityError, StorageError};
33use crate::{NetworkAddress, P2PError, Result};
34use blake3::Hash;
35use ed25519_dalek::{Signature, Signer, Verifier, VerifyingKey};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::time::{Duration, SystemTime, UNIX_EPOCH};
40use uuid::Uuid;
41
/// Upper bound, in bytes, on a serialized DHT record (64 KiB).
///
/// Larger payloads are refused so that oversized records cannot exhaust memory.
pub const MAX_DHT_RECORD_SIZE: usize = 1 << 16;

/// Upper bound on the number of endpoints a single peer record may carry.
pub const MAX_ENDPOINTS_PER_PEER: usize = 16;

/// Largest TTL, in seconds, accepted for a DHT record (24 hours).
pub const MAX_TTL_SECONDS: u32 = 24 * 3_600;

/// TTL, in seconds, used for DHT records by default (5 minutes).
pub const DEFAULT_TTL_SECONDS: u32 = 300;
53
54/// Unique identifier for a user in the P2P network
55/// Generated from public key hash to ensure uniqueness and prevent collisions
56#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
57pub struct UserId {
58    /// BLAKE3 hash of the user's public key
59    pub hash: [u8; 32],
60}
61
62impl UserId {
63    /// Create a new UserId from a public key
64    pub fn from_public_key(public_key: &VerifyingKey) -> Self {
65        let hash = blake3::hash(public_key.as_bytes());
66        Self { hash: hash.into() }
67    }
68
69    /// Create a UserId from raw bytes
70    pub fn from_bytes(bytes: [u8; 32]) -> Self {
71        Self { hash: bytes }
72    }
73
74    /// Get the raw bytes of this UserId
75    pub fn as_bytes(&self) -> &[u8; 32] {
76        &self.hash
77    }
78}
79
80impl fmt::Display for UserId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        write!(f, "{}", hex::encode(&self.hash[..8]))
83    }
84}
85
86/// Unique identifier for a peer endpoint/device
87#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
88pub struct EndpointId {
89    /// UUID v4 for device identification
90    pub uuid: Uuid,
91}
92
93impl EndpointId {
94    /// Generate a new random endpoint ID
95    pub fn new() -> Self {
96        Self {
97            uuid: Uuid::new_v4(),
98        }
99    }
100
101    /// Create from existing UUID
102    pub fn from_uuid(uuid: Uuid) -> Self {
103        Self { uuid }
104    }
105}
106
107impl Default for EndpointId {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl fmt::Display for EndpointId {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        write!(f, "{}", self.uuid)
116    }
117}
118
/// Identifier of a coordinator node, represented as a plain `String`.
pub type NodeId = String;
121
122/// NAT type classification based on IETF draft-seemann-quic-nat-traversal
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
124pub enum NatType {
125    /// No NAT - public IP address
126    NoNat,
127    /// Full Cone NAT - best case for hole punching
128    FullCone,
129    /// Restricted Cone NAT - IP address restricted
130    RestrictedCone,
131    /// Port Restricted NAT - IP address and port restricted
132    PortRestricted,
133    /// Symmetric NAT - worst case for hole punching
134    Symmetric,
135    /// Unknown NAT type - requires further detection
136    Unknown,
137}
138
139impl NatType {
140    /// Check if this NAT type supports hole punching
141    pub fn supports_hole_punching(&self) -> bool {
142        matches!(
143            self,
144            NatType::NoNat | NatType::FullCone | NatType::RestrictedCone | NatType::PortRestricted
145        )
146    }
147
148    /// Get the difficulty score for hole punching (0 = impossible, 100 = easy)
149    pub fn hole_punching_difficulty(&self) -> u8 {
150        match self {
151            NatType::NoNat => 100,
152            NatType::FullCone => 90,
153            NatType::RestrictedCone => 70,
154            NatType::PortRestricted => 50,
155            NatType::Symmetric => 10,
156            NatType::Unknown => 0,
157        }
158    }
159}
160
161impl fmt::Display for NatType {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        match self {
164            NatType::NoNat => write!(f, "No NAT"),
165            NatType::FullCone => write!(f, "Full Cone"),
166            NatType::RestrictedCone => write!(f, "Restricted Cone"),
167            NatType::PortRestricted => write!(f, "Port Restricted"),
168            NatType::Symmetric => write!(f, "Symmetric"),
169            NatType::Unknown => write!(f, "Unknown"),
170        }
171    }
172}
173
174/// Peer endpoint information for connection establishment
175#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
176pub struct PeerEndpoint {
177    /// Unique identifier for this endpoint/device
178    pub endpoint_id: EndpointId,
179
180    /// External network address as observed by the network
181    pub external_address: NetworkAddress,
182
183    /// Detected NAT type for this endpoint
184    pub nat_type: NatType,
185
186    /// Coordinator nodes this peer is connected to
187    pub coordinator_nodes: Vec<NodeId>,
188
189    /// Optional device/client information
190    pub device_info: Option<String>,
191
192    /// Timestamp when this endpoint was last updated
193    pub last_updated: u64,
194}
195
196impl PeerEndpoint {
197    /// Create a new peer endpoint
198    pub fn new(
199        endpoint_id: EndpointId,
200        external_address: NetworkAddress,
201        nat_type: NatType,
202        coordinator_nodes: Vec<NodeId>,
203        device_info: Option<String>,
204    ) -> Self {
205        Self {
206            endpoint_id,
207            external_address,
208            nat_type,
209            coordinator_nodes,
210            device_info,
211            last_updated: current_timestamp(),
212        }
213    }
214
215    /// Check if this endpoint is stale based on last update time
216    pub fn is_stale(&self, max_age: Duration) -> bool {
217        let age = current_timestamp().saturating_sub(self.last_updated);
218        age > max_age.as_secs()
219    }
220
221    /// Update the last_updated timestamp
222    pub fn refresh(&mut self) {
223        self.last_updated = current_timestamp();
224    }
225}
226
227/// DHT record for peer discovery with security and validation
228#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229pub struct PeerDHTRecord {
230    /// Record format version for compatibility
231    pub version: u8,
232
233    /// Unique user identifier
234    pub user_id: UserId,
235
236    /// User's public key for signature verification
237    pub public_key: VerifyingKey,
238
239    /// Monotonic counter to prevent replay attacks
240    pub sequence_number: u64,
241
242    /// Optional human-readable name
243    pub name: Option<String>,
244
245    /// Current connection endpoints
246    pub endpoints: Vec<PeerEndpoint>,
247
248    /// Unix timestamp of this record creation
249    pub timestamp: u64,
250
251    /// Time-to-live in seconds
252    pub ttl: u32,
253
254    /// Ed25519 signature over all above fields
255    pub signature: Signature,
256}
257
258impl PeerDHTRecord {
259    /// Current record format version
260    pub const CURRENT_VERSION: u8 = 1;
261
262    /// Create a new unsigned DHT record
263    pub fn new(
264        user_id: UserId,
265        public_key: VerifyingKey,
266        sequence_number: u64,
267        name: Option<String>,
268        endpoints: Vec<PeerEndpoint>,
269        ttl: u32,
270    ) -> Result<Self> {
271        // Validate inputs
272        Self::validate_inputs(&name, &endpoints, ttl)?;
273
274        Ok(Self {
275            version: Self::CURRENT_VERSION,
276            user_id,
277            public_key,
278            sequence_number,
279            name,
280            endpoints,
281            timestamp: current_timestamp(),
282            ttl,
283            signature: Signature::from_bytes(&[0; 64]), // Placeholder, will be signed
284        })
285    }
286
287    /// Validate record inputs
288    fn validate_inputs(name: &Option<String>, endpoints: &[PeerEndpoint], ttl: u32) -> Result<()> {
289        // Validate name length
290        if let Some(name) = name {
291            if name.len() > 255 {
292                return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
293                    field: "name".to_string().into(),
294                    reason: format!("Name too long (max 255), got {} chars", name.len()).into(),
295                }));
296            }
297            if name.is_empty() {
298                return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
299                    field: "name".to_string().into(),
300                    reason: "Name cannot be empty".to_string().into(),
301                }));
302            }
303        }
304
305        // Validate endpoints
306        if endpoints.is_empty() {
307            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
308                field: "endpoints".to_string().into(),
309                reason: "At least one endpoint required".to_string().into(),
310            }));
311        }
312        if endpoints.len() > MAX_ENDPOINTS_PER_PEER {
313            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
314                field: "endpoints".to_string().into(),
315                reason: format!(
316                    "Too many endpoints ({}, max {})",
317                    endpoints.len(),
318                    MAX_ENDPOINTS_PER_PEER
319                )
320                .into(),
321            }));
322        }
323
324        // Validate TTL
325        if ttl == 0 {
326            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
327                field: "ttl".to_string().into(),
328                reason: "TTL cannot be zero".to_string().into(),
329            }));
330        }
331        if ttl > MAX_TTL_SECONDS {
332            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
333                field: "ttl".to_string().into(),
334                reason: format!("TTL too large ({}, max {})", ttl, MAX_TTL_SECONDS).into(),
335            }));
336        }
337
338        Ok(())
339    }
340
341    /// Create the canonical message for signing
342    pub fn create_signable_message(&self) -> Result<Vec<u8>> {
343        let mut message = Vec::new();
344
345        // Version
346        message.push(self.version);
347
348        // User ID
349        message.extend_from_slice(&self.user_id.hash);
350
351        // Public key
352        message.extend_from_slice(self.public_key.as_bytes());
353
354        // Sequence number (big endian)
355        message.extend_from_slice(&self.sequence_number.to_be_bytes());
356
357        // Name (length-prefixed)
358        if let Some(ref name) = self.name {
359            let name_bytes = name.as_bytes();
360            message.extend_from_slice(&(name_bytes.len() as u32).to_be_bytes());
361            message.extend_from_slice(name_bytes);
362        } else {
363            message.extend_from_slice(&0u32.to_be_bytes());
364        }
365
366        // Endpoints (serialized deterministically)
367        let endpoints_data = bincode::serialize(&self.endpoints).map_err(|e| {
368            P2PError::Storage(crate::error::StorageError::Database(
369                format!("Failed to serialize endpoints: {}", e).into(),
370            ))
371        })?;
372        message.extend_from_slice(&(endpoints_data.len() as u32).to_be_bytes());
373        message.extend_from_slice(&endpoints_data);
374
375        // Timestamp
376        message.extend_from_slice(&self.timestamp.to_be_bytes());
377
378        // TTL
379        message.extend_from_slice(&self.ttl.to_be_bytes());
380
381        Ok(message)
382    }
383
384    /// Sign the record with the given private key
385    pub fn sign(&mut self, signing_key: &ed25519_dalek::SigningKey) -> Result<()> {
386        let message = self.create_signable_message()?;
387        self.signature = signing_key.sign(&message);
388        Ok(())
389    }
390
391    /// Verify the record signature
392    pub fn verify_signature(&self) -> Result<()> {
393        let message = self.create_signable_message()?;
394        self.public_key
395            .verify(&message, &self.signature)
396            .map_err(|_| {
397                P2PError::Security(SecurityError::SignatureVerificationFailed(
398                    "Failed to verify signature".to_string().into(),
399                ))
400            })?;
401        Ok(())
402    }
403
404    /// Check if the record has expired
405    pub fn is_expired(&self) -> bool {
406        let age = current_timestamp().saturating_sub(self.timestamp);
407        age > self.ttl as u64
408    }
409
410    /// Get the remaining TTL in seconds
411    pub fn remaining_ttl(&self) -> u32 {
412        let age = current_timestamp().saturating_sub(self.timestamp);
413        if age >= self.ttl as u64 {
414            0
415        } else {
416            self.ttl - age as u32
417        }
418    }
419
420    /// Serialize the record using bincode for efficiency
421    pub fn serialize(&self) -> Result<Vec<u8>> {
422        let serialized = bincode::serialize(self).map_err(|e| {
423            P2PError::Storage(StorageError::Database(
424                format!("Failed to serialize record: {e}").into(),
425            ))
426        })?;
427
428        // Enforce size limits
429        if serialized.len() > MAX_DHT_RECORD_SIZE {
430            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
431                field: "record_size".to_string().into(),
432                reason: format!(
433                    "Record too large ({} bytes, max {})",
434                    serialized.len(),
435                    MAX_DHT_RECORD_SIZE
436                )
437                .into(),
438            }));
439        }
440
441        Ok(serialized)
442    }
443
444    /// Deserialize a record from bytes with validation
445    pub fn deserialize(data: &[u8]) -> Result<Self> {
446        // Check size limits
447        if data.len() > MAX_DHT_RECORD_SIZE {
448            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
449                field: "record_size".to_string().into(),
450                reason: format!(
451                    "Record too large ({} bytes, max {})",
452                    data.len(),
453                    MAX_DHT_RECORD_SIZE
454                )
455                .into(),
456            }));
457        }
458
459        let record: PeerDHTRecord = bincode::deserialize(data).map_err(|e| {
460            P2PError::Storage(StorageError::Database(
461                format!("Failed to deserialize record: {e}").into(),
462            ))
463        })?;
464
465        // Validate version
466        if record.version > Self::CURRENT_VERSION {
467            return Err(P2PError::Config(crate::error::ConfigError::InvalidValue {
468                field: "record_version".to_string().into(),
469                reason: format!("Unsupported record version: {}", record.version).into(),
470            }));
471        }
472
473        // Validate basic constraints
474        Self::validate_inputs(&record.name, &record.endpoints, record.ttl)?;
475
476        // Verify signature
477        record.verify_signature()?;
478
479        Ok(record)
480    }
481
482    /// Get a hash of this record for deduplication
483    pub fn content_hash(&self) -> Hash {
484        let mut hasher = blake3::Hasher::new();
485        hasher.update(&self.user_id.hash);
486        hasher.update(&self.sequence_number.to_be_bytes());
487        hasher.update(&self.timestamp.to_be_bytes());
488        hasher.finalize()
489    }
490}
491
/// Returns the current Unix time in whole seconds.
///
/// Falls back to 0 when the system clock reports a time before the Unix epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
499
500/// Signature verification cache for performance optimization
501pub struct SignatureCache {
502    cache: HashMap<Hash, bool>,
503    max_size: usize,
504}
505
506impl SignatureCache {
507    /// Create a new signature cache
508    pub fn new(max_size: usize) -> Self {
509        Self {
510            cache: HashMap::new(),
511            max_size,
512        }
513    }
514
515    /// Verify signature with caching
516    pub fn verify_cached(&mut self, record: &PeerDHTRecord) -> Result<()> {
517        let hash = record.content_hash();
518
519        // Check cache first
520        if let Some(&result) = self.cache.get(&hash) {
521            return if result {
522                Ok(())
523            } else {
524                Err(P2PError::Security(
525                    SecurityError::SignatureVerificationFailed(
526                        "Invalid signature in cache".to_string().into(),
527                    ),
528                ))
529            };
530        }
531
532        // Verify signature
533        let result = record.verify_signature();
534        let success = result.is_ok();
535
536        // Cache the result
537        if self.cache.len() >= self.max_size {
538            // Simple eviction: remove oldest entry
539            if let Some(key) = self.cache.keys().next().cloned() {
540                self.cache.remove(&key);
541            }
542        }
543        self.cache.insert(hash, success);
544
545        result
546    }
547
548    /// Clear the cache
549    pub fn clear(&mut self) {
550        self.cache.clear();
551    }
552}
553
#[cfg(test)]
mod tests {
    use super::*;
    use ed25519_dalek::SigningKey;
    use rand::rngs::OsRng;

    /// Generates a fresh random Ed25519 keypair.
    fn create_test_keypair() -> (SigningKey, VerifyingKey) {
        let signing_key = SigningKey::generate(&mut OsRng);
        // `VerifyingKey` is `Copy`, so no clone is needed.
        let verifying_key = signing_key.verifying_key();
        (signing_key, verifying_key)
    }

    /// Builds a valid endpoint with a random endpoint ID.
    fn create_test_endpoint() -> PeerEndpoint {
        PeerEndpoint::new(
            EndpointId::new(),
            "192.168.1.1:8080".parse::<NetworkAddress>().unwrap(),
            NatType::FullCone,
            vec!["coordinator1".to_string()],
            Some("test-device".to_string()),
        )
    }

    /// Builds a valid record with one endpoint, signed by a fresh key.
    fn create_signed_test_record() -> PeerDHTRecord {
        let (signing_key, verifying_key) = create_test_keypair();
        let user_id = UserId::from_public_key(&verifying_key);

        let mut record = PeerDHTRecord::new(
            user_id,
            verifying_key,
            1,
            Some("test-user".to_string()),
            vec![create_test_endpoint()],
            DEFAULT_TTL_SECONDS,
        )
        .unwrap();

        record.sign(&signing_key).unwrap();
        record
    }

    #[test]
    fn test_user_id_generation() {
        let (_, verifying_key) = create_test_keypair();
        let user_id = UserId::from_public_key(&verifying_key);

        // Should be deterministic
        let user_id2 = UserId::from_public_key(&verifying_key);
        assert_eq!(user_id, user_id2);
    }

    #[test]
    fn test_nat_type_hole_punching() {
        assert!(NatType::NoNat.supports_hole_punching());
        assert!(NatType::FullCone.supports_hole_punching());
        assert!(NatType::RestrictedCone.supports_hole_punching());
        assert!(NatType::PortRestricted.supports_hole_punching());
        assert!(!NatType::Symmetric.supports_hole_punching());
        assert!(!NatType::Unknown.supports_hole_punching());
    }

    #[test]
    fn test_peer_endpoint_creation() {
        let endpoint = create_test_endpoint();
        assert!(!endpoint.is_stale(Duration::from_secs(60)));

        let mut old_endpoint = endpoint.clone();
        old_endpoint.last_updated = current_timestamp() - 120; // 2 minutes ago
        assert!(old_endpoint.is_stale(Duration::from_secs(60)));
    }

    #[test]
    fn test_dht_record_creation_and_signing() {
        let record = create_signed_test_record();

        // Verify signature
        assert!(record.verify_signature().is_ok());

        // Check expiration
        assert!(!record.is_expired());
        assert!(record.remaining_ttl() > 0);
    }

    #[test]
    fn test_tampered_record_fails_verification() {
        let mut record = create_signed_test_record();

        // Any change to a signed field must invalidate the signature.
        record.sequence_number += 1;
        assert!(record.verify_signature().is_err());
    }

    #[test]
    fn test_expired_record() {
        let mut record = create_signed_test_record();

        // Back-date the record to just past its TTL.
        record.timestamp = current_timestamp() - u64::from(DEFAULT_TTL_SECONDS) - 10;
        assert!(record.is_expired());
        assert_eq!(record.remaining_ttl(), 0);
    }

    #[test]
    fn test_record_serialization() {
        let record = create_signed_test_record();

        // Serialize and deserialize
        let serialized = record.serialize().unwrap();
        let deserialized = PeerDHTRecord::deserialize(&serialized).unwrap();

        assert_eq!(record, deserialized);
    }

    #[test]
    fn test_signature_cache() {
        let record = create_signed_test_record();

        let mut cache = SignatureCache::new(100);

        // First verification should compute
        assert!(cache.verify_cached(&record).is_ok());

        // Second verification should use cache
        assert!(cache.verify_cached(&record).is_ok());
    }

    #[test]
    fn test_validation_limits() {
        let (_, verifying_key) = create_test_keypair();
        let user_id = UserId::from_public_key(&verifying_key);

        // Test name too long
        let long_name = "a".repeat(256);
        let result = PeerDHTRecord::new(
            user_id.clone(),
            verifying_key,
            1,
            Some(long_name),
            vec![create_test_endpoint()],
            DEFAULT_TTL_SECONDS,
        );
        assert!(result.is_err());

        // Test too many endpoints
        let many_endpoints = vec![create_test_endpoint(); MAX_ENDPOINTS_PER_PEER + 1];
        let result = PeerDHTRecord::new(
            user_id.clone(),
            verifying_key,
            1,
            Some("test".to_string()),
            many_endpoints,
            DEFAULT_TTL_SECONDS,
        );
        assert!(result.is_err());

        // Test TTL too large
        let result = PeerDHTRecord::new(
            user_id,
            verifying_key,
            1,
            Some("test".to_string()),
            vec![create_test_endpoint()],
            MAX_TTL_SECONDS + 1,
        );
        assert!(result.is_err());
    }
}
726}