#![cfg(any(target_os="android", target_os="l4re", target_os="linux"))]
#![doc(cfg(any(target_os="android", target_os="l4re", target_os="linux")))]
use {
core::{
ffi::CStr,
ptr,
sync::atomic::AtomicBool,
time::Duration,
},
alloc::{
collections::{
BTreeMap,
btree_map::Entry,
},
sync::Arc,
},
std::{
io::{Error, Write},
os::{
fd::{AsRawFd, RawFd},
unix::net::UnixListener,
},
thread,
},
crate::{
Result,
SLEEP_DURATION,
UdsxUnixStream,
linux::{Cmd, STREAM_ID},
print_err,
},
super::ATOMIC_ORDERING,
libc::{EPOLLIN, EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, epoll_event},
};
/// Length of the `epoll_event` buffer owned by `Server`; it is passed to `epoll_wait()` as the
/// maximum number of events a single call may report.
const EVENTS_BUF_SIZE: usize = 999;
/// Everything the server keeps for one registered listener.
struct Data {
    /// The listening socket that new clients are accepted from.
    listener: UnixListener,
    /// Whether a client's `Cmd::DuplicateServerFd` request is answered for this listener.
    clonable: bool,
    /// Once this flag reads `true`, the server removes the listener on its next `check()`.
    stop_flag: Arc<AtomicBool>,
    /// Registration record handed to `epoll_ctl()`: `EPOLLIN` interest, with the listener's raw
    /// descriptor stored as the user data.
    event: epoll_event,
}

impl Data {
    /// Bundles `listener` with its flags and prepares the matching epoll registration record.
    ///
    /// # Errors
    ///
    /// Fails if `EPOLLIN` or the listener's raw descriptor cannot be converted to the integer
    /// types used by `epoll_event`.
    fn make(listener: UnixListener, clonable: bool, stop_flag: Arc<AtomicBool>) -> Result<Self> {
        // The record is built before `listener` is moved into the struct, so its descriptor can be
        // read straight from the socket.
        let event = epoll_event {
            events: EPOLLIN.try_into().map_err(|_| err!())?,
            u64: listener.as_raw_fd().try_into().map_err(|_| err!())?,
        };
        Ok(Self { listener, clonable, stop_flag, event })
    }
}
/// Epoll-based acceptor that serves any number of registered Unix listeners.
pub (super) struct Server {
    /// Epoll instance obtained from `epoll_create1()`; closed when the server is dropped.
    epoll_fd: RawFd,
    /// Registered listeners, keyed by their raw file descriptor.
    map: BTreeMap<RawFd, Data>,
    /// Scratch buffer that `epoll_wait()` fills with ready events.
    events: [epoll_event; EVENTS_BUF_SIZE],
}

impl Server {
    /// Creates a server with a fresh epoll instance and no listeners.
    ///
    /// # Errors
    ///
    /// Fails if `epoll_create1()` fails, or if `EPOLLIN` cannot be converted to the event mask type.
    pub fn make() -> Result<Self> {
        // Do the fallible conversion first, so that an early return can never happen while an
        // already-created epoll descriptor is not yet owned by a `Server`.
        let events = [epoll_event {
            events: EPOLLIN.try_into().map_err(|_| err!())?,
            u64: u64::MIN,
        }; EVENTS_BUF_SIZE];
        // SAFETY: `epoll_create1()` takes no pointers; its result is checked right below.
        let epoll_fd = match unsafe { libc::epoll_create1(EPOLL_CLOEXEC) } {
            -1 => return Err(fmt_err_code("epoll_create1")),
            fd => fd,
        };
        Ok(Self { epoll_fd, map: BTreeMap::new(), events })
    }

    /// Switches `listener` to non-blocking mode and registers it with the epoll instance.
    ///
    /// `listener` is taken by value, so it is dropped whenever this function returns an error.
    ///
    /// # Errors
    ///
    /// Fails if the listener cannot be made non-blocking, if `epoll_ctl()` fails, or if a listener
    /// with the same raw descriptor is already registered.
    pub fn push(&mut self, listener: UnixListener, clonable: bool, stop_flag: Arc<AtomicBool>) -> Result<()> {
        listener.set_nonblocking(true)?;
        let fd = listener.as_raw_fd();
        match self.map.entry(fd) {
            Entry::Vacant(vacant) => {
                let mut data = Data::make(listener, clonable, stop_flag)?;
                // SAFETY: the event pointer comes from a live `&mut` borrow of `data.event`, and
                // the descriptor belongs to `data.listener`, which is alive for the whole call.
                match unsafe { libc::epoll_ctl(self.epoll_fd, EPOLL_CTL_ADD, data.listener.as_raw_fd(), ptr::from_mut(&mut data.event)) } {
                    -1 => Err(fmt_err_code("epoll_ctl")),
                    _ => {
                        vacant.insert(data);
                        Ok(())
                    },
                }
            },
            Entry::Occupied(_) => Err(err!("Duplicate: {listener:?}")),
        }
    }

