1use std::net::SocketAddr;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use stun::agent::*;
7use stun::attributes::*;
8use stun::fingerprint::*;
9use stun::integrity::*;
10use stun::message::*;
11use stun::textattrs::*;
12use tokio::time::{Duration, Instant};
13
14use crate::agent::agent_internal::*;
15use crate::candidate::*;
16use crate::control::*;
17use crate::priority::*;
18use crate::use_candidate::*;
19
20#[async_trait]
21trait ControllingSelector {
22 async fn start(&self);
23 async fn contact_candidates(&self);
24 async fn ping_candidate(
25 &self,
26 local: &Arc<dyn Candidate + Send + Sync>,
27 remote: &Arc<dyn Candidate + Send + Sync>,
28 );
29 async fn handle_success_response(
30 &self,
31 m: &Message,
32 local: &Arc<dyn Candidate + Send + Sync>,
33 remote: &Arc<dyn Candidate + Send + Sync>,
34 remote_addr: SocketAddr,
35 );
36 async fn handle_binding_request(
37 &self,
38 m: &Message,
39 local: &Arc<dyn Candidate + Send + Sync>,
40 remote: &Arc<dyn Candidate + Send + Sync>,
41 );
42}
43
44#[async_trait]
45trait ControlledSelector {
46 async fn start(&self);
47 async fn contact_candidates(&self);
48 async fn ping_candidate(
49 &self,
50 local: &Arc<dyn Candidate + Send + Sync>,
51 remote: &Arc<dyn Candidate + Send + Sync>,
52 );
53 async fn handle_success_response(
54 &self,
55 m: &Message,
56 local: &Arc<dyn Candidate + Send + Sync>,
57 remote: &Arc<dyn Candidate + Send + Sync>,
58 remote_addr: SocketAddr,
59 );
60 async fn handle_binding_request(
61 &self,
62 m: &Message,
63 local: &Arc<dyn Candidate + Send + Sync>,
64 remote: &Arc<dyn Candidate + Send + Sync>,
65 );
66}
67
68impl AgentInternal {
69 fn is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool {
70 let start_time = *self.start_time.lock();
71 match c.candidate_type() {
72 CandidateType::Host => {
73 Instant::now()
74 .checked_duration_since(start_time)
75 .unwrap_or_else(|| Duration::from_secs(0))
76 .as_nanos()
77 > self.host_acceptance_min_wait.as_nanos()
78 }
79 CandidateType::ServerReflexive => {
80 Instant::now()
81 .checked_duration_since(start_time)
82 .unwrap_or_else(|| Duration::from_secs(0))
83 .as_nanos()
84 > self.srflx_acceptance_min_wait.as_nanos()
85 }
86 CandidateType::PeerReflexive => {
87 Instant::now()
88 .checked_duration_since(start_time)
89 .unwrap_or_else(|| Duration::from_secs(0))
90 .as_nanos()
91 > self.prflx_acceptance_min_wait.as_nanos()
92 }
93 CandidateType::Relay => {
94 Instant::now()
95 .checked_duration_since(start_time)
96 .unwrap_or_else(|| Duration::from_secs(0))
97 .as_nanos()
98 > self.relay_acceptance_min_wait.as_nanos()
99 }
100 CandidateType::Unspecified => {
101 log::error!(
102 "is_nominatable invalid candidate type {}",
103 c.candidate_type()
104 );
105 false
106 }
107 }
108 }
109
110 async fn nominate_pair(&self) {
111 let result = {
112 let nominated_pair = self.nominated_pair.lock().await;
113 if let Some(pair) = &*nominated_pair {
114 let (msg, result) = {
120 let ufrag_pwd = self.ufrag_pwd.lock().await;
121 let username =
122 ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
123 let mut msg = Message::new();
124 let result = msg.build(&[
125 Box::new(BINDING_REQUEST),
126 Box::new(TransactionId::new()),
127 Box::new(Username::new(ATTR_USERNAME, username)),
128 Box::<UseCandidateAttr>::default(),
129 Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
130 Box::new(PriorityAttr(pair.local.priority())),
131 Box::new(MessageIntegrity::new_short_term_integrity(
132 ufrag_pwd.remote_pwd.clone(),
133 )),
134 Box::new(FINGERPRINT),
135 ]);
136 (msg, result)
137 };
138
139 if let Err(err) = result {
140 log::error!("{err}");
141 None
142 } else {
143 log::trace!(
144 "ping STUN (nominate candidate pair from {} to {}",
145 pair.local,
146 pair.remote
147 );
148 let local = pair.local.clone();
149 let remote = pair.remote.clone();
150 Some((msg, local, remote))
151 }
152 } else {
153 None
154 }
155 };
156
157 if let Some((msg, local, remote)) = result {
158 self.send_binding_request(&msg, &local, &remote).await;
159 }
160 }
161
162 pub(crate) async fn start(&self) {
163 if self.is_controlling.load(Ordering::SeqCst) {
164 ControllingSelector::start(self).await;
165 } else {
166 ControlledSelector::start(self).await;
167 }
168 }
169
170 pub(crate) async fn contact_candidates(&self) {
171 if self.is_controlling.load(Ordering::SeqCst) {
172 ControllingSelector::contact_candidates(self).await;
173 } else {
174 ControlledSelector::contact_candidates(self).await;
175 }
176 }
177
178 pub(crate) async fn ping_candidate(
179 &self,
180 local: &Arc<dyn Candidate + Send + Sync>,
181 remote: &Arc<dyn Candidate + Send + Sync>,
182 ) {
183 if self.is_controlling.load(Ordering::SeqCst) {
184 ControllingSelector::ping_candidate(self, local, remote).await;
185 } else {
186 ControlledSelector::ping_candidate(self, local, remote).await;
187 }
188 }
189
190 pub(crate) async fn handle_success_response(
191 &self,
192 m: &Message,
193 local: &Arc<dyn Candidate + Send + Sync>,
194 remote: &Arc<dyn Candidate + Send + Sync>,
195 remote_addr: SocketAddr,
196 ) {
197 if self.is_controlling.load(Ordering::SeqCst) {
198 ControllingSelector::handle_success_response(self, m, local, remote, remote_addr).await;
199 } else {
200 ControlledSelector::handle_success_response(self, m, local, remote, remote_addr).await;
201 }
202 }
203
204 pub(crate) async fn handle_binding_request(
205 &self,
206 m: &Message,
207 local: &Arc<dyn Candidate + Send + Sync>,
208 remote: &Arc<dyn Candidate + Send + Sync>,
209 ) {
210 if self.is_controlling.load(Ordering::SeqCst) {
211 ControllingSelector::handle_binding_request(self, m, local, remote).await;
212 } else {
213 ControlledSelector::handle_binding_request(self, m, local, remote).await;
214 }
215 }
216}
217
218#[async_trait]
219impl ControllingSelector for AgentInternal {
220 async fn start(&self) {
221 {
222 let mut nominated_pair = self.nominated_pair.lock().await;
223 *nominated_pair = None;
224 }
225 *self.start_time.lock() = Instant::now();
226 }
227
228 async fn contact_candidates(&self) {
229 if self.lite.load(Ordering::SeqCst) {
231 log::trace!("now falling back to full agent");
233 }
234
235 let nominated_pair_is_some = {
236 let nominated_pair = self.nominated_pair.lock().await;
237 nominated_pair.is_some()
238 };
239
240 if self.agent_conn.get_selected_pair().is_some() {
241 if self.validate_selected_pair().await {
242 log::trace!("[{}]: checking keepalive", self.get_name());
243 self.check_keepalive().await;
244 }
245 } else if nominated_pair_is_some {
246 self.nominate_pair().await;
247 } else {
248 let has_nominated_pair =
249 if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
250 self.is_nominatable(&p.local) && self.is_nominatable(&p.remote)
251 } else {
252 false
253 };
254
255 if has_nominated_pair {
256 if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
257 log::trace!(
258 "Nominatable pair found, nominating ({}, {})",
259 p.local,
260 p.remote
261 );
262 p.nominated.store(true, Ordering::SeqCst);
263 {
264 let mut nominated_pair = self.nominated_pair.lock().await;
265 *nominated_pair = Some(p);
266 }
267 }
268
269 self.nominate_pair().await;
270 } else {
271 self.ping_all_candidates().await;
272 }
273 }
274 }
275
276 async fn ping_candidate(
277 &self,
278 local: &Arc<dyn Candidate + Send + Sync>,
279 remote: &Arc<dyn Candidate + Send + Sync>,
280 ) {
281 let (msg, result) = {
282 let ufrag_pwd = self.ufrag_pwd.lock().await;
283 let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
284 let mut msg = Message::new();
285 let result = msg.build(&[
286 Box::new(BINDING_REQUEST),
287 Box::new(TransactionId::new()),
288 Box::new(Username::new(ATTR_USERNAME, username)),
289 Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
290 Box::new(PriorityAttr(local.priority())),
291 Box::new(MessageIntegrity::new_short_term_integrity(
292 ufrag_pwd.remote_pwd.clone(),
293 )),
294 Box::new(FINGERPRINT),
295 ]);
296 (msg, result)
297 };
298
299 if let Err(err) = result {
300 log::error!("{err}");
301 } else {
302 self.send_binding_request(&msg, local, remote).await;
303 }
304 }
305
306 async fn handle_success_response(
307 &self,
308 m: &Message,
309 local: &Arc<dyn Candidate + Send + Sync>,
310 remote: &Arc<dyn Candidate + Send + Sync>,
311 remote_addr: SocketAddr,
312 ) {
313 if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
314 let transaction_addr = pending_request.destination;
315
316 if transaction_addr != remote_addr {
319 log::debug!("discard message: transaction source and destination does not match expected({transaction_addr}), actual({remote})");
320 return;
321 }
322
323 log::trace!("inbound STUN (SuccessResponse) from {remote} to {local}");
324 let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none();
325
326 if let Some(p) = self.find_pair(local, remote).await {
327 p.state
328 .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
329 log::trace!(
330 "Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}",
331 p,
332 p.state.load(Ordering::SeqCst),
333 pending_request.is_use_candidate,
334 selected_pair_is_none
335 );
336 if pending_request.is_use_candidate && selected_pair_is_none {
337 self.set_selected_pair(Some(Arc::clone(&p))).await;
338 }
339 } else {
340 log::error!("Success response from invalid candidate pair");
342 }
343 } else {
344 log::warn!(
345 "discard message from ({}), unknown TransactionID 0x{:?}",
346 remote,
347 m.transaction_id
348 );
349 }
350 }
351
352 async fn handle_binding_request(
353 &self,
354 m: &Message,
355 local: &Arc<dyn Candidate + Send + Sync>,
356 remote: &Arc<dyn Candidate + Send + Sync>,
357 ) {
358 self.send_binding_success(m, local, remote).await;
359 log::trace!("controllingSelector: sendBindingSuccess");
360
361 if let Some(p) = self.find_pair(local, remote).await {
362 let nominated_pair_is_none = {
363 let nominated_pair = self.nominated_pair.lock().await;
364 nominated_pair.is_none()
365 };
366
367 log::trace!(
368 "controllingSelector: after findPair {}, p.state: {}, {}",
369 p,
370 p.state.load(Ordering::SeqCst),
371 nominated_pair_is_none,
372 );
374 if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8
375 && nominated_pair_is_none
376 && self.agent_conn.get_selected_pair().is_none()
377 {
378 if let Some(best_pair) = self.agent_conn.get_best_available_candidate_pair().await {
379 log::trace!("controllingSelector: getBestAvailableCandidatePair {best_pair}");
380 if best_pair == p
381 && self.is_nominatable(&p.local)
382 && self.is_nominatable(&p.remote)
383 {
384 log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated",
385 p.local, p.remote);
386 {
387 let mut nominated_pair = self.nominated_pair.lock().await;
388 *nominated_pair = Some(p);
389 }
390 self.nominate_pair().await;
391 }
392 } else {
393 log::trace!("No best pair available");
394 }
395 }
396 } else {
397 log::trace!("controllingSelector: addPair");
398 self.add_pair(local.clone(), remote.clone()).await;
399 }
400 }
401}
402
403#[async_trait]
404impl ControlledSelector for AgentInternal {
405 async fn start(&self) {}
406
407 async fn contact_candidates(&self) {
408 if self.lite.load(Ordering::SeqCst) {
410 self.validate_selected_pair().await;
411 } else if self.agent_conn.get_selected_pair().is_some() {
412 if self.validate_selected_pair().await {
413 log::trace!("[{}]: checking keepalive", self.get_name());
414 self.check_keepalive().await;
415 }
416 } else {
417 self.ping_all_candidates().await;
418 }
419 }
420
421 async fn ping_candidate(
422 &self,
423 local: &Arc<dyn Candidate + Send + Sync>,
424 remote: &Arc<dyn Candidate + Send + Sync>,
425 ) {
426 let (msg, result) = {
427 let ufrag_pwd = self.ufrag_pwd.lock().await;
428 let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
429 let mut msg = Message::new();
430 let result = msg.build(&[
431 Box::new(BINDING_REQUEST),
432 Box::new(TransactionId::new()),
433 Box::new(Username::new(ATTR_USERNAME, username)),
434 Box::new(AttrControlled(self.tie_breaker.load(Ordering::SeqCst))),
435 Box::new(PriorityAttr(local.priority())),
436 Box::new(MessageIntegrity::new_short_term_integrity(
437 ufrag_pwd.remote_pwd.clone(),
438 )),
439 Box::new(FINGERPRINT),
440 ]);
441 (msg, result)
442 };
443
444 if let Err(err) = result {
445 log::error!("{err}");
446 } else {
447 self.send_binding_request(&msg, local, remote).await;
448 }
449 }
450
451 async fn handle_success_response(
452 &self,
453 m: &Message,
454 local: &Arc<dyn Candidate + Send + Sync>,
455 remote: &Arc<dyn Candidate + Send + Sync>,
456 remote_addr: SocketAddr,
457 ) {
458 if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
465 let transaction_addr = pending_request.destination;
466
467 if transaction_addr != remote_addr {
470 log::debug!("discard message: transaction source and destination does not match expected({transaction_addr}), actual({remote})");
471 return;
472 }
473
474 log::trace!("inbound STUN (SuccessResponse) from {remote} to {local}");
475
476 if let Some(p) = self.find_pair(local, remote).await {
477 p.state
478 .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
479 log::trace!("Found valid candidate pair: {p}");
480
481 if p.nominate_on_binding_success.load(Ordering::SeqCst)
482 && self.agent_conn.get_selected_pair().is_none()
483 {
484 self.set_selected_pair(Some(Arc::clone(&p))).await;
485 }
486 } else {
487 log::error!("Success response from invalid candidate pair");
489 }
490 } else {
491 log::warn!(
492 "discard message from ({}), unknown TransactionID 0x{:?}",
493 remote,
494 m.transaction_id
495 );
496 }
497 }
498
499 async fn handle_binding_request(
500 &self,
501 m: &Message,
502 local: &Arc<dyn Candidate + Send + Sync>,
503 remote: &Arc<dyn Candidate + Send + Sync>,
504 ) {
505 if self.find_pair(local, remote).await.is_none() {
506 self.add_pair(local.clone(), remote.clone()).await;
507 }
508
509 if let Some(p) = self.find_pair(local, remote).await {
510 let use_candidate = m.contains(ATTR_USE_CANDIDATE);
511 if use_candidate {
512 if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 {
515 if self.agent_conn.get_selected_pair().is_none() {
520 self.set_selected_pair(Some(Arc::clone(&p))).await;
521 }
522 } else {
523 p.nominate_on_binding_success.store(true, Ordering::SeqCst);
532 }
533 }
534
535 self.send_binding_success(m, local, remote).await;
536 self.ping_candidate(local, remote).await;
537 }
538 }
539}