rtc_ice/agent/
mod.rs

1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::tcp_type::TcpType;
32use crate::url::*;
33use shared::error::*;
34use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
35
/// Zero-length duration; interval selection in this module treats a value equal to
/// this as "not configured".
const ZERO_DURATION: Duration = Duration::ZERO;
37
/// Bookkeeping record for one outbound STUN Binding request.
#[derive(Debug, Clone)]
pub(crate) struct BindingRequest {
    /// Instant associated with the request (presumably when it was sent — confirm at the call site).
    pub(crate) timestamp: Instant,
    /// STUN transaction ID carried by the request.
    pub(crate) transaction_id: TransactionId,
    /// Socket address the request was addressed to.
    pub(crate) destination: SocketAddr,
    /// Whether the request was flagged as a use-candidate request
    /// (presumably an ICE nomination — confirm against the sender).
    pub(crate) is_use_candidate: bool,
}
45
46impl Default for BindingRequest {
47    fn default() -> Self {
48        Self {
49            timestamp: Instant::now(),
50            transaction_id: TransactionId::default(),
51            destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
52            is_use_candidate: false,
53        }
54    }
55}
56
/// An ICE username fragment / password pair.
#[derive(Default, Clone)]
pub struct Credentials {
    /// Username fragment.
    pub ufrag: String,
    /// Password.
    pub pwd: String,
}
62
/// Credentials of both sides of the ICE session as seen by the local agent.
#[derive(Default, Clone)]
pub(crate) struct UfragPwd {
    /// Credentials of the local agent.
    pub(crate) local_credentials: Credentials,
    /// Credentials of the remote agent; `None` until they have been set.
    pub(crate) remote_credentials: Option<Credentials>,
}
68
69fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
70    let mut username = Username::new(ATTR_USERNAME, String::new());
71    username.get_from(m)?;
72
73    if username.to_string() != expected_username {
74        return Err(Error::Other(format!(
75            "{:?} expected({}) actual({})",
76            Error::ErrMismatchUsername,
77            expected_username,
78            username,
79        )));
80    }
81
82    Ok(())
83}
84
85fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
86    let message_integrity_attr = MessageIntegrity(key.to_vec());
87    message_integrity_attr.check(m)
88}
89
/// Notifications the agent queues on its `event_outs` queue.
pub enum Event {
    /// The connection state changed to the contained value.
    ConnectionStateChange(ConnectionState),
    /// A candidate pair was selected; carries clones of the local candidate and the
    /// remote candidate, in that order.
    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
}
94
/// Represents the ICE agent.
///
/// Holds the candidate tables, the candidate-pair list, timing configuration,
/// mDNS state, and the outbound queues (`write_outs`, `event_outs`).
pub struct Agent {
    /// Random value chosen in `Agent::new` (presumably the ICE role-conflict
    /// tie-breaker — confirm against its users).
    pub(crate) tie_breaker: u64,
    /// Whether this agent currently acts in the controlling role.
    pub(crate) is_controlling: bool,
    /// Lite-mode flag copied from the configuration; `Agent::new` only accepts it
    /// together with host-only candidate types.
    pub(crate) lite: bool,

    /// Initialised to the instant the agent was constructed.
    pub(crate) start_time: Instant,

    /// Current connection state.
    pub(crate) connection_state: ConnectionState,
    /// Connection state recorded by the most recent `contact` pass.
    pub(crate) last_connection_state: ConnectionState,

    /// Local credentials plus, once set, the remote credentials.
    pub(crate) ufrag_pwd: UfragPwd,

    /// Known local candidates; candidate pairs refer to them by index.
    pub(crate) local_candidates: Vec<Candidate>,
    /// Known remote candidates; candidate pairs refer to them by index.
    pub(crate) remote_candidates: Vec<Candidate>,
    /// Pairs formed from entries of `local_candidates` and `remote_candidates`.
    pub(crate) candidate_pairs: Vec<CandidatePair>,
    /// Nominated pair, if any (presumably an index into `candidate_pairs`, like
    /// `selected_pair` — confirm).
    pub(crate) nominated_pair: Option<usize>,
    /// Index into `candidate_pairs` of the selected pair, if any.
    pub(crate) selected_pair: Option<usize>,

    // LRU of outbound Binding request Transaction IDs
    pub(crate) pending_binding_requests: Vec<BindingRequest>,

    // The following settings are filled in by `Agent::new` from the configuration,
    // falling back to the module defaults when unset.
    /// Copied from the configuration (presumably disables certificate verification
    /// for server connections — confirm at the point of use).
    pub(crate) insecure_skip_verify: bool,
    /// Number of binding requests a pair may be sent before `ping_all_candidates`
    /// marks it as failed.
    pub(crate) max_binding_requests: u16,
    /// Minimum wait configured for host candidates.
    pub(crate) host_acceptance_min_wait: Duration,
    /// Minimum wait configured for server-reflexive candidates.
    pub(crate) srflx_acceptance_min_wait: Duration,
    /// Minimum wait configured for peer-reflexive candidates.
    pub(crate) prflx_acceptance_min_wait: Duration,
    /// Minimum wait configured for relay candidates.
    pub(crate) relay_acceptance_min_wait: Duration,
    // How long connectivity checks can fail before the ICE Agent
    // goes to disconnected
    pub(crate) disconnected_timeout: Duration,
    // How long connectivity checks can fail before the ICE Agent
    // goes to failed
    pub(crate) failed_timeout: Duration,
    // How often should we send keepalive packets?
    // 0 means never
    pub(crate) keepalive_interval: Duration,
    // How often should we run our internal taskLoop to check for state changes when connecting
    pub(crate) check_interval: Duration,
    /// Instant at which `contact` last observed the agent entering the checking
    /// state. Despite the name this is a point in time, not a duration.
    pub(crate) checking_duration: Instant,
    /// Instant of the most recent `contact` pass that went on to contact candidates.
    pub(crate) last_checking_time: Instant,

