rtc_datachannel/data_channel/
mod.rs

1#[cfg(test)]
2mod data_channel_test;
3
4use crate::message::{
5    message_channel_ack::*, message_channel_close::*, message_channel_open::*,
6    message_channel_threshold::*, *,
7};
8use bytes::{Buf, BytesMut};
9use log::debug;
10use sctp::{PayloadProtocolIdentifier, ReliabilityType};
11use shared::error::{Error, Result};
12use shared::marshal::*;
13use std::collections::VecDeque;
14
// NOTE(review): not referenced by any code visible in this file; presumably a
// receive buffer size in bytes used by the test module — confirm before removing.
const RECEIVE_MTU: usize = 8192;
16
/// DataChannelConfig is used to configure the data channel.
///
/// `DataChannel::dial` copies every field except `negotiated` into the outgoing
/// DATA_CHANNEL_OPEN message, and `DataChannel::accept` overwrites those same
/// fields with the values found in the received DATA_CHANNEL_OPEN message.
#[derive(Eq, PartialEq, Default, Clone, Debug)]
pub struct DataChannelConfig {
    /// Channel type carried in the DATA_CHANNEL_OPEN message.
    pub channel_type: ChannelType,
    /// When `true`, `dial` does not queue a DATA_CHANNEL_OPEN message.
    pub negotiated: bool,
    /// Priority carried in the DATA_CHANNEL_OPEN message.
    pub priority: u16,
    /// Reliability parameter carried in the DATA_CHANNEL_OPEN message; see
    /// `DataChannel::get_channel_type_and_reliability_parameter` for one way it is derived.
    pub reliability_parameter: u32,
    /// Channel label; sent as raw bytes and required to be valid UTF-8 on receipt.
    pub label: String,
    /// Sub-protocol name; sent as raw bytes and required to be valid UTF-8 on receipt.
    pub protocol: String,
}
27
/// DataChannelMessage is a single message exchanged between a [`DataChannel`]
/// and the SCTP layer, in either direction.
#[derive(Debug, Default, Clone)]
pub struct DataChannelMessage {
    /// Handle of the association the message belongs to.
    pub association_handle: usize,
    /// Identifier of the stream the message belongs to.
    pub stream_id: u16,
    /// Payload protocol identifier; `Dcep` marks a control message, any other
    /// value is handed to the reader as user data.
    pub ppi: PayloadProtocolIdentifier,
    /// Message bytes.
    pub payload: BytesMut,
}
36
/// DataChannel represents a data channel.
///
/// Incoming user messages are queued in `read_outs` and outgoing messages
/// (user data and DCEP control messages) in `write_outs`; both queues are
/// drained through the `sansio::Protocol` implementation.
#[derive(Debug, Default, Clone)]
pub struct DataChannel {
    /// Configuration supplied to `dial`, or filled in by `accept`.
    config: DataChannelConfig,
    /// Association handle stamped on every outgoing message.
    association_handle: usize,
    /// Stream identifier stamped on every outgoing message.
    stream_id: u16,

    /// Received non-DCEP messages waiting for `poll_read`.
    read_outs: VecDeque<DataChannelMessage>,
    /// Messages waiting for `poll_write`.
    write_outs: VecDeque<DataChannelMessage>,

