Skip to main content

rtc_ice/agent/
mod.rs

1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::tcp_type::TcpType;
32use crate::url::*;
33use shared::error::*;
34use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
35
36const ZERO_DURATION: Duration = Duration::from_secs(0);
37
/// Bookkeeping record for one outbound STUN Binding request.
#[derive(Debug, Clone)]
pub(crate) struct BindingRequest {
    /// Time associated with the request (`Default` uses `Instant::now()`).
    pub(crate) timestamp: Instant,
    /// STUN transaction id of the request.
    pub(crate) transaction_id: TransactionId,
    /// Socket address the request is addressed to.
    pub(crate) destination: SocketAddr,
    /// NOTE(review): presumably set when the request carried USE-CANDIDATE —
    /// confirm at the site that creates these records.
    pub(crate) is_use_candidate: bool,
}
45
46impl Default for BindingRequest {
47    fn default() -> Self {
48        Self {
49            timestamp: Instant::now(),
50            transaction_id: TransactionId::default(),
51            destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
52            is_use_candidate: false,
53        }
54    }
55}
56
/// An ICE username fragment / password pair.
#[derive(Default, Clone)]
pub struct Credentials {
    /// Username fragment.
    pub ufrag: String,
    /// Password.
    pub pwd: String,
}
62
/// Local and remote credentials held by the agent.
#[derive(Default, Clone)]
pub(crate) struct UfragPwd {
    /// Credentials of this agent; (re)assigned by `Agent::restart`.
    pub(crate) local_credentials: Credentials,
    /// Credentials of the remote agent; `None` until
    /// `Agent::set_remote_credentials` succeeds and again after a restart.
    pub(crate) remote_credentials: Option<Credentials>,
}
68
69fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
70    let mut username = Username::new(ATTR_USERNAME, String::new());
71    username.get_from(m)?;
72
73    if username.to_string() != expected_username {
74        return Err(Error::Other(format!(
75            "{:?} expected({}) actual({})",
76            Error::ErrMismatchUsername,
77            expected_username,
78            username,
79        )));
80    }
81
82    Ok(())
83}
84
85fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
86    let message_integrity_attr = MessageIntegrity(key.to_vec());
87    message_integrity_attr.check(m)
88}
89
/// Notifications the agent queues for its owner.
pub enum Event {
    /// The agent's connection state changed to the carried value.
    ConnectionStateChange(ConnectionState),
    /// A candidate pair was selected; carries clones of the (local, remote) candidates.
    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
    /// Emitted when the ICE role switches due to a role conflict (RFC 8445 §7.3.1.1).
    /// The bool is `true` if the agent is now controlling, `false` if now controlled.
    RoleChange(bool),
}
97
/// Represents the ICE agent.
pub struct Agent {
    /// Random value drawn in `Agent::new`.
    /// NOTE(review): presumably the ICE role-conflict tie-breaker — confirm at its use site.
    pub(crate) tie_breaker: u64,
    /// `true` while this agent holds the controlling role.
    pub(crate) is_controlling: bool,
    /// Copied from `AgentConfig::lite`; `Agent::new` only accepts it together with
    /// host-only candidate types.
    pub(crate) lite: bool,

    /// Set to `Instant::now()` when the agent is constructed.
    pub(crate) start_time: Instant,

    /// Current connection state; changed through `update_connection_state`.
    pub(crate) connection_state: ConnectionState,
    /// Connection state recorded at the end of the previous `contact` pass.
    pub(crate) last_connection_state: ConnectionState,

    //pub(crate) started_ch_tx: Mutex<Option<broadcast::Sender<()>>>,
    /// Local and remote ufrag/pwd.
    pub(crate) ufrag_pwd: UfragPwd,

    /// Local candidates; candidate pairs refer to them by index.
    pub(crate) local_candidates: Vec<Candidate>,
    /// Remote candidates; candidate pairs refer to them by index.
    pub(crate) remote_candidates: Vec<Candidate>,
    /// Candidate pairs built from the local and remote candidate lists.
    pub(crate) candidate_pairs: Vec<CandidatePair>,
    /// NOTE(review): presumably an index into `candidate_pairs` for the nominated
    /// pair — not written in the visible part of this file; confirm.
    pub(crate) nominated_pair: Option<usize>,
    /// Index into `candidate_pairs` of the selected pair, if any.
    pub(crate) selected_pair: Option<usize>,

    // LRU of outbound Binding request Transaction IDs
    pub(crate) pending_binding_requests: Vec<BindingRequest>,

    // the following variables won't be changed after init_with_defaults()
    pub(crate) insecure_skip_verify: bool,
    /// A pair whose binding-request count exceeds this is marked failed by
    /// `ping_all_candidates`.
    pub(crate) max_binding_requests: u16,
    pub(crate) host_acceptance_min_wait: Duration,
    pub(crate) srflx_acceptance_min_wait: Duration,
    pub(crate) prflx_acceptance_min_wait: Duration,
    pub(crate) relay_acceptance_min_wait: Duration,
    // How long connectivity checks can fail before the ICE Agent
    // goes to disconnected
    pub(crate) disconnected_timeout: Duration,
    // How long connectivity checks can fail before the ICE Agent
    // goes to failed
    pub(crate) failed_timeout: Duration,
    // How often should we send keepalive packets?
    // 0 means never
    pub(crate) keepalive_interval: Duration,
    // How often should we run our internal taskLoop to check for state changes when connecting
    pub(crate) check_interval: Duration,
    /// Despite the name this is a point in time: `contact` sets it to `now` on the
    /// first pass after the state became `Checking`.
    pub(crate) checking_duration: Instant,
    /// `now` of the most recent `contact` pass that went on to contact candidates.
    pub(crate) last_checking_time: Instant,

    /// mDNS connection; `None` when `create_multicast_dns` yielded none or failed.
    pub(crate) mdns: Option<Mdns>,
    /// Remote `.local` candidates waiting on an mDNS query, keyed by query id.
    pub(crate) mdns_queries: HashMap<QueryId, Candidate>,

