Skip to main content

scp2p_core/
relay.rs

1// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
2// SPDX-License-Identifier: MPL-2.0
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at https://mozilla.org/MPL/2.0/.
7use std::collections::{HashMap, HashSet};
8
9use serde::{Deserialize, Serialize};
10use tokio::sync::{Mutex, mpsc, oneshot};
11
12use crate::capabilities::Capabilities;
13use crate::peer::PeerAddr;
14use crate::wire::Envelope;
15
/// Lifetime of a relay slot, in seconds (10 minutes).
///
/// `RelayManager` stamps `now + RELAY_SLOT_TTL_SECS` onto a slot whenever
/// it is registered or renewed.
pub const RELAY_SLOT_TTL_SECS: u64 = 600;

/// Upper bound on `expires_at - issued_at` for a relay announcement,
/// in seconds (6 hours).
pub const RELAY_ANNOUNCEMENT_MAX_TTL_SECS: u64 = 6 * 3_600;

/// Width of one rendezvous bucket, in seconds (1 hour).
pub const RELAY_RENDEZVOUS_BUCKET_SECS: u64 = 3_600;

/// Number of rendezvous key slots available per bucket.
pub const RELAY_RENDEZVOUS_N: usize = 16;
26
27// ── Relay Announcement (DHT / PEX) ────────────────────────────────
28
/// Coarse self-reported bandwidth class for relay announcements.
///
/// Clients do NOT trust this blindly — they use it as a hint and
/// validate with local quality scoring (`RelayScore`).
///
/// The derived `Default` is [`BandwidthClass::Medium`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum BandwidthClass {
    /// < 10 Mbps
    Low,
    /// 10–100 Mbps (the default class).
    #[default]
    Medium,
    /// > 100 Mbps
    High,
}
43
/// Advertised capacity limits included in a `RelayAnnouncement`.
///
/// This struct is part of the signed portion of an announcement (it is
/// one of the elements of `RelayAnnouncementSignable`), so changing its
/// fields changes the bytes that signatures are computed over.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelayCapacity {
    /// Advertised tunnel limit.
    // NOTE(review): presumably the number of *concurrent* tunnels — confirm
    // against the relay implementation.
    pub max_tunnels: u16,
    /// Self-reported bandwidth class (a hint only; see [`BandwidthClass`]).
    pub bandwidth_class: BandwidthClass,
    /// Per-tunnel byte cap (None = unlimited within daily quota).
    pub max_bytes_per_tunnel: Option<u64>,
}
52
53impl Default for RelayCapacity {
54    fn default() -> Self {
55        Self {
56            max_tunnels: 64,
57            bandwidth_class: BandwidthClass::Medium,
58            max_bytes_per_tunnel: None,
59        }
60    }
61}
62
/// A signed relay announcement published to DHT rendezvous keys or
/// exchanged via Relay-PEX.
///
/// Validation rules:
/// 1. `signature` verifies over a deterministic CBOR encoding of all
///    other fields (see [`RelayAnnouncement::signing_bytes`]).
/// 2. `expires_at - issued_at <= RELAY_ANNOUNCEMENT_MAX_TTL_SECS`.
/// 3. Every address in `relay_addrs` must be directly reachable (no
///    `relay_via` chains — relays cannot be behind other relays).
/// 4. `capabilities.relay` must be `true`.
///
/// Rules 2–4 (plus "at least one address" and "non-empty signature") are
/// checked by [`RelayAnnouncement::validate_structure`]; rule 1 by
/// [`RelayAnnouncement::verify_signature`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelayAnnouncement {
    /// Ed25519 public key bytes of the relay. `verify_signature` parses
    /// this as the verifying key.
    pub relay_pubkey: [u8; 32],
    /// Addresses the relay can be reached at; must be non-empty and none
    /// may carry a `relay_via` route.
    pub relay_addrs: Vec<PeerAddr>,
    /// Advertised capabilities; `relay` must be `true`.
    pub capabilities: Capabilities,
    /// Advertised capacity limits (self-reported).
    pub capacity: RelayCapacity,
    /// Issue timestamp, in the same unit as the `now_unix` values passed to
    /// `is_fresh` (presumably Unix seconds — TTL constants are in seconds).
    pub issued_at: u64,
    /// Expiry timestamp; the announcement is fresh while `now < expires_at`.
    pub expires_at: u64,
    /// Ed25519 signature over `signing_bytes()`; must be exactly 64 bytes
    /// for `verify_signature` to accept it.
    #[serde(with = "serde_bytes")]
    pub signature: Vec<u8>,
}
84
/// The subset of `RelayAnnouncement` that is signed.
/// Serialized as a **positional CBOR array** (tuple struct) for
/// deterministic cross-implementation signing bytes.
///
/// The element order below is part of the signature format: reordering,
/// adding or removing an element changes the signed bytes and therefore
/// invalidates every existing signature. The `signature` field itself is
/// deliberately absent.
#[derive(Serialize)]
struct RelayAnnouncementSignable<'a>(
    &'a [u8; 32],      // relay_pubkey
    &'a [PeerAddr],    // relay_addrs
    &'a Capabilities,  // capabilities
    &'a RelayCapacity, // capacity
    u64,               // issued_at
    u64,               // expires_at
);
97
98impl RelayAnnouncement {
99    /// The deterministic bytes to sign / verify.
100    pub fn signing_bytes(&self) -> Vec<u8> {
101        let signable = RelayAnnouncementSignable(
102            &self.relay_pubkey,
103            &self.relay_addrs,
104            &self.capabilities,
105            &self.capacity,
106            self.issued_at,
107            self.expires_at,
108        );
109        crate::cbor::to_vec(&signable).expect("CBOR serialization of signable must not fail")
110    }
111
112    /// Validate structural rules (does NOT verify cryptographic signature).
113    pub fn validate_structure(&self) -> anyhow::Result<()> {
114        if !self.capabilities.relay {
115            anyhow::bail!("relay announcement must have capabilities.relay = true");
116        }
117        if self.expires_at <= self.issued_at {
118            anyhow::bail!("expires_at must be after issued_at");
119        }
120        if self.expires_at - self.issued_at > RELAY_ANNOUNCEMENT_MAX_TTL_SECS {
121            anyhow::bail!(
122                "relay announcement TTL exceeds maximum ({} secs)",
123                RELAY_ANNOUNCEMENT_MAX_TTL_SECS
124            );
125        }
126        if self.relay_addrs.is_empty() {
127            anyhow::bail!("relay announcement must include at least one address");
128        }
129        for addr in &self.relay_addrs {
130            if addr.relay_via.is_some() {
131                anyhow::bail!("relay addresses must be direct (no relay_via chains)");
132            }
133        }
134        if self.signature.is_empty() {
135            anyhow::bail!("relay announcement signature is empty");
136        }
137        Ok(())
138    }
139
140    /// Verify the Ed25519 signature over `signing_bytes()`.
141    pub fn verify_signature(&self) -> anyhow::Result<()> {
142        self.validate_structure()?;
143        if self.signature.len() != 64 {
144            anyhow::bail!("relay announcement signature must be 64 bytes");
145        }
146        let pubkey = ed25519_dalek::VerifyingKey::from_bytes(&self.relay_pubkey)?;
147        let mut sig_arr = [0u8; 64];
148        sig_arr.copy_from_slice(&self.signature);
149        let sig = ed25519_dalek::Signature::from_bytes(&sig_arr);
150        pubkey.verify_strict(&self.signing_bytes(), &sig)?;
151        Ok(())
152    }
153
154    /// Check whether this announcement is still fresh at `now_unix`.
155    pub fn is_fresh(&self, now_unix: u64) -> bool {
156        now_unix < self.expires_at
157    }
158
159    /// Build and sign a new relay announcement.
160    ///
161    /// `ttl_secs` is clamped to `RELAY_ANNOUNCEMENT_MAX_TTL_SECS`.
162    /// The signature is produced with the provided `signing_key`.
163    pub fn new_signed(
164        signing_key: &ed25519_dalek::SigningKey,
165        relay_addrs: Vec<PeerAddr>,
166        capabilities: Capabilities,
167        capacity: RelayCapacity,
168        issued_at: u64,
169        ttl_secs: u64,
170    ) -> anyhow::Result<Self> {
171        use ed25519_dalek::Signer as _;
172        let ttl = ttl_secs.min(RELAY_ANNOUNCEMENT_MAX_TTL_SECS);
173        let expires_at = issued_at.saturating_add(ttl);
174        let relay_pubkey = signing_key.verifying_key().to_bytes();
175        let mut ann = RelayAnnouncement {
176            relay_pubkey,
177            relay_addrs,
178            capabilities,
179            capacity,
180            issued_at,
181            expires_at,
182            signature: vec![],
183        };
184        let sig: ed25519_dalek::Signature = signing_key.sign(&ann.signing_bytes());
185        ann.signature = sig.to_bytes().to_vec();
186        ann.validate_structure()?;
187        Ok(ann)
188    }
189}
190
191/// Compute the DHT rendezvous key index for a relay in a given bucket.
192///
193/// `which` is 0 or 1 — each relay publishes under two indices.
194///
195/// ```text
196///   i = SHA-256(relay_pubkey || bucket_id || which) mod N
197/// ```
198pub fn relay_rendezvous_index(relay_pubkey: &[u8; 32], bucket_id: u64, which: u8) -> usize {
199    use sha2::{Digest, Sha256};
200    let mut hasher = Sha256::new();
201    hasher.update(relay_pubkey);
202    hasher.update(bucket_id.to_be_bytes());
203    hasher.update([which]);
204    let hash = hasher.finalize();
205    // Take last 8 bytes as u64 and mod N.
206    let val = u64::from_be_bytes(hash[24..32].try_into().expect("8 bytes"));
207    (val as usize) % RELAY_RENDEZVOUS_N
208}
209
210/// Compute the DHT rendezvous key for a given bucket and slot index.
211///
212/// ```text
213///   R_i = SHA-256("scp2p:relay:rendezvous" || bucket_id || i)
214/// ```
215pub fn relay_rendezvous_key(bucket_id: u64, slot: usize) -> [u8; 32] {
216    use sha2::{Digest, Sha256};
217    let mut hasher = Sha256::new();
218    hasher.update(b"scp2p:relay:rendezvous");
219    hasher.update(bucket_id.to_be_bytes());
220    hasher.update((slot as u64).to_be_bytes());
221    let hash = hasher.finalize();
222    let mut key = [0u8; 32];
223    key.copy_from_slice(&hash);
224    key
225}
226
227/// Compute the current rendezvous bucket ID.
228pub fn current_rendezvous_bucket(now_unix: u64) -> u64 {
229    now_unix / RELAY_RENDEZVOUS_BUCKET_SECS
230}
231
232// ── Relay Quality Scoring (Client-Side) ───────────────────────────
233
/// Client-side observed quality score for a relay.
///
/// Selection uses observed scores, not self-reported load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayScore {
    /// Public key of the relay this score describes.
    pub relay_pubkey: [u8; 32],
    /// Number of recorded successes (saturating counter).
    pub success_count: u32,
    /// Number of recorded failures (saturating counter).
    pub failure_count: u32,
    /// Moving average of success latencies in milliseconds; `0` doubles as
    /// "no sample recorded yet".
    pub avg_latency_ms: u32,
    /// Timestamp of the most recent recorded success or failure (or of
    /// creation, if nothing has been recorded).
    pub last_probe_at: u64,
    /// Computed score — higher is better. New/unknown relays start at 0.0.
    /// With the current formula (`(successes - 3 * failures) / (2 * total)`,
    /// clamped to `-1.0..=1.0`) the reachable range is -1.0 ..= 0.5.
    pub score: f32,
}
248
249impl RelayScore {
250    /// Create a neutral score for a newly discovered relay.
251    pub fn new(relay_pubkey: [u8; 32], now_unix: u64) -> Self {
252        Self {
253            relay_pubkey,
254            success_count: 0,
255            failure_count: 0,
256            avg_latency_ms: 0,
257            last_probe_at: now_unix,
258            score: 0.0,
259        }
260    }
261
262    /// Record a successful tunnel establishment / data transfer.
263    pub fn record_success(&mut self, latency_ms: u32, now_unix: u64) {
264        self.success_count = self.success_count.saturating_add(1);
265        // Exponential moving average of latency.
266        if self.avg_latency_ms == 0 {
267            self.avg_latency_ms = latency_ms;
268        } else {
269            self.avg_latency_ms = (self.avg_latency_ms * 3 + latency_ms) / 4;
270        }
271        self.last_probe_at = now_unix;
272        self.recompute();
273    }
274
275    /// Record a failure (timeout, connection refused, etc.).
276    pub fn record_failure(&mut self, now_unix: u64) {
277        self.failure_count = self.failure_count.saturating_add(1);
278        self.last_probe_at = now_unix;
279        self.recompute();
280    }
281
282    /// Apply time-based decay toward neutral (0.0).
283    pub fn apply_decay(&mut self, now_unix: u64) {
284        let age_hours = now_unix.saturating_sub(self.last_probe_at) / 3600;
285        if age_hours > 0 {
286            let decay = 0.95_f32.powi(age_hours.min(100) as i32);
287            self.score *= decay;
288        }
289    }
290
291    fn recompute(&mut self) {
292        let total = self.success_count + self.failure_count;
293        if total == 0 {
294            self.score = 0.0;
295            return;
296        }
297        // Success ratio biased: successes weigh +1, failures weigh -3.
298        let raw =
299            (self.success_count as f32 - 3.0 * self.failure_count as f32) / (total as f32 * 2.0);
300        self.score = raw.clamp(-1.0, 1.0);
301    }
302}
303
/// Category of a relayed payload, used to select which quota it counts
/// against.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayPayloadKind {
    /// Counted against `RelayLimits::max_control_bytes_per_day`.
    Control,
    /// Counted against `RelayLimits::max_content_bytes_per_day`; refused
    /// outright unless `RelayLimits::content_relay_enabled` is set.
    Content,
}
309
/// Per-peer daily quotas applied by the relay to forwarded streams.
#[derive(Debug, Clone)]
pub struct RelayLimits {
    /// Daily byte budget for `Control` payloads.
    pub max_control_bytes_per_day: u64,
    /// Daily byte budget for `Content` payloads.
    pub max_content_bytes_per_day: u64,
    /// Maximum number of distinct stream IDs per day.
    pub max_streams_per_day: usize,
    /// Whether `Content` payloads are relayed at all.
    pub content_relay_enabled: bool,
}