    /// mDNS connection; `None` when none was created or its creation failed.
    pub(crate) mdns: Option<Mdns>,
    /// Remote `.local` candidates awaiting resolution, keyed by their mDNS query id.
    pub(crate) mdns_queries: HashMap<QueryId, Candidate>,

    /// Configured mDNS mode.
    pub(crate) mdns_mode: MulticastDnsMode,
    /// `.local` host name that replaces the address of matching local host candidates.
    pub(crate) mdns_local_name: String,
    /// Local IP whose host candidates are hidden behind `mdns_local_name`.
    pub(crate) mdns_local_ip: Option<IpAddr>,

    /// Candidate types in use; defaults are applied when the configured list is empty.
    pub(crate) candidate_types: Vec<CandidateType>,
    /// Allowed network types; an empty list disables network-type filtering.
    pub(crate) network_types: Vec<NetworkType>,
    /// URLs copied from the configuration.
    pub(crate) urls: Vec<Url>,

    /// Queue of outbound tagged byte buffers.
    pub(crate) write_outs: VecDeque<TaggedBytesMut>,
    /// Queue of events (state changes, selected-pair changes) produced by the agent.
    pub(crate) event_outs: VecDeque<Event>,
}
153
154impl Default for Agent {
155    fn default() -> Self {
156        Self {
157            tie_breaker: 0,
158            is_controlling: false,
159            lite: false,
160            start_time: Instant::now(),
161            connection_state: Default::default(),
162            last_connection_state: Default::default(),
163            ufrag_pwd: Default::default(),
164            local_candidates: vec![],
165            remote_candidates: vec![],
166            candidate_pairs: vec![],
167            nominated_pair: None,
168            selected_pair: None,
169            pending_binding_requests: vec![],
170            insecure_skip_verify: false,
171            max_binding_requests: 0,
172            host_acceptance_min_wait: Default::default(),
173            srflx_acceptance_min_wait: Default::default(),
174            prflx_acceptance_min_wait: Default::default(),
175            relay_acceptance_min_wait: Default::default(),
176            disconnected_timeout: Default::default(),
177            failed_timeout: Default::default(),
178            keepalive_interval: Default::default(),
179            check_interval: Default::default(),
180            checking_duration: Instant::now(),
181            last_checking_time: Instant::now(),
182            mdns_mode: MulticastDnsMode::Disabled,
183            mdns_local_name: "".to_owned(),
184            mdns_local_ip: None,
185            mdns_queries: HashMap::new(),
186            mdns: None,
187            candidate_types: vec![],
188            network_types: vec![],
189            urls: vec![],
190            write_outs: Default::default(),
191            event_outs: Default::default(),
192        }
193    }
194}
195
196impl Agent {
197    /// Creates a new Agent.
198    pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
199        let mut mdns_local_name = config.multicast_dns_local_name.clone();
200        if mdns_local_name.is_empty() {
201            mdns_local_name = generate_multicast_dns_name();
202        }
203
204        if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
205            return Err(Error::ErrInvalidMulticastDnshostName);
206        }
207
208        let mdns_mode = config.multicast_dns_mode;
209        let mdns = create_multicast_dns(
210            mdns_mode,
211            &mdns_local_name,
212            &config.multicast_dns_local_ip,
213            &config.multicast_dns_query_timeout,
214        )
215        .unwrap_or_else(|err| {
216            // Opportunistic mDNS: If we can't open the connection, that's ok: we
217            // can continue without it.
218            warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
219            None
220        });
221
222        let candidate_types = if config.candidate_types.is_empty() {
223            default_candidate_types()
224        } else {
225            config.candidate_types.clone()
226        };
227
228        if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
229        {
230            return Err(Error::ErrLiteUsingNonHostCandidates);
231        }
232
233        if !config.urls.is_empty()
234            && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
235            && !contains_candidate_type(CandidateType::Relay, &candidate_types)
236        {
237            return Err(Error::ErrUselessUrlsProvided);
238        }
239
240        let mut agent = Self {
241            tie_breaker: rand::random::<u64>(),
242            is_controlling: config.is_controlling,
243            lite: config.lite,
244
245            start_time: Instant::now(),
246
247            nominated_pair: None,
248            selected_pair: None,
249            candidate_pairs: vec![],
250
251            connection_state: ConnectionState::New,
252
253            insecure_skip_verify: config.insecure_skip_verify,
254
255            //started_ch_tx: MuteSome(started_ch_tx)),
256
257            //won't change after init_with_defaults()
258            max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
259                max_binding_requests
260            } else {
261                DEFAULT_MAX_BINDING_REQUESTS
262            },
263            host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
264                config.host_acceptance_min_wait
265            {
266                host_acceptance_min_wait
267            } else {
268                DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
269            },
270            srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
271                config.srflx_acceptance_min_wait
272            {
273                srflx_acceptance_min_wait
274            } else {
275                DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
276            },
277            prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
278                config.prflx_acceptance_min_wait
279            {
280                prflx_acceptance_min_wait
281            } else {
282                DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
283            },
284            relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
285                config.relay_acceptance_min_wait
286            {
287                relay_acceptance_min_wait
288            } else {
289                DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
290            },
291
292            // How long connectivity checks can fail before the ICE Agent
293            // goes to disconnected
294            disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
295                disconnected_timeout
296            } else {
297                DEFAULT_DISCONNECTED_TIMEOUT
298            },
299
300            // How long connectivity checks can fail before the ICE Agent
301            // goes to failed
302            failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
303                failed_timeout
304            } else {
305                DEFAULT_FAILED_TIMEOUT
306            },
307
308            // How often should we send keepalive packets?
309            // 0 means never
310            keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
311                keepalive_interval
312            } else {
313                DEFAULT_KEEPALIVE_INTERVAL
314            },
315
316            // How often should we run our internal taskLoop to check for state changes when connecting
317            check_interval: if config.check_interval == Duration::from_secs(0) {
318                DEFAULT_CHECK_INTERVAL
319            } else {
320                config.check_interval
321            },
322            checking_duration: Instant::now(),
323            last_checking_time: Instant::now(),
324            last_connection_state: ConnectionState::Unspecified,
325
326            mdns,
327            mdns_queries: HashMap::new(),
328
329            mdns_mode,
330            mdns_local_name,
331            mdns_local_ip: config.multicast_dns_local_ip,
332
333            ufrag_pwd: UfragPwd::default(),
334
335            local_candidates: vec![],
336            remote_candidates: vec![],
337
338            // LRU of outbound Binding request Transaction IDs
339            pending_binding_requests: vec![],
340
341            candidate_types,
342            network_types: config.network_types.clone(),
343            urls: config.urls.clone(),
344
345            write_outs: VecDeque::new(),
346            event_outs: VecDeque::new(),
347        };
348
349        // Restart is also used to initialize the agent for the first time
350        if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
351        {
352            let _ = agent.close();
353            return Err(err);
354        }
355
356        Ok(agent)
357    }
358
359    /// Adds a new local candidate.
360    pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<bool> {
361        // Filter by network type if network_types is configured
362        if !self.network_types.is_empty() {
363            let candidate_network_type = c.network_type();
364            if !self.network_types.contains(&candidate_network_type) {
365                debug!(
366                    "Ignoring local candidate with network type {:?} (not in configured network types: {:?})",
367                    candidate_network_type, self.network_types
368                );
369                return Ok(false);
370            }
371        }
372
373        if c.candidate_type() == CandidateType::Host
374            && self.mdns_mode == MulticastDnsMode::QueryAndGather
375            && c.network_type == NetworkType::Udp4
376            && self
377                .mdns_local_ip
378                .is_some_and(|local_ip| local_ip == c.addr().ip())
379        {
380            // only one .local mDNS host candidate per IPv4 is supported
381            // when registered local ip matches, use mdns_local_name to hide local host ip
382            trace!(
383                "mDNS hides local ip {} with local name {}",
384                c.address, self.mdns_local_name
385            );
386            c.address = self.mdns_local_name.clone();
387        }
388
389        for cand in &self.local_candidates {
390            if cand.equal(&c) {
391                return Ok(false);
392            }
393        }
394
395        self.local_candidates.push(c);
396        let local_index = self.local_candidates.len() - 1;
397        let local_tcp_type = self.local_candidates[local_index].tcp_type();
398
399        for remote_index in 0..self.remote_candidates.len() {
400            let remote_tcp_type = self.remote_candidates[remote_index].tcp_type();
401
402            // TCP type pairing rules (RFC 6544):
403            // - Active can only pair with Passive
404            // - Passive can only pair with Active
405            // - SimultaneousOpen can pair with SimultaneousOpen
406            // - Unspecified (UDP) can pair with Unspecified
407            let should_pair = match (local_tcp_type, remote_tcp_type) {
408                (TcpType::Active, TcpType::Passive) => true,
409                (TcpType::Passive, TcpType::Active) => true,
410                (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
411                (TcpType::Unspecified, TcpType::Unspecified) => true, // UDP candidates
412                _ => false,
413            };
414
415            if should_pair {
416                self.add_pair(local_index, remote_index);
417            }
418        }
419
420        self.request_connectivity_check();
421
422        Ok(true)
423    }
424
425    /// Adds a new remote candidate.
426    pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<bool> {
427        // Filter by network type if network_types is configured
428        if !self.network_types.is_empty() {
429            let candidate_network_type = c.network_type();
430            if !self.network_types.contains(&candidate_network_type) {
431                debug!(
432                    "Ignoring remote candidate with network type {:?} (not in configured network types: {:?})",
433                    candidate_network_type, self.network_types
434                );
435                return Ok(false);
436            }
437        }
438
439        // TCP active candidates don't have a listening port - they initiate connections.
440        // The remote active side will probe our passive candidates, so we don't need
441        // to do anything with remote active candidates.
442        if c.tcp_type() == TcpType::Active {
443            debug!(
444                "Ignoring remote candidate with tcptype active: {}",
445                c.address()
446            );
447            return Ok(false);
448        }
449
450        // If we have a mDNS Candidate lets fully resolve it before adding it locally
451        if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
452            if self.mdns_mode == MulticastDnsMode::Disabled {
453                warn!(
454                    "remote mDNS candidate added, but mDNS is disabled: ({})",
455                    c.address()
456                );
457                return Ok(false);
458            }
459
460            if c.candidate_type() != CandidateType::Host {
461                return Err(Error::ErrAddressParseFailed);
462            }
463
464            if let Some(mdns_conn) = &mut self.mdns {
465                let query_id = mdns_conn.query(c.address());
466                self.mdns_queries.insert(query_id, c);
467            }
468
469            return Ok(false);
470        }
471
472        self.trigger_request_connectivity_check(vec![c]);
473        Ok(true)
474    }
475
476    fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
477        for c in remote_candidates {
478            if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
479                let remote_tcp_type = c.tcp_type();
480                self.remote_candidates.push(c);
481                let remote_index = self.remote_candidates.len() - 1;
482
483                // Apply TCP type pairing rules (RFC 6544)
484                for local_index in 0..self.local_candidates.len() {
485                    let local_tcp_type = self.local_candidates[local_index].tcp_type();
486
487                    // TCP type pairing rules:
488                    // - Active can only pair with Passive
489                    // - Passive can only pair with Active
490                    // - SimultaneousOpen can pair with SimultaneousOpen
491                    // - Unspecified (UDP) can pair with Unspecified
492                    let should_pair = match (local_tcp_type, remote_tcp_type) {
493                        (TcpType::Active, TcpType::Passive) => true,
494                        (TcpType::Passive, TcpType::Active) => true,
495                        (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
496                        (TcpType::Unspecified, TcpType::Unspecified) => true, // UDP candidates
497                        _ => false,
498                    };
499
500                    if should_pair {
501                        self.add_pair(local_index, remote_index);
502                    }
503                }
504
505                self.request_connectivity_check();
506            }
507        }
508    }
509
510    /// Sets the credentials of the remote agent.
511    pub fn set_remote_credentials(
512        &mut self,
513        remote_ufrag: String,
514        remote_pwd: String,
515    ) -> Result<()> {
516        if remote_ufrag.is_empty() {
517            return Err(Error::ErrRemoteUfragEmpty);
518        } else if remote_pwd.is_empty() {
519            return Err(Error::ErrRemotePwdEmpty);
520        }
521
522        self.ufrag_pwd.remote_credentials = Some(Credentials {
523            ufrag: remote_ufrag,
524            pwd: remote_pwd,
525        });
526
527        Ok(())
528    }
529
    /// Returns the remote credentials, or `None` if they have not been set
    /// (they are also cleared by `restart`).
    pub fn get_remote_credentials(&self) -> Option<&Credentials> {
        self.ufrag_pwd.remote_credentials.as_ref()
    }
