1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5mod agent_proto;
6pub mod agent_selector;
7pub mod agent_stats;
8
9use agent_config::*;
10use bytes::BytesMut;
11use log::{debug, error, info, trace, warn};
12use sansio::Protocol;
13use std::collections::VecDeque;
14use std::net::{Ipv4Addr, SocketAddr};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use stun::attributes::*;
18use stun::fingerprint::*;
19use stun::integrity::*;
20use stun::message::*;
21use stun::textattrs::*;
22use stun::xoraddr::*;
23
24use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
25use crate::candidate::{candidate_pair::*, *};
26use crate::network_type::NetworkType;
27use crate::rand::*;
28use crate::state::*;
29use crate::url::*;
30use shared::error::*;
31use shared::{TransportContext, TransportMessage, TransportProtocol};
32
33const ZERO_DURATION: Duration = Duration::from_secs(0);
34
35#[derive(Debug, Clone)]
36pub(crate) struct BindingRequest {
37 pub(crate) timestamp: Instant,
38 pub(crate) transaction_id: TransactionId,
39 pub(crate) destination: SocketAddr,
40 pub(crate) is_use_candidate: bool,
41}
42
43impl Default for BindingRequest {
44 fn default() -> Self {
45 Self {
46 timestamp: Instant::now(),
47 transaction_id: TransactionId::default(),
48 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
49 is_use_candidate: false,
50 }
51 }
52}
53
54#[derive(Default, Clone)]
55pub struct Credentials {
56 pub ufrag: String,
57 pub pwd: String,
58}
59
60#[derive(Default, Clone)]
61pub(crate) struct UfragPwd {
62 pub(crate) local_credentials: Credentials,
63 pub(crate) remote_credentials: Option<Credentials>,
64}
65
66fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
67 let mut username = Username::new(ATTR_USERNAME, String::new());
68 username.get_from(m)?;
69
70 if username.to_string() != expected_username {
71 return Err(Error::Other(format!(
72 "{:?} expected({}) actual({})",
73 Error::ErrMismatchUsername,
74 expected_username,
75 username,
76 )));
77 }
78
79 Ok(())
80}
81
82fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
83 let message_integrity_attr = MessageIntegrity(key.to_vec());
84 message_integrity_attr.check(m)
85}
86
87pub enum Event {
88 ConnectionStateChange(ConnectionState),
89 SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
90}
91
92pub struct Agent {
94 pub(crate) tie_breaker: u64,
95 pub(crate) is_controlling: bool,
96 pub(crate) lite: bool,
97
98 pub(crate) start_time: Instant,
99
100 pub(crate) connection_state: ConnectionState,
101 pub(crate) last_connection_state: ConnectionState,
102
103 pub(crate) ufrag_pwd: UfragPwd,
105
106 pub(crate) local_candidates: Vec<Candidate>,
107 pub(crate) remote_candidates: Vec<Candidate>,
108 pub(crate) candidate_pairs: Vec<CandidatePair>,
109 pub(crate) nominated_pair: Option<usize>,
110 pub(crate) selected_pair: Option<usize>,
111
112 pub(crate) pending_binding_requests: Vec<BindingRequest>,
114
115 pub(crate) insecure_skip_verify: bool,
117 pub(crate) max_binding_requests: u16,
118 pub(crate) host_acceptance_min_wait: Duration,
119 pub(crate) srflx_acceptance_min_wait: Duration,
120 pub(crate) prflx_acceptance_min_wait: Duration,
121 pub(crate) relay_acceptance_min_wait: Duration,
122 pub(crate) disconnected_timeout: Duration,
125 pub(crate) failed_timeout: Duration,
128 pub(crate) keepalive_interval: Duration,
131 pub(crate) check_interval: Duration,
133 pub(crate) checking_duration: Instant,
134 pub(crate) last_checking_time: Instant,
135
136 pub(crate) candidate_types: Vec<CandidateType>,
137 pub(crate) urls: Vec<Url>,
138
139 pub(crate) write_outs: VecDeque<TransportMessage<BytesMut>>,
140 pub(crate) event_outs: VecDeque<Event>,
141}
142
143impl Default for Agent {
144 fn default() -> Self {
145 Self {
146 tie_breaker: 0,
147 is_controlling: false,
148 lite: false,
149 start_time: Instant::now(),
150 connection_state: Default::default(),
151 last_connection_state: Default::default(),
152 ufrag_pwd: Default::default(),
153 local_candidates: vec![],
154 remote_candidates: vec![],
155 candidate_pairs: vec![],
156 nominated_pair: None,
157 selected_pair: None,
158 pending_binding_requests: vec![],
159 insecure_skip_verify: false,
160 max_binding_requests: 0,
161 host_acceptance_min_wait: Default::default(),
162 srflx_acceptance_min_wait: Default::default(),
163 prflx_acceptance_min_wait: Default::default(),
164 relay_acceptance_min_wait: Default::default(),
165 disconnected_timeout: Default::default(),
166 failed_timeout: Default::default(),
167 keepalive_interval: Default::default(),
168 check_interval: Default::default(),
169 checking_duration: Instant::now(),
170 last_checking_time: Instant::now(),
171 candidate_types: vec![],
172 urls: vec![],
173 write_outs: Default::default(),
174 event_outs: Default::default(),
175 }
176 }
177}
178
179impl Agent {
180 pub fn new(config: Arc<AgentConfig>) -> Result<Self> {
182 let candidate_types = if config.candidate_types.is_empty() {
183 default_candidate_types()
184 } else {
185 config.candidate_types.clone()
186 };
187
188 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
189 {
190 return Err(Error::ErrLiteUsingNonHostCandidates);
191 }
192
193 if !config.urls.is_empty()
194 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
195 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
196 {
197 return Err(Error::ErrUselessUrlsProvided);
198 }
199
200 let mut agent = Self {
201 tie_breaker: rand::random::<u64>(),
202 is_controlling: config.is_controlling,
203 lite: config.lite,
204
205 start_time: Instant::now(),
206
207 nominated_pair: None,
208 selected_pair: None,
209 candidate_pairs: vec![],
210
211 connection_state: ConnectionState::New,
212
213 insecure_skip_verify: config.insecure_skip_verify,
214
215 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
219 max_binding_requests
220 } else {
221 DEFAULT_MAX_BINDING_REQUESTS
222 },
223 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
224 config.host_acceptance_min_wait
225 {
226 host_acceptance_min_wait
227 } else {
228 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
229 },
230 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
231 config.srflx_acceptance_min_wait
232 {
233 srflx_acceptance_min_wait
234 } else {
235 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
236 },
237 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
238 config.prflx_acceptance_min_wait
239 {
240 prflx_acceptance_min_wait
241 } else {
242 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
243 },
244 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
245 config.relay_acceptance_min_wait
246 {
247 relay_acceptance_min_wait
248 } else {
249 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
250 },
251
252 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
255 disconnected_timeout
256 } else {
257 DEFAULT_DISCONNECTED_TIMEOUT
258 },
259
260 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
263 failed_timeout
264 } else {
265 DEFAULT_FAILED_TIMEOUT
266 },
267
268 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
271 keepalive_interval
272 } else {
273 DEFAULT_KEEPALIVE_INTERVAL
274 },
275
276 check_interval: if config.check_interval == Duration::from_secs(0) {
278 DEFAULT_CHECK_INTERVAL
279 } else {
280 config.check_interval
281 },
282 checking_duration: Instant::now(),
283 last_checking_time: Instant::now(),
284 last_connection_state: ConnectionState::Unspecified,
285
286 ufrag_pwd: UfragPwd::default(),
287
288 local_candidates: vec![],
289 remote_candidates: vec![],
290
291 pending_binding_requests: vec![],
293
294 candidate_types,
295 urls: config.urls.clone(),
296
297 write_outs: VecDeque::new(),
298 event_outs: VecDeque::new(),
299 };
300
301 if let Err(err) = agent.restart(config.local_ufrag.clone(), config.local_pwd.clone(), false)
303 {
304 let _ = agent.close();
305 return Err(err);
306 }
307
308 Ok(agent)
309 }
310
311 pub fn add_local_candidate(&mut self, c: Candidate) -> Result<()> {
313 for cand in &self.local_candidates {
314 if cand.equal(&c) {
315 return Ok(());
316 }
317 }
318
319 self.local_candidates.push(c);
320
321 for remote_index in 0..self.remote_candidates.len() {
322 self.add_pair(self.local_candidates.len() - 1, remote_index);
323 }
324
325 self.request_connectivity_check();
326
327 Ok(())
328 }
329
330 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<()> {
332 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
334 warn!(
335 "remote mDNS candidate added, but mDNS is disabled: ({})",
336 c.address()
337 );
338 return Err(Error::ErrMulticastDnsNotSupported);
339 }
340
341 for cand in &self.remote_candidates {
342 if cand.equal(&c) {
343 return Ok(());
344 }
345 }
346
347 self.remote_candidates.push(c);
348
349 for local_index in 0..self.local_candidates.len() {
350 self.add_pair(local_index, self.remote_candidates.len() - 1);
351 }
352
353 self.request_connectivity_check();
354
355 Ok(())
356 }
357
358 pub fn set_remote_credentials(
360 &mut self,
361 remote_ufrag: String,
362 remote_pwd: String,
363 ) -> Result<()> {
364 if remote_ufrag.is_empty() {
365 return Err(Error::ErrRemoteUfragEmpty);
366 } else if remote_pwd.is_empty() {
367 return Err(Error::ErrRemotePwdEmpty);
368 }
369
370 self.ufrag_pwd.remote_credentials = Some(Credentials {
371 ufrag: remote_ufrag,
372 pwd: remote_pwd,
373 });
374
375 Ok(())
376 }
377
378 pub fn get_remote_credentials(&self) -> Option<&Credentials> {
380 self.ufrag_pwd.remote_credentials.as_ref()
381 }
382
383 pub fn get_local_credentials(&self) -> &Credentials {
385 &self.ufrag_pwd.local_credentials
386 }
387
388 pub fn role(&self) -> bool {
389 self.is_controlling
390 }
391
392 pub fn set_role(&mut self, is_controlling: bool) {
393 self.is_controlling = is_controlling;
394 }
395
396 pub fn state(&self) -> ConnectionState {
397 self.connection_state
398 }
399
400 pub fn is_valid_non_stun_traffic(&mut self, transport: TransportContext) -> bool {
401 self.find_local_candidate(transport.local_addr, transport.transport_protocol)
402 .is_some()
403 && self.validate_non_stun_traffic(transport.peer_addr)
404 }
405
406 fn get_timeout_interval(&self) -> Duration {
407 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
408 self.check_interval,
409 self.keepalive_interval,
410 self.disconnected_timeout,
411 self.failed_timeout,
412 );
413 let mut interval = DEFAULT_CHECK_INTERVAL;
414
415 let mut update_interval = |x: Duration| {
416 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
417 interval = x;
418 }
419 };
420
421 match self.last_connection_state {
422 ConnectionState::New | ConnectionState::Checking => {
423 update_interval(check_interval);
425 }
426 ConnectionState::Connected | ConnectionState::Disconnected => {
427 update_interval(keepalive_interval);
428 }
429 _ => {}
430 };
431 update_interval(disconnected_timeout);
433 update_interval(failed_timeout);
434 interval
435 }
436
437 pub fn get_selected_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
439 if let Some(pair_index) = self.get_selected_pair() {
440 let candidate_pair = &self.candidate_pairs[pair_index];
441 Some((
442 &self.local_candidates[candidate_pair.local_index],
443 &self.remote_candidates[candidate_pair.remote_index],
444 ))
445 } else {
446 None
447 }
448 }
449
450 pub fn get_best_available_candidate_pair(&self) -> Option<(&Candidate, &Candidate)> {
451 if let Some(pair_index) = self.get_best_available_pair() {
452 let candidate_pair = &self.candidate_pairs[pair_index];
453 Some((
454 &self.local_candidates[candidate_pair.local_index],
455 &self.remote_candidates[candidate_pair.remote_index],
456 ))
457 } else {
458 None
459 }
460 }
461
462 pub fn start_connectivity_checks(
464 &mut self,
465 is_controlling: bool,
466 remote_ufrag: String,
467 remote_pwd: String,
468 ) -> Result<()> {
469 debug!(
470 "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
471 is_controlling, remote_ufrag, remote_pwd
472 );
473 self.set_remote_credentials(remote_ufrag, remote_pwd)?;
474 self.is_controlling = is_controlling;
475 self.start();
476
477 self.update_connection_state(ConnectionState::Checking);
478 self.request_connectivity_check();
479
480 Ok(())
481 }
482
483 pub fn restart(
486 &mut self,
487 mut ufrag: String,
488 mut pwd: String,
489 keep_local_candidates: bool,
490 ) -> Result<()> {
491 if ufrag.is_empty() {
492 ufrag = generate_ufrag();
493 }
494 if pwd.is_empty() {
495 pwd = generate_pwd();
496 }
497
498 if ufrag.len() * 8 < 24 {
499 return Err(Error::ErrLocalUfragInsufficientBits);
500 }
501 if pwd.len() * 8 < 128 {
502 return Err(Error::ErrLocalPwdInsufficientBits);
503 }
504
505 self.ufrag_pwd.local_credentials.ufrag = ufrag;
507 self.ufrag_pwd.local_credentials.pwd = pwd;
508 self.ufrag_pwd.remote_credentials = None;
509
510 self.pending_binding_requests = vec![];
511
512 self.candidate_pairs = vec![];
513
514 self.set_selected_pair(None);
515 self.delete_all_candidates(keep_local_candidates);
516 self.start();
517
518 if self.connection_state != ConnectionState::New {
521 self.update_connection_state(ConnectionState::Checking);
522 }
523
524 Ok(())
525 }
526
527 pub fn get_local_candidates(&self) -> &[Candidate] {
529 &self.local_candidates
530 }
531
532 fn contact(&mut self, now: Instant) {
533 if self.connection_state == ConnectionState::Failed {
534 self.last_connection_state = self.connection_state;
537 return;
538 }
539 if self.connection_state == ConnectionState::Checking {
540 if self.last_connection_state != self.connection_state {
542 self.checking_duration = now;
543 }
544
545 if now
547 .checked_duration_since(self.checking_duration)
548 .unwrap_or_else(|| Duration::from_secs(0))
549 > self.disconnected_timeout + self.failed_timeout
550 {
551 self.update_connection_state(ConnectionState::Failed);
552 self.last_connection_state = self.connection_state;
553 return;
554 }
555 }
556
557 self.contact_candidates();
558
559 self.last_connection_state = self.connection_state;
560 self.last_checking_time = now;
561 }
562
563 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
564 if self.connection_state != new_state {
565 if new_state == ConnectionState::Failed {
567 self.set_selected_pair(None);
568 self.delete_all_candidates(false);
569 }
570
571 info!(
572 "[{}]: Setting new connection state: {}",
573 self.get_name(),
574 new_state
575 );
576 self.connection_state = new_state;
577 self.event_outs
578 .push_back(Event::ConnectionStateChange(new_state));
579 }
580 }
581
582 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
583 if let Some(pair_index) = selected_pair {
584 trace!(
585 "[{}]: Set selected candidate pair: {:?}",
586 self.get_name(),
587 self.candidate_pairs[pair_index]
588 );
589
590 self.candidate_pairs[pair_index].nominated = true;
591 self.selected_pair = Some(pair_index);
592
593 self.update_connection_state(ConnectionState::Connected);
594
595 let candidate_pair = &self.candidate_pairs[pair_index];
597 self.event_outs
598 .push_back(Event::SelectedCandidatePairChange(
599 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
600 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
601 ));
602 } else {
603 self.selected_pair = None;
604 }
605 }
606
607 pub(crate) fn ping_all_candidates(&mut self) {
608 let mut pairs: Vec<(usize, usize)> = vec![];
609
610 let name = self.get_name().to_string();
611 if self.candidate_pairs.is_empty() {
612 warn!(
613 "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
614 name,
615 );
616 }
617 for p in &mut self.candidate_pairs {
618 if p.state == CandidatePairState::Waiting {
619 p.state = CandidatePairState::InProgress;
620 } else if p.state != CandidatePairState::InProgress {
621 continue;
622 }
623
624 if p.binding_request_count > self.max_binding_requests {
625 trace!(
626 "[{}]: max requests reached for pair {} (local_addr {} <-> remote_addr {}), marking it as failed",
627 name,
628 *p,
629 self.local_candidates[p.local_index].addr(),
630 self.remote_candidates[p.remote_index].addr()
631 );
632 p.state = CandidatePairState::Failed;
633 } else {
634 p.binding_request_count += 1;
635 let local = p.local_index;
636 let remote = p.remote_index;
637 pairs.push((local, remote));
638 }
639 }
640
641 if !pairs.is_empty() {
642 trace!(
643 "[{}]: pinging all {} candidates",
644 self.get_name(),
645 pairs.len()
646 );
647 }
648
649 for (local, remote) in pairs {
650 self.ping_candidate(local, remote);
651 }
652 }
653
654 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
655 let p = CandidatePair::new(
656 local_index,
657 remote_index,
658 self.local_candidates[local_index].priority(),
659 self.remote_candidates[remote_index].priority(),
660 self.is_controlling,
661 );
662 self.candidate_pairs.push(p);
663 }
664
665 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
666 for (index, p) in self.candidate_pairs.iter().enumerate() {
667 if p.local_index == local_index && p.remote_index == remote_index {
668 return Some(index);
669 }
670 }
671 None
672 }
673
674 pub(crate) fn validate_selected_pair(&mut self) -> bool {
677 let (valid, disconnected_time) = {
678 self.selected_pair.as_ref().map_or_else(
679 || (false, Duration::from_secs(0)),
680 |&pair_index| {
681 let remote_index = self.candidate_pairs[pair_index].remote_index;
682
683 let disconnected_time = Instant::now()
684 .duration_since(self.remote_candidates[remote_index].last_received());
685 (true, disconnected_time)
686 },
687 )
688 };
689
690 if valid {
691 let mut total_time_to_failure = self.failed_timeout;
693 if total_time_to_failure != Duration::from_secs(0) {
694 total_time_to_failure += self.disconnected_timeout;
695 }
696
697 if total_time_to_failure != Duration::from_secs(0)
698 && disconnected_time > total_time_to_failure
699 {
700 self.update_connection_state(ConnectionState::Failed);
701 } else if self.disconnected_timeout != Duration::from_secs(0)
702 && disconnected_time > self.disconnected_timeout
703 {
704 self.update_connection_state(ConnectionState::Disconnected);
705 } else {
706 self.update_connection_state(ConnectionState::Connected);
707 }
708 }
709
710 valid
711 }
712
713 pub(crate) fn check_keepalive(&mut self) {
717 let (local_index, remote_index) = {
718 self.selected_pair
719 .as_ref()
720 .map_or((None, None), |&pair_index| {
721 let p = &self.candidate_pairs[pair_index];
722 (Some(p.local_index), Some(p.remote_index))
723 })
724 };
725
726 if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
727 let last_sent =
728 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
729
730 let last_received =
731 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
732
733 if (self.keepalive_interval != Duration::from_secs(0))
734 && ((last_sent > self.keepalive_interval)
735 || (last_received > self.keepalive_interval))
736 {
737 self.ping_candidate(local_index, remote_index);
740 }
741 }
742 }
743
744 fn request_connectivity_check(&mut self) {
745 if self.ufrag_pwd.remote_credentials.is_some() {
746 self.contact(Instant::now());
747 }
748 }
749
750 pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
755 if !keep_local_candidates {
756 self.local_candidates.clear();
757 }
758 self.remote_candidates.clear();
759 }
760
761 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
762 let (ip, port) = (addr.ip(), addr.port());
763 for (index, c) in self.remote_candidates.iter().enumerate() {
764 if c.address() == ip.to_string() && c.port() == port {
765 return Some(index);
766 }
767 }
768 None
769 }
770
771 pub(crate) fn find_local_candidate(
772 &self,
773 addr: SocketAddr,
774 transport_protocol: TransportProtocol,
775 ) -> Option<usize> {
776 for (index, c) in self.local_candidates.iter().enumerate() {
777 if c.addr() == addr && c.network_type().to_protocol() == transport_protocol {
778 return Some(index);
779 }
780 }
781 None
782 }
783
784 pub(crate) fn send_binding_request(
785 &mut self,
786 m: &Message,
787 local_index: usize,
788 remote_index: usize,
789 ) {
790 trace!(
791 "[{}]: ping STUN from {} to {}",
792 self.get_name(),
793 self.local_candidates[local_index],
794 self.remote_candidates[remote_index],
795 );
796
797 self.invalidate_pending_binding_requests(Instant::now());
798
799 self.pending_binding_requests.push(BindingRequest {
800 timestamp: Instant::now(),
801 transaction_id: m.transaction_id,
802 destination: self.remote_candidates[remote_index].addr(),
803 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
804 });
805
806 self.send_stun(m, local_index, remote_index);
807 }
808
809 pub(crate) fn send_binding_success(
810 &mut self,
811 m: &Message,
812 local_index: usize,
813 remote_index: usize,
814 ) {
815 let addr = self.remote_candidates[remote_index].addr();
816 let (ip, port) = (addr.ip(), addr.port());
817 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
818
819 let (out, result) = {
820 let mut out = Message::new();
821 let result = out.build(&[
822 Box::new(m.clone()),
823 Box::new(BINDING_SUCCESS),
824 Box::new(XorMappedAddress { ip, port }),
825 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
826 Box::new(FINGERPRINT),
827 ]);
828 (out, result)
829 };
830
831 if let Err(err) = result {
832 warn!(
833 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
834 self.get_name(),
835 self.local_candidates[local_index],
836 self.remote_candidates[remote_index],
837 err
838 );
839 } else {
840 self.send_stun(&out, local_index, remote_index);
841 }
842 }
843
844 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
849 let pending_binding_requests = &mut self.pending_binding_requests;
850 let initial_size = pending_binding_requests.len();
851
852 let mut temp = vec![];
853 for binding_request in pending_binding_requests.drain(..) {
854 if filter_time
855 .checked_duration_since(binding_request.timestamp)
856 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
857 .unwrap_or(true)
858 {
859 temp.push(binding_request);
860 }
861 }
862
863 *pending_binding_requests = temp;
864 let bind_requests_remaining = pending_binding_requests.len();
865 let bind_requests_removed = initial_size - bind_requests_remaining;
866 if bind_requests_removed > 0 {
867 trace!(
868 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
869 self.get_name(),
870 bind_requests_removed,
871 bind_requests_remaining,
872 );
873 }
874 }
875
876 pub(crate) fn handle_inbound_binding_success(
879 &mut self,
880 id: TransactionId,
881 ) -> Option<BindingRequest> {
882 self.invalidate_pending_binding_requests(Instant::now());
883
884 let pending_binding_requests = &mut self.pending_binding_requests;
885 for i in 0..pending_binding_requests.len() {
886 if pending_binding_requests[i].transaction_id == id {
887 let valid_binding_request = pending_binding_requests.remove(i);
888 return Some(valid_binding_request);
889 }
890 }
891 None
892 }
893
894 pub(crate) fn handle_inbound(
896 &mut self,
897 m: &mut Message,
898 local_index: usize,
899 remote_addr: SocketAddr,
900 ) -> Result<()> {
901 if m.typ.method != METHOD_BINDING
902 || !(m.typ.class == CLASS_SUCCESS_RESPONSE
903 || m.typ.class == CLASS_REQUEST
904 || m.typ.class == CLASS_INDICATION)
905 {
906 trace!(
907 "[{}]: unhandled STUN from {} to {} class({}) method({})",
908 self.get_name(),
909 remote_addr,
910 self.local_candidates[local_index],
911 m.typ.class,
912 m.typ.method
913 );
914 return Err(Error::ErrUnhandledStunpacket);
915 }
916
917 if self.is_controlling {
918 if m.contains(ATTR_ICE_CONTROLLING) {
919 debug!(
920 "[{}]: inbound isControlling && a.isControlling == true",
921 self.get_name(),
922 );
923 return Err(Error::ErrUnexpectedStunrequestMessage);
924 } else if m.contains(ATTR_USE_CANDIDATE) {
925 debug!(
926 "[{}]: useCandidate && a.isControlling == true",
927 self.get_name(),
928 );
929 return Err(Error::ErrUnexpectedStunrequestMessage);
930 }
931 } else if m.contains(ATTR_ICE_CONTROLLED) {
932 debug!(
933 "[{}]: inbound isControlled && a.isControlling == false",
934 self.get_name(),
935 );
936 return Err(Error::ErrUnexpectedStunrequestMessage);
937 }
938
939 let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
940 debug!(
941 "[{}]: ufrag_pwd.remote_credentials.is_none",
942 self.get_name(),
943 );
944 return Err(Error::ErrPasswordEmpty);
945 };
946
947 let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
948 if m.typ.class == CLASS_SUCCESS_RESPONSE {
949 if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
950 {
951 warn!(
952 "[{}]: discard message from ({}), {}",
953 self.get_name(),
954 remote_addr,
955 err
956 );
957 return Err(err);
958 }
959
960 if let Some(remote_index) = &remote_candidate_index {
961 self.handle_success_response(m, local_index, *remote_index, remote_addr);
962 } else {
963 warn!(
964 "[{}]: discard success message from ({}), no such remote",
965 self.get_name(),
966 remote_addr
967 );
968 return Err(Error::ErrUnhandledStunpacket);
969 }
970 } else if m.typ.class == CLASS_REQUEST {
971 {
972 let username = self.ufrag_pwd.local_credentials.ufrag.clone()
973 + ":"
974 + remote_credentials.ufrag.as_str();
975 if let Err(err) = assert_inbound_username(m, &username) {
976 warn!(
977 "[{}]: discard message from ({}), {}",
978 self.get_name(),
979 remote_addr,
980 err
981 );
982 return Err(err);
983 } else if let Err(err) = assert_inbound_message_integrity(
984 m,
985 self.ufrag_pwd.local_credentials.pwd.as_bytes(),
986 ) {
987 warn!(
988 "[{}]: discard message from ({}), {}",
989 self.get_name(),
990 remote_addr,
991 err
992 );
993 return Err(err);
994 }
995 }
996
997 if remote_candidate_index.is_none() {
998 let (ip, port, network_type) =
999 (remote_addr.ip(), remote_addr.port(), NetworkType::Udp4);
1000
1001 let prflx_candidate_config = CandidatePeerReflexiveConfig {
1002 base_config: CandidateConfig {
1003 network: network_type.to_string(),
1004 address: ip.to_string(),
1005 port,
1006 component: self.local_candidates[local_index].component(),
1007 ..CandidateConfig::default()
1008 },
1009 rel_addr: "".to_owned(),
1010 rel_port: 0,
1011 };
1012
1013 match prflx_candidate_config.new_candidate_peer_reflexive() {
1014 Ok(prflx_candidate) => {
1015 let _ = self.add_remote_candidate(prflx_candidate);
1016 remote_candidate_index = Some(self.remote_candidates.len() - 1);
1017 }
1018 Err(err) => {
1019 error!(
1020 "[{}]: Failed to create new remote prflx candidate ({})",
1021 self.get_name(),
1022 err
1023 );
1024 return Err(err);
1025 }
1026 };
1027
1028 debug!(
1029 "[{}]: adding a new peer-reflexive candidate: {} ",
1030 self.get_name(),
1031 remote_addr
1032 );
1033 }
1034
1035 trace!(
1036 "[{}]: inbound STUN (Request) from {} to {}",
1037 self.get_name(),
1038 remote_addr,
1039 self.local_candidates[local_index]
1040 );
1041
1042 if let Some(remote_index) = &remote_candidate_index {
1043 self.handle_binding_request(m, local_index, *remote_index);
1044 }
1045 }
1046
1047 if let Some(remote_index) = remote_candidate_index {
1048 self.remote_candidates[remote_index].seen(false);
1049 }
1050
1051 Ok(())
1052 }
1053
1054 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1057 self.find_remote_candidate(remote_addr)
1058 .is_some_and(|remote_index| {
1059 self.remote_candidates[remote_index].seen(false);
1060 true
1061 })
1062 }
1063
1064 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1065 let peer_addr = self.remote_candidates[remote_index].addr();
1066 let local_addr = self.local_candidates[local_index].addr();
1067 let transport_protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1068 TransportProtocol::TCP
1069 } else {
1070 TransportProtocol::UDP
1071 };
1072
1073 self.write_outs.push_back(TransportMessage {
1074 now: Instant::now(),
1075 transport: TransportContext {
1076 local_addr,
1077 peer_addr,
1078 ecn: None,
1079 transport_protocol,
1080 },
1081 message: BytesMut::from(&msg.raw[..]),
1082 });
1083
1084 self.local_candidates[local_index].seen(true);
1085 }
1086
1087 fn handle_inbound_candidate_msg(
1088 &mut self,
1089 local_index: usize,
1090 msg: TransportMessage<BytesMut>,
1091 ) -> Result<()> {
1092 if is_stun_message(&msg.message) {
1093 let mut m = Message {
1094 raw: msg.message.to_vec(),
1095 ..Message::default()
1096 };
1097
1098 if let Err(err) = m.decode() {
1099 warn!(
1100 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1101 self.get_name(),
1102 msg.transport.local_addr,
1103 msg.transport.peer_addr,
1104 err
1105 );
1106 Err(err)
1107 } else {
1108 self.handle_inbound(&mut m, local_index, msg.transport.peer_addr)
1109 }
1110 } else {
1111 if !self.validate_non_stun_traffic(msg.transport.peer_addr) {
1112 warn!(
1113 "[{}]: Discarded message, not a valid remote candidate from {}",
1114 self.get_name(),
1115 msg.transport.peer_addr,
1116 );
1117 } else {
1118 warn!(
1119 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1120 self.get_name(),
1121 msg.transport.peer_addr
1122 );
1123 }
1124 Err(Error::ErrNonStunmessage)
1125 }
1126 }
1127
1128 pub(crate) fn get_name(&self) -> &str {
1129 if self.is_controlling {
1130 "controlling"
1131 } else {
1132 "controlled"
1133 }
1134 }
1135
1136 pub(crate) fn get_selected_pair(&self) -> Option<usize> {
1137 self.selected_pair
1138 }
1139
1140 pub(crate) fn get_best_available_pair(&self) -> Option<usize> {
1141 let mut best_pair_index: Option<usize> = None;
1142
1143 for (index, p) in self.candidate_pairs.iter().enumerate() {
1144 if p.state == CandidatePairState::Failed {
1145 continue;
1146 }
1147
1148 if let Some(pair_index) = &mut best_pair_index {
1149 let b = &self.candidate_pairs[*pair_index];
1150 if b.priority() < p.priority() {
1151 *pair_index = index;
1152 }
1153 } else {
1154 best_pair_index = Some(index);
1155 }
1156 }
1157
1158 best_pair_index
1159 }
1160
1161 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1162 let mut best_pair_index: Option<usize> = None;
1163
1164 for (index, p) in self.candidate_pairs.iter().enumerate() {
1165 if p.state != CandidatePairState::Succeeded {
1166 continue;
1167 }
1168
1169 if let Some(pair_index) = &mut best_pair_index {
1170 let b = &self.candidate_pairs[*pair_index];
1171 if b.priority() < p.priority() {
1172 *pair_index = index;
1173 }
1174 } else {
1175 best_pair_index = Some(index);
1176 }
1177 }
1178
1179 best_pair_index
1180 }
1181}