1#[cfg(test)]
2mod agent_test;
3
4pub mod agent_config;
5pub mod agent_selector;
6pub mod agent_stats;
7
8use agent_config::*;
9use bytes::BytesMut;
10use log::{debug, error, info, trace, warn};
11use std::collections::VecDeque;
12use std::net::{Ipv4Addr, SocketAddr};
13use std::time::{Duration, Instant};
14use stun::attributes::*;
15use stun::fingerprint::*;
16use stun::integrity::*;
17use stun::message::*;
18use stun::textattrs::*;
19use stun::xoraddr::*;
20
21use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
22use crate::candidate::{candidate_pair::*, *};
23use crate::network_type::NetworkType;
24use crate::rand::*;
25use crate::state::*;
26use crate::url::*;
27use shared::error::*;
28use shared::{Protocol, Transmit, TransportContext};
29
/// A zero-length [`Duration`]. `get_timeout_interval` treats an interval equal to this value
/// as "not set" and skips it when picking the shortest interval.
const ZERO_DURATION: Duration = Duration::from_secs(0);
31
/// Bookkeeping record for a STUN binding request that has been sent and may still be answered.
#[derive(Debug, Clone)]
pub(crate) struct BindingRequest {
    /// When the request was recorded; used to expire stale entries.
    pub(crate) timestamp: Instant,
    /// Transaction id of the request; matched against inbound success responses.
    pub(crate) transaction_id: TransactionId,
    /// Address of the remote candidate the request was sent to.
    pub(crate) destination: SocketAddr,
    /// Whether the request carried the USE-CANDIDATE attribute.
    pub(crate) is_use_candidate: bool,
}
39
40impl Default for BindingRequest {
41 fn default() -> Self {
42 Self {
43 timestamp: Instant::now(),
44 transaction_id: TransactionId::default(),
45 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
46 is_use_candidate: false,
47 }
48 }
49}
50
/// A username-fragment / password pair used to authenticate STUN traffic.
#[derive(Default, Clone)]
pub struct Credentials {
    /// Username fragment; combined as `local:remote` to form the expected inbound USERNAME.
    pub ufrag: String,
    /// Password; its bytes are used as the message-integrity key.
    pub pwd: String,
}
56
/// Local and (once known) remote credentials of the agent.
#[derive(Default, Clone)]
pub(crate) struct UfragPwd {
    /// Credentials of this agent; set by `Agent::restart`.
    pub(crate) local_credentials: Credentials,
    /// Credentials of the peer; `None` until `Agent::set_remote_credentials` succeeds and
    /// reset to `None` by `Agent::restart`.
    pub(crate) remote_credentials: Option<Credentials>,
}
62
63fn assert_inbound_username(m: &Message, expected_username: &str) -> Result<()> {
64 let mut username = Username::new(ATTR_USERNAME, String::new());
65 username.get_from(m)?;
66
67 if username.to_string() != expected_username {
68 return Err(Error::Other(format!(
69 "{:?} expected({}) actual({})",
70 Error::ErrMismatchUsername,
71 expected_username,
72 username,
73 )));
74 }
75
76 Ok(())
77}
78
79fn assert_inbound_message_integrity(m: &mut Message, key: &[u8]) -> Result<()> {
80 let message_integrity_attr = MessageIntegrity(key.to_vec());
81 message_integrity_attr.check(m)
82}
83
/// Notifications queued by the agent and retrieved with `Agent::poll_event`.
pub enum Event {
    /// The agent's connection state changed to the contained value.
    ConnectionStateChange(ConnectionState),
    /// A candidate pair was selected; carries clones of the (local, remote) candidates.
    SelectedCandidatePairChange(Box<Candidate>, Box<Candidate>),
}
88
/// State of an ICE agent: candidates, candidate pairs, credentials, timers and the queues of
/// outbound packets and events produced while processing input.
pub struct Agent {
    /// Random value generated in `new`.
    pub(crate) tie_breaker: u64,
    /// Whether this agent is in the controlling role; also selects the name used in logs.
    pub(crate) is_controlling: bool,
    /// Lite mode flag from the config; `new` only accepts it with host-only candidate types.
    pub(crate) lite: bool,

    /// Instant captured when the agent was constructed.
    pub(crate) start_time: Instant,

    /// Current connection state.
    pub(crate) connection_state: ConnectionState,
    /// Connection state recorded at the end of the previous `contact` run.
    pub(crate) last_connection_state: ConnectionState,

    /// Local and remote credentials.
    pub(crate) ufrag_pwd: UfragPwd,

    /// Local candidates; pairs refer to them by index.
    pub(crate) local_candidates: Vec<Candidate>,
    /// Remote candidates; pairs refer to them by index.
    pub(crate) remote_candidates: Vec<Candidate>,
    /// Candidate pairs built from every local/remote combination added so far.
    pub(crate) candidate_pairs: Vec<CandidatePair>,
    /// Nominated pair — presumably an index into `candidate_pairs`; not written in this file
    /// apart from initialisation. TODO confirm against the selector code.
    pub(crate) nominated_pair: Option<usize>,
    /// Index into `candidate_pairs` of the selected pair, if any.
    pub(crate) selected_pair: Option<usize>,

