1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use mdns::{Mdns, QueryId};
13use sansio::Protocol;
14use std::collections::{HashMap, VecDeque};
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use stun::attributes::*;
19use stun::fingerprint::*;
20use stun::integrity::*;
21use stun::message::*;
22use stun::textattrs::*;
23use stun::xoraddr::*;
24
25use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
26use crate::candidate::{candidate_pair::*, *};
27use crate::mdns::{MulticastDnsMode, create_multicast_dns, generate_multicast_dns_name};
28use crate::network_type::NetworkType;
29use crate::rand::*;
30use crate::state::*;
31use crate::url::*;
32use shared::error::*;
33use shared::{TaggedBytesMut, TransportContext, TransportProtocol};
34
/// Zero-length duration. Timeouts and intervals equal to this value are
/// treated as "not configured" when they are compared in this module.
const ZERO_DURATION: Duration = Duration::ZERO;
36
/// Bookkeeping record for a STUN binding request that has been sent and is
/// still waiting for its response.
#[derive(Debug, Clone)]
pub(crate) struct BindingRequest {
    /// When the request was recorded; used to expire stale entries.
    pub(crate) timestamp: Instant,
    /// Transaction id of the STUN message, used to match the response.
    pub(crate) transaction_id: TransactionId,
    /// Address of the remote candidate the request was sent to.
    pub(crate) destination: SocketAddr,
    /// Whether the request carried the USE-CANDIDATE attribute.
    pub(crate) is_use_candidate: bool,
}
44
45impl Default for BindingRequest {
46 fn default() -> Self {
47 Self {
48 timestamp: Instant::now(),
49 transaction_id: TransactionId::default(),
50 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
51 is_use_candidate: false,
52 }
53 }
54}
55
/// An ICE username fragment / password pair.
#[derive(Default, Clone)]
pub struct Credentials {
    /// Username fragment.
    pub ufrag: String,
    /// Password; its bytes are used as the STUN message-integrity key.
    pub pwd: String,
}
61
/// Local and (once known) remote credentials of an agent.
#[derive(Default, Clone)]
pub(crate) struct UfragPwd {
    /// Credentials of this agent.
    pub(crate) local_credentials: Credentials,
    /// Credentials of the peer; `None` until they have been set.
    pub(crate) remote_credentials: Option<Credentials>,
}
67
68fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
69 let mut username = Username::new(ATTR_USERNAME, String::new());
70 username.get_from(m)?;
71
72 if username.to_string() != expected_username {
73 return Err(Error::Other(format!(
74 "{:?} expected({}) actual({})",
75 Error::ErrMismatchUsername,
76 expected_username,
77 username,
78 )));
79 }
80
81 Ok(())
82}
83
84fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
85 let message_integrity_attr = MessageIntegrity(key.to_vec());
86 message_integrity_attr.check(m)
87}
88
/// Notifications queued by the agent for its owner.
pub enum Event {
    /// The agent moved to a new connection state.
    ConnectionStateChange(ConnectionState),
    /// A candidate pair was selected; carries clones of the local and remote
    /// candidate, in that order.
    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
}
93
/// State of an ICE agent: credentials, candidates, candidate pairs, timers
/// and the outbound packet/event queues.
pub struct Agent {
    /// Random 64-bit value generated when the agent is created.
    pub(crate) tie_breaker: u64,
    /// Whether this agent currently acts in the controlling role.
    pub(crate) is_controlling: bool,
    /// Lite mode flag copied from the configuration.
    pub(crate) lite: bool,

    /// Instant at which the agent was constructed.
    pub(crate) start_time: Instant,

    /// Current connection state.
    pub(crate) connection_state: ConnectionState,
    /// Connection state recorded at the end of the previous `contact` pass.
    pub(crate) last_connection_state: ConnectionState,

    /// Local and remote ufrag/pwd.
    pub(crate) ufrag_pwd: UfragPwd,

    /// Local candidates; pairs refer to them by index.
    pub(crate) local_candidates: Vec<Candidate>,
    /// Remote candidates; pairs refer to them by index.
    pub(crate) remote_candidates: Vec<Candidate>,
    /// Candidate pairs built from the two candidate lists.
    pub(crate) candidate_pairs: Vec<CandidatePair>,
    /// Nominated pair, presumably an index into `candidate_pairs` (usage not
    /// visible in this file section).
    pub(crate) nominated_pair: Option<usize>,
    /// Index into `candidate_pairs` of the selected pair, if any.
    pub(crate) selected_pair: Option<usize>,

    /// Binding requests sent and not yet answered or expired.
    pub(crate) pending_binding_requests: Vec<BindingRequest>,

