Skip to main content

rift_mesh/
lib.rs

1//! Mesh routing, session handling, and peer management.
2//!
3//! This module is the heart of Rift's P2P system. It:
4//! - manages UDP sockets (direct + TURN relays)
5//! - negotiates sessions and capabilities
6//! - handles routing (direct vs relayed) and ICE-lite candidates
7//! - performs encryption/decryption and rate limiting
8//! - emits high-level events to clients
9
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime, UNIX_EPOCH, Instant};
14
15use anyhow::{anyhow, Result};
16use tokio::net::UdpSocket;
17use tokio::sync::{mpsc, Mutex};
18use tokio_stream::StreamExt;
19
20use rift_core::{Identity, Invite, PeerId, MessageId};
21use rift_core::e2ee::{
22    derive_e2ee_shared_key, ed25519_public_from_bytes, generate_e2ee_keypair,
23    public_key_from_bytes, sign_e2ee_public, verify_e2ee_public,
24};
25use rift_discovery::{discover_peers, start_mdns_advertisement, DiscoveryConfig, MdnsHandle};
26use rift_nat::{
27    attempt_hole_punch, gather_public_addrs, gather_turn_candidates, NatConfig, PeerEndpoint,
28    TurnRelay,
29};
30use rift_protocol::{
31    decode_frame, encode_frame, CallControl, CallState, Capabilities, ChatMessage, CodecId,
32    ControlMessage, EncryptedPayload, FeatureFlag, GroupControl, GroupMode, IceCandidate,
33    CandidateType, PeerInfo, ProtocolVersion, QosProfile, RiftFrameHeader, RiftPayload, SessionId,
34    StreamKind, VoicePacket,
35};
36use rift_metrics as metrics;
37use aes_gcm::{Aes256Gcm, Key, Nonce, KeyInit};
38use aes_gcm::aead::{Aead, Payload};
39use rand::RngCore;
40use sha2::Digest;
41use subtle::ConstantTimeEq;
42
43#[cfg(feature = "predictive-rendezvous")]
44use rift_rndzv::{
45    build_probe_payload, compute_slot_params, parse_probe_payload, rendezvous_id_from_seed,
46    ParsedProbe, ProbePayload, RendezvousMetrics, RendezvousOutcome, RendezvousState, Role,
47    SemanticRendezvousToken,
48};
49#[cfg(feature = "predictive-rendezvous")]
50use tokio::time::Instant as TokioInstant;
51
/// Upper bound, in bytes, on the UDP payload size we attempt to parse.
const MAX_PACKET: usize = 2 * 1024;
/// Length of one rate-limiting window.
const RATE_LIMIT_WINDOW: Duration = Duration::from_millis(1_000);
/// Packets admitted within one rate-limit window before further packets are refused.
const RATE_LIMIT_PKTS_PER_SEC: u32 = 1_200;
/// Largest peer count that still uses a full mesh; above it the group may switch to hybrid.
const GROUP_MESH_MAX: usize = 5;
60
61#[derive(Debug, Clone)]
62pub struct MeshConfig {
63    /// Channel name for session id derivation.
64    pub channel_name: String,
65    /// Optional channel password.
66    pub password: Option<String>,
67    /// UDP listen port.
68    pub listen_port: u16,
69    /// Whether this node can relay for others.
70    pub relay_capable: bool,
71    /// QoS preferences for voice traffic.
72    pub qos: QosProfile,
73    /// Optional auth token for access control.
74    pub auth_token: Option<Vec<u8>>,
75    /// Whether auth is required for peers.
76    pub require_auth: bool,
77    /// Optional pre-shared E2EE key for group encryption.
78    pub e2ee_key: Option<[u8; 32]>,
79    /// Optional rekey interval for E2EE.
80    pub rekey_interval_secs: Option<u64>,
81    /// Maximum number of direct peers before relaying.
82    pub max_direct_peers: Option<usize>,
83}
84
85#[derive(Debug, Clone)]
86pub enum PeerRoute {
87    /// Direct UDP connection to a peer address.
88    Direct { addr: SocketAddr },
89    /// Relayed through another peer.
90    Relayed { via: PeerId },
91}
92
93#[derive(Clone)]
94enum MeshSocket {
95    Udp(Arc<UdpSocket>),
96    Turn(Arc<TurnRelay>),
97}
98
99#[derive(Debug, Clone)]
100pub enum MeshEvent {
101    /// A peer joined the mesh.
102    PeerJoined(PeerId),
103    /// A peer left the mesh.
104    PeerLeft(PeerId),
105    /// Chat message received.
106    ChatReceived(ChatMessage),
107    /// Voice packet received.
108    VoiceFrame {
109        from: PeerId,
110        seq: u32,
111        timestamp: u64,
112        session: SessionId,
113        codec: CodecId,
114        payload: Vec<u8>,
115    },
116    /// Routing info updated for a peer.
117    RouteUpdated { peer_id: PeerId, route: PeerRoute },
118    /// Route upgraded from relay to direct.
119    RouteUpgraded(PeerId),
120    /// Peer capability advertisement received.
121    PeerCapabilities { peer_id: PeerId, capabilities: Capabilities },
122    /// Peer session configuration received.
123    PeerSessionConfig { peer_id: PeerId, codec: CodecId, frame_ms: u16 },
124    GroupCodec(CodecId),
125    StatsUpdate { peer: PeerId, stats: LinkStats, global: GlobalStats },
126    PeerIdentity { peer_id: PeerId, public_key: Vec<u8> },
127    IncomingCall {
128        session: SessionId,
129        from: PeerId,
130        rndzv_srt_uri: Option<String>,
131    },
132    CallAccepted { session: SessionId, from: PeerId },
133    CallDeclined {
134        session: SessionId,
135        from: PeerId,
136        reason: Option<String>,
137    },
138    CallEnded { session: SessionId },
139    GroupTopology { session: SessionId, mode: GroupMode },
140}
141
/// Point-in-time link quality figures for a single peer.
#[derive(Debug, Clone, Copy)]
pub struct LinkStats {
    /// Round-trip time, in milliseconds.
    pub rtt_ms: f32,
    /// Fraction of packets lost, from 0.0 (none) to 1.0 (all).
    pub loss: f32,
    /// Jitter estimate, in milliseconds.
    pub jitter_ms: f32,
}
151
/// Mesh-wide counters reported alongside per-peer link statistics.
#[derive(Debug, Clone, Copy)]
pub struct GlobalStats {
    /// Peers currently connected.
    pub num_peers: usize,
    /// Sessions currently active.
    pub num_sessions: usize,
    /// Packets sent in total.
    pub packets_sent: u64,
    /// Packets received in total.
    pub packets_received: u64,
    /// Bytes sent in total.
    pub bytes_sent: u64,
    /// Bytes received in total.
    pub bytes_received: u64,
}
167
168/// High-level mesh controller used by clients.
169pub struct Mesh {
170    inner: Arc<MeshInner>,
171    events_rx: mpsc::Receiver<MeshEvent>,
172    discovery_config: DiscoveryConfig,
173    _mdns: Option<MdnsHandle>,
174}
175
176/// Cloneable handle for sending commands without owning the event stream.
177#[derive(Clone)]
178pub struct MeshHandle {
179    inner: Arc<MeshInner>,
180}
181
182/// Internal state shared across mesh tasks.
183///
184/// Many fields are guarded by mutexes because the mesh runs in multiple async
185/// tasks (receiver, timers, discovery). The fields are grouped roughly by:
186/// - sockets and relay management
187/// - peer metadata and capabilities
188/// - routing, ICE candidates, and NAT state
189/// - traffic stats, rate limiting, and sessions
190struct MeshInner {
191    sockets: Mutex<Vec<MeshSocket>>,
192    turn_relays: Mutex<HashMap<SocketAddr, usize>>,
193    identity: Identity,
194    peers_by_id: Mutex<HashMap<PeerId, SocketAddr>>,
195    peer_caps: Mutex<HashMap<PeerId, bool>>,
196    peer_capabilities: Mutex<HashMap<PeerId, Capabilities>>,
197    peer_public_keys: Mutex<HashMap<PeerId, Vec<u8>>>,
198    peer_session: Mutex<HashMap<PeerId, SessionConfig>>,
199    preferred_codecs: Mutex<Vec<CodecId>>,
200    preferred_features: Mutex<Vec<FeatureFlag>>,
201    routes: Mutex<HashMap<PeerId, PeerRoute>>,
202    peer_addrs: Mutex<HashMap<PeerId, SocketAddr>>,
203    peer_candidates: Mutex<HashMap<PeerId, Vec<SocketAddr>>>,
204    peer_ice_candidates: Mutex<HashMap<PeerId, Vec<IceCandidate>>>,
205    self_candidates: Mutex<Vec<SocketAddr>>,
206    self_ice_candidates: Mutex<Vec<IceCandidate>>,
207    relay_candidates: Mutex<HashMap<PeerId, PeerId>>,
208    connections: Mutex<HashMap<SocketAddr, PeerConnection>>,
209    pending: Mutex<HashMap<PendingKey, PendingHandshake>>,
210    cache: Mutex<HashSet<MessageId>>,
211    voice_seq: Mutex<HashMap<PeerId, u32>>,
212    control_seq: Mutex<u32>,
213    nat_cfg: Mutex<Option<NatConfig>>,
214    qos: QosProfile,
215    link_stats: Mutex<HashMap<PeerId, LinkStatsState>>,
216    peer_traffic: Mutex<HashMap<PeerId, TrafficStats>>,
217    global_traffic: Mutex<TrafficStats>,
218    rate_limits: Mutex<HashMap<SocketAddr, RateLimitState>>,
219    auth_required: bool,
220    auth_token: Option<Vec<u8>>,
221    events_tx: mpsc::Sender<MeshEvent>,
222    relay_capable: bool,
223    session_mgr: Mutex<SessionManager>,
224    active_session: Mutex<SessionId>,
225    channel_session: SessionId,
226    group_codec: Mutex<CodecId>,
227    group_topology: Mutex<HashMap<SessionId, GroupMode>>,
228    candidate_attempts: Mutex<HashMap<PeerId, tokio::time::Instant>>,
229    e2ee_key: Option<[u8; 32]>,
230    e2ee_keys: Mutex<HashMap<(PeerId, SessionId), [u8; 32]>>,
231    e2ee_pending: Mutex<HashMap<(PeerId, SessionId), rift_core::e2ee::E2eeKeypair>>,
232    e2ee_ready: Mutex<HashMap<(PeerId, SessionId), E2eeReadyState>>,
233    rekey_interval_secs: Option<u64>,
234    max_direct_peers: Option<usize>,
235    #[cfg(feature = "predictive-rendezvous")]
236    pr_sessions: Mutex<HashMap<u64, PrSession>>,
237}
238
/// Per-rendezvous state kept while the `predictive-rendezvous` feature is enabled.
#[cfg(feature = "predictive-rendezvous")]
struct PrSession {
    /// Channel over which parsed probes are passed on, tagged with the sender's address.
    tx: mpsc::Sender<(SocketAddr, ParsedProbe)>,
}
243
/// Running packet and byte counters; every counter starts at zero.
#[derive(Debug, Default, Clone)]
struct TrafficStats {
    /// Packets sent.
    packets_sent: u64,
    /// Packets received.
    packets_received: u64,
    /// Bytes sent.
    bytes_sent: u64,
    /// Bytes received.
    bytes_received: u64,
}
251
/// Readiness flags for one E2EE (peer, session) pair.
#[derive(Debug, Clone, Copy)]
struct E2eeReadyState {
    /// Set once the local side is ready.
    local_ready: bool,
    /// Set once the remote side is ready.
    remote_ready: bool,
}
257
258#[derive(Debug, Clone)]
259struct RateLimitState {
260    window_start: Instant,
261    count: u32,
262    last_drop: Instant,
263}
264
265impl RateLimitState {
266    fn new(now: Instant) -> Self {
267        Self {
268            window_start: now,
269            count: 0,
270            last_drop: now,
271        }
272    }
273
274    fn allow(&mut self, now: Instant) -> bool {
275        if now.duration_since(self.window_start) >= RATE_LIMIT_WINDOW {
276            self.window_start = now;
277            self.count = 0;
278        }
279        if self.count >= RATE_LIMIT_PKTS_PER_SEC {
280            return false;
281        }
282        self.count = self.count.saturating_add(1);
283        true
284    }
285}
286
287#[derive(Debug, Clone)]
288/// Negotiated session configuration with a peer.
289pub struct SessionConfig {
290    /// Selected codec.
291    pub codec: CodecId,
292    /// Frame duration in milliseconds.
293    pub frame_ms: u16,
294}
295
296struct PeerConnection {
297    peer_id: Option<PeerId>,
298    session: rift_core::noise::NoiseSession,
299    socket_idx: usize,
300    authenticated: bool,
301}
302
/// Map key for an in-flight handshake: socket index plus remote address.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct PendingKey {
    /// Index of the socket the handshake runs over.
    socket_idx: usize,
    /// Remote address of the handshake.
    addr: SocketAddr,
}
308
309enum PendingHandshake {
310    InitiatorAwait2(snow::HandshakeState),
311    ResponderAwait3(snow::HandshakeState),
312}
313
314#[derive(Debug)]
315struct SessionManager {
316    sessions: HashMap<SessionId, SessionState>,
317}
318
319#[derive(Debug, Clone)]
320struct SessionState {
321    state: CallState,
322    participants: HashSet<PeerId>,
323}
324
325#[derive(Debug, Clone)]
326struct LinkStatsState {
327    last_seq: Option<u32>,
328    window: VecDeque<bool>,
329    window_size: usize,
330    last_transit_ms: Option<f32>,
331    jitter_ms: f32,
332    rtt_ms: f32,
333    last_emit: tokio::time::Instant,
334}
335
336impl LinkStatsState {
337    fn new() -> Self {
338        Self {
339            last_seq: None,
340            window: VecDeque::with_capacity(128),
341            window_size: 100,
342            last_transit_ms: None,
343            jitter_ms: 0.0,
344            rtt_ms: 0.0,
345            last_emit: tokio::time::Instant::now() - Duration::from_secs(10),
346        }
347    }
348
349    fn update_on_receive(&mut self, seq: u32, sent_ms: u64, arrival_ms: u64) {
350        if let Some(last) = self.last_seq {
351            if seq <= last {
352                return;
353            }
354            let gap = (seq - last).saturating_sub(1);
355            let add = gap.min(self.window_size as u32) as usize;
356            for _ in 0..add {
357                self.push_window(false);
358            }
359        }
360        self.push_window(true);
361        self.last_seq = Some(seq);
362
363        let transit = arrival_ms as f32 - sent_ms as f32;
364        if let Some(prev) = self.last_transit_ms {
365            let d = (transit - prev).abs();
366            self.jitter_ms += (d - self.jitter_ms) / 16.0;
367        }
368        self.last_transit_ms = Some(transit);
369    }
370
371    fn update_rtt(&mut self, rtt_ms: f32) {
372        if self.rtt_ms == 0.0 {
373            self.rtt_ms = rtt_ms;
374        } else {
375            let alpha = 0.1;
376            self.rtt_ms = self.rtt_ms * (1.0 - alpha) + rtt_ms * alpha;
377        }
378    }
379
380    fn push_window(&mut self, received: bool) {
381        if self.window.len() >= self.window_size {
382            self.window.pop_front();
383        }
384        self.window.push_back(received);
385    }
386
387    fn loss_ratio(&self) -> f32 {
388        if self.window.is_empty() {
389            return 0.0;
390        }
391        let received = self.window.iter().filter(|v| **v).count() as f32;
392        let total = self.window.len() as f32;
393        (total - received) / total
394    }
395
396    fn snapshot(&self) -> LinkStats {
397        LinkStats {
398            rtt_ms: self.rtt_ms,
399            loss: self.loss_ratio(),
400            jitter_ms: self.jitter_ms,
401        }
402    }
403}
404
405impl SessionManager {
406    fn new(channel_session: SessionId) -> Self {
407        let mut sessions = HashMap::new();
408        sessions.insert(
409            channel_session,
410            SessionState {
411                state: CallState::Active,
412                participants: HashSet::new(),
413            },
414        );
415        Self { sessions }
416    }
417
418    fn ensure_session(&mut self, session: SessionId) -> &mut SessionState {
419        self.sessions.entry(session).or_insert_with(|| SessionState {
420            state: CallState::Ringing,
421            participants: HashSet::new(),
422        })
423    }
424
425    fn add_participant(&mut self, session: SessionId, peer_id: PeerId) {
426        let state = self.ensure_session(session);
427        state.participants.insert(peer_id);
428    }
429
430    fn remove_participant(&mut self, session: SessionId, peer_id: PeerId) {
431        if let Some(state) = self.sessions.get_mut(&session) {
432            state.participants.remove(&peer_id);
433        }
434    }
435
436    fn remove_participant_all(&mut self, peer_id: PeerId) {
437        for state in self.sessions.values_mut() {
438            state.participants.remove(&peer_id);
439        }
440    }
441
442    fn set_state(&mut self, session: SessionId, state: CallState) {
443        let entry = self.ensure_session(session);
444        entry.state = state;
445    }
446
447    fn participants(&self, session: SessionId) -> Vec<PeerId> {
448        self.sessions
449            .get(&session)
450            .map(|state| state.participants.iter().copied().collect())
451            .unwrap_or_default()
452    }
453}
454
455impl Mesh {
456    /// Create a new mesh instance bound to the configured UDP port.
457    /// This spawns background tasks for receiving, candidate checks, rekeying,
458    /// scaling decisions, and ping-based link stats.
459    pub async fn new(identity: Identity, config: MeshConfig) -> Result<Self> {
460        let addr = SocketAddr::from(([0, 0, 0, 0], config.listen_port));
461        let socket = UdpSocket::bind(addr).await?;
462        let socket = Arc::new(socket);
463        let peer_id = identity.peer_id;
464        let channel_session = SessionId::from_channel(&config.channel_name, config.password.as_deref());
465        let session_mgr = SessionManager::new(channel_session);
466
467        let (events_tx, events_rx) = mpsc::channel(256);
468        let inner = Arc::new(MeshInner {
469            sockets: Mutex::new(vec![MeshSocket::Udp(socket.clone())]),
470            turn_relays: Mutex::new(HashMap::new()),
471            identity,
472            peers_by_id: Mutex::new(HashMap::new()),
473            peer_caps: Mutex::new(HashMap::new()),
474            peer_capabilities: Mutex::new(HashMap::new()),
475            peer_public_keys: Mutex::new(HashMap::new()),
476            peer_session: Mutex::new(HashMap::new()),
477            preferred_codecs: Mutex::new(Vec::new()),
478            preferred_features: Mutex::new(Vec::new()),
479            routes: Mutex::new(HashMap::new()),
480            group_codec: Mutex::new(CodecId::Opus),
481            peer_addrs: Mutex::new(HashMap::new()),
482            peer_candidates: Mutex::new(HashMap::new()),
483            peer_ice_candidates: Mutex::new(HashMap::new()),
484            self_candidates: Mutex::new(Vec::new()),
485            self_ice_candidates: Mutex::new(Vec::new()),
486            relay_candidates: Mutex::new(HashMap::new()),
487            connections: Mutex::new(HashMap::new()),
488            pending: Mutex::new(HashMap::new()),
489            cache: Mutex::new(HashSet::new()),
490            voice_seq: Mutex::new(HashMap::new()),
491            control_seq: Mutex::new(0),
492            nat_cfg: Mutex::new(None),
493            qos: config.qos,
494            link_stats: Mutex::new(HashMap::new()),
495            peer_traffic: Mutex::new(HashMap::new()),
496            global_traffic: Mutex::new(TrafficStats::default()),
497            rate_limits: Mutex::new(HashMap::new()),
498            auth_required: config.require_auth,
499            auth_token: config.auth_token,
500            events_tx,
501            relay_capable: config.relay_capable,
502            session_mgr: Mutex::new(session_mgr),
503            active_session: Mutex::new(channel_session),
504            channel_session,
505            candidate_attempts: Mutex::new(HashMap::new()),
506            e2ee_key: config.e2ee_key,
507            e2ee_keys: Mutex::new(HashMap::new()),
508            e2ee_pending: Mutex::new(HashMap::new()),
509            e2ee_ready: Mutex::new(HashMap::new()),
510            rekey_interval_secs: config.rekey_interval_secs,
511            max_direct_peers: config.max_direct_peers,
512            group_topology: Mutex::new(HashMap::new()),
513            #[cfg(feature = "predictive-rendezvous")]
514            pr_sessions: Mutex::new(HashMap::new()),
515        });
516
517        {
518            let mut topo = inner.group_topology.lock().await;
519            topo.insert(channel_session, GroupMode::Mesh);
520        }
521
522        MeshInner::spawn_receiver(inner.clone(), 0, MeshSocket::Udp(socket.clone()));
523        MeshInner::spawn_auto_upgrade(inner.clone());
524        MeshInner::spawn_candidate_checks(inner.clone());
525        MeshInner::spawn_rekey(inner.clone());
526        MeshInner::spawn_scaling(inner.clone());
527        MeshInner::spawn_pinger(inner.clone());
528
529        let mesh = Self {
530            inner,
531            events_rx,
532            discovery_config: DiscoveryConfig {
533                channel_name: config.channel_name,
534                password: config.password,
535                peer_id,
536                listen_port: socket.local_addr()?.port(),
537            },
538            _mdns: None,
539        };
540
541        Ok(mesh)
542    }
543
544    /// Return the local peer id.
545    pub fn local_peer_id(&self) -> PeerId {
546        self.inner.identity.peer_id
547    }
548
549    /// Return the local UDP listen address.
550    pub async fn local_addr(&self) -> Result<SocketAddr> {
551        let sockets = self.inner.sockets.lock().await;
552        match sockets.first() {
553            Some(MeshSocket::Udp(sock)) => {
554                let addr = sock.local_addr()?;
555                // Convert 0.0.0.0 to 127.0.0.1 for local connections
556                if addr.ip().is_unspecified() {
557                    Ok(SocketAddr::new(
558                        std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
559                        addr.port(),
560                    ))
561                } else {
562                    Ok(addr)
563                }
564            }
565            Some(MeshSocket::Turn(relay)) => Ok(relay.relay_addr()),
566            None => Err(anyhow!("missing socket")),
567        }
568    }
569
570    /// Start LAN discovery via mDNS and spawn the discovery task.
571    pub fn start_lan_discovery(&mut self) -> Result<()> {
572        let mdns = start_mdns_advertisement(self.discovery_config.clone())?;
573        self._mdns = Some(mdns);
574        MeshInner::spawn_discovery(self.inner.clone(), self.discovery_config.clone());
575        Ok(())
576    }
577
578    /// Enable NAT traversal (STUN + TURN) and populate local candidates.
579    pub async fn enable_nat(&mut self, nat_cfg: NatConfig) {
580        let mut cfg = self.inner.nat_cfg.lock().await;
581        *cfg = Some(nat_cfg);
582        drop(cfg);
583        let nat_cfg = { self.inner.nat_cfg.lock().await.clone() };
584        if let Some(nat_cfg) = nat_cfg {
585            let local_addr = {
586                let sockets = self.inner.sockets.lock().await;
587                sockets.first().and_then(|sock| match sock {
588                    MeshSocket::Udp(udp) => udp.local_addr().ok(),
589                    MeshSocket::Turn(relay) => Some(relay.relay_addr()),
590                })
591            };
592            let mut relay_addrs = Vec::new();
593            if !nat_cfg.turn_servers.is_empty() {
594                if let Ok(relays) = gather_turn_candidates(&nat_cfg).await {
595                    for relay in relays {
596                        let relay_addr = relay.relay_addr;
597                        let relay_handle = relay.relay.clone();
598                        if let Ok(socket_idx) = self.inner.add_turn_relay(relay_handle).await {
599                            let mut turn_relays = self.inner.turn_relays.lock().await;
600                            turn_relays.insert(relay_addr, socket_idx);
601                            relay_addrs.push(relay_addr);
602                            let _ = rift_nat::spawn_turn_keepalive(
603                                relay.relay.clone(),
604                                nat_cfg.turn_keepalive_ms,
605                            );
606                        }
607                    }
608                }
609            }
610            if let Ok(addrs) = gather_public_addrs(&nat_cfg).await {
611                let local_addr = {
612                    let sockets = self.inner.sockets.lock().await;
613                    sockets.first().and_then(|sock| match sock {
614                        MeshSocket::Udp(udp) => udp.local_addr().ok(),
615                        MeshSocket::Turn(relay) => Some(relay.relay_addr()),
616                    })
617                };
618                let mut combined = addrs.clone();
619                combined.extend(relay_addrs.clone());
620                if let Some(local) = local_addr {
621                    combined.push(local);
622                }
623                combined.sort();
624                combined.dedup();
625                let mut self_candidates = self.inner.self_candidates.lock().await;
626                *self_candidates = combined;
627                drop(self_candidates);
628                let ice_candidates = MeshInner::build_ice_candidates(local_addr, &addrs, &relay_addrs);
629                let mut self_ice = self.inner.self_ice_candidates.lock().await;
630                *self_ice = ice_candidates;
631            } else if let Some(local) = local_addr {
632                let mut self_candidates = self.inner.self_candidates.lock().await;
633                let mut combined = vec![local];
634                combined.extend(relay_addrs.clone());
635                combined.sort();
636                combined.dedup();
637                *self_candidates = combined;
638                drop(self_candidates);
639                let ice_candidates = MeshInner::build_ice_candidates(Some(local), &[], &relay_addrs);
640                let mut self_ice = self.inner.self_ice_candidates.lock().await;
641                *self_ice = ice_candidates;
642            }
643        }
644    }
645
646    /// Join peers from an invite using NAT traversal for connectivity.
647    pub async fn join_invite(&mut self, invite: Invite, nat_cfg: NatConfig) -> Result<()> {
648        {
649            let mut cfg = self.inner.nat_cfg.lock().await;
650            *cfg = Some(nat_cfg.clone());
651        }
652
653        let mut targets = invite.known_peers;
654        targets.extend(invite.candidates);
655        targets.sort();
656        targets.dedup();
657
658        let mut candidates: Vec<IceCandidate> = targets
659            .into_iter()
660            .map(|addr| IceCandidate {
661                addr,
662                cand_type: CandidateType::Srflx,
663                priority: 50,
664                foundation: MeshInner::candidate_foundation(addr, CandidateType::Srflx),
665            })
666            .collect();
667        for relay_addr in invite.relay_candidates {
668            candidates.push(IceCandidate {
669                addr: relay_addr,
670                cand_type: CandidateType::Relay,
671                priority: 70,
672                foundation: MeshInner::candidate_foundation(relay_addr, CandidateType::Relay),
673            });
674        }
675
676        for candidate in candidates {
677            match candidate.cand_type {
678                CandidateType::Relay => {
679                    let socket_idx = {
680                        let map = self.inner.turn_relays.lock().await;
681                        map.get(&candidate.addr).copied()
682                    };
683                    if let Some(socket_idx) = socket_idx {
684                        if let Err(err) = self.inner.initiate_handshake(candidate.addr, socket_idx).await {
685                            tracing::warn!("handshake to {} failed: {err}", candidate.addr);
686                        }
687                    }
688                }
689                _ => {
690                    // Try direct handshake on the primary socket first (useful for LAN/local invites).
691                    if let Err(err) = self.inner.initiate_handshake(candidate.addr, 0).await {
692                        tracing::warn!("direct handshake to {} failed: {err}", candidate.addr);
693                    } else {
694                        continue;
695                    }
696
697                    let endpoint = PeerEndpoint {
698                        peer_id: PeerId([0u8; 32]),
699                        external_addrs: vec![candidate.addr],
700                        punch_ports: vec![candidate.addr.port()],
701                    };
702                    if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
703                        let socket_idx = self.inner.add_socket(socket).await?;
704                        if let Err(err) = self.inner.initiate_handshake(remote, socket_idx).await {
705                            tracing::warn!("handshake to {} failed: {err}", remote);
706                        }
707                    }
708                }
709            }
710        }
711        Ok(())
712    }
713
714    /// Return the currently active session id.
715    pub async fn active_session(&self) -> SessionId {
716        *self.inner.active_session.lock().await
717    }
718
719    /// Await the next mesh event.
720    pub async fn next_event(&mut self) -> Option<MeshEvent> {
721        self.events_rx.recv().await
722    }
723
724    /// Obtain a cloneable handle for issuing commands.
725    pub fn handle(&self) -> MeshHandle {
726        MeshHandle {
727            inner: self.inner.clone(),
728        }
729    }
730}
731
732impl MeshInner {
733    fn build_ice_candidates(
734        local_addr: Option<SocketAddr>,
735        public_addrs: &[SocketAddr],
736        relay_addrs: &[SocketAddr],
737    ) -> Vec<IceCandidate> {
738        let mut out = Vec::new();
739        if let Some(addr) = local_addr {
740            out.push(IceCandidate {
741                addr,
742                cand_type: CandidateType::Host,
743                priority: 100,
744                foundation: Self::candidate_foundation(addr, CandidateType::Host),
745            });
746        }
747        for addr in public_addrs {
748            out.push(IceCandidate {
749                addr: *addr,
750                cand_type: CandidateType::Srflx,
751                priority: 90,
752                foundation: Self::candidate_foundation(*addr, CandidateType::Srflx),
753            });
754        }
755        for addr in relay_addrs {
756            out.push(IceCandidate {
757                addr: *addr,
758                cand_type: CandidateType::Relay,
759                priority: 70,
760                foundation: Self::candidate_foundation(*addr, CandidateType::Relay),
761            });
762        }
763        out.sort_by(|a, b| b.priority.cmp(&a.priority));
764        out.dedup_by(|a, b| a.addr == b.addr && a.cand_type == b.cand_type);
765        out
766    }
767
768    fn candidate_foundation(addr: SocketAddr, cand_type: CandidateType) -> u64 {
769        use std::hash::{Hash, Hasher};
770        let mut hasher = std::collections::hash_map::DefaultHasher::new();
771        addr.hash(&mut hasher);
772        cand_type.hash(&mut hasher);
773        hasher.finish()
774    }
775
776    async fn current_group_mode(&self, session: SessionId) -> GroupMode {
777        let topo = self.group_topology.lock().await;
778        topo.get(&session).cloned().unwrap_or(GroupMode::Mesh)
779    }
780
781    async fn set_group_mode(&self, session: SessionId, mode: GroupMode) {
782        let mut topo = self.group_topology.lock().await;
783        topo.insert(session, mode);
784    }
785
786    async fn is_topology_coordinator(&self, session: SessionId) -> bool {
787        let peers = {
788            let mgr = self.session_mgr.lock().await;
789            mgr.participants(session)
790        };
791        if peers.is_empty() {
792            return true;
793        }
794        let mut all = peers;
795        if !all.contains(&self.identity.peer_id) {
796            all.push(self.identity.peer_id);
797        }
798        all.sort_by_key(|p| p.0);
799        all.first().copied() == Some(self.identity.peer_id)
800    }
801
802    async fn select_forwarder(&self, session: SessionId) -> Option<PeerId> {
803        let participants = {
804            let mgr = self.session_mgr.lock().await;
805            mgr.participants(session)
806        };
807        if participants.is_empty() {
808            return None;
809        }
810        let relay_caps = self.peer_caps.lock().await.clone();
811        let stats = self.link_stats.lock().await.clone();
812
813        let mut candidates: Vec<PeerId> = participants
814            .into_iter()
815            .filter(|peer| relay_caps.get(peer).copied().unwrap_or(false) || *peer == self.identity.peer_id)
816            .collect();
817        if self.relay_capable && !candidates.contains(&self.identity.peer_id) {
818            candidates.push(self.identity.peer_id);
819        }
820        if candidates.is_empty() {
821            return None;
822        }
823        candidates.sort_by_key(|peer| {
824            if *peer == self.identity.peer_id {
825                0u64
826            } else if let Some(st) = stats.get(peer) {
827                (st.rtt_ms as u64).saturating_add((st.loss_ratio() * 1000.0) as u64)
828            } else {
829                10_000
830            }
831        });
832        candidates.first().copied()
833    }
834
835    async fn update_group_topology(&self, session: SessionId) -> Result<()> {
836        if !self.is_topology_coordinator(session).await {
837            return Ok(());
838        }
839        let participants = {
840            let mgr = self.session_mgr.lock().await;
841            mgr.participants(session)
842        };
843        // Include local peer in count (participants only contains remote peers)
844        let count = participants.len() + 1;
845        let next = if count <= GROUP_MESH_MAX {
846            GroupMode::Mesh
847        } else if self.e2ee_key.is_some() {
848            if let Some(forwarder) = self.select_forwarder(session).await {
849                GroupMode::Hybrid { forwarder }
850            } else {
851                GroupMode::Mesh
852            }
853        } else {
854            GroupMode::Mesh
855        };
856
857        let current = self.current_group_mode(session).await;
858        if current == next {
859            return Ok(());
860        }
861        self.set_group_mode(session, next).await;
862        let msg = ControlMessage::Group(GroupControl::Topology { session, mode: next });
863        for peer in participants {
864            if peer == self.identity.peer_id {
865                continue;
866            }
867            let _ = self.send_control_to_peer(peer, msg.clone(), session).await;
868        }
869        let _ = self
870            .events_tx
871            .send(MeshEvent::GroupTopology { session, mode: next })
872            .await;
873        Ok(())
874    }
875
876    async fn should_forward_voice(&self, session: SessionId, from: PeerId) -> bool {
877        if from == self.identity.peer_id {
878            return false;
879        }
880        match self.current_group_mode(session).await {
881            GroupMode::Hybrid { forwarder } if forwarder == self.identity.peer_id => true,
882            _ => false,
883        }
884    }
885
886    async fn should_use_group_key(&self, session: SessionId) -> bool {
887        matches!(self.current_group_mode(session).await, GroupMode::Hybrid { .. }) && self.e2ee_key.is_some()
888    }
889    async fn broadcast_chat(&self, text: String) -> Result<()> {
890        let timestamp = now_timestamp();
891        let seq = self.next_control_seq().await;
892        let chat = ChatMessage::new(self.identity.peer_id, timestamp, text);
893        let payload = RiftPayload::Control(ControlMessage::Chat(chat.clone()));
894        let session = self.channel_session;
895
896        let mut cache = self.cache.lock().await;
897        cache.insert(chat.id);
898        drop(cache);
899
900        let routes = self.routes_snapshot().await;
901        for (peer_id, route) in routes {
902            if peer_id == self.identity.peer_id {
903                continue;
904            }
905            if let Err(err) = self
906                .send_to_peer(peer_id, route, payload.clone(), seq, timestamp, session)
907                .await
908            {
909                tracing::debug!(peer = %peer_id, "chat send failed: {err}");
910            }
911        }
912
913        Ok(())
914    }
915    fn spawn_receiver(inner: Arc<Self>, socket_idx: usize, socket: MeshSocket) {
916        tokio::spawn(async move {
917            let mut buf = [0u8; MAX_PACKET];
918            match socket {
919                MeshSocket::Udp(socket) => loop {
920                    let (len, addr) = match socket.recv_from(&mut buf).await {
921                        Ok(res) => res,
922                        Err(_) => continue,
923                    };
924                    if let Err(err) = inner
925                        .clone()
926                        .handle_packet(socket_idx, addr, &buf[..len])
927                        .await
928                    {
929                        tracing::warn!("mesh recv error: {err}");
930                    }
931                },
932                MeshSocket::Turn(relay) => loop {
933                    let res = relay.recv_from(&mut buf).await;
934                    let Ok((len, addr)) = res else {
935                        tracing::warn!("turn recv error");
936                        continue;
937                    };
938                    if let Err(err) = inner
939                        .clone()
940                        .handle_packet(socket_idx, addr, &buf[..len])
941                        .await
942                    {
943                        tracing::warn!("mesh recv error: {err}");
944                    }
945                },
946            }
947        });
948    }
949
950    fn spawn_discovery(inner: Arc<Self>, config: DiscoveryConfig) {
951        tokio::spawn(async move {
952            loop {
953                let stream = match discover_peers(config.clone()) {
954                    Ok(stream) => stream,
955                    Err(err) => {
956                        tracing::warn!("discovery failed: {err}");
957                        tokio::time::sleep(Duration::from_secs(2)).await;
958                        continue;
959                    }
960                };
961                tokio::pin!(stream);
962                let mut window = tokio::time::interval(Duration::from_millis(200));
963                let window_deadline = tokio::time::Instant::now() + Duration::from_secs(3);
964                loop {
965                    tokio::select! {
966                        maybe = stream.next() => {
967                            let Some(peer) = maybe else { break; };
968                            if peer.peer_id == inner.identity.peer_id {
969                                continue;
970                            }
971                            let already = {
972                                let connections = inner.connections.lock().await;
973                                connections.contains_key(&peer.addr)
974                            };
975                            if already {
976                                continue;
977                            }
978                            if let Err(err) = inner.initiate_handshake(peer.addr, 0).await {
979                                tracing::warn!("handshake to {} failed: {err}", peer.addr);
980                                let inner_retry = inner.clone();
981                                tokio::spawn(async move {
982                                    for attempt in 1..=10 {
983                                        tokio::time::sleep(Duration::from_secs(1)).await;
984                                        let already = {
985                                            let connections = inner_retry.connections.lock().await;
986                                            connections.contains_key(&peer.addr)
987                                        };
988                                        if already {
989                                            break;
990                                        }
991                                        if let Err(err) = inner_retry.initiate_handshake(peer.addr, 0).await {
992                                            tracing::debug!(
993                                                "handshake retry {} to {} failed: {err}",
994                                                attempt,
995                                                peer.addr
996                                            );
997                                        } else {
998                                            break;
999                                        }
1000                                    }
1001                                });
1002                            }
1003                        }
1004                        _ = window.tick() => {
1005                            if tokio::time::Instant::now() >= window_deadline {
1006                                break;
1007                            }
1008                        }
1009                    }
1010                }
1011                tokio::time::sleep(Duration::from_secs(3)).await;
1012            }
1013        });
1014    }
1015
1016    fn spawn_auto_upgrade(inner: Arc<Self>) {
1017        tokio::spawn(async move {
1018            let mut tick = tokio::time::interval(Duration::from_secs(30));
1019            loop {
1020                tick.tick().await;
1021                let nat_cfg = { inner.nat_cfg.lock().await.clone() };
1022                let Some(nat_cfg) = nat_cfg else {
1023                    continue;
1024                };
1025                let relayed: Vec<(PeerId, Vec<SocketAddr>)> = {
1026                    let routes = inner.routes.lock().await;
1027                    let addrs = inner.peer_addrs.lock().await;
1028                    let candidates = inner.peer_candidates.lock().await;
1029                    routes
1030                        .iter()
1031                        .filter_map(|(peer_id, route)| match route {
1032                            PeerRoute::Relayed { .. } => {
1033                                if let Some(cands) = candidates.get(peer_id) {
1034                                    Some((*peer_id, cands.clone()))
1035                                } else {
1036                                    addrs
1037                                        .get(peer_id)
1038                                        .copied()
1039                                        .map(|addr| (*peer_id, vec![addr]))
1040                                }
1041                            }
1042                            _ => None,
1043                        })
1044                        .collect()
1045                };
1046
1047                for (peer_id, addrs) in relayed {
1048                    let endpoint = PeerEndpoint {
1049                        peer_id,
1050                        external_addrs: addrs.clone(),
1051                        punch_ports: addrs.iter().map(|addr| addr.port()).collect(),
1052                    };
1053                    if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
1054                        if let Ok(socket_idx) = inner.add_socket(socket).await {
1055                            if let Err(err) = inner.initiate_handshake(remote, socket_idx).await {
1056                                tracing::debug!(
1057                                    peer = %peer_id,
1058                                    "auto-upgrade handshake failed: {err}"
1059                                );
1060                            }
1061                        }
1062                    }
1063                }
1064            }
1065        });
1066    }
1067
1068    fn spawn_candidate_checks(inner: Arc<Self>) {
1069        tokio::spawn(async move {
1070            let mut tick = tokio::time::interval(Duration::from_secs(15));
1071            loop {
1072                tick.tick().await;
1073                let nat_cfg = { inner.nat_cfg.lock().await.clone() };
1074                let Some(nat_cfg) = nat_cfg else {
1075                    continue;
1076                };
1077                let ice_map = { inner.peer_ice_candidates.lock().await.clone() };
1078                let candidates_map = { inner.peer_candidates.lock().await.clone() };
1079                let routes = { inner.routes.lock().await.clone() };
1080                let addrs = { inner.peer_addrs.lock().await.clone() };
1081                let now = tokio::time::Instant::now();
1082
1083                let mut targets: Vec<(PeerId, Vec<IceCandidate>)> = Vec::new();
1084                for (peer_id, route) in routes.iter() {
1085                    if matches!(route, PeerRoute::Direct { .. }) {
1086                        continue;
1087                    }
1088                    let candidates = if let Some(ice) = ice_map.get(peer_id) {
1089                        let mut sorted = ice.clone();
1090                        sorted.sort_by(|a, b| b.priority.cmp(&a.priority));
1091                        sorted
1092                    } else {
1093                        candidates_map
1094                            .get(peer_id)
1095                            .cloned()
1096                            .or_else(|| addrs.get(peer_id).copied().map(|addr| vec![addr]))
1097                            .unwrap_or_default()
1098                            .into_iter()
1099                            .map(|addr| IceCandidate {
1100                                addr,
1101                                cand_type: CandidateType::Srflx,
1102                                priority: 50,
1103                                foundation: MeshInner::candidate_foundation(addr, CandidateType::Srflx),
1104                            })
1105                            .collect()
1106                    };
1107                    if candidates.is_empty() {
1108                        continue;
1109                    }
1110                    targets.push((*peer_id, candidates));
1111                }
1112
1113                for (peer_id, candidates) in targets {
1114                    let mut attempts = inner.candidate_attempts.lock().await;
1115                    if let Some(last) = attempts.get(&peer_id) {
1116                        if now.duration_since(*last) < Duration::from_secs(20) {
1117                            continue;
1118                        }
1119                    }
1120                    attempts.insert(peer_id, now);
1121                    drop(attempts);
1122
1123                    for candidate in candidates {
1124                        let mut connected = None;
1125                        match candidate.cand_type {
1126                            CandidateType::Relay => {
1127                                let socket_idx = {
1128                                    let map = inner.turn_relays.lock().await;
1129                                    map.get(&candidate.addr).copied()
1130                                };
1131                                if let Some(socket_idx) = socket_idx {
1132                                    if let Err(err) = inner.initiate_handshake(candidate.addr, socket_idx).await {
1133                                        tracing::debug!(
1134                                            peer = %peer_id,
1135                                            "turn candidate handshake failed: {err}"
1136                                        );
1137                                    } else {
1138                                        connected = Some(candidate.addr);
1139                                    }
1140                                }
1141                            }
1142                            _ => {
1143                                let endpoint = PeerEndpoint {
1144                                    peer_id,
1145                                    external_addrs: vec![candidate.addr],
1146                                    punch_ports: vec![candidate.addr.port()],
1147                                };
1148                                if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
1149                                    if let Ok(socket_idx) = inner.add_socket(socket).await {
1150                                        if let Err(err) = inner.initiate_handshake(remote, socket_idx).await {
1151                                            tracing::debug!(
1152                                                peer = %peer_id,
1153                                                "candidate check handshake failed: {err}"
1154                                            );
1155                                        } else {
1156                                            connected = Some(remote);
1157                                        }
1158                                    }
1159                                }
1160                            }
1161                        }
1162                        if let Some(remote) = connected {
1163                            let msg = ControlMessage::IceCheck {
1164                                session: SessionId::NONE,
1165                                tie_breaker: rand::random::<u64>(),
1166                                candidate: candidate.clone(),
1167                            };
1168                            let _ = inner.send_control_to_peer(peer_id, msg, SessionId::NONE).await;
1169                            if remote == candidate.addr {
1170                                break;
1171                            }
1172                        }
1173                    }
1174                }
1175            }
1176        });
1177    }
1178
1179    fn spawn_pinger(inner: Arc<Self>) {
1180        tokio::spawn(async move {
1181            let mut tick = tokio::time::interval(Duration::from_secs(2));
1182            let mut nonce: u64 = 1;
1183            loop {
1184                tick.tick().await;
1185                let peers: Vec<PeerId> = {
1186                    let routes = inner.routes.lock().await;
1187                    routes.keys().copied().collect()
1188                };
1189                for peer_id in peers {
1190                    if peer_id == inner.identity.peer_id {
1191                        continue;
1192                    }
1193                    let ping = ControlMessage::Ping {
1194                        nonce,
1195                        sent_at_ms: now_timestamp(),
1196                    };
1197                    nonce = nonce.wrapping_add(1);
1198                    let _ = inner
1199                        .send_control_to_peer(peer_id, ping, SessionId::NONE)
1200                        .await;
1201                }
1202            }
1203        });
1204    }
1205
1206    fn spawn_rekey(inner: Arc<Self>) {
1207        tokio::spawn(async move {
1208            let interval = inner.rekey_interval_secs.unwrap_or(0);
1209            if interval == 0 {
1210                return;
1211            }
1212            let mut tick = tokio::time::interval(Duration::from_secs(interval));
1213            loop {
1214                tick.tick().await;
1215                let routes = inner.routes.lock().await.clone();
1216                let peers = inner.peers_by_id.lock().await.clone();
1217                for (peer_id, route) in routes {
1218                    if !matches!(route, PeerRoute::Direct { .. }) {
1219                        continue;
1220                    }
1221                    if let Some(addr) = peers.get(&peer_id).copied() {
1222                        if let Err(err) = inner.initiate_handshake(addr, 0).await {
1223                            tracing::debug!(peer = %peer_id, "rekey handshake failed: {err}");
1224                        }
1225                    }
1226                }
1227            }
1228        });
1229    }
1230
1231    fn spawn_scaling(inner: Arc<Self>) {
1232        tokio::spawn(async move {
1233            let mut tick = tokio::time::interval(Duration::from_secs(10));
1234            loop {
1235                tick.tick().await;
1236                let Some(max_direct) = inner.max_direct_peers else {
1237                    continue;
1238                };
1239                let routes = inner.routes.lock().await.clone();
1240                let relay_candidates = inner.relay_candidates.lock().await.clone();
1241                let peer_addrs = inner.peer_addrs.lock().await.clone();
1242                let mut direct_peers: Vec<PeerId> = routes
1243                    .iter()
1244                    .filter_map(|(peer_id, route)| match route {
1245                        PeerRoute::Direct { .. } => Some(*peer_id),
1246                        _ => None,
1247                    })
1248                    .collect();
1249
1250                if direct_peers.len() <= max_direct {
1251                    continue;
1252                }
1253
1254                direct_peers.sort_by_key(|peer| peer.to_hex());
1255                let mut routes_guard = inner.routes.lock().await;
1256                let mut direct_count = direct_peers.len();
1257                for peer_id in direct_peers {
1258                    if direct_count <= max_direct {
1259                        break;
1260                    }
1261                    let Some(relay) = relay_candidates.get(&peer_id).copied() else {
1262                        continue;
1263                    };
1264                    if matches!(routes_guard.get(&peer_id), Some(PeerRoute::Relayed { .. })) {
1265                        continue;
1266                    }
1267                    routes_guard.insert(peer_id, PeerRoute::Relayed { via: relay });
1268                    direct_count = direct_count.saturating_sub(1);
1269                    drop(routes_guard);
1270                    let _ = inner
1271                        .events_tx
1272                        .send(MeshEvent::RouteUpdated {
1273                            peer_id,
1274                            route: PeerRoute::Relayed { via: relay },
1275                        })
1276                        .await;
1277                    tracing::info!(peer = %peer_id, relay = %relay, "scaled route to relay");
1278                    routes_guard = inner.routes.lock().await;
1279                }
1280
1281                if direct_count <= max_direct {
1282                    continue;
1283                }
1284
1285                // If we still exceed max_direct and have direct routes without relay candidates,
1286                // keep them as-is until a relay candidate becomes available.
1287                drop(routes_guard);
1288
1289                // If we are under the limit, attempt to restore relayed routes to direct.
1290                if direct_count < max_direct {
1291                    let mut routes_guard = inner.routes.lock().await;
1292                    for (peer_id, route) in routes.iter() {
1293                        if !matches!(route, PeerRoute::Relayed { .. }) {
1294                            continue;
1295                        }
1296                        if let Some(addr) = peer_addrs.get(peer_id).copied() {
1297                            routes_guard.insert(*peer_id, PeerRoute::Direct { addr });
1298                            let _ = inner
1299                                .events_tx
1300                                .send(MeshEvent::RouteUpdated {
1301                                    peer_id: *peer_id,
1302                                    route: PeerRoute::Direct { addr },
1303                                })
1304                                .await;
1305                            tracing::info!(peer = %peer_id, "scaled route back to direct");
1306                        }
1307                    }
1308                }
1309            }
1310        });
1311    }
1312
1313    async fn add_socket(self: &Arc<Self>, socket: UdpSocket) -> Result<usize> {
1314        let socket = Arc::new(socket);
1315        let mut sockets = self.sockets.lock().await;
1316        sockets.push(MeshSocket::Udp(socket.clone()));
1317        let idx = sockets.len() - 1;
1318        drop(sockets);
1319        Self::spawn_receiver(self.clone(), idx, MeshSocket::Udp(socket));
1320        Ok(idx)
1321    }
1322
1323    async fn add_turn_relay(self: &Arc<Self>, relay: Arc<TurnRelay>) -> Result<usize> {
1324        let mut sockets = self.sockets.lock().await;
1325        sockets.push(MeshSocket::Turn(relay.clone()));
1326        let idx = sockets.len() - 1;
1327        drop(sockets);
1328        Self::spawn_receiver(self.clone(), idx, MeshSocket::Turn(relay));
1329        Ok(idx)
1330    }
1331
1332    async fn initiate_handshake(&self, addr: SocketAddr, socket_idx: usize) -> Result<()> {
1333        let builder = rift_core::noise::noise_builder();
1334        let static_kp = builder.generate_keypair()?;
1335        let mut hs = builder
1336            .local_private_key(&static_kp.private)
1337            .build_initiator()?;
1338
1339        let mut buf = [0u8; MAX_PACKET];
1340        let len = hs.write_message(&[], &mut buf)?;
1341        self.send_raw(socket_idx, addr, &buf[..len]).await?;
1342
1343        let mut pending = self.pending.lock().await;
1344        pending.insert(
1345            PendingKey { socket_idx, addr },
1346            PendingHandshake::InitiatorAwait2(hs),
1347        );
1348        Ok(())
1349    }
1350
1351    async fn handle_packet(self: Arc<Self>, socket_idx: usize, addr: SocketAddr, data: &[u8]) -> Result<()> {
1352        if !self.allow_packet(addr).await {
1353            return Ok(());
1354        }
1355        #[cfg(feature = "predictive-rendezvous")]
1356        if self.try_handle_pr_probe(addr, data).await? {
1357            return Ok(());
1358        }
1359        if self.try_handle_pending(socket_idx, addr, data).await? {
1360            return Ok(());
1361        }
1362
1363        let maybe_session = {
1364            let mut connections = self.connections.lock().await;
1365            connections.get_mut(&addr).map(|conn| {
1366                let mut out = vec![0u8; MAX_PACKET];
1367                let len = conn.session.decrypt(data, &mut out)?;
1368                Ok::<_, anyhow::Error>(out[..len].to_vec())
1369            })
1370        };
1371
1372        let plaintext = match maybe_session {
1373            Some(res) => res?,
1374            None => {
1375                self.start_responder(socket_idx, addr, data).await?;
1376                return Ok(());
1377            }
1378        };
1379
1380        self.record_recv(addr, data.len()).await;
1381        let (header, payload) = decode_frame(&plaintext)?;
1382        self.update_link_stats(header.source, header.seq, header.timestamp)
1383            .await;
1384        self.handle_frame(addr, header, payload).await?;
1385        Ok(())
1386    }
1387
1388    async fn try_handle_pending(
1389        &self,
1390        socket_idx: usize,
1391        addr: SocketAddr,
1392        data: &[u8],
1393    ) -> Result<bool> {
1394        let mut pending = self.pending.lock().await;
1395        let key = PendingKey { socket_idx, addr };
1396        let Some(state) = pending.remove(&key) else {
1397            return Ok(false);
1398        };
1399        drop(pending);
1400
1401        match state {
1402            PendingHandshake::InitiatorAwait2(mut hs) => {
1403                let mut out = [0u8; MAX_PACKET];
1404                hs.read_message(data, &mut out)?;
1405                let len = hs.write_message(&[], &mut out)?;
1406                self.send_raw(socket_idx, addr, &out[..len]).await?;
1407                let transport = hs.into_transport_mode()?;
1408                self.install_connection(addr, transport, socket_idx).await?;
1409            }
1410            PendingHandshake::ResponderAwait3(mut hs) => {
1411                let mut out = [0u8; MAX_PACKET];
1412                hs.read_message(data, &mut out)?;
1413                let transport = hs.into_transport_mode()?;
1414                self.install_connection(addr, transport, socket_idx).await?;
1415            }
1416        }
1417
1418        Ok(true)
1419    }
1420
1421    async fn start_responder(
1422        &self,
1423        socket_idx: usize,
1424        addr: SocketAddr,
1425        first_msg: &[u8],
1426    ) -> Result<()> {
1427        let builder = rift_core::noise::noise_builder();
1428        let static_kp = builder.generate_keypair()?;
1429        let mut hs = builder
1430            .local_private_key(&static_kp.private)
1431            .build_responder()?;
1432
1433        let mut out = [0u8; MAX_PACKET];
1434        hs.read_message(first_msg, &mut out)?;
1435        let len = hs.write_message(&[], &mut out)?;
1436        self.send_raw(socket_idx, addr, &out[..len]).await?;
1437
1438        let mut pending = self.pending.lock().await;
1439        pending.insert(
1440            PendingKey { socket_idx, addr },
1441            PendingHandshake::ResponderAwait3(hs),
1442        );
1443        Ok(())
1444    }
1445
1446    async fn install_connection(
1447        &self,
1448        addr: SocketAddr,
1449        transport: snow::TransportState,
1450        socket_idx: usize,
1451    ) -> Result<()> {
1452        let conn = PeerConnection {
1453            peer_id: None,
1454            session: rift_core::noise::NoiseSession::new(transport),
1455            socket_idx,
1456            authenticated: !self.auth_required,
1457        };
1458        let mut connections = self.connections.lock().await;
1459        connections.insert(addr, conn);
1460        drop(connections);
1461
1462        if let Some(token) = self.auth_token.clone() {
1463            let auth = ControlMessage::Auth { token };
1464            let seq = self.next_control_seq().await;
1465            let _ = self
1466                .send_payload(
1467                    addr,
1468                    RiftPayload::Control(auth),
1469                    seq,
1470                    now_timestamp(),
1471                    SessionId::NONE,
1472                    None,
1473                )
1474                .await;
1475        }
1476
1477        let join = ControlMessage::Join {
1478            peer_id: self.identity.peer_id,
1479            display_name: None,
1480        };
1481        let seq = self.next_control_seq().await;
1482        self.send_payload(
1483            addr,
1484            RiftPayload::Control(join),
1485            seq,
1486            now_timestamp(),
1487            SessionId::NONE,
1488            None,
1489        )
1490        .await?;
1491
1492        let state = ControlMessage::PeerState {
1493            peer_id: self.identity.peer_id,
1494            relay_capable: self.relay_capable,
1495        };
1496        let seq = self.next_control_seq().await;
1497        self.send_payload(
1498            addr,
1499            RiftPayload::Control(state),
1500            seq,
1501            now_timestamp(),
1502            SessionId::NONE,
1503            None,
1504        )
1505        .await?;
1506
1507        self.send_peer_list(addr).await?;
1508        self.send_hello(addr).await?;
1509        Ok(())
1510    }
1511
1512    async fn send_peer_list(&self, addr: SocketAddr) -> Result<()> {
1513        let peers: Vec<PeerInfo> = {
1514            let peers_by_id = self.peers_by_id.lock().await;
1515            let peer_caps = self.peer_caps.lock().await;
1516            let peer_candidates = self.peer_candidates.lock().await;
1517            peers_by_id
1518                .iter()
1519                .map(|(peer_id, addr)| {
1520                    let mut addrs = peer_candidates
1521                        .get(peer_id)
1522                        .cloned()
1523                        .unwrap_or_else(|| vec![*addr]);
1524                    if addrs.is_empty() {
1525                        addrs.push(*addr);
1526                    }
1527                    addrs.sort();
1528                    addrs.dedup();
1529                    PeerInfo {
1530                        peer_id: *peer_id,
1531                        addr: *addr,
1532                        addrs,
1533                        relay_capable: *peer_caps.get(peer_id).unwrap_or(&false),
1534                    }
1535                })
1536                .collect()
1537        };
1538        let msg = RiftPayload::Control(ControlMessage::PeerList { peers });
1539        let seq = self.next_control_seq().await;
1540        self.send_payload(addr, msg, seq, now_timestamp(), SessionId::NONE, None)
1541            .await
1542    }
1543
1544    async fn handle_capabilities(&self, peer_id: PeerId, capabilities: Capabilities) -> Result<()> {
1545        {
1546            let mut caps = self.peer_capabilities.lock().await;
1547            caps.insert(peer_id, capabilities.clone());
1548        }
1549
1550        let session = self.negotiate_session_config(&capabilities).await;
1551        {
1552            let mut sessions = self.peer_session.lock().await;
1553            sessions.insert(peer_id, session.clone());
1554        }
1555        self.update_group_codec().await;
1556
1557        let _ = self
1558            .events_tx
1559            .send(MeshEvent::PeerCapabilities {
1560                peer_id,
1561                capabilities: capabilities.clone(),
1562            })
1563            .await;
1564        let _ = self
1565            .events_tx
1566            .send(MeshEvent::PeerSessionConfig {
1567                peer_id,
1568                codec: session.codec,
1569                frame_ms: session.frame_ms,
1570            })
1571            .await;
1572        self.maybe_start_e2ee(peer_id, &capabilities).await?;
1573        Ok(())
1574    }
1575
1576    async fn maybe_start_e2ee(&self, peer_id: PeerId, capabilities: &Capabilities) -> Result<()> {
1577        if self.e2ee_key.is_some() {
1578            return Ok(());
1579        }
1580        let local_supports = {
1581            let preferred = self.preferred_features.lock().await;
1582            if preferred.is_empty() {
1583                true
1584            } else {
1585                preferred.contains(&FeatureFlag::E2EE)
1586            }
1587        };
1588        if !local_supports || !capabilities.features.contains(&FeatureFlag::E2EE) {
1589            return Ok(());
1590        }
1591        let _ = self.start_e2ee(peer_id, self.channel_session).await;
1592        Ok(())
1593    }
1594
1595    async fn negotiate_session_config(&self, remote: &Capabilities) -> SessionConfig {
1596        let preferred = {
1597            let prefs = self.preferred_codecs.lock().await;
1598            if prefs.is_empty() {
1599                vec![CodecId::Opus, CodecId::PCM16]
1600            } else {
1601                prefs.clone()
1602            }
1603        };
1604        let codec = preferred
1605            .into_iter()
1606            .find(|codec| remote.audio_codecs.contains(codec))
1607            .unwrap_or(CodecId::Opus);
1608        let frame_ms = remote
1609            .preferred_frame_duration_ms
1610            .unwrap_or(20)
1611            .min(20);
1612        SessionConfig { codec, frame_ms }
1613    }
1614
1615    async fn update_group_codec(&self) {
1616        let caps = self.peer_capabilities.lock().await;
1617        let preferred = {
1618            let prefs = self.preferred_codecs.lock().await;
1619            if prefs.is_empty() {
1620                vec![CodecId::Opus, CodecId::PCM16]
1621            } else {
1622                prefs.clone()
1623            }
1624        };
1625        let mut selected = CodecId::Opus;
1626        for codec in preferred {
1627            let mut all_support = true;
1628            for cap in caps.values() {
1629                if !cap.audio_codecs.contains(&codec) {
1630                    all_support = false;
1631                    break;
1632                }
1633            }
1634            if all_support {
1635                selected = codec;
1636                break;
1637            }
1638        }
1639        let mut group = self.group_codec.lock().await;
1640        if *group != selected {
1641            *group = selected;
1642            let _ = self.events_tx.send(MeshEvent::GroupCodec(selected)).await;
1643        }
1644    }
1645
1646    async fn send_hello(&self, addr: SocketAddr) -> Result<()> {
1647        let caps = self.default_capabilities().await;
1648        let public_key = self.identity.keypair.public.to_bytes().to_vec();
1649        let candidates = {
1650            let self_candidates = self.self_candidates.lock().await;
1651            self_candidates.clone()
1652        };
1653        let msg = RiftPayload::Control(ControlMessage::Hello {
1654            peer_id: self.identity.peer_id,
1655            public_key,
1656            capabilities: caps,
1657            candidates,
1658        });
1659        let seq = self.next_control_seq().await;
1660        self.send_payload(addr, msg, seq, now_timestamp(), SessionId::NONE, None)
1661            .await
1662    }
1663
1664    async fn send_ice_candidates(&self, peer_id: PeerId, session: SessionId) -> Result<()> {
1665        let candidates = {
1666            let self_ice = self.self_ice_candidates.lock().await;
1667            self_ice.clone()
1668        };
1669        if candidates.is_empty() {
1670            return Ok(());
1671        }
1672        let msg = ControlMessage::IceCandidates {
1673            peer_id: self.identity.peer_id,
1674            session,
1675            candidates,
1676        };
1677        self.send_control_to_peer(peer_id, msg, session).await
1678    }
1679
1680    async fn default_capabilities(&self) -> Capabilities {
1681        let preferred = self.preferred_codecs().await;
1682        let codecs = if preferred.is_empty() {
1683            vec![CodecId::Opus, CodecId::PCM16]
1684        } else {
1685            preferred
1686        };
1687        let features = {
1688            let preferred = self.preferred_features.lock().await;
1689            if preferred.is_empty() {
1690                vec![
1691                    FeatureFlag::Voice,
1692                    FeatureFlag::Text,
1693                    FeatureFlag::Relay,
1694                    FeatureFlag::E2EE,
1695                ]
1696            } else {
1697                preferred.clone()
1698            }
1699        };
1700        Capabilities {
1701            supported_versions: vec![ProtocolVersion::V2, ProtocolVersion::V1],
1702            audio_codecs: codecs,
1703            features,
1704            max_bitrate: Some(96_000),
1705            preferred_frame_duration_ms: Some(20),
1706        }
1707    }
1708
1709    async fn preferred_codecs(&self) -> Vec<CodecId> {
1710        self.preferred_codecs.lock().await.clone()
1711    }
1712
1713    async fn set_preferred_codecs(&self, codecs: Vec<CodecId>) {
1714        let mut preferred = self.preferred_codecs.lock().await;
1715        *preferred = codecs;
1716    }
1717
1718    async fn set_preferred_features(&self, features: Vec<FeatureFlag>) {
1719        let mut preferred = self.preferred_features.lock().await;
1720        *preferred = features;
1721    }
1722
1723    async fn next_control_seq(&self) -> u32 {
1724        let mut seq = self.control_seq.lock().await;
1725        let current = *seq;
1726        *seq = seq.wrapping_add(1);
1727        current
1728    }
1729
1730    async fn send_control_to_peer(
1731        &self,
1732        peer_id: PeerId,
1733        msg: ControlMessage,
1734        session: SessionId,
1735    ) -> Result<()> {
1736        let payload = RiftPayload::Control(msg);
1737        let routes = self.routes_snapshot().await;
1738        let Some(route) = routes.get(&peer_id).cloned() else {
1739            return Err(anyhow!("missing route"));
1740        };
1741        let seq = self.next_control_seq().await;
1742        self.send_to_peer(peer_id, route, payload, seq, now_timestamp(), session)
1743            .await
1744    }
1745
1746    async fn start_e2ee(&self, peer_id: PeerId, session: SessionId) -> Result<()> {
1747        if peer_id == self.identity.peer_id {
1748            return Ok(());
1749        }
1750        // Deterministic initiator: only the lower peer id sends init to avoid key mismatch.
1751        if self.identity.peer_id.0 > peer_id.0 {
1752            return Ok(());
1753        }
1754        {
1755            let keys = self.e2ee_keys.lock().await;
1756            if keys.contains_key(&(peer_id, session)) {
1757                return Ok(());
1758            }
1759        }
1760        {
1761            let pending = self.e2ee_pending.lock().await;
1762            if pending.contains_key(&(peer_id, session)) {
1763                return Ok(());
1764            }
1765        }
1766        let peer_public = {
1767            let keys = self.peer_public_keys.lock().await;
1768            keys.get(&peer_id).cloned()
1769        };
1770        let Some(peer_public) = peer_public else {
1771            return Ok(());
1772        };
1773        let Some(_peer_public) = ed25519_public_from_bytes(&peer_public) else {
1774            return Ok(());
1775        };
1776        let keypair = generate_e2ee_keypair();
1777        let signature = sign_e2ee_public(&self.identity, &session.0, &keypair.public);
1778        let public_key = keypair.public.to_bytes();
1779        {
1780            let mut pending = self.e2ee_pending.lock().await;
1781            pending.insert((peer_id, session), keypair);
1782        }
1783        let msg = ControlMessage::E2eeInit {
1784            session,
1785            from: self.identity.peer_id,
1786            public_key,
1787            signature,
1788        };
1789        self.send_control_to_peer(peer_id, msg, session).await?;
1790        tracing::info!(peer = %peer_id, session = ?session, "e2ee init sent");
1791        Ok(())
1792    }
1793
1794    async fn handle_e2ee_init(
1795        &self,
1796        peer_id: PeerId,
1797        session: SessionId,
1798        public_key: [u8; 32],
1799        signature: Vec<u8>,
1800    ) -> Result<()> {
1801        let should_initiate = self.identity.peer_id.0 < peer_id.0;
1802        {
1803            let keys = self.e2ee_keys.lock().await;
1804            if keys.contains_key(&(peer_id, session)) {
1805                return Ok(());
1806            }
1807        }
1808        if should_initiate {
1809            let pending = self.e2ee_pending.lock().await;
1810            if pending.contains_key(&(peer_id, session)) {
1811                tracing::info!(
1812                    peer = %peer_id,
1813                    session = ?session,
1814                    "e2ee init ignored (local is initiator)"
1815                );
1816                return Ok(());
1817            }
1818        } else {
1819            let mut pending = self.e2ee_pending.lock().await;
1820            pending.remove(&(peer_id, session));
1821        }
1822        let peer_public = {
1823            let keys = self.peer_public_keys.lock().await;
1824            keys.get(&peer_id).cloned()
1825        };
1826        let Some(peer_public) = peer_public else {
1827            return Ok(());
1828        };
1829        let Some(peer_public) = ed25519_public_from_bytes(&peer_public) else {
1830            return Ok(());
1831        };
1832        let remote_public = public_key_from_bytes(public_key);
1833        if !verify_e2ee_public(&peer_public, &session.0, &remote_public, &signature) {
1834            tracing::warn!(peer = %peer_id, "e2ee init signature invalid");
1835            return Ok(());
1836        }
1837        let keypair = generate_e2ee_keypair();
1838        let shared = derive_e2ee_shared_key(&keypair.secret, &remote_public, &session.0);
1839        let shared_fingerprint = hex::encode(&sha2::Sha256::digest(shared)[..4]);
1840        {
1841            let mut keys = self.e2ee_keys.lock().await;
1842            keys.insert((peer_id, session), shared);
1843        }
1844        {
1845            let mut ready = self.e2ee_ready.lock().await;
1846            let entry = ready
1847                .entry((peer_id, session))
1848                .or_insert(E2eeReadyState {
1849                    local_ready: false,
1850                    remote_ready: false,
1851                });
1852            entry.local_ready = true;
1853        }
1854        let response = ControlMessage::E2eeResp {
1855            session,
1856            from: self.identity.peer_id,
1857            public_key: keypair.public.to_bytes(),
1858            signature: sign_e2ee_public(&self.identity, &session.0, &keypair.public),
1859        };
1860        self.send_control_to_peer(peer_id, response, session).await?;
1861        tracing::info!(
1862            peer = %peer_id,
1863            session = ?session,
1864            key = %shared_fingerprint,
1865            "e2ee key derived (resp)"
1866        );
1867        let ready_msg = ControlMessage::EncryptedReady { session, alg: 1 };
1868        let _ = self.send_control_to_peer(peer_id, ready_msg, session).await;
1869        tracing::info!(peer = %peer_id, session = ?session, "e2ee response sent");
1870        Ok(())
1871    }
1872
1873    async fn handle_e2ee_resp(
1874        &self,
1875        peer_id: PeerId,
1876        session: SessionId,
1877        public_key: [u8; 32],
1878        signature: Vec<u8>,
1879    ) -> Result<()> {
1880        {
1881            let keys = self.e2ee_keys.lock().await;
1882            if keys.contains_key(&(peer_id, session)) {
1883                return Ok(());
1884            }
1885        }
1886        let pending = {
1887            let mut pending = self.e2ee_pending.lock().await;
1888            pending.remove(&(peer_id, session))
1889        };
1890        let Some(pending) = pending else {
1891            return Ok(());
1892        };
1893        let peer_public = {
1894            let keys = self.peer_public_keys.lock().await;
1895            keys.get(&peer_id).cloned()
1896        };
1897        let Some(peer_public) = peer_public else {
1898            return Ok(());
1899        };
1900        let Some(peer_public) = ed25519_public_from_bytes(&peer_public) else {
1901            return Ok(());
1902        };
1903        let remote_public = public_key_from_bytes(public_key);
1904        if !verify_e2ee_public(&peer_public, &session.0, &remote_public, &signature) {
1905            tracing::warn!(peer = %peer_id, "e2ee response signature invalid");
1906            return Ok(());
1907        }
1908        let shared = derive_e2ee_shared_key(&pending.secret, &remote_public, &session.0);
1909        let shared_fingerprint = hex::encode(&sha2::Sha256::digest(shared)[..4]);
1910        {
1911            let mut keys = self.e2ee_keys.lock().await;
1912            keys.insert((peer_id, session), shared);
1913        }
1914        {
1915            let mut ready = self.e2ee_ready.lock().await;
1916            let entry = ready
1917                .entry((peer_id, session))
1918                .or_insert(E2eeReadyState {
1919                    local_ready: false,
1920                    remote_ready: false,
1921                });
1922            entry.local_ready = true;
1923        }
1924        let ready_msg = ControlMessage::EncryptedReady { session, alg: 1 };
1925        let _ = self.send_control_to_peer(peer_id, ready_msg, session).await;
1926        tracing::info!(
1927            peer = %peer_id,
1928            session = ?session,
1929            key = %shared_fingerprint,
1930            "e2ee key derived (init)"
1931        );
1932        tracing::info!(peer = %peer_id, session = ?session, "e2ee key established");
1933        Ok(())
1934    }
1935
1936    async fn maybe_encrypt_payload(
1937        &self,
1938        payload: RiftPayload,
1939        header: &RiftFrameHeader,
1940        peer_id: Option<PeerId>,
1941        session: SessionId,
1942    ) -> Result<RiftPayload> {
1943        match payload {
1944            RiftPayload::Relay { target, inner } => {
1945                if should_encrypt(&inner) {
1946                    let use_group = self.should_use_group_key(session).await
1947                        && matches!(*inner, RiftPayload::Voice(_));
1948                let key = if use_group {
1949                    self.e2ee_key.ok_or_else(|| anyhow!("missing e2ee key"))?
1950                } else {
1951                    let keys = self.e2ee_keys.lock().await;
1952                    peer_id
1953                        .and_then(|peer| keys.get(&(peer, session)).copied())
1954                        .or(self.e2ee_key)
1955                        .ok_or_else(|| anyhow!("missing e2ee key"))?
1956                };
1957                if !use_group {
1958                    if let Some(peer) = peer_id {
1959                        let ready = self.e2ee_ready.lock().await;
1960                        let ready = ready
1961                            .get(&(peer, session))
1962                            .map(|state| state.local_ready && state.remote_ready)
1963                            .unwrap_or(false);
1964                        if !ready {
1965                            tracing::debug!(
1966                                target: "security",
1967                                peer = %peer,
1968                                session = ?session,
1969                                "e2ee not ready; sending plaintext"
1970                            );
1971                            return Ok(RiftPayload::Relay { target, inner });
1972                        }
1973                    }
1974                }
1975                let encrypted = RiftPayload::Encrypted(
1976                    encrypt_payload_with_key(&key, header, &inner)?
1977                );
1978                Ok(RiftPayload::Relay {
1979                    target,
1980                    inner: Box::new(encrypted),
1981                    })
1982                } else {
1983                    Ok(RiftPayload::Relay { target, inner })
1984                }
1985            }
1986            other => {
1987                if !should_encrypt(&other) {
1988                    return Ok(other);
1989                }
1990                let use_group = self.should_use_group_key(session).await
1991                    && matches!(other, RiftPayload::Voice(_));
1992                let key = if use_group {
1993                    self.e2ee_key.ok_or_else(|| anyhow!("missing e2ee key"))?
1994                } else {
1995                    let keys = self.e2ee_keys.lock().await;
1996                    peer_id
1997                        .and_then(|peer| keys.get(&(peer, session)).copied())
1998                        .or(self.e2ee_key)
1999                        .ok_or_else(|| anyhow!("missing e2ee key"))?
2000                };
2001                if !use_group {
2002                    if let Some(peer) = peer_id {
2003                        let ready = self.e2ee_ready.lock().await;
2004                        let ready = ready
2005                            .get(&(peer, session))
2006                            .map(|state| state.local_ready && state.remote_ready)
2007                            .unwrap_or(false);
2008                        if !ready {
2009                            tracing::debug!(
2010                                target: "security",
2011                                peer = %peer,
2012                                session = ?session,
2013                                "e2ee not ready; sending plaintext"
2014                            );
2015                            return Ok(other);
2016                        }
2017                    }
2018                }
2019                let encrypted = encrypt_payload_with_key(&key, header, &other)?;
2020                Ok(RiftPayload::Encrypted(encrypted))
2021            }
2022        }
2023    }
2024
2025    async fn decrypt_payload(
2026        &self,
2027        header: &RiftFrameHeader,
2028        encrypted: EncryptedPayload,
2029        peer_id: PeerId,
2030        session: SessionId,
2031    ) -> Result<RiftPayload> {
2032        let key = {
2033            let keys = self.e2ee_keys.lock().await;
2034            keys.get(&(peer_id, session)).copied()
2035        }
2036        .or(self.e2ee_key)
2037        .ok_or_else(|| anyhow!("missing e2ee key"))?;
2038        decrypt_payload_with_key(&key, header, encrypted)
2039    }
2040
2041    async fn update_link_stats(&self, peer_id: PeerId, seq: u32, sent_ms: u64) {
2042        if peer_id == self.identity.peer_id {
2043            return;
2044        }
2045        let arrival_ms = now_timestamp();
2046        let mut stats_map = self.link_stats.lock().await;
2047        let state = stats_map.entry(peer_id).or_insert_with(LinkStatsState::new);
2048        state.update_on_receive(seq, sent_ms, arrival_ms);
2049        let should_emit = state.last_emit.elapsed() >= Duration::from_millis(500);
2050        let stats = state.snapshot();
2051        if should_emit {
2052            state.last_emit = tokio::time::Instant::now();
2053        }
2054        drop(stats_map);
2055        if should_emit {
2056            let global = self.global_stats_snapshot().await;
2057            let _ = self
2058                .events_tx
2059                .send(MeshEvent::StatsUpdate {
2060                    peer: peer_id,
2061                    stats,
2062                    global,
2063                })
2064                .await;
2065            metrics::observe_histogram(
2066                "rift_rtt_ms",
2067                &[("peer", &peer_id.to_hex())],
2068                stats.rtt_ms as f64,
2069            );
2070            metrics::set_gauge(
2071                "rift_packet_loss",
2072                &[("peer", &peer_id.to_hex())],
2073                stats.loss as f64,
2074            );
2075            metrics::set_gauge(
2076                "rift_jitter_ms",
2077                &[("peer", &peer_id.to_hex())],
2078                stats.jitter_ms as f64,
2079            );
2080            self.consider_route(peer_id, stats).await;
2081        }
2082    }
2083
2084    async fn update_rtt(&self, peer_id: PeerId, sent_at_ms: u64) {
2085        let now_ms = now_timestamp();
2086        let rtt_ms = now_ms.saturating_sub(sent_at_ms) as f32;
2087        let mut stats_map = self.link_stats.lock().await;
2088        let state = stats_map.entry(peer_id).or_insert_with(LinkStatsState::new);
2089        state.update_rtt(rtt_ms);
2090        let stats = state.snapshot();
2091        drop(stats_map);
2092        let global = self.global_stats_snapshot().await;
2093        let _ = self
2094            .events_tx
2095            .send(MeshEvent::StatsUpdate {
2096                peer: peer_id,
2097                stats,
2098                global,
2099            })
2100            .await;
2101        metrics::observe_histogram(
2102            "rift_rtt_ms",
2103            &[("peer", &peer_id.to_hex())],
2104            stats.rtt_ms as f64,
2105        );
2106        self.consider_route(peer_id, stats).await;
2107    }
2108
2109    async fn allow_packet(&self, addr: SocketAddr) -> bool {
2110        let now = Instant::now();
2111        let mut limits = self.rate_limits.lock().await;
2112        let entry = limits.entry(addr).or_insert_with(|| RateLimitState::new(now));
2113        let allowed = entry.allow(now);
2114        if !allowed {
2115            if now.duration_since(entry.last_drop) >= RATE_LIMIT_WINDOW {
2116                entry.last_drop = now;
2117                tracing::warn!(target = "security", %addr, "rate limit exceeded");
2118            }
2119            metrics::inc_counter("rift_packets_dropped", &[("reason", "rate_limit")]);
2120        }
2121        allowed
2122    }
2123
2124    #[cfg(feature = "predictive-rendezvous")]
2125    async fn try_handle_pr_probe(&self, addr: SocketAddr, data: &[u8]) -> Result<bool> {
2126        let parsed = match parse_probe_payload(data) {
2127            Ok(parsed) => parsed,
2128            Err(_) => return Ok(false),
2129        };
2130
2131        let rendezvous_id = parsed.rendezvous_id;
2132        let tx = {
2133            let sessions = self.pr_sessions.lock().await;
2134            sessions.get(&rendezvous_id).map(|session| session.tx.clone())
2135        };
2136
2137        let Some(tx) = tx else {
2138            return Ok(false);
2139        };
2140
2141        if tx.send((addr, parsed)).await.is_err() {
2142            let mut sessions = self.pr_sessions.lock().await;
2143            sessions.remove(&rendezvous_id);
2144        }
2145
2146        Ok(true)
2147    }
2148
2149    #[cfg(feature = "predictive-rendezvous")]
2150    async fn register_pr_session(
2151        &self,
2152        rendezvous_id: u64,
2153        tx: mpsc::Sender<(SocketAddr, ParsedProbe)>,
2154    ) -> Result<()> {
2155        let mut sessions = self.pr_sessions.lock().await;
2156        if sessions.contains_key(&rendezvous_id) {
2157            return Err(anyhow!("predictive rendezvous session already exists"));
2158        }
2159        sessions.insert(rendezvous_id, PrSession { tx });
2160        Ok(())
2161    }
2162
2163    #[cfg(feature = "predictive-rendezvous")]
2164    async fn unregister_pr_session(&self, rendezvous_id: u64) {
2165        let mut sessions = self.pr_sessions.lock().await;
2166        sessions.remove(&rendezvous_id);
2167    }
2168
2169    #[cfg(feature = "predictive-rendezvous")]
2170    async fn primary_local_port(&self) -> Result<u16> {
2171        let sockets = self.sockets.lock().await;
2172        match sockets.first() {
2173            Some(MeshSocket::Udp(sock)) => Ok(sock.local_addr()?.port()),
2174            Some(MeshSocket::Turn(relay)) => Ok(relay.relay_addr().port()),
2175            None => Err(anyhow!("missing socket")),
2176        }
2177    }
2178
2179    async fn record_send(&self, addr: SocketAddr, bytes: usize) {
2180        let peer_id = {
2181            let connections = self.connections.lock().await;
2182            connections.get(&addr).and_then(|conn| conn.peer_id)
2183        };
2184        let mut global = self.global_traffic.lock().await;
2185        global.packets_sent = global.packets_sent.saturating_add(1);
2186        global.bytes_sent = global.bytes_sent.saturating_add(bytes as u64);
2187        drop(global);
2188        metrics::inc_counter("rift_packets_sent", &[]);
2189        metrics::add_counter("rift_bytes_sent", &[], bytes as u64);
2190        if let Some(peer_id) = peer_id {
2191            let mut peer = self.peer_traffic.lock().await;
2192            let entry = peer.entry(peer_id).or_default();
2193            entry.packets_sent = entry.packets_sent.saturating_add(1);
2194            entry.bytes_sent = entry.bytes_sent.saturating_add(bytes as u64);
2195            drop(peer);
2196            metrics::inc_counter("rift_packets_sent", &[("peer", &peer_id.to_hex())]);
2197            metrics::add_counter("rift_bytes_sent", &[("peer", &peer_id.to_hex())], bytes as u64);
2198        }
2199    }
2200
2201    async fn record_recv(&self, addr: SocketAddr, bytes: usize) {
2202        let peer_id = {
2203            let connections = self.connections.lock().await;
2204            connections.get(&addr).and_then(|conn| conn.peer_id)
2205        };
2206        let mut global = self.global_traffic.lock().await;
2207        global.packets_received = global.packets_received.saturating_add(1);
2208        global.bytes_received = global.bytes_received.saturating_add(bytes as u64);
2209        drop(global);
2210        metrics::inc_counter("rift_packets_received", &[]);
2211        metrics::add_counter("rift_bytes_received", &[], bytes as u64);
2212        if let Some(peer_id) = peer_id {
2213            let mut peer = self.peer_traffic.lock().await;
2214            let entry = peer.entry(peer_id).or_default();
2215            entry.packets_received = entry.packets_received.saturating_add(1);
2216            entry.bytes_received = entry.bytes_received.saturating_add(bytes as u64);
2217            drop(peer);
2218            metrics::inc_counter("rift_packets_received", &[("peer", &peer_id.to_hex())]);
2219            metrics::add_counter(
2220                "rift_bytes_received",
2221                &[("peer", &peer_id.to_hex())],
2222                bytes as u64,
2223            );
2224        }
2225    }
2226
2227    async fn global_stats_snapshot(&self) -> GlobalStats {
2228        let global = self.global_traffic.lock().await.clone();
2229        let peers = self.peers_by_id.lock().await.len() + 1;
2230        let sessions = self.session_mgr.lock().await.sessions.len();
2231        GlobalStats {
2232            num_peers: peers,
2233            num_sessions: sessions,
2234            packets_sent: global.packets_sent,
2235            packets_received: global.packets_received,
2236            bytes_sent: global.bytes_sent,
2237            bytes_received: global.bytes_received,
2238        }
2239    }
2240
2241    async fn emit_global_metrics(&self) {
2242        let global = self.global_stats_snapshot().await;
2243        metrics::set_gauge("rift_number_of_peers", &[], global.num_peers as f64);
2244        metrics::set_gauge("rift_number_of_sessions", &[], global.num_sessions as f64);
2245    }
2246
2247    async fn consider_route(&self, peer_id: PeerId, stats: LinkStats) {
2248        let qos = &self.qos;
2249        let prefer_relay =
2250            stats.loss > qos.packet_loss_tolerance || stats.rtt_ms > qos.max_latency_ms as f32;
2251        if prefer_relay {
2252            let relay = {
2253                let relays = self.relay_candidates.lock().await;
2254                relays.get(&peer_id).copied()
2255            };
2256            if let Some(relay) = relay {
2257                let relay_ok = {
2258                    let peers = self.peers_by_id.lock().await;
2259                    peers.contains_key(&relay)
2260                };
2261                if relay_ok {
2262                    let mut routes = self.routes.lock().await;
2263                    if !matches!(routes.get(&peer_id), Some(PeerRoute::Relayed { .. })) {
2264                        routes.insert(peer_id, PeerRoute::Relayed { via: relay });
2265                        drop(routes);
2266                        let _ = self
2267                            .events_tx
2268                            .send(MeshEvent::RouteUpdated {
2269                                peer_id,
2270                                route: PeerRoute::Relayed { via: relay },
2271                            })
2272                            .await;
2273                        tracing::info!(peer = %peer_id, relay = %relay, "switching to relay route");
2274                    }
2275                }
2276            }
2277            return;
2278        }
2279
2280        let direct_addr = {
2281            let peers = self.peers_by_id.lock().await;
2282            peers.get(&peer_id).copied()
2283        };
2284        if let Some(addr) = direct_addr {
2285            let mut routes = self.routes.lock().await;
2286            if !matches!(routes.get(&peer_id), Some(PeerRoute::Direct { .. })) {
2287                routes.insert(peer_id, PeerRoute::Direct { addr });
2288                drop(routes);
2289                let _ = self
2290                    .events_tx
2291                    .send(MeshEvent::RouteUpdated {
2292                        peer_id,
2293                        route: PeerRoute::Direct { addr },
2294                    })
2295                    .await;
2296                tracing::info!(peer = %peer_id, "switching to direct route");
2297            }
2298        }
2299    }
2300
2301    async fn send_payload(
2302        &self,
2303        addr: SocketAddr,
2304        payload: RiftPayload,
2305        seq: u32,
2306        timestamp: u64,
2307        session: SessionId,
2308        peer_id: Option<PeerId>,
2309    ) -> Result<()> {
2310        self.send_payload_with_source(
2311            addr,
2312            payload,
2313            seq,
2314            timestamp,
2315            self.identity.peer_id,
2316            session,
2317            peer_id,
2318        )
2319        .await
2320    }
2321
2322    async fn send_payload_with_source(
2323        &self,
2324        addr: SocketAddr,
2325        payload: RiftPayload,
2326        seq: u32,
2327        timestamp: u64,
2328        source: PeerId,
2329        session: SessionId,
2330        peer_id: Option<PeerId>,
2331    ) -> Result<()> {
2332        let stream = stream_for_payload(&payload);
2333        let header = RiftFrameHeader {
2334            version: ProtocolVersion::V1,
2335            stream,
2336            flags: 0,
2337            seq,
2338            timestamp,
2339            source,
2340            session,
2341        };
2342        let payload = self
2343            .maybe_encrypt_payload(payload, &header, peer_id, session)
2344            .await?;
2345        let plaintext = encode_frame(&header, &payload);
2346        let (ciphertext, socket_idx) = {
2347            let mut connections = self.connections.lock().await;
2348            let Some(conn) = connections.get_mut(&addr) else {
2349                return Err(anyhow!("missing connection"));
2350            };
2351            let mut out = vec![0u8; plaintext.len() + 128];
2352            let len = conn.session.encrypt(&plaintext, &mut out)?;
2353            out.truncate(len);
2354            (out, conn.socket_idx)
2355        };
2356        self.send_raw(socket_idx, addr, &ciphertext).await?;
2357        self.record_send(addr, ciphertext.len()).await;
2358        Ok(())
2359    }
2360
2361    async fn send_frame(
2362        &self,
2363        addr: SocketAddr,
2364        header: &RiftFrameHeader,
2365        payload: &RiftPayload,
2366    ) -> Result<()> {
2367        let plaintext = encode_frame(header, payload);
2368        let (ciphertext, socket_idx) = {
2369            let mut connections = self.connections.lock().await;
2370            let Some(conn) = connections.get_mut(&addr) else {
2371                return Err(anyhow!("missing connection"));
2372            };
2373            let mut out = vec![0u8; plaintext.len() + 128];
2374            let len = conn.session.encrypt(&plaintext, &mut out)?;
2375            out.truncate(len);
2376            (out, conn.socket_idx)
2377        };
2378        self.send_raw(socket_idx, addr, &ciphertext).await?;
2379        self.record_send(addr, ciphertext.len()).await;
2380        Ok(())
2381    }
2382
2383    async fn forward_group_voice(
2384        &self,
2385        header: &RiftFrameHeader,
2386        payload: &RiftPayload,
2387    ) -> Result<()> {
2388        let session = header.session;
2389        let from = header.source;
2390        let participants = {
2391            let mgr = self.session_mgr.lock().await;
2392            mgr.participants(session)
2393        };
2394        let routes = self.routes_snapshot().await;
2395        for peer_id in participants {
2396            if peer_id == self.identity.peer_id || peer_id == from {
2397                continue;
2398            }
2399            let Some(route) = routes.get(&peer_id).cloned() else {
2400                continue;
2401            };
2402            match route {
2403                PeerRoute::Direct { addr } => {
2404                    let _ = self.send_frame(addr, header, payload).await;
2405                }
2406                PeerRoute::Relayed { via } => {
2407                    let relay_addr = {
2408                        let peers = self.peers_by_id.lock().await;
2409                        peers.get(&via).copied()
2410                    };
2411                    let Some(relay_addr) = relay_addr else {
2412                        continue;
2413                    };
2414                    let envelope = RiftPayload::Relay {
2415                        target: peer_id,
2416                        inner: Box::new(payload.clone()),
2417                    };
2418                    let _ = self.send_frame(relay_addr, header, &envelope).await;
2419                }
2420            }
2421        }
2422        Ok(())
2423    }
2424
2425    async fn handle_frame(
2426        self: Arc<Self>,
2427        addr: SocketAddr,
2428        header: RiftFrameHeader,
2429        payload: RiftPayload,
2430    ) -> Result<()> {
2431        if self.auth_required
2432            && !matches!(
2433                payload,
2434                RiftPayload::Control(ControlMessage::Auth { .. })
2435                    | RiftPayload::Control(ControlMessage::Join { .. })
2436                    | RiftPayload::Control(ControlMessage::Hello { .. })
2437                    | RiftPayload::Control(ControlMessage::IceCandidates { .. })
2438                    | RiftPayload::Control(ControlMessage::IceCheck { .. })
2439                    | RiftPayload::Control(ControlMessage::IceCheckAck { .. })
2440                    | RiftPayload::Control(ControlMessage::E2eeInit { .. })
2441                    | RiftPayload::Control(ControlMessage::E2eeResp { .. })
2442                    | RiftPayload::Control(ControlMessage::EncryptedReady { .. })
2443                    | RiftPayload::Control(ControlMessage::PeerState { .. })
2444            )
2445        {
2446            let authenticated = {
2447                let connections = self.connections.lock().await;
2448                connections
2449                    .get(&addr)
2450                    .map(|conn| conn.authenticated)
2451                    .unwrap_or(false)
2452            };
2453            if !authenticated {
2454                tracing::warn!(%addr, "unauthenticated peer message rejected");
2455                return Ok(());
2456            }
2457        }
2458        match payload {
2459            RiftPayload::Control(ControlMessage::Join { peer_id, .. }) => {
2460                let mut connections = self.connections.lock().await;
2461                if let Some(conn) = connections.get_mut(&addr) {
2462                    conn.peer_id = Some(peer_id);
2463                }
2464                drop(connections);
2465
2466                let mut peers = self.peers_by_id.lock().await;
2467                peers.insert(peer_id, addr);
2468                drop(peers);
2469
2470                let mut peer_addrs = self.peer_addrs.lock().await;
2471                peer_addrs.insert(peer_id, addr);
2472                drop(peer_addrs);
2473                let mut peer_candidates = self.peer_candidates.lock().await;
2474                peer_candidates
2475                    .entry(peer_id)
2476                    .or_insert_with(|| vec![addr]);
2477                drop(peer_candidates);
2478
2479                let mut routes = self.routes.lock().await;
2480                let upgraded = matches!(routes.get(&peer_id), Some(PeerRoute::Relayed { .. }));
2481                routes.insert(peer_id, PeerRoute::Direct { addr });
2482                drop(routes);
2483
2484                let mut sessions = self.session_mgr.lock().await;
2485                sessions.add_participant(self.channel_session, peer_id);
2486                drop(sessions);
2487                let _ = self.update_group_topology(self.channel_session).await;
2488
2489                let mut caps = self.peer_capabilities.lock().await;
2490                caps.entry(peer_id).or_insert_with(default_peer_capabilities);
2491                drop(caps);
2492                self.update_group_codec().await;
2493
2494                let _ = self.events_tx.send(MeshEvent::PeerJoined(peer_id)).await;
2495                let _ = self
2496                    .events_tx
2497                    .send(MeshEvent::RouteUpdated {
2498                        peer_id,
2499                        route: PeerRoute::Direct { addr },
2500                    })
2501                    .await;
2502                self.emit_global_metrics().await;
2503                if upgraded {
2504                    let _ = self.events_tx.send(MeshEvent::RouteUpgraded(peer_id)).await;
2505                    tracing::info!(peer = %peer_id, "route upgraded to direct");
2506                }
2507
2508                self.send_peer_list(addr).await?;
2509                // Broadcast updated peer list to all existing peers so they know about the new peer
2510                let all_peers: Vec<SocketAddr> = {
2511                    let peers_by_id = self.peers_by_id.lock().await;
2512                    peers_by_id.values().copied().collect()
2513                };
2514                for peer_addr in all_peers {
2515                    if peer_addr != addr {
2516                        let _ = self.send_peer_list(peer_addr).await;
2517                    }
2518                }
2519            }
2520            RiftPayload::Control(ControlMessage::PeerState {
2521                peer_id,
2522                relay_capable,
2523            }) => {
2524                let mut peer_caps = self.peer_caps.lock().await;
2525                peer_caps.insert(peer_id, relay_capable);
2526                drop(peer_caps);
2527            }
2528            RiftPayload::Control(ControlMessage::Hello { peer_id, public_key, capabilities, candidates }) => {
2529                {
2530                    let mut keys = self.peer_public_keys.lock().await;
2531                    keys.insert(peer_id, public_key.clone());
2532                }
2533                if !candidates.is_empty() {
2534                    let mut peer_candidates = self.peer_candidates.lock().await;
2535                    peer_candidates.insert(peer_id, candidates);
2536                }
2537                let _ = self
2538                    .events_tx
2539                    .send(MeshEvent::PeerIdentity {
2540                        peer_id,
2541                        public_key: public_key.clone(),
2542                    })
2543                    .await;
2544                self.handle_capabilities(peer_id, capabilities).await?;
2545                let authenticated = {
2546                    let connections = self.connections.lock().await;
2547                    connections
2548                        .get(&addr)
2549                        .map(|conn| conn.authenticated)
2550                        .unwrap_or(false)
2551                };
2552                if !self.auth_required || authenticated {
2553                    let _ = self.start_e2ee(peer_id, self.channel_session).await;
2554                }
2555                let _ = self.send_ice_candidates(peer_id, SessionId::NONE).await;
2556            }
2557            RiftPayload::Control(ControlMessage::IceCandidates { peer_id, candidates, .. }) => {
2558                if !candidates.is_empty() {
2559                    let mut peer_ice = self.peer_ice_candidates.lock().await;
2560                    peer_ice.insert(peer_id, candidates.clone());
2561                    drop(peer_ice);
2562                    let mut peer_candidates = self.peer_candidates.lock().await;
2563                    let addrs: Vec<SocketAddr> = candidates.into_iter().map(|cand| cand.addr).collect();
2564                    peer_candidates.insert(peer_id, addrs);
2565                }
2566            }
2567            RiftPayload::Control(ControlMessage::IceCheck { session, candidate, .. }) => {
2568                let ack = ControlMessage::IceCheckAck { session, candidate };
2569                let from = header.source;
2570                let _ = self.send_control_to_peer(from, ack, session).await;
2571            }
2572            RiftPayload::Control(ControlMessage::IceCheckAck { candidate, .. }) => {
2573                let peer_id = header.source;
2574                let addr = candidate.addr;
2575                let mut peer_addrs = self.peer_addrs.lock().await;
2576                peer_addrs.insert(peer_id, addr);
2577                drop(peer_addrs);
2578
2579                let mut routes = self.routes.lock().await;
2580                let upgraded = matches!(routes.get(&peer_id), Some(PeerRoute::Relayed { .. }));
2581                if !matches!(routes.get(&peer_id), Some(PeerRoute::Direct { .. })) {
2582                    routes.insert(peer_id, PeerRoute::Direct { addr });
2583                }
2584                drop(routes);
2585
2586                let _ = self
2587                    .events_tx
2588                    .send(MeshEvent::RouteUpdated {
2589                        peer_id,
2590                        route: PeerRoute::Direct { addr },
2591                    })
2592                    .await;
2593                if upgraded {
2594                    let _ = self.events_tx.send(MeshEvent::RouteUpgraded(peer_id)).await;
2595                }
2596            }
2597            RiftPayload::Control(ControlMessage::KeyInit { .. })
2598            | RiftPayload::Control(ControlMessage::KeyResp { .. }) => {
2599                tracing::debug!(target: "security", "legacy e2ee control message ignored");
2600            }
2601            RiftPayload::Control(ControlMessage::CapabilitiesUpdate(capabilities)) => {
2602                let peer_id = header.source;
2603                self.handle_capabilities(peer_id, capabilities).await?;
2604            }
2605            RiftPayload::Control(ControlMessage::Leave { peer_id }) => {
2606                let mut peers = self.peers_by_id.lock().await;
2607                peers.remove(&peer_id);
2608                drop(peers);
2609                let mut peer_caps = self.peer_caps.lock().await;
2610                peer_caps.remove(&peer_id);
2611                drop(peer_caps);
2612                let mut peer_keys = self.peer_public_keys.lock().await;
2613                peer_keys.remove(&peer_id);
2614                drop(peer_keys);
2615                let mut routes = self.routes.lock().await;
2616                routes.remove(&peer_id);
2617                let removed: Vec<PeerId> = routes
2618                    .iter()
2619                    .filter_map(|(pid, route)| match route {
2620                        PeerRoute::Relayed { via } if *via == peer_id => Some(*pid),
2621                        _ => None,
2622                    })
2623                    .collect();
2624                for pid in removed {
2625                    routes.remove(&pid);
2626                }
2627                drop(routes);
2628                let mut peer_addrs = self.peer_addrs.lock().await;
2629                peer_addrs.remove(&peer_id);
2630                drop(peer_addrs);
2631                let mut peer_candidates = self.peer_candidates.lock().await;
2632                peer_candidates.remove(&peer_id);
2633                drop(peer_candidates);
2634                let mut peer_ice = self.peer_ice_candidates.lock().await;
2635                peer_ice.remove(&peer_id);
2636                drop(peer_ice);
2637                let mut relay_candidates = self.relay_candidates.lock().await;
2638                relay_candidates.remove(&peer_id);
2639                drop(relay_candidates);
2640                let mut e2ee_keys = self.e2ee_keys.lock().await;
2641                e2ee_keys.retain(|(peer, _), _| *peer != peer_id);
2642                drop(e2ee_keys);
2643                let mut e2ee_pending = self.e2ee_pending.lock().await;
2644                e2ee_pending.retain(|(peer, _), _| *peer != peer_id);
2645                drop(e2ee_pending);
2646                let mut e2ee_ready = self.e2ee_ready.lock().await;
2647                e2ee_ready.retain(|(peer, _), _| *peer != peer_id);
2648                drop(e2ee_ready);
2649                let mut stats = self.link_stats.lock().await;
2650                stats.remove(&peer_id);
2651                drop(stats);
2652                let mut sessions = self.session_mgr.lock().await;
2653                sessions.remove_participant_all(peer_id);
2654                drop(sessions);
2655                let _ = self.update_group_topology(self.channel_session).await;
2656                let active = *self.active_session.lock().await;
2657                if active != SessionId::NONE && active != self.channel_session {
2658                    let _ = self.update_group_topology(active).await;
2659                }
2660                let _ = self.events_tx.send(MeshEvent::PeerLeft(peer_id)).await;
2661                self.emit_global_metrics().await;
2662            }
2663            RiftPayload::Control(ControlMessage::Chat(chat)) => {
2664                let mut cache = self.cache.lock().await;
2665                if cache.contains(&chat.id) {
2666                    return Ok(());
2667                }
2668                cache.insert(chat.id);
2669                drop(cache);
2670
2671                let _ = self
2672                    .events_tx
2673                    .send(MeshEvent::ChatReceived(chat.clone()))
2674                    .await;
2675            }
2676            RiftPayload::Control(ControlMessage::Ping { nonce, sent_at_ms }) => {
2677                let from = header.source;
2678                let pong = ControlMessage::Pong { nonce, sent_at_ms };
2679                let _ = self.send_control_to_peer(from, pong, SessionId::NONE).await;
2680            }
2681            RiftPayload::Control(ControlMessage::Pong { sent_at_ms, .. }) => {
2682                let from = header.source;
2683                self.update_rtt(from, sent_at_ms).await;
2684            }
2685            RiftPayload::Control(ControlMessage::Auth { token }) => {
2686                let expected = self.auth_token.clone().unwrap_or_default();
2687                // Use constant-time comparison to prevent timing attacks
2688                let token_valid = if token.len() != expected.len() {
2689                    false
2690                } else {
2691                    token.ct_eq(&expected).into()
2692                };
2693                if self.auth_required && !token_valid {
2694                    tracing::warn!(%addr, "auth token mismatch");
2695                    self.disconnect_addr(addr).await;
2696                    return Ok(());
2697                }
2698                let mut connections = self.connections.lock().await;
2699                if let Some(conn) = connections.get_mut(&addr) {
2700                    conn.authenticated = true;
2701                }
2702            }
2703            RiftPayload::Control(ControlMessage::PeerList { peers }) => {
2704                self.handle_peer_list(addr, peers).await?;
2705            }
2706            RiftPayload::Control(ControlMessage::Group(group)) => {
2707                match group {
2708                    GroupControl::Join { session, peer_id, .. } => {
2709                        let mut sessions = self.session_mgr.lock().await;
2710                        sessions.add_participant(session, peer_id);
2711                        drop(sessions);
2712                        let _ = self.update_group_topology(session).await;
2713                    }
2714                    GroupControl::Leave { session, peer_id } => {
2715                        let mut sessions = self.session_mgr.lock().await;
2716                        sessions.remove_participant(session, peer_id);
2717                        drop(sessions);
2718                        let _ = self.update_group_topology(session).await;
2719                    }
2720                    GroupControl::Topology { session, mode } => {
2721                        self.set_group_mode(session, mode).await;
2722                        let _ = self
2723                            .events_tx
2724                            .send(MeshEvent::GroupTopology { session, mode })
2725                            .await;
2726                    }
2727                    GroupControl::StreamPublish { .. } | GroupControl::StreamSubscribe { .. } => {}
2728                }
2729            }
2730            RiftPayload::Control(ControlMessage::Call(call)) => {
2731                self.handle_call(call).await?;
2732            }
2733            RiftPayload::Control(ControlMessage::E2eeInit {
2734                session,
2735                from,
2736                public_key,
2737                signature,
2738            }) => {
2739                self.handle_e2ee_init(from, session, public_key, signature).await?;
2740            }
2741            RiftPayload::Control(ControlMessage::E2eeResp {
2742                session,
2743                from,
2744                public_key,
2745                signature,
2746            }) => {
2747                self.handle_e2ee_resp(from, session, public_key, signature).await?;
2748            }
2749            RiftPayload::Control(ControlMessage::EncryptedReady { session, .. }) => {
2750                let peer_id = header.source;
2751                let mut ready = self.e2ee_ready.lock().await;
2752                let entry = ready
2753                    .entry((peer_id, session))
2754                    .or_insert(E2eeReadyState {
2755                        local_ready: false,
2756                        remote_ready: false,
2757                    });
2758                entry.remote_ready = true;
2759            }
2760            RiftPayload::Voice(VoicePacket { codec_id, payload }) => {
2761                let from = header.source;
2762                let seq = header.seq;
2763                let timestamp = header.timestamp;
2764                let session = header.session;
2765                let codec = codec_id;
2766                let active_session = *self.active_session.lock().await;
2767                if active_session != SessionId::NONE && session != active_session {
2768                    return Ok(());
2769                }
2770                let mut seqs = self.voice_seq.lock().await;
2771                if let Some(last) = seqs.get(&from) {
2772                    if seq <= *last {
2773                        return Ok(());
2774                    }
2775                }
2776                seqs.insert(from, seq);
2777                drop(seqs);
2778                if self.should_forward_voice(session, from).await {
2779                    let payload = RiftPayload::Voice(VoicePacket {
2780                        codec_id: codec,
2781                        payload: payload.clone(),
2782                    });
2783                    let _ = self.forward_group_voice(&header, &payload).await;
2784                }
2785
2786                let _ = self
2787                    .events_tx
2788                    .send(MeshEvent::VoiceFrame {
2789                        from,
2790                        seq,
2791                        timestamp,
2792                        session,
2793                        codec,
2794                        payload: payload.clone(),
2795                    })
2796                    .await;
2797            }
2798            RiftPayload::Encrypted(encrypted) => {
2799                let from = header.source;
2800                let session = header.session;
2801                if header.stream == StreamKind::Voice
2802                    && self.should_forward_voice(session, from).await
2803                {
2804                    let payload = RiftPayload::Encrypted(encrypted.clone());
2805                    let _ = self.forward_group_voice(&header, &payload).await;
2806                }
2807                if let Ok(inner) = self.decrypt_payload(&header, encrypted, from, session).await {
2808                    Box::pin(self.handle_frame(addr, header, inner)).await?;
2809                } else {
2810                    let has_key = {
2811                        let keys = self.e2ee_keys.lock().await;
2812                        keys.contains_key(&(from, session))
2813                    };
2814                    let key_fingerprint = {
2815                        let keys = self.e2ee_keys.lock().await;
2816                        keys.get(&(from, session))
2817                            .map(|key| hex::encode(&sha2::Sha256::digest(key)[..4]))
2818                    };
2819                    let ready = {
2820                        let ready = self.e2ee_ready.lock().await;
2821                        ready
2822                            .get(&(from, session))
2823                            .map(|state| state.local_ready && state.remote_ready)
2824                            .unwrap_or(false)
2825                    };
2826                    tracing::warn!(
2827                        %addr,
2828                        peer = %from,
2829                        session = ?session,
2830                        has_key,
2831                        key = key_fingerprint.as_deref().unwrap_or("none"),
2832                        ready,
2833                        "e2ee decrypt failed"
2834                    );
2835                }
2836            }
2837            RiftPayload::Relay { target, inner } => {
2838                if target == self.identity.peer_id {
2839                    Box::pin(self.handle_frame(addr, header, *inner)).await?;
2840                } else {
2841                    self.forward_relay(target, header, *inner).await?;
2842                }
2843            }
2844            RiftPayload::Text(chat) => {
2845                let mut cache = self.cache.lock().await;
2846                if cache.contains(&chat.id) {
2847                    return Ok(());
2848                }
2849                cache.insert(chat.id);
2850                drop(cache);
2851                let _ = self
2852                    .events_tx
2853                    .send(MeshEvent::ChatReceived(chat.clone()))
2854                    .await;
2855            }
2856            RiftPayload::Control(ControlMessage::RouteInfo { .. })
2857            | RiftPayload::Control(ControlMessage::Capabilities(_)) => {}
2858        }
2859        Ok(())
2860    }
2861
2862    async fn handle_peer_list(self: Arc<Self>, addr: SocketAddr, peers: Vec<PeerInfo>) -> Result<()> {
2863        // Always register peers as session participants for topology tracking
2864        for peer in &peers {
2865            if peer.peer_id == self.identity.peer_id {
2866                continue;
2867            }
2868            let mut sessions = self.session_mgr.lock().await;
2869            sessions.add_participant(self.channel_session, peer.peer_id);
2870        }
2871        // Trigger topology update after learning about new participants
2872        let _ = self.update_group_topology(self.channel_session).await;
2873
2874        let nat_cfg = { self.nat_cfg.lock().await.clone() };
2875        let Some(nat_cfg) = nat_cfg else {
2876            return Ok(());
2877        };
2878
2879        let relay_peer_id = {
2880            let connections = self.connections.lock().await;
2881            connections.get(&addr).and_then(|conn| conn.peer_id)
2882        };
2883        let relay_capable = if let Some(peer_id) = relay_peer_id {
2884            let peer_caps = self.peer_caps.lock().await;
2885            peer_caps.get(&peer_id).copied().unwrap_or(false)
2886        } else {
2887            false
2888        };
2889
2890        for peer in peers {
2891            if peer.peer_id == self.identity.peer_id {
2892                continue;
2893            }
2894            let already = {
2895                let peers_by_id = self.peers_by_id.lock().await;
2896                peers_by_id.contains_key(&peer.peer_id)
2897            };
2898            let mut peer_caps = self.peer_caps.lock().await;
2899            peer_caps.insert(peer.peer_id, peer.relay_capable);
2900            drop(peer_caps);
2901            let mut addrs = if peer.addrs.is_empty() {
2902                vec![peer.addr]
2903            } else {
2904                peer.addrs.clone()
2905            };
2906            addrs.sort();
2907            addrs.dedup();
2908            let primary_addr = addrs[0];
2909            let mut peer_addrs = self.peer_addrs.lock().await;
2910            peer_addrs.insert(peer.peer_id, primary_addr);
2911            drop(peer_addrs);
2912            let mut peer_candidates = self.peer_candidates.lock().await;
2913            peer_candidates.insert(peer.peer_id, addrs.clone());
2914            drop(peer_candidates);
2915
2916            if already {
2917                continue;
2918            }
2919            let already = {
2920                let connections = self.connections.lock().await;
2921                connections.contains_key(&primary_addr)
2922            };
2923            if already {
2924                continue;
2925            }
2926            let endpoint = PeerEndpoint {
2927                peer_id: PeerId([0u8; 32]),
2928                external_addrs: addrs.clone(),
2929                punch_ports: addrs.iter().map(|addr| addr.port()).collect(),
2930            };
2931            if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
2932                if let Ok(socket_idx) = self.add_socket(socket).await {
2933                    if let Err(err) = self.initiate_handshake(remote, socket_idx).await {
2934                        tracing::warn!("handshake to {} failed: {err}", remote);
2935                    }
2936                }
2937            } else if relay_capable {
2938                if let Some(relay_peer_id) = relay_peer_id {
2939                    let mut relay_candidates = self.relay_candidates.lock().await;
2940                    relay_candidates.insert(peer.peer_id, relay_peer_id);
2941                    drop(relay_candidates);
2942                    let mut routes = self.routes.lock().await;
2943                    if !matches!(routes.get(&peer.peer_id), Some(PeerRoute::Direct { .. })) {
2944                        routes.insert(peer.peer_id, PeerRoute::Relayed { via: relay_peer_id });
2945                        drop(routes);
2946                        let _ = self
2947                            .events_tx
2948                            .send(MeshEvent::RouteUpdated {
2949                                peer_id: peer.peer_id,
2950                                route: PeerRoute::Relayed { via: relay_peer_id },
2951                            })
2952                            .await;
2953                        tracing::info!(
2954                            peer = %peer.peer_id,
2955                            relay = %relay_peer_id,
2956                            "relay route established"
2957                        );
2958                    }
2959                }
2960            }
2961        }
2962
2963        Ok(())
2964    }
2965
2966    async fn start_call(&self, to: PeerId, rndzv_srt_uri: Option<String>) -> Result<SessionId> {
2967        let session = SessionId::random();
2968        tracing::info!(to = %to, session = ?session, "call start");
2969        {
2970            let mut sessions = self.session_mgr.lock().await;
2971            sessions.set_state(session, CallState::Ringing);
2972            sessions.add_participant(session, self.identity.peer_id);
2973            sessions.add_participant(session, to);
2974        }
2975        self.emit_global_metrics().await;
2976        let call = CallControl::Invite {
2977            session,
2978            from: self.identity.peer_id,
2979            to,
2980            display_name: None,
2981            rndzv_srt_uri,
2982        };
2983        self.send_call_to_peer(to, call, session).await?;
2984        Ok(session)
2985    }
2986
2987    async fn accept_call(&self, session: SessionId) -> Result<()> {
2988        tracing::info!(session = ?session, "call accept");
2989        {
2990            let mut sessions = self.session_mgr.lock().await;
2991            sessions.set_state(session, CallState::Active);
2992            sessions.add_participant(session, self.identity.peer_id);
2993        }
2994        self.emit_global_metrics().await;
2995        let participants = {
2996            let sessions = self.session_mgr.lock().await;
2997            sessions.participants(session)
2998        };
2999        for peer_id in participants {
3000            if peer_id == self.identity.peer_id {
3001                continue;
3002            }
3003            let _ = self.start_e2ee(peer_id, session).await;
3004            let call = CallControl::Accept {
3005                session,
3006                from: self.identity.peer_id,
3007            };
3008            let _ = self.send_call_to_peer(peer_id, call, session).await;
3009        }
3010        let mut active = self.active_session.lock().await;
3011        *active = session;
3012        Ok(())
3013    }
3014
3015    async fn decline_call(&self, session: SessionId, reason: Option<String>) -> Result<()> {
3016        tracing::info!(session = ?session, ?reason, "call decline");
3017        {
3018            let mut sessions = self.session_mgr.lock().await;
3019            sessions.set_state(session, CallState::Ended);
3020        }
3021        self.emit_global_metrics().await;
3022        let participants = {
3023            let sessions = self.session_mgr.lock().await;
3024            sessions.participants(session)
3025        };
3026        for peer_id in participants {
3027            if peer_id == self.identity.peer_id {
3028                continue;
3029            }
3030            let call = CallControl::Decline {
3031                session,
3032                from: self.identity.peer_id,
3033                reason: reason.clone(),
3034            };
3035            let _ = self.send_call_to_peer(peer_id, call, session).await;
3036        }
3037        let mut active = self.active_session.lock().await;
3038        if *active == session {
3039            *active = self.channel_session;
3040        }
3041        Ok(())
3042    }
3043
3044    async fn end_call(&self, session: SessionId) -> Result<()> {
3045        tracing::info!(session = ?session, "call end");
3046        {
3047            let mut sessions = self.session_mgr.lock().await;
3048            sessions.set_state(session, CallState::Ended);
3049        }
3050        self.emit_global_metrics().await;
3051        let participants = {
3052            let sessions = self.session_mgr.lock().await;
3053            sessions.participants(session)
3054        };
3055        for peer_id in participants {
3056            if peer_id == self.identity.peer_id {
3057                continue;
3058            }
3059            let call = CallControl::Bye {
3060                session,
3061                from: self.identity.peer_id,
3062            };
3063            let _ = self.send_call_to_peer(peer_id, call, session).await;
3064        }
3065        let mut active = self.active_session.lock().await;
3066        if *active == session {
3067            *active = self.channel_session;
3068        }
3069        Ok(())
3070    }
3071
3072    async fn send_call_to_peer(
3073        &self,
3074        peer_id: PeerId,
3075        call: CallControl,
3076        session: SessionId,
3077    ) -> Result<()> {
3078        let payload = RiftPayload::Control(ControlMessage::Call(call));
3079        let routes = self.routes_snapshot().await;
3080        let Some(route) = routes.get(&peer_id).cloned() else {
3081            return Err(anyhow!("missing route"));
3082        };
3083        let seq = self.next_control_seq().await;
3084        self.send_to_peer(peer_id, route, payload, seq, now_timestamp(), session)
3085            .await
3086    }
3087
3088    async fn handle_call(&self, call: CallControl) -> Result<()> {
3089        match call {
3090            CallControl::Invite {
3091                session,
3092                from,
3093                to,
3094                rndzv_srt_uri,
3095                ..
3096            } => {
3097                if to != self.identity.peer_id {
3098                    return Ok(());
3099                }
3100                {
3101                    let mut sessions = self.session_mgr.lock().await;
3102                    sessions.set_state(session, CallState::Ringing);
3103                    sessions.add_participant(session, from);
3104                    sessions.add_participant(session, to);
3105                }
3106                let _ = self.update_group_topology(session).await;
3107                let _ = self
3108                    .events_tx
3109                    .send(MeshEvent::IncomingCall {
3110                        session,
3111                        from,
3112                        rndzv_srt_uri,
3113                    })
3114                    .await;
3115            }
3116            CallControl::Accept { session, from } => {
3117                {
3118                    let mut sessions = self.session_mgr.lock().await;
3119                    sessions.set_state(session, CallState::Active);
3120                    sessions.add_participant(session, from);
3121                    sessions.add_participant(session, self.identity.peer_id);
3122                }
3123                let _ = self.update_group_topology(session).await;
3124                let mut active = self.active_session.lock().await;
3125                *active = session;
3126                let _ = self.start_e2ee(from, session).await;
3127                let _ = self
3128                    .events_tx
3129                    .send(MeshEvent::CallAccepted { session, from })
3130                    .await;
3131            }
3132            CallControl::Decline { session, from, reason } => {
3133                {
3134                    let mut sessions = self.session_mgr.lock().await;
3135                    sessions.set_state(session, CallState::Ended);
3136                }
3137                let mut active = self.active_session.lock().await;
3138                if *active == session {
3139                    *active = self.channel_session;
3140                }
3141                let _ = self
3142                    .events_tx
3143                    .send(MeshEvent::CallDeclined { session, from, reason })
3144                    .await;
3145            }
3146            CallControl::Bye { session, .. } => {
3147                {
3148                    let mut sessions = self.session_mgr.lock().await;
3149                    sessions.set_state(session, CallState::Ended);
3150                }
3151                let mut active = self.active_session.lock().await;
3152                if *active == session {
3153                    *active = self.channel_session;
3154                }
3155                let _ = self
3156                    .events_tx
3157                    .send(MeshEvent::CallEnded { session })
3158                    .await;
3159            }
3160            CallControl::Mute { .. } => {}
3161            CallControl::SessionInfo { session, participants } => {
3162                let mut sessions = self.session_mgr.lock().await;
3163                for peer in participants {
3164                    sessions.add_participant(session, peer);
3165                }
3166                drop(sessions);
3167                let _ = self.update_group_topology(session).await;
3168            }
3169        }
3170        Ok(())
3171    }
3172
3173    async fn disconnect_addr(&self, addr: SocketAddr) {
3174        let peer_id = {
3175            let mut connections = self.connections.lock().await;
3176            connections.remove(&addr).and_then(|conn| conn.peer_id)
3177        };
3178        let Some(peer_id) = peer_id else {
3179            return;
3180        };
3181        let mut peers = self.peers_by_id.lock().await;
3182        peers.remove(&peer_id);
3183        drop(peers);
3184        let mut routes = self.routes.lock().await;
3185        routes.remove(&peer_id);
3186        drop(routes);
3187        let mut peer_addrs = self.peer_addrs.lock().await;
3188        peer_addrs.remove(&peer_id);
3189        drop(peer_addrs);
3190        let mut peer_candidates = self.peer_candidates.lock().await;
3191        peer_candidates.remove(&peer_id);
3192        drop(peer_candidates);
3193        let mut e2ee_keys = self.e2ee_keys.lock().await;
3194        e2ee_keys.retain(|(peer, _), _| *peer != peer_id);
3195        drop(e2ee_keys);
3196        let mut pending = self.e2ee_pending.lock().await;
3197        pending.retain(|(peer, _), _| *peer != peer_id);
3198        drop(pending);
3199        let _ = self.events_tx.send(MeshEvent::PeerLeft(peer_id)).await;
3200    }
3201
3202    async fn socket_by_idx(&self, socket_idx: usize) -> Result<MeshSocket> {
3203        let sockets = self.sockets.lock().await;
3204        sockets
3205            .get(socket_idx)
3206            .cloned()
3207            .ok_or_else(|| anyhow!("missing socket"))
3208    }
3209
3210    async fn send_raw(&self, socket_idx: usize, addr: SocketAddr, data: &[u8]) -> Result<()> {
3211        let socket = self.socket_by_idx(socket_idx).await?;
3212        match socket {
3213            MeshSocket::Udp(socket) => {
3214                socket.send_to(data, addr).await?;
3215                Ok(())
3216            }
3217            MeshSocket::Turn(relay) => {
3218                relay.send_to(addr, data).await?;
3219                Ok(())
3220            }
3221        }
3222    }
3223
3224    async fn broadcast_voice(
3225        &self,
3226        _from: PeerId,
3227        seq: u32,
3228        timestamp: u64,
3229        payload: Vec<u8>,
3230    ) -> Result<()> {
3231        let codec = *self.group_codec.lock().await;
3232        let msg = RiftPayload::Voice(VoicePacket { codec_id: codec, payload });
3233        let session = *self.active_session.lock().await;
3234        let mode = self.current_group_mode(session).await;
3235        if let GroupMode::Hybrid { forwarder } = mode {
3236            if forwarder != self.identity.peer_id {
3237                let routes = self.routes_snapshot().await;
3238                if let Some(route) = routes.get(&forwarder).cloned() {
3239                    let _ = self
3240                        .send_to_peer(forwarder, route, msg, seq, timestamp, session)
3241                        .await;
3242                }
3243                return Ok(());
3244            }
3245        }
3246
3247        let routes = self.routes_snapshot().await;
3248        for (peer_id, route) in routes {
3249            if peer_id == self.identity.peer_id {
3250                continue;
3251            }
3252            if let Err(err) = self
3253                .send_to_peer(peer_id, route, msg.clone(), seq, timestamp, session)
3254                .await
3255            {
3256                tracing::debug!(peer = %peer_id, "voice send failed: {err}");
3257            }
3258        }
3259        Ok(())
3260    }
3261
3262    async fn routes_snapshot(&self) -> HashMap<PeerId, PeerRoute> {
3263        self.routes.lock().await.clone()
3264    }
3265
3266    async fn send_to_peer(
3267        &self,
3268        peer_id: PeerId,
3269        route: PeerRoute,
3270        payload: RiftPayload,
3271        seq: u32,
3272        timestamp: u64,
3273        session: SessionId,
3274    ) -> Result<()> {
3275        match route {
3276            PeerRoute::Direct { addr } => self
3277                .send_payload(addr, payload, seq, timestamp, session, Some(peer_id))
3278                .await,
3279            PeerRoute::Relayed { via } => {
3280                let relay_addr = {
3281                    let peers = self.peers_by_id.lock().await;
3282                    peers.get(&via).copied()
3283                };
3284                let Some(relay_addr) = relay_addr else {
3285                    return Err(anyhow!("missing relay addr"));
3286                };
3287                let envelope = RiftPayload::Relay {
3288                    target: peer_id,
3289                    inner: Box::new(payload),
3290                };
3291                self.send_payload(relay_addr, envelope, seq, timestamp, session, Some(peer_id))
3292                    .await
3293            }
3294        }
3295    }
3296
3297    async fn forward_relay(
3298        &self,
3299        target: PeerId,
3300        header: RiftFrameHeader,
3301        inner: RiftPayload,
3302    ) -> Result<()> {
3303        let route = {
3304            let routes = self.routes.lock().await;
3305            routes.get(&target).cloned()
3306        };
3307        let Some(route) = route else {
3308            return Ok(());
3309        };
3310        match route {
3311            PeerRoute::Direct { addr } => {
3312                let envelope = RiftPayload::Relay {
3313                    target,
3314                    inner: Box::new(inner),
3315                };
3316                self.send_payload_with_source(
3317                    addr,
3318                    envelope,
3319                    header.seq,
3320                    header.timestamp,
3321                    header.source,
3322                    header.session,
3323                    Some(target),
3324                )
3325                .await?;
3326            }
3327            PeerRoute::Relayed { via } => {
3328                let relay_addr = {
3329                    let peers = self.peers_by_id.lock().await;
3330                    peers.get(&via).copied()
3331                };
3332                if let Some(relay_addr) = relay_addr {
3333                    let envelope = RiftPayload::Relay {
3334                        target,
3335                        inner: Box::new(inner),
3336                    };
3337                    self.send_payload_with_source(
3338                        relay_addr,
3339                        envelope,
3340                        header.seq,
3341                        header.timestamp,
3342                        header.source,
3343                        header.session,
3344                        Some(target),
3345                    )
3346                    .await?;
3347                }
3348            }
3349        }
3350        Ok(())
3351    }
3352}
3353
3354impl MeshHandle {
3355    /// Broadcast a voice packet to all peers.
3356    pub async fn broadcast_voice(&self, seq: u32, timestamp: u64, payload: Vec<u8>) -> Result<()> {
3357        self.inner
3358            .broadcast_voice(self.inner.identity.peer_id, seq, timestamp, payload)
3359            .await
3360    }
3361
3362    /// Broadcast a chat message to all peers.
3363    pub async fn broadcast_chat(&self, text: String) -> Result<()> {
3364        self.inner.broadcast_chat(text).await
3365    }
3366
3367    /// Start a call with a specific peer.
3368    pub async fn start_call(&self, to: PeerId) -> Result<SessionId> {
3369        self.inner.start_call(to, None).await
3370    }
3371
3372    /// Start a call with an optional rndzv SRT URI attached.
3373    pub async fn start_call_with_srt(
3374        &self,
3375        to: PeerId,
3376        rndzv_srt_uri: Option<String>,
3377    ) -> Result<SessionId> {
3378        self.inner.start_call(to, rndzv_srt_uri).await
3379    }
3380
3381    /// Accept a pending call.
3382    pub async fn accept_call(&self, session: SessionId) -> Result<()> {
3383        self.inner.accept_call(session).await
3384    }
3385
3386    /// Decline a pending call with an optional reason.
3387    pub async fn decline_call(&self, session: SessionId, reason: Option<String>) -> Result<()> {
3388        self.inner.decline_call(session, reason).await
3389    }
3390
3391    /// End an active call.
3392    pub async fn end_call(&self, session: SessionId) -> Result<()> {
3393        self.inner.end_call(session).await
3394    }
3395
3396    /// Update preferred codecs for capability negotiation.
3397    pub async fn set_preferred_codecs(&self, codecs: Vec<CodecId>) {
3398        self.inner.set_preferred_codecs(codecs).await;
3399    }
3400
3401    /// Update preferred feature flags for capability negotiation.
3402    pub async fn set_preferred_features(&self, features: Vec<FeatureFlag>) {
3403        self.inner.set_preferred_features(features).await;
3404    }
3405
3406    /// Retrieve the current session config for a peer.
3407    pub async fn peer_session_config(&self, peer_id: PeerId) -> Option<SessionConfig> {
3408        let sessions = self.inner.peer_session.lock().await;
3409        sessions.get(&peer_id).cloned()
3410    }
3411
3412    /// Return the current group codec.
3413    pub async fn group_codec(&self) -> CodecId {
3414        *self.inner.group_codec.lock().await
3415    }
3416
3417    /// Initiate a connection to a socket address.
3418    pub async fn connect_addr(&self, addr: SocketAddr) -> Result<()> {
3419        self.inner.initiate_handshake(addr, 0).await
3420    }
3421
3422    /// Initiate a connection using a pre-bound socket.
3423    pub async fn connect_with_socket(&self, socket: UdpSocket, addr: SocketAddr) -> Result<()> {
3424        let socket_idx = self.inner.add_socket(socket).await?;
3425        self.inner.initiate_handshake(addr, socket_idx).await
3426    }
3427
3428    /// Disconnect a peer and clean up any related routes.
3429    pub async fn disconnect_peer(&self, peer_id: PeerId) {
3430        let addr = {
3431            let peers = self.inner.peers_by_id.lock().await;
3432            peers.get(&peer_id).copied()
3433        };
3434        if let Some(addr) = addr {
3435            self.inner.disconnect_addr(addr).await;
3436        }
3437    }
3438
3439    #[cfg(feature = "predictive-rendezvous")]
3440    /// Run Predictive Rendezvous over the mesh UDP socket.
3441    ///
3442    /// This drives slot emission and listens for matching probes. See
3443    /// `docs/predictive-rendezvous.md` for the conceptual model.
3444    pub async fn run_rendezvous(
3445        &self,
3446        cfg: RendezvousConfig,
3447    ) -> Result<RendezvousResult, RendezvousError> {
3448        let rendezvous_id = rendezvous_id_from_seed(&cfg.token.seed);
3449        let (tx, mut rx) = mpsc::channel::<(SocketAddr, ParsedProbe)>(64);
3450        self.inner
3451            .register_pr_session(rendezvous_id, tx)
3452            .await
3453            .map_err(RendezvousError::Mesh)?;
3454
3455        struct PrGuard {
3456            inner: Arc<MeshInner>,
3457            rendezvous_id: u64,
3458        }
3459
3460        impl Drop for PrGuard {
3461            fn drop(&mut self) {
3462                let inner = self.inner.clone();
3463                let rendezvous_id = self.rendezvous_id;
3464                tokio::spawn(async move {
3465                    inner.unregister_pr_session(rendezvous_id).await;
3466                });
3467            }
3468        }
3469
3470        let _guard = PrGuard {
3471            inner: self.inner.clone(),
3472            rendezvous_id,
3473        };
3474
3475        let mut state = RendezvousState::new(cfg.token.clone(), cfg.role);
3476        let mut last_sent_slot: Option<u64> = None;
3477        let mut last_local_offset: Option<u16> = None;
3478        let mut local_offsets = Vec::new();
3479        let mut metrics = RendezvousMetrics::new();
3480        let start_instant = TokioInstant::now();
3481        let deadline = TokioInstant::now() + cfg.max_duration;
3482
3483        loop {
3484            if TokioInstant::now() >= deadline {
3485                metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3486                metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3487                tracing::debug!(
3488                    target = "predictive_rendezvous",
3489                    rendezvous_id,
3490                    slots_attempted = metrics.slots_attempted,
3491                    probes_sent = metrics.probes_sent,
3492                    probes_received = metrics.probes_received,
3493                    "rendezvous timeout"
3494                );
3495                return Err(RendezvousError::Timeout);
3496            }
3497
3498            let now_ms = now_timestamp();
3499            let t0_ms = cfg.token.time_model.t0.saturating_mul(1_000);
3500            let window_ms = cfg.token.time_model.window_secs.saturating_mul(1_000);
3501            let end_ms = t0_ms.saturating_add(window_ms);
3502
3503            if cfg.token.time_model.slot_ms == 0 {
3504                return Err(RendezvousError::InvalidConfig("slot_ms must be > 0"));
3505            }
3506            if now_ms >= end_ms {
3507                metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3508                metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3509                tracing::debug!(
3510                    target = "predictive_rendezvous",
3511                    rendezvous_id,
3512                    slots_attempted = metrics.slots_attempted,
3513                    "rendezvous window closed"
3514                );
3515                return Err(RendezvousError::WindowClosed);
3516            }
3517
3518            if let Some(slot) = compute_slot_params(
3519                &cfg.token.seed,
3520                &cfg.token.time_model,
3521                cfg.role,
3522                now_ms,
3523            ) {
3524                if last_sent_slot != Some(slot.slot_index) {
3525                    last_sent_slot = Some(slot.slot_index);
3526                    last_local_offset = Some(slot.local_port_offset);
3527                    local_offsets.push(slot.local_port_offset);
3528                    metrics.slots_attempted = metrics.slots_attempted.saturating_add(1);
3529                    state.record_sent_slot(&slot);
3530
3531                    tracing::debug!(
3532                        target = "predictive_rendezvous",
3533                        rendezvous_id,
3534                        slot_index = slot.slot_index,
3535                        local_port_offset = slot.local_port_offset,
3536                        remote_port_offset = slot.remote_port_offset,
3537                        "rendezvous slot emit"
3538                    );
3539
3540                    let payload = build_probe_payload(ProbePayload {
3541                        rendezvous_id,
3542                        slot_index: slot.slot_index,
3543                        sender_fingerprint: cfg.sender_fingerprint,
3544                    });
3545
3546                    for addr in &cfg.potential_remote_addrs {
3547                        self.inner
3548                            .send_raw(0, *addr, &payload)
3549                            .await
3550                            .map_err(RendezvousError::Mesh)?;
3551                        metrics.probes_sent = metrics.probes_sent.saturating_add(1);
3552                    }
3553                }
3554            }
3555
3556            let next_wake_ms = if now_ms < t0_ms {
3557                t0_ms
3558            } else {
3559                let slot_index = (now_ms - t0_ms) / cfg.token.time_model.slot_ms;
3560                t0_ms + (slot_index + 1) * cfg.token.time_model.slot_ms
3561            };
3562
3563            let sleep_ms = next_wake_ms.saturating_sub(now_ms).max(1);
3564            let wake_at = TokioInstant::now() + Duration::from_millis(sleep_ms);
3565
3566            tokio::select! {
3567                _ = tokio::time::sleep_until(deadline) => {
3568                    return Err(RendezvousError::Timeout);
3569                }
3570                _ = tokio::time::sleep_until(wake_at) => {
3571                    continue;
3572                }
3573                recv = rx.recv() => {
3574                    let Some((addr, probe)) = recv else {
3575                        metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3576                        metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3577                        return Err(RendezvousError::ChannelClosed);
3578                    };
3579                    metrics.probes_received = metrics.probes_received.saturating_add(1);
3580                    let outcome = state.record_received_probe(addr, &probe);
3581                    if let RendezvousOutcome::Succeeded { remote_addr, slot_index } = outcome {
3582                        let base_port = self.inner.primary_local_port().await.map_err(RendezvousError::Mesh)?;
3583                        let local_port = last_local_offset
3584                            .map(|offset| base_port.wrapping_add(offset))
3585                            .unwrap_or(base_port);
3586                        metrics.slot_index_success = Some(slot_index);
3587                        metrics.total_duration_ms = start_instant.elapsed().as_millis() as u64;
3588                        metrics.nat_behavior_hint = derive_nat_hint(&local_offsets);
3589                        tracing::debug!(
3590                            target = "predictive_rendezvous",
3591                            rendezvous_id,
3592                            slot_index,
3593                            local_port,
3594                            "rendezvous succeeded"
3595                        );
3596                        return Ok(RendezvousResult {
3597                            remote_addr,
3598                            local_port,
3599                            slot_index,
3600                            metrics,
3601                        });
3602                    }
3603                }
3604            }
3605        }
3606    }
3607}
3608
/// Async rendezvous configuration for Predictive Rendezvous sessions.
#[cfg(feature = "predictive-rendezvous")]
#[derive(Debug, Clone)]
pub struct RendezvousConfig {
    /// Token supplying the seed and time model used to compute slots.
    pub token: SemanticRendezvousToken,
    /// Role of the local side; passed to slot computation and rendezvous state.
    pub role: Role,
    /// Addresses that receive a probe each time a new slot is emitted.
    pub potential_remote_addrs: Vec<SocketAddr>,
    /// Upper bound on how long a rendezvous attempt may run before timing out.
    pub max_duration: Duration,
    /// Fingerprint embedded in every probe payload sent by this side.
    pub sender_fingerprint: [u8; 16],
}
3619
/// Result of a successful rendezvous attempt.
#[cfg(feature = "predictive-rendezvous")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RendezvousResult {
    /// Remote address reported by the successful rendezvous outcome.
    pub remote_addr: SocketAddr,
    /// Primary local port shifted (wrapping) by the last emitted slot's local offset.
    pub local_port: u16,
    /// Index of the slot on which the rendezvous succeeded.
    pub slot_index: u64,
    /// Counters and timing collected during the attempt.
    pub metrics: RendezvousMetrics,
}
3629
/// Reasons a Predictive Rendezvous attempt can fail.
#[cfg(feature = "predictive-rendezvous")]
#[derive(Debug)]
pub enum RendezvousError {
    /// The configured maximum duration elapsed without success.
    Timeout,
    /// The token's time window ended without success.
    WindowClosed,
    /// The channel delivering incoming probes was closed.
    ChannelClosed,
    /// The rendezvous configuration is unusable; the message says why.
    InvalidConfig(&'static str),
    /// An underlying mesh operation failed.
    Mesh(anyhow::Error),
}

#[cfg(feature = "predictive-rendezvous")]
impl std::fmt::Display for RendezvousError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Timeout => f.write_str("rendezvous timed out"),
            Self::WindowClosed => f.write_str("rendezvous window closed"),
            Self::ChannelClosed => f.write_str("rendezvous channel closed"),
            Self::InvalidConfig(msg) => write!(f, "invalid rendezvous config: {msg}"),
            Self::Mesh(err) => write!(f, "mesh error: {err}"),
        }
    }
}