    /// Binding requests sent and not yet answered or expired.
    pub(crate) pending_binding_requests: Vec<BindingRequest>,

    /// Copied from the config; not read in this file.
    pub(crate) insecure_skip_verify: bool,
    /// A pair whose request count exceeds this value is marked failed by
    /// `ping_all_candidates`.
    pub(crate) max_binding_requests: u16,
    /// Copied from the config (or its default); not read in this file.
    pub(crate) host_acceptance_min_wait: Duration,
    /// Copied from the config (or its default); not read in this file.
    pub(crate) srflx_acceptance_min_wait: Duration,
    /// Copied from the config (or its default); not read in this file.
    pub(crate) prflx_acceptance_min_wait: Duration,
    /// Copied from the config (or its default); not read in this file.
    pub(crate) relay_acceptance_min_wait: Duration,
    /// Time without receiving on the selected pair after which the state becomes
    /// `Disconnected`; zero disables the check.
    pub(crate) disconnected_timeout: Duration,
    /// Additional time, on top of `disconnected_timeout`, after which the state becomes
    /// `Failed`; zero disables the check in `validate_selected_pair`.
    pub(crate) failed_timeout: Duration,
    /// Idle time on the selected pair after which `check_keepalive` sends a ping; zero
    /// disables keepalives.
    pub(crate) keepalive_interval: Duration,
    /// Interval considered by `get_timeout_interval` while in the `New`/`Checking` states.
    pub(crate) check_interval: Duration,
    /// Instant at which `contact` first observed the `Checking` state (despite the name this
    /// is a point in time, not a duration).
    pub(crate) checking_duration: Instant,
    /// Instant of the last `contact` run that reached the candidate-contact step.
    pub(crate) last_checking_time: Instant,

    /// Candidate types in use; defaults are applied when the config list is empty.
    pub(crate) candidate_types: Vec<CandidateType>,
    /// URLs copied from the config.
    pub(crate) urls: Vec<Url>,

