rtc_datachannel/data_channel/
mod.rs

1//#[cfg(test)]
2//mod data_channel_test;
3
4use crate::message::{message_channel_ack::*, message_channel_open::*, *};
5use bytes::{Buf, BytesMut};
6use sctp::{PayloadProtocolIdentifier, ReliabilityType};
7use shared::error::{Error, Result};
8use shared::marshal::*;
9use std::collections::VecDeque;
10
11const RECEIVE_MTU: usize = 8192;
12
/// Configuration of a single data channel.
///
/// [`DataChannel::dial`] copies these values into the DATA_CHANNEL_OPEN message it queues,
/// and [`DataChannel::accept`] overwrites every field except `negotiated` with the values
/// carried by the received DATA_CHANNEL_OPEN message.
#[derive(Eq, PartialEq, Default, Clone, Debug)]
pub struct Config {
    /// Channel type; determines the ordering flag and reliability type attached to every
    /// message queued by the channel.
    pub channel_type: ChannelType,
    /// When `true`, `dial` does not queue a DATA_CHANNEL_OPEN message.
    pub negotiated: bool,
    /// Priority value carried in the DATA_CHANNEL_OPEN message.
    pub priority: u16,
    /// Reliability parameter carried in the DATA_CHANNEL_OPEN message.
    pub reliability_parameter: u32,
    /// Channel label; sent as raw bytes and required to be valid UTF-8 when received.
    pub label: String,
    /// Sub-protocol name; sent as raw bytes and required to be valid UTF-8 when received.
    pub protocol: String,
}
23
/// DataChannelMessage describes one message to be sent over SCTP.
///
/// Instances are queued by [`DataChannel`] and handed to the caller through
/// [`DataChannel::poll_transmit`].
#[derive(Debug, Default, Clone)]
pub struct DataChannelMessage {
    /// Handle of the association the message belongs to.
    pub association_handle: usize,
    /// Identifier of the stream the message belongs to.
    pub stream_id: u16,
    /// Payload protocol identifier describing how `payload` is to be interpreted.
    pub ppi: PayloadProtocolIdentifier,
    /// `true` for the unordered channel types.
    pub unordered: bool,
    /// Reliability type derived from the channel type.
    pub reliability_type: ReliabilityType,
    /// Message bytes.
    pub payload: BytesMut,
}
34
/// DataChannel represents a data channel.
///
/// The channel performs no I/O itself: outgoing messages are appended to an internal queue
/// and retrieved with `poll_transmit`, and received payloads are passed in through
/// `read` / `read_data_channel`.
#[derive(Debug, Default, Clone)]
pub struct DataChannel {
    // Channel configuration (taken from the caller or from a received DATA_CHANNEL_OPEN).
    config: Config,
    // Handle of the association this channel belongs to.
    association_handle: usize,
    // Identifier of the stream this channel uses.
    stream_id: u16,
    // Outgoing messages waiting to be returned by `poll_transmit`.
    messages: VecDeque<DataChannelMessage>,

