datachannel/
datachannel.rs

1use std::convert::TryFrom;
2use std::ffi::{c_void, CStr, CString};
3use std::os::raw::c_char;
4use std::ptr;
5use std::slice;
6
7use datachannel_sys as sys;
8
9use crate::error::{check, Error, Result};
10use crate::logger;
11
/// Reliability settings of a data channel.
///
/// The fields map one-to-one onto the raw `sys::rtcReliability` struct (see
/// `Reliability::as_raw` / `Reliability::from_raw`). The derived `Default`
/// yields both flags cleared and both counters set to zero.
#[derive(Clone, Debug, Default)]
pub struct Reliability {
    /// Forwarded as the raw `unordered` flag.
    pub unordered: bool,
    /// Forwarded as the raw `unreliable` flag.
    pub unreliable: bool,
    /// Forwarded as the raw `maxPacketLifeTime` value.
    // NOTE(review): the unit is not visible in this file — confirm against the C API.
    pub max_packet_life_time: u32,
    /// Forwarded as the raw `maxRetransmits` value.
    pub max_retransmits: u32,
}
19
20impl Reliability {
21    fn from_raw(raw: sys::rtcReliability) -> Self {
22        Self {
23            unordered: raw.unordered,
24            unreliable: raw.unreliable,
25            max_packet_life_time: raw.maxPacketLifeTime,
26            max_retransmits: raw.maxRetransmits,
27        }
28    }
29
30    pub fn unordered(mut self) -> Self {
31        self.unordered = true;
32        self
33    }
34
35    pub fn unreliable(mut self) -> Self {
36        self.unreliable = true;
37        self
38    }
39
40    pub fn max_packet_life_time(mut self, max_packet_life_time: u32) -> Self {
41        self.max_packet_life_time = max_packet_life_time;
42        self
43    }
44
45    pub fn max_retransmits(mut self, max_retransmits: u32) -> Self {
46        self.max_retransmits = max_retransmits;
47        self
48    }
49
50    pub(crate) fn as_raw(&self) -> sys::rtcReliability {
51        sys::rtcReliability {
52            unordered: self.unordered,
53            unreliable: self.unreliable,
54            maxPacketLifeTime: self.max_packet_life_time,
55            maxRetransmits: self.max_retransmits,
56        }
57    }
58}
59
60#[derive(Debug, Clone, Default)]
61pub struct DataChannelInit {
62    reliability: Reliability,
63    protocol: CString,
64    negotiated: bool,
65    manual_stream: bool,
66    stream: u16,
67}
68
69impl DataChannelInit {
70    pub fn reliability(mut self, reliability: Reliability) -> Self {
71        self.reliability = reliability;
72        self
73    }
74
75    pub fn protocol(mut self, protocol: &str) -> Self {
76        self.protocol = CString::new(protocol).unwrap();
77        self
78    }
79
80    pub fn negotiated(mut self) -> Self {
81        self.negotiated = true;
82        self
83    }
84
85    pub fn manual_stream(mut self) -> Self {
86        self.manual_stream = true;
87        self
88    }
89
90    /// numeric ID 0-65534, ignored if `manual_stream` is false
91    pub fn stream(mut self, stream: u16) -> Self {
92        self.stream = stream;
93        self
94    }
95
96    pub(crate) fn as_raw(&self) -> Result<sys::rtcDataChannelInit> {
97        Ok(sys::rtcDataChannelInit {
98            reliability: self.reliability.as_raw(),
99            protocol: self.protocol.as_ptr(),
100            negotiated: self.negotiated,
101            manualStream: self.manual_stream,
102            stream: self.stream,
103        })
104    }
105}
106
/// Identifier of a data channel.
///
/// Wraps the `i32` handle that is passed as the first argument to the
/// `sys::rtc*` functions. Ordering and equality follow the wrapped integer;
/// `Ord` is derived alongside `PartialOrd` so ids can be sorted and used as
/// keys in ordered collections such as `BTreeMap`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DataChannelId(pub(crate) i32);
109
/// Event callbacks for a data channel.
///
/// An [`RtcDataChannel`] forwards the events reported by the native layer to
/// the handler it owns. Every method has an empty default body, so an
/// implementor only needs to override the events it is interested in.
// The default bodies deliberately ignore their arguments.
#[allow(unused_variables)]
pub trait DataChannelHandler {
    /// Invoked from the callback registered with `rtcSetOpenCallback`.
    fn on_open(&mut self) {}

    /// Invoked from the callback registered with `rtcSetClosedCallback`.
    fn on_closed(&mut self) {}

    /// Invoked from the callback registered with `rtcSetErrorCallback`.
    ///
    /// `err` is the native error string, lossily converted to UTF-8.
    fn on_error(&mut self, err: &str) {}

    /// Invoked from the callback registered with `rtcSetMessageCallback`.
    ///
    /// `msg` holds the message bytes. When the native layer reports a negative
    /// size the payload is read as a NUL-terminated C string and `msg`
    /// excludes the terminator.
    fn on_message(&mut self, msg: &[u8]) {}