    pub(crate) mdns_mode: MulticastDnsMode,
    /// Local mDNS host name; substituted for the address of a matching local host
    /// candidate in `add_local_candidate`.
    pub(crate) mdns_local_name: String,
    /// Local IP whose host candidate is hidden behind `mdns_local_name`.
    pub(crate) mdns_local_ip: Option<IpAddr>,

    /// Candidate types in use (defaults applied in `Agent::new` when the config list is empty).
    pub(crate) candidate_types: Vec<CandidateType>,
    /// When non-empty, candidates of any other network type are ignored.
    pub(crate) network_types: Vec<NetworkType>,
    pub(crate) urls: Vec<Url>,

    /// Queue of outgoing tagged buffers.
    pub(crate) write_outs: VecDeque<TaggedBytesMut>,
    /// Queue of pending [`Event`]s.
    pub(crate) event_outs: VecDeque<Event>,
}
156
157impl Default for Agent {
158    fn default() -> Self {
159        Self {
160            tie_breaker: 0,
161            is_controlling: false,
162            lite: false,
163            start_time: Instant::now(),
164            connection_state: Default::default(),
165            last_connection_state: Default::default(),
166            ufrag_pwd: Default::default(),
167            local_candidates: vec![],
168            remote_candidates: vec![],
169            candidate_pairs: vec![],
170            nominated_pair: None,
171            selected_pair: None,
172            pending_binding_requests: vec![],
173            insecure_skip_verify: false,
174            max_binding_requests: 0,
175            host_acceptance_min_wait: Default::default(),
176            srflx_acceptance_min_wait: Default::default(),
177            prflx_acceptance_min_wait: Default::default(),
178            relay_acceptance_min_wait: Default::default(),
179            disconnected_timeout: Default::default(),
180            failed_timeout: Default::default(),
181            keepalive_interval: Default::default(),
182            check_interval: Default::default(),
183            checking_duration: Instant::now(),
184            last_checking_time: Instant::now(),
185            mdns_mode: MulticastDnsMode::Disabled,
186            mdns_local_name: "".to_owned(),
187            mdns_local_ip: None,
188            mdns_queries: HashMap::new(),
189            mdns: None,
190            candidate_types: vec![],
191            network_types: vec![],
192            urls: vec![],
193            write_outs: Default::default(),
194            event_outs: Default::default(),
195        }
196    }
197}
198
199impl Agent {
200    /// Creates a new Agent.
201    pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
202        let mut mdns_local_name = config.multicast_dns_local_name.clone();
203        if mdns_local_name.is_empty() {
204            mdns_local_name = generate_multicast_dns_name();
205        }
206
207        if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
208            return Err(Error::ErrInvalidMulticastDnshostName);
209        }
210
211        let mdns_mode = config.multicast_dns_mode;
212        let mdns = create_multicast_dns(
213            mdns_mode,
214            &mdns_local_name,
215            &config.multicast_dns_local_ip,
216            &config.multicast_dns_query_timeout,
217        )
218        .unwrap_or_else(|err| {
219            // Opportunistic mDNS: If we can't open the connection, that's ok: we
220            // can continue without it.
221            warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
222            None
223        });
224
225        let candidate_types = if config.candidate_types.is_empty() {
226            default_candidate_types()
227        } else {
228            config.candidate_types.clone()
229        };
230
231        if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
232        {
233            return Err(Error::ErrLiteUsingNonHostCandidates);
234        }
235
236        if !config.urls.is_empty()
237            && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
238            && !contains_candidate_type(CandidateType::Relay, &candidate_types)
239        {
240            return Err(Error::ErrUselessUrlsProvided);
241        }
242
243        let mut agent = Self {
244            tie_breaker: rand::random::<u64>(),
245            is_controlling: config.is_controlling,
246            lite: config.lite,
247
248            start_time: Instant::now(),
249
250            nominated_pair: None,
251            selected_pair: None,
252            candidate_pairs: vec![],
253
254            connection_state: ConnectionState::New,
255
256            insecure_skip_verify: config.insecure_skip_verify,
257
258            //started_ch_tx: MuteSome(started_ch_tx)),
259
260            //won't change after init_with_defaults()
261            max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
262                max_binding_requests
263            } else {
264                DEFAULT_MAX_BINDING_REQUESTS
265            },
266            host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
267                config.host_acceptance_min_wait
268            {
269                host_acceptance_min_wait
270            } else {
271                DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
272            },
273            srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
274                config.srflx_acceptance_min_wait
275            {
276                srflx_acceptance_min_wait
277            } else {
278                DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
279            },
280            prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
281                config.prflx_acceptance_min_wait
282            {
283                prflx_acceptance_min_wait
284            } else {
285                DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
286            },
287            relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
288                config.relay_acceptance_min_wait
289            {
290                relay_acceptance_min_wait
291            } else {
292                DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
293            },
294
295            // How long connectivity checks can fail before the ICE Agent
296            // goes to disconnected
297            disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
298                disconnected_timeout
299            } else {
300                DEFAULT_DISCONNECTED_TIMEOUT
301            },
302
303            // How long connectivity checks can fail before the ICE Agent
304            // goes to failed
305            failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
306                failed_timeout
307            } else {
308                DEFAULT_FAILED_TIMEOUT
309            },
310
311            // How often should we send keepalive packets?
312            // 0 means never
313            keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
314                keepalive_interval
315            } else {
316                DEFAULT_KEEPALIVE_INTERVAL
317            },
318
319            // How often should we run our internal taskLoop to check for state changes when connecting
320            check_interval: if config.check_interval == Duration::from_secs(0) {
321                DEFAULT_CHECK_INTERVAL
322            } else {
323                config.check_interval
324            },
325            checking_duration: Instant::now(),
326            last_checking_time: Instant::now(),
327            last_connection_state: ConnectionState::Unspecified,
328
329            mdns,
330            mdns_queries: HashMap::new(),
331
332            mdns_mode,
333            mdns_local_name,
334            mdns_local_ip: config.multicast_dns_local_ip,
335
336            ufrag_pwd: UfragPwd::default(),
337
338            local_candidates: vec![],
339            remote_candidates: vec![],
340
341            // LRU of outbound Binding request Transaction IDs
342            pending_binding_requests: vec![],
343
344            candidate_types,
345            network_types: config.network_types.clone(),
346            urls: config.urls.clone(),
347
348            write_outs: VecDeque::new(),
349            event_outs: VecDeque::new(),
350        };
351
352        // Restart is also used to initialize the agent for the first time
353        if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
354        {
355            let _ = agent.close();
356            return Err(err);
357        }
358
359        Ok(agent)
360    }
361
362    /// Adds a new local candidate.
363    pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<bool> {
364        // Filter by network type if network_types is configured
365        if !self.network_types.is_empty() {
366            let candidate_network_type = c.network_type();
367            if !self.network_types.contains(&candidate_network_type) {
368                debug!(
369                    "Ignoring local candidate with network type {:?} (not in configured network types: {:?})",
370                    candidate_network_type, self.network_types
371                );
372                return Ok(false);
373            }
374        }
375
376        if c.candidate_type() == CandidateType::Host
377            && self.mdns_mode == MulticastDnsMode::QueryAndGather
378            && c.network_type == NetworkType::Udp4
379            && self
380                .mdns_local_ip
381                .is_some_and(|local_ip| local_ip == c.addr().ip())
382        {
383            // only one .local mDNS host candidate per IPv4 is supported
384            // when registered local ip matches, use mdns_local_name to hide local host ip
385            trace!(
386                "mDNS hides local ip {} with local name {}",
387                c.address, self.mdns_local_name
388            );
389            c.address = self.mdns_local_name.clone();
390        }
391
392        for cand in &self.local_candidates {
393            if cand.equal(&c) {
394                return Ok(false);
395            }
396        }
397
398        self.local_candidates.push(c);
399        let local_index = self.local_candidates.len() - 1;
400        let local_tcp_type = self.local_candidates[local_index].tcp_type();
401
402        for remote_index in 0..self.remote_candidates.len() {
403            let remote_tcp_type = self.remote_candidates[remote_index].tcp_type();
404
405            // TCP type pairing rules (RFC 6544):
406            // - Active can only pair with Passive
407            // - Passive can only pair with Active
408            // - SimultaneousOpen can pair with SimultaneousOpen
409            // - Unspecified (UDP) can pair with Unspecified
410            let should_pair = match (local_tcp_type, remote_tcp_type) {
411                (TcpType::Active, TcpType::Passive) => true,
412                (TcpType::Passive, TcpType::Active) => true,
413                (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
414                (TcpType::Unspecified, TcpType::Unspecified) => true, // UDP candidates
415                _ => false,
416            };
417
418            if should_pair {
419                self.add_pair(local_index, remote_index);
420            }
421        }
422
423        self.request_connectivity_check();
424
425        Ok(true)
426    }
427
428    /// Adds a new remote candidate.
429    pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<bool> {
430        // Filter by network type if network_types is configured
431        if !self.network_types.is_empty() {
432            let candidate_network_type = c.network_type();
433            if !self.network_types.contains(&candidate_network_type) {
434                debug!(
435                    "Ignoring remote candidate with network type {:?} (not in configured network types: {:?})",
436                    candidate_network_type, self.network_types
437                );
438                return Ok(false);
439            }
440        }
441
442        // TCP active candidates don't have a listening port - they initiate connections.
443        // The remote active side will probe our passive candidates, so we don't need
444        // to do anything with remote active candidates.
445        if c.tcp_type() == TcpType::Active {
446            debug!(
447                "Ignoring remote candidate with tcptype active: {}",
448                c.address()
449            );
450            return Ok(false);
451        }
452
453        // If we have a mDNS Candidate lets fully resolve it before adding it locally
454        if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
455            if self.mdns_mode == MulticastDnsMode::Disabled {
456                warn!(
457                    "remote mDNS candidate added, but mDNS is disabled: ({})",
458                    c.address()
459                );
460                return Ok(false);
461            }
462
463            if c.candidate_type() != CandidateType::Host {
464                return Err(Error::ErrAddressParseFailed);
465            }
466
467            if let Some(mdns_conn) = &mut self.mdns {
468                let query_id = mdns_conn.query(c.address());
469                self.mdns_queries.insert(query_id, c);
470            }
471
472            return Ok(false);
473        }
474
475        self.trigger_request_connectivity_check(vec![c]);
476        Ok(true)
477    }
478
479    fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
480        for c in remote_candidates {
481            if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
482                let remote_tcp_type = c.tcp_type();
483                self.remote_candidates.push(c);
484                let remote_index = self.remote_candidates.len() - 1;
485
486                // Apply TCP type pairing rules (RFC 6544)
487                for local_index in 0..self.local_candidates.len() {
488                    let local_tcp_type = self.local_candidates[local_index].tcp_type();
489
490                    // TCP type pairing rules:
491                    // - Active can only pair with Passive
492                    // - Passive can only pair with Active
493                    // - SimultaneousOpen can pair with SimultaneousOpen
494                    // - Unspecified (UDP) can pair with Unspecified
495                    let should_pair = match (local_tcp_type, remote_tcp_type) {
496                        (TcpType::Active, TcpType::Passive) => true,
497                        (TcpType::Passive, TcpType::Active) => true,
498                        (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
499                        (TcpType::Unspecified, TcpType::Unspecified) => true, // UDP candidates
500                        _ => false,
501                    };
502
503                    if should_pair {
504                        self.add_pair(local_index, remote_index);
505                    }
506                }
507
508                self.request_connectivity_check();
509            }
510        }
511    }
512
513    /// Sets the credentials of the remote agent.
514    pub fn set_remote_credentials(
515        &mut self,
516        remote_ufrag: String,
517        remote_pwd: String,
518    ) -> Result<()> {
519        if remote_ufrag.is_empty() {
520            return Err(Error::ErrRemoteUfragEmpty);
521        } else if remote_pwd.is_empty() {
522            return Err(Error::ErrRemotePwdEmpty);
523        }
524
525        self.ufrag_pwd.remote_credentials = Some(Credentials {
526            ufrag: remote_ufrag,
527            pwd: remote_pwd,
528        });
529
530        Ok(())
531    }
532
533    /// Returns the remote credentials.
534    pub fn get_remote_credentials(&self) -> Option<&Credentials> {
535        self.ufrag_pwd.remote_credentials.as_ref()
536    }
537
538    /// Returns the local credentials.
539    pub fn get_local_credentials(&self) -> &Credentials {
540        &self.ufrag_pwd.local_credentials
541    }
542
543    pub fn role(&self) -> bool {
544        self.is_controlling
545    }
546
547    pub fn set_role(&mut self, is_controlling: bool) {
548        self.is_controlling = is_controlling;
549    }
550
551    pub fn state(&self) -> ConnectionState {
552        self.connection_state
553    }
554
555    pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
556        self.find_local_candidate(transport.local_addr, transport.transport_protocol)
557            .is_some()
558            && self.validate_non_stun_traffic(transport.peer_addr)
559    }
560
561    fn get_timeout_interval(&self) -> Duration {
562        let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
563            self.check_interval,
564            self.keepalive_interval,
565            self.disconnected_timeout,
566            self.failed_timeout,
567        );
568        let mut interval = DEFAULT_CHECK_INTERVAL;
569
570        let mut update_interval = |x: Duration| {
571            if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
572                interval = x;
573            }
574        };
575
576        match self.last_connection_state {
577            ConnectionState::New | ConnectionState::Checking => {
578                // While connecting, check candidates more frequently
579                update_interval(check_interval);
580            }
581            ConnectionState::Connected | ConnectionState::Disconnected => {
582                update_interval(keepalive_interval);
583            }
584            _ => {}
585        };
586        // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
587        update_interval(disconnected_timeout);
588        update_interval(failed_timeout);
589        interval
590    }
591
592    /// Returns the selected pair (local_candidate, remote_candidate) or none
593    pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
594        if let Some(pair_index) = self.get_selected_pair() {
595            let candidate_pair = &self.candidate_pairs[pair_index];
596            Some((
597                &self.local_candidates[candidate_pair.local_index],
598                &self.remote_candidates[candidate_pair.remote_index],
599            ))
600        } else {
601            None
602        }
603    }
604
605    pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
606        if let Some(pair_index) = self.get_best_available_pair() {
607            let candidate_pair = &self.candidate_pairs[pair_index];
608            Some((
609                &self.local_candidates[candidate_pair.local_index],
610                &self.remote_candidates[candidate_pair.remote_index],
611            ))
612        } else {
613            None
614        }
615    }
616
617    /// start connectivity checks
618    pub fn start_connectivity_checks(
619        &mut self,
620        is_controlling: bool,
621        remote_ufrag: String,
622        remote_pwd: String,
623    ) -> Result<()> {
624        debug!(
625            "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
626            is_controlling, remote_ufrag, remote_pwd
627        );
628        self.set_remote_credentials(remote_ufrag, remote_pwd)?;
629        self.is_controlling = is_controlling;
630        self.start();
631
632        self.update_connection_state(ConnectionState::Checking);
633        self.request_connectivity_check();
634
635        Ok(())
636    }
637
638    /// Restarts the ICE Agent with the provided ufrag/pwd
639    /// If no ufrag/pwd is provided the Agent will generate one itself.
640    pub fn restart(
641        &mut self,
642        mut ufrag: String,
643        mut pwd: String,
644        keep_local_candidates: bool,
645    ) -> Result<()> {
646        if ufrag.is_empty() {
647            ufrag = generate_ufrag();
648        }
649        if pwd.is_empty() {
650            pwd = generate_pwd();
651        }
652
653        if ufrag.len() * 8 < 24 {
654            return Err(Error::ErrLocalUfragInsufficientBits);
655        }
656        if pwd.len() * 8 < 128 {
657            return Err(Error::ErrLocalPwdInsufficientBits);
658        }
659
660        // Clear all agent needed to take back to fresh state
661        self.ufrag_pwd.local_credentials.ufrag = ufrag;
662        self.ufrag_pwd.local_credentials.pwd = pwd;
663        self.ufrag_pwd.remote_credentials = None;
664
665        self.pending_binding_requests = vec![];
666
667        self.candidate_pairs = vec![];
668
669        self.set_selected_pair(None);
670        self.delete_all_candidates(keep_local_candidates);
671        self.start();
672
673        // Restart is used by NewAgent. Accept/Connect should be used to move to checking
674        // for new Agents
675        if self.connection_state != ConnectionState::New {
676            self.update_connection_state(ConnectionState::Checking);
677        }
678
679        Ok(())
680    }
681
682    /// Returns the local candidates.
683    pub fn get_local_candidates(&self) -> &[Candidate] {
684        &self.local_candidates
685    }
686
687    fn contact(&mut self, now: Instant) {
688        if self.connection_state == ConnectionState::Failed {
689            // The connection is currently failed so don't send any checks
690            // In the future it may be restarted though
691            self.last_connection_state = self.connection_state;
692            return;
693        }
694        if self.connection_state == ConnectionState::Checking {
695            // We have just entered checking for the first time so update our checking timer
696            if self.last_connection_state != self.connection_state {
697                self.checking_duration = now;
698            }
699
700            // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
701            if now
702                .checked_duration_since(self.checking_duration)
703                .unwrap_or_else(|| Duration::from_secs(0))
704                > self.disconnected_timeout + self.failed_timeout
705            {
706                self.update_connection_state(ConnectionState::Failed);
707                self.last_connection_state = self.connection_state;
708                return;
709            }
710        }
711
712        self.contact_candidates();
713
714        self.last_connection_state = self.connection_state;
715        self.last_checking_time = now;
716    }
717
718    pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
719        if self.connection_state != new_state {
720            // Connection has gone to failed, release all gathered candidates
721            if new_state == ConnectionState::Failed {
722                self.set_selected_pair(None);
723                self.delete_all_candidates(false);
724            }
725
726            info!(
727                "[{}]: Setting new connection state: {}",
728                self.get_name(),
729                new_state
730            );
731            self.connection_state = new_state;
732            self.event_outs
733                .push_back(Event::ConnectionStateChange(new_state));
734        }
735    }
736
737    pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
738        if let Some(pair_index) = selected_pair {
739            trace!(
740                "[{}]: Set selected candidate pair: {:?}",
741                self.get_name(),
742                self.candidate_pairs[pair_index]
743            );
744
745            self.candidate_pairs[pair_index].nominated = true;
746            self.selected_pair = Some(pair_index);
747
748            self.update_connection_state(ConnectionState::Connected);
749
750            // Notify when the selected pair changes
751            let candidate_pair = &self.candidate_pairs[pair_index];
752            self.event_outs
753                .push_back(Event::SelectedCandidatePairChange(
754                    Box::new(self.local_candidates[candidate_pair.local_index].clone()),
755                    Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
756                ));
757        } else {
758            self.selected_pair = None;
759        }
760    }
761
762    pub(crate) fn ping_all_candidates(&mut self) {
763        let mut pairs: Vec<(usize, usize)> = vec![];
764
765        let name = self.get_name().to_string();
766        if self.candidate_pairs.is_empty() {
767            warn!(
768                "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
769                name,
770            );
771        }
772        for p in &mut self.candidate_pairs {
773            if p.state == CandidatePairState::Waiting {
774                p.state = CandidatePairState::InProgress;
775            } else if p.state != CandidatePairState::InProgress {
776                continue;
777            }
778
779            if p.binding_request_count > self.max_binding_requests {
780                trace!(
781                    "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
782                    name,
783                    *p,
784                    self.local_candidates[p.local_index].addr(),
785                    self.remote_candidates[p.remote_index].addr()
786                );
787                p.state = CandidatePairState::Failed;
788            } else {
789                p.binding_request_count += 1;
790                let local = p.local_index;
791                let remote = p.remote_index;
792                pairs.push((local, remote));
793            }
794        }
795
796        if !pairs.is_empty() {
797            trace!(
798                "[{}]: pinging all {} candidates",
799                self.get_name(),
800                pairs.len()
801            );
802        }
803
804        for (local, remote) in pairs {
805            self.ping_candidate(local, remote);
806        }
807    }
808
809    pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
810        let p = CandidatePair::new(
811            local_index,
812            remote_index,
813            self.local_candidates[local_index].priority(),
814            self.remote_candidates[remote_index].priority(),
815            self.is_controlling,
816        );
817        self.candidate_pairs.push(p);
818    }
819
820    pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
821        for (index, p) in self.candidate_pairs.iter().enumerate() {
822            if p.local_index == local_index && p.remote_index == remote_index {
823                return Some(index);
824            }
825        }
826        None
827    }
828
829    /// Checks if the selected pair is (still) valid.
830    /// Note: the caller should hold the agent lock.
831    pub(crate) fn validate_selected_pair(&mut self) -> bool {
832        let (valid, disconnected_time) = {
833            self.selected_pair.as_ref().map_or_else(
834                || (false, Duration::from_secs(0)),
835                |&pair_index| {
836                    let remote_index = self.candidate_pairs[pair_index].remote_index;
837
838                    let disconnected_time = Instant::now()
839                        .duration_since(self.remote_candidates[remote_index].last_received());
840                    (true, disconnected_time)
841                },
842            )
843        };
844
845        if valid {
846            // Only allow transitions to fail if a.failedTimeout is non-zero
847            let mut total_time_to_failure = self.failed_timeout;
848            if total_time_to_failure != Duration::from_secs(0) {
849                total_time_to_failure += self.disconnected_timeout;
850            }
851
852            if total_time_to_failure != Duration::from_secs(0)
853                && disconnected_time > total_time_to_failure
854            {
855                self.update_connection_state(ConnectionState::Failed);
856            } else if self.disconnected_timeout != Duration::from_secs(0)
857                && disconnected_time > self.disconnected_timeout
858            {
859                self.update_connection_state(ConnectionState::Disconnected);
860            } else {
861                self.update_connection_state(ConnectionState::Connected);
862            }
863        }
864
865        valid
866    }
867
868    /// Sends STUN Binding Indications to the selected pair.
869    /// if no packet has been sent on that pair in the last keepaliveInterval.
870    /// Note: the caller should hold the agent lock.
871    pub(crate) fn check_keepalive(&mut self) {
872        let (local_index, remote_index, pair_index) = {
873            self.selected_pair
874                .as_ref()
875                .map_or((None, None, None), |&pair_index| {
876                    let p = &self.candidate_pairs[pair_index];
877                    (Some(p.local_index), Some(p.remote_index), Some(pair_index))
878                })
879        };
880
881        if let (Some(local_index), Some(remote_index), Some(pair_index)) =
882            (local_index, remote_index, pair_index)
883        {
884            let last_sent =
885                Instant::now().duration_since(self.local_candidates[local_index].last_sent());
886
887            let last_received =
888                Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
889
890            if (self.keepalive_interval != Duration::from_secs(0))
891                && ((last_sent > self.keepalive_interval)
892                    || (last_received > self.keepalive_interval))
893            {
894                // we use binding request instead of indication to support refresh consent schemas
895                // see https://tools.ietf.org/html/rfc7675
896                // Track consent request sent
897                self.candidate_pairs[pair_index].on_consent_request_sent();
898                self.ping_candidate(local_index, remote_index);
899            }
900        }
901    }
902
903    fn request_connectivity_check(&mut self) {
904        if self.ufrag_pwd.remote_credentials.is_some() {
905            self.contact(Instant::now());
906        }
907    }
908
909    /// Remove all candidates.
910    /// This closes any listening sockets and removes both the local and remote candidate lists.
911    ///
912    /// This is used for restarts, failures and on close.
913    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
914        if !keep_local_candidates {
915            self.local_candidates.clear();
916        }
917        self.remote_candidates.clear();
918    }
919
920    pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
921        for (index, c) in self.remote_candidates.iter().enumerate() {
922            if c.addr() == addr {
923                return Some(index);
924            }
925        }
926        None
927    }
928
929    pub(crate) fn find_local_candidate(
930        &self,
931        addr: SocketAddr,
932        transport_protocol: TransportProtocol,
933    ) -> Option<usize> {
934        for (index, c) in self.local_candidates.iter().enumerate() {
935            if c.network_type().to_protocol() != transport_protocol {
936                continue;
937            }
938
939            // For TCP active candidates, match by IP only (ignore port).
940            // TCP active candidates use port 9 as placeholder in signaling,
941            // but the actual connection uses an ephemeral port.
942            if c.tcp_type() == TcpType::Active && transport_protocol == TransportProtocol::TCP {
943                if c.addr().ip() == addr.ip() {
944                    return Some(index);
945                }
946            } else if c.addr() == addr {
947                return Some(index);
948            } else if let Some(related_address) = c.related_address()
949                && related_address.address == addr.ip().to_string()
950                && related_address.port == addr.port()
951            {
952                return Some(index);
953            }
954        }
955        None
956    }
957
958    pub(crate) fn send_binding_request(
959        &mut self,
960        m: &Message,
961        local_index: usize,
962        remote_index: usize,
963    ) {
964        trace!(
965            "[{}]: ping STUN from {} to {}",
966            self.get_name(),
967            self.local_candidates[local_index],
968            self.remote_candidates[remote_index],
969        );
970
971        self.invalidate_pending_binding_requests(Instant::now());
972
973        self.pending_binding_requests.push(BindingRequest {
974            timestamp: Instant::now(),
975            transaction_id: m.transaction_id,
976            destination: self.remote_candidates[remote_index].addr(),
977            is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
978        });
979
980        // Track request sent on the candidate pair
981        if let Some(pair_index) = self.find_pair(local_index, remote_index) {
982            self.candidate_pairs[pair_index].on_request_sent();
983        }
984
985        self.send_stun(m, local_index, remote_index);
986    }
987
988    pub(crate) fn send_binding_success(
989        &mut self,
990        m: &Message,
991        local_index: usize,
992        remote_index: usize,
993    ) {
994        let addr = self.remote_candidates[remote_index].addr();
995        let (ip, port) = (addr.ip(), addr.port());
996        let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
997
998        let (out, result) = {
999            let mut out = Message::new();
1000            let result = out.build(&[
1001                Box::new(m.clone()),
1002                Box::new(BINDING_SUCCESS),
1003                Box::new(XorMappedAddress { ip, port }),
1004                Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
1005                Box::new(FINGERPRINT),
1006            ]);
1007            (out, result)
1008        };
1009
1010        if let Err(err) = result {
1011            warn!(
1012                "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
1013                self.get_name(),
1014                self.local_candidates[local_index],
1015                self.remote_candidates[remote_index],
1016                err
1017            );
1018        } else {
1019            // Track response sent on the candidate pair
1020            if let Some(pair_index) = self.find_pair(local_index, remote_index) {
1021                self.candidate_pairs[pair_index].on_response_sent();
1022            }
1023            self.send_stun(&out, local_index, remote_index);
1024        }
1025    }
1026
1027    /// Sends a 487 (Role Conflict) error response.
1028    /// RFC 8445 Section 7.3.1.1
1029    pub(crate) fn send_role_conflict_error(
1030        &mut self,
1031        m: &Message,
1032        local_index: usize,
1033        remote_index: usize,
1034    ) {
1035        use stun::error_code::*;
1036
1037        let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
1038
1039        let (out, result) = {
1040            let mut out = Message::new();
1041            let result = out.build(&[
1042                Box::new(m.clone()),
1043                Box::new(stun::message::BINDING_ERROR),
1044                Box::new(CODE_ROLE_CONFLICT),
1045                Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
1046                Box::new(FINGERPRINT),
1047            ]);
1048            (out, result)
1049        };
1050
1051        if let Err(err) = result {
1052            warn!(
1053                "[{}]: Failed to send role conflict error from: {} to: {} error: {}",
1054                self.get_name(),
1055                self.local_candidates[local_index],
1056                self.remote_candidates[remote_index],
1057                err
1058            );
1059        } else {
1060            debug!(
1061                "[{}]: Sent 487 Role Conflict error from {} to {}",
1062                self.get_name(),
1063                self.local_candidates[local_index],
1064                self.remote_candidates[remote_index]
1065            );
1066            self.send_stun(&out, local_index, remote_index);
1067        }
1068    }
1069
1070    /// Switches the ICE agent role and recomputes all candidate pair priorities.
1071    /// RFC 8445 Section 7.3.1.1
1072    pub(crate) fn switch_role(&mut self) {
1073        self.is_controlling = !self.is_controlling;
1074
1075        // Recompute priorities for all candidate pairs
1076        // The priority calculation depends on ice_role_controlling
1077        for pair in &mut self.candidate_pairs {
1078            pair.ice_role_controlling = self.is_controlling;
1079        }
1080
1081        // Clear nominated pair when switching roles
1082        self.nominated_pair = None;
1083
1084        info!(
1085            "[{}]: Role switched, recomputed {} candidate pair priorities",
1086            self.get_name(),
1087            self.candidate_pairs.len()
1088        );
1089
1090        self.event_outs
1091            .push_back(Event::RoleChange(self.is_controlling));
1092    }
1093
1094    /// Removes pending binding requests that are over `maxBindingRequestTimeout` old Let HTO be the
1095    /// transaction timeout, which SHOULD be 2*RTT if RTT is known or 500 ms otherwise.
1096    ///
1097    /// reference: (IETF ref-8445)[https://tools.ietf.org/html/rfc8445#appendix-B.1].
1098    pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
1099        let pending_binding_requests = &mut self.pending_binding_requests;
1100        let initial_size = pending_binding_requests.len();
1101
1102        let mut temp = vec![];
1103        for binding_request in pending_binding_requests.drain(..) {
1104            if filter_time
1105                .checked_duration_since(binding_request.timestamp)
1106                .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
1107                .unwrap_or(true)
1108            {
1109                temp.push(binding_request);
1110            }
1111        }
1112
1113        *pending_binding_requests = temp;
1114        let bind_requests_remaining = pending_binding_requests.len();
1115        let bind_requests_removed = initial_size - bind_requests_remaining;
1116        if bind_requests_removed > 0 {
1117            trace!(
1118                "[{}]: Discarded {} binding requests because they expired, still {} remaining",
1119                self.get_name(),
1120                bind_requests_removed,
1121                bind_requests_remaining,
1122            );
1123        }
1124    }
1125
1126    /// Assert that the passed `TransactionID` is in our `pendingBindingRequests` and returns the
1127    /// destination, If the bindingRequest was valid remove it from our pending cache.
1128    pub(crate) fn handle_inbound_binding_success(
1129        &mut self,
1130        id: TransactionId,
1131    ) -> Option<BindingRequest> {
1132        self.invalidate_pending_binding_requests(Instant::now());
1133
1134        let pending_binding_requests = &mut self.pending_binding_requests;
1135        for i in 0..pending_binding_requests.len() {
1136            if pending_binding_requests[i].transaction_id == id {
1137                let valid_binding_request = pending_binding_requests.remove(i);
1138                return Some(valid_binding_request);
1139            }
1140        }
1141        None
1142    }
1143
1144    /// Processes STUN traffic from a remote candidate.
1145    pub(crate) fn handle_inbound(
1146        &mut self,
1147        now: Instant,
1148        m: &mut Message,
1149        local_index: usize,
1150        remote_addr: SocketAddr,
1151    ) -> Result<()> {
1152        if m.typ.method != METHOD_BINDING
1153            || !(m.typ.class == CLASS_SUCCESS_RESPONSE
1154                || m.typ.class == CLASS_REQUEST
1155                || m.typ.class == CLASS_INDICATION)
1156        {
1157            trace!(
1158                "[{}]: unhandled STUN from {} to {} class({}) method({})",
1159                self.get_name(),
1160                remote_addr,
1161                self.local_candidates[local_index],
1162                m.typ.class,
1163                m.typ.method
1164            );
1165            return Err(Error::ErrUnhandledStunpacket);
1166        }
1167
1168        // RFC 8445 Section 7.3.1.1 - Detecting and Repairing Role Conflicts
1169        if self.is_controlling {
1170            if m.contains(ATTR_ICE_CONTROLLING) {
1171                // Both agents are controlling - role conflict detected
1172                let mut remote_controlling = crate::attributes::control::AttrControlling::default();
1173                if let Err(err) = remote_controlling.get_from(m) {
1174                    warn!(
1175                        "[{}]: Failed to get remote ICE-CONTROLLING attribute: {}",
1176                        self.get_name(),
1177                        err
1178                    );
1179                    return Err(err);
1180                }
1181
1182                debug!(
1183                    "[{}]: Role conflict detected (both controlling), local tiebreaker: {}, remote tiebreaker: {}",
1184                    self.get_name(),
1185                    self.tie_breaker,
1186                    remote_controlling.0
1187                );
1188
1189                // Only process if this is a request (not a response)
1190                if m.typ.class == CLASS_REQUEST {
1191                    // Send 487 Role Conflict error
1192                    if let Some(remote_index) = self.find_remote_candidate(remote_addr) {
1193                        self.send_role_conflict_error(m, local_index, remote_index);
1194                    }
1195
1196                    // Compare tiebreakers - if ours is smaller, we switch to controlled
1197                    if self.tie_breaker < remote_controlling.0 {
1198                        info!(
1199                            "[{}]: Switching from controlling to controlled due to role conflict (smaller tiebreaker)",
1200                            self.get_name()
1201                        );
1202                        self.switch_role();
1203                    }
1204                }
1205                // Continue processing the message after handling role conflict
1206            } else if m.contains(ATTR_USE_CANDIDATE) {
1207                debug!(
1208                    "[{}]: useCandidate && a.isControlling == true",
1209                    self.get_name(),
1210                );
1211                return Err(Error::ErrUnexpectedStunrequestMessage);
1212            }
1213        } else if m.contains(ATTR_ICE_CONTROLLED) {
1214            // Both agents are controlled - role conflict detected
1215            let mut remote_controlled = crate::attributes::control::AttrControlled::default();
1216            if let Err(err) = remote_controlled.get_from(m) {
1217                warn!(
1218                    "[{}]: Failed to get remote ICE-CONTROLLED attribute: {}",
1219                    self.get_name(),
1220                    err
1221                );
1222                return Err(err);
1223            }
1224
1225            debug!(
1226                "[{}]: Role conflict detected (both controlled), local tiebreaker: {}, remote tiebreaker: {}",
1227                self.get_name(),
1228                self.tie_breaker,
1229                remote_controlled.0
1230            );
1231
1232            // Only process if this is a request (not a response)
1233            if m.typ.class == CLASS_REQUEST {
1234                // Send 487 Role Conflict error
1235                if let Some(remote_index) = self.find_remote_candidate(remote_addr) {
1236                    self.send_role_conflict_error(m, local_index, remote_index);
1237                }
1238
1239                // Compare tiebreakers - if ours is larger, we switch to controlling
1240                if self.tie_breaker > remote_controlled.0 {
1241                    info!(
1242                        "[{}]: Switching from controlled to controlling due to role conflict (larger tiebreaker)",
1243                        self.get_name()
1244                    );
1245                    self.switch_role();
1246                }
1247            }
1248            // Continue processing the message after handling role conflict
1249        }
1250
1251        let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1252            debug!(
1253                "[{}]: ufrag_pwd.remote_credentials.is_none",
1254                self.get_name(),
1255            );
1256            return Err(Error::ErrPasswordEmpty);
1257        };
1258
1259        let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1260        if m.typ.class == CLASS_SUCCESS_RESPONSE {
1261            if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1262            {
1263                warn!(
1264                    "[{}]: discard message from ({}), {}",
1265                    self.get_name(),
1266                    remote_addr,
1267                    err
1268                );
1269                return Err(err);
1270            }
1271
1272            if let Some(remote_index) = &remote_candidate_index {
1273                self.handle_success_response(now, m, local_index, *remote_index, remote_addr);
1274            } else {
1275                warn!(
1276                    "[{}]: discard success message from ({}), no such remote",
1277                    self.get_name(),
1278                    remote_addr
1279                );
1280                return Err(Error::ErrUnhandledStunpacket);
1281            }
1282        } else if m.typ.class == CLASS_REQUEST {
1283            {
1284                let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1285                    + ":"
1286                    + remote_credentials.ufrag.as_str();
1287                if let Err(err) = assert_inbound_username(m, &username) {
1288                    warn!(
1289                        "[{}]: discard message from ({}), {}",
1290                        self.get_name(),
1291                        remote_addr,
1292                        err
1293                    );
1294                    return Err(err);
1295                } else if let Err(err) = assert_inbound_message_integrity(
1296                    m,
1297                    self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1298                ) {
1299                    warn!(
1300                        "[{}]: discard message from ({}), {}",
1301                        self.get_name(),
1302                        remote_addr,
1303                        err
1304                    );
1305                    return Err(err);
1306                }
1307            }
1308
1309            if remote_candidate_index.is_none() {
1310                // Use the local candidate's network type for the peer-reflexive candidate
1311                let network_type = self.local_candidates[local_index].network_type();
1312                let (ip, port) = (remote_addr.ip(), remote_addr.port());
1313
1314                let prflx_candidate_config = CandidatePeerReflexiveConfig {
1315                    base_config: CandidateConfig {
1316                        network: network_type.to_string(),
1317                        address: ip.to_string(),
1318                        port,
1319                        component: self.local_candidates[local_index].component(),
1320                        ..CandidateConfig::default()
1321                    },
1322                    rel_addr: "".to_owned(),
1323                    rel_port: 0,
1324                };
1325
1326                match prflx_candidate_config.new_candidate_peer_reflexive() {
1327                    Ok(prflx_candidate) => {
1328                        if let Ok(added) = self.add_remote_candidate(prflx_candidate)
1329                            && added
1330                        {
1331                            remote_candidate_index = Some(self.remote_candidates.len() - 1);
1332                        }
1333                    }
1334                    Err(err) => {
1335                        error!(
1336                            "[{}]: Failed to create new remote prflx candidate ({})",
1337                            self.get_name(),
1338                            err
1339                        );
1340                        return Err(err);
1341                    }
1342                };
1343
1344                debug!(
1345                    "[{}]: adding a new peer-reflexive candidate: {} ",
1346                    self.get_name(),
1347                    remote_addr
1348                );
1349            }
1350
1351            trace!(
1352                "[{}]: inbound STUN (Request) from {} to {}",
1353                self.get_name(),
1354                remote_addr,
1355                self.local_candidates[local_index]
1356            );
1357
1358            if let Some(remote_index) = &remote_candidate_index {
1359                self.handle_binding_request(m, local_index, *remote_index);
1360            }
1361        }
1362
1363        if let Some(remote_index) = remote_candidate_index {
1364            self.remote_candidates[remote_index].seen(false);
1365        }
1366
1367        Ok(())
1368    }
1369
1370    // Processes non STUN traffic from a remote candidate, and returns true if it is an actual
1371    // remote candidate.
1372    pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1373        self.find_remote_candidate(remote_addr)
1374            .is_some_and(|remote_index| {
1375                self.remote_candidates[remote_index].seen(false);
1376                true
1377            })
1378    }
1379
1380    pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1381        let peer_addr = self.remote_candidates[remote_index].addr();
1382        let local_addr = self.local_candidates[local_index].addr();
1383        let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1384            TransportProtocol::TCP
1385        } else {
1386            TransportProtocol::UDP
1387        };
1388
1389        self.write_outs.push_back(TaggedBytesMut {
1390            now: Instant::now(),
1391            transport: TransportContext {
1392                local_addr,
1393                peer_addr,
1394                ecn: None,
1395                transport_protocol,
1396            },
1397            message: BytesMut::from(&msg.raw[..]),
1398        });
1399
1400        self.local_candidates[local_index].seen(true);
1401    }
1402
1403    fn handle_inbound_candidate_msg(
1404        &mut self,
1405        local_index: usize,
1406        msg: TaggedBytesMut,
1407    ) -> Result<()> {
1408        if is_stun_message(&msg.message) {
1409            let mut m = Message {
1410                raw: msg.message.to_vec(),
1411                ..Message::default()
1412            };
1413
1414            if let Err(err) = m.decode() {
1415                warn!(
1416                    "[{}]: Failed to handle decode ICE from {} to {}: {}",
1417                    self.get_name(),
1418                    msg.transport.local_addr,
1419                    msg.transport.peer_addr,
1420                    err
1421                );
1422                Err(err)
1423            } else {
1424                self.handle_inbound(msg.now, &mut m, local_index, msg.transport.peer_addr)
1425            }
1426        } else {
1427            if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1428                warn!(
1429                    "[{}]: Discarded message, not a valid remote candidate from {}",
1430                    self.get_name(),
1431                    msg.transport.peer_addr,
1432                );
1433            } else {
1434                warn!(
1435                    "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1436                    self.get_name(),
1437                    msg.transport.peer_addr
1438                );
1439            }
1440            Err(Error::ErrNonStunmessage)
1441        }
1442    }
1443
1444    pub(crate) fn get_name(&self) -> &str {
1445        if self.is_controlling {
1446            "controlling"
1447        } else {
1448            "controlled"
1449        }
1450    }
1451
    /// Returns the currently selected pair as an index into `candidate_pairs`, or `None` when
    /// no pair is selected.
    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
        self.selected_pair
    }
1455
1456    pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1457        let mut best_pair_index: Option<usize> = None;
1458
1459        for (index, p) in self.candidate_pairs.iter().enumerate() {
1460            if p.state == CandidatePairState::Failed {
1461                continue;
1462            }
1463
1464            if let Some(pair_index) = &mut best_pair_index {
1465                let b = &self.candidate_pairs[*pair_index];
1466                if b.priority() < p.priority() {
1467                    *pair_index = index;
1468                }
1469            } else {
1470                best_pair_index = Some(index);
1471            }
1472        }
1473
1474        best_pair_index
1475    }
1476
1477    pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1478        let mut best_pair_index: Option<usize> = None;
1479
1480        for (index, p) in self.candidate_pairs.iter().enumerate() {
1481            if p.state != CandidatePairState::Succeeded {
1482                continue;
1483            }
1484
1485            if let Some(pair_index) = &mut best_pair_index {
1486                let b = &self.candidate_pairs[*pair_index];
1487                if b.priority() < p.priority() {
1488                    *pair_index = index;
1489                }
1490            } else {
1491                best_pair_index = Some(index);
1492            }
1493        }
1494
1495        best_pair_index
1496    }
1497}