impl Default for RelayLimits {
    /// Defaults: 16 MiB of control traffic, 4 MiB of content traffic and
    /// 1024 streams per day, with content relaying switched off.
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        RelayLimits {
            max_control_bytes_per_day: 16 * MIB,
            max_content_bytes_per_day: 4 * MIB,
            max_streams_per_day: 1 << 10,
            content_relay_enabled: false,
        }
    }
}
328
/// A relay slot registered by a peer with `RelayManager`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelaySlot {
    /// Random, non-zero identifier chosen by `RelayManager::register`.
    pub relay_slot_id: u64,
    /// Identity string of the peer that registered the slot; renewals must
    /// present the same value and quotas are charged to it.
    pub owner_peer: String,
    /// The peer that connected to this slot (if any).
    pub requester_peer: Option<String>,
    /// Expiry timestamp; the slot is evicted once `expires_at <= now`.
    pub expires_at: u64,
}
337
/// Result of `RelayManager::connect`: the pairing of a slot's owner with
/// the peer that just connected to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayLink {
    /// Slot the requester connected to.
    pub relay_slot_id: u64,
    /// Owner of that slot.
    pub owner_peer: String,
    /// Peer that issued the connect.
    pub requester_peer: String,
}
344
/// A routed payload produced by `RelayManager::relay_stream`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayStream {
    /// Slot the payload travels through.
    pub relay_slot_id: u64,
    /// Caller-supplied stream identifier (counted toward the stream quota).
    pub stream_id: u32,
    /// Payload category the quota was charged under.
    pub kind: RelayPayloadKind,
    /// Sender: either the slot owner or the connected requester.
    pub from_peer: String,
    /// Receiver: the opposite endpoint of the slot from `from_peer`.
    pub to_peer: String,
    /// Payload bytes, passed through unchanged.
    pub payload: Vec<u8>,
}
354
/// Quota counters accumulated for one peer within a single day bucket.
#[derive(Debug, Clone)]
struct RelayUsage {
    /// Day index (`now / 86_400`) these counters belong to.
    day_bucket: u64,
    /// Control bytes charged so far in this bucket.
    control_bytes: u64,
    /// Content bytes charged so far in this bucket.
    content_bytes: u64,
    /// Distinct stream IDs seen so far in this bucket.
    stream_ids: HashSet<u32>,
}

