use crate::{Executor, IpMode};
use parking_lot::RwLock;
use recv::*;
use send::*;
use socket2::{Domain, Protocol, Socket as Socket2, Type};
use std::{
collections::HashMap,
io::{Error, ErrorKind},
net::SocketAddr,
sync::Arc,
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
mod filter;
mod recv;
mod send;
pub use filter::{
rate_limiter::{RateLimiter, RateLimiterBuilder},
FilterConfig,
};
pub use recv::InboundPacket;
pub use send::OutboundPacket;
pub struct SocketConfig {
pub executor: Box<dyn Executor + Send + Sync>,
pub socket_addr: SocketAddr,
pub filter_config: FilterConfig,
pub ban_duration: Option<Duration>,
pub expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
pub local_node_id: enr::NodeId,
}
pub struct Socket {
pub send: mpsc::Sender<OutboundPacket>,
pub recv: mpsc::Receiver<InboundPacket>,
sender_exit: Option<oneshot::Sender<()>>,
recv_exit: Option<oneshot::Sender<()>>,
}
impl Socket {
pub(crate) async fn new_socket(
socket_addr: &SocketAddr,
ip_mode: IpMode,
) -> Result<tokio::net::UdpSocket, Error> {
match ip_mode {
IpMode::Ip4 => match socket_addr {
SocketAddr::V6(_) => Err(Error::new(
ErrorKind::InvalidInput,
"Cannot create an ipv4 socket from an ipv6 address",
)),
ip4 => tokio::net::UdpSocket::bind(ip4).await,
},
IpMode::Ip6 {
enable_mapped_addresses,
} => {
let addr = match socket_addr {
SocketAddr::V4(_) => Err(Error::new(
ErrorKind::InvalidInput,
"Cannot create an ipv6 socket from an ipv4 address",
)),
SocketAddr::V6(ip6) => Ok((*ip6).into()),
}?;
let socket = Socket2::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
let only_v6 = !enable_mapped_addresses;
socket.set_only_v6(only_v6)?;
socket.set_nonblocking(true)?;
socket.bind(&addr)?;
tokio::net::UdpSocket::from_std(socket.into())
}
}
}
pub(crate) fn new(socket: tokio::net::UdpSocket, config: SocketConfig) -> Result<Self, Error> {
let recv_udp = Arc::new(socket);
let send_udp = recv_udp.clone();
let recv_config = RecvHandlerConfig {
filter_config: config.filter_config,
executor: config.executor.clone(),
recv: recv_udp,
local_node_id: config.local_node_id,
expected_responses: config.expected_responses,
ban_duration: config.ban_duration,
};
let (recv, recv_exit) = RecvHandler::spawn(recv_config);
let (send, sender_exit) = SendHandler::spawn(config.executor.clone(), send_udp);
Ok(Socket {
send,
recv,
sender_exit: Some(sender_exit),
recv_exit: Some(recv_exit),
})
}
}
impl std::ops::Drop for Socket {
fn drop(&mut self) {
let _ = self
.sender_exit
.take()
.expect("Exit always exists")
.send(());
let _ = self.recv_exit.take().expect("Exit always exists").send(());
}
}