    /// Outbound packets waiting to be fetched with `poll_transmit`.
    pub(crate) transmits: VecDeque<Transmit<BytesMut>>,
    /// Events waiting to be fetched with `poll_event`.
    pub(crate) events: VecDeque<Event>,
}
139
140impl Agent {
141 pub fn new(config: AgentConfig) -> Result<Self> {
143 let candidate_types = if config.candidate_types.is_empty() {
144 default_candidate_types()
145 } else {
146 config.candidate_types.clone()
147 };
148
149 if config.lite && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
150 {
151 return Err(Error::ErrLiteUsingNonHostCandidates);
152 }
153
154 if !config.urls.is_empty()
155 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
156 && !contains_candidate_type(CandidateType::Relay, &candidate_types)
157 {
158 return Err(Error::ErrUselessUrlsProvided);
159 }
160
161 let mut agent = Self {
162 tie_breaker: rand::random::<u64>(),
163 is_controlling: config.is_controlling,
164 lite: config.lite,
165
166 start_time: Instant::now(),
167
168 nominated_pair: None,
169 selected_pair: None,
170 candidate_pairs: vec![],
171
172 connection_state: ConnectionState::New,
173
174 insecure_skip_verify: config.insecure_skip_verify,
175
176 max_binding_requests: if let Some(max_binding_requests) = config.max_binding_requests {
180 max_binding_requests
181 } else {
182 DEFAULT_MAX_BINDING_REQUESTS
183 },
184 host_acceptance_min_wait: if let Some(host_acceptance_min_wait) =
185 config.host_acceptance_min_wait
186 {
187 host_acceptance_min_wait
188 } else {
189 DEFAULT_HOST_ACCEPTANCE_MIN_WAIT
190 },
191 srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) =
192 config.srflx_acceptance_min_wait
193 {
194 srflx_acceptance_min_wait
195 } else {
196 DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT
197 },
198 prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) =
199 config.prflx_acceptance_min_wait
200 {
201 prflx_acceptance_min_wait
202 } else {
203 DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT
204 },
205 relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) =
206 config.relay_acceptance_min_wait
207 {
208 relay_acceptance_min_wait
209 } else {
210 DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT
211 },
212
213 disconnected_timeout: if let Some(disconnected_timeout) = config.disconnected_timeout {
216 disconnected_timeout
217 } else {
218 DEFAULT_DISCONNECTED_TIMEOUT
219 },
220
221 failed_timeout: if let Some(failed_timeout) = config.failed_timeout {
224 failed_timeout
225 } else {
226 DEFAULT_FAILED_TIMEOUT
227 },
228
229 keepalive_interval: if let Some(keepalive_interval) = config.keepalive_interval {
232 keepalive_interval
233 } else {
234 DEFAULT_KEEPALIVE_INTERVAL
235 },
236
237 check_interval: if config.check_interval == Duration::from_secs(0) {
239 DEFAULT_CHECK_INTERVAL
240 } else {
241 config.check_interval
242 },
243 checking_duration: Instant::now(),
244 last_checking_time: Instant::now(),
245 last_connection_state: ConnectionState::Unspecified,
246
247 ufrag_pwd: UfragPwd::default(),
248
249 local_candidates: vec![],
250 remote_candidates: vec![],
251
252 pending_binding_requests: vec![],
254
255 candidate_types,
256 urls: config.urls.clone(),
257
258 transmits: VecDeque::new(),
259 events: VecDeque::new(),
260 };
261
262 if let Err(err) = agent.restart(config.local_ufrag, config.local_pwd, false) {
264 let _ = agent.close();
265 return Err(err);
266 }
267
268 Ok(agent)
269 }
270
271 pub fn add_local_candidate(&mut self, c: Candidate) -> Result<()> {
273 for cand in &self.local_candidates {
274 if cand.equal(&c) {
275 return Ok(());
276 }
277 }
278
279 self.local_candidates.push(c);
280
281 for remote_index in 0..self.remote_candidates.len() {
282 self.add_pair(self.local_candidates.len() - 1, remote_index);
283 }
284
285 self.request_connectivity_check();
286
287 Ok(())
288 }
289
290 pub fn add_remote_candidate(&mut self, c: Candidate) -> Result<()> {
292 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
294 warn!(
295 "remote mDNS candidate added, but mDNS is disabled: ({})",
296 c.address()
297 );
298 return Err(Error::ErrMulticastDnsNotSupported);
299 }
300
301 for cand in &self.remote_candidates {
302 if cand.equal(&c) {
303 return Ok(());
304 }
305 }
306
307 self.remote_candidates.push(c);
308
309 for local_index in 0..self.local_candidates.len() {
310 self.add_pair(local_index, self.remote_candidates.len() - 1);
311 }
312
313 self.request_connectivity_check();
314
315 Ok(())
316 }
317
318 pub fn set_remote_credentials(
320 &mut self,
321 remote_ufrag: String,
322 remote_pwd: String,
323 ) -> Result<()> {
324 if remote_ufrag.is_empty() {
325 return Err(Error::ErrRemoteUfragEmpty);
326 } else if remote_pwd.is_empty() {
327 return Err(Error::ErrRemotePwdEmpty);
328 }
329
330 self.ufrag_pwd.remote_credentials = Some(Credentials {
331 ufrag: remote_ufrag,
332 pwd: remote_pwd,
333 });
334
335 Ok(())
336 }
337
    /// Returns the remote credentials, or `None` if they have not been set.
    ///
    /// NOTE(review): declared `async` although the body awaits nothing, so callers must
    /// `.await` the result; the signature is kept as-is for compatibility.
    pub async fn get_remote_credentials(&self) -> Option<&Credentials> {
        self.ufrag_pwd.remote_credentials.as_ref()
    }
342
    /// Returns the local credentials.
    pub fn get_local_credentials(&self) -> &Credentials {
        &self.ufrag_pwd.local_credentials
    }
347
348 pub fn handle_read(&mut self, msg: Transmit<BytesMut>) -> Result<()> {
349 if let Some(local_index) =
350 self.find_local_candidate(msg.transport.local_addr, msg.transport.protocol)
351 {
352 self.handle_inbound_candidate_msg(
353 local_index,
354 &msg.message,
355 msg.transport.peer_addr,
356 msg.transport.local_addr,
357 )
358 } else {
359 warn!(
360 "[{}]: Discarded message, not a valid local candidate from {:?}:{}",
361 self.get_name(),
362 msg.transport.protocol,
363 msg.transport.local_addr,
364 );
365 Err(Error::ErrUnhandledStunpacket)
366 }
367 }
368
    /// Removes and returns the oldest queued outbound packet, if any.
    pub fn poll_transmit(&mut self) -> Option<Transmit<BytesMut>> {
        self.transmits.pop_front()
    }
372
373 pub fn handle_timeout(&mut self, now: Instant) {
374 if self.ufrag_pwd.remote_credentials.is_some()
375 && self.last_checking_time + self.get_timeout_interval() <= now
376 {
377 self.contact(now);
378 }
379 }
380
381 pub fn poll_timeout(&self) -> Option<Instant> {
382 if self.ufrag_pwd.remote_credentials.is_some() {
383 Some(self.last_checking_time + self.get_timeout_interval())
384 } else {
385 None
386 }
387 }
388
    /// Removes and returns the oldest queued event, if any.
    pub fn poll_event(&mut self) -> Option<Event> {
        self.events.pop_front()
    }
