use std::io;
use std::net::SocketAddr;
use std::os::fd::RawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use crate::acceptor::{AcceptorConfig, run_acceptor};
use crate::backend::AsyncEventLoop;
use crate::config::Config;
use crate::runtime::handler::AsyncEventHandler;
/// Outcome of launching the runtime.
///
/// On success: the [`ShutdownHandle`] for the running runtime, plus one join
/// handle per worker thread. Each join handle yields that worker's exit result.
/// The acceptor thread is detached and is not included.
type LaunchResult = Result<
(
ShutdownHandle,
Vec<thread::JoinHandle<Result<(), crate::error::Error>>>,
),
crate::error::Error,
>;
/// Handle to a launched runtime: used to stop it and to reach its workers.
///
/// Dropping the handle calls [`ShutdownHandle::shutdown`], so it must be kept
/// alive for as long as the runtime should keep running.
pub struct ShutdownHandle {
// Set to `true` by `shutdown`; the same flag is shared with the acceptor and
// every worker.
shutdown_flag: Arc<AtomicBool>,
// One wake handle per worker, signalled on shutdown so the workers notice the flag.
worker_wake_handles: Vec<crate::wakeup::WakeHandle>,
// Raw fd of the stream listener (TCP or Unix), if one was configured.
listen_fd: Option<RawFd>,
// Shared with the acceptor thread. Whichever side swaps it to `true` first
// closes `listen_fd`, so the fd is closed at most once.
listen_fd_closed: Option<Arc<AtomicBool>>,
// Local TCP address read back with `getsockname` after binding; `None` for
// Unix sockets, for no listener, or if the lookup failed.
bound_addr: Option<SocketAddr>,
// Memory-region registrar; only used by the io_uring backend.
#[cfg_attr(not(has_io_uring), allow(dead_code))]
region_registrar: Arc<crate::region_registry::RegionRegistrar>,
}
impl ShutdownHandle {
    /// Local address of the TCP listener, as reported by `getsockname` after
    /// binding. This includes the kernel-chosen port when port 0 was requested.
    ///
    /// `None` when no TCP listener was configured or the address could not be
    /// read back.
    pub fn bound_addr(&self) -> Option<SocketAddr> {
        self.bound_addr
    }

    /// Number of worker threads (one wake handle is kept per worker).
    pub fn worker_count(&self) -> usize {
        self.worker_wake_handles.len()
    }

    /// Clone of the wake handle for worker `idx`, or `None` if `idx` is out of
    /// range.
    pub fn worker_wake_handle(&self, idx: usize) -> Option<crate::wakeup::WakeHandle> {
        let handle = self.worker_wake_handles.get(idx)?;
        Some(handle.clone())
    }