    /// Copied from the configuration (usage not visible in this file section).
    pub(crate) insecure_skip_verify: bool,
    /// A pair is marked failed once its request count exceeds this value.
    pub(crate) max_binding_requests: u16,
    /// Acceptance wait for host candidates (config value or default).
    pub(crate) host_acceptance_min_wait: Duration,
    /// Acceptance wait for server-reflexive candidates (config value or default).
    pub(crate) srflx_acceptance_min_wait: Duration,
    /// Acceptance wait for peer-reflexive candidates (config value or default).
    pub(crate) prflx_acceptance_min_wait: Duration,
    /// Acceptance wait for relay candidates (config value or default).
    pub(crate) relay_acceptance_min_wait: Duration,
    /// Time without inbound traffic on the selected pair before the state
    /// becomes Disconnected; zero disables the check.
    pub(crate) disconnected_timeout: Duration,
    /// Additional time, on top of `disconnected_timeout`, before the state
    /// becomes Failed; zero disables the check.
    pub(crate) failed_timeout: Duration,
    /// Keepalive period for the selected pair; zero disables keepalives.
    pub(crate) keepalive_interval: Duration,
    /// Interval used while in the New/Checking states.
    pub(crate) check_interval: Duration,
    /// Instant at which the agent entered the Checking state (despite the name).
    pub(crate) checking_duration: Instant,
    /// Instant of the last completed `contact` pass.
    pub(crate) last_checking_time: Instant,

    /// mDNS connection, if one could be created.
    pub(crate) mdns: Option<Mdns>,
    /// Remote mDNS candidates waiting for their name query, keyed by query id.
    pub(crate) mdns_queries: HashMap<QueryId, Candidate>,

    /// Configured mDNS mode.
    pub(crate) mdns_mode: MulticastDnsMode,
    /// Local mDNS host name (`<label>.local`).
    pub(crate) mdns_local_name: String,
    /// Local IP that is replaced by the mDNS name when gathering host candidates.
    pub(crate) mdns_local_ip: Option<IpAddr>,

    /// Candidate types this agent works with.
    pub(crate) candidate_types: Vec<CandidateType>,
    /// URLs copied from the configuration.
    pub(crate) urls: Vec<Url>,

