rtc_datachannel/data_channel/
mod.rs1#[cfg(test)]
2mod data_channel_test;
3
4use crate::message::{
5 message_channel_ack::*, message_channel_close::*, message_channel_open::*,
6 message_channel_threshold::*, *,
7};
8use bytes::{Buf, BytesMut};
9use log::debug;
10use sctp::{PayloadProtocolIdentifier, ReliabilityType};
11use shared::error::{Error, Result};
12use shared::marshal::*;
13use std::collections::VecDeque;
14
/// Receive MTU: 8192 (8 * 1024), presumably a size in bytes.
///
/// NOTE(review): nothing in this part of the file reads this constant; confirm
/// where it is consumed before changing or removing it.
const RECEIVE_MTU: usize = 8 * 1024;
16
17#[derive(Eq, PartialEq, Default, Clone, Debug)]
19pub struct DataChannelConfig {
20 pub channel_type: ChannelType,
21 pub negotiated: bool,
22 pub priority: u16,
23 pub reliability_parameter: u32,
24 pub label: String,
25 pub protocol: String,
26}
27
28#[derive(Debug, Default, Clone)]
30pub struct DataChannelMessage {
31 pub association_handle: usize,
32 pub stream_id: u16,
33 pub ppi: PayloadProtocolIdentifier,
34 pub payload: BytesMut,
35}
36
37#[derive(Debug, Default, Clone)]
39pub struct DataChannel {
40 config: DataChannelConfig,
41 association_handle: usize,
42 stream_id: u16,
43
44 read_outs: VecDeque<DataChannelMessage>,
45 write_outs: VecDeque<DataChannelMessage>,
46
47 messages_sent: usize,
49 messages_received: usize,
50 bytes_sent: usize,
51 bytes_received: usize,
52}
53
54impl DataChannel {
55 fn new(config: DataChannelConfig, association_handle: usize, stream_id: u16) -> Self {
56 Self {
57 config,
58 association_handle,
59 stream_id,
60 read_outs: VecDeque::new(),
61 write_outs: VecDeque::new(),
62 ..Default::default()
63 }
64 }
65
66 pub fn dial(
68 config: DataChannelConfig,
69 association_handle: usize,
70 stream_id: u16,
71 ) -> Result<Self> {
72 let mut data_channel = DataChannel::new(config.clone(), association_handle, stream_id);
73
74 if !config.negotiated {
75 let msg = Message::DataChannelOpen(DataChannelOpen {
76 channel_type: config.channel_type,
77 priority: config.priority,
78 reliability_parameter: config.reliability_parameter,
79 label: config.label.bytes().collect(),
80 protocol: config.protocol.bytes().collect(),
81 })
82 .marshal()?;
83
84 data_channel.write_outs.push_back(DataChannelMessage {
85 association_handle,
86 stream_id,
87 ppi: PayloadProtocolIdentifier::Dcep,
88 payload: msg,
89 });
90 }
91
92 Ok(data_channel)
93 }
94
95 pub fn accept(
97 mut config: DataChannelConfig,
98 association_handle: usize,
99 stream_id: u16,
100 ppi: PayloadProtocolIdentifier,
101 buf: &[u8],
102 ) -> Result<Self> {
103 if ppi != PayloadProtocolIdentifier::Dcep {
104 return Err(Error::InvalidPayloadProtocolIdentifier(ppi as u8));
105 }
106
107 let mut read_buf = buf;
108 let msg = Message::unmarshal(&mut read_buf)?;
109
110 if let Message::DataChannelOpen(dco) = msg {
111 config.channel_type = dco.channel_type;
112 config.priority = dco.priority;
113 config.reliability_parameter = dco.reliability_parameter;
114 config.label = String::from_utf8(dco.label)?;
115 config.protocol = String::from_utf8(dco.protocol)?;
116 } else {
117 return Err(Error::InvalidMessageType(msg.message_type() as u8));
118 };
119
120 let mut data_channel = DataChannel::new(config, association_handle, stream_id);
121
122 data_channel.write_data_channel_ack()?;
123
124 Ok(data_channel)
125 }
126
127 pub fn messages_sent(&self) -> usize {
129 self.messages_sent
130 }
131
132 pub fn messages_received(&self) -> usize {
134 self.messages_received
135 }
136
137 pub fn bytes_sent(&self) -> usize {
139 self.bytes_sent
140 }
141
142 pub fn bytes_received(&self) -> usize {
144 self.bytes_received
145 }
146
147 pub fn association_handle(&self) -> usize {
149 self.association_handle
150 }
151
152 pub fn stream_identifier(&self) -> u16 {
154 self.stream_id
155 }
156
157 pub fn config(&self) -> &DataChannelConfig {
158 &self.config
159 }
160
161 fn handle_dcep<B>(&mut self, data: &mut B) -> Result<()>
162 where
163 B: Buf,
164 {
165 let msg = Message::unmarshal(data)?;
166
167 match msg {
168 Message::DataChannelOpen(_) => {
169 debug!("Received DATA_CHANNEL_OPEN");
172 self.write_data_channel_ack()?;
173 }
174 Message::DataChannelAck(_) => {
175 debug!("Received DATA_CHANNEL_ACK");
176 }
177 _ => {
178 return Err(Error::InvalidMessageType(msg.message_type() as u8));
179 }
180 };
181
182 Ok(())
183 }
184
185 fn write_data_channel_ack(&mut self) -> Result<()> {
186 let ack = Message::DataChannelAck(DataChannelAck {}).marshal()?;
187 self.write_outs.push_back(DataChannelMessage {
188 association_handle: self.association_handle,
189 stream_id: self.stream_id,
190 ppi: PayloadProtocolIdentifier::Dcep,
191 payload: ack,
192 });
193 Ok(())
194 }
195
196 fn write_data_channel_close(&mut self) -> Result<()> {
197 let close = Message::DataChannelClose(DataChannelClose {}).marshal()?;
198 self.write_outs.push_back(DataChannelMessage {
199 association_handle: self.association_handle,
200 stream_id: self.stream_id,
201 ppi: PayloadProtocolIdentifier::Dcep,
202 payload: close,
203 });
204 Ok(())
205 }
206
207 fn write_data_channel_high_threshold(&mut self, threshold: u32) -> Result<()> {
208 let low_threshold =
209 Message::DataChannelThreshold(DataChannelThreshold::High(threshold)).marshal()?;
210 self.write_outs.push_back(DataChannelMessage {
211 association_handle: self.association_handle,
212 stream_id: self.stream_id,
213 ppi: PayloadProtocolIdentifier::Dcep,
214 payload: low_threshold,
215 });
216 Ok(())
217 }
218
219 fn write_data_channel_low_threshold(&mut self, threshold: u32) -> Result<()> {
220 let low_threshold =
221 Message::DataChannelThreshold(DataChannelThreshold::Low(threshold)).marshal()?;
222 self.write_outs.push_back(DataChannelMessage {
223 association_handle: self.association_handle,
224 stream_id: self.stream_id,
225 ppi: PayloadProtocolIdentifier::Dcep,
226 payload: low_threshold,
227 });
228 Ok(())
229 }
230
231 pub fn set_buffered_amount_high_threshold(&mut self, threshold: u32) -> Result<()> {
234 self.write_data_channel_high_threshold(threshold)
235 }
236
237 pub fn set_buffered_amount_low_threshold(&mut self, threshold: u32) -> Result<()> {
240 self.write_data_channel_low_threshold(threshold)
241 }
242
243 pub fn get_reliability_params(channel_type: ChannelType) -> (bool, ReliabilityType) {
251 match channel_type {
252 ChannelType::Reliable => (false, ReliabilityType::Reliable),
253 ChannelType::ReliableUnordered => (true, ReliabilityType::Reliable),
254 ChannelType::PartialReliableRexmit => (false, ReliabilityType::Rexmit),
255 ChannelType::PartialReliableRexmitUnordered => (true, ReliabilityType::Rexmit),
256 ChannelType::PartialReliableTimed => (false, ReliabilityType::Timed),
257 ChannelType::PartialReliableTimedUnordered => (true, ReliabilityType::Timed),
258 }
259 }
260
261 pub fn get_channel_type_and_reliability_parameter(
262 ordered: bool,
263 max_retransmits: Option<u16>,
264 max_packet_life_time: Option<u16>,
265 ) -> (ChannelType, u32) {
266 let channel_type;
267 let reliability_parameter;
268
269 match (max_retransmits, max_packet_life_time) {
270 (None, None) => {
271 reliability_parameter = 0u32;
272 if ordered {
273 channel_type = ChannelType::Reliable;
274 } else {
275 channel_type = ChannelType::ReliableUnordered;
276 }
277 }
278
279 (Some(max_retransmits), _) => {
280 reliability_parameter = max_retransmits as u32;
281 if ordered {
282 channel_type = ChannelType::PartialReliableRexmit;
283 } else {
284 channel_type = ChannelType::PartialReliableRexmitUnordered;
285 }
286 }
287
288 (None, Some(max_packet_lifetime)) => {
289 reliability_parameter = max_packet_lifetime as u32;
290 if ordered {
291 channel_type = ChannelType::PartialReliableTimed;
292 } else {
293 channel_type = ChannelType::PartialReliableTimedUnordered;
294 }
295 }
296 }
297
298 (channel_type, reliability_parameter)
299 }
300
301 pub fn get_data_channel_message(is_string: bool, data: BytesMut) -> DataChannelMessage {
302 let ppi = match (is_string, data.len()) {
310 (false, 0) => PayloadProtocolIdentifier::BinaryEmpty,
311 (false, _) => PayloadProtocolIdentifier::Binary,
312 (true, 0) => PayloadProtocolIdentifier::StringEmpty,
313 (true, _) => PayloadProtocolIdentifier::String,
314 };
315
316 if data.is_empty() {
317 DataChannelMessage {
318 ppi,
319 payload: BytesMut::from(&[0][..]),
320 ..Default::default()
321 }
322 } else {
323 DataChannelMessage {
324 ppi,
325 payload: data,
326 ..Default::default()
327 }
328 }
329 }
330}
331
332impl sansio::Protocol<DataChannelMessage, DataChannelMessage, ()> for DataChannel {
333 type Rout = DataChannelMessage;
334 type Wout = DataChannelMessage;
335 type Eout = ();
336 type Error = Error;
337 type Time = ();
338
339 fn handle_read(&mut self, msg: DataChannelMessage) -> Result<()> {
342 self.messages_received += 1;
343 self.bytes_received += msg.payload.len();
344
345 if msg.ppi == PayloadProtocolIdentifier::Dcep {
346 let mut data_buf = &msg.payload[..];
347 self.handle_dcep(&mut data_buf)
348 } else {
349 self.read_outs.push_back(msg);
350 Ok(())
351 }
352 }
353
354 fn poll_read(&mut self) -> Option<DataChannelMessage> {
355 self.read_outs.pop_front()
356 }
357
358 fn handle_write(&mut self, mut msg: DataChannelMessage) -> Result<()> {
360 self.messages_sent += 1;
361 self.bytes_sent += msg.payload.len();
362
363 msg.association_handle = self.association_handle;
364 msg.stream_id = self.stream_id;
365 self.write_outs.push_back(msg);
366
367 Ok(())
368 }
369
370 fn poll_write(&mut self) -> Option<DataChannelMessage> {
372 self.write_outs.pop_front()
373 }
374
375 fn close(&mut self) -> Result<()> {
377 self.write_data_channel_close()
389 }
390}