1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::tcp_type::TcpType;
32use crate::url::*;
33use shared::error::*;
34use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
35
36const ZERO_DURATION: Duration = Duration::from_secs(0);
37
38#[derive(Debug, Clone)]
39pub(crate) struct BindingRequest {
40 pub(crate) timestamp: Instant,
41 pub(crate) transaction_id: TransactionId,
42 pub(crate) destination: SocketAddr,
43 pub(crate) is_use_candidate: bool,
44}
45
46impl Default for BindingRequest {
47 fn default() -> Self {
48 Self {
49 timestamp: Instant::now(),
50 transaction_id: TransactionId::default(),
51 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
52 is_use_candidate: false,
53 }
54 }
55}
56
57#[derive(Default, Clone)]
58pub struct Credentials {
59 pub ufrag: String,
60 pub pwd: String,
61}
62
63#[derive(Default, Clone)]
64pub(crate) struct UfragPwd {
65 pub(crate) local_credentials: Credentials,
66 pub(crate) remote_credentials: Option<Credentials>,
67}
68
69fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
70 let mut username = Username::new(ATTR_USERNAME, String::new());
71 username.get_from(m)?;
72
73 if username.to_string() != expected_username {
74 return Err(Error::Other(format!(
75 "{:?} expected({}) actual({})",
76 Error::ErrMismatchUsername,
77 expected_username,
78 username,
79 )));
80 }
81
82 Ok(())
83}
84
85fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
86 let message_integrity_attr = MessageIntegrity(key.to_vec());
87 message_integrity_attr.check(m)
88}
89
90pub enum Event {
91 ConnectionStateChange(ConnectionState),
92 SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
93 RoleChange(bool),
96}
97
98pub struct Agent {
100 pub(crate) tie_breaker: u64,
101 pub(crate) is_controlling: bool,
102 pub(crate) lite: bool,
103
104 pub(crate) start_time: Instant,
105
106 pub(crate) connection_state: ConnectionState,
107 pub(crate) last_connection_state: ConnectionState,
108
109 pub(crate) ufrag_pwd: UfragPwd,
111
112 pub(crate) local_candidates: Vec<Candidate>,
113 pub(crate) remote_candidates: Vec<Candidate>,
114 pub(crate) candidate_pairs: Vec<CandidatePair>,
115 pub(crate) nominated_pair: Option<usize>,
116 pub(crate) selected_pair: Option<usize>,
117
118 pub(crate) pending_binding_requests: Vec<BindingRequest>,
120
121 pub(crate) insecure_skip_verify: bool,
123 pub(crate) max_binding_requests: u16,
124 pub(crate) host_acceptance_min_wait: Duration,
125 pub(crate) srflx_acceptance_min_wait: Duration,
126 pub(crate) prflx_acceptance_min_wait: Duration,
127 pub(crate) relay_acceptance_min_wait: Duration,
128 pub(crate) disconnected_timeout: Duration,
131 pub(crate) failed_timeout: Duration,
134 pub(crate) keepalive_interval: Duration,
137 pub(crate) check_interval: Duration,
139 pub(crate) checking_duration: Instant,
140 pub(crate) last_checking_time: Instant,
141
142 pub(crate) mdns: Option<Mdns>,
143 pub(crate) mdns_queries: HashMap<QueryId, Candidate>,
144
145 pub(crate) mdns_mode: MulticastDnsMode,
146 pub(crate) mdns_local_name: String,
147 pub(crate) mdns_local_ip: Option<IpAddr>,
148
149 pub(crate) candidate_types: Vec<CandidateType>,
150 pub(crate) network_types: Vec<NetworkType>,
151 pub(crate) urls: Vec<Url>,
152
153 pub(crate) write_outs: VecDeque<TaggedBytesMut>,
154 pub(crate) event_outs: VecDeque<Event>,
155}
156
157impl Default for Agent {
158 fn default() -> Self {
159 Self {
160 tie_breaker: 0,
161 is_controlling: false,
162 lite: false,
163 start_time: Instant::now(),
164 connection_state: Default::default(),
165 last_connection_state: Default::default(),
166 ufrag_pwd: Default::default(),
167 local_candidates: vec![],
168 remote_candidates: vec![],
169 candidate_pairs: vec![],
170 nominated_pair: None,
171 selected_pair: None,
172 pending_binding_requests: vec![],
173 insecure_skip_verify: false,
174 max_binding_requests: 0,
175 host_acceptance_min_wait: Default::default(),
176 srflx_acceptance_min_wait: Default::default(),
177 prflx_acceptance_min_wait: Default::default(),
178 relay_acceptance_min_wait: Default::default(),
179 disconnected_timeout: Default::default(),
180 failed_timeout: Default::default(),
181 keepalive_interval: Default::default(),
182 check_interval: Default::default(),
183 checking_duration: Instant::now(),
184 last_checking_time: Instant::now(),
185 mdns_mode: MulticastDnsMode::Disabled,
186 mdns_local_name: "".to_owned(),
187 mdns_local_ip: None,
188 mdns_queries: HashMap::new(),
189 mdns: None,
190 candidate_types: vec![],
191 network_types: vec![],
192 urls: vec![],
193 write_outs: Default::default(),
194 event_outs: Default::default(),
195 }
196 }
197}
198
199impl Agent {
200 pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
202 let mut mdns_local_name = config.multicast_dns_local_name.clone();
203 if mdns_local_name.is_empty() {
204 mdns_local_name = generate_multicast_dns_name();
205 }
206
207 if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
208 return Err(Error::ErrInvalidMulticastDnshostName);
209 }
210
211 let mdns_mode = config.multicast_dns_mode;
212 let mdns = create_multicast_dns(
213 mdns_mode,
214 &mdns_local_name,
215 &config.multicast_dns_local_ip,
216 &config.multicast_dns_query_timeout,
217 )
218 .unwrap_or_else(|err| {
219 warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
222 None
223 });
224
225 let candidate_types = if config.candidate_types.is_empty() {
226 default_candidate_types()
227 } else {
228 config.candidate_types.clone()
229 };
230
231 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
232 {
233 return Err(Error::ErrLiteUsingNonHostCandidates);
234 }
235
236 if !config.urls.is_empty()
237 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
238 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
239 {
240 return Err(Error::ErrUselessUrlsProvided);
241 }
242
243 let mut agent = Self {
244 tie_breaker: rand::random::<u64>(),
245 is_controlling: config.is_controlling,
246 lite: config.lite,
247
248 start_time: Instant::now(),
249
250 nominated_pair: None,
251 selected_pair: None,
252 candidate_pairs: vec![],
253
254 connection_state: ConnectionState::New,
255
256 insecure_skip_verify: config.insecure_skip_verify,
257
258 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
262 max_binding_requests
263 } else {
264 DEFAULT_MAX_BINDING_REQUESTS
265 },
266 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
267 config.host_acceptance_min_wait
268 {
269 host_acceptance_min_wait
270 } else {
271 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
272 },
273 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
274 config.srflx_acceptance_min_wait
275 {
276 srflx_acceptance_min_wait
277 } else {
278 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
279 },
280 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
281 config.prflx_acceptance_min_wait
282 {
283 prflx_acceptance_min_wait
284 } else {
285 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
286 },
287 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
288 config.relay_acceptance_min_wait
289 {
290 relay_acceptance_min_wait
291 } else {
292 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
293 },
294
295 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
298 disconnected_timeout
299 } else {
300 DEFAULT_DISCONNECTED_TIMEOUT
301 },
302
303 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
306 failed_timeout
307 } else {
308 DEFAULT_FAILED_TIMEOUT
309 },
310
311 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
314 keepalive_interval
315 } else {
316 DEFAULT_KEEPALIVE_INTERVAL
317 },
318
319 check_interval: if config.check_interval == Duration::from_secs(0) {
321 DEFAULT_CHECK_INTERVAL
322 } else {
323 config.check_interval
324 },
325 checking_duration: Instant::now(),
326 last_checking_time: Instant::now(),
327 last_connection_state: ConnectionState::Unspecified,
328
329 mdns,
330 mdns_queries: HashMap::new(),
331
332 mdns_mode,
333 mdns_local_name,
334 mdns_local_ip: config.multicast_dns_local_ip,
335
336 ufrag_pwd: UfragPwd::default(),
337
338 local_candidates: vec![],
339 remote_candidates: vec![],
340
341 pending_binding_requests: vec![],
343
344 candidate_types,
345 network_types: config.network_types.clone(),
346 urls: config.urls.clone(),
347
348 write_outs: VecDeque::new(),
349 event_outs: VecDeque::new(),
350 };
351
352 if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
354 {
355 let _ = agent.close();
356 return Err(err);
357 }
358
359 Ok(agent)
360 }
361
362 pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<bool> {
364 if !self.network_types.is_empty() {
366 let candidate_network_type = c.network_type();
367 if !self.network_types.contains(&candidate_network_type) {
368 debug!(
369 "Ignoring local candidate with network type {:?} (not in configured network types: {:?})",
370 candidate_network_type, self.network_types
371 );
372 return Ok(false);
373 }
374 }
375
376 if c.candidate_type() == CandidateType::Host
377 && self.mdns_mode == MulticastDnsMode::QueryAndGather
378 && c.network_type == NetworkType::Udp4
379 && self
380 .mdns_local_ip
381 .is_some_and(|local_ip| local_ip == c.addr().ip())
382 {
383 trace!(
386 "mDNS hides local ip {} with local name {}",
387 c.address, self.mdns_local_name
388 );
389 c.address = self.mdns_local_name.clone();
390 }
391
392 for cand in &self.local_candidates {
393 if cand.equal(&c) {
394 return Ok(false);
395 }
396 }
397
398 self.local_candidates.push(c);
399 let local_index = self.local_candidates.len() - 1;
400 let local_tcp_type = self.local_candidates[local_index].tcp_type();
401
402 for remote_index in 0..self.remote_candidates.len() {
403 let remote_tcp_type = self.remote_candidates[remote_index].tcp_type();
404
405 let should_pair = match (local_tcp_type, remote_tcp_type) {
411 (TcpType::Active, TcpType::Passive) => true,
412 (TcpType::Passive, TcpType::Active) => true,
413 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
414 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
416 };
417
418 if should_pair {
419 self.add_pair(local_index, remote_index);
420 }
421 }
422
423 self.request_connectivity_check();
424
425 Ok(true)
426 }
427
428 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<bool> {
430 if !self.network_types.is_empty() {
432 let candidate_network_type = c.network_type();
433 if !self.network_types.contains(&candidate_network_type) {
434 debug!(
435 "Ignoring remote candidate with network type {:?} (not in configured network types: {:?})",
436 candidate_network_type, self.network_types
437 );
438 return Ok(false);
439 }
440 }
441
442 if c.tcp_type() == TcpType::Active {
446 debug!(
447 "Ignoring remote candidate with tcptype active: {}",
448 c.address()
449 );
450 return Ok(false);
451 }
452
453 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
455 if self.mdns_mode == MulticastDnsMode::Disabled {
456 warn!(
457 "remote mDNS candidate added, but mDNS is disabled: ({})",
458 c.address()
459 );
460 return Ok(false);
461 }
462
463 if c.candidate_type() != CandidateType::Host {
464 return Err(Error::ErrAddressParseFailed);
465 }
466
467 if let Some(mdns_conn) = &mut self.mdns {
468 let query_id = mdns_conn.query(c.address());
469 self.mdns_queries.insert(query_id, c);
470 }
471
472 return Ok(false);
473 }
474
475 self.trigger_request_connectivity_check(vec![c]);
476 Ok(true)
477 }
478
479 fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
480 for c in remote_candidates {
481 if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
482 let remote_tcp_type = c.tcp_type();
483 self.remote_candidates.push(c);
484 let remote_index = self.remote_candidates.len() - 1;
485
486 for local_index in 0..self.local_candidates.len() {
488 let local_tcp_type = self.local_candidates[local_index].tcp_type();
489
490 let should_pair = match (local_tcp_type, remote_tcp_type) {
496 (TcpType::Active, TcpType::Passive) => true,
497 (TcpType::Passive, TcpType::Active) => true,
498 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
499 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
501 };
502
503 if should_pair {
504 self.add_pair(local_index, remote_index);
505 }
506 }
507
508 self.request_connectivity_check();
509 }
510 }
511 }
512
513 pub fn set_remote_credentials(
515 &mut self,
516 remote_ufrag: String,
517 remote_pwd: String,
518 ) -> Result<()> {
519 if remote_ufrag.is_empty() {
520 return Err(Error::ErrRemoteUfragEmpty);
521 } else if remote_pwd.is_empty() {
522 return Err(Error::ErrRemotePwdEmpty);
523 }
524
525 self.ufrag_pwd.remote_credentials = Some(Credentials {
526 ufrag: remote_ufrag,
527 pwd: remote_pwd,
528 });
529
530 Ok(())
531 }
532
533 pub fn get_remote_credentials(&self) -> Option<&Credentials> {
535 self.ufrag_pwd.remote_credentials.as_ref()
536 }
537
538 pub fn get_local_credentials(&self) -> &Credentials {
540 &self.ufrag_pwd.local_credentials
541 }
542
543 pub fn role(&self) -> bool {
544 self.is_controlling
545 }
546
547 pub fn set_role(&mut self, is_controlling: bool) {
548 self.is_controlling = is_controlling;
549 }
550
551 pub fn state(&self) -> ConnectionState {
552 self.connection_state
553 }
554
555 pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
556 self.find_local_candidate(transport.local_addr, transport.transport_protocol)
557 .is_some()
558 && self.validate_non_stun_traffic(transport.peer_addr)
559 }
560
561 fn get_timeout_interval(&self) -> Duration {
562 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
563 self.check_interval,
564 self.keepalive_interval,
565 self.disconnected_timeout,
566 self.failed_timeout,
567 );
568 let mut interval = DEFAULT_CHECK_INTERVAL;
569
570 let mut update_interval = |x: Duration| {
571 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
572 interval = x;
573 }
574 };
575
576 match self.last_connection_state {
577 ConnectionState::New | ConnectionState::Checking => {
578 update_interval(check_interval);
580 }
581 ConnectionState::Connected | ConnectionState::Disconnected => {
582 update_interval(keepalive_interval);
583 }
584 _ => {}
585 };
586 update_interval(disconnected_timeout);
588 update_interval(failed_timeout);
589 interval
590 }
591
592 pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
594 if let Some(pair_index) = self.get_selected_pair() {
595 let candidate_pair = &self.candidate_pairs[pair_index];
596 Some((
597 &self.local_candidates[candidate_pair.local_index],
598 &self.remote_candidates[candidate_pair.remote_index],
599 ))
600 } else {
601 None
602 }
603 }
604
605 pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
606 if let Some(pair_index) = self.get_best_available_pair() {
607 let candidate_pair = &self.candidate_pairs[pair_index];
608 Some((
609 &self.local_candidates[candidate_pair.local_index],
610 &self.remote_candidates[candidate_pair.remote_index],
611 ))
612 } else {
613 None
614 }
615 }
616
617 pub fn start_connectivity_checks(
619 &mut self,
620 is_controlling: bool,
621 remote_ufrag: String,
622 remote_pwd: String,
623 ) -> Result<()> {
624 debug!(
625 "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
626 is_controlling, remote_ufrag, remote_pwd
627 );
628 self.set_remote_credentials(remote_ufrag, remote_pwd)?;
629 self.is_controlling = is_controlling;
630 self.start();
631
632 self.update_connection_state(ConnectionState::Checking);
633 self.request_connectivity_check();
634
635 Ok(())
636 }
637
638 pub fn restart(
641 &mut self,
642 mut ufrag: String,
643 mut pwd: String,
644 keep_local_candidates: bool,
645 ) -> Result<()> {
646 if ufrag.is_empty() {
647 ufrag = generate_ufrag();
648 }
649 if pwd.is_empty() {
650 pwd = generate_pwd();
651 }
652
653 if ufrag.len() * 8 < 24 {
654 return Err(Error::ErrLocalUfragInsufficientBits);
655 }
656 if pwd.len() * 8 < 128 {
657 return Err(Error::ErrLocalPwdInsufficientBits);
658 }
659
660 self.ufrag_pwd.local_credentials.ufrag = ufrag;
662 self.ufrag_pwd.local_credentials.pwd = pwd;
663 self.ufrag_pwd.remote_credentials = None;
664
665 self.pending_binding_requests = vec![];
666
667 self.candidate_pairs = vec![];
668
669 self.set_selected_pair(None);
670 self.delete_all_candidates(keep_local_candidates);
671 self.start();
672
673 if self.connection_state != ConnectionState::New {
676 self.update_connection_state(ConnectionState::Checking);
677 }
678
679 Ok(())
680 }
681
682 pub fn get_local_candidates(&self) -> &[Candidate] {
684 &self.local_candidates
685 }
686
687 fn contact(&mut self, now: Instant) {
688 if self.connection_state == ConnectionState::Failed {
689 self.last_connection_state = self.connection_state;
692 return;
693 }
694 if self.connection_state == ConnectionState::Checking {
695 if self.last_connection_state != self.connection_state {
697 self.checking_duration = now;
698 }
699
700 if now
702 .checked_duration_since(self.checking_duration)
703 .unwrap_or_else(|| Duration::from_secs(0))
704 > self.disconnected_timeout + self.failed_timeout
705 {
706 self.update_connection_state(ConnectionState::Failed);
707 self.last_connection_state = self.connection_state;
708 return;
709 }
710 }
711
712 self.contact_candidates();
713
714 self.last_connection_state = self.connection_state;
715 self.last_checking_time = now;
716 }
717
718 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
719 if self.connection_state != new_state {
720 if new_state == ConnectionState::Failed {
722 self.set_selected_pair(None);
723 self.delete_all_candidates(false);
724 }
725
726 info!(
727 "[{}]: Setting new connection state: {}",
728 self.get_name(),
729 new_state
730 );
731 self.connection_state = new_state;
732 self.event_outs
733 .push_back(Event::ConnectionStateChange(new_state));
734 }
735 }
736
737 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
738 if let Some(pair_index) = selected_pair {
739 trace!(
740 "[{}]: Set selected candidate pair: {:?}",
741 self.get_name(),
742 self.candidate_pairs[pair_index]
743 );
744
745 self.candidate_pairs[pair_index].nominated = true;
746 self.selected_pair = Some(pair_index);
747
748 self.update_connection_state(ConnectionState::Connected);
749
750 let candidate_pair = &self.candidate_pairs[pair_index];
752 self.event_outs
753 .push_back(Event::SelectedCandidatePairChange(
754 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
755 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
756 ));
757 } else {
758 self.selected_pair = None;
759 }
760 }
761
762 pub(crate) fn ping_all_candidates(&mut self) {
763 let mut pairs: Vec<(usize, usize)> = vec![];
764
765 let name = self.get_name().to_string();
766 if self.candidate_pairs.is_empty() {
767 warn!(
768 "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
769 name,
770 );
771 }
772 for p in &mut self.candidate_pairs {
773 if p.state == CandidatePairState::Waiting {
774 p.state = CandidatePairState::InProgress;
775 } else if p.state != CandidatePairState::InProgress {
776 continue;
777 }
778
779 if p.binding_request_count > self.max_binding_requests {
780 trace!(
781 "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
782 name,
783 *p,
784 self.local_candidates[p.local_index].addr(),
785 self.remote_candidates[p.remote_index].addr()
786 );
787 p.state = CandidatePairState::Failed;
788 } else {
789 p.binding_request_count += 1;
790 let local = p.local_index;
791 let remote = p.remote_index;
792 pairs.push((local, remote));
793 }
794 }
795
796 if !pairs.is_empty() {
797 trace!(
798 "[{}]: pinging all {} candidates",
799 self.get_name(),
800 pairs.len()
801 );
802 }
803
804 for (local, remote) in pairs {
805 self.ping_candidate(local, remote);
806 }
807 }
808
809 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
810 let p = CandidatePair::new(
811 local_index,
812 remote_index,
813 self.local_candidates[local_index].priority(),
814 self.remote_candidates[remote_index].priority(),
815 self.is_controlling,
816 );
817 self.candidate_pairs.push(p);
818 }
819
820 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
821 for (index, p) in self.candidate_pairs.iter().enumerate() {
822 if p.local_index == local_index && p.remote_index == remote_index {
823 return Some(index);
824 }
825 }
826 None
827 }
828
829 pub(crate) fn validate_selected_pair(&mut self) -> bool {
832 let (valid, disconnected_time) = {
833 self.selected_pair.as_ref().map_or_else(
834 || (false, Duration::from_secs(0)),
835 |&pair_index| {
836 let remote_index = self.candidate_pairs[pair_index].remote_index;
837
838 let disconnected_time = Instant::now()
839 .duration_since(self.remote_candidates[remote_index].last_received());
840 (true, disconnected_time)
841 },
842 )
843 };
844
845 if valid {
846 let mut total_time_to_failure = self.failed_timeout;
848 if total_time_to_failure != Duration::from_secs(0) {
849 total_time_to_failure += self.disconnected_timeout;
850 }
851
852 if total_time_to_failure != Duration::from_secs(0)
853 && disconnected_time > total_time_to_failure
854 {
855 self.update_connection_state(ConnectionState::Failed);
856 } else if self.disconnected_timeout != Duration::from_secs(0)
857 && disconnected_time > self.disconnected_timeout
858 {
859 self.update_connection_state(ConnectionState::Disconnected);
860 } else {
861 self.update_connection_state(ConnectionState::Connected);
862 }
863 }
864
865 valid
866 }
867
868 pub(crate) fn check_keepalive(&mut self) {
872 let (local_index, remote_index, pair_index) = {
873 self.selected_pair
874 .as_ref()
875 .map_or((None, None, None), |&pair_index| {
876 let p = &self.candidate_pairs[pair_index];
877 (Some(p.local_index), Some(p.remote_index), Some(pair_index))
878 })
879 };
880
881 if let (Some(local_index), Some(remote_index), Some(pair_index)) =
882 (local_index, remote_index, pair_index)
883 {
884 let last_sent =
885 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
886
887 let last_received =
888 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
889
890 if (self.keepalive_interval != Duration::from_secs(0))
891 && ((last_sent > self.keepalive_interval)
892 || (last_received > self.keepalive_interval))
893 {
894 self.candidate_pairs[pair_index].on_consent_request_sent();
898 self.ping_candidate(local_index, remote_index);
899 }
900 }
901 }
902
903 fn request_connectivity_check(&mut self) {
904 if self.ufrag_pwd.remote_credentials.is_some() {
905 self.contact(Instant::now());
906 }
907 }
908
909 pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
914 if !keep_local_candidates {
915 self.local_candidates.clear();
916 }
917 self.remote_candidates.clear();
918 }
919
920 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
921 for (index, c) in self.remote_candidates.iter().enumerate() {
922 if c.addr() == addr {
923 return Some(index);
924 }
925 }
926 None
927 }
928
929 pub(crate) fn find_local_candidate(
930 &self,
931 addr: SocketAddr,
932 transport_protocol: TransportProtocol,
933 ) -> Option<usize> {
934 for (index, c) in self.local_candidates.iter().enumerate() {
935 if c.network_type().to_protocol() != transport_protocol {
936 continue;
937 }
938
939 if c.tcp_type() == TcpType::Active && transport_protocol == TransportProtocol::TCP {
943 if c.addr().ip() == addr.ip() {
944 return Some(index);
945 }
946 } else if c.addr() == addr {
947 return Some(index);
948 } else if let Some(related_address) = c.related_address()
949 && related_address.address == addr.ip().to_string()
950 && related_address.port == addr.port()
951 {
952 return Some(index);
953 }
954 }
955 None
956 }
957
958 pub(crate) fn send_binding_request(
959 &mut self,
960 m: &Message,
961 local_index: usize,
962 remote_index: usize,
963 ) {
964 trace!(
965 "[{}]: ping STUN from {} to {}",
966 self.get_name(),
967 self.local_candidates[local_index],
968 self.remote_candidates[remote_index],
969 );
970
971 self.invalidate_pending_binding_requests(Instant::now());
972
973 self.pending_binding_requests.push(BindingRequest {
974 timestamp: Instant::now(),
975 transaction_id: m.transaction_id,
976 destination: self.remote_candidates[remote_index].addr(),
977 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
978 });
979
980 if let Some(pair_index) = self.find_pair(local_index, remote_index) {
982 self.candidate_pairs[pair_index].on_request_sent();
983 }
984
985 self.send_stun(m, local_index, remote_index);
986 }
987
988 pub(crate) fn send_binding_success(
989 &mut self,
990 m: &Message,
991 local_index: usize,
992 remote_index: usize,
993 ) {
994 let addr = self.remote_candidates[remote_index].addr();
995 let (ip, port) = (addr.ip(), addr.port());
996 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
997
998 let (out, result) = {
999 let mut out = Message::new();
1000 let result = out.build(&[
1001 Box::new(m.clone()),
1002 Box::new(BINDING_SUCCESS),
1003 Box::new(XorMappedAddress { ip, port }),
1004 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
1005 Box::new(FINGERPRINT),
1006 ]);
1007 (out, result)
1008 };
1009
1010 if let Err(err) = result {
1011 warn!(
1012 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
1013 self.get_name(),
1014 self.local_candidates[local_index],
1015 self.remote_candidates[remote_index],
1016 err
1017 );
1018 } else {
1019 if let Some(pair_index) = self.find_pair(local_index, remote_index) {
1021 self.candidate_pairs[pair_index].on_response_sent();
1022 }
1023 self.send_stun(&out, local_index, remote_index);
1024 }
1025 }
1026
1027 pub(crate) fn send_role_conflict_error(
1030 &mut self,
1031 m: &Message,
1032 local_index: usize,
1033 remote_index: usize,
1034 ) {
1035 use stun::error_code::*;
1036
1037 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
1038
1039 let (out, result) = {
1040 let mut out = Message::new();
1041 let result = out.build(&[
1042 Box::new(m.clone()),
1043 Box::new(stun::message::BINDING_ERROR),
1044 Box::new(CODE_ROLE_CONFLICT),
1045 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
1046 Box::new(FINGERPRINT),
1047 ]);
1048 (out, result)
1049 };
1050
1051 if let Err(err) = result {
1052 warn!(
1053 "[{}]: Failed to send role conflict error from: {} to: {} error: {}",
1054 self.get_name(),
1055 self.local_candidates[local_index],
1056 self.remote_candidates[remote_index],
1057 err
1058 );
1059 } else {
1060 debug!(
1061 "[{}]: Sent 487 Role Conflict error from {} to {}",
1062 self.get_name(),
1063 self.local_candidates[local_index],
1064 self.remote_candidates[remote_index]
1065 );
1066 self.send_stun(&out, local_index, remote_index);
1067 }
1068 }
1069
1070 pub(crate) fn switch_role(&mut self) {
1073 self.is_controlling = !self.is_controlling;
1074
1075 for pair in &mut self.candidate_pairs {
1078 pair.ice_role_controlling = self.is_controlling;
1079 }
1080
1081 self.nominated_pair = None;
1083
1084 info!(
1085 "[{}]: Role switched, recomputed {} candidate pair priorities",
1086 self.get_name(),
1087 self.candidate_pairs.len()
1088 );
1089
1090 self.event_outs
1091 .push_back(Event::RoleChange(self.is_controlling));
1092 }
1093
1094 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
1099 let pending_binding_requests = &mut self.pending_binding_requests;
1100 let initial_size = pending_binding_requests.len();
1101
1102 let mut temp = vec![];
1103 for binding_request in pending_binding_requests.drain(..) {
1104 if filter_time
1105 .checked_duration_since(binding_request.timestamp)
1106 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
1107 .unwrap_or(true)
1108 {
1109 temp.push(binding_request);
1110 }
1111 }
1112
1113 *pending_binding_requests = temp;
1114 let bind_requests_remaining = pending_binding_requests.len();
1115 let bind_requests_removed = initial_size - bind_requests_remaining;
1116 if bind_requests_removed > 0 {
1117 trace!(
1118 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
1119 self.get_name(),
1120 bind_requests_removed,
1121 bind_requests_remaining,
1122 );
1123 }
1124 }
1125
1126 pub(crate) fn handle_inbound_binding_success(
1129 &mut self,
1130 id: TransactionId,
1131 ) -> Option<BindingRequest> {
1132 self.invalidate_pending_binding_requests(Instant::now());
1133
1134 let pending_binding_requests = &mut self.pending_binding_requests;
1135 for i in 0..pending_binding_requests.len() {
1136 if pending_binding_requests[i].transaction_id == id {
1137 let valid_binding_request = pending_binding_requests.remove(i);
1138 return Some(valid_binding_request);
1139 }
1140 }
1141 None
1142 }
1143
1144 pub(crate) fn handle_inbound(
1146 &mut self,
1147 now: Instant,
1148 m: &mut Message,
1149 local_index: usize,
1150 remote_addr: SocketAddr,
1151 ) -> Result<()> {
1152 if m.typ.method != METHOD_BINDING
1153 || !(m.typ.class == CLASS_SUCCESS_RESPONSE
1154 || m.typ.class == CLASS_REQUEST
1155 || m.typ.class == CLASS_INDICATION)
1156 {
1157 trace!(
1158 "[{}]: unhandled STUN from {} to {} class({}) method({})",
1159 self.get_name(),
1160 remote_addr,
1161 self.local_candidates[local_index],
1162 m.typ.class,
1163 m.typ.method
1164 );
1165 return Err(Error::ErrUnhandledStunpacket);
1166 }
1167
1168 if self.is_controlling {
1170 if m.contains(ATTR_ICE_CONTROLLING) {
1171 let mut remote_controlling = crate::attributes::control::AttrControlling::default();
1173 if let Err(err) = remote_controlling.get_from(m) {
1174 warn!(
1175 "[{}]: Failed to get remote ICE-CONTROLLING attribute: {}",
1176 self.get_name(),
1177 err
1178 );
1179 return Err(err);
1180 }
1181
1182 debug!(
1183 "[{}]: Role conflict detected (both controlling), local tiebreaker: {}, remote tiebreaker: {}",
1184 self.get_name(),
1185 self.tie_breaker,
1186 remote_controlling.0
1187 );
1188
1189 if m.typ.class == CLASS_REQUEST {
1191 if let Some(remote_index) = self.find_remote_candidate(remote_addr) {
1193 self.send_role_conflict_error(m, local_index, remote_index);
1194 }
1195
1196 if self.tie_breaker < remote_controlling.0 {
1198 info!(
1199 "[{}]: Switching from controlling to controlled due to role conflict (smaller tiebreaker)",
1200 self.get_name()
1201 );
1202 self.switch_role();
1203 }
1204 }
1205 } else if m.contains(ATTR_USE_CANDIDATE) {
1207 debug!(
1208 "[{}]: useCandidate && a.isControlling == true",
1209 self.get_name(),
1210 );
1211 return Err(Error::ErrUnexpectedStunrequestMessage);
1212 }
1213 } else if m.contains(ATTR_ICE_CONTROLLED) {
1214 let mut remote_controlled = crate::attributes::control::AttrControlled::default();
1216 if let Err(err) = remote_controlled.get_from(m) {
1217 warn!(
1218 "[{}]: Failed to get remote ICE-CONTROLLED attribute: {}",
1219 self.get_name(),
1220 err
1221 );
1222 return Err(err);
1223 }
1224
1225 debug!(
1226 "[{}]: Role conflict detected (both controlled), local tiebreaker: {}, remote tiebreaker: {}",
1227 self.get_name(),
1228 self.tie_breaker,
1229 remote_controlled.0
1230 );
1231
1232 if m.typ.class == CLASS_REQUEST {
1234 if let Some(remote_index) = self.find_remote_candidate(remote_addr) {
1236 self.send_role_conflict_error(m, local_index, remote_index);
1237 }
1238
1239 if self.tie_breaker > remote_controlled.0 {
1241 info!(
1242 "[{}]: Switching from controlled to controlling due to role conflict (larger tiebreaker)",
1243 self.get_name()
1244 );
1245 self.switch_role();
1246 }
1247 }
1248 }
1250
1251 let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1252 debug!(
1253 "[{}]: ufrag_pwd.remote_credentials.is_none",
1254 self.get_name(),
1255 );
1256 return Err(Error::ErrPasswordEmpty);
1257 };
1258
1259 let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1260 if m.typ.class == CLASS_SUCCESS_RESPONSE {
1261 if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1262 {
1263 warn!(
1264 "[{}]: discard message from ({}), {}",
1265 self.get_name(),
1266 remote_addr,
1267 err
1268 );
1269 return Err(err);
1270 }
1271
1272 if let Some(remote_index) = &remote_candidate_index {
1273 self.handle_success_response(now, m, local_index, *remote_index, remote_addr);
1274 } else {
1275 warn!(
1276 "[{}]: discard success message from ({}), no such remote",
1277 self.get_name(),
1278 remote_addr
1279 );
1280 return Err(Error::ErrUnhandledStunpacket);
1281 }
1282 } else if m.typ.class == CLASS_REQUEST {
1283 {
1284 let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1285 + ":"
1286 + remote_credentials.ufrag.as_str();
1287 if let Err(err) = assert_inbound_username(m, &username) {
1288 warn!(
1289 "[{}]: discard message from ({}), {}",
1290 self.get_name(),
1291 remote_addr,
1292 err
1293 );
1294 return Err(err);
1295 } else if let Err(err) = assert_inbound_message_integrity(
1296 m,
1297 self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1298 ) {
1299 warn!(
1300 "[{}]: discard message from ({}), {}",
1301 self.get_name(),
1302 remote_addr,
1303 err
1304 );
1305 return Err(err);
1306 }
1307 }
1308
1309 if remote_candidate_index.is_none() {
1310 let network_type = self.local_candidates[local_index].network_type();
1312 let (ip, port) = (remote_addr.ip(), remote_addr.port());
1313
1314 let prflx_candidate_config = CandidatePeerReflexiveConfig {
1315 base_config: CandidateConfig {
1316 network: network_type.to_string(),
1317 address: ip.to_string(),
1318 port,
1319 component: self.local_candidates[local_index].component(),
1320 ..CandidateConfig::default()
1321 },
1322 rel_addr: "".to_owned(),
1323 rel_port: 0,
1324 };
1325
1326 match prflx_candidate_config.new_candidate_peer_reflexive() {
1327 Ok(prflx_candidate) => {
1328 if let Ok(added) = self.add_remote_candidate(prflx_candidate)
1329 && added
1330 {
1331 remote_candidate_index = Some(self.remote_candidates.len() - 1);
1332 }
1333 }
1334 Err(err) => {
1335 error!(
1336 "[{}]: Failed to create new remote prflx candidate ({})",
1337 self.get_name(),
1338 err
1339 );
1340 return Err(err);
1341 }
1342 };
1343
1344 debug!(
1345 "[{}]: adding a new peer-reflexive candidate: {} ",
1346 self.get_name(),
1347 remote_addr
1348 );
1349 }
1350
1351 trace!(
1352 "[{}]: inbound STUN (Request) from {} to {}",
1353 self.get_name(),
1354 remote_addr,
1355 self.local_candidates[local_index]
1356 );
1357
1358 if let Some(remote_index) = &remote_candidate_index {
1359 self.handle_binding_request(m, local_index, *remote_index);
1360 }
1361 }
1362
1363 if let Some(remote_index) = remote_candidate_index {
1364 self.remote_candidates[remote_index].seen(false);
1365 }
1366
1367 Ok(())
1368 }
1369
1370 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1373 self.find_remote_candidate(remote_addr)
1374 .is_some_and(|remote_index| {
1375 self.remote_candidates[remote_index].seen(false);
1376 true
1377 })
1378 }
1379
1380 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1381 let peer_addr = self.remote_candidates[remote_index].addr();
1382 let local_addr = self.local_candidates[local_index].addr();
1383 let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1384 TransportProtocol::TCP
1385 } else {
1386 TransportProtocol::UDP
1387 };
1388
1389 self.write_outs.push_back(TaggedBytesMut {
1390 now: Instant::now(),
1391 transport: TransportContext {
1392 local_addr,
1393 peer_addr,
1394 ecn: None,
1395 transport_protocol,
1396 },
1397 message: BytesMut::from(&msg.raw[..]),
1398 });
1399
1400 self.local_candidates[local_index].seen(true);
1401 }
1402
1403 fn handle_inbound_candidate_msg(
1404 &mut self,
1405 local_index: usize,
1406 msg: TaggedBytesMut,
1407 ) -> Result<()> {
1408 if is_stun_message(&msg.message) {
1409 let mut m = Message {
1410 raw: msg.message.to_vec(),
1411 ..Message::default()
1412 };
1413
1414 if let Err(err) = m.decode() {
1415 warn!(
1416 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1417 self.get_name(),
1418 msg.transport.local_addr,
1419 msg.transport.peer_addr,
1420 err
1421 );
1422 Err(err)
1423 } else {
1424 self.handle_inbound(msg.now, &mut m, local_index, msg.transport.peer_addr)
1425 }
1426 } else {
1427 if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1428 warn!(
1429 "[{}]: Discarded message, not a valid remote candidate from {}",
1430 self.get_name(),
1431 msg.transport.peer_addr,
1432 );
1433 } else {
1434 warn!(
1435 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1436 self.get_name(),
1437 msg.transport.peer_addr
1438 );
1439 }
1440 Err(Error::ErrNonStunmessage)
1441 }
1442 }
1443
1444 pub(crate) fn get_name(&self) -> &str {
1445 if self.is_controlling {
1446 "controlling"
1447 } else {
1448 "controlled"
1449 }
1450 }
1451
1452 pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1453 self.selected_pair
1454 }
1455
1456 pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1457 let mut best_pair_index: Option<usize> = None;
1458
1459 for (index, p) in self.candidate_pairs.iter().enumerate() {
1460 if p.state == CandidatePairState::Failed {
1461 continue;
1462 }
1463
1464 if let Some(pair_index) = &mut best_pair_index {
1465 let b = &self.candidate_pairs[*pair_index];
1466 if b.priority() < p.priority() {
1467 *pair_index = index;
1468 }
1469 } else {
1470 best_pair_index = Some(index);
1471 }
1472 }
1473
1474 best_pair_index
1475 }
1476
1477 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1478 let mut best_pair_index: Option<usize> = None;
1479
1480 for (index, p) in self.candidate_pairs.iter().enumerate() {
1481 if p.state != CandidatePairState::Succeeded {
1482 continue;
1483 }
1484
1485 if let Some(pair_index) = &mut best_pair_index {
1486 let b = &self.candidate_pairs[*pair_index];
1487 if b.priority() < p.priority() {
1488 *pair_index = index;
1489 }
1490 } else {
1491 best_pair_index = Some(index);
1492 }
1493 }
1494
1495 best_pair_index
1496 }
1497}