    /// Outbound packets waiting to be sent.
    pub(crate) write_outs: VecDeque<TaggedBytesMut>,
    /// Events waiting to be delivered.
    pub(crate) event_outs: VecDeque<Event>,
}
151
152impl Default for Agent {
153 fn default() -> Self {
154 Self {
155 tie_breaker: 0,
156 is_controlling: false,
157 lite: false,
158 start_time: Instant::now(),
159 connection_state: Default::default(),
160 last_connection_state: Default::default(),
161 ufrag_pwd: Default::default(),
162 local_candidates: vec![],
163 remote_candidates: vec![],
164 candidate_pairs: vec![],
165 nominated_pair: None,
166 selected_pair: None,
167 pending_binding_requests: vec![],
168 insecure_skip_verify: false,
169 max_binding_requests: 0,
170 host_acceptance_min_wait: Default::default(),
171 srflx_acceptance_min_wait: Default::default(),
172 prflx_acceptance_min_wait: Default::default(),
173 relay_acceptance_min_wait: Default::default(),
174 disconnected_timeout: Default::default(),
175 failed_timeout: Default::default(),
176 keepalive_interval: Default::default(),
177 check_interval: Default::default(),
178 checking_duration: Instant::now(),
179 last_checking_time: Instant::now(),
180 mdns_mode: MulticastDnsMode::Disabled,
181 mdns_local_name: "".to_owned(),
182 mdns_local_ip: None,
183 mdns_queries: HashMap::new(),
184 mdns: None,
185 candidate_types: vec![],
186 urls: vec![],
187 write_outs: Default::default(),
188 event_outs: Default::default(),
189 }
190 }
191}
192
193impl Agent {
194 pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
196 let mut mdns_local_name = config.multicast_dns_local_name.clone();
197 if mdns_local_name.is_empty() {
198 mdns_local_name = generate_multicast_dns_name();
199 }
200
201 if !mdns_local_name.ends_with(".local") || mdns_local_name.split('.').count() != 2 {
202 return Err(Error::ErrInvalidMulticastDnshostName);
203 }
204
205 let mdns_mode = config.multicast_dns_mode;
206 let mdns = create_multicast_dns(
207 mdns_mode,
208 &mdns_local_name,
209 &config.multicast_dns_local_ip,
210 &config.multicast_dns_query_timeout,
211 )
212 .unwrap_or_else(|err| {
213 warn!("Failed to initialize mDNS {mdns_local_name}: {err}");
216 None
217 });
218
219 let candidate_types = if config.candidate_types.is_empty() {
220 default_candidate_types()
221 } else {
222 config.candidate_types.clone()
223 };
224
225 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
226 {
227 return Err(Error::ErrLiteUsingNonHostCandidates);
228 }
229
230 if !config.urls.is_empty()
231 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
232 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
233 {
234 return Err(Error::ErrUselessUrlsProvided);
235 }
236
237 let mut agent = Self {
238 tie_breaker: rand::random::<u64>(),
239 is_controlling: config.is_controlling,
240 lite: config.lite,
241
242 start_time: Instant::now(),
243
244 nominated_pair: None,
245 selected_pair: None,
246 candidate_pairs: vec![],
247
248 connection_state: ConnectionState::New,
249
250 insecure_skip_verify: config.insecure_skip_verify,
251
252 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
256 max_binding_requests
257 } else {
258 DEFAULT_MAX_BINDING_REQUESTS
259 },
260 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
261 config.host_acceptance_min_wait
262 {
263 host_acceptance_min_wait
264 } else {
265 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
266 },
267 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
268 config.srflx_acceptance_min_wait
269 {
270 srflx_acceptance_min_wait
271 } else {
272 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
273 },
274 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
275 config.prflx_acceptance_min_wait
276 {
277 prflx_acceptance_min_wait
278 } else {
279 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
280 },
281 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
282 config.relay_acceptance_min_wait
283 {
284 relay_acceptance_min_wait
285 } else {
286 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
287 },
288
289 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
292 disconnected_timeout
293 } else {
294 DEFAULT_DISCONNECTED_TIMEOUT
295 },
296
297 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
300 failed_timeout
301 } else {
302 DEFAULT_FAILED_TIMEOUT
303 },
304
305 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
308 keepalive_interval
309 } else {
310 DEFAULT_KEEPALIVE_INTERVAL
311 },
312
313 check_interval: if config.check_interval == Duration::from_secs(0) {
315 DEFAULT_CHECK_INTERVAL
316 } else {
317 config.check_interval
318 },
319 checking_duration: Instant::now(),
320 last_checking_time: Instant::now(),
321 last_connection_state: ConnectionState::Unspecified,
322
323 mdns,
324 mdns_queries: HashMap::new(),
325
326 mdns_mode,
327 mdns_local_name,
328 mdns_local_ip: config.multicast_dns_local_ip,
329
330 ufrag_pwd: UfragPwd::default(),
331
332 local_candidates: vec![],
333 remote_candidates: vec![],
334
335 pending_binding_requests: vec![],
337
338 candidate_types,
339 urls: config.urls.clone(),
340
341 write_outs: VecDeque::new(),
342 event_outs: VecDeque::new(),
343 };
344
345 if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
347 {
348 let _ = agent.close();
349 return Err(err);
350 }
351
352 Ok(agent)
353 }
354
355 pub fn add_local_candidate(&mut self, mut c: Candidate) -> Result<()> {
357 if c.candidate_type() == CandidateType::Host
358 && self.mdns_mode == MulticastDnsMode::QueryAndGather
359 && c.network_type == NetworkType::Udp4
360 && self
361 .mdns_local_ip
362 .is_some_and(|local_ip| local_ip == c.addr().ip())
363 {
364 trace!(
367 "mDNS hides local ip {} with local name {}",
368 c.address, self.mdns_local_name
369 );
370 c.address = self.mdns_local_name.clone();
371 }
372
373 for cand in &self.local_candidates {
374 if cand.equal(&c) {
375 return Ok(());
376 }
377 }
378
379 self.local_candidates.push(c);
380
381 for remote_index in 0..self.remote_candidates.len() {
382 self.add_pair(self.local_candidates.len() - 1, remote_index);
383 }
384
385 self.request_connectivity_check();
386
387 Ok(())
388 }
389
390 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<()> {
392 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
394 if self.mdns_mode == MulticastDnsMode::Disabled {
395 warn!(
396 "remote mDNS candidate added, but mDNS is disabled: ({})",
397 c.address()
398 );
399 return Ok(());
400 }
401
402 if c.candidate_type() != CandidateType::Host {
403 return Err(Error::ErrAddressParseFailed);
404 }
405
406 if let Some(mdns_conn) = &mut self.mdns {
407 let query_id = mdns_conn.query(c.address());
408 self.mdns_queries.insert(query_id, c);
409 }
410
411 return Ok(());
412 }
413
414 self.trigger_request_connectivity_check(vec![c]);
415 Ok(())
416 }
417
418 fn trigger_request_connectivity_check(&mut self, remote_candidates: Vec<Candidate>) {
419 for c in remote_candidates {
420 if !self.remote_candidates.iter().any(|cand| cand.equal(&c)) {
421 self.remote_candidates.push(c);
422
423 for local_index in 0..self.local_candidates.len() {
424 self.add_pair(local_index, self.remote_candidates.len() - 1);
425 }
426
427 self.request_connectivity_check();
428 }
429 }
430 }
431
432 pub fn set_remote_credentials(
434 &mut self,
435 remote_ufrag: String,
436 remote_pwd: String,
437 ) -> Result<()> {
438 if remote_ufrag.is_empty() {
439 return Err(Error::ErrRemoteUfragEmpty);
440 } else if remote_pwd.is_empty() {
441 return Err(Error::ErrRemotePwdEmpty);
442 }
443
444 self.ufrag_pwd.remote_credentials = Some(Credentials {
445 ufrag: remote_ufrag,
446 pwd: remote_pwd,
447 });
448
449 Ok(())
450 }
451
    /// Returns the peer's credentials, or `None` if they have not been set.
    pub fn get_remote_credentials(&self) -> Option<&Credentials> {
        self.ufrag_pwd.remote_credentials.as_ref()
    }
