rtc_datachannel/data_channel/
mod.rs1use crate::message::{message_channel_ack::*, message_channel_open::*, *};
5use bytes::{Buf, BytesMut};
6use sctp::{PayloadProtocolIdentifier, ReliabilityType};
7use shared::error::{Error, Result};
8use shared::marshal::*;
9use std::collections::VecDeque;
10
11const RECEIVE_MTU: usize = 8192;
12
13#[derive(Eq, PartialEq, Default, Clone, Debug)]
15pub struct Config {
16 pub channel_type: ChannelType,
17 pub negotiated: bool,
18 pub priority: u16,
19 pub reliability_parameter: u32,
20 pub label: String,
21 pub protocol: String,
22}
23
24#[derive(Debug, Default, Clone)]
26pub struct DataChannelMessage {
27 pub association_handle: usize,
28 pub stream_id: u16,
29 pub ppi: PayloadProtocolIdentifier,
30 pub unordered: bool,
31 pub reliability_type: ReliabilityType,
32 pub payload: BytesMut,
33}
34
35#[derive(Debug, Default, Clone)]
37pub struct DataChannel {
38 config: Config,
39 association_handle: usize,
40 stream_id: u16,
41 messages: VecDeque<DataChannelMessage>,
42
43 messages_sent: usize,
45 messages_received: usize,
46 bytes_sent: usize,
47 bytes_received: usize,
48}
49
50impl DataChannel {
51 fn new(config: Config, association_handle: usize, stream_id: u16) -> Self {
52 Self {
53 config,
54 association_handle,
55 stream_id,
56 messages: VecDeque::new(),
57 ..Default::default()
58 }
59 }
60
61 pub fn dial(config: Config, association_handle: usize, stream_id: u16) -> Result<Self> {
63 let mut data_channel = DataChannel::new(config.clone(), association_handle, stream_id);
64
65 if !config.negotiated {
66 let msg = Message::DataChannelOpen(DataChannelOpen {
67 channel_type: config.channel_type,
68 priority: config.priority,
69 reliability_parameter: config.reliability_parameter,
70 label: config.label.bytes().collect(),
71 protocol: config.protocol.bytes().collect(),
72 })
73 .marshal()?;
74
75 let (unordered, reliability_type) = data_channel.get_reliability_params();
76
77 data_channel.messages.push_back(DataChannelMessage {
78 association_handle,
79 stream_id,
80 ppi: PayloadProtocolIdentifier::Dcep,
81 unordered,
82 reliability_type,
83 payload: msg,
84 });
85 }
86
87 Ok(data_channel)
88 }
89
90 pub fn accept(
92 mut config: Config,
93 association_handle: usize,
94 stream_id: u16,
95 ppi: PayloadProtocolIdentifier,
96 buf: &[u8],
97 ) -> Result<Self> {
98 if ppi != PayloadProtocolIdentifier::Dcep {
99 return Err(Error::InvalidPayloadProtocolIdentifier(ppi as u8));
100 }
101
102 let mut read_buf = buf;
103 let msg = Message::unmarshal(&mut read_buf)?;
104
105 if let Message::DataChannelOpen(dco) = msg {
106 config.channel_type = dco.channel_type;
107 config.priority = dco.priority;
108 config.reliability_parameter = dco.reliability_parameter;
109 config.label = String::from_utf8(dco.label)?;
110 config.protocol = String::from_utf8(dco.protocol)?;
111 } else {
112 return Err(Error::InvalidMessageType(msg.message_type() as u8));
113 };
114
115 let mut data_channel = DataChannel::new(config, association_handle, stream_id);
116
117 data_channel.write_data_channel_ack()?;
118
119 Ok(data_channel)
120 }
121
122 pub fn poll_transmit(&mut self) -> Option<DataChannelMessage> {
124 self.messages.pop_front()
125 }
126
127 pub fn read(&mut self, ppi: PayloadProtocolIdentifier, buf: &[u8]) -> Result<BytesMut> {
129 self.read_data_channel(ppi, buf).map(|(b, _)| b)
130 }
131
132 pub fn read_data_channel(
135 &mut self,
136 ppi: PayloadProtocolIdentifier,
137 buf: &[u8],
138 ) -> Result<(BytesMut, bool)> {
139 let mut is_string = false;
140 match ppi {
141 PayloadProtocolIdentifier::Dcep => {
142 let mut data_buf = buf;
143 match self.handle_dcep(&mut data_buf) {
144 Ok(()) => {}
145 Err(err) => {
146 log::error!("Failed to handle DCEP: {:?}", err);
147 return Err(err);
148 }
149 }
150 }
151 PayloadProtocolIdentifier::String | PayloadProtocolIdentifier::StringEmpty => {
152 is_string = true;
153 }
154 _ => {}
155 };
156
157 let data = match ppi {
158 PayloadProtocolIdentifier::StringEmpty | PayloadProtocolIdentifier::BinaryEmpty => {
159 BytesMut::new()
160 }
161 _ => BytesMut::from(buf),
162 };
163
164 self.messages_received += 1;
165 self.bytes_received += 1;
166
167 Ok((data, is_string))
168 }
169
170 pub fn messages_sent(&self) -> usize {
172 self.messages_sent
173 }
174
175 pub fn messages_received(&self) -> usize {
177 self.messages_received
178 }
179
180 pub fn bytes_sent(&self) -> usize {
182 self.bytes_sent
183 }
184
185 pub fn bytes_received(&self) -> usize {
187 self.bytes_received
188 }
189
190 pub fn association_handle(&self) -> usize {
192 self.association_handle
193 }
194
195 pub fn stream_identifier(&self) -> u16 {
197 self.stream_id
198 }
199
200 fn handle_dcep<B>(&mut self, data: &mut B) -> Result<()>
201 where
202 B: Buf,
203 {
204 let msg = Message::unmarshal(data)?;
205
206 match msg {
207 Message::DataChannelOpen(_) => {
208 log::debug!("Received DATA_CHANNEL_OPEN");
211 self.write_data_channel_ack()?;
212 }
213 Message::DataChannelAck(_) => {
214 log::debug!("Received DATA_CHANNEL_ACK");
215 }
217 };
218
219 Ok(())
220 }
221
222 pub fn write(&mut self, data: &[u8]) -> Result<usize> {
224 self.write_data_channel(data, false)
225 }
226
227 pub fn write_data_channel(&mut self, data: &[u8], is_string: bool) -> Result<usize> {
229 let data_len = data.len();
230
231 let ppi = match (is_string, data_len) {
239 (false, 0) => PayloadProtocolIdentifier::BinaryEmpty,
240 (false, _) => PayloadProtocolIdentifier::Binary,
241 (true, 0) => PayloadProtocolIdentifier::StringEmpty,
242 (true, _) => PayloadProtocolIdentifier::String,
243 };
244
245 let (unordered, reliability_type) = self.get_reliability_params();
246
247 let n = if data_len == 0 {
248 self.messages.push_back(DataChannelMessage {
249 association_handle: self.association_handle,
250 stream_id: self.stream_id,
251 ppi,
252 unordered,
253 reliability_type,
254 payload: BytesMut::from(&[0][..]),
255 });
256
257 0
258 } else {
259 self.messages.push_back(DataChannelMessage {
260 association_handle: self.association_handle,
261 stream_id: self.stream_id,
262 ppi,
263 unordered,
264 reliability_type,
265 payload: BytesMut::from(data),
266 });
267
268 self.bytes_sent += data.len();
269 data.len()
270 };
271
272 self.messages_sent += 1;
273 Ok(n)
274 }
275
276 fn write_data_channel_ack(&mut self) -> Result<()> {
277 let ack = Message::DataChannelAck(DataChannelAck {}).marshal()?;
278 let (unordered, reliability_type) = self.get_reliability_params();
279 self.messages.push_back(DataChannelMessage {
280 association_handle: self.association_handle,
281 stream_id: self.stream_id,
282 ppi: PayloadProtocolIdentifier::Dcep,
283 unordered,
284 reliability_type,
285 payload: ack,
286 });
287 Ok(())
288 }
289 fn get_reliability_params(&self) -> (bool, ReliabilityType) {
331 let (unordered, reliability_type) = match self.config.channel_type {
332 ChannelType::Reliable => (false, ReliabilityType::Reliable),
333 ChannelType::ReliableUnordered => (true, ReliabilityType::Reliable),
334 ChannelType::PartialReliableRexmit => (false, ReliabilityType::Rexmit),
335 ChannelType::PartialReliableRexmitUnordered => (true, ReliabilityType::Rexmit),
336 ChannelType::PartialReliableTimed => (false, ReliabilityType::Timed),
337 ChannelType::PartialReliableTimedUnordered => (true, ReliabilityType::Timed),
338 };
339
340 (unordered, reliability_type)
341 }
342}