1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::Mutex;
3
4use tokio::sync::mpsc;
5
6use crate::config::RtcConfig;
7use crate::error::{EnigmaRtcError, RtcResult};
8use crate::signaling::decode_signaling;
9use crate::types::{RtcEvent, SignalingMessage};
10
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::{APIBuilder, API};
13use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;
14use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
15
16pub trait RtcEngine: Send + Sync {
17 fn set_event_sender(&self, sender: mpsc::UnboundedSender<RtcEvent>) -> RtcResult<()>;
18 fn create_offer(&self) -> RtcResult<String>;
19 fn create_answer(&self, offer_sdp: &str) -> RtcResult<String>;
20 fn apply_answer(&self, answer_sdp: &str) -> RtcResult<()>;
21 fn add_remote_candidate(&self, candidate_json: &str) -> RtcResult<()>;
22 fn set_microphone_enabled(&self, enabled: bool) -> RtcResult<()>;
23 fn set_camera_enabled(&self, enabled: bool) -> RtcResult<()>;
24}
25
26pub struct WebRtcEngine {
27 config: RtcConfig,
28 api: API,
29 event_sender: Mutex<Option<mpsc::UnboundedSender<RtcEvent>>>,
30 microphone_enabled: AtomicBool,
31 camera_enabled: AtomicBool,
32 sdp_counter: AtomicUsize,
33}
34
35impl WebRtcEngine {
36 pub fn new(config: RtcConfig) -> RtcResult<Self> {
37 let mut media = MediaEngine::default();
38 media
39 .register_default_codecs()
40 .map_err(|e| EnigmaRtcError::WebRtcError(e.to_string()))?;
41 let api = APIBuilder::new().with_media_engine(media).build();
42 let audio = config.enable_audio;
43 let video = config.enable_video;
44 Ok(Self {
45 config,
46 api,
47 event_sender: Mutex::new(None),
48 microphone_enabled: AtomicBool::new(audio),
49 camera_enabled: AtomicBool::new(video),
50 sdp_counter: AtomicUsize::new(0),
51 })
52 }
53
54 fn next_index(&self) -> usize {
55 self.sdp_counter.fetch_add(1, Ordering::SeqCst)
56 }
57
58 fn build_sdp(&self, label: &str, seed: usize) -> String {
59 let mut sdp = format!("v=0\r\no=- {seed} 2 IN IP4 127.0.0.1\r\ns={label}\r\nt=0 0\r\n");
60 for (index, server) in self.config.ice_servers.iter().enumerate() {
61 sdp.push_str(&format!("a=ice-server:{index} {server}\r\n"));
62 }
63 if let Some(codec) = &self.config.prefer_codec {
64 sdp.push_str(&format!("a=preferred-codec:{codec}\r\n"));
65 }
66 sdp
67 }
68
69 fn emit_event(&self, event: RtcEvent) -> RtcResult<()> {
70 let sender = self
71 .event_sender
72 .lock()
73 .map_err(|_| EnigmaRtcError::ChannelClosed)?
74 .clone();
75 if let Some(tx) = sender {
76 tx.send(event).map_err(|_| EnigmaRtcError::ChannelClosed)?;
77 }
78 Ok(())
79 }
80
81 fn dispatch_local_candidate(&self, seed: usize) -> RtcResult<()> {
82 let message = SignalingMessage::IceCandidate {
83 candidate: format!("candidate:{seed}"),
84 sdp_mid: Some("0".to_string()),
85 sdp_mline_index: Some(0),
86 };
87 self.emit_event(RtcEvent::LocalIceCandidate(message))
88 }
89
90 fn validate_sdp(&self, sdp: &str, sdp_type: RTCSdpType) -> RtcResult<()> {
91 if sdp.trim().is_empty() {
92 return Err(EnigmaRtcError::InvalidSdp);
93 }
94 let mut description = RTCSessionDescription::default();
95 description.sdp_type = sdp_type;
96 description.sdp = sdp.to_string();
97 let _ = &self.api;
98 Ok(())
99 }
100}
101
102impl RtcEngine for WebRtcEngine {
103 fn set_event_sender(&self, sender: mpsc::UnboundedSender<RtcEvent>) -> RtcResult<()> {
104 let mut slot = self
105 .event_sender
106 .lock()
107 .map_err(|_| EnigmaRtcError::ChannelClosed)?;
108 *slot = Some(sender);
109 Ok(())
110 }
111
112 fn create_offer(&self) -> RtcResult<String> {
113 let seed = self.next_index();
114 let sdp = self.build_sdp("offer", seed);
115 self.validate_sdp(&sdp, RTCSdpType::Offer)?;
116 self.dispatch_local_candidate(seed)?;
117 Ok(sdp)
118 }
119
120 fn create_answer(&self, offer_sdp: &str) -> RtcResult<String> {
121 self.validate_sdp(offer_sdp, RTCSdpType::Offer)?;
122 let seed = self.next_index();
123 let sdp = self.build_sdp("answer", seed);
124 self.validate_sdp(&sdp, RTCSdpType::Answer)?;
125 self.dispatch_local_candidate(seed)?;
126 Ok(sdp)
127 }
128
129 fn apply_answer(&self, answer_sdp: &str) -> RtcResult<()> {
130 self.validate_sdp(answer_sdp, RTCSdpType::Answer)
131 }
132
133 fn add_remote_candidate(&self, candidate_json: &str) -> RtcResult<()> {
134 match decode_signaling(candidate_json)? {
135 SignalingMessage::IceCandidate { candidate, .. } => {
136 if candidate.trim().is_empty() {
137 return Err(EnigmaRtcError::InvalidCandidate);
138 }
139 Ok(())
140 }
141 _ => Err(EnigmaRtcError::InvalidCandidate),
142 }
143 }
144
145 fn set_microphone_enabled(&self, enabled: bool) -> RtcResult<()> {
146 self.microphone_enabled.store(enabled, Ordering::SeqCst);
147 Ok(())
148 }
149
150 fn set_camera_enabled(&self, enabled: bool) -> RtcResult<()> {
151 self.camera_enabled.store(enabled, Ordering::SeqCst);
152 Ok(())
153 }
154}