    /// Registers `region` through the runtime's region registrar and returns
    /// its id.
    ///
    /// # Errors
    ///
    /// Without the io_uring backend this always fails with
    /// [`io::ErrorKind::Unsupported`]. Otherwise the registrar's error is
    /// returned.
    pub fn register_region(
        &self,
        region: crate::buffer::fixed::MemoryRegion,
    ) -> io::Result<crate::buffer::fixed::RegionId> {
        #[cfg(not(has_io_uring))]
        {
            let _ = region;
            Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "register_region requires the io_uring backend",
            ))
        }
        #[cfg(has_io_uring)]
        {
            self.region_registrar.register(region)
        }
    }

    /// Unregisters the region previously registered under `id`.
    ///
    /// # Errors
    ///
    /// Without the io_uring backend this always fails with
    /// [`io::ErrorKind::Unsupported`]. Otherwise the registrar's error is
    /// returned.
    pub fn unregister_region(&self, id: crate::buffer::fixed::RegionId) -> io::Result<()> {
        #[cfg(not(has_io_uring))]
        {
            let _ = id;
            Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "unregister_region requires the io_uring backend",
            ))
        }
        #[cfg(has_io_uring)]
        {
            self.region_registrar.unregister(id)
        }
    }

    /// Waits for a signal via `crate::signal::wait`, then calls
    /// [`shutdown`](Self::shutdown) and returns the signal that arrived.
    pub fn wait_on_signal(&self) -> crate::signal::Signal {
        let received = crate::signal::wait();
        self.shutdown();
        received
    }

    /// Requests that the runtime stop.
    ///
    /// Steps:
    /// 1. Raises the shared shutdown flag.
    /// 2. Closes the listening socket, unless the acceptor (or an earlier
    ///    call) already did.
    /// 3. Wakes every worker so it observes the flag.
    ///
    /// Safe to call more than once: the listener fd is closed at most once.
    /// Workers are woken on every call.
    ///
    /// NOTE(review): the fd is closed here while the acceptor thread may still
    /// be using it. Whether the acceptor tolerates that depends on
    /// `run_acceptor` — confirm.
    pub fn shutdown(&self) {
        self.shutdown_flag.store(true, Ordering::Release);

        let listener = self.listen_fd.zip(self.listen_fd_closed.as_ref());
        if let Some((fd, closed)) = listener {
            let already_closed = closed.swap(true, Ordering::AcqRel);
            if !already_closed {
                unsafe {
                    libc::close(fd);
                }
            }
        }

        for handle in self.worker_wake_handles.iter() {
            handle.wake();
        }
    }
}
impl Drop for ShutdownHandle {
fn drop(&mut self) {
self.shutdown();
}
}
/// Stream listener address requested through the builder.
enum BindAddr {
// TCP listener on this socket address (set by `RinglineBuilder::bind`).
Tcp(SocketAddr),
// Unix-domain stream listener at this filesystem path (set by `RinglineBuilder::bind_unix`).
Unix(PathBuf),
}
/// Reads back the local address of socket `fd` with `getsockname(2)`.
///
/// Returns `None` if the call fails, or if the socket family is neither
/// `AF_INET` nor `AF_INET6` (for example, a Unix-domain socket).
///
/// For IPv6, the flow info and scope id are preserved. A link-local address
/// therefore keeps the interface it was bound on, matching how std converts a
/// `sockaddr_in6`.
fn getsockname_v4_v6(fd: RawFd) -> Option<SocketAddr> {
    use std::mem::{MaybeUninit, size_of};

    let mut storage: MaybeUninit<libc::sockaddr_storage> = MaybeUninit::zeroed();
    let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
    // SAFETY: `storage` is a writable buffer of exactly `len` bytes. `len` is
    // a valid in/out length pointer for the duration of the call.
    let rc = unsafe {
        libc::getsockname(
            fd,
            storage.as_mut_ptr().cast::<libc::sockaddr>(),
            &mut len,
        )
    };
    if rc != 0 {
        return None;
    }
    // SAFETY: the buffer started zeroed, so every byte is initialised whether
    // or not `getsockname` filled all of it.
    let storage = unsafe { storage.assume_init() };
    match storage.ss_family as libc::c_int {
        libc::AF_INET => {
            // SAFETY: the family says the buffer holds a `sockaddr_in`.
            // `sockaddr_storage` is large enough and suitably aligned for it.
            let sin: libc::sockaddr_in = unsafe {
                std::ptr::read(
                    (&storage as *const libc::sockaddr_storage).cast::<libc::sockaddr_in>(),
                )
            };
            let ip = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
            let port = u16::from_be(sin.sin_port);
            Some(SocketAddr::from((ip, port)))
        }
        libc::AF_INET6 => {
            // SAFETY: the family says the buffer holds a `sockaddr_in6`.
            // `sockaddr_storage` is large enough and suitably aligned for it.
            let sin6: libc::sockaddr_in6 = unsafe {
                std::ptr::read(
                    (&storage as *const libc::sockaddr_storage).cast::<libc::sockaddr_in6>(),
                )
            };
            let ip = std::net::Ipv6Addr::from(sin6.sin6_addr.s6_addr);
            let port = u16::from_be(sin6.sin6_port);
            // Keep flowinfo and scope_id. Dropping the scope id would make a
            // link-local address unusable for connecting back.
            Some(SocketAddr::V6(std::net::SocketAddrV6::new(
                ip,
                port,
                sin6.sin6_flowinfo,
                sin6.sin6_scope_id,
            )))
        }
        _ => None,
    }
}
/// Builder that configures and launches the runtime.
///
/// Create it from a [`Config`]. Optionally add listeners with
/// [`bind`](Self::bind), [`bind_unix`](Self::bind_unix),
/// [`bind_udp`](Self::bind_udp) or
/// [`bind_udp_connected`](Self::bind_udp_connected). Then start everything with
/// [`launch`](Self::launch).
pub struct RinglineBuilder {
// Runtime configuration; UDP binds are appended to it directly.
config: Config,
// The single stream listener (TCP or Unix), if any; the last bind call wins.
bind_addr: Option<BindAddr>,
}
impl RinglineBuilder {
pub fn new(config: Config) -> Self {
RinglineBuilder {
config,
bind_addr: None,
}
}
pub fn bind(mut self, addr: SocketAddr) -> Self {
self.bind_addr = Some(BindAddr::Tcp(addr));
self
}
pub fn bind_unix(mut self, path: impl AsRef<Path>) -> Self {
self.bind_addr = Some(BindAddr::Unix(path.as_ref().to_path_buf()));
self
}
pub fn bind_udp(mut self, addr: SocketAddr) -> Self {
self.config.udp_bind.push(addr);
self.config.udp_connect_peers.push(None);
self
}
pub fn bind_udp_connected(mut self, local: SocketAddr, peer: SocketAddr) -> Self {
self.config.udp_bind.push(local);
self.config.udp_connect_peers.push(Some(peer));
self
}
pub fn launch<A: AsyncEventHandler>(self) -> LaunchResult {
self.launch_inner(
|worker_id,
config,
accept_rx,
eventfd,
shutdown_flag,
resolve_rx,
resolve_tx,
resolver,
spawn_rx,
spawn_tx,
spawner,
blocking_rx,
blocking_tx,
blocking_pool,
region_rx,
startup_tx| {
let handler = A::create_for_worker(worker_id);
#[cfg(has_io_uring)]
let new_result = AsyncEventLoop::new(
&config,
handler,
accept_rx,
eventfd,
shutdown_flag,
resolve_rx,
resolve_tx,
resolver,
spawn_rx,
spawn_tx,
spawner,
blocking_rx,
blocking_tx,
blocking_pool,
region_rx,
);
#[cfg(not(has_io_uring))]
let new_result = {
drop(region_rx);
AsyncEventLoop::new(
&config,
handler,
accept_rx,
eventfd,
shutdown_flag,
resolve_rx,
resolve_tx,
resolver,
spawn_rx,
spawn_tx,
spawner,
blocking_rx,
blocking_tx,
blocking_pool,
)
};
#[cfg(has_io_uring)]
let event_loop_result: Result<_, crate::error::Error> = new_result;
#[cfg(not(has_io_uring))]
let event_loop_result: Result<_, crate::error::Error> =
new_result.map_err(crate::error::Error::Io);
let mut event_loop = match event_loop_result {
Ok(el) => {
let _ = startup_tx.send(Ok(()));
el
}
Err(e) => {
let _ = startup_tx.send(Err(()));
return Err(e);
}
};
drop(startup_tx);
event_loop.run()?;
Ok(())
},
)
}
#[allow(clippy::needless_range_loop)]
#[allow(clippy::type_complexity)]
fn launch_inner<F>(self, worker_fn: F) -> LaunchResult
where
F: Fn(
usize,
Config,
Option<crossbeam_channel::Receiver<(RawFd, SocketAddr)>>,
RawFd,
Arc<AtomicBool>,
Option<crossbeam_channel::Receiver<crate::resolver::ResolveResponse>>,
Option<crossbeam_channel::Sender<crate::resolver::ResolveResponse>>,
Option<Arc<crate::resolver::ResolverPool>>,
Option<crossbeam_channel::Receiver<crate::spawner::SpawnResponse>>,
Option<crossbeam_channel::Sender<crate::spawner::SpawnResponse>>,
Option<Arc<crate::spawner::SpawnerPool>>,
Option<crossbeam_channel::Receiver<crate::blocking::BlockingResponse>>,
Option<crossbeam_channel::Sender<crate::blocking::BlockingResponse>>,
Option<Arc<crate::blocking::BlockingPool>>,
crate::region_registry::RegionControlRx,
crossbeam_channel::Sender<Result<(), ()>>,
) -> Result<(), crate::error::Error>
+ Send
+ Clone
+ 'static,
{
let num_threads = if self.config.worker.threads == 0 {
crate::topology::physical_core_count()
} else {
self.config.worker.threads
};
ensure_nofile_limit(self.config.max_connections, num_threads)?;
crate::metrics::init_metadata();
let mut worker_txs = Vec::with_capacity(num_threads);
let mut worker_rxs = Vec::with_capacity(num_threads);
let mut worker_eventfds = Vec::with_capacity(num_threads);
let mut worker_wake_fds = Vec::with_capacity(num_threads);
let mut worker_wake_handles = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let (tx, rx) = crossbeam_channel::bounded::<(RawFd, SocketAddr)>(
self.config.accept_queue_capacity,
);
let (read_fd, wake_handle) =
crate::wakeup::create_wake_fd().map_err(crate::error::Error::Io)?;
worker_txs.push(tx);
worker_rxs.push(rx);
worker_eventfds.push(read_fd);
worker_wake_fds.push(wake_handle.as_wake_fd());
worker_wake_handles.push(wake_handle);
}
let shutdown_flag = Arc::new(AtomicBool::new(false));
let (resolver_pool, resolve_rxs) = if self.config.resolver_threads > 0 {
let pool = Arc::new(crate::resolver::ResolverPool::start(
self.config.resolver_threads,
));
let mut rxs = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let (tx, rx) = crossbeam_channel::unbounded::<crate::resolver::ResolveResponse>();
rxs.push((tx, rx));
}
(Some(pool), Some(rxs))
} else {
(None, None)
};
let (spawner_pool, spawn_rxs) = if self.config.spawner_threads > 0 {
let pool = Arc::new(crate::spawner::SpawnerPool::start(
self.config.spawner_threads,
));
let mut rxs = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let (tx, rx) = crossbeam_channel::unbounded::<crate::spawner::SpawnResponse>();
rxs.push((tx, rx));
}
(Some(pool), Some(rxs))
} else {
(None, None)
};
let (region_txs, region_rxs) = crate::region_registry::build_worker_channels(num_threads);
let mut region_rxs_iter: Vec<_> = region_rxs.into_iter().map(Some).collect();
let (blocking_pool, blocking_rxs) = if self.config.blocking_threads > 0 {
let pool = Arc::new(crate::blocking::BlockingPool::start(
self.config.blocking_threads,
));
let mut rxs = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let (tx, rx) = crossbeam_channel::unbounded::<crate::blocking::BlockingResponse>();
rxs.push((tx, rx));
}
(Some(pool), Some(rxs))
} else {
(None, None)
};
let mut bound_addr: Option<SocketAddr> = None;
let (listen_fd, listen_fd_closed) = if let Some(bind_addr) = self.bind_addr {
let (fd, is_unix) = match bind_addr {
BindAddr::Tcp(addr) => {
let fd = create_listener(addr, self.config.backlog)?;
bound_addr = getsockname_v4_v6(fd);
(fd, false)
}
BindAddr::Unix(ref path) => {
(create_unix_listener(path, self.config.backlog)?, true)
}
};
let closed = Arc::new(AtomicBool::new(false));
let acceptor_config = AcceptorConfig {
listen_fd: fd,
worker_channels: worker_txs,
worker_wake_handles: worker_wake_fds.clone(),
shutdown_flag: shutdown_flag.clone(),
tcp_nodelay: if is_unix {
false
} else {
self.config.tcp_nodelay
},
#[cfg(feature = "timestamps")]
timestamps: self.config.timestamps,
conn_chunk_size: self.config.conn_chunk_size,
};
let acceptor_closed = closed.clone();
thread::Builder::new()
.name("ringline-acceptor".to_string())
.spawn(move || {
run_acceptor(acceptor_config);
if !acceptor_closed.swap(true, Ordering::AcqRel) {
unsafe {
libc::close(fd);
}
}
})
.map_err(crate::error::Error::Io)?;
(Some(fd), Some(closed))
} else {
drop(worker_txs);
(None, None)
};
let mut handles = Vec::with_capacity(num_threads);
let has_acceptor = listen_fd.is_some();
let (startup_tx, startup_rx) = crossbeam_channel::bounded::<Result<(), ()>>(num_threads);
for worker_id in 0..num_threads {
let config = self.config.clone();
let rx = worker_rxs.remove(0);
let eventfd = worker_eventfds[worker_id];
let shutdown_flag = shutdown_flag.clone();
let worker_fn = worker_fn.clone();
let startup_tx = startup_tx.clone();
let (worker_resolve_rx, worker_resolve_tx, worker_resolver) =
if let Some(ref rxs) = resolve_rxs {
let (ref tx, ref rx) = rxs[worker_id];
(Some(rx.clone()), Some(tx.clone()), resolver_pool.clone())
} else {
(None, None, None)
};
let (worker_spawn_rx, worker_spawn_tx, worker_spawner) =
if let Some(ref rxs) = spawn_rxs {
let (ref tx, ref rx) = rxs[worker_id];
(Some(rx.clone()), Some(tx.clone()), spawner_pool.clone())
} else {
(None, None, None)
};
let (worker_blocking_rx, worker_blocking_tx, worker_blocking_pool) =
if let Some(ref rxs) = blocking_rxs {
let (ref tx, ref rx) = rxs[worker_id];
(Some(rx.clone()), Some(tx.clone()), blocking_pool.clone())
} else {
(None, None, None)
};
let worker_region_rx = region_rxs_iter[worker_id]
.take()
.expect("region rx already consumed");
let handle = thread::Builder::new()
.name(format!("ringline-worker-{worker_id}"))
.spawn(move || {
if config.worker.pin_to_core {
let core = config.worker.core_offset + worker_id;
if let Err(e) = pin_to_core(core) {
let _ = startup_tx.send(Err(()));
return Err(e);
}
}
metriken::set_thread_shard(worker_id);
let accept_rx = if has_acceptor { Some(rx) } else { None };
worker_fn(
worker_id,
config,
accept_rx,
eventfd,
shutdown_flag,
worker_resolve_rx,
worker_resolve_tx,
worker_resolver,
worker_spawn_rx,
worker_spawn_tx,
worker_spawner,
worker_blocking_rx,
worker_blocking_tx,
worker_blocking_pool,
worker_region_rx,
startup_tx,
)
})
.map_err(crate::error::Error::Io)?;
handles.push(handle);
}
drop(startup_tx);
let mut setup_failed = false;
for _ in 0..num_threads {
match startup_rx.recv() {
Ok(Ok(())) => {}
Ok(Err(())) | Err(_) => {
setup_failed = true;
break;
}
}
}
if setup_failed {
shutdown_flag.store(true, Ordering::SeqCst);
for w in &worker_wake_fds {
w.wake();
}
if let (Some(fd), Some(closed)) = (listen_fd, listen_fd_closed.as_ref())
&& !closed.swap(true, Ordering::AcqRel)
{
unsafe {
libc::close(fd);
}
}
let mut first_err: Option<crate::error::Error> = None;
for handle in handles {
match handle.join() {
Ok(Err(e)) if first_err.is_none() => first_err = Some(e),
Ok(_) => {}
Err(_panic) => {}
}
}
return Err(first_err.unwrap_or_else(|| {
crate::error::Error::Io(io::Error::other("worker setup failed"))
}));
}
let region_registrar = Arc::new(crate::region_registry::RegionRegistrar::new(
self.config.max_registered_regions,
self.config.registered_regions.len() as u16,
region_txs,
worker_wake_handles.clone(),
));
let shutdown_handle = ShutdownHandle {
shutdown_flag,
worker_wake_handles,
listen_fd,
listen_fd_closed,
bound_addr,
region_registrar,
};
Ok((shutdown_handle, handles))
}
}
fn ensure_nofile_limit(
max_connections: u32,
num_workers: usize,
) -> Result<(), crate::error::Error> {
let mut rlim: libc::rlimit = unsafe { std::mem::zeroed() };
let ret = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlim) };
if ret != 0 {
return Err(crate::error::Error::Io(io::Error::last_os_error()));
}
let per_worker_overhead: u64 = 8;
let global_overhead: u64 = 64;
let required =
max_connections as u64 + per_worker_overhead * num_workers as u64 + global_overhead;
let soft = rlim.rlim_cur;
let hard = rlim.rlim_max;
if soft >= required {
return Ok(());
}
if hard >= required || hard == libc::RLIM_INFINITY {
let new_soft = if hard == libc::RLIM_INFINITY {
required
} else {
std::cmp::min(required, hard)
};
rlim.rlim_cur = new_soft;
let ret = unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &rlim) };
if ret != 0 {
return Err(crate::error::Error::Io(io::Error::last_os_error()));
}
Ok(())
} else {
Err(crate::error::Error::ResourceLimit(format!(
"RLIMIT_NOFILE too low: need {} but hard limit is {} (soft: {}). \
Raise it with: ulimit -n {}",
required, hard, soft, required
)))
}
}
#[cfg(target_os = "linux")]
fn pin_to_core(core: usize) -> Result<(), crate::error::Error> {
unsafe {
let mut set: libc::cpu_set_t = std::mem::zeroed();
libc::CPU_ZERO(&mut set);
libc::CPU_SET(core, &mut set);
let ret = libc::sched_setaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &set);
if ret != 0 {
return Err(crate::error::Error::Io(io::Error::last_os_error()));
}
}
Ok(())
}
#[cfg(not(target_os = "linux"))]
fn pin_to_core(_core: usize) -> Result<(), crate::error::Error> {
Ok(())
}
fn create_listener(addr: SocketAddr, backlog: i32) -> Result<RawFd, crate::error::Error> {
let domain = if addr.is_ipv4() {
libc::AF_INET
} else {
libc::AF_INET6
};
let fd = unsafe { libc::socket(domain, libc::SOCK_STREAM, 0) };
if fd < 0 {
return Err(crate::error::Error::Io(io::Error::last_os_error()));
}
let optval: libc::c_int = 1;
unsafe {
libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&optval as *const _ as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
);
}
let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
let addr_len = crate::backend::socket_addr_to_sockaddr(addr, &mut storage);
let ret = unsafe { libc::bind(fd, &storage as *const _ as *const libc::sockaddr, addr_len) };
if ret < 0 {
let err = io::Error::last_os_error();
unsafe {
libc::close(fd);
}
return Err(crate::error::Error::Io(err));
}
let ret = unsafe { libc::listen(fd, backlog) };
if ret < 0 {
let err = io::Error::last_os_error();
unsafe {
libc::close(fd);
}
return Err(crate::error::Error::Io(err));
}
Ok(fd)
}
fn create_unix_listener(path: &Path, backlog: i32) -> Result<RawFd, crate::error::Error> {
let _ = std::fs::remove_file(path);
let fd = unsafe { libc::socket(libc::AF_UNIX, libc::SOCK_STREAM, 0) };
if fd < 0 {
return Err(crate::error::Error::Io(io::Error::last_os_error()));
}
let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
let addr_len = crate::backend::unix_path_to_sockaddr(path, &mut storage);
let ret = unsafe { libc::bind(fd, &storage as *const _ as *const libc::sockaddr, addr_len) };
if ret < 0 {
let err = io::Error::last_os_error();
unsafe {
libc::close(fd);
}
return Err(crate::error::Error::Io(err));
}
let ret = unsafe { libc::listen(fd, backlog) };
if ret < 0 {
let err = io::Error::last_os_error();
unsafe {
libc::close(fd);
}
return Err(crate::error::Error::Io(err));
}
Ok(fd)
}