392
393 fn get_timeout_interval(&self) -> Duration {
394 let (check_interval, keepalive_interval, disconnected_timeout, failed_timeout) = (
395 self.check_interval,
396 self.keepalive_interval,
397 self.disconnected_timeout,
398 self.failed_timeout,
399 );
400 let mut interval = DEFAULT_CHECK_INTERVAL;
401
402 let mut update_interval = |x: Duration| {
403 if x != ZERO_DURATION && (interval == ZERO_DURATION || interval > x) {
404 interval = x;
405 }
406 };
407
408 match self.last_connection_state {
409 ConnectionState::New | ConnectionState::Checking => {
410 update_interval(check_interval);
412 }
413 ConnectionState::Connected | ConnectionState::Disconnected => {
414 update_interval(keepalive_interval);
415 }
416 _ => {}
417 };
418 update_interval(disconnected_timeout);
420 update_interval(failed_timeout);
421 interval
422 }
423
    /// Shuts the agent down: clears the selected pair, removes all local and remote
    /// candidates and moves the connection state to `Closed`.
    ///
    /// Always returns `Ok(())`.
    pub fn close(&mut self) -> Result<()> {
        self.set_selected_pair(None);
        self.delete_all_candidates(false);
        self.update_connection_state(ConnectionState::Closed);

        Ok(())
    }
432
433 pub fn get_selected_candidate_pair(&self) -> Option<(Candidate, Candidate)> {
435 if let Some(pair_index) = self.get_selected_pair() {
436 let candidate_pair = &self.candidate_pairs[pair_index];
437 Some((
438 self.local_candidates[candidate_pair.local_index].clone(),
439 self.remote_candidates[candidate_pair.remote_index].clone(),
440 ))
441 } else {
442 None
443 }
444 }
445
446 pub fn start_connectivity_checks(
448 &mut self,
449 is_controlling: bool,
450 remote_ufrag: String,
451 remote_pwd: String,
452 ) -> Result<()> {
453 debug!(
454 "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}",
455 is_controlling, remote_ufrag, remote_pwd
456 );
457 self.set_remote_credentials(remote_ufrag, remote_pwd)?;
458 self.is_controlling = is_controlling;
459 self.start();
460
461 self.update_connection_state(ConnectionState::Checking);
462 self.request_connectivity_check();
463
464 Ok(())
465 }
466
467 pub fn restart(
470 &mut self,
471 mut ufrag: String,
472 mut pwd: String,
473 keep_local_candidates: bool,
474 ) -> Result<()> {
475 if ufrag.is_empty() {
476 ufrag = generate_ufrag();
477 }
478 if pwd.is_empty() {
479 pwd = generate_pwd();
480 }
481
482 if ufrag.len() * 8 < 24 {
483 return Err(Error::ErrLocalUfragInsufficientBits);
484 }
485 if pwd.len() * 8 < 128 {
486 return Err(Error::ErrLocalPwdInsufficientBits);
487 }
488
489 self.ufrag_pwd.local_credentials.ufrag = ufrag;
491 self.ufrag_pwd.local_credentials.pwd = pwd;
492 self.ufrag_pwd.remote_credentials = None;
493
494 self.pending_binding_requests = vec![];
495
496 self.candidate_pairs = vec![];
497
498 self.set_selected_pair(None);
499 self.delete_all_candidates(keep_local_candidates);
500 self.start();
501
502 if self.connection_state != ConnectionState::New {
505 self.update_connection_state(ConnectionState::Checking);
506 }
507
508 Ok(())
509 }
510
    /// Returns the local candidates as a slice.
    pub(crate) fn get_local_candidates(&self) -> &[Candidate] {
        &self.local_candidates
    }