// Uses the default `source()` (`None`); the wrapped mesh error is rendered by
// `Display` instead.
#[cfg(feature = "predictive-rendezvous")]
impl std::error::Error for RendezvousError {}
3655
/// Classify NAT behavior from the local port offsets used so far.
///
/// Fewer than two offsets yield `Unknown`; if all offsets are equal the hint
/// is `PortPreserving`, otherwise `HighVariance`.
#[cfg(feature = "predictive-rendezvous")]
fn derive_nat_hint(offsets: &[u16]) -> rift_rndzv::NatBehaviorHint {
    match offsets {
        [] | [_] => rift_rndzv::NatBehaviorHint::Unknown,
        [first, rest @ ..] if rest.iter().all(|offset| offset == first) => {
            rift_rndzv::NatBehaviorHint::PortPreserving
        }
        _ => rift_rndzv::NatBehaviorHint::HighVariance,
    }
}
3671
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3678
3679fn stream_for_payload(payload: &RiftPayload) -> StreamKind {
3680    match payload {
3681        RiftPayload::Control(_) => StreamKind::Control,
3682        RiftPayload::Voice(_) => StreamKind::Voice,
3683        RiftPayload::Text(_) => StreamKind::Text,
3684        RiftPayload::Relay { inner, .. } => stream_for_payload(inner),
3685        RiftPayload::Encrypted(_) => StreamKind::Control,
3686    }
3687}
3688
3689fn should_encrypt(payload: &RiftPayload) -> bool {
3690    match payload {
3691        RiftPayload::Voice(_) => true,
3692        RiftPayload::Text(_) => true,
3693        RiftPayload::Control(ControlMessage::Chat(_)) => true,
3694        RiftPayload::Relay { .. } => false,
3695        RiftPayload::Encrypted(_) => false,
3696        RiftPayload::Control(_) => false,
3697    }
3698}
3699
3700fn encrypt_payload_with_key(
3701    key: &[u8; 32],
3702    header: &RiftFrameHeader,
3703    payload: &RiftPayload,
3704) -> Result<EncryptedPayload> {
3705    let plaintext = bincode::serialize(payload)?;
3706    let mut nonce = [0u8; 12];
3707    rand::rngs::OsRng.fill_bytes(&mut nonce);
3708    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
3709    let aad = bincode::serialize(header)?;
3710    let ciphertext = cipher
3711        .encrypt(Nonce::from_slice(&nonce), Payload { msg: &plaintext, aad: &aad })
3712        .map_err(|_| anyhow!("e2ee encrypt failed"))?;
3713    Ok(EncryptedPayload { nonce, ciphertext })
3714}
3715
#[cfg(all(test, feature = "predictive-rendezvous"))]
mod pr_tests {
    use super::*;
    use rift_rndzv::{EscalationPolicy, IdentityConstraints, SearchStrategy, SemanticRendezvousToken, TimeModel};

