use std::cell::{Cell, RefCell};
use std::fmt;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use bytes::Bytes;
use crate::backend::Driver;
#[cfg(has_io_uring)]
use crate::completion::{OpTag, UserData};
use crate::error::TimerExhausted;
use crate::handler::ConnToken;
#[cfg(has_io_uring)]
use crate::runtime::TimerSlotPool;
use crate::runtime::task::TaskId;
use crate::runtime::waker::STANDALONE_BIT;
use crate::runtime::{CURRENT_TASK_ID, Executor, IoResult};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseResult {
Consumed(usize),
NeedMore,
}
pub(crate) struct DriverState {
pub(crate) driver: NonNull<Driver>,
pub(crate) executor: NonNull<Executor>,
}
thread_local! {
pub(crate) static CURRENT_DRIVER: Cell<Option<NonNull<DriverState>>> =
const { Cell::new(None) };
}
pub(crate) unsafe fn set_driver_state(state: &mut DriverState) {
CURRENT_DRIVER.with(|c| c.set(Some(NonNull::from(state))));
}
pub(crate) fn clear_driver_state() {
CURRENT_DRIVER.with(|c| c.set(None));
}
pub(crate) fn with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> R {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
let mut non_null = opt_non_null.expect("called outside executor");
let state = unsafe { non_null.as_mut() };
let driver = unsafe { &mut *state.driver.as_mut() };
let executor = unsafe { &mut *state.executor.as_mut() };
f(driver, executor)
}
pub(crate) fn try_with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> Option<R> {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
let mut non_null = opt_non_null?;
let state = unsafe { non_null.as_mut() };
let driver = unsafe { &mut *state.driver.as_mut() };
let executor = unsafe { &mut *state.executor.as_mut() };
Some(f(driver, executor))
}
pub fn spawn(future: impl Future<Output = ()> + 'static) -> io::Result<TaskId> {
try_with_state(
|_driver, executor| match executor.standalone_slab.spawn(Box::pin(future)) {
Some(idx) => {
executor.ready_queue.push_back(idx | STANDALONE_BIT);
Ok(TaskId(idx))
}
None => Err(io::Error::other("standalone task slab exhausted")),
},
)
.unwrap_or_else(|| Err(io::Error::other("called outside executor")))
}
impl TaskId {
pub fn cancel(self) {
with_state(|_driver, executor| {
executor.standalone_slab.remove(self.0);
});
}
}
struct JoinState<T> {
result: Option<T>,
waiter: Option<u32>,
aborted: bool,
}
pub struct JoinHandle<T> {
state: Rc<RefCell<JoinState<T>>>,
task_id: TaskId,
}
impl<T> JoinHandle<T> {
pub fn id(&self) -> TaskId {
self.task_id
}
pub fn abort(&self) {
self.state.borrow_mut().aborted = true;
self.task_id.cancel();
}
}
impl<T: 'static> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
let mut s = self.state.borrow_mut();
if s.aborted {
return Poll::Pending;
}
if let Some(value) = s.result.take() {
return Poll::Ready(value);
}
s.waiter = Some(CURRENT_TASK_ID.with(|c| c.get()));
Poll::Pending
}
}
pub fn spawn_with_handle<T: 'static>(
future: impl Future<Output = T> + 'static,
) -> io::Result<JoinHandle<T>> {
let state = Rc::new(RefCell::new(JoinState {
result: None,
waiter: None,
aborted: false,
}));
let state_for_wrapper = Rc::clone(&state);
let wrapper = async move {
let value = future.await;
let mut s = state_for_wrapper.borrow_mut();
s.result = Some(value);
let waiter = s.waiter.take();
drop(s);
if let Some(waiter_id) = waiter {
with_state(|_driver, executor| {
executor.wake_task(waiter_id);
});
}
};
try_with_state(
|_driver, executor| match executor.standalone_slab.spawn(Box::pin(wrapper)) {
Some(idx) => {
executor.ready_queue.push_back(idx | STANDALONE_BIT);
Ok(JoinHandle {
state: Rc::clone(&state),
task_id: TaskId(idx),
})
}
None => Err(io::Error::other("standalone task slab exhausted")),
},
)
.unwrap_or_else(|| Err(io::Error::other("called outside executor")))
}
pub fn spawn_blocking<T: Send + 'static>(
f: impl FnOnce() -> T + Send + 'static,
) -> io::Result<BlockingJoinHandle<T>> {
try_with_state(|driver, executor| {
let pool = driver
.blocking_pool
.as_ref()
.ok_or_else(|| io::Error::other("blocking pool not configured"))?;
let blocking_tx = driver
.blocking_tx
.as_ref()
.ok_or_else(|| io::Error::other("blocking pool not configured"))?;
let request_id = executor.next_blocking_id;
executor.next_blocking_id += 1;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor
.pending_blocking
.insert(request_id, (task_id, None));
let work = Box::new(move || -> Box<dyn std::any::Any + Send> { Box::new(f()) });
pool.request_tx
.send(crate::blocking::BlockingRequest {
work,
request_id,
response_tx: blocking_tx.clone(),
wake_handle: driver.wake_handle,
})
.map_err(|_| io::Error::other("blocking pool shut down"))?;
Ok(BlockingJoinHandle {
request_id,
_phantom: std::marker::PhantomData,
})
})
.unwrap_or_else(|| Err(io::Error::other("called outside executor")))
}
pub struct BlockingJoinHandle<T> {
request_id: u64,
_phantom: std::marker::PhantomData<T>,
}
impl<T: 'static> Future for BlockingJoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
with_state(|_driver, executor| {
if let Some((_, slot)) = executor.pending_blocking.get_mut(&self.request_id)
&& let Some(boxed) = slot.take()
{
executor.pending_blocking.remove(&self.request_id);
let value = *boxed
.downcast::<T>()
.expect("type mismatch in BlockingJoinHandle");
return Poll::Ready(value);
}
Poll::Pending
})
}
}
pub fn connect(addr: SocketAddr) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect(addr)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_with_timeout(addr: SocketAddr, timeout_ms: u64) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_with_timeout(addr, timeout_ms)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_unix(path: impl AsRef<std::path::Path>) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_unix(path.as_ref())
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_tls(addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls(addr, server_name)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_tls_with_timeout(
addr: SocketAddr,
server_name: &str,
timeout_ms: u64,
) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls_with_timeout(addr, server_name, timeout_ms)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn resolve(host: &str, port: u16) -> io::Result<ResolveFuture> {
let host = host.to_string();
try_with_state(|driver, executor| {
let resolver = driver
.resolver
.as_ref()
.ok_or_else(|| io::Error::other("resolver pool not configured"))?;
let resolve_tx = driver
.resolve_tx
.as_ref()
.ok_or_else(|| io::Error::other("resolver pool not configured"))?;
let request_id = executor.next_resolve_id;
executor.next_resolve_id += 1;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor
.pending_resolves
.insert(request_id, (task_id, None));
resolver
.request_tx
.send(crate::resolver::ResolveRequest {
host,
port,
request_id,
response_tx: resolve_tx.clone(),
wake_handle: driver.wake_handle,
})
.map_err(|_| io::Error::other("resolver pool shut down"))?;
Ok(ResolveFuture { request_id })
})
.unwrap_or_else(|| Err(io::Error::other("called outside executor")))
}
pub struct ResolveFuture {
request_id: u64,
}
impl Future for ResolveFuture {
type Output = io::Result<std::net::SocketAddr>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
with_state(|_driver, executor| {
if let Some((_, slot)) = executor.pending_resolves.get_mut(&self.request_id)
&& let Some(result) = slot.take()
{
executor.pending_resolves.remove(&self.request_id);
return Poll::Ready(result);
}
Poll::Pending
})
}
}
pub fn request_shutdown() -> io::Result<()> {
try_with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.request_shutdown();
})
.ok_or_else(|| io::Error::other("called outside executor"))
}
#[derive(Clone, Copy)]
pub struct ConnCtx {
pub(crate) conn_index: u32,
pub(crate) generation: u32,
}
impl ConnCtx {
pub(crate) fn new(conn_index: u32, generation: u32) -> Self {
ConnCtx {
conn_index,
generation,
}
}
pub fn index(&self) -> usize {
self.conn_index as usize
}
pub fn token(&self) -> ConnToken {
ConnToken::new(self.conn_index, self.generation)
}
pub fn with_data<F: FnMut(&[u8]) -> ParseResult>(&self, f: F) -> WithDataFuture<F> {
WithDataFuture {
conn_index: self.conn_index,
generation: self.generation,
f: Some(f),
}
}
pub fn with_bytes<F: FnMut(Bytes) -> ParseResult>(&self, f: F) -> WithBytesFuture<F> {
WithBytesFuture {
conn_index: self.conn_index,
generation: self.generation,
f: Some(f),
}
}
pub unsafe fn set_recv_sink(&self, target: *mut u8, len: usize) {
with_state(|_driver, executor| {
executor.recv_sinks[self.conn_index as usize] = Some(crate::runtime::RecvSink {
ptr: target,
cap: len,
pos: 0,
});
});
}
pub fn take_recv_sink(&self) -> usize {
with_state(
|_driver, executor| match executor.recv_sinks[self.conn_index as usize].take() {
Some(sink) => sink.pos,
None => 0,
},
)
}
pub fn recv_ready(&self) -> RecvReadyFuture {
RecvReadyFuture {
conn_index: self.conn_index,
generation: self.generation,
}
}
pub fn try_with_data<F: FnOnce(&[u8]) -> ParseResult>(&self, f: F) -> Option<ParseResult> {
with_state(|driver, _executor| {
let data = driver.accumulators.data(self.conn_index);
if data.is_empty() {
return None;
}
let result = f(data);
if let ParseResult::Consumed(consumed) = result {
driver.accumulators.consume(self.conn_index, consumed);
}
Some(result)
})
}
pub fn send_nowait(&self, data: &[u8]) -> io::Result<()> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.send(self.token(), data)
})
}
pub fn forward_recv_buf(&self, data: &[u8]) -> io::Result<()> {
with_state(|driver, _| {
#[cfg_attr(not(has_io_uring), allow(unused_variables))]
let conn_index = self.conn_index;
#[cfg(has_io_uring)]
{
if let Some(pending) = driver.pending_recv_bufs[conn_index as usize].take() {
let pending_ptr = pending.ptr;
let data_ptr = data.as_ptr();
if data_ptr == pending_ptr && data.len() == pending.len as usize {
let payload = pending.bid as u32;
driver.send_recv_buf_original_lens[conn_index as usize] = pending.len;
driver.send_recv_buf_remaining[conn_index as usize] = pending.len;
let user_data = crate::completion::UserData::encode(
crate::completion::OpTag::SendRecvBuf,
conn_index,
payload,
);
let entry = io_uring::opcode::Send::new(
io_uring::types::Fixed(conn_index),
pending_ptr,
pending.len,
)
.build()
.user_data(user_data.raw());
let built = crate::handler::BuiltSend {
entry,
pool_slot: u16::MAX,
#[cfg(has_io_uring)]
slab_idx: u16::MAX,
total_len: pending.len,
};
let result = driver.submit_or_queue_send(conn_index, built);
if result.is_err() {
driver.pending_replenish.push(pending.bid);
}
return result;
}
driver.pending_recv_bufs[conn_index as usize] = Some(pending);
}
}
let mut ctx = driver.make_ctx();
ctx.send(self.token(), data)
})
}
#[cfg(has_io_uring)]
pub fn run_direct_echo(&self) -> DirectEchoFuture {
DirectEchoFuture {
conn_index: self.conn_index,
generation: self.generation,
armed: false,
}
}
#[cfg(has_io_uring)]
pub fn enable_recv_forward(&self) {
with_state(|driver, _| {
driver.recv_forward[self.conn_index as usize] = true;
});
}
#[cfg(has_io_uring)]
pub fn forward_held(&self) -> io::Result<SendFuture> {
with_state(|driver, executor| {
let conn_index = self.conn_index;
if driver.connections.generation(conn_index) != self.generation {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"stale connection",
));
}
let n = driver.recv_hold[conn_index as usize]
.len()
.min(crate::buffer::send_slab::MAX_IOVECS);
if n == 0 {
executor.io_results[conn_index as usize] = Some(IoResult::Send(Ok(0)));
executor.owner_task[conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
return Ok(SendFuture {
conn_index,
generation: self.generation,
});
}
let mut iovecs = [libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; crate::buffer::send_slab::MAX_IOVECS];
let mut bids = [0u16; crate::buffer::send_slab::MAX_IOVECS];
let mut total: u32 = 0;
for i in 0..n {
let p = driver.recv_hold[conn_index as usize][i];
iovecs[i] = libc::iovec {
iov_base: p.ptr as *mut libc::c_void,
iov_len: p.len as usize,
};
bids[i] = p.bid;
total += p.len;
}
let (slab_idx, msg_ptr) = driver
.send_slab
.allocate_recv_forward(conn_index, &iovecs[..n], &bids[..n], total)
.ok_or_else(|| io::Error::other("send slab exhausted"))?;
match driver
.ring
.submit_send_recv_bufs_coalesced(conn_index, msg_ptr, slab_idx)
{
Ok(()) => {
for _ in 0..n {
driver.recv_hold[conn_index as usize].pop_front();
}
driver.send_queues[conn_index as usize].in_flight = true;
executor.owner_task[conn_index as usize] =
Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
Ok(SendFuture {
conn_index,
generation: self.generation,
})
}
Err(e) => {
driver.send_slab.release(slab_idx);
Err(e)
}
}
})
}
#[cfg(not(has_io_uring))]
pub fn enable_recv_forward(&self) {}
#[cfg(not(has_io_uring))]
pub fn forward_held(&self) -> io::Result<SendFuture> {
with_state(|driver, executor| {
let conn_index = self.conn_index;
if driver.connections.generation(conn_index) != self.generation {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"stale connection",
));
}
let data = driver.accumulators.data(conn_index).to_vec();
if data.is_empty() {
executor.io_results[conn_index as usize] = Some(IoResult::Send(Ok(0)));
executor.owner_task[conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
return Ok(SendFuture {
conn_index,
generation: self.generation,
});
}
let mut ctx = driver.make_ctx();
ctx.send(self.token(), &data)?;
driver.accumulators.consume(conn_index, data.len());
executor.owner_task[conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
driver.send_completions[conn_index as usize].push_back(data.len() as u32);
Ok(SendFuture {
conn_index,
generation: self.generation,
})
})
}
#[cfg(has_io_uring)]
pub fn send_parts(&self) -> AsyncSendBuilder {
AsyncSendBuilder {
token: self.token(),
}
}
pub fn send(&self, data: &[u8]) -> io::Result<SendFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
ctx.send(self.token(), data)?;
executor.owner_task[self.conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[self.conn_index as usize] = true;
#[cfg(not(has_io_uring))]
{
driver.send_completions[self.conn_index as usize].push_back(data.len() as u32);
}
Ok(SendFuture {
conn_index: self.conn_index,
generation: self.generation,
})
})
}
pub fn connect(&self, addr: SocketAddr) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect(addr)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_with_timeout(
&self,
addr: SocketAddr,
timeout_ms: u64,
) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_with_timeout(addr, timeout_ms)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_unix(&self, path: impl AsRef<std::path::Path>) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_unix(path.as_ref())
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
#[cfg(has_io_uring)]
pub fn send_chain_nowait<F, R>(&self, f: F) -> R
where
F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> R,
{
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
let token = ConnToken::new(self.conn_index, self.generation);
let builder = ctx.send_chain(token);
f(builder)
})
}
#[cfg(has_io_uring)]
pub fn send_chain<F>(&self, f: F) -> io::Result<SendFuture>
where
F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> io::Result<()>,
{
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ConnToken::new(self.conn_index, self.generation);
let builder = ctx.send_chain(token);
f(builder)?;
executor.owner_task[self.conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[self.conn_index as usize] = true;
Ok(SendFuture {
conn_index: self.conn_index,
generation: self.generation,
})
})
}
pub fn shutdown_write(&self) {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.shutdown_write(self.token());
})
}
pub fn cancel(&self) -> io::Result<()> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.cancel(self.token())
})
}
pub fn request_shutdown(&self) {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.request_shutdown();
})
}
pub fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
with_state(|driver, _| {
let ctx = driver.make_ctx();
ctx.tls_info(self.token())
})
}
pub fn connect_tls(&self, addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls(addr, server_name)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_tls_with_timeout(
&self,
addr: SocketAddr,
server_name: &str,
timeout_ms: u64,
) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls_with_timeout(addr, server_name, timeout_ms)
.map_err(io::Error::other::<crate::error::Error>)?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
#[cfg(feature = "timestamps")]
pub fn recv_timestamp(&self) -> u64 {
with_state(|driver, _| {
driver
.connections
.get(self.conn_index)
.map(|cs| cs.recv_timestamp_ns)
.unwrap_or(0)
})
}
pub fn close(&self) {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
if opt_non_null.is_none() {
return;
}
let mut non_null = opt_non_null.unwrap();
let state = unsafe { non_null.as_mut() };
let driver = unsafe { &mut *state.driver.as_mut() };
driver.close_connection(self.conn_index);
}
pub fn peer_addr(&self) -> Option<crate::connection::PeerAddr> {
with_state(|driver, _| {
let conn = driver.connections.get(self.conn_index)?;
if conn.generation != self.generation {
return None;
}
conn.peer_addr.clone()
})
}
pub fn is_outbound(&self) -> bool {
with_state(|driver, _| {
driver
.connections
.get(self.conn_index)
.map(|cs| cs.generation == self.generation && cs.outbound)
.unwrap_or(false)
})
}
}
#[cfg(has_io_uring)]
pub struct AsyncSendBuilder {
token: ConnToken,
}
#[cfg(has_io_uring)]
impl AsyncSendBuilder {
pub fn build<F>(self, f: F) -> io::Result<()>
where
F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
{
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
let builder = ctx.send_parts(self.token);
f(builder)
})
}
pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
where
F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
{
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let builder = ctx.send_parts(self.token);
f(builder)?;
let conn_index = self.token.index;
executor.owner_task[conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
Ok(SendFuture {
conn_index,
generation: self.token.generation,
})
})
}
pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
use crate::handler::SendPart;
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
let mut builder = ctx.send_parts(self.token);
let mut consumed = 0usize;
for part in parts {
match part {
SendPart::Copy(data) => {
builder = builder.copy(data);
}
SendPart::Guard(guard) => {
builder = builder.guard(guard);
}
}
consumed += 1;
}
if consumed == 0 {
return Ok(0);
}
builder.submit()?;
Ok(consumed)
})
}
pub fn submit_batch_await(
self,
parts: Vec<crate::handler::SendPart<'_>>,
) -> io::Result<(usize, SendFuture)> {
use crate::handler::SendPart;
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let mut builder = ctx.send_parts(self.token);
let mut consumed = 0usize;
for part in parts {
match part {
SendPart::Copy(data) => {
builder = builder.copy(data);
}
SendPart::Guard(guard) => {
builder = builder.guard(guard);
}
}
consumed += 1;
}
if consumed == 0 {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty batch"));
}
builder.submit()?;
let conn_index = self.token.index;
executor.owner_task[conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
Ok((
consumed,
SendFuture {
conn_index,
generation: self.token.generation,
},
))
})
}
}
#[cfg(not(has_io_uring))]
pub struct AsyncSendBuilder {
token: ConnToken,
}
#[cfg(not(has_io_uring))]
impl ConnCtx {
pub fn send_parts(&self) -> AsyncSendBuilder {
AsyncSendBuilder {
token: self.token(),
}
}
}
#[cfg(not(has_io_uring))]
impl AsyncSendBuilder {
pub fn build<F>(self, f: F) -> io::Result<()>
where
F: FnOnce(MioSendBuilder<'_>) -> io::Result<()>,
{
with_state(|driver, _| {
let mut buf = Vec::new();
let builder = MioSendBuilder { buf: &mut buf };
f(builder)?;
if !buf.is_empty() {
let mut ctx = driver.make_ctx();
ctx.send(self.token, &buf)?;
}
Ok(())
})
}
pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
use crate::handler::SendPart;
with_state(|driver, _| {
let mut buf = Vec::new();
let mut consumed = 0usize;
for part in &parts {
match part {
SendPart::Copy(data) => buf.extend_from_slice(data),
SendPart::Guard(guard) => {
let (ptr, len) = guard.as_ptr_len();
let data = unsafe { std::slice::from_raw_parts(ptr, len as usize) };
buf.extend_from_slice(data);
}
}
consumed += 1;
}
if !buf.is_empty() {
let mut ctx = driver.make_ctx();
ctx.send(self.token, &buf)?;
}
Ok(consumed)
})
}
pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
where
F: FnOnce(MioSendBuilder<'_>) -> io::Result<()>,
{
with_state(|driver, executor| {
let mut buf = Vec::new();
let builder = MioSendBuilder { buf: &mut buf };
f(builder)?;
if !buf.is_empty() {
let mut ctx = driver.make_ctx();
ctx.send(self.token, &buf)?;
}
let conn_index = self.token.index;
executor.owner_task[conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[conn_index as usize] = true;
Ok(SendFuture {
conn_index,
generation: self.token.generation,
})
})
}
}
#[cfg(not(has_io_uring))]
pub struct MioSendBuilder<'a> {
buf: &'a mut Vec<u8>,
}
#[cfg(not(has_io_uring))]
impl<'a> MioSendBuilder<'a> {
pub fn copy(self, data: &[u8]) -> Self {
self.buf.extend_from_slice(data);
self
}
pub fn guard(self, guard: crate::guard::GuardBox) -> Self {
let (ptr, len) = guard.as_ptr_len();
let data = unsafe { std::slice::from_raw_parts(ptr, len as usize) };
self.buf.extend_from_slice(data);
self
}
pub fn submit(self) -> io::Result<()> {
Ok(())
}
}
pub struct WithDataFuture<F> {
conn_index: u32,
generation: u32,
f: Option<F>,
}
impl<F: FnMut(&[u8]) -> ParseResult + Unpin> Future for WithDataFuture<F> {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
with_state(|driver, executor| {
if driver.connections.generation(self.conn_index) != self.generation {
self.f.take();
return Poll::Ready(0);
}
#[cfg(has_io_uring)]
if driver.pending_recv_bufs[self.conn_index as usize].is_some() {
let acc_empty = driver.accumulators.data(self.conn_index).is_empty();
if acc_empty {
let pending = driver.pending_recv_bufs[self.conn_index as usize].unwrap();
let data =
unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
let result = f(data);
match result {
ParseResult::Consumed(consumed) if consumed > 0 => {
if let Some(pending) =
driver.pending_recv_bufs[self.conn_index as usize].take()
{
if consumed < pending.len as usize {
let remainder = unsafe {
std::slice::from_raw_parts(
pending.ptr.add(consumed),
pending.len as usize - consumed,
)
};
driver.accumulators.append(self.conn_index, remainder);
}
driver.pending_replenish.push(pending.bid);
}
self.f.take();
return Poll::Ready(consumed);
}
_ => {
if let Some(pending) =
driver.pending_recv_bufs[self.conn_index as usize].take()
{
let pending_data = unsafe {
std::slice::from_raw_parts(pending.ptr, pending.len as usize)
};
driver.accumulators.append(self.conn_index, pending_data);
driver.pending_replenish.push(pending.bid);
}
}
}
} else {
let pending = driver.pending_recv_bufs[self.conn_index as usize]
.take()
.unwrap();
let pending_data =
unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
driver.accumulators.prepend(self.conn_index, pending_data);
driver.pending_replenish.push(pending.bid);
}
}
let data = driver.accumulators.data(self.conn_index);
if data.is_empty() {
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true); if is_closed {
let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
let result = f(&[]);
self.f.take();
return Poll::Ready(match result {
ParseResult::Consumed(n) => n,
ParseResult::NeedMore => 0,
});
}
executor.owner_task[self.conn_index as usize] =
Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.recv_waiters[self.conn_index as usize] = true;
return Poll::Pending;
}
let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
let result = f(data);
match result {
ParseResult::Consumed(consumed) if consumed > 0 => {
driver.accumulators.consume(self.conn_index, consumed);
self.f.take();
return Poll::Ready(consumed);
}
_ => {}
}
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
self.f.take();
return Poll::Ready(0);
}
executor.owner_task[self.conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
pub struct WithBytesFuture<F> {
conn_index: u32,
generation: u32,
f: Option<F>,
}
impl<F: FnMut(Bytes) -> ParseResult + Unpin> Future for WithBytesFuture<F> {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
with_state(|driver, executor| {
if driver.connections.generation(self.conn_index) != self.generation {
self.f.take();
return Poll::Ready(0);
}
#[cfg(has_io_uring)]
if let Some(pending) = driver.pending_recv_bufs[self.conn_index as usize].take() {
let pending_data =
unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
driver.accumulators.append(self.conn_index, pending_data);
driver.pending_replenish.push(pending.bid);
}
let data = driver.accumulators.data(self.conn_index);
if data.is_empty() {
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
let result = f(Bytes::new());
self.f.take();
return Poll::Ready(match result {
ParseResult::Consumed(n) => n,
ParseResult::NeedMore => 0,
});
}
executor.owner_task[self.conn_index as usize] =
Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.recv_waiters[self.conn_index as usize] = true;
return Poll::Pending;
}
let frozen = driver.accumulators.take_frozen(self.conn_index);
let len = frozen.len();
let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
let result = f(frozen.clone());
match result {
ParseResult::Consumed(consumed) if consumed > 0 => {
if consumed < len {
driver
.accumulators
.prepend(self.conn_index, &frozen[consumed..]);
}
self.f.take();
return Poll::Ready(consumed);
}
_ => {}
}
driver.accumulators.prepend(self.conn_index, &frozen[..]);
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
self.f.take();
return Poll::Ready(0);
}
executor.owner_task[self.conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
pub struct RecvReadyFuture {
conn_index: u32,
generation: u32,
}
impl Future for RecvReadyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
with_state(|driver, executor| {
if driver.connections.generation(self.conn_index) != self.generation {
return Poll::Ready(());
}
if let Some(sink) = &executor.recv_sinks[self.conn_index as usize]
&& sink.pos > 0
{
return Poll::Ready(());
}
if !driver.accumulators.data(self.conn_index).is_empty() {
return Poll::Ready(());
}
#[cfg(has_io_uring)]
if !driver.recv_hold[self.conn_index as usize].is_empty() {
return Poll::Ready(());
}
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
return Poll::Ready(());
}
executor.owner_task[self.conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
pub struct SendFuture {
conn_index: u32,
generation: u32,
}
impl Future for SendFuture {
type Output = io::Result<u32>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u32>> {
with_state(|driver, executor| {
if driver.connections.generation(self.conn_index) != self.generation {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"connection closed",
)));
}
match executor.io_results[self.conn_index as usize].take() {
Some(IoResult::Send(result)) => Poll::Ready(result),
_ => {
executor.owner_task[self.conn_index as usize] =
Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.send_waiters[self.conn_index as usize] = true;
Poll::Pending
}
}
})
}
}
impl Drop for SendFuture {
fn drop(&mut self) {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
if opt_non_null.is_none() {
return;
}
let mut non_null = opt_non_null.unwrap();
let state = unsafe { non_null.as_mut() };
#[cfg(has_io_uring)]
let driver = unsafe { &mut *state.driver.as_mut() };
#[cfg(not(has_io_uring))]
let driver = unsafe { &mut *state.driver.as_mut() };
if driver.connections.generation(self.conn_index) != self.generation {
return;
}
let executor = unsafe { &mut *state.executor.as_mut() };
executor.send_waiters[self.conn_index as usize] = false;
}
}
#[cfg(has_io_uring)]
pub struct DirectEchoFuture {
conn_index: u32,
generation: u32,
armed: bool,
}
#[cfg(has_io_uring)]
impl Future for DirectEchoFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
with_state(|driver, executor| {
if driver.connections.generation(self.conn_index) != self.generation {
return Poll::Ready(());
}
if !self.armed {
if let Some(cs) = driver.connections.get_mut(self.conn_index) {
cs.direct_echo = true;
}
self.armed = true;
if let Some(pending) = driver.pending_recv_bufs[self.conn_index as usize].take() {
assert!(
pending.len <= 0xFFFF,
"DirectEchoFuture: data length {} exceeds 16-bit payload capacity",
pending.len,
);
let payload = (pending.bid as u32) | (pending.len << 16);
let ud = UserData::encode(OpTag::SendRecvBuf, self.conn_index, payload);
let entry = io_uring::opcode::Send::new(
io_uring::types::Fixed(self.conn_index),
pending.ptr,
pending.len,
)
.build()
.user_data(ud.raw());
let built = crate::handler::BuiltSend {
entry,
pool_slot: u16::MAX,
slab_idx: u16::MAX,
total_len: pending.len,
};
driver.send_recv_buf_original_lens[self.conn_index as usize] = pending.len;
if driver.submit_or_queue_send(self.conn_index, built).is_err() {
driver.pending_replenish.push(pending.bid);
}
}
}
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
return Poll::Ready(());
}
executor.owner_task[self.conn_index as usize] = Some(CURRENT_TASK_ID.with(|c| c.get()));
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
#[cfg(has_io_uring)]
impl Drop for DirectEchoFuture {
fn drop(&mut self) {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
if opt_non_null.is_none() {
return;
}
let mut non_null = opt_non_null.unwrap();
let state = unsafe { non_null.as_mut() };
let driver = unsafe { &mut *state.driver.as_mut() };
if driver.connections.generation(self.conn_index) != self.generation {
return;
}
let executor = unsafe { &mut *state.executor.as_mut() };
executor.recv_waiters[self.conn_index as usize] = false;
}
}
pub struct ConnectFuture {
conn_index: u32,
generation: u32,
}
impl Future for ConnectFuture {
type Output = io::Result<ConnCtx>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<ConnCtx>> {
with_state(|driver, executor| {
if driver.connections.generation(self.conn_index) != self.generation {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"connection closed",
)));
}
match executor.io_results[self.conn_index as usize].take() {
Some(IoResult::Connect(result)) => match result {
Ok(()) => Poll::Ready(Ok(ConnCtx::new(self.conn_index, self.generation))),
Err(e) => Poll::Ready(Err(e)),
},
_ => {
executor.connect_waiters[self.conn_index as usize] = true;
Poll::Pending
}
}
})
}
}
impl Drop for ConnectFuture {
fn drop(&mut self) {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
if opt_non_null.is_none() {
return;
}
let mut non_null = opt_non_null.unwrap();
let state = unsafe { non_null.as_mut() };
let driver = unsafe { &mut *state.driver.as_mut() };
if driver.connections.generation(self.conn_index) != self.generation {
return;
}
let executor = unsafe { &mut *state.executor.as_mut() };
executor.connect_waiters[self.conn_index as usize] = false;
}
}
pub fn sleep(duration: Duration) -> SleepFuture {
SleepFuture {
duration,
timer_slot: None,
generation: 0,
absolute: None,
}
}
pub struct SleepFuture {
duration: Duration,
timer_slot: Option<u32>,
generation: u16,
absolute: Option<Deadline>,
}
impl Future for SleepFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
with_state(|driver, executor| {
let _ = driver; if let Some(slot) = self.timer_slot {
if executor.timer_pool.is_fired(slot) {
executor.timer_pool.release(slot);
self.timer_slot = None;
return Poll::Ready(());
}
return Poll::Pending;
}
let waker_id = CURRENT_TASK_ID.with(|c| c.get());
let (slot, generation) = match executor.timer_pool.allocate(waker_id) {
Some(pair) => pair,
None => {
return Poll::Ready(());
}
};
#[cfg(has_io_uring)]
{
let payload = TimerSlotPool::encode_payload(slot, generation);
let ud = UserData::encode(OpTag::Timer, 0, payload);
let submit_result = if let Some(deadline) = self.absolute {
let ts_ptr =
executor
.timer_pool
.set_absolute(slot, deadline.secs, deadline.nsecs);
driver.ring.submit_timeout_abs(ts_ptr, ud)
} else {
let ts_ptr = executor.timer_pool.set_relative(slot, self.duration);
driver.ring.submit_timeout(ts_ptr, ud)
};
if let Err(_e) = submit_result {
executor.timer_pool.release(slot);
return Poll::Ready(());
}
}
#[cfg(not(has_io_uring))]
{
if let Some(deadline) = self.absolute {
executor
.timer_pool
.set_absolute(slot, deadline.secs, deadline.nsecs);
} else {
executor.timer_pool.set_relative(slot, self.duration);
}
}
self.timer_slot = Some(slot);
self.generation = generation;
Poll::Pending
})
}
}
impl Drop for SleepFuture {
fn drop(&mut self) {
if let Some(slot) = self.timer_slot {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
if opt_non_null.is_none() {
return;
}
let mut non_null = opt_non_null.unwrap();
let state = unsafe { non_null.as_mut() };
#[cfg(has_io_uring)]
let driver = unsafe { &mut *state.driver.as_mut() };
let executor = unsafe { &mut *state.executor.as_mut() };
if !executor.timer_pool.is_fired(slot) {
#[cfg(has_io_uring)]
{
let payload = TimerSlotPool::encode_payload(slot, self.generation);
let target_ud = UserData::encode(OpTag::Timer, 0, payload);
let _ = driver.ring.submit_async_cancel(target_ud.raw(), 0);
}
}
executor.timer_pool.release(slot);
}
}
}
pub fn try_sleep(duration: Duration) -> Result<SleepFuture, TimerExhausted> {
with_state(|driver, executor| {
let _ = driver; let waker_id = CURRENT_TASK_ID.with(|c| c.get());
let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
crate::metrics::POOL.increment(crate::metrics::pool::TIMER_EXHAUSTED);
TimerExhausted
})?;
#[cfg(has_io_uring)]
{
let payload = TimerSlotPool::encode_payload(slot, generation);
let ud = UserData::encode(OpTag::Timer, 0, payload);
let ts_ptr = executor.timer_pool.set_relative(slot, duration);
if let Err(_e) = driver.ring.submit_timeout(ts_ptr, ud) {
executor.timer_pool.release(slot);
return Ok(SleepFuture {
duration,
timer_slot: None,
generation: 0,
absolute: None,
});
}
}
#[cfg(not(has_io_uring))]
{
executor.timer_pool.set_relative(slot, duration);
}
Ok(SleepFuture {
duration,
timer_slot: Some(slot),
generation,
absolute: None,
})
})
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Deadline {
pub(crate) secs: u64,
pub(crate) nsecs: u32,
}
impl Deadline {
pub fn now() -> Self {
let mut ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
unsafe {
libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
}
Deadline {
secs: ts.tv_sec as u64,
nsecs: ts.tv_nsec as u32,
}
}
pub fn after(duration: Duration) -> Self {
let now = Self::now();
let mut secs = now.secs + duration.as_secs();
let mut nsecs = now.nsecs + duration.subsec_nanos();
if nsecs >= 1_000_000_000 {
nsecs -= 1_000_000_000;
secs += 1;
}
Deadline { secs, nsecs }
}
pub fn remaining(&self) -> Duration {
let now = Self::now();
if now.secs > self.secs || (now.secs == self.secs && now.nsecs >= self.nsecs) {
return Duration::ZERO;
}
let mut secs = self.secs - now.secs;
let nsecs = if self.nsecs >= now.nsecs {
self.nsecs - now.nsecs
} else {
secs -= 1;
1_000_000_000 + self.nsecs - now.nsecs
};
Duration::new(secs, nsecs)
}
}
pub fn sleep_until(deadline: Deadline) -> SleepFuture {
SleepFuture {
duration: Duration::ZERO, timer_slot: None,
generation: 0,
absolute: Some(deadline),
}
}
pub fn try_sleep_until(deadline: Deadline) -> Result<SleepFuture, TimerExhausted> {
with_state(|driver, executor| {
let _ = driver; let waker_id = CURRENT_TASK_ID.with(|c| c.get());
let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
crate::metrics::POOL.increment(crate::metrics::pool::TIMER_EXHAUSTED);
TimerExhausted
})?;
#[cfg(has_io_uring)]
{
let payload = TimerSlotPool::encode_payload(slot, generation);
let ud = UserData::encode(OpTag::Timer, 0, payload);
let ts_ptr = executor
.timer_pool
.set_absolute(slot, deadline.secs, deadline.nsecs);
if let Err(_e) = driver.ring.submit_timeout_abs(ts_ptr, ud) {
executor.timer_pool.release(slot);
return Ok(SleepFuture {
duration: Duration::ZERO,
timer_slot: None,
generation: 0,
absolute: Some(deadline),
});
}
}
#[cfg(not(has_io_uring))]
{
executor
.timer_pool
.set_absolute(slot, deadline.secs, deadline.nsecs);
}
Ok(SleepFuture {
duration: Duration::ZERO,
timer_slot: Some(slot),
generation,
absolute: Some(deadline),
})
})
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Elapsed;
impl fmt::Display for Elapsed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("deadline has elapsed")
}
}
impl std::error::Error for Elapsed {}
pub fn timeout<F: Future>(duration: Duration, future: F) -> TimeoutFuture<F> {
TimeoutFuture {
future,
sleep: sleep(duration),
}
}
pub fn try_timeout<F: Future>(
duration: Duration,
future: F,
) -> Result<TimeoutFuture<F>, TimerExhausted> {
let sleep = try_sleep(duration)?;
Ok(TimeoutFuture { future, sleep })
}
pub fn timeout_at<F: Future>(deadline: Deadline, future: F) -> TimeoutFuture<F> {
TimeoutFuture {
future,
sleep: sleep_until(deadline),
}
}
pub fn try_timeout_at<F: Future>(
deadline: Deadline,
future: F,
) -> Result<TimeoutFuture<F>, TimerExhausted> {
let sleep = try_sleep_until(deadline)?;
Ok(TimeoutFuture { future, sleep })
}
pin_project_lite::pin_project! {
pub struct TimeoutFuture<F> {
#[pin]
future: F,
#[pin]
sleep: SleepFuture,
}
}
impl<F: Future> Future for TimeoutFuture<F> {
type Output = Result<F::Output, Elapsed>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(output) = this.future.poll(cx) {
return Poll::Ready(Ok(output));
}
if let Poll::Ready(()) = this.sleep.poll(cx) {
return Poll::Ready(Err(Elapsed));
}
Poll::Pending
}
}
pub struct DiskIoFuture {
pub(crate) seq: u32,
}
impl Future for DiskIoFuture {
type Output = io::Result<i32>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<i32>> {
with_state(|_driver, executor| {
match executor.disk_io_results.remove(&self.seq) {
Some(result) if result < 0 => {
Poll::Ready(Err(io::Error::from_raw_os_error(-result)))
}
Some(result) => Poll::Ready(Ok(result)),
None => {
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(self.seq, task_id);
Poll::Pending
}
}
})
}
}
impl Drop for DiskIoFuture {
fn drop(&mut self) {
let opt_non_null = CURRENT_DRIVER.with(|c| c.get());
if opt_non_null.is_none() {
return;
}
let mut non_null = opt_non_null.unwrap();
let state = unsafe { non_null.as_mut() };
let executor = unsafe { &mut *state.executor.as_mut() };
executor.disk_io_waiters.remove(&self.seq);
}
}
pub fn open_direct_io_file(path: &str) -> io::Result<crate::direct_io::DirectIoFile> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.open_direct_io_file(path)
})
}
pub fn open_nvme_device(path: &str, nsid: u32) -> io::Result<crate::nvme::NvmeDevice> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.open_nvme_device(path, nsid)
})
}
pub unsafe fn direct_io_read(
file: crate::direct_io::DirectIoFile,
offset: u64,
buf: *mut u8,
len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
#[allow(unused_unsafe)]
let seq = unsafe { ctx.direct_io_read(file, offset, buf, len)? };
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub fn nvme_read(
device: crate::nvme::NvmeDevice,
lba: u64,
num_blocks: u16,
buf_addr: u64,
buf_len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = ctx.nvme_read(device, lba, num_blocks, buf_addr, buf_len)?;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub unsafe fn direct_io_write(
file: crate::direct_io::DirectIoFile,
offset: u64,
buf: *const u8,
len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
#[allow(unused_unsafe)]
let seq = unsafe { ctx.direct_io_write(file, offset, buf, len)? };
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub fn nvme_flush(device: crate::nvme::NvmeDevice) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = ctx.nvme_flush(device)?;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub fn nvme_write(
device: crate::nvme::NvmeDevice,
lba: u64,
num_blocks: u16,
buf_addr: u64,
buf_len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = ctx.nvme_write(device, lba, num_blocks, buf_addr, buf_len)?;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
#[derive(Clone, Copy)]
pub struct UdpCtx {
pub(crate) udp_index: u32,
}
impl UdpCtx {
pub fn index(&self) -> usize {
self.udp_index as usize
}
pub fn recv_from(&self) -> UdpRecvFuture {
UdpRecvFuture {
udp_index: self.udp_index,
}
}
pub fn with_datagram<F, R>(&self, f: F) -> UdpWithDatagramFuture<F, R>
where
F: FnMut(&[u8], SocketAddr) -> R + Unpin,
{
UdpWithDatagramFuture {
udp_index: self.udp_index,
f: Some(f),
_marker: std::marker::PhantomData,
}
}
pub fn recv_batch_timed<F>(&self, max: usize, f: F) -> UdpRecvBatchTimedFuture<F>
where
F: FnMut(&[u8], SocketAddr, Instant) + Unpin,
{
debug_assert!(max > 0, "recv_batch_timed max must be at least 1");
UdpRecvBatchTimedFuture {
udp_index: self.udp_index,
max,
f: Some(f),
}
}
pub fn recv_batch<F>(&self, max: usize, f: F) -> UdpRecvBatchFuture<F>
where
F: FnMut(&[u8], SocketAddr) + Unpin,
{
debug_assert!(max > 0, "recv_batch max must be at least 1");
UdpRecvBatchFuture {
udp_index: self.udp_index,
max,
f: Some(f),
}
}
pub fn send_ready(&self) -> UdpSendReadyFuture {
UdpSendReadyFuture {
udp_index: self.udp_index,
}
}
#[cfg(has_io_uring)]
pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
with_state(|driver, _executor| driver.udp_send_to(self.udp_index, peer, data, None))
}
#[cfg(not(has_io_uring))]
pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
with_state(|driver, _executor| {
let idx = self.udp_index as usize;
if idx >= driver.udp_sockets.len() {
return Err(crate::error::UdpSendError::Io(io::Error::other(
"invalid UDP socket index",
)));
}
match driver.udp_sockets[idx].send_to(data, peer) {
Ok(_) => {
crate::metrics::UDP.increment(crate::metrics::udp::DATAGRAMS_SENT);
Ok(())
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
Err(crate::error::UdpSendError::PoolExhausted)
}
Err(e) => Err(crate::error::UdpSendError::Io(e)),
}
})
}
#[cfg(has_io_uring)]
pub fn send_to_gso(
&self,
peer: SocketAddr,
data: &[u8],
segment_size: u16,
) -> Result<(), crate::error::UdpSendError> {
with_state(|driver, _executor| {
driver.udp_send_to(self.udp_index, peer, data, Some(segment_size))
})
}
#[cfg(not(has_io_uring))]
pub fn send_to_gso(
&self,
peer: SocketAddr,
data: &[u8],
segment_size: u16,
) -> Result<(), crate::error::UdpSendError> {
if segment_size == 0 || (segment_size as usize) > data.len() {
return Err(crate::error::UdpSendError::Io(io::Error::new(
io::ErrorKind::InvalidInput,
"GSO segment_size invalid for data length",
)));
}
let mut offset = 0;
while offset < data.len() {
let end = (offset + segment_size as usize).min(data.len());
self.send_to(peer, &data[offset..end])?;
offset = end;
}
Ok(())
}
}
pub struct UdpRecvFuture {
udp_index: u32,
}
impl Future for UdpRecvFuture {
type Output = (Vec<u8>, SocketAddr);
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
with_state(|driver, executor| {
let idx = self.udp_index as usize;
if idx < executor.udp_recv_queues.len()
&& let Some(entry) = executor.udp_recv_queues[idx].pop_front()
{
let bid = entry.bid_to_release();
let owned = entry.into_owned();
#[cfg(has_io_uring)]
if let Some(bid) = bid {
driver.udp_pending_replenish.push(bid);
}
#[cfg(not(has_io_uring))]
let _ = (bid, driver);
return Poll::Ready(owned);
}
let task_id = CURRENT_TASK_ID.with(|c| c.get());
if idx < executor.udp_recv_waiters.len() {
debug_assert!(
executor.udp_recv_waiters[idx].is_none_or(|t| t == task_id),
"two distinct tasks awaiting recv_from on UdpCtx index {idx}; \
UdpCtx::recv_from supports a single consumer per socket"
);
executor.udp_recv_waiters[idx] = Some(task_id);
}
Poll::Pending
})
}
}
pub struct UdpWithDatagramFuture<F, R>
where
F: FnMut(&[u8], SocketAddr) -> R + Unpin,
{
udp_index: u32,
f: Option<F>,
_marker: std::marker::PhantomData<fn() -> R>,
}
impl<F, R> Future for UdpWithDatagramFuture<F, R>
where
F: FnMut(&[u8], SocketAddr) -> R + Unpin,
{
type Output = R;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
let this = self.get_mut();
with_state(|driver, executor| {
let idx = this.udp_index as usize;
if idx < executor.udp_recv_queues.len()
&& let Some(entry) = executor.udp_recv_queues[idx].pop_front()
{
let bid = entry.bid_to_release();
let f = this
.f
.as_mut()
.expect("UdpWithDatagramFuture polled after Ready");
let r = f(entry.data(), entry.peer);
this.f.take();
drop(entry);
#[cfg(has_io_uring)]
if let Some(bid) = bid {
driver.udp_pending_replenish.push(bid);
}
#[cfg(not(has_io_uring))]
let _ = (bid, driver);
return Poll::Ready(r);
}
let task_id = CURRENT_TASK_ID.with(|c| c.get());
if idx < executor.udp_recv_waiters.len() {
debug_assert!(
executor.udp_recv_waiters[idx].is_none_or(|t| t == task_id),
"two distinct tasks awaiting recv on UdpCtx index {idx}; \
UdpCtx::with_datagram supports a single consumer per socket"
);
executor.udp_recv_waiters[idx] = Some(task_id);
}
Poll::Pending
})
}
}
pub struct UdpRecvBatchFuture<F>
where
F: FnMut(&[u8], SocketAddr) + Unpin,
{
udp_index: u32,
max: usize,
f: Option<F>,
}
impl<F> Future for UdpRecvBatchFuture<F>
where
F: FnMut(&[u8], SocketAddr) + Unpin,
{
type Output = usize;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
let this = self.get_mut();
with_state(|driver, executor| {
let idx = this.udp_index as usize;
if idx >= executor.udp_recv_queues.len() || executor.udp_recv_queues[idx].is_empty() {
let task_id = CURRENT_TASK_ID.with(|c| c.get());
if idx < executor.udp_recv_waiters.len() {
debug_assert!(
executor.udp_recv_waiters[idx].is_none_or(|t| t == task_id),
"two distinct tasks awaiting recv on UdpCtx index {idx}; \
UdpCtx::recv_batch supports a single consumer per socket"
);
executor.udp_recv_waiters[idx] = Some(task_id);
}
return Poll::Pending;
}
let f = this
.f
.as_mut()
.expect("UdpRecvBatchFuture polled after Ready");
let mut drained: usize = 0;
while drained < this.max
&& let Some(entry) = executor.udp_recv_queues[idx].pop_front()
{
let bid = entry.bid_to_release();
let peer = entry.peer;
entry.for_each_segment(|seg| f(seg, peer));
drop(entry);
#[cfg(has_io_uring)]
if let Some(bid) = bid {
driver.udp_pending_replenish.push(bid);
}
#[cfg(not(has_io_uring))]
let _ = bid;
drained += 1;
}
this.f.take();
#[cfg(not(has_io_uring))]
let _ = driver;
Poll::Ready(drained)
})
}
}
pub struct UdpRecvBatchTimedFuture<F>
where
F: FnMut(&[u8], SocketAddr, Instant) + Unpin,
{
udp_index: u32,
max: usize,
f: Option<F>,
}
impl<F> Future for UdpRecvBatchTimedFuture<F>
where
F: FnMut(&[u8], SocketAddr, Instant) + Unpin,
{
type Output = usize;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
let this = self.get_mut();
with_state(|driver, executor| {
let idx = this.udp_index as usize;
if idx >= executor.udp_recv_queues.len() || executor.udp_recv_queues[idx].is_empty() {
let task_id = CURRENT_TASK_ID.with(|c| c.get());
if idx < executor.udp_recv_waiters.len() {
debug_assert!(
executor.udp_recv_waiters[idx].is_none_or(|t| t == task_id),
"two distinct tasks awaiting recv on UdpCtx index {idx}; \
UdpCtx::recv_batch_timed supports a single consumer per socket"
);
executor.udp_recv_waiters[idx] = Some(task_id);
}
return Poll::Pending;
}
let f = this
.f
.as_mut()
.expect("UdpRecvBatchTimedFuture polled after Ready");
let mut drained: usize = 0;
while drained < this.max
&& let Some(entry) = executor.udp_recv_queues[idx].pop_front()
{
let bid = entry.bid_to_release();
let recv_at = entry.recv_at;
let peer = entry.peer;
entry.for_each_segment(|seg| f(seg, peer, recv_at));
drop(entry);
#[cfg(has_io_uring)]
if let Some(bid) = bid {
driver.udp_pending_replenish.push(bid);
}
#[cfg(not(has_io_uring))]
let _ = bid;
drained += 1;
}
this.f.take();
#[cfg(not(has_io_uring))]
let _ = driver;
Poll::Ready(drained)
})
}
}
pub struct UdpSendReadyFuture {
#[cfg_attr(not(has_io_uring), allow(dead_code))]
udp_index: u32,
}
impl Future for UdpSendReadyFuture {
type Output = ();
#[cfg(has_io_uring)]
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
with_state(|driver, executor| {
let idx = self.udp_index as usize;
if idx < driver.udp_sockets.len() && !driver.udp_sockets[idx].send_freelist.is_empty() {
return Poll::Ready(());
}
let task_id = CURRENT_TASK_ID.with(|c| c.get());
if idx < executor.udp_send_ready_waiters.len() {
debug_assert!(
executor.udp_send_ready_waiters[idx].is_none_or(|t| t == task_id),
"two distinct tasks awaiting send_ready on UdpCtx index {idx}; \
UdpCtx::send_ready supports a single consumer per socket"
);
executor.udp_send_ready_waiters[idx] = Some(task_id);
}
Poll::Pending
})
}
#[cfg(not(has_io_uring))]
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}