use std::io;
use std::mem::{self, drop as unlock, take};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::kqueue::{Event, Shared, cq};
use crate::lock;
#[derive(Clone, Debug)]
pub(crate) struct Submissions {
shared: Arc<Shared>,
}
impl Submissions {
pub(crate) fn new(shared: Shared) -> Submissions {
Submissions {
shared: Arc::new(shared),
}
}
pub(super) fn add<F>(&self, fill_event: F)
where
F: FnOnce(&mut Event),
{
self.submit(false, fill_event);
}
fn submit<F>(&self, force_kevent: bool, fill_event: F)
where
F: FnOnce(&mut Event),
{
let shared = &*self.shared;
let mut event: Event = unsafe { mem::zeroed() };
event.0.flags |=
libc::EV_CLEAR | libc::EV_DISPATCH | libc::EV_ENABLE | libc::EV_ADD | libc::EV_RECEIPT;
fill_event(&mut event);
log::trace!(event:?; "registering event");
let mut change_list = lock(&shared.change_list);
change_list.push(event);
if !force_kevent && (change_list.len() < (shared.max_change_list_size as usize)) {
unlock(change_list); return;
}
let mut changes = take(&mut *change_list);
unlock(change_list);
let ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
log::trace!(changes = changes.len(); "submitting changes");
shared.kevent(&mut changes, None, Some(&ts));
shared.reuse_change_list(changes);
}
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn wake(&self) -> io::Result<()> {
if !self.shared.is_polling.load(Ordering::Acquire) {
return Ok(());
}
self.submit(true, |kevent| {
kevent.0.filter = libc::EVFILT_USER;
kevent.0.flags = libc::EV_ADD;
kevent.0.fflags = libc::NOTE_TRIGGER;
kevent.0.udata = cq::WAKE_USER_DATA;
});
Ok(())
}
pub(crate) fn shared(&self) -> &Shared {
&self.shared
}
}