1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::tcp_type::TcpType;
32use crate::url::*;
33use shared::error::*;
34use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
35
36const ZERO_DURATION: Duration = Duration::from_secs(0);
37
38#[derive(Debug, Clone)]
39pub(crate) struct BindingRequest {
40 pub(crate) timestamp: Instant,
41 pub(crate) transaction_id: TransactionId,
42 pub(crate) destination: SocketAddr,
43 pub(crate) is_use_candidate: bool,
44}
45
46impl Default for BindingRequest {
47 fn default() -> Self {
48 Self {
49 timestamp: Instant::now(),
50 transaction_id: TransactionId::default(),
51 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
52 is_use_candidate: false,
53 }
54 }
55}
56
57#[derive(Default, Clone)]
58pub struct Credentials {
59 pub ufrag: String,
60 pub pwd: String,
61}
62
63#[derive(Default, Clone)]
64pub(crate) struct UfragPwd {
65 pub(crate) local_credentials: Credentials,
66 pub(crate) remote_credentials: Option<Credentials>,
67}
68
69fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
70 let mut username = Username::new(ATTR_USERNAME, String::new());
71 username.get_from(m)?;
72
73 if username.to_string() != expected_username {
74 return Err(Error::Other(format!(
75 "{:?} expected({}) actual({})",
76 Error::ErrMismatchUsername,
77 expected_username,
78 username,
79 )));
80 }
81
82 Ok(())
83}
84
85fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
86 let message_integrity_attr = MessageIntegrity(key.to_vec());
87 message_integrity_attr.check(m)
88}
89
90pub enum Event {
91 ConnectionStateChange(ConnectionState),
92 SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
93}
94
95pub struct Agent {
97 pub(crate) tie_breaker: u64,
98 pub(crate) is_controlling: bool,
99 pub(crate) lite: bool,
100
101 pub(crate) start_time: Instant,
102
103 pub(crate) connection_state: ConnectionState,
104 pub(crate) last_connection_state: ConnectionState,
105
106 pub(crate) ufrag_pwd: UfragPwd,
108
109 pub(crate) local_candidates: Vec<Candidate>,
110 pub(crate) remote_candidates: Vec<Candidate>,
111 pub(crate) candidate_pairs: Vec<CandidatePair>,
112 pub(crate) nominated_pair: Option<usize>,
113 pub(crate) selected_pair: Option<usize>,
114
115 pub(crate) pending_binding_requests: Vec<BindingRequest>,
117
118 pub(crate) insecure_skip_verify: bool,
120 pub(crate) max_binding_requests: u16,
121 pub(crate) host_acceptance_min_wait: Duration,
122 pub(crate) srflx_acceptance_min_wait: Duration,
123 pub(crate) prflx_acceptance_min_wait: Duration,
124 pub(crate) relay_acceptance_min_wait: Duration,
125 pub(crate) disconnected_timeout: Duration,
128 pub(crate) failed_timeout: Duration,
131 pub(crate) keepalive_interval: Duration,
134 pub(crate) check_interval: Duration,
136 pub(crate) checking_duration: Instant,
137 pub(crate) last_checking_time: Instant,
138
139 pub(crate) mdns: Option<Mdns>,
140 pub(crate) mdns_queries: HashMap<QueryId, Candidate>,
141
142 pub(crate) mdns_mode: MulticastDnsMode,
143 pub(crate) mdns_local_name: String,
144 pub(crate) mdns_local_ip: Option<IpAddr>,
145
146 pub(crate) candidate_types: Vec<CandidateType>,
147 pub(crate) network_types: Vec<NetworkType>,
148 pub(crate) urls: Vec<Url>,
149
150 pub(crate) write_outs: VecDeque<TaggedBytesMut>,
151 pub(crate) event_outs: VecDeque<Event>,
152}
153
154impl Default for Agent {
155 fn default() -> Self {
156 Self {
157 tie_breaker: 0,
158 is_controlling: false,
159 lite: false,
160 start_time: Instant::now(),
161 connection_state: Default::default(),
162 last_connection_state: Default::default(),
163 ufrag_pwd: Default::default(),
164 local_candidates: vec![],
165 remote_candidates: vec![],
166 candidate_pairs: vec![],
167 nominated_pair: None,
168 selected_pair: None,
169 pending_binding_requests: vec![],
170 insecure_skip_verify: false,
171 max_binding_requests: 0,
172 host_acceptance_min_wait: Default::default(),
173 srflx_acceptance_min_wait: Default::default(),
174 prflx_acceptance_min_wait: Default::default(),
175 relay_acceptance_min_wait: Default::default(),
176 disconnected_timeout: Default::default(),
177 failed_timeout: Default::default(),
178 keepalive_interval: Default::default(),
179 check_interval: Default::default(),
180 checking_duration: Instant::now(),
181 last_checking_time: Instant::now(),
182 mdns_mode: MulticastDnsMode::Disabled,
183 mdns_local_name: "".to_owned(),
184 mdns_local_ip: None,
185 mdns_queries: HashMap::new(),
186 mdns: None,
187 candidate_types: vec![],
188 network_types: vec![],
189 urls: vec![],
190 write_outs: Default::default(),
191 event_outs: Default::default(),
192 }
193 }
194}
195
196impl Agent {
197 pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
199 let mut mdns_local_name = config.multicast_dns_local_name.clone();
200 if mdns_local_name.is_empty() {
201 mdns_local_name = generate_multicast_dns_name();
202 }
203
204 if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
205 return Err(Error::ErrInvalidMulticastDnshostName);
206 }
207
208 let mdns_mode = config.multicast_dns_mode;
209 let mdns = create_multicast_dns(
210 mdns_mode,
211 &mdns_local_name,
212 &config.multicast_dns_local_ip,
213 &config.multicast_dns_query_timeout,
214 )
215 .unwrap_or_else(|err| {
216 warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
219 None
220 });
221
222 let candidate_types = if config.candidate_types.is_empty() {
223 default_candidate_types()
224 } else {
225 config.candidate_types.clone()
226 };
227
228 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
229 {
230 return Err(Error::ErrLiteUsingNonHostCandidates);
231 }
232
233 if !config.urls.is_empty()
234 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
235 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
236 {
237 return Err(Error::ErrUselessUrlsProvided);
238 }
239
240 let mut agent = Self {
241 tie_breaker: rand::random::<u64>(),
242 is_controlling: config.is_controlling,
243 lite: config.lite,
244
245 start_time: Instant::now(),
246
247 nominated_pair: None,
248 selected_pair: None,
249 candidate_pairs: vec![],
250
251 connection_state: ConnectionState::New,
252
253 insecure_skip_verify: config.insecure_skip_verify,
254
255 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
259 max_binding_requests
260 } else {
261 DEFAULT_MAX_BINDING_REQUESTS
262 },
263 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
264 config.host_acceptance_min_wait
265 {
266 host_acceptance_min_wait
267 } else {
268 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
269 },
270 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
271 config.srflx_acceptance_min_wait
272 {
273 srflx_acceptance_min_wait
274 } else {
275 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
276 },
277 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
278 config.prflx_acceptance_min_wait
279 {
280 prflx_acceptance_min_wait
281 } else {
282 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
283 },
284 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
285 config.relay_acceptance_min_wait
286 {
287 relay_acceptance_min_wait
288 } else {
289 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
290 },
291
292 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
295 disconnected_timeout
296 } else {
297 DEFAULT_DISCONNECTED_TIMEOUT
298 },
299
300 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
303 failed_timeout
304 } else {
305 DEFAULT_FAILED_TIMEOUT
306 },
307
308 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
311 keepalive_interval
312 } else {
313 DEFAULT_KEEPALIVE_INTERVAL
314 },
315
316 check_interval: if config.check_interval == Duration::from_secs(0) {
318 DEFAULT_CHECK_INTERVAL
319 } else {
320 config.check_interval
321 },
322 checking_duration: Instant::now(),
323 last_checking_time: Instant::now(),
324 last_connection_state: ConnectionState::Unspecified,
325
326 mdns,
327 mdns_queries: HashMap::new(),
328
329 mdns_mode,
330 mdns_local_name,
331 mdns_local_ip: config.multicast_dns_local_ip,
332
333 ufrag_pwd: UfragPwd::default(),
334
335 local_candidates: vec![],
336 remote_candidates: vec![],
337
338 pending_binding_requests: vec![],
340
341 candidate_types,
342 network_types: config.network_types.clone(),
343 urls: config.urls.clone(),
344
345 write_outs: VecDeque::new(),
346 event_outs: VecDeque::new(),
347 };
348
349 if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
351 {
352 let _ = agent.close();
353 return Err(err);
354 }
355
356 Ok(agent)
357 }
358
359 pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<bool> {
361 if !self.network_types.is_empty() {
363 let candidate_network_type = c.network_type();
364 if !self.network_types.contains(&candidate_network_type) {
365 debug!(
366 "Ignoring local candidate with network type {:?} (not in configured network types: {:?})",
367 candidate_network_type, self.network_types
368 );
369 return Ok(false);
370 }
371 }
372
373 if c.candidate_type() == CandidateType::Host
374 && self.mdns_mode == MulticastDnsMode::QueryAndGather
375 && c.network_type == NetworkType::Udp4
376 && self
377 .mdns_local_ip
378 .is_some_and(|local_ip| local_ip == c.addr().ip())
379 {
380 trace!(
383 "mDNS hides local ip {} with local name {}",
384 c.address, self.mdns_local_name
385 );
386 c.address = self.mdns_local_name.clone();
387 }
388
389 for cand in &self.local_candidates {
390 if cand.equal(&c) {
391 return Ok(false);
392 }
393 }
394
395 self.local_candidates.push(c);
396 let local_index = self.local_candidates.len() - 1;
397 let local_tcp_type = self.local_candidates[local_index].tcp_type();
398
399 for remote_index in 0..self.remote_candidates.len() {
400 let remote_tcp_type = self.remote_candidates[remote_index].tcp_type();
401
402 let should_pair = match (local_tcp_type, remote_tcp_type) {
408 (TcpType::Active, TcpType::Passive) => true,
409 (TcpType::Passive, TcpType::Active) => true,
410 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
411 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
413 };
414
415 if should_pair {
416 self.add_pair(local_index, remote_index);
417 }
418 }
419
420 self.request_connectivity_check();
421
422 Ok(true)
423 }
424
425 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<bool> {
427 if !self.network_types.is_empty() {
429 let candidate_network_type = c.network_type();
430 if !self.network_types.contains(&candidate_network_type) {
431 debug!(
432 "Ignoring remote candidate with network type {:?} (not in configured network types: {:?})",
433 candidate_network_type, self.network_types
434 );
435 return Ok(false);
436 }
437 }
438
439 if c.tcp_type() == TcpType::Active {
443 debug!(
444 "Ignoring remote candidate with tcptype active: {}",
445 c.address()
446 );
447 return Ok(false);
448 }
449
450 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
452 if self.mdns_mode == MulticastDnsMode::Disabled {
453 warn!(
454 "remote mDNS candidate added, but mDNS is disabled: ({})",
455 c.address()
456 );
457 return Ok(false);
458 }
459
460 if c.candidate_type() != CandidateType::Host {
461 return Err(Error::ErrAddressParseFailed);
462 }
463
464 if let Some(mdns_conn) = &mut self.mdns {
465 let query_id = mdns_conn.query(c.address());
466 self.mdns_queries.insert(query_id, c);
467 }
468
469 return Ok(false);
470 }
471
472 self.trigger_request_connectivity_check(vec![c]);
473 Ok(true)
474 }
475
476 fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
477 for c in remote_candidates {
478 if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
479 let remote_tcp_type = c.tcp_type();
480 self.remote_candidates.push(c);
481 let remote_index = self.remote_candidates.len() - 1;
482
483 for local_index in 0..self.local_candidates.len() {
485 let local_tcp_type = self.local_candidates[local_index].tcp_type();
486
487 let should_pair = match (local_tcp_type, remote_tcp_type) {
493 (TcpType::Active, TcpType::Passive) => true,
494 (TcpType::Passive, TcpType::Active) => true,
495 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
496 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
498 };
499
500 if should_pair {
501 self.add_pair(local_index, remote_index);
502 }
503 }
504
505 self.request_connectivity_check();
506 }
507 }
508 }
509
510 pub fn set_remote_credentials(
512 &mut self,
513 remote_ufrag: String,
514 remote_pwd: String,
515 ) -> Result<()> {
516 if remote_ufrag.is_empty() {
517 return Err(Error::ErrRemoteUfragEmpty);
518 } else if remote_pwd.is_empty() {
519 return Err(Error::ErrRemotePwdEmpty);
520 }
521
522 self.ufrag_pwd.remote_credentials = Some(Credentials {
523 ufrag: remote_ufrag,
524 pwd: remote_pwd,
525 });
526
527 Ok(())
528 }
529
530 pub fn get_remote_credentials(&self) -> Option<&Credentials> {
532 self.ufrag_pwd.remote_credentials.as_ref()
533 }
534
535 pub fn get_local_credentials(&self) -> &Credentials {
537 &self.ufrag_pwd.local_credentials
538 }
539
540 pub fn role(&self) -> bool {
541 self.is_controlling
542 }
543
544 pub fn set_role(&mut self, is_controlling: bool) {
545 self.is_controlling = is_controlling;
546 }
547
548 pub fn state(&self) -> ConnectionState {
549 self.connection_state
550 }
551
552 pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
553 self.find_local_candidate(transport.local_addr, transport.transport_protocol)
554 .is_some()
555 && self.validate_non_stun_traffic(transport.peer_addr)
556 }
557
558 fn get_timeout_interval(&self) -> Duration {
559 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
560 self.check_interval,
561 self.keepalive_interval,
562 self.disconnected_timeout,
563 self.failed_timeout,
564 );
565 let mut interval = DEFAULT_CHECK_INTERVAL;
566
567 let mut update_interval = |x: Duration| {
568 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
569 interval = x;
570 }
571 };
572
573 match self.last_connection_state {
574 ConnectionState::New | ConnectionState::Checking => {
575 update_interval(check_interval);
577 }
578 ConnectionState::Connected | ConnectionState::Disconnected => {
579 update_interval(keepalive_interval);
580 }
581 _ => {}
582 };
583 update_interval(disconnected_timeout);
585 update_interval(failed_timeout);
586 interval
587 }
588
589 pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
591 if let Some(pair_index) = self.get_selected_pair() {
592 let candidate_pair = &self.candidate_pairs[pair_index];
593 Some((
594 &self.local_candidates[candidate_pair.local_index],
595 &self.remote_candidates[candidate_pair.remote_index],
596 ))
597 } else {
598 None
599 }
600 }
601
602 pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
603 if let Some(pair_index) = self.get_best_available_pair() {
604 let candidate_pair = &self.candidate_pairs[pair_index];
605 Some((
606 &self.local_candidates[candidate_pair.local_index],
607 &self.remote_candidates[candidate_pair.remote_index],
608 ))
609 } else {
610 None
611 }
612 }
613
614 pub fn start_connectivity_checks(
616 &mut self,
617 is_controlling: bool,
618 remote_ufrag: String,
619 remote_pwd: String,
620 ) -> Result<()> {
621 debug!(
622 "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
623 is_controlling, remote_ufrag, remote_pwd
624 );
625 self.set_remote_credentials(remote_ufrag, remote_pwd)?;
626 self.is_controlling = is_controlling;
627 self.start();
628
629 self.update_connection_state(ConnectionState::Checking);
630 self.request_connectivity_check();
631
632 Ok(())
633 }
634
635 pub fn restart(
638 &mut self,
639 mut ufrag: String,
640 mut pwd: String,
641 keep_local_candidates: bool,
642 ) -> Result<()> {
643 if ufrag.is_empty() {
644 ufrag = generate_ufrag();
645 }
646 if pwd.is_empty() {
647 pwd = generate_pwd();
648 }
649
650 if ufrag.len() * 8 < 24 {
651 return Err(Error::ErrLocalUfragInsufficientBits);
652 }
653 if pwd.len() * 8 < 128 {
654 return Err(Error::ErrLocalPwdInsufficientBits);
655 }
656
657 self.ufrag_pwd.local_credentials.ufrag = ufrag;
659 self.ufrag_pwd.local_credentials.pwd = pwd;
660 self.ufrag_pwd.remote_credentials = None;
661
662 self.pending_binding_requests = vec![];
663
664 self.candidate_pairs = vec![];
665
666 self.set_selected_pair(None);
667 self.delete_all_candidates(keep_local_candidates);
668 self.start();
669
670 if self.connection_state != ConnectionState::New {
673 self.update_connection_state(ConnectionState::Checking);
674 }
675
676 Ok(())
677 }
678
679 pub fn get_local_candidates(&self) -> &[Candidate] {
681 &self.local_candidates
682 }
683
684 fn contact(&mut self, now: Instant) {
685 if self.connection_state == ConnectionState::Failed {
686 self.last_connection_state = self.connection_state;
689 return;
690 }
691 if self.connection_state == ConnectionState::Checking {
692 if self.last_connection_state != self.connection_state {
694 self.checking_duration = now;
695 }
696
697 if now
699 .checked_duration_since(self.checking_duration)
700 .unwrap_or_else(|| Duration::from_secs(0))
701 > self.disconnected_timeout + self.failed_timeout
702 {
703 self.update_connection_state(ConnectionState::Failed);
704 self.last_connection_state = self.connection_state;
705 return;
706 }
707 }
708
709 self.contact_candidates();
710
711 self.last_connection_state = self.connection_state;
712 self.last_checking_time = now;
713 }
714
715 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
716 if self.connection_state != new_state {
717 if new_state == ConnectionState::Failed {
719 self.set_selected_pair(None);
720 self.delete_all_candidates(false);
721 }
722
723 info!(
724 "[{}]: Setting new connection state: {}",
725 self.get_name(),
726 new_state
727 );
728 self.connection_state = new_state;
729 self.event_outs
730 .push_back(Event::ConnectionStateChange(new_state));
731 }
732 }
733
734 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
735 if let Some(pair_index) = selected_pair {
736 trace!(
737 "[{}]: Set selected candidate pair: {:?}",
738 self.get_name(),
739 self.candidate_pairs[pair_index]
740 );
741
742 self.candidate_pairs[pair_index].nominated = true;
743 self.selected_pair = Some(pair_index);
744
745 self.update_connection_state(ConnectionState::Connected);
746
747 let candidate_pair = &self.candidate_pairs[pair_index];
749 self.event_outs
750 .push_back(Event::SelectedCandidatePairChange(
751 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
752 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
753 ));
754 } else {
755 self.selected_pair = None;
756 }
757 }
758
759 pub(crate) fn ping_all_candidates(&mut self) {
760 let mut pairs: Vec<(usize, usize)> = vec![];
761
762 let name = self.get_name().to_string();
763 if self.candidate_pairs.is_empty() {
764 warn!(
765 "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
766 name,
767 );
768 }
769 for p in &mut self.candidate_pairs {
770 if p.state == CandidatePairState::Waiting {
771 p.state = CandidatePairState::InProgress;
772 } else if p.state != CandidatePairState::InProgress {
773 continue;
774 }
775
776 if p.binding_request_count > self.max_binding_requests {
777 trace!(
778 "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
779 name,
780 *p,
781 self.local_candidates[p.local_index].addr(),
782 self.remote_candidates[p.remote_index].addr()
783 );
784 p.state = CandidatePairState::Failed;
785 } else {
786 p.binding_request_count += 1;
787 let local = p.local_index;
788 let remote = p.remote_index;
789 pairs.push((local, remote));
790 }
791 }
792
793 if !pairs.is_empty() {
794 trace!(
795 "[{}]: pinging all {} candidates",
796 self.get_name(),
797 pairs.len()
798 );
799 }
800
801 for (local, remote) in pairs {
802 self.ping_candidate(local, remote);
803 }
804 }
805
806 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
807 let p = CandidatePair::new(
808 local_index,
809 remote_index,
810 self.local_candidates[local_index].priority(),
811 self.remote_candidates[remote_index].priority(),
812 self.is_controlling,
813 );
814 self.candidate_pairs.push(p);
815 }
816
817 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
818 for (index, p) in self.candidate_pairs.iter().enumerate() {
819 if p.local_index == local_index && p.remote_index == remote_index {
820 return Some(index);
821 }
822 }
823 None
824 }
825
826 pub(crate) fn validate_selected_pair(&mut self) -> bool {
829 let (valid, disconnected_time) = {
830 self.selected_pair.as_ref().map_or_else(
831 || (false, Duration::from_secs(0)),
832 |&pair_index| {
833 let remote_index = self.candidate_pairs[pair_index].remote_index;
834
835 let disconnected_time = Instant::now()
836 .duration_since(self.remote_candidates[remote_index].last_received());
837 (true, disconnected_time)
838 },
839 )
840 };
841
842 if valid {
843 let mut total_time_to_failure = self.failed_timeout;
845 if total_time_to_failure != Duration::from_secs(0) {
846 total_time_to_failure += self.disconnected_timeout;
847 }
848
849 if total_time_to_failure != Duration::from_secs(0)
850 && disconnected_time > total_time_to_failure
851 {
852 self.update_connection_state(ConnectionState::Failed);
853 } else if self.disconnected_timeout != Duration::from_secs(0)
854 && disconnected_time > self.disconnected_timeout
855 {
856 self.update_connection_state(ConnectionState::Disconnected);
857 } else {
858 self.update_connection_state(ConnectionState::Connected);
859 }
860 }
861
862 valid
863 }
864
865 pub(crate) fn check_keepalive(&mut self) {
869 let (local_index, remote_index, pair_index) = {
870 self.selected_pair
871 .as_ref()
872 .map_or((None, None, None), |&pair_index| {
873 let p = &self.candidate_pairs[pair_index];
874 (Some(p.local_index), Some(p.remote_index), Some(pair_index))
875 })
876 };
877
878 if let (Some(local_index), Some(remote_index), Some(pair_index)) =
879 (local_index, remote_index, pair_index)
880 {
881 let last_sent =
882 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
883
884 let last_received =
885 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
886
887 if (self.keepalive_interval != Duration::from_secs(0))
888 && ((last_sent > self.keepalive_interval)
889 || (last_received > self.keepalive_interval))
890 {
891 self.candidate_pairs[pair_index].on_consent_request_sent();
895 self.ping_candidate(local_index, remote_index);
896 }
897 }
898 }
899
900 fn request_connectivity_check(&mut self) {
901 if self.ufrag_pwd.remote_credentials.is_some() {
902 self.contact(Instant::now());
903 }
904 }
905
906 pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
911 if !keep_local_candidates {
912 self.local_candidates.clear();
913 }
914 self.remote_candidates.clear();
915 }
916
917 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
918 for (index, c) in self.remote_candidates.iter().enumerate() {
919 if c.addr() == addr {
920 return Some(index);
921 }
922 }
923 None
924 }
925
926 pub(crate) fn find_local_candidate(
927 &self,
928 addr: SocketAddr,
929 transport_protocol: TransportProtocol,
930 ) -> Option<usize> {
931 for (index, c) in self.local_candidates.iter().enumerate() {
932 if c.network_type().to_protocol() != transport_protocol {
933 continue;
934 }
935
936 if c.tcp_type() == TcpType::Active && transport_protocol == TransportProtocol::TCP {
940 if c.addr().ip() == addr.ip() {
941 return Some(index);
942 }
943 } else if c.addr() == addr {
944 return Some(index);
945 } else if let Some(related_address) = c.related_address()
946 && related_address.address == addr.ip().to_string()
947 && related_address.port == addr.port()
948 {
949 return Some(index);
950 }
951 }
952 None
953 }
954
955 pub(crate) fn send_binding_request(
956 &mut self,
957 m: &Message,
958 local_index: usize,
959 remote_index: usize,
960 ) {
961 trace!(
962 "[{}]: ping STUN from {} to {}",
963 self.get_name(),
964 self.local_candidates[local_index],
965 self.remote_candidates[remote_index],
966 );
967
968 self.invalidate_pending_binding_requests(Instant::now());
969
970 self.pending_binding_requests.push(BindingRequest {
971 timestamp: Instant::now(),
972 transaction_id: m.transaction_id,
973 destination: self.remote_candidates[remote_index].addr(),
974 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
975 });
976
977 if let Some(pair_index) = self.find_pair(local_index, remote_index) {
979 self.candidate_pairs[pair_index].on_request_sent();
980 }
981
982 self.send_stun(m, local_index, remote_index);
983 }
984
985 pub(crate) fn send_binding_success(
986 &mut self,
987 m: &Message,
988 local_index: usize,
989 remote_index: usize,
990 ) {
991 let addr = self.remote_candidates[remote_index].addr();
992 let (ip, port) = (addr.ip(), addr.port());
993 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
994
995 let (out, result) = {
996 let mut out = Message::new();
997 let result = out.build(&[
998 Box::new(m.clone()),
999 Box::new(BINDING_SUCCESS),
1000 Box::new(XorMappedAddress { ip, port }),
1001 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
1002 Box::new(FINGERPRINT),
1003 ]);
1004 (out, result)
1005 };
1006
1007 if let Err(err) = result {
1008 warn!(
1009 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
1010 self.get_name(),
1011 self.local_candidates[local_index],
1012 self.remote_candidates[remote_index],
1013 err
1014 );
1015 } else {
1016 if let Some(pair_index) = self.find_pair(local_index, remote_index) {
1018 self.candidate_pairs[pair_index].on_response_sent();
1019 }
1020 self.send_stun(&out, local_index, remote_index);
1021 }
1022 }
1023
1024 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
1029 let pending_binding_requests = &mut self.pending_binding_requests;
1030 let initial_size = pending_binding_requests.len();
1031
1032 let mut temp = vec![];
1033 for binding_request in pending_binding_requests.drain(..) {
1034 if filter_time
1035 .checked_duration_since(binding_request.timestamp)
1036 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
1037 .unwrap_or(true)
1038 {
1039 temp.push(binding_request);
1040 }
1041 }
1042
1043 *pending_binding_requests = temp;
1044 let bind_requests_remaining = pending_binding_requests.len();
1045 let bind_requests_removed = initial_size - bind_requests_remaining;
1046 if bind_requests_removed > 0 {
1047 trace!(
1048 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
1049 self.get_name(),
1050 bind_requests_removed,
1051 bind_requests_remaining,
1052 );
1053 }
1054 }
1055
1056 pub(crate) fn handle_inbound_binding_success(
1059 &mut self,
1060 id: TransactionId,
1061 ) -> Option<BindingRequest> {
1062 self.invalidate_pending_binding_requests(Instant::now());
1063
1064 let pending_binding_requests = &mut self.pending_binding_requests;
1065 for i in 0..pending_binding_requests.len() {
1066 if pending_binding_requests[i].transaction_id == id {
1067 let valid_binding_request = pending_binding_requests.remove(i);
1068 return Some(valid_binding_request);
1069 }
1070 }
1071 None
1072 }
1073
1074 pub(crate) fn handle_inbound(
1076 &mut self,
1077 now: Instant,
1078 m: &mut Message,
1079 local_index: usize,
1080 remote_addr: SocketAddr,
1081 ) -> Result<()> {
1082 if m.typ.method != METHOD_BINDING
1083 || !(m.typ.class == CLASS_SUCCESS_RESPONSE
1084 || m.typ.class == CLASS_REQUEST
1085 || m.typ.class == CLASS_INDICATION)
1086 {
1087 trace!(
1088 "[{}]: unhandled STUN from {} to {} class({}) method({})",
1089 self.get_name(),
1090 remote_addr,
1091 self.local_candidates[local_index],
1092 m.typ.class,
1093 m.typ.method
1094 );
1095 return Err(Error::ErrUnhandledStunpacket);
1096 }
1097
1098 if self.is_controlling {
1099 if m.contains(ATTR_ICE_CONTROLLING) {
1100 debug!(
1101 "[{}]: inbound isControlling && a.isControlling == true",
1102 self.get_name(),
1103 );
1104 return Err(Error::ErrUnexpectedStunrequestMessage);
1105 } else if m.contains(ATTR_USE_CANDIDATE) {
1106 debug!(
1107 "[{}]: useCandidate && a.isControlling == true",
1108 self.get_name(),
1109 );
1110 return Err(Error::ErrUnexpectedStunrequestMessage);
1111 }
1112 } else if m.contains(ATTR_ICE_CONTROLLED) {
1113 debug!(
1114 "[{}]: inbound isControlled && a.isControlling == false",
1115 self.get_name(),
1116 );
1117 return Err(Error::ErrUnexpectedStunrequestMessage);
1118 }
1119
1120 let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1121 debug!(
1122 "[{}]: ufrag_pwd.remote_credentials.is_none",
1123 self.get_name(),
1124 );
1125 return Err(Error::ErrPasswordEmpty);
1126 };
1127
1128 let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1129 if m.typ.class == CLASS_SUCCESS_RESPONSE {
1130 if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1131 {
1132 warn!(
1133 "[{}]: discard message from ({}), {}",
1134 self.get_name(),
1135 remote_addr,
1136 err
1137 );
1138 return Err(err);
1139 }
1140
1141 if let Some(remote_index) = &remote_candidate_index {
1142 self.handle_success_response(now, m, local_index, *remote_index, remote_addr);
1143 } else {
1144 warn!(
1145 "[{}]: discard success message from ({}), no such remote",
1146 self.get_name(),
1147 remote_addr
1148 );
1149 return Err(Error::ErrUnhandledStunpacket);
1150 }
1151 } else if m.typ.class == CLASS_REQUEST {
1152 {
1153 let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1154 + ":"
1155 + remote_credentials.ufrag.as_str();
1156 if let Err(err) = assert_inbound_username(m, &username) {
1157 warn!(
1158 "[{}]: discard message from ({}), {}",
1159 self.get_name(),
1160 remote_addr,
1161 err
1162 );
1163 return Err(err);
1164 } else if let Err(err) = assert_inbound_message_integrity(
1165 m,
1166 self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1167 ) {
1168 warn!(
1169 "[{}]: discard message from ({}), {}",
1170 self.get_name(),
1171 remote_addr,
1172 err
1173 );
1174 return Err(err);
1175 }
1176 }
1177
1178 if remote_candidate_index.is_none() {
1179 let network_type = self.local_candidates[local_index].network_type();
1181 let (ip, port) = (remote_addr.ip(), remote_addr.port());
1182
1183 let prflx_candidate_config = CandidatePeerReflexiveConfig {
1184 base_config: CandidateConfig {
1185 network: network_type.to_string(),
1186 address: ip.to_string(),
1187 port,
1188 component: self.local_candidates[local_index].component(),
1189 ..CandidateConfig::default()
1190 },
1191 rel_addr: "".to_owned(),
1192 rel_port: 0,
1193 };
1194
1195 match prflx_candidate_config.new_candidate_peer_reflexive() {
1196 Ok(prflx_candidate) => {
1197 if let Ok(added) = self.add_remote_candidate(prflx_candidate)
1198 && added
1199 {
1200 remote_candidate_index = Some(self.remote_candidates.len() - 1);
1201 }
1202 }
1203 Err(err) => {
1204 error!(
1205 "[{}]: Failed to create new remote prflx candidate ({})",
1206 self.get_name(),
1207 err
1208 );
1209 return Err(err);
1210 }
1211 };
1212
1213 debug!(
1214 "[{}]: adding a new peer-reflexive candidate: {} ",
1215 self.get_name(),
1216 remote_addr
1217 );
1218 }
1219
1220 trace!(
1221 "[{}]: inbound STUN (Request) from {} to {}",
1222 self.get_name(),
1223 remote_addr,
1224 self.local_candidates[local_index]
1225 );
1226
1227 if let Some(remote_index) = &remote_candidate_index {
1228 self.handle_binding_request(m, local_index, *remote_index);
1229 }
1230 }
1231
1232 if let Some(remote_index) = remote_candidate_index {
1233 self.remote_candidates[remote_index].seen(false);
1234 }
1235
1236 Ok(())
1237 }
1238
1239 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1242 self.find_remote_candidate(remote_addr)
1243 .is_some_and(|remote_index| {
1244 self.remote_candidates[remote_index].seen(false);
1245 true
1246 })
1247 }
1248
1249 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1250 let peer_addr = self.remote_candidates[remote_index].addr();
1251 let local_addr = self.local_candidates[local_index].addr();
1252 let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1253 TransportProtocol::TCP
1254 } else {
1255 TransportProtocol::UDP
1256 };
1257
1258 self.write_outs.push_back(TaggedBytesMut {
1259 now: Instant::now(),
1260 transport: TransportContext {
1261 local_addr,
1262 peer_addr,
1263 ecn: None,
1264 transport_protocol,
1265 },
1266 message: BytesMut::from(&msg.raw[..]),
1267 });
1268
1269 self.local_candidates[local_index].seen(true);
1270 }
1271
1272 fn handle_inbound_candidate_msg(
1273 &mut self,
1274 local_index: usize,
1275 msg: TaggedBytesMut,
1276 ) -> Result<()> {
1277 if is_stun_message(&msg.message) {
1278 let mut m = Message {
1279 raw: msg.message.to_vec(),
1280 ..Message::default()
1281 };
1282
1283 if let Err(err) = m.decode() {
1284 warn!(
1285 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1286 self.get_name(),
1287 msg.transport.local_addr,
1288 msg.transport.peer_addr,
1289 err
1290 );
1291 Err(err)
1292 } else {
1293 self.handle_inbound(msg.now, &mut m, local_index, msg.transport.peer_addr)
1294 }
1295 } else {
1296 if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1297 warn!(
1298 "[{}]: Discarded message, not a valid remote candidate from {}",
1299 self.get_name(),
1300 msg.transport.peer_addr,
1301 );
1302 } else {
1303 warn!(
1304 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1305 self.get_name(),
1306 msg.transport.peer_addr
1307 );
1308 }
1309 Err(Error::ErrNonStunmessage)
1310 }
1311 }
1312
1313 pub(crate) fn get_name(&self) -> &str {
1314 if self.is_controlling {
1315 "controlling"
1316 } else {
1317 "controlled"
1318 }
1319 }
1320
1321 pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1322 self.selected_pair
1323 }
1324
1325 pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1326 let mut best_pair_index: Option<usize> = None;
1327
1328 for (index, p) in self.candidate_pairs.iter().enumerate() {
1329 if p.state == CandidatePairState::Failed {
1330 continue;
1331 }
1332
1333 if let Some(pair_index) = &mut best_pair_index {
1334 let b = &self.candidate_pairs[*pair_index];
1335 if b.priority() < p.priority() {
1336 *pair_index = index;
1337 }
1338 } else {
1339 best_pair_index = Some(index);
1340 }
1341 }
1342
1343 best_pair_index
1344 }
1345
1346 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1347 let mut best_pair_index: Option<usize> = None;
1348
1349 for (index, p) in self.candidate_pairs.iter().enumerate() {
1350 if p.state != CandidatePairState::Succeeded {
1351 continue;
1352 }
1353
1354 if let Some(pair_index) = &mut best_pair_index {
1355 let b = &self.candidate_pairs[*pair_index];
1356 if b.priority() < p.priority() {
1357 *pair_index = index;
1358 }
1359 } else {
1360 best_pair_index = Some(index);
1361 }
1362 }
1363
1364 best_pair_index
1365 }
1366}