1use std::collections::{HashMap, HashSet, VecDeque};
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime, UNIX_EPOCH, Instant};
14
15use anyhow::{anyhow, Result};
16use tokio::net::UdpSocket;
17use tokio::sync::{mpsc, Mutex};
18use tokio_stream::StreamExt;
19
20use rift_core::{Identity, Invite, PeerId, MessageId};
21use rift_core::e2ee::{
22 derive_e2ee_shared_key, ed25519_public_from_bytes, generate_e2ee_keypair,
23 public_key_from_bytes, sign_e2ee_public, verify_e2ee_public,
24};
25use rift_discovery::{discover_peers, start_mdns_advertisement, DiscoveryConfig, MdnsHandle};
26use rift_nat::{
27 attempt_hole_punch, gather_public_addrs, gather_turn_candidates, NatConfig, PeerEndpoint,
28 TurnRelay,
29};
30use rift_protocol::{
31 decode_frame, encode_frame, CallControl, CallState, Capabilities, ChatMessage, CodecId,
32 ControlMessage, EncryptedPayload, FeatureFlag, GroupControl, GroupMode, IceCandidate,
33 CandidateType, PeerInfo, ProtocolVersion, QosProfile, RiftFrameHeader, RiftPayload, SessionId,
34 StreamKind, VoicePacket,
35};
36use rift_metrics as metrics;
37use aes_gcm::{Aes256Gcm, Key, Nonce, KeyInit};
38use aes_gcm::aead::{Aead, Payload};
39use rand::RngCore;
40use sha2::Digest;
41use subtle::ConstantTimeEq;
42
43#[cfg(feature = "predictive-rendezvous")]
44use rift_rndzv::{
45 build_probe_payload, compute_slot_params, parse_probe_payload, rendezvous_id_from_seed,
46 ParsedProbe, ProbePayload, RendezvousMetrics, RendezvousOutcome, RendezvousState, Role,
47 SemanticRendezvousToken,
48};
49#[cfg(feature = "predictive-rendezvous")]
50use tokio::time::Instant as TokioInstant;
51
52const MAX_PACKET: usize = 2048;
54const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(1);
56const RATE_LIMIT_PKTS_PER_SEC: u32 = 1200;
58const GROUP_MESH_MAX: usize = 5;
60
61#[derive(Debug, Clone)]
62pub struct MeshConfig {
63 pub channel_name: String,
65 pub password: Option<String>,
67 pub listen_port: u16,
69 pub relay_capable: bool,
71 pub qos: QosProfile,
73 pub auth_token: Option<Vec<u8>>,
75 pub require_auth: bool,
77 pub e2ee_key: Option<[u8; 32]>,
79 pub rekey_interval_secs: Option<u64>,
81 pub max_direct_peers: Option<usize>,
83}
84
85#[derive(Debug, Clone)]
86pub enum PeerRoute {
87 Direct { addr: SocketAddr },
89 Relayed { via: PeerId },
91}
92
93#[derive(Clone)]
94enum MeshSocket {
95 Udp(Arc<UdpSocket>),
96 Turn(Arc<TurnRelay>),
97}
98
99#[derive(Debug, Clone)]
100pub enum MeshEvent {
101 PeerJoined(PeerId),
103 PeerLeft(PeerId),
105 ChatReceived(ChatMessage),
107 VoiceFrame {
109 from: PeerId,
110 seq: u32,
111 timestamp: u64,
112 session: SessionId,
113 codec: CodecId,
114 payload: Vec<u8>,
115 },
116 RouteUpdated { peer_id: PeerId, route: PeerRoute },
118 RouteUpgraded(PeerId),
120 PeerCapabilities { peer_id: PeerId, capabilities: Capabilities },
122 PeerSessionConfig { peer_id: PeerId, codec: CodecId, frame_ms: u16 },
124 GroupCodec(CodecId),
125 StatsUpdate { peer: PeerId, stats: LinkStats, global: GlobalStats },
126 PeerIdentity { peer_id: PeerId, public_key: Vec<u8> },
127 IncomingCall {
128 session: SessionId,
129 from: PeerId,
130 rndzv_srt_uri: Option<String>,
131 },
132 CallAccepted { session: SessionId, from: PeerId },
133 CallDeclined {
134 session: SessionId,
135 from: PeerId,
136 reason: Option<String>,
137 },
138 CallEnded { session: SessionId },
139 GroupTopology { session: SessionId, mode: GroupMode },
140}
141
142#[derive(Debug, Clone, Copy)]
143pub struct LinkStats {
144 pub rtt_ms: f32,
146 pub loss: f32,
148 pub jitter_ms: f32,
150}
151
152#[derive(Debug, Clone, Copy)]
153pub struct GlobalStats {
154 pub num_peers: usize,
156 pub num_sessions: usize,
158 pub packets_sent: u64,
160 pub packets_received: u64,
162 pub bytes_sent: u64,
164 pub bytes_received: u64,
166}
167
168pub struct Mesh {
170 inner: Arc<MeshInner>,
171 events_rx: mpsc::Receiver<MeshEvent>,
172 discovery_config: DiscoveryConfig,
173 _mdns: Option<MdnsHandle>,
174}
175
176#[derive(Clone)]
178pub struct MeshHandle {
179 inner: Arc<MeshInner>,
180}
181
182struct MeshInner {
191 sockets: Mutex<Vec<MeshSocket>>,
192 turn_relays: Mutex<HashMap<SocketAddr, usize>>,
193 identity: Identity,
194 peers_by_id: Mutex<HashMap<PeerId, SocketAddr>>,
195 peer_caps: Mutex<HashMap<PeerId, bool>>,
196 peer_capabilities: Mutex<HashMap<PeerId, Capabilities>>,
197 peer_public_keys: Mutex<HashMap<PeerId, Vec<u8>>>,
198 peer_session: Mutex<HashMap<PeerId, SessionConfig>>,
199 preferred_codecs: Mutex<Vec<CodecId>>,
200 preferred_features: Mutex<Vec<FeatureFlag>>,
201 routes: Mutex<HashMap<PeerId, PeerRoute>>,
202 peer_addrs: Mutex<HashMap<PeerId, SocketAddr>>,
203 peer_candidates: Mutex<HashMap<PeerId, Vec<SocketAddr>>>,
204 peer_ice_candidates: Mutex<HashMap<PeerId, Vec<IceCandidate>>>,
205 self_candidates: Mutex<Vec<SocketAddr>>,
206 self_ice_candidates: Mutex<Vec<IceCandidate>>,
207 relay_candidates: Mutex<HashMap<PeerId, PeerId>>,
208 connections: Mutex<HashMap<SocketAddr, PeerConnection>>,
209 pending: Mutex<HashMap<PendingKey, PendingHandshake>>,
210 cache: Mutex<HashSet<MessageId>>,
211 voice_seq: Mutex<HashMap<PeerId, u32>>,
212 control_seq: Mutex<u32>,
213 nat_cfg: Mutex<Option<NatConfig>>,
214 qos: QosProfile,
215 link_stats: Mutex<HashMap<PeerId, LinkStatsState>>,
216 peer_traffic: Mutex<HashMap<PeerId, TrafficStats>>,
217 global_traffic: Mutex<TrafficStats>,
218 rate_limits: Mutex<HashMap<SocketAddr, RateLimitState>>,
219 auth_required: bool,
220 auth_token: Option<Vec<u8>>,
221 events_tx: mpsc::Sender<MeshEvent>,
222 relay_capable: bool,
223 session_mgr: Mutex<SessionManager>,
224 active_session: Mutex<SessionId>,
225 channel_session: SessionId,
226 group_codec: Mutex<CodecId>,
227 group_topology: Mutex<HashMap<SessionId, GroupMode>>,
228 candidate_attempts: Mutex<HashMap<PeerId, tokio::time::Instant>>,
229 e2ee_key: Option<[u8; 32]>,
230 e2ee_keys: Mutex<HashMap<(PeerId, SessionId), [u8; 32]>>,
231 e2ee_pending: Mutex<HashMap<(PeerId, SessionId), rift_core::e2ee::E2eeKeypair>>,
232 e2ee_ready: Mutex<HashMap<(PeerId, SessionId), E2eeReadyState>>,
233 rekey_interval_secs: Option<u64>,
234 max_direct_peers: Option<usize>,
235 #[cfg(feature = "predictive-rendezvous")]
236 pr_sessions: Mutex<HashMap<u64, PrSession>>,
237}
238
239#[cfg(feature = "predictive-rendezvous")]
240struct PrSession {
241 tx: mpsc::Sender<(SocketAddr, ParsedProbe)>,
242}
243
244#[derive(Debug, Default, Clone)]
245struct TrafficStats {
246 packets_sent: u64,
247 packets_received: u64,
248 bytes_sent: u64,
249 bytes_received: u64,
250}
251
252#[derive(Debug, Clone, Copy)]
253struct E2eeReadyState {
254 local_ready: bool,
255 remote_ready: bool,
256}
257
258#[derive(Debug, Clone)]
259struct RateLimitState {
260 window_start: Instant,
261 count: u32,
262 last_drop: Instant,
263}
264
265impl RateLimitState {
266 fn new(now: Instant) -> Self {
267 Self {
268 window_start: now,
269 count: 0,
270 last_drop: now,
271 }
272 }
273
274 fn allow(&mut self, now: Instant) -> bool {
275 if now.duration_since(self.window_start) >= RATE_LIMIT_WINDOW {
276 self.window_start = now;
277 self.count = 0;
278 }
279 if self.count >= RATE_LIMIT_PKTS_PER_SEC {
280 return false;
281 }
282 self.count = self.count.saturating_add(1);
283 true
284 }
285}
286
287#[derive(Debug, Clone)]
288pub struct SessionConfig {
290 pub codec: CodecId,
292 pub frame_ms: u16,
294}
295
296struct PeerConnection {
297 peer_id: Option<PeerId>,
298 session: rift_core::noise::NoiseSession,
299 socket_idx: usize,
300 authenticated: bool,
301}
302
303#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
304struct PendingKey {
305 socket_idx: usize,
306 addr: SocketAddr,
307}
308
309enum PendingHandshake {
310 InitiatorAwait2(snow::HandshakeState),
311 ResponderAwait3(snow::HandshakeState),
312}
313
314#[derive(Debug)]
315struct SessionManager {
316 sessions: HashMap<SessionId, SessionState>,
317}
318
319#[derive(Debug, Clone)]
320struct SessionState {
321 state: CallState,
322 participants: HashSet<PeerId>,
323}
324
325#[derive(Debug, Clone)]
326struct LinkStatsState {
327 last_seq: Option<u32>,
328 window: VecDeque<bool>,
329 window_size: usize,
330 last_transit_ms: Option<f32>,
331 jitter_ms: f32,
332 rtt_ms: f32,
333 last_emit: tokio::time::Instant,
334}
335
336impl LinkStatsState {
337 fn new() -> Self {
338 Self {
339 last_seq: None,
340 window: VecDeque::with_capacity(128),
341 window_size: 100,
342 last_transit_ms: None,
343 jitter_ms: 0.0,
344 rtt_ms: 0.0,
345 last_emit: tokio::time::Instant::now() - Duration::from_secs(10),
346 }
347 }
348
349 fn update_on_receive(&mut self, seq: u32, sent_ms: u64, arrival_ms: u64) {
350 if let Some(last) = self.last_seq {
351 if seq <= last {
352 return;
353 }
354 let gap = (seq - last).saturating_sub(1);
355 let add = gap.min(self.window_size as u32) as usize;
356 for _ in 0..add {
357 self.push_window(false);
358 }
359 }
360 self.push_window(true);
361 self.last_seq = Some(seq);
362
363 let transit = arrival_ms as f32 - sent_ms as f32;
364 if let Some(prev) = self.last_transit_ms {
365 let d = (transit - prev).abs();
366 self.jitter_ms += (d - self.jitter_ms) / 16.0;
367 }
368 self.last_transit_ms = Some(transit);
369 }
370
371 fn update_rtt(&mut self, rtt_ms: f32) {
372 if self.rtt_ms == 0.0 {
373 self.rtt_ms = rtt_ms;
374 } else {
375 let alpha = 0.1;
376 self.rtt_ms = self.rtt_ms * (1.0 - alpha) + rtt_ms * alpha;
377 }
378 }
379
380 fn push_window(&mut self, received: bool) {
381 if self.window.len() >= self.window_size {
382 self.window.pop_front();
383 }
384 self.window.push_back(received);
385 }
386
387 fn loss_ratio(&self) -> f32 {
388 if self.window.is_empty() {
389 return 0.0;
390 }
391 let received = self.window.iter().filter(|v| **v).count() as f32;
392 let total = self.window.len() as f32;
393 (total - received) / total
394 }
395
396 fn snapshot(&self) -> LinkStats {
397 LinkStats {
398 rtt_ms: self.rtt_ms,
399 loss: self.loss_ratio(),
400 jitter_ms: self.jitter_ms,
401 }
402 }
403}
404
405impl SessionManager {
406 fn new(channel_session: SessionId) -> Self {
407 let mut sessions = HashMap::new();
408 sessions.insert(
409 channel_session,
410 SessionState {
411 state: CallState::Active,
412 participants: HashSet::new(),
413 },
414 );
415 Self { sessions }
416 }
417
418 fn ensure_session(&mut self, session: SessionId) -> &mut SessionState {
419 self.sessions.entry(session).or_insert_with(|| SessionState {
420 state: CallState::Ringing,
421 participants: HashSet::new(),
422 })
423 }
424
425 fn add_participant(&mut self, session: SessionId, peer_id: PeerId) {
426 let state = self.ensure_session(session);
427 state.participants.insert(peer_id);
428 }
429
430 fn remove_participant(&mut self, session: SessionId, peer_id: PeerId) {
431 if let Some(state) = self.sessions.get_mut(&session) {
432 state.participants.remove(&peer_id);
433 }
434 }
435
436 fn remove_participant_all(&mut self, peer_id: PeerId) {
437 for state in self.sessions.values_mut() {
438 state.participants.remove(&peer_id);
439 }
440 }
441
442 fn set_state(&mut self, session: SessionId, state: CallState) {
443 let entry = self.ensure_session(session);
444 entry.state = state;
445 }
446
447 fn participants(&self, session: SessionId) -> Vec<PeerId> {
448 self.sessions
449 .get(&session)
450 .map(|state| state.participants.iter().copied().collect())
451 .unwrap_or_default()
452 }
453}
454
455impl Mesh {
456 pub async fn new(identity: Identity, config: MeshConfig) -> Result<Self> {
460 let addr = SocketAddr::from(([0, 0, 0, 0], config.listen_port));
461 let socket = UdpSocket::bind(addr).await?;
462 let socket = Arc::new(socket);
463 let peer_id = identity.peer_id;
464 let channel_session = SessionId::from_channel(&config.channel_name, config.password.as_deref());
465 let session_mgr = SessionManager::new(channel_session);
466
467 let (events_tx, events_rx) = mpsc::channel(256);
468 let inner = Arc::new(MeshInner {
469 sockets: Mutex::new(vec![MeshSocket::Udp(socket.clone())]),
470 turn_relays: Mutex::new(HashMap::new()),
471 identity,
472 peers_by_id: Mutex::new(HashMap::new()),
473 peer_caps: Mutex::new(HashMap::new()),
474 peer_capabilities: Mutex::new(HashMap::new()),
475 peer_public_keys: Mutex::new(HashMap::new()),
476 peer_session: Mutex::new(HashMap::new()),
477 preferred_codecs: Mutex::new(Vec::new()),
478 preferred_features: Mutex::new(Vec::new()),
479 routes: Mutex::new(HashMap::new()),
480 group_codec: Mutex::new(CodecId::Opus),
481 peer_addrs: Mutex::new(HashMap::new()),
482 peer_candidates: Mutex::new(HashMap::new()),
483 peer_ice_candidates: Mutex::new(HashMap::new()),
484 self_candidates: Mutex::new(Vec::new()),
485 self_ice_candidates: Mutex::new(Vec::new()),
486 relay_candidates: Mutex::new(HashMap::new()),
487 connections: Mutex::new(HashMap::new()),
488 pending: Mutex::new(HashMap::new()),
489 cache: Mutex::new(HashSet::new()),
490 voice_seq: Mutex::new(HashMap::new()),
491 control_seq: Mutex::new(0),
492 nat_cfg: Mutex::new(None),
493 qos: config.qos,
494 link_stats: Mutex::new(HashMap::new()),
495 peer_traffic: Mutex::new(HashMap::new()),
496 global_traffic: Mutex::new(TrafficStats::default()),
497 rate_limits: Mutex::new(HashMap::new()),
498 auth_required: config.require_auth,
499 auth_token: config.auth_token,
500 events_tx,
501 relay_capable: config.relay_capable,
502 session_mgr: Mutex::new(session_mgr),
503 active_session: Mutex::new(channel_session),
504 channel_session,
505 candidate_attempts: Mutex::new(HashMap::new()),
506 e2ee_key: config.e2ee_key,
507 e2ee_keys: Mutex::new(HashMap::new()),
508 e2ee_pending: Mutex::new(HashMap::new()),
509 e2ee_ready: Mutex::new(HashMap::new()),
510 rekey_interval_secs: config.rekey_interval_secs,
511 max_direct_peers: config.max_direct_peers,
512 group_topology: Mutex::new(HashMap::new()),
513 #[cfg(feature = "predictive-rendezvous")]
514 pr_sessions: Mutex::new(HashMap::new()),
515 });
516
517 {
518 let mut topo = inner.group_topology.lock().await;
519 topo.insert(channel_session, GroupMode::Mesh);
520 }
521
522 MeshInner::spawn_receiver(inner.clone(), 0, MeshSocket::Udp(socket.clone()));
523 MeshInner::spawn_auto_upgrade(inner.clone());
524 MeshInner::spawn_candidate_checks(inner.clone());
525 MeshInner::spawn_rekey(inner.clone());
526 MeshInner::spawn_scaling(inner.clone());
527 MeshInner::spawn_pinger(inner.clone());
528
529 let mesh = Self {
530 inner,
531 events_rx,
532 discovery_config: DiscoveryConfig {
533 channel_name: config.channel_name,
534 password: config.password,
535 peer_id,
536 listen_port: socket.local_addr()?.port(),
537 },
538 _mdns: None,
539 };
540
541 Ok(mesh)
542 }
543
544 pub fn local_peer_id(&self) -> PeerId {
546 self.inner.identity.peer_id
547 }
548
549 pub async fn local_addr(&self) -> Result<SocketAddr> {
551 let sockets = self.inner.sockets.lock().await;
552 match sockets.first() {
553 Some(MeshSocket::Udp(sock)) => {
554 let addr = sock.local_addr()?;
555 if addr.ip().is_unspecified() {
557 Ok(SocketAddr::new(
558 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
559 addr.port(),
560 ))
561 } else {
562 Ok(addr)
563 }
564 }
565 Some(MeshSocket::Turn(relay)) => Ok(relay.relay_addr()),
566 None => Err(anyhow!("missing socket")),
567 }
568 }
569
570 pub fn start_lan_discovery(&mut self) -> Result<()> {
572 let mdns = start_mdns_advertisement(self.discovery_config.clone())?;
573 self._mdns = Some(mdns);
574 MeshInner::spawn_discovery(self.inner.clone(), self.discovery_config.clone());
575 Ok(())
576 }
577
578 pub async fn enable_nat(&mut self, nat_cfg: NatConfig) {
580 let mut cfg = self.inner.nat_cfg.lock().await;
581 *cfg = Some(nat_cfg);
582 drop(cfg);
583 let nat_cfg = { self.inner.nat_cfg.lock().await.clone() };
584 if let Some(nat_cfg) = nat_cfg {
585 let local_addr = {
586 let sockets = self.inner.sockets.lock().await;
587 sockets.first().and_then(|sock| match sock {
588 MeshSocket::Udp(udp) => udp.local_addr().ok(),
589 MeshSocket::Turn(relay) => Some(relay.relay_addr()),
590 })
591 };
592 let mut relay_addrs = Vec::new();
593 if !nat_cfg.turn_servers.is_empty() {
594 if let Ok(relays) = gather_turn_candidates(&nat_cfg).await {
595 for relay in relays {
596 let relay_addr = relay.relay_addr;
597 let relay_handle = relay.relay.clone();
598 if let Ok(socket_idx) = self.inner.add_turn_relay(relay_handle).await {
599 let mut turn_relays = self.inner.turn_relays.lock().await;
600 turn_relays.insert(relay_addr, socket_idx);
601 relay_addrs.push(relay_addr);
602 let _ = rift_nat::spawn_turn_keepalive(
603 relay.relay.clone(),
604 nat_cfg.turn_keepalive_ms,
605 );
606 }
607 }
608 }
609 }
610 if let Ok(addrs) = gather_public_addrs(&nat_cfg).await {
611 let local_addr = {
612 let sockets = self.inner.sockets.lock().await;
613 sockets.first().and_then(|sock| match sock {
614 MeshSocket::Udp(udp) => udp.local_addr().ok(),
615 MeshSocket::Turn(relay) => Some(relay.relay_addr()),
616 })
617 };
618 let mut combined = addrs.clone();
619 combined.extend(relay_addrs.clone());
620 if let Some(local) = local_addr {
621 combined.push(local);
622 }
623 combined.sort();
624 combined.dedup();
625 let mut self_candidates = self.inner.self_candidates.lock().await;
626 *self_candidates = combined;
627 drop(self_candidates);
628 let ice_candidates = MeshInner::build_ice_candidates(local_addr, &addrs, &relay_addrs);
629 let mut self_ice = self.inner.self_ice_candidates.lock().await;
630 *self_ice = ice_candidates;
631 } else if let Some(local) = local_addr {
632 let mut self_candidates = self.inner.self_candidates.lock().await;
633 let mut combined = vec![local];
634 combined.extend(relay_addrs.clone());
635 combined.sort();
636 combined.dedup();
637 *self_candidates = combined;
638 drop(self_candidates);
639 let ice_candidates = MeshInner::build_ice_candidates(Some(local), &[], &relay_addrs);
640 let mut self_ice = self.inner.self_ice_candidates.lock().await;
641 *self_ice = ice_candidates;
642 }
643 }
644 }
645
646 pub async fn join_invite(&mut self, invite: Invite, nat_cfg: NatConfig) -> Result<()> {
648 {
649 let mut cfg = self.inner.nat_cfg.lock().await;
650 *cfg = Some(nat_cfg.clone());
651 }
652
653 let mut targets = invite.known_peers;
654 targets.extend(invite.candidates);
655 targets.sort();
656 targets.dedup();
657
658 let mut candidates: Vec<IceCandidate> = targets
659 .into_iter()
660 .map(|addr| IceCandidate {
661 addr,
662 cand_type: CandidateType::Srflx,
663 priority: 50,
664 foundation: MeshInner::candidate_foundation(addr, CandidateType::Srflx),
665 })
666 .collect();
667 for relay_addr in invite.relay_candidates {
668 candidates.push(IceCandidate {
669 addr: relay_addr,
670 cand_type: CandidateType::Relay,
671 priority: 70,
672 foundation: MeshInner::candidate_foundation(relay_addr, CandidateType::Relay),
673 });
674 }
675
676 for candidate in candidates {
677 match candidate.cand_type {
678 CandidateType::Relay => {
679 let socket_idx = {
680 let map = self.inner.turn_relays.lock().await;
681 map.get(&candidate.addr).copied()
682 };
683 if let Some(socket_idx) = socket_idx {
684 if let Err(err) = self.inner.initiate_handshake(candidate.addr, socket_idx).await {
685 tracing::warn!("handshake to {} failed: {err}", candidate.addr);
686 }
687 }
688 }
689 _ => {
690 if let Err(err) = self.inner.initiate_handshake(candidate.addr, 0).await {
692 tracing::warn!("direct handshake to {} failed: {err}", candidate.addr);
693 } else {
694 continue;
695 }
696
697 let endpoint = PeerEndpoint {
698 peer_id: PeerId([0u8; 32]),
699 external_addrs: vec![candidate.addr],
700 punch_ports: vec![candidate.addr.port()],
701 };
702 if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
703 let socket_idx = self.inner.add_socket(socket).await?;
704 if let Err(err) = self.inner.initiate_handshake(remote, socket_idx).await {
705 tracing::warn!("handshake to {} failed: {err}", remote);
706 }
707 }
708 }
709 }
710 }
711 Ok(())
712 }
713
714 pub async fn active_session(&self) -> SessionId {
716 *self.inner.active_session.lock().await
717 }
718
719 pub async fn next_event(&mut self) -> Option<MeshEvent> {
721 self.events_rx.recv().await
722 }
723
724 pub fn handle(&self) -> MeshHandle {
726 MeshHandle {
727 inner: self.inner.clone(),
728 }
729 }
730}
731
732impl MeshInner {
733 fn build_ice_candidates(
734 local_addr: Option<SocketAddr>,
735 public_addrs: &[SocketAddr],
736 relay_addrs: &[SocketAddr],
737 ) -> Vec<IceCandidate> {
738 let mut out = Vec::new();
739 if let Some(addr) = local_addr {
740 out.push(IceCandidate {
741 addr,
742 cand_type: CandidateType::Host,
743 priority: 100,
744 foundation: Self::candidate_foundation(addr, CandidateType::Host),
745 });
746 }
747 for addr in public_addrs {
748 out.push(IceCandidate {
749 addr: *addr,
750 cand_type: CandidateType::Srflx,
751 priority: 90,
752 foundation: Self::candidate_foundation(*addr, CandidateType::Srflx),
753 });
754 }
755 for addr in relay_addrs {
756 out.push(IceCandidate {
757 addr: *addr,
758 cand_type: CandidateType::Relay,
759 priority: 70,
760 foundation: Self::candidate_foundation(*addr, CandidateType::Relay),
761 });
762 }
763 out.sort_by(|a, b| b.priority.cmp(&a.priority));
764 out.dedup_by(|a, b| a.addr == b.addr && a.cand_type == b.cand_type);
765 out
766 }
767
768 fn candidate_foundation(addr: SocketAddr, cand_type: CandidateType) -> u64 {
769 use std::hash::{Hash, Hasher};
770 let mut hasher = std::collections::hash_map::DefaultHasher::new();
771 addr.hash(&mut hasher);
772 cand_type.hash(&mut hasher);
773 hasher.finish()
774 }
775
776 async fn current_group_mode(&self, session: SessionId) -> GroupMode {
777 let topo = self.group_topology.lock().await;
778 topo.get(&session).cloned().unwrap_or(GroupMode::Mesh)
779 }
780
781 async fn set_group_mode(&self, session: SessionId, mode: GroupMode) {
782 let mut topo = self.group_topology.lock().await;
783 topo.insert(session, mode);
784 }
785
786 async fn is_topology_coordinator(&self, session: SessionId) -> bool {
787 let peers = {
788 let mgr = self.session_mgr.lock().await;
789 mgr.participants(session)
790 };
791 if peers.is_empty() {
792 return true;
793 }
794 let mut all = peers;
795 if !all.contains(&self.identity.peer_id) {
796 all.push(self.identity.peer_id);
797 }
798 all.sort_by_key(|p| p.0);
799 all.first().copied() == Some(self.identity.peer_id)
800 }
801
802 async fn select_forwarder(&self, session: SessionId) -> Option<PeerId> {
803 let participants = {
804 let mgr = self.session_mgr.lock().await;
805 mgr.participants(session)
806 };
807 if participants.is_empty() {
808 return None;
809 }
810 let relay_caps = self.peer_caps.lock().await.clone();
811 let stats = self.link_stats.lock().await.clone();
812
813 let mut candidates: Vec<PeerId> = participants
814 .into_iter()
815 .filter(|peer| relay_caps.get(peer).copied().unwrap_or(false) || *peer == self.identity.peer_id)
816 .collect();
817 if self.relay_capable && !candidates.contains(&self.identity.peer_id) {
818 candidates.push(self.identity.peer_id);
819 }
820 if candidates.is_empty() {
821 return None;
822 }
823 candidates.sort_by_key(|peer| {
824 if *peer == self.identity.peer_id {
825 0u64
826 } else if let Some(st) = stats.get(peer) {
827 (st.rtt_ms as u64).saturating_add((st.loss_ratio() * 1000.0) as u64)
828 } else {
829 10_000
830 }
831 });
832 candidates.first().copied()
833 }
834
835 async fn update_group_topology(&self, session: SessionId) -> Result<()> {
836 if !self.is_topology_coordinator(session).await {
837 return Ok(());
838 }
839 let participants = {
840 let mgr = self.session_mgr.lock().await;
841 mgr.participants(session)
842 };
843 let count = participants.len() + 1;
845 let next = if count <= GROUP_MESH_MAX {
846 GroupMode::Mesh
847 } else if self.e2ee_key.is_some() {
848 if let Some(forwarder) = self.select_forwarder(session).await {
849 GroupMode::Hybrid { forwarder }
850 } else {
851 GroupMode::Mesh
852 }
853 } else {
854 GroupMode::Mesh
855 };
856
857 let current = self.current_group_mode(session).await;
858 if current == next {
859 return Ok(());
860 }
861 self.set_group_mode(session, next).await;
862 let msg = ControlMessage::Group(GroupControl::Topology { session, mode: next });
863 for peer in participants {
864 if peer == self.identity.peer_id {
865 continue;
866 }
867 let _ = self.send_control_to_peer(peer, msg.clone(), session).await;
868 }
869 let _ = self
870 .events_tx
871 .send(MeshEvent::GroupTopology { session, mode: next })
872 .await;
873 Ok(())
874 }
875
876 async fn should_forward_voice(&self, session: SessionId, from: PeerId) -> bool {
877 if from == self.identity.peer_id {
878 return false;
879 }
880 match self.current_group_mode(session).await {
881 GroupMode::Hybrid { forwarder } if forwarder == self.identity.peer_id => true,
882 _ => false,
883 }
884 }
885
886 async fn should_use_group_key(&self, session: SessionId) -> bool {
887 matches!(self.current_group_mode(session).await, GroupMode::Hybrid { .. }) && self.e2ee_key.is_some()
888 }
889 async fn broadcast_chat(&self, text: String) -> Result<()> {
890 let timestamp = now_timestamp();
891 let seq = self.next_control_seq().await;
892 let chat = ChatMessage::new(self.identity.peer_id, timestamp, text);
893 let payload = RiftPayload::Control(ControlMessage::Chat(chat.clone()));
894 let session = self.channel_session;
895
896 let mut cache = self.cache.lock().await;
897 cache.insert(chat.id);
898 drop(cache);
899
900 let routes = self.routes_snapshot().await;
901 for (peer_id, route) in routes {
902 if peer_id == self.identity.peer_id {
903 continue;
904 }
905 if let Err(err) = self
906 .send_to_peer(peer_id, route, payload.clone(), seq, timestamp, session)
907 .await
908 {
909 tracing::debug!(peer = %peer_id, "chat send failed: {err}");
910 }
911 }
912
913 Ok(())
914 }
915 fn spawn_receiver(inner: Arc<Self>, socket_idx: usize, socket: MeshSocket) {
916 tokio::spawn(async move {
917 let mut buf = [0u8; MAX_PACKET];
918 match socket {
919 MeshSocket::Udp(socket) => loop {
920 let (len, addr) = match socket.recv_from(&mut buf).await {
921 Ok(res) => res,
922 Err(_) => continue,
923 };
924 if let Err(err) = inner
925 .clone()
926 .handle_packet(socket_idx, addr, &buf[..len])
927 .await
928 {
929 tracing::warn!("mesh recv error: {err}");
930 }
931 },
932 MeshSocket::Turn(relay) => loop {
933 let res = relay.recv_from(&mut buf).await;
934 let Ok((len, addr)) = res else {
935 tracing::warn!("turn recv error");
936 continue;
937 };
938 if let Err(err) = inner
939 .clone()
940 .handle_packet(socket_idx, addr, &buf[..len])
941 .await
942 {
943 tracing::warn!("mesh recv error: {err}");
944 }
945 },
946 }
947 });
948 }
949
950 fn spawn_discovery(inner: Arc<Self>, config: DiscoveryConfig) {
951 tokio::spawn(async move {
952 loop {
953 let stream = match discover_peers(config.clone()) {
954 Ok(stream) => stream,
955 Err(err) => {
956 tracing::warn!("discovery failed: {err}");
957 tokio::time::sleep(Duration::from_secs(2)).await;
958 continue;
959 }
960 };
961 tokio::pin!(stream);
962 let mut window = tokio::time::interval(Duration::from_millis(200));
963 let window_deadline = tokio::time::Instant::now() + Duration::from_secs(3);
964 loop {
965 tokio::select! {
966 maybe = stream.next() => {
967 let Some(peer) = maybe else { break; };
968 if peer.peer_id == inner.identity.peer_id {
969 continue;
970 }
971 let already = {
972 let connections = inner.connections.lock().await;
973 connections.contains_key(&peer.addr)
974 };
975 if already {
976 continue;
977 }
978 if let Err(err) = inner.initiate_handshake(peer.addr, 0).await {
979 tracing::warn!("handshake to {} failed: {err}", peer.addr);
980 let inner_retry = inner.clone();
981 tokio::spawn(async move {
982 for attempt in 1..=10 {
983 tokio::time::sleep(Duration::from_secs(1)).await;
984 let already = {
985 let connections = inner_retry.connections.lock().await;
986 connections.contains_key(&peer.addr)
987 };
988 if already {
989 break;
990 }
991 if let Err(err) = inner_retry.initiate_handshake(peer.addr, 0).await {
992 tracing::debug!(
993 "handshake retry {} to {} failed: {err}",
994 attempt,
995 peer.addr
996 );
997 } else {
998 break;
999 }
1000 }
1001 });
1002 }
1003 }
1004 _ = window.tick() => {
1005 if tokio::time::Instant::now() >= window_deadline {
1006 break;
1007 }
1008 }
1009 }
1010 }
1011 tokio::time::sleep(Duration::from_secs(3)).await;
1012 }
1013 });
1014 }
1015
1016 fn spawn_auto_upgrade(inner: Arc<Self>) {
1017 tokio::spawn(async move {
1018 let mut tick = tokio::time::interval(Duration::from_secs(30));
1019 loop {
1020 tick.tick().await;
1021 let nat_cfg = { inner.nat_cfg.lock().await.clone() };
1022 let Some(nat_cfg) = nat_cfg else {
1023 continue;
1024 };
1025 let relayed: Vec<(PeerId, Vec<SocketAddr>)> = {
1026 let routes = inner.routes.lock().await;
1027 let addrs = inner.peer_addrs.lock().await;
1028 let candidates = inner.peer_candidates.lock().await;
1029 routes
1030 .iter()
1031 .filter_map(|(peer_id, route)| match route {
1032 PeerRoute::Relayed { .. } => {
1033 if let Some(cands) = candidates.get(peer_id) {
1034 Some((*peer_id, cands.clone()))
1035 } else {
1036 addrs
1037 .get(peer_id)
1038 .copied()
1039 .map(|addr| (*peer_id, vec![addr]))
1040 }
1041 }
1042 _ => None,
1043 })
1044 .collect()
1045 };
1046
1047 for (peer_id, addrs) in relayed {
1048 let endpoint = PeerEndpoint {
1049 peer_id,
1050 external_addrs: addrs.clone(),
1051 punch_ports: addrs.iter().map(|addr| addr.port()).collect(),
1052 };
1053 if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
1054 if let Ok(socket_idx) = inner.add_socket(socket).await {
1055 if let Err(err) = inner.initiate_handshake(remote, socket_idx).await {
1056 tracing::debug!(
1057 peer = %peer_id,
1058 "auto-upgrade handshake failed: {err}"
1059 );
1060 }
1061 }
1062 }
1063 }
1064 }
1065 });
1066 }
1067
1068 fn spawn_candidate_checks(inner: Arc<Self>) {
1069 tokio::spawn(async move {
1070 let mut tick = tokio::time::interval(Duration::from_secs(15));
1071 loop {
1072 tick.tick().await;
1073 let nat_cfg = { inner.nat_cfg.lock().await.clone() };
1074 let Some(nat_cfg) = nat_cfg else {
1075 continue;
1076 };
1077 let ice_map = { inner.peer_ice_candidates.lock().await.clone() };
1078 let candidates_map = { inner.peer_candidates.lock().await.clone() };
1079 let routes = { inner.routes.lock().await.clone() };
1080 let addrs = { inner.peer_addrs.lock().await.clone() };
1081 let now = tokio::time::Instant::now();
1082
1083 let mut targets: Vec<(PeerId, Vec<IceCandidate>)> = Vec::new();
1084 for (peer_id, route) in routes.iter() {
1085 if matches!(route, PeerRoute::Direct { .. }) {
1086 continue;
1087 }
1088 let candidates = if let Some(ice) = ice_map.get(peer_id) {
1089 let mut sorted = ice.clone();
1090 sorted.sort_by(|a, b| b.priority.cmp(&a.priority));
1091 sorted
1092 } else {
1093 candidates_map
1094 .get(peer_id)
1095 .cloned()
1096 .or_else(|| addrs.get(peer_id).copied().map(|addr| vec![addr]))
1097 .unwrap_or_default()
1098 .into_iter()
1099 .map(|addr| IceCandidate {
1100 addr,
1101 cand_type: CandidateType::Srflx,
1102 priority: 50,
1103 foundation: MeshInner::candidate_foundation(addr, CandidateType::Srflx),
1104 })
1105 .collect()
1106 };
1107 if candidates.is_empty() {
1108 continue;
1109 }
1110 targets.push((*peer_id, candidates));
1111 }
1112
1113 for (peer_id, candidates) in targets {
1114 let mut attempts = inner.candidate_attempts.lock().await;
1115 if let Some(last) = attempts.get(&peer_id) {
1116 if now.duration_since(*last) < Duration::from_secs(20) {
1117 continue;
1118 }
1119 }
1120 attempts.insert(peer_id, now);
1121 drop(attempts);
1122
1123 for candidate in candidates {
1124 let mut connected = None;
1125 match candidate.cand_type {
1126 CandidateType::Relay => {
1127 let socket_idx = {
1128 let map = inner.turn_relays.lock().await;
1129 map.get(&candidate.addr).copied()
1130 };
1131 if let Some(socket_idx) = socket_idx {
1132 if let Err(err) = inner.initiate_handshake(candidate.addr, socket_idx).await {
1133 tracing::debug!(
1134 peer = %peer_id,
1135 "turn candidate handshake failed: {err}"
1136 );
1137 } else {
1138 connected = Some(candidate.addr);
1139 }
1140 }
1141 }
1142 _ => {
1143 let endpoint = PeerEndpoint {
1144 peer_id,
1145 external_addrs: vec![candidate.addr],
1146 punch_ports: vec![candidate.addr.port()],
1147 };
1148 if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
1149 if let Ok(socket_idx) = inner.add_socket(socket).await {
1150 if let Err(err) = inner.initiate_handshake(remote, socket_idx).await {
1151 tracing::debug!(
1152 peer = %peer_id,
1153 "candidate check handshake failed: {err}"
1154 );
1155 } else {
1156 connected = Some(remote);
1157 }
1158 }
1159 }
1160 }
1161 }
1162 if let Some(remote) = connected {
1163 let msg = ControlMessage::IceCheck {
1164 session: SessionId::NONE,
1165 tie_breaker: rand::random::<u64>(),
1166 candidate: candidate.clone(),
1167 };
1168 let _ = inner.send_control_to_peer(peer_id, msg, SessionId::NONE).await;
1169 if remote == candidate.addr {
1170 break;
1171 }
1172 }
1173 }
1174 }
1175 }
1176 });
1177 }
1178
1179 fn spawn_pinger(inner: Arc<Self>) {
1180 tokio::spawn(async move {
1181 let mut tick = tokio::time::interval(Duration::from_secs(2));
1182 let mut nonce: u64 = 1;
1183 loop {
1184 tick.tick().await;
1185 let peers: Vec<PeerId> = {
1186 let routes = inner.routes.lock().await;
1187 routes.keys().copied().collect()
1188 };
1189 for peer_id in peers {
1190 if peer_id == inner.identity.peer_id {
1191 continue;
1192 }
1193 let ping = ControlMessage::Ping {
1194 nonce,
1195 sent_at_ms: now_timestamp(),
1196 };
1197 nonce = nonce.wrapping_add(1);
1198 let _ = inner
1199 .send_control_to_peer(peer_id, ping, SessionId::NONE)
1200 .await;
1201 }
1202 }
1203 });
1204 }
1205
1206 fn spawn_rekey(inner: Arc<Self>) {
1207 tokio::spawn(async move {
1208 let interval = inner.rekey_interval_secs.unwrap_or(0);
1209 if interval == 0 {
1210 return;
1211 }
1212 let mut tick = tokio::time::interval(Duration::from_secs(interval));
1213 loop {
1214 tick.tick().await;
1215 let routes = inner.routes.lock().await.clone();
1216 let peers = inner.peers_by_id.lock().await.clone();
1217 for (peer_id, route) in routes {
1218 if !matches!(route, PeerRoute::Direct { .. }) {
1219 continue;
1220 }
1221 if let Some(addr) = peers.get(&peer_id).copied() {
1222 if let Err(err) = inner.initiate_handshake(addr, 0).await {
1223 tracing::debug!(peer = %peer_id, "rekey handshake failed: {err}");
1224 }
1225 }
1226 }
1227 }
1228 });
1229 }
1230
1231 fn spawn_scaling(inner: Arc<Self>) {
1232 tokio::spawn(async move {
1233 let mut tick = tokio::time::interval(Duration::from_secs(10));
1234 loop {
1235 tick.tick().await;
1236 let Some(max_direct) = inner.max_direct_peers else {
1237 continue;
1238 };
1239 let routes = inner.routes.lock().await.clone();
1240 let relay_candidates = inner.relay_candidates.lock().await.clone();
1241 let peer_addrs = inner.peer_addrs.lock().await.clone();
1242 let mut direct_peers: Vec<PeerId> = routes
1243 .iter()
1244 .filter_map(|(peer_id, route)| match route {
1245 PeerRoute::Direct { .. } => Some(*peer_id),
1246 _ => None,
1247 })
1248 .collect();
1249
1250 if direct_peers.len() <= max_direct {
1251 continue;
1252 }
1253
1254 direct_peers.sort_by_key(|peer| peer.to_hex());
1255 let mut routes_guard = inner.routes.lock().await;
1256 let mut direct_count = direct_peers.len();
1257 for peer_id in direct_peers {
1258 if direct_count <= max_direct {
1259 break;
1260 }
1261 let Some(relay) = relay_candidates.get(&peer_id).copied() else {
1262 continue;
1263 };
1264 if matches!(routes_guard.get(&peer_id), Some(PeerRoute::Relayed { .. })) {
1265 continue;
1266 }
1267 routes_guard.insert(peer_id, PeerRoute::Relayed { via: relay });
1268 direct_count = direct_count.saturating_sub(1);
1269 drop(routes_guard);
1270 let _ = inner
1271 .events_tx
1272 .send(MeshEvent::RouteUpdated {
1273 peer_id,
1274 route: PeerRoute::Relayed { via: relay },
1275 })
1276 .await;
1277 tracing::info!(peer = %peer_id, relay = %relay, "scaled route to relay");
1278 routes_guard = inner.routes.lock().await;
1279 }
1280
1281 if direct_count <= max_direct {
1282 continue;
1283 }
1284
1285 drop(routes_guard);
1288
1289 if direct_count < max_direct {
1291 let mut routes_guard = inner.routes.lock().await;
1292 for (peer_id, route) in routes.iter() {
1293 if !matches!(route, PeerRoute::Relayed { .. }) {
1294 continue;
1295 }
1296 if let Some(addr) = peer_addrs.get(peer_id).copied() {
1297 routes_guard.insert(*peer_id, PeerRoute::Direct { addr });
1298 let _ = inner
1299 .events_tx
1300 .send(MeshEvent::RouteUpdated {
1301 peer_id: *peer_id,
1302 route: PeerRoute::Direct { addr },
1303 })
1304 .await;
1305 tracing::info!(peer = %peer_id, "scaled route back to direct");
1306 }
1307 }
1308 }
1309 }
1310 });
1311 }
1312
1313 async fn add_socket(self: &Arc<Self>, socket: UdpSocket) -> Result<usize> {
1314 let socket = Arc::new(socket);
1315 let mut sockets = self.sockets.lock().await;
1316 sockets.push(MeshSocket::Udp(socket.clone()));
1317 let idx = sockets.len() - 1;
1318 drop(sockets);
1319 Self::spawn_receiver(self.clone(), idx, MeshSocket::Udp(socket));
1320 Ok(idx)
1321 }
1322
1323 async fn add_turn_relay(self: &Arc<Self>, relay: Arc<TurnRelay>) -> Result<usize> {
1324 let mut sockets = self.sockets.lock().await;
1325 sockets.push(MeshSocket::Turn(relay.clone()));
1326 let idx = sockets.len() - 1;
1327 drop(sockets);
1328 Self::spawn_receiver(self.clone(), idx, MeshSocket::Turn(relay));
1329 Ok(idx)
1330 }
1331
1332 async fn initiate_handshake(&self, addr: SocketAddr, socket_idx: usize) -> Result<()> {
1333 let builder = rift_core::noise::noise_builder();
1334 let static_kp = builder.generate_keypair()?;
1335 let mut hs = builder
1336 .local_private_key(&static_kp.private)
1337 .build_initiator()?;
1338
1339 let mut buf = [0u8; MAX_PACKET];
1340 let len = hs.write_message(&[], &mut buf)?;
1341 self.send_raw(socket_idx, addr, &buf[..len]).await?;
1342
1343 let mut pending = self.pending.lock().await;
1344 pending.insert(
1345 PendingKey { socket_idx, addr },
1346 PendingHandshake::InitiatorAwait2(hs),
1347 );
1348 Ok(())
1349 }
1350
1351 async fn handle_packet(self: Arc<Self>, socket_idx: usize, addr: SocketAddr, data: &[u8]) -> Result<()> {
1352 if !self.allow_packet(addr).await {
1353 return Ok(());
1354 }
1355 #[cfg(feature = "predictive-rendezvous")]
1356 if self.try_handle_pr_probe(addr, data).await? {
1357 return Ok(());
1358 }
1359 if self.try_handle_pending(socket_idx, addr, data).await? {
1360 return Ok(());
1361 }
1362
1363 let maybe_session = {
1364 let mut connections = self.connections.lock().await;
1365 connections.get_mut(&addr).map(|conn| {
1366 let mut out = vec![0u8; MAX_PACKET];
1367 let len = conn.session.decrypt(data, &mut out)?;
1368 Ok::<_, anyhow::Error>(out[..len].to_vec())
1369 })
1370 };
1371
1372 let plaintext = match maybe_session {
1373 Some(res) => res?,
1374 None => {
1375 self.start_responder(socket_idx, addr, data).await?;
1376 return Ok(());
1377 }
1378 };
1379
1380 self.record_recv(addr, data.len()).await;
1381 let (header, payload) = decode_frame(&plaintext)?;
1382 self.update_link_stats(header.source, header.seq, header.timestamp)
1383 .await;
1384 self.handle_frame(addr, header, payload).await?;
1385 Ok(())
1386 }
1387
1388 async fn try_handle_pending(
1389 &self,
1390 socket_idx: usize,
1391 addr: SocketAddr,
1392 data: &[u8],
1393 ) -> Result<bool> {
1394 let mut pending = self.pending.lock().await;
1395 let key = PendingKey { socket_idx, addr };
1396 let Some(state) = pending.remove(&key) else {
1397 return Ok(false);
1398 };
1399 drop(pending);
1400
1401 match state {
1402 PendingHandshake::InitiatorAwait2(mut hs) => {
1403 let mut out = [0u8; MAX_PACKET];
1404 hs.read_message(data, &mut out)?;
1405 let len = hs.write_message(&[], &mut out)?;
1406 self.send_raw(socket_idx, addr, &out[..len]).await?;
1407 let transport = hs.into_transport_mode()?;
1408 self.install_connection(addr, transport, socket_idx).await?;
1409 }
1410 PendingHandshake::ResponderAwait3(mut hs) => {
1411 let mut out = [0u8; MAX_PACKET];
1412 hs.read_message(data, &mut out)?;
1413 let transport = hs.into_transport_mode()?;
1414 self.install_connection(addr, transport, socket_idx).await?;
1415 }
1416 }
1417
1418 Ok(true)
1419 }
1420
1421 async fn start_responder(
1422 &self,
1423 socket_idx: usize,
1424 addr: SocketAddr,
1425 first_msg: &[u8],
1426 ) -> Result<()> {
1427 let builder = rift_core::noise::noise_builder();
1428 let static_kp = builder.generate_keypair()?;
1429 let mut hs = builder
1430 .local_private_key(&static_kp.private)
1431 .build_responder()?;
1432
1433 let mut out = [0u8; MAX_PACKET];
1434 hs.read_message(first_msg, &mut out)?;
1435 let len = hs.write_message(&[], &mut out)?;
1436 self.send_raw(socket_idx, addr, &out[..len]).await?;
1437
1438 let mut pending = self.pending.lock().await;
1439 pending.insert(
1440 PendingKey { socket_idx, addr },
1441 PendingHandshake::ResponderAwait3(hs),
1442 );
1443 Ok(())
1444 }
1445
1446 async fn install_connection(
1447 &self,
1448 addr: SocketAddr,
1449 transport: snow::TransportState,
1450 socket_idx: usize,
1451 ) -> Result<()> {
1452 let conn = PeerConnection {
1453 peer_id: None,
1454 session: rift_core::noise::NoiseSession::new(transport),
1455 socket_idx,
1456 authenticated: !self.auth_required,
1457 };
1458 let mut connections = self.connections.lock().await;
1459 connections.insert(addr, conn);
1460 drop(connections);
1461
1462 if let Some(token) = self.auth_token.clone() {
1463 let auth = ControlMessage::Auth { token };
1464 let seq = self.next_control_seq().await;
1465 let _ = self
1466 .send_payload(
1467 addr,
1468 RiftPayload::Control(auth),
1469 seq,
1470 now_timestamp(),
1471 SessionId::NONE,
1472 None,
1473 )
1474 .await;
1475 }
1476
1477 let join = ControlMessage::Join {
1478 peer_id: self.identity.peer_id,
1479 display_name: None,
1480 };
1481 let seq = self.next_control_seq().await;
1482 self.send_payload(
1483 addr,
1484 RiftPayload::Control(join),
1485 seq,
1486 now_timestamp(),
1487 SessionId::NONE,
1488 None,
1489 )
1490 .await?;
1491
1492 let state = ControlMessage::PeerState {
1493 peer_id: self.identity.peer_id,
1494 relay_capable: self.relay_capable,
1495 };
1496 let seq = self.next_control_seq().await;
1497 self.send_payload(
1498 addr,
1499 RiftPayload::Control(state),
1500 seq,
1501 now_timestamp(),
1502 SessionId::NONE,
1503 None,
1504 )
1505 .await?;
1506
1507 self.send_peer_list(addr).await?;
1508 self.send_hello(addr).await?;
1509 Ok(())
1510 }
1511
1512 async fn send_peer_list(&self, addr: SocketAddr) -> Result<()> {
1513 let peers: Vec<PeerInfo> = {
1514 let peers_by_id = self.peers_by_id.lock().await;
1515 let peer_caps = self.peer_caps.lock().await;
1516 let peer_candidates = self.peer_candidates.lock().await;
1517 peers_by_id
1518 .iter()
1519 .map(|(peer_id, addr)| {
1520 let mut addrs = peer_candidates
1521 .get(peer_id)
1522 .cloned()
1523 .unwrap_or_else(|| vec![*addr]);
1524 if addrs.is_empty() {
1525 addrs.push(*addr);
1526 }
1527 addrs.sort();
1528 addrs.dedup();
1529 PeerInfo {
1530 peer_id: *peer_id,
1531 addr: *addr,
1532 addrs,
1533 relay_capable: *peer_caps.get(peer_id).unwrap_or(&false),
1534 }
1535 })
1536 .collect()
1537 };
1538 let msg = RiftPayload::Control(ControlMessage::PeerList { peers });
1539 let seq = self.next_control_seq().await;
1540 self.send_payload(addr, msg, seq, now_timestamp(), SessionId::NONE, None)
1541 .await
1542 }
1543
1544 async fn handle_capabilities(&self, peer_id: PeerId, capabilities: Capabilities) -> Result<()> {
1545 {
1546 let mut caps = self.peer_capabilities.lock().await;
1547 caps.insert(peer_id, capabilities.clone());
1548 }
1549
1550 let session = self.negotiate_session_config(&capabilities).await;
1551 {
1552 let mut sessions = self.peer_session.lock().await;
1553 sessions.insert(peer_id, session.clone());
1554 }
1555 self.update_group_codec().await;
1556
1557 let _ = self
1558 .events_tx
1559 .send(MeshEvent::PeerCapabilities {
1560 peer_id,
1561 capabilities: capabilities.clone(),
1562 })
1563 .await;
1564 let _ = self
1565 .events_tx
1566 .send(MeshEvent::PeerSessionConfig {
1567 peer_id,
1568 codec: session.codec,
1569 frame_ms: session.frame_ms,
1570 })
1571 .await;
1572 self.maybe_start_e2ee(peer_id, &capabilities).await?;
1573 Ok(())
1574 }
1575
1576 async fn maybe_start_e2ee(&self, peer_id: PeerId, capabilities: &Capabilities) -> Result<()> {
1577 if self.e2ee_key.is_some() {
1578 return Ok(());
1579 }
1580 let local_supports = {
1581 let preferred = self.preferred_features.lock().await;
1582 if preferred.is_empty() {
1583 true
1584 } else {
1585 preferred.contains(&FeatureFlag::E2EE)
1586 }
1587 };
1588 if !local_supports || !capabilities.features.contains(&FeatureFlag::E2EE) {
1589 return Ok(());
1590 }
1591 let _ = self.start_e2ee(peer_id, self.channel_session).await;
1592 Ok(())
1593 }
1594
1595 async fn negotiate_session_config(&self, remote: &Capabilities) -> SessionConfig {
1596 let preferred = {
1597 let prefs = self.preferred_codecs.lock().await;
1598 if prefs.is_empty() {
1599 vec![CodecId::Opus, CodecId::PCM16]
1600 } else {
1601 prefs.clone()
1602 }
1603 };
1604 let codec = preferred
1605 .into_iter()
1606 .find(|codec| remote.audio_codecs.contains(codec))
1607 .unwrap_or(CodecId::Opus);
1608 let frame_ms = remote
1609 .preferred_frame_duration_ms
1610 .unwrap_or(20)
1611 .min(20);
1612 SessionConfig { codec, frame_ms }
1613 }
1614
1615 async fn update_group_codec(&self) {
1616 let caps = self.peer_capabilities.lock().await;
1617 let preferred = {
1618 let prefs = self.preferred_codecs.lock().await;
1619 if prefs.is_empty() {
1620 vec![CodecId::Opus, CodecId::PCM16]
1621 } else {
1622 prefs.clone()
1623 }
1624 };
1625 let mut selected = CodecId::Opus;
1626 for codec in preferred {
1627 let mut all_support = true;
1628 for cap in caps.values() {
1629 if !cap.audio_codecs.contains(&codec) {
1630 all_support = false;
1631 break;
1632 }
1633 }
1634 if all_support {
1635 selected = codec;
1636 break;
1637 }
1638 }
1639 let mut group = self.group_codec.lock().await;
1640 if *group != selected {
1641 *group = selected;
1642 let _ = self.events_tx.send(MeshEvent::GroupCodec(selected)).await;
1643 }
1644 }
1645
1646 async fn send_hello(&self, addr: SocketAddr) -> Result<()> {
1647 let caps = self.default_capabilities().await;
1648 let public_key = self.identity.keypair.public.to_bytes().to_vec();
1649 let candidates = {
1650 let self_candidates = self.self_candidates.lock().await;
1651 self_candidates.clone()
1652 };
1653 let msg = RiftPayload::Control(ControlMessage::Hello {
1654 peer_id: self.identity.peer_id,
1655 public_key,
1656 capabilities: caps,
1657 candidates,
1658 });
1659 let seq = self.next_control_seq().await;
1660 self.send_payload(addr, msg, seq, now_timestamp(), SessionId::NONE, None)
1661 .await
1662 }
1663
1664 async fn send_ice_candidates(&self, peer_id: PeerId, session: SessionId) -> Result<()> {
1665 let candidates = {
1666 let self_ice = self.self_ice_candidates.lock().await;
1667 self_ice.clone()
1668 };
1669 if candidates.is_empty() {
1670 return Ok(());
1671 }
1672 let msg = ControlMessage::IceCandidates {
1673 peer_id: self.identity.peer_id,
1674 session,
1675 candidates,
1676 };
1677 self.send_control_to_peer(peer_id, msg, session).await
1678 }
1679
1680 async fn default_capabilities(&self) -> Capabilities {
1681 let preferred = self.preferred_codecs().await;
1682 let codecs = if preferred.is_empty() {
1683 vec![CodecId::Opus, CodecId::PCM16]
1684 } else {
1685 preferred
1686 };
1687 let features = {
1688 let preferred = self.preferred_features.lock().await;
1689 if preferred.is_empty() {
1690 vec![
1691 FeatureFlag::Voice,
1692 FeatureFlag::Text,
1693 FeatureFlag::Relay,
1694 FeatureFlag::E2EE,
1695 ]
1696 } else {
1697 preferred.clone()
1698 }
1699 };
1700 Capabilities {
1701 supported_versions: vec![ProtocolVersion::V2, ProtocolVersion::V1],
1702 audio_codecs: codecs,
1703 features,
1704 max_bitrate: Some(96_000),
1705 preferred_frame_duration_ms: Some(20),
1706 }
1707 }
1708
1709 async fn preferred_codecs(&self) -> Vec<CodecId> {
1710 self.preferred_codecs.lock().await.clone()
1711 }
1712
1713 async fn set_preferred_codecs(&self, codecs: Vec<CodecId>) {
1714 let mut preferred = self.preferred_codecs.lock().await;
1715 *preferred = codecs;
1716 }
1717
1718 async fn set_preferred_features(&self, features: Vec<FeatureFlag>) {
1719 let mut preferred = self.preferred_features.lock().await;
1720 *preferred = features;
1721 }
1722
1723 async fn next_control_seq(&self) -> u32 {
1724 let mut seq = self.control_seq.lock().await;
1725 let current = *seq;
1726 *seq = seq.wrapping_add(1);
1727 current
1728 }
1729
1730 async fn send_control_to_peer(
1731 &self,
1732 peer_id: PeerId,
1733 msg: ControlMessage,
1734 session: SessionId,
1735 ) -> Result<()> {
1736 let payload = RiftPayload::Control(msg);
1737 let routes = self.routes_snapshot().await;
1738 let Some(route) = routes.get(&peer_id).cloned() else {
1739 return Err(anyhow!("missing route"));
1740 };
1741 let seq = self.next_control_seq().await;
1742 self.send_to_peer(peer_id, route, payload, seq, now_timestamp(), session)
1743 .await
1744 }
1745
1746 async fn start_e2ee(&self, peer_id: PeerId, session: SessionId) -> Result<()> {
1747 if peer_id == self.identity.peer_id {
1748 return Ok(());
1749 }
1750 if self.identity.peer_id.0 > peer_id.0 {
1752 return Ok(());
1753 }
1754 {
1755 let keys = self.e2ee_keys.lock().await;
1756 if keys.contains_key(&(peer_id, session)) {
1757 return Ok(());
1758 }
1759 }
1760 {
1761 let pending = self.e2ee_pending.lock().await;
1762 if pending.contains_key(&(peer_id, session)) {
1763 return Ok(());
1764 }
1765 }
1766 let peer_public = {
1767 let keys = self.peer_public_keys.lock().await;
1768 keys.get(&peer_id).cloned()
1769 };
1770 let Some(peer_public) = peer_public else {
1771 return Ok(());
1772 };
1773 let Some(_peer_public) = ed25519_public_from_bytes(&peer_public) else {
1774 return Ok(());
1775 };
1776 let keypair = generate_e2ee_keypair();
1777 let signature = sign_e2ee_public(&self.identity, &session.0, &keypair.public);
1778 let public_key = keypair.public.to_bytes();
1779 {
1780 let mut pending = self.e2ee_pending.lock().await;
1781 pending.insert((peer_id, session), keypair);
1782 }
1783 let msg = ControlMessage::E2eeInit {
1784 session,
1785 from: self.identity.peer_id,
1786 public_key,
1787 signature,
1788 };
1789 self.send_control_to_peer(peer_id, msg, session).await?;
1790 tracing::info!(peer = %peer_id, session = ?session, "e2ee init sent");
1791 Ok(())
1792 }
1793
1794 async fn handle_e2ee_init(
1795 &self,
1796 peer_id: PeerId,
1797 session: SessionId,
1798 public_key: [u8; 32],
1799 signature: Vec<u8>,
1800 ) -> Result<()> {
1801 let should_initiate = self.identity.peer_id.0 < peer_id.0;
1802 {
1803 let keys = self.e2ee_keys.lock().await;
1804 if keys.contains_key(&(peer_id, session)) {
1805 return Ok(());
1806 }
1807 }
1808 if should_initiate {
1809 let pending = self.e2ee_pending.lock().await;
1810 if pending.contains_key(&(peer_id, session)) {
1811 tracing::info!(
1812 peer = %peer_id,
1813 session = ?session,
1814 "e2ee init ignored (local is initiator)"
1815 );
1816 return Ok(());
1817 }
1818 } else {
1819 let mut pending = self.e2ee_pending.lock().await;
1820 pending.remove(&(peer_id, session));
1821 }
1822 let peer_public = {
1823 let keys = self.peer_public_keys.lock().await;
1824 keys.get(&peer_id).cloned()
1825 };
1826 let Some(peer_public) = peer_public else {
1827 return Ok(());
1828 };
1829 let Some(peer_public) = ed25519_public_from_bytes(&peer_public) else {
1830 return Ok(());
1831 };
1832 let remote_public = public_key_from_bytes(public_key);
1833 if !verify_e2ee_public(&peer_public, &session.0, &remote_public, &signature) {
1834 tracing::warn!(peer = %peer_id, "e2ee init signature invalid");
1835 return Ok(());
1836 }
1837 let keypair = generate_e2ee_keypair();
1838 let shared = derive_e2ee_shared_key(&keypair.secret, &remote_public, &session.0);
1839 let shared_fingerprint = hex::encode(&sha2::Sha256::digest(shared)[..4]);
1840 {
1841 let mut keys = self.e2ee_keys.lock().await;
1842 keys.insert((peer_id, session), shared);
1843 }
1844 {
1845 let mut ready = self.e2ee_ready.lock().await;
1846 let entry = ready
1847 .entry((peer_id, session))
1848 .or_insert(E2eeReadyState {
1849 local_ready: false,
1850 remote_ready: false,
1851 });
1852 entry.local_ready = true;
1853 }
1854 let response = ControlMessage::E2eeResp {
1855 session,
1856 from: self.identity.peer_id,
1857 public_key: keypair.public.to_bytes(),
1858 signature: sign_e2ee_public(&self.identity, &session.0, &keypair.public),
1859 };
1860 self.send_control_to_peer(peer_id, response, session).await?;
1861 tracing::info!(
1862 peer = %peer_id,
1863 session = ?session,
1864 key = %shared_fingerprint,
1865 "e2ee key derived (resp)"
1866 );
1867 let ready_msg = ControlMessage::EncryptedReady { session, alg: 1 };
1868 let _ = self.send_control_to_peer(peer_id, ready_msg, session).await;
1869 tracing::info!(peer = %peer_id, session = ?session, "e2ee response sent");
1870 Ok(())
1871 }
1872
1873 async fn handle_e2ee_resp(
1874 &self,
1875 peer_id: PeerId,
1876 session: SessionId,
1877 public_key: [u8; 32],
1878 signature: Vec<u8>,
1879 ) -> Result<()> {
1880 {
1881 let keys = self.e2ee_keys.lock().await;
1882 if keys.contains_key(&(peer_id, session)) {
1883 return Ok(());
1884 }
1885 }
1886 let pending = {
1887 let mut pending = self.e2ee_pending.lock().await;
1888 pending.remove(&(peer_id, session))
1889 };
1890 let Some(pending) = pending else {
1891 return Ok(());
1892 };
1893 let peer_public = {
1894 let keys = self.peer_public_keys.lock().await;
1895 keys.get(&peer_id).cloned()
1896 };
1897 let Some(peer_public) = peer_public else {
1898 return Ok(());
1899 };
1900 let Some(peer_public) = ed25519_public_from_bytes(&peer_public) else {
1901 return Ok(());
1902 };
1903 let remote_public = public_key_from_bytes(public_key);
1904 if !verify_e2ee_public(&peer_public, &session.0, &remote_public, &signature) {
1905 tracing::warn!(peer = %peer_id, "e2ee response signature invalid");
1906 return Ok(());
1907 }
1908 let shared = derive_e2ee_shared_key(&pending.secret, &remote_public, &session.0);
1909 let shared_fingerprint = hex::encode(&sha2::Sha256::digest(shared)[..4]);
1910 {
1911 let mut keys = self.e2ee_keys.lock().await;
1912 keys.insert((peer_id, session), shared);
1913 }
1914 {
1915 let mut ready = self.e2ee_ready.lock().await;
1916 let entry = ready
1917 .entry((peer_id, session))
1918 .or_insert(E2eeReadyState {
1919 local_ready: false,
1920 remote_ready: false,
1921 });
1922 entry.local_ready = true;
1923 }
1924 let ready_msg = ControlMessage::EncryptedReady { session, alg: 1 };
1925 let _ = self.send_control_to_peer(peer_id, ready_msg, session).await;
1926 tracing::info!(
1927 peer = %peer_id,
1928 session = ?session,
1929 key = %shared_fingerprint,
1930 "e2ee key derived (init)"
1931 );
1932 tracing::info!(peer = %peer_id, session = ?session, "e2ee key established");
1933 Ok(())
1934 }
1935
1936 async fn maybe_encrypt_payload(
1937 &self,
1938 payload: RiftPayload,
1939 header: &RiftFrameHeader,
1940 peer_id: Option<PeerId>,
1941 session: SessionId,
1942 ) -> Result<RiftPayload> {
1943 match payload {
1944 RiftPayload::Relay { target, inner } => {
1945 if should_encrypt(&inner) {
1946 let use_group = self.should_use_group_key(session).await
1947 && matches!(*inner, RiftPayload::Voice(_));
1948 let key = if use_group {
1949 self.e2ee_key.ok_or_else(|| anyhow!("missing e2ee key"))?
1950 } else {
1951 let keys = self.e2ee_keys.lock().await;
1952 peer_id
1953 .and_then(|peer| keys.get(&(peer, session)).copied())
1954 .or(self.e2ee_key)
1955 .ok_or_else(|| anyhow!("missing e2ee key"))?
1956 };
1957 if !use_group {
1958 if let Some(peer) = peer_id {
1959 let ready = self.e2ee_ready.lock().await;
1960 let ready = ready
1961 .get(&(peer, session))
1962 .map(|state| state.local_ready && state.remote_ready)
1963 .unwrap_or(false);
1964 if !ready {
1965 tracing::debug!(
1966 target: "security",
1967 peer = %peer,
1968 session = ?session,
1969 "e2ee not ready; sending plaintext"
1970 );
1971 return Ok(RiftPayload::Relay { target, inner });
1972 }
1973 }
1974 }
1975 let encrypted = RiftPayload::Encrypted(
1976 encrypt_payload_with_key(&key, header, &inner)?
1977 );
1978 Ok(RiftPayload::Relay {
1979 target,
1980 inner: Box::new(encrypted),
1981 })
1982 } else {
1983 Ok(RiftPayload::Relay { target, inner })
1984 }
1985 }
1986 other => {
1987 if !should_encrypt(&other) {
1988 return Ok(other);
1989 }
1990 let use_group = self.should_use_group_key(session).await
1991 && matches!(other, RiftPayload::Voice(_));
1992 let key = if use_group {
1993 self.e2ee_key.ok_or_else(|| anyhow!("missing e2ee key"))?
1994 } else {
1995 let keys = self.e2ee_keys.lock().await;
1996 peer_id
1997 .and_then(|peer| keys.get(&(peer, session)).copied())
1998 .or(self.e2ee_key)
1999 .ok_or_else(|| anyhow!("missing e2ee key"))?
2000 };
2001 if !use_group {
2002 if let Some(peer) = peer_id {
2003 let ready = self.e2ee_ready.lock().await;
2004 let ready = ready
2005 .get(&(peer, session))
2006 .map(|state| state.local_ready && state.remote_ready)
2007 .unwrap_or(false);
2008 if !ready {
2009 tracing::debug!(
2010 target: "security",
2011 peer = %peer,
2012 session = ?session,
2013 "e2ee not ready; sending plaintext"
2014 );
2015 return Ok(other);
2016 }
2017 }
2018 }
2019 let encrypted = encrypt_payload_with_key(&key, header, &other)?;
2020 Ok(RiftPayload::Encrypted(encrypted))
2021 }
2022 }
2023 }
2024
2025 async fn decrypt_payload(
2026 &self,
2027 header: &RiftFrameHeader,
2028 encrypted: EncryptedPayload,
2029 peer_id: PeerId,
2030 session: SessionId,
2031 ) -> Result<RiftPayload> {
2032 let key = {
2033 let keys = self.e2ee_keys.lock().await;
2034 keys.get(&(peer_id, session)).copied()
2035 }
2036 .or(self.e2ee_key)
2037 .ok_or_else(|| anyhow!("missing e2ee key"))?;
2038 decrypt_payload_with_key(&key, header, encrypted)
2039 }
2040
2041 async fn update_link_stats(&self, peer_id: PeerId, seq: u32, sent_ms: u64) {
2042 if peer_id == self.identity.peer_id {
2043 return;
2044 }
2045 let arrival_ms = now_timestamp();
2046 let mut stats_map = self.link_stats.lock().await;
2047 let state = stats_map.entry(peer_id).or_insert_with(LinkStatsState::new);
2048 state.update_on_receive(seq, sent_ms, arrival_ms);
2049 let should_emit = state.last_emit.elapsed() >= Duration::from_millis(500);
2050 let stats = state.snapshot();
2051 if should_emit {
2052 state.last_emit = tokio::time::Instant::now();
2053 }
2054 drop(stats_map);
2055 if should_emit {
2056 let global = self.global_stats_snapshot().await;
2057 let _ = self
2058 .events_tx
2059 .send(MeshEvent::StatsUpdate {
2060 peer: peer_id,
2061 stats,
2062 global,
2063 })
2064 .await;
2065 metrics::observe_histogram(
2066 "rift_rtt_ms",
2067 &[("peer", &peer_id.to_hex())],
2068 stats.rtt_ms as f64,
2069 );
2070 metrics::set_gauge(
2071 "rift_packet_loss",
2072 &[("peer", &peer_id.to_hex())],
2073 stats.loss as f64,
2074 );
2075 metrics::set_gauge(
2076 "rift_jitter_ms",
2077 &[("peer", &peer_id.to_hex())],
2078 stats.jitter_ms as f64,
2079 );
2080 self.consider_route(peer_id, stats).await;
2081 }
2082 }
2083
2084 async fn update_rtt(&self, peer_id: PeerId, sent_at_ms: u64) {
2085 let now_ms = now_timestamp();
2086 let rtt_ms = now_ms.saturating_sub(sent_at_ms) as f32;
2087 let mut stats_map = self.link_stats.lock().await;
2088 let state = stats_map.entry(peer_id).or_insert_with(LinkStatsState::new);
2089 state.update_rtt(rtt_ms);
2090 let stats = state.snapshot();
2091 drop(stats_map);
2092 let global = self.global_stats_snapshot().await;
2093 let _ = self
2094 .events_tx
2095 .send(MeshEvent::StatsUpdate {
2096 peer: peer_id,
2097 stats,
2098 global,
2099 })
2100 .await;
2101 metrics::observe_histogram(
2102 "rift_rtt_ms",
2103 &[("peer", &peer_id.to_hex())],
2104 stats.rtt_ms as f64,
2105 );
2106 self.consider_route(peer_id, stats).await;
2107 }
2108
2109 async fn allow_packet(&self, addr: SocketAddr) -> bool {
2110 let now = Instant::now();
2111 let mut limits = self.rate_limits.lock().await;
2112 let entry = limits.entry(addr).or_insert_with(|| RateLimitState::new(now));
2113 let allowed = entry.allow(now);
2114 if !allowed {
2115 if now.duration_since(entry.last_drop) >= RATE_LIMIT_WINDOW {
2116 entry.last_drop = now;
2117 tracing::warn!(target = "security", %addr, "rate limit exceeded");
2118 }
2119 metrics::inc_counter("rift_packets_dropped", &[("reason", "rate_limit")]);
2120 }
2121 allowed
2122 }
2123
2124 #[cfg(feature = "predictive-rendezvous")]
2125 async fn try_handle_pr_probe(&self, addr: SocketAddr, data: &[u8]) -> Result<bool> {
2126 let parsed = match parse_probe_payload(data) {
2127 Ok(parsed) => parsed,
2128 Err(_) => return Ok(false),
2129 };
2130
2131 let rendezvous_id = parsed.rendezvous_id;
2132 let tx = {
2133 let sessions = self.pr_sessions.lock().await;
2134 sessions.get(&rendezvous_id).map(|session| session.tx.clone())
2135 };
2136
2137 let Some(tx) = tx else {
2138 return Ok(false);
2139 };
2140
2141 if tx.send((addr, parsed)).await.is_err() {
2142 let mut sessions = self.pr_sessions.lock().await;
2143 sessions.remove(&rendezvous_id);
2144 }
2145
2146 Ok(true)
2147 }
2148
2149 #[cfg(feature = "predictive-rendezvous")]
2150 async fn register_pr_session(
2151 &self,
2152 rendezvous_id: u64,
2153 tx: mpsc::Sender<(SocketAddr, ParsedProbe)>,
2154 ) -> Result<()> {
2155 let mut sessions = self.pr_sessions.lock().await;
2156 if sessions.contains_key(&rendezvous_id) {
2157 return Err(anyhow!("predictive rendezvous session already exists"));
2158 }
2159 sessions.insert(rendezvous_id, PrSession { tx });
2160 Ok(())
2161 }
2162
2163 #[cfg(feature = "predictive-rendezvous")]
2164 async fn unregister_pr_session(&self, rendezvous_id: u64) {
2165 let mut sessions = self.pr_sessions.lock().await;
2166 sessions.remove(&rendezvous_id);
2167 }
2168
2169 #[cfg(feature = "predictive-rendezvous")]
2170 async fn primary_local_port(&self) -> Result<u16> {
2171 let sockets = self.sockets.lock().await;
2172 match sockets.first() {
2173 Some(MeshSocket::Udp(sock)) => Ok(sock.local_addr()?.port()),
2174 Some(MeshSocket::Turn(relay)) => Ok(relay.relay_addr().port()),
2175 None => Err(anyhow!("missing socket")),
2176 }
2177 }
2178
2179 async fn record_send(&self, addr: SocketAddr, bytes: usize) {
2180 let peer_id = {
2181 let connections = self.connections.lock().await;
2182 connections.get(&addr).and_then(|conn| conn.peer_id)
2183 };
2184 let mut global = self.global_traffic.lock().await;
2185 global.packets_sent = global.packets_sent.saturating_add(1);
2186 global.bytes_sent = global.bytes_sent.saturating_add(bytes as u64);
2187 drop(global);
2188 metrics::inc_counter("rift_packets_sent", &[]);
2189 metrics::add_counter("rift_bytes_sent", &[], bytes as u64);
2190 if let Some(peer_id) = peer_id {
2191 let mut peer = self.peer_traffic.lock().await;
2192 let entry = peer.entry(peer_id).or_default();
2193 entry.packets_sent = entry.packets_sent.saturating_add(1);
2194 entry.bytes_sent = entry.bytes_sent.saturating_add(bytes as u64);
2195 drop(peer);
2196 metrics::inc_counter("rift_packets_sent", &[("peer", &peer_id.to_hex())]);
2197 metrics::add_counter("rift_bytes_sent", &[("peer", &peer_id.to_hex())], bytes as u64);
2198 }
2199 }
2200
2201 async fn record_recv(&self, addr: SocketAddr, bytes: usize) {
2202 let peer_id = {
2203 let connections = self.connections.lock().await;
2204 connections.get(&addr).and_then(|conn| conn.peer_id)
2205 };
2206 let mut global = self.global_traffic.lock().await;
2207 global.packets_received = global.packets_received.saturating_add(1);
2208 global.bytes_received = global.bytes_received.saturating_add(bytes as u64);
2209 drop(global);
2210 metrics::inc_counter("rift_packets_received", &[]);
2211 metrics::add_counter("rift_bytes_received", &[], bytes as u64);
2212 if let Some(peer_id) = peer_id {
2213 let mut peer = self.peer_traffic.lock().await;
2214 let entry = peer.entry(peer_id).or_default();
2215 entry.packets_received = entry.packets_received.saturating_add(1);
2216 entry.bytes_received = entry.bytes_received.saturating_add(bytes as u64);
2217 drop(peer);
2218 metrics::inc_counter("rift_packets_received", &[("peer", &peer_id.to_hex())]);
2219 metrics::add_counter(
2220 "rift_bytes_received",
2221 &[("peer", &peer_id.to_hex())],
2222 bytes as u64,
2223 );
2224 }
2225 }
2226
2227 async fn global_stats_snapshot(&self) -> GlobalStats {
2228 let global = self.global_traffic.lock().await.clone();
2229 let peers = self.peers_by_id.lock().await.len() + 1;
2230 let sessions = self.session_mgr.lock().await.sessions.len();
2231 GlobalStats {
2232 num_peers: peers,
2233 num_sessions: sessions,
2234 packets_sent: global.packets_sent,
2235 packets_received: global.packets_received,
2236 bytes_sent: global.bytes_sent,
2237 bytes_received: global.bytes_received,
2238 }
2239 }
2240
2241 async fn emit_global_metrics(&self) {
2242 let global = self.global_stats_snapshot().await;
2243 metrics::set_gauge("rift_number_of_peers", &[], global.num_peers as f64);
2244 metrics::set_gauge("rift_number_of_sessions", &[], global.num_sessions as f64);
2245 }
2246
2247 async fn consider_route(&self, peer_id: PeerId, stats: LinkStats) {
2248 let qos = &self.qos;
2249 let prefer_relay =
2250 stats.loss > qos.packet_loss_tolerance || stats.rtt_ms > qos.max_latency_ms as f32;
2251 if prefer_relay {
2252 let relay = {
2253 let relays = self.relay_candidates.lock().await;
2254 relays.get(&peer_id).copied()
2255 };
2256 if let Some(relay) = relay {
2257 let relay_ok = {
2258 let peers = self.peers_by_id.lock().await;
2259 peers.contains_key(&relay)
2260 };
2261 if relay_ok {
2262 let mut routes = self.routes.lock().await;
2263 if !matches!(routes.get(&peer_id), Some(PeerRoute::Relayed { .. })) {
2264 routes.insert(peer_id, PeerRoute::Relayed { via: relay });
2265 drop(routes);
2266 let _ = self
2267 .events_tx
2268 .send(MeshEvent::RouteUpdated {
2269 peer_id,
2270 route: PeerRoute::Relayed { via: relay },
2271 })
2272 .await;
2273 tracing::info!(peer = %peer_id, relay = %relay, "switching to relay route");
2274 }
2275 }
2276 }
2277 return;
2278 }
2279
2280 let direct_addr = {
2281 let peers = self.peers_by_id.lock().await;
2282 peers.get(&peer_id).copied()
2283 };
2284 if let Some(addr) = direct_addr {
2285 let mut routes = self.routes.lock().await;
2286 if !matches!(routes.get(&peer_id), Some(PeerRoute::Direct { .. })) {
2287 routes.insert(peer_id, PeerRoute::Direct { addr });
2288 drop(routes);
2289 let _ = self
2290 .events_tx
2291 .send(MeshEvent::RouteUpdated {
2292 peer_id,
2293 route: PeerRoute::Direct { addr },
2294 })
2295 .await;
2296 tracing::info!(peer = %peer_id, "switching to direct route");
2297 }
2298 }
2299 }
2300
2301 async fn send_payload(
2302 &self,
2303 addr: SocketAddr,
2304 payload: RiftPayload,
2305 seq: u32,
2306 timestamp: u64,
2307 session: SessionId,
2308 peer_id: Option<PeerId>,
2309 ) -> Result<()> {
2310 self.send_payload_with_source(
2311 addr,
2312 payload,
2313 seq,
2314 timestamp,
2315 self.identity.peer_id,
2316 session,
2317 peer_id,
2318 )
2319 .await
2320 }
2321
2322 async fn send_payload_with_source(
2323 &self,
2324 addr: SocketAddr,
2325 payload: RiftPayload,
2326 seq: u32,
2327 timestamp: u64,
2328 source: PeerId,
2329 session: SessionId,
2330 peer_id: Option<PeerId>,
2331 ) -> Result<()> {
2332 let stream = stream_for_payload(&payload);
2333 let header = RiftFrameHeader {
2334 version: ProtocolVersion::V1,
2335 stream,
2336 flags: 0,
2337 seq,
2338 timestamp,
2339 source,
2340 session,
2341 };
2342 let payload = self
2343 .maybe_encrypt_payload(payload, &header, peer_id, session)
2344 .await?;
2345 let plaintext = encode_frame(&header, &payload);
2346 let (ciphertext, socket_idx) = {
2347 let mut connections = self.connections.lock().await;
2348 let Some(conn) = connections.get_mut(&addr) else {
2349 return Err(anyhow!("missing connection"));
2350 };
2351 let mut out = vec![0u8; plaintext.len() + 128];
2352 let len = conn.session.encrypt(&plaintext, &mut out)?;
2353 out.truncate(len);
2354 (out, conn.socket_idx)
2355 };
2356 self.send_raw(socket_idx, addr, &ciphertext).await?;
2357 self.record_send(addr, ciphertext.len()).await;
2358 Ok(())
2359 }
2360
2361 async fn send_frame(
2362 &self,
2363 addr: SocketAddr,
2364 header: &RiftFrameHeader,
2365 payload: &RiftPayload,
2366 ) -> Result<()> {
2367 let plaintext = encode_frame(header, payload);
2368 let (ciphertext, socket_idx) = {
2369 let mut connections = self.connections.lock().await;
2370 let Some(conn) = connections.get_mut(&addr) else {
2371 return Err(anyhow!("missing connection"));
2372 };
2373 let mut out = vec![0u8; plaintext.len() + 128];
2374 let len = conn.session.encrypt(&plaintext, &mut out)?;
2375 out.truncate(len);
2376 (out, conn.socket_idx)
2377 };
2378 self.send_raw(socket_idx, addr, &ciphertext).await?;
2379 self.record_send(addr, ciphertext.len()).await;
2380 Ok(())
2381 }
2382
2383 async fn forward_group_voice(
2384 &self,
2385 header: &RiftFrameHeader,
2386 payload: &RiftPayload,
2387 ) -> Result<()> {
2388 let session = header.session;
2389 let from = header.source;
2390 let participants = {
2391 let mgr = self.session_mgr.lock().await;
2392 mgr.participants(session)
2393 };
2394 let routes = self.routes_snapshot().await;
2395 for peer_id in participants {
2396 if peer_id == self.identity.peer_id || peer_id == from {
2397 continue;
2398 }
2399 let Some(route) = routes.get(&peer_id).cloned() else {
2400 continue;
2401 };
2402 match route {
2403 PeerRoute::Direct { addr } => {
2404 let _ = self.send_frame(addr, header, payload).await;
2405 }
2406 PeerRoute::Relayed { via } => {
2407 let relay_addr = {
2408 let peers = self.peers_by_id.lock().await;
2409 peers.get(&via).copied()
2410 };
2411 let Some(relay_addr) = relay_addr else {
2412 continue;
2413 };
2414 let envelope = RiftPayload::Relay {
2415 target: peer_id,
2416 inner: Box::new(payload.clone()),
2417 };
2418 let _ = self.send_frame(relay_addr, header, &envelope).await;
2419 }
2420 }
2421 }
2422 Ok(())
2423 }
2424
2425 async fn handle_frame(
2426 self: Arc<Self>,
2427 addr: SocketAddr,
2428 header: RiftFrameHeader,
2429 payload: RiftPayload,
2430 ) -> Result<()> {
2431 if self.auth_required
2432 && !matches!(
2433 payload,
2434 RiftPayload::Control(ControlMessage::Auth { .. })
2435 | RiftPayload::Control(ControlMessage::Join { .. })
2436 | RiftPayload::Control(ControlMessage::Hello { .. })
2437 | RiftPayload::Control(ControlMessage::IceCandidates { .. })
2438 | RiftPayload::Control(ControlMessage::IceCheck { .. })
2439 | RiftPayload::Control(ControlMessage::IceCheckAck { .. })
2440 | RiftPayload::Control(ControlMessage::E2eeInit { .. })
2441 | RiftPayload::Control(ControlMessage::E2eeResp { .. })
2442 | RiftPayload::Control(ControlMessage::EncryptedReady { .. })
2443 | RiftPayload::Control(ControlMessage::PeerState { .. })
2444 )
2445 {
2446 let authenticated = {
2447 let connections = self.connections.lock().await;
2448 connections
2449 .get(&addr)
2450 .map(|conn| conn.authenticated)
2451 .unwrap_or(false)
2452 };
2453 if !authenticated {
2454 tracing::warn!(%addr, "unauthenticated peer message rejected");
2455 return Ok(());
2456 }
2457 }
2458 match payload {
2459 RiftPayload::Control(ControlMessage::Join { peer_id, .. }) => {
2460 let mut connections = self.connections.lock().await;
2461 if let Some(conn) = connections.get_mut(&addr) {
2462 conn.peer_id = Some(peer_id);
2463 }
2464 drop(connections);
2465
2466 let mut peers = self.peers_by_id.lock().await;
2467 peers.insert(peer_id, addr);
2468 drop(peers);
2469
2470 let mut peer_addrs = self.peer_addrs.lock().await;
2471 peer_addrs.insert(peer_id, addr);
2472 drop(peer_addrs);
2473 let mut peer_candidates = self.peer_candidates.lock().await;
2474 peer_candidates
2475 .entry(peer_id)
2476 .or_insert_with(|| vec![addr]);
2477 drop(peer_candidates);
2478
2479 let mut routes = self.routes.lock().await;
2480 let upgraded = matches!(routes.get(&peer_id), Some(PeerRoute::Relayed { .. }));
2481 routes.insert(peer_id, PeerRoute::Direct { addr });
2482 drop(routes);
2483
2484 let mut sessions = self.session_mgr.lock().await;
2485 sessions.add_participant(self.channel_session, peer_id);
2486 drop(sessions);
2487 let _ = self.update_group_topology(self.channel_session).await;
2488
2489 let mut caps = self.peer_capabilities.lock().await;
2490 caps.entry(peer_id).or_insert_with(default_peer_capabilities);
2491 drop(caps);
2492 self.update_group_codec().await;
2493
2494 let _ = self.events_tx.send(MeshEvent::PeerJoined(peer_id)).await;
2495 let _ = self
2496 .events_tx
2497 .send(MeshEvent::RouteUpdated {
2498 peer_id,
2499 route: PeerRoute::Direct { addr },
2500 })
2501 .await;
2502 self.emit_global_metrics().await;
2503 if upgraded {
2504 let _ = self.events_tx.send(MeshEvent::RouteUpgraded(peer_id)).await;
2505 tracing::info!(peer = %peer_id, "route upgraded to direct");
2506 }
2507
2508 self.send_peer_list(addr).await?;
2509 let all_peers: Vec<SocketAddr> = {
2511 let peers_by_id = self.peers_by_id.lock().await;
2512 peers_by_id.values().copied().collect()
2513 };
2514 for peer_addr in all_peers {
2515 if peer_addr != addr {
2516 let _ = self.send_peer_list(peer_addr).await;
2517 }
2518 }
2519 }
2520 RiftPayload::Control(ControlMessage::PeerState {
2521 peer_id,
2522 relay_capable,
2523 }) => {
2524 let mut peer_caps = self.peer_caps.lock().await;
2525 peer_caps.insert(peer_id, relay_capable);
2526 drop(peer_caps);
2527 }
2528 RiftPayload::Control(ControlMessage::Hello { peer_id, public_key, capabilities, candidates }) => {
2529 {
2530 let mut keys = self.peer_public_keys.lock().await;
2531 keys.insert(peer_id, public_key.clone());
2532 }
2533 if !candidates.is_empty() {
2534 let mut peer_candidates = self.peer_candidates.lock().await;
2535 peer_candidates.insert(peer_id, candidates);
2536 }
2537 let _ = self
2538 .events_tx
2539 .send(MeshEvent::PeerIdentity {
2540 peer_id,
2541 public_key: public_key.clone(),
2542 })
2543 .await;
2544 self.handle_capabilities(peer_id, capabilities).await?;
2545 let authenticated = {
2546 let connections = self.connections.lock().await;
2547 connections
2548 .get(&addr)
2549 .map(|conn| conn.authenticated)
2550 .unwrap_or(false)
2551 };
2552 if !self.auth_required || authenticated {
2553 let _ = self.start_e2ee(peer_id, self.channel_session).await;
2554 }
2555 let _ = self.send_ice_candidates(peer_id, SessionId::NONE).await;
2556 }
2557 RiftPayload::Control(ControlMessage::IceCandidates { peer_id, candidates, .. }) => {
2558 if !candidates.is_empty() {
2559 let mut peer_ice = self.peer_ice_candidates.lock().await;
2560 peer_ice.insert(peer_id, candidates.clone());
2561 drop(peer_ice);
2562 let mut peer_candidates = self.peer_candidates.lock().await;
2563 let addrs: Vec<SocketAddr> = candidates.into_iter().map(|cand| cand.addr).collect();
2564 peer_candidates.insert(peer_id, addrs);
2565 }
2566 }
2567 RiftPayload::Control(ControlMessage::IceCheck { session, candidate, .. }) => {
2568 let ack = ControlMessage::IceCheckAck { session, candidate };
2569 let from = header.source;
2570 let _ = self.send_control_to_peer(from, ack, session).await;
2571 }
2572 RiftPayload::Control(ControlMessage::IceCheckAck { candidate, .. }) => {
2573 let peer_id = header.source;
2574 let addr = candidate.addr;
2575 let mut peer_addrs = self.peer_addrs.lock().await;
2576 peer_addrs.insert(peer_id, addr);
2577 drop(peer_addrs);
2578
2579 let mut routes = self.routes.lock().await;
2580 let upgraded = matches!(routes.get(&peer_id), Some(PeerRoute::Relayed { .. }));
2581 if !matches!(routes.get(&peer_id), Some(PeerRoute::Direct { .. })) {
2582 routes.insert(peer_id, PeerRoute::Direct { addr });
2583 }
2584 drop(routes);
2585
2586 let _ = self
2587 .events_tx
2588 .send(MeshEvent::RouteUpdated {
2589 peer_id,
2590 route: PeerRoute::Direct { addr },
2591 })
2592 .await;
2593 if upgraded {
2594 let _ = self.events_tx.send(MeshEvent::RouteUpgraded(peer_id)).await;
2595 }
2596 }
2597 RiftPayload::Control(ControlMessage::KeyInit { .. })
2598 | RiftPayload::Control(ControlMessage::KeyResp { .. }) => {
2599 tracing::debug!(target: "security", "legacy e2ee control message ignored");
2600 }
2601 RiftPayload::Control(ControlMessage::CapabilitiesUpdate(capabilities)) => {
2602 let peer_id = header.source;
2603 self.handle_capabilities(peer_id, capabilities).await?;
2604 }
2605 RiftPayload::Control(ControlMessage::Leave { peer_id }) => {
2606 let mut peers = self.peers_by_id.lock().await;
2607 peers.remove(&peer_id);
2608 drop(peers);
2609 let mut peer_caps = self.peer_caps.lock().await;
2610 peer_caps.remove(&peer_id);
2611 drop(peer_caps);
2612 let mut peer_keys = self.peer_public_keys.lock().await;
2613 peer_keys.remove(&peer_id);
2614 drop(peer_keys);
2615 let mut routes = self.routes.lock().await;
2616 routes.remove(&peer_id);
2617 let removed: Vec<PeerId> = routes
2618 .iter()
2619 .filter_map(|(pid, route)| match route {
2620 PeerRoute::Relayed { via } if *via == peer_id => Some(*pid),
2621 _ => None,
2622 })
2623 .collect();
2624 for pid in removed {
2625 routes.remove(&pid);
2626 }
2627 drop(routes);
2628 let mut peer_addrs = self.peer_addrs.lock().await;
2629 peer_addrs.remove(&peer_id);
2630 drop(peer_addrs);
2631 let mut peer_candidates = self.peer_candidates.lock().await;
2632 peer_candidates.remove(&peer_id);
2633 drop(peer_candidates);
2634 let mut peer_ice = self.peer_ice_candidates.lock().await;
2635 peer_ice.remove(&peer_id);
2636 drop(peer_ice);
2637 let mut relay_candidates = self.relay_candidates.lock().await;
2638 relay_candidates.remove(&peer_id);
2639 drop(relay_candidates);
2640 let mut e2ee_keys = self.e2ee_keys.lock().await;
2641 e2ee_keys.retain(|(peer, _), _| *peer != peer_id);
2642 drop(e2ee_keys);
2643 let mut e2ee_pending = self.e2ee_pending.lock().await;
2644 e2ee_pending.retain(|(peer, _), _| *peer != peer_id);
2645 drop(e2ee_pending);
2646 let mut e2ee_ready = self.e2ee_ready.lock().await;
2647 e2ee_ready.retain(|(peer, _), _| *peer != peer_id);
2648 drop(e2ee_ready);
2649 let mut stats = self.link_stats.lock().await;
2650 stats.remove(&peer_id);
2651 drop(stats);
2652 let mut sessions = self.session_mgr.lock().await;
2653 sessions.remove_participant_all(peer_id);
2654 drop(sessions);
2655 let _ = self.update_group_topology(self.channel_session).await;
2656 let active = *self.active_session.lock().await;
2657 if active != SessionId::NONE && active != self.channel_session {
2658 let _ = self.update_group_topology(active).await;
2659 }
2660 let _ = self.events_tx.send(MeshEvent::PeerLeft(peer_id)).await;
2661 self.emit_global_metrics().await;
2662 }
2663 RiftPayload::Control(ControlMessage::Chat(chat)) => {
2664 let mut cache = self.cache.lock().await;
2665 if cache.contains(&chat.id) {
2666 return Ok(());
2667 }
2668 cache.insert(chat.id);
2669 drop(cache);
2670
2671 let _ = self
2672 .events_tx
2673 .send(MeshEvent::ChatReceived(chat.clone()))
2674 .await;
2675 }
2676 RiftPayload::Control(ControlMessage::Ping { nonce, sent_at_ms }) => {
2677 let from = header.source;
2678 let pong = ControlMessage::Pong { nonce, sent_at_ms };
2679 let _ = self.send_control_to_peer(from, pong, SessionId::NONE).await;
2680 }
2681 RiftPayload::Control(ControlMessage::Pong { sent_at_ms, .. }) => {
2682 let from = header.source;
2683 self.update_rtt(from, sent_at_ms).await;
2684 }
2685 RiftPayload::Control(ControlMessage::Auth { token }) => {
2686 let expected = self.auth_token.clone().unwrap_or_default();
2687 let token_valid = if token.len() != expected.len() {
2689 false
2690 } else {
2691 token.ct_eq(&expected).into()
2692 };
2693 if self.auth_required && !token_valid {
2694 tracing::warn!(%addr, "auth token mismatch");
2695 self.disconnect_addr(addr).await;
2696 return Ok(());
2697 }
2698 let mut connections = self.connections.lock().await;
2699 if let Some(conn) = connections.get_mut(&addr) {
2700 conn.authenticated = true;
2701 }
2702 }
2703 RiftPayload::Control(ControlMessage::PeerList { peers }) => {
2704 self.handle_peer_list(addr, peers).await?;
2705 }
2706 RiftPayload::Control(ControlMessage::Group(group)) => {
2707 match group {
2708 GroupControl::Join { session, peer_id, .. } => {
2709 let mut sessions = self.session_mgr.lock().await;
2710 sessions.add_participant(session, peer_id);
2711 drop(sessions);
2712 let _ = self.update_group_topology(session).await;
2713 }
2714 GroupControl::Leave { session, peer_id } => {
2715 let mut sessions = self.session_mgr.lock().await;
2716 sessions.remove_participant(session, peer_id);
2717 drop(sessions);
2718 let _ = self.update_group_topology(session).await;
2719 }
2720 GroupControl::Topology { session, mode } => {
2721 self.set_group_mode(session, mode).await;
2722 let _ = self
2723 .events_tx
2724 .send(MeshEvent::GroupTopology { session, mode })
2725 .await;
2726 }
2727 GroupControl::StreamPublish { .. } | GroupControl::StreamSubscribe { .. } => {}
2728 }
2729 }
2730 RiftPayload::Control(ControlMessage::Call(call)) => {
2731 self.handle_call(call).await?;
2732 }
2733 RiftPayload::Control(ControlMessage::E2eeInit {
2734 session,
2735 from,
2736 public_key,
2737 signature,
2738 }) => {
2739 self.handle_e2ee_init(from, session, public_key, signature).await?;
2740 }
2741 RiftPayload::Control(ControlMessage::E2eeResp {
2742 session,
2743 from,
2744 public_key,
2745 signature,
2746 }) => {
2747 self.handle_e2ee_resp(from, session, public_key, signature).await?;
2748 }
2749 RiftPayload::Control(ControlMessage::EncryptedReady { session, .. }) => {
2750 let peer_id = header.source;
2751 let mut ready = self.e2ee_ready.lock().await;
2752 let entry = ready
2753 .entry((peer_id, session))
2754 .or_insert(E2eeReadyState {
2755 local_ready: false,
2756 remote_ready: false,
2757 });
2758 entry.remote_ready = true;
2759 }
2760 RiftPayload::Voice(VoicePacket { codec_id, payload }) => {
2761 let from = header.source;
2762 let seq = header.seq;
2763 let timestamp = header.timestamp;
2764 let session = header.session;
2765 let codec = codec_id;
2766 let active_session = *self.active_session.lock().await;
2767 if active_session != SessionId::NONE && session != active_session {
2768 return Ok(());
2769 }
2770 let mut seqs = self.voice_seq.lock().await;
2771 if let Some(last) = seqs.get(&from) {
2772 if seq <= *last {
2773 return Ok(());
2774 }
2775 }
2776 seqs.insert(from, seq);
2777 drop(seqs);
2778 if self.should_forward_voice(session, from).await {
2779 let payload = RiftPayload::Voice(VoicePacket {
2780 codec_id: codec,
2781 payload: payload.clone(),
2782 });
2783 let _ = self.forward_group_voice(&header, &payload).await;
2784 }
2785
2786 let _ = self
2787 .events_tx
2788 .send(MeshEvent::VoiceFrame {
2789 from,
2790 seq,
2791 timestamp,
2792 session,
2793 codec,
2794 payload: payload.clone(),
2795 })
2796 .await;
2797 }
2798 RiftPayload::Encrypted(encrypted) => {
2799 let from = header.source;
2800 let session = header.session;
2801 if header.stream == StreamKind::Voice
2802 && self.should_forward_voice(session, from).await
2803 {
2804 let payload = RiftPayload::Encrypted(encrypted.clone());
2805 let _ = self.forward_group_voice(&header, &payload).await;
2806 }
2807 if let Ok(inner) = self.decrypt_payload(&header, encrypted, from, session).await {
2808 Box::pin(self.handle_frame(addr, header, inner)).await?;
2809 } else {
2810 let has_key = {
2811 let keys = self.e2ee_keys.lock().await;
2812 keys.contains_key(&(from, session))
2813 };
2814 let key_fingerprint = {
2815 let keys = self.e2ee_keys.lock().await;
2816 keys.get(&(from, session))
2817 .map(|key| hex::encode(&sha2::Sha256::digest(key)[..4]))
2818 };
2819 let ready = {
2820 let ready = self.e2ee_ready.lock().await;
2821 ready
2822 .get(&(from, session))
2823 .map(|state| state.local_ready && state.remote_ready)
2824 .unwrap_or(false)
2825 };
2826 tracing::warn!(
2827 %addr,
2828 peer = %from,
2829 session = ?session,
2830 has_key,
2831 key = key_fingerprint.as_deref().unwrap_or("none"),
2832 ready,
2833 "e2ee decrypt failed"
2834 );
2835 }
2836 }
2837 RiftPayload::Relay { target, inner } => {
2838 if target == self.identity.peer_id {
2839 Box::pin(self.handle_frame(addr, header, *inner)).await?;
2840 } else {
2841 self.forward_relay(target, header, *inner).await?;
2842 }
2843 }
2844 RiftPayload::Text(chat) => {
2845 let mut cache = self.cache.lock().await;
2846 if cache.contains(&chat.id) {
2847 return Ok(());
2848 }
2849 cache.insert(chat.id);
2850 drop(cache);
2851 let _ = self
2852 .events_tx
2853 .send(MeshEvent::ChatReceived(chat.clone()))
2854 .await;
2855 }
2856 RiftPayload::Control(ControlMessage::RouteInfo { .. })
2857 | RiftPayload::Control(ControlMessage::Capabilities(_)) => {}
2858 }
2859 Ok(())
2860 }
2861
2862 async fn handle_peer_list(self: Arc<Self>, addr: SocketAddr, peers: Vec<PeerInfo>) -> Result<()> {
2863 for peer in &peers {
2865 if peer.peer_id == self.identity.peer_id {
2866 continue;
2867 }
2868 let mut sessions = self.session_mgr.lock().await;
2869 sessions.add_participant(self.channel_session, peer.peer_id);
2870 }
2871 let _ = self.update_group_topology(self.channel_session).await;
2873
2874 let nat_cfg = { self.nat_cfg.lock().await.clone() };
2875 let Some(nat_cfg) = nat_cfg else {
2876 return Ok(());
2877 };
2878
2879 let relay_peer_id = {
2880 let connections = self.connections.lock().await;
2881 connections.get(&addr).and_then(|conn| conn.peer_id)
2882 };
2883 let relay_capable = if let Some(peer_id) = relay_peer_id {
2884 let peer_caps = self.peer_caps.lock().await;
2885 peer_caps.get(&peer_id).copied().unwrap_or(false)
2886 } else {
2887 false
2888 };
2889
2890 for peer in peers {
2891 if peer.peer_id == self.identity.peer_id {
2892 continue;
2893 }
2894 let already = {
2895 let peers_by_id = self.peers_by_id.lock().await;
2896 peers_by_id.contains_key(&peer.peer_id)
2897 };
2898 let mut peer_caps = self.peer_caps.lock().await;
2899 peer_caps.insert(peer.peer_id, peer.relay_capable);
2900 drop(peer_caps);
2901 let mut addrs = if peer.addrs.is_empty() {
2902 vec![peer.addr]
2903 } else {
2904 peer.addrs.clone()
2905 };
2906 addrs.sort();
2907 addrs.dedup();
2908 let primary_addr = addrs[0];
2909 let mut peer_addrs = self.peer_addrs.lock().await;
2910 peer_addrs.insert(peer.peer_id, primary_addr);
2911 drop(peer_addrs);
2912 let mut peer_candidates = self.peer_candidates.lock().await;
2913 peer_candidates.insert(peer.peer_id, addrs.clone());
2914 drop(peer_candidates);
2915
2916 if already {
2917 continue;
2918 }
2919 let already = {
2920 let connections = self.connections.lock().await;
2921 connections.contains_key(&primary_addr)
2922 };
2923 if already {
2924 continue;
2925 }
2926 let endpoint = PeerEndpoint {
2927 peer_id: PeerId([0u8; 32]),
2928 external_addrs: addrs.clone(),
2929 punch_ports: addrs.iter().map(|addr| addr.port()).collect(),
2930 };
2931 if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
2932 if let Ok(socket_idx) = self.add_socket(socket).await {
2933 if let Err(err) = self.initiate_handshake(remote, socket_idx).await {
2934 tracing::warn!("handshake to {} failed: {err}", remote);
2935 }
2936 }
2937 } else if relay_capable {
2938 if let Some(relay_peer_id) = relay_peer_id {
2939 let mut relay_candidates = self.relay_candidates.lock().await;
2940 relay_candidates.insert(peer.peer_id, relay_peer_id);
2941 drop(relay_candidates);
2942 let mut routes = self.routes.lock().await;
2943 if !matches!(routes.get(&peer.peer_id), Some(PeerRoute::Direct { .. })) {
2944 routes.insert(peer.peer_id, PeerRoute::Relayed { via: relay_peer_id });
2945 drop(routes);
2946 let _ = self
2947 .events_tx
2948 .send(MeshEvent::RouteUpdated {
2949 peer_id: peer.peer_id,
2950 route: PeerRoute::Relayed { via: relay_peer_id },
2951 })
2952 .await;
2953 tracing::info!(
2954 peer = %peer.peer_id,
2955 relay = %relay_peer_id,
2956 "relay route established"
2957 );
2958 }
2959 }
2960 }
2961 }
2962
2963 Ok(())
2964 }
2965
2966 async fn start_call(&self, to: PeerId, rndzv_srt_uri: Option<String>) -> Result<SessionId> {
2967 let session = SessionId::random();
2968 tracing::info!(to = %to, session = ?session, "call start");
2969 {
2970 let mut sessions = self.session_mgr.lock().await;
2971 sessions.set_state(session, CallState::Ringing);
2972 sessions.add_participant(session, self.identity.peer_id);
2973 sessions.add_participant(session, to);
2974 }
2975 self.emit_global_metrics().await;
2976 let call = CallControl::Invite {
2977 session,
2978 from: self.identity.peer_id,
2979 to,
2980 display_name: None,
2981 rndzv_srt_uri,
2982 };
2983 self.send_call_to_peer(to, call, session).await?;
2984 Ok(session)
2985 }
2986
2987 async fn accept_call(&self, session: SessionId) -> Result<()> {
2988 tracing::info!(session = ?session, "call accept");
2989 {
2990 let mut sessions = self.session_mgr.lock().await;
2991 sessions.set_state(session, CallState::Active);
2992 sessions.add_participant(session, self.identity.peer_id);
2993 }
2994 self.emit_global_metrics().await;
2995 let participants = {
2996 let sessions = self.session_mgr.lock().await;
2997 sessions.participants(session)
2998 };
2999 for peer_id in participants {
3000 if peer_id == self.identity.peer_id {
3001 continue;
3002 }
3003 let _ = self.start_e2ee(peer_id, session).await;
3004 let call = CallControl::Accept {
3005 session,
3006 from: self.identity.peer_id,
3007 };
3008 let _ = self.send_call_to_peer(peer_id, call, session).await;
3009 }
3010 let mut active = self.active_session.lock().await;
3011 *active = session;
3012 Ok(())
3013 }
3014
3015 async fn decline_call(&self, session: SessionId, reason: Option<String>) -> Result<()> {
3016 tracing::info!(session = ?session, ?reason, "call decline");
3017 {
3018 let mut sessions = self.session_mgr.lock().await;
3019 sessions.set_state(session, CallState::Ended);
3020 }
3021 self.emit_global_metrics().await;
3022 let participants = {
3023 let sessions = self.session_mgr.lock().await;
3024 sessions.participants(session)
3025 };
3026 for peer_id in participants {
3027 if peer_id == self.identity.peer_id {
3028 continue;
3029 }
3030 let call = CallControl::Decline {
3031 session,
3032 from: self.identity.peer_id,
3033 reason: reason.clone(),
3034 };
3035 let _ = self.send_call_to_peer(peer_id, call, session).await;
3036 }
3037 let mut active = self.active_session.lock().await;
3038 if *active == session {
3039 *active = self.channel_session;
3040 }
3041 Ok(())
3042 }
3043
3044 async fn end_call(&self, session: SessionId) -> Result<()> {
3045 tracing::info!(session = ?session, "call end");
3046 {
3047 let mut sessions = self.session_mgr.lock().await;
3048 sessions.set_state(session, CallState::Ended);
3049 }
3050 self.emit_global_metrics().await;
3051 let participants = {
3052 let sessions = self.session_mgr.lock().await;
3053 sessions.participants(session)
3054 };
3055 for peer_id in participants {
3056 if peer_id == self.identity.peer_id {
3057 continue;
3058 }
3059 let call = CallControl::Bye {
3060 session,
3061 from: self.identity.peer_id,
3062 };
3063 let _ = self.send_call_to_peer(peer_id, call, session).await;
3064 }
3065 let mut active = self.active_session.lock().await;
3066 if *active == session {
3067 *active = self.channel_session;
3068 }
3069 Ok(())
3070 }
3071
3072 async fn send_call_to_peer(
3073 &self,
3074 peer_id: PeerId,
3075 call: CallControl,
3076 session: SessionId,
3077 ) -> Result<()> {
3078 let payload = RiftPayload::Control(ControlMessage::Call(call));
3079 let routes = self.routes_snapshot().await;
3080 let Some(route) = routes.get(&peer_id).cloned() else {
3081 return Err(anyhow!("missing route"));
3082 };
3083 let seq = self.next_control_seq().await;
3084 self.send_to_peer(peer_id, route, payload, seq, now_timestamp(), session)
3085 .await
3086 }
3087
3088 async fn handle_call(&self, call: CallControl) -> Result<()> {
3089 match call {
3090 CallControl::Invite {
3091 session,
3092 from,
3093 to,
3094 rndzv_srt_uri,
3095 ..
3096 } => {
3097 if to != self.identity.peer_id {
3098 return Ok(());
3099 }
3100 {
3101 let mut sessions = self.session_mgr.lock().await;
3102 sessions.set_state(session, CallState::Ringing);
3103 sessions.add_participant(session, from);
3104 sessions.add_participant(session, to);
3105 }
3106 let _ = self.update_group_topology(session).await;
3107 let _ = self
3108 .events_tx
3109 .send(MeshEvent::IncomingCall {
3110 session,
3111 from,
3112 rndzv_srt_uri,
3113 })
3114 .await;
3115 }
3116 CallControl::Accept { session, from } => {
3117 {
3118 let mut sessions = self.session_mgr.lock().await;
3119 sessions.set_state(session, CallState::Active);
3120 sessions.add_participant(session, from);
3121 sessions.add_participant(session, self.identity.peer_id);
3122 }
3123 let _ = self.update_group_topology(session).await;
3124 let mut active = self.active_session.lock().await;
3125 *active = session;
3126 let _ = self.start_e2ee(from, session).await;
3127 let _ = self
3128 .events_tx
3129 .send(MeshEvent::CallAccepted { session, from })
3130 .await;
3131 }
3132 CallControl::Decline { session, from, reason } => {
3133 {
3134 let mut sessions = self.session_mgr.lock().await;
3135 sessions.set_state(session, CallState::Ended);
3136 }
3137 let mut active = self.active_session.lock().await;
3138 if *active == session {
3139 *active = self.channel_session;
3140 }
3141 let _ = self
3142 .events_tx
3143 .send(MeshEvent::CallDeclined { session, from, reason })
3144 .await;
3145 }
3146 CallControl::Bye { session, .. } => {
3147 {
3148 let mut sessions = self.session_mgr.lock().await;
3149 sessions.set_state(session, CallState::Ended);
3150 }
3151 let mut active = self.active_session.lock().await;
3152 if *active == session {
3153 *active = self.channel_session;
3154 }
3155 let _ = self
3156 .events_tx
3157 .send(MeshEvent::CallEnded { session })
3158 .await;
3159 }
3160 CallControl::Mute { .. } => {}
3161 CallControl::SessionInfo { session, participants } => {
3162 let mut sessions = self.session_mgr.lock().await;
3163 for peer in participants {
3164 sessions.add_participant(session, peer);
3165 }
3166 drop(sessions);
3167 let _ = self.update_group_topology(session).await;
3168 }
3169 }
3170 Ok(())
3171 }
3172
3173 async fn disconnect_addr(&self, addr: SocketAddr) {
3174 let peer_id = {
3175 let mut connections = self.connections.lock().await;
3176 connections.remove(&addr).and_then(|conn| conn.peer_id)
3177 };
3178 let Some(peer_id) = peer_id else {
3179 return;
3180 };
3181 let mut peers = self.peers_by_id.lock().await;
3182 peers.remove(&peer_id);
3183 drop(peers);
3184 let mut routes = self.routes.lock().await;
3185 routes.remove(&peer_id);
3186 drop(routes);
3187 let mut peer_addrs = self.peer_addrs.lock().await;
3188 peer_addrs.remove(&peer_id);
3189 drop(peer_addrs);
3190 let mut peer_candidates = self.peer_candidates.lock().await;
3191 peer_candidates.remove(&peer_id);
3192 drop(peer_candidates);
3193 let mut e2ee_keys = self.e2ee_keys.lock().await;
3194 e2ee_keys.retain(|(peer, _), _| *peer != peer_id);
3195 drop(e2ee_keys);
3196 let mut pending = self.e2ee_pending.lock().await;
3197 pending.retain(|(peer, _), _| *peer != peer_id);
3198 drop(pending);
3199 let _ = self.events_tx.send(MeshEvent::PeerLeft(peer_id)).await;
3200 }
3201
3202 async fn socket_by_idx(&self, socket_idx: usize) -> Result<MeshSocket> {
3203 let sockets = self.sockets.lock().await;
3204 sockets
3205 .get(socket_idx)
3206 .cloned()
3207 .ok_or_else(|| anyhow!("missing socket"))
3208 }
3209
3210 async fn send_raw(&self, socket_idx: usize, addr: SocketAddr, data: &[u8]) -> Result<()> {
3211 let socket = self.socket_by_idx(socket_idx).await?;
3212 match socket {
3213 MeshSocket::Udp(socket) => {
3214 socket.send_to(data, addr).await?;
3215 Ok(())
3216 }
3217 MeshSocket::Turn(relay) => {
3218 relay.send_to(addr, data).await?;
3219 Ok(())
3220 }
3221 }
3222 }
3223
3224 async fn broadcast_voice(
3225 &self,
3226 _from: PeerId,
3227 seq: u32,
3228 timestamp: u64,
3229 payload: Vec<u8>,
3230 ) -> Result<()> {
3231 let codec = *self.group_codec.lock().await;
3232 let msg = RiftPayload::Voice(VoicePacket { codec_id: codec, payload });
3233 let session = *self.active_session.lock().await;
3234 let mode = self.current_group_mode(session).await;
3235 if let GroupMode::Hybrid { forwarder } = mode {
3236 if forwarder != self.identity.peer_id {
3237 let routes = self.routes_snapshot().await;
3238 if let Some(route) = routes.get(&forwarder).cloned() {
3239 let _ = self
3240 .send_to_peer(forwarder, route, msg, seq, timestamp, session)
3241 .await;
3242 }
3243 return Ok(());
3244 }
3245 }
3246
3247 let routes = self.routes_snapshot().await;
3248 for (peer_id, route) in routes {
3249 if peer_id == self.identity.peer_id {
3250 continue;
3251 }
3252 if let Err(err) = self
3253 .send_to_peer(peer_id, route, msg.clone(), seq, timestamp, session)
3254 .await
3255 {
3256 tracing::debug!(peer = %peer_id, "voice send failed: {err}");
3257 }
3258 }
3259 Ok(())
3260 }
3261
3262 async fn routes_snapshot(&self) -> HashMap<PeerId, PeerRoute> {
3263 self.routes.lock().await.clone()
3264 }
3265
3266 async fn send_to_peer(
3267 &self,
3268 peer_id: PeerId,
3269 route: PeerRoute,
3270 payload: RiftPayload,
3271 seq: u32,
3272 timestamp: u64,
3273 session: SessionId,
3274 ) -> Result<()> {
3275 match route {
3276 PeerRoute::Direct { addr } => self
3277 .send_payload(addr, payload, seq, timestamp, session, Some(peer_id))
3278 .await,
3279 PeerRoute::Relayed { via } => {
3280 let relay_addr = {
3281 let peers = self.peers_by_id.lock().await;
3282 peers.get(&via).copied()
3283 };
3284 let Some(relay_addr) = relay_addr else {
3285 return Err(anyhow!("missing relay addr"));
3286 };
3287 let envelope = RiftPayload::Relay {
3288 target: peer_id,
3289 inner: Box::new(payload),
3290 };
3291 self.send_payload(relay_addr, envelope, seq, timestamp, session, Some(peer_id))
3292 .await
3293 }
3294 }
3295 }
3296
3297 async fn forward_relay(
3298 &self,
3299 target: PeerId,
3300 header: RiftFrameHeader,
3301 inner: RiftPayload,
3302 ) -> Result<()> {
3303 let route = {
3304 let routes = self.routes.lock().await;
3305 routes.get(&target).cloned()
3306 };
3307 let Some(route) = route else {
3308 return Ok(());
3309 };
3310 match route {
3311 PeerRoute::Direct { addr } => {
3312 let envelope = RiftPayload::Relay {
3313 target,
3314 inner: Box::new(inner),
3315 };
3316 self.send_payload_with_source(
3317 addr,
3318 envelope,
3319 header.seq,
3320 header.timestamp,
3321 header.source,
3322 header.session,
3323 Some(target),
3324 )
3325 .await?;
3326 }
3327 PeerRoute::Relayed { via } => {
3328 let relay_addr = {
3329 let peers = self.peers_by_id.lock().await;
3330 peers.get(&via).copied()
3331 };
3332 if let Some(relay_addr) = relay_addr {
3333 let envelope = RiftPayload::Relay {
3334 target,
3335 inner: Box::new(inner),
3336 };
3337 self.send_payload_with_source(
3338 relay_addr,
3339 envelope,
3340 header.seq,
3341 header.timestamp,
3342 header.source,
3343 header.session,
3344 Some(target),
3345 )
3346 .await?;
3347 }
3348 }
3349 }
3350 Ok(())
3351 }
3352}
3353
3354impl MeshHandle {
3355 pub async fn broadcast_voice(&self, seq: u32, timestamp: u64, payload: Vec<u8>) -> Result<()> {
3357 self.inner
3358 .broadcast_voice(self.inner.identity.peer_id, seq, timestamp, payload)
3359 .await
3360 }
3361
3362 pub async fn broadcast_chat(&self, text: String) -> Result<()> {
3364 self.inner.broadcast_chat(text).await
3365 }
3366
3367 pub async fn start_call(&self, to: PeerId) -> Result<SessionId> {
3369 self.inner.start_call(to, None).await
3370 }
3371
3372 pub async fn start_call_with_srt(
3374 &self,
3375 to: PeerId,
3376 rndzv_srt_uri: Option<String>,
3377 ) -> Result<SessionId> {
3378 self.inner.start_call(to, rndzv_srt_uri).await
3379 }
3380
3381 pub async fn accept_call(&self, session: SessionId) -> Result<()> {
3383 self.inner.accept_call(session).await
3384 }
3385
3386 pub async fn decline_call(&self, session: SessionId, reason: Option<String>) -> Result<()> {
3388 self.inner.decline_call(session, reason).await
3389 }
3390
3391 pub async fn end_call(&self, session: SessionId) -> Result<()> {
3393 self.inner.end_call(session).await
3394 }
3395
3396 pub async fn set_preferred_codecs(&self, codecs: Vec<CodecId>) {
3398 self.inner.set_preferred_codecs(codecs).await;
3399 }
3400
3401 pub async fn set_preferred_features(&self, features: Vec<FeatureFlag>) {
3403 self.inner.set_preferred_features(features).await;
3404 }
3405
3406 pub async fn peer_session_config(&self, peer_id: PeerId) -> Option<SessionConfig> {
3408 let sessions = self.inner.peer_session.lock().await;
3409 sessions.get(&peer_id).cloned()
3410 }
3411
3412 pub async fn group_codec(&self) -> CodecId {
3414 *self.inner.group_codec.lock().await
3415 }
3416
3417 pub async fn connect_addr(&self, addr: SocketAddr) -> Result<()> {
3419 self.inner.initiate_handshake(addr, 0).await
3420 }
3421
3422 pub async fn connect_with_socket(&self, socket: UdpSocket, addr: SocketAddr) -> Result<()> {
3424 let socket_idx = self.inner.add_socket(socket).await?;
3425 self.inner.initiate_handshake(addr, socket_idx).await
3426 }
3427
3428 pub async fn disconnect_peer(&self, peer_id: PeerId) {
3430 let addr = {
3431 let peers = self.inner.peers_by_id.lock().await;
3432 peers.get(&peer_id).copied()
3433 };
3434 if let Some(addr) = addr {
3435 self.inner.disconnect_addr(addr).await;
3436 }
3437 }
3438
3439 #[cfg(feature = "predictive-rendezvous")]
3440 pub async fn run_rendezvous(
3445 &self,
3446 cfg: RendezvousConfig,
3447 ) -> Result<RendezvousResult, RendezvousError> {
3448 let rendezvous_id = rendezvous_id_from_seed(&cfg.token.seed);
3449 let (tx, mut rx) = mpsc::channel::<(SocketAddr, ParsedProbe)>(64);
3450 self.inner
3451 .register_pr_session(rendezvous_id, tx)
3452 .await
3453 .map_err(RendezvousError::Mesh)?;
3454
3455 struct PrGuard {
3456 inner: Arc<MeshInner>,
3457 rendezvous_id: u64,
3458 }
3459
3460 impl Drop for PrGuard {
3461 fn drop(&mut self) {
3462 let inner = self.inner.clone();
3463 let rendezvous_id = self.rendezvous_id;
3464 tokio::spawn(async move {
3465 inner.unregister_pr_session(rendezvous_id).await;
3466 });
3467 }
3468 }
3469
3470 let _guard = PrGuard {
3471 inner: self.inner.clone(),
3472 rendezvous_id,
3473 };
3474
3475 let mut state = RendezvousState::new(cfg.token.clone(), cfg.role);
3476 let mut last_sent_slot: Option<u64> = None;
3477 let mut last_local_offset: Option<u16> = None;
3478 let mut local_offsets = Vec::new();
3479 let mut metrics = RendezvousMetrics::new();
3480 let start_instant = TokioInstant::now();
3481 let deadline = TokioInstant::now() + cfg.max_duration;
3482
3483 loop {
3484 if TokioInstant::now() >= deadline {
3485 metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3486 metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3487 tracing::debug!(
3488 target = "predictive_rendezvous",
3489 rendezvous_id,
3490 slots_attempted = metrics.slots_attempted,
3491 probes_sent = metrics.probes_sent,
3492 probes_received = metrics.probes_received,
3493 "rendezvous timeout"
3494 );
3495 return Err(RendezvousError::Timeout);
3496 }
3497
3498 let now_ms = now_timestamp();
3499 let t0_ms = cfg.token.time_model.t0.saturating_mul(1_000);
3500 let window_ms = cfg.token.time_model.window_secs.saturating_mul(1_000);
3501 let end_ms = t0_ms.saturating_add(window_ms);
3502
3503 if cfg.token.time_model.slot_ms == 0 {
3504 return Err(RendezvousError::InvalidConfig("slot_ms must be > 0"));
3505 }
3506 if now_ms >= end_ms {
3507 metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3508 metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3509 tracing::debug!(
3510 target = "predictive_rendezvous",
3511 rendezvous_id,
3512 slots_attempted = metrics.slots_attempted,
3513 "rendezvous window closed"
3514 );
3515 return Err(RendezvousError::WindowClosed);
3516 }
3517
3518 if let Some(slot) = compute_slot_params(
3519 &cfg.token.seed,
3520 &cfg.token.time_model,
3521 cfg.role,
3522 now_ms,
3523 ) {
3524 if last_sent_slot != Some(slot.slot_index) {
3525 last_sent_slot = Some(slot.slot_index);
3526 last_local_offset = Some(slot.local_port_offset);
3527 local_offsets.push(slot.local_port_offset);
3528 metrics.slots_attempted = metrics.slots_attempted.saturating_add(1);
3529 state.record_sent_slot(&slot);
3530
3531 tracing::debug!(
3532 target = "predictive_rendezvous",
3533 rendezvous_id,
3534 slot_index = slot.slot_index,
3535 local_port_offset = slot.local_port_offset,
3536 remote_port_offset = slot.remote_port_offset,
3537 "rendezvous slot emit"
3538 );
3539
3540 let payload = build_probe_payload(ProbePayload {
3541 rendezvous_id,
3542 slot_index: slot.slot_index,
3543 sender_fingerprint: cfg.sender_fingerprint,
3544 });
3545
3546 for addr in &cfg.potential_remote_addrs {
3547 self.inner
3548 .send_raw(0, *addr, &payload)
3549 .await
3550 .map_err(RendezvousError::Mesh)?;
3551 metrics.probes_sent = metrics.probes_sent.saturating_add(1);
3552 }
3553 }
3554 }
3555
3556 let next_wake_ms = if now_ms < t0_ms {
3557 t0_ms
3558 } else {
3559 let slot_index = (now_ms - t0_ms) / cfg.token.time_model.slot_ms;
3560 t0_ms + (slot_index + 1) * cfg.token.time_model.slot_ms
3561 };
3562
3563 let sleep_ms = next_wake_ms.saturating_sub(now_ms).max(1);
3564 let wake_at = TokioInstant::now() + Duration::from_millis(sleep_ms);
3565
3566 tokio::select! {
3567 _ = tokio::time::sleep_until(deadline) => {
3568 return Err(RendezvousError::Timeout);
3569 }
3570 _ = tokio::time::sleep_until(wake_at) => {
3571 continue;
3572 }
3573 recv = rx.recv() => {
3574 let Some((addr, probe)) = recv else {
3575 metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3576 metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3577 return Err(RendezvousError::ChannelClosed);
3578 };
3579 metrics.probes_received = metrics.probes_received.saturating_add(1);
3580 let outcome = state.record_received_probe(addr, &probe);
3581 if let RendezvousOutcome::Succeeded { remote_addr, slot_index } = outcome {
3582 let base_port = self.inner.primary_local_port().await.map_err(RendezvousError::Mesh)?;
3583 let local_port = last_local_offset
3584 .map(|offset| base_port.wrapping_add(offset))
3585 .unwrap_or(base_port);
3586 metrics.slot_index_success = Some(slot_index);
3587 metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3588 metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3589 tracing::debug!(
3590 target = "predictive_rendezvous",
3591 rendezvous_id,
3592 slot_index,
3593 local_port,
3594 "rendezvous succeeded"
3595 );
3596 return Ok(RendezvousResult {
3597 remote_addr,
3598 local_port,
3599 slot_index,
3600 metrics,
3601 });
3602 }
3603 }
3604 }
3605 }
3606 }
3607}
3608
3609#[cfg(feature = "predictive-rendezvous")]
3610#[derive(Debug, Clone)]
3612pub struct RendezvousConfig {
3613 pub token: SemanticRendezvousToken,
3614 pub role: Role,
3615 pub potential_remote_addrs: Vec<SocketAddr>,
3616 pub max_duration: Duration,
3617 pub sender_fingerprint: [u8; 16],
3618}
3619
3620#[cfg(feature = "predictive-rendezvous")]
3621#[derive(Debug, Clone, PartialEq, Eq)]
3623pub struct RendezvousResult {
3624 pub remote_addr: SocketAddr,
3625 pub local_port: u16,
3626 pub slot_index: u64,
3627 pub metrics: RendezvousMetrics,
3628}
3629
3630#[cfg(feature = "predictive-rendezvous")]
3631#[derive(Debug)]
3632pub enum RendezvousError {
3633 Timeout,
3634 WindowClosed,
3635 ChannelClosed,
3636 InvalidConfig(&'static str),
3637 Mesh(anyhow::Error),
3638}
3639
3640#[cfg(feature = "predictive-rendezvous")]
3641impl std::fmt::Display for RendezvousError {
3642 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3643 match self {
3644 RendezvousError::Timeout => write!(f, "rendezvous timed out"),
3645 RendezvousError::WindowClosed => write!(f, "rendezvous window closed"),
3646 RendezvousError::ChannelClosed => write!(f, "rendezvous channel closed"),
3647 RendezvousError::InvalidConfig(msg) => write!(f, "invalid rendezvous config: {msg}"),
3648 RendezvousError::Mesh(err) => write!(f, "mesh error: {err}"),
3649 }
3650 }
3651}
3652
3653#[cfg(feature = "predictive-rendezvous")]
3654impl std::error::Error for RendezvousError {}
3655
3656#[cfg(feature = "predictive-rendezvous")]
3657fn derive_nat_hint(offsets: &[u16]) -> rift_rndzv::NatBehaviorHint {
3658 if offsets.len() < 2 {
3659 return rift_rndzv::NatBehaviorHint::Unknown;
3660 }
3661 let mut unique = std::collections::HashSet::new();
3662 for offset in offsets {
3663 unique.insert(*offset);
3664 }
3665 if unique.len() == 1 {
3666 rift_rndzv::NatBehaviorHint::PortPreserving
3667 } else {
3668 rift_rndzv::NatBehaviorHint::HighVariance
3669 }
3670}
3671
3672fn now_timestamp() -> u64 {
3673 SystemTime::now()
3674 .duration_since(UNIX_EPOCH)
3675 .map(|d| d.as_millis() as u64)
3676 .unwrap_or(0)
3677}
3678
3679fn stream_for_payload(payload: &RiftPayload) -> StreamKind {
3680 match payload {
3681 RiftPayload::Control(_) => StreamKind::Control,
3682 RiftPayload::Voice(_) => StreamKind::Voice,
3683 RiftPayload::Text(_) => StreamKind::Text,
3684 RiftPayload::Relay { inner, .. } => stream_for_payload(inner),
3685 RiftPayload::Encrypted(_) => StreamKind::Control,
3686 }
3687}
3688
3689fn should_encrypt(payload: &RiftPayload) -> bool {
3690 match payload {
3691 RiftPayload::Voice(_) => true,
3692 RiftPayload::Text(_) => true,
3693 RiftPayload::Control(ControlMessage::Chat(_)) => true,
3694 RiftPayload::Relay { .. } => false,
3695 RiftPayload::Encrypted(_) => false,
3696 RiftPayload::Control(_) => false,
3697 }
3698}
3699
3700fn encrypt_payload_with_key(
3701 key: &[u8; 32],
3702 header: &RiftFrameHeader,
3703 payload: &RiftPayload,
3704) -> Result<EncryptedPayload> {
3705 let plaintext = bincode::serialize(payload)?;
3706 let mut nonce = [0u8; 12];
3707 rand::rngs::OsRng.fill_bytes(&mut nonce);
3708 let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
3709 let aad = bincode::serialize(header)?;
3710 let ciphertext = cipher
3711 .encrypt(Nonce::from_slice(&nonce), Payload { msg: &plaintext, aad: &aad })
3712 .map_err(|_| anyhow!("e2ee encrypt failed"))?;
3713 Ok(EncryptedPayload { nonce, ciphertext })
3714}
3715
3716#[cfg(all(test, feature = "predictive-rendezvous"))]
3717mod pr_tests {
3718 use super::*;
3719 use rift_rndzv::{EscalationPolicy, IdentityConstraints, SearchStrategy, SemanticRendezvousToken, TimeModel};
3720
3721 #[tokio::test]
3722 async fn predictive_rendezvous_succeeds_loopback() {
3723 let cfg = MeshConfig {
3724 channel_name: "pr-test".to_string(),
3725 password: None,
3726 listen_port: 0,
3727 relay_capable: false,
3728 qos: QosProfile::default(),
3729 auth_token: None,
3730 require_auth: false,
3731 e2ee_key: None,
3732 rekey_interval_secs: None,
3733 max_direct_peers: None,
3734 };
3735
3736 let mesh_a = Mesh::new(Identity::generate(), cfg.clone()).await.unwrap();
3737 let mesh_b = Mesh::new(Identity::generate(), cfg.clone()).await.unwrap();
3738
3739 let addr_a = mesh_a.local_addr().await.unwrap();
3740 let addr_b = mesh_b.local_addr().await.unwrap();
3741
3742 let now_secs = SystemTime::now()
3743 .duration_since(UNIX_EPOCH)
3744 .unwrap()
3745 .as_secs();
3746
3747 let token = SemanticRendezvousToken::new(
3748 rift_rndzv::api::RendezvousSpaceId([0u8; 32]),
3749 [7u8; 32],
3750 IdentityConstraints {
3751 allowed_fingerprints: Vec::new(),
3752 },
3753 TimeModel {
3754 t0: now_secs + 1,
3755 window_secs: 3,
3756 slot_ms: 200,
3757 },
3758 SearchStrategy::BasicDeterministic,
3759 EscalationPolicy::None,
3760 );
3761
3762 let cfg_a = RendezvousConfig {
3763 token: token.clone(),
3764 role: Role::Caller,
3765 potential_remote_addrs: vec![addr_b],
3766 max_duration: Duration::from_secs(3),
3767 sender_fingerprint: [1u8; 16],
3768 };
3769
3770 let cfg_b = RendezvousConfig {
3771 token,
3772 role: Role::Callee,
3773 potential_remote_addrs: vec![addr_a],
3774 max_duration: Duration::from_secs(3),
3775 sender_fingerprint: [2u8; 16],
3776 };
3777
3778 let handle_a = mesh_a.handle();
3779 let handle_b = mesh_b.handle();
3780
3781 let (res_a, res_b) = tokio::join!(
3782 handle_a.run_rendezvous(cfg_a),
3783 handle_b.run_rendezvous(cfg_b),
3784 );
3785
3786 assert!(res_a.is_ok(), "caller rendezvous failed: {:?}", res_a);
3787 assert!(res_b.is_ok(), "callee rendezvous failed: {:?}", res_b);
3788 }
3789}
3790
3791fn decrypt_payload_with_key(
3792 key: &[u8; 32],
3793 header: &RiftFrameHeader,
3794 encrypted: EncryptedPayload,
3795) -> Result<RiftPayload> {
3796 let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
3797 let aad = bincode::serialize(header)?;
3798 let plaintext = cipher
3799 .decrypt(Nonce::from_slice(&encrypted.nonce), Payload { msg: &encrypted.ciphertext, aad: &aad })
3800 .map_err(|_| anyhow!("e2ee decrypt failed"))?;
3801 let payload: RiftPayload = bincode::deserialize(&plaintext)?;
3802 Ok(payload)
3803}
3804
3805#[cfg(test)]
3806mod security_tests {
3807 use super::*;
3808
3809 #[test]
3810 fn e2ee_encrypt_decrypt_roundtrip() {
3811 let key = [7u8; 32];
3812 let header = RiftFrameHeader {
3813 version: ProtocolVersion::V1,
3814 stream: StreamKind::Voice,
3815 flags: 0,
3816 seq: 1,
3817 timestamp: 123,
3818 source: PeerId([1u8; 32]),
3819 session: SessionId::NONE,
3820 };
3821 let payload = RiftPayload::Voice(VoicePacket {
3822 codec_id: CodecId::Opus,
3823 payload: vec![1, 2, 3, 4],
3824 });
3825
3826 let encrypted = encrypt_payload_with_key(&key, &header, &payload).unwrap();
3827 let decoded = decrypt_payload_with_key(&key, &header, encrypted).unwrap();
3828 match decoded {
3829 RiftPayload::Voice(pkt) => assert_eq!(pkt.payload, vec![1, 2, 3, 4]),
3830 other => panic!("unexpected payload: {other:?}"),
3831 }
3832 }
3833
3834 #[test]
3835 fn e2ee_aad_mismatch_fails() {
3836 let key = [9u8; 32];
3837 let mut header = RiftFrameHeader {
3838 version: ProtocolVersion::V1,
3839 stream: StreamKind::Text,
3840 flags: 0,
3841 seq: 5,
3842 timestamp: 999,
3843 source: PeerId([2u8; 32]),
3844 session: SessionId::NONE,
3845 };
3846 let payload = RiftPayload::Text(ChatMessage::new(
3847 PeerId([3u8; 32]),
3848 999,
3849 "hi".to_string(),
3850 ));
3851
3852 let encrypted = encrypt_payload_with_key(&key, &header, &payload).unwrap();
3853 header.seq = 6;
3854 let res = decrypt_payload_with_key(&key, &header, encrypted);
3855 assert!(res.is_err());
3856 }
3857}
3858
3859fn default_peer_capabilities() -> Capabilities {
3860 Capabilities {
3861 supported_versions: vec![ProtocolVersion::V1],
3862 audio_codecs: vec![CodecId::Opus],
3863 features: vec![FeatureFlag::Voice, FeatureFlag::Text],
3864 max_bitrate: Some(48_000),
3865 preferred_frame_duration_ms: Some(20),
3866 }
3867}