Skip to main content

turn_client_dimpl/
lib.rs

1// Copyright (C) 2026 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8//
9// SPDX-License-Identifier: MIT OR Apache-2.0
10
//! # turn-client-dimpl
12//!
13//! TLS TURN client using Dimpl.
14//!
15//! An implementation of a TURN client suitable for TLS over TCP connections and DTLS over UDP
16//! connections.
17//!
//! Note: no certificate validation is currently performed, so this TLS implementation is not
//! currently recommended for use.
20
21#![deny(missing_debug_implementations)]
22#![deny(missing_docs)]
23#![cfg_attr(docsrs, feature(doc_cfg))]
24#![deny(clippy::std_instead_of_core)]
25#![deny(clippy::std_instead_of_alloc)]
26#![no_std]
27
28extern crate alloc;
29
30pub use dimpl;
31
32#[cfg(any(feature = "std", test))]
33extern crate std;
34
35use alloc::boxed::Box;
36use alloc::collections::VecDeque;
37use alloc::sync::Arc;
38use alloc::vec::Vec;
39use core::net::{IpAddr, SocketAddr};
40use core::time::Duration;
41
42use stun_proto::agent::Transmit;
43use stun_proto::types::data::Data;
44use stun_proto::Instant;
45
46use stun_proto::types::TransportType;
47
48use tracing::{trace, warn};
49
50use turn_client_proto::api::{
51    BindChannelError, CreatePermissionError, DelayedMessageOrChannelSend, DeleteError, SendError,
52    Socket5Tuple, TcpAllocateError, TcpConnectError, TransmitBuild, TurnClientApi, TurnConfig,
53    TurnEvent, TurnPeerData, TurnPollRet, TurnRecvRet,
54};
55use turn_client_proto::udp::TurnClientUdp;
56
/// A TURN client that communicates over DTLS.
///
/// Wraps a [`TurnClientUdp`] and passes every TURN message it produces or consumes through a
/// `dimpl` DTLS engine.
#[derive(Debug)]
pub struct TurnClientDimpl {
    /// The wrapped UDP TURN client that produces and consumes the plaintext TURN messages.
    protocol: TurnClientUdp,
    /// The DTLS engine that encrypts outgoing and decrypts incoming data.
    dtls: Box<dimpl::Dtls>,
    /// The `std` instant captured at construction; the DTLS engine's timeline is relative to it.
    base_instant: std::time::Instant,
    /// The first `now` passed to `poll()`; treated as the same point in time as `base_instant`.
    base_now: Option<Instant>,
    /// Set once the DTLS engine reports `Output::Connected`.
    connected: bool,
    /// Packets emitted by the DTLS engine, waiting to be handed out by `poll_transmit()`.
    pending_write: VecDeque<Transmit<Data<'static>>>,
    /// Peer data decoded by the wrapped client, waiting to be handed out by `poll_recv()`.
    pending_read: VecDeque<TurnPeerData<Vec<u8>>>,
}
68
69impl TurnClientDimpl {
70    /// Allocate an address on a TURN server to relay data to and from peers.
71    pub fn allocate(
72        local_addr: SocketAddr,
73        remote_addr: SocketAddr,
74        config: TurnConfig,
75        tls_config: Arc<dimpl::Config>,
76    ) -> Self {
77        let cert = dimpl::certificate::generate_self_signed_certificate().unwrap();
78        let base_instant = std::time::Instant::now();
79        let mut dtls = Box::new(dimpl::Dtls::new_auto(tls_config, cert, base_instant));
80        dtls.set_active(true);
81
82        Self {
83            protocol: TurnClientUdp::allocate(local_addr, remote_addr, config),
84            base_instant,
85            base_now: None,
86            dtls,
87            connected: false,
88            pending_read: VecDeque::default(),
89            pending_write: VecDeque::default(),
90        }
91    }
92
93    fn empty_transmit_queue(&mut self, now: Instant) {
94        while let Some(transmit) = self.protocol.poll_transmit(now) {
95            match self.dtls.send_application_data(&transmit.data) {
96                Ok(_) => (),
97                Err(e) => {
98                    warn!("Failure to send data: {e:?}");
99                    continue;
100                }
101            }
102        }
103        self.poll(now);
104    }
105}
106
107impl TurnClientApi for TurnClientDimpl {
108    fn transport(&self) -> TransportType {
109        self.protocol.transport()
110    }
111
112    fn local_addr(&self) -> SocketAddr {
113        self.protocol.local_addr()
114    }
115
116    fn remote_addr(&self) -> SocketAddr {
117        self.protocol.remote_addr()
118    }
119
120    fn poll(&mut self, now: Instant) -> TurnPollRet {
121        let base_now = *self.base_now.get_or_insert(now);
122        let _ = self.dtls.handle_timeout(
123            Instant::from_nanos((now - base_now).as_nanos() as i64).to_std(self.base_instant),
124        );
125
126        let mut out = [0; 2048];
127        let mut earliest_wait = None;
128        loop {
129            let ret = self.dtls.poll_output(&mut out);
130            tracing::error!("dtls poll ret {ret:?}");
131            match ret {
132                dimpl::Output::Packet(p) => {
133                    self.pending_write.push_back(Transmit::new(
134                        Data::from(Box::from(p)),
135                        TransportType::Udp,
136                        self.local_addr(),
137                        self.remote_addr(),
138                    ));
139                    earliest_wait = Some(now);
140                }
141                dimpl::Output::Timeout(time) => {
142                    let wait = Instant::from_nanos((time - self.base_instant).as_nanos() as i64);
143                    tracing::error!(
144                        "time {time:?} base {:?} wait {wait:?} now {now:?}",
145                        self.base_instant
146                    );
147                    if wait == now {
148                        let _ = self.dtls.handle_timeout(time);
149                        continue;
150                    }
151                    if earliest_wait.is_none_or(|earliest| earliest > wait) {
152                        earliest_wait = Some(wait);
153                    }
154                    break;
155                }
156                dimpl::Output::Connected => self.connected = true,
157                // TODO: validate certificate
158                dimpl::Output::PeerCert(_peer_cert) => (),
159                dimpl::Output::KeyingMaterial(_key, _srtp_profile) => (),
160                dimpl::Output::ApplicationData(app_data) => {
161                    let transmit = Transmit::new(
162                        app_data,
163                        TransportType::Udp,
164                        self.remote_addr(),
165                        self.local_addr(),
166                    );
167                    match self.protocol.recv(transmit, now) {
168                        TurnRecvRet::Handled => (),
169                        TurnRecvRet::Ignored(_transmit) => (),
170                        TurnRecvRet::PeerData(peer_data) => {
171                            self.pending_read.push_back(peer_data.into_owned());
172                        }
173                        TurnRecvRet::PeerIcmp {
174                            transport: _,
175                            peer: _,
176                            icmp_type: _,
177                            icmp_code: _,
178                            icmp_data: _,
179                        } => (),
180                    }
181                }
182                _ => (),
183            }
184        }
185
186        if self.connected {
187            self.protocol.poll(now)
188        } else if let Some(earliest) = earliest_wait {
189            TurnPollRet::WaitUntil(earliest)
190        } else {
191            TurnPollRet::WaitUntil(now + Duration::from_secs(600))
192        }
193    }
194
195    fn relayed_addresses(&self) -> impl Iterator<Item = (TransportType, SocketAddr)> + '_ {
196        self.protocol.relayed_addresses()
197    }
198
199    fn permissions(
200        &self,
201        transport: TransportType,
202        relayed: SocketAddr,
203    ) -> impl Iterator<Item = IpAddr> + '_ {
204        self.protocol.permissions(transport, relayed)
205    }
206
207    fn poll_transmit(&mut self, now: Instant) -> Option<Transmit<Data<'static>>> {
208        if self.connected {
209            self.empty_transmit_queue(now);
210        }
211        self.pending_write.pop_front()
212    }
213
214    fn poll_event(&mut self) -> Option<TurnEvent> {
215        self.protocol.poll_event()
216    }
217
218    fn delete(&mut self, now: Instant) -> Result<(), DeleteError> {
219        self.protocol.delete(now)?;
220        self.empty_transmit_queue(now);
221        Ok(())
222    }
223
224    fn create_permission(
225        &mut self,
226        transport: TransportType,
227        peer_addr: IpAddr,
228        now: Instant,
229    ) -> Result<(), CreatePermissionError> {
230        self.protocol.create_permission(transport, peer_addr, now)?;
231        self.empty_transmit_queue(now);
232        Ok(())
233    }
234
235    fn have_permission(&self, transport: TransportType, to: IpAddr) -> bool {
236        self.protocol.have_permission(transport, to)
237    }
238
239    fn bind_channel(
240        &mut self,
241        transport: TransportType,
242        peer_addr: SocketAddr,
243        now: Instant,
244    ) -> Result<(), BindChannelError> {
245        self.protocol.bind_channel(transport, peer_addr, now)?;
246        self.empty_transmit_queue(now);
247        Ok(())
248    }
249
250    fn tcp_connect(&mut self, peer_addr: SocketAddr, now: Instant) -> Result<(), TcpConnectError> {
251        self.protocol.tcp_connect(peer_addr, now)?;
252
253        self.empty_transmit_queue(now);
254
255        Ok(())
256    }
257
258    fn allocated_tcp_socket(
259        &mut self,
260        _id: u32,
261        _five_tuple: Socket5Tuple,
262        _peer_addr: SocketAddr,
263        _local_addr: Option<SocketAddr>,
264        _now: Instant,
265    ) -> Result<(), TcpAllocateError> {
266        Err(TcpAllocateError::NoAllocation)
267    }
268
269    fn tcp_closed(&mut self, local_addr: SocketAddr, remote_addr: SocketAddr, now: Instant) {
270        self.protocol.tcp_closed(local_addr, remote_addr, now);
271    }
272
273    fn send_to<T: AsRef<[u8]> + core::fmt::Debug>(
274        &mut self,
275        transport: TransportType,
276        to: SocketAddr,
277        data: T,
278        now: Instant,
279    ) -> Result<Option<TransmitBuild<DelayedMessageOrChannelSend<T>>>, SendError> {
280        if let Some(transmit) = self.protocol.send_to(transport, to, data, now)? {
281            let transmit = transmit.build();
282            match self.dtls.send_application_data(&transmit.data) {
283                Ok(_) => (),
284                Err(e) => {
285                    warn!("Error when writing plaintext: {e:?}");
286                    return Err(SendError::NoAllocation);
287                }
288            }
289        }
290        self.empty_transmit_queue(now);
291
292        Ok(self.poll_transmit(now).map(|transmit| {
293            TransmitBuild::new(
294                DelayedMessageOrChannelSend::OwnedData(transmit.data.to_vec()),
295                transmit.transport,
296                transmit.from,
297                transmit.to,
298            )
299        }))
300    }
301
302    #[tracing::instrument(
303        name = "turn_dimpl_recv",
304        skip(self, transmit, now),
305        fields(
306            transport = %transmit.transport,
307            from = ?transmit.from,
308            data_len = transmit.data.as_ref().len()
309        )
310    )]
311    fn recv<T: AsRef<[u8]> + core::fmt::Debug>(
312        &mut self,
313        transmit: Transmit<T>,
314        now: Instant,
315    ) -> TurnRecvRet<T> {
316        /* is this data for our client? */
317        if self.transport() != transmit.transport
318            || transmit.to != self.local_addr()
319            || transmit.from != self.remote_addr()
320        {
321            trace!(
322                "received data not directed at us ({} {:?}) but for {} {:?}!",
323                self.transport(),
324                self.local_addr(),
325                transmit.transport,
326                transmit.to,
327            );
328            return TurnRecvRet::Ignored(transmit);
329        };
330
331        match self.dtls.handle_packet(transmit.data.as_ref()) {
332            Ok(_) => (),
333            Err(e) => {
334                warn!("dimpl packet produced error: {e:?}");
335                return TurnRecvRet::Ignored(transmit);
336            }
337        };
338
339        self.poll(now);
340        if let Some(recved) = self.poll_recv(now) {
341            TurnRecvRet::PeerData(recved.into_owned())
342        } else {
343            TurnRecvRet::Handled
344        }
345    }
346
347    fn poll_recv(&mut self, _now: Instant) -> Option<TurnPeerData<Vec<u8>>> {
348        self.pending_read.pop_front()
349    }
350
351    fn protocol_error(&mut self) {
352        self.protocol.protocol_error()
353    }
354}