    /// Runs one polling round.
    ///
    /// Listeners whose stop flag is set are removed first. If none remain, the call sleeps for
    /// `SLEEP_DURATION`. Otherwise it waits in `epoll_wait()` (timeout supplied by
    /// `sleep_duration!()`) and handles one pending client for each listener reported ready.
    /// Failures while handling a single client are reported through `print_err` and do not abort
    /// the round.
    ///
    /// # Errors
    ///
    /// Fails if `epoll_wait()` fails for a reason other than being interrupted by a signal, or if
    /// an integer conversion needed for the call or its results fails.
    pub fn check(&mut self) -> Result<()> {
        self.process_stop_flags();
        if self.map.is_empty() {
            thread::sleep(SLEEP_DURATION);
            return Ok(());
        }

        let max_events: libc::c_int = self.events.len().try_into().map_err(|_| err!())?;
        // SAFETY: the pointer and `max_events` both describe `self.events`, which is exclusively
        // borrowed through `&mut self` for the duration of the call.
        let ready = match unsafe {
            libc::epoll_wait(self.epoll_fd, self.events.as_mut_ptr(), max_events, sleep_duration!())
        } {
            // A signal arriving during the wait is not a failure: treat it like a timeout with no
            // events and let the caller poll again.
            -1 => return match Error::last_os_error().kind() {
                std::io::ErrorKind::Interrupted => Ok(()),
                _ => Err(fmt_err_code("epoll_wait")),
            },
            count => self.events.len().min(count.try_into().map_err(|_| err!())?),
        };

        for event in &self.events[..ready] {
            // The user data of every registration is the listener's raw descriptor.
            let fd = RawFd::try_from(event.u64).map_err(|_| err!())?;
            let Some(data) = self.map.get(&fd) else {
                continue;
            };
            if let Err(err) = Self::handle_client(data, fd) {
                print_err(__!("Failed processing new client of {fd}: {err}"));
            }
        }
        Ok(())
    }

    /// Accepts one client from `data.listener`, decodes a single command from it and answers it.
    ///
    /// For `Cmd::DuplicateServerFd` on a clonable listener, `fd` is passed to `send_streams()` and
    /// the stream is flushed; on a non-clonable listener nothing is sent.
    fn handle_client(data: &Data, fd: RawFd) -> Result<()> {
        const RW_TIMEOUT: Duration = Duration::from_millis(512);

        let (mut stream, _) = data.listener.accept()?;
        // Bound both directions so a stalled client cannot block the polling loop indefinitely.
        stream.set_read_timeout(Some(RW_TIMEOUT))?;
        stream.set_write_timeout(Some(RW_TIMEOUT))?;
        match Cmd::decode(&mut stream)? {
            Cmd::DuplicateServerFd => if data.clonable {
                stream.send_streams(STREAM_ID, [fd])?;
                stream.flush()?;
            },
        };
        Ok(())
    }

    /// Removes every listener whose stop flag is set and deregisters it from the epoll instance.
    ///
    /// A failing deregistration is reported through `print_err`; the listener is removed from the
    /// map regardless.
    fn process_stop_flags(&mut self) {
        for (fd, _) in self.map.extract_if(.., |_, data| data.stop_flag.load(ATOMIC_ORDERING)) {
            // SAFETY: `EPOLL_CTL_DEL` does not read the event argument, so a null pointer is fine.
            if unsafe { libc::epoll_ctl(self.epoll_fd, EPOLL_CTL_DEL, fd, ptr::null_mut()) } == -1 {
                let err = fmt_err_code("epoll_ctl");
                print_err(__!("Failed removing {fd} from epoll: {err}"));
            }
        }
    }
}

impl Drop for Server {
    /// Closes the epoll descriptor so that dropping a server does not leak it.
    fn drop(&mut self) {
        // SAFETY: `epoll_fd` was returned by a successful `epoll_create1()` in `make()` and is not
        // closed anywhere else in this file. The result is ignored: nothing useful can be done
        // about a failed `close()` while dropping.
        unsafe {
            libc::close(self.epoll_fd);
        }
    }
}
fn fmt_err_code<S>(fn_name: S) -> Error where S: AsRef<str> {
let err_code = unsafe {
#[cfg(not(target_os="android"))] {
*libc::__errno_location()
}
#[cfg(target_os="android")] {
*libc::__errno()
}
};
err!(
"libc::{fn_name}() failed: {err_code}: {err:?}",
fn_name=fn_name.as_ref(),
err=unsafe { CStr::from_ptr(libc::strerror(err_code)) },
)
}