515
516 fn contact(&mut self, now: Instant) {
517 if self.connection_state == ConnectionState::Failed {
518 self.last_connection_state = self.connection_state;
521 return;
522 }
523 if self.connection_state == ConnectionState::Checking {
524 if self.last_connection_state != self.connection_state {
526 self.checking_duration = now;
527 }
528
529 if now
531 .checked_duration_since(self.checking_duration)
532 .unwrap_or_else(|| Duration::from_secs(0))
533 > self.disconnected_timeout + self.failed_timeout
534 {
535 self.update_connection_state(ConnectionState::Failed);
536 self.last_connection_state = self.connection_state;
537 return;
538 }
539 }
540
541 self.contact_candidates();
542
543 self.last_connection_state = self.connection_state;
544 self.last_checking_time = now;
545 }
546
547 pub(crate) fn update_connection_state(&mut self, new_state: ConnectionState) {
548 if self.connection_state != new_state {
549 if new_state == ConnectionState::Failed {
551 self.set_selected_pair(None);
552 self.delete_all_candidates(false);
553 }
554
555 info!(
556 "[{}]: Setting new connection state: {}",
557 self.get_name(),
558 new_state
559 );
560 self.connection_state = new_state;
561 self.events
562 .push_back(Event::ConnectionStateChange(new_state));
563 }
564 }
565
566 pub(crate) fn set_selected_pair(&mut self, selected_pair: Option<usize>) {
567 if let Some(pair_index) = selected_pair {
568 trace!(
569 "[{}]: Set selected candidate pair: {:?}",
570 self.get_name(),
571 self.candidate_pairs[pair_index]
572 );
573
574 self.candidate_pairs[pair_index].nominated = true;
575 self.selected_pair = Some(pair_index);
576
577 self.update_connection_state(ConnectionState::Connected);
578
579 let candidate_pair = &self.candidate_pairs[pair_index];
581 self.events.push_back(Event::SelectedCandidatePairChange(
582 Box::new(self.local_candidates[candidate_pair.local_index].clone()),
583 Box::new(self.remote_candidates[candidate_pair.remote_index].clone()),
584 ));
585 } else {
586 self.selected_pair = None;
587 }
588 }
589
590 pub(crate) fn ping_all_candidates(&mut self) {
591 trace!("[{}]: pinging all candidates", self.get_name(),);
592
593 let mut pairs: Vec<(usize, usize)> = vec![];
594
595 {
596 let name = self.get_name().to_string();
597 if self.candidate_pairs.is_empty() {
598 warn!(
599 "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.",
600 name,
601 );
602 }
603 for p in &mut self.candidate_pairs {
604 if p.state == CandidatePairState::Waiting {
605 p.state = CandidatePairState::InProgress;
606 } else if p.state != CandidatePairState::InProgress {
607 continue;
608 }
609
610 if p.binding_request_count > self.max_binding_requests {
611 trace!(
612 "[{}]: max requests reached for pair {}, marking it as failed",
613 name,
614 *p
615 );
616 p.state = CandidatePairState::Failed;
617 } else {
618 p.binding_request_count += 1;
619 let local = p.local_index;
620 let remote = p.remote_index;
621 pairs.push((local, remote));
622 }
623 }
624 }
625
626 for (local, remote) in pairs {
627 self.ping_candidate(local, remote);
628 }
629 }
630
631 pub(crate) fn add_pair(&mut self, local_index: usize, remote_index: usize) {
632 let p = CandidatePair::new(
633 local_index,
634 remote_index,
635 self.local_candidates[local_index].priority(),
636 self.remote_candidates[remote_index].priority(),
637 self.is_controlling,
638 );
639 self.candidate_pairs.push(p);
640 }
641
642 pub(crate) fn find_pair(&self, local_index: usize, remote_index: usize) -> Option<usize> {
643 for (index, p) in self.candidate_pairs.iter().enumerate() {
644 if p.local_index == local_index && p.remote_index == remote_index {
645 return Some(index);
646 }
647 }
648 None
649 }
650
651 pub(crate) fn validate_selected_pair(&mut self) -> bool {
654 let (valid, disconnected_time) = {
655 self.selected_pair.as_ref().map_or_else(
656 || (false, Duration::from_secs(0)),
657 |&pair_index| {
658 let remote_index = self.candidate_pairs[pair_index].remote_index;
659
660 let disconnected_time = Instant::now()
661 .duration_since(self.remote_candidates[remote_index].last_received());
662 (true, disconnected_time)
663 },
664 )
665 };
666
667 if valid {
668 let mut total_time_to_failure = self.failed_timeout;
670 if total_time_to_failure != Duration::from_secs(0) {
671 total_time_to_failure += self.disconnected_timeout;
672 }
673
674 if total_time_to_failure != Duration::from_secs(0)
675 && disconnected_time > total_time_to_failure
676 {
677 self.update_connection_state(ConnectionState::Failed);
678 } else if self.disconnected_timeout != Duration::from_secs(0)
679 && disconnected_time > self.disconnected_timeout
680 {
681 self.update_connection_state(ConnectionState::Disconnected);
682 } else {
683 self.update_connection_state(ConnectionState::Connected);
684 }
685 }
686
687 valid
688 }
689
690 pub(crate) fn check_keepalive(&mut self) {
694 let (local_index, remote_index) = {
695 self.selected_pair
696 .as_ref()
697 .map_or((None, None), |&pair_index| {
698 let p = &self.candidate_pairs[pair_index];
699 (Some(p.local_index), Some(p.remote_index))
700 })
701 };
702
703 if let (Some(local_index), Some(remote_index)) = (local_index, remote_index) {
704 let last_sent =
705 Instant::now().duration_since(self.local_candidates[local_index].last_sent());
706
707 let last_received =
708 Instant::now().duration_since(self.remote_candidates[remote_index].last_received());
709
710 if (self.keepalive_interval != Duration::from_secs(0))
711 && ((last_sent > self.keepalive_interval)
712 || (last_received > self.keepalive_interval))
713 {
714 self.ping_candidate(local_index, remote_index);
717 }
718 }
719 }
720
    /// Runs a `contact` round immediately (timestamped with the current time) if remote
    /// credentials are set; otherwise does nothing.
    fn request_connectivity_check(&mut self) {
        if self.ufrag_pwd.remote_credentials.is_some() {
            self.contact(Instant::now());
        }
    }
