1pub type Configuration = libdatachannel::Configuration;
2
3pub struct PeerConnection {
4 inner: libdatachannel::PeerConnection,
5 local_description_set_notify: std::sync::Arc<async_notify::Notify>,
6}
7
8impl From<libdatachannel::Error> for crate::Error {
9 fn from(value: libdatachannel::Error) -> Self {
10 Self(value.into())
11 }
12}
13
14#[derive(thiserror::Error, Debug)]
15#[error("{0}")]
16pub struct DataChannelError(String);
17
18impl From<DataChannelError> for crate::Error {
19 fn from(value: DataChannelError) -> Self {
20 Self(value.into())
21 }
22}
23
24impl From<crate::SdpType> for libdatachannel::SdpType {
25 fn from(value: crate::SdpType) -> Self {
26 match value {
27 crate::SdpType::Offer => Self::Offer,
28 crate::SdpType::Answer => Self::Answer,
29 crate::SdpType::Pranswer => Self::Pranswer,
30 crate::SdpType::Rollback => Self::Rollback,
31 }
32 }
33}
34
35impl From<libdatachannel::SdpType> for crate::SdpType {
36 fn from(value: libdatachannel::SdpType) -> Self {
37 match value {
38 libdatachannel::SdpType::Offer => Self::Offer,
39 libdatachannel::SdpType::Answer => Self::Answer,
40 libdatachannel::SdpType::Pranswer => Self::Pranswer,
41 libdatachannel::SdpType::Rollback => Self::Rollback,
42 }
43 }
44}
45
46impl From<crate::Description> for libdatachannel::Description {
47 fn from(value: crate::Description) -> Self {
48 Self {
49 type_: value.type_.into(),
50 sdp: value.sdp,
51 }
52 }
53}
54
55impl From<libdatachannel::Description> for crate::Description {
56 fn from(value: libdatachannel::Description) -> Self {
57 Self {
58 type_: value.type_.into(),
59 sdp: value.sdp,
60 }
61 }
62}
63
64impl From<libdatachannel::GatheringState> for crate::IceGatheringState {
65 fn from(value: libdatachannel::GatheringState) -> Self {
66 match value {
67 libdatachannel::GatheringState::New => Self::New,
68 libdatachannel::GatheringState::InProgress => Self::Gathering,
69 libdatachannel::GatheringState::Complete => Self::Complete,
70 }
71 }
72}
73
74impl From<libdatachannel::State> for crate::PeerConnectionState {
75 fn from(value: libdatachannel::State) -> Self {
76 match value {
77 libdatachannel::State::Connecting => Self::Connecting,
78 libdatachannel::State::Connected => Self::Connected,
79 libdatachannel::State::Disconnected => Self::Disconnected,
80 libdatachannel::State::Failed => Self::Failed,
81 libdatachannel::State::Closed => Self::Closed,
82 }
83 }
84}
85
86impl PeerConnection {
87 pub fn new(config: crate::Configuration) -> Result<Self, crate::Error> {
88 let local_description_set_notify = std::sync::Arc::new(async_notify::Notify::new());
89
90 let mut pc = libdatachannel::PeerConnection::new(libdatachannel::Configuration {
91 ice_servers: config
92 .ice_servers
93 .into_iter()
94 .flat_map(|ice_server| {
95 ice_server
96 .urls
97 .into_iter()
98 .map(|url| {
99 let mid = if let Some(mid) = url.chars().position(|c| c == ':') {
100 mid
101 } else {
102 return url;
103 };
104
105 let (proto, rest) = url.split_at(mid);
106 let rest = &rest[1..];
107
108 if let (Some(username), Some(credential)) =
109 (&ice_server.username, &ice_server.credential)
110 {
111 format!(
112 "{}:{}:{}@{}",
113 proto,
114 urlencoding::encode(username),
115 urlencoding::encode(credential),
116 rest
117 )
118 } else {
119 format!("{}:{}", proto, rest)
120 }
121 })
122 .collect::<Vec<_>>()
123 })
124 .collect::<Vec<_>>(),
125 ice_transport_policy: match config.ice_transport_policy {
126 crate::IceTransportPolicy::All => libdatachannel::TransportPolicy::All,
127 crate::IceTransportPolicy::Relay => libdatachannel::TransportPolicy::Relay,
128 },
129 disable_auto_negotiation: true,
130 ..config.sys
131 })?;
132 pc.set_on_local_description(Some({
133 let local_description_set_notify = std::sync::Arc::clone(&local_description_set_notify);
134 move |_: &str, _: libdatachannel::SdpType| {
135 local_description_set_notify.notify();
136 }
137 }));
138 Ok(Self {
139 inner: pc,
140 local_description_set_notify,
141 })
142 }
143
144 pub fn close(&self) -> Result<(), crate::Error> {
145 self.inner.close()?;
146 Ok(())
147 }
148
149 pub async fn set_local_description(&self, type_: crate::SdpType) -> Result<(), crate::Error> {
150 if type_ == crate::SdpType::Answer
151 && self
152 .local_description()?
153 .map(|d| d.type_ == crate::SdpType::Answer)
154 .unwrap_or(false)
155 {
156 return Ok(());
157 }
158 self.inner.set_local_description(Some(type_.into()))?;
159 self.local_description_set_notify.notified().await;
160 Ok(())
161 }
162
163 pub async fn set_remote_description(
164 &self,
165 description: &crate::Description,
166 ) -> Result<(), crate::Error> {
167 self.inner
168 .set_remote_description(&description.clone().into())?;
169 Ok(())
170 }
171
172 pub fn local_description(&self) -> Result<Option<crate::Description>, crate::Error> {
173 match self.inner.local_description() {
174 Ok(v) => Ok(Some(v.into())),
175 Err(libdatachannel::Error::NotAvail) => Ok(None),
176 Err(e) => Err(e.into()),
177 }
178 }
179
180 pub fn remote_description(&self) -> Result<Option<crate::Description>, crate::Error> {
181 match self.inner.remote_description() {
182 Ok(v) => Ok(Some(v.into())),
183 Err(libdatachannel::Error::NotAvail) => Ok(None),
184 Err(e) => Err(e.into()),
185 }
186 }
187
188 pub async fn add_ice_candidate(&self, cand: Option<&str>) -> Result<(), crate::Error> {
189 self.inner.add_remote_candidate(&cand.unwrap_or(""))?;
190 Ok(())
191 }
192
193 pub fn set_on_ice_candidate(
194 &mut self,
195 cb: Option<impl Fn(Option<&str>) + Send + Sync + 'static>,
196 ) {
197 self.inner.set_on_local_candidate(
198 cb.map(|cb| move |cand: &str| cb(if !cand.is_empty() { Some(cand) } else { None })),
199 )
200 }
201
202 pub fn set_on_ice_gathering_state_change(
203 &mut self,
204 cb: Option<impl Fn(crate::IceGatheringState) + Send + Sync + 'static>,
205 ) {
206 self.inner.set_on_gathering_state_change(
207 cb.map(|cb| move |state: libdatachannel::GatheringState| cb(state.into())),
208 )
209 }
210
211 pub fn set_on_connection_state_change(
212 &mut self,
213 cb: Option<impl Fn(crate::PeerConnectionState) + Send + Sync + 'static>,
214 ) {
215 self.inner
216 .set_on_state_change(cb.map(|cb| move |state: libdatachannel::State| cb(state.into())))
217 }
218
219 pub fn set_on_data_channel(
220 &mut self,
221 cb: Option<impl Fn(DataChannel) + Send + Sync + 'static>,
222 ) {
223 self.inner
224 .set_on_data_channel(cb.map(|cb| move |dc| cb(DataChannel { inner: dc })))
225 }
226
227 pub fn create_data_channel(
228 &self,
229 label: &str,
230 options: crate::DataChannelOptions,
231 ) -> Result<DataChannel, crate::Error> {
232 Ok(DataChannel {
233 inner: self.inner.create_data_channel(
234 label,
235 libdatachannel::DataChannelOptions {
236 reliability: libdatachannel::Reliability {
237 unordered: !options.ordered,
238 unreliable: options.max_packet_life_time.is_some()
239 || options.max_retransmits.is_some(),
240 max_packet_life_time: options.max_packet_life_time.unwrap_or(0) as u32,
241 max_retransmits: options.max_retransmits.unwrap_or(0) as u32,
242 },
243 protocol: options.protocol,
244 negotiated: options.negotiated,
245 stream: options.id,
246 },
247 )?,
248 })
249 }
250}
251
252pub struct DataChannel {
253 inner: libdatachannel::DataChannel,
254}
255
256impl DataChannel {
257 pub fn set_on_open(&mut self, cb: Option<impl Fn() + Send + Sync + 'static>) {
258 self.inner.set_on_open(cb);
259 }
260
261 pub fn set_on_close(&mut self, cb: Option<impl Fn() + Send + Sync + 'static>) {
262 self.inner.set_on_closed(cb);
263 }
264
265 pub fn set_on_buffered_amount_low(&mut self, cb: Option<impl Fn() + Send + Sync + 'static>) {
266 self.inner.set_on_buffered_amount_low(cb);
267 }
268
269 pub fn set_on_error(&mut self, cb: Option<impl Fn(crate::Error) + Send + Sync + 'static>) {
270 self.inner.set_on_error(
271 cb.map(|cb| move |err: &str| cb(DataChannelError(err.to_string()).into())),
272 );
273 }
274
275 pub fn set_on_message(&mut self, cb: Option<impl Fn(&[u8]) + Send + Sync + 'static>) {
276 self.inner.set_on_message(cb);
277 }
278
279 pub fn set_buffered_amount_low_threshold(&self, value: u32) -> Result<(), crate::Error> {
280 self.inner
281 .set_buffered_amount_low_threshold(value as usize)?;
282 Ok(())
283 }
284
285 pub fn buffered_amount(&self) -> Result<u32, crate::Error> {
286 Ok(self.inner.buffered_amount()? as u32)
287 }
288
289 pub fn close(&self) -> Result<(), crate::Error> {
290 self.inner.close()?;
291 Ok(())
292 }
293
294 pub fn send(&self, buf: &[u8]) -> Result<(), crate::Error> {
295 self.inner.send(buf)?;
296 Ok(())
297 }
298}