webrtc_ice/agent/
agent_selector.rs

1use std::net::SocketAddr;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use stun::agent::*;
7use stun::attributes::*;
8use stun::fingerprint::*;
9use stun::integrity::*;
10use stun::message::*;
11use stun::textattrs::*;
12use tokio::time::{Duration, Instant};
13
14use crate::agent::agent_internal::*;
15use crate::candidate::*;
16use crate::control::*;
17use crate::priority::*;
18use crate::use_candidate::*;
19
20#[async_trait]
21trait ControllingSelector {
22    async fn start(&self);
23    async fn contact_candidates(&self);
24    async fn ping_candidate(
25        &self,
26        local: &Arc<dyn Candidate + Send + Sync>,
27        remote: &Arc<dyn Candidate + Send + Sync>,
28    );
29    async fn handle_success_response(
30        &self,
31        m: &Message,
32        local: &Arc<dyn Candidate + Send + Sync>,
33        remote: &Arc<dyn Candidate + Send + Sync>,
34        remote_addr: SocketAddr,
35    );
36    async fn handle_binding_request(
37        &self,
38        m: &Message,
39        local: &Arc<dyn Candidate + Send + Sync>,
40        remote: &Arc<dyn Candidate + Send + Sync>,
41    );
42}
43
44#[async_trait]
45trait ControlledSelector {
46    async fn start(&self);
47    async fn contact_candidates(&self);
48    async fn ping_candidate(
49        &self,
50        local: &Arc<dyn Candidate + Send + Sync>,
51        remote: &Arc<dyn Candidate + Send + Sync>,
52    );
53    async fn handle_success_response(
54        &self,
55        m: &Message,
56        local: &Arc<dyn Candidate + Send + Sync>,
57        remote: &Arc<dyn Candidate + Send + Sync>,
58        remote_addr: SocketAddr,
59    );
60    async fn handle_binding_request(
61        &self,
62        m: &Message,
63        local: &Arc<dyn Candidate + Send + Sync>,
64        remote: &Arc<dyn Candidate + Send + Sync>,
65    );
66}
67
68impl AgentInternal {
69    fn is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool {
70        let start_time = *self.start_time.lock();
71        match c.candidate_type() {
72            CandidateType::Host => {
73                Instant::now()
74                    .checked_duration_since(start_time)
75                    .unwrap_or_else(|| Duration::from_secs(0))
76                    .as_nanos()
77                    > self.host_acceptance_min_wait.as_nanos()
78            }
79            CandidateType::ServerReflexive => {
80                Instant::now()
81                    .checked_duration_since(start_time)
82                    .unwrap_or_else(|| Duration::from_secs(0))
83                    .as_nanos()
84                    > self.srflx_acceptance_min_wait.as_nanos()
85            }
86            CandidateType::PeerReflexive => {
87                Instant::now()
88                    .checked_duration_since(start_time)
89                    .unwrap_or_else(|| Duration::from_secs(0))
90                    .as_nanos()
91                    > self.prflx_acceptance_min_wait.as_nanos()
92            }
93            CandidateType::Relay => {
94                Instant::now()
95                    .checked_duration_since(start_time)
96                    .unwrap_or_else(|| Duration::from_secs(0))
97                    .as_nanos()
98                    > self.relay_acceptance_min_wait.as_nanos()
99            }
100            CandidateType::Unspecified => {
101                log::error!(
102                    "is_nominatable invalid candidate type {}",
103                    c.candidate_type()
104                );
105                false
106            }
107        }
108    }
109
110    async fn nominate_pair(&self) {
111        let result = {
112            let nominated_pair = self.nominated_pair.lock().await;
113            if let Some(pair) = &*nominated_pair {
114                // The controlling agent MUST include the USE-CANDIDATE attribute in
115                // order to nominate a candidate pair (Section 8.1.1).  The controlled
116                // agent MUST NOT include the USE-CANDIDATE attribute in a Binding
117                // request.
118
119                let (msg, result) = {
120                    let ufrag_pwd = self.ufrag_pwd.lock().await;
121                    let username =
122                        ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
123                    let mut msg = Message::new();
124                    let result = msg.build(&[
125                        Box::new(BINDING_REQUEST),
126                        Box::new(TransactionId::new()),
127                        Box::new(Username::new(ATTR_USERNAME, username)),
128                        Box::<UseCandidateAttr>::default(),
129                        Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
130                        Box::new(PriorityAttr(pair.local.priority())),
131                        Box::new(MessageIntegrity::new_short_term_integrity(
132                            ufrag_pwd.remote_pwd.clone(),
133                        )),
134                        Box::new(FINGERPRINT),
135                    ]);
136                    (msg, result)
137                };
138
139                if let Err(err) = result {
140                    log::error!("{err}");
141                    None
142                } else {
143                    log::trace!(
144                        "ping STUN (nominate candidate pair from {} to {}",
145                        pair.local,
146                        pair.remote
147                    );
148                    let local = pair.local.clone();
149                    let remote = pair.remote.clone();
150                    Some((msg, local, remote))
151                }
152            } else {
153                None
154            }
155        };
156
157        if let Some((msg, local, remote)) = result {
158            self.send_binding_request(&msg, &local, &remote).await;
159        }
160    }
161
162    pub(crate) async fn start(&self) {
163        if self.is_controlling.load(Ordering::SeqCst) {
164            ControllingSelector::start(self).await;
165        } else {
166            ControlledSelector::start(self).await;
167        }
168    }
169
170    pub(crate) async fn contact_candidates(&self) {
171        if self.is_controlling.load(Ordering::SeqCst) {
172            ControllingSelector::contact_candidates(self).await;
173        } else {
174            ControlledSelector::contact_candidates(self).await;
175        }
176    }
177
178    pub(crate) async fn ping_candidate(
179        &self,
180        local: &Arc<dyn Candidate + Send + Sync>,
181        remote: &Arc<dyn Candidate + Send + Sync>,
182    ) {
183        if self.is_controlling.load(Ordering::SeqCst) {
184            ControllingSelector::ping_candidate(self, local, remote).await;
185        } else {
186            ControlledSelector::ping_candidate(self, local, remote).await;
187        }
188    }
189
190    pub(crate) async fn handle_success_response(
191        &self,
192        m: &Message,
193        local: &Arc<dyn Candidate + Send + Sync>,
194        remote: &Arc<dyn Candidate + Send + Sync>,
195        remote_addr: SocketAddr,
196    ) {
197        if self.is_controlling.load(Ordering::SeqCst) {
198            ControllingSelector::handle_success_response(self, m, local, remote, remote_addr).await;
199        } else {
200            ControlledSelector::handle_success_response(self, m, local, remote, remote_addr).await;
201        }
202    }
203
204    pub(crate) async fn handle_binding_request(
205        &self,
206        m: &Message,
207        local: &Arc<dyn Candidate + Send + Sync>,
208        remote: &Arc<dyn Candidate + Send + Sync>,
209    ) {
210        if self.is_controlling.load(Ordering::SeqCst) {
211            ControllingSelector::handle_binding_request(self, m, local, remote).await;
212        } else {
213            ControlledSelector::handle_binding_request(self, m, local, remote).await;
214        }
215    }
216}
217
218#[async_trait]
219impl ControllingSelector for AgentInternal {
220    async fn start(&self) {
221        {
222            let mut nominated_pair = self.nominated_pair.lock().await;
223            *nominated_pair = None;
224        }
225        *self.start_time.lock() = Instant::now();
226    }
227
228    async fn contact_candidates(&self) {
229        // A lite selector should not contact candidates
230        if self.lite.load(Ordering::SeqCst) {
231            // This only happens if both peers are lite. See RFC 8445 S6.1.1 and S6.2
232            log::trace!("now falling back to full agent");
233        }
234
235        let nominated_pair_is_some = {
236            let nominated_pair = self.nominated_pair.lock().await;
237            nominated_pair.is_some()
238        };
239
240        if self.agent_conn.get_selected_pair().is_some() {
241            if self.validate_selected_pair().await {
242                log::trace!("[{}]: checking keepalive", self.get_name());
243                self.check_keepalive().await;
244            }
245        } else if nominated_pair_is_some {
246            self.nominate_pair().await;
247        } else {
248            let has_nominated_pair =
249                if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
250                    self.is_nominatable(&p.local) && self.is_nominatable(&p.remote)
251                } else {
252                    false
253                };
254
255            if has_nominated_pair {
256                if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
257                    log::trace!(
258                        "Nominatable pair found, nominating ({}, {})",
259                        p.local,
260                        p.remote
261                    );
262                    p.nominated.store(true, Ordering::SeqCst);
263                    {
264                        let mut nominated_pair = self.nominated_pair.lock().await;
265                        *nominated_pair = Some(p);
266                    }
267                }
268
269                self.nominate_pair().await;
270            } else {
271                self.ping_all_candidates().await;
272            }
273        }
274    }
275
276    async fn ping_candidate(
277        &self,
278        local: &Arc<dyn Candidate + Send + Sync>,
279        remote: &Arc<dyn Candidate + Send + Sync>,
280    ) {
281        let (msg, result) = {
282            let ufrag_pwd = self.ufrag_pwd.lock().await;
283            let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
284            let mut msg = Message::new();
285            let result = msg.build(&[
286                Box::new(BINDING_REQUEST),
287                Box::new(TransactionId::new()),
288                Box::new(Username::new(ATTR_USERNAME, username)),
289                Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
290                Box::new(PriorityAttr(local.priority())),
291                Box::new(MessageIntegrity::new_short_term_integrity(
292                    ufrag_pwd.remote_pwd.clone(),
293                )),
294                Box::new(FINGERPRINT),
295            ]);
296            (msg, result)
297        };
298
299        if let Err(err) = result {
300            log::error!("{err}");
301        } else {
302            self.send_binding_request(&msg, local, remote).await;
303        }
304    }
305
306    async fn handle_success_response(
307        &self,
308        m: &Message,
309        local: &Arc<dyn Candidate + Send + Sync>,
310        remote: &Arc<dyn Candidate + Send + Sync>,
311        remote_addr: SocketAddr,
312    ) {
313        if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
314            let transaction_addr = pending_request.destination;
315
316            // Assert that NAT is not symmetric
317            // https://tools.ietf.org/html/rfc8445#section-7.2.5.2.1
318            if transaction_addr != remote_addr {
319                log::debug!("discard message: transaction source and destination does not match expected({transaction_addr}), actual({remote})");
320                return;
321            }
322
323            log::trace!("inbound STUN (SuccessResponse) from {remote} to {local}");
324            let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none();
325
326            if let Some(p) = self.find_pair(local, remote).await {
327                p.state
328                    .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
329                log::trace!(
330                    "Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}",
331                    p,
332                    p.state.load(Ordering::SeqCst),
333                    pending_request.is_use_candidate,
334                    selected_pair_is_none
335                );
336                if pending_request.is_use_candidate && selected_pair_is_none {
337                    self.set_selected_pair(Some(Arc::clone(&p))).await;
338                }
339            } else {
340                // This shouldn't happen
341                log::error!("Success response from invalid candidate pair");
342            }
343        } else {
344            log::warn!(
345                "discard message from ({}), unknown TransactionID 0x{:?}",
346                remote,
347                m.transaction_id
348            );
349        }
350    }
351
352    async fn handle_binding_request(
353        &self,
354        m: &Message,
355        local: &Arc<dyn Candidate + Send + Sync>,
356        remote: &Arc<dyn Candidate + Send + Sync>,
357    ) {
358        self.send_binding_success(m, local, remote).await;
359        log::trace!("controllingSelector: sendBindingSuccess");
360
361        if let Some(p) = self.find_pair(local, remote).await {
362            let nominated_pair_is_none = {
363                let nominated_pair = self.nominated_pair.lock().await;
364                nominated_pair.is_none()
365            };
366
367            log::trace!(
368                "controllingSelector: after findPair {}, p.state: {}, {}",
369                p,
370                p.state.load(Ordering::SeqCst),
371                nominated_pair_is_none,
372                //self.agent_conn.get_selected_pair().await.is_none() //, {}
373            );
374            if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8
375                && nominated_pair_is_none
376                && self.agent_conn.get_selected_pair().is_none()
377            {
378                if let Some(best_pair) = self.agent_conn.get_best_available_candidate_pair().await {
379                    log::trace!("controllingSelector: getBestAvailableCandidatePair {best_pair}");
380                    if best_pair == p
381                        && self.is_nominatable(&p.local)
382                        && self.is_nominatable(&p.remote)
383                    {
384                        log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated",
385                            p.local, p.remote);
386                        {
387                            let mut nominated_pair = self.nominated_pair.lock().await;
388                            *nominated_pair = Some(p);
389                        }
390                        self.nominate_pair().await;
391                    }
392                } else {
393                    log::trace!("No best pair available");
394                }
395            }
396        } else {
397            log::trace!("controllingSelector: addPair");
398            self.add_pair(local.clone(), remote.clone()).await;
399        }
400    }
401}
402
403#[async_trait]
404impl ControlledSelector for AgentInternal {
405    async fn start(&self) {}
406
407    async fn contact_candidates(&self) {
408        // A lite selector should not contact candidates
409        if self.lite.load(Ordering::SeqCst) {
410            self.validate_selected_pair().await;
411        } else if self.agent_conn.get_selected_pair().is_some() {
412            if self.validate_selected_pair().await {
413                log::trace!("[{}]: checking keepalive", self.get_name());
414                self.check_keepalive().await;
415            }
416        } else {
417            self.ping_all_candidates().await;
418        }
419    }
420
421    async fn ping_candidate(
422        &self,
423        local: &Arc<dyn Candidate + Send + Sync>,
424        remote: &Arc<dyn Candidate + Send + Sync>,
425    ) {
426        let (msg, result) = {
427            let ufrag_pwd = self.ufrag_pwd.lock().await;
428            let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
429            let mut msg = Message::new();
430            let result = msg.build(&[
431                Box::new(BINDING_REQUEST),
432                Box::new(TransactionId::new()),
433                Box::new(Username::new(ATTR_USERNAME, username)),
434                Box::new(AttrControlled(self.tie_breaker.load(Ordering::SeqCst))),
435                Box::new(PriorityAttr(local.priority())),
436                Box::new(MessageIntegrity::new_short_term_integrity(
437                    ufrag_pwd.remote_pwd.clone(),
438                )),
439                Box::new(FINGERPRINT),
440            ]);
441            (msg, result)
442        };
443
444        if let Err(err) = result {
445            log::error!("{err}");
446        } else {
447            self.send_binding_request(&msg, local, remote).await;
448        }
449    }
450
451    async fn handle_success_response(
452        &self,
453        m: &Message,
454        local: &Arc<dyn Candidate + Send + Sync>,
455        remote: &Arc<dyn Candidate + Send + Sync>,
456        remote_addr: SocketAddr,
457    ) {
458        // https://tools.ietf.org/html/rfc8445#section-7.3.1.5
459        // If the controlled agent does not accept the request from the
460        // controlling agent, the controlled agent MUST reject the nomination
461        // request with an appropriate error code response (e.g., 400)
462        // [RFC5389].
463
464        if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
465            let transaction_addr = pending_request.destination;
466
467            // Assert that NAT is not symmetric
468            // https://tools.ietf.org/html/rfc8445#section-7.2.5.2.1
469            if transaction_addr != remote_addr {
470                log::debug!("discard message: transaction source and destination does not match expected({transaction_addr}), actual({remote})");
471                return;
472            }
473
474            log::trace!("inbound STUN (SuccessResponse) from {remote} to {local}");
475
476            if let Some(p) = self.find_pair(local, remote).await {
477                p.state
478                    .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
479                log::trace!("Found valid candidate pair: {p}");
480
481                if p.nominate_on_binding_success.load(Ordering::SeqCst)
482                    && self.agent_conn.get_selected_pair().is_none()
483                {
484                    self.set_selected_pair(Some(Arc::clone(&p))).await;
485                }
486            } else {
487                // This shouldn't happen
488                log::error!("Success response from invalid candidate pair");
489            }
490        } else {
491            log::warn!(
492                "discard message from ({}), unknown TransactionID 0x{:?}",
493                remote,
494                m.transaction_id
495            );
496        }
497    }
498
499    async fn handle_binding_request(
500        &self,
501        m: &Message,
502        local: &Arc<dyn Candidate + Send + Sync>,
503        remote: &Arc<dyn Candidate + Send + Sync>,
504    ) {
505        if self.find_pair(local, remote).await.is_none() {
506            self.add_pair(local.clone(), remote.clone()).await;
507        }
508
509        if let Some(p) = self.find_pair(local, remote).await {
510            let use_candidate = m.contains(ATTR_USE_CANDIDATE);
511            if use_candidate {
512                // https://tools.ietf.org/html/rfc8445#section-7.3.1.5
513
514                if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 {
515                    // If the state of this pair is Succeeded, it means that the check
516                    // previously sent by this pair produced a successful response and
517                    // generated a valid pair (Section 7.2.5.3.2).  The agent sets the
518                    // nominated flag value of the valid pair to true.
519                    if self.agent_conn.get_selected_pair().is_none() {
520                        self.set_selected_pair(Some(Arc::clone(&p))).await;
521                    }
522                } else {
523                    // If the received Binding request triggered a new check to be
524                    // enqueued in the triggered-check queue (Section 7.3.1.4), once the
525                    // check is sent and if it generates a successful response, and
526                    // generates a valid pair, the agent sets the nominated flag of the
527                    // pair to true.  If the request fails (Section 7.2.5.2), the agent
528                    // MUST remove the candidate pair from the valid list, set the
529                    // candidate pair state to Failed, and set the checklist state to
530                    // Failed.
531                    p.nominate_on_binding_success.store(true, Ordering::SeqCst);
532                }
533            }
534
535            self.send_binding_success(m, local, remote).await;
536            self.ping_candidate(local, remote).await;
537        }
538    }
539}