use std::io;
use std::mem::{self, drop as unlock, take};
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use crate::kqueue::{Event, Shared, UseEvents, cq};
use crate::lock;
#[derive(Clone, Debug)]
pub(crate) struct Submissions {
shared: Arc<Shared>,
}
impl Submissions {
pub(crate) fn new(shared: Shared) -> Submissions {
Submissions {
shared: Arc::new(shared),
}
}
pub(super) fn add<F>(&self, fill_event: F)
where
F: FnOnce(&mut Event),
{
self.submit(ForceSubmit::Normal, fill_event);
}
fn submit<F>(&self, force_submit: ForceSubmit, fill_event: F)
where
F: FnOnce(&mut Event),
{
let shared = &*self.shared;
let mut event: Event = unsafe { mem::zeroed() };
event.0.flags |=
libc::EV_CLEAR | libc::EV_DISPATCH | libc::EV_ENABLE | libc::EV_ADD | libc::EV_RECEIPT;
fill_event(&mut event);
log::trace!(event:?; "registering event");
let mut change_list = lock(&shared.change_list);
change_list.push(event);
if let ForceSubmit::Normal = force_submit
&& (change_list.len() < (shared.max_change_list_size as usize))
{
unlock(change_list); return;
}
let mut changes = take(&mut *change_list);
unlock(change_list);
let ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
log::trace!(changes = changes.len(); "submitting changes");
let events = match force_submit {
ForceSubmit::Normal => UseEvents::UseChangeList,
ForceSubmit::Wakeup => UseEvents::UseChangeListWithWakeUp,
};
shared.kevent(&mut changes, events, Some(&ts));
shared.reuse_change_list(changes);
}
#[allow(clippy::cast_sign_loss)]
pub(crate) fn remove_unsubmitted_events(&self, fd: RawFd) {
let mut change_list = lock(&self.shared.change_list);
change_list.retain(|event| {
!(event.0.ident == fd as _
&& (event.0.filter == libc::EVFILT_READ || event.0.filter == libc::EVFILT_WRITE))
});
unlock(change_list);
}
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn wake(&self) -> io::Result<()> {
if !self.shared.polling.wake() {
return Ok(());
}
self.submit(ForceSubmit::Wakeup, |kevent| {
kevent.0.filter = libc::EVFILT_USER;
kevent.0.flags = libc::EV_ADD;
kevent.0.fflags = libc::NOTE_TRIGGER;
kevent.0.udata = cq::WAKE_USER_DATA;
});
Ok(())
}
pub(crate) fn shared(&self) -> &Shared {
&self.shared
}
pub(crate) fn fd(&self) -> RawFd {
self.shared.kq.as_raw_fd()
}
}
#[derive(Copy, Clone)]
enum ForceSubmit {
Normal,
Wakeup,
}
impl PartialEq for Submissions {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.shared, &other.shared)
}
}
impl Eq for Submissions {}