impl RelayUsage {
    /// Start an empty usage record for `day_bucket`.
    fn new(day_bucket: u64) -> Self {
        let stream_ids = HashSet::new();
        let (control_bytes, content_bytes) = (0, 0);
        RelayUsage {
            day_bucket,
            control_bytes,
            content_bytes,
            stream_ids,
        }
    }
}
373
/// In-memory relay state: registered slots, per-peer quota usage, the
/// active limits, and a cache of relay announcements.
#[derive(Debug, Default, Clone)]
pub struct RelayManager {
    /// Registered slots, keyed by `relay_slot_id`.
    slots: HashMap<u64, RelaySlot>,
    /// Usage quotas tracked per **peer identity** (owner pubkey string)
    /// rather than per slot, so that re-registering a new slot does not
    /// grant fresh quotas.
    // NOTE(review): `evict_expired` drops the usage entry of any peer that
    // no longer owns an active slot, so a peer whose slots have all expired
    // does start over with empty counters — confirm this is intended.
    usage: HashMap<String, RelayUsage>,
    /// Quota limits applied to relayed streams.
    limits: RelayLimits,
    /// Locally cached relay announcements, keyed by relay pubkey.
    /// Populated via `ingest_announcement` from Relay-PEX responses
    /// and DHT lookups (§4.9).
    announcements: HashMap<[u8; 32], RelayAnnouncement>,
}
387
388impl RelayManager {
389    pub fn set_limits(&mut self, limits: RelayLimits) {
390        self.limits = limits;
391    }
392
393    /// Return all cached relay announcements for Relay-PEX responses.
394    pub fn known_announcements(&self) -> Vec<RelayAnnouncement> {
395        self.announcements.values().cloned().collect()
396    }
397
398    /// Validate and store a relay announcement in the local cache.
399    ///
400    /// Performs full structural + signature verification. Fresh announcements
401    /// replace older ones for the same relay pubkey. Stale entries are pruned
402    /// after each successful ingestion (§4.9).
403    pub fn ingest_announcement(&mut self, ann: RelayAnnouncement, now: u64) -> anyhow::Result<()> {
404        ann.validate_structure()?;
405        if !ann.is_fresh(now) {
406            anyhow::bail!("relay announcement is already expired");
407        }
408        ann.verify_signature()?;
409        // Replace any existing entry for this relay pubkey.
410        self.announcements.insert(ann.relay_pubkey, ann);
411        self.prune_stale_announcements(now);
412        Ok(())
413    }
414
415    /// Remove announcements whose `expires_at` is in the past.
416    pub fn prune_stale_announcements(&mut self, now: u64) {
417        self.announcements.retain(|_, ann| ann.is_fresh(now));
418    }
419
420    pub fn register(&mut self, owner_peer: String, now: u64) -> RelaySlot {
421        // Generate a random slot ID to prevent enumeration attacks.
422        let slot_id = loop {
423            let candidate = rand::random::<u64>();
424            if candidate != 0 && !self.slots.contains_key(&candidate) {
425                break candidate;
426            }
427        };
428        let slot = RelaySlot {
429            relay_slot_id: slot_id,
430            owner_peer,
431            requester_peer: None,
432            expires_at: now.saturating_add(RELAY_SLOT_TTL_SECS),
433        };
434        self.slots.insert(slot.relay_slot_id, slot.clone());
435        slot
436    }
437
438    pub fn register_or_renew(
439        &mut self,
440        owner_peer: String,
441        relay_slot_id: Option<u64>,
442        now: u64,
443    ) -> anyhow::Result<RelaySlot> {
444        self.evict_expired(now);
445        let Some(slot_id) = relay_slot_id else {
446            return Ok(self.register(owner_peer, now));
447        };
448        let slot = self
449            .slots
450            .get_mut(&slot_id)
451            .ok_or_else(|| anyhow::anyhow!("relay slot not found"))?;
452        if slot.owner_peer != owner_peer {
453            anyhow::bail!("relay slot owner mismatch");
454        }
455        slot.expires_at = now.saturating_add(RELAY_SLOT_TTL_SECS);
456        Ok(slot.clone())
457    }
458
459    pub fn connect(
460        &mut self,
461        requester_peer: String,
462        relay_slot_id: u64,
463        now: u64,
464    ) -> anyhow::Result<RelayLink> {
465        self.evict_expired(now);
466        let slot = self
467            .slots
468            .get_mut(&relay_slot_id)
469            .ok_or_else(|| anyhow::anyhow!("relay slot not found"))?;
470
471        // Record who connected so relay_stream can route back.
472        slot.requester_peer = Some(requester_peer.clone());
473
474        Ok(RelayLink {
475            relay_slot_id,
476            owner_peer: slot.owner_peer.clone(),
477            requester_peer,
478        })
479    }
480
481    pub fn relay_stream(
482        &mut self,
483        relay_slot_id: u64,
484        stream_id: u32,
485        kind: RelayPayloadKind,
486        from_peer: String,
487        payload: Vec<u8>,
488        now: u64,
489    ) -> anyhow::Result<RelayStream> {
490        self.evict_expired(now);
491        let slot = self
492            .slots
493            .get(&relay_slot_id)
494            .cloned()
495            .ok_or_else(|| anyhow::anyhow!("relay slot not found"))?;
496
497        self.enforce_quota(&slot.owner_peer, stream_id, kind, payload.len(), now)?;
498
499        let to_peer = if from_peer == slot.owner_peer {
500            // Owner is sending → route to the connected requester.
501            slot.requester_peer
502                .ok_or_else(|| anyhow::anyhow!("no requester connected to relay slot"))?
503        } else if slot.requester_peer.as_ref() == Some(&from_peer) {
504            // Requester is sending → route to the owner.
505            slot.owner_peer
506        } else {
507            // Reject: sender is neither the slot owner nor the
508            // registered requester — unauthorized relay access.
509            anyhow::bail!("unauthorized peer for relay slot");
510        };
511
512        Ok(RelayStream {
513            relay_slot_id,
514            stream_id,
515            kind,
516            from_peer,
517            to_peer,
518            payload,
519        })
520    }
521
522    fn enforce_quota(
523        &mut self,
524        owner_peer: &str,
525        stream_id: u32,
526        kind: RelayPayloadKind,
527        payload_len: usize,
528        now: u64,
529    ) -> anyhow::Result<()> {
530        if matches!(kind, RelayPayloadKind::Content) && !self.limits.content_relay_enabled {
531            anyhow::bail!("content relay is disabled");
532        }
533
534        let day_bucket = now / 86_400;
535        let usage = self
536            .usage
537            .entry(owner_peer.to_string())
538            .or_insert_with(|| RelayUsage::new(day_bucket));
539        if usage.day_bucket != day_bucket {
540            *usage = RelayUsage::new(day_bucket);
541        }
542
543        if usage.stream_ids.insert(stream_id)
544            && usage.stream_ids.len() > self.limits.max_streams_per_day
545        {
546            anyhow::bail!("relay stream quota exceeded");
547        }
548
549        let bytes = payload_len as u64;
550        match kind {
551            RelayPayloadKind::Control => {
552                if usage.control_bytes.saturating_add(bytes) > self.limits.max_control_bytes_per_day
553                {
554                    anyhow::bail!("relay control-byte quota exceeded");
555                }
556                usage.control_bytes = usage.control_bytes.saturating_add(bytes);
557            }
558            RelayPayloadKind::Content => {
559                if usage.content_bytes.saturating_add(bytes) > self.limits.max_content_bytes_per_day
560                {
561                    anyhow::bail!("relay content-byte quota exceeded");
562                }
563                usage.content_bytes = usage.content_bytes.saturating_add(bytes);
564            }
565        }
566        Ok(())
567    }
568
569    fn evict_expired(&mut self, now: u64) {
570        self.slots.retain(|_slot_id, slot| slot.expires_at > now);
571        // Prune usage entries whose peer no longer owns any active slot.
572        let active_peers: std::collections::HashSet<&str> =
573            self.slots.values().map(|s| s.owner_peer.as_str()).collect();
574        self.usage
575            .retain(|peer, _| active_peers.contains(peer.as_str()));
576    }
577}
578
579// ── Relay Tunnel Registry ──────────────────────────────────────────
580
/// A request forwarded through a relay tunnel.
///
/// The relay node receives a request envelope from a downloader, sends
/// it through this channel to the firewalled node's bridge loop, and
/// waits for the response on the `oneshot` sender.
///
/// Tuple layout: `(request envelope, reply channel)`. Dropping the
/// `oneshot::Sender` without sending makes the waiting
/// `RelayTunnelRegistry::forward` call fail instead of hanging.
pub type RelayTunnelRequest = (Envelope, oneshot::Sender<Envelope>);
587
/// Shared registry of active relay tunnels.
///
/// Each tunnel corresponds to a firewalled node that has registered a
/// relay slot with `tunnel: true`.  The registry maps `slot_id` to an
/// `mpsc::Sender` that feeds the bridge loop for that connection.
///
/// This type is cheaply cloneable (interior `Arc<Mutex<…>>`); clones share
/// the same underlying map.
#[derive(Clone, Default)]
pub struct RelayTunnelRegistry {
    /// `slot_id` → sender half of that tunnel's request queue.
    inner: std::sync::Arc<Mutex<HashMap<u64, mpsc::Sender<RelayTunnelRequest>>>>,
}
599
600impl RelayTunnelRegistry {
601    pub fn new() -> Self {
602        Self::default()
603    }
604
605    /// Register a relay tunnel for a slot.
606    ///
607    /// Returns an `mpsc::Receiver` that the bridge loop should read
608    /// forwarded requests from.
609    pub async fn register(
610        &self,
611        slot_id: u64,
612        capacity: usize,
613    ) -> mpsc::Receiver<RelayTunnelRequest> {
614        let (tx, rx) = mpsc::channel(capacity);
615        self.inner.lock().await.insert(slot_id, tx);
616        rx
617    }
618
619    /// Remove a relay tunnel when the firewalled node disconnects.
620    pub async fn remove(&self, slot_id: u64) {
621        self.inner.lock().await.remove(&slot_id);
622    }
623
624    /// Forward an envelope to the firewalled node behind `slot_id`.
625    ///
626    /// Returns the response envelope, or an error if the tunnel is not
627    /// found / the bridge loop has shut down / the timeout expires.
628    pub async fn forward(
629        &self,
630        slot_id: u64,
631        request: Envelope,
632        timeout: std::time::Duration,
633    ) -> anyhow::Result<Envelope> {
634        let tx = {
635            let tunnels = self.inner.lock().await;
636            tunnels
637                .get(&slot_id)
638                .cloned()
639                .ok_or_else(|| anyhow::anyhow!("relay tunnel not found for slot {slot_id}"))?
640        };
641        let (resp_tx, resp_rx) = oneshot::channel();
642        tx.send((request, resp_tx))
643            .await
644            .map_err(|_| anyhow::anyhow!("relay bridge loop closed for slot {slot_id}"))?;
645        tokio::time::timeout(timeout, resp_rx)
646            .await
647            .map_err(|_| anyhow::anyhow!("relay tunnel response timed out for slot {slot_id}"))?
648            .map_err(|_| anyhow::anyhow!("relay bridge dropped response for slot {slot_id}"))
649    }
650
651    /// Check whether a tunnel exists for `slot_id`.
652    pub async fn has_tunnel(&self, slot_id: u64) -> bool {
653        self.inner.lock().await.contains_key(&slot_id)
654    }
655}
656
657#[cfg(test)]
658mod tests {
659    use super::*;
660
661    #[test]
662    fn register_and_connect_roundtrip() {
663        let mut relay = RelayManager::default();
664        let slot = relay.register("peer-a".into(), 100);
665        let link = relay
666            .connect("peer-b".into(), slot.relay_slot_id, 101)
667            .expect("connect");
668        assert_eq!(link.owner_peer, "peer-a");
669        assert_eq!(link.requester_peer, "peer-b");
670    }
671
672    #[test]
673    fn expired_slots_are_rejected() {
674        let mut relay = RelayManager::default();
675        let slot = relay.register("peer-a".into(), 100);
676        let err = relay
677            .connect(
678                "peer-b".into(),
679                slot.relay_slot_id,
680                100 + RELAY_SLOT_TTL_SECS + 1,
681            )
682            .expect_err("must expire");
683        assert!(err.to_string().contains("not found"));
684    }
685
686    #[test]
687    fn register_or_renew_extends_existing_slot() {
688        let mut relay = RelayManager::default();
689        let slot = relay.register("peer-a".into(), 100);
690        let renewed = relay
691            .register_or_renew("peer-a".into(), Some(slot.relay_slot_id), 150)
692            .expect("renew");
693        assert_eq!(renewed.relay_slot_id, slot.relay_slot_id);
694        assert_eq!(renewed.expires_at, 150 + RELAY_SLOT_TTL_SECS);
695    }
696
697    #[test]
698    fn renew_rejects_wrong_owner() {
699        let mut relay = RelayManager::default();
700        let slot = relay.register("peer-a".into(), 100);
701        let err = relay
702            .register_or_renew("peer-b".into(), Some(slot.relay_slot_id), 101)
703            .expect_err("owner mismatch must fail");
704        assert!(err.to_string().contains("owner mismatch"));
705    }
706
707    #[test]
708    fn content_relay_disabled_by_default() {
709        let mut relay = RelayManager::default();
710        let slot = relay.register("peer-a".into(), 100);
711        relay
712            .connect("peer-b".into(), slot.relay_slot_id, 101)
713            .expect("connect");
714        let err = relay
715            .relay_stream(
716                slot.relay_slot_id,
717                1,
718                RelayPayloadKind::Content,
719                "peer-b".into(),
720                vec![1, 2, 3],
721                102,
722            )
723            .expect_err("content relay disabled");
724        assert!(err.to_string().contains("disabled"));
725    }
726
727    #[test]
728    fn unauthorized_peer_is_rejected_from_relay_stream() {
729        let mut relay = RelayManager::default();
730        let slot = relay.register("peer-a".into(), 100);
731        relay
732            .connect("peer-b".into(), slot.relay_slot_id, 101)
733            .expect("connect");
734
735        // Owner can stream.
736        relay
737            .relay_stream(
738                slot.relay_slot_id,
739                1,
740                RelayPayloadKind::Control,
741                "peer-a".into(),
742                vec![1],
743                102,
744            )
745            .expect("owner should succeed");
746
747        // Requester can stream.
748        relay
749            .relay_stream(
750                slot.relay_slot_id,
751                2,
752                RelayPayloadKind::Control,
753                "peer-b".into(),
754                vec![2],
755                103,
756            )
757            .expect("requester should succeed");
758
759        // Third party must be rejected.
760        let err = relay
761            .relay_stream(
762                slot.relay_slot_id,
763                3,
764                RelayPayloadKind::Control,
765                "peer-evil".into(),
766                vec![3],
767                104,
768            )
769            .expect_err("unauthorized peer must be rejected");
770        assert!(err.to_string().contains("unauthorized"));
771    }
772
773    #[test]
774    fn control_bytes_quota_is_enforced() {
775        let mut relay = RelayManager::default();
776        relay.set_limits(RelayLimits {
777            max_control_bytes_per_day: 5,
778            ..RelayLimits::default()
779        });
780        let slot = relay.register("peer-a".into(), 100);
781        relay
782            .connect("peer-b".into(), slot.relay_slot_id, 101)
783            .expect("connect");
784        relay
785            .relay_stream(
786                slot.relay_slot_id,
787                1,
788                RelayPayloadKind::Control,
789                "peer-b".into(),
790                vec![1, 2],
791                102,
792            )
793            .expect("within quota");
794        let err = relay
795            .relay_stream(
796                slot.relay_slot_id,
797                2,
798                RelayPayloadKind::Control,
799                "peer-b".into(),
800                vec![1, 2, 3, 4],
801                103,
802            )
803            .expect_err("must exceed quota");
804        assert!(err.to_string().contains("quota"));
805    }
806
807    #[test]
808    fn stream_count_quota_is_enforced() {
809        let mut relay = RelayManager::default();
810        relay.set_limits(RelayLimits {
811            max_streams_per_day: 1,
812            ..RelayLimits::default()
813        });
814        let slot = relay.register("peer-a".into(), 100);
815        relay
816            .connect("peer-b".into(), slot.relay_slot_id, 101)
817            .expect("connect");
818        relay
819            .relay_stream(
820                slot.relay_slot_id,
821                1,
822                RelayPayloadKind::Control,
823                "peer-b".into(),
824                vec![1],
825                102,
826            )
827            .expect("first stream id");
828        let err = relay
829            .relay_stream(
830                slot.relay_slot_id,
831                2,
832                RelayPayloadKind::Control,
833                "peer-b".into(),
834                vec![1],
835                103,
836            )
837            .expect_err("must exceed stream cap");
838        assert!(err.to_string().contains("stream quota"));
839    }
840
841    // ── RelayAnnouncement tests ───────────────────────────────────
842
843    fn make_relay_announcement(
844        issued_at: u64,
845        expires_at: u64,
846        relay_caps: bool,
847        has_relay_via: bool,
848    ) -> RelayAnnouncement {
849        let addr = PeerAddr {
850            ip: "1.2.3.4".parse().unwrap(),
851            port: 9000,
852            transport: crate::peer::TransportProtocol::Tcp,
853            pubkey_hint: None,
854            relay_via: if has_relay_via {
855                Some(crate::peer::RelayRoute {
856                    relay_addr: Box::new(PeerAddr {
857                        ip: "5.6.7.8".parse().unwrap(),
858                        port: 9001,
859                        transport: crate::peer::TransportProtocol::Tcp,
860                        pubkey_hint: None,
861                        relay_via: None,
862                    }),
863                    slot_id: 1,
864                })
865            } else {
866                None
867            },
868        };
869        RelayAnnouncement {
870            relay_pubkey: [1u8; 32],
871            relay_addrs: vec![addr],
872            capabilities: Capabilities {
873                relay: relay_caps,
874                ..Default::default()
875            },
876            capacity: RelayCapacity::default(),
877            issued_at,
878            expires_at,
879            signature: vec![0u8; 64],
880        }
881    }
882
883    #[test]
884    fn relay_announcement_validate_ok() {
885        let ann = make_relay_announcement(1000, 1000 + 3600, true, false);
886        ann.validate_structure().expect("valid");
887    }
888
889    #[test]
890    fn relay_announcement_rejects_no_relay_cap() {
891        let ann = make_relay_announcement(1000, 1000 + 3600, false, false);
892        let err = ann.validate_structure().expect_err("must reject");
893        assert!(err.to_string().contains("relay"));
894    }
895
896    #[test]
897    fn relay_announcement_rejects_excessive_ttl() {
898        let ann = make_relay_announcement(
899            1000,
900            1000 + RELAY_ANNOUNCEMENT_MAX_TTL_SECS + 1,
901            true,
902            false,
903        );
904        let err = ann.validate_structure().expect_err("must reject");
905        assert!(err.to_string().contains("TTL"));
906    }
907
908    #[test]
909    fn relay_announcement_rejects_relay_via_chains() {
910        let ann = make_relay_announcement(1000, 1000 + 3600, true, true);
911        let err = ann.validate_structure().expect_err("must reject");
912        assert!(err.to_string().contains("direct"));
913    }
914
915    #[test]
916    fn relay_announcement_freshness() {
917        let ann = make_relay_announcement(1000, 2000, true, false);
918        assert!(ann.is_fresh(1500));
919        assert!(!ann.is_fresh(2000));
920        assert!(!ann.is_fresh(3000));
921    }
922
923    #[test]
924    fn relay_announcement_signing_bytes_deterministic() {
925        let ann = make_relay_announcement(1000, 2000, true, false);
926        let bytes1 = ann.signing_bytes();
927        let bytes2 = ann.signing_bytes();
928        assert_eq!(bytes1, bytes2);
929        assert!(!bytes1.is_empty());
930    }
931
932    #[test]
933    fn relay_announcement_cbor_roundtrip() {
934        let ann = make_relay_announcement(1000, 2000, true, false);
935        let encoded = crate::cbor::to_vec(&ann).expect("encode");
936        let decoded: RelayAnnouncement = crate::cbor::from_slice(&encoded).expect("decode");
937        assert_eq!(decoded.relay_pubkey, ann.relay_pubkey);
938        assert_eq!(decoded.issued_at, 1000);
939        assert_eq!(decoded.expires_at, 2000);
940    }
941
942    // ── Rendezvous key tests ──────────────────────────────────────
943
944    #[test]
945    fn rendezvous_index_within_bounds() {
946        let pubkey = [42u8; 32];
947        for bucket in 0..100 {
948            let i0 = relay_rendezvous_index(&pubkey, bucket, 0);
949            let i1 = relay_rendezvous_index(&pubkey, bucket, 1);
950            assert!(i0 < RELAY_RENDEZVOUS_N);
951            assert!(i1 < RELAY_RENDEZVOUS_N);
952        }
953    }
954
955    #[test]
956    fn rendezvous_index_different_for_different_pubkeys() {
957        let pk1 = [1u8; 32];
958        let pk2 = [2u8; 32];
959        // Not guaranteed to be different for any single case, but
960        // with different pubkeys we should see variation.
961        let mut indices1 = Vec::new();
962        let mut indices2 = Vec::new();
963        for b in 0..50 {
964            indices1.push(relay_rendezvous_index(&pk1, b, 0));
965            indices2.push(relay_rendezvous_index(&pk2, b, 0));
966        }
967        // At least some should differ.
968        assert_ne!(indices1, indices2);
969    }
970
971    #[test]
972    fn rendezvous_key_deterministic() {
973        let k1 = relay_rendezvous_key(100, 5);
974        let k2 = relay_rendezvous_key(100, 5);
975        assert_eq!(k1, k2);
976        // Different bucket → different key.
977        let k3 = relay_rendezvous_key(101, 5);
978        assert_ne!(k1, k3);
979        // Different slot → different key.
980        let k4 = relay_rendezvous_key(100, 6);
981        assert_ne!(k1, k4);
982    }
983
984    #[test]
985    fn current_rendezvous_bucket_is_stable_within_window() {
986        let now = 1_000_000u64;
987        let b1 = current_rendezvous_bucket(now);
988        let b2 = current_rendezvous_bucket(now + 1);
989        assert_eq!(b1, b2);
990        // After one full bucket interval, it changes.
991        let b3 = current_rendezvous_bucket(now + RELAY_RENDEZVOUS_BUCKET_SECS);
992        assert_eq!(b3, b1 + 1);
993    }
994
995    // ── RelayScore tests ──────────────────────────────────────────
996
997    #[test]
998    fn relay_score_starts_neutral() {
999        let score = RelayScore::new([0u8; 32], 1000);
1000        assert_eq!(score.score, 0.0);
1001        assert_eq!(score.success_count, 0);
1002        assert_eq!(score.failure_count, 0);
1003    }
1004
1005    #[test]
1006    fn relay_score_increases_on_success() {
1007        let mut score = RelayScore::new([0u8; 32], 1000);
1008        score.record_success(50, 1001);
1009        assert!(score.score > 0.0);
1010        assert_eq!(score.success_count, 1);
1011        assert_eq!(score.avg_latency_ms, 50);
1012    }
1013
1014    #[test]
1015    fn relay_score_decreases_on_failure() {
1016        let mut score = RelayScore::new([0u8; 32], 1000);
1017        score.record_failure(1001);
1018        assert!(score.score < 0.0);
1019        assert_eq!(score.failure_count, 1);
1020    }
1021
1022    #[test]
1023    fn relay_score_decay_trends_to_neutral() {
1024        let mut score = RelayScore::new([0u8; 32], 1000);
1025        score.record_success(50, 1000);
1026        let before = score.score;
1027        score.apply_decay(1000 + 3600 * 10); // 10 hours later
1028        assert!(score.score.abs() < before.abs());
1029    }
1030
1031    #[test]
1032    fn relay_score_latency_ema() {
1033        let mut score = RelayScore::new([0u8; 32], 1000);
1034        score.record_success(100, 1001);
1035        assert_eq!(score.avg_latency_ms, 100);
1036        score.record_success(200, 1002);
1037        // EMA: (100 * 3 + 200) / 4 = 125
1038        assert_eq!(score.avg_latency_ms, 125);
1039    }
1040}