456
    /// Returns this agent's own credentials.
    pub fn get_local_credentials(&self) -> &Credentials {
        &self.ufrag_pwd.local_credentials
    }
461
    /// Returns `true` when the agent is in the controlling role.
    pub fn role(&self) -> bool {
        self.is_controlling
    }
465
    /// Sets the agent's role (`true` = controlling, `false` = controlled).
    pub fn set_role(&mut self, is_controlling: bool) {
        self.is_controlling = is_controlling;
    }
469
    /// Returns the current connection state.
    pub fn state(&self) -> ConnectionState {
        self.connection_state
    }
473
474 pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
475 self.find_local_candidate(transport.local_addr, transport.transport_protocol)
476 .is_some()
477 && self.validate_non_stun_traffic(transport.peer_addr)
478 }
479
480 fn get_timeout_interval(&self) -> Duration {
481 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
482 self.check_interval,
483 self.keepalive_interval,
484 self.disconnected_timeout,
485 self.failed_timeout,
486 );
487 let mut interval = DEFAULT_CHECK_INTERVAL;
488
489 let mut update_interval = |x: Duration| {
490 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
491 interval = x;
492 }
493 };
494
495 match self.last_connection_state {
496 ConnectionState::New | ConnectionState::Checking => {
497 update_interval(check_interval);
499 }
500 ConnectionState::Connected | ConnectionState::Disconnected => {
501 update_interval(keepalive_interval);
502 }
503 _ => {}
504 };
505 update_interval(disconnected_timeout);
507 update_interval(failed_timeout);
508 interval
509 }
510
511 pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
513 if let Some(pair_index) = self.get_selected_pair() {
514 let candidate_pair = &self.candidate_pairs[pair_index];
515 Some((
516 &self.local_candidates[candidate_pair.local_index],
517 &self.remote_candidates[candidate_pair.remote_index],
518 ))
519 } else {
520 None
521 }
522 }
523
524 pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
525 if let Some(pair_index) = self.get_best_available_pair() {
526 let candidate_pair = &self.candidate_pairs[pair_index];
527 Some((
528 &self.local_candidates[candidate_pair.local_index],
529 &self.remote_candidates[candidate_pair.remote_index],
530 ))
531 } else {
532 None
533 }
534 }
535
    /// Stores the remote credentials, sets the role, starts the agent and
    /// moves it into the Checking state, then requests a connectivity check.
    ///
    /// # Errors
    /// Returns the error from `set_remote_credentials` when the ufrag or the
    /// password is empty; nothing else is changed in that case.
    pub fn start_connectivity_checks(
        &mut self,
        is_controlling: bool,
        remote_ufrag: String,
        remote_pwd: String,
    ) -> Result<()> {
        // NOTE(review): this writes the remote ICE password to the debug log;
        // confirm that is acceptable for deployments with debug logging on.
        debug!(
            "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
            is_controlling, remote_ufrag, remote_pwd
        );
        self.set_remote_credentials(remote_ufrag, remote_pwd)?;
        self.is_controlling = is_controlling;
        self.start();

        self.update_connection_state(ConnectionState::Checking);
        self.request_connectivity_check();

        Ok(())
    }
556
    /// Restarts the agent with new local credentials.
    ///
    /// Empty `ufrag`/`pwd` values are replaced by generated ones. Remote
    /// credentials, pending binding requests, candidate pairs, the selected
    /// pair and remote candidates are discarded; local candidates are kept
    /// only when `keep_local_candidates` is `true`. Unless the agent is still
    /// in the New state it moves back to Checking.
    ///
    /// # Errors
    /// * `ErrLocalUfragInsufficientBits` if the ufrag is shorter than 24 bits.
    /// * `ErrLocalPwdInsufficientBits` if the password is shorter than 128 bits.
    pub fn restart(
        &mut self,
        mut ufrag: String,
        mut pwd: String,
        keep_local_candidates: bool,
    ) -> Result<()> {
        if ufrag.is_empty() {
            ufrag = generate_ufrag();
        }
        if pwd.is_empty() {
            pwd = generate_pwd();
        }

        // Lengths are in bytes; the limits are expressed in bits.
        if ufrag.len() * 8 < 24 {
            return Err(Error::ErrLocalUfragInsufficientBits);
        }
        if pwd.len() * 8 < 128 {
            return Err(Error::ErrLocalPwdInsufficientBits);
        }

        // Install the new local credentials and forget the peer's.
        self.ufrag_pwd.local_credentials.ufrag = ufrag;
        self.ufrag_pwd.local_credentials.pwd = pwd;
        self.ufrag_pwd.remote_credentials = None;

        self.pending_binding_requests = vec![];

        self.candidate_pairs = vec![];

        self.set_selected_pair(None);
        self.delete_all_candidates(keep_local_candidates);
        self.start();

        // A brand-new agent stays in New; a running one goes back to Checking.
        if self.connection_state != ConnectionState::New {
            self.update_connection_state(ConnectionState::Checking);
        }

        Ok(())
    }
