rtc_ice/agent/
mod.rs

1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5pub mod agent_selector;
6pub mod agent_stats;
7
8use agent_config::*;
9use bytes::BytesMut;
10use log::{debug, error, info, trace, warn};
11use std::collections::VecDeque;
12use std::net::{Ipv4Addr, SocketAddr};
13use std::time::{Duration, Instant};
14use stun::attributes::*;
15use stun::fingerprint::*;
16use stun::integrity::*;
17use stun::message::*;
18use stun::textattrs::*;
19use stun::xoraddr::*;
20
21use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
22use crate::candidate::{candidate_pair::*, *};
23use crate::network_type::NetworkType;
24use crate::rand::*;
25use crate::state::*;
26use crate::url::*;
27use shared::error::*;
28use shared::{Protocol, Transmit, TransportContext};
29
30const ZERO_DURATION: Duration = Duration::from_secs(0);
31
32#[derive(Debug, Clone)]
33pub(crate) struct BindingRequest {
34    pub(crate) timestamp: Instant,
35    pub(crate) transaction_id: TransactionId,
36    pub(crate) destination: SocketAddr,
37    pub(crate) is_use_candidate: bool,
38}
39
40impl Default for BindingRequest {
41    fn default() -> Self {
42        Self {
43            timestamp: Instant::now(),
44            transaction_id: TransactionId::default(),
45            destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
46            is_use_candidate: false,
47        }
48    }
49}
50
/// An ICE username-fragment / password pair.
#[derive(Clone, Default)]
pub struct Credentials {
    /// Username fragment.
    pub ufrag: String,
    /// Password.
    pub pwd: String,
}
56
57#[derive(Default, Clone)]
58pub(crate) struct UfragPwd {
59    pub(crate) local_credentials: Credentials,
60    pub(crate) remote_credentials: Option<Credentials>,
61}
62
63fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
64    let mut username = Username::new(ATTR_USERNAME, String::new());
65    username.get_from(m)?;
66
67    if username.to_string() != expected_username {
68        return Err(Error::Other(format!(
69            "{:?} expected({}) actual({})",
70            Error::ErrMismatchUsername,
71            expected_username,
72            username,
73        )));
74    }
75
76    Ok(())
77}
78
79fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
80    let message_integrity_attr = MessageIntegrity(key.to_vec());
81    message_integrity_attr.check(m)
82}
83
84pub enum Event {
85    ConnectionStateChange(ConnectionState),
86    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
87}
88
89/// Represents the ICE agent.
90pub struct Agent {
91    pub(crate) tie_breaker: u64,
92    pub(crate) is_controlling: bool,
93    pub(crate) lite: bool,
94
95    pub(crate) start_time: Instant,
96
97    pub(crate) connection_state: ConnectionState,
98    pub(crate) last_connection_state: ConnectionState,
99
100    //pub(crate) started_ch_tx: Mutex<Option<broadcast::Sender<()>>>,
101    pub(crate) ufrag_pwd: UfragPwd,
102
103    pub(crate) local_candidates: Vec<Candidate>,
104    pub(crate) remote_candidates: Vec<Candidate>,
105    pub(crate) candidate_pairs: Vec<CandidatePair>,
106    pub(crate) nominated_pair: Option<usize>,
107    pub(crate) selected_pair: Option<usize>,
108
109    // LRU of outbound Binding request Transaction IDs
110    pub(crate) pending_binding_requests: Vec<BindingRequest>,
111
112    // the following variables won't be changed after init_with_defaults()
113    pub(crate) insecure_skip_verify: bool,
114    pub(crate) max_binding_requests: u16,
115    pub(crate) host_acceptance_min_wait: Duration,
116    pub(crate) srflx_acceptance_min_wait: Duration,
117    pub(crate) prflx_acceptance_min_wait: Duration,
118    pub(crate) relay_acceptance_min_wait: Duration,
119    // How long connectivity checks can fail before the ICE Agent
120    // goes to disconnected
121    pub(crate) disconnected_timeout: Duration,
122    // How long connectivity checks can fail before the ICE Agent
123    // goes to failed
124    pub(crate) failed_timeout: Duration,
125    // How often should we send keepalive packets?
126    // 0 means never
127    pub(crate) keepalive_interval: Duration,
128    // How often should we run our internal taskLoop to check for state changes when connecting
129    pub(crate) check_interval: Duration,
130    pub(crate) checking_duration: Instant,
131    pub(crate) last_checking_time: Instant,
132
133    pub(crate) candidate_types: Vec<CandidateType>,
134    pub(crate) urls: Vec<Url>,
135
136    pub(crate) transmits: VecDeque<Transmit<BytesMut>>,
137    pub(crate) events: VecDeque<Event>,
138}
139
140impl Agent {
141    /// Creates a new Agent.
142    pub fn new(config: AgentConfig) -> Result<Self> {
143        let candidate_types = if config.candidate_types.is_empty() {
144            default_candidate_types()
145        } else {
146            config.candidate_types.clone()
147        };
148
149        if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
150        {
151            return Err(Error::ErrLiteUsingNonHostCandidates);
152        }
153
154        if !config.urls.is_empty()
155            && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
156            && !contains_candidate_type(CandidateType::Relay, &candidate_types)
157        {
158            return Err(Error::ErrUselessUrlsProvided);
159        }
160
161        let mut agent = Self {
162            tie_breaker: rand::random::<u64>(),
163            is_controlling: config.is_controlling,
164            lite: config.lite,
165
166            start_time: Instant::now(),
167
168            nominated_pair: None,
169            selected_pair: None,
170            candidate_pairs: vec![],
171
172            connection_state: ConnectionState::New,
173
174            insecure_skip_verify: config.insecure_skip_verify,
175
176            //started_ch_tx: MuteSome(started_ch_tx)),
177
178            //won't change after init_with_defaults()
179            max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
180                max_binding_requests
181            } else {
182                DEFAULT_MAX_BINDING_REQUESTS
183            },
184            host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
185                config.host_acceptance_min_wait
186            {
187                host_acceptance_min_wait
188            } else {
189                DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
190            },
191            srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
192                config.srflx_acceptance_min_wait
193            {
194                srflx_acceptance_min_wait
195            } else {
196                DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
197            },
198            prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
199                config.prflx_acceptance_min_wait
200            {
201                prflx_acceptance_min_wait
202            } else {
203                DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
204            },
205            relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
206                config.relay_acceptance_min_wait
207            {
208                relay_acceptance_min_wait
209            } else {
210                DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
211            },
212
213            // How long connectivity checks can fail before the ICE Agent
214            // goes to disconnected
215            disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
216                disconnected_timeout
217            } else {
218                DEFAULT_DISCONNECTED_TIMEOUT
219            },
220
221            // How long connectivity checks can fail before the ICE Agent
222            // goes to failed
223            failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
224                failed_timeout
225            } else {
226                DEFAULT_FAILED_TIMEOUT
227            },
228
229            // How often should we send keepalive packets?
230            // 0 means never
231            keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
232                keepalive_interval
233            } else {
234                DEFAULT_KEEPALIVE_INTERVAL
235            },
236
237            // How often should we run our internal taskLoop to check for state changes when connecting
238            check_interval: if config.check_interval == Duration::from_secs(0) {
239                DEFAULT_CHECK_INTERVAL
240            } else {
241                config.check_interval
242            },
243            checking_duration: Instant::now(),
244            last_checking_time: Instant::now(),
245            last_connection_state: ConnectionState::Unspecified,
246
247            ufrag_pwd: UfragPwd::default(),
248
249            local_candidates: vec![],
250            remote_candidates: vec![],
251
252            // LRU of outbound Binding request Transaction IDs
253            pending_binding_requests: vec![],
254
255            candidate_types,
256            urls: config.urls.clone(),
257
258            transmits: VecDeque::new(),
259            events: VecDeque::new(),
260        };
261
262        // Restart is also used to initialize the agent for the first time
263        if let Err(err) = agent.restart(config.local_ufrag, config.local_pwd, false) {
264            let _ = agent.close();
265            return Err(err);
266        }
267
268        Ok(agent)
269    }
270
271    /// Adds a new local candidate.
272    pub fn add_local_candidate(&mut self, c: Candidate) -> Result<()> {
273        for cand in &self.local_candidates {
274            if cand.equal(&c) {
275                return Ok(());
276            }
277        }
278
279        self.local_candidates.push(c);
280
281        for remote_index in 0..self.remote_candidates.len() {
282            self.add_pair(self.local_candidates.len() - 1, remote_index);
283        }
284
285        self.request_connectivity_check();
286
287        Ok(())
288    }
289
290    /// Adds a new remote candidate.
291    pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<()> {
292        // If we have a mDNS Candidate lets fully resolve it before adding it locally
293        if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
294            warn!(
295                "remote mDNS candidate added, but mDNS is disabled: ({})",
296                c.address()
297            );
298            return Err(Error::ErrMulticastDnsNotSupported);
299        }
300
301        for cand in &self.remote_candidates {
302            if cand.equal(&c) {
303                return Ok(());
304            }
305        }
306
307        self.remote_candidates.push(c);
308
309        for local_index in 0..self.local_candidates.len() {
310            self.add_pair(local_index, self.remote_candidates.len() - 1);
311        }
312
313        self.request_connectivity_check();
314
315        Ok(())
316    }
317
318    /// Sets the credentials of the remote agent.
319    pub fn set_remote_credentials(
320        &mut self,
321        remote_ufrag: String,
322        remote_pwd: String,
323    ) -> Result<()> {
324        if remote_ufrag.is_empty() {
325            return Err(Error::ErrRemoteUfragEmpty);
326        } else if remote_pwd.is_empty() {
327            return Err(Error::ErrRemotePwdEmpty);
328        }
329
330        self.ufrag_pwd.remote_credentials = Some(Credentials {
331            ufrag: remote_ufrag,
332            pwd: remote_pwd,
333        });
334
335        Ok(())
336    }
337
338    /// Returns the remote credentials.
339    pub async fn get_remote_credentials(&self) -> Option<&Credentials> {
340        self.ufrag_pwd.remote_credentials.as_ref()
341    }
342
343    /// Returns the local credentials.
344    pub fn get_local_credentials(&self) -> &Credentials {
345        &self.ufrag_pwd.local_credentials
346    }
347
348    pub fn handle_read(&mut self, msg: Transmit<BytesMut>) -> Result<()> {
349        if let Some(local_index) =
350            self.find_local_candidate(msg.transport.local_addr, msg.transport.protocol)
351        {
352            self.handle_inbound_candidate_msg(
353                local_index,
354                &msg.message,
355                msg.transport.peer_addr,
356                msg.transport.local_addr,
357            )
358        } else {
359            warn!(
360                "[{}]: Discarded message, not a valid local candidate from {:?}:{}",
361                self.get_name(),
362                msg.transport.protocol,
363                msg.transport.local_addr,
364            );
365            Err(Error::ErrUnhandledStunpacket)
366        }
367    }
368
369    pub fn poll_transmit(&mut self) -> Option<Transmit<BytesMut>> {
370        self.transmits.pop_front()
371    }
372
373    pub fn handle_timeout(&mut self, now: Instant) {
374        if self.ufrag_pwd.remote_credentials.is_some()
375            && self.last_checking_time + self.get_timeout_interval() <= now
376        {
377            self.contact(now);
378        }
379    }
380
381    pub fn poll_timeout(&self) -> Option<Instant> {
382        if self.ufrag_pwd.remote_credentials.is_some() {
383            Some(self.last_checking_time + self.get_timeout_interval())
384        } else {
385            None
386        }
387    }
388
389    pub fn poll_event(&mut self) -> Option<Event> {
390        self.events.pop_front()
391    }
392
393    fn get_timeout_interval(&self) -> Duration {
394        let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
395            self.check_interval,
396            self.keepalive_interval,
397            self.disconnected_timeout,
398            self.failed_timeout,
399        );
400        let mut interval = DEFAULT_CHECK_INTERVAL;
401
402        let mut update_interval = |x: Duration| {
403            if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
404                interval = x;
405            }
406        };
407
408        match self.last_connection_state {
409            ConnectionState::New | ConnectionState::Checking => {
410                // While connecting, check candidates more frequently
411                update_interval(check_interval);
412            }
413            ConnectionState::Connected | ConnectionState::Disconnected => {
414                update_interval(keepalive_interval);
415            }
416            _ => {}
417        };
418        // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
419        update_interval(disconnected_timeout);
420        update_interval(failed_timeout);
421        interval
422    }
423
424    /// Cleans up the Agent.
425    pub fn close(&mut self) -> Result<()> {
426        self.set_selected_pair(None);
427        self.delete_all_candidates(false);
428        self.update_connection_state(ConnectionState::Closed);
429
430        Ok(())
431    }
432
433    /// Returns the selected pair (local_candidate, remote_candidate) or none
434    pub fn get_selected_candidate_pair(&self) -> Option<(Candidate, Candidate)> {
435        if let Some(pair_index) = self.get_selected_pair() {
436            let candidate_pair = &self.candidate_pairs[pair_index];
437            Some((
438                self.local_candidates[candidate_pair.local_index].clone(),
439                self.remote_candidates[candidate_pair.remote_index].clone(),
440            ))
441        } else {
442            None
443        }
444    }
445
446    /// start connectivity checks
447    pub fn start_connectivity_checks(
448        &mut self,
449        is_controlling: bool,
450        remote_ufrag: String,
451        remote_pwd: String,
452    ) -> Result<()> {
453        debug!(
454            "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
455            is_controlling, remote_ufrag, remote_pwd
456        );
457        self.set_remote_credentials(remote_ufrag, remote_pwd)?;
458        self.is_controlling = is_controlling;
459        self.start();
460
461        self.update_connection_state(ConnectionState::Checking);
462        self.request_connectivity_check();
463
464        Ok(())
465    }
466
467    /// Restarts the ICE Agent with the provided ufrag/pwd
468    /// If no ufrag/pwd is provided the Agent will generate one itself.
469    pub fn restart(
470        &mut self,
471        mut ufrag: String,
472        mut pwd: String,
473        keep_local_candidates: bool,
474    ) -> Result<()> {
475        if ufrag.is_empty() {
476            ufrag = generate_ufrag();
477        }
478        if pwd.is_empty() {
479            pwd = generate_pwd();
480        }
481
482        if ufrag.len() * 8 < 24 {
483            return Err(Error::ErrLocalUfragInsufficientBits);
484        }
485        if pwd.len() * 8 < 128 {
486            return Err(Error::ErrLocalPwdInsufficientBits);
487        }
488
489        // Clear all agent needed to take back to fresh state
490        self.ufrag_pwd.local_credentials.ufrag = ufrag;
491        self.ufrag_pwd.local_credentials.pwd = pwd;
492        self.ufrag_pwd.remote_credentials = None;
493
494        self.pending_binding_requests = vec![];
495
496        self.candidate_pairs = vec![];
497
498        self.set_selected_pair(None);
499        self.delete_all_candidates(keep_local_candidates);
500        self.start();
501
502        // Restart is used by NewAgent. Accept/Connect should be used to move to checking
503        // for new Agents
504        if self.connection_state != ConnectionState::New {
505            self.update_connection_state(ConnectionState::Checking);
506        }
507
508        Ok(())
509    }
510
511    // Returns the local candidates.
512    pub(crate) fn get_local_candidates(&self) -> &[Candidate] {
513        &self.local_candidates
514    }
515
516    fn contact(&mut self, now: Instant) {
517        if self.connection_state == ConnectionState::Failed {
518            // The connection is currently failed so don't send any checks
519            // In the future it may be restarted though
520            self.last_connection_state = self.connection_state;
521            return;
522        }
523        if self.connection_state == ConnectionState::Checking {
524            // We have just entered checking for the first time so update our checking timer
525            if self.last_connection_state != self.connection_state {
526                self.checking_duration = now;
527            }
528
529            // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
530            if now
531                .checked_duration_since(self.checking_duration)
532                .unwrap_or_else(|| Duration::from_secs(0))
533                > self.disconnected_timeout + self.failed_timeout
534            {
535                self.update_connection_state(ConnectionState::Failed);
536                self.last_connection_state = self.connection_state;
537                return;
538            }
539        }
540
541        self.contact_candidates();
542
543        self.last_connection_state = self.connection_state;
544        self.last_checking_time = now;
545    }
546
547    pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
548        if self.connection_state != new_state {
549            // Connection has gone to failed, release all gathered candidates
550            if new_state == ConnectionState::Failed {
551                self.set_selected_pair(None);
552                self.delete_all_candidates(false);
553            }
554
555            info!(
556                "[{}]: Setting new connection state: {}",
557                self.get_name(),
558                new_state
559            );
560            self.connection_state = new_state;
561            self.events
562                .push_back(Event::ConnectionStateChange(new_state));
563        }
564    }
565
566    pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
567        if let Some(pair_index) = selected_pair {
568            trace!(
569                "[{}]: Set selected candidate pair: {:?}",
570                self.get_name(),
571                self.candidate_pairs[pair_index]
572            );
573
574            self.candidate_pairs[pair_index].nominated = true;
575            self.selected_pair = Some(pair_index);
576
577            self.update_connection_state(ConnectionState::Connected);
578
579            // Notify when the selected pair changes
580            let candidate_pair = &self.candidate_pairs[pair_index];
581            self.events.push_back(Event::SelectedCandidatePairChange(
582                Box::new(self.local_candidates[candidate_pair.local_index].clone()),
583                Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
584            ));
585        } else {
586            self.selected_pair = None;
587        }
588    }
589
590    pub(crate) fn ping_all_candidates(&mut self) {
591        trace!("[{}]: pinging all candidates", self.get_name(),);
592
593        let mut pairs: Vec<(usize, usize)> = vec![];
594
595        {
596            let name = self.get_name().to_string();
597            if self.candidate_pairs.is_empty() {
598                warn!(
599                "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
600                name,
601            );
602            }
603            for p in &mut self.candidate_pairs {
604                if p.state == CandidatePairState::Waiting {
605                    p.state = CandidatePairState::InProgress;
606                } else if p.state != CandidatePairState::InProgress {
607                    continue;
608                }
609
610                if p.binding_request_count > self.max_binding_requests {
611                    trace!(
612                        "[{}]: max requests reached for pair {}, marking it as failed",
613                        name,
614                        *p
615                    );
616                    p.state = CandidatePairState::Failed;
617                } else {
618                    p.binding_request_count += 1;
619                    let local = p.local_index;
620                    let remote = p.remote_index;
621                    pairs.push((local, remote));
622                }
623            }
624        }
625
626        for (local, remote) in pairs {
627            self.ping_candidate(local, remote);
628        }
629    }
630
631    pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
632        let p = CandidatePair::new(
633            local_index,
634            remote_index,
635            self.local_candidates[local_index].priority(),
636            self.remote_candidates[remote_index].priority(),
637            self.is_controlling,
638        );
639        self.candidate_pairs.push(p);
640    }
641
642    pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
643        for (index, p) in self.candidate_pairs.iter().enumerate() {
644            if p.local_index == local_index && p.remote_index == remote_index {
645                return Some(index);
646            }
647        }
648        None
649    }
650
651    /// Checks if the selected pair is (still) valid.
652    /// Note: the caller should hold the agent lock.
653    pub(crate) fn validate_selected_pair(&mut self) -> bool {
654        let (valid, disconnected_time) = {
655            self.selected_pair.as_ref().map_or_else(
656                || (false, Duration::from_secs(0)),
657                |&pair_index| {
658                    let remote_index = self.candidate_pairs[pair_index].remote_index;
659
660                    let disconnected_time = Instant::now()
661                        .duration_since(self.remote_candidates[remote_index].last_received());
662                    (true, disconnected_time)
663                },
664            )
665        };
666
667        if valid {
668            // Only allow transitions to fail if a.failedTimeout is non-zero
669            let mut total_time_to_failure = self.failed_timeout;
670            if total_time_to_failure != Duration::from_secs(0) {
671                total_time_to_failure += self.disconnected_timeout;
672            }
673
674            if total_time_to_failure != Duration::from_secs(0)
675                && disconnected_time > total_time_to_failure
676            {
677                self.update_connection_state(ConnectionState::Failed);
678            } else if self.disconnected_timeout != Duration::from_secs(0)
679                && disconnected_time > self.disconnected_timeout
680            {
681                self.update_connection_state(ConnectionState::Disconnected);
682            } else {
683                self.update_connection_state(ConnectionState::Connected);
684            }
685        }
686
687        valid
688    }
689
690    /// Sends STUN Binding Indications to the selected pair.
691    /// if no packet has been sent on that pair in the last keepaliveInterval.
692    /// Note: the caller should hold the agent lock.
693    pub(crate) fn check_keepalive(&mut self) {
694        let (local_index, remote_index) = {
695            self.selected_pair
696                .as_ref()
697                .map_or((None, None), |&pair_index| {
698                    let p = &self.candidate_pairs[pair_index];
699                    (Some(p.local_index), Some(p.remote_index))
700                })
701        };
702
703        if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
704            let last_sent =
705                Instant::now().duration_since(self.local_candidates[local_index].last_sent());
706
707            let last_received =
708                Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
709
710            if (self.keepalive_interval != Duration::from_secs(0))
711                && ((last_sent > self.keepalive_interval)
712                    || (last_received > self.keepalive_interval))
713            {
714                // we use binding request instead of indication to support refresh consent schemas
715                // see https://tools.ietf.org/html/rfc7675
716                self.ping_candidate(local_index, remote_index);
717            }
718        }
719    }
720
721    fn request_connectivity_check(&mut self) {
722        if self.ufrag_pwd.remote_credentials.is_some() {
723            self.contact(Instant::now());
724        }
725    }
726
727    /// Remove all candidates.
728    /// This closes any listening sockets and removes both the local and remote candidate lists.
729    ///
730    /// This is used for restarts, failures and on close.
731    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
732        if !keep_local_candidates {
733            self.local_candidates.clear();
734        }
735        self.remote_candidates.clear();
736    }
737
738    pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
739        let (ip, port) = (addr.ip(), addr.port());
740        for (index, c) in self.remote_candidates.iter().enumerate() {
741            if c.address() == ip.to_string() && c.port() == port {
742                return Some(index);
743            }
744        }
745        None
746    }
747
748    pub(crate) fn find_local_candidate(
749        &self,
750        addr: SocketAddr,
751        protocol: Protocol,
752    ) -> Option<usize> {
753        for (index, c) in self.local_candidates.iter().enumerate() {
754            if c.addr() == addr && c.network_type().to_protocol() == protocol {
755                return Some(index);
756            }
757        }
758        None
759    }
760
761    pub(crate) fn send_binding_request(
762        &mut self,
763        m: &Message,
764        local_index: usize,
765        remote_index: usize,
766    ) {
767        trace!(
768            "[{}]: ping STUN from {} to {}",
769            self.get_name(),
770            local_index,
771            remote_index
772        );
773
774        self.invalidate_pending_binding_requests(Instant::now());
775
776        self.pending_binding_requests.push(BindingRequest {
777            timestamp: Instant::now(),
778            transaction_id: m.transaction_id,
779            destination: self.remote_candidates[remote_index].addr(),
780            is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
781        });
782
783        self.send_stun(m, local_index, remote_index);
784    }
785
786    pub(crate) fn send_binding_success(
787        &mut self,
788        m: &Message,
789        local_index: usize,
790        remote_index: usize,
791    ) {
792        let addr = self.remote_candidates[remote_index].addr();
793        let (ip, port) = (addr.ip(), addr.port());
794        let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
795
796        let (out, result) = {
797            let mut out = Message::new();
798            let result = out.build(&[
799                Box::new(m.clone()),
800                Box::new(BINDING_SUCCESS),
801                Box::new(XorMappedAddress { ip, port }),
802                Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
803                Box::new(FINGERPRINT),
804            ]);
805            (out, result)
806        };
807
808        if let Err(err) = result {
809            warn!(
810                "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
811                self.get_name(),
812                local_index,
813                remote_index,
814                err
815            );
816        } else {
817            self.send_stun(&out, local_index, remote_index);
818        }
819    }
820
821    /// Removes pending binding requests that are over `maxBindingRequestTimeout` old Let HTO be the
822    /// transaction timeout, which SHOULD be 2*RTT if RTT is known or 500 ms otherwise.
823    ///
824    /// reference: (IETF ref-8445)[https://tools.ietf.org/html/rfc8445#appendix-B.1].
825    pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
826        let pending_binding_requests = &mut self.pending_binding_requests;
827        let initial_size = pending_binding_requests.len();
828
829        let mut temp = vec![];
830        for binding_request in pending_binding_requests.drain(..) {
831            if filter_time
832                .checked_duration_since(binding_request.timestamp)
833                .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
834                .unwrap_or(true)
835            {
836                temp.push(binding_request);
837            }
838        }
839
840        *pending_binding_requests = temp;
841        let bind_requests_remaining = pending_binding_requests.len();
842        let bind_requests_removed = initial_size - bind_requests_remaining;
843        if bind_requests_removed > 0 {
844            trace!(
845                "[{}]: Discarded {} binding requests because they expired, still {} remaining",
846                self.get_name(),
847                bind_requests_removed,
848                bind_requests_remaining,
849            );
850        }
851    }
852
853    /// Assert that the passed `TransactionID` is in our `pendingBindingRequests` and returns the
854    /// destination, If the bindingRequest was valid remove it from our pending cache.
855    pub(crate) fn handle_inbound_binding_success(
856        &mut self,
857        id: TransactionId,
858    ) -> Option<BindingRequest> {
859        self.invalidate_pending_binding_requests(Instant::now());
860
861        let pending_binding_requests = &mut self.pending_binding_requests;
862        for i in 0..pending_binding_requests.len() {
863            if pending_binding_requests[i].transaction_id == id {
864                let valid_binding_request = pending_binding_requests.remove(i);
865                return Some(valid_binding_request);
866            }
867        }
868        None
869    }
870
871    /// Processes STUN traffic from a remote candidate.
872    pub(crate) fn handle_inbound(
873        &mut self,
874        m: &mut Message,
875        local_index: usize,
876        remote_addr: SocketAddr,
877    ) -> Result<()> {
878        if m.typ.method != METHOD_BINDING
879            || !(m.typ.class == CLASS_SUCCESS_RESPONSE
880                || m.typ.class == CLASS_REQUEST
881                || m.typ.class == CLASS_INDICATION)
882        {
883            trace!(
884                "[{}]: unhandled STUN from {} to {} class({}) method({})",
885                self.get_name(),
886                remote_addr,
887                local_index,
888                m.typ.class,
889                m.typ.method
890            );
891            return Err(Error::ErrUnhandledStunpacket);
892        }
893
894        if self.is_controlling {
895            if m.contains(ATTR_ICE_CONTROLLING) {
896                debug!(
897                    "[{}]: inbound isControlling && a.isControlling == true",
898                    self.get_name(),
899                );
900                return Err(Error::ErrUnexpectedStunrequestMessage);
901            } else if m.contains(ATTR_USE_CANDIDATE) {
902                debug!(
903                    "[{}]: useCandidate && a.isControlling == true",
904                    self.get_name(),
905                );
906                return Err(Error::ErrUnexpectedStunrequestMessage);
907            }
908        } else if m.contains(ATTR_ICE_CONTROLLED) {
909            debug!(
910                "[{}]: inbound isControlled && a.isControlling == false",
911                self.get_name(),
912            );
913            return Err(Error::ErrUnexpectedStunrequestMessage);
914        }
915
916        let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
917            debug!(
918                "[{}]: ufrag_pwd.remote_credentials.is_none",
919                self.get_name(),
920            );
921            return Err(Error::ErrPasswordEmpty);
922        };
923
924        let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
925        if m.typ.class == CLASS_SUCCESS_RESPONSE {
926            if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
927            {
928                warn!(
929                    "[{}]: discard message from ({}), {}",
930                    self.get_name(),
931                    remote_addr,
932                    err
933                );
934                return Err(err);
935            }
936
937            if let Some(remote_index) = &remote_candidate_index {
938                self.handle_success_response(m, local_index, *remote_index, remote_addr);
939            } else {
940                warn!(
941                    "[{}]: discard success message from ({}), no such remote",
942                    self.get_name(),
943                    remote_addr
944                );
945                return Err(Error::ErrUnhandledStunpacket);
946            }
947        } else if m.typ.class == CLASS_REQUEST {
948            {
949                let username = self.ufrag_pwd.local_credentials.ufrag.clone()
950                    + ":"
951                    + remote_credentials.ufrag.as_str();
952                if let Err(err) = assert_inbound_username(m, &username) {
953                    warn!(
954                        "[{}]: discard message from ({}), {}",
955                        self.get_name(),
956                        remote_addr,
957                        err
958                    );
959                    return Err(err);
960                } else if let Err(err) = assert_inbound_message_integrity(
961                    m,
962                    self.ufrag_pwd.local_credentials.pwd.as_bytes(),
963                ) {
964                    warn!(
965                        "[{}]: discard message from ({}), {}",
966                        self.get_name(),
967                        remote_addr,
968                        err
969                    );
970                    return Err(err);
971                }
972            }
973
974            if remote_candidate_index.is_none() {
975                let (ip, port, network_type) =
976                    (remote_addr.ip(), remote_addr.port(), NetworkType::Udp4);
977
978                let prflx_candidate_config = CandidatePeerReflexiveConfig {
979                    base_config: CandidateConfig {
980                        network: network_type.to_string(),
981                        address: ip.to_string(),
982                        port,
983                        component: self.local_candidates[local_index].component(),
984                        ..CandidateConfig::default()
985                    },
986                    rel_addr: "".to_owned(),
987                    rel_port: 0,
988                };
989
990                match prflx_candidate_config.new_candidate_peer_reflexive() {
991                    Ok(prflx_candidate) => {
992                        let _ = self.add_remote_candidate(prflx_candidate);
993                        remote_candidate_index = Some(self.remote_candidates.len() - 1);
994                    }
995                    Err(err) => {
996                        error!(
997                            "[{}]: Failed to create new remote prflx candidate ({})",
998                            self.get_name(),
999                            err
1000                        );
1001                        return Err(err);
1002                    }
1003                };
1004
1005                debug!(
1006                    "[{}]: adding a new peer-reflexive candidate: {} ",
1007                    self.get_name(),
1008                    remote_addr
1009                );
1010            }
1011
1012            trace!(
1013                "[{}]: inbound STUN (Request) from {} to {}",
1014                self.get_name(),
1015                remote_addr,
1016                local_index
1017            );
1018
1019            if let Some(remote_index) = &remote_candidate_index {
1020                self.handle_binding_request(m, local_index, *remote_index);
1021            }
1022        }
1023
1024        if let Some(remote_index) = remote_candidate_index {
1025            self.remote_candidates[remote_index].seen(false);
1026        }
1027
1028        Ok(())
1029    }
1030
1031    // Processes non STUN traffic from a remote candidate, and returns true if it is an actual
1032    // remote candidate.
1033    pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1034        self.find_remote_candidate(remote_addr)
1035            .map_or(false, |remote_index| {
1036                self.remote_candidates[remote_index].seen(false);
1037                true
1038            })
1039    }
1040
1041    pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1042        let peer_addr = self.remote_candidates[remote_index].addr();
1043        let local_addr = self.local_candidates[local_index].addr();
1044        let protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1045            Protocol::TCP
1046        } else {
1047            Protocol::UDP
1048        };
1049
1050        self.transmits.push_back(Transmit {
1051            now: Instant::now(),
1052            transport: TransportContext {
1053                local_addr,
1054                peer_addr,
1055                ecn: None,
1056                protocol,
1057            },
1058            message: BytesMut::from(&msg.raw[..]),
1059        });
1060
1061        self.local_candidates[local_index].seen(true);
1062    }
1063
1064    fn handle_inbound_candidate_msg(
1065        &mut self,
1066        local_index: usize,
1067        buf: &[u8],
1068        remote_addr: SocketAddr,
1069        local_addr: SocketAddr,
1070    ) -> Result<()> {
1071        if stun::message::is_message(buf) {
1072            let mut m = Message {
1073                raw: vec![],
1074                ..Message::default()
1075            };
1076            // Explicitly copy raw buffer so Message can own the memory.
1077            m.raw.extend_from_slice(buf);
1078
1079            if let Err(err) = m.decode() {
1080                warn!(
1081                    "[{}]: Failed to handle decode ICE from {} to {}: {}",
1082                    self.get_name(),
1083                    local_addr,
1084                    remote_addr,
1085                    err
1086                );
1087                Err(err)
1088            } else {
1089                self.handle_inbound(&mut m, local_index, remote_addr)
1090            }
1091        } else {
1092            if !self.validate_non_stun_traffic(remote_addr) {
1093                warn!(
1094                    "[{}]: Discarded message, not a valid remote candidate from {}",
1095                    self.get_name(),
1096                    remote_addr,
1097                );
1098            } else {
1099                error!(
1100                    "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1101                    self.get_name(),
1102                    remote_addr
1103                );
1104            }
1105            Err(Error::ErrNonStunmessage)
1106        }
1107    }
1108
1109    pub(crate) fn get_name(&self) -> &str {
1110        if self.is_controlling {
1111            "controlling"
1112        } else {
1113            "controlled"
1114        }
1115    }
1116
1117    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1118        self.selected_pair
1119    }
1120
1121    pub(crate) fn get_best_available_candidate_pair(&self) -> Option<usize> {
1122        let mut best_pair_index: Option<usize> = None;
1123
1124        for (index, p) in self.candidate_pairs.iter().enumerate() {
1125            if p.state == CandidatePairState::Failed {
1126                continue;
1127            }
1128
1129            if let Some(pair_index) = &mut best_pair_index {
1130                let b = &self.candidate_pairs[*pair_index];
1131                if b.priority() < p.priority() {
1132                    *pair_index = index;
1133                }
1134            } else {
1135                best_pair_index = Some(index);
1136            }
1137        }
1138
1139        best_pair_index
1140    }
1141
1142    pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1143        let mut best_pair_index: Option<usize> = None;
1144
1145        for (index, p) in self.candidate_pairs.iter().enumerate() {
1146            if p.state != CandidatePairState::Succeeded {
1147                continue;
1148            }
1149
1150            if let Some(pair_index) = &mut best_pair_index {
1151                let b = &self.candidate_pairs[*pair_index];
1152                if b.priority() < p.priority() {
1153                    *pair_index = index;
1154                }
1155            } else {
1156                best_pair_index = Some(index);
1157            }
1158        }
1159
1160        best_pair_index
1161    }
1162}