1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::tcp_type::TcpType;
32use crate::url::*;
33use shared::error::*;
34use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
35
/// Zero-length duration. `Agent::get_timeout_interval` compares against it to skip
/// intervals/timeouts whose value is zero.
const ZERO_DURATION: Duration = Duration::from_secs(0);
37
/// Record of a STUN binding request that has been sent and is kept in
/// `Agent::pending_binding_requests` until it is matched or expires.
#[derive(Debug, Clone)]
pub(crate) struct BindingRequest {
    /// Instant at which the request was recorded by `send_binding_request`.
    pub(crate) timestamp: Instant,
    /// Transaction id of the outgoing message; used to match the success response.
    pub(crate) transaction_id: TransactionId,
    /// Address of the remote candidate the request was sent to.
    pub(crate) destination: SocketAddr,
    /// Whether the outgoing message contained the USE-CANDIDATE attribute.
    pub(crate) is_use_candidate: bool,
}
45
46impl Default for BindingRequest {
47 fn default() -> Self {
48 Self {
49 timestamp: Instant::now(),
50 transaction_id: TransactionId::default(),
51 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
52 is_use_candidate: false,
53 }
54 }
55}
56
/// An ICE username fragment / password pair.
#[derive(Default, Clone)]
pub struct Credentials {
    /// Username fragment.
    pub ufrag: String,
    /// Password; its bytes are used as the key when checking message integrity.
    pub pwd: String,
}
62
/// Local and remote credentials held by an `Agent`.
#[derive(Default, Clone)]
pub(crate) struct UfragPwd {
    /// Credentials of this agent; (re)generated or set in `Agent::restart`.
    pub(crate) local_credentials: Credentials,
    /// Credentials of the peer; `None` until `Agent::set_remote_credentials` succeeds
    /// and reset to `None` by `Agent::restart`.
    pub(crate) remote_credentials: Option<Credentials>,
}
68
69fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
70 let mut username = Username::new(ATTR_USERNAME, String::new());
71 username.get_from(m)?;
72
73 if username.to_string() != expected_username {
74 return Err(Error::Other(format!(
75 "{:?} expected({}) actual({})",
76 Error::ErrMismatchUsername,
77 expected_username,
78 username,
79 )));
80 }
81
82 Ok(())
83}
84
85fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
86 let message_integrity_attr = MessageIntegrity(key.to_vec());
87 message_integrity_attr.check(m)
88}
89
/// Events queued by the agent in `Agent::event_outs`.
pub enum Event {
    /// The agent's connection state changed to the contained state.
    ConnectionStateChange(ConnectionState),
    /// A new candidate pair was selected; carries clones of the local candidate
    /// (first) and the remote candidate (second).
    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
}
94
/// State of an ICE agent: candidates, candidate pairs, credentials, timers and the
/// queues of outgoing packets and events.
pub struct Agent {
    /// Random 64-bit value chosen in `Agent::new`.
    pub(crate) tie_breaker: u64,
    /// `true` when this agent acts in the controlling role.
    pub(crate) is_controlling: bool,
    /// Copied from `AgentConfig::lite`; `Agent::new` requires host-only candidate
    /// types when it is set.
    pub(crate) lite: bool,

    /// Instant recorded when the agent was constructed.
    pub(crate) start_time: Instant,

    /// Current connection state; changed through `update_connection_state`.
    pub(crate) connection_state: ConnectionState,
    /// Connection state as observed at the end of the previous `contact` pass.
    pub(crate) last_connection_state: ConnectionState,

    /// Local and remote ufrag/pwd credentials.
    pub(crate) ufrag_pwd: UfragPwd,

    /// Local candidates; candidate pairs refer to them by index.
    pub(crate) local_candidates: Vec<Candidate>,
    /// Remote candidates; candidate pairs refer to them by index.
    pub(crate) remote_candidates: Vec<Candidate>,
    /// Candidate pairs built from the local and remote candidate lists.
    pub(crate) candidate_pairs: Vec<CandidatePair>,
    /// NOTE(review): presumably an index into `candidate_pairs` of the nominated
    /// pair; it is not read or written in this part of the file — confirm.
    pub(crate) nominated_pair: Option<usize>,
    /// Index into `candidate_pairs` of the selected pair, if any.
    pub(crate) selected_pair: Option<usize>,

    /// Binding requests sent and not yet answered or expired.
    pub(crate) pending_binding_requests: Vec<BindingRequest>,