600
    /// Returns the local candidates registered so far.
    pub fn get_local_candidates(&self) -> &[Candidate] {
        &self.local_candidates
    }
605
606 fn contact(&mut self, now: Instant) {
607 if self.connection_state == ConnectionState::Failed {
608 self.last_connection_state = self.connection_state;
611 return;
612 }
613 if self.connection_state == ConnectionState::Checking {
614 if self.last_connection_state != self.connection_state {
616 self.checking_duration = now;
617 }
618
619 if now
621 .checked_duration_since(self.checking_duration)
622 .unwrap_or_else(|| Duration::from_secs(0))
623 > self.disconnected_timeout + self.failed_timeout
624 {
625 self.update_connection_state(ConnectionState::Failed);
626 self.last_connection_state = self.connection_state;
627 return;
628 }
629 }
630
631 self.contact_candidates();
632
633 self.last_connection_state = self.connection_state;
634 self.last_checking_time = now;
635 }
636
637 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
638 if self.connection_state != new_state {
639 if new_state == ConnectionState::Failed {
641 self.set_selected_pair(None);
642 self.delete_all_candidates(false);
643 }
644
645 info!(
646 "[{}]: Setting new connection state: {}",
647 self.get_name(),
648 new_state
649 );
650 self.connection_state = new_state;
651 self.event_outs
652 .push_back(Event::ConnectionStateChange(new_state));
653 }
654 }
655
656 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
657 if let Some(pair_index) = selected_pair {
658 trace!(
659 "[{}]: Set selected candidate pair: {:?}",
660 self.get_name(),
661 self.candidate_pairs[pair_index]
662 );
663
664 self.candidate_pairs[pair_index].nominated = true;
665 self.selected_pair = Some(pair_index);
666
667 self.update_connection_state(ConnectionState::Connected);
668
669 let candidate_pair = &self.candidate_pairs[pair_index];
671 self.event_outs
672 .push_back(Event::SelectedCandidatePairChange(
673 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
674 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
675 ));
676 } else {
677 self.selected_pair = None;
678 }
679 }
680
    /// Sends a connectivity check on every pair that is Waiting or InProgress.
    ///
    /// Waiting pairs are promoted to InProgress. A pair whose request count
    /// already exceeds `max_binding_requests` is marked Failed instead of
    /// being pinged. Logs a warning when there are no pairs at all.
    pub(crate) fn ping_all_candidates(&mut self) {
        // Pairs to ping, collected first because pinging needs `&mut self`.
        let mut pairs: Vec<(usize, usize)> = vec![];

        // Owned copy of the name: `self.get_name()` cannot be called while
        // `candidate_pairs` is mutably borrowed below.
        let name = self.get_name().to_string();
        if self.candidate_pairs.is_empty() {
            warn!(
                "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
                name,
            );
        }
        for p in &mut self.candidate_pairs {
            if p.state == CandidatePairState::Waiting {
                p.state = CandidatePairState::InProgress;
            } else if p.state != CandidatePairState::InProgress {
                // Pairs in any other state are not pinged.
                continue;
            }

            if p.binding_request_count > self.max_binding_requests {
                trace!(
                    "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
                    name,
                    *p,
                    self.local_candidates[p.local_index].addr(),
                    self.remote_candidates[p.remote_index].addr()
                );
                p.state = CandidatePairState::Failed;
            } else {
                p.binding_request_count += 1;
                let local = p.local_index;
                let remote = p.remote_index;
                pairs.push((local, remote));
            }
        }

        if !pairs.is_empty() {
            trace!(
                "[{}]: pinging all {} candidates",
                self.get_name(),
                pairs.len()
            );
        }

        for (local, remote) in pairs {
            self.ping_candidate(local, remote);
        }
    }
