use std::{
cell::RefCell,
collections::BTreeMap,
ffi::CString,
fmt,
future::Future,
io,
mem,
os::unix::{ffi::OsStrExt, io::RawFd},
path::Path,
rc::Rc,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::Waker,
time::{Duration, Instant},
};
use ahash::AHashMap;
use log::error;
use nix::sys::socket::{MsgFlags, SockAddr};
use smallvec::SmallVec;
use crate::{
io::{FileScheduler, IoScheduler, ScheduledSource},
iou::sqe::SockAddrStorage,
sys::{
self,
common_flags,
read_flags,
sysfs,
DirectIo,
DmaBuffer,
DmaSource,
IoBuffer,
PollableStatus,
SleepNotifier,
Source,
SourceType,
StatsCollection,
},
IoRequirements,
IoStats,
PoolPlacement,
TaskQueueHandle,
};
use nix::poll::PollFlags;
/// Per-channel entry: the wakers parked on the channel, plus an optional checker
/// whose return value caps how many of those wakers are woken in one pass.
type SharedChannelWakerChecker = (SmallVec<[Waker; 1]>, Option<Box<dyn Fn() -> usize>>);

/// Waker bookkeeping for shared channels driven by this reactor.
struct SharedChannels {
    /// Next id handed out when a shared channel is registered.
    id: u64,
    /// Parked wakers and capacity checker, keyed by channel id.
    wakers_map: BTreeMap<u64, SharedChannelWakerChecker>,
    /// Wakers that are all woken on the next processing pass.
    connection_wakers: Vec<Waker>,
}

impl SharedChannels {
    /// Creates an empty registry whose first channel id will be 0.
    fn new() -> SharedChannels {
        SharedChannels {
            id: 0,
            connection_wakers: Vec::new(),
            wakers_map: BTreeMap::new(),
        }
    }

    /// Wakes parked shared-channel wakers and returns how many were woken.
    ///
    /// Every connection waker is woken. For each channel with pending wakers, at
    /// most `checker()` of them are woken. A channel entry without a checker can
    /// exist when a waker is added for an id that was never registered or was
    /// already unregistered. For such an entry, all pending wakers are woken
    /// instead of panicking. A spurious wake is always permitted, whereas
    /// leaving a waker parked forever could stall its task.
    fn process_shared_channels(&mut self) -> usize {
        let mut woke = self.connection_wakers.len();
        for waker in self.connection_wakers.drain(..) {
            wake!(waker);
        }
        for (pending, check) in self.wakers_map.values_mut() {
            if pending.is_empty() {
                continue;
            }
            let room = match check.as_ref() {
                Some(capacity) => std::cmp::min(capacity(), pending.len()),
                // No checker registered: nothing can tell us the capacity, so
                // release every waiter and let it re-check the channel itself.
                None => pending.len(),
            };
            for waker in pending.drain(0..room).rev() {
                woke += 1;
                wake!(waker);
            }
        }
        woke
    }
}
/// Registry of armed timers, indexed both by id and by deadline.
struct Timers {
    /// Last id handed out by `new_id`; starts at 0, so the first id is 1.
    timer_id: u64,
    /// Deadline of every armed timer, keyed by timer id.
    timers_by_id: AHashMap<u64, Instant>,
    /// Wakers of armed timers, ordered by `(deadline, id)`.
    timers: BTreeMap<(Instant, u64), Waker>,
}

impl Timers {
    /// Creates an empty registry.
    fn new() -> Timers {
        Self {
            timer_id: 0,
            timers_by_id: AHashMap::new(),
            timers: BTreeMap::new(),
        }
    }

    /// Hands out the next timer id (ids start at 1).
    fn new_id(&mut self) -> u64 {
        let next = self.timer_id + 1;
        self.timer_id = next;
        next
    }

    /// Disarms timer `id`, returning its waker if it was armed.
    fn remove(&mut self, id: u64) -> Option<Waker> {
        let deadline = self.timers_by_id.remove(&id)?;
        self.timers.remove(&(deadline, id))
    }