    // stats
    // Incremented once per `write_data_channel` call.
    messages_sent: usize,
    // Incremented once per successful `read_data_channel` call.
    messages_received: usize,
    // Increased by the payload length of each non-empty write.
    bytes_sent: usize,
    // Updated by `read_data_channel`.
    bytes_received: usize,
}
49
50impl DataChannel {
51    fn new(config: Config, association_handle: usize, stream_id: u16) -> Self {
52        Self {
53            config,
54            association_handle,
55            stream_id,
56            messages: VecDeque::new(),
57            ..Default::default()
58        }
59    }
60
61    /// Dial opens a data channels over SCTP
62    pub fn dial(config: Config, association_handle: usize, stream_id: u16) -> Result<Self> {
63        let mut data_channel = DataChannel::new(config.clone(), association_handle, stream_id);
64
65        if !config.negotiated {
66            let msg = Message::DataChannelOpen(DataChannelOpen {
67                channel_type: config.channel_type,
68                priority: config.priority,
69                reliability_parameter: config.reliability_parameter,
70                label: config.label.bytes().collect(),
71                protocol: config.protocol.bytes().collect(),
72            })
73            .marshal()?;
74
75            let (unordered, reliability_type) = data_channel.get_reliability_params();
76
77            data_channel.messages.push_back(DataChannelMessage {
78                association_handle,
79                stream_id,
80                ppi: PayloadProtocolIdentifier::Dcep,
81                unordered,
82                reliability_type,
83                payload: msg,
84            });
85        }
86
87        Ok(data_channel)
88    }
89
90    /// Accept is used to accept incoming data channels over SCTP
91    pub fn accept(
92        mut config: Config,
93        association_handle: usize,
94        stream_id: u16,
95        ppi: PayloadProtocolIdentifier,
96        buf: &[u8],
97    ) -> Result<Self> {
98        if ppi != PayloadProtocolIdentifier::Dcep {
99            return Err(Error::InvalidPayloadProtocolIdentifier(ppi as u8));
100        }
101
102        let mut read_buf = buf;
103        let msg = Message::unmarshal(&mut read_buf)?;
104
105        if let Message::DataChannelOpen(dco) = msg {
106            config.channel_type = dco.channel_type;
107            config.priority = dco.priority;
108            config.reliability_parameter = dco.reliability_parameter;
109            config.label = String::from_utf8(dco.label)?;
110            config.protocol = String::from_utf8(dco.protocol)?;
111        } else {
112            return Err(Error::InvalidMessageType(msg.message_type() as u8));
113        };
114
115        let mut data_channel = DataChannel::new(config, association_handle, stream_id);
116
117        data_channel.write_data_channel_ack()?;
118
119        Ok(data_channel)
120    }
121
    /// Removes and returns the oldest queued outgoing message, or `None` when the queue is
    /// empty.
    pub fn poll_transmit(&mut self) -> Option<DataChannelMessage> {
        self.messages.pop_front()
    }
126
127    /// Read reads a packet of len(p) bytes as binary data.
128    pub fn read(&mut self, ppi: PayloadProtocolIdentifier, buf: &[u8]) -> Result<BytesMut> {
129        self.read_data_channel(ppi, buf).map(|(b, _)| b)
130    }
131
132    /// ReadDataChannel reads a packet of len(p) bytes. It returns the number of bytes read and
133    /// `true` if the data read is a string.
134    pub fn read_data_channel(
135        &mut self,
136        ppi: PayloadProtocolIdentifier,
137        buf: &[u8],
138    ) -> Result<(BytesMut, bool)> {
139        let mut is_string = false;
140        match ppi {
141            PayloadProtocolIdentifier::Dcep => {
142                let mut data_buf = buf;
143                match self.handle_dcep(&mut data_buf) {
144                    Ok(()) => {}
145                    Err(err) => {
146                        log::error!("Failed to handle DCEP: {:?}", err);
147                        return Err(err);
148                    }
149                }
150            }
151            PayloadProtocolIdentifier::String | PayloadProtocolIdentifier::StringEmpty => {
152                is_string = true;
153            }
154            _ => {}
155        };
156
157        let data = match ppi {
158            PayloadProtocolIdentifier::StringEmpty | PayloadProtocolIdentifier::BinaryEmpty => {
159                BytesMut::new()
160            }
161            _ => BytesMut::from(buf),
162        };
163
164        self.messages_received += 1;
165        self.bytes_received += 1;
166
167        Ok((data, is_string))
168    }
169
    /// Returns the number of messages queued through `write` / `write_data_channel`.
    pub fn messages_sent(&self) -> usize {
        self.messages_sent
    }
174
    /// Returns the number of payloads successfully processed by `read` / `read_data_channel`.
    pub fn messages_received(&self) -> usize {
        self.messages_received
    }
179
    /// Returns the total length of the non-empty payloads queued through `write` /
    /// `write_data_channel`.
    pub fn bytes_sent(&self) -> usize {
        self.bytes_sent
    }
184
    /// Returns the current value of the received-bytes counter maintained by
    /// `read_data_channel`.
    pub fn bytes_received(&self) -> usize {
        self.bytes_received
    }
189
    /// Returns the association handle this channel was created with.
    pub fn association_handle(&self) -> usize {
        self.association_handle
    }
194
    /// Returns the identifier of the stream this channel was created with.
    pub fn stream_identifier(&self) -> u16 {
        self.stream_id
    }
199
200    fn handle_dcep<B>(&mut self, data: &mut B) -> Result<()>
201    where
202        B: Buf,
203    {
204        let msg = Message::unmarshal(data)?;
205
206        match msg {
207            Message::DataChannelOpen(_) => {
208                // Note: DATA_CHANNEL_OPEN message is handled inside Server() method.
209                // Therefore, the message will not reach here.
210                log::debug!("Received DATA_CHANNEL_OPEN");
211                self.write_data_channel_ack()?;
212            }
213            Message::DataChannelAck(_) => {
214                log::debug!("Received DATA_CHANNEL_ACK");
215                //self.commit_reliability_params();
216            }
217        };
218
219        Ok(())
220    }
221
    /// Queues `data` as a binary message; shorthand for `write_data_channel(data, false)`.
    ///
    /// Returns the number of bytes of `data` that were queued.
    pub fn write(&mut self, data: &[u8]) -> Result<usize> {
        self.write_data_channel(data, false)
    }
226
227    /// WriteDataChannel writes len(p) bytes from p
228    pub fn write_data_channel(&mut self, data: &[u8], is_string: bool) -> Result<usize> {
229        let data_len = data.len();
230
231        // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-12#section-6.6
232        // SCTP does not support the sending of empty user messages.  Therefore,
233        // if an empty message has to be sent, the appropriate PPID (WebRTC
234        // String Empty or WebRTC Binary Empty) is used and the SCTP user
235        // message of one zero byte is sent.  When receiving an SCTP user
236        // message with one of these PPIDs, the receiver MUST ignore the SCTP
237        // user message and process it as an empty message.
238        let ppi = match (is_string, data_len) {
239            (false, 0) => PayloadProtocolIdentifier::BinaryEmpty,
240            (false, _) => PayloadProtocolIdentifier::Binary,
241            (true, 0) => PayloadProtocolIdentifier::StringEmpty,
242            (true, _) => PayloadProtocolIdentifier::String,
243        };
244
245        let (unordered, reliability_type) = self.get_reliability_params();
246
247        let n = if data_len == 0 {
248            self.messages.push_back(DataChannelMessage {
249                association_handle: self.association_handle,
250                stream_id: self.stream_id,
251                ppi,
252                unordered,
253                reliability_type,
254                payload: BytesMut::from(&[0][..]),
255            });
256
257            0
258        } else {
259            self.messages.push_back(DataChannelMessage {
260                association_handle: self.association_handle,
261                stream_id: self.stream_id,
262                ppi,
263                unordered,
264                reliability_type,
265                payload: BytesMut::from(data),
266            });
267
268            self.bytes_sent += data.len();
269            data.len()
270        };
271
272        self.messages_sent += 1;
273        Ok(n)
274    }
275
276    fn write_data_channel_ack(&mut self) -> Result<()> {
277        let ack = Message::DataChannelAck(DataChannelAck {}).marshal()?;
278        let (unordered, reliability_type) = self.get_reliability_params();
279        self.messages.push_back(DataChannelMessage {
280            association_handle: self.association_handle,
281            stream_id: self.stream_id,
282            ppi: PayloadProtocolIdentifier::Dcep,
283            unordered,
284            reliability_type,
285            payload: ack,
286        });
287        Ok(())
288    }
289    /*
290    /// Close closes the DataChannel and the underlying SCTP stream.
291    pub async fn close(&self) -> Result<()> {
292        // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.7
293        // Closing of a data channel MUST be signaled by resetting the
294        // corresponding outgoing streams [RFC6525].  This means that if one
295        // side decides to close the data channel, it resets the corresponding
296        // outgoing stream.  When the peer sees that an incoming stream was
297        // reset, it also resets its corresponding outgoing stream.  Once this
298        // is completed, the data channel is closed.  Resetting a stream sets
299        // the Stream Sequence Numbers (SSNs) of the stream back to 'zero' with
300        // a corresponding notification to the application layer that the reset
301        // has been performed.  Streams are available for reuse after a reset
302        // has been performed.
303        Ok(self.stream.shutdown(Shutdown::Both).await?)
304    }
305
306    /// BufferedAmount returns the number of bytes of data currently queued to be
307    /// sent over this stream.
308    pub fn buffered_amount(&self) -> usize {
309        self.stream.buffered_amount()
310    }
311
312    /// BufferedAmountLowThreshold returns the number of bytes of buffered outgoing
313    /// data that is considered "low." Defaults to 0.
314    pub fn buffered_amount_low_threshold(&self) -> usize {
315        self.stream.buffered_amount_low_threshold()
316    }
317
318    /// SetBufferedAmountLowThreshold is used to update the threshold.
319    /// See BufferedAmountLowThreshold().
320    pub fn set_buffered_amount_low_threshold(&self, threshold: usize) {
321        self.stream.set_buffered_amount_low_threshold(threshold)
322    }
323
324    /// OnBufferedAmountLow sets the callback handler which would be called when the
325    /// number of bytes of outgoing data buffered is lower than the threshold.
326    pub fn on_buffered_amount_low(&self, f: OnBufferedAmountLowFn) {
327        self.stream.on_buffered_amount_low(f)
328    }*/
329
330    fn get_reliability_params(&self) -> (bool, ReliabilityType) {
331        let (unordered, reliability_type) = match self.config.channel_type {
332            ChannelType::Reliable => (false, ReliabilityType::Reliable),
333            ChannelType::ReliableUnordered => (true, ReliabilityType::Reliable),
334            ChannelType::PartialReliableRexmit => (false, ReliabilityType::Rexmit),
335            ChannelType::PartialReliableRexmitUnordered => (true, ReliabilityType::Rexmit),
336            ChannelType::PartialReliableTimed => (false, ReliabilityType::Timed),
337            ChannelType::PartialReliableTimedUnordered => (true, ReliabilityType::Timed),
338        };
339
340        (unordered, reliability_type)
341    }
342}