1use log::{debug, warn};
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::ops::Add;
5use std::time::{Duration, Instant};
6
7use stun::attributes::*;
8use stun::error_code::*;
9use stun::fingerprint::*;
10use stun::integrity::*;
11use stun::message::*;
12use stun::textattrs::*;
13
14use super::permission::*;
15use super::transaction::*;
16use crate::proto;
17
18use crate::client::binding::BindingState;
19use crate::client::{Client, Event, RelayedAddr};
20use shared::error::{Error, Result};
21
22const PERM_REFRESH_INTERVAL: Duration = Duration::from_secs(120);
23const PERM_LIFETIME: Duration = Duration::from_secs(300);
26const MAX_RETRY_ATTEMPTS: u16 = 3;
27
28pub(crate) struct RelayState {
30 pub(crate) relayed_addr: RelayedAddr,
31 pub(crate) integrity: MessageIntegrity,
32 pub(crate) nonce: Nonce,
33 pub(crate) lifetime: Duration,
34 perm_map: HashMap<SocketAddr, Permission>,
35 refresh_alloc_timer: Instant,
36 refresh_perms_timer: Instant,
37}
38
39impl RelayState {
40 pub(super) fn new(
41 relayed_addr: RelayedAddr,
42 integrity: MessageIntegrity,
43 nonce: Nonce,
44 lifetime: Duration,
45 ) -> Self {
46 debug!("initial lifetime: {} seconds", lifetime.as_secs());
47
48 Self {
49 relayed_addr,
50 integrity,
51 nonce,
52 lifetime,
53 perm_map: HashMap::new(),
54 refresh_alloc_timer: Instant::now().add(lifetime / 2),
55 refresh_perms_timer: Instant::now().add(PERM_REFRESH_INTERVAL),
56 }
57 }
58
59 pub(super) fn set_nonce_from_msg(&mut self, msg: &Message) {
60 match Nonce::get_from_as(msg, ATTR_NONCE) {
62 Ok(nonce) => {
63 self.nonce = nonce;
64 debug!("refresh allocation: 438, got new nonce.");
65 }
66 Err(_) => warn!("refresh allocation: 438 but no nonce."),
67 }
68 }
69}
70
71pub struct Relay<'a> {
73 pub(crate) relayed_addr: RelayedAddr,
74 pub(crate) client: &'a mut Client,
75}
76
77impl Relay<'_> {
78 pub fn create_permission(&mut self, peer_addr: SocketAddr) -> Result<Option<TransactionId>> {
79 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
80 relay
81 .perm_map
82 .entry(peer_addr)
83 .or_insert_with(Permission::default);
84 if let Some(perm) = relay.perm_map.get(&peer_addr)
85 && perm.state() == PermState::Idle
86 {
87 return Ok(Some(
88 self.create_permissions(&[peer_addr], Some(peer_addr))?,
89 ));
90 }
91 Ok(None)
92 } else {
93 Err(Error::ErrConnClosed)
94 }
95 }
96
97 pub(crate) fn poll_timeout(&self) -> Option<Instant> {
98 if let Some(relay) = self.client.relays.get(&self.relayed_addr) {
99 if relay.refresh_alloc_timer < relay.refresh_perms_timer {
100 Some(relay.refresh_alloc_timer)
101 } else {
102 Some(relay.refresh_perms_timer)
103 }
104 } else {
105 None
106 }
107 }
108
109 pub(crate) fn handle_timeout(&mut self, now: Instant) {
110 let (refresh_alloc_timer, refresh_perms_timer) = if let Some(relay) =
111 self.client.relays.get_mut(&self.relayed_addr)
112 {
113 let refresh_alloc_timer = if relay.refresh_alloc_timer <= now {
114 relay.refresh_alloc_timer = relay.refresh_alloc_timer.add(relay.lifetime / 2);
115 Some(relay.lifetime)
116 } else {
117 None
118 };
119
120 let refresh_perms_timer = if relay.refresh_perms_timer <= now {
121 relay.refresh_perms_timer = relay.refresh_perms_timer.add(PERM_REFRESH_INTERVAL);
122 true
123 } else {
124 false
125 };
126
127 (refresh_alloc_timer, refresh_perms_timer)
128 } else {
129 (None, false)
130 };
131
132 if let Some(lifetime) = refresh_alloc_timer {
133 let _ = self.refresh_allocation(lifetime);
134 }
135 if refresh_perms_timer {
136 let _ = self.refresh_permissions();
137 }
138 }
139
140 pub fn send_to(&mut self, p: &[u8], peer_addr: SocketAddr) -> Result<()> {
141 let result = if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
143 if let Some(perm) = relay.perm_map.get_mut(&peer_addr) {
144 if perm.state() != PermState::Permitted {
145 Err(Error::ErrNoPermission)
146 } else {
147 Ok((relay.integrity.clone(), relay.nonce.clone()))
148 }
149 } else {
150 Err(Error::ErrNoPermission)
151 }
152 } else {
153 Err(Error::ErrConnClosed)
154 };
155
156 let (integrity, nonce) = result?;
157
158 self.send(p, peer_addr, integrity, nonce)
159 }
160
161 fn send(
162 &mut self,
163 p: &[u8],
164 peer_addr: SocketAddr,
165 integrity: MessageIntegrity,
166 nonce: Nonce,
167 ) -> Result<()> {
168 let channel_number = {
169 let (bind_st, bind_at, bind_number, bind_addr) = {
170 let b = if let Some(b) = self.client.binding_mgr.find_by_addr(&peer_addr) {
171 b
172 } else {
173 self.client
174 .binding_mgr
175 .create(peer_addr)
176 .ok_or_else(|| Error::Other("Addr not found".to_owned()))?
177 };
178 (b.state(), b.refreshed_at(), b.number, b.addr)
179 };
180
181 if bind_st == BindingState::Idle
182 || bind_st == BindingState::Request
183 || bind_st == BindingState::Failed
184 {
185 if bind_st == BindingState::Idle {
189 if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
190 b.set_state(BindingState::Request);
191 }
192 self.channel_bind(self.relayed_addr, bind_addr, bind_number, nonce, integrity)?;
193 }
194
195 let mut msg = Message::new();
197 msg.build(&[
198 Box::new(TransactionId::new()),
199 Box::new(MessageType::new(METHOD_SEND, CLASS_INDICATION)),
200 Box::new(proto::data::Data(p.to_vec())),
201 Box::new(proto::peeraddr::PeerAddress {
202 ip: peer_addr.ip(),
203 port: peer_addr.port(),
204 }),
205 Box::new(FINGERPRINT),
206 ])?;
207
208 self.client
210 .write_to(&msg.raw, self.client.turn_server_addr()?);
211 return Ok(());
212 }
213
214 if bind_st == BindingState::Ready
217 && Instant::now()
218 .checked_duration_since(bind_at)
219 .unwrap_or_else(|| Duration::from_secs(0))
220 > PERM_LIFETIME
221 {
222 if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
223 b.set_state(BindingState::Refresh);
224 }
225 self.channel_bind(self.relayed_addr, bind_addr, bind_number, nonce, integrity)?;
226 }
227
228 bind_number
229 };
230
231 self.send_channel_data(p, channel_number)
233 }
234
235 pub fn close(&mut self) -> Result<()> {
238 self.refresh_allocation(Duration::from_secs(0))
239 }
240
241 fn create_permissions(
242 &mut self,
243 peer_addrs: &[SocketAddr],
244 peer_addr_opt: Option<SocketAddr>,
245 ) -> Result<TransactionId> {
246 let (username, realm) = (self.client.username(), self.client.realm());
247 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
248 let msg = {
249 let mut setters: Vec<Box<dyn Setter>> = vec![
250 Box::new(TransactionId::new()),
251 Box::new(MessageType::new(METHOD_CREATE_PERMISSION, CLASS_REQUEST)),
252 ];
253
254 for addr in peer_addrs {
255 setters.push(Box::new(proto::peeraddr::PeerAddress {
256 ip: addr.ip(),
257 port: addr.port(),
258 }));
259 }
260
261 setters.push(Box::new(username));
262 setters.push(Box::new(realm));
263 setters.push(Box::new(relay.nonce.clone()));
264 setters.push(Box::new(relay.integrity.clone()));
265 setters.push(Box::new(FINGERPRINT));
266
267 let mut msg = Message::new();
268 msg.build(&setters)?;
269 msg
270 };
271
272 Ok(self.client.perform_transaction(
273 &msg,
274 self.client.turn_server_addr()?,
275 TransactionType::CreatePermissionRequest(self.relayed_addr, peer_addr_opt),
276 ))
277 } else {
278 Err(Error::ErrConnClosed)
279 }
280 }
281
282 pub(super) fn handle_create_permission_response(
283 &mut self,
284 res: Message,
285 peer_addr_opt: Option<SocketAddr>,
286 ) -> Result<()> {
287 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
288 if res.typ.class == CLASS_ERROR_RESPONSE {
289 let mut code = ErrorCodeAttribute::default();
290 let result = code.get_from(&res);
291 let err = if result.is_err() {
292 Error::Other(format!("{}", res.typ))
293 } else if code.code == CODE_STALE_NONCE {
294 relay.set_nonce_from_msg(&res);
295 Error::ErrTryAgain
296 } else {
297 Error::Other(format!("{} (error {})", res.typ, code))
298 };
299 if let Some(peer_addr) = peer_addr_opt {
300 self.client
301 .events
302 .push_back(Event::CreatePermissionError(res.transaction_id, err));
303 relay.perm_map.remove(&peer_addr);
304 }
305 } else if let Some(peer_addr) = peer_addr_opt
306 && let Some(perm) = relay.perm_map.get_mut(&peer_addr)
307 {
308 perm.set_state(PermState::Permitted);
309 self.client
310 .events
311 .push_back(Event::CreatePermissionResponse(
312 res.transaction_id,
313 peer_addr,
314 ));
315 }
316
317 Ok(())
318 } else {
319 Err(Error::ErrConnClosed)
320 }
321 }
322
323 fn refresh_allocation(&mut self, lifetime: Duration) -> Result<()> {
324 let (username, realm) = (self.client.username(), self.client.realm());
325 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
326 let mut msg = Message::new();
327 msg.build(&[
328 Box::new(TransactionId::new()),
329 Box::new(MessageType::new(METHOD_REFRESH, CLASS_REQUEST)),
330 Box::new(proto::lifetime::Lifetime(lifetime)),
331 Box::new(username),
332 Box::new(realm),
333 Box::new(relay.nonce.clone()),
334 Box::new(relay.integrity.clone()),
335 Box::new(FINGERPRINT),
336 ])?;
337
338 let _ = self.client.perform_transaction(
339 &msg,
340 self.client.turn_server_addr()?,
341 TransactionType::RefreshRequest(self.relayed_addr),
342 );
343
344 Ok(())
345 } else {
346 Err(Error::ErrConnClosed)
347 }
348 }
349
350 pub(super) fn handle_refresh_allocation_response(&mut self, res: Message) -> Result<()> {
351 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
352 if res.typ.class == CLASS_ERROR_RESPONSE {
353 let mut code = ErrorCodeAttribute::default();
354 let result = code.get_from(&res);
355 if result.is_err() {
356 Err(Error::Other(format!("{}", res.typ)))
357 } else if code.code == CODE_STALE_NONCE {
358 relay.set_nonce_from_msg(&res);
359 Ok(())
361 } else {
362 Err(Error::Other(format!("{} (error {})", res.typ, code)))
363 }
364 } else {
365 let mut updated_lifetime = proto::lifetime::Lifetime::default();
367 updated_lifetime.get_from(&res)?;
368
369 relay.lifetime = updated_lifetime.0;
370 debug!("updated lifetime: {} seconds", relay.lifetime.as_secs());
371
372 Ok(())
373 }
374 } else {
375 Err(Error::ErrConnClosed)
376 }
377 }
378
379 fn refresh_permissions(&mut self) -> Result<()> {
380 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
381 #[allow(clippy::map_clone)]
382 let addrs: Vec<SocketAddr> = relay.perm_map.keys().map(|addr| *addr).collect();
383 if addrs.is_empty() {
384 debug!("no permission to refresh");
385 return Ok(());
386 }
387 let _ = self.create_permissions(&addrs, None)?;
388 Ok(())
389 } else {
390 Err(Error::ErrConnClosed)
391 }
392 }
393
394 fn channel_bind(
395 &mut self,
396 relayed_addr: RelayedAddr,
397 bind_addr: SocketAddr,
398 bind_number: u16,
399 nonce: Nonce,
400 integrity: MessageIntegrity,
401 ) -> Result<()> {
402 let (msg, turn_server_addr) = {
403 let setters: Vec<Box<dyn Setter>> = vec![
404 Box::new(TransactionId::new()),
405 Box::new(MessageType::new(METHOD_CHANNEL_BIND, CLASS_REQUEST)),
406 Box::new(proto::peeraddr::PeerAddress {
407 ip: bind_addr.ip(),
408 port: bind_addr.port(),
409 }),
410 Box::new(proto::channum::ChannelNumber(bind_number)),
411 Box::new(self.client.username()),
412 Box::new(self.client.realm()),
413 Box::new(nonce),
414 Box::new(integrity),
415 Box::new(FINGERPRINT),
416 ];
417
418 let mut msg = Message::new();
419 msg.build(&setters)?;
420
421 (msg, self.client.turn_server_addr()?)
422 };
423
424 debug!("UDPConn.bind call PerformTransaction 1");
425 let _ = self.client.perform_transaction(
426 &msg,
427 turn_server_addr,
428 TransactionType::ChannelBindRequest(relayed_addr, bind_addr),
429 );
430
431 Ok(())
432 }
433
434 pub(super) fn handle_channel_bind_response(
435 &mut self,
436 res: Message,
437 bind_addr: SocketAddr,
438 ) -> Result<()> {
439 if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
440 let result = if res.typ.class == CLASS_ERROR_RESPONSE {
441 let mut code = ErrorCodeAttribute::default();
442 let result = code.get_from(&res);
443 if result.is_err() {
444 Err(Error::Other(format!("{}", res.typ)))
445 } else if code.code == CODE_STALE_NONCE {
446 relay.set_nonce_from_msg(&res);
447 Err(Error::ErrTryAgain)
448 } else {
449 Err(Error::Other(format!("{} (error {})", res.typ, code)))
450 }
451 } else if res.typ != MessageType::new(METHOD_CHANNEL_BIND, CLASS_SUCCESS_RESPONSE) {
452 Err(Error::ErrUnexpectedResponse)
453 } else {
454 Ok(())
455 };
456
457 if let Err(err) = result {
458 if Error::ErrUnexpectedResponse != err {
459 self.client.binding_mgr.delete_by_addr(&bind_addr);
460 } else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
461 b.set_state(BindingState::Failed);
462 }
463
464 warn!("bind() failed: {}", err);
466 } else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) {
467 b.set_refreshed_at(Instant::now());
468 b.set_state(BindingState::Ready);
469 debug!("channel binding successful: {}", bind_addr);
470 }
471 Ok(())
472 } else {
473 Err(Error::ErrConnClosed)
474 }
475 }
476
477 fn send_channel_data(&mut self, data: &[u8], channel_number: u16) -> Result<()> {
478 let mut ch_data = proto::chandata::ChannelData {
479 data: data.to_vec(),
480 number: proto::channum::ChannelNumber(channel_number),
481 ..Default::default()
482 };
483 ch_data.encode();
484
485 self.client
486 .write_to(&ch_data.raw, self.client.turn_server_addr()?);
487
488 Ok(())
489 }
490}