use std::cell::UnsafeCell;
use std::future::Future;
use std::mem::{self, size_of, ManuallyDrop, MaybeUninit};
use std::os::fd::RawFd;
use std::pin::Pin;
use std::process::Child;
use std::task::{self, Poll};
use std::{fmt, io, ptr};
use log::{error, trace};
use crate::cancel::{Cancel, CancelOp, CancelResult};
use crate::fd::{AsyncFd, Descriptor, Direct, File};
use crate::libc::{self, syscall};
use crate::op::{op_future, poll_state, OpState, NO_OFFSET};
use crate::{man_link, QueueFull, SubmissionQueue};
pub fn wait_on(sq: SubmissionQueue, process: &Child, options: libc::c_int) -> WaitId {
wait(sq, WaitOn::Process(process.id()), options)
}
#[doc = man_link!(waitid(2))]
#[doc(alias = "waitid")]
pub fn wait(sq: SubmissionQueue, wait: WaitOn, options: libc::c_int) -> WaitId {
WaitId {
sq,
state: OpState::NotStarted((wait, options)),
info: Some(Box::new(UnsafeCell::new(MaybeUninit::uninit()))),
}
}
#[doc(alias = "idtype")]
#[doc(alias = "idtype_t")]
#[derive(Copy, Clone, Debug)]
pub enum WaitOn {
#[doc(alias = "P_PID")]
Process(libc::id_t),
#[doc(alias = "P_PGID")]
Group(libc::id_t),
#[doc(alias = "P_ALL")]
All,
}
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
pub struct WaitId {
sq: SubmissionQueue,
info: Option<Box<UnsafeCell<MaybeUninit<libc::signalfd_siginfo>>>>,
state: OpState<(WaitOn, libc::c_int)>,
}
unsafe impl Sync for WaitId {}
unsafe impl Send for WaitId {}
impl Future for WaitId {
type Output = io::Result<Box<libc::siginfo_t>>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
let op_index = poll_state!(
WaitId,
self.state,
self.sq,
ctx,
|submission, (wait, options)| unsafe {
let (id_type, pid) = match wait {
WaitOn::Process(pid) => (libc::P_PID, pid),
WaitOn::Group(pid) => (libc::P_PGID, pid),
WaitOn::All => (libc::P_ALL, 0), };
let info = self.info.as_ref().unwrap().get().cast();
submission.waitid(pid, id_type, options, info);
}
);
match self.sq.poll_op(ctx, op_index) {
Poll::Ready(result) => {
self.state = OpState::Done;
match result {
Ok((_, _)) => Poll::Ready(Ok(unsafe {
Box::from_raw(Box::into_raw(self.info.take().unwrap()).cast())
})),
Err(err) => Poll::Ready(Err(err)),
}
}
Poll::Pending => Poll::Pending,
}
}
}
impl Cancel for WaitId {
fn try_cancel(&mut self) -> CancelResult {
self.state.try_cancel(&self.sq)
}
fn cancel(&mut self) -> CancelOp {
self.state.cancel(&self.sq)
}
}
impl Drop for WaitId {
fn drop(&mut self) {
if let Some(info) = self.info.take() {
match self.state {
OpState::Running(op_index) => {
let result = self.sq.cancel_op(
op_index,
|| info,
|submission| unsafe {
submission.cancel_op(op_index);
submission.no_completion_event();
},
);
if let Err(err) = result {
log::error!("dropped a10::WaitId before canceling it, attempt to cancel failed: {err}");
}
}
OpState::NotStarted((_, _)) | OpState::Done => drop(info),
}
}
}
}
pub struct Signals<D: Descriptor = File> {
fd: AsyncFd<D>,
signals: SignalSet,
}
#[repr(transparent)]
struct SignalSet(libc::sigset_t);
impl Signals {
pub fn from_set(sq: SubmissionQueue, signals: libc::sigset_t) -> io::Result<Signals> {
let signals = SignalSet(signals);
trace!(signals:? = signals; "setting up signal handling");
let fd = libc::syscall!(signalfd(-1, &signals.0, libc::SFD_CLOEXEC))?;
let fd = unsafe { AsyncFd::from_raw_fd(fd, sq) };
sigprocmask(libc::SIG_BLOCK, &signals.0)?;
Ok(Signals { fd, signals })
}
pub fn from_signals<I>(sq: SubmissionQueue, signals: I) -> io::Result<Signals>
where
I: IntoIterator<Item = libc::c_int>,
{
let set = create_sigset(signals)?;
Signals::from_set(sq, set)
}
pub fn for_all_signals(sq: SubmissionQueue) -> io::Result<Signals> {
let mut set: MaybeUninit<libc::sigset_t> = MaybeUninit::uninit();
syscall!(sigfillset(set.as_mut_ptr()))?;
let set = unsafe { set.assume_init() };
Signals::from_set(sq, set)
}
pub fn to_direct_descriptor(self) -> ToSignalsDirect {
let fd = self.fd.fd();
ToSignalsDirect {
signals: ManuallyDrop::new(self),
direct_fd: ManuallyDrop::new(Box::new(UnsafeCell::new(fd))),
state: OpState::NotStarted(()),
}
}
}
impl<D: Descriptor> Signals<D> {
pub fn receive<'fd>(&'fd self) -> ReceiveSignal<'fd, D> {
let info = Box::new(MaybeUninit::uninit());
ReceiveSignal::new(&self.fd, info, ())
}
pub fn receive_signals(self) -> ReceiveSignals<D> {
ReceiveSignals {
signals: self,
info: ManuallyDrop::new(Box::new(unsafe { mem::zeroed() })),
state: OpState::NotStarted(()),
}
}
}
impl<D: Descriptor> fmt::Debug for Signals<D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Signals")
.field("fd", &self.fd)
.field("signals", &self.signals)
.finish()
}
}
fn create_sigset<I: IntoIterator<Item = libc::c_int>>(signals: I) -> io::Result<libc::sigset_t> {
let mut set: MaybeUninit<libc::sigset_t> = MaybeUninit::uninit();
syscall!(sigemptyset(set.as_mut_ptr()))?;
let mut set = unsafe { set.assume_init() };
for signal in signals {
syscall!(sigaddset(&mut set, signal))?;
}
Ok(set)
}
const KNOWN_SIGNALS: [(libc::c_int, &str); 33] = [
(libc::SIGHUP, "SIGHUP"),
(libc::SIGINT, "SIGINT"),
(libc::SIGQUIT, "SIGQUIT"),
(libc::SIGILL, "SIGILL"),
(libc::SIGTRAP, "SIGTRAP"),
(libc::SIGABRT, "SIGABRT"),
(libc::SIGIOT, "SIGIOT"),
(libc::SIGBUS, "SIGBUS"),
(libc::SIGFPE, "SIGFPE"),
(libc::SIGKILL, "SIGKILL"),
(libc::SIGUSR1, "SIGUSR1"),
(libc::SIGSEGV, "SIGSEGV"),
(libc::SIGUSR2, "SIGUSR2"),
(libc::SIGPIPE, "SIGPIPE"),
(libc::SIGALRM, "SIGALRM"),
(libc::SIGTERM, "SIGTERM"),
(libc::SIGSTKFLT, "SIGSTKFLT"),
(libc::SIGCHLD, "SIGCHLD"),
(libc::SIGCONT, "SIGCONT"),
(libc::SIGSTOP, "SIGSTOP"),
(libc::SIGTSTP, "SIGTSTP"),
(libc::SIGTTIN, "SIGTTIN"),
(libc::SIGTTOU, "SIGTTOU"),
(libc::SIGURG, "SIGURG"),
(libc::SIGXCPU, "SIGXCPU"),
(libc::SIGXFSZ, "SIGXFSZ"),
(libc::SIGVTALRM, "SIGVTALRM"),
(libc::SIGPROF, "SIGPROF"),
(libc::SIGWINCH, "SIGWINCH"),
(libc::SIGIO, "SIGIO"),
(libc::SIGPOLL, "SIGPOLL"), (libc::SIGPWR, "SIGPWR"),
(libc::SIGSYS, "SIGSYS"),
];
impl fmt::Debug for SignalSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let signals = KNOWN_SIGNALS.into_iter().filter_map(|(signal, name)| {
(unsafe { libc::sigismember(&self.0, signal) } == 1).then_some(name)
});
f.debug_list().entries(signals).finish()
}
}
impl<D: Descriptor> Drop for Signals<D> {
fn drop(&mut self) {
if let Err(err) = sigprocmask(libc::SIG_UNBLOCK, &self.signals.0) {
error!(signals:? = self.signals; "error unblocking signals: {err}");
}
}
}
fn sigprocmask(how: libc::c_int, set: &libc::sigset_t) -> io::Result<()> {
libc::syscall!(pthread_sigmask(how, set, ptr::null_mut()))?;
Ok(())
}
#[derive(Debug)]
pub struct ToSignalsDirect {
signals: ManuallyDrop<Signals<File>>,
direct_fd: ManuallyDrop<Box<UnsafeCell<RawFd>>>,
state: OpState<()>,
}
impl Future for ToSignalsDirect {
type Output = io::Result<Signals<Direct>>;
fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = Pin::get_mut(self);
let op_index = poll_state!(
ToSignalsDirect,
this.state,
this.signals.fd.sq,
ctx,
|submission, ()| unsafe {
submission.create_direct_descriptor(this.direct_fd.get(), 1);
}
);
match this.signals.fd.sq.poll_op(ctx, op_index) {
Poll::Ready(Ok((_, res))) => {
this.state = OpState::Done;
debug_assert!(res == 1);
let sq = this.signals.fd.sq.clone();
let direct_fd = unsafe {
let direct_fd = ptr::read(this.direct_fd.get());
AsyncFd::from_direct_fd(direct_fd, sq)
};
let direct_signals = Signals {
fd: direct_fd,
signals: unsafe { ptr::read(&this.signals.signals) },
};
unsafe { ptr::drop_in_place(&mut this.signals.fd) };
Poll::Ready(Ok(direct_signals))
}
Poll::Ready(Err(err)) => {
this.state = OpState::Done;
unsafe { ManuallyDrop::drop(&mut this.signals) }
Poll::Ready(Err(err))
}
Poll::Pending => Poll::Pending,
}
}
}
impl Drop for ToSignalsDirect {
fn drop(&mut self) {
match self.state {
OpState::Running(op_index) => {
let direct_fd = unsafe { ManuallyDrop::take(&mut self.direct_fd) };
let signals = unsafe { ManuallyDrop::take(&mut self.signals) };
let result = self.signals.fd.sq.cancel_op(
op_index,
|| Box::from((signals, direct_fd)),
|submission| unsafe {
submission.cancel_op(op_index);
submission.no_completion_event();
},
);
if let Err(err) = result {
log::error!(
"dropped a10::ToSignalsDirect before canceling it, attempt to cancel failed: {err}"
);
}
}
OpState::NotStarted(()) => unsafe {
ManuallyDrop::drop(&mut self.signals);
ManuallyDrop::drop(&mut self.direct_fd);
},
OpState::Done => unsafe {
ManuallyDrop::drop(&mut self.direct_fd);
},
}
}
}
op_future! {
fn Signals::receive -> Box<libc::signalfd_siginfo>,
struct ReceiveSignal<'fd> {
info: Box<MaybeUninit<libc::signalfd_siginfo>>,
},
setup_state: _unused: (),
setup: |submission, fd, (info,), _unused| unsafe {
let ptr = (**info).as_mut_ptr().cast();
submission.read_at(fd.fd(), ptr, size_of::<libc::signalfd_siginfo>() as u32, NO_OFFSET);
submission.set_async();
D::use_flags(submission);
},
map_result: |this, (info,), n| {
#[allow(clippy::cast_sign_loss)] { debug_assert_eq!(n as usize, size_of::<libc::signalfd_siginfo>()) };
Ok(unsafe { Box::from_raw(Box::into_raw(info).cast()) })
},
}
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct ReceiveSignals<D: Descriptor = File> {
signals: Signals<D>,
info: ManuallyDrop<Box<UnsafeCell<libc::signalfd_siginfo>>>,
state: OpState<()>,
}
unsafe impl<D: Descriptor + Sync> Sync for ReceiveSignals<D> {}
unsafe impl<D: Descriptor + Send> Send for ReceiveSignals<D> {}
impl<D: Descriptor> ReceiveSignals<D> {
pub fn poll_signal<'a>(
&'a mut self,
ctx: &mut task::Context<'_>,
) -> Poll<Option<io::Result<&'a libc::signalfd_siginfo>>> {
let ReceiveSignals {
signals,
info,
state,
} = self;
let op_index = match state {
OpState::Running(op_index) => *op_index,
OpState::NotStarted(()) => {
let result = signals.fd.sq.add(|submission| unsafe {
submission.read_at(
signals.fd.fd(),
info.get().cast(),
size_of::<libc::signalfd_siginfo>() as u32,
NO_OFFSET,
);
submission.set_async();
D::use_flags(submission);
});
match result {
Ok(op_index) => {
*state = OpState::Running(op_index);
op_index
}
Err(QueueFull(())) => {
signals.fd.sq.wait_for_submission(ctx.waker().clone());
return Poll::Pending;
}
}
}
OpState::Done => return Poll::Ready(None),
};
match signals.fd.sq.poll_op(ctx, op_index) {
Poll::Ready(Ok((_, n))) => {
*state = OpState::Done;
*state = OpState::NotStarted(());
#[allow(clippy::cast_sign_loss)] {
debug_assert_eq!(n as usize, size_of::<libc::signalfd_siginfo>());
}
Poll::Ready(Some(Ok(unsafe { &*info.get() })))
}
Poll::Ready(Err(err)) => {
*state = OpState::Done; Poll::Ready(Some(Err(err)))
}
Poll::Pending => Poll::Pending,
}
}
pub fn into_inner(self) -> Signals<D> {
let mut this = ManuallyDrop::new(self);
unsafe {
this._drop();
ptr::read(&this.signals)
}
}
unsafe fn _drop(&mut self) {
let signal_info = unsafe { ManuallyDrop::take(&mut self.info) };
match self.state {
OpState::Running(op_index) => {
let result = self.signals.fd.sq.cancel_op(
op_index,
|| signal_info,
|submission| unsafe {
submission.cancel_op(op_index);
submission.no_completion_event();
},
);
if let Err(err) = result {
log::error!(
"dropped a10::ReceiveSignals before canceling it, attempt to cancel failed: {err}"
);
}
}
OpState::NotStarted(()) | OpState::Done => drop(signal_info),
}
}
}
#[allow(clippy::missing_fields_in_debug)]
impl<D: Descriptor> fmt::Debug for ReceiveSignals<D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReceiveSignals")
.field("signals", &self.signals)
.field("state", &self.state)
.finish()
}
}
impl<D: Descriptor> Drop for ReceiveSignals<D> {
fn drop(&mut self) {
unsafe { self._drop() }
}
}