use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::{ptr, task};
use crate::fd::{self, AsyncFd};
use crate::{kqueue, lock, syscall};
#[derive(Debug)]
pub(crate) struct State {
shared: AtomicPtr<SharedState>,
}
pub(super) type SharedState = Mutex<OpState>;
impl State {
pub(crate) const fn new() -> State {
State {
shared: AtomicPtr::new(ptr::null_mut()),
}
}
pub(super) fn lock<'a>(&'a self) -> MutexGuard<'a, OpState> {
let mut ptr = self.shared.load(Ordering::Acquire);
if ptr.is_null() {
let op_state = Box::new(Mutex::new(OpState {
ops: Vec::with_capacity(1),
}));
let state_ptr = Box::into_raw(op_state);
let res = self.shared.compare_exchange(
ptr::null_mut(),
state_ptr,
Ordering::AcqRel,
Ordering::Acquire,
);
match res {
Ok(old_ptr) => {
debug_assert!(old_ptr.is_null());
ptr = state_ptr;
}
Err(old_ptr) => {
debug_assert!(!old_ptr.is_null());
debug_assert!(old_ptr != state_ptr);
ptr = old_ptr;
drop(unsafe { Box::from_raw(state_ptr) });
}
}
}
lock(unsafe { &*ptr })
}
pub(super) fn as_udata(&self) -> *mut libc::c_void {
self.shared.load(Ordering::Relaxed).cast()
}
}
#[derive(Debug)]
pub(super) struct OpState {
ops: Vec<(OpKind, task::Waker)>,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub(crate) enum OpKind {
Read,
Write,
}
impl OpState {
pub(crate) fn has_waiting_op(&self, op: OpKind) -> bool {
self.ops.iter().any(|(kind, _)| op == *kind)
}
pub(crate) fn add(&mut self, op: OpKind, waker: task::Waker) {
self.ops.push((op, waker));
}
pub(crate) fn wake(&mut self, event: &kqueue::Event) {
self.ops
.extract_if(.., |(filter, _)| match filter {
OpKind::Read => event.0.filter == libc::EVFILT_READ,
OpKind::Write => event.0.filter == libc::EVFILT_WRITE,
})
.for_each(|(_, waker)| waker.wake());
}
}
impl Drop for AsyncFd {
fn drop(&mut self) {
let result = match self.kind() {
fd::Kind::File => syscall!(close(self.fd())).map(|_| ()),
};
if let Err(err) = result {
log::warn!("error closing a10::AsyncFd: {err}");
}
let ptr = *self.state.shared.get_mut();
if ptr.is_null() {
return;
}
self.sq.submissions().add(|event| {
event.0.ident = ptr as _;
event.0.filter = libc::EVFILT_USER;
});
}
}