    /// Invoked from the callback registered with `rtcSetBufferedAmountLowCallback`.
    fn on_buffered_amount_low(&mut self) {}

    /// Invoked from the callback registered with `rtcSetAvailableCallback`.
    fn on_available(&mut self) {}
}
119
120pub struct RtcDataChannel<D> {
121    id: DataChannelId,
122    dc_handler: D,
123}
124
125impl<D> RtcDataChannel<D>
126where
127    D: DataChannelHandler + Send,
128{
129    pub(crate) fn new(id: DataChannelId, dc_handler: D) -> Result<Box<Self>> {
130        unsafe {
131            let mut rtc_dc = Box::new(RtcDataChannel { id, dc_handler });
132            let ptr = &mut *rtc_dc;
133
134            sys::rtcSetUserPointer(id.0, ptr as *mut _ as *mut c_void);
135
136            check(sys::rtcSetOpenCallback(
137                id.0,
138                Some(RtcDataChannel::<D>::open_cb),
139            ))?;
140
141            check(sys::rtcSetClosedCallback(
142                id.0,
143                Some(RtcDataChannel::<D>::closed_cb),
144            ))?;
145
146            check(sys::rtcSetErrorCallback(
147                id.0,
148                Some(RtcDataChannel::<D>::error_cb),
149            ))?;
150
151            check(sys::rtcSetMessageCallback(
152                id.0,
153                Some(RtcDataChannel::<D>::message_cb),
154            ))?;
155
156            check(sys::rtcSetBufferedAmountLowCallback(
157                id.0,
158                Some(RtcDataChannel::<D>::buffered_amount_low_cb),
159            ))?;
160
161            check(sys::rtcSetAvailableCallback(
162                id.0,
163                Some(RtcDataChannel::<D>::available_cb),
164            ))?;
165
166            Ok(rtc_dc)
167        }
168    }
169
170    unsafe extern "C" fn open_cb(_: i32, ptr: *mut c_void) {
171        let rtc_dc = &mut *(ptr as *mut RtcDataChannel<D>);
172        rtc_dc.dc_handler.on_open()
173    }
174
175    unsafe extern "C" fn closed_cb(_: i32, ptr: *mut c_void) {
176        let rtc_dc = &mut *(ptr as *mut RtcDataChannel<D>);
177        rtc_dc.dc_handler.on_closed()
178    }
179
180    unsafe extern "C" fn error_cb(_: i32, err: *const c_char, ptr: *mut c_void) {
181        let rtc_dc = &mut *(ptr as *mut RtcDataChannel<D>);
182        let err = CStr::from_ptr(err).to_string_lossy();
183        rtc_dc.dc_handler.on_error(&err)
184    }
185
186    unsafe extern "C" fn message_cb(_: i32, msg: *const c_char, size: i32, ptr: *mut c_void) {
187        let rtc_dc = &mut *(ptr as *mut RtcDataChannel<D>);
188        let msg = if size < 0 {
189            CStr::from_ptr(msg).to_bytes()
190        } else {
191            slice::from_raw_parts(msg as *const u8, size as usize)
192        };
193        rtc_dc.dc_handler.on_message(msg)
194    }
195
196    unsafe extern "C" fn buffered_amount_low_cb(_: i32, ptr: *mut c_void) {
197        let rtc_dc = &mut *(ptr as *mut RtcDataChannel<D>);
198        rtc_dc.dc_handler.on_buffered_amount_low()
199    }
200
201    unsafe extern "C" fn available_cb(_: i32, ptr: *mut c_void) {
202        let rtc_dc = &mut *(ptr as *mut RtcDataChannel<D>);
203        rtc_dc.dc_handler.on_available()
204    }
205
206    pub fn id(&self) -> DataChannelId {
207        self.id
208    }
209
210    pub fn send(&mut self, msg: &[u8]) -> Result<()> {
211        check(unsafe {
212            sys::rtcSendMessage(self.id.0, msg.as_ptr() as *const c_char, msg.len() as i32)
213        })
214        .map(|_| ())
215    }
216
217    pub fn label(&self) -> String {
218        DataChannelInfo::label(self.id)
219    }
220
221    pub fn protocol(&self) -> Option<String> {
222        DataChannelInfo::protocol(self.id)
223    }
224
225    pub fn reliability(&self) -> Reliability {
226        DataChannelInfo::reliability(self.id)
227    }
228
229    pub fn stream(&self) -> usize {
230        DataChannelInfo::stream(self.id)
231    }
232
233    /// Number of bytes currently queued to be sent over the data channel.
234    ///
235    /// This method is the counterpart of [`available_amount`].
236    ///
237    /// [`available_amount`]: RtcDataChannel::available_amount
238    pub fn buffered_amount(&self) -> usize {
239        match check(unsafe { sys::rtcGetBufferedAmount(self.id.0) }) {
240            Ok(amount) => amount as usize,
241            Err(err) => {
242                logger::error!(
243                    "Couldn't get buffered_amount for RtcDataChannel id={:?} {:p}, {}",
244                    self.id,
245                    self,
246                    err
247                );
248                0
249            }
250        }
251    }
252
253    /// Sets the lower threshold of `buffered_amount`.
254    ///
255    /// The default value is 0. When the number of buffered outgoing bytes, as indicated
256    /// by [`buffered_amount`], falls to or below this value, a
257    /// [`on_bufferd_amount_low`] event is fired. This event may be used, for example,
258    /// to implement code which queues more messages to be sent whenever there's room to
259    /// buffer them.
260    ///
261    /// [`buffered_amount`]: RtcDataChannel::buffered_amount
262    /// [`on_bufferd_amount_low`]: DataChannelHandler::on_buffered_amount_low
263    pub fn set_buffered_amount_low_threshold(&mut self, amount: usize) -> Result<()> {
264        let amount = i32::try_from(amount).map_err(|_| Error::InvalidArg)?;
265        check(unsafe { sys::rtcSetBufferedAmountLowThreshold(self.id.0, amount) })?;
266        Ok(())
267    }
268
269    /// Number of bytes currently queued to be consumed from the data channel.
270    ///
271    /// This method is the counterpart of [`buffered_amount`].
272    ///
273    /// [`buffered_amount`]: RtcDataChannel::buffered_amount
274    pub fn available_amount(&self) -> usize {
275        match check(unsafe { sys::rtcGetAvailableAmount(self.id.0) }) {
276            Ok(amount) => amount as usize,
277            Err(err) => {
278                logger::error!(
279                    "Couldn't get available_amount for RtcDataChannel id={:?} {:p}, {}",
280                    self.id,
281                    self,
282                    err
283                );
284                0
285            }
286        }
287    }
288}
289
290impl<D> Drop for RtcDataChannel<D> {
291    fn drop(&mut self) {
292        if let Err(err) = check(unsafe { sys::rtcDeleteDataChannel(self.id.0) }) {
293            logger::error!(
294                "Error while dropping RtcDataChannel id={:?} {:p}: {}",
295                self.id,
296                self,
297                err
298            );
299        }
300    }
301}
302
303#[derive(Debug, Clone)]
304pub struct DataChannelInfo {
305    pub id: DataChannelId,
306    pub label: String,
307    pub protocol: Option<String>,
308    pub reliability: Reliability,
309    pub stream: usize,
310}
311
312impl DataChannelInfo {
313    pub(crate) fn label(id: DataChannelId) -> String {
314        let buf_size =
315            check(unsafe { sys::rtcGetDataChannelLabel(id.0, ptr::null_mut() as *mut c_char, 0) })
316                .expect("Couldn't get buffer size") as usize;
317
318        let mut buf = vec![0; buf_size];
319        match check(unsafe {
320            sys::rtcGetDataChannelLabel(id.0, buf.as_mut_ptr() as *mut c_char, buf_size as i32)
321        }) {
322            Ok(_) => match crate::ffi_string(&buf) {
323                Ok(label) => label,
324                Err(err) => {
325                    logger::error!("Couldn't get label for RtcDataChannel id={:?}, {}", id, err);
326                    String::default()
327                }
328            },
329            Err(err) => {
330                logger::warn!("Couldn't get label for RtcDataChannel id={:?}, {}", id, err);
331                String::default()
332            }
333        }
334    }
335
336    pub(crate) fn protocol(id: DataChannelId) -> Option<String> {
337        let buf_size = check(unsafe {
338            sys::rtcGetDataChannelProtocol(id.0, ptr::null_mut() as *mut c_char, 0)
339        })
340        .expect("Couldn't get buffer size") as usize;
341
342        let mut buf = vec![0; buf_size];
343        match check(unsafe {
344            sys::rtcGetDataChannelProtocol(id.0, buf.as_mut_ptr() as *mut c_char, buf_size as i32)
345        }) {
346            Ok(1) => None,
347            Ok(_) => match crate::ffi_string(&buf) {
348                Ok(protocol) => Some(protocol),
349                Err(err) => {
350                    logger::error!(
351                        "Couldn't get protocol for RtcDataChannel id={:?}, {}",
352                        id,
353                        err
354                    );
355                    None
356                }
357            },
358            Err(err) => {
359                logger::warn!(
360                    "Couldn't get protocol for RtcDataChannel id={:?}, {}",
361                    id,
362                    err
363                );
364                None
365            }
366        }
367    }
368
369    pub(crate) fn reliability(id: DataChannelId) -> Reliability {
370        let mut reliability = sys::rtcReliability {
371            unordered: false,
372            unreliable: false,
373            maxPacketLifeTime: 0,
374            maxRetransmits: 0,
375        };
376
377        check(unsafe { sys::rtcGetDataChannelReliability(id.0, &mut reliability) })
378            .expect("Couldn't get RtcDataChannel reliability");
379
380        Reliability::from_raw(reliability)
381    }
382
383    pub(crate) fn stream(id: DataChannelId) -> usize {
384        check(unsafe { sys::rtcGetDataChannelStream(id.0) })
385            .expect("Couldn't get RtcDataChannel stream") as usize
386    }
387}