1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::tcp_type::TcpType;
32use crate::url::*;
33use shared::error::*;
34use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
35
/// Zero-length duration; compared against configured intervals to detect an unset value.
const ZERO_DURATION: Duration = Duration::ZERO;
37
38#[derive(Debug, Clone)]
39pub(crate) struct BindingRequest {
40 pub(crate) timestamp: Instant,
41 pub(crate) transaction_id: TransactionId,
42 pub(crate) destination: SocketAddr,
43 pub(crate) is_use_candidate: bool,
44}
45
46impl Default for BindingRequest {
47 fn default() -> Self {
48 Self {
49 timestamp: Instant::now(),
50 transaction_id: TransactionId::default(),
51 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
52 is_use_candidate: false,
53 }
54 }
55}
56
/// An ICE username-fragment / password pair.
#[derive(Default, Clone)]
pub struct Credentials {
    /// ICE username fragment.
    pub ufrag: String,
    /// ICE password; its bytes are used as the STUN message-integrity key.
    pub pwd: String,
}
62
63#[derive(Default, Clone)]
64pub(crate) struct UfragPwd {
65 pub(crate) local_credentials: Credentials,
66 pub(crate) remote_credentials: Option<Credentials>,
67}
68
69fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
70 let mut username = Username::new(ATTR_USERNAME, String::new());
71 username.get_from(m)?;
72
73 if username.to_string() != expected_username {
74 return Err(Error::Other(format!(
75 "{:?} expected({}) actual({})",
76 Error::ErrMismatchUsername,
77 expected_username,
78 username,
79 )));
80 }
81
82 Ok(())
83}
84
85fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
86 let message_integrity_attr = MessageIntegrity(key.to_vec());
87 message_integrity_attr.check(m)
88}
89
90pub enum Event {
91 ConnectionStateChange(ConnectionState),
92 SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
93}
94
95pub struct Agent {
97 pub(crate) tie_breaker: u64,
98 pub(crate) is_controlling: bool,
99 pub(crate) lite: bool,
100
101 pub(crate) start_time: Instant,
102
103 pub(crate) connection_state: ConnectionState,
104 pub(crate) last_connection_state: ConnectionState,
105
106 pub(crate) ufrag_pwd: UfragPwd,
108
109 pub(crate) local_candidates: Vec<Candidate>,
110 pub(crate) remote_candidates: Vec<Candidate>,
111 pub(crate) candidate_pairs: Vec<CandidatePair>,
112 pub(crate) nominated_pair: Option<usize>,
113 pub(crate) selected_pair: Option<usize>,
114
115 pub(crate) pending_binding_requests: Vec<BindingRequest>,
117
118 pub(crate) insecure_skip_verify: bool,
120 pub(crate) max_binding_requests: u16,
121 pub(crate) host_acceptance_min_wait: Duration,
122 pub(crate) srflx_acceptance_min_wait: Duration,
123 pub(crate) prflx_acceptance_min_wait: Duration,
124 pub(crate) relay_acceptance_min_wait: Duration,
125 pub(crate) disconnected_timeout: Duration,
128 pub(crate) failed_timeout: Duration,
131 pub(crate) keepalive_interval: Duration,
134 pub(crate) check_interval: Duration,
136 pub(crate) checking_duration: Instant,
137 pub(crate) last_checking_time: Instant,
138
139 pub(crate) mdns: Option<Mdns>,
140 pub(crate) mdns_queries: HashMap<QueryId, Candidate>,
141
142 pub(crate) mdns_mode: MulticastDnsMode,
143 pub(crate) mdns_local_name: String,
144 pub(crate) mdns_local_ip: Option<IpAddr>,
145
146 pub(crate) candidate_types: Vec<CandidateType>,
147 pub(crate) network_types: Vec<NetworkType>,
148 pub(crate) urls: Vec<Url>,
149
150 pub(crate) write_outs: VecDeque<TaggedBytesMut>,
151 pub(crate) event_outs: VecDeque<Event>,
152}
153
154impl Default for Agent {
155 fn default() -> Self {
156 Self {
157 tie_breaker: 0,
158 is_controlling: false,
159 lite: false,
160 start_time: Instant::now(),
161 connection_state: Default::default(),
162 last_connection_state: Default::default(),
163 ufrag_pwd: Default::default(),
164 local_candidates: vec![],
165 remote_candidates: vec![],
166 candidate_pairs: vec![],
167 nominated_pair: None,
168 selected_pair: None,
169 pending_binding_requests: vec![],
170 insecure_skip_verify: false,
171 max_binding_requests: 0,
172 host_acceptance_min_wait: Default::default(),
173 srflx_acceptance_min_wait: Default::default(),
174 prflx_acceptance_min_wait: Default::default(),
175 relay_acceptance_min_wait: Default::default(),
176 disconnected_timeout: Default::default(),
177 failed_timeout: Default::default(),
178 keepalive_interval: Default::default(),
179 check_interval: Default::default(),
180 checking_duration: Instant::now(),
181 last_checking_time: Instant::now(),
182 mdns_mode: MulticastDnsMode::Disabled,
183 mdns_local_name: "".to_owned(),
184 mdns_local_ip: None,
185 mdns_queries: HashMap::new(),
186 mdns: None,
187 candidate_types: vec![],
188 network_types: vec![],
189 urls: vec![],
190 write_outs: Default::default(),
191 event_outs: Default::default(),
192 }
193 }
194}
195
196impl Agent {
197 pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
199 let mut mdns_local_name = config.multicast_dns_local_name.clone();
200 if mdns_local_name.is_empty() {
201 mdns_local_name = generate_multicast_dns_name();
202 }
203
204 if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
205 return Err(Error::ErrInvalidMulticastDnshostName);
206 }
207
208 let mdns_mode = config.multicast_dns_mode;
209 let mdns = create_multicast_dns(
210 mdns_mode,
211 &mdns_local_name,
212 &config.multicast_dns_local_ip,
213 &config.multicast_dns_query_timeout,
214 )
215 .unwrap_or_else(|err| {
216 warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
219 None
220 });
221
222 let candidate_types = if config.candidate_types.is_empty() {
223 default_candidate_types()
224 } else {
225 config.candidate_types.clone()
226 };
227
228 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
229 {
230 return Err(Error::ErrLiteUsingNonHostCandidates);
231 }
232
233 if !config.urls.is_empty()
234 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
235 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
236 {
237 return Err(Error::ErrUselessUrlsProvided);
238 }
239
240 let mut agent = Self {
241 tie_breaker: rand::random::<u64>(),
242 is_controlling: config.is_controlling,
243 lite: config.lite,
244
245 start_time: Instant::now(),
246
247 nominated_pair: None,
248 selected_pair: None,
249 candidate_pairs: vec![],
250
251 connection_state: ConnectionState::New,
252
253 insecure_skip_verify: config.insecure_skip_verify,
254
255 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
259 max_binding_requests
260 } else {
261 DEFAULT_MAX_BINDING_REQUESTS
262 },
263 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
264 config.host_acceptance_min_wait
265 {
266 host_acceptance_min_wait
267 } else {
268 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
269 },
270 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
271 config.srflx_acceptance_min_wait
272 {
273 srflx_acceptance_min_wait
274 } else {
275 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
276 },
277 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
278 config.prflx_acceptance_min_wait
279 {
280 prflx_acceptance_min_wait
281 } else {
282 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
283 },
284 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
285 config.relay_acceptance_min_wait
286 {
287 relay_acceptance_min_wait
288 } else {
289 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
290 },
291
292 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
295 disconnected_timeout
296 } else {
297 DEFAULT_DISCONNECTED_TIMEOUT
298 },
299
300 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
303 failed_timeout
304 } else {
305 DEFAULT_FAILED_TIMEOUT
306 },
307
308 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
311 keepalive_interval
312 } else {
313 DEFAULT_KEEPALIVE_INTERVAL
314 },
315
316 check_interval: if config.check_interval == Duration::from_secs(0) {
318 DEFAULT_CHECK_INTERVAL
319 } else {
320 config.check_interval
321 },
322 checking_duration: Instant::now(),
323 last_checking_time: Instant::now(),
324 last_connection_state: ConnectionState::Unspecified,
325
326 mdns,
327 mdns_queries: HashMap::new(),
328
329 mdns_mode,
330 mdns_local_name,
331 mdns_local_ip: config.multicast_dns_local_ip,
332
333 ufrag_pwd: UfragPwd::default(),
334
335 local_candidates: vec![],
336 remote_candidates: vec![],
337
338 pending_binding_requests: vec![],
340
341 candidate_types,
342 network_types: config.network_types.clone(),
343 urls: config.urls.clone(),
344
345 write_outs: VecDeque::new(),
346 event_outs: VecDeque::new(),
347 };
348
349 if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
351 {
352 let _ = agent.close();
353 return Err(err);
354 }
355
356 Ok(agent)
357 }
358
359 pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<bool> {
361 if !self.network_types.is_empty() {
363 let candidate_network_type = c.network_type();
364 if !self.network_types.contains(&candidate_network_type) {
365 debug!(
366 "Ignoring local candidate with network type {:?} (not in configured network types: {:?})",
367 candidate_network_type, self.network_types
368 );
369 return Ok(false);
370 }
371 }
372
373 if c.candidate_type() == CandidateType::Host
374 && self.mdns_mode == MulticastDnsMode::QueryAndGather
375 && c.network_type == NetworkType::Udp4
376 && self
377 .mdns_local_ip
378 .is_some_and(|local_ip| local_ip == c.addr().ip())
379 {
380 trace!(
383 "mDNS hides local ip {} with local name {}",
384 c.address, self.mdns_local_name
385 );
386 c.address = self.mdns_local_name.clone();
387 }
388
389 for cand in &self.local_candidates {
390 if cand.equal(&c) {
391 return Ok(false);
392 }
393 }
394
395 self.local_candidates.push(c);
396 let local_index = self.local_candidates.len() - 1;
397 let local_tcp_type = self.local_candidates[local_index].tcp_type();
398
399 for remote_index in 0..self.remote_candidates.len() {
400 let remote_tcp_type = self.remote_candidates[remote_index].tcp_type();
401
402 let should_pair = match (local_tcp_type, remote_tcp_type) {
408 (TcpType::Active, TcpType::Passive) => true,
409 (TcpType::Passive, TcpType::Active) => true,
410 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
411 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
413 };
414
415 if should_pair {
416 self.add_pair(local_index, remote_index);
417 }
418 }
419
420 self.request_connectivity_check();
421
422 Ok(true)
423 }
424
425 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<bool> {
427 if !self.network_types.is_empty() {
429 let candidate_network_type = c.network_type();
430 if !self.network_types.contains(&candidate_network_type) {
431 debug!(
432 "Ignoring remote candidate with network type {:?} (not in configured network types: {:?})",
433 candidate_network_type, self.network_types
434 );
435 return Ok(false);
436 }
437 }
438
439 if c.tcp_type() == TcpType::Active {
443 debug!(
444 "Ignoring remote candidate with tcptype active: {}",
445 c.address()
446 );
447 return Ok(false);
448 }
449
450 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
452 if self.mdns_mode == MulticastDnsMode::Disabled {
453 warn!(
454 "remote mDNS candidate added, but mDNS is disabled: ({})",
455 c.address()
456 );
457 return Ok(false);
458 }
459
460 if c.candidate_type() != CandidateType::Host {
461 return Err(Error::ErrAddressParseFailed);
462 }
463
464 if let Some(mdns_conn) = &mut self.mdns {
465 let query_id = mdns_conn.query(c.address());
466 self.mdns_queries.insert(query_id, c);
467 }
468
469 return Ok(false);
470 }
471
472 self.trigger_request_connectivity_check(vec![c]);
473 Ok(true)
474 }
475
476 fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
477 for c in remote_candidates {
478 if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
479 let remote_tcp_type = c.tcp_type();
480 self.remote_candidates.push(c);
481 let remote_index = self.remote_candidates.len() - 1;
482
483 for local_index in 0..self.local_candidates.len() {
485 let local_tcp_type = self.local_candidates[local_index].tcp_type();
486
487 let should_pair = match (local_tcp_type, remote_tcp_type) {
493 (TcpType::Active, TcpType::Passive) => true,
494 (TcpType::Passive, TcpType::Active) => true,
495 (TcpType::SimultaneousOpen, TcpType::SimultaneousOpen) => true,
496 (TcpType::Unspecified, TcpType::Unspecified) => true, _ => false,
498 };
499
500 if should_pair {
501 self.add_pair(local_index, remote_index);
502 }
503 }
504
505 self.request_connectivity_check();
506 }
507 }
508 }
509
510 pub fn set_remote_credentials(
512 &mut self,
513 remote_ufrag: String,
514 remote_pwd: String,
515 ) -> Result<()> {
516 if remote_ufrag.is_empty() {
517 return Err(Error::ErrRemoteUfragEmpty);
518 } else if remote_pwd.is_empty() {
519 return Err(Error::ErrRemotePwdEmpty);
520 }
521
522 self.ufrag_pwd.remote_credentials = Some(Credentials {
523 ufrag: remote_ufrag,
524 pwd: remote_pwd,
525 });
526
527 Ok(())
528 }
529
530 pub fn get_remote_credentials(&self) -> Option<&Credentials> {
532 self.ufrag_pwd.remote_credentials.as_ref()
533 }
534
535 pub fn get_local_credentials(&self) -> &Credentials {
537 &self.ufrag_pwd.local_credentials
538 }
539
540 pub fn role(&self) -> bool {
541 self.is_controlling
542 }
543
544 pub fn set_role(&mut self, is_controlling: bool) {
545 self.is_controlling = is_controlling;
546 }
547
548 pub fn state(&self) -> ConnectionState {
549 self.connection_state
550 }
551
552 pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
553 self.find_local_candidate(transport.local_addr, transport.transport_protocol)
554 .is_some()
555 && self.validate_non_stun_traffic(transport.peer_addr)
556 }
557
558 fn get_timeout_interval(&self) -> Duration {
559 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
560 self.check_interval,
561 self.keepalive_interval,
562 self.disconnected_timeout,
563 self.failed_timeout,
564 );
565 let mut interval = DEFAULT_CHECK_INTERVAL;
566
567 let mut update_interval = |x: Duration| {
568 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
569 interval = x;
570 }
571 };
572
573 match self.last_connection_state {
574 ConnectionState::New | ConnectionState::Checking => {
575 update_interval(check_interval);
577 }
578 ConnectionState::Connected | ConnectionState::Disconnected => {
579 update_interval(keepalive_interval);
580 }
581 _ => {}
582 };
583 update_interval(disconnected_timeout);
585 update_interval(failed_timeout);
586 interval
587 }
588
589 pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
591 if let Some(pair_index) = self.get_selected_pair() {
592 let candidate_pair = &self.candidate_pairs[pair_index];
593 Some((
594 &self.local_candidates[candidate_pair.local_index],
595 &self.remote_candidates[candidate_pair.remote_index],
596 ))
597 } else {
598 None
599 }
600 }
601
602 pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
603 if let Some(pair_index) = self.get_best_available_pair() {
604 let candidate_pair = &self.candidate_pairs[pair_index];
605 Some((
606 &self.local_candidates[candidate_pair.local_index],
607 &self.remote_candidates[candidate_pair.remote_index],
608 ))
609 } else {
610 None
611 }
612 }
613
614 pub fn start_connectivity_checks(
616 &mut self,
617 is_controlling: bool,
618 remote_ufrag: String,
619 remote_pwd: String,
620 ) -> Result<()> {
621 debug!(
622 "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
623 is_controlling, remote_ufrag, remote_pwd
624 );
625 self.set_remote_credentials(remote_ufrag, remote_pwd)?;
626 self.is_controlling = is_controlling;
627 self.start();
628
629 self.update_connection_state(ConnectionState::Checking);
630 self.request_connectivity_check();
631
632 Ok(())
633 }
634
635 pub fn restart(
638 &mut self,
639 mut ufrag: String,
640 mut pwd: String,
641 keep_local_candidates: bool,
642 ) -> Result<()> {
643 if ufrag.is_empty() {
644 ufrag = generate_ufrag();
645 }
646 if pwd.is_empty() {
647 pwd = generate_pwd();
648 }
649
650 if ufrag.len() * 8 < 24 {
651 return Err(Error::ErrLocalUfragInsufficientBits);
652 }
653 if pwd.len() * 8 < 128 {
654 return Err(Error::ErrLocalPwdInsufficientBits);
655 }
656
657 self.ufrag_pwd.local_credentials.ufrag = ufrag;
659 self.ufrag_pwd.local_credentials.pwd = pwd;
660 self.ufrag_pwd.remote_credentials = None;
661
662 self.pending_binding_requests = vec![];
663
664 self.candidate_pairs = vec![];
665
666 self.set_selected_pair(None);
667 self.delete_all_candidates(keep_local_candidates);
668 self.start();
669
670 if self.connection_state != ConnectionState::New {
673 self.update_connection_state(ConnectionState::Checking);
674 }
675
676 Ok(())
677 }
678
679 pub fn get_local_candidates(&self) -> &[Candidate] {
681 &self.local_candidates
682 }
683
684 fn contact(&mut self, now: Instant) {
685 if self.connection_state == ConnectionState::Failed {
686 self.last_connection_state = self.connection_state;
689 return;
690 }
691 if self.connection_state == ConnectionState::Checking {
692 if self.last_connection_state != self.connection_state {
694 self.checking_duration = now;
695 }
696
697 if now
699 .checked_duration_since(self.checking_duration)
700 .unwrap_or_else(|| Duration::from_secs(0))
701 > self.disconnected_timeout + self.failed_timeout
702 {
703 self.update_connection_state(ConnectionState::Failed);
704 self.last_connection_state = self.connection_state;
705 return;
706 }
707 }
708
709 self.contact_candidates();
710
711 self.last_connection_state = self.connection_state;
712 self.last_checking_time = now;
713 }
714
715 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
716 if self.connection_state != new_state {
717 if new_state == ConnectionState::Failed {
719 self.set_selected_pair(None);
720 self.delete_all_candidates(false);
721 }
722
723 info!(
724 "[{}]: Setting new connection state: {}",
725 self.get_name(),
726 new_state
727 );
728 self.connection_state = new_state;
729 self.event_outs
730 .push_back(Event::ConnectionStateChange(new_state));
731 }
732 }
733
734 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
735 if let Some(pair_index) = selected_pair {
736 trace!(
737 "[{}]: Set selected candidate pair: {:?}",
738 self.get_name(),
739 self.candidate_pairs[pair_index]
740 );
741
742 self.candidate_pairs[pair_index].nominated = true;
743 self.selected_pair = Some(pair_index);
744
745 self.update_connection_state(ConnectionState::Connected);
746
747 let candidate_pair = &self.candidate_pairs[pair_index];
749 self.event_outs
750 .push_back(Event::SelectedCandidatePairChange(
751 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
752 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
753 ));
754 } else {
755 self.selected_pair = None;
756 }
757 }
758
759 pub(crate) fn ping_all_candidates(&mut self) {
760 let mut pairs: Vec<(usize, usize)> = vec![];
761
762 let name = self.get_name().to_string();
763 if self.candidate_pairs.is_empty() {
764 warn!(
765 "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
766 name,
767 );
768 }
769 for p in &mut self.candidate_pairs {
770 if p.state == CandidatePairState::Waiting {
771 p.state = CandidatePairState::InProgress;
772 } else if p.state != CandidatePairState::InProgress {
773 continue;
774 }
775
776 if p.binding_request_count > self.max_binding_requests {
777 trace!(
778 "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
779 name,
780 *p,
781 self.local_candidates[p.local_index].addr(),
782 self.remote_candidates[p.remote_index].addr()
783 );
784 p.state = CandidatePairState::Failed;
785 } else {
786 p.binding_request_count += 1;
787 let local = p.local_index;
788 let remote = p.remote_index;
789 pairs.push((local, remote));
790 }
791 }
792
793 if !pairs.is_empty() {
794 trace!(
795 "[{}]: pinging all {} candidates",
796 self.get_name(),
797 pairs.len()
798 );
799 }
800
801 for (local, remote) in pairs {
802 self.ping_candidate(local, remote);
803 }
804 }
805
806 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
807 let p = CandidatePair::new(
808 local_index,
809 remote_index,
810 self.local_candidates[local_index].priority(),
811 self.remote_candidates[remote_index].priority(),
812 self.is_controlling,
813 );
814 self.candidate_pairs.push(p);
815 }
816
817 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
818 for (index, p) in self.candidate_pairs.iter().enumerate() {
819 if p.local_index == local_index && p.remote_index == remote_index {
820 return Some(index);
821 }
822 }
823 None
824 }
825
826 pub(crate) fn validate_selected_pair(&mut self) -> bool {
829 let (valid, disconnected_time) = {
830 self.selected_pair.as_ref().map_or_else(
831 || (false, Duration::from_secs(0)),
832 |&pair_index| {
833 let remote_index = self.candidate_pairs[pair_index].remote_index;
834
835 let disconnected_time = Instant::now()
836 .duration_since(self.remote_candidates[remote_index].last_received());
837 (true, disconnected_time)
838 },
839 )
840 };
841
842 if valid {
843 let mut total_time_to_failure = self.failed_timeout;
845 if total_time_to_failure != Duration::from_secs(0) {
846 total_time_to_failure += self.disconnected_timeout;
847 }
848
849 if total_time_to_failure != Duration::from_secs(0)
850 && disconnected_time > total_time_to_failure
851 {
852 self.update_connection_state(ConnectionState::Failed);
853 } else if self.disconnected_timeout != Duration::from_secs(0)
854 && disconnected_time > self.disconnected_timeout
855 {
856 self.update_connection_state(ConnectionState::Disconnected);
857 } else {
858 self.update_connection_state(ConnectionState::Connected);
859 }
860 }
861
862 valid
863 }
864
865 pub(crate) fn check_keepalive(&mut self) {
869 let (local_index, remote_index, pair_index) = {
870 self.selected_pair
871 .as_ref()
872 .map_or((None, None, None), |&pair_index| {
873 let p = &self.candidate_pairs[pair_index];
874 (Some(p.local_index), Some(p.remote_index), Some(pair_index))
875 })
876 };
877
878 if let (Some(local_index), Some(remote_index), Some(pair_index)) =
879 (local_index, remote_index, pair_index)
880 {
881 let last_sent =
882 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
883
884 let last_received =
885 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
886
887 if (self.keepalive_interval != Duration::from_secs(0))
888 && ((last_sent > self.keepalive_interval)
889 || (last_received > self.keepalive_interval))
890 {
891 self.candidate_pairs[pair_index].on_consent_request_sent();
895 self.ping_candidate(local_index, remote_index);
896 }
897 }
898 }
899
900 fn request_connectivity_check(&mut self) {
901 if self.ufrag_pwd.remote_credentials.is_some() {
902 self.contact(Instant::now());
903 }
904 }
905
906 pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
911 if !keep_local_candidates {
912 self.local_candidates.clear();
913 }
914 self.remote_candidates.clear();
915 }
916
917 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
918 for (index, c) in self.remote_candidates.iter().enumerate() {
919 if c.addr() == addr {
920 return Some(index);
921 }
922 }
923 None
924 }
925
926 pub(crate) fn find_local_candidate(
927 &self,
928 addr: SocketAddr,
929 transport_protocol: TransportProtocol,
930 ) -> Option<usize> {
931 for (index, c) in self.local_candidates.iter().enumerate() {
932 if c.network_type().to_protocol() != transport_protocol {
933 continue;
934 }
935
936 if c.tcp_type() == TcpType::Active && transport_protocol == TransportProtocol::TCP {
940 if c.addr().ip() == addr.ip() {
941 return Some(index);
942 }
943 } else if c.addr() == addr {
944 return Some(index);
945 }
946 }
947 None
948 }
949
950 pub(crate) fn send_binding_request(
951 &mut self,
952 m: &Message,
953 local_index: usize,
954 remote_index: usize,
955 ) {
956 trace!(
957 "[{}]: ping STUN from {} to {}",
958 self.get_name(),
959 self.local_candidates[local_index],
960 self.remote_candidates[remote_index],
961 );
962
963 self.invalidate_pending_binding_requests(Instant::now());
964
965 self.pending_binding_requests.push(BindingRequest {
966 timestamp: Instant::now(),
967 transaction_id: m.transaction_id,
968 destination: self.remote_candidates[remote_index].addr(),
969 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
970 });
971
972 if let Some(pair_index) = self.find_pair(local_index, remote_index) {
974 self.candidate_pairs[pair_index].on_request_sent();
975 }
976
977 self.send_stun(m, local_index, remote_index);
978 }
979
980 pub(crate) fn send_binding_success(
981 &mut self,
982 m: &Message,
983 local_index: usize,
984 remote_index: usize,
985 ) {
986 let addr = self.remote_candidates[remote_index].addr();
987 let (ip, port) = (addr.ip(), addr.port());
988 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
989
990 let (out, result) = {
991 let mut out = Message::new();
992 let result = out.build(&[
993 Box::new(m.clone()),
994 Box::new(BINDING_SUCCESS),
995 Box::new(XorMappedAddress { ip, port }),
996 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
997 Box::new(FINGERPRINT),
998 ]);
999 (out, result)
1000 };
1001
1002 if let Err(err) = result {
1003 warn!(
1004 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
1005 self.get_name(),
1006 self.local_candidates[local_index],
1007 self.remote_candidates[remote_index],
1008 err
1009 );
1010 } else {
1011 if let Some(pair_index) = self.find_pair(local_index, remote_index) {
1013 self.candidate_pairs[pair_index].on_response_sent();
1014 }
1015 self.send_stun(&out, local_index, remote_index);
1016 }
1017 }
1018
1019 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
1024 let pending_binding_requests = &mut self.pending_binding_requests;
1025 let initial_size = pending_binding_requests.len();
1026
1027 let mut temp = vec![];
1028 for binding_request in pending_binding_requests.drain(..) {
1029 if filter_time
1030 .checked_duration_since(binding_request.timestamp)
1031 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
1032 .unwrap_or(true)
1033 {
1034 temp.push(binding_request);
1035 }
1036 }
1037
1038 *pending_binding_requests = temp;
1039 let bind_requests_remaining = pending_binding_requests.len();
1040 let bind_requests_removed = initial_size - bind_requests_remaining;
1041 if bind_requests_removed > 0 {
1042 trace!(
1043 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
1044 self.get_name(),
1045 bind_requests_removed,
1046 bind_requests_remaining,
1047 );
1048 }
1049 }
1050
1051 pub(crate) fn handle_inbound_binding_success(
1054 &mut self,
1055 id: TransactionId,
1056 ) -> Option<BindingRequest> {
1057 self.invalidate_pending_binding_requests(Instant::now());
1058
1059 let pending_binding_requests = &mut self.pending_binding_requests;
1060 for i in 0..pending_binding_requests.len() {
1061 if pending_binding_requests[i].transaction_id == id {
1062 let valid_binding_request = pending_binding_requests.remove(i);
1063 return Some(valid_binding_request);
1064 }
1065 }
1066 None
1067 }
1068
1069 pub(crate) fn handle_inbound(
1071 &mut self,
1072 now: Instant,
1073 m: &mut Message,
1074 local_index: usize,
1075 remote_addr: SocketAddr,
1076 ) -> Result<()> {
1077 if m.typ.method != METHOD_BINDING
1078 || !(m.typ.class == CLASS_SUCCESS_RESPONSE
1079 || m.typ.class == CLASS_REQUEST
1080 || m.typ.class == CLASS_INDICATION)
1081 {
1082 trace!(
1083 "[{}]: unhandled STUN from {} to {} class({}) method({})",
1084 self.get_name(),
1085 remote_addr,
1086 self.local_candidates[local_index],
1087 m.typ.class,
1088 m.typ.method
1089 );
1090 return Err(Error::ErrUnhandledStunpacket);
1091 }
1092
1093 if self.is_controlling {
1094 if m.contains(ATTR_ICE_CONTROLLING) {
1095 debug!(
1096 "[{}]: inbound isControlling && a.isControlling == true",
1097 self.get_name(),
1098 );
1099 return Err(Error::ErrUnexpectedStunrequestMessage);
1100 } else if m.contains(ATTR_USE_CANDIDATE) {
1101 debug!(
1102 "[{}]: useCandidate && a.isControlling == true",
1103 self.get_name(),
1104 );
1105 return Err(Error::ErrUnexpectedStunrequestMessage);
1106 }
1107 } else if m.contains(ATTR_ICE_CONTROLLED) {
1108 debug!(
1109 "[{}]: inbound isControlled && a.isControlling == false",
1110 self.get_name(),
1111 );
1112 return Err(Error::ErrUnexpectedStunrequestMessage);
1113 }
1114
1115 let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1116 debug!(
1117 "[{}]: ufrag_pwd.remote_credentials.is_none",
1118 self.get_name(),
1119 );
1120 return Err(Error::ErrPasswordEmpty);
1121 };
1122
1123 let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1124 if m.typ.class == CLASS_SUCCESS_RESPONSE {
1125 if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1126 {
1127 warn!(
1128 "[{}]: discard message from ({}), {}",
1129 self.get_name(),
1130 remote_addr,
1131 err
1132 );
1133 return Err(err);
1134 }
1135
1136 if let Some(remote_index) = &remote_candidate_index {
1137 self.handle_success_response(now, m, local_index, *remote_index, remote_addr);
1138 } else {
1139 warn!(
1140 "[{}]: discard success message from ({}), no such remote",
1141 self.get_name(),
1142 remote_addr
1143 );
1144 return Err(Error::ErrUnhandledStunpacket);
1145 }
1146 } else if m.typ.class == CLASS_REQUEST {
1147 {
1148 let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1149 + ":"
1150 + remote_credentials.ufrag.as_str();
1151 if let Err(err) = assert_inbound_username(m, &username) {
1152 warn!(
1153 "[{}]: discard message from ({}), {}",
1154 self.get_name(),
1155 remote_addr,
1156 err
1157 );
1158 return Err(err);
1159 } else if let Err(err) = assert_inbound_message_integrity(
1160 m,
1161 self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1162 ) {
1163 warn!(
1164 "[{}]: discard message from ({}), {}",
1165 self.get_name(),
1166 remote_addr,
1167 err
1168 );
1169 return Err(err);
1170 }
1171 }
1172
1173 if remote_candidate_index.is_none() {
1174 let network_type = self.local_candidates[local_index].network_type();
1176 let (ip, port) = (remote_addr.ip(), remote_addr.port());
1177
1178 let prflx_candidate_config = CandidatePeerReflexiveConfig {
1179 base_config: CandidateConfig {
1180 network: network_type.to_string(),
1181 address: ip.to_string(),
1182 port,
1183 component: self.local_candidates[local_index].component(),
1184 ..CandidateConfig::default()
1185 },
1186 rel_addr: "".to_owned(),
1187 rel_port: 0,
1188 };
1189
1190 match prflx_candidate_config.new_candidate_peer_reflexive() {
1191 Ok(prflx_candidate) => {
1192 if let Ok(added) = self.add_remote_candidate(prflx_candidate)
1193 && added
1194 {
1195 remote_candidate_index = Some(self.remote_candidates.len() - 1);
1196 }
1197 }
1198 Err(err) => {
1199 error!(
1200 "[{}]: Failed to create new remote prflx candidate ({})",
1201 self.get_name(),
1202 err
1203 );
1204 return Err(err);
1205 }
1206 };
1207
1208 debug!(
1209 "[{}]: adding a new peer-reflexive candidate: {} ",
1210 self.get_name(),
1211 remote_addr
1212 );
1213 }
1214
1215 trace!(
1216 "[{}]: inbound STUN (Request) from {} to {}",
1217 self.get_name(),
1218 remote_addr,
1219 self.local_candidates[local_index]
1220 );
1221
1222 if let Some(remote_index) = &remote_candidate_index {
1223 self.handle_binding_request(m, local_index, *remote_index);
1224 }
1225 }
1226
1227 if let Some(remote_index) = remote_candidate_index {
1228 self.remote_candidates[remote_index].seen(false);
1229 }
1230
1231 Ok(())
1232 }
1233
1234 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1237 self.find_remote_candidate(remote_addr)
1238 .is_some_and(|remote_index| {
1239 self.remote_candidates[remote_index].seen(false);
1240 true
1241 })
1242 }
1243
1244 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1245 let peer_addr = self.remote_candidates[remote_index].addr();
1246 let local_addr = self.local_candidates[local_index].addr();
1247 let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1248 TransportProtocol::TCP
1249 } else {
1250 TransportProtocol::UDP
1251 };
1252
1253 self.write_outs.push_back(TaggedBytesMut {
1254 now: Instant::now(),
1255 transport: TransportContext {
1256 local_addr,
1257 peer_addr,
1258 ecn: None,
1259 transport_protocol,
1260 },
1261 message: BytesMut::from(&msg.raw[..]),
1262 });
1263
1264 self.local_candidates[local_index].seen(true);
1265 }
1266
1267 fn handle_inbound_candidate_msg(
1268 &mut self,
1269 local_index: usize,
1270 msg: TaggedBytesMut,
1271 ) -> Result<()> {
1272 if is_stun_message(&msg.message) {
1273 let mut m = Message {
1274 raw: msg.message.to_vec(),
1275 ..Message::default()
1276 };
1277
1278 if let Err(err) = m.decode() {
1279 warn!(
1280 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1281 self.get_name(),
1282 msg.transport.local_addr,
1283 msg.transport.peer_addr,
1284 err
1285 );
1286 Err(err)
1287 } else {
1288 self.handle_inbound(msg.now, &mut m, local_index, msg.transport.peer_addr)
1289 }
1290 } else {
1291 if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1292 warn!(
1293 "[{}]: Discarded message, not a valid remote candidate from {}",
1294 self.get_name(),
1295 msg.transport.peer_addr,
1296 );
1297 } else {
1298 warn!(
1299 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1300 self.get_name(),
1301 msg.transport.peer_addr
1302 );
1303 }
1304 Err(Error::ErrNonStunmessage)
1305 }
1306 }
1307
1308 pub(crate) fn get_name(&self) -> &str {
1309 if self.is_controlling {
1310 "controlling"
1311 } else {
1312 "controlled"
1313 }
1314 }
1315
1316 pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1317 self.selected_pair
1318 }
1319
1320 pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1321 let mut best_pair_index: Option<usize> = None;
1322
1323 for (index, p) in self.candidate_pairs.iter().enumerate() {
1324 if p.state == CandidatePairState::Failed {
1325 continue;
1326 }
1327
1328 if let Some(pair_index) = &mut best_pair_index {
1329 let b = &self.candidate_pairs[*pair_index];
1330 if b.priority() < p.priority() {
1331 *pair_index = index;
1332 }
1333 } else {
1334 best_pair_index = Some(index);
1335 }
1336 }
1337
1338 best_pair_index
1339 }
1340
1341 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1342 let mut best_pair_index: Option<usize> = None;
1343
1344 for (index, p) in self.candidate_pairs.iter().enumerate() {
1345 if p.state != CandidatePairState::Succeeded {
1346 continue;
1347 }
1348
1349 if let Some(pair_index) = &mut best_pair_index {
1350 let b = &self.candidate_pairs[*pair_index];
1351 if b.priority() < p.priority() {
1352 *pair_index = index;
1353 }
1354 } else {
1355 best_pair_index = Some(index);
1356 }
1357 }
1358
1359 best_pair_index
1360 }
1361}