1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use cs_utils::random_str;
use tokio::try_join;
use tokio::io::duplex;
use webrtc::{peer_connection::configuration::RTCConfiguration, ice_transport::ice_server::RTCIceServer};
use connection_utils::{Channel, Connected};
use crate::rtc_connection::RtcConnection;
// Shorthand for a boxed `Connected` trait object, as passed between the helpers below.
type TConnection = Box<dyn Connected>;
// Shorthand for a boxed `Channel` trait object, as returned by the channel helpers below.
type TChannel = Box<dyn Channel>;
pub async fn create_connection_pair() -> (TConnection, TConnection) {
let (stream1, stream2 ) = duplex(4096);
let rtc_config1 = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
let rtc_config2 = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
let (client_connection, server_connection) = try_join!(
tokio::spawn(async move {
let connection = RtcConnection::new(
Box::new(stream1),
rtc_config1,
)
.await
.expect("Cannot create RTC connection.");
return connection
.connect()
.await
.expect("Failed to connect.");
}),
tokio::spawn(async move {
let connection = RtcConnection::new(
Box::new(stream2),
rtc_config2,
)
.await
.expect("Cannot create RTC connection.");
return connection
.listen()
.await
.expect("Failed to connect.");
}),
).unwrap();
return (client_connection, server_connection);
}
/// Opens one data channel between two connections and returns both ends.
///
/// The client side calls `channel()` with a label of the form
/// `channel:<random_str(8)>`, while the server side subscribes via
/// `on_remote_channel()` and takes the first channel delivered on that
/// stream. Both halves run concurrently in their own tasks.
///
/// Ownership of the connections is taken and handed back, so the result is
/// `((client_connection, client_channel), (server_connection, server_channel))`
/// and the same connections can be reused for further channels.
///
/// # Panics
///
/// Panics if the client cannot create the data channel, if the server cannot
/// obtain or release the `on_remote_channel` stream, if that stream ends
/// before yielding a channel, or if one of the spawned tasks cannot be joined.
pub async fn create_channel_pair(
    mut client_connection: TConnection,
    mut server_connection: TConnection,
) -> ((TConnection, TChannel), (TConnection, TChannel)) {
    let data_channel_label = format!("channel:{}", random_str(8));

    let (client_side, server_side) = try_join!(
        tokio::spawn(async move {
            let data_channel = client_connection
                .channel(data_channel_label)
                .await
                .expect("Cannot create data channel.");

            (client_connection, data_channel)
        }),
        tokio::spawn(async move {
            let mut on_remote_channel = server_connection
                .on_remote_channel()
                .expect("Cannot get \"on_remote_channel\" stream.");

            // Only the first announced channel is wanted, so a single `recv`
            // is enough; the previous `while let` body returned on its first
            // iteration and therefore never actually looped.
            let first_remote = on_remote_channel.recv().await;

            match first_remote {
                Some(channel) => {
                    // Hand the subscription back before returning the connection.
                    server_connection
                        .off_remote_channel(on_remote_channel)
                        .unwrap();

                    (server_connection, channel)
                }
                None => panic!("Failed to receive a remote data channel."),
            }
        }),
    )
    .unwrap();

    (client_side, server_side)
}
/// Creates `channels_count` pairs of `(client_channel, server_channel)`.
///
/// A single connection pair is established through `create_connection_pair`
/// and then reused for every channel: each `create_channel_pair` call consumes
/// the two connections and hands them back for the next round. Channels are
/// created one after another, and the result keeps that creation order.
///
/// Passing `0` still establishes the connection pair and returns an empty
/// vector.
pub async fn create_channels(
    channels_count: usize,
) -> Vec<(TChannel, TChannel)> {
    let (mut client_side, mut server_side) = create_connection_pair().await;

    let mut pairs = Vec::new();

    for _ in 0..channels_count {
        let ((next_client, client_channel), (next_server, server_channel)) =
            create_channel_pair(client_side, server_side).await;

        // Take the connections back so the next iteration can reuse them.
        client_side = next_client;
        server_side = next_server;

        pairs.push((client_channel, server_channel));
    }

    pairs
}