    /// Copied from `AgentConfig::insecure_skip_verify`.
    pub(crate) insecure_skip_verify: bool,
    /// `ping_all_candidates` marks a pair as failed once its request count exceeds
    /// this value.
    pub(crate) max_binding_requests: u16,
    /// Configured (or default) minimum wait for host candidates.
    pub(crate) host_acceptance_min_wait: Duration,
    /// Configured (or default) minimum wait for server-reflexive candidates.
    pub(crate) srflx_acceptance_min_wait: Duration,
    /// Configured (or default) minimum wait for peer-reflexive candidates.
    pub(crate) prflx_acceptance_min_wait: Duration,
    /// Configured (or default) minimum wait for relay candidates.
    pub(crate) relay_acceptance_min_wait: Duration,
    /// Silence on the selected pair longer than this moves the agent to
    /// `Disconnected` (see `validate_selected_pair`); zero disables the check.
    pub(crate) disconnected_timeout: Duration,
    /// Added to `disconnected_timeout` to obtain the silence after which the agent
    /// moves to `Failed`; zero disables the check in `validate_selected_pair`.
    pub(crate) failed_timeout: Duration,
    /// `check_keepalive` pings the selected pair when nothing was sent or received
    /// for longer than this; zero disables keepalives.
    pub(crate) keepalive_interval: Duration,
    /// Interval considered by `get_timeout_interval` while in `New`/`Checking`.
    pub(crate) check_interval: Duration,
    /// Instant at which `contact` first observed the `Checking` state.
    pub(crate) checking_duration: Instant,
    /// Instant of the last `contact` pass that reached the candidates.
    pub(crate) last_checking_time: Instant,

    /// mDNS connection, when one was created in `Agent::new`.
    pub(crate) mdns: Option<Mdns>,
    /// Remote candidates waiting for an mDNS answer, keyed by query id.
    pub(crate) mdns_queries: HashMap<QueryId, Candidate>,

    /// Configured mDNS mode.
    pub(crate) mdns_mode: MulticastDnsMode,
    /// Local mDNS name (`<label>.local`), configured or generated.
    pub(crate) mdns_local_name: String,
    /// Local IP whose host candidates are hidden behind `mdns_local_name` in
    /// `add_local_candidate`.
    pub(crate) mdns_local_ip: Option<IpAddr>,

    /// Candidate types the agent works with (configured or default).
    pub(crate) candidate_types: Vec<CandidateType>,
    /// Allowed network types; an empty list accepts every network type.
    pub(crate) network_types: Vec<NetworkType>,
    /// URLs copied from the configuration.
    pub(crate) urls: Vec<Url>,

