rtc 0.9.0

Sans-I/O WebRTC implementation in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
/// Integration test for offer/answer between two rtc (sansio) peers (rtc-to-rtc)
///
/// This test simulates the offer-answer example but with both peers using the sansio API.
/// It verifies that two rtc peers can establish a connection, create data channels,
/// and exchange messages without requiring the webrtc library.
use anyhow::Result;
use bytes::BytesMut;
use sansio::Protocol;
use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::Mutex;

use rtc::peer_connection::RTCPeerConnectionBuilder;
use rtc::peer_connection::configuration::RTCConfigurationBuilder;
use rtc::peer_connection::configuration::setting_engine::SettingEngine;
use rtc::peer_connection::event::RTCDataChannelEvent;
use rtc::peer_connection::event::RTCPeerConnectionEvent;
use rtc::peer_connection::message::RTCMessage;
use rtc::peer_connection::state::RTCIceConnectionState;
use rtc::peer_connection::state::RTCPeerConnectionState;
use rtc::peer_connection::transport::RTCDtlsRole;
use rtc::peer_connection::transport::RTCIceCandidateInit;
use rtc::peer_connection::transport::RTCIceServer;
use rtc::peer_connection::transport::{CandidateConfig, CandidateHostConfig, RTCIceCandidate};

/// Fallback deadline offset used in the event loop when a peer's
/// `poll_timeout()` returns `None`.
const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(30);
/// Text the offer peer sends over its data channel once the peer connection
/// is connected and the channel has been reported open.
const TEST_MESSAGE: &str = "Hello from offer!";
/// Text the answer peer sends back (once) on the channel it received a
/// message on.
const ECHO_MESSAGE: &str = "Echo from answer!";

