str0m 0.18.0

WebRTC library in Sans-IO style
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
//! Data channel related types.

use std::time::Duration;
use std::{fmt, str, time::Instant};

use crate::sctp::RtcSctp;
use crate::util::already_happened;
use crate::{Rtc, RtcError};

pub use crate::sctp::ChannelConfig;
pub use crate::sctp::Reliability;
pub use crate::sctp::SctpInitData;

/// Identifier of a data channel.
///
/// This is NOT the SCTP stream id.
// Deliberately not Deref or From to avoid this Id being created outside of this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChannelId(usize);

/// Data channel data from remote peer.
///
/// This is obtained via [`Event::ChannelData`][crate::Event::ChannelData].
#[derive(PartialEq, Eq)]
pub struct ChannelData {
    /// Identifier of the channel this data was sent on.
    ///
    /// The channel would have been previously announced via
    /// [`Event::ChannelOpen`][crate::Event::ChannelOpen].
    pub id: ChannelId,

    /// Tells whether the sender sent this data as binary or text.
    pub binary: bool,

    /// The actual data sent. If `binary` is false, this can be converted to text.
    pub data: Vec<u8>,
}

/// Channel for sending data to the remote peer.
///
/// Get this handle from [`Rtc::channel()`][crate::Rtc::channel()].
pub struct Channel<'a> {
    sctp_stream_id: u16,
    rtc: &'a mut Rtc,
}

impl<'a> Channel<'a> {
    pub(crate) fn new(sctp_stream_id: u16, rtc: &'a mut Rtc) -> Self {
        Channel {
            rtc,
            sctp_stream_id,
        }
    }

    /// Write data to the remote peer and indicate whether it's text or binary.
    ///
    /// Returns true or false whether the buffer was accepted or not.
    #[must_use = "Whether the buffer was accepted by the write()"]
    pub fn write(&mut self, binary: bool, buf: &[u8]) -> Result<bool, RtcError> {
        // If it's not available, don't accept.
        let available = self.rtc.sctp.available();
        if buf.len() > available {
            return Ok(false);
        }

        // Try write.
        let written = self.rtc.sctp.write(self.sctp_stream_id, binary, buf)?;

        // Invariant: if available calculation is correct, we should have accepted.
        assert_eq!(
            written,
            buf.len(),
            "Data channel write() less than entire buffer"
        );

        Ok(true)
    }

    /// Get the amount of buffered data.
    ///
    /// Returns 0 if the channel is closed or encountered some error. This is to
    /// be similar to the [RTCPeerConnection equivalent][buff]
    ///
    /// [buff]: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/bufferedAmount
    pub fn buffered_amount(&mut self) -> usize {
        self.rtc.sctp.buffered_amount(self.sctp_stream_id)
    }

    /// Set the threshold to emit an
    /// [`Event::ChannelBufferedAmountLow`][crate::Event::ChannelBufferedAmountLow]
    ///
    /// Setting this on a closed or broken channel does not show an error. This is
    /// be similar to the [RTCPeerConnection equivalent][buff]
    ///
    /// [buff]: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/bufferedAmountLowThreshold
    pub fn set_buffered_amount_low_threshold(&mut self, threshold: usize) {
        self.rtc
            .sctp
            .set_buffered_amount_low_threshold(self.sctp_stream_id, threshold);
    }

    /// Get the channel config.
    ///
    /// The config is not available in every case depending on whether the channel was
    /// negotiated in- or out of band.
    ///
    /// # In-band negotiation (the usual case)
    ///
    /// For (regular) in-band negotiation (DCEP, Data Channel Establishment Protocol), this
    /// returns `None` until the DCEP handshake completes. The config is guaranteed to be
    /// available when [`Event::ChannelOpen`][crate::Event::ChannelOpen] is emitted.
    ///
    /// # Out-of-band negotiation
    ///
    /// Returns `None` when the remote side created the data channel connection without using
    /// DCEP. This is called out-of-band negotiation, where the remote peer opens a stream
    /// but doesn't send the channel configuration through the DCEP protocol messages.
    ///
    /// For locally created out-of-band channels, the config is always available since it
    /// was provided during channel creation.
    ///
    /// In str0m, DCEP is disabled by setting the `negotiated` field to `Some(stream_id)` in
    /// [`ChannelConfig`]. This corresponds to the `negotiated: true` property in the
    /// browser's [`createDataChannel()`][n] dictionary.
    ///
    /// [n]: https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/createDataChannel#negotiated
    pub fn config(&self) -> Option<&ChannelConfig> {
        self.rtc.sctp.config(self.sctp_stream_id)
    }
}