    /// Outgoing packets queued by `send_stun`.
    pub(crate) write_outs: VecDeque<TaggedBytesMut>,
    /// Events queued for the owner of the agent.
    pub(crate) event_outs: VecDeque<Event>,
}
153
154impl Default for Agent {
155 fn default() -> Self {
156 Self {
157 tie_breaker: 0,
158 is_controlling: false,
159 lite: false,
160 start_time: Instant::now(),
161 connection_state: Default::default(),
162 last_connection_state: Default::default(),
163 ufrag_pwd: Default::default(),
164 local_candidates: vec![],
165 remote_candidates: vec![],
166 candidate_pairs: vec![],
167 nominated_pair: None,
168 selected_pair: None,
169 pending_binding_requests: vec![],
170 insecure_skip_verify: false,
171 max_binding_requests: 0,
172 host_acceptance_min_wait: Default::default(),
173 srflx_acceptance_min_wait: Default::default(),
174 prflx_acceptance_min_wait: Default::default(),
175 relay_acceptance_min_wait: Default::default(),
176 disconnected_timeout: Default::default(),
177 failed_timeout: Default::default(),
178 keepalive_interval: Default::default(),
179 check_interval: Default::default(),
180 checking_duration: Instant::now(),
181 last_checking_time: Instant::now(),
182 mdns_mode: MulticastDnsMode::Disabled,
183 mdns_local_name: "".to_owned(),
184 mdns_local_ip: None,
185 mdns_queries: HashMap::new(),
186 mdns: None,
187 candidate_types: vec![],
188 network_types: vec![],
189 urls: vec![],
190 write_outs: Default::default(),
191 event_outs: Default::default(),
192 }
193 }
194}
195
196impl Agent {
197 pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
199 let mut mdns_local_name = config.multicast_dns_local_name.clone();
200 if mdns_local_name.is_empty() {
201 mdns_local_name = generate_multicast_dns_name();
202 }
203
204 if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
205 return Err(Error::ErrInvalidMulticastDnshostName);
206 }
207
208 let mdns_mode = config.multicast_dns_mode;
209 let mdns = create_multicast_dns(
210 mdns_mode,
211 &mdns_local_name,
212 &config.multicast_dns_local_ip,
213 &config.multicast_dns_query_timeout,
214 )
215 .unwrap_or_else(|err| {
216 warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
219 None
220 });
221
222 let candidate_types = if config.candidate_types.is_empty() {
223 default_candidate_types()
224 } else {
225 config.candidate_types.clone()
226 };
227
228 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
229 {
230 return Err(Error::ErrLiteUsingNonHostCandidates);
231 }
232
233 if !config.urls.is_empty()
234 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
235 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
236 {
237 return Err(Error::ErrUselessUrlsProvided);
238 }
239
240 let mut agent = Self {
241 tie_breaker: rand::random::<u64>(),
242 is_controlling: config.is_controlling,
243 lite: config.lite,
244
245 start_time: Instant::now(),
246
247 nominated_pair: None,
248 selected_pair: None,
249 candidate_pairs: vec![],
250
251 connection_state: ConnectionState::New,
252
253 insecure_skip_verify: config.insecure_skip_verify,
254
255 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
259 max_binding_requests
260 } else {
261 DEFAULT_MAX_BINDING_REQUESTS
262 },
263 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
264 config.host_acceptance_min_wait
265 {
266 host_acceptance_min_wait
267 } else {
268 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
269 },
270 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
271 config.srflx_acceptance_min_wait
272 {
273 srflx_acceptance_min_wait
274 } else {
275 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
276 },
277 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
278 config.prflx_acceptance_min_wait
279 {
280 prflx_acceptance_min_wait
281 } else {
282 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
283 },
284 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
285 config.relay_acceptance_min_wait
286 {
287 relay_acceptance_min_wait
288 } else {
289 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
290 },
291
292 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
295 disconnected_timeout
296 } else {
297 DEFAULT_DISCONNECTED_TIMEOUT
298 },
299
300 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
303 failed_timeout
304 } else {
305 DEFAULT_FAILED_TIMEOUT
306 },
307
308 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
311 keepalive_interval
312 } else {
313 DEFAULT_KEEPALIVE_INTERVAL
314 },
315
316 check_interval: if config.check_interval == Duration::from_secs(0) {
318 DEFAULT_CHECK_INTERVAL
319 } else {
320 config.check_interval
321 },
322 checking_duration: Instant::now(),
323 last_checking_time: Instant::now(),
324 last_connection_state: ConnectionState::Unspecified,
325
326 mdns,
327 mdns_queries: HashMap::new(),
328
329 mdns_mode,
330 mdns_local_name,
331 mdns_local_ip: config.multicast_dns_local_ip,
332
333 ufrag_pwd: UfragPwd::default(),
334
335 local_candidates: vec![],
336 remote_candidates: vec![],
337
338 pending_binding_requests: vec![],
340
341 candidate_types,
342 network_types: config.network_types.clone(),
343 urls: config.urls.clone(),
344
345 write_outs: VecDeque::new(),
346 event_outs: VecDeque::new(),
347 };
348
349 if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
351 {
352 let _ = agent.close();
353 return Err(err);
354 }
355
356 Ok(agent)
357 }
358
359 pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<bool> {
361 if !self.network_types.is_empty() {
363 let candidate_network_type = c.network_type();
364 if !self.network_types.contains(&candidate_network_type) {
365 debug!(
366 "Ignoring local candidate with network type {:?} (not in configured network types: {:?})",
367 candidate_network_type, self.network_types
368 );
369 return Ok(false);
370 }
371 }
372
373 if c.candidate_type() == CandidateType::Host
374 && self.mdns_mode == MulticastDnsMode::QueryAndGather
375 && c.network_type == NetworkType::Udp4
376 && self
377 .mdns_local_ip
378 .is_some_and(|local_ip| local_ip == c.addr().ip())
379 {
380 trace!(
383 "mDNS hides local ip {} with local name {}",
384 c.address, self.mdns_local_name
385 );
386 c.address = self.mdns_local_name.clone();
387 }
388
389 for cand in &self.local_candidates {
390 if cand.equal(&c) {
391 return Ok(false);
392 }
393 }
394
395 self.local_candidates.push(c);
396 let local_index = self.local_candidates.len() - 1;
397 let local_tcp_type = self.local_candidates[local_index].tcp_type();
398
399 for remote_index in 0..self.remote_candidates.len() {
400 let remote_tcp_type = self.remote_candidates[remote_index].tcp_type();
401
402 let should_pair = match (local_tcp_type, remote_tcp_type) {
408 (TcpType::Active, TcpType::Passive) => true,
409 (TcpType::Passive, TcpType::Active) => true,
410 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
411 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
413 };
414
415 if should_pair {
416 self.add_pair(local_index, remote_index);
417 }
418 }
419
420 self.request_connectivity_check();
421
422 Ok(true)
423 }
424
425 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<bool> {
427 if !self.network_types.is_empty() {
429 let candidate_network_type = c.network_type();
430 if !self.network_types.contains(&candidate_network_type) {
431 debug!(
432 "Ignoring remote candidate with network type {:?} (not in configured network types: {:?})",
433 candidate_network_type, self.network_types
434 );
435 return Ok(false);
436 }
437 }
438
439 if c.tcp_type() == TcpType::Active {
443 debug!(
444 "Ignoring remote candidate with tcptype active: {}",
445 c.address()
446 );
447 return Ok(false);
448 }
449
450 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
452 if self.mdns_mode == MulticastDnsMode::Disabled {
453 warn!(
454 "remote mDNS candidate added, but mDNS is disabled: ({})",
455 c.address()
456 );
457 return Ok(false);
458 }
459
460 if c.candidate_type() != CandidateType::Host {
461 return Err(Error::ErrAddressParseFailed);
462 }
463
464 if let Some(mdns_conn) = &mut self.mdns {
465 let query_id = mdns_conn.query(c.address());
466 self.mdns_queries.insert(query_id, c);
467 }
468
469 return Ok(false);
470 }
471
472 self.trigger_request_connectivity_check(vec![c]);
473 Ok(true)
474 }
475
476 fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
477 for c in remote_candidates {
478 if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
479 let remote_tcp_type = c.tcp_type();
480 self.remote_candidates.push(c);
481 let remote_index = self.remote_candidates.len() - 1;
482
483 for local_index in 0..self.local_candidates.len() {
485 let local_tcp_type = self.local_candidates[local_index].tcp_type();
486
487 let should_pair = match (local_tcp_type, remote_tcp_type) {
493 (TcpType::Active, TcpType::Passive) => true,
494 (TcpType::Passive, TcpType::Active) => true,
495 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
496 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
498 };
499
500 if should_pair {
501 self.add_pair(local_index, remote_index);
502 }
503 }
504
505 self.request_connectivity_check();
506 }
507 }
508 }
509
510 pub fn set_remote_credentials(
512 &mut self,
513 remote_ufrag: String,
514 remote_pwd: String,
515 ) -> Result<()> {
516 if remote_ufrag.is_empty() {
517 return Err(Error::ErrRemoteUfragEmpty);
518 } else if remote_pwd.is_empty() {
519 return Err(Error::ErrRemotePwdEmpty);
520 }
521
522 self.ufrag_pwd.remote_credentials = Some(Credentials {
523 ufrag: remote_ufrag,
524 pwd: remote_pwd,
525 });
526
527 Ok(())
528 }
529
    /// Returns the peer's credentials, or `None` if they have not been set (or were
    /// cleared by `restart`).
    pub fn get_remote_credentials(&self) -> Option<&Credentials> {
        self.ufrag_pwd.remote_credentials.as_ref()
    }