726
    /// Removes all remote candidates and, unless `keep_local_candidates` is set, all local
    /// candidates as well.
    ///
    /// NOTE(review): `candidate_pairs` stores indices into these vectors and is not cleared
    /// here. `restart` clears it separately, but `close` and the transition to `Failed` do
    /// not — confirm that stale pairs can never be dereferenced afterwards.
    pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
        if !keep_local_candidates {
            self.local_candidates.clear();
        }
        self.remote_candidates.clear();
    }
737
738 pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option<usize> {
739 let (ip, port) = (addr.ip(), addr.port());
740 for (index, c) in self.remote_candidates.iter().enumerate() {
741 if c.address() == ip.to_string() && c.port() == port {
742 return Some(index);
743 }
744 }
745 None
746 }
747
748 pub(crate) fn find_local_candidate(
749 &self,
750 addr: SocketAddr,
751 protocol: Protocol,
752 ) -> Option<usize> {
753 for (index, c) in self.local_candidates.iter().enumerate() {
754 if c.addr() == addr && c.network_type().to_protocol() == protocol {
755 return Some(index);
756 }
757 }
758 None
759 }
760
761 pub(crate) fn send_binding_request(
762 &mut self,
763 m: &Message,
764 local_index: usize,
765 remote_index: usize,
766 ) {
767 trace!(
768 "[{}]: ping STUN from {} to {}",
769 self.get_name(),
770 local_index,
771 remote_index
772 );
773
774 self.invalidate_pending_binding_requests(Instant::now());
775
776 self.pending_binding_requests.push(BindingRequest {
777 timestamp: Instant::now(),
778 transaction_id: m.transaction_id,
779 destination: self.remote_candidates[remote_index].addr(),
780 is_use_candidate: m.contains(ATTR_USE_CANDIDATE),
781 });
782
783 self.send_stun(m, local_index, remote_index);
784 }
785
786 pub(crate) fn send_binding_success(
787 &mut self,
788 m: &Message,
789 local_index: usize,
790 remote_index: usize,
791 ) {
792 let addr = self.remote_candidates[remote_index].addr();
793 let (ip, port) = (addr.ip(), addr.port());
794 let local_pwd = self.ufrag_pwd.local_credentials.pwd.clone();
795
796 let (out, result) = {
797 let mut out = Message::new();
798 let result = out.build(&[
799 Box::new(m.clone()),
800 Box::new(BINDING_SUCCESS),
801 Box::new(XorMappedAddress { ip, port }),
802 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
803 Box::new(FINGERPRINT),
804 ]);
805 (out, result)
806 };
807
808 if let Err(err) = result {
809 warn!(
810 "[{}]: Failed to handle inbound ICE from: {} to: {} error: {}",
811 self.get_name(),
812 local_index,
813 remote_index,
814 err
815 );
816 } else {
817 self.send_stun(&out, local_index, remote_index);
818 }
819 }
820
821 pub(crate) fn invalidate_pending_binding_requests(&mut self, filter_time: Instant) {
826 let pending_binding_requests = &mut self.pending_binding_requests;
827 let initial_size = pending_binding_requests.len();
828
829 let mut temp = vec![];
830 for binding_request in pending_binding_requests.drain(..) {
831 if filter_time
832 .checked_duration_since(binding_request.timestamp)
833 .map(|duration| duration < MAX_BINDING_REQUEST_TIMEOUT)
834 .unwrap_or(true)
835 {
836 temp.push(binding_request);
837 }
838 }
839
840 *pending_binding_requests = temp;
841 let bind_requests_remaining = pending_binding_requests.len();
842 let bind_requests_removed = initial_size - bind_requests_remaining;
843 if bind_requests_removed > 0 {
844 trace!(
845 "[{}]: Discarded {} binding requests because they expired, still {} remaining",
846 self.get_name(),
847 bind_requests_removed,
848 bind_requests_remaining,
849 );
850 }
851 }
852
853 pub(crate) fn handle_inbound_binding_success(
856 &mut self,
857 id: TransactionId,
858 ) -> Option<BindingRequest> {
859 self.invalidate_pending_binding_requests(Instant::now());
860
861 let pending_binding_requests = &mut self.pending_binding_requests;
862 for i in 0..pending_binding_requests.len() {
863 if pending_binding_requests[i].transaction_id == id {
864 let valid_binding_request = pending_binding_requests.remove(i);
865 return Some(valid_binding_request);
866 }
867 }
868 None
869 }
870
    /// Processes a decoded STUN message received on the local candidate `local_index` from
    /// `remote_addr`.
    ///
    /// Only Binding messages of class success-response, request or indication are accepted.
    /// Success responses are integrity-checked with the remote password and handed to
    /// `handle_success_response`; requests are checked for the expected username and local
    /// password, may create a peer-reflexive remote candidate, and are handed to
    /// `handle_binding_request`. Whenever the remote candidate is known at the end, it is
    /// marked as seen.
    ///
    /// # Errors
    ///
    /// * `ErrUnhandledStunpacket` for unsupported method/class, or a success response from an
    ///   unknown remote address.
    /// * `ErrUnexpectedStunrequestMessage` when the message's role attributes conflict with
    ///   this agent's role.
    /// * `ErrPasswordEmpty` when no remote credentials are set.
    /// * Username/integrity validation errors and peer-reflexive candidate creation errors.
    pub(crate) fn handle_inbound(
        &mut self,
        m: &mut Message,
        local_index: usize,
        remote_addr: SocketAddr,
    ) -> Result<()> {
        // Reject anything that is not a Binding success response, request or indication.
        if m.typ.method != METHOD_BINDING
            || !(m.typ.class == CLASS_SUCCESS_RESPONSE
                || m.typ.class == CLASS_REQUEST
                || m.typ.class == CLASS_INDICATION)
        {
            trace!(
                "[{}]: unhandled STUN from {} to {} class({}) method({})",
                self.get_name(),
                remote_addr,
                local_index,
                m.typ.class,
                m.typ.method
            );
            return Err(Error::ErrUnhandledStunpacket);
        }

        // Role checks: a controlling agent must not receive ICE-CONTROLLING or USE-CANDIDATE,
        // a controlled agent must not receive ICE-CONTROLLED.
        if self.is_controlling {
            if m.contains(ATTR_ICE_CONTROLLING) {
                debug!(
                    "[{}]: inbound isControlling && a.isControlling == true",
                    self.get_name(),
                );
                return Err(Error::ErrUnexpectedStunrequestMessage);
            } else if m.contains(ATTR_USE_CANDIDATE) {
                debug!(
                    "[{}]: useCandidate && a.isControlling == true",
                    self.get_name(),
                );
                return Err(Error::ErrUnexpectedStunrequestMessage);
            }
        } else if m.contains(ATTR_ICE_CONTROLLED) {
            debug!(
                "[{}]: inbound isControlled && a.isControlling == false",
                self.get_name(),
            );
            return Err(Error::ErrUnexpectedStunrequestMessage);
        }

        // Nothing can be authenticated before the remote credentials are known.
        let Some(remote_credentials) = &self.ufrag_pwd.remote_credentials else {
            debug!(
                "[{}]: ufrag_pwd.remote_credentials.is_none",
                self.get_name(),
            );
            return Err(Error::ErrPasswordEmpty);
        };

        let mut remote_candidate_index = self.find_remote_candidate(remote_addr);
        if m.typ.class == CLASS_SUCCESS_RESPONSE {
            // Responses are keyed with the remote password.
            if let Err(err) = assert_inbound_message_integrity(m, remote_credentials.pwd.as_bytes())
            {
                warn!(
                    "[{}]: discard message from ({}), {}",
                    self.get_name(),
                    remote_addr,
                    err
                );
                return Err(err);
            }

            if let Some(remote_index) = &remote_candidate_index {
                self.handle_success_response(m, local_index, *remote_index, remote_addr);
            } else {
                warn!(
                    "[{}]: discard success message from ({}), no such remote",
                    self.get_name(),
                    remote_addr
                );
                return Err(Error::ErrUnhandledStunpacket);
            }
        } else if m.typ.class == CLASS_REQUEST {
            {
                // Requests must carry "<local ufrag>:<remote ufrag>" and be keyed with the
                // local password.
                let username = self.ufrag_pwd.local_credentials.ufrag.clone()
                    + ":"
                    + remote_credentials.ufrag.as_str();
                if let Err(err) = assert_inbound_username(m, &username) {
                    warn!(
                        "[{}]: discard message from ({}), {}",
                        self.get_name(),
                        remote_addr,
                        err
                    );
                    return Err(err);
                } else if let Err(err) = assert_inbound_message_integrity(
                    m,
                    self.ufrag_pwd.local_credentials.pwd.as_bytes(),
                ) {
                    warn!(
                        "[{}]: discard message from ({}), {}",
                        self.get_name(),
                        remote_addr,
                        err
                    );
                    return Err(err);
                }
            }

            // An authenticated request from an unknown address yields a new peer-reflexive
            // remote candidate.
            if remote_candidate_index.is_none() {
                // NOTE(review): the network type is hard-coded to Udp4 regardless of the
                // address family of `remote_addr` or the local candidate's transport —
                // confirm this is intended.
                let (ip, port, network_type) =
                    (remote_addr.ip(), remote_addr.port(), NetworkType::Udp4);

                let prflx_candidate_config = CandidatePeerReflexiveConfig {
                    base_config: CandidateConfig {
                        network: network_type.to_string(),
                        address: ip.to_string(),
                        port,
                        component: self.local_candidates[local_index].component(),
                        ..CandidateConfig::default()
                    },
                    rel_addr: "".to_owned(),
                    rel_port: 0,
                };

                match prflx_candidate_config.new_candidate_peer_reflexive() {
                    Ok(prflx_candidate) => {
                        // NOTE(review): the result is ignored and the index below assumes the
                        // candidate was appended; that holds only if `add_remote_candidate`
                        // neither rejects it nor treats it as a duplicate — confirm.
                        let _ = self.add_remote_candidate(prflx_candidate);
                        remote_candidate_index = Some(self.remote_candidates.len() - 1);
                    }
                    Err(err) => {
                        error!(
                            "[{}]: Failed to create new remote prflx candidate ({})",
                            self.get_name(),
                            err
                        );
                        return Err(err);
                    }
                };

                debug!(
                    "[{}]: adding a new peer-reflexive candidate: {} ",
                    self.get_name(),
                    remote_addr
                );
            }

            trace!(
                "[{}]: inbound STUN (Request) from {} to {}",
                self.get_name(),
                remote_addr,
                local_index
            );

            if let Some(remote_index) = &remote_candidate_index {
                self.handle_binding_request(m, local_index, *remote_index);
            }
        }

        // Indications fall through to here: they only refresh the remote candidate's
        // "seen" state.
        if let Some(remote_index) = remote_candidate_index {
            self.remote_candidates[remote_index].seen(false);
        }

        Ok(())
    }