    /// Arms (or re-arms) timer `id` to fire at `when`, replacing any earlier
    /// deadline and waker registered under the same id.
    fn insert(&mut self, id: u64, when: Instant, waker: Waker) {
        if let Some(previous) = self.timers_by_id.insert(id, when) {
            self.timers.remove(&(previous, id));
        }
        self.timers.insert((when, id), waker);
    }

    /// Fires every timer whose deadline is strictly earlier than now.
    ///
    /// Returns the time until the earliest remaining deadline, if any, and the
    /// number of wakers that were woken.
    fn process_timers(&mut self) -> (Option<Duration>, usize) {
        let now = Instant::now();
        // `split_off` keeps keys >= (now, 0) armed; strictly earlier keys expire.
        let still_armed = self.timers.split_off(&(now, 0));
        let expired = mem::replace(&mut self.timers, still_armed);
        let fired = expired.len();
        for (_key, expired_waker) in expired {
            wake!(expired_waker);
        }
        let until_next = self
            .timers
            .iter()
            .next()
            .map(|(&(deadline, _), _)| deadline.saturating_duration_since(now));
        (until_next, fired)
    }
}
/// Executor-facing reactor: wraps the platform `sys::Reactor` and adds timers,
/// shared-channel wakeups and I/O scheduling on top of it.
pub(crate) struct Reactor {
    /// Underlying platform reactor that all operations are submitted to.
    pub(crate) sys: sys::Reactor,
    /// Armed timers and their wakers.
    timers: RefCell<Timers>,
    /// Wakers parked on shared channels.
    shared_channels: RefCell<SharedChannels>,
    /// I/O scheduler; its current requirements are attached to every new source.
    io_scheduler: Rc<IoScheduler>,
    /// Whether file reads record pre/io/post latency statistics.
    record_io_latencies: bool,
    /// Pointer obtained from `sys.preempt_pointers()`; compared against the tail
    /// to decide whether preemption is needed.
    preempt_ptr_head: *const u32,
    /// Atomic counterpart of `preempt_ptr_head`, also from `sys.preempt_pointers()`.
    preempt_ptr_tail: *const AtomicU32,
}
impl Reactor {
pub(crate) fn new(
notifier: Arc<SleepNotifier>,
io_memory: usize,
ring_depth: usize,
record_io_latencies: bool,
thread_pool_placement: PoolPlacement,
) -> io::Result<Reactor> {
let sys = sys::Reactor::new(notifier, io_memory, ring_depth, thread_pool_placement)?;
let (preempt_ptr_head, preempt_ptr_tail) = sys.preempt_pointers();
Ok(Reactor {
sys,
timers: RefCell::new(Timers::new()),
shared_channels: RefCell::new(SharedChannels::new()),
io_scheduler: Rc::new(IoScheduler::new()),
record_io_latencies,
preempt_ptr_head,
preempt_ptr_tail: preempt_ptr_tail as _,
})
}
pub(crate) fn io_stats(&self) -> IoStats {
self.sys.io_stats()
}
pub(crate) fn task_queue_io_stats(&self, handle: &TaskQueueHandle) -> Option<IoStats> {
self.sys.task_queue_io_stats(handle)
}
#[inline(always)]
pub(crate) fn need_preempt(&self) -> bool {
unsafe { *self.preempt_ptr_head != (*self.preempt_ptr_tail).load(Ordering::Acquire) }
}
pub(crate) fn id(&self) -> usize {
self.sys.id()
}
pub(crate) fn ring_depth(&self) -> usize {
self.sys.ring_depth()
}
fn new_source(
&self,
raw: RawFd,
stype: SourceType,
stats_collection: Option<StatsCollection>,
) -> Source {
sys::Source::new(
self.io_scheduler.requirements(),
raw,
stype,
stats_collection,
Some(crate::executor().current_task_queue()),
)
}
pub(crate) fn inform_io_requirements(&self, req: IoRequirements) {
self.io_scheduler.inform_requirements(req);
}
pub(crate) async fn probe_iopoll_support(
&self,
raw: RawFd,
alignment: u64,
major: usize,
minor: usize,
path: &Path,
) -> bool {
match sysfs::BlockDevice::iopoll(major, minor) {
None => {
let source =
self.new_source(raw, SourceType::Read(PollableStatus::Pollable, None), None);
self.sys.read_dma(&source, 0, alignment as usize);
let iopoll = if let Err(err) = source.collect_rw().await {
if let Some(libc::ENOTSUP) = err.raw_os_error() {
false
} else {
error!(
"got unexpected error when probing iopoll support for file {:?} (fd: \
{}) hosted on ({}, {}); the poll ring will be disabled for this \
device: {}",
path, raw, major, minor, err
);
false
}
} else {
true
};
sysfs::BlockDevice::set_iopoll_support(major, minor, iopoll);
iopoll
}
Some(iopoll) => iopoll,
}
}
pub(crate) fn register_shared_channel<F>(&self, test_function: Box<F>) -> u64
where
F: Fn() -> usize + 'static,
{
let mut channels = self.shared_channels.borrow_mut();
let id = channels.id;
channels.id += 1;
let ret = channels
.wakers_map
.insert(id, (Default::default(), Some(test_function)));
assert!(ret.is_none());
id
}
pub(crate) fn unregister_shared_channel(&self, id: u64) {
let mut channels = self.shared_channels.borrow_mut();
channels.wakers_map.remove(&id);
}
pub(crate) fn add_shared_channel_connection_waker(&self, waker: Waker) {
let mut channels = self.shared_channels.borrow_mut();
channels.connection_wakers.push(waker);
}
pub(crate) fn add_shared_channel_waker(&self, id: u64, waker: Waker) {
let mut channels = self.shared_channels.borrow_mut();
let map = channels
.wakers_map
.entry(id)
.or_insert_with(|| (SmallVec::new(), None));
map.0.push(waker);
}
pub(crate) fn alloc_dma_buffer(&self, size: usize) -> DmaBuffer {
self.sys.alloc_dma_buffer(size)
}
pub(crate) fn write_dma(
&self,
raw: RawFd,
buf: DmaSource,
pos: u64,
pollable: PollableStatus,
) -> Source {
let stats = StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if let Ok(result) = result {
stats.file_writes += op_count;
stats.file_bytes_written += *result as u64 * op_count;
}
}),
reused: None,
latency: None,
};
let source = self.new_source(
raw,
SourceType::Write(pollable, IoBuffer::DmaSource(buf)),
Some(stats),
);
self.sys.write_dma(&source, pos);
source
}
pub(crate) fn write_buffered(&self, raw: RawFd, buf: Vec<u8>, pos: u64) -> Source {
let stats = StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if let Ok(result) = result {
stats.file_buffered_writes += op_count;
stats.file_buffered_bytes_written += *result as u64 * op_count;
}
}),
reused: None,
latency: None,
};
let source = self.new_source(
raw,
SourceType::Write(
PollableStatus::NonPollable(DirectIo::Disabled),
IoBuffer::Buffered(buf),
),
Some(stats),
);
self.sys.write_buffered(&source, pos);
source
}
pub(crate) fn connect(&self, raw: RawFd, addr: SockAddr) -> Source {
let source = self.new_source(raw, SourceType::Connect(addr), None);
self.sys.connect(&source);
source
}
pub(crate) fn connect_timeout(&self, raw: RawFd, addr: SockAddr, d: Duration) -> Source {
let source = self.new_source(raw, SourceType::Connect(addr), None);
source.set_timeout(d);
self.sys.connect(&source);
source
}
pub(crate) fn accept(&self, raw: RawFd) -> Source {
let addr = SockAddrStorage::uninit();
let source = self.new_source(raw, SourceType::Accept(addr), None);
self.sys.accept(&source);
source
}
pub(crate) fn poll_read_ready(&self, fd: RawFd) -> Source {
let source = self.new_source(fd, SourceType::PollAdd, None);
self.sys.poll_ready(&source, common_flags() | read_flags());
source
}
pub(crate) fn poll_write_ready(&self, fd: RawFd) -> Source {
let source = self.new_source(fd, SourceType::PollAdd, None);
self.sys
.poll_ready(&source, common_flags() | PollFlags::POLLOUT);
source
}
pub(crate) fn rushed_send(
&self,
fd: RawFd,
buf: DmaBuffer,
timeout: Option<Duration>,
) -> io::Result<Source> {
let source = self.new_source(fd, SourceType::SockSend(buf), None);
if let Some(timeout) = timeout {
source.set_timeout(timeout);
}
self.sys.send(&source, MsgFlags::empty());
self.rush_dispatch(&source)?;
Ok(source)
}
pub(crate) fn rushed_sendmsg(
&self,
fd: RawFd,
buf: DmaBuffer,
addr: nix::sys::socket::SockAddr,
timeout: Option<Duration>,
) -> io::Result<Source> {
let iov = libc::iovec {
iov_base: buf.as_ptr() as *mut libc::c_void,
iov_len: 1,
};
let hdr = libc::msghdr {
msg_name: std::ptr::null_mut(),
msg_namelen: 0,
msg_iov: std::ptr::null_mut(),
msg_iovlen: 0,
msg_control: std::ptr::null_mut(),
msg_controllen: 0,
msg_flags: 0,
};
let source = self.new_source(fd, SourceType::SockSendMsg(buf, iov, hdr, addr), None);
if let Some(timeout) = timeout {
source.set_timeout(timeout);
}
self.sys.sendmsg(&source, MsgFlags::empty());
self.rush_dispatch(&source)?;
Ok(source)
}
pub(crate) fn rushed_recvmsg(
&self,
fd: RawFd,
size: usize,
flags: MsgFlags,
timeout: Option<Duration>,
) -> io::Result<Source> {
let hdr = libc::msghdr {
msg_name: std::ptr::null_mut(),
msg_namelen: 0,
msg_iov: std::ptr::null_mut(),
msg_iovlen: 0,
msg_control: std::ptr::null_mut(),
msg_controllen: 0,
msg_flags: 0,
};
let iov = libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
};
let source = self.new_source(
fd,
SourceType::SockRecvMsg(
None,
iov,
hdr,
std::mem::MaybeUninit::<nix::sys::socket::sockaddr_storage>::uninit(),
),
None,
);
if let Some(timeout) = timeout {
source.set_timeout(timeout);
}
self.sys.recvmsg(&source, size, flags);
self.rush_dispatch(&source)?;
Ok(source)
}
pub(crate) fn rushed_recv(
&self,
fd: RawFd,
size: usize,
timeout: Option<Duration>,
) -> io::Result<Source> {
let source = self.new_source(fd, SourceType::SockRecv(None), None);
if let Some(timeout) = timeout {
source.set_timeout(timeout);
}
self.sys.recv(&source, size, MsgFlags::empty());
self.rush_dispatch(&source)?;
Ok(source)
}
pub(crate) fn recv(&self, fd: RawFd, size: usize, flags: MsgFlags) -> Source {
let source = self.new_source(fd, SourceType::SockRecv(None), None);
self.sys.recv(&source, size, flags);
source
}
pub(crate) fn read_dma(
&self,
raw: RawFd,
pos: u64,
size: usize,
pollable: PollableStatus,
scheduler: Option<&FileScheduler>,
) -> ScheduledSource {
let stats = StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if let Ok(result) = result {
stats.file_reads += op_count;
stats.file_bytes_read += *result as u64 * op_count;
}
}),
reused: Some(|result, stats, op_count| {
if let Ok(result) = result {
stats.file_deduped_reads += op_count;
stats.file_deduped_bytes_read += *result as u64 * op_count;
}
}),
latency: if self.record_io_latencies {
Some(|pre_lat, io_lat, post_lat, stats| {
stats
.pre_reactor_io_scheduler_latency_us
.add(pre_lat.as_micros() as f64);
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(post_lat.as_micros() as f64)
})
} else {
None
},
};
let source = self.new_source(raw, SourceType::Read(pollable, None), Some(stats));
if let Some(scheduler) = scheduler {
if let Some(source) =
scheduler.consume_scheduled(pos..pos + size as u64, Some(&self.sys))
{
source
} else {
self.sys.read_dma(&source, pos, size);
scheduler.schedule(source, pos..pos + size as u64)
}
} else {
self.sys.read_dma(&source, pos, size);
ScheduledSource::new_raw(source, pos..pos + size as u64)
}
}
pub(crate) fn read_buffered(
&self,
raw: RawFd,
pos: u64,
size: usize,
scheduler: Option<&FileScheduler>,
) -> ScheduledSource {
let stats = StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if let Ok(result) = result {
stats.file_buffered_reads += op_count;
stats.file_buffered_bytes_read += *result as u64 * op_count;
}
}),
reused: None,
latency: if self.record_io_latencies {
Some(|pre_lat, io_lat, post_lat, stats| {
stats
.pre_reactor_io_scheduler_latency_us
.add(pre_lat.as_micros() as f64);
stats.io_latency_us.add(io_lat.as_micros() as f64);
stats
.post_reactor_io_scheduler_latency_us
.add(post_lat.as_micros() as f64)
})
} else {
None
},
};
let source = self.new_source(
raw,
SourceType::Read(PollableStatus::NonPollable(DirectIo::Disabled), None),
Some(stats),
);
if let Some(scheduler) = scheduler {
if let Some(source) =
scheduler.consume_scheduled(pos..pos + size as u64, Some(&self.sys))
{
source
} else {
self.sys.read_buffered(&source, pos, size);
scheduler.schedule(source, pos..pos + size as u64)
}
} else {
self.sys.read_buffered(&source, pos, size);
ScheduledSource::new_raw(source, pos..pos + size as u64)
}
}
pub(crate) fn fdatasync(&self, raw: RawFd) -> Source {
let source = self.new_source(raw, SourceType::FdataSync, None);
self.sys.fdatasync(&source);
source
}
pub(crate) fn fallocate(
&self,
raw: RawFd,
position: u64,
size: u64,
flags: libc::c_int,
) -> Source {
let source = self.new_source(raw, SourceType::Fallocate, None);
self.sys.fallocate(&source, position, size, flags);
source
}
pub(crate) fn truncate(&self, raw: RawFd, size: u64) -> impl Future<Output = Source> {
let source = self.new_source(raw, SourceType::Truncate, None);
let waiter = self.sys.truncate(&source, size);
async move {
waiter.await;
source
}
}
pub(crate) fn rename<P, Q>(&self, old_path: P, new_path: Q) -> impl Future<Output = Source>
where
P: AsRef<Path>,
Q: AsRef<Path>,
{
let source = self.new_source(
-1,
SourceType::Rename(old_path.as_ref().to_owned(), new_path.as_ref().to_owned()),
None,
);
let waiter = self.sys.rename(&source);
async move {
waiter.await;
source
}
}
pub(crate) fn remove_file<P: AsRef<Path>>(&self, path: P) -> impl Future<Output = Source> {
let source = self.new_source(-1, SourceType::Remove(path.as_ref().to_owned()), None);
let waiter = self.sys.remove_file(&source);
async move {
waiter.await;
source
}
}
pub(crate) fn create_dir<P: AsRef<Path>>(
&self,
path: P,
mode: libc::c_int,
) -> impl Future<Output = Source> {
let source = self.new_source(-1, SourceType::CreateDir(path.as_ref().to_owned()), None);
let waiter = self.sys.create_dir(&source, mode);
async move {
waiter.await;
source
}
}
pub(crate) fn run_blocking(
&self,
func: Box<dyn FnOnce() + Send + 'static>,
) -> impl Future<Output = Source> {
let source = self.new_source(-1, SourceType::BlockingFn, None);
let waiter = self.sys.run_blocking(&source, func);
async move {
waiter.await;
source
}
}
pub(crate) fn close(&self, raw: RawFd) -> Source {
let source = self.new_source(
raw,
SourceType::Close,
Some(StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if result.is_ok() {
stats.files_closed += op_count
}
}),
reused: None,
latency: None,
}),
);
self.sys.close(&source);
source
}
pub(crate) fn statx(&self, raw: RawFd, path: &Path) -> Source {
let path = CString::new(path.as_os_str().as_bytes()).expect("path contained null!");
let statx_buf = unsafe {
let statx_buf = mem::MaybeUninit::<libc::statx>::zeroed();
statx_buf.assume_init()
};
let source = self.new_source(
raw,
SourceType::Statx(path, Box::new(RefCell::new(statx_buf))),
None,
);
self.sys.statx(&source);
source
}
pub(crate) fn open_at(
&self,
dir: RawFd,
path: &Path,
flags: libc::c_int,
mode: libc::mode_t,
) -> Source {
let path = CString::new(path.as_os_str().as_bytes()).expect("path contained null!");
let source = self.new_source(
dir,
SourceType::Open(path),
Some(StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if result.is_ok() {
stats.files_opened += op_count
}
}),
reused: None,
latency: None,
}),
);
self.sys.open_at(&source, flags, mode);
source
}
#[cfg(feature = "bench")]
pub(crate) fn nop(&self) -> Source {
let source = self.new_source(-1, SourceType::Noop, None);
self.sys.nop(&source);
source
}
pub(crate) fn register_timer(&self) -> u64 {
let mut timers = self.timers.borrow_mut();
timers.new_id()
}
pub(crate) fn insert_timer(&self, id: u64, when: Instant, waker: Waker) {
let mut timers = self.timers.borrow_mut();
timers.insert(id, when, waker);
}
pub(crate) fn remove_timer(&self, id: u64) -> Option<Waker> {
let mut timers = self.timers.borrow_mut();
timers.remove(id)
}
pub(crate) fn timer_exists(&self, id: &(Instant, u64)) -> bool {
let timers = self.timers.borrow();
timers.timers.contains_key(id)
}
fn process_timers(&self) -> (Option<Duration>, usize) {
let mut timers = self.timers.borrow_mut();
timers.process_timers()
}
fn process_shared_channels(&self) -> usize {
let mut channels = self.shared_channels.borrow_mut();
let mut processed = channels.process_shared_channels();
processed += self.sys.process_foreign_wakes();
processed
}
pub(crate) fn process_shared_channels_by_id(&self, id: u64) -> usize {
match self.shared_channels.borrow_mut().wakers_map.get_mut(&id) {
Some(wakers) => {
let processed = wakers.0.len();
wakers.0.drain(..).for_each(|w| {
wake!(w);
});
processed
}
None => 0,
}
}
pub(crate) fn rush_dispatch(&self, source: &Source) -> io::Result<()> {
self.sys.rush_dispatch(source, &mut 0)
}
pub(crate) fn spin_poll_io(&self) -> io::Result<bool> {
let mut woke = 0;
self.sys.poll_io(&mut woke)?;
woke += self.process_timers().1;
woke += self.process_shared_channels();
Ok(woke > 0)
}
fn process_external_events(&self) -> (Option<Duration>, usize) {
let (next_timer, mut woke) = self.process_timers();
woke += self.process_shared_channels();
(next_timer, woke)
}
pub(crate) fn react(&self, timeout: impl Fn() -> Option<Duration>) -> io::Result<bool> {
let (next_timer, woke) = self.process_external_events();
match self
.sys
.wait(timeout, next_timer, woke, || self.process_shared_channels())
{
Ok(true) => {
self.process_external_events();
Ok(true)
}
Ok(false) => Ok(false),
Err(err) => Err(err),
}
}
pub(crate) fn io_scheduler(&self) -> &Rc<IoScheduler> {
&self.io_scheduler
}
}
impl fmt::Debug for Reactor {
    /// Prints a fixed placeholder without listing any fields. The placeholder
    /// goes through `pad`, so width/fill/alignment flags are honoured.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        const PLACEHOLDER: &str = "Reactor { .. }";
        formatter.pad(PLACEHOLDER)
    }
}