    /// Two meshes probing each other's local address with the same token
    /// (one as caller, one as callee) must both complete the rendezvous.
    #[tokio::test]
    async fn predictive_rendezvous_succeeds_loopback() {
        let mesh_cfg = MeshConfig {
            channel_name: "pr-test".to_string(),
            password: None,
            listen_port: 0,
            relay_capable: false,
            qos: QosProfile::default(),
            auth_token: None,
            require_auth: false,
            e2ee_key: None,
            rekey_interval_secs: None,
            max_direct_peers: None,
        };

        let mesh_a = Mesh::new(Identity::generate(), mesh_cfg.clone()).await.unwrap();
        let mesh_b = Mesh::new(Identity::generate(), mesh_cfg).await.unwrap();

        let addr_a = mesh_a.local_addr().await.unwrap();
        let addr_b = mesh_b.local_addr().await.unwrap();

        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // The window opens one second from now and lasts three seconds.
        let time_model = TimeModel {
            t0: now_secs + 1,
            window_secs: 3,
            slot_ms: 200,
        };
        let token = SemanticRendezvousToken::new(
            rift_rndzv::api::RendezvousSpaceId([0u8; 32]),
            [7u8; 32],
            IdentityConstraints {
                allowed_fingerprints: Vec::new(),
            },
            time_model,
            SearchStrategy::BasicDeterministic,
            EscalationPolicy::None,
        );

        // Both sides share the token and differ only in role, remote, and fingerprint.
        let config_for = |role: Role, remote: SocketAddr, fingerprint: [u8; 16]| RendezvousConfig {
            token: token.clone(),
            role,
            potential_remote_addrs: vec![remote],
            max_duration: Duration::from_secs(3),
            sender_fingerprint: fingerprint,
        };
        let cfg_a = config_for(Role::Caller, addr_b, [1u8; 16]);
        let cfg_b = config_for(Role::Callee, addr_a, [2u8; 16]);

        let handle_a = mesh_a.handle();
        let handle_b = mesh_b.handle();

        let (res_a, res_b) = tokio::join!(
            handle_a.run_rendezvous(cfg_a),
            handle_b.run_rendezvous(cfg_b),
        );

        assert!(res_a.is_ok(), "caller rendezvous failed: {:?}", res_a);
        assert!(res_b.is_ok(), "callee rendezvous failed: {:?}", res_b);
    }
}
3790
3791fn decrypt_payload_with_key(
3792    key: &[u8; 32],
3793    header: &RiftFrameHeader,
3794    encrypted: EncryptedPayload,
3795) -> Result<RiftPayload> {
3796    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
3797    let aad = bincode::serialize(header)?;
3798    let plaintext = cipher
3799        .decrypt(Nonce::from_slice(&encrypted.nonce), Payload { msg: &encrypted.ciphertext, aad: &aad })
3800        .map_err(|_| anyhow!("e2ee decrypt failed"))?;
3801    let payload: RiftPayload = bincode::deserialize(&plaintext)?;
3802    Ok(payload)
3803}
3804
#[cfg(test)]
mod security_tests {
    use super::*;

