1use ed25519_dalek::SigningKey;
10
11use crate::{
12 net_fetch::{PeerConnector, RequestTransport, send_request_on_stream},
13 peer::PeerAddr,
14 relay::{RelayAnnouncement, RelayCapacity, RelayLimits},
15 wire::{
16 Envelope, FLAG_RESPONSE, MsgType, RelayConnect, RelayPayloadKind as WireRelayPayloadKind,
17 RelayRegister, RelayRegistered, RelayStream, WirePayload,
18 },
19};
20
21use super::{
22 AbuseLimits, ActiveRelaySlot, NodeHandle,
23 helpers::{
24 now_unix_secs, query_relay_list, relay_payload_kind_to_internal,
25 relay_payload_kind_to_wire, relay_peer_key,
26 },
27};
28
29impl NodeHandle {
30 pub async fn relay_register(&self, peer_addr: PeerAddr) -> anyhow::Result<RelayRegistered> {
31 self.relay_register_with_slot(peer_addr, None).await
32 }
33
34 pub async fn relay_register_with_slot(
35 &self,
36 peer_addr: PeerAddr,
37 relay_slot_id: Option<u64>,
38 ) -> anyhow::Result<RelayRegistered> {
39 let mut state = self.state.write().await;
40 let now = now_unix_secs()?;
41 let slot = state
42 .relay
43 .register_or_renew(relay_peer_key(&peer_addr), relay_slot_id, now)?;
44 Ok(RelayRegistered {
45 relay_slot_id: slot.relay_slot_id,
46 expires_at: slot.expires_at,
47 })
48 }
49
50 pub async fn relay_connect(
51 &self,
52 peer_addr: PeerAddr,
53 req: RelayConnect,
54 ) -> anyhow::Result<()> {
55 let mut state = self.state.write().await;
56 state.relay.connect(
57 relay_peer_key(&peer_addr),
58 req.relay_slot_id,
59 now_unix_secs()?,
60 )?;
61 Ok(())
62 }
63
64 pub async fn relay_stream(
65 &self,
66 peer_addr: PeerAddr,
67 req: RelayStream,
68 ) -> anyhow::Result<RelayStream> {
69 let mut state = self.state.write().await;
70 let peer_key = relay_peer_key(&peer_addr);
71 let score = *state.relay_scores.get(&peer_key).unwrap_or(&0);
72 if req.kind == WireRelayPayloadKind::Content && score < 2 {
73 let score_mut = state.relay_scores.entry(peer_key).or_insert(0);
74 *score_mut = (*score_mut - 1).clamp(-10, 10);
75 anyhow::bail!("content relay requires positive trust score");
76 }
77 let max_payload_bytes = if score >= 5 {
78 512 * 1024
79 } else if score >= 0 {
80 128 * 1024
81 } else {
82 32 * 1024
83 };
84 if req.payload.len() > max_payload_bytes {
85 let score_mut = state
86 .relay_scores
87 .entry(relay_peer_key(&peer_addr))
88 .or_insert(0);
89 *score_mut = (*score_mut - 1).clamp(-10, 10);
90 anyhow::bail!("relay payload exceeds adaptive limit for peer score");
91 }
92 let relayed = state.relay.relay_stream(
93 req.relay_slot_id,
94 req.stream_id,
95 relay_payload_kind_to_internal(req.kind),
96 relay_peer_key(&peer_addr),
97 req.payload,
98 now_unix_secs()?,
99 )?;
100 let score_mut = state
101 .relay_scores
102 .entry(relay_peer_key(&peer_addr))
103 .or_insert(0);
104 *score_mut = (*score_mut + 1).clamp(-10, 10);
105
106 Ok(RelayStream {
107 relay_slot_id: relayed.relay_slot_id,
108 stream_id: relayed.stream_id,
109 kind: relay_payload_kind_to_wire(relayed.kind),
110 payload: relayed.payload,
111 })
112 }
113
114 pub async fn set_relay_limits(&self, limits: RelayLimits) -> anyhow::Result<()> {
115 let mut state = self.state.write().await;
116 state.relay.set_limits(limits);
117 Ok(())
118 }
119
120 pub async fn set_abuse_limits(&self, limits: AbuseLimits) -> anyhow::Result<()> {
121 let mut state = self.state.write().await;
122 state.abuse_limits = limits;
123 state.abuse_counters.clear();
124 Ok(())
125 }
126
127 pub async fn note_relay_result(&self, peer: &PeerAddr, success: bool) -> anyhow::Result<()> {
128 let mut state = self.state.write().await;
129 let key = relay_peer_key(peer);
130 let delta = if success { 1 } else { -3 };
131 let score = state.relay_scores.entry(key).or_insert(0);
132 *score = (*score + delta).clamp(-10, 10);
133 Ok(())
134 }
135
136 pub async fn select_relay_peer(&self) -> anyhow::Result<Option<PeerAddr>> {
137 Ok(self.select_relay_peers(1).await?.into_iter().next())
138 }
139
140 pub async fn select_relay_peers(&self, max_peers: usize) -> anyhow::Result<Vec<PeerAddr>> {
141 let now = now_unix_secs()?;
142 let mut state = self.state.write().await;
143
144 let mut relay_capable: Vec<PeerAddr> = state
146 .peer_db
147 .relay_capable_peers(now)
148 .into_iter()
149 .map(|record| record.addr.clone())
150 .collect();
151
152 let announced_addrs: Vec<PeerAddr> = state
155 .relay
156 .known_announcements()
157 .into_iter()
158 .filter(|ann| ann.is_fresh(now))
159 .flat_map(|ann| ann.relay_addrs)
160 .collect();
161 let existing_keys: std::collections::HashSet<String> =
163 relay_capable.iter().map(relay_peer_key).collect();
164 for addr in announced_addrs {
165 if !existing_keys.contains(&relay_peer_key(&addr)) {
166 relay_capable.push(addr);
167 }
168 }
169
170 let mut candidates = if relay_capable.is_empty() {
172 state
173 .peer_db
174 .all_records()
175 .into_iter()
176 .filter(|record| {
177 now.saturating_sub(record.last_seen_unix)
178 <= crate::peer_db::PEX_FRESHNESS_WINDOW_SECS
179 })
180 .map(|record| record.addr)
181 .collect::<Vec<_>>()
182 } else {
183 relay_capable
184 };
185
186 candidates.sort_by_key(relay_peer_key);
187 if candidates.is_empty() {
188 return Ok(vec![]);
189 }
190
191 let best_score = candidates
192 .iter()
193 .map(|peer| *state.relay_scores.get(&relay_peer_key(peer)).unwrap_or(&0))
194 .max()
195 .unwrap_or(0);
196 let preferred = candidates
197 .into_iter()
198 .filter(|peer| {
199 *state.relay_scores.get(&relay_peer_key(peer)).unwrap_or(&0) == best_score
200 })
201 .collect::<Vec<_>>();
202 let source = if preferred.is_empty() {
203 vec![]
204 } else {
205 preferred
206 };
207 if source.is_empty() {
208 return Ok(vec![]);
209 }
210
211 let cap = max_peers.max(1).min(source.len());
212 let start = state.relay_rotation_cursor % source.len();
213 state.relay_rotation_cursor = state.relay_rotation_cursor.saturating_add(1);
214 let mut selected = Vec::with_capacity(cap);
215 for idx in 0..cap {
216 selected.push(source[(start + idx) % source.len()].clone());
217 }
218 Ok(selected)
219 }
220
221 pub async fn fetch_relay_list_from_peer<T: RequestTransport + ?Sized>(
229 &self,
230 transport: &T,
231 peer: &PeerAddr,
232 max_count: u16,
233 ) -> anyhow::Result<Vec<RelayAnnouncement>> {
234 query_relay_list(transport, peer, max_count).await
235 }
236
237 pub async fn ingest_relay_announcements(
243 &self,
244 announcements: Vec<RelayAnnouncement>,
245 ) -> anyhow::Result<usize> {
246 let mut state = self.state.write().await;
247 let now = now_unix_secs()?;
248 let mut ingested = 0usize;
249 for ann in announcements {
250 if state.relay.ingest_announcement(ann, now).is_ok() {
251 ingested += 1;
252 }
253 }
254 Ok(ingested)
255 }
256
257 pub async fn discover_relays_via_peers<T: RequestTransport + ?Sized>(
263 &self,
264 transport: &T,
265 seed_peers: &[PeerAddr],
266 max_per_peer: u16,
267 ) -> anyhow::Result<usize> {
268 let mut total = 0usize;
269 for peer in seed_peers {
270 if let Ok(announcements) = self
271 .fetch_relay_list_from_peer(transport, peer, max_per_peer)
272 .await
273 {
274 total += self.ingest_relay_announcements(announcements).await?;
275 }
276 }
277 Ok(total)
278 }
279
280 pub async fn publish_relay_announcement(
285 &self,
286 signing_key: &SigningKey,
287 self_addrs: Vec<PeerAddr>,
288 capacity: RelayCapacity,
289 ttl_secs: u64,
290 ) -> anyhow::Result<RelayAnnouncement> {
291 use crate::capabilities::Capabilities;
292 let now = now_unix_secs()?;
293 let ann = RelayAnnouncement::new_signed(
294 signing_key,
295 self_addrs,
296 Capabilities {
297 relay: true,
298 ..Capabilities::default()
299 },
300 capacity,
301 now,
302 ttl_secs,
303 )?;
304 let mut state = self.state.write().await;
305 state.relay.ingest_announcement(ann.clone(), now)?;
306 Ok(ann)
307 }
308
309 pub async fn publish_relay_announcement_to_dht<T: RequestTransport + ?Sized>(
318 &self,
319 transport: &T,
320 ann: &RelayAnnouncement,
321 seed_peers: &[PeerAddr],
322 ) -> anyhow::Result<usize> {
323 use super::helpers::replicate_store_to_closest;
324 use crate::dht::{DEFAULT_TTL_SECS, K};
325 use crate::relay::{
326 current_rendezvous_bucket, relay_rendezvous_index, relay_rendezvous_key,
327 };
328
329 let now = now_unix_secs()?;
330 let bucket_id = current_rendezvous_bucket(now);
331 let encoded = crate::cbor::to_vec(ann)?;
332 let bucket_end = (bucket_id + 1) * crate::relay::RELAY_RENDEZVOUS_BUCKET_SECS;
334 let ttl = bucket_end.saturating_sub(now).max(DEFAULT_TTL_SECS);
335
336 let mut total = 0usize;
337 for which in 0u8..2 {
338 let slot = relay_rendezvous_index(&ann.relay_pubkey, bucket_id, which);
339 let key = relay_rendezvous_key(bucket_id, slot);
340 total += replicate_store_to_closest(
341 transport,
342 self,
343 key,
344 encoded.clone(),
345 ttl,
346 seed_peers,
347 K,
348 )
349 .await
350 .unwrap_or(0);
351 }
352 Ok(total)
353 }
354
355 pub async fn discover_relays_from_dht<T: RequestTransport + ?Sized>(
364 &self,
365 transport: &T,
366 seed_peers: &[PeerAddr],
367 ) -> anyhow::Result<usize> {
368 use crate::relay::{
369 RELAY_RENDEZVOUS_N, RelayAnnouncement as RA, current_rendezvous_bucket,
370 relay_rendezvous_key,
371 };
372
373 let now = now_unix_secs()?;
374 let bucket_id = current_rendezvous_bucket(now);
375 let mut found_announcements = Vec::new();
376
377 for slot in 0..RELAY_RENDEZVOUS_N {
378 let key = relay_rendezvous_key(bucket_id, slot);
379 if let Ok(Some(dht_value)) = self
380 .dht_find_value_iterative(transport, key, seed_peers)
381 .await
382 && let Ok(ann) = crate::cbor::from_slice::<RA>(&dht_value.value)
383 {
384 found_announcements.push(ann);
385 }
386 }
387
388 self.ingest_relay_announcements(found_announcements).await
389 }
390
391 pub async fn register_relay_tunnel<C: PeerConnector + 'static>(
402 &self,
403 connector: &C,
404 relay_addr: &PeerAddr,
405 ) -> anyhow::Result<ActiveRelaySlot> {
406 let mut stream = connector.connect(relay_addr).await?;
407
408 let register_payload = WirePayload::RelayRegister(RelayRegister {
410 relay_slot_id: None,
411 tunnel: true,
412 });
413 let request_envelope = Envelope::from_typed(1, 0, ®ister_payload)?;
414
415 let response = send_request_on_stream(
416 &mut stream,
417 request_envelope,
418 std::time::Duration::from_secs(10),
419 )
420 .await?;
421
422 if response.flags & FLAG_RESPONSE == 0 {
423 anyhow::bail!("relay registration: unexpected non-response");
424 }
425 if response.r#type != MsgType::RelayRegistered as u16 {
426 let payload_str = String::from_utf8_lossy(&response.payload);
428 anyhow::bail!("relay registration failed: {}", payload_str);
429 }
430
431 let registered: RelayRegistered = crate::cbor::from_slice(&response.payload)?;
432 let slot = ActiveRelaySlot {
433 relay_addr: relay_addr.clone(),
434 slot_id: registered.relay_slot_id,
435 expires_at: registered.expires_at,
436 };
437
438 {
441 let mut state = self.state.write().await;
442 state
443 .active_relay_slots
444 .retain(|s| s.relay_addr != slot.relay_addr);
445 state.active_relay_slots.push(slot.clone());
446 }
447
448 let node = self.clone();
453 let relay_addr_key = relay_addr.clone();
454 tokio::spawn(async move {
455 let _ = node.serve_wire_stream(stream, None).await;
456 let mut state = node.state.write().await;
458 state
459 .active_relay_slots
460 .retain(|s| s.relay_addr != relay_addr_key);
461 });
462
463 Ok(slot)
464 }
465
466 pub async fn active_relay_slot(&self) -> Option<ActiveRelaySlot> {
468 self.state.read().await.active_relay_slots.first().cloned()
469 }
470
471 pub async fn active_relay_slots(&self) -> Vec<ActiveRelaySlot> {
473 self.state.read().await.active_relay_slots.clone()
474 }
475
476 pub async fn relayed_self_addr(&self, self_addr: PeerAddr) -> PeerAddr {
484 let slots = self.state.read().await.active_relay_slots.clone();
485 match slots.first() {
486 Some(active) => PeerAddr {
487 relay_via: Some(crate::peer::RelayRoute {
488 relay_addr: Box::new(active.relay_addr.clone()),
489 slot_id: active.slot_id,
490 }),
491 ..self_addr
492 },
493 None => self_addr,
494 }
495 }
496
497 pub async fn all_relayed_self_addrs(&self, self_addr: PeerAddr) -> Vec<PeerAddr> {
502 let slots = self.state.read().await.active_relay_slots.clone();
503 if slots.is_empty() {
504 return vec![self_addr];
505 }
506 slots
507 .iter()
508 .map(|active| PeerAddr {
509 relay_via: Some(crate::peer::RelayRoute {
510 relay_addr: Box::new(active.relay_addr.clone()),
511 slot_id: active.slot_id,
512 }),
513 ..self_addr.clone()
514 })
515 .collect()
516 }
517}