1030
1031 pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool {
1034 self.find_remote_candidate(remote_addr)
1035 .map_or(false, |remote_index| {
1036 self.remote_candidates[remote_index].seen(false);
1037 true
1038 })
1039 }
1040
1041 pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
1042 let peer_addr = self.remote_candidates[remote_index].addr();
1043 let local_addr = self.local_candidates[local_index].addr();
1044 let protocol = if self.local_candidates[local_index].network_type().is_tcp() {
1045 Protocol::TCP
1046 } else {
1047 Protocol::UDP
1048 };
1049
1050 self.transmits.push_back(Transmit {
1051 now: Instant::now(),
1052 transport: TransportContext {
1053 local_addr,
1054 peer_addr,
1055 ecn: None,
1056 protocol,
1057 },
1058 message: BytesMut::from(&msg.raw[..]),
1059 });
1060
1061 self.local_candidates[local_index].seen(true);
1062 }
1063
1064 fn handle_inbound_candidate_msg(
1065 &mut self,
1066 local_index: usize,
1067 buf: &[u8],
1068 remote_addr: SocketAddr,
1069 local_addr: SocketAddr,
1070 ) -> Result<()> {
1071 if stun::message::is_message(buf) {
1072 let mut m = Message {
1073 raw: vec![],
1074 ..Message::default()
1075 };
1076 m.raw.extend_from_slice(buf);
1078
1079 if let Err(err) = m.decode() {
1080 warn!(
1081 "[{}]: Failed to handle decode ICE from {} to {}: {}",
1082 self.get_name(),
1083 local_addr,
1084 remote_addr,
1085 err
1086 );
1087 Err(err)
1088 } else {
1089 self.handle_inbound(&mut m, local_index, remote_addr)
1090 }
1091 } else {
1092 if !self.validate_non_stun_traffic(remote_addr) {
1093 warn!(
1094 "[{}]: Discarded message, not a valid remote candidate from {}",
1095 self.get_name(),
1096 remote_addr,
1097 );
1098 } else {
1099 error!(
1100 "[{}]: non-STUN traffic message from a valid remote candidate from {}",
1101 self.get_name(),
1102 remote_addr
1103 );
1104 }
1105 Err(Error::ErrNonStunmessage)
1106 }
1107 }
1108
    /// Returns the role name used as a log prefix: `"controlling"` or `"controlled"`.
    pub(crate) fn get_name(&self) -> &str {
        if self.is_controlling {
            "controlling"
        } else {
            "controlled"
        }
    }