727
728 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
729 let p = CandidatePair::new(
730 local_index,
731 remote_index,
732 self.local_candidates[local_index].priority(),
733 self.remote_candidates[remote_index].priority(),
734 self.is_controlling,
735 );
736 self.candidate_pairs.push(p);
737 }
738
739 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
740 for (index, p) in self.candidate_pairs.iter().enumerate() {
741 if p.local_index == local_index && p.remote_index == remote_index {
742 return Some(index);
743 }
744 }
745 None
746 }
747
748 pub(crate) fn validate_selected_pair(&mut self) -> bool {
751 let (valid, disconnected_time) = {
752 self.selected_pair.as_ref().map_or_else(
753 || (false, Duration::from_secs(0)),
754 |&pair_index| {
755 let remote_index = self.candidate_pairs[pair_index].remote_index;
756
757 let disconnected_time = Instant::now()
758 .duration_since(self.remote_candidates[remote_index].last_received());
759 (true, disconnected_time)
760 },
761 )
762 };
763
764 if valid {
765 let mut total_time_to_failure = self.failed_timeout;
767 if total_time_to_failure != Duration::from_secs(0) {
768 total_time_to_failure += self.disconnected_timeout;
769 }
770
771 if total_time_to_failure != Duration::from_secs(0)
772 && disconnected_time > total_time_to_failure
773 {
774 self.update_connection_state(ConnectionState::Failed);
775 } else if self.disconnected_timeout != Duration::from_secs(0)
776 && disconnected_time > self.disconnected_timeout
777 {
778 self.update_connection_state(ConnectionState::Disconnected);
779 } else {
780 self.update_connection_state(ConnectionState::Connected);
781 }
782 }
783
784 valid
785 }
786
787 pub(crate) fn check_keepalive(&mut self) {
791 let (local_index, remote_index) = {
792 self.selected_pair
793 .as_ref()
794 .map_or((None, None), |&pair_index| {
795 let p = &self.candidate_pairs[pair_index];
796 (Some(p.local_index), Some(p.remote_index))
797 })
798 };
799
800 if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
801 let last_sent =
802 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
803
804 let last_received =
805 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
806
807 if (self.keepalive_interval != Duration::from_secs(0))
808 && ((last_sent > self.keepalive_interval)
809 || (last_received > self.keepalive_interval))
810 {
811 self.ping_candidate(local_index, remote_index);
814 }
815 }
816 }
817
818 fn request_connectivity_check(&mut self) {
819 if self.ufrag_pwd.remote_credentials.is_some() {
820 self.contact(Instant::now());
821 }
822 }
823
    /// Removes all remote candidates and, unless `keep_local_candidates` is
    /// `true`, all local candidates as well.
    ///
    /// `candidate_pairs` is not touched here, so any existing pair keeps
    /// indices into the cleared lists.
    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
        if !keep_local_candidates {
            self.local_candidates.clear();
        }
        self.remote_candidates.clear();
    }