/// Test data channel communication between two rtc (sansio) peers.
///
/// The test runs both peers inside a single task:
///
/// 1. Each peer binds a loopback UDP socket and builds a peer connection.
/// 2. The offer peer creates a data channel, then offer and answer are
///    exchanged directly between the two in-process peer connections.
/// 3. Each peer registers its own socket address as a local host candidate
///    and the other peer's socket address as a remote candidate.
/// 4. A single loop drives both peers: outgoing packets are flushed to the
///    sockets, events and application messages are drained, and the loop then
///    waits for an incoming datagram or a short timer tick.
/// 5. Once the offer peer is connected and its channel is open it sends
///    [`TEST_MESSAGE`]; the answer peer replies once with [`ECHO_MESSAGE`].
///
/// # Errors
///
/// Returns an error if socket setup or any peer-connection call propagated
/// with `?` fails, if a received data channel payload is not valid UTF-8, or
/// if either peer reports a `Failed` ICE or peer connection state.
///
/// # Panics
///
/// Panics (fails the test) if, when the loop ends, either peer never reached
/// `Connected` or the expected messages were not received.
#[tokio::test]
async fn test_offer_answer_rtc_to_rtc() -> Result<()> {
    // `try_init().ok()`: the logger may already be installed by another test.
    env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init()
        .ok();

    log::info!("Starting offer-answer test: rtc offer -> rtc answer");

    // Track received messages
    let offer_received_messages = Arc::new(Mutex::new(Vec::<String>::new()));
    let answer_received_messages = Arc::new(Mutex::new(Vec::<String>::new()));

    // Create offer peer
    let offer_socket = UdpSocket::bind("127.0.0.1:0").await?;
    let offer_local_addr = offer_socket.local_addr()?;
    log::info!("Offer peer bound to {}", offer_local_addr);

    let mut offer_setting_engine = SettingEngine::default();
    offer_setting_engine.set_answering_dtls_role(RTCDtlsRole::Server)?;

    // NOTE(review): a STUN server is configured, but this test only ever adds
    // host candidates by hand below — confirm whether the server is needed.
    let offer_config = RTCConfigurationBuilder::new()
        .with_ice_servers(vec![RTCIceServer {
            urls: vec!["stun:stun.l.google.com:19302".to_owned()],
            ..Default::default()
        }])
        .build();

    let mut offer_pc = RTCPeerConnectionBuilder::new()
        .with_configuration(offer_config)
        .with_setting_engine(offer_setting_engine)
        .build()?;
    log::info!("Created offer peer connection");

    // Create data channel on offer side
    let dc_label = "test-channel";
    let _ = offer_pc.create_data_channel(dc_label, None)?;
    log::info!("Created data channel: {}", dc_label);

    // Add local candidate for offer peer
    let offer_candidate = CandidateHostConfig {
        base_config: CandidateConfig {
            network: "udp".to_owned(),
            address: offer_local_addr.ip().to_string(),
            port: offer_local_addr.port(),
            component: 1,
            ..Default::default()
        },
        ..Default::default()
    }
    .new_candidate_host()?;
    let offer_candidate_init = RTCIceCandidate::from(&offer_candidate).to_json()?;
    offer_pc.add_local_candidate(offer_candidate_init)?;

    // Create offer
    let offer = offer_pc.create_offer(None)?;
    log::info!("Offer peer created offer");

    // Set local description on offer
    offer_pc.set_local_description(offer.clone())?;
    log::info!("Offer peer set local description {}", offer);

    // Create answer peer
    let answer_socket = UdpSocket::bind("127.0.0.1:0").await?;
    let answer_local_addr = answer_socket.local_addr()?;
    log::info!("Answer peer bound to {}", answer_local_addr);

    let mut answer_setting_engine = SettingEngine::default();
    answer_setting_engine.set_answering_dtls_role(RTCDtlsRole::Client)?;

    let answer_config = RTCConfigurationBuilder::new()
        .with_ice_servers(vec![RTCIceServer {
            urls: vec!["stun:stun.l.google.com:19302".to_owned()],
            ..Default::default()
        }])
        .build();

    let mut answer_pc = RTCPeerConnectionBuilder::new()
        .with_configuration(answer_config)
        .with_setting_engine(answer_setting_engine)
        .build()?;
    log::info!("Created answer peer connection");

    // Set remote description on answer (the offer)
    log::info!("Answer peer set remote description {}", offer);
    answer_pc.set_remote_description(offer)?;

    // Add local candidate for answer peer
    let answer_candidate = CandidateHostConfig {
        base_config: CandidateConfig {
            network: "udp".to_owned(),
            address: answer_local_addr.ip().to_string(),
            port: answer_local_addr.port(),
            component: 1,
            ..Default::default()
        },
        ..Default::default()
    }
    .new_candidate_host()?;
    let answer_candidate_init = RTCIceCandidate::from(&answer_candidate).to_json()?;
    answer_pc.add_local_candidate(answer_candidate_init)?;

    // Create answer
    let answer = answer_pc.create_answer(None)?;
    log::info!("Answer peer created answer");

    // Set local description on answer
    answer_pc.set_local_description(answer.clone())?;
    log::info!("Answer peer set local description {}", answer);

    // Set remote description on offer (the answer)
    log::info!("Offer peer set remote description {}", answer);
    offer_pc.set_remote_description(answer)?;

    // Add remote candidates (these are actually local candidates for the remote peer)
    // In sansio API, we add the remote peer's local candidate as our remote candidate.
    // These must go through `add_remote_candidate`: registering the other
    // peer's address as a *local* candidate would make this peer claim an
    // address it does not own.
    let offer_remote_candidate_init = RTCIceCandidateInit {
        candidate: format!(
            "candidate:1 1 udp 2130706431 {} {} typ host",
            answer_local_addr.ip(),
            answer_local_addr.port()
        ),
        ..Default::default()
    };
    offer_pc.add_remote_candidate(offer_remote_candidate_init)?;
    log::info!("Offer peer added remote peer's candidate");

    let answer_remote_candidate_init = RTCIceCandidateInit {
        candidate: format!(
            "candidate:1 1 udp 2130706431 {} {} typ host",
            offer_local_addr.ip(),
            offer_local_addr.port()
        ),
        ..Default::default()
    };
    answer_pc.add_remote_candidate(answer_remote_candidate_init)?;
    log::info!("Answer peer added remote peer's candidate");

    // Run event loops for both peers
    let mut offer_buf = vec![0u8; 2000];
    let mut answer_buf = vec![0u8; 2000];
    let mut offer_connected = false;
    let mut answer_connected = false;
    let mut offer_dc_id = None;
    let mut message_sent = false;
    let mut echo_sent = false;

    // Overall wall-clock budget for the whole exchange.
    let start_time = Instant::now();
    let test_timeout = Duration::from_secs(30);

    while start_time.elapsed() < test_timeout {
        // Process offer peer: flush every queued outgoing packet to the wire.
        while let Some(msg) = offer_pc.poll_write() {
            match offer_socket
                .send_to(&msg.message, msg.transport.peer_addr)
                .await
            {
                Ok(n) => {
                    log::trace!("Offer sent {} bytes to {}", n, msg.transport.peer_addr);
                }
                Err(err) => {
                    log::error!("Offer socket write error: {}", err);
                }
            }
        }

        while let Some(event) = offer_pc.poll_event() {
            match event {
                RTCPeerConnectionEvent::OnIceConnectionStateChangeEvent(state) => {
                    log::info!("Offer ICE connection state: {}", state);
                    if state == RTCIceConnectionState::Failed {
                        return Err(anyhow::anyhow!("Offer ICE connection failed"));
                    }
                }
                RTCPeerConnectionEvent::OnConnectionStateChangeEvent(state) => {
                    log::info!("Offer peer connection state: {}", state);
                    if state == RTCPeerConnectionState::Failed {
                        return Err(anyhow::anyhow!("Offer peer connection failed"));
                    }
                    if state == RTCPeerConnectionState::Connected {
                        log::info!("Offer peer connection connected!");
                        offer_connected = true;
                    }
                }
                RTCPeerConnectionEvent::OnDataChannel(dc_event) => {
                    // Remember the opened channel so the test message can be
                    // sent on it later in the loop.
                    if let RTCDataChannelEvent::OnOpen(channel_id) = dc_event {
                        log::info!("Offer data channel {} opened", channel_id);
                        offer_dc_id = Some(channel_id);
                    }
                }
                _ => {}
            }
        }

        while let Some(message) = offer_pc.poll_read() {
            match message {
                // No media is exchanged in this test; RTP/RTCP is ignored.
                RTCMessage::RtpPacket(_, _) | RTCMessage::RtcpPacket(_, _) => {}
                RTCMessage::DataChannelMessage(channel_id, data_channel_message) => {
                    let msg_str = String::from_utf8(data_channel_message.data.to_vec())?;
                    log::info!("Offer received message: '{}' from {}", msg_str, channel_id);
                    let mut msgs = offer_received_messages.lock().await;
                    msgs.push(msg_str);
                }
            }
        }

        // Process answer peer
        while let Some(msg) = answer_pc.poll_write() {
            match answer_socket
                .send_to(&msg.message, msg.transport.peer_addr)
                .await
            {
                Ok(n) => {
                    log::trace!("Answer sent {} bytes to {}", n, msg.transport.peer_addr);
                }
                Err(err) => {
                    log::error!("Answer socket write error: {}", err);
                }
            }
        }

        while let Some(event) = answer_pc.poll_event() {
            match event {
                RTCPeerConnectionEvent::OnIceConnectionStateChangeEvent(state) => {
                    log::info!("Answer ICE connection state: {}", state);
                    if state == RTCIceConnectionState::Failed {
                        return Err(anyhow::anyhow!("Answer ICE connection failed"));
                    }
                }
                RTCPeerConnectionEvent::OnConnectionStateChangeEvent(state) => {
                    log::info!("Answer peer connection state: {}", state);
                    if state == RTCPeerConnectionState::Failed {
                        return Err(anyhow::anyhow!("Answer peer connection failed"));
                    }
                    if state == RTCPeerConnectionState::Connected {
                        log::info!("Answer peer connection connected!");
                        answer_connected = true;
                    }
                }
                RTCPeerConnectionEvent::OnDataChannel(dc_event) => {
                    if let RTCDataChannelEvent::OnOpen(channel_id) = dc_event {
                        log::info!("Answer data channel {} opened", channel_id);
                    }
                }
                _ => {}
            }
        }

        while let Some(message) = answer_pc.poll_read() {
            match message {
                RTCMessage::RtpPacket(_, _) | RTCMessage::RtcpPacket(_, _) => {}
                RTCMessage::DataChannelMessage(channel_id, data_channel_message) => {
                    let msg_str = String::from_utf8(data_channel_message.data.to_vec())?;
                    log::info!("Answer received message: '{}'", msg_str);
                    let mut msgs = answer_received_messages.lock().await;
                    msgs.push(msg_str.clone());

                    // Echo back (only once, on the channel the message arrived on)
                    if !echo_sent {
                        if let Some(mut dc) = answer_pc.data_channel(channel_id) {
                            log::info!("Answer echoing: '{}'", ECHO_MESSAGE);
                            dc.send_text(ECHO_MESSAGE.to_string())?;
                            echo_sent = true;
                        }
                    }
                }
            }
        }

        // Send test message from offer once connected and the channel is open
        if offer_connected && !message_sent {
            if let Some(channel_id) = offer_dc_id {
                if let Some(mut dc) = offer_pc.data_channel(channel_id) {
                    log::info!("Offer sending message: '{}'", TEST_MESSAGE);
                    dc.send_text(TEST_MESSAGE.to_string())?;
                    message_sent = true;
                }
            }
        }

        // Check if test is complete. The guards live only inside this block so
        // neither lock is held while the loop waits below.
        let both_received = {
            let offer_msgs = offer_received_messages.lock().await;
            let answer_msgs = answer_received_messages.lock().await;
            !offer_msgs.is_empty() && !answer_msgs.is_empty()
        };
        if both_received {
            log::info!("Test complete - both peers received messages");
            break;
        }

        // Handle timeouts: wake up for whichever peer's timer is due first.
        let offer_timeout = offer_pc
            .poll_timeout()
            .unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
        let answer_timeout = answer_pc
            .poll_timeout()
            .unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
        let next_timeout = offer_timeout.min(answer_timeout);
        let delay = next_timeout.saturating_duration_since(Instant::now());

        // `handle_timeout` / `handle_read` results are discarded with `.ok()`;
        // failures surface through the `Failed` state events handled above or
        // through the overall deadline.
        if delay.is_zero() {
            offer_pc.handle_timeout(Instant::now()).ok();
            answer_pc.handle_timeout(Instant::now()).ok();
            continue;
        }

        // Wait for data or timeout. The sleep is capped at 10 ms so queued
        // writes and events of both peers are polled frequently.
        let sleep = tokio::time::sleep(delay.min(Duration::from_millis(10)));
        tokio::pin!(sleep);

        tokio::select! {
            _ = sleep => {
                offer_pc.handle_timeout(Instant::now()).ok();
                answer_pc.handle_timeout(Instant::now()).ok();
            }
            Ok((n, peer_addr)) = offer_socket.recv_from(&mut offer_buf) => {
                offer_pc.handle_read(TaggedBytesMut {
                    now: Instant::now(),
                    transport: TransportContext {
                        local_addr: offer_local_addr,
                        peer_addr,
                        ecn: None,
                        transport_protocol: TransportProtocol::UDP,
                    },
                    message: BytesMut::from(&offer_buf[..n]),
                }).ok();
            }
            Ok((n, peer_addr)) = answer_socket.recv_from(&mut answer_buf) => {
                answer_pc.handle_read(TaggedBytesMut {
                    now: Instant::now(),
                    transport: TransportContext {
                        local_addr: answer_local_addr,
                        peer_addr,
                        ecn: None,
                        transport_protocol: TransportProtocol::UDP,
                    },
                    message: BytesMut::from(&answer_buf[..n]),
                }).ok();
            }
        }
    }

    // Verify test results
    let offer_msgs = offer_received_messages.lock().await;
    let answer_msgs = answer_received_messages.lock().await;

    log::info!("Offer received {} messages", offer_msgs.len());
    log::info!("Answer received {} messages", answer_msgs.len());

    assert!(offer_connected, "Offer peer should have connected");
    assert!(answer_connected, "Answer peer should have connected");
    assert!(
        !offer_msgs.is_empty(),
        "Offer peer should have received messages"
    );
    assert!(
        !answer_msgs.is_empty(),
        "Answer peer should have received messages"
    );
    assert_eq!(
        answer_msgs[0], TEST_MESSAGE,
        "Answer should have received the test message"
    );
    assert_eq!(
        offer_msgs[0], ECHO_MESSAGE,
        "Offer should have received the echo message"
    );

    log::info!("Offer-answer test completed successfully!");

    // Clean up
    offer_pc.close()?;
    answer_pc.close()?;

    Ok(())
}