    /// Voice-stream header shared by the tests; individual tests override
    /// fields with struct-update syntax where they need different values.
    fn sample_header() -> RiftFrameHeader {
        RiftFrameHeader {
            version: ProtocolVersion::V1,
            stream: StreamKind::Voice,
            flags: 0,
            seq: 1,
            timestamp: 123,
            source: PeerId([1u8; 32]),
            session: SessionId::NONE,
        }
    }

    /// Encrypting and then decrypting with the same key and header returns
    /// the original voice bytes.
    #[test]
    fn e2ee_encrypt_decrypt_roundtrip() {
        let key = [7u8; 32];
        let header = sample_header();
        let voice = RiftPayload::Voice(VoicePacket {
            codec_id: CodecId::Opus,
            payload: vec![1, 2, 3, 4],
        });

        let sealed = encrypt_payload_with_key(&key, &header, &voice).unwrap();
        let opened = decrypt_payload_with_key(&key, &header, sealed).unwrap();
        match opened {
            RiftPayload::Voice(pkt) => assert_eq!(pkt.payload, vec![1, 2, 3, 4]),
            other => panic!("unexpected payload: {other:?}"),
        }
    }

    /// Changing a header field after encryption makes decryption fail,
    /// because the header is bound as associated data.
    #[test]
    fn e2ee_aad_mismatch_fails() {
        let key = [9u8; 32];
        let mut header = RiftFrameHeader {
            stream: StreamKind::Text,
            seq: 5,
            timestamp: 999,
            source: PeerId([2u8; 32]),
            ..sample_header()
        };
        let text = RiftPayload::Text(ChatMessage::new(
            PeerId([3u8; 32]),
            999,
            "hi".to_string(),
        ));

        let sealed = encrypt_payload_with_key(&key, &header, &text).unwrap();
        header.seq = 6;
        assert!(decrypt_payload_with_key(&key, &header, sealed).is_err());
    }
}
3858
3859fn default_peer_capabilities() -> Capabilities {
3860    Capabilities {
3861        supported_versions: vec![ProtocolVersion::V1],
3862        audio_codecs: vec![CodecId::Opus],
3863        features: vec![FeatureFlag::Voice, FeatureFlag::Text],
3864        max_bitrate: Some(48_000),
3865        preferred_frame_duration_ms: Some(20),
3866    }
3867}