    // stats
    /// Incremented once per `handle_write` call.
    messages_sent: usize,
    /// Incremented once per `handle_read` call.
    messages_received: usize,
    /// Sum of payload lengths passed to `handle_write`.
    bytes_sent: usize,
    /// Sum of payload lengths passed to `handle_read`.
    bytes_received: usize,
}
53
54impl DataChannel {
55    fn new(config: DataChannelConfig, association_handle: usize, stream_id: u16) -> Self {
56        Self {
57            config,
58            association_handle,
59            stream_id,
60            read_outs: VecDeque::new(),
61            write_outs: VecDeque::new(),
62            ..Default::default()
63        }
64    }
65
66    /// Dial opens a data channels over SCTP
67    pub fn dial(
68        config: DataChannelConfig,
69        association_handle: usize,
70        stream_id: u16,
71    ) -> Result<Self> {
72        let mut data_channel = DataChannel::new(config.clone(), association_handle, stream_id);
73
74        if !config.negotiated {
75            let msg = Message::DataChannelOpen(DataChannelOpen {
76                channel_type: config.channel_type,
77                priority: config.priority,
78                reliability_parameter: config.reliability_parameter,
79                label: config.label.bytes().collect(),
80                protocol: config.protocol.bytes().collect(),
81            })
82            .marshal()?;
83
84            data_channel.write_outs.push_back(DataChannelMessage {
85                association_handle,
86                stream_id,
87                ppi: PayloadProtocolIdentifier::Dcep,
88                payload: msg,
89            });
90        }
91
92        Ok(data_channel)
93    }
94
95    /// Accept is used to accept incoming data channels over SCTP
96    pub fn accept(
97        mut config: DataChannelConfig,
98        association_handle: usize,
99        stream_id: u16,
100        ppi: PayloadProtocolIdentifier,
101        buf: &[u8],
102    ) -> Result<Self> {
103        if ppi != PayloadProtocolIdentifier::Dcep {
104            return Err(Error::InvalidPayloadProtocolIdentifier(ppi as u8));
105        }
106
107        let mut read_buf = buf;
108        let msg = Message::unmarshal(&mut read_buf)?;
109
110        if let Message::DataChannelOpen(dco) = msg {
111            config.channel_type = dco.channel_type;
112            config.priority = dco.priority;
113            config.reliability_parameter = dco.reliability_parameter;
114            config.label = String::from_utf8(dco.label)?;
115            config.protocol = String::from_utf8(dco.protocol)?;
116        } else {
117            return Err(Error::InvalidMessageType(msg.message_type() as u8));
118        };
119
120        let mut data_channel = DataChannel::new(config, association_handle, stream_id);
121
122        data_channel.write_data_channel_ack()?;
123
124        Ok(data_channel)
125    }
126
    /// Returns the number of messages passed to `handle_write`.
    ///
    /// DCEP control messages queued internally (open, ack, close, threshold)
    /// are not included.
    pub fn messages_sent(&self) -> usize {
        self.messages_sent
    }
131
    /// Returns the number of messages passed to `handle_read`, including DCEP
    /// control messages.
    pub fn messages_received(&self) -> usize {
        self.messages_received
    }
136
    /// Returns the total payload length, in bytes, of all messages passed to
    /// `handle_write`.
    pub fn bytes_sent(&self) -> usize {
        self.bytes_sent
    }
141
    /// Returns the total payload length, in bytes, of all messages passed to
    /// `handle_read`, including DCEP control messages.
    pub fn bytes_received(&self) -> usize {
        self.bytes_received
    }
146
    /// Returns the association handle this channel was created with.
    pub fn association_handle(&self) -> usize {
        self.association_handle
    }
151
    /// Returns the stream identifier this channel was created with.
    pub fn stream_identifier(&self) -> u16 {
        self.stream_id
    }
156
    /// Returns the channel's configuration (as passed to `dial`, or as filled
    /// in from the DATA_CHANNEL_OPEN message by `accept`).
    pub fn config(&self) -> &DataChannelConfig {
        &self.config
    }
160
161    fn handle_dcep<B>(&mut self, data: &mut B) -> Result<()>
162    where
163        B: Buf,
164    {
165        let msg = Message::unmarshal(data)?;
166
167        match msg {
168            Message::DataChannelOpen(_) => {
169                // Note: DATA_CHANNEL_OPEN message is handled inside Server() method.
170                // Therefore, the message will not reach here.
171                debug!("Received DATA_CHANNEL_OPEN");
172                self.write_data_channel_ack()?;
173            }
174            Message::DataChannelAck(_) => {
175                debug!("Received DATA_CHANNEL_ACK");
176            }
177            _ => {
178                return Err(Error::InvalidMessageType(msg.message_type() as u8));
179            }
180        };
181
182        Ok(())
183    }
184
185    fn write_data_channel_ack(&mut self) -> Result<()> {
186        let ack = Message::DataChannelAck(DataChannelAck {}).marshal()?;
187        self.write_outs.push_back(DataChannelMessage {
188            association_handle: self.association_handle,
189            stream_id: self.stream_id,
190            ppi: PayloadProtocolIdentifier::Dcep,
191            payload: ack,
192        });
193        Ok(())
194    }
195
196    fn write_data_channel_close(&mut self) -> Result<()> {
197        let close = Message::DataChannelClose(DataChannelClose {}).marshal()?;
198        self.write_outs.push_back(DataChannelMessage {
199            association_handle: self.association_handle,
200            stream_id: self.stream_id,
201            ppi: PayloadProtocolIdentifier::Dcep,
202            payload: close,
203        });
204        Ok(())
205    }
206
207    fn write_data_channel_high_threshold(&mut self, threshold: u32) -> Result<()> {
208        let low_threshold =
209            Message::DataChannelThreshold(DataChannelThreshold::High(threshold)).marshal()?;
210        self.write_outs.push_back(DataChannelMessage {
211            association_handle: self.association_handle,
212            stream_id: self.stream_id,
213            ppi: PayloadProtocolIdentifier::Dcep,
214            payload: low_threshold,
215        });
216        Ok(())
217    }
218
219    fn write_data_channel_low_threshold(&mut self, threshold: u32) -> Result<()> {
220        let low_threshold =
221            Message::DataChannelThreshold(DataChannelThreshold::Low(threshold)).marshal()?;
222        self.write_outs.push_back(DataChannelMessage {
223            association_handle: self.association_handle,
224            stream_id: self.stream_id,
225            ppi: PayloadProtocolIdentifier::Dcep,
226            payload: low_threshold,
227        });
228        Ok(())
229    }
230
    /// Queues a DCEP message announcing a new buffered-amount high threshold.
    ///
    /// No threshold value is stored on the channel itself; the message becomes
    /// available through `poll_write`.
    ///
    /// # Errors
    ///
    /// Returns an error if the threshold message cannot be marshalled.
    pub fn set_buffered_amount_high_threshold(&mut self, threshold: u32) -> Result<()> {
        self.write_data_channel_high_threshold(threshold)
    }
236
    /// Queues a DCEP message announcing a new buffered-amount low threshold.
    ///
    /// No threshold value is stored on the channel itself; the message becomes
    /// available through `poll_write`.
    ///
    /// # Errors
    ///
    /// Returns an error if the threshold message cannot be marshalled.
    pub fn set_buffered_amount_low_threshold(&mut self, threshold: u32) -> Result<()> {
        self.write_data_channel_low_threshold(threshold)
    }
242
243    /*
244    /// OnBufferedAmountLow sets the callback handler which would be called when the
245    /// number of bytes of outgoing data buffered is lower than the threshold.
246    pub fn on_buffered_amount_low(&self, f: OnBufferedAmountLowFn) {
247        self.stream.on_buffered_amount_low(f)
248    }*/
249
250    pub fn get_reliability_params(channel_type: ChannelType) -> (bool, ReliabilityType) {
251        match channel_type {
252            ChannelType::Reliable => (false, ReliabilityType::Reliable),
253            ChannelType::ReliableUnordered => (true, ReliabilityType::Reliable),
254            ChannelType::PartialReliableRexmit => (false, ReliabilityType::Rexmit),
255            ChannelType::PartialReliableRexmitUnordered => (true, ReliabilityType::Rexmit),
256            ChannelType::PartialReliableTimed => (false, ReliabilityType::Timed),
257            ChannelType::PartialReliableTimedUnordered => (true, ReliabilityType::Timed),
258        }
259    }
260
261    pub fn get_channel_type_and_reliability_parameter(
262        ordered: bool,
263        max_retransmits: Option<u16>,
264        max_packet_life_time: Option<u16>,
265    ) -> (ChannelType, u32) {
266        let channel_type;
267        let reliability_parameter;
268
269        match (max_retransmits, max_packet_life_time) {
270            (None, None) => {
271                reliability_parameter = 0u32;
272                if ordered {
273                    channel_type = ChannelType::Reliable;
274                } else {
275                    channel_type = ChannelType::ReliableUnordered;
276                }
277            }
278
279            (Some(max_retransmits), _) => {
280                reliability_parameter = max_retransmits as u32;
281                if ordered {
282                    channel_type = ChannelType::PartialReliableRexmit;
283                } else {
284                    channel_type = ChannelType::PartialReliableRexmitUnordered;
285                }
286            }
287
288            (None, Some(max_packet_lifetime)) => {
289                reliability_parameter = max_packet_lifetime as u32;
290                if ordered {
291                    channel_type = ChannelType::PartialReliableTimed;
292                } else {
293                    channel_type = ChannelType::PartialReliableTimedUnordered;
294                }
295            }
296        }
297
298        (channel_type, reliability_parameter)
299    }
300
301    pub fn get_data_channel_message(is_string: bool, data: BytesMut) -> DataChannelMessage {
302        // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-12#section-6.6
303        // SCTP does not support the sending of empty user messages.  Therefore,
304        // if an empty message has to be sent, the appropriate PPID (WebRTC
305        // String Empty or WebRTC Binary Empty) is used and the SCTP user
306        // message of one zero byte is sent.  When receiving an SCTP user
307        // message with one of these PPIDs, the receiver MUST ignore the SCTP
308        // user message and process it as an empty message.
309        let ppi = match (is_string, data.len()) {
310            (false, 0) => PayloadProtocolIdentifier::BinaryEmpty,
311            (false, _) => PayloadProtocolIdentifier::Binary,
312            (true, 0) => PayloadProtocolIdentifier::StringEmpty,
313            (true, _) => PayloadProtocolIdentifier::String,
314        };
315
316        if data.is_empty() {
317            DataChannelMessage {
318                ppi,
319                payload: BytesMut::from(&[0][..]),
320                ..Default::default()
321            }
322        } else {
323            DataChannelMessage {
324                ppi,
325                payload: data,
326                ..Default::default()
327            }
328        }
329    }
330}
331
332impl sansio::Protocol<DataChannelMessage, DataChannelMessage, ()> for DataChannel {
333    type Rout = DataChannelMessage;
334    type Wout = DataChannelMessage;
335    type Eout = ();
336    type Error = Error;
337    type Time = ();
338
339    /// ReadDataChannel reads a packet of len(p) bytes. It returns the number of bytes read and
340    /// `true` if the data read is a string.
341    fn handle_read(&mut self, msg: DataChannelMessage) -> Result<()> {
342        self.messages_received += 1;
343        self.bytes_received += msg.payload.len();
344
345        if msg.ppi == PayloadProtocolIdentifier::Dcep {
346            let mut data_buf = &msg.payload[..];
347            self.handle_dcep(&mut data_buf)
348        } else {
349            self.read_outs.push_back(msg);
350            Ok(())
351        }
352    }
353
    /// Returns the oldest queued non-DCEP message received via `handle_read`,
    /// or `None` when the queue is empty.
    fn poll_read(&mut self) -> Option<DataChannelMessage> {
        self.read_outs.pop_front()
    }
357
358    /// handle_write writes len(p) bytes from p
359    fn handle_write(&mut self, mut msg: DataChannelMessage) -> Result<()> {
360        self.messages_sent += 1;
361        self.bytes_sent += msg.payload.len();
362
363        msg.association_handle = self.association_handle;
364        msg.stream_id = self.stream_id;
365        self.write_outs.push_back(msg);
366
367        Ok(())
368    }
369
    /// Returns the oldest queued outgoing message (user data or DCEP control
    /// message), or `None` when nothing is waiting to be transmitted.
    fn poll_write(&mut self) -> Option<DataChannelMessage> {
        self.write_outs.pop_front()
    }
374
    /// Queues a DATA_CHANNEL_CLOSE DCEP message for transmission.
    ///
    /// # Errors
    ///
    /// Returns an error if the close message cannot be marshalled.
    // NOTE(review): this method only queues the message; the stream reset
    // described below is presumably performed by the SCTP layer — confirm.
    fn close(&mut self) -> Result<()> {
        // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.7
        // Closing of a data channel MUST be signaled by resetting the
        // corresponding outgoing streams [RFC6525].  This means that if one
        // side decides to close the data channel, it resets the corresponding
        // outgoing stream.  When the peer sees that an incoming stream was
        // reset, it also resets its corresponding outgoing stream.  Once this
        // is completed, the data channel is closed.  Resetting a stream sets
        // the Stream Sequence Numbers (SSNs) of the stream back to 'zero' with
        // a corresponding notification to the application layer that the reset
        // has been performed.  Streams are available for reuse after a reset
        // has been performed.
        self.write_data_channel_close()
    }
390}