534
    /// Returns a reference to the local credentials.
    pub fn get_local_credentials(&self) -> &Credentials {
        &self.ufrag_pwd.local_credentials
    }
539
    /// Returns `true` when the agent is in the controlling role, `false` otherwise.
    pub fn role(&self) -> bool {
        self.is_controlling
    }
543
    /// Sets the agent's role: `true` for controlling, `false` otherwise.
    /// Only the flag is updated; no other agent state is touched here.
    pub fn set_role(&mut self, is_controlling: bool) {
        self.is_controlling = is_controlling;
    }
547
    /// Returns the agent's current connection state.
    pub fn state(&self) -> ConnectionState {
        self.connection_state
    }
551
552    pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
553        self.find_local_candidate(transport.local_addr, transport.transport_protocol)
554            .is_some()
555            && self.validate_non_stun_traffic(transport.peer_addr)
556    }
557
558    fn get_timeout_interval(&self) -> Duration {
559        let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
560            self.check_interval,
561            self.keepalive_interval,
562            self.disconnected_timeout,
563            self.failed_timeout,
564        );
565        let mut interval = DEFAULT_CHECK_INTERVAL;
566
567        let mut update_interval = |x: Duration| {
568            if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
569                interval = x;
570            }
571        };
572
573        match self.last_connection_state {
574            ConnectionState::New | ConnectionState::Checking => {
575                // While connecting, check candidates more frequently
576                update_interval(check_interval);
577            }
578            ConnectionState::Connected | ConnectionState::Disconnected => {
579                update_interval(keepalive_interval);
580            }
581            _ => {}
582        };
583        // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
584        update_interval(disconnected_timeout);
585        update_interval(failed_timeout);
586        interval
587    }
588
589    /// Returns the selected pair (local_candidate, remote_candidate) or none
590    pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
591        if let Some(pair_index) = self.get_selected_pair() {
592            let candidate_pair = &self.candidate_pairs[pair_index];
593            Some((
594                &self.local_candidates[candidate_pair.local_index],
595                &self.remote_candidates[candidate_pair.remote_index],
596            ))
597        } else {
598            None
599        }
600    }
601
602    pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
603        if let Some(pair_index) = self.get_best_available_pair() {
604            let candidate_pair = &self.candidate_pairs[pair_index];
605            Some((
606                &self.local_candidates[candidate_pair.local_index],
607                &self.remote_candidates[candidate_pair.remote_index],
608            ))
609        } else {
610            None
611        }
612    }
613
614    /// start connectivity checks
615    pub fn start_connectivity_checks(
616        &mut self,
617        is_controlling: bool,
618        remote_ufrag: String,
619        remote_pwd: String,
620    ) -> Result<()> {
621        debug!(
622            "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
623            is_controlling, remote_ufrag, remote_pwd
624        );
625        self.set_remote_credentials(remote_ufrag, remote_pwd)?;
626        self.is_controlling = is_controlling;
627        self.start();
628
629        self.update_connection_state(ConnectionState::Checking);
630        self.request_connectivity_check();
631
632        Ok(())
633    }
634
635    /// Restarts the ICE Agent with the provided ufrag/pwd
636    /// If no ufrag/pwd is provided the Agent will generate one itself.
637    pub fn restart(
638        &mut self,
639        mut ufrag: String,
640        mut pwd: String,
641        keep_local_candidates: bool,
642    ) -> Result<()> {
643        if ufrag.is_empty() {
644            ufrag = generate_ufrag();
645        }
646        if pwd.is_empty() {
647            pwd = generate_pwd();
648        }
649
650        if ufrag.len() * 8 < 24 {
651            return Err(Error::ErrLocalUfragInsufficientBits);
652        }
653        if pwd.len() * 8 < 128 {
654            return Err(Error::ErrLocalPwdInsufficientBits);
655        }
656
657        // Clear all agent needed to take back to fresh state
658        self.ufrag_pwd.local_credentials.ufrag = ufrag;
659        self.ufrag_pwd.local_credentials.pwd = pwd;
660        self.ufrag_pwd.remote_credentials = None;
661
662        self.pending_binding_requests = vec![];
663
664        self.candidate_pairs = vec![];
665
666        self.set_selected_pair(None);
667        self.delete_all_candidates(keep_local_candidates);
668        self.start();
669
670        // Restart is used by NewAgent. Accept/Connect should be used to move to checking
671        // for new Agents
672        if self.connection_state != ConnectionState::New {
673            self.update_connection_state(ConnectionState::Checking);
674        }
675
676        Ok(())
677    }
678
    /// Returns the local candidates currently held by the agent as a borrowed slice.
    pub fn get_local_candidates(&self) -> &[Candidate] {
        &self.local_candidates
    }