834
835 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
836 for (index, c) in self.remote_candidates.iter().enumerate() {
837 if c.addr() == addr {
838 return Some(index);
839 }
840 }
841 None
842 }
843
844 pub(crate) fn find_local_candidate(
845 &self,
846 addr: SocketAddr,
847 transport_protocol: TransportProtocol,
848 ) -> Option<usize> {
849 for (index, c) in self.local_candidates.iter().enumerate() {
850 if c.addr() == addr && c.network_type().to_protocol() == transport_protocol {
851 return Some(index);
852 }
853 }
854 None
855 }
856
857 pub(crate) fn send_binding_request(
858 &mut self,
859 m: &Message,
860 local_index: usize,
861 remote_index: usize,
862 ) {
863 trace!(
864 "[{}]: ping STUN from {} to {}",
865 self.get_name(),
866 self.local_candidates[local_index],
867 self.remote_candidates[remote_index],
868 );
869
870 self.invalidate_pending_binding_requests(Instant::now());
871
872 self.pending_binding_requests.push(BindingRequest {
873 timestamp: Instant::now(),
874 transaction_id: m.transaction_id,
875 destination: self.remote_candidates[remote_index].addr(),
876 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
877 });
878
879 self.send_stun(m, local_index, remote_index);
880 }
881
882 pub(crate) fn send_binding_success(
883 &mut self,
884 m: &Message,
885 local_index: usize,
886 remote_index: usize,
887 ) {
888 let addr = self.remote_candidates[remote_index].addr();
889 let (ip, port) = (addr.ip(), addr.port());
890 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
891
892 let (out, result) = {
893 let mut out = Message::new();
894 let result = out.build(&[
895 Box::new(m.clone()),
896 Box::new(BINDING_SUCCESS),
897 Box::new(XorMappedAddress { ip, port }),
898 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
899 Box::new(FINGERPRINT),
900 ]);
901 (out, result)
902 };
903
904 if let Err(err) = result {
905 warn!(
906 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
907 self.get_name(),
908 self.local_candidates[local_index],
909 self.remote_candidates[remote_index],
910 err
911 );
912 } else {
913 self.send_stun(&out, local_index, remote_index);
914 }
915 }
916
917 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
922 let pending_binding_requests = &mut self.pending_binding_requests;
923 let initial_size = pending_binding_requests.len();
924
925 let mut temp = vec![];
926 for binding_request in pending_binding_requests.drain(..) {
927 if filter_time
928 .checked_duration_since(binding_request.timestamp)
929 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
930 .unwrap_or(true)
931 {
932 temp.push(binding_request);
933 }
934 }
935
936 *pending_binding_requests = temp;
937 let bind_requests_remaining = pending_binding_requests.len();
938 let bind_requests_removed = initial_size - bind_requests_remaining;
939 if bind_requests_removed > 0 {
940 trace!(
941 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
942 self.get_name(),
943 bind_requests_removed,
944 bind_requests_remaining,
945 );
946 }
947 }
948
949 pub(crate) fn handle_inbound_binding_success(
952 &mut self,
953 id: TransactionId,
954 ) -> Option<BindingRequest> {
955 self.invalidate_pending_binding_requests(Instant::now());
956
957 let pending_binding_requests = &mut self.pending_binding_requests;
958 for i in 0..pending_binding_requests.len() {
959 if pending_binding_requests[i].transaction_id == id {
960 let valid_binding_request = pending_binding_requests.remove(i);
961 return Some(valid_binding_request);
962 }
963 }
964 None
965 }
966
967 pub(crate) fn handle_inbound(
969 &mut self,
970 m: &mut Message,
971 local_index: usize,
972 remote_addr: SocketAddr,
973 ) -> Result<()> {
974 if m.typ.method != METHOD_BINDING
975 || !(m.typ.class == CLASS_SUCCESS_RESPONSE
976 || m.typ.class == CLASS_REQUEST
977 || m.typ.class == CLASS_INDICATION)
978 {
979 trace!(
980 "[{}]: unhandled STUN from {} to {} class({}) method({})",
981 self.get_name(),
982 remote_addr,
983 self.local_candidates[local_index],
984 m.typ.class,
985 m.typ.method
986 );
987 return Err(Error::ErrUnhandledStunpacket);
988 }
989
990 if self.is_controlling {
991 if m.contains(ATTR_ICE_CONTROLLING) {
992 debug!(
993 "[{}]: inbound isControlling && a.isControlling == true",
994 self.get_name(),
995 );
996 return Err(Error::ErrUnexpectedStunrequestMessage);
997 } else if m.contains(ATTR_USE_CANDIDATE) {
998 debug!(
999 "[{}]: useCandidate && a.isControlling == true",
1000 self.get_name(),
1001 );
1002 return Err(Error::ErrUnexpectedStunrequestMessage);
1003 }
1004 } else if m.contains(ATTR_ICE_CONTROLLED) {
1005 debug!(
1006 "[{}]: inbound isControlled && a.isControlling == false",
1007 self.get_name(),
1008 );
1009 return Err(Error::ErrUnexpectedStunrequestMessage);
1010 }
1011
1012 let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
1013 debug!(
1014 "[{}]: ufrag_pwd.remote_credentials.is_none",
1015 self.get_name(),
1016 );
1017 return Err(Error::ErrPasswordEmpty);
1018 };
1019
1020 let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
1021 if m.typ.class == CLASS_SUCCESS_RESPONSE {
1022 if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
1023 {
1024 warn!(
1025 "[{}]: discard message from ({}), {}",
1026 self.get_name(),
1027 remote_addr,
1028 err
1029 );
1030 return Err(err);
1031 }
1032
1033 if let Some(remote_index) = &remote_candidate_index {
1034 self.handle_success_response(m, local_index, *remote_index, remote_addr);
1035 } else {
1036 warn!(
1037 "[{}]: discard success message from ({}), no such remote",
1038 self.get_name(),
1039 remote_addr
1040 );
1041 return Err(Error::ErrUnhandledStunpacket);
1042 }
1043 } else if m.typ.class == CLASS_REQUEST {
1044 {
1045 let username = self.ufrag_pwd.local_credentials.ufrag.clone()
1046 + ":"
1047 + remote_credentials.ufrag.as_str();
1048 if let Err(err) = assert_inbound_username(m, &username) {
1049 warn!(
1050 "[{}]: discard message from ({}), {}",
1051 self.get_name(),
1052 remote_addr,
1053 err
1054 );
1055 return Err(err);
1056 } else if let Err(err) = assert_inbound_message_integrity(
1057 m,
1058 self.ufrag_pwd.local_credentials.pwd.as_bytes(),
1059 ) {
1060 warn!(
1061 "[{}]: discard message from ({}), {}",
1062 self.get_name(),
1063 remote_addr,
1064 err
1065 );
1066 return Err(err);
1067 }
1068 }
1069
1070 if remote_candidate_index.is_none() {
1071 let (ip, port, network_type) =
1072 (remote_addr.ip(), remote_addr.port(), NetworkType::Udp4);
1073
1074 let prflx_candidate_config = CandidatePeerReflexiveConfig {
1075 base_config: CandidateConfig {
1076 network: network_type.to_string(),
1077 address: ip.to_string(),
1078 port,
1079 component: self.local_candidates[local_index].component(),
1080 ..CandidateConfig::default()
1081 },
1082 rel_addr: "".to_owned(),
1083 rel_port: 0,
1084 };
1085
1086 match prflx_candidate_config.new_candidate_peer_reflexive() {
1087 Ok(prflx_candidate) => {
1088 let _ = self.add_remote_candidate(prflx_candidate);
1089 remote_candidate_index = Some(self.remote_candidates.len() - 1);
1090 }
1091 Err(err) => {
1092 error!(
1093 "[{}]: Failed to create new remote prflx candidate ({})",
1094 self.get_name(),
1095 err
1096 );
1097 return Err(err);
1098 }
1099 };
1100
1101 debug!(
1102 "[{}]: adding a new peer-reflexive candidate: {} ",
1103 self.get_name(),
1104 remote_addr
1105 );
1106 }
1107
1108 trace!(
1109 "[{}]: inbound STUN (Request) from {} to {}",
1110 self.get_name(),
1111 remote_addr,
1112 self.local_candidates[local_index]
1113 );
1114
1115 if let Some(remote_index) = &remote_candidate_index {
1116 self.handle_binding_request(m, local_index, *remote_index);
1117 }
1118 }
1119
1120 if let Some(remote_index) = remote_candidate_index {
1121 self.remote_candidates[remote_index].seen(false);
1122 }
1123
1124 Ok(())
1125 }
1126
1127 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1130 self.find_remote_candidate(remote_addr)
1131 .is_some_and(|remote_index| {
1132 self.remote_candidates[remote_index].seen(false);
1133 true
1134 })
1135 }
1136
1137 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1138 let peer_addr = self.remote_candidates[remote_index].addr();
1139 let local_addr = self.local_candidates[local_index].addr();
1140 let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1141 TransportProtocol::TCP
1142 } else {
1143 TransportProtocol::UDP
1144 };
1145
1146 self.write_outs.push_back(TaggedBytesMut {
1147 now: Instant::now(),
1148 transport: TransportContext {
1149 local_addr,
1150 peer_addr,
1151 ecn: None,
1152 transport_protocol,
1153 },
1154 message: BytesMut::from(&msg.raw[..]),
1155 });
1156
1157 self.local_candidates[local_index].seen(true);
1158 }
1159
1160 fn handle_inbound_candidate_msg(
1161 &mut self,
1162 local_index: usize,
1163 msg: TaggedBytesMut,
1164 ) -> Result<()> {
1165 if is_stun_message(&msg.message) {
1166 let mut m = Message {
1167 raw: msg.message.to_vec(),
1168 ..Message::default()
1169 };
1170
1171 if let Err(err) = m.decode() {
1172 warn!(
1173 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1174 self.get_name(),
1175 msg.transport.local_addr,
1176 msg.transport.peer_addr,
1177 err
1178 );
1179 Err(err)
1180 } else {
1181 self.handle_inbound(&mut m, local_index, msg.transport.peer_addr)
1182 }
1183 } else {
1184 if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1185 warn!(
1186 "[{}]: Discarded message, not a valid remote candidate from {}",
1187 self.get_name(),
1188 msg.transport.peer_addr,
1189 );
1190 } else {
1191 warn!(
1192 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1193 self.get_name(),
1194 msg.transport.peer_addr
1195 );
1196 }
1197 Err(Error::ErrNonStunmessage)
1198 }
1199 }
1200
    /// Returns the role name used as a log prefix: `"controlling"` or
    /// `"controlled"`.
    pub(crate) fn get_name(&self) -> &str {
        if self.is_controlling {
            "controlling"
        } else {
            "controlled"
        }
    }
