rtc_ice/agent/
mod.rs

1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use sansio::Protocol;
13use std::collections::VecDeque;
14use std::net::{Ipv4Addr, SocketAddr};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use stun::attributes::*;
18use stun::fingerprint::*;
19use stun::integrity::*;
20use stun::message::*;
21use stun::textattrs::*;
22use stun::xoraddr::*;
23
24use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
25use crate::candidate::{candidate_pair::*, *};
26use crate::network_type::NetworkType;
27use crate::rand::*;
28use crate::state::*;
29use crate::url::*;
30use shared::error::*;
31use shared::{TransportContext, TransportMessage, TransportProtocol};
32
33const ZERO_DURATION: Duration = Duration::from_secs(0);
34
35#[derive(Debug, Clone)]
36pub(crate) struct BindingRequest {
37    pub(crate) timestamp: Instant,
38    pub(crate) transaction_id: TransactionId,
39    pub(crate) destination: SocketAddr,
40    pub(crate) is_use_candidate: bool,
41}
42
43impl Default for BindingRequest {
44    fn default() -> Self {
45        Self {
46            timestamp: Instant::now(),
47            transaction_id: TransactionId::default(),
48            destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
49            is_use_candidate: false,
50        }
51    }
52}
53
54#[derive(Default, Clone)]
55pub struct Credentials {
56    pub ufrag: String,
57    pub pwd: String,
58}
59
60#[derive(Default, Clone)]
61pub(crate) struct UfragPwd {
62    pub(crate) local_credentials: Credentials,
63    pub(crate) remote_credentials: Option<Credentials>,
64}
65
66fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
67    let mut username = Username::new(ATTR_USERNAME, String::new());
68    username.get_from(m)?;
69
70    if username.to_string() != expected_username {
71        return Err(Error::Other(format!(
72            "{:?} expected({}) actual({})",
73            Error::ErrMismatchUsername,
74            expected_username,
75            username,
76        )));
77    }
78
79    Ok(())
80}
81
82fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
83    let message_integrity_attr = MessageIntegrity(key.to_vec());
84    message_integrity_attr.check(m)
85}
86
87pub enum Event {
88    ConnectionStateChange(ConnectionState),
89    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
90}
91
92/// Represents the ICE agent.
93pub struct Agent {
94    pub(crate) tie_breaker: u64,
95    pub(crate) is_controlling: bool,
96    pub(crate) lite: bool,
97
98    pub(crate) start_time: Instant,
99
100    pub(crate) connection_state: ConnectionState,
101    pub(crate) last_connection_state: ConnectionState,
102
103    //pub(crate) started_ch_tx: Mutex<Option<broadcast::Sender<()>>>,
104    pub(crate) ufrag_pwd: UfragPwd,
105
106    pub(crate) local_candidates: Vec<Candidate>,
107    pub(crate) remote_candidates: Vec<Candidate>,
108    pub(crate) candidate_pairs: Vec<CandidatePair>,
109    pub(crate) nominated_pair: Option<usize>,
110    pub(crate) selected_pair: Option<usize>,
111
112    // LRU of outbound Binding request Transaction IDs
113    pub(crate) pending_binding_requests: Vec<BindingRequest>,
114
115    // the following variables won't be changed after init_with_defaults()
116    pub(crate) insecure_skip_verify: bool,
117    pub(crate) max_binding_requests: u16,
118    pub(crate) host_acceptance_min_wait: Duration,
119    pub(crate) srflx_acceptance_min_wait: Duration,
120    pub(crate) prflx_acceptance_min_wait: Duration,
121    pub(crate) relay_acceptance_min_wait: Duration,
122    // How long connectivity checks can fail before the ICE Agent
123    // goes to disconnected
124    pub(crate) disconnected_timeout: Duration,
125    // How long connectivity checks can fail before the ICE Agent
126    // goes to failed
127    pub(crate) failed_timeout: Duration,
128    // How often should we send keepalive packets?
129    // 0 means never
130    pub(crate) keepalive_interval: Duration,
131    // How often should we run our internal taskLoop to check for state changes when connecting
132    pub(crate) check_interval: Duration,
133    pub(crate) checking_duration: Instant,
134    pub(crate) last_checking_time: Instant,
135
136    pub(crate) candidate_types: Vec<CandidateType>,
137    pub(crate) urls: Vec<Url>,
138
139    pub(crate) write_outs: VecDeque<TransportMessage<BytesMut>>,
140    pub(crate) event_outs: VecDeque<Event>,
141}
142
143impl Default for Agent {
144    fn default() -> Self {
145        Self {
146            tie_breaker: 0,
147            is_controlling: false,
148            lite: false,
149            start_time: Instant::now(),
150            connection_state: Default::default(),
151            last_connection_state: Default::default(),
152            ufrag_pwd: Default::default(),
153            local_candidates: vec![],
154            remote_candidates: vec![],
155            candidate_pairs: vec![],
156            nominated_pair: None,
157            selected_pair: None,
158            pending_binding_requests: vec![],
159            insecure_skip_verify: false,
160            max_binding_requests: 0,
161            host_acceptance_min_wait: Default::default(),
162            srflx_acceptance_min_wait: Default::default(),
163            prflx_acceptance_min_wait: Default::default(),
164            relay_acceptance_min_wait: Default::default(),
165            disconnected_timeout: Default::default(),
166            failed_timeout: Default::default(),
167            keepalive_interval: Default::default(),
168            check_interval: Default::default(),
169            checking_duration: Instant::now(),
170            last_checking_time: Instant::now(),
171            candidate_types: vec![],
172            urls: vec![],
173            write_outs: Default::default(),
174            event_outs: Default::default(),
175        }
176    }
177}
178
179impl Agent {
180    /// Creates a new Agent.
181    pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
182        let candidate_types = if config.candidate_types.is_empty() {
183            default_candidate_types()
184        } else {
185            config.candidate_types.clone()
186        };
187
188        if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
189        {
190            return Err(Error::ErrLiteUsingNonHostCandidates);
191        }
192
193        if !config.urls.is_empty()
194            && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
195            && !contains_candidate_type(CandidateType::Relay, &candidate_types)
196        {
197            return Err(Error::ErrUselessUrlsProvided);
198        }
199
200        let mut agent = Self {
201            tie_breaker: rand::random::<u64>(),
202            is_controlling: config.is_controlling,
203            lite: config.lite,
204
205            start_time: Instant::now(),
206
207            nominated_pair: None,
208            selected_pair: None,
209            candidate_pairs: vec![],
210
211            connection_state: ConnectionState::New,
212
213            insecure_skip_verify: config.insecure_skip_verify,
214
215            //started_ch_tx: MuteSome(started_ch_tx)),
216
217            //won't change after init_with_defaults()
218            max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
219                max_binding_requests
220            } else {
221                DEFAULT_MAX_BINDING_REQUESTS
222            },
223            host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
224                config.host_acceptance_min_wait
225            {
226                host_acceptance_min_wait
227            } else {
228                DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
229            },
230            srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
231                config.srflx_acceptance_min_wait
232            {
233                srflx_acceptance_min_wait
234            } else {
235                DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
236            },
237            prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
238                config.prflx_acceptance_min_wait
239            {
240                prflx_acceptance_min_wait
241            } else {
242                DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
243            },
244            relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
245                config.relay_acceptance_min_wait
246            {
247                relay_acceptance_min_wait
248            } else {
249                DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
250            },
251
252            // How long connectivity checks can fail before the ICE Agent
253            // goes to disconnected
254            disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
255                disconnected_timeout
256            } else {
257                DEFAULT_DISCONNECTED_TIMEOUT
258            },
259
260            // How long connectivity checks can fail before the ICE Agent
261            // goes to failed
262            failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
263                failed_timeout
264            } else {
265                DEFAULT_FAILED_TIMEOUT
266            },
267
268            // How often should we send keepalive packets?
269            // 0 means never
270            keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
271                keepalive_interval
272            } else {
273                DEFAULT_KEEPALIVE_INTERVAL
274            },
275
276            // How often should we run our internal taskLoop to check for state changes when connecting
277            check_interval: if config.check_interval == Duration::from_secs(0) {
278                DEFAULT_CHECK_INTERVAL
279            } else {
280                config.check_interval
281            },
282            checking_duration: Instant::now(),
283            last_checking_time: Instant::now(),
284            last_connection_state: ConnectionState::Unspecified,
285
286            ufrag_pwd: UfragPwd::default(),
287
288            local_candidates: vec![],
289            remote_candidates: vec![],
290
291            // LRU of outbound Binding request Transaction IDs
292            pending_binding_requests: vec![],
293
294            candidate_types,
295            urls: config.urls.clone(),
296
297            write_outs: VecDeque::new(),
298            event_outs: VecDeque::new(),
299        };
300
301        // Restart is also used to initialize the agent for the first time
302        if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
303        {
304            let _ = agent.close();
305            return Err(err);
306        }
307
308        Ok(agent)
309    }
310
311    /// Adds a new local candidate.
312    pub fn add_local_candidate(&mut self, c: Candidate) -> Result<()> {
313        for cand in &self.local_candidates {
314            if cand.equal(&c) {
315                return Ok(());
316            }
317        }
318
319        self.local_candidates.push(c);
320
321        for remote_index in 0..self.remote_candidates.len() {
322            self.add_pair(self.local_candidates.len() - 1, remote_index);
323        }
324
325        self.request_connectivity_check();
326
327        Ok(())
328    }
329
330    /// Adds a new remote candidate.
331    pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<()> {
332        // If we have a mDNS Candidate lets fully resolve it before adding it locally
333        if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
334            warn!(
335                "remote mDNS candidate added, but mDNS is disabled: ({})",
336                c.address()
337            );
338            return Err(Error::ErrMulticastDnsNotSupported);
339        }
340
341        for cand in &self.remote_candidates {
342            if cand.equal(&c) {
343                return Ok(());
344            }
345        }
346
347        self.remote_candidates.push(c);
348
349        for local_index in 0..self.local_candidates.len() {
350            self.add_pair(local_index, self.remote_candidates.len() - 1);
351        }
352
353        self.request_connectivity_check();
354
355        Ok(())
356    }
357
358    /// Sets the credentials of the remote agent.
359    pub fn set_remote_credentials(
360        &mut self,
361        remote_ufrag: String,
362        remote_pwd: String,
363    ) -> Result<()> {
364        if remote_ufrag.is_empty() {
365            return Err(Error::ErrRemoteUfragEmpty);
366        } else if remote_pwd.is_empty() {
367            return Err(Error::ErrRemotePwdEmpty);
368        }
369
370        self.ufrag_pwd.remote_credentials = Some(Credentials {
371            ufrag: remote_ufrag,
372            pwd: remote_pwd,
373        });
374
375        Ok(())
376    }
377
378    /// Returns the remote credentials.
379    pub fn get_remote_credentials(&self) -> Option<&Credentials> {
380        self.ufrag_pwd.remote_credentials.as_ref()
381    }
382
383    /// Returns the local credentials.
384    pub fn get_local_credentials(&self) -> &Credentials {
385        &self.ufrag_pwd.local_credentials
386    }
387
388    pub fn role(&self) -> bool {
389        self.is_controlling
390    }
391
392    pub fn set_role(&mut self, is_controlling: bool) {
393        self.is_controlling = is_controlling;
394    }
395
396    pub fn state(&self) -> ConnectionState {
397        self.connection_state
398    }
399
400    pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
401        self.find_local_candidate(transport.local_addr, transport.transport_protocol)
402            .is_some()
403            && self.validate_non_stun_traffic(transport.peer_addr)
404    }
405
406    fn get_timeout_interval(&self) -> Duration {
407        let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
408            self.check_interval,
409            self.keepalive_interval,
410            self.disconnected_timeout,
411            self.failed_timeout,
412        );
413        let mut interval = DEFAULT_CHECK_INTERVAL;
414
415        let mut update_interval = |x: Duration| {
416            if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
417                interval = x;
418            }
419        };
420
421        match self.last_connection_state {
422            ConnectionState::New | ConnectionState::Checking => {
423                // While connecting, check candidates more frequently
424                update_interval(check_interval);
425            }
426            ConnectionState::Connected | ConnectionState::Disconnected => {
427                update_interval(keepalive_interval);
428            }
429            _ => {}
430        };
431        // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
432        update_interval(disconnected_timeout);
433        update_interval(failed_timeout);
434        interval
435    }
436
437    /// Returns the selected pair (local_candidate, remote_candidate) or none
438    pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
439        if let Some(pair_index) = self.get_selected_pair() {
440            let candidate_pair = &self.candidate_pairs[pair_index];
441            Some((
442                &self.local_candidates[candidate_pair.local_index],
443                &self.remote_candidates[candidate_pair.remote_index],
444            ))
445        } else {
446            None
447        }
448    }
449
450    pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
451        if let Some(pair_index) = self.get_best_available_pair() {
452            let candidate_pair = &self.candidate_pairs[pair_index];
453            Some((
454                &self.local_candidates[candidate_pair.local_index],
455                &self.remote_candidates[candidate_pair.remote_index],
456            ))
457        } else {
458            None
459        }
460    }
461
462    /// start connectivity checks
463    pub fn start_connectivity_checks(
464        &mut self,
465        is_controlling: bool,
466        remote_ufrag: String,
467        remote_pwd: String,
468    ) -> Result<()> {
469        debug!(
470            "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
471            is_controlling, remote_ufrag, remote_pwd
472        );
473        self.set_remote_credentials(remote_ufrag, remote_pwd)?;
474        self.is_controlling = is_controlling;
475        self.start();
476
477        self.update_connection_state(ConnectionState::Checking);
478        self.request_connectivity_check();
479
480        Ok(())
481    }
482
483    /// Restarts the ICE Agent with the provided ufrag/pwd
484    /// If no ufrag/pwd is provided the Agent will generate one itself.
485    pub fn restart(
486        &mut self,
487        mut ufrag: String,
488        mut pwd: String,
489        keep_local_candidates: bool,
490    ) -> Result<()> {
491        if ufrag.is_empty() {
492            ufrag = generate_ufrag();
493        }
494        if pwd.is_empty() {
495            pwd = generate_pwd();
496        }
497
498        if ufrag.len() * 8 < 24 {
499            return Err(Error::ErrLocalUfragInsufficientBits);
500        }
501        if pwd.len() * 8 < 128 {
502            return Err(Error::ErrLocalPwdInsufficientBits);
503        }
504
505        // Clear all agent needed to take back to fresh state
506        self.ufrag_pwd.local_credentials.ufrag = ufrag;
507        self.ufrag_pwd.local_credentials.pwd = pwd;
508        self.ufrag_pwd.remote_credentials = None;
509
510        self.pending_binding_requests = vec![];
511
512        self.candidate_pairs = vec![];
513
514        self.set_selected_pair(None);
515        self.delete_all_candidates(keep_local_candidates);
516        self.start();
517
518        // Restart is used by NewAgent. Accept/Connect should be used to move to checking
519        // for new Agents
520        if self.connection_state != ConnectionState::New {
521            self.update_connection_state(ConnectionState::Checking);
522        }
523
524        Ok(())
525    }
526
527    /// Returns the local candidates.
528    pub fn get_local_candidates(&self) -> &[Candidate] {
529        &self.local_candidates
530    }
531
532    fn contact(&mut self, now: Instant) {
533        if self.connection_state == ConnectionState::Failed {
534            // The connection is currently failed so don't send any checks
535            // In the future it may be restarted though
536            self.last_connection_state = self.connection_state;
537            return;
538        }
539        if self.connection_state == ConnectionState::Checking {
540            // We have just entered checking for the first time so update our checking timer
541            if self.last_connection_state != self.connection_state {
542                self.checking_duration = now;
543            }
544
545            // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
546            if now
547                .checked_duration_since(self.checking_duration)
548                .unwrap_or_else(|| Duration::from_secs(0))
549                > self.disconnected_timeout + self.failed_timeout
550            {
551                self.update_connection_state(ConnectionState::Failed);
552                self.last_connection_state = self.connection_state;
553                return;
554            }
555        }
556
557        self.contact_candidates();
558
559        self.last_connection_state = self.connection_state;
560        self.last_checking_time = now;
561    }
562
563    pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
564        if self.connection_state != new_state {
565            // Connection has gone to failed, release all gathered candidates
566            if new_state == ConnectionState::Failed {
567                self.set_selected_pair(None);
568                self.delete_all_candidates(false);
569            }
570
571            info!(
572                "[{}]: Setting new connection state: {}",
573                self.get_name(),
574                new_state
575            );
576            self.connection_state = new_state;
577            self.event_outs
578                .push_back(Event::ConnectionStateChange(new_state));
579        }
580    }
581
582    pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
583        if let Some(pair_index) = selected_pair {
584            trace!(
585                "[{}]: Set selected candidate pair: {:?}",
586                self.get_name(),
587                self.candidate_pairs[pair_index]
588            );
589
590            self.candidate_pairs[pair_index].nominated = true;
591            self.selected_pair = Some(pair_index);
592
593            self.update_connection_state(ConnectionState::Connected);
594
595            // Notify when the selected pair changes
596            let candidate_pair = &self.candidate_pairs[pair_index];
597            self.event_outs
598                .push_back(Event::SelectedCandidatePairChange(
599                    Box::new(self.local_candidates[candidate_pair.local_index].clone()),
600                    Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
601                ));
602        } else {
603            self.selected_pair = None;
604        }
605    }
606
607    pub(crate) fn ping_all_candidates(&mut self) {
608        let mut pairs: Vec<(usize, usize)> = vec![];
609
610        let name = self.get_name().to_string();
611        if self.candidate_pairs.is_empty() {
612            warn!(
613                "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
614                name,
615            );
616        }
617        for p in &mut self.candidate_pairs {
618            if p.state == CandidatePairState::Waiting {
619                p.state = CandidatePairState::InProgress;
620            } else if p.state != CandidatePairState::InProgress {
621                continue;
622            }
623
624            if p.binding_request_count > self.max_binding_requests {
625                trace!(
626                    "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
627                    name,
628                    *p,
629                    self.local_candidates[p.local_index].addr(),
630                    self.remote_candidates[p.remote_index].addr()
631                );
632                p.state = CandidatePairState::Failed;
633            } else {
634                p.binding_request_count += 1;
635                let local = p.local_index;
636                let remote = p.remote_index;
637                pairs.push((local, remote));
638            }
639        }
640
641        if !pairs.is_empty() {
642            trace!(
643                "[{}]: pinging all {} candidates",
644                self.get_name(),
645                pairs.len()
646            );
647        }
648
649        for (local, remote) in pairs {
650            self.ping_candidate(local, remote);
651        }
652    }
653
654    pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
655        let p = CandidatePair::new(
656            local_index,
657            remote_index,
658            self.local_candidates[local_index].priority(),
659            self.remote_candidates[remote_index].priority(),
660            self.is_controlling,
661        );
662        self.candidate_pairs.push(p);
663    }
664
665    pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
666        for (index, p) in self.candidate_pairs.iter().enumerate() {
667            if p.local_index == local_index && p.remote_index == remote_index {
668                return Some(index);
669            }
670        }
671        None
672    }
673
674    /// Checks if the selected pair is (still) valid.
675    /// Note: the caller should hold the agent lock.
676    pub(crate) fn validate_selected_pair(&mut self) -> bool {
677        let (valid, disconnected_time) = {
678            self.selected_pair.as_ref().map_or_else(
679                || (false, Duration::from_secs(0)),
680                |&pair_index| {
681                    let remote_index = self.candidate_pairs[pair_index].remote_index;
682
683                    let disconnected_time = Instant::now()
684                        .duration_since(self.remote_candidates[remote_index].last_received());
685                    (true, disconnected_time)
686                },
687            )
688        };
689
690        if valid {
691            // Only allow transitions to fail if a.failedTimeout is non-zero
692            let mut total_time_to_failure = self.failed_timeout;
693            if total_time_to_failure != Duration::from_secs(0) {
694                total_time_to_failure += self.disconnected_timeout;
695            }
696
697            if total_time_to_failure != Duration::from_secs(0)
698                && disconnected_time > total_time_to_failure
699            {
700                self.update_connection_state(ConnectionState::Failed);
701            } else if self.disconnected_timeout != Duration::from_secs(0)
702                && disconnected_time > self.disconnected_timeout
703            {
704                self.update_connection_state(ConnectionState::Disconnected);
705            } else {
706                self.update_connection_state(ConnectionState::Connected);
707            }
708        }
709
710        valid
711    }
712
713    /// Sends STUN Binding Indications to the selected pair.
714    /// if no packet has been sent on that pair in the last keepaliveInterval.
715    /// Note: the caller should hold the agent lock.
716    pub(crate) fn check_keepalive(&mut self) {
717        let (local_index, remote_index) = {
718            self.selected_pair
719                .as_ref()
720                .map_or((None, None), |&pair_index| {
721                    let p = &self.candidate_pairs[pair_index];
722                    (Some(p.local_index), Some(p.remote_index))
723                })
724        };
725
726        if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
727            let last_sent =
728                Instant::now().duration_since(self.local_candidates[local_index].last_sent());
729
730            let last_received =
731                Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
732
733            if (self.keepalive_interval != Duration::from_secs(0))
734                && ((last_sent > self.keepalive_interval)
735                    || (last_received > self.keepalive_interval))
736            {
737                // we use binding request instead of indication to support refresh consent schemas
738                // see https://tools.ietf.org/html/rfc7675
739                self.ping_candidate(local_index, remote_index);
740            }
741        }
742    }
743
744    fn request_connectivity_check(&mut self) {
745        if self.ufrag_pwd.remote_credentials.is_some() {
746            self.contact(Instant::now());
747        }
748    }
749
750    /// Remove all candidates.
751    /// This closes any listening sockets and removes both the local and remote candidate lists.
752    ///
753    /// This is used for restarts, failures and on close.
754    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
755        if !keep_local_candidates {
756            self.local_candidates.clear();
757        }
758        self.remote_candidates.clear();
759    }
760
761    pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
762        let (ip, port) = (addr.ip(), addr.port());
763        for (index, c) in self.remote_candidates.iter().enumerate() {
764            if c.address() == ip.to_string() && c.port() == port {
765                return Some(index);
766            }
767        }
768        None
769    }
770
771    pub(crate) fn find_local_candidate(
772        &self,
773        addr: SocketAddr,
774        transport_protocol: TransportProtocol,
775    ) -> Option<usize> {
776        for (index, c) in self.local_candidates.iter().enumerate() {
777            if c.addr() == addr && c.network_type().to_protocol() == transport_protocol {
778                return Some(index);
779            }
780        }
781        None
782    }
783
784    pub(crate) fn send_binding_request(
785        &mut self,
786        m: &Message,
787        local_index: usize,
788        remote_index: usize,
789    ) {
790        trace!(
791            "[{}]: ping STUN from {} to {}",
792            self.get_name(),
793            self.local_candidates[local_index],
794            self.remote_candidates[remote_index],
795        );
796
797        self.invalidate_pending_binding_requests(Instant::now());
798
799        self.pending_binding_requests.push(BindingRequest {
800            timestamp: Instant::now(),
801            transaction_id: m.transaction_id,
802            destination: self.remote_candidates[remote_index].addr(),
803            is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
804        });
805
806        self.send_stun(m, local_index, remote_index);
807    }
808
809    pub(crate) fn send_binding_success(
810        &mut self,
811        m: &Message,
812        local_index: usize,
813        remote_index: usize,
814    ) {
815        let addr = self.remote_candidates[remote_index].addr();
816        let (ip, port) = (addr.ip(), addr.port());
817        let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
818
819        let (out, result) = {
820            let mut out = Message::new();
821            let result = out.build(&[
822                Box::new(m.clone()),
823                Box::new(BINDING_SUCCESS),
824                Box::new(XorMappedAddress { ip, port }),
825                Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
826                Box::new(FINGERPRINT),
827            ]);
828            (out, result)
829        };
830
831        if let Err(err) = result {
832            warn!(
833                "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
834                self.get_name(),
835                self.local_candidates[local_index],
836                self.remote_candidates[remote_index],
837                err
838            );
839        } else {
840            self.send_stun(&out, local_index, remote_index);
841        }
842    }
843
844    /// Removes pending binding requests that are over `maxBindingRequestTimeout` old Let HTO be the
845    /// transaction timeout, which SHOULD be 2*RTT if RTT is known or 500 ms otherwise.
846    ///
847    /// reference: (IETF ref-8445)[https://tools.ietf.org/html/rfc8445#appendix-B.1].
848    pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
849        let pending_binding_requests = &mut self.pending_binding_requests;
850        let initial_size = pending_binding_requests.len();
851
852        let mut temp = vec![];
853        for binding_request in pending_binding_requests.drain(..) {
854            if filter_time
855                .checked_duration_since(binding_request.timestamp)
856                .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
857                .unwrap_or(true)
858            {
859                temp.push(binding_request);
860            }
861        }
862
863        *pending_binding_requests = temp;
864        let bind_requests_remaining = pending_binding_requests.len();
865        let bind_requests_removed = initial_size - bind_requests_remaining;
866        if bind_requests_removed > 0 {
867            trace!(
868                "[{}]: Discarded {} binding requests because they expired, still {} remaining",
869                self.get_name(),
870                bind_requests_removed,
871                bind_requests_remaining,
872            );
873        }
874    }
875
876    /// Assert that the passed `TransactionID` is in our `pendingBindingRequests` and returns the
877    /// destination, If the bindingRequest was valid remove it from our pending cache.
878    pub(crate) fn handle_inbound_binding_success(
879        &mut self,
880        id: TransactionId,
881    ) -> Option<BindingRequest> {
882        self.invalidate_pending_binding_requests(Instant::now());
883
884        let pending_binding_requests = &mut self.pending_binding_requests;
885        for i in 0..pending_binding_requests.len() {
886            if pending_binding_requests[i].transaction_id == id {
887                let valid_binding_request = pending_binding_requests.remove(i);
888                return Some(valid_binding_request);
889            }
890        }
891        None
892    }
893
894    /// Processes STUN traffic from a remote candidate.
895    pub(crate) fn handle_inbound(
896        &mut self,
897        m: &mut Message,
898        local_index: usize,
899        remote_addr: SocketAddr,
900    ) -> Result<()> {
901        if m.typ.method != METHOD_BINDING
902            || !(m.typ.class == CLASS_SUCCESS_RESPONSE
903                || m.typ.class == CLASS_REQUEST
904                || m.typ.class == CLASS_INDICATION)
905        {
906            trace!(
907                "[{}]: unhandled STUN from {} to {} class({}) method({})",
908                self.get_name(),
909                remote_addr,
910                self.local_candidates[local_index],
911                m.typ.class,
912                m.typ.method
913            );
914            return Err(Error::ErrUnhandledStunpacket);
915        }
916
917        if self.is_controlling {
918            if m.contains(ATTR_ICE_CONTROLLING) {
919                debug!(
920                    "[{}]: inbound isControlling && a.isControlling == true",
921                    self.get_name(),
922                );
923                return Err(Error::ErrUnexpectedStunrequestMessage);
924            } else if m.contains(ATTR_USE_CANDIDATE) {
925                debug!(
926                    "[{}]: useCandidate && a.isControlling == true",
927                    self.get_name(),
928                );
929                return Err(Error::ErrUnexpectedStunrequestMessage);
930            }
931        } else if m.contains(ATTR_ICE_CONTROLLED) {
932            debug!(
933                "[{}]: inbound isControlled && a.isControlling == false",
934                self.get_name(),
935            );
936            return Err(Error::ErrUnexpectedStunrequestMessage);
937        }
938
939        let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
940            debug!(
941                "[{}]: ufrag_pwd.remote_credentials.is_none",
942                self.get_name(),
943            );
944            return Err(Error::ErrPasswordEmpty);
945        };
946
947        let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
948        if m.typ.class == CLASS_SUCCESS_RESPONSE {
949            if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
950            {
951                warn!(
952                    "[{}]: discard message from ({}), {}",
953                    self.get_name(),
954                    remote_addr,
955                    err
956                );
957                return Err(err);
958            }
959
960            if let Some(remote_index) = &remote_candidate_index {
961                self.handle_success_response(m, local_index, *remote_index, remote_addr);
962            } else {
963                warn!(
964                    "[{}]: discard success message from ({}), no such remote",
965                    self.get_name(),
966                    remote_addr
967                );
968                return Err(Error::ErrUnhandledStunpacket);
969            }
970        } else if m.typ.class == CLASS_REQUEST {
971            {
972                let username = self.ufrag_pwd.local_credentials.ufrag.clone()
973                    + ":"
974                    + remote_credentials.ufrag.as_str();
975                if let Err(err) = assert_inbound_username(m, &username) {
976                    warn!(
977                        "[{}]: discard message from ({}), {}",
978                        self.get_name(),
979                        remote_addr,
980                        err
981                    );
982                    return Err(err);
983                } else if let Err(err) = assert_inbound_message_integrity(
984                    m,
985                    self.ufrag_pwd.local_credentials.pwd.as_bytes(),
986                ) {
987                    warn!(
988                        "[{}]: discard message from ({}), {}",
989                        self.get_name(),
990                        remote_addr,
991                        err
992                    );
993                    return Err(err);
994                }
995            }
996
997            if remote_candidate_index.is_none() {
998                let (ip, port, network_type) =
999                    (remote_addr.ip(), remote_addr.port(), NetworkType::Udp4);
1000
1001                let prflx_candidate_config = CandidatePeerReflexiveConfig {
1002                    base_config: CandidateConfig {
1003                        network: network_type.to_string(),
1004                        address: ip.to_string(),
1005                        port,
1006                        component: self.local_candidates[local_index].component(),
1007                        ..CandidateConfig::default()
1008                    },
1009                    rel_addr: "".to_owned(),
1010                    rel_port: 0,
1011                };
1012
1013                match prflx_candidate_config.new_candidate_peer_reflexive() {
1014                    Ok(prflx_candidate) => {
1015                        let _ = self.add_remote_candidate(prflx_candidate);
1016                        remote_candidate_index = Some(self.remote_candidates.len() - 1);
1017                    }
1018                    Err(err) => {
1019                        error!(
1020                            "[{}]: Failed to create new remote prflx candidate ({})",
1021                            self.get_name(),
1022                            err
1023                        );
1024                        return Err(err);
1025                    }
1026                };
1027
1028                debug!(
1029                    "[{}]: adding a new peer-reflexive candidate: {} ",
1030                    self.get_name(),
1031                    remote_addr
1032                );
1033            }
1034
1035            trace!(
1036                "[{}]: inbound STUN (Request) from {} to {}",
1037                self.get_name(),
1038                remote_addr,
1039                self.local_candidates[local_index]
1040            );
1041
1042            if let Some(remote_index) = &remote_candidate_index {
1043                self.handle_binding_request(m, local_index, *remote_index);
1044            }
1045        }
1046
1047        if let Some(remote_index) = remote_candidate_index {
1048            self.remote_candidates[remote_index].seen(false);
1049        }
1050
1051        Ok(())
1052    }
1053
1054    // Processes non STUN traffic from a remote candidate, and returns true if it is an actual
1055    // remote candidate.
1056    pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1057        self.find_remote_candidate(remote_addr)
1058            .is_some_and(|remote_index| {
1059                self.remote_candidates[remote_index].seen(false);
1060                true
1061            })
1062    }
1063
1064    pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1065        let peer_addr = self.remote_candidates[remote_index].addr();
1066        let local_addr = self.local_candidates[local_index].addr();
1067        let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1068            TransportProtocol::TCP
1069        } else {
1070            TransportProtocol::UDP
1071        };
1072
1073        self.write_outs.push_back(TransportMessage {
1074            now: Instant::now(),
1075            transport: TransportContext {
1076                local_addr,
1077                peer_addr,
1078                ecn: None,
1079                transport_protocol,
1080            },
1081            message: BytesMut::from(&msg.raw[..]),
1082        });
1083
1084        self.local_candidates[local_index].seen(true);
1085    }
1086
1087    fn handle_inbound_candidate_msg(
1088        &mut self,
1089        local_index: usize,
1090        msg: TransportMessage<BytesMut>,
1091    ) -> Result<()> {
1092        if is_stun_message(&msg.message) {
1093            let mut m = Message {
1094                raw: msg.message.to_vec(),
1095                ..Message::default()
1096            };
1097
1098            if let Err(err) = m.decode() {
1099                warn!(
1100                    "[{}]: Failed to handle decode ICE from {} to {}: {}",
1101                    self.get_name(),
1102                    msg.transport.local_addr,
1103                    msg.transport.peer_addr,
1104                    err
1105                );
1106                Err(err)
1107            } else {
1108                self.handle_inbound(&mut m, local_index, msg.transport.peer_addr)
1109            }
1110        } else {
1111            if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1112                warn!(
1113                    "[{}]: Discarded message, not a valid remote candidate from {}",
1114                    self.get_name(),
1115                    msg.transport.peer_addr,
1116                );
1117            } else {
1118                warn!(
1119                    "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1120                    self.get_name(),
1121                    msg.transport.peer_addr
1122                );
1123            }
1124            Err(Error::ErrNonStunmessage)
1125        }
1126    }
1127
1128    pub(crate) fn get_name(&self) -> &str {
1129        if self.is_controlling {
1130            "controlling"
1131        } else {
1132            "controlled"
1133        }
1134    }
1135
1136    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1137        self.selected_pair
1138    }
1139
1140    pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1141        let mut best_pair_index: Option<usize> = None;
1142
1143        for (index, p) in self.candidate_pairs.iter().enumerate() {
1144            if p.state == CandidatePairState::Failed {
1145                continue;
1146            }
1147
1148            if let Some(pair_index) = &mut best_pair_index {
1149                let b = &self.candidate_pairs[*pair_index];
1150                if b.priority() < p.priority() {
1151                    *pair_index = index;
1152                }
1153            } else {
1154                best_pair_index = Some(index);
1155            }
1156        }
1157
1158        best_pair_index
1159    }
1160
1161    pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1162        let mut best_pair_index: Option<usize> = None;
1163
1164        for (index, p) in self.candidate_pairs.iter().enumerate() {
1165            if p.state != CandidatePairState::Succeeded {
1166                continue;
1167            }
1168
1169            if let Some(pair_index) = &mut best_pair_index {
1170                let b = &self.candidate_pairs[*pair_index];
1171                if b.priority() < p.priority() {
1172                    *pair_index = index;
1173                }
1174            } else {
1175                best_pair_index = Some(index);
1176            }
1177        }
1178
1179        best_pair_index
1180    }
1181}