use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use bytes::Bytes;
use crate::completion::{OpTag, UserData};
use crate::driver::Driver;
use crate::error::TimerExhausted;
use crate::handler::ConnToken;
use crate::runtime::task::TaskId;
use crate::runtime::waker::STANDALONE_BIT;
use crate::runtime::{CURRENT_TASK_ID, Executor, IoResult, TimerSlotPool};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseResult {
Consumed(usize),
NeedMore,
}
pub(crate) struct DriverState {
pub(crate) driver: *mut Driver,
pub(crate) executor: *mut Executor,
}
thread_local! {
pub(crate) static CURRENT_DRIVER: Cell<*mut DriverState> =
const { Cell::new(std::ptr::null_mut()) };
}
pub(crate) fn set_driver_state(state: *mut DriverState) {
CURRENT_DRIVER.with(|c| c.set(state));
}
pub(crate) fn clear_driver_state() {
CURRENT_DRIVER.with(|c| c.set(std::ptr::null_mut()));
}
fn with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> R {
let ptr = CURRENT_DRIVER.with(|c| c.get());
assert!(!ptr.is_null(), "called outside executor");
let state = unsafe { &mut *ptr };
let driver = unsafe { &mut *state.driver };
let executor = unsafe { &mut *state.executor };
f(driver, executor)
}
fn try_with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> Option<R> {
let ptr = CURRENT_DRIVER.with(|c| c.get());
if ptr.is_null() {
return None;
}
let state = unsafe { &mut *ptr };
let driver = unsafe { &mut *state.driver };
let executor = unsafe { &mut *state.executor };
Some(f(driver, executor))
}
pub fn spawn(future: impl Future<Output = ()> + 'static) -> io::Result<TaskId> {
try_with_state(
|_driver, executor| match executor.standalone_slab.spawn(Box::pin(future)) {
Some(idx) => {
executor.ready_queue.push_back(idx | STANDALONE_BIT);
Ok(TaskId(idx))
}
None => Err(io::Error::other("standalone task slab exhausted")),
},
)
.unwrap_or_else(|| Err(io::Error::other("called outside executor")))
}
impl TaskId {
pub fn cancel(self) {
with_state(|_driver, executor| {
executor.standalone_slab.remove(self.0);
});
}
}
pub fn connect(addr: SocketAddr) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect(addr)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_with_timeout(addr: SocketAddr, timeout_ms: u64) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_with_timeout(addr, timeout_ms)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
#[cfg(feature = "tls")]
pub fn connect_tls(addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls(addr, server_name)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
#[cfg(feature = "tls")]
pub fn connect_tls_with_timeout(
addr: SocketAddr,
server_name: &str,
timeout_ms: u64,
) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls_with_timeout(addr, server_name, timeout_ms)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn request_shutdown() -> io::Result<()> {
try_with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.request_shutdown();
})
.ok_or_else(|| io::Error::other("called outside executor"))
}
#[derive(Clone, Copy)]
pub struct ConnCtx {
pub(crate) conn_index: u32,
pub(crate) generation: u32,
}
impl ConnCtx {
pub(crate) fn new(conn_index: u32, generation: u32) -> Self {
ConnCtx {
conn_index,
generation,
}
}
pub fn index(&self) -> usize {
self.conn_index as usize
}
pub fn token(&self) -> ConnToken {
ConnToken::new(self.conn_index, self.generation)
}
pub fn with_data<F: FnMut(&[u8]) -> ParseResult>(&self, f: F) -> WithDataFuture<F> {
WithDataFuture {
conn_index: self.conn_index,
f: Some(f),
}
}
pub fn with_bytes<F: FnMut(Bytes) -> ParseResult>(&self, f: F) -> WithBytesFuture<F> {
WithBytesFuture {
conn_index: self.conn_index,
f: Some(f),
}
}
pub unsafe fn set_recv_sink(&self, target: *mut u8, len: usize) {
with_state(|_driver, executor| {
executor.recv_sinks[self.conn_index as usize] = Some(crate::runtime::RecvSink {
ptr: target,
cap: len,
pos: 0,
});
});
}
pub fn take_recv_sink(&self) -> usize {
with_state(
|_driver, executor| match executor.recv_sinks[self.conn_index as usize].take() {
Some(sink) => sink.pos,
None => 0,
},
)
}
pub fn recv_ready(&self) -> RecvReadyFuture {
RecvReadyFuture {
conn_index: self.conn_index,
}
}
pub fn try_with_data<F: FnOnce(&[u8]) -> ParseResult>(&self, f: F) -> Option<ParseResult> {
with_state(|driver, _executor| {
let data = driver.accumulators.data(self.conn_index);
if data.is_empty() {
return None;
}
let result = f(data);
if let ParseResult::Consumed(consumed) = result {
driver.accumulators.consume(self.conn_index, consumed);
}
Some(result)
})
}
pub fn send_nowait(&self, data: &[u8]) -> io::Result<()> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.send(self.token(), data)
})
}
pub fn send_parts(&self) -> AsyncSendBuilder {
AsyncSendBuilder {
token: self.token(),
}
}
pub fn send(&self, data: &[u8]) -> io::Result<SendFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
ctx.send(self.token(), data)?;
executor.send_waiters[self.conn_index as usize] = true;
Ok(SendFuture {
conn_index: self.conn_index,
})
})
}
pub fn connect(&self, addr: SocketAddr) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect(addr)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn connect_with_timeout(
&self,
addr: SocketAddr,
timeout_ms: u64,
) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_with_timeout(addr, timeout_ms)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn send_chain_nowait<F, R>(&self, f: F) -> R
where
F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> R,
{
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
let token = ConnToken::new(self.conn_index, self.generation);
let builder = ctx.send_chain(token);
f(builder)
})
}
pub fn send_chain<F>(&self, f: F) -> io::Result<SendFuture>
where
F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> io::Result<()>,
{
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ConnToken::new(self.conn_index, self.generation);
let builder = ctx.send_chain(token);
f(builder)?;
executor.send_waiters[self.conn_index as usize] = true;
Ok(SendFuture {
conn_index: self.conn_index,
})
})
}
pub fn shutdown_write(&self) {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.shutdown_write(self.token());
})
}
pub fn cancel(&self) -> io::Result<()> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.cancel(self.token())
})
}
pub fn request_shutdown(&self) {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.request_shutdown();
})
}
#[cfg(feature = "tls")]
pub fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
with_state(|driver, _| {
let ctx = driver.make_ctx();
ctx.tls_info(self.token())
})
}
#[cfg(feature = "tls")]
pub fn connect_tls(&self, addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls(addr, server_name)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
#[cfg(feature = "tls")]
pub fn connect_tls_with_timeout(
&self,
addr: SocketAddr,
server_name: &str,
timeout_ms: u64,
) -> io::Result<ConnectFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let token = ctx
.connect_tls_with_timeout(addr, server_name, timeout_ms)
.map_err(|e| io::Error::other(e.to_string()))?;
let calling_task = CURRENT_TASK_ID.with(|c| c.get());
executor.owner_task[token.index as usize] = Some(calling_task);
executor.connect_waiters[token.index as usize] = true;
Ok(ConnectFuture {
conn_index: token.index,
generation: token.generation,
})
})
}
pub fn close(&self) {
let ptr = CURRENT_DRIVER.with(|c| c.get());
if ptr.is_null() {
return;
}
let state = unsafe { &mut *ptr };
let driver = unsafe { &mut *state.driver };
driver.close_connection(self.conn_index);
}
pub fn peer_addr(&self) -> Option<SocketAddr> {
with_state(|driver, _| {
let conn = driver.connections.get(self.conn_index)?;
if conn.generation != self.generation {
return None;
}
conn.peer_addr
})
}
pub fn is_outbound(&self) -> bool {
with_state(|driver, _| {
driver
.connections
.get(self.conn_index)
.map(|cs| cs.generation == self.generation && cs.outbound)
.unwrap_or(false)
})
}
}
pub struct AsyncSendBuilder {
token: ConnToken,
}
impl AsyncSendBuilder {
pub fn build<F>(self, f: F) -> io::Result<()>
where
F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
{
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
let builder = ctx.send_parts(self.token);
f(builder)
})
}
pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
where
F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
{
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let builder = ctx.send_parts(self.token);
f(builder)?;
let conn_index = self.token.index;
executor.send_waiters[conn_index as usize] = true;
Ok(SendFuture { conn_index })
})
}
pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
use crate::handler::SendPart;
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
let mut builder = ctx.send_parts(self.token);
let mut consumed = 0usize;
for part in parts {
match part {
SendPart::Copy(data) => {
builder = builder.copy(data);
}
SendPart::Guard(guard) => {
builder = builder.guard(guard);
}
}
consumed += 1;
}
if consumed == 0 {
return Ok(0);
}
builder.submit()?;
Ok(consumed)
})
}
pub fn submit_batch_await(
self,
parts: Vec<crate::handler::SendPart<'_>>,
) -> io::Result<(usize, SendFuture)> {
use crate::handler::SendPart;
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let mut builder = ctx.send_parts(self.token);
let mut consumed = 0usize;
for part in parts {
match part {
SendPart::Copy(data) => {
builder = builder.copy(data);
}
SendPart::Guard(guard) => {
builder = builder.guard(guard);
}
}
consumed += 1;
}
if consumed == 0 {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty batch"));
}
builder.submit()?;
let conn_index = self.token.index;
executor.send_waiters[conn_index as usize] = true;
Ok((consumed, SendFuture { conn_index }))
})
}
}
pub struct WithDataFuture<F> {
conn_index: u32,
f: Option<F>,
}
impl<F: FnMut(&[u8]) -> ParseResult + Unpin> Future for WithDataFuture<F> {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
with_state(|driver, executor| {
let data = driver.accumulators.data(self.conn_index);
if data.is_empty() {
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true); if is_closed {
let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
let result = f(&[]);
self.f.take();
return Poll::Ready(match result {
ParseResult::Consumed(n) => n,
ParseResult::NeedMore => 0,
});
}
executor.recv_waiters[self.conn_index as usize] = true;
return Poll::Pending;
}
let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
let result = f(data);
match result {
ParseResult::Consumed(consumed) if consumed > 0 => {
driver.accumulators.consume(self.conn_index, consumed);
self.f.take();
return Poll::Ready(consumed);
}
_ => {}
}
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
self.f.take();
return Poll::Ready(0);
}
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
pub struct WithBytesFuture<F> {
conn_index: u32,
f: Option<F>,
}
impl<F: FnMut(Bytes) -> ParseResult + Unpin> Future for WithBytesFuture<F> {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
with_state(|driver, executor| {
let data = driver.accumulators.data(self.conn_index);
if data.is_empty() {
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
let result = f(Bytes::new());
self.f.take();
return Poll::Ready(match result {
ParseResult::Consumed(n) => n,
ParseResult::NeedMore => 0,
});
}
executor.recv_waiters[self.conn_index as usize] = true;
return Poll::Pending;
}
let frozen = driver.accumulators.take_frozen(self.conn_index);
let len = frozen.len();
let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
let result = f(frozen.clone());
match result {
ParseResult::Consumed(consumed) if consumed > 0 => {
if consumed < len {
driver
.accumulators
.prepend(self.conn_index, &frozen[consumed..]);
}
self.f.take();
return Poll::Ready(consumed);
}
_ => {}
}
driver.accumulators.prepend(self.conn_index, &frozen[..]);
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
self.f.take();
return Poll::Ready(0);
}
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
pub struct RecvReadyFuture {
conn_index: u32,
}
impl Future for RecvReadyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
with_state(|driver, executor| {
if let Some(sink) = &executor.recv_sinks[self.conn_index as usize]
&& sink.pos > 0
{
return Poll::Ready(());
}
if !driver.accumulators.data(self.conn_index).is_empty() {
return Poll::Ready(());
}
let is_closed = driver
.connections
.get(self.conn_index)
.map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
.unwrap_or(true);
if is_closed {
return Poll::Ready(());
}
executor.recv_waiters[self.conn_index as usize] = true;
Poll::Pending
})
}
}
pub struct SendFuture {
conn_index: u32,
}
impl Future for SendFuture {
type Output = io::Result<u32>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u32>> {
with_state(|_driver, executor| {
match executor.io_results[self.conn_index as usize].take() {
Some(IoResult::Send(result)) => Poll::Ready(result),
_ => {
executor.send_waiters[self.conn_index as usize] = true;
Poll::Pending
}
}
})
}
}
pub struct ConnectFuture {
conn_index: u32,
generation: u32,
}
impl Future for ConnectFuture {
type Output = io::Result<ConnCtx>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<ConnCtx>> {
with_state(|_driver, executor| {
match executor.io_results[self.conn_index as usize].take() {
Some(IoResult::Connect(result)) => match result {
Ok(()) => Poll::Ready(Ok(ConnCtx::new(self.conn_index, self.generation))),
Err(e) => Poll::Ready(Err(e)),
},
_ => {
executor.connect_waiters[self.conn_index as usize] = true;
Poll::Pending
}
}
})
}
}
pub fn sleep(duration: Duration) -> SleepFuture {
SleepFuture {
duration,
timer_slot: None,
generation: 0,
absolute: None,
}
}
pub struct SleepFuture {
duration: Duration,
timer_slot: Option<u32>,
generation: u16,
absolute: Option<Deadline>,
}
impl Future for SleepFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
with_state(|driver, executor| {
if let Some(slot) = self.timer_slot {
if executor.timer_pool.is_fired(slot) {
executor.timer_pool.release(slot);
self.timer_slot = None;
return Poll::Ready(());
}
return Poll::Pending;
}
let waker_id = CURRENT_TASK_ID.with(|c| c.get());
let (slot, generation) = executor
.timer_pool
.allocate(waker_id)
.expect("timer slot pool exhausted");
let is_absolute = self.absolute.is_some();
if let Some(deadline) = self.absolute {
executor.timer_pool.timespecs[slot as usize] = io_uring::types::Timespec::new()
.sec(deadline.secs)
.nsec(deadline.nsecs);
} else {
let secs = self.duration.as_secs();
let nsecs = self.duration.subsec_nanos();
executor.timer_pool.timespecs[slot as usize] =
io_uring::types::Timespec::new().sec(secs).nsec(nsecs);
}
let payload = TimerSlotPool::encode_payload(slot, generation);
let ud = UserData::encode(OpTag::Timer, 0, payload);
let ts_ptr =
&executor.timer_pool.timespecs[slot as usize] as *const io_uring::types::Timespec;
let submit_result = if is_absolute {
driver.ring.submit_timeout_abs(ts_ptr, ud)
} else {
driver.ring.submit_timeout(ts_ptr, ud)
};
if let Err(_e) = submit_result {
executor.timer_pool.release(slot);
return Poll::Ready(());
}
self.timer_slot = Some(slot);
self.generation = generation;
Poll::Pending
})
}
}
impl Drop for SleepFuture {
fn drop(&mut self) {
if let Some(slot) = self.timer_slot {
let ptr = CURRENT_DRIVER.with(|c| c.get());
if ptr.is_null() {
return;
}
let state = unsafe { &mut *ptr };
let driver = unsafe { &mut *state.driver };
let executor = unsafe { &mut *state.executor };
if !executor.timer_pool.is_fired(slot) {
let payload = TimerSlotPool::encode_payload(slot, self.generation);
let target_ud = UserData::encode(OpTag::Timer, 0, payload);
let _ = driver.ring.submit_async_cancel(target_ud.raw(), 0);
}
executor.timer_pool.release(slot);
}
}
}
pub fn try_sleep(duration: Duration) -> Result<SleepFuture, TimerExhausted> {
with_state(|driver, executor| {
let waker_id = CURRENT_TASK_ID.with(|c| c.get());
let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
crate::metrics::TIMER_POOL_EXHAUSTED.increment();
TimerExhausted
})?;
let secs = duration.as_secs();
let nsecs = duration.subsec_nanos();
executor.timer_pool.timespecs[slot as usize] =
io_uring::types::Timespec::new().sec(secs).nsec(nsecs);
let payload = TimerSlotPool::encode_payload(slot, generation);
let ud = UserData::encode(OpTag::Timer, 0, payload);
let ts_ptr =
&executor.timer_pool.timespecs[slot as usize] as *const io_uring::types::Timespec;
if let Err(_e) = driver.ring.submit_timeout(ts_ptr, ud) {
executor.timer_pool.release(slot);
return Ok(SleepFuture {
duration,
timer_slot: None,
generation: 0,
absolute: None,
});
}
Ok(SleepFuture {
duration,
timer_slot: Some(slot),
generation,
absolute: None,
})
})
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Deadline {
pub(crate) secs: u64,
pub(crate) nsecs: u32,
}
impl Deadline {
pub fn now() -> Self {
let mut ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
unsafe {
libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
}
Deadline {
secs: ts.tv_sec as u64,
nsecs: ts.tv_nsec as u32,
}
}
pub fn after(duration: Duration) -> Self {
let now = Self::now();
let mut secs = now.secs + duration.as_secs();
let mut nsecs = now.nsecs + duration.subsec_nanos();
if nsecs >= 1_000_000_000 {
nsecs -= 1_000_000_000;
secs += 1;
}
Deadline { secs, nsecs }
}
pub fn remaining(&self) -> Duration {
let now = Self::now();
if now.secs > self.secs || (now.secs == self.secs && now.nsecs >= self.nsecs) {
return Duration::ZERO;
}
let mut secs = self.secs - now.secs;
let nsecs = if self.nsecs >= now.nsecs {
self.nsecs - now.nsecs
} else {
secs -= 1;
1_000_000_000 + self.nsecs - now.nsecs
};
Duration::new(secs, nsecs)
}
}
pub fn sleep_until(deadline: Deadline) -> SleepFuture {
SleepFuture {
duration: Duration::ZERO, timer_slot: None,
generation: 0,
absolute: Some(deadline),
}
}
pub fn try_sleep_until(deadline: Deadline) -> Result<SleepFuture, TimerExhausted> {
with_state(|driver, executor| {
let waker_id = CURRENT_TASK_ID.with(|c| c.get());
let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
crate::metrics::TIMER_POOL_EXHAUSTED.increment();
TimerExhausted
})?;
executor.timer_pool.timespecs[slot as usize] = io_uring::types::Timespec::new()
.sec(deadline.secs)
.nsec(deadline.nsecs);
let payload = TimerSlotPool::encode_payload(slot, generation);
let ud = UserData::encode(OpTag::Timer, 0, payload);
let ts_ptr =
&executor.timer_pool.timespecs[slot as usize] as *const io_uring::types::Timespec;
if let Err(_e) = driver.ring.submit_timeout_abs(ts_ptr, ud) {
executor.timer_pool.release(slot);
return Ok(SleepFuture {
duration: Duration::ZERO,
timer_slot: None,
generation: 0,
absolute: Some(deadline),
});
}
Ok(SleepFuture {
duration: Duration::ZERO,
timer_slot: Some(slot),
generation,
absolute: Some(deadline),
})
})
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Elapsed;
impl fmt::Display for Elapsed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("deadline has elapsed")
}
}
impl std::error::Error for Elapsed {}
pub fn timeout<F: Future>(duration: Duration, future: F) -> TimeoutFuture<F> {
TimeoutFuture {
future,
sleep: sleep(duration),
}
}
pub fn try_timeout<F: Future>(
duration: Duration,
future: F,
) -> Result<TimeoutFuture<F>, TimerExhausted> {
let sleep = try_sleep(duration)?;
Ok(TimeoutFuture { future, sleep })
}
pub fn timeout_at<F: Future>(deadline: Deadline, future: F) -> TimeoutFuture<F> {
TimeoutFuture {
future,
sleep: sleep_until(deadline),
}
}
pub fn try_timeout_at<F: Future>(
deadline: Deadline,
future: F,
) -> Result<TimeoutFuture<F>, TimerExhausted> {
let sleep = try_sleep_until(deadline)?;
Ok(TimeoutFuture { future, sleep })
}
pin_project_lite::pin_project! {
pub struct TimeoutFuture<F> {
#[pin]
future: F,
#[pin]
sleep: SleepFuture,
}
}
impl<F: Future> Future for TimeoutFuture<F> {
type Output = Result<F::Output, Elapsed>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(output) = this.future.poll(cx) {
return Poll::Ready(Ok(output));
}
if let Poll::Ready(()) = this.sleep.poll(cx) {
return Poll::Ready(Err(Elapsed));
}
Poll::Pending
}
}
pub struct DiskIoFuture {
seq: u32,
}
impl Future for DiskIoFuture {
type Output = io::Result<i32>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<i32>> {
with_state(|_driver, executor| {
match executor.disk_io_results.remove(&self.seq) {
Some(result) if result < 0 => {
Poll::Ready(Err(io::Error::from_raw_os_error(-result)))
}
Some(result) => Poll::Ready(Ok(result)),
None => {
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(self.seq, task_id);
Poll::Pending
}
}
})
}
}
pub fn open_direct_io_file(path: &str) -> io::Result<crate::direct_io::DirectIoFile> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.open_direct_io_file(path)
})
}
pub fn open_nvme_device(path: &str, nsid: u32) -> io::Result<crate::nvme::NvmeDevice> {
with_state(|driver, _| {
let mut ctx = driver.make_ctx();
ctx.open_nvme_device(path, nsid)
})
}
pub unsafe fn direct_io_read(
file: crate::direct_io::DirectIoFile,
offset: u64,
buf: *mut u8,
len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = unsafe { ctx.direct_io_read(file, offset, buf, len)? };
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub fn nvme_read(
device: crate::nvme::NvmeDevice,
lba: u64,
num_blocks: u16,
buf_addr: u64,
buf_len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = ctx.nvme_read(device, lba, num_blocks, buf_addr, buf_len)?;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub unsafe fn direct_io_write(
file: crate::direct_io::DirectIoFile,
offset: u64,
buf: *const u8,
len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = unsafe { ctx.direct_io_write(file, offset, buf, len)? };
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
pub fn nvme_write(
device: crate::nvme::NvmeDevice,
lba: u64,
num_blocks: u16,
buf_addr: u64,
buf_len: u32,
) -> io::Result<DiskIoFuture> {
with_state(|driver, executor| {
let mut ctx = driver.make_ctx();
let seq = ctx.nvme_write(device, lba, num_blocks, buf_addr, buf_len)?;
let task_id = CURRENT_TASK_ID.with(|c| c.get());
executor.disk_io_waiters.insert(seq, task_id);
Ok(DiskIoFuture { seq })
})
}
#[derive(Clone, Copy)]
pub struct UdpCtx {
pub(crate) udp_index: u32,
}
impl UdpCtx {
pub fn index(&self) -> usize {
self.udp_index as usize
}
pub fn recv_from(&self) -> UdpRecvFuture {
UdpRecvFuture {
udp_index: self.udp_index,
}
}
pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
with_state(|driver, _executor| driver.udp_send_to(self.udp_index, peer, data))
}
}
pub struct UdpRecvFuture {
udp_index: u32,
}
impl Future for UdpRecvFuture {
type Output = (Vec<u8>, SocketAddr);
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
with_state(|_driver, executor| {
let idx = self.udp_index as usize;
if idx < executor.udp_recv_queues.len()
&& let Some(datagram) = executor.udp_recv_queues[idx].pop_front()
{
return Poll::Ready(datagram);
}
let task_id = CURRENT_TASK_ID.with(|c| c.get());
if idx < executor.udp_recv_waiters.len() {
executor.udp_recv_waiters[idx] = Some(task_id);
}
Poll::Pending
})
}
}