use crate::{
iou::sqe::{SockAddr, SockAddrStorage},
sys::{
DmaBuffer,
IoBuffer,
OsResult,
PollableStatus,
ReactorQueue,
SourceId,
TimeSpec64,
Wakers,
},
GlommioError,
IoRequirements,
ReactorErrorKind,
RingIoStats,
TaskQueueHandle,
};
use futures_lite::{future, io};
use std::{
cell::{Ref, RefCell, RefMut},
convert::TryFrom,
ffi::CString,
fmt,
mem::MaybeUninit,
os::unix::io::RawFd,
path::PathBuf,
pin::Pin,
rc::Rc,
task::{Poll, Waker},
time::Duration,
};
#[derive(Debug)]
pub(crate) enum SourceType {
Write(PollableStatus, IoBuffer),
Read(PollableStatus, Option<IoBuffer>),
PollAdd,
SockSend(DmaBuffer),
SockRecv(Option<DmaBuffer>),
SockRecvMsg(
Option<DmaBuffer>,
libc::iovec,
libc::msghdr,
MaybeUninit<nix::sys::socket::sockaddr_storage>,
),
SockSendMsg(
DmaBuffer,
libc::iovec,
libc::msghdr,
nix::sys::socket::SockAddr,
),
Open(CString),
FdataSync,
Fallocate,
Truncate,
Close,
LinkRings,
ForeignNotifier(u64, bool),
Statx(CString, Box<RefCell<libc::statx>>),
Timeout(TimeSpec64, u32),
Connect(SockAddr),
Accept(SockAddrStorage),
Rename(PathBuf, PathBuf),
CreateDir(PathBuf),
Remove(PathBuf),
BlockingFn,
Invalid,
#[cfg(feature = "bench")]
Noop,
}
impl TryFrom<SourceType> for libc::statx {
type Error = GlommioError<()>;
fn try_from(value: SourceType) -> Result<Self, Self::Error> {
match value {
SourceType::Statx(_, buf) => Ok(buf.into_inner()),
_ => Err(GlommioError::ReactorError(
ReactorErrorKind::IncorrectSourceType,
)),
}
}
}
#[derive(PartialEq, Clone, Copy)]
pub(crate) enum EnqueuedStatus {
Enqueued,
Canceled,
Dispatched,
}
#[derive(Clone)]
pub struct EnqueuedSource {
pub(crate) id: SourceId,
pub(crate) queue: ReactorQueue,
pub(crate) status: EnqueuedStatus,
}
pub(crate) type StatsCollectionFn = fn(&io::Result<usize>, &mut RingIoStats, waiters: u64) -> ();
pub(crate) type LatencyCollectionFn =
fn(std::time::Duration, std::time::Duration, std::time::Duration, &mut RingIoStats) -> ();
#[derive(Copy, Clone)]
pub(crate) struct StatsCollection {
pub(crate) fulfilled: Option<StatsCollectionFn>,
pub(crate) reused: Option<StatsCollectionFn>,
pub(crate) latency: Option<LatencyCollectionFn>,
}
pub(crate) struct InnerSource {
pub(crate) raw: RawFd,
pub(crate) wakers: Wakers,
pub(crate) source_type: SourceType,
pub(crate) io_requirements: IoRequirements,
pub(crate) timeout: Option<TimeSpec64>,
pub(crate) enqueued: Option<EnqueuedSource>,
pub(crate) stats_collection: Option<StatsCollection>,
pub(crate) task_queue: Option<TaskQueueHandle>,
}
impl InnerSource {
pub(crate) fn update_source_type(&mut self, source_type: SourceType) -> SourceType {
std::mem::replace(&mut self.source_type, source_type)
}
}
impl fmt::Debug for InnerSource {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InnerSource")
.field("raw", &self.raw)
.field("wakers", &self.wakers)
.field("source_type", &self.source_type)
.field("io_requirements", &self.io_requirements)
.finish()
}
}
#[derive(Debug)]
pub struct Source {
pub(crate) inner: Pin<Rc<RefCell<InnerSource>>>,
}
impl Source {
pub(crate) fn new(
ioreq: IoRequirements,
raw: RawFd,
source_type: SourceType,
stats_collection: Option<StatsCollection>,
task_queue: Option<TaskQueueHandle>,
) -> Source {
Source {
inner: Rc::pin(RefCell::new(InnerSource {
raw,
wakers: Wakers::new(),
source_type,
io_requirements: ioreq,
enqueued: None,
timeout: None,
stats_collection,
task_queue,
})),
}
}
pub(crate) fn set_timeout(&self, d: Duration) -> Option<Duration> {
let mut inner = self.inner.borrow_mut();
let t = &mut inner.timeout;
let old = *t;
*t = Some(TimeSpec64::from(d));
old.map(Duration::from)
}
pub(super) fn timeout_ref(&self) -> Ref<'_, Option<TimeSpec64>> {
Ref::map(self.inner.borrow(), |x| &x.timeout)
}
pub(super) fn source_type(&self) -> Ref<'_, SourceType> {
Ref::map(self.inner.borrow(), |x| &x.source_type)
}
pub(crate) fn source_type_mut(&self) -> RefMut<'_, SourceType> {
RefMut::map(self.inner.borrow_mut(), |x| &mut x.source_type)
}
pub(crate) fn extract_source_type(self) -> SourceType {
self.inner
.borrow_mut()
.update_source_type(SourceType::Invalid)
}
pub(crate) fn extract_buffer(self) -> IoBuffer {
let stype = self.extract_source_type();
match stype {
SourceType::Read(_, Some(buffer)) => buffer,
SourceType::Write(_, buffer) => buffer,
x => panic!("Could not extract buffer. Source: {:?}", x),
}
}
pub(crate) fn buffer(&self) -> Ref<'_, IoBuffer> {
Ref::map(self.source_type(), |stype| match &*stype {
SourceType::Read(_, Some(buffer)) => buffer,
SourceType::Write(_, buffer) => buffer,
x => panic!("Could not extract buffer. Source: {:?}", x),
})
}
pub(crate) fn result(&self) -> Option<io::Result<usize>> {
let mut inner = self.inner.borrow_mut();
let ret = inner
.wakers
.result
.as_ref()
.map(|x| OsResult::from(x).into());
if ret.is_none() {
return ret;
}
if let Some(Some(stat_fn)) = inner.stats_collection.as_ref().map(|x| x.latency) {
if let Some(lat) = inner.wakers.timestamps() {
drop(inner);
let pre_lat = lat.submitted_at - lat.queued_at;
let io_lat = lat.fulfilled_at - lat.submitted_at;
let post_lat = lat.fulfilled_at.elapsed();
let reactor = &crate::executor().reactor().sys;
(stat_fn)(
pre_lat,
io_lat,
post_lat,
reactor.ring_for_source(self).io_stats_mut(),
);
(stat_fn)(
pre_lat,
io_lat,
post_lat,
reactor
.ring_for_source(self)
.io_stats_for_task_queue_mut(crate::executor().current_task_queue()),
);
}
}
ret
}
pub(crate) fn add_waiter_single(&self, waker: &Waker) {
let mut inner = self.inner.borrow_mut();
let waiters = &mut inner.wakers.waiters;
match waiters.first_mut() {
Some(w) => {
if !w.will_wake(waker) {
*w = waker.clone();
}
}
None => waiters.push(waker.clone()),
}
debug_assert_eq!(inner.wakers.waiters.len(), 1)
}
pub(crate) fn add_waiter_many(&self, waker: Waker) {
self.inner.borrow_mut().wakers.waiters.push(waker)
}
pub(super) fn is_installed(&self) -> Option<bool> {
match &self.inner.borrow().source_type {
SourceType::ForeignNotifier(_, installed) => Some(*installed),
_ => None,
}
}
pub(super) fn raw(&self) -> RawFd {
self.inner.borrow().raw
}
pub(crate) fn stats_collection(&self) -> Option<StatsCollection> {
self.inner.borrow().stats_collection
}
pub(crate) async fn collect_rw(&self) -> io::Result<usize> {
future::poll_fn(|cx| {
if let Some(result) = self.result() {
return Poll::Ready(result);
}
self.add_waiter_many(cx.waker().clone());
Poll::Pending
})
.await
}
}
impl Drop for Source {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
let enqueued = inner.enqueued.as_mut();
if let Some(EnqueuedSource { id, queue, status }) = enqueued {
match status {
EnqueuedStatus::Enqueued => {
*status = EnqueuedStatus::Canceled;
}
EnqueuedStatus::Dispatched => {
queue.borrow_mut().cancel_request(*id);
*status = EnqueuedStatus::Canceled; }
EnqueuedStatus::Canceled => unreachable!(),
}
}
}
}