1116
    /// Returns the index of the selected candidate pair, if any.
    pub(crate) fn get_selected_pair(&self) -> Option<usize> {
        self.selected_pair
    }
1120
1121 pub(crate) fn get_best_available_candidate_pair(&self) -> Option<usize> {
1122 let mut best_pair_index: Option<usize> = None;
1123
1124 for (index, p) in self.candidate_pairs.iter().enumerate() {
1125 if p.state == CandidatePairState::Failed {
1126 continue;
1127 }
1128
1129 if let Some(pair_index) = &mut best_pair_index {
1130 let b = &self.candidate_pairs[*pair_index];
1131 if b.priority() < p.priority() {
1132 *pair_index = index;
1133 }
1134 } else {
1135 best_pair_index = Some(index);
1136 }
1137 }
1138
1139 best_pair_index
1140 }
1141
1142 pub(crate) fn get_best_valid_candidate_pair(&self) -> Option<usize> {
1143 let mut best_pair_index: Option<usize> = None;
1144
1145 for (index, p) in self.candidate_pairs.iter().enumerate() {
1146 if p.state != CandidatePairState::Succeeded {
1147 continue;
1148 }
1149
1150 if let Some(pair_index) = &mut best_pair_index {
1151 let b = &self.candidate_pairs[*pair_index];
1152 if b.priority() < p.priority() {
1153 *pair_index = index;
1154 }
1155 } else {
1156 best_pair_index = Some(index);
1157 }
1158 }
1159
1160 best_pair_index
1161 }
1162}