use crate::service::peer_addr::GetPeerAddr;
use crate::service::Service;
use conjure_error::Error;
use socket2::{Domain, SockAddr, SockRef, Socket, TcpKeepalive, Type};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use std::{fs, io};
use tokio::net::{TcpListener, TcpStream};
use tokio::time;
use witchcraft_log::warn;
const TCP_KEEPALIVE: Duration = Duration::from_secs(3 * 60);
pub struct AcceptService {
listener: TcpListener,
}
impl AcceptService {
pub fn new(port: u16) -> Result<Self, Error> {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
let listener =
Socket::new(Domain::IPV4, Type::STREAM, None).map_err(Error::internal_safe)?;
listener
.set_nonblocking(true)
.map_err(Error::internal_safe)?;
listener
.set_reuse_address(true)
.map_err(Error::internal_safe)?;
listener
.bind(&SockAddr::from(addr))
.map_err(Error::internal_safe)?;
listener.listen(somaxconn()).map_err(Error::internal_safe)?;
let listener = TcpListener::from_std(listener.into()).map_err(Error::internal_safe)?;
Ok(AcceptService { listener })
}
}
impl Service<()> for AcceptService {
type Response = TcpStream;
async fn call(&self, _: ()) -> Self::Response {
loop {
match self.listener.accept().await {
Ok((socket, _)) => match setup_socket(&socket) {
Ok(()) => return socket,
Err(e) => warn!("error configuring socket", error: Error::internal_safe(e)),
},
Err(e) => match e.raw_os_error() {
Some(libc::EINTR) => {}
Some(libc::EMFILE | libc::ENFILE | libc::ENOBUFS | libc::ENOMEM) => {
warn!(
"hit resource limit accepting socket",
error: Error::internal_safe(e),
);
time::sleep(Duration::from_secs(1)).await;
}
_ => warn!("error accepting socket", error: Error::internal_safe(e)),
},
}
}
}
}
fn somaxconn() -> i32 {
fs::read_to_string("/proc/sys/net/core/somaxconn")
.ok()
.and_then(|s| s.trim().parse::<i32>().ok())
.unwrap_or(128)
}
fn setup_socket(stream: &TcpStream) -> io::Result<()> {
stream.set_nodelay(true)?;
SockRef::from(stream).set_tcp_keepalive(&TcpKeepalive::new().with_time(TCP_KEEPALIVE))?;
Ok(())
}
impl GetPeerAddr for TcpStream {
fn peer_addr(&self) -> Result<SocketAddr, Error> {
self.peer_addr().map_err(Error::internal_safe)
}
}