use std::borrow::{Borrow, BorrowMut};
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::io::Error;
use std::mem::MaybeUninit;
use std::os::fd::{AsFd, OwnedFd};
use std::os::unix::io::AsRawFd;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use libc::{self, c_int};
use super::exfiltrator::Exfiltrator;
use crate::low_level::pipe::{self, WakeMethod};
use crate::SigId;
const MAX_SIGNUM: usize = 128;
trait SelfPipeWrite: Debug + Send + Sync {
fn wake_readers(&self);
}
impl<W: AsFd + Debug + Send + Sync> SelfPipeWrite for W {
fn wake_readers(&self) {
pipe::wake(self.as_fd(), WakeMethod::Send);
}
}
#[derive(Debug)]
struct DeliveryState {
closed: AtomicBool,
registered_signal_ids: Mutex<Vec<Option<SigId>>>,
}
impl DeliveryState {
fn new() -> Self {
let ids = (0..MAX_SIGNUM).map(|_| None).collect();
Self {
closed: AtomicBool::new(false),
registered_signal_ids: Mutex::new(ids),
}
}
}
impl Drop for DeliveryState {
fn drop(&mut self) {
let lock = self.registered_signal_ids.lock().unwrap();
for id in lock.iter().filter_map(|s| *s) {
crate::low_level::unregister(id);
}
}
}
struct PendingSignals<E: Exfiltrator> {
exfiltrator: E,
slots: [E::Storage; MAX_SIGNUM],
}
impl<E: Exfiltrator> PendingSignals<E> {
fn new(exfiltrator: E) -> Self {
let mut slots = MaybeUninit::<[E::Storage; MAX_SIGNUM]>::uninit();
for i in 0..MAX_SIGNUM {
unsafe {
let slot: *mut E::Storage = slots.as_mut_ptr() as *mut _;
let slot = slot.add(i);
ptr::write(slot, E::Storage::default());
}
}
Self {
exfiltrator,
slots: unsafe { slots.assume_init() },
}
}
}
trait AddSignal: Debug + Send + Sync {
fn add_signal(
self: Arc<Self>,
write: Arc<dyn SelfPipeWrite>,
signal: c_int,
) -> Result<SigId, Error>;
}
impl<E: Exfiltrator> Debug for PendingSignals<E> {
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
fmt.debug_struct("PendingSignals")
.field("exfiltrator", &self.exfiltrator)
.field("slots", &&self.slots[..])
.finish()
}
}
impl<E: Exfiltrator> AddSignal for PendingSignals<E> {
fn add_signal(
self: Arc<Self>,
write: Arc<dyn SelfPipeWrite>,
signal: c_int,
) -> Result<SigId, Error> {
assert!(signal >= 0);
assert!(
(signal as usize) < MAX_SIGNUM,
"Signal number {} too large. If your OS really supports such signal, file a bug",
signal,
);
assert!(
self.exfiltrator.supports_signal(signal),
"Signal {} not supported by exfiltrator {:?}",
signal,
self.exfiltrator,
);
self.exfiltrator.init(&self.slots[signal as usize], signal);
let action = move |act: &_| {
let slot = &self.slots[signal as usize];
let ex = &self.exfiltrator;
ex.store(slot, signal, act);
write.wake_readers();
};
let id = unsafe { signal_hook_registry::register_sigaction(signal, action) }?;
Ok(id)
}
}
#[derive(Debug, Clone)]
pub struct Handle {
pending: Arc<dyn AddSignal>,
write: Arc<dyn SelfPipeWrite>,
delivery_state: Arc<DeliveryState>,
}
impl Handle {
fn new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self
where
W: 'static + SelfPipeWrite,
{
Self {
pending,
write: Arc::new(write),
delivery_state: Arc::new(DeliveryState::new()),
}
}
pub fn add_signal(&self, signal: c_int) -> Result<(), Error> {
let mut lock = self.delivery_state.registered_signal_ids.lock().unwrap();
if lock[signal as usize].is_some() {
return Ok(());
}
let id = Arc::clone(&self.pending).add_signal(Arc::clone(&self.write), signal)?;
lock[signal as usize] = Some(id);
Ok(())
}
pub fn close(&self) {
self.delivery_state.closed.store(true, Ordering::SeqCst);
self.write.wake_readers();
}
pub fn is_closed(&self) -> bool {
self.delivery_state.closed.load(Ordering::SeqCst)
}
}
#[derive(Debug)]
pub struct SignalDelivery<R, E: Exfiltrator> {
read: R,
handle: Handle,
pending: Arc<PendingSignals<E>>,
}
impl<R, E: Exfiltrator> SignalDelivery<R, E>
where
R: 'static + AsRawFd + Send + Sync,
{
pub fn with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error>
where
I: IntoIterator<Item = S>,
S: Borrow<c_int>,
W: 'static + Into<OwnedFd> + Debug + Send + Sync,
{
let pending = Arc::new(PendingSignals::new(exfiltrator));
let pending_add_signal = Arc::clone(&pending);
let handle = Handle::new(write.into(), pending_add_signal);
let me = Self {
read,
handle,
pending,
};
for sig in signals {
me.handle.add_signal(*sig.borrow())?;
}
Ok(me)
}
pub fn get_read(&self) -> &R {
&self.read
}
pub fn get_read_mut(&mut self) -> &mut R {
&mut self.read
}
fn flush(&mut self) {
const SIZE: usize = 1024;
let mut buff = [0u8; SIZE];
unsafe {
#[cfg(target_os = "aix")]
let nowait_flag = libc::MSG_NONBLOCK;
#[cfg(not(target_os = "aix"))]
let nowait_flag = libc::MSG_DONTWAIT;
while libc::recv(
self.read.as_raw_fd(),
buff.as_mut_ptr() as *mut libc::c_void,
SIZE,
nowait_flag,
) > 0
{}
}
}
pub fn pending(&mut self) -> Pending<E> {
self.flush();
Pending::new(Arc::clone(&self.pending))
}
pub fn poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error>
where
F: FnMut(&mut R) -> Result<bool, Error>,
{
if self.handle.is_closed() {
return Ok(None);
}
match has_signals(self.get_read_mut()) {
Ok(false) => Ok(None),
Ok(true) => Ok(Some(self.pending())),
Err(err) => Err(err),
}
}
pub fn handle(&self) -> Handle {
self.handle.clone()
}
}
#[derive(Debug)]
pub struct Pending<E: Exfiltrator> {
pending: Arc<PendingSignals<E>>,
position: usize,
}
impl<E: Exfiltrator> Pending<E> {
fn new(pending: Arc<PendingSignals<E>>) -> Self {
Self {
pending,
position: 0,
}
}
}
impl<E: Exfiltrator> Iterator for Pending<E> {
type Item = E::Output;
fn next(&mut self) -> Option<E::Output> {
while self.position < self.pending.slots.len() {
let sig = self.position;
let slot = &self.pending.slots[sig];
let result = self.pending.exfiltrator.load(slot, sig as c_int);
if result.is_some() {
return result;
} else {
self.position += 1;
}
}
None
}
}
pub enum PollResult<O> {
Signal(O),
Pending,
Closed,
Err(Error),
}
pub struct SignalIterator<SD, E: Exfiltrator> {
signals: SD,
iter: Pending<E>,
}
impl<SD, E: Exfiltrator> SignalIterator<SD, E> {
pub fn new<R>(mut signals: SD) -> Self
where
SD: BorrowMut<SignalDelivery<R, E>>,
R: 'static + AsRawFd + Send + Sync,
{
let iter = signals.borrow_mut().pending();
Self { signals, iter }
}
pub fn poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output>
where
SD: BorrowMut<SignalDelivery<R, E>>,
R: 'static + AsRawFd + Send + Sync,
F: FnMut(&mut R) -> Result<bool, Error>,
{
while !self.signals.borrow_mut().handle.is_closed() {
if let Some(result) = self.iter.next() {
return PollResult::Signal(result);
}
match self.signals.borrow_mut().poll_pending(has_signals) {
Ok(Some(pending)) => self.iter = pending,
Ok(None) => return PollResult::Pending,
Err(err) => return PollResult::Err(err),
}
}
PollResult::Closed
}
pub fn handle<R>(&self) -> Handle
where
SD: Borrow<SignalDelivery<R, E>>,
R: 'static + AsRawFd + Send + Sync,
{
self.signals.borrow().handle()
}
}
pub type OwningSignalIterator<R, E> = SignalIterator<SignalDelivery<R, E>, E>;
pub type RefSignalIterator<'a, R, E> = SignalIterator<&'a mut SignalDelivery<R, E>, E>;