use super::*;
use crate::errors::*;
use crate::util::*;
use stun::message::*;
use async_trait::async_trait;
use crc32fast::Hasher;
use std::fmt;
use std::ops::Add;
use std::sync::atomic::{AtomicU16, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{broadcast, Mutex};
#[derive(Default)]
pub struct CandidateBaseConfig {
pub candidate_id: String,
pub network: String,
pub address: String,
pub port: u16,
pub component: u16,
pub priority: u32,
pub foundation: String,
pub conn: Option<Arc<dyn util::Conn + Send + Sync>>,
pub initialized_ch: Option<broadcast::Receiver<()>>,
}
pub(crate) type OnClose = fn() -> Result<(), Error>;
pub struct CandidateBase {
pub(crate) id: String,
pub(crate) network_type: AtomicU8,
pub(crate) candidate_type: CandidateType,
pub(crate) component: AtomicU16,
pub(crate) address: String,
pub(crate) port: u16,
pub(crate) related_address: Option<CandidateRelatedAddress>,
pub(crate) tcp_type: TcpType,
pub(crate) resolved_addr: Mutex<SocketAddr>,
pub(crate) last_sent: AtomicU64,
pub(crate) last_received: AtomicU64,
pub(crate) conn: Option<Arc<dyn util::Conn + Send + Sync>>,
pub(crate) agent_internal: Option<Arc<Mutex<AgentInternal>>>,
pub(crate) closed_ch: Arc<Mutex<Option<broadcast::Sender<()>>>>,
pub(crate) foundation_override: String,
pub(crate) priority_override: u32,
pub(crate) network: String,
pub(crate) relay_client: Option<Arc<turn::client::Client>>,
}
impl Default for CandidateBase {
fn default() -> Self {
CandidateBase {
id: String::new(),
network_type: AtomicU8::new(0),
candidate_type: CandidateType::default(),
component: AtomicU16::new(0),
address: String::new(),
port: 0,
related_address: None,
tcp_type: TcpType::default(),
resolved_addr: Mutex::new(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)),
last_sent: AtomicU64::new(0),
last_received: AtomicU64::new(0),
conn: None,
agent_internal: None,
closed_ch: Arc::new(Mutex::new(None)),
foundation_override: String::new(),
priority_override: 0,
network: String::new(),
relay_client: None,
}
}
}
impl fmt::Display for CandidateBase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(related_address) = self.related_address() {
write!(
f,
"{} {} {}:{}{}",
self.network_type(),
self.candidate_type(),
self.address(),
self.port(),
related_address,
)
} else {
write!(
f,
"{} {} {}:{}",
self.network_type(),
self.candidate_type(),
self.address(),
self.port(),
)
}
}
}
#[async_trait]
impl Candidate for CandidateBase {
fn foundation(&self) -> String {
if !self.foundation_override.is_empty() {
return self.foundation_override.clone();
}
let mut buf = vec![];
buf.extend_from_slice(self.candidate_type().to_string().as_bytes());
buf.extend_from_slice(self.address.as_bytes());
buf.extend_from_slice(self.network_type().to_string().as_bytes());
let mut hasher = Hasher::new();
hasher.update(&buf);
let checksum = hasher.finalize();
format!("{}", checksum)
}
fn id(&self) -> String {
self.id.clone()
}
fn component(&self) -> u16 {
self.component.load(Ordering::SeqCst)
}
fn set_component(&self, component: u16) {
self.component.store(component, Ordering::SeqCst);
}
fn last_received(&self) -> SystemTime {
UNIX_EPOCH.add(Duration::from_nanos(
self.last_received.load(Ordering::SeqCst),
))
}
fn last_sent(&self) -> SystemTime {
UNIX_EPOCH.add(Duration::from_nanos(self.last_sent.load(Ordering::SeqCst)))
}
fn network_type(&self) -> NetworkType {
NetworkType::from(self.network_type.load(Ordering::SeqCst))
}
fn address(&self) -> String {
self.address.clone()
}
fn port(&self) -> u16 {
self.port
}
fn priority(&self) -> u32 {
if self.priority_override != 0 {
return self.priority_override;
}
(1 << 24) * (self.candidate_type().preference() as u32)
+ (1 << 8) * (self.local_preference() as u32)
+ (256 - self.component() as u32)
}
fn related_address(&self) -> Option<CandidateRelatedAddress> {
if let Some(related_address) = &self.related_address {
Some(related_address.clone())
} else {
None
}
}
fn candidate_type(&self) -> CandidateType {
self.candidate_type
}
fn tcp_type(&self) -> TcpType {
self.tcp_type
}
fn marshal(&self) -> String {
let mut val = format!(
"{} {} {} {} {} {} typ {}",
self.foundation(),
self.component(),
self.network_type().network_short(),
self.priority(),
self.address(),
self.port(),
self.candidate_type()
);
if self.tcp_type != TcpType::Unspecified {
val += format!(" tcptype {}", self.tcp_type()).as_str();
}
if let Some(related_address) = self.related_address() {
val += format!(
" raddr {} rport {}",
related_address.address, related_address.port,
)
.as_str();
}
val
}
async fn addr(&self) -> SocketAddr {
let resolved_addr = self.resolved_addr.lock().await;
*resolved_addr
}
async fn close(&self) -> Result<(), Error> {
{
let mut closed_ch = self.closed_ch.lock().await;
if closed_ch.is_none() {
return Err(ERR_CLOSED.to_owned());
}
closed_ch.take();
}
if let Some(relay_client) = &self.relay_client {
relay_client.close().await
} else {
Ok(())
}
}
fn seen(&self, outbound: bool) {
let d = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d,
Err(_) => Duration::from_secs(0),
};
if outbound {
self.set_last_sent(d)
} else {
self.set_last_received(d)
}
}
async fn write_to(
&self,
raw: &[u8],
dst: &(dyn Candidate + Send + Sync),
) -> Result<usize, Error> {
let n = if let Some(conn) = &self.conn {
let addr = dst.addr().await;
conn.send_to(raw, addr).await?
} else {
0
};
self.seen(true);
Ok(n)
}
fn equal(&self, other: &dyn Candidate) -> bool {
self.network_type() == other.network_type()
&& self.candidate_type() == other.candidate_type()
&& self.address() == other.address()
&& self.port() == other.port()
&& self.tcp_type() == other.tcp_type()
&& self.related_address() == other.related_address()
}
async fn set_ip(&self, ip: &IpAddr) -> Result<(), Error> {
let network_type = determine_network_type(&self.network, ip)?;
self.network_type
.store(network_type as u8, Ordering::SeqCst);
let mut resolved_addr = self.resolved_addr.lock().await;
*resolved_addr = create_addr(network_type, *ip, self.port);
Ok(())
}
fn get_conn(&self) -> Option<&Arc<dyn util::Conn + Send + Sync>> {
self.conn.as_ref()
}
fn get_agent(&self) -> Option<&Arc<Mutex<AgentInternal>>> {
self.agent_internal.as_ref()
}
fn get_closed_ch(&self) -> Arc<Mutex<Option<broadcast::Sender<()>>>> {
self.closed_ch.clone()
}
}
impl CandidateBase {
pub fn set_last_received(&self, d: Duration) {
self.last_received
.store(d.as_nanos() as u64, Ordering::SeqCst);
}
pub fn set_last_sent(&self, d: Duration) {
self.last_sent.store(d.as_nanos() as u64, Ordering::SeqCst);
}
pub fn local_preference(&self) -> u16 {
if self.network_type().is_tcp() {
let other_pref: u16 = 8191;
let direction_pref: u16 = match self.candidate_type() {
CandidateType::Host | CandidateType::Relay => match self.tcp_type() {
TcpType::Active => 6,
TcpType::Passive => 4,
TcpType::SimultaneousOpen => 2,
TcpType::Unspecified => 0,
},
CandidateType::PeerReflexive | CandidateType::ServerReflexive => {
match self.tcp_type() {
TcpType::SimultaneousOpen => 6,
TcpType::Active => 4,
TcpType::Passive => 2,
TcpType::Unspecified => 0,
}
}
CandidateType::Unspecified => 0,
};
(1 << 13) * direction_pref + other_pref
} else {
DEFAULT_LOCAL_PREFERENCE
}
}
pub(crate) async fn recv_loop(
candidate: Arc<dyn Candidate + Send + Sync>,
agent_internal: Arc<Mutex<AgentInternal>>,
mut closed_ch_rx: broadcast::Receiver<()>,
initialized_ch: Option<broadcast::Receiver<()>>,
conn: Arc<dyn util::Conn + Send + Sync>,
addr: SocketAddr,
) -> Result<(), Error> {
if let Some(mut initialized_ch) = initialized_ch {
tokio::select! {
_ = initialized_ch.recv() => {}
_ = closed_ch_rx.recv() => return Err(ERR_CLOSED.to_owned()),
}
}
let mut buffer = vec![0u8; RECEIVE_MTU];
let mut n;
let mut src_addr;
loop {
tokio::select! {
result = conn.recv_from(&mut buffer) => {
match result {
Ok((num, src)) => {
n = num;
src_addr = src;
}
Err(err) => return Err(Error::new(err.to_string())),
}
},
_ = closed_ch_rx.recv() => return Err(ERR_CLOSED.to_owned()),
}
CandidateBase::handle_inbound_candidate_msg(
&candidate,
&agent_internal,
&buffer[..n],
src_addr,
addr,
)
.await;
}
}
async fn handle_inbound_candidate_msg(
c: &Arc<dyn Candidate + Send + Sync>,
agent_internal: &Arc<Mutex<AgentInternal>>,
buf: &[u8],
src_addr: SocketAddr,
addr: SocketAddr,
) {
if stun::message::is_message(buf) {
let mut m = Message {
raw: vec![],
..Default::default()
};
m.raw.extend_from_slice(buf);
if let Err(err) = m.decode() {
log::warn!(
"Failed to handle decode ICE from {} to {}: {}",
addr,
src_addr,
err
);
} else {
let agent_internal_clone = Arc::clone(agent_internal);
let mut ai = agent_internal.lock().await;
ai.handle_inbound(&mut m, c, src_addr, agent_internal_clone)
.await;
}
} else {
let ai = agent_internal.lock().await;
if !ai.validate_non_stun_traffic(c, src_addr).await {
log::warn!(
"Discarded message from {}, not a valid remote candidate",
c.addr().await
);
return;
} else if let Err(err) = ai.agent_conn.buffer.write(buf).await {
log::warn!("failed to write packet: {}", err);
}
}
}
}