1208
    /// Returns the index into `candidate_pairs` of the selected pair, if any.
    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
        self.selected_pair
    }
1212
1213 pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1214 let mut best_pair_index: Option<usize> = None;
1215
1216 for (index, p) in self.candidate_pairs.iter().enumerate() {
1217 if p.state == CandidatePairState::Failed {
1218 continue;
1219 }
1220
1221 if let Some(pair_index) = &mut best_pair_index {
1222 let b = &self.candidate_pairs[*pair_index];
1223 if b.priority() < p.priority() {
1224 *pair_index = index;
1225 }
1226 } else {
1227 best_pair_index = Some(index);
1228 }
1229 }
1230
1231 best_pair_index
1232 }
1233
1234 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1235 let mut best_pair_index: Option<usize> = None;
1236
1237 for (index, p) in self.candidate_pairs.iter().enumerate() {
1238 if p.state != CandidatePairState::Succeeded {
1239 continue;
1240 }
1241
1242 if let Some(pair_index) = &mut best_pair_index {
1243 let b = &self.candidate_pairs[*pair_index];
1244 if b.priority() < p.priority() {
1245 *pair_index = index;
1246 }
1247 } else {
1248 best_pair_index = Some(index);
1249 }
1250 }
1251
1252 best_pair_index
1253 }
1254}