Skip to main content

scp2p_core/api/
node_relay.rs

1// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
2// SPDX-License-Identifier: MPL-2.0
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at https://mozilla.org/MPL/2.0/.
7//! Relay operations on `NodeHandle`: register, connect, stream, peer selection.
8
9use ed25519_dalek::SigningKey;
10
11use crate::{
12    net_fetch::{PeerConnector, RequestTransport, send_request_on_stream},
13    peer::PeerAddr,
14    relay::{RelayAnnouncement, RelayCapacity, RelayLimits},
15    wire::{
16        Envelope, FLAG_RESPONSE, MsgType, RelayConnect, RelayPayloadKind as WireRelayPayloadKind,
17        RelayRegister, RelayRegistered, RelayStream, WirePayload,
18    },
19};
20
21use super::{
22    AbuseLimits, ActiveRelaySlot, NodeHandle,
23    helpers::{
24        now_unix_secs, query_relay_list, relay_payload_kind_to_internal,
25        relay_payload_kind_to_wire, relay_peer_key,
26    },
27};
28
29impl NodeHandle {
30    pub async fn relay_register(&self, peer_addr: PeerAddr) -> anyhow::Result<RelayRegistered> {
31        self.relay_register_with_slot(peer_addr, None).await
32    }
33
34    pub async fn relay_register_with_slot(
35        &self,
36        peer_addr: PeerAddr,
37        relay_slot_id: Option<u64>,
38    ) -> anyhow::Result<RelayRegistered> {
39        let mut state = self.state.write().await;
40        let now = now_unix_secs()?;
41        let slot = state
42            .relay
43            .register_or_renew(relay_peer_key(&peer_addr), relay_slot_id, now)?;
44        Ok(RelayRegistered {
45            relay_slot_id: slot.relay_slot_id,
46            expires_at: slot.expires_at,
47        })
48    }
49
50    pub async fn relay_connect(
51        &self,
52        peer_addr: PeerAddr,
53        req: RelayConnect,
54    ) -> anyhow::Result<()> {
55        let mut state = self.state.write().await;
56        state.relay.connect(
57            relay_peer_key(&peer_addr),
58            req.relay_slot_id,
59            now_unix_secs()?,
60        )?;
61        Ok(())
62    }
63
64    pub async fn relay_stream(
65        &self,
66        peer_addr: PeerAddr,
67        req: RelayStream,
68    ) -> anyhow::Result<RelayStream> {
69        let mut state = self.state.write().await;
70        let peer_key = relay_peer_key(&peer_addr);
71        let score = *state.relay_scores.get(&peer_key).unwrap_or(&0);
72        if req.kind == WireRelayPayloadKind::Content && score < 2 {
73            let score_mut = state.relay_scores.entry(peer_key).or_insert(0);
74            *score_mut = (*score_mut - 1).clamp(-10, 10);
75            anyhow::bail!("content relay requires positive trust score");
76        }
77        let max_payload_bytes = if score >= 5 {
78            512 * 1024
79        } else if score >= 0 {
80            128 * 1024
81        } else {
82            32 * 1024
83        };
84        if req.payload.len() > max_payload_bytes {
85            let score_mut = state
86                .relay_scores
87                .entry(relay_peer_key(&peer_addr))
88                .or_insert(0);
89            *score_mut = (*score_mut - 1).clamp(-10, 10);
90            anyhow::bail!("relay payload exceeds adaptive limit for peer score");
91        }
92        let relayed = state.relay.relay_stream(
93            req.relay_slot_id,
94            req.stream_id,
95            relay_payload_kind_to_internal(req.kind),
96            relay_peer_key(&peer_addr),
97            req.payload,
98            now_unix_secs()?,
99        )?;
100        let score_mut = state
101            .relay_scores
102            .entry(relay_peer_key(&peer_addr))
103            .or_insert(0);
104        *score_mut = (*score_mut + 1).clamp(-10, 10);
105
106        Ok(RelayStream {
107            relay_slot_id: relayed.relay_slot_id,
108            stream_id: relayed.stream_id,
109            kind: relay_payload_kind_to_wire(relayed.kind),
110            payload: relayed.payload,
111        })
112    }
113
114    pub async fn set_relay_limits(&self, limits: RelayLimits) -> anyhow::Result<()> {
115        let mut state = self.state.write().await;
116        state.relay.set_limits(limits);
117        Ok(())
118    }
119
120    pub async fn set_abuse_limits(&self, limits: AbuseLimits) -> anyhow::Result<()> {
121        let mut state = self.state.write().await;
122        state.abuse_limits = limits;
123        state.abuse_counters.clear();
124        Ok(())
125    }
126
127    pub async fn note_relay_result(&self, peer: &PeerAddr, success: bool) -> anyhow::Result<()> {
128        let mut state = self.state.write().await;
129        let key = relay_peer_key(peer);
130        let delta = if success { 1 } else { -3 };
131        let score = state.relay_scores.entry(key).or_insert(0);
132        *score = (*score + delta).clamp(-10, 10);
133        Ok(())
134    }
135
136    pub async fn select_relay_peer(&self) -> anyhow::Result<Option<PeerAddr>> {
137        Ok(self.select_relay_peers(1).await?.into_iter().next())
138    }
139
140    pub async fn select_relay_peers(&self, max_peers: usize) -> anyhow::Result<Vec<PeerAddr>> {
141        let now = now_unix_secs()?;
142        let mut state = self.state.write().await;
143
144        // Prefer peers with known relay=true capability (PeerDb).
145        let mut relay_capable: Vec<PeerAddr> = state
146            .peer_db
147            .relay_capable_peers(now)
148            .into_iter()
149            .map(|record| record.addr.clone())
150            .collect();
151
152        // Also add addresses from announcement cache (§4.9).
153        // Only use fresh announcements.
154        let announced_addrs: Vec<PeerAddr> = state
155            .relay
156            .known_announcements()
157            .into_iter()
158            .filter(|ann| ann.is_fresh(now))
159            .flat_map(|ann| ann.relay_addrs)
160            .collect();
161        // Merge announced addresses — de-dupe by relay_peer_key().
162        let existing_keys: std::collections::HashSet<String> =
163            relay_capable.iter().map(relay_peer_key).collect();
164        for addr in announced_addrs {
165            if !existing_keys.contains(&relay_peer_key(&addr)) {
166                relay_capable.push(addr);
167            }
168        }
169
170        // Fall back to all fresh peers if no relay-capable ones are known.
171        let mut candidates = if relay_capable.is_empty() {
172            state
173                .peer_db
174                .all_records()
175                .into_iter()
176                .filter(|record| {
177                    now.saturating_sub(record.last_seen_unix)
178                        <= crate::peer_db::PEX_FRESHNESS_WINDOW_SECS
179                })
180                .map(|record| record.addr)
181                .collect::<Vec<_>>()
182        } else {
183            relay_capable
184        };
185
186        candidates.sort_by_key(relay_peer_key);
187        if candidates.is_empty() {
188            return Ok(vec![]);
189        }
190
191        let best_score = candidates
192            .iter()
193            .map(|peer| *state.relay_scores.get(&relay_peer_key(peer)).unwrap_or(&0))
194            .max()
195            .unwrap_or(0);
196        let preferred = candidates
197            .into_iter()
198            .filter(|peer| {
199                *state.relay_scores.get(&relay_peer_key(peer)).unwrap_or(&0) == best_score
200            })
201            .collect::<Vec<_>>();
202        let source = if preferred.is_empty() {
203            vec![]
204        } else {
205            preferred
206        };
207        if source.is_empty() {
208            return Ok(vec![]);
209        }
210
211        let cap = max_peers.max(1).min(source.len());
212        let start = state.relay_rotation_cursor % source.len();
213        state.relay_rotation_cursor = state.relay_rotation_cursor.saturating_add(1);
214        let mut selected = Vec::with_capacity(cap);
215        for idx in 0..cap {
216            selected.push(source[(start + idx) % source.len()].clone());
217        }
218        Ok(selected)
219    }
220
221    // ── Relay Discovery: Relay-PEX client (§4.9) ────────────────────────
222
223    /// Ask a single peer for its cached relay announcements (Relay-PEX).
224    ///
225    /// Returns the raw announcement list as sent by the peer; callers
226    /// should pass results through `ingest_relay_announcements` for
227    /// validation and local caching.
228    pub async fn fetch_relay_list_from_peer<T: RequestTransport + ?Sized>(
229        &self,
230        transport: &T,
231        peer: &PeerAddr,
232        max_count: u16,
233    ) -> anyhow::Result<Vec<RelayAnnouncement>> {
234        query_relay_list(transport, peer, max_count).await
235    }
236
237    /// Validate and ingest a batch of relay announcements into the local cache.
238    ///
239    /// Each announcement is independently validated (structure + signature +
240    /// freshness).  Invalid or expired entries are silently skipped.
241    /// Returns the number of successfully ingested announcements.
242    pub async fn ingest_relay_announcements(
243        &self,
244        announcements: Vec<RelayAnnouncement>,
245    ) -> anyhow::Result<usize> {
246        let mut state = self.state.write().await;
247        let now = now_unix_secs()?;
248        let mut ingested = 0usize;
249        for ann in announcements {
250            if state.relay.ingest_announcement(ann, now).is_ok() {
251                ingested += 1;
252            }
253        }
254        Ok(ingested)
255    }
256
257    /// Discover relay nodes by querying a set of seed peers via Relay-PEX.
258    ///
259    /// Contacts up to `max_peers` seed peers, collects their relay lists,
260    /// and ingests all valid announcements into the local cache.
261    /// Returns the total number of newly ingested relay announcements.
262    pub async fn discover_relays_via_peers<T: RequestTransport + ?Sized>(
263        &self,
264        transport: &T,
265        seed_peers: &[PeerAddr],
266        max_per_peer: u16,
267    ) -> anyhow::Result<usize> {
268        let mut total = 0usize;
269        for peer in seed_peers {
270            if let Ok(announcements) = self
271                .fetch_relay_list_from_peer(transport, peer, max_per_peer)
272                .await
273            {
274                total += self.ingest_relay_announcements(announcements).await?;
275            }
276        }
277        Ok(total)
278    }
279
280    /// Build and sign a relay announcement for this node, then ingest it
281    /// into the local cache so it is returned by `RelayListRequest` handlers.
282    ///
283    /// Call this on startup and periodically when `capabilities.relay = true`.
284    pub async fn publish_relay_announcement(
285        &self,
286        signing_key: &SigningKey,
287        self_addrs: Vec<PeerAddr>,
288        capacity: RelayCapacity,
289        ttl_secs: u64,
290    ) -> anyhow::Result<RelayAnnouncement> {
291        use crate::capabilities::Capabilities;
292        let now = now_unix_secs()?;
293        let ann = RelayAnnouncement::new_signed(
294            signing_key,
295            self_addrs,
296            Capabilities {
297                relay: true,
298                ..Capabilities::default()
299            },
300            capacity,
301            now,
302            ttl_secs,
303        )?;
304        let mut state = self.state.write().await;
305        state.relay.ingest_announcement(ann.clone(), now)?;
306        Ok(ann)
307    }
308
309    /// Publish a relay announcement to the DHT rendezvous keys (§4.9).
310    ///
311    /// The relay's assigned two rendezvous slots are derived from its
312    /// pubkey and the current time bucket.  The announcement is encoded
313    /// as a DHT value and replicated to the `K` closest nodes for each
314    /// slot key.
315    ///
316    /// Returns the total number of successful DHT store operations.
317    pub async fn publish_relay_announcement_to_dht<T: RequestTransport + ?Sized>(
318        &self,
319        transport: &T,
320        ann: &RelayAnnouncement,
321        seed_peers: &[PeerAddr],
322    ) -> anyhow::Result<usize> {
323        use super::helpers::replicate_store_to_closest;
324        use crate::dht::{DEFAULT_TTL_SECS, K};
325        use crate::relay::{
326            current_rendezvous_bucket, relay_rendezvous_index, relay_rendezvous_key,
327        };
328
329        let now = now_unix_secs()?;
330        let bucket_id = current_rendezvous_bucket(now);
331        let encoded = crate::cbor::to_vec(ann)?;
332        // TTL = remaining seconds until bucket rolls over.
333        let bucket_end = (bucket_id + 1) * crate::relay::RELAY_RENDEZVOUS_BUCKET_SECS;
334        let ttl = bucket_end.saturating_sub(now).max(DEFAULT_TTL_SECS);
335
336        let mut total = 0usize;
337        for which in 0u8..2 {
338            let slot = relay_rendezvous_index(&ann.relay_pubkey, bucket_id, which);
339            let key = relay_rendezvous_key(bucket_id, slot);
340            total += replicate_store_to_closest(
341                transport,
342                self,
343                key,
344                encoded.clone(),
345                ttl,
346                seed_peers,
347                K,
348            )
349            .await
350            .unwrap_or(0);
351        }
352        Ok(total)
353    }
354
355    /// Discover relay nodes by looking up all rendezvous slots in the DHT
356    /// for the current time bucket (§4.9).
357    ///
358    /// Iterates over all `RELAY_RENDEZVOUS_N` slots, performs an iterative
359    /// DHT find-value lookup for each, decodes any found values as
360    /// `RelayAnnouncement`, and ingests valid entries into the local cache.
361    ///
362    /// Returns the number of newly ingested announcements.
363    pub async fn discover_relays_from_dht<T: RequestTransport + ?Sized>(
364        &self,
365        transport: &T,
366        seed_peers: &[PeerAddr],
367    ) -> anyhow::Result<usize> {
368        use crate::relay::{
369            RELAY_RENDEZVOUS_N, RelayAnnouncement as RA, current_rendezvous_bucket,
370            relay_rendezvous_key,
371        };
372
373        let now = now_unix_secs()?;
374        let bucket_id = current_rendezvous_bucket(now);
375        let mut found_announcements = Vec::new();
376
377        for slot in 0..RELAY_RENDEZVOUS_N {
378            let key = relay_rendezvous_key(bucket_id, slot);
379            if let Ok(Some(dht_value)) = self
380                .dht_find_value_iterative(transport, key, seed_peers)
381                .await
382                && let Ok(ann) = crate::cbor::from_slice::<RA>(&dht_value.value)
383            {
384                found_announcements.push(ann);
385            }
386        }
387
388        self.ingest_relay_announcements(found_announcements).await
389    }
390
391    // ── Relay client methods (firewalled node side) ─────────────
392
393    /// Register a relay tunnel on a remote relay node.
394    ///
395    /// Connects to `relay_addr` using the provided `connector`, sends
396    /// `RelayRegister { tunnel: true }`, stores the slot info, and
397    /// spawns a background task that keeps the connection alive and
398    /// serves forwarded requests via `serve_wire_stream`.
399    ///
400    /// Returns the assigned `ActiveRelaySlot`.
401    pub async fn register_relay_tunnel<C: PeerConnector + 'static>(
402        &self,
403        connector: &C,
404        relay_addr: &PeerAddr,
405    ) -> anyhow::Result<ActiveRelaySlot> {
406        let mut stream = connector.connect(relay_addr).await?;
407
408        // Send RelayRegister { tunnel: true }
409        let register_payload = WirePayload::RelayRegister(RelayRegister {
410            relay_slot_id: None,
411            tunnel: true,
412        });
413        let request_envelope = Envelope::from_typed(1, 0, &register_payload)?;
414
415        let response = send_request_on_stream(
416            &mut stream,
417            request_envelope,
418            std::time::Duration::from_secs(10),
419        )
420        .await?;
421
422        if response.flags & FLAG_RESPONSE == 0 {
423            anyhow::bail!("relay registration: unexpected non-response");
424        }
425        if response.r#type != MsgType::RelayRegistered as u16 {
426            // Check if it's an error
427            let payload_str = String::from_utf8_lossy(&response.payload);
428            anyhow::bail!("relay registration failed: {}", payload_str);
429        }
430
431        let registered: RelayRegistered = crate::cbor::from_slice(&response.payload)?;
432        let slot = ActiveRelaySlot {
433            relay_addr: relay_addr.clone(),
434            slot_id: registered.relay_slot_id,
435            expires_at: registered.expires_at,
436        };
437
438        // Store in state — add to the list, replacing any existing
439        // slot for the same relay address.
440        {
441            let mut state = self.state.write().await;
442            state
443                .active_relay_slots
444                .retain(|s| s.relay_addr != slot.relay_addr);
445            state.active_relay_slots.push(slot.clone());
446        }
447
448        // Spawn a task that keeps the connection open and serves
449        // forwarded requests.  When the relay sends us requests
450        // (forwarded from downloaders), serve_wire_stream will process
451        // them and reply, just as if a downloader connected directly.
452        let node = self.clone();
453        let relay_addr_key = relay_addr.clone();
454        tokio::spawn(async move {
455            let _ = node.serve_wire_stream(stream, None).await;
456            // Connection lost — remove this specific relay slot.
457            let mut state = node.state.write().await;
458            state
459                .active_relay_slots
460                .retain(|s| s.relay_addr != relay_addr_key);
461        });
462
463        Ok(slot)
464    }
465
466    /// Return the first active relay slot, if any (backward-compat).
467    pub async fn active_relay_slot(&self) -> Option<ActiveRelaySlot> {
468        self.state.read().await.active_relay_slots.first().cloned()
469    }
470
471    /// Return all active relay slots.
472    pub async fn active_relay_slots(&self) -> Vec<ActiveRelaySlot> {
473        self.state.read().await.active_relay_slots.clone()
474    }
475
476    /// Build a `PeerAddr` that includes `relay_via` routing for this node.
477    ///
478    /// If this node has an active relay slot, returns a `PeerAddr` whose
479    /// `relay_via` field points to the relay, allowing remote peers to
480    /// reach this firewalled node through the tunnel.
481    ///
482    /// Uses the first active relay slot.
483    pub async fn relayed_self_addr(&self, self_addr: PeerAddr) -> PeerAddr {
484        let slots = self.state.read().await.active_relay_slots.clone();
485        match slots.first() {
486            Some(active) => PeerAddr {
487                relay_via: Some(crate::peer::RelayRoute {
488                    relay_addr: Box::new(active.relay_addr.clone()),
489                    slot_id: active.slot_id,
490                }),
491                ..self_addr
492            },
493            None => self_addr,
494        }
495    }
496
497    /// Build multiple `PeerAddr` variants, one for each active relay.
498    ///
499    /// For provider announcements, publishing all relay routes lets
500    /// downloaders try routes in parallel with fast failover.
501    pub async fn all_relayed_self_addrs(&self, self_addr: PeerAddr) -> Vec<PeerAddr> {
502        let slots = self.state.read().await.active_relay_slots.clone();
503        if slots.is_empty() {
504            return vec![self_addr];
505        }
506        slots
507            .iter()
508            .map(|active| PeerAddr {
509                relay_via: Some(crate::peer::RelayRoute {
510                    relay_addr: Box::new(active.relay_addr.clone()),
511                    slot_id: active.slot_id,
512                }),
513                ..self_addr.clone()
514            })
515            .collect()
516    }
517}