rtc_turn/client/
relay.rs

1use log::{debug, warn};
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::ops::Add;
5use std::time::{Duration, Instant};
6
7use stun::attributes::*;
8use stun::error_code::*;
9use stun::fingerprint::*;
10use stun::integrity::*;
11use stun::message::*;
12use stun::textattrs::*;
13
14use super::permission::*;
15use super::transaction::*;
16use crate::proto;
17
18use crate::client::binding::BindingState;
19use crate::client::{Client, Event, RelayedAddr};
20use shared::error::{Error, Result};
21
22const PERM_REFRESH_INTERVAL: Duration = Duration::from_secs(120);
23// https://datatracker.ietf.org/doc/html/rfc8656#name-permissions-2
24// The Permission Lifetime MUST be 300 seconds (= 5 minutes).
25const PERM_LIFETIME: Duration = Duration::from_secs(300);
26const MAX_RETRY_ATTEMPTS: u16 = 3;
27
28// RelayState is a set of params use by Relay
29pub(crate) struct RelayState {
30    pub(crate) relayed_addr: RelayedAddr,
31    pub(crate) integrity: MessageIntegrity,
32    pub(crate) nonce: Nonce,
33    pub(crate) lifetime: Duration,
34    perm_map: HashMap<SocketAddr, Permission>,
35    refresh_alloc_timer: Instant,
36    refresh_perms_timer: Instant,
37}
38
39impl RelayState {
40    pub(super) fn new(
41        relayed_addr: RelayedAddr,
42        integrity: MessageIntegrity,
43        nonce: Nonce,
44        lifetime: Duration,
45    ) -> Self {
46        debug!("initial lifetime: {} seconds", lifetime.as_secs());
47
48        Self {
49            relayed_addr,
50            integrity,
51            nonce,
52            lifetime,
53            perm_map: HashMap::new(),
54            refresh_alloc_timer: Instant::now().add(lifetime / 2),
55            refresh_perms_timer: Instant::now().add(PERM_REFRESH_INTERVAL),
56        }
57    }
58
59    pub(super) fn set_nonce_from_msg(&mut self, msg: &Message) {
60        // Update nonce
61        match Nonce::get_from_as(msg, ATTR_NONCE) {
62            Ok(nonce) => {
63                self.nonce = nonce;
64                debug!("refresh allocation: 438, got new nonce.");
65            }
66            Err(_) => warn!("refresh allocation: 438 but no nonce."),
67        }
68    }
69}
70
71// Relay is the implementation of the Conn interfaces for UDP Relayed network connections.
72pub struct Relay<'a> {
73    pub(crate) relayed_addr: RelayedAddr,
74    pub(crate) client: &'a mut Client,
75}
76
77impl Relay<'_> {
78    pub fn create_permission(&mut self, peer_addr: SocketAddr) -> Result<Option<TransactionId>> {
79        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
80            relay
81                .perm_map
82                .entry(peer_addr)
83                .or_insert_with(Permission::default);
84            if let Some(perm) = relay.perm_map.get(&peer_addr)
85                && perm.state() == PermState::Idle
86            {
87                return Ok(Some(
88                    self.create_permissions(&[peer_addr], Some(peer_addr))?,
89                ));
90            }
91            Ok(None)
92        } else {
93            Err(Error::ErrConnClosed)
94        }
95    }
96
97    pub(crate) fn poll_timeout(&self) -> Option<Instant> {
98        if let Some(relay) = self.client.relays.get(&self.relayed_addr) {
99            if relay.refresh_alloc_timer < relay.refresh_perms_timer {
100                Some(relay.refresh_alloc_timer)
101            } else {
102                Some(relay.refresh_perms_timer)
103            }
104        } else {
105            None
106        }
107    }
108
109    pub(crate) fn handle_timeout(&mut self, now: Instant) {
110        let (refresh_alloc_timer, refresh_perms_timer) = if let Some(relay) =
111            self.client.relays.get_mut(&self.relayed_addr)
112        {
113            let refresh_alloc_timer = if relay.refresh_alloc_timer <= now {
114                relay.refresh_alloc_timer = relay.refresh_alloc_timer.add(relay.lifetime / 2);
115                Some(relay.lifetime)
116            } else {
117                None
118            };
119
120            let refresh_perms_timer = if relay.refresh_perms_timer <= now {
121                relay.refresh_perms_timer = relay.refresh_perms_timer.add(PERM_REFRESH_INTERVAL);
122                true
123            } else {
124                false
125            };
126
127            (refresh_alloc_timer, refresh_perms_timer)
128        } else {
129            (None, false)
130        };
131
132        if let Some(lifetime) = refresh_alloc_timer {
133            let _ = self.refresh_allocation(lifetime);
134        }
135        if refresh_perms_timer {
136            let _ = self.refresh_permissions();
137        }
138    }
139
140    pub fn send_to(&mut self, p: &[u8], peer_addr: SocketAddr) -> Result<()> {
141        // check if we have a permission for the destination IP addr
142        let result = if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
143            if let Some(perm) = relay.perm_map.get_mut(&peer_addr) {
144                if perm.state() != PermState::Permitted {
145                    Err(Error::ErrNoPermission)
146                } else {
147                    Ok((relay.integrity.clone(), relay.nonce.clone()))
148                }
149            } else {
150                Err(Error::ErrNoPermission)
151            }
152        } else {
153            Err(Error::ErrConnClosed)
154        };
155
156        let (integrity, nonce) = result?;
157
158        self.send(p, peer_addr, integrity, nonce)
159    }
160
161    fn send(
162        &mut self,
163        p: &[u8],
164        peer_addr: SocketAddr,
165        integrity: MessageIntegrity,
166        nonce: Nonce,
167    ) -> Result<()> {
168        let channel_number = {
169            let (bind_st, bind_at, bind_number, bind_addr) = {
170                let b = if let Some(b) = self.client.binding_mgr.find_by_addr(&peer_addr) {
171                    b
172                } else {
173                    self.client
174                        .binding_mgr
175                        .create(peer_addr)
176                        .ok_or_else(|| Error::Other("Addr not found".to_owned()))?
177                };
178                (b.state(), b.refreshed_at(), b.number, b.addr)
179            };
180
181            if bind_st == BindingState::Idle
182                || bind_st == BindingState::Request
183                || bind_st == BindingState::Failed
184            {
185                // block only callers with the same binding until
186                // the binding transaction has been complete
187                // binding state may have been changed while waiting. check again.
188                if bind_st == BindingState::Idle {
189                    if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
190                        b.set_state(BindingState::Request);
191                    }
192                    self.channel_bind(self.relayed_addr, bind_addr, bind_number, nonce, integrity)?;
193                }
194
195                // send data using SendIndication
196                let mut msg = Message::new();
197                msg.build(&[
198                    Box::new(TransactionId::new()),
199                    Box::new(MessageType::new(METHOD_SEND, CLASS_INDICATION)),
200                    Box::new(proto::data::Data(p.to_vec())),
201                    Box::new(proto::peeraddr::PeerAddress {
202                        ip: peer_addr.ip(),
203                        port: peer_addr.port(),
204                    }),
205                    Box::new(FINGERPRINT),
206                ])?;
207
208                // indication has no transaction (fire-and-forget)
209                self.client
210                    .write_to(&msg.raw, self.client.turn_server_addr()?);
211                return Ok(());
212            }
213
214            // binding is ready
215            // check if the binding needs a refresh
216            if bind_st == BindingState::Ready
217                && Instant::now()
218                    .checked_duration_since(bind_at)
219                    .unwrap_or_else(|| Duration::from_secs(0))
220                    > PERM_LIFETIME
221            {
222                if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
223                    b.set_state(BindingState::Refresh);
224                }
225                self.channel_bind(self.relayed_addr, bind_addr, bind_number, nonce, integrity)?;
226            }
227
228            bind_number
229        };
230
231        // send via ChannelData
232        self.send_channel_data(p, channel_number)
233    }
234
235    // Close closes the connection.
236    // Any blocked ReadFrom or write_to operations will be unblocked and return errors.
237    pub fn close(&mut self) -> Result<()> {
238        self.refresh_allocation(Duration::from_secs(0))
239    }
240
241    fn create_permissions(
242        &mut self,
243        peer_addrs: &[SocketAddr],
244        peer_addr_opt: Option<SocketAddr>,
245    ) -> Result<TransactionId> {
246        let (username, realm) = (self.client.username(), self.client.realm());
247        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
248            let msg = {
249                let mut setters: Vec<Box<dyn Setter>> = vec![
250                    Box::new(TransactionId::new()),
251                    Box::new(MessageType::new(METHOD_CREATE_PERMISSION, CLASS_REQUEST)),
252                ];
253
254                for addr in peer_addrs {
255                    setters.push(Box::new(proto::peeraddr::PeerAddress {
256                        ip: addr.ip(),
257                        port: addr.port(),
258                    }));
259                }
260
261                setters.push(Box::new(username));
262                setters.push(Box::new(realm));
263                setters.push(Box::new(relay.nonce.clone()));
264                setters.push(Box::new(relay.integrity.clone()));
265                setters.push(Box::new(FINGERPRINT));
266
267                let mut msg = Message::new();
268                msg.build(&setters)?;
269                msg
270            };
271
272            Ok(self.client.perform_transaction(
273                &msg,
274                self.client.turn_server_addr()?,
275                TransactionType::CreatePermissionRequest(self.relayed_addr, peer_addr_opt),
276            ))
277        } else {
278            Err(Error::ErrConnClosed)
279        }
280    }
281
282    pub(super) fn handle_create_permission_response(
283        &mut self,
284        res: Message,
285        peer_addr_opt: Option<SocketAddr>,
286    ) -> Result<()> {
287        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
288            if res.typ.class == CLASS_ERROR_RESPONSE {
289                let mut code = ErrorCodeAttribute::default();
290                let result = code.get_from(&res);
291                let err = if result.is_err() {
292                    Error::Other(format!("{}", res.typ))
293                } else if code.code == CODE_STALE_NONCE {
294                    relay.set_nonce_from_msg(&res);
295                    Error::ErrTryAgain
296                } else {
297                    Error::Other(format!("{} (error {})", res.typ, code))
298                };
299                if let Some(peer_addr) = peer_addr_opt {
300                    self.client
301                        .events
302                        .push_back(Event::CreatePermissionError(res.transaction_id, err));
303                    relay.perm_map.remove(&peer_addr);
304                }
305            } else if let Some(peer_addr) = peer_addr_opt
306                && let Some(perm) = relay.perm_map.get_mut(&peer_addr)
307            {
308                perm.set_state(PermState::Permitted);
309                self.client
310                    .events
311                    .push_back(Event::CreatePermissionResponse(
312                        res.transaction_id,
313                        peer_addr,
314                    ));
315            }
316
317            Ok(())
318        } else {
319            Err(Error::ErrConnClosed)
320        }
321    }
322
323    fn refresh_allocation(&mut self, lifetime: Duration) -> Result<()> {
324        let (username, realm) = (self.client.username(), self.client.realm());
325        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
326            let mut msg = Message::new();
327            msg.build(&[
328                Box::new(TransactionId::new()),
329                Box::new(MessageType::new(METHOD_REFRESH, CLASS_REQUEST)),
330                Box::new(proto::lifetime::Lifetime(lifetime)),
331                Box::new(username),
332                Box::new(realm),
333                Box::new(relay.nonce.clone()),
334                Box::new(relay.integrity.clone()),
335                Box::new(FINGERPRINT),
336            ])?;
337
338            let _ = self.client.perform_transaction(
339                &msg,
340                self.client.turn_server_addr()?,
341                TransactionType::RefreshRequest(self.relayed_addr),
342            );
343
344            Ok(())
345        } else {
346            Err(Error::ErrConnClosed)
347        }
348    }
349
350    pub(super) fn handle_refresh_allocation_response(&mut self, res: Message) -> Result<()> {
351        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
352            if res.typ.class == CLASS_ERROR_RESPONSE {
353                let mut code = ErrorCodeAttribute::default();
354                let result = code.get_from(&res);
355                if result.is_err() {
356                    Err(Error::Other(format!("{}", res.typ)))
357                } else if code.code == CODE_STALE_NONCE {
358                    relay.set_nonce_from_msg(&res);
359                    //Error::ErrTryAgain
360                    Ok(())
361                } else {
362                    Err(Error::Other(format!("{} (error {})", res.typ, code)))
363                }
364            } else {
365                // Getting lifetime from response
366                let mut updated_lifetime = proto::lifetime::Lifetime::default();
367                updated_lifetime.get_from(&res)?;
368
369                relay.lifetime = updated_lifetime.0;
370                debug!("updated lifetime: {} seconds", relay.lifetime.as_secs());
371
372                Ok(())
373            }
374        } else {
375            Err(Error::ErrConnClosed)
376        }
377    }
378
379    fn refresh_permissions(&mut self) -> Result<()> {
380        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
381            #[allow(clippy::map_clone)]
382            let addrs: Vec<SocketAddr> = relay.perm_map.keys().map(|addr| *addr).collect();
383            if addrs.is_empty() {
384                debug!("no permission to refresh");
385                return Ok(());
386            }
387            let _ = self.create_permissions(&addrs, None)?;
388            Ok(())
389        } else {
390            Err(Error::ErrConnClosed)
391        }
392    }
393
394    fn channel_bind(
395        &mut self,
396        relayed_addr: RelayedAddr,
397        bind_addr: SocketAddr,
398        bind_number: u16,
399        nonce: Nonce,
400        integrity: MessageIntegrity,
401    ) -> Result<()> {
402        let (msg, turn_server_addr) = {
403            let setters: Vec<Box<dyn Setter>> = vec![
404                Box::new(TransactionId::new()),
405                Box::new(MessageType::new(METHOD_CHANNEL_BIND, CLASS_REQUEST)),
406                Box::new(proto::peeraddr::PeerAddress {
407                    ip: bind_addr.ip(),
408                    port: bind_addr.port(),
409                }),
410                Box::new(proto::channum::ChannelNumber(bind_number)),
411                Box::new(self.client.username()),
412                Box::new(self.client.realm()),
413                Box::new(nonce),
414                Box::new(integrity),
415                Box::new(FINGERPRINT),
416            ];
417
418            let mut msg = Message::new();
419            msg.build(&setters)?;
420
421            (msg, self.client.turn_server_addr()?)
422        };
423
424        debug!("UDPConn.bind call PerformTransaction 1");
425        let _ = self.client.perform_transaction(
426            &msg,
427            turn_server_addr,
428            TransactionType::ChannelBindRequest(relayed_addr, bind_addr),
429        );
430
431        Ok(())
432    }
433
434    pub(super) fn handle_channel_bind_response(
435        &mut self,
436        res: Message,
437        bind_addr: SocketAddr,
438    ) -> Result<()> {
439        if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
440            let result = if res.typ.class == CLASS_ERROR_RESPONSE {
441                let mut code = ErrorCodeAttribute::default();
442                let result = code.get_from(&res);
443                if result.is_err() {
444                    Err(Error::Other(format!("{}", res.typ)))
445                } else if code.code == CODE_STALE_NONCE {
446                    relay.set_nonce_from_msg(&res);
447                    Err(Error::ErrTryAgain)
448                } else {
449                    Err(Error::Other(format!("{} (error {})", res.typ, code)))
450                }
451            } else if res.typ != MessageType::new(METHOD_CHANNEL_BIND, CLASS_SUCCESS_RESPONSE) {
452                Err(Error::ErrUnexpectedResponse)
453            } else {
454                Ok(())
455            };
456
457            if let Err(err) = result {
458                if Error::ErrUnexpectedResponse != err {
459                    self.client.binding_mgr.delete_by_addr(&bind_addr);
460                } else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
461                    b.set_state(BindingState::Failed);
462                }
463
464                // keep going...
465                warn!("bind() failed: {}", err);
466            } else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
467                b.set_refreshed_at(Instant::now());
468                b.set_state(BindingState::Ready);
469                debug!("channel binding successful: {}", bind_addr);
470            }
471            Ok(())
472        } else {
473            Err(Error::ErrConnClosed)
474        }
475    }
476
477    fn send_channel_data(&mut self, data: &[u8], channel_number: u16) -> Result<()> {
478        let mut ch_data = proto::chandata::ChannelData {
479            data: data.to_vec(),
480            number: proto::channum::ChannelNumber(channel_number),
481            ..Default::default()
482        };
483        ch_data.encode();
484
485        self.client
486            .write_to(&ch_data.raw, self.client.turn_server_addr()?);
487
488        Ok(())
489    }
490}