1#![deny(missing_debug_implementations)]
22#![deny(missing_docs)]
23#![cfg_attr(docsrs, feature(doc_cfg))]
24#![deny(clippy::std_instead_of_core)]
25#![deny(clippy::std_instead_of_alloc)]
26#![no_std]
27
28extern crate alloc;
29
30pub use dimpl;
31
32#[cfg(any(feature = "std", test))]
33extern crate std;
34
35use alloc::boxed::Box;
36use alloc::collections::VecDeque;
37use alloc::sync::Arc;
38use alloc::vec::Vec;
39use core::net::{IpAddr, SocketAddr};
40use core::time::Duration;
41
42use stun_proto::agent::Transmit;
43use stun_proto::types::data::Data;
44use stun_proto::Instant;
45
46use stun_proto::types::TransportType;
47
48use tracing::{trace, warn};
49
50use turn_client_proto::api::{
51 BindChannelError, CreatePermissionError, DelayedMessageOrChannelSend, DeleteError, SendError,
52 Socket5Tuple, TcpAllocateError, TcpConnectError, TransmitBuild, TurnClientApi, TurnConfig,
53 TurnEvent, TurnPeerData, TurnPollRet, TurnRecvRet,
54};
55use turn_client_proto::udp::TurnClientUdp;
56
57#[derive(Debug)]
59pub struct TurnClientDimpl {
60 protocol: TurnClientUdp,
61 dtls: Box<dimpl::Dtls>,
62 base_instant: std::time::Instant,
63 base_now: Option<Instant>,
64 connected: bool,
65 pending_write: VecDeque<Transmit<Data<'static>>>,
66 pending_read: VecDeque<TurnPeerData<Vec<u8>>>,
67}
68
69impl TurnClientDimpl {
70 pub fn allocate(
72 local_addr: SocketAddr,
73 remote_addr: SocketAddr,
74 config: TurnConfig,
75 tls_config: Arc<dimpl::Config>,
76 ) -> Self {
77 let cert = dimpl::certificate::generate_self_signed_certificate().unwrap();
78 let base_instant = std::time::Instant::now();
79 let mut dtls = Box::new(dimpl::Dtls::new_auto(tls_config, cert, base_instant));
80 dtls.set_active(true);
81
82 Self {
83 protocol: TurnClientUdp::allocate(local_addr, remote_addr, config),
84 base_instant,
85 base_now: None,
86 dtls,
87 connected: false,
88 pending_read: VecDeque::default(),
89 pending_write: VecDeque::default(),
90 }
91 }
92
93 fn empty_transmit_queue(&mut self, now: Instant) {
94 while let Some(transmit) = self.protocol.poll_transmit(now) {
95 match self.dtls.send_application_data(&transmit.data) {
96 Ok(_) => (),
97 Err(e) => {
98 warn!("Failure to send data: {e:?}");
99 continue;
100 }
101 }
102 }
103 self.poll(now);
104 }
105}
106
107impl TurnClientApi for TurnClientDimpl {
108 fn transport(&self) -> TransportType {
109 self.protocol.transport()
110 }
111
112 fn local_addr(&self) -> SocketAddr {
113 self.protocol.local_addr()
114 }
115
116 fn remote_addr(&self) -> SocketAddr {
117 self.protocol.remote_addr()
118 }
119
120 fn poll(&mut self, now: Instant) -> TurnPollRet {
121 let base_now = *self.base_now.get_or_insert(now);
122 let _ = self.dtls.handle_timeout(
123 Instant::from_nanos((now - base_now).as_nanos() as i64).to_std(self.base_instant),
124 );
125
126 let mut out = [0; 2048];
127 let mut earliest_wait = None;
128 loop {
129 let ret = self.dtls.poll_output(&mut out);
130 tracing::error!("dtls poll ret {ret:?}");
131 match ret {
132 dimpl::Output::Packet(p) => {
133 self.pending_write.push_back(Transmit::new(
134 Data::from(Box::from(p)),
135 TransportType::Udp,
136 self.local_addr(),
137 self.remote_addr(),
138 ));
139 earliest_wait = Some(now);
140 }
141 dimpl::Output::Timeout(time) => {
142 let wait = Instant::from_nanos((time - self.base_instant).as_nanos() as i64);
143 tracing::error!(
144 "time {time:?} base {:?} wait {wait:?} now {now:?}",
145 self.base_instant
146 );
147 if wait == now {
148 let _ = self.dtls.handle_timeout(time);
149 continue;
150 }
151 if earliest_wait.is_none_or(|earliest| earliest > wait) {
152 earliest_wait = Some(wait);
153 }
154 break;
155 }
156 dimpl::Output::Connected => self.connected = true,
157 dimpl::Output::PeerCert(_peer_cert) => (),
159 dimpl::Output::KeyingMaterial(_key, _srtp_profile) => (),
160 dimpl::Output::ApplicationData(app_data) => {
161 let transmit = Transmit::new(
162 app_data,
163 TransportType::Udp,
164 self.remote_addr(),
165 self.local_addr(),
166 );
167 match self.protocol.recv(transmit, now) {
168 TurnRecvRet::Handled => (),
169 TurnRecvRet::Ignored(_transmit) => (),
170 TurnRecvRet::PeerData(peer_data) => {
171 self.pending_read.push_back(peer_data.into_owned());
172 }
173 TurnRecvRet::PeerIcmp {
174 transport: _,
175 peer: _,
176 icmp_type: _,
177 icmp_code: _,
178 icmp_data: _,
179 } => (),
180 }
181 }
182 _ => (),
183 }
184 }
185
186 if self.connected {
187 self.protocol.poll(now)
188 } else if let Some(earliest) = earliest_wait {
189 TurnPollRet::WaitUntil(earliest)
190 } else {
191 TurnPollRet::WaitUntil(now + Duration::from_secs(600))
192 }
193 }
194
195 fn relayed_addresses(&self) -> impl Iterator<Item = (TransportType, SocketAddr)> + '_ {
196 self.protocol.relayed_addresses()
197 }
198
199 fn permissions(
200 &self,
201 transport: TransportType,
202 relayed: SocketAddr,
203 ) -> impl Iterator<Item = IpAddr> + '_ {
204 self.protocol.permissions(transport, relayed)
205 }
206
207 fn poll_transmit(&mut self, now: Instant) -> Option<Transmit<Data<'static>>> {
208 if self.connected {
209 self.empty_transmit_queue(now);
210 }
211 self.pending_write.pop_front()
212 }
213
214 fn poll_event(&mut self) -> Option<TurnEvent> {
215 self.protocol.poll_event()
216 }
217
218 fn delete(&mut self, now: Instant) -> Result<(), DeleteError> {
219 self.protocol.delete(now)?;
220 self.empty_transmit_queue(now);
221 Ok(())
222 }
223
224 fn create_permission(
225 &mut self,
226 transport: TransportType,
227 peer_addr: IpAddr,
228 now: Instant,
229 ) -> Result<(), CreatePermissionError> {
230 self.protocol.create_permission(transport, peer_addr, now)?;
231 self.empty_transmit_queue(now);
232 Ok(())
233 }
234
235 fn have_permission(&self, transport: TransportType, to: IpAddr) -> bool {
236 self.protocol.have_permission(transport, to)
237 }
238
239 fn bind_channel(
240 &mut self,
241 transport: TransportType,
242 peer_addr: SocketAddr,
243 now: Instant,
244 ) -> Result<(), BindChannelError> {
245 self.protocol.bind_channel(transport, peer_addr, now)?;
246 self.empty_transmit_queue(now);
247 Ok(())
248 }
249
250 fn tcp_connect(&mut self, peer_addr: SocketAddr, now: Instant) -> Result<(), TcpConnectError> {
251 self.protocol.tcp_connect(peer_addr, now)?;
252
253 self.empty_transmit_queue(now);
254
255 Ok(())
256 }
257
258 fn allocated_tcp_socket(
259 &mut self,
260 _id: u32,
261 _five_tuple: Socket5Tuple,
262 _peer_addr: SocketAddr,
263 _local_addr: Option<SocketAddr>,
264 _now: Instant,
265 ) -> Result<(), TcpAllocateError> {
266 Err(TcpAllocateError::NoAllocation)
267 }
268
269 fn tcp_closed(&mut self, local_addr: SocketAddr, remote_addr: SocketAddr, now: Instant) {
270 self.protocol.tcp_closed(local_addr, remote_addr, now);
271 }
272
273 fn send_to<T: AsRef<[u8]> + core::fmt::Debug>(
274 &mut self,
275 transport: TransportType,
276 to: SocketAddr,
277 data: T,
278 now: Instant,
279 ) -> Result<Option<TransmitBuild<DelayedMessageOrChannelSend<T>>>, SendError> {
280 if let Some(transmit) = self.protocol.send_to(transport, to, data, now)? {
281 let transmit = transmit.build();
282 match self.dtls.send_application_data(&transmit.data) {
283 Ok(_) => (),
284 Err(e) => {
285 warn!("Error when writing plaintext: {e:?}");
286 return Err(SendError::NoAllocation);
287 }
288 }
289 }
290 self.empty_transmit_queue(now);
291
292 Ok(self.poll_transmit(now).map(|transmit| {
293 TransmitBuild::new(
294 DelayedMessageOrChannelSend::OwnedData(transmit.data.to_vec()),
295 transmit.transport,
296 transmit.from,
297 transmit.to,
298 )
299 }))
300 }
301
302 #[tracing::instrument(
303 name = "turn_dimpl_recv",
304 skip(self, transmit, now),
305 fields(
306 transport = %transmit.transport,
307 from = ?transmit.from,
308 data_len = transmit.data.as_ref().len()
309 )
310 )]
311 fn recv<T: AsRef<[u8]> + core::fmt::Debug>(
312 &mut self,
313 transmit: Transmit<T>,
314 now: Instant,
315 ) -> TurnRecvRet<T> {
316 if self.transport() != transmit.transport
318 || transmit.to != self.local_addr()
319 || transmit.from != self.remote_addr()
320 {
321 trace!(
322 "received data not directed at us ({} {:?}) but for {} {:?}!",
323 self.transport(),
324 self.local_addr(),
325 transmit.transport,
326 transmit.to,
327 );
328 return TurnRecvRet::Ignored(transmit);
329 };
330
331 match self.dtls.handle_packet(transmit.data.as_ref()) {
332 Ok(_) => (),
333 Err(e) => {
334 warn!("dimpl packet produced error: {e:?}");
335 return TurnRecvRet::Ignored(transmit);
336 }
337 };
338
339 self.poll(now);
340 if let Some(recved) = self.poll_recv(now) {
341 TurnRecvRet::PeerData(recved.into_owned())
342 } else {
343 TurnRecvRet::Handled
344 }
345 }
346
347 fn poll_recv(&mut self, _now: Instant) -> Option<TurnPeerData<Vec<u8>>> {
348 self.pending_read.pop_front()
349 }
350
351 fn protocol_error(&mut self) {
352 self.protocol.protocol_error()
353 }
354}