683
684    fn contact(&mut self, now: Instant) {
685        if self.connection_state == ConnectionState::Failed {
686            // The connection is currently failed so don't send any checks
687            // In the future it may be restarted though
688            self.last_connection_state = self.connection_state;
689            return;
690        }
691        if self.connection_state == ConnectionState::Checking {
692            // We have just entered checking for the first time so update our checking timer
693            if self.last_connection_state != self.connection_state {
694                self.checking_duration = now;
695            }
696
697            // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
698            if now
699                .checked_duration_since(self.checking_duration)
700                .unwrap_or_else(|| Duration::from_secs(0))
701                > self.disconnected_timeout + self.failed_timeout
702            {
703                self.update_connection_state(ConnectionState::Failed);
704                self.last_connection_state = self.connection_state;
705                return;
706            }
707        }
708
709        self.contact_candidates();
710
711        self.last_connection_state = self.connection_state;
712        self.last_checking_time = now;
713    }
714
715    pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
716        if self.connection_state != new_state {
717            // Connection has gone to failed, release all gathered candidates
718            if new_state == ConnectionState::Failed {
719                self.set_selected_pair(None);
720                self.delete_all_candidates(false);
721            }
722
723            info!(
724                "[{}]: Setting new connection state: {}",
725                self.get_name(),
726                new_state
727            );
728            self.connection_state = new_state;
729            self.event_outs
730                .push_back(Event::ConnectionStateChange(new_state));
731        }
732    }
733
734    pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
735        if let Some(pair_index) = selected_pair {
736            trace!(
737                "[{}]: Set selected candidate pair: {:?}",
738                self.get_name(),
739                self.candidate_pairs[pair_index]
740            );
741
742            self.candidate_pairs[pair_index].nominated = true;
743            self.selected_pair = Some(pair_index);
744
745            self.update_connection_state(ConnectionState::Connected);
746
747            // Notify when the selected pair changes
748            let candidate_pair = &self.candidate_pairs[pair_index];
749            self.event_outs
750                .push_back(Event::SelectedCandidatePairChange(
751                    Box::new(self.local_candidates[candidate_pair.local_index].clone()),
752                    Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
753                ));
754        } else {
755            self.selected_pair = None;
756        }
757    }
758
759    pub(crate) fn ping_all_candidates(&mut self) {
760        let mut pairs: Vec<(usize, usize)> = vec![];
761
762        let name = self.get_name().to_string();
763        if self.candidate_pairs.is_empty() {
764            warn!(
765                "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
766                name,
767            );
768        }
769        for p in &mut self.candidate_pairs {
770            if p.state == CandidatePairState::Waiting {
771                p.state = CandidatePairState::InProgress;
772            } else if p.state != CandidatePairState::InProgress {
773                continue;
774            }
775
776            if p.binding_request_count > self.max_binding_requests {
777                trace!(
778                    "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
779                    name,
780                    *p,
781                    self.local_candidates[p.local_index].addr(),
782                    self.remote_candidates[p.remote_index].addr()
783                );
784                p.state = CandidatePairState::Failed;
785            } else {
786                p.binding_request_count += 1;
787                let local = p.local_index;
788                let remote = p.remote_index;
789                pairs.push((local, remote));
790            }
791        }
792
793        if !pairs.is_empty() {
794            trace!(
795                "[{}]: pinging all {} candidates",
796                self.get_name(),
797                pairs.len()
798            );
799        }
800
801        for (local, remote) in pairs {
802            self.ping_candidate(local, remote);
803        }
804    }
805
806    pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
807        let p = CandidatePair::new(
808            local_index,
809            remote_index,
810            self.local_candidates[local_index].priority(),
811            self.remote_candidates[remote_index].priority(),
812            self.is_controlling,
813        );
814        self.candidate_pairs.push(p);
815    }
816
817    pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
818        for (index, p) in self.candidate_pairs.iter().enumerate() {
819            if p.local_index == local_index && p.remote_index == remote_index {
820                return Some(index);
821            }
822        }
823        None
824    }
825
826    /// Checks if the selected pair is (still) valid.
827    /// Note: the caller should hold the agent lock.
828    pub(crate) fn validate_selected_pair(&mut self) -> bool {
829        let (valid, disconnected_time) = {
830            self.selected_pair.as_ref().map_or_else(
831                || (false, Duration::from_secs(0)),
832                |&pair_index| {
833                    let remote_index = self.candidate_pairs[pair_index].remote_index;
834
835                    let disconnected_time = Instant::now()
836                        .duration_since(self.remote_candidates[remote_index].last_received());
837                    (true, disconnected_time)
838                },
839            )
840        };
841
842        if valid {
843            // Only allow transitions to fail if a.failedTimeout is non-zero
844            let mut total_time_to_failure = self.failed_timeout;
845            if total_time_to_failure != Duration::from_secs(0) {
846                total_time_to_failure += self.disconnected_timeout;
847            }
848
849            if total_time_to_failure != Duration::from_secs(0)
850                && disconnected_time > total_time_to_failure
851            {
852                self.update_connection_state(ConnectionState::Failed);
853            } else if self.disconnected_timeout != Duration::from_secs(0)
854                && disconnected_time > self.disconnected_timeout
855            {
856                self.update_connection_state(ConnectionState::Disconnected);
857            } else {
858                self.update_connection_state(ConnectionState::Connected);
859            }
860        }
861
862        valid
863    }
864
865    /// Sends STUN Binding Indications to the selected pair.
866    /// if no packet has been sent on that pair in the last keepaliveInterval.
867    /// Note: the caller should hold the agent lock.
868    pub(crate) fn check_keepalive(&mut self) {
869        let (local_index, remote_index) = {
870            self.selected_pair
871                .as_ref()
872                .map_or((None, None), |&pair_index| {
873                    let p = &self.candidate_pairs[pair_index];
874                    (Some(p.local_index), Some(p.remote_index))
875                })
876        };
877
878        if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
879            let last_sent =
880                Instant::now().duration_since(self.local_candidates[local_index].last_sent());
881
882            let last_received =
883                Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
884
885            if (self.keepalive_interval != Duration::from_secs(0))
886                && ((last_sent > self.keepalive_interval)
887                    || (last_received > self.keepalive_interval))
888            {
889                // we use binding request instead of indication to support refresh consent schemas
890                // see https://tools.ietf.org/html/rfc7675
891                self.ping_candidate(local_index, remote_index);
892            }
893        }
894    }
895
896    fn request_connectivity_check(&mut self) {
897        if self.ufrag_pwd.remote_credentials.is_some() {
898            self.contact(Instant::now());
899        }
900    }
901
902    /// Remove all candidates.
903    /// This closes any listening sockets and removes both the local and remote candidate lists.
904    ///
905    /// This is used for restarts, failures and on close.
906    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
907        if !keep_local_candidates {
908            self.local_candidates.clear();
909        }
910        self.remote_candidates.clear();
911    }
912
913    pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
914        for (index, c) in self.remote_candidates.iter().enumerate() {
915            if c.addr() == addr {
916                return Some(index);
917            }
918        }
919        None
920    }
921
922    pub(crate) fn find_local_candidate(
923        &self,
924        addr: SocketAddr,
925        transport_protocol: TransportProtocol,
926    ) -> Option<usize> {
927        for (index, c) in self.local_candidates.iter().enumerate() {
928            if c.network_type().to_protocol() != transport_protocol {
929                continue;
930            }
931
932            // For TCP active candidates, match by IP only (ignore port).
933            // TCP active candidates use port 9 as placeholder in signaling,
934            // but the actual connection uses an ephemeral port.
935            if c.tcp_type() == TcpType::Active && transport_protocol == TransportProtocol::TCP {
936                if c.addr().ip() == addr.ip() {
937                    return Some(index);
938                }
939            } else if c.addr() == addr {
940                return Some(index);
941            }
942        }
943        None
944    }
945
946    pub(crate) fn send_binding_request(
947        &mut self,
948        m: &Message,
949        local_index: usize,
950        remote_index: usize,
951    ) {
952        trace!(
953            "[{}]: ping STUN from {} to {}",
954            self.get_name(),
955            self.local_candidates[local_index],
956            self.remote_candidates[remote_index],
957        );
958
959        self.invalidate_pending_binding_requests(Instant::now());
960
961        self.pending_binding_requests.push(BindingRequest {
962            timestamp: Instant::now(),
963            transaction_id: m.transaction_id,
964            destination: self.remote_candidates[remote_index].addr(),
965            is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
966        });
967
968        self.send_stun(m, local_index, remote_index);
969    }
970
971    pub(crate) fn send_binding_success(
972        &mut self,
973        m: &Message,
974        local_index: usize,
975        remote_index: usize,
976    ) {
977        let addr = self.remote_candidates[remote_index].addr();
978        let (ip, port) = (addr.ip(), addr.port());
979        let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
980
981        let (out, result) = {
982            let mut out = Message::new();
983            let result = out.build(&[
984                Box::new(m.clone()),
985                Box::new(BINDING_SUCCESS),
986                Box::new(XorMappedAddress { ip, port }),
987                Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
988                Box::new(FINGERPRINT),
989            ]);
990            (out, result)
991        };
992
993        if let Err(err) = result {
994            warn!(
995                "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
996                self.get_name(),
997                self.local_candidates[local_index],
998                self.remote_candidates[remote_index],
999                err
1000            );
1001        } else {
1002            self.send_stun(&out, local_index, remote_index);
1003        }
1004    }
1005
1006    /// Removes pending binding requests that are over `maxBindingRequestTimeout` old Let HTO be the
1007    /// transaction timeout, which SHOULD be 2*RTT if RTT is known or 500 ms otherwise.
1008    ///
1009    /// reference: (IETF ref-8445)[https://tools.ietf.org/html/rfc8445#appendix-B.1].
1010    pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
1011        let pending_binding_requests = &mut self.pending_binding_requests;
1012        let initial_size = pending_binding_requests.len();
1013
1014        let mut temp = vec![];
1015        for binding_request in pending_binding_requests.drain(..) {
1016            if filter_time
1017                .checked_duration_since(binding_request.timestamp)
1018                .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
1019                .unwrap_or(true)
1020            {
1021                temp.push(binding_request);
1022            }
1023        }
1024
1025        *pending_binding_requests = temp;
1026        let bind_requests_remaining = pending_binding_requests.len();
1027        let bind_requests_removed = initial_size - bind_requests_remaining;
1028        if bind_requests_removed > 0 {
1029            trace!(
1030                "[{}]: Discarded {} binding requests because they expired, still {} remaining",
1031                self.get_name(),
1032                bind_requests_removed,
1033                bind_requests_remaining,
1034            );
1035        }
1036    }
1037
1038    /// Assert that the passed `TransactionID` is in our `pendingBindingRequests` and returns the
1039    /// destination, If the bindingRequest was valid remove it from our pending cache.
1040    pub(crate) fn handle_inbound_binding_success(
1041        &mut self,
1042        id: TransactionId,
1043    ) -> Option<BindingRequest> {
1044        self.invalidate_pending_binding_requests(Instant::now());
1045
1046        let pending_binding_requests = &mut self.pending_binding_requests;
1047        for i in 0..pending_binding_requests.len() {
1048            if pending_binding_requests[i].transaction_id == id {
1049                let valid_binding_request = pending_binding_requests.remove(i);
1050                return Some(valid_binding_request);
1051            }
1052        }
1053        None
1054    }
1055
1056    /// Processes STUN traffic from a remote candidate.
1057    pub(crate) fn handle_inbound(
1058        &mut self,
1059        m: &mut Message,
1060        local_index: usize,
1061        remote_addr: SocketAddr,
1062    ) -> Result<()> {
1063        if m.typ.method != METHOD_BINDING
1064            || !(m.typ.class == CLASS_SUCCESS_RESPONSE
1065                || m.typ.class == CLASS_REQUEST
1066                || m.typ.class == CLASS_INDICATION)
1067        {
1068            trace!(
1069                "[{}]: unhandled STUN from {} to {} class({}) method({})",
1070                self.get_name(),
1071                remote_addr,
1072                self.local_candidates[local_index],
1073                m.typ.class,
1074                m.typ.method
1075            );
1076            return Err(Error::ErrUnhandledStunpacket);
1077        }
1078
1079        if self.is_controlling {
1080            if m.contains(ATTR_ICE_CONTROLLING) {
1081                debug!(
1082                    "[{}]: inbound isControlling && a.isControlling == true",
1083                    self.get_name(),
1084                );
1085                return Err(Error::ErrUnexpectedStunrequestMessage);
1086            } else if m.contains(ATTR_USE_CANDIDATE) {
1087                debug!(
1088                    "[{}]: useCandidate && a.isControlling == true",
1089                    self.get_name(),
1090                );
1091                return Err(Error::ErrUnexpectedStunrequestMessage);
1092            }
1093        } else if m.contains(ATTR_ICE_CONTROLLED) {
1094            debug!(
1095                "[{}]: inbound isControlled && a.isControlling == false",
1096                self.get_name(),
1097            );
1098            return Err(Error::ErrUnexpectedStunrequestMessage);
1099        }
1100
1101        let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1102            debug!(
1103                "[{}]: ufrag_pwd.remote_credentials.is_none",
1104                self.get_name(),
1105            );
1106            return Err(Error::ErrPasswordEmpty);
1107        };
1108
1109        let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1110        if m.typ.class == CLASS_SUCCESS_RESPONSE {
1111            if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1112            {
1113                warn!(
1114                    "[{}]: discard message from ({}), {}",
1115                    self.get_name(),
1116                    remote_addr,
1117                    err
1118                );
1119                return Err(err);
1120            }
1121
1122            if let Some(remote_index) = &remote_candidate_index {
1123                self.handle_success_response(m, local_index, *remote_index, remote_addr);
1124            } else {
1125                warn!(
1126                    "[{}]: discard success message from ({}), no such remote",
1127                    self.get_name(),
1128                    remote_addr
1129                );
1130                return Err(Error::ErrUnhandledStunpacket);
1131            }
1132        } else if m.typ.class == CLASS_REQUEST {
1133            {
1134                let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1135                    + ":"
1136                    + remote_credentials.ufrag.as_str();
1137                if let Err(err) = assert_inbound_username(m, &username) {
1138                    warn!(
1139                        "[{}]: discard message from ({}), {}",
1140                        self.get_name(),
1141                        remote_addr,
1142                        err
1143                    );
1144                    return Err(err);
1145                } else if let Err(err) = assert_inbound_message_integrity(
1146                    m,
1147                    self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1148                ) {
1149                    warn!(
1150                        "[{}]: discard message from ({}), {}",
1151                        self.get_name(),
1152                        remote_addr,
1153                        err
1154                    );
1155                    return Err(err);
1156                }
1157            }
1158
1159            if remote_candidate_index.is_none() {
1160                // Use the local candidate's network type for the peer-reflexive candidate
1161                let network_type = self.local_candidates[local_index].network_type();
1162                let (ip, port) = (remote_addr.ip(), remote_addr.port());
1163
1164                let prflx_candidate_config = CandidatePeerReflexiveConfig {
1165                    base_config: CandidateConfig {
1166                        network: network_type.to_string(),
1167                        address: ip.to_string(),
1168                        port,
1169                        component: self.local_candidates[local_index].component(),
1170                        ..CandidateConfig::default()
1171                    },
1172                    rel_addr: "".to_owned(),
1173                    rel_port: 0,
1174                };
1175
1176                match prflx_candidate_config.new_candidate_peer_reflexive() {
1177                    Ok(prflx_candidate) => {
1178                        if let Ok(added) = self.add_remote_candidate(prflx_candidate)
1179                            && added
1180                        {
1181                            remote_candidate_index = Some(self.remote_candidates.len() - 1);
1182                        }
1183                    }
1184                    Err(err) => {
1185                        error!(
1186                            "[{}]: Failed to create new remote prflx candidate ({})",
1187                            self.get_name(),
1188                            err
1189                        );
1190                        return Err(err);
1191                    }
1192                };
1193
1194                debug!(
1195                    "[{}]: adding a new peer-reflexive candidate: {} ",
1196                    self.get_name(),
1197                    remote_addr
1198                );
1199            }
1200
1201            trace!(
1202                "[{}]: inbound STUN (Request) from {} to {}",
1203                self.get_name(),
1204                remote_addr,
1205                self.local_candidates[local_index]
1206            );
1207
1208            if let Some(remote_index) = &remote_candidate_index {
1209                self.handle_binding_request(m, local_index, *remote_index);
1210            }
1211        }
1212
1213        if let Some(remote_index) = remote_candidate_index {
1214            self.remote_candidates[remote_index].seen(false);
1215        }
1216
1217        Ok(())
1218    }
1219
1220    // Processes non STUN traffic from a remote candidate, and returns true if it is an actual
1221    // remote candidate.
1222    pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1223        self.find_remote_candidate(remote_addr)
1224            .is_some_and(|remote_index| {
1225                self.remote_candidates[remote_index].seen(false);
1226                true
1227            })
1228    }
1229
1230    pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1231        let peer_addr = self.remote_candidates[remote_index].addr();
1232        let local_addr = self.local_candidates[local_index].addr();
1233        let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1234            TransportProtocol::TCP
1235        } else {
1236            TransportProtocol::UDP
1237        };
1238
1239        self.write_outs.push_back(TaggedBytesMut {
1240            now: Instant::now(),
1241            transport: TransportContext {
1242                local_addr,
1243                peer_addr,
1244                ecn: None,
1245                transport_protocol,
1246            },
1247            message: BytesMut::from(&msg.raw[..]),
1248        });
1249
1250        self.local_candidates[local_index].seen(true);
1251    }
1252
1253    fn handle_inbound_candidate_msg(
1254        &mut self,
1255        local_index: usize,
1256        msg: TaggedBytesMut,
1257    ) -> Result<()> {
1258        if is_stun_message(&msg.message) {
1259            let mut m = Message {
1260                raw: msg.message.to_vec(),
1261                ..Message::default()
1262            };
1263
1264            if let Err(err) = m.decode() {
1265                warn!(
1266                    "[{}]: Failed to handle decode ICE from {} to {}: {}",
1267                    self.get_name(),
1268                    msg.transport.local_addr,
1269                    msg.transport.peer_addr,
1270                    err
1271                );
1272                Err(err)
1273            } else {
1274                self.handle_inbound(&mut m, local_index, msg.transport.peer_addr)
1275            }
1276        } else {
1277            if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1278                warn!(
1279                    "[{}]: Discarded message, not a valid remote candidate from {}",
1280                    self.get_name(),
1281                    msg.transport.peer_addr,
1282                );
1283            } else {
1284                warn!(
1285                    "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1286                    self.get_name(),
1287                    msg.transport.peer_addr
1288                );
1289            }
1290            Err(Error::ErrNonStunmessage)
1291        }
1292    }
1293
1294    pub(crate) fn get_name(&self) -> &str {
1295        if self.is_controlling {
1296            "controlling"
1297        } else {
1298            "controlled"
1299        }
1300    }
1301
1302    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1303        self.selected_pair
1304    }
1305
1306    pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1307        let mut best_pair_index: Option<usize> = None;
1308
1309        for (index, p) in self.candidate_pairs.iter().enumerate() {
1310            if p.state == CandidatePairState::Failed {
1311                continue;
1312            }
1313
1314            if let Some(pair_index) = &mut best_pair_index {
1315                let b = &self.candidate_pairs[*pair_index];
1316                if b.priority() < p.priority() {
1317                    *pair_index = index;
1318                }
1319            } else {
1320                best_pair_index = Some(index);
1321            }
1322        }
1323
1324        best_pair_index
1325    }
1326
1327    pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1328        let mut best_pair_index: Option<usize> = None;
1329
1330        for (index, p) in self.candidate_pairs.iter().enumerate() {
1331            if p.state != CandidatePairState::Succeeded {
1332                continue;
1333            }
1334
1335            if let Some(pair_index) = &mut best_pair_index {
1336                let b = &self.candidate_pairs[*pair_index];
1337                if b.priority() < p.priority() {
1338                    *pair_index = index;
1339                }
1340            } else {
1341                best_pair_index = Some(index);
1342            }
1343        }
1344
1345        best_pair_index
1346    }
1347}