enigma_rtc/
webrtc.rs

1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::Mutex;
3
4use tokio::sync::mpsc;
5
6use crate::config::RtcConfig;
7use crate::error::{EnigmaRtcError, RtcResult};
8use crate::signaling::decode_signaling;
9use crate::types::{RtcEvent, SignalingMessage};
10
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::{APIBuilder, API};
13use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;
14use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
15
16pub trait RtcEngine: Send + Sync {
17    fn set_event_sender(&self, sender: mpsc::UnboundedSender<RtcEvent>) -> RtcResult<()>;
18    fn create_offer(&self) -> RtcResult<String>;
19    fn create_answer(&self, offer_sdp: &str) -> RtcResult<String>;
20    fn apply_answer(&self, answer_sdp: &str) -> RtcResult<()>;
21    fn add_remote_candidate(&self, candidate_json: &str) -> RtcResult<()>;
22    fn set_microphone_enabled(&self, enabled: bool) -> RtcResult<()>;
23    fn set_camera_enabled(&self, enabled: bool) -> RtcResult<()>;
24}
25
26pub struct WebRtcEngine {
27    config: RtcConfig,
28    api: API,
29    event_sender: Mutex<Option<mpsc::UnboundedSender<RtcEvent>>>,
30    microphone_enabled: AtomicBool,
31    camera_enabled: AtomicBool,
32    sdp_counter: AtomicUsize,
33}
34
35impl WebRtcEngine {
36    pub fn new(config: RtcConfig) -> RtcResult<Self> {
37        let mut media = MediaEngine::default();
38        media
39            .register_default_codecs()
40            .map_err(|e| EnigmaRtcError::WebRtcError(e.to_string()))?;
41        let api = APIBuilder::new().with_media_engine(media).build();
42        let audio = config.enable_audio;
43        let video = config.enable_video;
44        Ok(Self {
45            config,
46            api,
47            event_sender: Mutex::new(None),
48            microphone_enabled: AtomicBool::new(audio),
49            camera_enabled: AtomicBool::new(video),
50            sdp_counter: AtomicUsize::new(0),
51        })
52    }
53
54    fn next_index(&self) -> usize {
55        self.sdp_counter.fetch_add(1, Ordering::SeqCst)
56    }
57
58    fn build_sdp(&self, label: &str, seed: usize) -> String {
59        let mut sdp = format!("v=0\r\no=- {seed} 2 IN IP4 127.0.0.1\r\ns={label}\r\nt=0 0\r\n");
60        for (index, server) in self.config.ice_servers.iter().enumerate() {
61            sdp.push_str(&format!("a=ice-server:{index} {server}\r\n"));
62        }
63        if let Some(codec) = &self.config.prefer_codec {
64            sdp.push_str(&format!("a=preferred-codec:{codec}\r\n"));
65        }
66        sdp
67    }
68
69    fn emit_event(&self, event: RtcEvent) -> RtcResult<()> {
70        let sender = self
71            .event_sender
72            .lock()
73            .map_err(|_| EnigmaRtcError::ChannelClosed)?
74            .clone();
75        if let Some(tx) = sender {
76            tx.send(event).map_err(|_| EnigmaRtcError::ChannelClosed)?;
77        }
78        Ok(())
79    }
80
81    fn dispatch_local_candidate(&self, seed: usize) -> RtcResult<()> {
82        let message = SignalingMessage::IceCandidate {
83            candidate: format!("candidate:{seed}"),
84            sdp_mid: Some("0".to_string()),
85            sdp_mline_index: Some(0),
86        };
87        self.emit_event(RtcEvent::LocalIceCandidate(message))
88    }
89
90    fn validate_sdp(&self, sdp: &str, sdp_type: RTCSdpType) -> RtcResult<()> {
91        if sdp.trim().is_empty() {
92            return Err(EnigmaRtcError::InvalidSdp);
93        }
94        let mut description = RTCSessionDescription::default();
95        description.sdp_type = sdp_type;
96        description.sdp = sdp.to_string();
97        let _ = &self.api;
98        Ok(())
99    }
100}
101
102impl RtcEngine for WebRtcEngine {
103    fn set_event_sender(&self, sender: mpsc::UnboundedSender<RtcEvent>) -> RtcResult<()> {
104        let mut slot = self
105            .event_sender
106            .lock()
107            .map_err(|_| EnigmaRtcError::ChannelClosed)?;
108        *slot = Some(sender);
109        Ok(())
110    }
111
112    fn create_offer(&self) -> RtcResult<String> {
113        let seed = self.next_index();
114        let sdp = self.build_sdp("offer", seed);
115        self.validate_sdp(&sdp, RTCSdpType::Offer)?;
116        self.dispatch_local_candidate(seed)?;
117        Ok(sdp)
118    }
119
120    fn create_answer(&self, offer_sdp: &str) -> RtcResult<String> {
121        self.validate_sdp(offer_sdp, RTCSdpType::Offer)?;
122        let seed = self.next_index();
123        let sdp = self.build_sdp("answer", seed);
124        self.validate_sdp(&sdp, RTCSdpType::Answer)?;
125        self.dispatch_local_candidate(seed)?;
126        Ok(sdp)
127    }
128
129    fn apply_answer(&self, answer_sdp: &str) -> RtcResult<()> {
130        self.validate_sdp(answer_sdp, RTCSdpType::Answer)
131    }
132
133    fn add_remote_candidate(&self, candidate_json: &str) -> RtcResult<()> {
134        match decode_signaling(candidate_json)? {
135            SignalingMessage::IceCandidate { candidate, .. } => {
136                if candidate.trim().is_empty() {
137                    return Err(EnigmaRtcError::InvalidCandidate);
138                }
139                Ok(())
140            }
141            _ => Err(EnigmaRtcError::InvalidCandidate),
142        }
143    }
144
145    fn set_microphone_enabled(&self, enabled: bool) -> RtcResult<()> {
146        self.microphone_enabled.store(enabled, Ordering::SeqCst);
147        Ok(())
148    }
149
150    fn set_camera_enabled(&self, enabled: bool) -> RtcResult<()> {
151        self.camera_enabled.store(enabled, Ordering::SeqCst);
152        Ok(())
153    }
154}