534
    /// Returns this agent's own ufrag/pwd credentials.
    pub fn get_local_credentials(&self) -> &Credentials {
        &self.ufrag_pwd.local_credentials
    }
539
    /// Returns `true` when the agent is in the controlling role, `false` when it is
    /// controlled.
    pub fn role(&self) -> bool {
        self.is_controlling
    }
543
    /// Sets the agent's role: `true` for controlling, `false` for controlled.
    pub fn set_role(&mut self, is_controlling: bool) {
        self.is_controlling = is_controlling;
    }
547
    /// Returns the current connection state.
    pub fn state(&self) -> ConnectionState {
        self.connection_state
    }
551
552 pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
553 self.find_local_candidate(transport.local_addr, transport.transport_protocol)
554 .is_some()
555 && self.validate_non_stun_traffic(transport.peer_addr)
556 }
557
558 fn get_timeout_interval(&self) -> Duration {
559 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
560 self.check_interval,
561 self.keepalive_interval,
562 self.disconnected_timeout,
563 self.failed_timeout,
564 );
565 let mut interval = DEFAULT_CHECK_INTERVAL;
566
567 let mut update_interval = |x: Duration| {
568 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
569 interval = x;
570 }
571 };
572
573 match self.last_connection_state {
574 ConnectionState::New | ConnectionState::Checking => {
575 update_interval(check_interval);
577 }
578 ConnectionState::Connected | ConnectionState::Disconnected => {
579 update_interval(keepalive_interval);
580 }
581 _ => {}
582 };
583 update_interval(disconnected_timeout);
585 update_interval(failed_timeout);
586 interval
587 }
588
589 pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
591 if let Some(pair_index) = self.get_selected_pair() {
592 let candidate_pair = &self.candidate_pairs[pair_index];
593 Some((
594 &self.local_candidates[candidate_pair.local_index],
595 &self.remote_candidates[candidate_pair.remote_index],
596 ))
597 } else {
598 None
599 }
600 }
601
602 pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
603 if let Some(pair_index) = self.get_best_available_pair() {
604 let candidate_pair = &self.candidate_pairs[pair_index];
605 Some((
606 &self.local_candidates[candidate_pair.local_index],
607 &self.remote_candidates[candidate_pair.remote_index],
608 ))
609 } else {
610 None
611 }
612 }
613
614 pub fn start_connectivity_checks(
616 &mut self,
617 is_controlling: bool,
618 remote_ufrag: String,
619 remote_pwd: String,
620 ) -> Result<()> {
621 debug!(
622 "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
623 is_controlling, remote_ufrag, remote_pwd
624 );
625 self.set_remote_credentials(remote_ufrag, remote_pwd)?;
626 self.is_controlling = is_controlling;
627 self.start();
628
629 self.update_connection_state(ConnectionState::Checking);
630 self.request_connectivity_check();
631
632 Ok(())
633 }
634
635 pub fn restart(
638 &mut self,
639 mut ufrag: String,
640 mut pwd: String,
641 keep_local_candidates: bool,
642 ) -> Result<()> {
643 if ufrag.is_empty() {
644 ufrag = generate_ufrag();
645 }
646 if pwd.is_empty() {
647 pwd = generate_pwd();
648 }
649
650 if ufrag.len() * 8 < 24 {
651 return Err(Error::ErrLocalUfragInsufficientBits);
652 }
653 if pwd.len() * 8 < 128 {
654 return Err(Error::ErrLocalPwdInsufficientBits);
655 }
656
657 self.ufrag_pwd.local_credentials.ufrag = ufrag;
659 self.ufrag_pwd.local_credentials.pwd = pwd;
660 self.ufrag_pwd.remote_credentials = None;
661
662 self.pending_binding_requests = vec![];
663
664 self.candidate_pairs = vec![];
665
666 self.set_selected_pair(None);
667 self.delete_all_candidates(keep_local_candidates);
668 self.start();
669
670 if self.connection_state != ConnectionState::New {
673 self.update_connection_state(ConnectionState::Checking);
674 }
675
676 Ok(())
677 }
678
    /// Returns the local candidates currently known to the agent.
    pub fn get_local_candidates(&self) -> &[Candidate] {
        &self.local_candidates
    }