impl fmt::Debug for ChannelData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut ds = f.debug_struct("ChannelData");

        ds.field("id", &self.id);
        ds.field("binary", &self.binary);

        let len = &self.data.len();
        if self.binary {
            ds.field("data", len);
        } else {
            match str::from_utf8(&self.data) {
                Ok(s) => {
                    const MAX_LINE_WIDTH: usize = 79;
                    const REST_OF_LINE_WIDTH: usize =
                        "ChannelData { id: ChannelId(0), binary: false, data: \"\" }".len();
                    const TUPLE_WIDTH: usize = "(xxx, ..)".len();
                    const DATA_WIDTH: usize = MAX_LINE_WIDTH - REST_OF_LINE_WIDTH;
                    const PREFIX_WIDTH: usize = DATA_WIDTH - TUPLE_WIDTH;
                    if s.is_ascii() {
                        if len > &DATA_WIDTH {
                            let trunc: String = s.chars().take(PREFIX_WIDTH).collect();
                            ds.field("data", &format_args!("({}, \"{}\"..)", len, trunc));
                        } else {
                            ds.field("data", &s);
                        }
                    } else {
                        ds.field("data", len);
                    }
                }
                Err(e) => {
                    ds.field("data", &format_args!("{:?}", (len, &e)));
                }
            }
        }

        ds.finish()
    }
}

#[derive(Debug, Default)]
pub(crate) struct ChannelHandler {
    allocations: Vec<ChannelAllocation>,
    next_channel_id: usize,
    /// Stream IDs recently closed, with the time they were closed.
    /// Excluded from allocation until the cooldown expires.
    closed_stream_ids: Vec<(u16, Instant)>,
}

#[derive(Debug)]
struct ChannelAllocation {
    id: ChannelId,

    /// Stream id, when it is known. This might be delayed awaiting sctp initialization to
    /// know if we are client or server.
    sctp_stream_id: Option<u16>,

    /// Holds the config until it is used in handle_timeout.
    config: Option<ChannelConfig>,
}

const STREAM_ID_COOLDOWN: Duration = Duration::from_secs(2);

impl ChannelHandler {
    pub fn new_channel(&mut self, config: &ChannelConfig) -> ChannelId {
        let id = self.next_channel_id();

        // For out-of-band negotiated, the id is already set.
        let sctp_stream_id = config.negotiated;
        if let Some(sctp_stream_id) = sctp_stream_id {
            let exists = self
                .allocations
                .iter()
                .any(|a| a.sctp_stream_id == Some(sctp_stream_id));
            assert!(
                !exists,
                "sctp_stream_id ({}) exists already",
                sctp_stream_id
            );
        }

        let alloc = ChannelAllocation {
            id,
            sctp_stream_id,
            // The config is none until we confirm we definitely want this channel.
            config: None,
        };

        debug!("Allocate channel id: {:?}", id);
        self.allocations.push(alloc);

        id
    }

    pub fn confirm(&mut self, id: ChannelId, config: ChannelConfig) {
        let a = self
            .allocations
            .iter_mut()
            .find(|a| a.id == id)
            .expect("Entry for issued channel id");
        a.config = Some(config);
    }

    /// For translating sctp stream id to ChannelId. Any event out of sctp goes via this.
    pub fn channel_id_by_stream_id(&self, sctp_stream_id: u16) -> Option<ChannelId> {
        self.allocations
            .iter()
            .find(|a| a.sctp_stream_id == Some(sctp_stream_id))
            .map(|a| a.id)
    }

    /// Look up sctp stream id for channel id.
    pub fn stream_id_by_channel_id(&self, id: ChannelId) -> Option<u16> {
        self.allocations
            .iter()
            .find(|a| a.id == id)
            .and_then(|a| a.sctp_stream_id)
    }

    pub(crate) fn handle_timeout(&mut self, _now: Instant, sctp: &mut RtcSctp) {
        if !sctp.is_inited() {
            return;
        }

        // Allocate sctp channel ids for ones that are missing.
        self.do_allocations(sctp);

        // After do_allocations so we get a channel for any confirmed.
        self.open_channels(sctp);
    }

    /// Allocate next available `ChannelId`.
    fn next_channel_id(&mut self) -> ChannelId {
        let id = self.next_channel_id;
        self.next_channel_id += 1;

        ChannelId(id)
    }

    fn need_allocation(&self) -> bool {
        self.allocations.iter().any(|a| a.sctp_stream_id.is_none())
    }

    fn need_open(&self) -> bool {
        self.allocations.iter().any(|a| a.config.is_some())
    }

    // Do automatic allocations of sctp stream id.
    fn do_allocations(&mut self, sctp: &RtcSctp) {
        if !self.need_allocation() {
            return;
        }

        // RFC 8831
        // Unless otherwise defined or negotiated, the
        // streams are picked based on the DTLS role (the client picks even
        // stream identifiers, and the server picks odd stream identifiers).
        let base = if sctp.is_client() { 0 } else { 1 };

        let mut taken: Vec<u16> = self
            .allocations
            .iter()
            .filter_map(|a| a.sctp_stream_id)
            .chain(self.closed_stream_ids.iter().map(|(id, _)| *id))
            .collect();

        for a in &mut self.allocations {
            if a.sctp_stream_id.is_some() {
                continue;
            }
            // We need to allocate
            let mut proposed = base;

            while taken.contains(&proposed) {
                proposed += 2
            }

            // Found the next free.
            debug!("Associate stream id {:?} => {}", a.id, proposed);
            a.sctp_stream_id = Some(proposed);
            taken.push(proposed);
        }
    }

