1use std::collections::HashMap;
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::net::{SocketAddr, ToSocketAddrs};
10use std::sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex as StdMutex,
13};
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
16use anyhow::{Context, Result};
17use rand::rngs::OsRng;
18use rand::RngCore;
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use tokio::sync::{mpsc, Mutex};
22use tokio::time::Instant;
23
24use rift_core::{decode_invite, generate_invite, Identity, Invite, PeerId, KeyStore};
25use rift_dht::{DhtConfig as RiftDhtConfig, DhtHandle, PeerEndpointInfo};
26use rift_discovery::local_ipv4_addrs;
27use rift_media::{
28 decode_frame, encode_frame, AudioConfig, AudioIn, AudioMixer, AudioOut, OpusDecoder,
29 OpusEncoder,
30};
31use rift_mesh::{Mesh, MeshConfig, MeshEvent, MeshHandle};
32use rift_nat::{
33 attempt_hole_punch, gather_local_candidates, gather_public_addrs, parse_turn_server, NatConfig,
34 PeerEndpoint,
35};
36use rift_protocol::{CallState, Capabilities, QosProfile, SessionId};
37use rift_rndzv::{
38 ChannelKind as RndzvChannelKind, PeerId as RndzvPeerId, RndzvChannel, RndzvConnectTarget,
39 RndzvConnector, RndzvListener, Srt as RndzvSrt,
40 EscalationPolicy, IdentityConstraints, RendezvousSpaceId, SearchStrategy,
41 SemanticRendezvousToken, TimeModel,
42};
43use hkdf::Hkdf;
44use sha2::Sha256;
45
46pub use rift_core::PeerId as RiftPeerId;
47pub use rift_protocol::{
48 CallState as RiftCallState, ChatMessage, CodecId, FeatureFlag, GroupMode,
49 QosProfile as RiftQosProfile, SessionId as RiftSessionId,
50};
51
52pub const SDK_VERSION: &str = "0.1.0";
53pub const SDK_ABI_VERSION: i32 = 1;
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct RiftConfig {
57 pub identity_path: Option<PathBuf>,
59 pub listen_port: u16,
61 pub relay: bool,
63 pub user_name: Option<String>,
65 pub preferred_codecs: Vec<CodecId>,
67 pub preferred_features: Vec<FeatureFlag>,
69 #[serde(default)]
71 pub qos: QosProfile,
72 #[serde(default)]
74 pub metrics_enabled: bool,
75 #[serde(default)]
77 pub security: SecurityConfig,
78 pub dht: DhtConfigSdk,
80 pub audio: AudioConfigSdk,
82 pub network: NetworkConfigSdk,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct AudioConfigSdk {
88 pub enabled: bool,
90 pub input_device: Option<String>,
92 pub output_device: Option<String>,
94 pub quality: String,
96 pub ptt: bool,
98 pub vad: bool,
100 pub mute_output: bool,
102 pub emit_voice_frames: bool,
104 pub allow_fail: bool,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct NetworkConfigSdk {
110 pub prefer_p2p: bool,
112 pub local_ports: Option<Vec<u16>>,
114 pub known_peers: Vec<std::net::SocketAddr>,
116 pub invite: Option<String>,
117 #[serde(default)]
118 pub stun_servers: Vec<String>,
119 #[serde(default)]
120 pub stun_timeout_ms: Option<u64>,
121 #[serde(default)]
122 pub enable_turn: bool,
123 #[serde(default)]
124 pub turn_servers: Vec<String>,
125 #[serde(default)]
126 pub turn_timeout_ms: Option<u64>,
127 #[serde(default)]
128 pub turn_keepalive_ms: Option<u64>,
129 #[serde(default)]
130 pub punch_interval_ms: Option<u64>,
131 #[serde(default)]
132 pub punch_timeout_ms: Option<u64>,
133 #[serde(default)]
134 pub max_direct_peers: Option<usize>,
135 #[serde(default)]
137 pub rndzv: Option<RndzvConfigSdk>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct RndzvConfigSdk {
142 pub srt_uri: String,
144 pub role: RndzvRole,
146 pub remote_addr: Option<SocketAddr>,
148 pub listen_addr: Option<SocketAddr>,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum RndzvRole {
154 Connector,
155 Listener,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct SrtInvite {
160 pub label: String,
162 pub uri: String,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct DhtConfigSdk {
168 pub enabled: bool,
170 pub bootstrap_nodes: Vec<String>,
172 pub listen_addr: Option<String>,
174}
175
176impl Default for RiftConfig {
177 fn default() -> Self {
178 Self {
179 identity_path: None,
180 listen_port: 7777,
181 relay: false,
182 user_name: None,
183 preferred_codecs: vec![CodecId::Opus, CodecId::PCM16],
184 preferred_features: vec![
185 FeatureFlag::Voice,
186 FeatureFlag::Text,
187 FeatureFlag::Relay,
188 FeatureFlag::E2EE,
189 ],
190 qos: QosProfile::default(),
191 metrics_enabled: true,
192 security: SecurityConfig::default(),
193 dht: DhtConfigSdk::default(),
194 audio: AudioConfigSdk::default(),
195 network: NetworkConfigSdk::default(),
196 }
197 }
198}
199
200pub fn create_voice_invite(to: PeerId) -> SrtInvite {
202 let mut seed = [0u8; 32];
203 OsRng.fill_bytes(&mut seed);
204 let space = RendezvousSpaceId(*blake3::hash(b"rift-rndzv-voice-call").as_bytes());
205 let now = SystemTime::now()
206 .duration_since(UNIX_EPOCH)
207 .unwrap_or_default()
208 .as_secs();
209 let token = SemanticRendezvousToken::new(
210 space,
211 seed,
212 IdentityConstraints {
213 allowed_fingerprints: vec![to.0],
214 },
215 TimeModel {
216 t0: now.saturating_add(10),
217 window_secs: 120,
218 slot_ms: 250,
219 },
220 SearchStrategy::BasicDeterministic,
221 EscalationPolicy::None,
222 );
223 let uri = token
224 .to_uri()
225 .expect("SRT URI encoding should succeed for valid inputs");
226 SrtInvite {
227 label: "Voice Call".to_string(),
228 uri,
229 }
230}
231
232pub fn accept_voice_invite(invite: &SrtInvite) -> Result<RndzvSrt, RiftError> {
234 RndzvSrt::from_uri(&invite.uri)
235 .map_err(|e| RiftError::Other(format!("rndzv srt decode failed: {e}")))
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
239#[serde(default)]
240pub struct SecurityConfig {
241 pub trust_on_first_use: bool,
243 pub known_hosts_path: Option<PathBuf>,
245 pub reject_on_mismatch: bool,
247 pub channel_shared_secret: Option<String>,
249 pub audit_log_path: Option<PathBuf>,
251 pub rekey_interval_secs: Option<u64>,
253}
254
255impl Default for SecurityConfig {
256 fn default() -> Self {
257 Self {
258 trust_on_first_use: true,
259 known_hosts_path: None,
260 reject_on_mismatch: false,
261 channel_shared_secret: None,
262 audit_log_path: None,
263 rekey_interval_secs: Some(600),
264 }
265 }
266}
267
268impl Default for AudioConfigSdk {
269 fn default() -> Self {
270 Self {
271 enabled: true,
272 input_device: None,
273 output_device: None,
274 quality: "medium".to_string(),
275 ptt: false,
276 vad: true,
277 mute_output: false,
278 emit_voice_frames: false,
279 allow_fail: false,
280 }
281 }
282}
283
284impl Default for NetworkConfigSdk {
285 fn default() -> Self {
286 Self {
287 prefer_p2p: true,
288 local_ports: None,
289 known_peers: Vec::new(),
290 invite: None,
291 stun_servers: vec![
292 "stun.l.google.com:19302".to_string(),
293 "stun1.l.google.com:19302".to_string(),
294 ],
295 stun_timeout_ms: Some(800),
296 enable_turn: false,
297 turn_servers: Vec::new(),
298 turn_timeout_ms: Some(1200),
299 turn_keepalive_ms: Some(12000),
300 punch_interval_ms: Some(200),
301 punch_timeout_ms: Some(5000),
302 max_direct_peers: None,
303 rndzv: None,
304 }
305 }
306}
307
308impl Default for DhtConfigSdk {
309 fn default() -> Self {
310 Self {
311 enabled: false,
312 bootstrap_nodes: Vec::new(),
313 listen_addr: None,
314 }
315 }
316}
317
318#[derive(Debug, Clone, Copy)]
319pub struct LinkStats {
320 pub rtt_ms: f32,
322 pub loss: f32,
324 pub jitter_ms: f32,
326}
327
328#[derive(Debug, Clone, Copy)]
329pub struct GlobalStats {
330 pub num_peers: usize,
332 pub num_sessions: usize,
334 pub packets_sent: u64,
336 pub packets_received: u64,
338 pub bytes_sent: u64,
340 pub bytes_received: u64,
342}
343
344#[derive(Debug, Clone)]
345pub enum RouteKind {
346 Direct,
347 Relayed { via: PeerId },
348}
349
350#[derive(Debug, Clone)]
351pub enum RiftEvent {
352 IncomingChat(ChatMessage),
354 IncomingCall {
356 session: SessionId,
357 from: PeerId,
358 rndzv_srt_uri: Option<String>,
359 },
360 CallStateChanged { session: SessionId, state: CallState },
362 PeerJoinedChannel { peer: PeerId, channel: String },
364 PeerLeftChannel { peer: PeerId, channel: String },
366 PeerCapabilities { peer: PeerId, capabilities: Capabilities },
368 AudioLevel { peer: PeerId, level: f32 },
370 CodecSelected { codec: CodecId },
372 AudioBitrate { bitrate: u32 },
374 StatsUpdate { peer: PeerId, stats: LinkStats, global: GlobalStats },
376 RouteUpdated { peer: PeerId, route: RouteKind },
378 GroupTopology { session: SessionId, mode: GroupMode },
380 PeerFingerprint { peer: PeerId, fingerprint: String },
382 SecurityNotice { message: String },
384 VoiceFrame { peer: PeerId, samples: Vec<i16> },
386}
387
388#[derive(Debug, thiserror::Error)]
389pub enum RiftError {
391 #[error("not initialized")]
393 NotInitialized,
394 #[error("channel already joined")]
396 AlreadyJoined,
397 #[error("channel not joined")]
399 NotJoined,
400 #[error("mesh error: {0}")]
402 Mesh(String),
403 #[error("audio error: {0}")]
405 Audio(String),
406 #[error("other: {0}")]
408 Other(String),
409}
410
411struct VoiceRuntime {
413 _audio_in: AudioIn,
414 mixer: Arc<StdMutex<AudioMixer>>,
415 frame_samples: usize,
416 emit_voice: bool,
417 audio_config: AudioConfig,
418 tuning: Arc<StdMutex<AudioTuning>>,
419 rndzv_channel: Arc<StdMutex<Option<RndzvChannel>>>,
420 rndzv_remote_peer: Arc<StdMutex<Option<PeerId>>>,
421}
422
423impl VoiceRuntime {
424 fn set_rndzv_channel(&self, channel: RndzvChannel, remote_peer: PeerId) {
425 {
426 let mut slot = self.rndzv_channel.lock().unwrap();
427 *slot = Some(channel.clone());
428 }
429 {
430 let mut peer_slot = self.rndzv_remote_peer.lock().unwrap();
431 *peer_slot = Some(remote_peer);
432 }
433
434 let mixer = self.mixer.clone();
435 let frame_samples = self.frame_samples;
436 let emit_voice = self.emit_voice;
437 let audio_config = self.audio_config.clone();
438 tokio::spawn(async move {
439 let mut decoder = match OpusDecoder::new(&audio_config) {
440 Ok(decoder) => decoder,
441 Err(err) => {
442 tracing::warn!("rndzv opus decoder init failed: {err}");
443 return;
444 }
445 };
446 loop {
447 let payload = match channel.recv().await {
448 Ok(Some(payload)) => payload,
449 Ok(None) => continue,
450 Err(err) => {
451 tracing::warn!("rndzv channel recv failed: {err}");
452 return;
453 }
454 };
455 if let Ok(out) = decode_frame(CodecId::Opus, &payload, &mut decoder, frame_samples) {
456 let mut mixer = mixer.lock().unwrap();
457 mixer.push(peer_to_stream_id(&remote_peer), out.clone());
458 if emit_voice {
459 let _ = out;
460 }
461 }
462 }
463 });
464 }
465
466 fn clear_rndzv_channel(&self) {
467 {
468 let mut slot = self.rndzv_channel.lock().unwrap();
469 *slot = None;
470 }
471 {
472 let mut peer_slot = self.rndzv_remote_peer.lock().unwrap();
473 *peer_slot = None;
474 }
475 }
476}
477
478#[derive(Debug, Clone)]
480struct AudioTuning {
481 bitrate: u32,
482 fec: bool,
483 loss_pct: u8,
484}
485
486struct QosState {
488 profile: QosProfile,
489 peer_stats: HashMap<PeerId, LinkStats>,
490 current: AudioTuning,
491 last_adjust: Instant,
492}
493
494struct SessionRuntime {
496 _channel: String,
497 handle: MeshHandle,
498 _voice: Option<Arc<VoiceRuntime>>,
499 _dht: Option<DhtHandle>,
500 pending_call_srt: Arc<StdMutex<HashMap<SessionId, String>>>,
501}
502
503pub struct RiftHandle {
505 identity: Mutex<Option<Identity>>,
507 _local_peer_id: PeerId,
509 config: RiftConfig,
511 overrides: Mutex<RiftConfigOverrides>,
513 runtime: Mutex<Option<SessionRuntime>>,
515 event_rx: Mutex<mpsc::UnboundedReceiver<RiftEvent>>,
517 event_tx: mpsc::UnboundedSender<RiftEvent>,
518 ptt_active: Arc<AtomicBool>,
519 mute_active: Arc<AtomicBool>,
520}
521
522#[derive(Debug, Default, Clone)]
523struct RiftConfigOverrides {
525 dht_enabled: Option<bool>,
526 bootstrap_nodes: Option<Vec<String>>,
527 invite: Option<String>,
528 turn_servers: Option<Vec<String>>,
529 audio_quality: Option<String>,
530}
531
532impl RiftHandle {
533 pub async fn new(config: RiftConfig) -> Result<Self, RiftError> {
535 rift_metrics::set_enabled(config.metrics_enabled);
536 let (event_tx, event_rx) = mpsc::unbounded_channel();
537 let identity_path = config
538 .identity_path
539 .clone()
540 .unwrap_or_else(|| Identity::default_path().unwrap_or_else(|_| PathBuf::from("identity.key")));
541 let existed = identity_path.exists();
542 let identity = KeyStore::load_or_generate(&identity_path)
543 .context("identity load failed")
544 .map_err(|e| RiftError::Other(format!("{e}")))?;
545 if !existed {
546 tracing::info!(path = %identity_path.display(), "new identity generated");
547 }
548 let local_peer_id = identity.peer_id;
549 Ok(Self {
550 ptt_active: Arc::new(AtomicBool::new(!config.audio.ptt)),
551 identity: Mutex::new(Some(identity)),
552 _local_peer_id: local_peer_id,
553 config,
554 overrides: Mutex::new(RiftConfigOverrides::default()),
555 runtime: Mutex::new(None),
556 event_rx: Mutex::new(event_rx),
557 event_tx,
558 mute_active: Arc::new(AtomicBool::new(false)),
559 })
560 }
561
562 pub async fn set_dht_enabled(&self, enabled: bool) {
564 let mut overrides = self.overrides.lock().await;
565 overrides.dht_enabled = Some(enabled);
566 }
567
568 pub async fn set_bootstrap_nodes(&self, nodes: Vec<String>) {
570 let mut overrides = self.overrides.lock().await;
571 overrides.bootstrap_nodes = Some(nodes);
572 }
573
574 pub async fn set_invite(&self, invite: Option<String>) {
576 let mut overrides = self.overrides.lock().await;
577 overrides.invite = invite;
578 }
579
580 pub async fn set_turn_servers(&self, servers: Vec<String>) {
582 let mut overrides = self.overrides.lock().await;
583 overrides.turn_servers = Some(servers);
584 }
585
586 pub async fn set_audio_quality(&self, quality: Option<String>) {
588 let mut overrides = self.overrides.lock().await;
589 overrides.audio_quality = quality;
590 }
591
592 pub async fn join_channel(
594 &self,
595 name: &str,
596 password: Option<&str>,
597 internet: bool,
598 ) -> Result<(), RiftError> {
599 let mut cfg = self.config.clone();
600 {
601 let overrides = self.overrides.lock().await;
602 if let Some(enabled) = overrides.dht_enabled {
603 cfg.dht.enabled = enabled;
604 }
605 if let Some(nodes) = overrides.bootstrap_nodes.clone() {
606 cfg.dht.bootstrap_nodes = nodes;
607 }
608 if let Some(invite) = overrides.invite.clone() {
609 cfg.network.invite = Some(invite);
610 }
611 if let Some(turn_servers) = overrides.turn_servers.clone() {
612 cfg.network.turn_servers = turn_servers;
613 cfg.network.enable_turn = !cfg.network.turn_servers.is_empty();
614 }
615 if let Some(quality) = overrides.audio_quality.clone() {
616 cfg.audio.quality = quality;
617 }
618 }
619 let mut runtime_guard = self.runtime.lock().await;
620 if runtime_guard.is_some() {
621 return Err(RiftError::AlreadyJoined);
622 }
623 let identity = {
624 let mut identity_guard = self.identity.lock().await;
625 match identity_guard.take() {
626 Some(identity) => identity,
627 None => Identity::load(cfg.identity_path.as_deref())
628 .context("identity not found")
629 .map_err(|e| RiftError::Other(format!("{e}")))?,
630 }
631 };
632
633 let auth_token = self
634 .config
635 .security
636 .channel_shared_secret
637 .as_deref()
638 .map(|secret| derive_auth_token(secret, name));
639 let nat_cfg = if internet {
640 Some(default_nat_config(
641 cfg.listen_port,
642 cfg.network.local_ports.clone(),
643 cfg.network.stun_servers.clone(),
644 cfg.network.stun_timeout_ms,
645 cfg.network.punch_interval_ms,
646 cfg.network.punch_timeout_ms,
647 cfg.network.enable_turn,
648 cfg.network.turn_servers.clone(),
649 cfg.network.turn_timeout_ms,
650 cfg.network.turn_keepalive_ms,
651 ))
652 } else {
653 None
654 };
655 let mut known_peers = cfg.network.known_peers.clone();
656 if internet && known_peers.is_empty() {
657 if let Some(nat_cfg) = nat_cfg.as_ref() {
658 if !nat_cfg.stun_servers.is_empty() {
659 if let Ok(public_addrs) = gather_public_addrs(nat_cfg).await {
660 if !public_addrs.is_empty() {
661 known_peers = public_addrs;
662 }
663 }
664 }
665 }
666 }
667 let invite_for_key = if let Some(invite_str) = &cfg.network.invite {
668 Some(decode_invite(invite_str).map_err(|e| RiftError::Other(format!("{e}")))?)
669 } else if internet {
670 let mut candidates = gather_local_candidates(cfg.listen_port);
671 if let Some(nat_cfg) = nat_cfg.as_ref() {
672 if !nat_cfg.stun_servers.is_empty() {
673 if let Ok(public_addrs) = gather_public_addrs(nat_cfg).await {
674 candidates.extend(public_addrs);
675 }
676 }
677 }
678 candidates.sort();
679 candidates.dedup();
680 Some(generate_invite(
681 name,
682 password,
683 known_peers.clone(),
684 candidates,
685 ))
686 } else {
687 None
688 };
689 let e2ee_key = derive_e2ee_key(
690 name,
691 password,
692 invite_for_key.as_ref(),
693 cfg.security.channel_shared_secret.as_deref(),
694 );
695 let config = MeshConfig {
696 channel_name: name.to_string(),
697 password: password.map(|v| v.to_string()),
698 listen_port: cfg.listen_port,
699 relay_capable: cfg.relay,
700 qos: cfg.qos.clone(),
701 auth_token,
702 require_auth: cfg.security.channel_shared_secret.is_some(),
703 e2ee_key,
704 rekey_interval_secs: cfg.security.rekey_interval_secs,
705 max_direct_peers: cfg.network.max_direct_peers,
706 };
707 let mut mesh = Mesh::new(identity, config)
708 .await
709 .map_err(|e| RiftError::Mesh(format!("{e}")))?;
710
711 let handle = mesh.handle();
712 handle
713 .set_preferred_codecs(self.config.preferred_codecs.clone())
714 .await;
715 handle
716 .set_preferred_features(self.config.preferred_features.clone())
717 .await;
718 let security_handle = handle.clone();
719
720 if internet {
721 let nat_cfg = nat_cfg.clone().expect("nat cfg");
722 mesh.enable_nat(nat_cfg.clone()).await;
723 let invite = invite_for_key.clone().unwrap_or_else(|| Invite {
724 channel_name: name.to_string(),
725 password: password.map(|v| v.to_string()),
726 channel_key: [0u8; 32],
727 known_peers: Vec::new(),
728 candidates: Vec::new(),
729 relay_candidates: Vec::new(),
730 version: 2,
731 created_at: now_timestamp(),
732 });
733 mesh.join_invite(invite, nat_cfg)
734 .await
735 .map_err(|e| RiftError::Mesh(format!("{e}")))?;
736 } else {
737 mesh.start_lan_discovery()
738 .map_err(|e| RiftError::Mesh(format!("{e}")))?;
739 }
740
741 let event_tx = self.event_tx.clone();
742 let channel = name.to_string();
743 let channel_for_task = channel.clone();
744 let pending_call_srt: Arc<StdMutex<HashMap<SessionId, String>>> =
745 Arc::new(StdMutex::new(HashMap::new()));
746 let pending_call_srt_task = pending_call_srt.clone();
747 let voice = if cfg.audio.enabled {
748 match start_audio_pipeline(
749 cfg.clone(),
750 handle.clone(),
751 self.local_peer_id(),
752 self.ptt_active.clone(),
753 self.mute_active.clone(),
754 ) {
755 Ok(voice) => Some(Arc::new(voice)),
756 Err(err) => {
757 if cfg.audio.allow_fail {
758 tracing::warn!("audio pipeline failed: {err}");
759 None
760 } else {
761 return Err(err);
762 }
763 }
764 }
765 } else {
766 None
767 };
768 if let Some(voice) = voice.as_ref() {
769 let _ = event_tx.send(RiftEvent::AudioBitrate {
770 bitrate: voice.audio_config.bitrate,
771 });
772 }
773
774 let dht = if cfg.dht.enabled {
775 let dht_config = RiftDhtConfig {
776 bootstrap_nodes: parse_socket_addrs(&cfg.dht.bootstrap_nodes),
777 listen_addr: cfg
778 .dht
779 .listen_addr
780 .as_deref()
781 .and_then(parse_socket_addr)
782 .unwrap_or_else(|| {
783 SocketAddr::from(([0, 0, 0, 0], cfg.listen_port.saturating_add(100)))
784 }),
785 };
786 let handle_dht = DhtHandle::new(dht_config)
787 .await
788 .map_err(|e| RiftError::Other(format!("{e}")))?;
789
790 let channel_id = rift_core::ChannelId::from_channel(&channel, password);
791 let nat_cfg = nat_cfg.clone().unwrap_or_else(|| default_nat_config(
792 cfg.listen_port,
793 cfg.network.local_ports.clone(),
794 cfg.network.stun_servers.clone(),
795 cfg.network.stun_timeout_ms,
796 cfg.network.punch_interval_ms,
797 cfg.network.punch_timeout_ms,
798 cfg.network.enable_turn,
799 cfg.network.turn_servers.clone(),
800 cfg.network.turn_timeout_ms,
801 cfg.network.turn_keepalive_ms,
802 ));
803 let addrs = match gather_public_addrs(&nat_cfg).await {
804 Ok(public_addrs) if !public_addrs.is_empty() => public_addrs,
805 _ => local_ipv4_addrs()
806 .map_err(|e| RiftError::Other(format!("{e}")))?
807 .into_iter()
808 .map(|ip| SocketAddr::new(ip, cfg.listen_port))
809 .collect::<Vec<_>>(),
810 };
811 let info = PeerEndpointInfo {
812 peer_id: self.local_peer_id(),
813 addrs,
814 };
815 let _ = handle_dht.announce(channel_id, info.clone()).await;
816
817 let announce_handle = handle_dht.clone();
818 let announce_info = info.clone();
819 tokio::spawn(async move {
820 let mut tick = tokio::time::interval(Duration::from_secs(30));
821 loop {
822 tick.tick().await;
823 let _ = announce_handle
824 .announce(channel_id, announce_info.clone())
825 .await;
826 }
827 });
828
829 let lookup_handle = handle_dht.clone();
830 let mesh_handle = handle.clone();
831 let nat_cfg = default_nat_config(
832 self.config.listen_port,
833 self.config.network.local_ports.clone(),
834 self.config.network.stun_servers.clone(),
835 self.config.network.stun_timeout_ms,
836 self.config.network.punch_interval_ms,
837 self.config.network.punch_timeout_ms,
838 self.config.network.enable_turn,
839 self.config.network.turn_servers.clone(),
840 self.config.network.turn_timeout_ms,
841 self.config.network.turn_keepalive_ms,
842 );
843 tokio::spawn(async move {
844 let mut tick = tokio::time::interval(Duration::from_secs(12));
845 loop {
846 tick.tick().await;
847 if let Ok(peers) = lookup_handle.lookup(channel_id).await {
848 for peer in peers {
849 if peer.peer_id == info.peer_id {
850 continue;
851 }
852 for addr in peer.addrs.iter().copied() {
853 let endpoint = PeerEndpoint {
854 peer_id: peer.peer_id,
855 external_addrs: vec![addr],
856 punch_ports: vec![addr.port()],
857 };
858 if let Ok((socket, remote)) = attempt_hole_punch(&nat_cfg, &endpoint).await {
859 let _ = mesh_handle.connect_with_socket(socket, remote).await;
860 } else {
861 let _ = mesh_handle.connect_addr(addr).await;
862 }
863 }
864 }
865 }
866 }
867 });
868
869 Some(handle_dht)
870 } else {
871 None
872 };
873
874 let voice_state = voice.as_ref().map(|v| VoiceRuntimeRef {
875 mixer: v.mixer.clone(),
876 frame_samples: v.frame_samples,
877 emit_voice: v.emit_voice,
878 audio_config: v.audio_config.clone(),
879 tuning: v.tuning.clone(),
880 });
881 let security_cfg = self.config.security.clone();
882 let qos_profile = self.config.qos.clone();
883 let mut qos_state = voice_state.as_ref().map(|state| QosState {
884 profile: qos_profile,
885 peer_stats: HashMap::new(),
886 current: AudioTuning {
887 bitrate: state.audio_config.bitrate,
888 fec: false,
889 loss_pct: 0,
890 },
891 last_adjust: Instant::now() - Duration::from_secs(5),
892 });
893
894 tokio::spawn(async move {
895 let mut mesh = mesh;
896 let mut decoder = if voice_state.is_some() {
897 Some(
898 OpusDecoder::new(&voice_state.as_ref().unwrap().audio_config)
899 .expect("opus decoder"),
900 )
901 } else {
902 None
903 };
904 while let Some(event) = mesh.next_event().await {
905 match event {
906 MeshEvent::PeerJoined(peer) => {
907 let _ = event_tx.send(RiftEvent::PeerJoinedChannel {
908 peer,
909 channel: channel_for_task.clone(),
910 });
911 }
912 MeshEvent::PeerLeft(peer) => {
913 let _ = event_tx.send(RiftEvent::PeerLeftChannel {
914 peer,
915 channel: channel_for_task.clone(),
916 });
917 }
918 MeshEvent::ChatReceived(chat) => {
919 let _ = event_tx.send(RiftEvent::IncomingChat(chat));
920 }
921 MeshEvent::IncomingCall {
922 session,
923 from,
924 rndzv_srt_uri,
925 } => {
926 if let Some(uri) = rndzv_srt_uri.clone() {
927 let mut map = pending_call_srt_task.lock().unwrap();
928 map.insert(session, uri);
929 }
930 let _ = event_tx.send(RiftEvent::IncomingCall {
931 session,
932 from,
933 rndzv_srt_uri,
934 });
935 }
936 MeshEvent::CallAccepted { session, .. } => {
937 let _ = event_tx.send(RiftEvent::CallStateChanged {
938 session,
939 state: CallState::Active,
940 });
941 }
942 MeshEvent::CallDeclined { session, .. } => {
943 let _ = event_tx.send(RiftEvent::CallStateChanged {
944 session,
945 state: CallState::Ended,
946 });
947 }
948 MeshEvent::CallEnded { session } => {
949 let _ = event_tx.send(RiftEvent::CallStateChanged {
950 session,
951 state: CallState::Ended,
952 });
953 }
954 MeshEvent::VoiceFrame { from, codec, payload, .. } => {
955 if let (Some(state), Some(decoder)) = (voice_state.as_ref(), decoder.as_mut()) {
956 if let Ok(out) = decode_frame(codec, &payload, decoder, state.frame_samples) {
957 let mut mixer = state.mixer.lock().unwrap();
958 mixer.push(peer_to_stream_id(&from), out.clone());
959 let level = audio_level(&out);
960 let _ = event_tx.send(RiftEvent::AudioLevel { peer: from, level });
961 if state.emit_voice {
962 let _ = event_tx.send(RiftEvent::VoiceFrame { peer: from, samples: out });
963 }
964 }
965 }
966 }
967 MeshEvent::PeerCapabilities { peer_id, capabilities } => {
968 let _ = event_tx.send(RiftEvent::PeerCapabilities { peer: peer_id, capabilities });
969 }
970 MeshEvent::GroupCodec(codec) => {
971 let _ = event_tx.send(RiftEvent::CodecSelected { codec });
972 }
973 MeshEvent::StatsUpdate { peer, stats, global } => {
974 let sdk_stats = LinkStats {
975 rtt_ms: stats.rtt_ms,
976 loss: stats.loss,
977 jitter_ms: stats.jitter_ms,
978 };
979 let sdk_global = GlobalStats {
980 num_peers: global.num_peers,
981 num_sessions: global.num_sessions,
982 packets_sent: global.packets_sent,
983 packets_received: global.packets_received,
984 bytes_sent: global.bytes_sent,
985 bytes_received: global.bytes_received,
986 };
987 let _ = event_tx.send(RiftEvent::StatsUpdate {
988 peer,
989 stats: sdk_stats,
990 global: sdk_global,
991 });
992 if let (Some(state), Some(qos)) = (voice_state.as_ref(), qos_state.as_mut()) {
993 qos.peer_stats.insert(peer, sdk_stats);
994 if let Some(next) = compute_next_tuning(qos) {
995 let mut tuning = state.tuning.lock().unwrap();
996 let bitrate_changed = tuning.bitrate != next.bitrate;
997 *tuning = next.clone();
998 if bitrate_changed {
999 let _ = event_tx.send(RiftEvent::AudioBitrate { bitrate: next.bitrate });
1000 }
1001 }
1002 }
1003 }
1004 MeshEvent::RouteUpdated { peer_id, route } => {
1005 let route = match route {
1006 rift_mesh::PeerRoute::Direct { .. } => RouteKind::Direct,
1007 rift_mesh::PeerRoute::Relayed { via } => RouteKind::Relayed { via },
1008 };
1009 let _ = event_tx.send(RiftEvent::RouteUpdated {
1010 peer: peer_id,
1011 route,
1012 });
1013 }
1014 MeshEvent::GroupTopology { session, mode } => {
1015 let _ = event_tx.send(RiftEvent::GroupTopology { session, mode });
1016 }
1017 MeshEvent::PeerIdentity { peer_id, public_key } => {
1018 if let Err(err) = handle_peer_identity(
1019 &event_tx,
1020 &security_handle,
1021 &security_cfg,
1022 peer_id,
1023 &public_key,
1024 )
1025 .await
1026 {
1027 tracing::warn!("security check failed: {err}");
1028 }
1029 }
1030 MeshEvent::PeerSessionConfig { .. } | MeshEvent::RouteUpgraded(_) => {}
1031 }
1032 }
1033 });
1034
1035 *runtime_guard = Some(SessionRuntime {
1036 _channel: channel,
1037 handle,
1038 _voice: voice,
1039 _dht: dht,
1040 pending_call_srt,
1041 });
1042 Ok(())
1043 }
1044
1045 pub async fn leave_channel(&self, _name: &str) -> Result<(), RiftError> {
1047 let mut runtime_guard = self.runtime.lock().await;
1048 if runtime_guard.is_none() {
1049 return Err(RiftError::NotJoined);
1050 }
1051 *runtime_guard = None;
1052 Ok(())
1053 }
1054
1055 pub async fn send_chat(&self, text: &str) -> Result<(), RiftError> {
1057 let runtime_guard = self.runtime.lock().await;
1058 let runtime = runtime_guard.as_ref().ok_or(RiftError::NotJoined)?;
1059 runtime
1060 .handle
1061 .broadcast_chat(text.to_string())
1062 .await
1063 .map_err(|e| RiftError::Mesh(format!("{e}")))
1064 }
1065
1066 pub async fn start_call(&self, peer: PeerId) -> Result<SessionId, RiftError> {
1068 self.start_call_with_srt(peer, None).await
1069 }
1070
1071 pub async fn start_call_with_srt(
1073 &self,
1074 peer: PeerId,
1075 rndzv_srt_uri: Option<String>,
1076 ) -> Result<SessionId, RiftError> {
1077 let parsed_srt = if let Some(uri) = rndzv_srt_uri.as_ref() {
1078 Some(
1079 RndzvSrt::from_uri(uri)
1080 .map_err(|e| RiftError::Other(format!("rndzv srt decode failed: {e}")))?,
1081 )
1082 } else {
1083 None
1084 };
1085 let (handle, voice) = {
1086 let runtime_guard = self.runtime.lock().await;
1087 let runtime = runtime_guard.as_ref().ok_or(RiftError::NotJoined)?;
1088 (runtime.handle.clone(), runtime._voice.clone())
1089 };
1090
1091 let session = handle
1092 .start_call_with_srt(peer, rndzv_srt_uri.clone())
1093 .await
1094 .map_err(|e| RiftError::Mesh(format!("{e}")))?;
1095
1096 if let (Some(srt), Some(voice)) = (parsed_srt, voice) {
1097 let local_peer = RndzvPeerId(self.local_peer_id().0);
1098 let target = RndzvConnectTarget::from_srt(srt, local_peer);
1099 let connector = RndzvConnector::new().with_timeout(Duration::from_secs(5));
1100 let outcome = connector
1101 .connect(target)
1102 .await
1103 .map_err(|e| RiftError::Other(format!("rndzv connect failed: {e}")))?;
1104 let session = outcome.session;
1105
1106 let channel = session
1107 .open_channel(RndzvChannelKind::UnreliableDatagram)
1108 .await
1109 .map_err(|e| RiftError::Other(format!("rndzv channel open failed: {e}")))?;
1110 let remote_peer = PeerId((session.remote).0);
1111 voice.set_rndzv_channel(channel, remote_peer);
1112 }
1113
1114 Ok(session)
1115 }
1116
1117 pub async fn accept_call(&self, session: SessionId) -> Result<(), RiftError> {
1119 let (handle, voice, pending_call_srt) = {
1120 let runtime_guard = self.runtime.lock().await;
1121 let runtime = runtime_guard.as_ref().ok_or(RiftError::NotJoined)?;
1122 (
1123 runtime.handle.clone(),
1124 runtime._voice.clone(),
1125 runtime.pending_call_srt.clone(),
1126 )
1127 };
1128 let srt_uri = {
1129 let mut map = pending_call_srt.lock().unwrap();
1130 map.remove(&session)
1131 };
1132 let parsed_srt = if let Some(uri) = srt_uri.as_ref() {
1133 Some(
1134 RndzvSrt::from_uri(uri)
1135 .map_err(|e| RiftError::Other(format!("rndzv srt decode failed: {e}")))?,
1136 )
1137 } else {
1138 None
1139 };
1140
1141 handle
1142 .accept_call(session)
1143 .await
1144 .map_err(|e| RiftError::Mesh(format!("{e}")))?;
1145
1146 if let (Some(srt), Some(voice)) = (parsed_srt, voice) {
1147 let local_peer = RndzvPeerId(self.local_peer_id().0);
1148 let listener = RndzvListener::new(srt.space, local_peer).with_srt(srt);
1149 let outcome = listener
1150 .accept()
1151 .await
1152 .map_err(|e| RiftError::Other(format!("rndzv accept failed: {e}")))?;
1153 let session = outcome.session;
1154
1155 let channel = session
1156 .open_channel(RndzvChannelKind::UnreliableDatagram)
1157 .await
1158 .map_err(|e| RiftError::Other(format!("rndzv channel open failed: {e}")))?;
1159 let remote_peer = PeerId((session.remote).0);
1160 voice.set_rndzv_channel(channel, remote_peer);
1161 }
1162
1163 Ok(())
1164 }
1165
1166 pub async fn decline_call(&self, session: SessionId, reason: Option<&str>) -> Result<(), RiftError> {
1168 let (handle, pending_call_srt) = {
1169 let runtime_guard = self.runtime.lock().await;
1170 let runtime = runtime_guard.as_ref().ok_or(RiftError::NotJoined)?;
1171 (runtime.handle.clone(), runtime.pending_call_srt.clone())
1172 };
1173 {
1174 let mut map = pending_call_srt.lock().unwrap();
1175 map.remove(&session);
1176 }
1177 handle
1178 .decline_call(session, reason.map(|v| v.to_string()))
1179 .await
1180 .map_err(|e| RiftError::Mesh(format!("{e}")))
1181 }
1182
1183 pub async fn end_call(&self, session: SessionId) -> Result<(), RiftError> {
1185 let (handle, voice) = {
1186 let runtime_guard = self.runtime.lock().await;
1187 let runtime = runtime_guard.as_ref().ok_or(RiftError::NotJoined)?;
1188 (runtime.handle.clone(), runtime._voice.clone())
1189 };
1190 if let Some(voice) = voice {
1191 voice.clear_rndzv_channel();
1192 }
1193 handle
1194 .end_call(session)
1195 .await
1196 .map_err(|e| RiftError::Mesh(format!("{e}")))
1197 }
1198
1199 pub async fn next_event(&self) -> Option<RiftEvent> {
1201 let mut rx = self.event_rx.lock().await;
1202 rx.recv().await
1203 }
1204
1205 pub fn try_next_event(&self) -> Option<RiftEvent> {
1207 let mut rx = self.event_rx.blocking_lock();
1208 rx.try_recv().ok()
1209 }
1210
1211 pub fn set_ptt_active(&self, active: bool) {
1213 self.ptt_active.store(active, Ordering::Relaxed);
1214 }
1215
1216 pub fn set_mute(&self, muted: bool) {
1218 self.mute_active.store(muted, Ordering::Relaxed);
1219 }
1220
1221 pub fn local_peer_id(&self) -> PeerId {
1223 self._local_peer_id
1224 }
1225}
1226
1227struct VoiceRuntimeRef {
1228 mixer: Arc<StdMutex<AudioMixer>>,
1229 frame_samples: usize,
1230 emit_voice: bool,
1231 audio_config: AudioConfig,
1232 tuning: Arc<StdMutex<AudioTuning>>,
1233}
1234
1235fn start_audio_pipeline(
1237 config: RiftConfig,
1238 handle: MeshHandle,
1239 _local_peer_id: PeerId,
1240 ptt_active: Arc<AtomicBool>,
1241 mute_active: Arc<AtomicBool>,
1242) -> Result<VoiceRuntime, RiftError> {
1243 let mut audio_config = AudioConfig::default();
1244 let initial_bitrate = map_quality_to_bitrate(Some(&config.audio.quality));
1245 audio_config.bitrate = initial_bitrate
1246 .clamp(config.qos.min_bitrate, config.qos.max_bitrate)
1247 .max(8_000);
1248 rift_metrics::set_gauge("rift_audio_bitrate", &[], audio_config.bitrate as f64);
1249 let (audio_in, mut audio_rx) = AudioIn::new_with_device(&audio_config, config.audio.input_device.as_deref())
1250 .map_err(|e| RiftError::Audio(format!("{e}")))?;
1251 let mut encoder = OpusEncoder::new(&audio_config).map_err(|e| RiftError::Audio(format!("{e}")))?;
1252 let output_device = config.audio.output_device.clone();
1253 let mixer = Arc::new(StdMutex::new(AudioMixer::with_prebuffer(
1254 audio_config.frame_samples(),
1255 8,
1256 )));
1257
1258 let ptt_enabled = config.audio.ptt;
1259 let vad_enabled = config.audio.vad;
1260 let frame_duration = audio_config.frame_duration();
1261 let tuning = Arc::new(StdMutex::new(AudioTuning {
1262 bitrate: audio_config.bitrate,
1263 fec: false,
1264 loss_pct: 0,
1265 }));
1266 let tuning_for_task = tuning.clone();
1267
1268 let rndzv_channel: Arc<StdMutex<Option<RndzvChannel>>> = Arc::new(StdMutex::new(None));
1269 let rndzv_remote_peer: Arc<StdMutex<Option<PeerId>>> = Arc::new(StdMutex::new(None));
1270 let rndzv_channel_for_task = rndzv_channel.clone();
1271 tokio::spawn(async move {
1272 let mut seq: u32 = 0;
1273 let mut hangover: u8 = 0;
1274 let mut last_applied = AudioTuning {
1275 bitrate: audio_config.bitrate,
1276 fec: false,
1277 loss_pct: 0,
1278 };
1279 let rndzv_sender = rndzv_channel_for_task.clone();
1280 while let Some(frame) = audio_rx.recv().await {
1281 let next_tuning = {
1282 let tuning = tuning_for_task.lock().unwrap();
1283 tuning.clone()
1284 };
1285 if next_tuning.bitrate != last_applied.bitrate
1286 || next_tuning.fec != last_applied.fec
1287 || next_tuning.loss_pct != last_applied.loss_pct
1288 {
1289 if let Err(err) = encoder.set_bitrate(next_tuning.bitrate) {
1290 tracing::debug!("opus bitrate update failed: {err}");
1291 }
1292 if let Err(err) = encoder.set_fec(next_tuning.fec) {
1293 tracing::debug!("opus fec update failed: {err}");
1294 }
1295 if let Err(err) = encoder.set_packet_loss(next_tuning.loss_pct) {
1296 tracing::debug!("opus loss update failed: {err}");
1297 }
1298 last_applied = next_tuning;
1299 }
1300 if ptt_enabled && !ptt_active.load(Ordering::Relaxed) {
1301 continue;
1302 }
1303 if mute_active.load(Ordering::Relaxed) {
1304 continue;
1305 }
1306 if !ptt_enabled && vad_enabled {
1307 let active = is_frame_active(&frame);
1308 if active {
1309 hangover = 4;
1310 } else if hangover > 0 {
1311 hangover -= 1;
1312 }
1313 if !active && hangover == 0 {
1314 continue;
1315 }
1316 }
1317 if frame_duration > Duration::from_millis(20) {
1318 tokio::time::sleep(frame_duration - Duration::from_millis(20)).await;
1319 }
1320 let codec = handle.group_codec().await;
1321 let out = match encode_frame(codec, &frame, &mut encoder) {
1322 Ok(out) => out,
1323 Err(_) => continue,
1324 };
1325 let timestamp = now_timestamp();
1326 let maybe_channel = rndzv_sender.lock().unwrap().clone();
1327 if let Some(channel) = maybe_channel {
1328 let _ = channel.send(&out).await;
1329 } else {
1330 let _ = handle.broadcast_voice(seq, timestamp, out).await;
1331 }
1332 seq = seq.wrapping_add(1);
1333 }
1334 });
1335
1336 if !config.audio.mute_output {
1337 let mixer = mixer.clone();
1338 let audio_config = audio_config.clone();
1339 std::thread::spawn(move || {
1340 let audio_out = match AudioOut::new_with_device(&audio_config, output_device.as_deref()) {
1341 Ok(out) => out,
1342 Err(err) => {
1343 tracing::warn!("audio output init failed: {err}");
1344 return;
1345 }
1346 };
1347 let frame_samples = audio_out.frame_samples();
1348 let frame_duration = audio_config.frame_duration();
1349 let target_frames = 6usize;
1350 let mut last_frame = vec![0i16; frame_samples];
1351 let mut last_active = Instant::now() - Duration::from_secs(1);
1352 loop {
1353 std::thread::sleep(frame_duration);
1354 while audio_out.queued_samples() < target_frames * frame_samples {
1355 let (frame, active) = {
1356 let mut mixer = mixer.lock().unwrap();
1357 mixer.mix_next_with_activity()
1358 };
1359 let out_frame = if active {
1360 last_active = Instant::now();
1361 last_frame.clone_from(&frame);
1362 frame
1363 } else if last_active.elapsed() <= Duration::from_millis(300) {
1364 last_frame.clone()
1365 } else {
1366 frame
1367 };
1368 if out_frame.len() == frame_samples {
1369 audio_out.push_frame(&out_frame);
1370 } else {
1371 break;
1372 }
1373 }
1374 }
1375 });
1376 }
1377
1378 Ok(VoiceRuntime {
1379 _audio_in: audio_in,
1380 mixer,
1381 frame_samples: audio_config.frame_samples(),
1382 emit_voice: config.audio.emit_voice_frames,
1383 audio_config,
1384 tuning,
1385 rndzv_channel,
1386 rndzv_remote_peer,
1387 })
1388}
1389
1390fn audio_level(frame: &[i16]) -> f32 {
1391 let mut sum = 0f32;
1392 for s in frame {
1393 sum += (*s as f32).abs();
1394 }
1395 (sum / frame.len().max(1) as f32) / i16::MAX as f32
1396}
1397
1398fn is_frame_active(frame: &[i16]) -> bool {
1399 let mut sum = 0i64;
1400 for s in frame {
1401 sum += (*s as i64).abs();
1402 }
1403 let avg = sum / frame.len().max(1) as i64;
1404 avg > 250
1405}
1406
1407fn compute_next_tuning(qos: &mut QosState) -> Option<AudioTuning> {
1409 if qos.peer_stats.is_empty() {
1410 return None;
1411 }
1412 let now = Instant::now();
1413 if now.duration_since(qos.last_adjust) < Duration::from_secs(2) {
1414 return None;
1415 }
1416 let mut worst_rtt = 0.0f32;
1417 let mut worst_loss = 0.0f32;
1418 for stats in qos.peer_stats.values() {
1419 worst_rtt = worst_rtt.max(stats.rtt_ms);
1420 worst_loss = worst_loss.max(stats.loss);
1421 }
1422
1423 let mut bitrate = qos.current.bitrate;
1424 let max_latency = qos.profile.max_latency_ms as f32;
1425 let target_latency = qos.profile.target_latency_ms as f32;
1426 if worst_loss > qos.profile.packet_loss_tolerance || worst_rtt > max_latency {
1427 bitrate = ((bitrate as f32) * 0.8) as u32;
1428 } else if worst_loss < qos.profile.packet_loss_tolerance * 0.5
1429 && worst_rtt < target_latency
1430 {
1431 bitrate = ((bitrate as f32) * 1.1) as u32;
1432 }
1433 bitrate = bitrate
1434 .clamp(qos.profile.min_bitrate, qos.profile.max_bitrate)
1435 .max(8_000);
1436
1437 let fec = worst_loss > qos.profile.packet_loss_tolerance * 0.5;
1438 let loss_pct = (worst_loss * 100.0).round().min(100.0) as u8;
1439
1440 let next = AudioTuning {
1441 bitrate,
1442 fec,
1443 loss_pct,
1444 };
1445 if next.bitrate != qos.current.bitrate
1446 || next.fec != qos.current.fec
1447 || next.loss_pct != qos.current.loss_pct
1448 {
1449 rift_metrics::set_gauge("rift_audio_bitrate", &[], next.bitrate as f64);
1450 tracing::info!(
1451 bitrate = next.bitrate,
1452 fec = next.fec,
1453 loss_pct = next.loss_pct,
1454 "qos audio tuning updated"
1455 );
1456 qos.current = next.clone();
1457 qos.last_adjust = now;
1458 return Some(next);
1459 }
1460 None
1461}
1462
1463fn map_quality_to_bitrate(quality: Option<&str>) -> u32 {
1464 match quality.unwrap_or("medium") {
1465 "low" => 24_000,
1466 "high" => 96_000,
1467 _ => 48_000,
1468 }
1469}
1470
1471fn now_timestamp() -> u64 {
1472 SystemTime::now()
1473 .duration_since(UNIX_EPOCH)
1474 .map(|d| d.as_millis() as u64)
1475 .unwrap_or(0)
1476}
1477
1478fn peer_to_stream_id(peer: &PeerId) -> u64 {
1479 let mut bytes = [0u8; 8];
1480 bytes.copy_from_slice(&peer.0[..8]);
1481 u64::from_le_bytes(bytes)
1482}
1483
1484fn default_nat_config(
1485 port: u16,
1486 ports: Option<Vec<u16>>,
1487 stun_servers: Vec<String>,
1488 stun_timeout_ms: Option<u64>,
1489 punch_interval_ms: Option<u64>,
1490 punch_timeout_ms: Option<u64>,
1491 enable_turn: bool,
1492 turn_servers: Vec<String>,
1493 turn_timeout_ms: Option<u64>,
1494 turn_keepalive_ms: Option<u64>,
1495) -> NatConfig {
1496 let mut local_ports = ports.unwrap_or_default();
1497 if local_ports.is_empty() {
1498 local_ports.push(port);
1499 local_ports.push(port.saturating_add(1));
1500 local_ports.push(port.saturating_add(2));
1501 }
1502 NatConfig {
1503 local_ports,
1504 stun_servers: parse_socket_addrs(&stun_servers),
1505 stun_timeout_ms: stun_timeout_ms.unwrap_or(800),
1506 punch_interval_ms: punch_interval_ms.unwrap_or(200),
1507 punch_timeout_ms: punch_timeout_ms.unwrap_or(5000),
1508 turn_servers: if enable_turn {
1509 turn_servers
1510 .into_iter()
1511 .filter_map(|s| parse_turn_server(&s).ok())
1512 .collect()
1513 } else {
1514 Vec::new()
1515 },
1516 turn_timeout_ms: turn_timeout_ms.unwrap_or(1200),
1517 turn_keepalive_ms: turn_keepalive_ms.unwrap_or(12000),
1518 }
1519}
1520
1521fn derive_auth_token(secret: &str, channel: &str) -> Vec<u8> {
1523 let hk = Hkdf::<Sha256>::new(Some(channel.as_bytes()), secret.as_bytes());
1524 let mut out = [0u8; 32];
1525 hk.expand(b"rift-auth", &mut out)
1526 .expect("hkdf expand");
1527 out.to_vec()
1528}
1529
1530fn derive_e2ee_key(
1531 channel: &str,
1532 password: Option<&str>,
1533 invite: Option<&Invite>,
1534 shared_secret: Option<&str>,
1535) -> Option<[u8; 32]> {
1536 if let Some(secret) = shared_secret {
1537 let hk = Hkdf::<Sha256>::new(Some(channel.as_bytes()), secret.as_bytes());
1538 let mut out = [0u8; 32];
1539 hk.expand(b"rift-e2ee", &mut out)
1540 .expect("hkdf expand");
1541 return Some(out);
1542 }
1543 if let Some(invite) = invite {
1544 if invite.channel_key.iter().any(|b| *b != 0) {
1545 return Some(invite.channel_key);
1546 }
1547 }
1548 if let Some(password) = password {
1549 let mut hasher = blake3::Hasher::new();
1550 hasher.update(b"rift-e2ee:");
1551 hasher.update(channel.as_bytes());
1552 hasher.update(b":");
1553 hasher.update(password.as_bytes());
1554 let mut out = [0u8; 32];
1555 out.copy_from_slice(hasher.finalize().as_bytes());
1556 return Some(out);
1557 }
1558 None
1559}
1560
1561fn short_peer(peer_id: &PeerId) -> String {
1562 let hex = peer_id.to_hex();
1563 hex.chars().take(8).collect()
1564}
1565
1566fn resolve_known_hosts_path(cfg: &SecurityConfig) -> Result<PathBuf, RiftError> {
1567 if let Some(path) = &cfg.known_hosts_path {
1568 return Ok(expand_tilde(path));
1569 }
1570 let base = dirs::config_dir().ok_or_else(|| RiftError::Other("config dir missing".to_string()))?;
1571 Ok(base.join("rift").join("known_hosts"))
1572}
1573
1574fn expand_tilde(path: &Path) -> PathBuf {
1575 let path_str = path.to_string_lossy();
1576 if let Some(rest) = path_str.strip_prefix("~/") {
1577 if let Some(home) = dirs::home_dir() {
1578 return home.join(rest);
1579 }
1580 }
1581 path.to_path_buf()
1582}
1583
1584fn load_known_hosts(path: &Path) -> HashMap<PeerId, Vec<u8>> {
1585 let mut map = HashMap::new();
1586 let Ok(content) = fs::read_to_string(path) else {
1587 return map;
1588 };
1589 for line in content.lines() {
1590 let line = line.trim();
1591 if line.is_empty() || line.starts_with('#') {
1592 continue;
1593 }
1594 let mut parts = line.split_whitespace();
1595 let Some(peer_hex) = parts.next() else { continue; };
1596 let Some(key_hex) = parts.next() else { continue; };
1597 let Ok(peer_bytes) = hex::decode(peer_hex) else { continue; };
1598 let Ok(key_bytes) = hex::decode(key_hex) else { continue; };
1599 if peer_bytes.len() != 32 {
1600 continue;
1601 }
1602 let mut peer = [0u8; 32];
1603 peer.copy_from_slice(&peer_bytes);
1604 map.insert(PeerId(peer), key_bytes);
1605 }
1606 map
1607}
1608
1609fn append_known_host(path: &Path, peer_id: PeerId, public_key: &[u8]) -> Result<(), RiftError> {
1610 if let Some(parent) = path.parent() {
1611 fs::create_dir_all(parent).map_err(|e| RiftError::Other(format!("{e}")))?;
1612 }
1613 let line = format!("{} {}\n", peer_id.to_hex(), hex::encode(public_key));
1614 fs::OpenOptions::new()
1615 .create(true)
1616 .append(true)
1617 .open(path)
1618 .and_then(|mut file| std::io::Write::write_all(&mut file, line.as_bytes()))
1619 .map_err(|e| RiftError::Other(format!("{e}")))?;
1620 Ok(())
1621}
1622
1623fn fingerprint_key(public_key: &[u8]) -> String {
1624 let hash = blake3::hash(public_key);
1625 let hex = hash.to_hex().to_string();
1626 hex.chars().take(16).collect()
1627}
1628
1629async fn handle_peer_identity(
1631 event_tx: &mpsc::UnboundedSender<RiftEvent>,
1632 handle: &MeshHandle,
1633 cfg: &SecurityConfig,
1634 peer_id: PeerId,
1635 public_key: &[u8],
1636) -> Result<(), RiftError> {
1637 let computed = rift_core::peer_id_from_public_key_bytes(public_key)
1638 .map_err(|e| RiftError::Other(format!("{e}")))?;
1639 let fingerprint = fingerprint_key(public_key);
1640 let _ = event_tx.send(RiftEvent::PeerFingerprint {
1641 peer: peer_id,
1642 fingerprint: fingerprint.clone(),
1643 });
1644 let known_hosts = resolve_known_hosts_path(cfg)?;
1645 let mut known = load_known_hosts(&known_hosts);
1646
1647 if computed != peer_id {
1648 let msg = format!(
1649 "peer id mismatch for {} (fingerprint {})",
1650 short_peer(&peer_id),
1651 fingerprint
1652 );
1653 tracing::warn!(peer = %peer_id, "peer id mismatch");
1654 let _ = event_tx.send(RiftEvent::SecurityNotice { message: msg.clone() });
1655 audit_log(cfg, "peer_id_mismatch", &peer_id, Some(&fingerprint), &msg);
1656 if cfg.reject_on_mismatch {
1657 handle.disconnect_peer(peer_id).await;
1658 }
1659 return Ok(());
1660 }
1661
1662 if let Some(existing) = known.get(&peer_id) {
1663 if existing != public_key {
1664 let msg = format!(
1665 "peer key mismatch for {} (fingerprint {})",
1666 short_peer(&peer_id),
1667 fingerprint
1668 );
1669 tracing::warn!(peer = %peer_id, "peer key mismatch");
1670 let _ = event_tx.send(RiftEvent::SecurityNotice { message: msg.clone() });
1671 audit_log(cfg, "peer_key_mismatch", &peer_id, Some(&fingerprint), &msg);
1672 if cfg.reject_on_mismatch {
1673 handle.disconnect_peer(peer_id).await;
1674 }
1675 }
1676 return Ok(());
1677 }
1678
1679 if cfg.trust_on_first_use {
1680 append_known_host(&known_hosts, peer_id, public_key)?;
1681 known.insert(peer_id, public_key.to_vec());
1682 let msg = format!(
1683 "new peer: {} fingerprint {} (saved to known_hosts)",
1684 short_peer(&peer_id),
1685 fingerprint
1686 );
1687 tracing::info!(peer = %peer_id, "new peer key stored");
1688 let _ = event_tx.send(RiftEvent::SecurityNotice { message: msg.clone() });
1689 audit_log(cfg, "peer_first_seen", &peer_id, Some(&fingerprint), &msg);
1690 } else {
1691 let msg = format!(
1692 "untrusted peer {} fingerprint {} (TOFU disabled)",
1693 short_peer(&peer_id),
1694 fingerprint
1695 );
1696 tracing::warn!(peer = %peer_id, "untrusted peer (TOFU disabled)");
1697 let _ = event_tx.send(RiftEvent::SecurityNotice { message: msg.clone() });
1698 audit_log(cfg, "peer_untrusted", &peer_id, Some(&fingerprint), &msg);
1699 handle.disconnect_peer(peer_id).await;
1700 }
1701 Ok(())
1702}
1703
1704fn audit_log(cfg: &SecurityConfig, event: &str, peer_id: &PeerId, fingerprint: Option<&str>, message: &str) {
1705 let Some(path) = cfg.audit_log_path.as_ref() else { return; };
1706 let path = expand_tilde(path);
1707 if let Some(parent) = path.parent() {
1708 let _ = fs::create_dir_all(parent);
1709 }
1710 let entry = json!({
1711 "ts": now_timestamp(),
1712 "event": event,
1713 "peer_id": peer_id.to_hex(),
1714 "fingerprint": fingerprint.unwrap_or(""),
1715 "message": message,
1716 });
1717 if let Ok(line) = serde_json::to_string(&entry) {
1718 if let Ok(mut file) = fs::OpenOptions::new().create(true).append(true).open(&path) {
1719 let _ = std::io::Write::write_all(&mut file, line.as_bytes());
1720 let _ = std::io::Write::write_all(&mut file, b"\n");
1721 }
1722 }
1723}
1724
1725fn parse_socket_addr(input: &str) -> Option<SocketAddr> {
1726 input.parse::<SocketAddr>().ok()
1727}
1728
1729fn parse_socket_addrs(inputs: &[String]) -> Vec<SocketAddr> {
1730 let mut out = Vec::new();
1731 for input in inputs {
1732 if let Ok(addr) = input.parse::<SocketAddr>() {
1733 out.push(addr);
1734 continue;
1735 }
1736 if let Ok(mut iter) = input.to_socket_addrs() {
1737 if let Some(addr) = iter.next() {
1738 out.push(addr);
1739 }
1740 }
1741 }
1742 out
1743}
1744
1745#[cfg(feature = "ffi")]
1746pub mod ffi;
1747#[cfg(target_os = "android")]
1748pub mod android_jni;
1749
1750#[cfg(test)]
1751mod tests {
1752 use super::*;
1753 use std::io::Write;
1754 use tempfile::NamedTempFile;
1755
1756 #[test]
1759 fn rift_config_default_values() {
1760 let cfg = RiftConfig::default();
1761 assert_eq!(cfg.listen_port, 7777);
1762 assert!(!cfg.relay);
1763 assert!(cfg.user_name.is_none());
1764 assert!(cfg.metrics_enabled);
1765 assert_eq!(cfg.preferred_codecs, vec![CodecId::Opus, CodecId::PCM16]);
1766 assert!(cfg.preferred_features.contains(&FeatureFlag::Voice));
1767 assert!(cfg.preferred_features.contains(&FeatureFlag::E2EE));
1768 }
1769
1770 #[test]
1771 fn audio_config_sdk_default_values() {
1772 let cfg = AudioConfigSdk::default();
1773 assert!(cfg.enabled);
1774 assert!(cfg.input_device.is_none());
1775 assert!(cfg.output_device.is_none());
1776 assert_eq!(cfg.quality, "medium");
1777 assert!(!cfg.ptt);
1778 assert!(cfg.vad);
1779 assert!(!cfg.mute_output);
1780 assert!(!cfg.emit_voice_frames);
1781 assert!(!cfg.allow_fail);
1782 }
1783
1784 #[test]
1785 fn network_config_sdk_default_values() {
1786 let cfg = NetworkConfigSdk::default();
1787 assert!(cfg.prefer_p2p);
1788 assert!(cfg.local_ports.is_none());
1789 assert!(cfg.known_peers.is_empty());
1790 assert!(cfg.invite.is_none());
1791 assert!(!cfg.stun_servers.is_empty());
1792 assert_eq!(cfg.stun_timeout_ms, Some(800));
1793 assert!(!cfg.enable_turn);
1794 assert!(cfg.turn_servers.is_empty());
1795 assert_eq!(cfg.punch_interval_ms, Some(200));
1796 assert_eq!(cfg.punch_timeout_ms, Some(5000));
1797 }
1798
1799 #[test]
1800 fn dht_config_sdk_default_values() {
1801 let cfg = DhtConfigSdk::default();
1802 assert!(!cfg.enabled);
1803 assert!(cfg.bootstrap_nodes.is_empty());
1804 assert!(cfg.listen_addr.is_none());
1805 }
1806
1807 #[test]
1808 fn security_config_default_values() {
1809 let cfg = SecurityConfig::default();
1810 assert!(cfg.trust_on_first_use);
1811 assert!(cfg.known_hosts_path.is_none());
1812 assert!(!cfg.reject_on_mismatch);
1813 assert!(cfg.channel_shared_secret.is_none());
1814 assert!(cfg.audit_log_path.is_none());
1815 assert_eq!(cfg.rekey_interval_secs, Some(600));
1816 }
1817
1818 #[test]
1821 fn derive_auth_token_deterministic() {
1822 let token1 = derive_auth_token("secret", "channel");
1823 let token2 = derive_auth_token("secret", "channel");
1824 assert_eq!(token1, token2);
1825 assert_eq!(token1.len(), 32);
1826 }
1827
1828 #[test]
1829 fn derive_auth_token_different_inputs() {
1830 let token1 = derive_auth_token("secret1", "channel");
1831 let token2 = derive_auth_token("secret2", "channel");
1832 assert_ne!(token1, token2);
1833
1834 let token3 = derive_auth_token("secret", "channel1");
1835 let token4 = derive_auth_token("secret", "channel2");
1836 assert_ne!(token3, token4);
1837 }
1838
1839 #[test]
1840 fn derive_e2ee_key_from_shared_secret() {
1841 let key = derive_e2ee_key("channel", None, None, Some("shared_secret"));
1842 assert!(key.is_some());
1843 let key = key.unwrap();
1844 assert_eq!(key.len(), 32);
1845
1846 let key2 = derive_e2ee_key("channel", None, None, Some("shared_secret"));
1848 assert_eq!(key, key2.unwrap());
1849 }
1850
1851 #[test]
1852 fn derive_e2ee_key_from_password() {
1853 let key = derive_e2ee_key("channel", Some("password"), None, None);
1854 assert!(key.is_some());
1855 let key = key.unwrap();
1856 assert_eq!(key.len(), 32);
1857
1858 let key2 = derive_e2ee_key("channel", Some("other_password"), None, None);
1860 assert_ne!(key, key2.unwrap());
1861 }
1862
1863 #[test]
1864 fn derive_e2ee_key_from_invite() {
1865 let invite = Invite {
1866 channel_name: "test".to_string(),
1867 password: None,
1868 channel_key: [42u8; 32],
1869 known_peers: Vec::new(),
1870 candidates: Vec::new(),
1871 relay_candidates: Vec::new(),
1872 version: 2,
1873 created_at: 0,
1874 };
1875 let key = derive_e2ee_key("channel", None, Some(&invite), None);
1876 assert_eq!(key, Some([42u8; 32]));
1877 }
1878
1879 #[test]
1880 fn derive_e2ee_key_none_without_inputs() {
1881 let key = derive_e2ee_key("channel", None, None, None);
1882 assert!(key.is_none());
1883 }
1884
1885 #[test]
1886 fn derive_e2ee_key_priority_shared_secret_over_password() {
1887 let key_secret = derive_e2ee_key("channel", Some("password"), None, Some("secret"));
1889 let key_password = derive_e2ee_key("channel", Some("password"), None, None);
1890 assert_ne!(key_secret, key_password);
1891 }
1892
1893 #[test]
1894 fn fingerprint_key_returns_16_chars() {
1895 let key = [1u8; 32];
1896 let fp = fingerprint_key(&key);
1897 assert_eq!(fp.len(), 16);
1898 assert!(fp.chars().all(|c| c.is_ascii_hexdigit()));
1899 }
1900
1901 #[test]
1902 fn fingerprint_key_different_keys() {
1903 let fp1 = fingerprint_key(&[1u8; 32]);
1904 let fp2 = fingerprint_key(&[2u8; 32]);
1905 assert_ne!(fp1, fp2);
1906 }
1907
1908 #[test]
1909 fn short_peer_returns_8_chars() {
1910 let peer = PeerId([0xab; 32]);
1911 let short = short_peer(&peer);
1912 assert_eq!(short.len(), 8);
1913 assert_eq!(short, "abababab");
1914 }
1915
1916 #[test]
1919 fn audio_level_silent_frame() {
1920 let frame = vec![0i16; 480];
1921 let level = audio_level(&frame);
1922 assert_eq!(level, 0.0);
1923 }
1924
1925 #[test]
1926 fn audio_level_loud_frame() {
1927 let frame = vec![i16::MAX; 480];
1928 let level = audio_level(&frame);
1929 assert!((level - 1.0).abs() < 0.01);
1930 }
1931
1932 #[test]
1933 fn audio_level_mixed_frame() {
1934 let mut frame = vec![0i16; 480];
1935 for i in 0..240 {
1936 frame[i] = i16::MAX / 2;
1937 }
1938 let level = audio_level(&frame);
1939 assert!(level > 0.0 && level < 1.0);
1940 }
1941
1942 #[test]
1943 fn is_frame_active_silent() {
1944 let frame = vec![0i16; 480];
1945 assert!(!is_frame_active(&frame));
1946 }
1947
1948 #[test]
1949 fn is_frame_active_low_noise() {
1950 let frame = vec![100i16; 480];
1951 assert!(!is_frame_active(&frame));
1952 }
1953
1954 #[test]
1955 fn is_frame_active_loud() {
1956 let frame = vec![1000i16; 480];
1957 assert!(is_frame_active(&frame));
1958 }
1959
1960 #[test]
1963 fn map_quality_to_bitrate_low() {
1964 assert_eq!(map_quality_to_bitrate(Some("low")), 24_000);
1965 }
1966
1967 #[test]
1968 fn map_quality_to_bitrate_medium() {
1969 assert_eq!(map_quality_to_bitrate(Some("medium")), 48_000);
1970 }
1971
1972 #[test]
1973 fn map_quality_to_bitrate_high() {
1974 assert_eq!(map_quality_to_bitrate(Some("high")), 96_000);
1975 }
1976
1977 #[test]
1978 fn map_quality_to_bitrate_default() {
1979 assert_eq!(map_quality_to_bitrate(None), 48_000);
1980 assert_eq!(map_quality_to_bitrate(Some("unknown")), 48_000);
1981 }
1982
1983 #[test]
1986 fn expand_tilde_with_tilde() {
1987 let path = PathBuf::from("~/test/path");
1988 let expanded = expand_tilde(&path);
1989 assert!(!expanded.to_string_lossy().starts_with("~/"));
1991 }
1992
1993 #[test]
1994 fn expand_tilde_without_tilde() {
1995 let path = PathBuf::from("/absolute/path");
1996 let expanded = expand_tilde(&path);
1997 assert_eq!(expanded, path);
1998 }
1999
2000 #[test]
2001 fn expand_tilde_relative_path() {
2002 let path = PathBuf::from("relative/path");
2003 let expanded = expand_tilde(&path);
2004 assert_eq!(expanded, path);
2005 }
2006
2007 #[test]
2010 fn parse_socket_addr_valid() {
2011 let addr = parse_socket_addr("127.0.0.1:8080");
2012 assert!(addr.is_some());
2013 assert_eq!(addr.unwrap().port(), 8080);
2014 }
2015
2016 #[test]
2017 fn parse_socket_addr_invalid() {
2018 let addr = parse_socket_addr("invalid");
2019 assert!(addr.is_none());
2020 }
2021
2022 #[test]
2023 fn parse_socket_addrs_mixed() {
2024 let inputs = vec![
2025 "127.0.0.1:8080".to_string(),
2026 "invalid".to_string(),
2027 "192.168.1.1:9000".to_string(),
2028 ];
2029 let addrs = parse_socket_addrs(&inputs);
2030 assert_eq!(addrs.len(), 2);
2031 }
2032
2033 #[test]
2034 fn parse_socket_addrs_empty() {
2035 let addrs = parse_socket_addrs(&[]);
2036 assert!(addrs.is_empty());
2037 }
2038
2039 #[test]
2042 fn load_known_hosts_empty_file() {
2043 let tmp = NamedTempFile::new().unwrap();
2044 let hosts = load_known_hosts(tmp.path());
2045 assert!(hosts.is_empty());
2046 }
2047
2048 #[test]
2049 fn load_known_hosts_with_entries() {
2050 let mut tmp = NamedTempFile::new().unwrap();
2051 let peer_hex = hex::encode([0xab; 32]);
2052 let key_hex = hex::encode([0xcd; 32]);
2053 writeln!(tmp, "{} {}", peer_hex, key_hex).unwrap();
2054 tmp.flush().unwrap();
2055
2056 let hosts = load_known_hosts(tmp.path());
2057 assert_eq!(hosts.len(), 1);
2058 let peer = PeerId([0xab; 32]);
2059 assert!(hosts.contains_key(&peer));
2060 assert_eq!(hosts[&peer], vec![0xcd; 32]);
2061 }
2062
2063 #[test]
2064 fn load_known_hosts_with_comments() {
2065 let mut tmp = NamedTempFile::new().unwrap();
2066 writeln!(tmp, "# This is a comment").unwrap();
2067 writeln!(tmp, "").unwrap();
2068 let peer_hex = hex::encode([0xab; 32]);
2069 let key_hex = hex::encode([0xcd; 32]);
2070 writeln!(tmp, "{} {}", peer_hex, key_hex).unwrap();
2071 tmp.flush().unwrap();
2072
2073 let hosts = load_known_hosts(tmp.path());
2074 assert_eq!(hosts.len(), 1);
2075 }
2076
2077 #[test]
2078 fn load_known_hosts_nonexistent() {
2079 let hosts = load_known_hosts(Path::new("/nonexistent/path"));
2080 assert!(hosts.is_empty());
2081 }
2082
2083 #[test]
2086 fn peer_to_stream_id_deterministic() {
2087 let peer = PeerId([0x12; 32]);
2088 let id1 = peer_to_stream_id(&peer);
2089 let id2 = peer_to_stream_id(&peer);
2090 assert_eq!(id1, id2);
2091 }
2092
2093 #[test]
2094 fn peer_to_stream_id_different_peers() {
2095 let peer1 = PeerId([0x12; 32]);
2096 let peer2 = PeerId([0x34; 32]);
2097 let id1 = peer_to_stream_id(&peer1);
2098 let id2 = peer_to_stream_id(&peer2);
2099 assert_ne!(id1, id2);
2100 }
2101
2102 #[test]
2105 fn now_timestamp_nonzero() {
2106 let ts = now_timestamp();
2107 assert!(ts > 1577836800000);
2109 }
2110
2111 #[test]
2114 fn rift_error_display() {
2115 assert_eq!(format!("{}", RiftError::NotInitialized), "not initialized");
2116 assert_eq!(format!("{}", RiftError::AlreadyJoined), "channel already joined");
2117 assert_eq!(format!("{}", RiftError::NotJoined), "channel not joined");
2118 assert_eq!(format!("{}", RiftError::Mesh("test".to_string())), "mesh error: test");
2119 assert_eq!(format!("{}", RiftError::Audio("test".to_string())), "audio error: test");
2120 assert_eq!(format!("{}", RiftError::Other("test".to_string())), "other: test");
2121 }
2122
2123 #[test]
2126 fn link_stats_construction() {
2127 let stats = LinkStats {
2128 rtt_ms: 50.0,
2129 loss: 0.01,
2130 jitter_ms: 5.0,
2131 };
2132 assert_eq!(stats.rtt_ms, 50.0);
2133 assert_eq!(stats.loss, 0.01);
2134 assert_eq!(stats.jitter_ms, 5.0);
2135 }
2136
2137 #[test]
2138 fn global_stats_construction() {
2139 let stats = GlobalStats {
2140 num_peers: 5,
2141 num_sessions: 2,
2142 packets_sent: 1000,
2143 packets_received: 950,
2144 bytes_sent: 100_000,
2145 bytes_received: 95_000,
2146 };
2147 assert_eq!(stats.num_peers, 5);
2148 assert_eq!(stats.packets_sent, 1000);
2149 }
2150
2151 #[test]
2154 fn route_kind_direct() {
2155 let route = RouteKind::Direct;
2156 assert!(matches!(route, RouteKind::Direct));
2157 }
2158
2159 #[test]
2160 fn route_kind_relayed() {
2161 let via = PeerId([0xab; 32]);
2162 let route = RouteKind::Relayed { via };
2163 if let RouteKind::Relayed { via: v } = route {
2164 assert_eq!(v.0, [0xab; 32]);
2165 } else {
2166 panic!("expected relayed route");
2167 }
2168 }
2169
2170 #[test]
2173 fn sdk_version_defined() {
2174 assert_eq!(SDK_VERSION, "0.1.0");
2175 assert_eq!(SDK_ABI_VERSION, 1);
2176 }
2177
2178 #[test]
2181 fn default_nat_config_basic() {
2182 let cfg = default_nat_config(
2183 7777,
2184 None,
2185 vec!["stun.example.com:3478".to_string()],
2186 Some(1000),
2187 Some(100),
2188 Some(3000),
2189 false,
2190 Vec::new(),
2191 Some(2000),
2192 Some(15000),
2193 );
2194 assert_eq!(cfg.local_ports, vec![7777, 7778, 7779]);
2195 assert_eq!(cfg.stun_timeout_ms, 1000);
2196 assert_eq!(cfg.punch_interval_ms, 100);
2197 assert_eq!(cfg.punch_timeout_ms, 3000);
2198 assert!(cfg.turn_servers.is_empty());
2199 }
2200
2201 #[test]
2202 fn default_nat_config_custom_ports() {
2203 let cfg = default_nat_config(
2204 7777,
2205 Some(vec![8000, 8001]),
2206 Vec::new(),
2207 None,
2208 None,
2209 None,
2210 false,
2211 Vec::new(),
2212 None,
2213 None,
2214 );
2215 assert_eq!(cfg.local_ports, vec![8000, 8001]);
2216 }
2217
2218 #[test]
2221 fn rift_config_serialization() {
2222 let cfg = RiftConfig::default();
2223 let json = serde_json::to_string(&cfg).unwrap();
2224 let parsed: RiftConfig = serde_json::from_str(&json).unwrap();
2225 assert_eq!(parsed.listen_port, cfg.listen_port);
2226 assert_eq!(parsed.relay, cfg.relay);
2227 }
2228
2229 #[test]
2230 fn security_config_serialization() {
2231 let cfg = SecurityConfig {
2232 trust_on_first_use: false,
2233 known_hosts_path: Some(PathBuf::from("/test/path")),
2234 reject_on_mismatch: true,
2235 channel_shared_secret: Some("secret".to_string()),
2236 audit_log_path: None,
2237 rekey_interval_secs: Some(300),
2238 };
2239 let json = serde_json::to_string(&cfg).unwrap();
2240 let parsed: SecurityConfig = serde_json::from_str(&json).unwrap();
2241 assert!(!parsed.trust_on_first_use);
2242 assert!(parsed.reject_on_mismatch);
2243 assert_eq!(parsed.rekey_interval_secs, Some(300));
2244 }
2245
2246 #[test]
2247 fn srt_invite_serialization() {
2248 let invite = SrtInvite {
2249 label: "Test Call".to_string(),
2250 uri: "srt://test".to_string(),
2251 };
2252 let json = serde_json::to_string(&invite).unwrap();
2253 let parsed: SrtInvite = serde_json::from_str(&json).unwrap();
2254 assert_eq!(parsed.label, "Test Call");
2255 assert_eq!(parsed.uri, "srt://test");
2256 }
2257
2258 #[test]
2261 fn compute_next_tuning_empty_stats() {
2262 let mut qos = QosState {
2263 profile: QosProfile::default(),
2264 peer_stats: HashMap::new(),
2265 current: AudioTuning {
2266 bitrate: 48_000,
2267 fec: false,
2268 loss_pct: 0,
2269 },
2270 last_adjust: Instant::now() - Duration::from_secs(10),
2271 };
2272 let result = compute_next_tuning(&mut qos);
2273 assert!(result.is_none());
2274 }
2275
2276 #[test]
2277 fn compute_next_tuning_high_loss() {
2278 let mut qos = QosState {
2279 profile: QosProfile {
2280 packet_loss_tolerance: 0.05,
2281 max_latency_ms: 200,
2282 target_latency_ms: 100,
2283 min_bitrate: 16_000,
2284 max_bitrate: 128_000,
2285 ..Default::default()
2286 },
2287 peer_stats: HashMap::new(),
2288 current: AudioTuning {
2289 bitrate: 64_000,
2290 fec: false,
2291 loss_pct: 0,
2292 },
2293 last_adjust: Instant::now() - Duration::from_secs(10),
2294 };
2295 qos.peer_stats.insert(
2296 PeerId([0; 32]),
2297 LinkStats {
2298 rtt_ms: 50.0,
2299 loss: 0.10, jitter_ms: 5.0,
2301 },
2302 );
2303 let result = compute_next_tuning(&mut qos);
2304 assert!(result.is_some());
2305 let tuning = result.unwrap();
2306 assert!(tuning.bitrate < 64_000); assert!(tuning.fec); }
2309
2310 #[test]
2311 fn compute_next_tuning_good_conditions() {
2312 let mut qos = QosState {
2313 profile: QosProfile {
2314 packet_loss_tolerance: 0.05,
2315 max_latency_ms: 200,
2316 target_latency_ms: 100,
2317 min_bitrate: 16_000,
2318 max_bitrate: 128_000,
2319 ..Default::default()
2320 },
2321 peer_stats: HashMap::new(),
2322 current: AudioTuning {
2323 bitrate: 48_000,
2324 fec: true,
2325 loss_pct: 5,
2326 },
2327 last_adjust: Instant::now() - Duration::from_secs(10),
2328 };
2329 qos.peer_stats.insert(
2330 PeerId([0; 32]),
2331 LinkStats {
2332 rtt_ms: 20.0,
2333 loss: 0.01, jitter_ms: 2.0,
2335 },
2336 );
2337 let result = compute_next_tuning(&mut qos);
2338 assert!(result.is_some());
2339 let tuning = result.unwrap();
2340 assert!(tuning.bitrate >= 48_000); }
2342
2343 #[test]
2344 fn compute_next_tuning_respects_cooldown() {
2345 let mut qos = QosState {
2346 profile: QosProfile::default(),
2347 peer_stats: HashMap::new(),
2348 current: AudioTuning {
2349 bitrate: 48_000,
2350 fec: false,
2351 loss_pct: 0,
2352 },
2353 last_adjust: Instant::now(), };
2355 qos.peer_stats.insert(
2356 PeerId([0; 32]),
2357 LinkStats {
2358 rtt_ms: 500.0,
2359 loss: 0.50,
2360 jitter_ms: 50.0,
2361 },
2362 );
2363 let result = compute_next_tuning(&mut qos);
2364 assert!(result.is_none()); }
2366}