rtc_ice/agent/
mod.rs

1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::url::*;
32use shared::error::*;
33use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
34
35const ZERO_DURATION: Duration = Duration::from_secs(0);
36
37#[derive(Debug, Clone)]
38pub(crate) struct BindingRequest {
39    pub(crate) timestamp: Instant,
40    pub(crate) transaction_id: TransactionId,
41    pub(crate) destination: SocketAddr,
42    pub(crate) is_use_candidate: bool,
43}
44
45impl Default for BindingRequest {
46    fn default() -> Self {
47        Self {
48            timestamp: Instant::now(),
49            transaction_id: TransactionId::default(),
50            destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
51            is_use_candidate: false,
52        }
53    }
54}
55
56#[derive(Default, Clone)]
57pub struct Credentials {
58    pub ufrag: String,
59    pub pwd: String,
60}
61
62#[derive(Default, Clone)]
63pub(crate) struct UfragPwd {
64    pub(crate) local_credentials: Credentials,
65    pub(crate) remote_credentials: Option<Credentials>,
66}
67
68fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
69    let mut username = Username::new(ATTR_USERNAME, String::new());
70    username.get_from(m)?;
71
72    if username.to_string() != expected_username {
73        return Err(Error::Other(format!(
74            "{:?} expected({}) actual({})",
75            Error::ErrMismatchUsername,
76            expected_username,
77            username,
78        )));
79    }
80
81    Ok(())
82}
83
84fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
85    let message_integrity_attr = MessageIntegrity(key.to_vec());
86    message_integrity_attr.check(m)
87}
88
89pub enum Event {
90    ConnectionStateChange(ConnectionState),
91    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
92}
93
94/// Represents the ICE agent.
95pub struct Agent {
96    pub(crate) tie_breaker: u64,
97    pub(crate) is_controlling: bool,
98    pub(crate) lite: bool,
99
100    pub(crate) start_time: Instant,
101
102    pub(crate) connection_state: ConnectionState,
103    pub(crate) last_connection_state: ConnectionState,
104
105    //pub(crate) started_ch_tx: Mutex<Option<broadcast::Sender<()>>>,
106    pub(crate) ufrag_pwd: UfragPwd,
107
108    pub(crate) local_candidates: Vec<Candidate>,
109    pub(crate) remote_candidates: Vec<Candidate>,
110    pub(crate) candidate_pairs: Vec<CandidatePair>,
111    pub(crate) nominated_pair: Option<usize>,
112    pub(crate) selected_pair: Option<usize>,
113
114    // LRU of outbound Binding request Transaction IDs
115    pub(crate) pending_binding_requests: Vec<BindingRequest>,
116
117    // the following variables won't be changed after init_with_defaults()
118    pub(crate) insecure_skip_verify: bool,
119    pub(crate) max_binding_requests: u16,
120    pub(crate) host_acceptance_min_wait: Duration,
121    pub(crate) srflx_acceptance_min_wait: Duration,
122    pub(crate) prflx_acceptance_min_wait: Duration,
123    pub(crate) relay_acceptance_min_wait: Duration,
124    // How long connectivity checks can fail before the ICE Agent
125    // goes to disconnected
126    pub(crate) disconnected_timeout: Duration,
127    // How long connectivity checks can fail before the ICE Agent
128    // goes to failed
129    pub(crate) failed_timeout: Duration,
130    // How often should we send keepalive packets?
131    // 0 means never
132    pub(crate) keepalive_interval: Duration,
133    // How often should we run our internal taskLoop to check for state changes when connecting
134    pub(crate) check_interval: Duration,
135    pub(crate) checking_duration: Instant,
136    pub(crate) last_checking_time: Instant,
137
138    pub(crate) mdns: Option<Mdns>,
139    pub(crate) mdns_queries: HashMap<QueryId, Candidate>,
140
141    pub(crate) mdns_mode: MulticastDnsMode,
142    pub(crate) mdns_local_name: String,
143    pub(crate) mdns_local_ip: Option<IpAddr>,
144
145    pub(crate) candidate_types: Vec<CandidateType>,
146    pub(crate) urls: Vec<Url>,
147
148    pub(crate) write_outs: VecDeque<TaggedBytesMut>,
149    pub(crate) event_outs: VecDeque<Event>,
150}
151
152impl Default for Agent {
153    fn default() -> Self {
154        Self {
155            tie_breaker: 0,
156            is_controlling: false,
157            lite: false,
158            start_time: Instant::now(),
159            connection_state: Default::default(),
160            last_connection_state: Default::default(),
161            ufrag_pwd: Default::default(),
162            local_candidates: vec![],
163            remote_candidates: vec![],
164            candidate_pairs: vec![],
165            nominated_pair: None,
166            selected_pair: None,
167            pending_binding_requests: vec![],
168            insecure_skip_verify: false,
169            max_binding_requests: 0,
170            host_acceptance_min_wait: Default::default(),
171            srflx_acceptance_min_wait: Default::default(),
172            prflx_acceptance_min_wait: Default::default(),
173            relay_acceptance_min_wait: Default::default(),
174            disconnected_timeout: Default::default(),
175            failed_timeout: Default::default(),
176            keepalive_interval: Default::default(),
177            check_interval: Default::default(),
178            checking_duration: Instant::now(),
179            last_checking_time: Instant::now(),
180            mdns_mode: MulticastDnsMode::Disabled,
181            mdns_local_name: "".to_owned(),
182            mdns_local_ip: None,
183            mdns_queries: HashMap::new(),
184            mdns: None,
185            candidate_types: vec![],
186            urls: vec![],
187            write_outs: Default::default(),
188            event_outs: Default::default(),
189        }
190    }
191}
192
193impl Agent {
194    /// Creates a new Agent.
195    pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
196        let mut mdns_local_name = config.multicast_dns_local_name.clone();
197        if mdns_local_name.is_empty() {
198            mdns_local_name = generate_multicast_dns_name();
199        }
200
201        if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
202            return Err(Error::ErrInvalidMulticastDnshostName);
203        }
204
205        let mdns_mode = config.multicast_dns_mode;
206        let mdns = create_multicast_dns(
207            mdns_mode,
208            &mdns_local_name,
209            &config.multicast_dns_local_ip,
210            &config.multicast_dns_query_timeout,
211        )
212        .unwrap_or_else(|err| {
213            // Opportunistic mDNS: If we can't open the connection, that's ok: we
214            // can continue without it.
215            warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
216            None
217        });
218
219        let candidate_types = if config.candidate_types.is_empty() {
220            default_candidate_types()
221        } else {
222            config.candidate_types.clone()
223        };
224
225        if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
226        {
227            return Err(Error::ErrLiteUsingNonHostCandidates);
228        }
229
230        if !config.urls.is_empty()
231            && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
232            && !contains_candidate_type(CandidateType::Relay, &candidate_types)
233        {
234            return Err(Error::ErrUselessUrlsProvided);
235        }
236
237        let mut agent = Self {
238            tie_breaker: rand::random::<u64>(),
239            is_controlling: config.is_controlling,
240            lite: config.lite,
241
242            start_time: Instant::now(),
243
244            nominated_pair: None,
245            selected_pair: None,
246            candidate_pairs: vec![],
247
248            connection_state: ConnectionState::New,
249
250            insecure_skip_verify: config.insecure_skip_verify,
251
252            //started_ch_tx: MuteSome(started_ch_tx)),
253
254            //won't change after init_with_defaults()
255            max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
256                max_binding_requests
257            } else {
258                DEFAULT_MAX_BINDING_REQUESTS
259            },
260            host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
261                config.host_acceptance_min_wait
262            {
263                host_acceptance_min_wait
264            } else {
265                DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
266            },
267            srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
268                config.srflx_acceptance_min_wait
269            {
270                srflx_acceptance_min_wait
271            } else {
272                DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
273            },
274            prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
275                config.prflx_acceptance_min_wait
276            {
277                prflx_acceptance_min_wait
278            } else {
279                DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
280            },
281            relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
282                config.relay_acceptance_min_wait
283            {
284                relay_acceptance_min_wait
285            } else {
286                DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
287            },
288
289            // How long connectivity checks can fail before the ICE Agent
290            // goes to disconnected
291            disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
292                disconnected_timeout
293            } else {
294                DEFAULT_DISCONNECTED_TIMEOUT
295            },
296
297            // How long connectivity checks can fail before the ICE Agent
298            // goes to failed
299            failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
300                failed_timeout
301            } else {
302                DEFAULT_FAILED_TIMEOUT
303            },
304
305            // How often should we send keepalive packets?
306            // 0 means never
307            keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
308                keepalive_interval
309            } else {
310                DEFAULT_KEEPALIVE_INTERVAL
311            },
312
313            // How often should we run our internal taskLoop to check for state changes when connecting
314            check_interval: if config.check_interval == Duration::from_secs(0) {
315                DEFAULT_CHECK_INTERVAL
316            } else {
317                config.check_interval
318            },
319            checking_duration: Instant::now(),
320            last_checking_time: Instant::now(),
321            last_connection_state: ConnectionState::Unspecified,
322
323            mdns,
324            mdns_queries: HashMap::new(),
325
326            mdns_mode,
327            mdns_local_name,
328            mdns_local_ip: config.multicast_dns_local_ip,
329
330            ufrag_pwd: UfragPwd::default(),
331
332            local_candidates: vec![],
333            remote_candidates: vec![],
334
335            // LRU of outbound Binding request Transaction IDs
336            pending_binding_requests: vec![],
337
338            candidate_types,
339            urls: config.urls.clone(),
340
341            write_outs: VecDeque::new(),
342            event_outs: VecDeque::new(),
343        };
344
345        // Restart is also used to initialize the agent for the first time
346        if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
347        {
348            let _ = agent.close();
349            return Err(err);
350        }
351
352        Ok(agent)
353    }
354
355    /// Adds a new local candidate.
356    pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<()> {
357        if c.candidate_type() == CandidateType::Host
358            && self.mdns_mode == MulticastDnsMode::QueryAndGather
359            && c.network_type == NetworkType::Udp4
360            && self
361                .mdns_local_ip
362                .is_some_and(|local_ip| local_ip == c.addr().ip())
363        {
364            // only one .local mDNS host candidate per IPv4 is supported
365            // when registered local ip matches, use mdns_local_name to hide local host ip
366            trace!(
367                "mDNS hides local ip {} with local name {}",
368                c.address, self.mdns_local_name
369            );
370            c.address = self.mdns_local_name.clone();
371        }
372
373        for cand in &self.local_candidates {
374            if cand.equal(&c) {
375                return Ok(());
376            }
377        }
378
379        self.local_candidates.push(c);
380
381        for remote_index in 0..self.remote_candidates.len() {
382            self.add_pair(self.local_candidates.len() - 1, remote_index);
383        }
384
385        self.request_connectivity_check();
386
387        Ok(())
388    }
389
390    /// Adds a new remote candidate.
391    pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<()> {
392        // If we have a mDNS Candidate lets fully resolve it before adding it locally
393        if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
394            if self.mdns_mode == MulticastDnsMode::Disabled {
395                warn!(
396                    "remote mDNS candidate added, but mDNS is disabled: ({})",
397                    c.address()
398                );
399                return Ok(());
400            }
401
402            if c.candidate_type() != CandidateType::Host {
403                return Err(Error::ErrAddressParseFailed);
404            }
405
406            if let Some(mdns_conn) = &mut self.mdns {
407                let query_id = mdns_conn.query(c.address());
408                self.mdns_queries.insert(query_id, c);
409            }
410
411            return Ok(());
412        }
413
414        self.trigger_request_connectivity_check(vec![c]);
415        Ok(())
416    }
417
418    fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
419        for c in remote_candidates {
420            if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
421                self.remote_candidates.push(c);
422
423                for local_index in 0..self.local_candidates.len() {
424                    self.add_pair(local_index, self.remote_candidates.len() - 1);
425                }
426
427                self.request_connectivity_check();
428            }
429        }
430    }
431
432    /// Sets the credentials of the remote agent.
433    pub fn set_remote_credentials(
434        &mut self,
435        remote_ufrag: String,
436        remote_pwd: String,
437    ) -> Result<()> {
438        if remote_ufrag.is_empty() {
439            return Err(Error::ErrRemoteUfragEmpty);
440        } else if remote_pwd.is_empty() {
441            return Err(Error::ErrRemotePwdEmpty);
442        }
443
444        self.ufrag_pwd.remote_credentials = Some(Credentials {
445            ufrag: remote_ufrag,
446            pwd: remote_pwd,
447        });
448
449        Ok(())
450    }
451
452    /// Returns the remote credentials.
453    pub fn get_remote_credentials(&self) -> Option<&Credentials> {
454        self.ufrag_pwd.remote_credentials.as_ref()
455    }
456
457    /// Returns the local credentials.
458    pub fn get_local_credentials(&self) -> &Credentials {
459        &self.ufrag_pwd.local_credentials
460    }
461
462    pub fn role(&self) -> bool {
463        self.is_controlling
464    }
465
466    pub fn set_role(&mut self, is_controlling: bool) {
467        self.is_controlling = is_controlling;
468    }
469
470    pub fn state(&self) -> ConnectionState {
471        self.connection_state
472    }
473
474    pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
475        self.find_local_candidate(transport.local_addr, transport.transport_protocol)
476            .is_some()
477            && self.validate_non_stun_traffic(transport.peer_addr)
478    }
479
480    fn get_timeout_interval(&self) -> Duration {
481        let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
482            self.check_interval,
483            self.keepalive_interval,
484            self.disconnected_timeout,
485            self.failed_timeout,
486        );
487        let mut interval = DEFAULT_CHECK_INTERVAL;
488
489        let mut update_interval = |x: Duration| {
490            if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
491                interval = x;
492            }
493        };
494
495        match self.last_connection_state {
496            ConnectionState::New | ConnectionState::Checking => {
497                // While connecting, check candidates more frequently
498                update_interval(check_interval);
499            }
500            ConnectionState::Connected | ConnectionState::Disconnected => {
501                update_interval(keepalive_interval);
502            }
503            _ => {}
504        };
505        // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
506        update_interval(disconnected_timeout);
507        update_interval(failed_timeout);
508        interval
509    }
510
511    /// Returns the selected pair (local_candidate, remote_candidate) or none
512    pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
513        if let Some(pair_index) = self.get_selected_pair() {
514            let candidate_pair = &self.candidate_pairs[pair_index];
515            Some((
516                &self.local_candidates[candidate_pair.local_index],
517                &self.remote_candidates[candidate_pair.remote_index],
518            ))
519        } else {
520            None
521        }
522    }
523
524    pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
525        if let Some(pair_index) = self.get_best_available_pair() {
526            let candidate_pair = &self.candidate_pairs[pair_index];
527            Some((
528                &self.local_candidates[candidate_pair.local_index],
529                &self.remote_candidates[candidate_pair.remote_index],
530            ))
531        } else {
532            None
533        }
534    }
535
536    /// start connectivity checks
537    pub fn start_connectivity_checks(
538        &mut self,
539        is_controlling: bool,
540        remote_ufrag: String,
541        remote_pwd: String,
542    ) -> Result<()> {
543        debug!(
544            "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
545            is_controlling, remote_ufrag, remote_pwd
546        );
547        self.set_remote_credentials(remote_ufrag, remote_pwd)?;
548        self.is_controlling = is_controlling;
549        self.start();
550
551        self.update_connection_state(ConnectionState::Checking);
552        self.request_connectivity_check();
553
554        Ok(())
555    }
556
557    /// Restarts the ICE Agent with the provided ufrag/pwd
558    /// If no ufrag/pwd is provided the Agent will generate one itself.
559    pub fn restart(
560        &mut self,
561        mut ufrag: String,
562        mut pwd: String,
563        keep_local_candidates: bool,
564    ) -> Result<()> {
565        if ufrag.is_empty() {
566            ufrag = generate_ufrag();
567        }
568        if pwd.is_empty() {
569            pwd = generate_pwd();
570        }
571
572        if ufrag.len() * 8 < 24 {
573            return Err(Error::ErrLocalUfragInsufficientBits);
574        }
575        if pwd.len() * 8 < 128 {
576            return Err(Error::ErrLocalPwdInsufficientBits);
577        }
578
579        // Clear all agent needed to take back to fresh state
580        self.ufrag_pwd.local_credentials.ufrag = ufrag;
581        self.ufrag_pwd.local_credentials.pwd = pwd;
582        self.ufrag_pwd.remote_credentials = None;
583
584        self.pending_binding_requests = vec![];
585
586        self.candidate_pairs = vec![];
587
588        self.set_selected_pair(None);
589        self.delete_all_candidates(keep_local_candidates);
590        self.start();
591
592        // Restart is used by NewAgent. Accept/Connect should be used to move to checking
593        // for new Agents
594        if self.connection_state != ConnectionState::New {
595            self.update_connection_state(ConnectionState::Checking);
596        }
597
598        Ok(())
599    }
600
601    /// Returns the local candidates.
602    pub fn get_local_candidates(&self) -> &[Candidate] {
603        &self.local_candidates
604    }
605
606    fn contact(&mut self, now: Instant) {
607        if self.connection_state == ConnectionState::Failed {
608            // The connection is currently failed so don't send any checks
609            // In the future it may be restarted though
610            self.last_connection_state = self.connection_state;
611            return;
612        }
613        if self.connection_state == ConnectionState::Checking {
614            // We have just entered checking for the first time so update our checking timer
615            if self.last_connection_state != self.connection_state {
616                self.checking_duration = now;
617            }
618
619            // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
620            if now
621                .checked_duration_since(self.checking_duration)
622                .unwrap_or_else(|| Duration::from_secs(0))
623                > self.disconnected_timeout + self.failed_timeout
624            {
625                self.update_connection_state(ConnectionState::Failed);
626                self.last_connection_state = self.connection_state;
627                return;
628            }
629        }
630
631        self.contact_candidates();
632
633        self.last_connection_state = self.connection_state;
634        self.last_checking_time = now;
635    }
636
637    pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
638        if self.connection_state != new_state {
639            // Connection has gone to failed, release all gathered candidates
640            if new_state == ConnectionState::Failed {
641                self.set_selected_pair(None);
642                self.delete_all_candidates(false);
643            }
644
645            info!(
646                "[{}]: Setting new connection state: {}",
647                self.get_name(),
648                new_state
649            );
650            self.connection_state = new_state;
651            self.event_outs
652                .push_back(Event::ConnectionStateChange(new_state));
653        }
654    }
655
656    pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
657        if let Some(pair_index) = selected_pair {
658            trace!(
659                "[{}]: Set selected candidate pair: {:?}",
660                self.get_name(),
661                self.candidate_pairs[pair_index]
662            );
663
664            self.candidate_pairs[pair_index].nominated = true;
665            self.selected_pair = Some(pair_index);
666
667            self.update_connection_state(ConnectionState::Connected);
668
669            // Notify when the selected pair changes
670            let candidate_pair = &self.candidate_pairs[pair_index];
671            self.event_outs
672                .push_back(Event::SelectedCandidatePairChange(
673                    Box::new(self.local_candidates[candidate_pair.local_index].clone()),
674                    Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
675                ));
676        } else {
677            self.selected_pair = None;
678        }
679    }
680
681    pub(crate) fn ping_all_candidates(&mut self) {
682        let mut pairs: Vec<(usize, usize)> = vec![];
683
684        let name = self.get_name().to_string();
685        if self.candidate_pairs.is_empty() {
686            warn!(
687                "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
688                name,
689            );
690        }
691        for p in &mut self.candidate_pairs {
692            if p.state == CandidatePairState::Waiting {
693                p.state = CandidatePairState::InProgress;
694            } else if p.state != CandidatePairState::InProgress {
695                continue;
696            }
697
698            if p.binding_request_count > self.max_binding_requests {
699                trace!(
700                    "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
701                    name,
702                    *p,
703                    self.local_candidates[p.local_index].addr(),
704                    self.remote_candidates[p.remote_index].addr()
705                );
706                p.state = CandidatePairState::Failed;
707            } else {
708                p.binding_request_count += 1;
709                let local = p.local_index;
710                let remote = p.remote_index;
711                pairs.push((local, remote));
712            }
713        }
714
715        if !pairs.is_empty() {
716            trace!(
717                "[{}]: pinging all {} candidates",
718                self.get_name(),
719                pairs.len()
720            );
721        }
722
723        for (local, remote) in pairs {
724            self.ping_candidate(local, remote);
725        }
726    }
727
728    pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
729        let p = CandidatePair::new(
730            local_index,
731            remote_index,
732            self.local_candidates[local_index].priority(),
733            self.remote_candidates[remote_index].priority(),
734            self.is_controlling,
735        );
736        self.candidate_pairs.push(p);
737    }
738
739    pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
740        for (index, p) in self.candidate_pairs.iter().enumerate() {
741            if p.local_index == local_index && p.remote_index == remote_index {
742                return Some(index);
743            }
744        }
745        None
746    }
747
748    /// Checks if the selected pair is (still) valid.
749    /// Note: the caller should hold the agent lock.
750    pub(crate) fn validate_selected_pair(&mut self) -> bool {
751        let (valid, disconnected_time) = {
752            self.selected_pair.as_ref().map_or_else(
753                || (false, Duration::from_secs(0)),
754                |&pair_index| {
755                    let remote_index = self.candidate_pairs[pair_index].remote_index;
756
757                    let disconnected_time = Instant::now()
758                        .duration_since(self.remote_candidates[remote_index].last_received());
759                    (true, disconnected_time)
760                },
761            )
762        };
763
764        if valid {
765            // Only allow transitions to fail if a.failedTimeout is non-zero
766            let mut total_time_to_failure = self.failed_timeout;
767            if total_time_to_failure != Duration::from_secs(0) {
768                total_time_to_failure += self.disconnected_timeout;
769            }
770
771            if total_time_to_failure != Duration::from_secs(0)
772                && disconnected_time > total_time_to_failure
773            {
774                self.update_connection_state(ConnectionState::Failed);
775            } else if self.disconnected_timeout != Duration::from_secs(0)
776                && disconnected_time > self.disconnected_timeout
777            {
778                self.update_connection_state(ConnectionState::Disconnected);
779            } else {
780                self.update_connection_state(ConnectionState::Connected);
781            }
782        }
783
784        valid
785    }
786
787    /// Sends STUN Binding Indications to the selected pair.
788    /// if no packet has been sent on that pair in the last keepaliveInterval.
789    /// Note: the caller should hold the agent lock.
790    pub(crate) fn check_keepalive(&mut self) {
791        let (local_index, remote_index) = {
792            self.selected_pair
793                .as_ref()
794                .map_or((None, None), |&pair_index| {
795                    let p = &self.candidate_pairs[pair_index];
796                    (Some(p.local_index), Some(p.remote_index))
797                })
798        };
799
800        if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
801            let last_sent =
802                Instant::now().duration_since(self.local_candidates[local_index].last_sent());
803
804            let last_received =
805                Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
806
807            if (self.keepalive_interval != Duration::from_secs(0))
808                && ((last_sent > self.keepalive_interval)
809                    || (last_received > self.keepalive_interval))
810            {
811                // we use binding request instead of indication to support refresh consent schemas
812                // see https://tools.ietf.org/html/rfc7675
813                self.ping_candidate(local_index, remote_index);
814            }
815        }
816    }
817
818    fn request_connectivity_check(&mut self) {
819        if self.ufrag_pwd.remote_credentials.is_some() {
820            self.contact(Instant::now());
821        }
822    }
823
824    /// Remove all candidates.
825    /// This closes any listening sockets and removes both the local and remote candidate lists.
826    ///
827    /// This is used for restarts, failures and on close.
828    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
829        if !keep_local_candidates {
830            self.local_candidates.clear();
831        }
832        self.remote_candidates.clear();
833    }
834
835    pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
836        for (index, c) in self.remote_candidates.iter().enumerate() {
837            if c.addr() == addr {
838                return Some(index);
839            }
840        }
841        None
842    }
843
844    pub(crate) fn find_local_candidate(
845        &self,
846        addr: SocketAddr,
847        transport_protocol: TransportProtocol,
848    ) -> Option<usize> {
849        for (index, c) in self.local_candidates.iter().enumerate() {
850            if c.addr() == addr && c.network_type().to_protocol() == transport_protocol {
851                return Some(index);
852            }
853        }
854        None
855    }
856
857    pub(crate) fn send_binding_request(
858        &mut self,
859        m: &Message,
860        local_index: usize,
861        remote_index: usize,
862    ) {
863        trace!(
864            "[{}]: ping STUN from {} to {}",
865            self.get_name(),
866            self.local_candidates[local_index],
867            self.remote_candidates[remote_index],
868        );
869
870        self.invalidate_pending_binding_requests(Instant::now());
871
872        self.pending_binding_requests.push(BindingRequest {
873            timestamp: Instant::now(),
874            transaction_id: m.transaction_id,
875            destination: self.remote_candidates[remote_index].addr(),
876            is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
877        });
878
879        self.send_stun(m, local_index, remote_index);
880    }
881
882    pub(crate) fn send_binding_success(
883        &mut self,
884        m: &Message,
885        local_index: usize,
886        remote_index: usize,
887    ) {
888        let addr = self.remote_candidates[remote_index].addr();
889        let (ip, port) = (addr.ip(), addr.port());
890        let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
891
892        let (out, result) = {
893            let mut out = Message::new();
894            let result = out.build(&[
895                Box::new(m.clone()),
896                Box::new(BINDING_SUCCESS),
897                Box::new(XorMappedAddress { ip, port }),
898                Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
899                Box::new(FINGERPRINT),
900            ]);
901            (out, result)
902        };
903
904        if let Err(err) = result {
905            warn!(
906                "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
907                self.get_name(),
908                self.local_candidates[local_index],
909                self.remote_candidates[remote_index],
910                err
911            );
912        } else {
913            self.send_stun(&out, local_index, remote_index);
914        }
915    }
916
917    /// Removes pending binding requests that are over `maxBindingRequestTimeout` old Let HTO be the
918    /// transaction timeout, which SHOULD be 2*RTT if RTT is known or 500 ms otherwise.
919    ///
920    /// reference: (IETF ref-8445)[https://tools.ietf.org/html/rfc8445#appendix-B.1].
921    pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
922        let pending_binding_requests = &mut self.pending_binding_requests;
923        let initial_size = pending_binding_requests.len();
924
925        let mut temp = vec![];
926        for binding_request in pending_binding_requests.drain(..) {
927            if filter_time
928                .checked_duration_since(binding_request.timestamp)
929                .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
930                .unwrap_or(true)
931            {
932                temp.push(binding_request);
933            }
934        }
935
936        *pending_binding_requests = temp;
937        let bind_requests_remaining = pending_binding_requests.len();
938        let bind_requests_removed = initial_size - bind_requests_remaining;
939        if bind_requests_removed > 0 {
940            trace!(
941                "[{}]: Discarded {} binding requests because they expired, still {} remaining",
942                self.get_name(),
943                bind_requests_removed,
944                bind_requests_remaining,
945            );
946        }
947    }
948
949    /// Assert that the passed `TransactionID` is in our `pendingBindingRequests` and returns the
950    /// destination, If the bindingRequest was valid remove it from our pending cache.
951    pub(crate) fn handle_inbound_binding_success(
952        &mut self,
953        id: TransactionId,
954    ) -> Option<BindingRequest> {
955        self.invalidate_pending_binding_requests(Instant::now());
956
957        let pending_binding_requests = &mut self.pending_binding_requests;
958        for i in 0..pending_binding_requests.len() {
959            if pending_binding_requests[i].transaction_id == id {
960                let valid_binding_request = pending_binding_requests.remove(i);
961                return Some(valid_binding_request);
962            }
963        }
964        None
965    }
966
967    /// Processes STUN traffic from a remote candidate.
968    pub(crate) fn handle_inbound(
969        &mut self,
970        m: &mut Message,
971        local_index: usize,
972        remote_addr: SocketAddr,
973    ) -> Result<()> {
974        if m.typ.method != METHOD_BINDING
975            || !(m.typ.class == CLASS_SUCCESS_RESPONSE
976                || m.typ.class == CLASS_REQUEST
977                || m.typ.class == CLASS_INDICATION)
978        {
979            trace!(
980                "[{}]: unhandled STUN from {} to {} class({}) method({})",
981                self.get_name(),
982                remote_addr,
983                self.local_candidates[local_index],
984                m.typ.class,
985                m.typ.method
986            );
987            return Err(Error::ErrUnhandledStunpacket);
988        }
989
990        if self.is_controlling {
991            if m.contains(ATTR_ICE_CONTROLLING) {
992                debug!(
993                    "[{}]: inbound isControlling && a.isControlling == true",
994                    self.get_name(),
995                );
996                return Err(Error::ErrUnexpectedStunrequestMessage);
997            } else if m.contains(ATTR_USE_CANDIDATE) {
998                debug!(
999                    "[{}]: useCandidate && a.isControlling == true",
1000                    self.get_name(),
1001                );
1002                return Err(Error::ErrUnexpectedStunrequestMessage);
1003            }
1004        } else if m.contains(ATTR_ICE_CONTROLLED) {
1005            debug!(
1006                "[{}]: inbound isControlled && a.isControlling == false",
1007                self.get_name(),
1008            );
1009            return Err(Error::ErrUnexpectedStunrequestMessage);
1010        }
1011
1012        let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1013            debug!(
1014                "[{}]: ufrag_pwd.remote_credentials.is_none",
1015                self.get_name(),
1016            );
1017            return Err(Error::ErrPasswordEmpty);
1018        };
1019
1020        let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1021        if m.typ.class == CLASS_SUCCESS_RESPONSE {
1022            if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1023            {
1024                warn!(
1025                    "[{}]: discard message from ({}), {}",
1026                    self.get_name(),
1027                    remote_addr,
1028                    err
1029                );
1030                return Err(err);
1031            }
1032
1033            if let Some(remote_index) = &remote_candidate_index {
1034                self.handle_success_response(m, local_index, *remote_index, remote_addr);
1035            } else {
1036                warn!(
1037                    "[{}]: discard success message from ({}), no such remote",
1038                    self.get_name(),
1039                    remote_addr
1040                );
1041                return Err(Error::ErrUnhandledStunpacket);
1042            }
1043        } else if m.typ.class == CLASS_REQUEST {
1044            {
1045                let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1046                    + ":"
1047                    + remote_credentials.ufrag.as_str();
1048                if let Err(err) = assert_inbound_username(m, &username) {
1049                    warn!(
1050                        "[{}]: discard message from ({}), {}",
1051                        self.get_name(),
1052                        remote_addr,
1053                        err
1054                    );
1055                    return Err(err);
1056                } else if let Err(err) = assert_inbound_message_integrity(
1057                    m,
1058                    self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1059                ) {
1060                    warn!(
1061                        "[{}]: discard message from ({}), {}",
1062                        self.get_name(),
1063                        remote_addr,
1064                        err
1065                    );
1066                    return Err(err);
1067                }
1068            }
1069
1070            if remote_candidate_index.is_none() {
1071                let (ip, port, network_type) =
1072                    (remote_addr.ip(), remote_addr.port(), NetworkType::Udp4);
1073
1074                let prflx_candidate_config = CandidatePeerReflexiveConfig {
1075                    base_config: CandidateConfig {
1076                        network: network_type.to_string(),
1077                        address: ip.to_string(),
1078                        port,
1079                        component: self.local_candidates[local_index].component(),
1080                        ..CandidateConfig::default()
1081                    },
1082                    rel_addr: "".to_owned(),
1083                    rel_port: 0,
1084                };
1085
1086                match prflx_candidate_config.new_candidate_peer_reflexive() {
1087                    Ok(prflx_candidate) => {
1088                        let _ = self.add_remote_candidate(prflx_candidate);
1089                        remote_candidate_index = Some(self.remote_candidates.len() - 1);
1090                    }
1091                    Err(err) => {
1092                        error!(
1093                            "[{}]: Failed to create new remote prflx candidate ({})",
1094                            self.get_name(),
1095                            err
1096                        );
1097                        return Err(err);
1098                    }
1099                };
1100
1101                debug!(
1102                    "[{}]: adding a new peer-reflexive candidate: {} ",
1103                    self.get_name(),
1104                    remote_addr
1105                );
1106            }
1107
1108            trace!(
1109                "[{}]: inbound STUN (Request) from {} to {}",
1110                self.get_name(),
1111                remote_addr,
1112                self.local_candidates[local_index]
1113            );
1114
1115            if let Some(remote_index) = &remote_candidate_index {
1116                self.handle_binding_request(m, local_index, *remote_index);
1117            }
1118        }
1119
1120        if let Some(remote_index) = remote_candidate_index {
1121            self.remote_candidates[remote_index].seen(false);
1122        }
1123
1124        Ok(())
1125    }
1126
1127    // Processes non STUN traffic from a remote candidate, and returns true if it is an actual
1128    // remote candidate.
1129    pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1130        self.find_remote_candidate(remote_addr)
1131            .is_some_and(|remote_index| {
1132                self.remote_candidates[remote_index].seen(false);
1133                true
1134            })
1135    }
1136
1137    pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1138        let peer_addr = self.remote_candidates[remote_index].addr();
1139        let local_addr = self.local_candidates[local_index].addr();
1140        let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1141            TransportProtocol::TCP
1142        } else {
1143            TransportProtocol::UDP
1144        };
1145
1146        self.write_outs.push_back(TaggedBytesMut {
1147            now: Instant::now(),
1148            transport: TransportContext {
1149                local_addr,
1150                peer_addr,
1151                ecn: None,
1152                transport_protocol,
1153            },
1154            message: BytesMut::from(&msg.raw[..]),
1155        });
1156
1157        self.local_candidates[local_index].seen(true);
1158    }
1159
1160    fn handle_inbound_candidate_msg(
1161        &mut self,
1162        local_index: usize,
1163        msg: TaggedBytesMut,
1164    ) -> Result<()> {
1165        if is_stun_message(&msg.message) {
1166            let mut m = Message {
1167                raw: msg.message.to_vec(),
1168                ..Message::default()
1169            };
1170
1171            if let Err(err) = m.decode() {
1172                warn!(
1173                    "[{}]: Failed to handle decode ICE from {} to {}: {}",
1174                    self.get_name(),
1175                    msg.transport.local_addr,
1176                    msg.transport.peer_addr,
1177                    err
1178                );
1179                Err(err)
1180            } else {
1181                self.handle_inbound(&mut m, local_index, msg.transport.peer_addr)
1182            }
1183        } else {
1184            if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1185                warn!(
1186                    "[{}]: Discarded message, not a valid remote candidate from {}",
1187                    self.get_name(),
1188                    msg.transport.peer_addr,
1189                );
1190            } else {
1191                warn!(
1192                    "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1193                    self.get_name(),
1194                    msg.transport.peer_addr
1195                );
1196            }
1197            Err(Error::ErrNonStunmessage)
1198        }
1199    }
1200
1201    pub(crate) fn get_name(&self) -> &str {
1202        if self.is_controlling {
1203            "controlling"
1204        } else {
1205            "controlled"
1206        }
1207    }
1208
1209    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1210        self.selected_pair
1211    }
1212
1213    pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1214        let mut best_pair_index: Option<usize> = None;
1215
1216        for (index, p) in self.candidate_pairs.iter().enumerate() {
1217            if p.state == CandidatePairState::Failed {
1218                continue;
1219            }
1220
1221            if let Some(pair_index) = &mut best_pair_index {
1222                let b = &self.candidate_pairs[*pair_index];
1223                if b.priority() < p.priority() {
1224                    *pair_index = index;
1225                }
1226            } else {
1227                best_pair_index = Some(index);
1228            }
1229        }
1230
1231        best_pair_index
1232    }
1233
1234    pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1235        let mut best_pair_index: Option<usize> = None;
1236
1237        for (index, p) in self.candidate_pairs.iter().enumerate() {
1238            if p.state != CandidatePairState::Succeeded {
1239                continue;
1240            }
1241
1242            if let Some(pair_index) = &mut best_pair_index {
1243                let b = &self.candidate_pairs[*pair_index];
1244                if b.priority() < p.priority() {
1245                    *pair_index = index;
1246                }
1247            } else {
1248                best_pair_index = Some(index);
1249            }
1250        }
1251
1252        best_pair_index
1253    }
1254}