    // Actually open channels.
    fn open_channels(&mut self, sctp: &mut RtcSctp) {
        for a in &mut self.allocations {
            let Some(config) = a.config.take() else {
                continue;
            };
            let Some(sctp_stream_id) = a.sctp_stream_id else {
                continue;
            };

            debug!("Open stream for: {:?}", a.id);
            sctp.open_stream(sctp_stream_id, config);
        }
    }

    pub fn poll_timeout(&self, sctp: &RtcSctp) -> Option<Instant> {
        if sctp.is_inited() && (self.need_allocation() || self.need_open()) {
            Some(already_happened())
        } else {
            None
        }
    }

    pub fn ensure_channel_id_for(&mut self, sctp_stream_id: u16) {
        let exists = self
            .allocations
            .iter()
            .any(|a| a.sctp_stream_id == Some(sctp_stream_id));

        if !exists {
            let id = self.next_channel_id();
            let alloc = ChannelAllocation {
                id,
                sctp_stream_id: Some(sctp_stream_id),
                config: None,
            };
            self.allocations.push(alloc);
        }
    }

    // NB: Maybe this should still be &mut self or even `self` to prove singular ownership
    pub fn close_channel(&self, id: ChannelId, sctp: &mut RtcSctp) {
        if let Some(sctp_stream_id) = self
            .allocations
            .iter()
            .find(|a| a.id == id)
            .and_then(|s| s.sctp_stream_id)
        {
            sctp.close_stream(sctp_stream_id);
        }
    }

    /// Remove stream IDs from the cooldown list that have expired.
    pub fn expire_closed_stream_ids(&mut self, now: Instant) {
        self.closed_stream_ids
            .retain(|(_, closed_at)| now.duration_since(*closed_at) < STREAM_ID_COOLDOWN);
    }

    pub fn remove_channel(&mut self, id: ChannelId, now: Instant) {
        if let Some(stream_id) = self
            .allocations
            .iter()
            .find(|a| a.id == id)
            .and_then(|a| a.sctp_stream_id)
        {
            self.closed_stream_ids.push((stream_id, now));
        }
        self.allocations.retain(|a| a.id != id)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn channel_id_allocation() {
        let now = Instant::now();
        let mut handler = ChannelHandler::default();

        // allocate first channel, get unique id
        assert_eq!(handler.new_channel(&Default::default()), ChannelId(0));

        // allocate second channel, get unique id
        assert_eq!(handler.new_channel(&Default::default()), ChannelId(1));

        // free channel 0, allocate two more channels and verify that the
        // new channels have unique IDs.
        handler.remove_channel(ChannelId(0), now);
        assert_eq!(handler.new_channel(&Default::default()), ChannelId(2));
        assert_eq!(handler.new_channel(&Default::default()), ChannelId(3));
    }

    #[test]
    fn stream_id_not_reused_during_cooldown() {
        let now = Instant::now();
        let mut handler = ChannelHandler::default();

        // Simulate two channels with known stream IDs (as if do_allocations ran
        // for a client: even IDs 0, 2).
        let id0 = handler.new_channel(&Default::default());
        let _id1 = handler.new_channel(&Default::default());
        // Manually set stream IDs as do_allocations would.
        handler.allocations[0].sctp_stream_id = Some(0);
        handler.allocations[1].sctp_stream_id = Some(2);

        // Close channel 0 (stream ID 0). It should enter cooldown.
        handler.remove_channel(id0, now);
        assert_eq!(handler.closed_stream_ids.len(), 1);
        assert_eq!(handler.closed_stream_ids[0].0, 0);

        // Allocate a new channel and manually assign a stream ID the way
        // do_allocations would — stream 0 should be skipped (in cooldown).
        let _id2 = handler.new_channel(&Default::default());
        // Build the taken list as do_allocations does.
        let taken: Vec<u16> = handler
            .allocations
            .iter()
            .filter_map(|a| a.sctp_stream_id)
            .chain(handler.closed_stream_ids.iter().map(|(id, _)| *id))
            .collect();
        // Stream 0 is in cooldown, stream 2 is active, so next available is 4.
        assert!(taken.contains(&0), "stream 0 should be in cooldown");
        assert!(taken.contains(&2), "stream 2 should be active");

        // After cooldown expires, stream 0 should be available again.
        let after_cooldown = now + STREAM_ID_COOLDOWN;
        handler.expire_closed_stream_ids(after_cooldown);
        assert!(handler.closed_stream_ids.is_empty());

        let taken_after: Vec<u16> = handler
            .allocations
            .iter()
            .filter_map(|a| a.sctp_stream_id)
            .chain(handler.closed_stream_ids.iter().map(|(id, _)| *id))
            .collect();
        assert!(
            !taken_after.contains(&0),
            "stream 0 should be available after cooldown"
        );
    }
}