683
684 fn contact(&mut self, now: Instant) {
685 if self.connection_state == ConnectionState::Failed {
686 self.last_connection_state = self.connection_state;
689 return;
690 }
691 if self.connection_state == ConnectionState::Checking {
692 if self.last_connection_state != self.connection_state {
694 self.checking_duration = now;
695 }
696
697 if now
699 .checked_duration_since(self.checking_duration)
700 .unwrap_or_else(|| Duration::from_secs(0))
701 > self.disconnected_timeout + self.failed_timeout
702 {
703 self.update_connection_state(ConnectionState::Failed);
704 self.last_connection_state = self.connection_state;
705 return;
706 }
707 }
708
709 self.contact_candidates();
710
711 self.last_connection_state = self.connection_state;
712 self.last_checking_time = now;
713 }
714
715 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
716 if self.connection_state != new_state {
717 if new_state == ConnectionState::Failed {
719 self.set_selected_pair(None);
720 self.delete_all_candidates(false);
721 }
722
723 info!(
724 "[{}]: Setting new connection state: {}",
725 self.get_name(),
726 new_state
727 );
728 self.connection_state = new_state;
729 self.event_outs
730 .push_back(Event::ConnectionStateChange(new_state));
731 }
732 }
733
734 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
735 if let Some(pair_index) = selected_pair {
736 trace!(
737 "[{}]: Set selected candidate pair: {:?}",
738 self.get_name(),
739 self.candidate_pairs[pair_index]
740 );
741
742 self.candidate_pairs[pair_index].nominated = true;
743 self.selected_pair = Some(pair_index);
744
745 self.update_connection_state(ConnectionState::Connected);
746
747 let candidate_pair = &self.candidate_pairs[pair_index];
749 self.event_outs
750 .push_back(Event::SelectedCandidatePairChange(
751 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
752 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
753 ));
754 } else {
755 self.selected_pair = None;
756 }
757 }
758
759 pub(crate) fn ping_all_candidates(&mut self) {
760 let mut pairs: Vec<(usize, usize)> = vec![];
761
762 let name = self.get_name().to_string();
763 if self.candidate_pairs.is_empty() {
764 warn!(
765 "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
766 name,
767 );
768 }
769 for p in &mut self.candidate_pairs {
770 if p.state == CandidatePairState::Waiting {
771 p.state = CandidatePairState::InProgress;
772 } else if p.state != CandidatePairState::InProgress {
773 continue;
774 }
775
776 if p.binding_request_count > self.max_binding_requests {
777 trace!(
778 "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
779 name,
780 *p,
781 self.local_candidates[p.local_index].addr(),
782 self.remote_candidates[p.remote_index].addr()
783 );
784 p.state = CandidatePairState::Failed;
785 } else {
786 p.binding_request_count += 1;
787 let local = p.local_index;
788 let remote = p.remote_index;
789 pairs.push((local, remote));
790 }
791 }
792
793 if !pairs.is_empty() {
794 trace!(
795 "[{}]: pinging all {} candidates",
796 self.get_name(),
797 pairs.len()
798 );
799 }
800
801 for (local, remote) in pairs {
802 self.ping_candidate(local, remote);
803 }
804 }
805
806 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
807 let p = CandidatePair::new(
808 local_index,
809 remote_index,
810 self.local_candidates[local_index].priority(),
811 self.remote_candidates[remote_index].priority(),
812 self.is_controlling,
813 );
814 self.candidate_pairs.push(p);
815 }
816
817 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
818 for (index, p) in self.candidate_pairs.iter().enumerate() {
819 if p.local_index == local_index && p.remote_index == remote_index {
820 return Some(index);
821 }
822 }
823 None
824 }
825
826 pub(crate) fn validate_selected_pair(&mut self) -> bool {
829 let (valid, disconnected_time) = {
830 self.selected_pair.as_ref().map_or_else(
831 || (false, Duration::from_secs(0)),
832 |&pair_index| {
833 let remote_index = self.candidate_pairs[pair_index].remote_index;
834
835 let disconnected_time = Instant::now()
836 .duration_since(self.remote_candidates[remote_index].last_received());
837 (true, disconnected_time)
838 },
839 )
840 };
841
842 if valid {
843 let mut total_time_to_failure = self.failed_timeout;
845 if total_time_to_failure != Duration::from_secs(0) {
846 total_time_to_failure += self.disconnected_timeout;
847 }
848
849 if total_time_to_failure != Duration::from_secs(0)
850 && disconnected_time > total_time_to_failure
851 {
852 self.update_connection_state(ConnectionState::Failed);
853 } else if self.disconnected_timeout != Duration::from_secs(0)
854 && disconnected_time > self.disconnected_timeout
855 {
856 self.update_connection_state(ConnectionState::Disconnected);
857 } else {
858 self.update_connection_state(ConnectionState::Connected);
859 }
860 }
861
862 valid
863 }
864
865 pub(crate) fn check_keepalive(&mut self) {
869 let (local_index, remote_index) = {
870 self.selected_pair
871 .as_ref()
872 .map_or((None, None), |&pair_index| {
873 let p = &self.candidate_pairs[pair_index];
874 (Some(p.local_index), Some(p.remote_index))
875 })
876 };
877
878 if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
879 let last_sent =
880 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
881
882 let last_received =
883 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
884
885 if (self.keepalive_interval != Duration::from_secs(0))
886 && ((last_sent > self.keepalive_interval)
887 || (last_received > self.keepalive_interval))
888 {
889 self.ping_candidate(local_index, remote_index);
892 }
893 }
894 }
895
896 fn request_connectivity_check(&mut self) {
897 if self.ufrag_pwd.remote_credentials.is_some() {
898 self.contact(Instant::now());
899 }
900 }
901
902 pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
907 if !keep_local_candidates {
908 self.local_candidates.clear();
909 }
910 self.remote_candidates.clear();
911 }
912
913 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
914 for (index, c) in self.remote_candidates.iter().enumerate() {
915 if c.addr() == addr {
916 return Some(index);
917 }
918 }
919 None
920 }
921
922 pub(crate) fn find_local_candidate(
923 &self,
924 addr: SocketAddr,
925 transport_protocol: TransportProtocol,
926 ) -> Option<usize> {
927 for (index, c) in self.local_candidates.iter().enumerate() {
928 if c.network_type().to_protocol() != transport_protocol {
929 continue;
930 }
931
932 if c.tcp_type() == TcpType::Active && transport_protocol == TransportProtocol::TCP {
936 if c.addr().ip() == addr.ip() {
937 return Some(index);
938 }
939 } else if c.addr() == addr {
940 return Some(index);
941 }
942 }
943 None
944 }
945
946 pub(crate) fn send_binding_request(
947 &mut self,
948 m: &Message,
949 local_index: usize,
950 remote_index: usize,
951 ) {
952 trace!(
953 "[{}]: ping STUN from {} to {}",
954 self.get_name(),
955 self.local_candidates[local_index],
956 self.remote_candidates[remote_index],
957 );
958
959 self.invalidate_pending_binding_requests(Instant::now());
960
961 self.pending_binding_requests.push(BindingRequest {
962 timestamp: Instant::now(),
963 transaction_id: m.transaction_id,
964 destination: self.remote_candidates[remote_index].addr(),
965 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
966 });
967
968 self.send_stun(m, local_index, remote_index);
969 }
970
971 pub(crate) fn send_binding_success(
972 &mut self,
973 m: &Message,
974 local_index: usize,
975 remote_index: usize,
976 ) {
977 let addr = self.remote_candidates[remote_index].addr();
978 let (ip, port) = (addr.ip(), addr.port());
979 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
980
981 let (out, result) = {
982 let mut out = Message::new();
983 let result = out.build(&[
984 Box::new(m.clone()),
985 Box::new(BINDING_SUCCESS),
986 Box::new(XorMappedAddress { ip, port }),
987 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
988 Box::new(FINGERPRINT),
989 ]);
990 (out, result)
991 };
992
993 if let Err(err) = result {
994 warn!(
995 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
996 self.get_name(),
997 self.local_candidates[local_index],
998 self.remote_candidates[remote_index],
999 err
1000 );
1001 } else {
1002 self.send_stun(&out, local_index, remote_index);
1003 }
1004 }
1005
1006 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
1011 let pending_binding_requests = &mut self.pending_binding_requests;
1012 let initial_size = pending_binding_requests.len();
1013
1014 let mut temp = vec![];
1015 for binding_request in pending_binding_requests.drain(..) {
1016 if filter_time
1017 .checked_duration_since(binding_request.timestamp)
1018 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
1019 .unwrap_or(true)
1020 {
1021 temp.push(binding_request);
1022 }
1023 }
1024
1025 *pending_binding_requests = temp;
1026 let bind_requests_remaining = pending_binding_requests.len();
1027 let bind_requests_removed = initial_size - bind_requests_remaining;
1028 if bind_requests_removed > 0 {
1029 trace!(
1030 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
1031 self.get_name(),
1032 bind_requests_removed,
1033 bind_requests_remaining,
1034 );
1035 }
1036 }
1037
1038 pub(crate) fn handle_inbound_binding_success(
1041 &mut self,
1042 id: TransactionId,
1043 ) -> Option<BindingRequest> {
1044 self.invalidate_pending_binding_requests(Instant::now());
1045
1046 let pending_binding_requests = &mut self.pending_binding_requests;
1047 for i in 0..pending_binding_requests.len() {
1048 if pending_binding_requests[i].transaction_id == id {
1049 let valid_binding_request = pending_binding_requests.remove(i);
1050 return Some(valid_binding_request);
1051 }
1052 }
1053 None
1054 }
1055
1056 pub(crate) fn handle_inbound(
1058 &mut self,
1059 m: &mut Message,
1060 local_index: usize,
1061 remote_addr: SocketAddr,
1062 ) -> Result<()> {
1063 if m.typ.method != METHOD_BINDING
1064 || !(m.typ.class == CLASS_SUCCESS_RESPONSE
1065 || m.typ.class == CLASS_REQUEST
1066 || m.typ.class == CLASS_INDICATION)
1067 {
1068 trace!(
1069 "[{}]: unhandled STUN from {} to {} class({}) method({})",
1070 self.get_name(),
1071 remote_addr,
1072 self.local_candidates[local_index],
1073 m.typ.class,
1074 m.typ.method
1075 );
1076 return Err(Error::ErrUnhandledStunpacket);
1077 }
1078
1079 if self.is_controlling {
1080 if m.contains(ATTR_ICE_CONTROLLING) {
1081 debug!(
1082 "[{}]: inbound isControlling && a.isControlling == true",
1083 self.get_name(),
1084 );
1085 return Err(Error::ErrUnexpectedStunrequestMessage);
1086 } else if m.contains(ATTR_USE_CANDIDATE) {
1087 debug!(
1088 "[{}]: useCandidate && a.isControlling == true",
1089 self.get_name(),
1090 );
1091 return Err(Error::ErrUnexpectedStunrequestMessage);
1092 }
1093 } else if m.contains(ATTR_ICE_CONTROLLED) {
1094 debug!(
1095 "[{}]: inbound isControlled && a.isControlling == false",
1096 self.get_name(),
1097 );
1098 return Err(Error::ErrUnexpectedStunrequestMessage);
1099 }
1100
1101 let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1102 debug!(
1103 "[{}]: ufrag_pwd.remote_credentials.is_none",
1104 self.get_name(),
1105 );
1106 return Err(Error::ErrPasswordEmpty);
1107 };
1108
1109 let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1110 if m.typ.class == CLASS_SUCCESS_RESPONSE {
1111 if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1112 {
1113 warn!(
1114 "[{}]: discard message from ({}), {}",
1115 self.get_name(),
1116 remote_addr,
1117 err
1118 );
1119 return Err(err);
1120 }
1121
1122 if let Some(remote_index) = &remote_candidate_index {
1123 self.handle_success_response(m, local_index, *remote_index, remote_addr);
1124 } else {
1125 warn!(
1126 "[{}]: discard success message from ({}), no such remote",
1127 self.get_name(),
1128 remote_addr
1129 );
1130 return Err(Error::ErrUnhandledStunpacket);
1131 }
1132 } else if m.typ.class == CLASS_REQUEST {
1133 {
1134 let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1135 + ":"
1136 + remote_credentials.ufrag.as_str();
1137 if let Err(err) = assert_inbound_username(m, &username) {
1138 warn!(
1139 "[{}]: discard message from ({}), {}",
1140 self.get_name(),
1141 remote_addr,
1142 err
1143 );
1144 return Err(err);
1145 } else if let Err(err) = assert_inbound_message_integrity(
1146 m,
1147 self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1148 ) {
1149 warn!(
1150 "[{}]: discard message from ({}), {}",
1151 self.get_name(),
1152 remote_addr,
1153 err
1154 );
1155 return Err(err);
1156 }
1157 }
1158
1159 if remote_candidate_index.is_none() {
1160 let network_type = self.local_candidates[local_index].network_type();
1162 let (ip, port) = (remote_addr.ip(), remote_addr.port());
1163
1164 let prflx_candidate_config = CandidatePeerReflexiveConfig {
1165 base_config: CandidateConfig {
1166 network: network_type.to_string(),
1167 address: ip.to_string(),
1168 port,
1169 component: self.local_candidates[local_index].component(),
1170 ..CandidateConfig::default()
1171 },
1172 rel_addr: "".to_owned(),
1173 rel_port: 0,
1174 };
1175
1176 match prflx_candidate_config.new_candidate_peer_reflexive() {
1177 Ok(prflx_candidate) => {
1178 if let Ok(added) = self.add_remote_candidate(prflx_candidate)
1179 && added
1180 {
1181 remote_candidate_index = Some(self.remote_candidates.len() - 1);
1182 }
1183 }
1184 Err(err) => {
1185 error!(
1186 "[{}]: Failed to create new remote prflx candidate ({})",
1187 self.get_name(),
1188 err
1189 );
1190 return Err(err);
1191 }
1192 };
1193
1194 debug!(
1195 "[{}]: adding a new peer-reflexive candidate: {} ",
1196 self.get_name(),
1197 remote_addr
1198 );
1199 }
1200
1201 trace!(
1202 "[{}]: inbound STUN (Request) from {} to {}",
1203 self.get_name(),
1204 remote_addr,
1205 self.local_candidates[local_index]
1206 );
1207
1208 if let Some(remote_index) = &remote_candidate_index {
1209 self.handle_binding_request(m, local_index, *remote_index);
1210 }
1211 }
1212
1213 if let Some(remote_index) = remote_candidate_index {
1214 self.remote_candidates[remote_index].seen(false);
1215 }
1216
1217 Ok(())
1218 }
1219
1220 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1223 self.find_remote_candidate(remote_addr)
1224 .is_some_and(|remote_index| {
1225 self.remote_candidates[remote_index].seen(false);
1226 true
1227 })
1228 }
1229
1230 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1231 let peer_addr = self.remote_candidates[remote_index].addr();
1232 let local_addr = self.local_candidates[local_index].addr();
1233 let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1234 TransportProtocol::TCP
1235 } else {
1236 TransportProtocol::UDP
1237 };
1238
1239 self.write_outs.push_back(TaggedBytesMut {
1240 now: Instant::now(),
1241 transport: TransportContext {
1242 local_addr,
1243 peer_addr,
1244 ecn: None,
1245 transport_protocol,
1246 },
1247 message: BytesMut::from(&msg.raw[..]),
1248 });
1249
1250 self.local_candidates[local_index].seen(true);
1251 }
1252
1253 fn handle_inbound_candidate_msg(
1254 &mut self,
1255 local_index: usize,
1256 msg: TaggedBytesMut,
1257 ) -> Result<()> {
1258 if is_stun_message(&msg.message) {
1259 let mut m = Message {
1260 raw: msg.message.to_vec(),
1261 ..Message::default()
1262 };
1263
1264 if let Err(err) = m.decode() {
1265 warn!(
1266 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1267 self.get_name(),
1268 msg.transport.local_addr,
1269 msg.transport.peer_addr,
1270 err
1271 );
1272 Err(err)
1273 } else {
1274 self.handle_inbound(&mut m, local_index, msg.transport.peer_addr)
1275 }
1276 } else {
1277 if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1278 warn!(
1279 "[{}]: Discarded message, not a valid remote candidate from {}",
1280 self.get_name(),
1281 msg.transport.peer_addr,
1282 );
1283 } else {
1284 warn!(
1285 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1286 self.get_name(),
1287 msg.transport.peer_addr
1288 );
1289 }
1290 Err(Error::ErrNonStunmessage)
1291 }
1292 }
1293
1294 pub(crate) fn get_name(&self) -> &str {
1295 if self.is_controlling {
1296 "controlling"
1297 } else {
1298 "controlled"
1299 }
1300 }
1301
    /// Returns the index into `candidate_pairs` of the selected pair, if any.
    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
        self.selected_pair
    }
1305
1306 pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1307 let mut best_pair_index: Option<usize> = None;
1308
1309 for (index, p) in self.candidate_pairs.iter().enumerate() {
1310 if p.state == CandidatePairState::Failed {
1311 continue;
1312 }
1313
1314 if let Some(pair_index) = &mut best_pair_index {
1315 let b = &self.candidate_pairs[*pair_index];
1316 if b.priority() < p.priority() {
1317 *pair_index = index;
1318 }
1319 } else {
1320 best_pair_index = Some(index);
1321 }
1322 }
1323
1324 best_pair_index
1325 }
1326
1327 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1328 let mut best_pair_index: Option<usize> = None;
1329
1330 for (index, p) in self.candidate_pairs.iter().enumerate() {
1331 if p.state != CandidatePairState::Succeeded {
1332 continue;
1333 }
1334
1335 if let Some(pair_index) = &mut best_pair_index {
1336 let b = &self.candidate_pairs[*pair_index];
1337 if b.priority() < p.priority() {
1338 *pair_index = index;
1339 }
1340 } else {
1341 best_pair_index = Some(index);
1342 }
1343 }
1344
1345 best_pair_index
1346 }
1347}