use std::future::Future;
use std::io::IoSlice;
use std::marker::PhantomData;
use std::mem::{size_of, size_of_val};
use std::net::{SocketAddr, SocketAddrV6};
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Instant;
use futures::future::FusedFuture;
use futures::{FutureExt, Stream, StreamExt};
use libc::{AF_INET, AF_INET6, AF_UNIX, sa_family_t, sockaddr_in, sockaddr_in6, sockaddr_un};
use rustix::fs::AtFlags;
use rustix::io_uring::{Mode, MsgHdr, SocketAddrOpaque, iovec};
pub use rustix::fd::OwnedFd;
pub use rustix::io_uring::Advice;
pub use rustix::net::{AddressFamily, Protocol, SocketType, ipproto};
pub use rustix::net::{RecvFlags, SendFlags, SocketAddrUnix};
pub use rustix_uring::types::{OFlags, Statx};
use crate::async_event::TaskSource;
use crate::io_type::IOType;
use crate::ring_future::{OwnedFdFuture, UnitFuture, UsizeFuture};
use crate::task::{IoScopeCompletions, Task, TaskReadyState, TaskState};
use crate::task_ref::wake_task;
use crate::tracing::Events;
use crate::{
CanceledError, Completion, CompletionResources, CompletionState, Errno, TaskHandleError,
};
use rustix::fd::{AsFd, AsRawFd};
use rustix_uring::{
opcode,
types::{Fd, Timespec},
};
use std::{ffi::CStr, mem::ManuallyDrop, net::SocketAddrV4, time::Duration};
#[cfg(feature = "io_uring_cmd")]
use crate::ring_future::UringCmdFuture;
/// Marker trait for futures that resolve to `Result<T, Errno>` and also
/// implement [`FusedFuture`].
pub trait OperationFuture<T>: Future<Output = Result<T, Errno>> + FusedFuture {}

/// Blanket implementation: every type meeting the supertrait bounds is an
/// [`OperationFuture`] automatically.
impl<T, R> OperationFuture<T> for R where R: Future<Output = Result<T, Errno>> + FusedFuture {}
/// Builds an asynchronous `OpenAt` operation for `filename`, anchored at
/// `AT_FDCWD`, with the supplied `flags` and `mode`.
///
/// The returned future borrows `filename`, so the path must outlive it.
pub fn open(filename: &CStr, flags: OFlags, mode: Mode) -> OwnedFdFuture<'_> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::OpenAt::new(cwd, filename.as_ptr())
        .flags(flags)
        .mode(mode)
        .build();
    // No existing descriptor is involved, hence -1 in the fd slot; no timeout.
    OwnedFdFuture::new(entry, -1, None, IOType::Open)
}
/// Builds an asynchronous `LinkAt` operation linking `newpath` to `oldpath`.
///
/// Both paths are anchored at `AT_FDCWD`. The returned future borrows both
/// strings for `'a`.
pub fn link<'a>(oldpath: &'a CStr, newpath: &'a CStr) -> UnitFuture<'a> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::LinkAt::new(cwd, oldpath.as_ptr(), cwd, newpath.as_ptr()).build();
    UnitFuture::new(entry, -1, None, IOType::Link)
}
/// Builds an asynchronous `SymlinkAt` operation creating `linkpath` with the
/// contents `target`.
///
/// `linkpath` is anchored at `AT_FDCWD`. The returned future borrows both
/// strings for `'a`.
pub fn symlink<'a>(target: &'a CStr, linkpath: &'a CStr) -> UnitFuture<'a> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::SymlinkAt::new(cwd, target.as_ptr(), linkpath.as_ptr()).build();
    UnitFuture::new(entry, -1, None, IOType::Symlink)
}
/// Builds an asynchronous `MkDirAt` operation creating `pathname` (anchored at
/// `AT_FDCWD`) with permission bits `mode`.
pub fn mkdir(pathname: &CStr, mode: Mode) -> UnitFuture<'_> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::MkDirAt::new(cwd, pathname.as_ptr())
        .mode(mode)
        .build();
    UnitFuture::new(entry, -1, None, IOType::Mkdir)
}
/// Builds an asynchronous directory removal for `pathname` (anchored at
/// `AT_FDCWD`).
///
/// Implemented as an `UnlinkAt` operation with `AtFlags::REMOVEDIR`, and it is
/// accounted under `IOType::Unlink`.
pub fn rmdir(pathname: &CStr) -> UnitFuture<'_> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::UnlinkAt::new(cwd, pathname.as_ptr())
        .flags(AtFlags::REMOVEDIR)
        .build();
    UnitFuture::new(entry, -1, None, IOType::Unlink)
}
/// Asynchronously retrieves `Statx` information for `filename`.
///
/// # Errors
/// Returns the `Errno` produced by the underlying statx operation.
pub async fn stat(filename: &CStr) -> Result<Statx, Errno> {
    let mut buffer = std::mem::MaybeUninit::<Statx>::uninit();
    match stat_internal(filename, buffer.as_mut_ptr()).await {
        // SAFETY: assumes a successful statx operation has fully written
        // `buffer` — this relies on the kernel contract, not on code in this file.
        Ok(()) => Ok(unsafe { buffer.assume_init() }),
        Err(errno) => Err(errno),
    }
}
/// Builds the `Statx` operation for `filename` (anchored at `AT_FDCWD`) that
/// writes its result through the raw pointer `statx`.
///
/// The caller must keep the memory behind `statx` valid until the future
/// completes.
fn stat_internal<'a>(filename: &'a CStr, statx: *mut Statx) -> UnitFuture<'a> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::Statx::new(cwd, filename.as_ptr(), statx).build();
    UnitFuture::new(entry, -1, None, IOType::Stat)
}
/// Asynchronously retrieves `Statx` information for the open descriptor `fd`.
///
/// # Errors
/// Returns the `Errno` produced by the underlying statx operation.
pub async fn fstat(fd: &impl AsFd) -> Result<Statx, Errno> {
    let mut buffer = std::mem::MaybeUninit::<Statx>::uninit();
    match fstat_internal(fd, buffer.as_mut_ptr()).await {
        // SAFETY: assumes a successful statx operation has fully written
        // `buffer` — this relies on the kernel contract, not on code in this file.
        Ok(()) => Ok(unsafe { buffer.assume_init() }),
        Err(errno) => Err(errno),
    }
}
/// Builds the `Statx` operation for the descriptor `fd`, writing its result
/// through the raw pointer `statx`.
///
/// The descriptor is passed in the directory-fd position together with an
/// empty path and `AtFlags::EMPTY_PATH`.
fn fstat_internal<'a>(fd: &impl AsFd, statx: *mut Statx) -> UnitFuture<'a> {
    let target = Fd(fd.as_fd().as_raw_fd());
    // The empty C string literal is `'static`, so its pointer stays valid.
    let entry = opcode::Statx::new(target, c"".as_ptr(), statx)
        .flags(AtFlags::EMPTY_PATH)
        .build();
    UnitFuture::new(entry, -1, None, IOType::Stat)
}
/// Builds an asynchronous `UnlinkAt` operation removing `filename` (anchored
/// at `AT_FDCWD`).
pub fn unlink(filename: &CStr) -> UnitFuture<'_> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::UnlinkAt::new(cwd, filename.as_ptr()).build();
    UnitFuture::new(entry, -1, None, IOType::Unlink)
}
/// Builds an asynchronous `RenameAt` operation moving `oldpath` to `newpath`.
///
/// Both paths are anchored at `AT_FDCWD`. The returned future borrows both
/// strings for `'a`.
pub fn rename<'a>(oldpath: &'a CStr, newpath: &'a CStr) -> UnitFuture<'a> {
    let cwd = Fd(libc::AT_FDCWD);
    let entry = opcode::RenameAt::new(cwd, oldpath.as_ptr(), cwd, newpath.as_ptr()).build();
    UnitFuture::new(entry, -1, None, IOType::Rename)
}
/// Builds an asynchronous `Fadvise` operation applying `advice` to the range
/// of `fd` that starts at `offset`.
///
/// The opcode takes a 32-bit length. A `len` above `u32::MAX` is clamped to
/// `u32::MAX` rather than truncated with `as`, so an oversized request can no
/// longer wrap around to a small (or zero) length.
pub fn fadvise(fd: &impl AsFd, offset: u64, len: u64, advice: Advice) -> UnitFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping: e.g. exactly 4 GiB used to become 0.
    let len = u32::try_from(len).unwrap_or(u32::MAX);
    let entry = opcode::Fadvise::new(Fd(raw_fd), len, advice)
        .offset(offset)
        .build();
    UnitFuture::new(entry, raw_fd, None, IOType::FAdvise)
}
/// Builds an asynchronous `Madvise` operation applying `advice` to the memory
/// range starting at `addr`.
///
/// The opcode takes a 32-bit length. A `len` above `u32::MAX` is clamped to
/// `u32::MAX` rather than truncated with `as`, so an oversized request can no
/// longer wrap around to a small (or zero) length.
///
/// The caller is responsible for `addr` referring to a valid mapping for the
/// lifetime of the operation.
pub fn madvise(addr: *const libc::c_void, len: u64, advice: Advice) -> UnitFuture<'static> {
    // Saturate instead of wrapping.
    let len = u32::try_from(len).unwrap_or(u32::MAX);
    let entry = opcode::Madvise::new(addr, len, advice).build();
    UnitFuture::new(entry, -1, None, IOType::MAdvise)
}
/// Builds an asynchronous `Fallocate` operation on `fd` covering `len` bytes
/// from `offset`, using the raw `mode` flags.
pub fn fallocate(fd: &impl AsFd, mode: i32, offset: u64, len: u64) -> UnitFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry = opcode::Fallocate::new(Fd(raw_fd), len)
        .mode(mode)
        .offset(offset)
        .build();
    UnitFuture::new(entry, raw_fd, None, IOType::FAllocate)
}
/// Creates a socket, through the ring when the `Socket` opcode is supported
/// and through a direct `rustix::net::socket` call otherwise.
///
/// A `protocol` of `None` is passed to the ring as `0`.
///
/// # Errors
/// Returns the `Errno` reported by whichever path was taken.
pub async fn socket(
    domain: AddressFamily,
    socket_type: SocketType,
    protocol: Option<Protocol>,
) -> Result<OwnedFd, Errno> {
    // The task-state handle is only needed for the probe lookup and is
    // released before anything is awaited.
    let ring_supports_socket = TaskState::get().probe.is_supported(opcode::Socket::CODE);
    if !ring_supports_socket {
        return rustix::net::socket(domain, socket_type, protocol);
    }
    let raw_domain = u32::from(domain.as_raw()) as i32;
    let raw_type = socket_type.as_raw() as i32;
    let raw_protocol = protocol.map_or(0i32, |p| u32::from(p.as_raw()) as i32);
    let entry = opcode::Socket::new(raw_domain, raw_type, raw_protocol).build();
    OwnedFdFuture::new(entry, -1, None, IOType::Socket).await
}
/// Builds an asynchronous `Accept` operation on the listening socket `fd`.
///
/// Null pointers are passed for the peer address and its length, so the
/// peer address is not reported.
pub fn accept(fd: &impl AsFd) -> OwnedFdFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry =
        opcode::Accept::new(Fd(raw_fd), std::ptr::null_mut(), std::ptr::null_mut()).build();
    OwnedFdFuture::new(entry, raw_fd, None, IOType::Accept)
}
/// Builds an asynchronous `Shutdown` operation on the socket `fd`; `how` is
/// forwarded unchanged to the opcode.
pub fn shutdown(fd: &impl AsFd, how: i32) -> UnitFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry = opcode::Shutdown::new(Fd(raw_fd), how).build();
    UnitFuture::new(entry, raw_fd, None, IOType::Shutdown)
}
/// Builds an asynchronous `Fsync` operation on `fd`.
pub fn fsync(fd: &impl AsFd) -> UnitFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry = opcode::Fsync::new(Fd(raw_fd)).build();
    UnitFuture::new(entry, raw_fd, None, IOType::Fsync)
}
/// Builds an asynchronous `SyncFileRange` operation on `fd` covering `len`
/// bytes starting at `offset`.
pub fn sync_file_range(fd: &impl AsFd, offset: u64, len: u32) -> UnitFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry = opcode::SyncFileRange::new(Fd(raw_fd), len)
        .offset(offset)
        .build();
    UnitFuture::new(entry, raw_fd, None, IOType::SyncFileRange)
}
/// Binds the socket `fd` to `address`.
///
/// This is a synchronous call forwarded directly to `rustix::net::bind`; it
/// does not build a ring operation.
///
/// # Errors
/// Returns the `Errno` reported by `rustix::net::bind`.
pub fn bind(fd: &impl AsFd, address: &SocketAddr) -> Result<(), Errno> {
rustix::net::bind(fd, address)
}
/// Marks the socket `fd` as listening with the given `backlog`.
///
/// This is a synchronous call forwarded directly to `rustix::net::listen`; it
/// does not build a ring operation.
///
/// # Errors
/// Returns the `Errno` reported by `rustix::net::listen`.
pub fn listen(fd: &impl AsFd, backlog: i32) -> Result<(), Errno> {
rustix::net::listen(fd, backlog)
}
/// Converts a std IPv4 socket address into a C `sockaddr_in`.
///
/// The address octets are copied in their in-memory order and the port is
/// converted to big-endian.
fn sockaddr_from_socketaddr(addr: &SocketAddrV4) -> sockaddr_in {
    let ip = libc::in_addr {
        s_addr: u32::from_ne_bytes(addr.ip().octets()),
    };
    sockaddr_in {
        sin_family: AF_INET as u16,
        sin_port: addr.port().to_be(),
        sin_addr: ip,
        sin_zero: [0; 8],
    }
}
/// Converts a std IPv6 socket address into a C `sockaddr_in6`.
///
/// The port is converted to big-endian; flow info and scope id are copied
/// as returned by the std accessors.
fn sockaddr6_from_socketaddrv6(addr: &SocketAddrV6) -> sockaddr_in6 {
    let ip = libc::in6_addr {
        s6_addr: addr.ip().octets(),
    };
    sockaddr_in6 {
        sin6_family: AF_INET6 as u16,
        sin6_port: addr.port().to_be(),
        sin6_flowinfo: addr.flowinfo(),
        sin6_addr: ip,
        sin6_scope_id: addr.scope_id(),
    }
}
fn sockaddr_from_socketaddr_unix(addr: &SocketAddrUnix) -> (sockaddr_un, usize) {
#[cfg(target_arch = "x86_64")]
let mut sun_path = [0i8; 108];
#[cfg(not(target_arch = "x86_64"))]
let mut sun_path = [0u8; 108];
let addrlen = if let Some(addr) = addr.abstract_name() {
#[cfg(target_arch = "x86_64")]
let addr = unsafe { core::slice::from_raw_parts(addr.as_ptr().cast::<i8>(), addr.len()) };
sun_path[1..1 + addr.len()].copy_from_slice(addr);
addr.len() + 1 + size_of::<sa_family_t>()
} else if let Some(addr) = addr.path() {
let addr = addr.to_bytes();
#[cfg(target_arch = "x86_64")]
let addr = unsafe { core::slice::from_raw_parts(addr.as_ptr().cast::<i8>(), addr.len()) };
sun_path[0..addr.len()].copy_from_slice(addr);
addr.len() + size_of::<sa_family_t>()
} else {
panic!("Impossible for Unix Socket Address to be neither path nor abstract name");
};
(
sockaddr_un {
sun_family: AF_UNIX as u16,
sun_path,
},
addrlen,
)
}
/// Asynchronously connects the socket `fd` to the Unix-domain address `addr`.
///
/// The converted `sockaddr_un` lives in this future's state, so the pointer
/// handed to the `Connect` operation stays valid while it is awaited.
///
/// # Errors
/// Returns the `Errno` produced by the connect operation.
pub async fn connect_unix(fd: &impl AsFd, addr: &SocketAddrUnix) -> Result<(), Errno> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let (storage, storage_len) = sockaddr_from_socketaddr_unix(addr);
    let storage_ptr = core::ptr::addr_of!(storage) as *const SocketAddrOpaque;
    let entry = opcode::Connect::new(Fd(raw_fd), storage_ptr, storage_len as u32).build();
    UnitFuture::new(entry, raw_fd, None, IOType::Connect).await
}
/// Asynchronously connects the socket `fd` to the IPv4 or IPv6 address `addr`.
///
/// The converted C socket address lives in this future's state, so the
/// pointer handed to the `Connect` operation stays valid while it is awaited.
///
/// # Errors
/// Returns the `Errno` produced by the connect operation.
pub async fn connect(fd: &impl AsFd, addr: &SocketAddr) -> Result<(), Errno> {
    let raw_fd = fd.as_fd().as_raw_fd();
    match addr {
        SocketAddr::V4(v4) => {
            let storage = sockaddr_from_socketaddr(v4);
            let storage_len = size_of_val(&storage) as u32;
            let storage_ptr = core::ptr::addr_of!(storage) as *const SocketAddrOpaque;
            let entry = opcode::Connect::new(Fd(raw_fd), storage_ptr, storage_len).build();
            UnitFuture::new(entry, raw_fd, None, IOType::Connect).await
        }
        SocketAddr::V6(v6) => {
            let storage = sockaddr6_from_socketaddrv6(v6);
            let storage_len = size_of_val(&storage) as u32;
            let storage_ptr = core::ptr::addr_of!(storage) as *const SocketAddrOpaque;
            let entry = opcode::Connect::new(Fd(raw_fd), storage_ptr, storage_len).build();
            UnitFuture::new(entry, raw_fd, None, IOType::Connect).await
        }
    }
}
/// Vectored write of `iovec` to `fd` without a deadline.
///
/// Forwards to `writev_with_deadline` with `deadline = None`; `offset` is
/// passed through unchanged.
pub fn writev<'a>(
fd: &impl AsFd,
iovec: &'a [IoSlice<'_>],
offset: Option<u64>,
) -> ErrnoOrFuture<UsizeFuture<'a>> {
writev_with_deadline(fd, iovec, offset, None)
}
/// Vectored write of `iovec` to `fd` that must finish before `deadline`.
///
/// The deadline is converted to a relative timeout using `crate::clock_now()`.
/// If the deadline is already in the past, no operation is built and the
/// result is an immediate `Errno::TIMEDOUT` error. `None` means no timeout.
pub fn writev_with_deadline<'a>(
    fd: &impl AsFd,
    iovec: &'a [IoSlice<'_>],
    offset: Option<u64>,
    deadline: Option<Instant>,
) -> ErrnoOrFuture<UsizeFuture<'a>> {
    let timeout = match deadline {
        None => None,
        Some(deadline) => match deadline.checked_duration_since(crate::clock_now()) {
            Some(remaining) => Some(remaining),
            None => {
                return ErrnoOrFuture::Error {
                    errno: Errno::TIMEDOUT,
                };
            }
        },
    };
    let fut = writev_with_timeout(fd, iovec, offset, timeout);
    ErrnoOrFuture::Future { fut }
}
/// Builds a `Writev` operation writing `buffers` to `fd` with an optional
/// relative `timeout`.
///
/// When `offset` is `None`, `u64::MAX` is passed as the offset (io_uring's
/// `-1` convention for "use the current file position").
pub fn writev_with_timeout<'a>(
    fd: &impl AsFd,
    buffers: &'a [IoSlice<'_>],
    offset: Option<u64>,
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // The slice of `IoSlice` is handed to the opcode as an array of `iovec`.
    let iov_ptr = buffers.as_ptr() as *const iovec;
    let iov_count = buffers.len() as u32;
    let entry = opcode::Writev::new(Fd(raw_fd), iov_ptr, iov_count)
        .offset(offset.unwrap_or(u64::MAX))
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Write)
}
/// Writes `buf` to `fd` without a deadline.
///
/// Forwards to `write_with_deadline` with `deadline = None`.
pub fn write<'a>(fd: &impl AsFd, buf: &'a [u8]) -> ErrnoOrFuture<UsizeFuture<'a>> {
write_with_deadline(fd, buf, None)
}
/// Writes `buf` to `fd`, requiring completion before `deadline`.
///
/// The deadline is converted to a relative timeout using `crate::clock_now()`.
/// If the deadline is already in the past, no operation is built and the
/// result is an immediate `Errno::TIMEDOUT` error. `None` means no timeout.
pub fn write_with_deadline<'a>(
    fd: &impl AsFd,
    buf: &'a [u8],
    deadline: Option<Instant>,
) -> ErrnoOrFuture<UsizeFuture<'a>> {
    let timeout = match deadline {
        None => None,
        Some(deadline) => match deadline.checked_duration_since(crate::clock_now()) {
            Some(remaining) => Some(remaining),
            None => {
                return ErrnoOrFuture::Error {
                    errno: Errno::TIMEDOUT,
                };
            }
        },
    };
    let fut = write_with_timeout(fd, buf, timeout);
    ErrnoOrFuture::Future { fut }
}
/// Builds a `Write` operation writing `buf` to `fd` with an optional relative
/// `timeout`.
///
/// `u64::MAX` is passed as the offset (io_uring's `-1` convention for "use
/// the current file position").
///
/// The opcode takes a 32-bit length. Buffers longer than `u32::MAX` bytes are
/// submitted as a `u32::MAX`-byte write (a short write the caller can
/// continue) instead of having their length truncated with `as`, which could
/// turn a large buffer into a tiny or zero-length request.
pub fn write_with_timeout<'a>(
    fd: &impl AsFd,
    buf: &'a [u8],
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping.
    let len = u32::try_from(buf.len()).unwrap_or(u32::MAX);
    let entry = opcode::Write::new(Fd(raw_fd), buf.as_ptr(), len)
        .offset(u64::MAX)
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Write)
}
/// Builds a `Send` operation sending `buf` on the socket `fd` with `flags`
/// and an optional relative `timeout`.
///
/// The opcode takes a 32-bit length. Buffers longer than `u32::MAX` bytes are
/// submitted with a length of `u32::MAX` instead of having their length
/// truncated with `as`, which could turn a large buffer into a tiny or
/// zero-length request.
pub fn send<'a>(
    fd: &impl AsFd,
    buf: &'a [u8],
    flags: SendFlags,
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping.
    let len = u32::try_from(buf.len()).unwrap_or(u32::MAX);
    let entry = opcode::Send::new(Fd(raw_fd), buf.as_ptr(), len)
        .flags(flags)
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Send)
}
/// Builds a `Recv` operation receiving into `buf` from the socket `fd` with
/// `flags` and an optional relative `timeout`.
///
/// The opcode takes a 32-bit length. Buffers longer than `u32::MAX` bytes are
/// offered to the kernel as `u32::MAX` bytes instead of having their length
/// truncated with `as`; a wrapped length of zero would make the receive
/// return `0`, which is indistinguishable from end-of-stream.
pub fn recv<'a>(
    fd: &impl AsFd,
    buf: &'a mut [u8],
    flags: RecvFlags,
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping.
    let len = u32::try_from(buf.len()).unwrap_or(u32::MAX);
    let entry = opcode::Recv::new(Fd(raw_fd), buf.as_mut_ptr(), len)
        .flags(flags)
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Recv)
}
/// Builds a `RecvMsg` operation on the socket `fd` described by `msghdr`,
/// with `flags` and an optional relative `timeout`.
///
/// The returned future holds the mutable borrow of `msghdr` for `'a`.
pub fn recvmsg<'a>(
    fd: &impl AsFd,
    msghdr: &'a mut MsgHdr,
    flags: RecvFlags,
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let header_ptr: *mut MsgHdr = msghdr;
    let entry = opcode::RecvMsg::new(Fd(raw_fd), header_ptr)
        .flags(flags)
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Recv)
}
/// Builds a `SendMsg` operation on the socket `fd` described by `msghdr`,
/// with `flags` and an optional relative `timeout`.
///
/// The returned future holds the mutable borrow of `msghdr` for `'a`.
pub fn sendmsg<'a>(
    fd: &impl AsFd,
    msghdr: &'a mut MsgHdr,
    flags: SendFlags,
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let header_ptr: *mut MsgHdr = msghdr;
    let entry = opcode::SendMsg::new(Fd(raw_fd), header_ptr)
        .flags(flags)
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Send)
}
/// Builds a positional `Write` of `buf` to `fd` at `offset`; `polled` is
/// forwarded to `UsizeFuture::with_polled`.
///
/// The opcode takes a 32-bit length. Buffers longer than `u32::MAX` bytes are
/// submitted as a `u32::MAX`-byte write (a short write the caller can
/// continue) instead of having their length truncated with `as`, which could
/// turn a large buffer into a tiny or zero-length request.
///
/// NOTE(review): unlike most functions in this file, `fd` is taken by value.
/// If a caller passes an owning descriptor type, it is dropped when this
/// function returns, i.e. before the operation completes — confirm that all
/// callers pass a borrowed form.
pub fn pwrite_polled<'a>(
    fd: impl AsFd,
    buf: &'a [u8],
    offset: u64,
    polled: bool,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping.
    let len = u32::try_from(buf.len()).unwrap_or(u32::MAX);
    let entry = opcode::Write::new(Fd(raw_fd), buf.as_ptr(), len)
        .offset(offset)
        .build();
    UsizeFuture::with_polled(
        entry,
        raw_fd,
        None,
        IOType::Write,
        polled,
        CompletionResources::None,
    )
}
/// Positional write of `buf` to `fd` at `offset`.
///
/// Forwards to `pwrite_polled` with `polled = false`.
pub fn pwrite<'a>(fd: &impl AsFd, buf: &'a [u8], offset: u64) -> UsizeFuture<'a> {
pwrite_polled(fd, buf, offset, false)
}
/// Builds a `Readv` operation reading from `fd` into the buffers described by
/// `iovec`. No timeout is applied.
///
/// When `offset` is `None`, `u64::MAX` is passed as the offset (io_uring's
/// `-1` convention for "use the current file position").
pub fn readv<'a>(fd: &impl AsFd, iovec: &'a [iovec], offset: Option<u64>) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let iov_count = iovec.len() as u32;
    let entry = opcode::Readv::new(Fd(raw_fd), iovec.as_ptr(), iov_count)
        .offset(offset.unwrap_or(u64::MAX))
        .build();
    UsizeFuture::new(entry, raw_fd, None, IOType::Read)
}
/// Reads from `fd` into `buf` without a timeout.
///
/// Forwards to `read_with_timeout` with `timeout = None`.
pub fn read<'a>(fd: &impl AsFd, buf: &'a mut [u8]) -> UsizeFuture<'a> {
read_with_timeout(fd, buf, None)
}
/// Reads from `fd` into `buf`, requiring completion before `deadline`.
///
/// The deadline is converted to a relative timeout using `crate::clock_now()`.
/// If the deadline is already in the past, no operation is built and the
/// result is an immediate `Errno::TIMEDOUT` error. `None` means no timeout.
pub fn read_with_deadline<'a>(
    fd: &impl AsFd,
    buf: &'a mut [u8],
    deadline: Option<Instant>,
) -> ErrnoOrFuture<UsizeFuture<'a>> {
    let timeout = match deadline {
        None => None,
        Some(deadline) => match deadline.checked_duration_since(crate::clock_now()) {
            Some(remaining) => Some(remaining),
            None => {
                return ErrnoOrFuture::Error {
                    errno: Errno::TIMEDOUT,
                };
            }
        },
    };
    let fut = read_with_timeout(fd, buf, timeout);
    ErrnoOrFuture::Future { fut }
}
/// Builds a `Read` operation reading from `fd` into `buf` with an optional
/// relative `timeout`.
///
/// `u64::MAX` is passed as the offset (io_uring's `-1` convention for "use
/// the current file position").
///
/// The opcode takes a 32-bit length. Buffers longer than `u32::MAX` bytes are
/// offered to the kernel as `u32::MAX` bytes instead of having their length
/// truncated with `as`; a wrapped length of zero would make the read return
/// `0`, which is indistinguishable from end-of-file.
pub fn read_with_timeout<'a>(
    fd: &impl AsFd,
    buf: &'a mut [u8],
    timeout: Option<Duration>,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping.
    let len = u32::try_from(buf.len()).unwrap_or(u32::MAX);
    let entry = opcode::Read::new(Fd(raw_fd), buf.as_mut_ptr(), len)
        .offset(u64::MAX)
        .build();
    UsizeFuture::new(entry, raw_fd, timeout, IOType::Read)
}
/// Builds a positional `Read` from `fd` at `offset` into `buf`; `polled` is
/// forwarded to `UsizeFuture::with_polled`.
///
/// The opcode takes a 32-bit length. Buffers longer than `u32::MAX` bytes are
/// offered to the kernel as `u32::MAX` bytes instead of having their length
/// truncated with `as`; a wrapped length of zero would make the read return
/// `0`, which is indistinguishable from end-of-file.
///
/// NOTE(review): unlike most functions in this file, `fd` is taken by value.
/// If a caller passes an owning descriptor type, it is dropped when this
/// function returns, i.e. before the operation completes — confirm that all
/// callers pass a borrowed form.
pub fn pread_polled<'a>(
    fd: impl AsFd,
    buf: &'a mut [u8],
    offset: u64,
    polled: bool,
) -> UsizeFuture<'a> {
    let raw_fd = fd.as_fd().as_raw_fd();
    // Saturate instead of wrapping.
    let len = u32::try_from(buf.len()).unwrap_or(u32::MAX);
    let entry = opcode::Read::new(Fd(raw_fd), buf.as_mut_ptr(), len)
        .offset(offset)
        .build();
    UsizeFuture::with_polled(
        entry,
        raw_fd,
        None,
        IOType::Read,
        polled,
        CompletionResources::None,
    )
}
/// Positional read from `fd` at `offset` into `buf`.
///
/// Forwards to `pread_polled` with `polled = false`.
pub fn pread<'a>(fd: &impl AsFd, buf: &'a mut [u8], offset: u64) -> UsizeFuture<'a> {
pread_polled(fd, buf, offset, false)
}
/// Builds an asynchronous `Close` operation that takes over ownership of `fd`.
///
/// The `OwnedFd` is wrapped in `ManuallyDrop` so its destructor never runs;
/// closing is left entirely to the submitted operation.
pub fn close(fd: OwnedFd) -> UnitFuture<'static> {
    let released = ManuallyDrop::new(fd);
    let raw_fd = released.as_fd().as_raw_fd();
    let entry = opcode::Close::new(Fd(raw_fd)).build();
    UnitFuture::new(entry, raw_fd, None, IOType::Close)
}
/// Builds a `Nop` operation: it passes through the ring without touching any
/// descriptor.
pub fn nop() -> UnitFuture<'static> {
    let entry = opcode::Nop::new().build();
    UnitFuture::new(entry, -1, None, IOType::Nop)
}
/// Builds an 80-byte `UringCmd80` passthrough command `op` on `fd` carrying
/// the payload `cmd`. Accounted under `IOType::NvmeCmd`.
#[cfg(feature = "io_uring_cmd")]
pub fn uring_cmd(fd: &impl AsFd, op: u32, cmd: [u8; 80]) -> UringCmdFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry = opcode::UringCmd80::new(Fd(raw_fd), op).cmd(cmd).build();
    UringCmdFuture::new(entry, raw_fd, None, IOType::NvmeCmd)
}
/// Same as `uring_cmd`, but constructs the future through `with_polled` with
/// the polled flag set to `true`.
#[cfg(feature = "io_uring_cmd")]
pub fn uring_cmd_polled(fd: &impl AsFd, op: u32, cmd: [u8; 80]) -> UringCmdFuture<'_> {
    let raw_fd = fd.as_fd().as_raw_fd();
    let entry = opcode::UringCmd80::new(Fd(raw_fd), op).cmd(cmd).build();
    UringCmdFuture::with_polled(
        entry,
        raw_fd,
        None,
        IOType::NvmeCmd,
        true,
        CompletionResources::None,
    )
}
/// Returns a future that yields once; on its first poll it re-queues the
/// current task through `TaskState::schedule_io`.
pub fn yield_io() -> SetYieldCpuFuture {
    let state = YieldFutureState::CreatedIo;
    SetYieldCpuFuture { state }
}
/// Returns a future that yields once; on its first poll it re-queues the
/// current task through `TaskState::schedule_cpu`.
pub fn yield_cpu() -> SetYieldCpuFuture {
    let state = YieldFutureState::CreatedCpu;
    SetYieldCpuFuture { state }
}
/// Progress of a `SetYieldCpuFuture`.
#[derive(PartialEq)]
enum YieldFutureState {
// Created by `yield_io`, not polled yet.
CreatedIo,
// Created by `yield_cpu`, not polled yet.
CreatedCpu,
// Polled once: the task has been rescheduled and the next poll completes.
Polled,
// Returned `Poll::Ready`; polling again panics.
Complete,
}
/// Future returned by `yield_io` and `yield_cpu`: it returns `Pending` on the
/// first poll and completes on the second.
pub struct SetYieldCpuFuture {
// Where this future is in its Created -> Polled -> Complete life cycle.
state: YieldFutureState,
}
impl Future for SetYieldCpuFuture {
    type Output = ();

    /// First poll: reschedules the current task (IO or CPU queue depending on
    /// how the future was created), wakes the task and returns `Pending`.
    /// Second poll: returns `Ready(())`.
    ///
    /// # Panics
    /// Panics if the current task is in the `Aborted` state, or if polled
    /// again after completion.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut task_state = TaskState::get();
        let task = task_state.get_current_task();
        if task.get_state() == TaskReadyState::Aborted {
            panic!("Task aborted");
        }
        let this = self.get_mut();
        match this.state {
            YieldFutureState::Complete => panic!("Do not poll completed futures"),
            YieldFutureState::Polled => {
                this.state = YieldFutureState::Complete;
                return Poll::Ready(());
            }
            YieldFutureState::CreatedIo => {
                task_state.schedule_io(task);
            }
            YieldFutureState::CreatedCpu => {
                task_state.schedule_cpu(task);
            }
        }
        // Shared tail of both "created" states: release the task state before
        // waking so the waker runs without it being held.
        drop(task_state);
        this.state = YieldFutureState::Polled;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Reports the future as terminated once it has returned `Ready`.
impl FusedFuture for SetYieldCpuFuture {
fn is_terminated(&self) -> bool {
self.state == YieldFutureState::Complete
}
}
/// Translates the result of a ring timeout operation into a sleep result.
///
/// A timeout that fires normally reports `Errno::TIME`, which is mapped to
/// `Ok(())`; every other error is passed through unchanged.
///
/// # Panics
/// Panics if the timeout operation reports `Ok`.
fn map_timeout_poll(poll: Poll<Result<(), Errno>>) -> Poll<Result<(), Errno>> {
    poll.map(|outcome| match outcome {
        Err(Errno::TIME) => Ok(()),
        Err(other) => Err(other),
        Ok(_) => panic!("Timeout should never return Ok"),
    })
}
/// Returns a virtual-clock sleep until `deadline` when a virtual clock is
/// installed in the task state, and `None` otherwise.
#[cfg(feature = "virtual-clock")]
fn try_virtual_sleep(deadline: Instant) -> Option<SleepFuture<'static>> {
    let task_state = TaskState::get();
    if task_state.clock.is_none() {
        return None;
    }
    let virtual_sleep = crate::virtual_clock::VirtualSleepFuture::new(deadline);
    Some(SleepFuture {
        inner: SleepFutureInner::Virtual(virtual_sleep),
    })
}
/// Polls `fut` exactly once.
///
/// Returns `Some(output)` if it completed during that poll and `None` if it
/// was still pending.
#[cfg(feature = "virtual-clock")]
pub async fn poll_once<F: Future>(mut fut: Pin<&mut F>) -> Option<F::Output> {
    futures::future::poll_fn(|cx| {
        let attempt = match fut.as_mut().poll(cx) {
            Poll::Ready(value) => Some(value),
            Poll::Pending => None,
        };
        // Always resolve immediately, whether or not `fut` made it to Ready.
        Poll::Ready(attempt)
    })
    .await
}
/// Returns a future that completes after `duration`.
///
/// With the `virtual-clock` feature and an installed virtual clock, the sleep
/// is driven by that clock (deadline = `clock_now() + duration`, falling back
/// to roughly 100 years ahead if that addition overflows). Otherwise a ring
/// `Timeout` operation is submitted.
pub fn sleep(duration: Duration) -> SleepFuture<'static> {
    #[cfg(feature = "virtual-clock")]
    {
        let start = crate::clock_now();
        let deadline = match start.checked_add(duration) {
            Some(deadline) => deadline,
            None => start + Duration::from_secs(365 * 24 * 3600 * 100),
        };
        if let Some(virtual_sleep) = try_virtual_sleep(deadline) {
            return virtual_sleep;
        }
    }
    // The timespec is boxed so its address stays stable; the box is handed to
    // the future as a completion resource after the entry has captured a
    // pointer to it.
    let boxed_timespec = Box::new(Timespec::from(duration));
    let entry = opcode::Timeout::new(boxed_timespec.as_ref()).build();
    let timer = UnitFuture::with_polled(
        entry,
        -1,
        None,
        IOType::Timeout,
        false,
        CompletionResources::Box(boxed_timespec),
    );
    #[cfg(feature = "virtual-clock")]
    {
        SleepFuture {
            inner: SleepFutureInner::Real(timer),
        }
    }
    #[cfg(not(feature = "virtual-clock"))]
    {
        SleepFuture { fut: timer }
    }
}
/// Returns a future that completes once `deadline` has been reached.
///
/// With the `virtual-clock` feature and an installed virtual clock, the
/// deadline is handed to the virtual clock directly. Otherwise the remaining
/// time (saturating at zero for past deadlines) is passed to `sleep`.
pub fn sleep_until(deadline: Instant) -> SleepFuture<'static> {
    #[cfg(feature = "virtual-clock")]
    {
        if let Some(virtual_sleep) = try_virtual_sleep(deadline) {
            return virtual_sleep;
        }
    }
    let remaining = deadline.saturating_duration_since(crate::clock_now());
    sleep(remaining)
}
/// Runs `future` until it completes or `deadline` passes, whichever is first.
///
/// # Errors
/// * `TimeoutError::Timeout` if the timer finished successfully first.
/// * `TimeoutError::Canceled` if the timer itself finished with an error.
pub async fn timeout_at<F: Future>(
    deadline: Instant,
    future: F,
) -> Result<F::Output, crate::TimeoutError> {
    use futures::future::{Either, select};
    use std::pin::pin;

    let work = pin!(future);
    let timer = pin!(sleep_until(deadline));
    match select(work, timer).await {
        Either::Left((output, _timer)) => Ok(output),
        Either::Right((timer_result, _work)) => match timer_result {
            Ok(()) => Err(crate::TimeoutError::Timeout),
            Err(_) => Err(crate::TimeoutError::Canceled),
        },
    }
}
// Sleep future used when the `virtual-clock` feature is disabled: a thin
// wrapper around the ring timeout future.
#[cfg(not(feature = "virtual-clock"))]
pin_project_lite::pin_project! {
pub struct SleepFuture<'a> {
// The underlying ring `Timeout` operation.
#[pin]
fut: UnitFuture<'a>,
}
}
#[cfg(not(feature = "virtual-clock"))]
impl<'a> SleepFuture<'a> {
/// Cancels the sleep by forwarding to the wrapped future's `cancel`.
pub fn cancel(self: Pin<&mut Self>) {
self.project().fut.cancel();
}
}
#[cfg(not(feature = "virtual-clock"))]
impl<'a> Future for SleepFuture<'a> {
type Output = Result<(), Errno>;
/// Polls the wrapped timeout and converts its result with
/// `map_timeout_poll` (so `Errno::TIME` becomes `Ok(())`).
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
map_timeout_poll(self.project().fut.poll(cx))
}
}
#[cfg(not(feature = "virtual-clock"))]
impl<'a> FusedFuture for SleepFuture<'a> {
/// Terminated exactly when the wrapped timeout future is.
fn is_terminated(&self) -> bool {
self.fut.is_terminated()
}
}
/// Sleep future used when the `virtual-clock` feature is enabled: backed
/// either by a real ring timeout or by the virtual clock.
#[cfg(feature = "virtual-clock")]
pub struct SleepFuture<'a> {
inner: SleepFutureInner<'a>,
}
/// The two possible backings of a virtual-clock-capable `SleepFuture`.
#[cfg(feature = "virtual-clock")]
enum SleepFutureInner<'a> {
// Ring `Timeout` operation (no virtual clock was installed at creation).
Real(UnitFuture<'a>),
// Timer registered with the virtual clock.
Virtual(crate::virtual_clock::VirtualSleepFuture),
}
#[cfg(feature = "virtual-clock")]
impl<'a> SleepFuture<'a> {
/// Cancels the sleep by forwarding to whichever inner future is active.
pub fn cancel(self: Pin<&mut Self>) {
// Manual pin projection: the inner future is only reached through the
// pinned outer reference and is never moved out of `self` here.
// NOTE(review): soundness relies on no other code moving `inner` after
// pinning — confirm.
unsafe {
match &mut self.get_unchecked_mut().inner {
SleepFutureInner::Real(fut) => Pin::new_unchecked(fut).cancel(),
SleepFutureInner::Virtual(fut) => fut.cancel(),
}
}
}
}
// Compile-time check that `VirtualSleepFuture` is `Unpin`; the `Future` impl
// below uses the safe `Pin::new` on it.
#[cfg(feature = "virtual-clock")]
static_assertions::assert_impl_all!(crate::virtual_clock::VirtualSleepFuture: Unpin);
#[cfg(feature = "virtual-clock")]
impl<'a> Future for SleepFuture<'a> {
type Output = Result<(), Errno>;
/// Polls the active inner future. The real timeout's result goes through
/// `map_timeout_poll`; the virtual future's result is returned as is.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Manual pin projection, same reasoning as in `cancel`.
unsafe {
match &mut self.get_unchecked_mut().inner {
SleepFutureInner::Real(fut) => map_timeout_poll(Pin::new_unchecked(fut).poll(cx)),
SleepFutureInner::Virtual(fut) => Pin::new(fut).poll(cx),
}
}
}
}
#[cfg(feature = "virtual-clock")]
impl<'a> FusedFuture for SleepFuture<'a> {
/// Terminated exactly when the active inner future is.
fn is_terminated(&self) -> bool {
match &self.inner {
SleepFutureInner::Real(fut) => fut.is_terminated(),
SleepFutureInner::Virtual(fut) => fut.is_terminated(),
}
}
}
/// Awaits `future`, invoking `warning` once if it has not completed within
/// `duration`.
///
/// The future is never cancelled: after the warning fires it is awaited to
/// completion and its output returned. `warning` is called whenever the timer
/// branch resolves, regardless of the timer's own result.
pub async fn with_timeout_warning<F>(
    future: F,
    duration: Duration,
    warning: impl FnOnce(),
) -> F::Output
where
    F: FusedFuture,
{
    use std::pin::pin;

    let mut work = pin!(future);
    let mut warn_timer = pin!(sleep(duration).fuse());
    futures::select! {
        output = work => return output,
        _ = warn_timer => warning(),
    }
    work.await
}
/// Records `event` in the trace, attributed to the current task's index.
///
/// # Panics
/// Panics if there is no current task.
pub fn write_event(event: Events) {
    let task_state = TaskState::get();
    let current = task_state.current_task.as_ref().unwrap();
    task_state.write_event(current.task_index, event)
}
/// Prints `args` to stdout prefixed with `<thread id>:<task index> `.
///
/// `Arguments` implements `Display`, so it is written straight into the
/// output instead of first being rendered into a temporary `String`.
/// No trailing newline is added.
pub fn log(args: std::fmt::Arguments<'_>) {
    let TaskIdentity {
        thread_id,
        task_index,
        ..
    } = task_identity();
    print!("{:?}:{} {}", thread_id, task_index, args)
}
/// Identifies the thread and task a piece of code is running on, as reported
/// by `task_identity`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TaskIdentity {
/// Thread id as reported by the task state.
pub thread_id: std::thread::ThreadId,
/// Thread index taken from the trace buffer (0 when no task is current).
pub thread_index: u8,
/// Index of the current task (0 when no task is current).
pub task_index: u16,
}
/// Returns the identity of the currently running task.
///
/// When no task is current, `thread_index` and `task_index` are both `0`;
/// `thread_id` is always taken from the task state.
pub fn task_identity() -> TaskIdentity {
    let task_state = TaskState::get();
    let thread_id = task_state.get_current_thread_id();
    let (thread_index, task_index) = match task_state.current_task.as_ref() {
        Some(current) => (task_state.trace_buffer.thread_idx, current.task_index),
        None => (0, 0),
    };
    TaskIdentity {
        thread_id,
        thread_index,
        task_index,
    }
}
/// Returns the activity id currently recorded in the task state.
#[inline(always)]
pub fn get_activity_id() -> uuid::Uuid {
    TaskState::get().get_current_activity_id()
}
/// Returns the tenant id currently recorded in the task state.
#[inline(always)]
pub fn get_tenant_id() -> uuid::Uuid {
    TaskState::get().get_current_tenant_id()
}
/// Stores `activity_id` and `tenant_id` as the current ids in the task state.
pub fn set_activity_id_and_tenant_id(activity_id: uuid::Uuid, tenant_id: uuid::Uuid) {
    TaskState::get().set_current_activity_id_and_tenant_id(activity_id, tenant_id);
}
/// Sets the `high_priority` flag of the current task.
///
/// # Panics
/// Panics if there is no current task.
pub fn set_high_priority(high_priority: bool) {
    let task_state = TaskState::get();
    let current = task_state.current_task.as_ref().unwrap();
    current.high_priority.set(high_priority);
}
/// Schedules `future` as a new task and returns a handle to it.
///
/// The new task is scheduled with the activity id and tenant id that are
/// current at the time of the call.
pub fn spawn_task<Fut>(future: Fut) -> TaskHandle<Fut::Output>
where
    Fut: Future + 'static,
{
    let mut task_state = TaskState::get();
    let inherited_activity = task_state.get_current_activity_id();
    let inherited_tenant = task_state.get_current_tenant_id();
    let spawned = task_state.schedule_new(future, inherited_activity, inherited_tenant);
    TaskHandle::new(spawned)
}
// Handle to a spawned task. Awaiting it yields the task's result of type `T`
// (see the `Future` impl); `T` is only tracked at the type level.
pin_project_lite::pin_project! {
pub struct TaskHandle<T: 'static> {
// Wait future over the task's completion source.
#[pin]
wait: crate::async_event::WaitFuture<TaskSource>,
// Carries the result type without storing a value of it.
_marker: PhantomData<T>,
}
}
impl<T: 'static> TaskHandle<T> {
pub fn new(task: Rc<Task>) -> Self {
TaskHandle {
wait: crate::async_event::WaitFuture::new(TaskSource::new(task)),
_marker: Default::default(),
}
}
pub fn wait(&self) -> WaitFuture {
WaitFuture::new(self.wait.source().unwrap().task())
}
pub fn abort(&self) {
let mut task_state = TaskState::get();
task_state.abort_task(self.wait.source().unwrap().task())
}
}
// Compile-time guarantee that `TaskHandle` does not implement `Clone`.
static_assertions::const_assert!(impls::impls!(TaskHandle<()>: !Clone));
/// Terminated exactly when the inner wait future is.
impl<T> FusedFuture for TaskHandle<T> {
fn is_terminated(&self) -> bool {
self.wait.is_terminated()
}
}
impl<T> Future for TaskHandle<T> {
    type Output = Result<T, TaskHandleError>;

    /// Resolves once the task has finished.
    ///
    /// * `Ok(value)` — the task's result.
    /// * `Err(TaskHandleError::Panic(..))` — the stored task result was an error.
    /// * `Err(TaskHandleError::Canceled)` — the wait itself was cancelled.
    ///
    /// # Panics
    /// Panics with "Task result" if the finished task has no result of type `T`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project().wait.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(_canceled_error)) => Poll::Ready(Err(TaskHandleError::Canceled)),
            Poll::Ready(Ok(source)) => {
                let outcome = source.task.result::<T>().expect("Task result");
                Poll::Ready(outcome.map_err(TaskHandleError::Panic))
            }
        }
    }
}
// Future that waits for a task to finish without exposing its result
// (see `TaskHandle::wait`).
pin_project_lite::pin_project! {
pub struct WaitFuture {
// Wait future over the task's completion source.
#[pin]
wait: crate::async_event::WaitFuture<TaskSource>,
}
}
impl WaitFuture {
/// Creates a wait future for `task`.
pub fn new(task: Rc<Task>) -> Self {
let wait = crate::async_event::WaitFuture::new(TaskSource::new(task));
WaitFuture { wait }
}
}
/// Terminated exactly when the inner wait future is.
impl FusedFuture for WaitFuture {
fn is_terminated(&self) -> bool {
self.wait.is_terminated()
}
}
impl Future for WaitFuture {
    type Output = Result<(), CanceledError>;

    /// Polls the inner wait future, discarding the source on success and
    /// passing a cancellation error through unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project()
            .wait
            .poll(cx)
            .map(|outcome| outcome.map(|_source| ()))
    }
}
/// Requests shutdown by calling `shutdown` on the task state.
pub fn shutdown_loop() {
    TaskState::get().shutdown();
}
/// Returns the leading part of `version` that consists only of ASCII digits
/// and dots (e.g. `"5.15.0-91-generic"` -> `"5.15.0"`).
///
/// The whole string is returned when it contains no other characters.
fn version_prefix(version: &str) -> &str {
    let is_version_char = |c: char| c.is_ascii_digit() || c == '.';
    match version.find(|c: char| !is_version_char(c)) {
        Some(end) => &version[..end],
        None => version,
    }
}
/// Parses the `major.minor` pair from a kernel release string.
///
/// Only the numeric/dot prefix of `version` is considered; components after
/// the second one are ignored.
///
/// # Errors
/// Returns the `ParseIntError` of the first component (major, then minor)
/// that is missing or not a valid `u32`.
fn parse_version(version: &str) -> Result<(u32, u32), std::num::ParseIntError> {
    let mut components = version_prefix(version).split('.');
    // A missing component is parsed as "" so that it yields a parse error.
    let mut next_number = || components.next().unwrap_or("").parse::<u32>();
    let major = next_number()?;
    let minor = next_number()?;
    Ok((major, minor))
}
/// Returns the running kernel's `(major, minor)` version from `uname`.
///
/// Falls back to `(5, 15)` when the release string cannot be parsed.
pub fn kernel_version() -> (u32, u32) {
    let uname = rustix::system::uname();
    let release = uname.release().to_string_lossy();
    parse_version(&release).unwrap_or((5, 15))
}
/// Unit-valued variant of `io_scope_drain_futures`.
///
/// Drains `stream` starting from `result`, using an accumulator that ignores
/// the items and keeps `()`.
pub fn io_scope_drain_futures_void<E>(
result: Result<(), E>,
stream: impl Stream<Item = Result<(), E>>,
) -> impl Future<Output = Result<(), E>> {
io_scope_drain_futures(result, stream, |_, _| ())
}
/// Drains `stream` to the end, folding successful items into `result` with
/// `acc` and remembering the first error.
///
/// * If `result` is already an error on entry, `io_scope_cancel` is called
///   before draining.
/// * On the first error seen while `result` is still `Ok`, `io_scope_cancel`
///   is called and that error becomes the result.
/// * Once the result is an error, remaining items are still consumed but
///   otherwise ignored.
pub async fn io_scope_drain_futures<Acc, T, E>(
    mut result: Result<Acc, E>,
    stream: impl Stream<Item = Result<T, E>>,
    mut acc: impl FnMut(Acc, T) -> Acc,
) -> Result<Acc, E> {
    if result.is_err() {
        io_scope_cancel();
    }
    let mut stream = std::pin::pin!(stream);
    while let Some(item) = stream.next().await {
        result = match (result, item) {
            (Ok(so_far), Ok(value)) => Ok(acc(so_far, value)),
            (Ok(_), Err(first_error)) => {
                io_scope_cancel();
                Err(first_error)
            }
            // An earlier error wins; later items are dropped.
            (Err(earlier), _) => Err(earlier),
        };
    }
    result
}
/// Cancels the I/O-scope completions of the current task.
///
/// First runs `submit_and_complete_io_all`, then hands the task state to
/// `Task::cancel_io_scope_completions`.
pub fn io_scope_cancel() {
let mut task_state = TaskState::get();
let task = task_state.get_current_task();
// The task state is passed by value and handed back by the runtime call.
task_state = crate::runtime::submit_and_complete_io_all(task_state, true);
task.cancel_io_scope_completions(task_state);
}
/// Installs `new_io_scope_completions` on the current task and, if the task
/// previously had a completion set, cancels everything in that set and waits
/// until all of its completions have finished.
fn io_scope_cancel_and_wait_internal(new_io_scope_completions: Option<IoScopeCompletions>) {
let mut task_state = TaskState::get();
let task = task_state.get_current_task();
// Swap in the new set; what comes back is the set gathered so far.
let gathered_completions = task.replace_io_scope_completions(new_io_scope_completions);
if let Some(mut gathered_completions) = gathered_completions {
// Run one submit/complete round, then drop completions that are no longer
// Idle or Submitted so only outstanding ones get cancelled.
task_state = crate::runtime::submit_and_complete_io_all(task_state, true);
retain_incomplete(&mut gathered_completions.completions, &mut task_state);
for completion in &gathered_completions.completions {
completion.cancel(&mut task_state);
}
// Mark every registered wait as cancelled and wake its waker, if any.
for wait in gathered_completions.waits.drain(..) {
wait.canceled.set(true);
wait.waker.use_mut(|waker| {
if let Some(waker) = waker {
wake_task(&mut task_state, waker)
}
});
}
// Keep driving the ring until every cancelled completion has left the
// Idle/Submitted states.
while !gathered_completions.completions.is_empty() {
task_state = crate::runtime::submit_and_complete_io_all(task_state, true);
retain_incomplete(&mut gathered_completions.completions, &mut task_state);
}
}
}
/// Runs the async closure `f` inside an I/O scope.
///
/// `f` is called immediately to obtain its future. The future is wrapped so
/// that panics raised while polling are caught, and it is paired with a fresh
/// (default) completion set held in a guard. See `IoScopeFuture::poll` for
/// how that set is installed and torn down.
pub fn io_scope<'a, T>(f: impl AsyncFnOnce() -> T + 'a) -> impl Future<Output = T> + 'a {
IoScopeFuture {
inner: CatchUnwindFuture { f: f() },
completions: IoScopeCompletionsGuard(Some(IoScopeCompletions::default())),
}
}
// Wrapper future that converts a panic raised while polling `f` into an
// `Err` carrying the panic payload (see its `Future` impl).
pin_project_lite::pin_project! {
struct CatchUnwindFuture<F: Future> {
// The wrapped future.
#[pin]
f: F,
}
}
impl<F: Future> Future for CatchUnwindFuture<F> {
    type Output = Result<F::Output, Box<dyn std::any::Any + Send>>;

    /// Polls the wrapped future inside `catch_unwind`.
    ///
    /// A normal result is forwarded as `Ok`; a panic during the poll resolves
    /// this future with `Err(payload)` instead of unwinding further.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let wrapped = self.project().f;
        let attempt = std::panic::catch_unwind(AssertUnwindSafe(|| wrapped.poll(cx)));
        match attempt {
            Err(payload) => Poll::Ready(Err(payload)),
            Ok(polled) => polled.map(Ok),
        }
    }
}
/// Owns an I/O scope's completion set while the scope future is not being
/// polled, and cleans it up if the scope future is dropped.
struct IoScopeCompletionsGuard(Option<IoScopeCompletions>);
impl Drop for IoScopeCompletionsGuard {
/// If the guard still holds a non-empty completion set, cancels it and
/// waits for it to finish; an empty or absent set needs no work.
fn drop(&mut self) {
if let Some(completions) = self.0.take() {
if completions.completions.is_empty() && completions.waits.is_empty() {
return;
}
let task_state = TaskState::get();
let task = task_state.get_current_task();
// Temporarily install this scope's set on the task, remembering the
// set that was there before.
let outer = task.replace_io_scope_completions(Some(completions));
// Release the task state before the helper acquires it again.
drop(task_state);
// Restores `outer` on the task and cancels/waits for this scope's set.
io_scope_cancel_and_wait_internal(outer);
}
}
}
// Future returned by `io_scope`: runs `inner` with the scope's own completion
// set installed on the current task.
pin_project_lite::pin_project! {
struct IoScopeFuture<F: Future> {
// The user future, wrapped so that panics are caught.
#[pin]
inner: CatchUnwindFuture<F>,
// The scope's completion set while the future is not being polled.
completions: IoScopeCompletionsGuard,
}
}
impl<F: Future> Future for IoScopeFuture<F> {
type Output = F::Output;
/// Polls the inner future with this scope's completion set swapped into
/// the current task.
///
/// * `Ready`: the outer set is restored, the scope's completions are
///   cancelled and waited for, and then the value is returned or the caught
///   panic is resumed.
/// * `Pending`: the outer set is restored and the scope's set is stored
///   back into the guard until the next poll.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// Install this scope's set; `outer` is whatever the task had before.
// The task state is only held inside this block, not during the poll.
let outer = {
let task_state = TaskState::get();
let task = task_state.get_current_task();
task.replace_io_scope_completions(this.completions.0.take())
};
let result = this.inner.poll(cx);
match result {
Poll::Ready(result) => {
// Cleanup runs before a caught panic is re-raised.
io_scope_cancel_and_wait_internal(outer);
Poll::Ready(match result {
Ok(val) => val,
Err(e) => std::panic::resume_unwind(e),
})
}
Poll::Pending => {
let task_state = TaskState::get();
let task = task_state.get_current_task();
// Swap back: the task gets `outer`, the guard gets the scope's set.
this.completions.0 = task.replace_io_scope_completions(outer);
Poll::Pending
}
}
}
}
/// Removes every completion that is no longer `Idle` or `Submitted` from
/// `vec` and hands it back to `task_state`.
///
/// Removal uses `swap_remove`, so the order of the remaining elements is not
/// preserved.
fn retain_incomplete(vec: &mut Vec<Rc<Completion>>, task_state: &mut TaskState) {
    let mut cursor = 0;
    while cursor < vec.len() {
        let still_outstanding = vec[cursor].state.use_mut(|state| {
            matches!(
                state,
                CompletionState::Idle { .. } | CompletionState::Submitted { .. }
            )
        });
        if still_outstanding {
            cursor += 1;
            continue;
        }
        // `swap_remove` moves the last element into `cursor`, so the same
        // index is examined again on the next iteration.
        let finished = vec.swap_remove(cursor);
        task_state.return_completion(finished);
    }
}
// Either an error that is already known, or a future that will produce the
// result. Lets functions such as the `*_with_deadline` family fail early
// without building an operation.
pin_project_lite::pin_project! {
#[project = ErrnoOrFutureProj]
pub enum ErrnoOrFuture<Fut> {
// Resolves to `Err(errno)` when polled.
Error { errno: Errno },
// Delegates to the wrapped future.
Future { #[pin] fut: Fut },
}
}
impl<T, Fut> Future for ErrnoOrFuture<Fut>
where
    Fut: Future<Output = Result<T, Errno>>,
{
    type Output = Result<T, Errno>;

    /// Delegates to the wrapped future, or resolves immediately with the
    /// stored error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            ErrnoOrFutureProj::Future { fut } => fut.poll(cx),
            ErrnoOrFutureProj::Error { errno } => Poll::Ready(Err(*errno)),
        }
    }
}
impl<T, Fut: Future<Output = Result<T, Errno>> + IsIoPoll> IsIoPoll for ErrnoOrFuture<Fut> {
fn is_io_poll(&self) -> bool {
match self {
ErrnoOrFuture::Error { errno: _ } => false,
ErrnoOrFuture::Future { fut } => fut.is_io_poll(),
}
}
}
impl<T, Fut: FusedFuture<Output = Result<T, Errno>>> FusedFuture for ErrnoOrFuture<Fut> {
fn is_terminated(&self) -> bool {
match self {
ErrnoOrFuture::Error { errno: _ } => true,
ErrnoOrFuture::Future { fut } => fut.is_terminated(),
}
}
}
/// Fused futures that can report whether they are IO-poll operations.
/// `SubmitFuture` forwards this flag to the runtime when submitting.
pub trait IsIoPoll: FusedFuture {
/// Returns the value passed as the `iopoll` argument on submission.
fn is_io_poll(&self) -> bool;
}
/// Wraps `fut` so that the ring is driven once the first time the wrapped
/// future returns `Pending` (see `SubmitFuture`).
pub fn submit<F: IsIoPoll>(fut: F) -> SubmitFuture<F> {
    SubmitFuture {
        polled: false,
        future: fut,
    }
}
// Future returned by `submit`: polls the wrapped future and triggers one
// submit/complete round the first time that poll returns `Pending`.
pin_project_lite::pin_project! {
pub struct SubmitFuture<F: IsIoPoll> {
// The wrapped operation future.
#[pin] future: F,
// Set once the submit round has been triggered.
polled: bool,
}
}
impl<F: IsIoPoll> Future for SubmitFuture<F> {
    type Output = F::Output;

    /// Polls the wrapped future. The first time it is pending, runs
    /// `submit_and_complete_io` once, passing the future's `is_io_poll` flag;
    /// later pending polls do nothing extra.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        if let Poll::Ready(output) = this.future.as_mut().poll(cx) {
            return Poll::Ready(output);
        }
        let first_pending = !*this.polled;
        if first_pending {
            *this.polled = true;
            let iopoll = this.future.is_io_poll();
            let task_state = TaskState::get();
            crate::runtime::submit_and_complete_io(task_state, false, iopoll);
        }
        Poll::Pending
    }
}
/// Forwards a fault-injection request (`operation_count`, `fault`) to the
/// task state.
#[cfg(feature = "fault_injection")]
pub fn inject_fault(operation_count: usize, fault: Errno) {
    TaskState::get().inject_fault(operation_count, fault)
}
/// Enables or disables the virtual clock.
///
/// Enabling installs a new clock started at `Instant::now()` unless one is
/// already installed; disabling removes any installed clock.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_enable(enabled: bool) {
    let mut task_state = TaskState::get();
    if !enabled {
        task_state.clock = None;
        return;
    }
    if task_state.clock.is_none() {
        let start = std::time::Instant::now();
        task_state.clock = Some(crate::virtual_clock::VirtualClockState::new(start));
    }
}
/// Advances the virtual clock by `duration` and wakes the wakers it returns.
///
/// Returns the first element of the pair produced by the clock's `advance`
/// (named `fired` here). The wakers are invoked after the task state has been
/// released.
///
/// # Panics
/// Panics if the virtual clock is not enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_advance(duration: std::time::Duration) -> usize {
    let (fired, wakers) = {
        let mut task_state = TaskState::get();
        task_state
            .clock
            .as_mut()
            .expect("virtual clock not enabled; call virtual_clock_enable(true) first")
            .advance(duration)
    };
    for waker in wakers {
        waker.wake();
    }
    fired
}
/// Advances the virtual clock to `target` and wakes every waker the clock
/// hands back.
///
/// Returns the first element of the tuple produced by the clock's
/// `advance_to` (named `fired` here).
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_advance_to(target: std::time::Instant) -> usize {
    // The task-state handle is a temporary of this statement, so it is released
    // before any waker runs.
    let (fired, wakers) = TaskState::get()
        .clock
        .as_mut()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first")
        .advance_to(target);
    for waker in wakers {
        waker.wake();
    }
    fired
}
/// Returns the virtual clock's `now()` value.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_now() -> std::time::Instant {
    let state = TaskState::get();
    let clock = state
        .clock
        .as_ref()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.now()
}
/// Returns the virtual clock's `epoch()` value.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_epoch() -> std::time::Instant {
    let state = TaskState::get();
    let clock = state
        .clock
        .as_ref()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.epoch()
}
/// Returns the virtual clock's `next_deadline()`, which may be `None`.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_next_deadline() -> Option<std::time::Instant> {
    let state = TaskState::get();
    let clock = state
        .clock
        .as_ref()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.next_deadline()
}
/// Returns the virtual clock's `pending_timers()` count.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_pending_timers() -> usize {
    let state = TaskState::get();
    let clock = state
        .clock
        .as_ref()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.pending_timers()
}
/// Installs `f` as the virtual clock's idle-advance callback.
///
/// The callback is boxed and handed to the clock's `set_idle_advance_fn`. It
/// receives an `Instant` and an `Option<Instant>` and returns an optional
/// `Duration`; the tests in this file name the arguments `now` and `next`.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_set_idle_advance(
    f: impl FnMut(std::time::Instant, Option<std::time::Instant>) -> Option<std::time::Duration>
    + 'static,
) {
    let mut state = TaskState::get();
    let clock = state
        .clock
        .as_mut()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.set_idle_advance_fn(Some(Box::new(f)));
}
/// Removes the virtual clock's idle-advance callback by passing `None` to
/// `set_idle_advance_fn`.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_clear_idle_advance() {
    let mut state = TaskState::get();
    let clock = state
        .clock
        .as_mut()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.set_idle_advance_fn(None);
}
/// Returns the virtual clock's `has_idle_advance_fn()` answer.
///
/// # Panics
///
/// Panics if the virtual clock has not been enabled.
#[cfg(feature = "virtual-clock")]
pub fn virtual_clock_has_idle_advance() -> bool {
    let state = TaskState::get();
    let clock = state
        .clock
        .as_ref()
        .expect("virtual clock not enabled; call virtual_clock_enable(true) first");
    clock.has_idle_advance_fn()
}
#[cfg(test)]
mod test {
use core::panic;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, select};
use rustix::fs::{Mode, OFlags};
use rustix::net::{
AddressFamily, RecvFlags, SendFlags, SocketAddrUnix, SocketType, bind, listen, socket,
};
use std::collections::HashMap;
use std::num::ParseIntError;
use std::time::{Duration, Instant};
use std::{cell::Cell, rc::Rc};
use uuid::Uuid;
use crate::task::IO_URING_SUBMISSION_ENTRIES;
use crate::operations::{self, TaskHandleError, io_scope, parse_version};
use crate::{AsyncEvent, CanceledError, Errno, MutInPlaceCell};
use super::{accept, recv, send, spawn_task};
// Exercises read futures that were polled once but never completed, both
// before and after they are dropped.
#[crate::test]
async fn drop_futures_test() {
let (read, write) = crate::pipe::bipipe();
let mut buf1 = [0; 1];
let mut read1 = operations::read(&read, &mut buf1);
// Nothing has been written yet, so the `yield_io` branch is expected to win.
let result = select! {
_ = read1 => 1,
_ = operations::yield_io() => 2,
};
assert_eq!(result, 2);
operations::write(&write, b"53").await.unwrap();
let mut buf2 = [0; 1];
operations::read(&read, &mut buf2).await.unwrap();
// '3' is expected: presumably the still-pending `read1` consumed the '5'.
assert_eq!('3', buf2[0] as char);
drop(read1);
let mut read3 = operations::read(&read, &mut buf1);
let result = select! {
_ = read3 => 1,
_ = operations::yield_io() => 2,
};
assert_eq!(result, 2);
// This time the pending read is dropped before any data is written.
drop(read3);
operations::write(&write, b"64").await.unwrap();
operations::read(&read, &mut buf2).await.unwrap();
// '6' is expected: the dropped `read3` must not have consumed any data.
assert_eq!('6', buf2[0] as char);
}
// Runs a single `yield_cpu` and checks afterwards that the `tasks_polled_cpu`
// counter is non-zero.
#[test]
fn yield_cpu_test() {
    let future = operations::yield_cpu();
    crate::run_test_with_post_validate(future, |counters| {
        assert!(counters.tasks_polled_cpu.get() > 0);
    });
}
// Runs a single `yield_io` and checks afterwards that the `tasks_polled_io`
// counter is non-zero.
#[test]
fn yield_io_test() {
    let future = operations::yield_io();
    crate::run_test_with_post_validate(future, |counters| {
        assert!(counters.tasks_polled_io.get() > 0);
    });
}
// Two boxed futures that each `yield_io` once must both complete when driven
// through a `FuturesUnordered`.
#[crate::test]
async fn yield_io_in_futures_unordered() {
    use std::pin::Pin;
    type BoxFut = Pin<Box<dyn std::future::Future<Output = i32>>>;

    let mut pending: FuturesUnordered<BoxFut> = FuturesUnordered::new();
    for id in [1, 2] {
        pending.push(Box::pin(async move {
            operations::yield_io().await;
            id
        }));
    }

    let mut seen = Vec::new();
    while let Some(id) = pending.next().await {
        seen.push(id);
    }
    // Completion order is unspecified, so compare the sorted ids.
    seen.sort();
    assert_eq!(seen, vec![1, 2]);
}
// Two boxed futures that each `yield_cpu` once must both complete when driven
// through a `FuturesUnordered`.
#[crate::test]
async fn yield_cpu_in_futures_unordered() {
    use std::pin::Pin;
    type BoxFut = Pin<Box<dyn std::future::Future<Output = i32>>>;

    let mut pending: FuturesUnordered<BoxFut> = FuturesUnordered::new();
    for id in [1, 2] {
        pending.push(Box::pin(async move {
            operations::yield_cpu().await;
            id
        }));
    }

    let mut seen = Vec::new();
    while let Some(id) = pending.next().await {
        seen.push(id);
    }
    // Completion order is unspecified, so compare the sorted ids.
    seen.sort();
    assert_eq!(seen, vec![1, 2]);
}
// A spawned task's side effect must be visible once its handle has been awaited.
#[crate::test]
async fn spawn_test() {
    let counter = Rc::new(Cell::new(0i32));
    let handle = {
        let counter = counter.clone();
        operations::spawn_task(async move {
            counter.set(counter.get() + 1);
        })
    };
    handle.await.unwrap();
    assert_eq!(1, counter.get());
}
// Checks that a spawned task runs up to its first wait after one `yield_io`
// and finishes once the event it waits on has been set.
#[crate::test]
async fn spawn_io_test() {
let shared = Rc::new(Cell::new(0i32));
let event = Rc::new(AsyncEvent::new());
event.reset();
let task = {
let event = event.clone();
let shared = shared.clone();
operations::spawn_task(async move {
shared.set(shared.get() + 1);
event.wait().await.unwrap();
shared.set(shared.get() + 1);
})
};
// Spawning alone must not have run any of the task body yet.
assert_eq!(0, shared.get());
operations::yield_io().await;
// After one yield the task has done its first increment and waits on the event.
assert_eq!(1, shared.get());
event.set();
task.await.unwrap();
assert_eq!(2, shared.get());
}
// Spawns 60000 no-op tasks twice: first keeping every handle alive across a
// single `yield_io` and then dropping them together, then awaiting each handle
// right after spawning it.
#[crate::test]
async fn spawn_60k_tasks() {
    {
        let _handles: Vec<_> = (0..60000)
            .map(|_| operations::spawn_task(async {}))
            .collect();
        operations::yield_io().await;
    }
    for _ in 0..60000 {
        operations::spawn_task(async {}).await.unwrap();
    }
}
// Starts ten times `IO_URING_SUBMISSION_ENTRIES` tasks that each sleep 250 ms
// and verifies that every one of them ran to completion.
#[crate::test]
async fn create_10000_pending_io_test() {
    const TASK_COUNT: usize = IO_URING_SUBMISSION_ENTRIES * 10;
    let finished = Rc::new(Cell::new(0usize));

    let handles: Vec<_> = (0..TASK_COUNT)
        .map(|_task_index| {
            let finished = finished.clone();
            operations::spawn_task(async move {
                operations::sleep(Duration::from_millis(250)).await.unwrap();
                finished.set(finished.get() + 1);
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }
    assert_eq!(TASK_COUNT, finished.get());
}
// Round-trips eight bytes over a Unix stream socket bound to an abstract name:
// a spawned task connects and sends, the test body accepts and receives.
#[crate::test]
async fn unix_domain_socket_test() {
let listener_socket = socket(AddressFamily::UNIX, SocketType::STREAM, None)
.expect("Failed to create new UDS");
bind(
&listener_socket,
&SocketAddrUnix::new_abstract_name("unix_domain_socket_test".as_bytes())
.expect("Failed to create abstract name"),
)
.expect("Failed to bind socket");
listen(&listener_socket, 1).expect("Failed to listen");
// Client side: connect to the same abstract name and send the payload.
let handle = spawn_task(async move {
let client_socket = socket(AddressFamily::UNIX, SocketType::STREAM, None)
.expect("Failed to create client socket");
operations::connect_unix(
&client_socket,
&SocketAddrUnix::new_abstract_name("unix_domain_socket_test".as_bytes()).unwrap(),
)
.await
.unwrap();
let buf = [1u8; 8];
send(&client_socket, &buf, SendFlags::empty(), None)
.await
.expect("Failed to write to socket");
});
// Server side: accept the connection and receive into a local buffer.
let socket = accept(&listener_socket).await.expect("Failed to accept");
let mut buf = [0u8; 8];
recv(&socket, &mut buf, RecvFlags::empty(), None)
.await
.expect("Failed to recv");
handle.await.unwrap();
}
// A `recv` with a 100 ms timeout on a connection whose peer never sends must
// fail with `Errno::TIME`.
#[crate::test]
async fn recv_timeout_test() {
let listener_socket = socket(AddressFamily::UNIX, SocketType::STREAM, None)
.expect("Failed to create new UDS");
bind(
&listener_socket,
&SocketAddrUnix::new_abstract_name("recv_timeout_test".as_bytes())
.expect("Failed to create abstract name"),
)
.expect("Failed to bind socket");
listen(&listener_socket, 1).expect("Failed to listen");
// The client connects but sends nothing; it returns its socket so the
// connection stays open until the handle's result is dropped.
let handle = spawn_task(async move {
let client_socket = socket(AddressFamily::UNIX, SocketType::STREAM, None)
.expect("Failed to create client socket");
operations::connect_unix(
&client_socket,
&SocketAddrUnix::new_abstract_name("recv_timeout_test".as_bytes()).unwrap(),
)
.await
.unwrap();
client_socket
});
let socket = accept(&listener_socket).await.expect("Failed to accept");
let mut buf = [0u8; 8];
assert_eq!(
Errno::TIME,
recv(
&socket,
&mut buf,
RecvFlags::empty(),
Some(Duration::from_millis(100)),
)
.await
.expect_err("No timeout as expected")
);
let _client_socket = handle.await.unwrap();
}
// `parse_version` must extract the leading `major.minor` pair and ignore
// whatever follows it, and must fail when either number is missing or
// malformed.
#[test]
fn parse_version_test() {
    // Keeps the expected return type of `parse_version` pinned at compile time.
    fn rejected(outcome: Result<(u32, u32), ParseIntError>) -> bool {
        outcome.is_err()
    }

    assert_eq!(parse_version("6.2"), Ok((6, 2)));
    assert_eq!(parse_version("5.17"), Ok((5, 17)));
    assert_eq!(parse_version("5.17GARBAGE"), Ok((5, 17)));
    assert_eq!(parse_version("5.17.8"), Ok((5, 17)));
    assert_eq!(parse_version("5.17.8GARBAGE"), Ok((5, 17)));
    assert_eq!(parse_version("5.17 More"), Ok((5, 17)));
    assert_eq!(parse_version("5.17.8 More"), Ok((5, 17)));
    assert_eq!(parse_version("5.17."), Ok((5, 17)));
    assert_eq!(parse_version("5.17. "), Ok((5, 17)));
    assert_eq!(parse_version("5.17. more"), Ok((5, 17)));

    assert!(rejected(parse_version("5")));
    assert!(rejected(parse_version("5.")));
    assert!(rejected(parse_version("5.x")));
    assert!(rejected(parse_version("y")));
    assert!(rejected(parse_version("")));
    assert!(rejected(parse_version("y.5")));
}
// An `Err` returned by a task body must come back through the handle as the
// task's value, with the errno intact.
#[crate::test]
async fn task_error_test() {
    let handle =
        operations::spawn_task(async move { Err::<(), Errno>(Errno::from_raw_os_error(1)) });
    let errno = handle.await.unwrap().unwrap_err();
    assert_eq!(1, errno.raw_os_error());
}
// Runs a task that yields in a tight loop alongside a task that keeps
// respawning itself, and checks that a 100 ms sleep in the test body still
// completes and that both background activities made progress.
#[crate::test]
async fn starvation_test() {
// Hard upper bound: every background step asserts that it runs before this.
let terminate_time = Instant::now() + Duration::from_secs(10);
let done = Rc::new(Cell::new(false));
let yield_count = Rc::new(Cell::new(0usize));
let infinite_yield_task = {
let done = done.clone();
let yield_count = yield_count.clone();
operations::spawn_task(async move {
while !done.get() {
operations::yield_io().await;
yield_count.set(yield_count.get() + 1);
assert!(
Instant::now() < terminate_time,
"Short sleep should complete well before terminate_time"
);
}
})
};
let respawn_count = Rc::new(Cell::new(0usize));
{
let done = done.clone();
let respawn_count = respawn_count.clone();
// Each run spawns its own successor until `done` is set; the handles are
// dropped without being awaited.
async fn respawn(
done: Rc<Cell<bool>>,
respawn_count: Rc<Cell<usize>>,
terminate_time: Instant,
) {
assert!(
Instant::now() < terminate_time,
"Short sleep should complete well before terminate_time"
);
if !done.get() {
respawn_count.set(respawn_count.get() + 1);
operations::spawn_task(respawn(done, respawn_count, terminate_time));
}
}
operations::spawn_task(respawn(done, respawn_count, terminate_time));
}
// If this sleep completes, the two busy tasks did not starve the test body.
operations::sleep(Duration::from_millis(100)).await.unwrap();
done.set(true);
infinite_yield_task.await.unwrap();
assert!(yield_count.get() > 0);
assert!(respawn_count.get() > 0);
}
// Aborts three never-ending tasks (one looping on `nop`, one waiting on an
// event, one looping on `yield_io`) and expects each handle to resolve to a
// panic whose payload is the string "Task aborted".
#[crate::test]
async fn test_abort() {
let io_forever_task = operations::spawn_task(async {
loop {
operations::nop().await.unwrap()
}
});
let wait_forever_task = operations::spawn_task(async {
let event = AsyncEvent::new();
event.wait().await.unwrap();
});
let yield_forever_task = operations::spawn_task(async {
loop {
operations::yield_io().await;
}
});
// Yield once before aborting, presumably so the tasks get a chance to start.
operations::yield_io().await;
io_forever_task.abort();
wait_forever_task.abort();
yield_forever_task.abort();
// Extracts the panic payload; any other outcome fails the test.
fn get_abort_result(result: Result<(), TaskHandleError>) -> &'static str {
match result {
Ok(_) => panic!("Task should have been aborted"),
Err(TaskHandleError::Canceled) => panic!("Task should have been aborted"),
Err(TaskHandleError::Panic(payload)) => *payload.downcast::<&str>().unwrap(),
}
}
let result = io_forever_task.await;
assert_eq!(get_abort_result(result), "Task aborted");
let result = wait_forever_task.await;
assert_eq!(get_abort_result(result), "Task aborted");
let result = yield_forever_task.await;
assert_eq!(get_abort_result(result), "Task aborted");
}
#[crate::test]
async fn test_wait_for_multiple_tasks_with_futures_unordered() {
let task_count = 10;
let mut futures = FuturesUnordered::new();
for _ in 0..task_count {
futures.push(operations::spawn_task(async {
operations::sleep(Duration::from_millis(100)).await.unwrap();
}));
}
while futures.next().await.is_some() {}
}
// Joining a handle's `wait()` future together with the handle itself must
// still deliver the task's value through the handle.
#[crate::test]
async fn test_wait_and_join_task_handle() {
    let task = operations::spawn_task(async { 5 });
    let waiter = task.wait();
    let (_waited, joined) = futures::join!(waiter, task);
    assert_eq!(joined.unwrap(), 5);
}
// A one-hour sleep started inside an `io_scope` and returned from it while
// still pending must resolve with `Errno::CANCELED` once awaited outside.
#[crate::test]
#[allow(clippy::async_yields_async)]
async fn test_cancel_sleep_with_io_scope() {
let sleep_a_long_time = io_scope(async move || {
// The mapping closure performs the actual CANCELED assertion.
let mut sleep_a_long_time =
operations::sleep(Duration::from_secs(3600)).map(|result| {
assert_eq!(result, Err(Errno::CANCELED));
"it was canceled"
});
// Poll once without blocking; the sleep must still be pending.
futures::select! {
_ = sleep_a_long_time => panic!("sleep should not return"),
default => {}
}
sleep_a_long_time
})
.await;
let result = sleep_a_long_time.await;
assert_eq!(result.to_string(), "it was canceled".to_string());
}
// `io_scope_cancel` must fail an already-polled event wait with
// `CanceledError`, while a wait started afterwards still succeeds.
#[crate::test]
async fn test_cancel_wait_with_io_scope() {
let event = AsyncEvent::new();
io_scope(async move || {
let mut wait = event.wait();
// Poll once without blocking; the wait must still be pending.
futures::select! {
_ = wait => panic!("wait should not return"),
default => {}
}
operations::io_scope_cancel();
assert_eq!(wait.await, Err(CanceledError {}));
// A fresh wait after the cancel is expected to work normally.
event.set();
assert_eq!(event.wait().await, Ok(()));
})
.await;
}
// Cancelling inside a nested `io_scope` must cancel only the inner event wait;
// the outer wait stays pending until the outer scope cancels as well.
#[crate::test]
async fn test_nested_io_scope_wait() {
let event1 = AsyncEvent::new();
let event2 = AsyncEvent::new();
io_scope(async move || {
let mut wait = event1.wait();
futures::select! {
_ = wait => panic!("wait should not return"),
default => {}
}
io_scope(async move || {
let mut wait2 = event2.wait();
futures::select! {
_ = wait2 => panic!("wait should not return"),
default => {}
}
operations::io_scope_cancel();
assert_eq!(wait2.await, Err(CanceledError {}));
event2.set();
assert_eq!(event2.wait().await, Ok(()));
})
.await;
// The inner cancel must not have completed the outer wait.
futures::select! {
_ = wait => panic!("wait should not return"),
default => {}
}
operations::io_scope_cancel();
assert_eq!(wait.await, Err(CanceledError {}));
event1.set();
assert_eq!(event1.wait().await, Ok(()));
})
.await;
}
// Same nesting check as the event-wait variant, but with one-hour sleeps: the
// inner cancel yields `Errno::CANCELED` for the inner sleep only.
#[crate::test]
async fn test_nested_io_scope() {
io_scope(async move || {
let mut wait1 = operations::sleep(Duration::from_secs(3600));
futures::select! {
_ = wait1 => panic!("wait should not return"),
default => {}
}
io_scope(async move || {
let mut wait2 = operations::sleep(Duration::from_secs(3600));
futures::select! {
_ = wait2 => panic!("wait should not return"),
default => {}
}
operations::io_scope_cancel();
assert_eq!(wait2.await, Err(Errno::CANCELED));
})
.await;
// The outer sleep must still be pending after the inner scope finished.
futures::select! {
_ = wait1 => panic!("wait should not return"),
default => {}
}
operations::io_scope_cancel();
assert_eq!(wait1.await, Err(Errno::CANCELED));
})
.await;
}
// Leaving an inner `io_scope` that still holds a pending wait must not disturb
// a wait registered in the outer scope: the outer wait completes with `Ok`
// once its event is set.
#[crate::test]
async fn test_nested_io_scope_preserves_outer_wait() {
let outer_event = AsyncEvent::new();
io_scope(async || {
let mut outer_wait = outer_event.wait();
futures::select! {
_ = outer_wait => panic!("outer wait should not return yet"),
default => {}
}
// The inner scope ends while `inner_wait` is still pending.
io_scope(async || {
let inner_event = AsyncEvent::new();
let mut inner_wait = inner_event.wait();
futures::select! {
_ = inner_wait => panic!("inner wait should not return"),
default => {}
}
})
.await;
outer_event.set();
assert_eq!(outer_wait.await, Ok(()));
})
.await;
}
// Two futures are joined in the same task: A runs an `io_scope`, B waits on a
// oneshot channel. Finishing A's scope must not cancel B's receive.
#[crate::test]
async fn test_io_scope_does_not_cancel_sibling_future() {
use crate::oneshot;
let (tx, rx) = oneshot::<i32>();
let sync_event = AsyncEvent::new();
let future_a = async {
// Stay inside the scope until B has registered its wait on `sync_event`.
io_scope(async || {
loop {
if sync_event.any_waiting() {
break;
}
operations::yield_io().await;
}
})
.await;
tx.send(42).unwrap();
};
let future_b = async {
let mut wait = std::pin::pin!(sync_event.wait());
// Poll the wait exactly once, ignoring the result, so that
// `any_waiting` can become true without B blocking on the event.
futures::future::poll_fn(|cx| {
let _ = wait.as_mut().poll(cx);
std::task::Poll::Ready(())
})
.await;
let result = rx.recv().await;
assert_eq!(
result,
Ok(42),
"B's channel recv was canceled by A's io_scope"
);
};
futures::join!(future_a, future_b);
}
// Creates four times `IO_URING_SUBMISSION_ENTRIES` one-hour sleeps, cancels
// the scope, and expects every sleep to report `Errno::CANCELED`. Nops that
// run after the scope has ended must then succeed.
#[crate::test]
async fn test_cancel_too_many_pending_io() {
let nops = io_scope(async move || {
let mut requests = Vec::new();
for _ in 0..IO_URING_SUBMISSION_ENTRIES * 4 {
requests.push(operations::sleep(Duration::from_secs(3600)));
}
operations::io_scope_cancel();
// `map` is lazy: the nop futures are only created when `join_all` consumes
// this iterator after the scope has returned.
let nops = (0..IO_URING_SUBMISSION_ENTRIES).map(|_| operations::nop());
let results = join_all(requests).await;
for result in results {
assert_eq!(result, Err(Errno::CANCELED));
}
nops
})
.await;
for result in join_all(nops).await {
assert_eq!(result, Ok(()));
}
}
// Builds a hundred times `IO_URING_SUBMISSION_ENTRIES` nops wrapped in
// `submit` before awaiting any of them, then awaits them in creation order;
// every one must succeed.
#[crate::test]
async fn test_overflow_via_submit() {
    let pending: Vec<_> = (0..IO_URING_SUBMISSION_ENTRIES * 100)
        .map(|_x| operations::submit(operations::nop()))
        .collect();
    for op in pending {
        assert!(op.await.is_ok());
    }
}
// Joins a hundred times `IO_URING_SUBMISSION_ENTRIES` sleeps of 100 ms each;
// all of them must complete successfully.
#[crate::test]
async fn test_too_many_pending_io() {
    let mut requests = Vec::new();
    for _ in 0..IO_URING_SUBMISSION_ENTRIES * 100 {
        requests.push(operations::sleep(Duration::from_millis(100)));
    }
    for outcome in join_all(requests).await {
        outcome.unwrap();
    }
}
// A task may return a not-yet-awaited operation future; awaiting that future
// after the task has finished must still succeed.
#[crate::test]
async fn test_spawn_return_future() {
    #[allow(clippy::async_yields_async)]
    let handle = operations::spawn_task(async { operations::nop() });
    let returned_op = handle.await.unwrap();
    assert_eq!(returned_op.await, Ok(()));
}
// Verifies that the `IoEnd` trace event for an operation carries the same
// activity id as its `IoStart` event, even when the task's activity id is
// changed while the operation is in flight.
#[test]
fn test_ioend_activity_id() {
const ACTIVITY_ID: Uuid = Uuid::from_u128(1);
const TENANT_ID: Uuid = Uuid::from_u128(2);
// Trace sink that remembers the activity id seen at `IoStart`, keyed by tag.
struct Tracer {
id: MutInPlaceCell<HashMap<u32, Uuid>>,
}
impl Tracer {
pub fn new() -> Self {
Self {
id: MutInPlaceCell::new(HashMap::new()),
}
}
}
impl crate::TraceConfiguration for Tracer {
fn trace(&self, event: crate::EventEnvelope) {
match event.event {
crate::Events::IoStart {
activity_id, tag, ..
} => {
self.id.use_mut(|id| id.insert(tag, activity_id));
}
crate::Events::IoEnd {
activity_id, tag, ..
} => {
// The end event must match the id recorded for the same tag.
let expected_activity_id = self.id.use_mut(|id| id.remove(&tag));
assert_eq!(expected_activity_id, Some(activity_id));
}
_ => {}
}
}
}
let configuration = crate::configuration::Configuration::default()
.set_trace_buffer_manager(Box::new(Tracer::new()));
crate::run_with_configuration(
0,
async {
operations::set_activity_id_and_tenant_id(ACTIVITY_ID, TENANT_ID);
let mut sleep = operations::sleep(Duration::from_millis(100)).fuse();
// Poll the sleep once so it starts under ACTIVITY_ID.
select! {
_ = sleep => panic!("not expected"),
default => {},
}
// Change the ids while the sleep is still pending, then let it finish.
operations::set_activity_id_and_tenant_id(Uuid::nil(), Uuid::nil());
sleep.await.unwrap();
},
configuration,
)
.unwrap()
.unwrap();
}
// Pins how the `operation_count` argument of `inject_fault` is interpreted:
// counts 0 and 1 both fail the very next operation, count 2 fails the second
// one, and in each case only a single operation fails.
#[cfg(feature = "fault_injection")]
#[crate::test]
async fn test_inject_fault() {
operations::inject_fault(0, Errno::FAULT);
assert_eq!(operations::nop().await, Err(Errno::FAULT));
assert_eq!(operations::nop().await, Ok(()));
operations::inject_fault(1, Errno::FAULT);
assert_eq!(operations::nop().await, Err(Errno::FAULT));
assert_eq!(operations::nop().await, Ok(()));
operations::inject_fault(2, Errno::FAULT);
assert_eq!(operations::nop().await, Ok(()));
assert_eq!(operations::nop().await, Err(Errno::FAULT));
assert_eq!(operations::nop().await, Ok(()));
}
// End-to-end walk through the file operations (mkdir, stat, unlink, open,
// link, rename, write, pwrite, fsync, pread, fstat, read_with_deadline, rmdir)
// on a fixed directory under /tmp.
#[crate::test]
async fn file_tests() {
let root = c"/tmp/file_tests";
let filename = c"/tmp/file_tests/file.txt";
let newpath1 = c"/tmp/file_tests/file.txt-1.link";
let newpath2 = c"/tmp/file_tests/file.txt-2.link";
// Tolerate leftovers from an earlier run: reuse the directory ...
match operations::mkdir(root, 0o775.into()).await {
Ok(()) => {}
Err(Errno::EXIST) => println!("Directory {root:?} already exists"),
Err(e) => panic!("Failed to create directory {root:?}: {e}"),
}
// ... and remove any of the three files that still exist.
for name in [filename, newpath1, newpath2] {
let stat = operations::stat(name).await;
if stat.is_ok() {
operations::unlink(name).await.unwrap();
}
}
let file = operations::open(
filename,
OFlags::CREATE | OFlags::RDWR,
Mode::from_raw_mode(0o666),
)
.await
.unwrap();
operations::link(filename, newpath1).await.unwrap();
operations::rename(newpath1, newpath2).await.unwrap();
// "hello, world!" with "Gdday" at offset 0 and "mate!" at offset 7 gives
// "Gdday, mate!!".
operations::write(&file, b"hello, world!").await.unwrap();
operations::pwrite(&file, b"Gdday", 0).await.unwrap();
operations::pwrite_polled(&file, b"mate!", 7, false)
.await
.unwrap();
operations::fsync(&file).await.unwrap();
let mut buf = [0u8; 1024];
let amount = operations::pread(&file, &mut buf, 0).await.unwrap();
assert_eq!(amount, 13);
assert_eq!(&buf[..13], b"Gdday, mate!!");
operations::close(file);
operations::unlink(filename).await.unwrap();
// The renamed hard link must still expose the same content.
let file = operations::open(newpath2, OFlags::RDONLY, Mode::from_raw_mode(0o666))
.await
.unwrap();
let stat = operations::fstat(&file).await.unwrap();
assert_eq!(stat.stx_size, 13);
let amount = operations::read_with_deadline(
&file,
&mut buf[..13],
Some(Instant::now() + Duration::from_secs(120)),
)
.await
.unwrap();
assert_eq!(amount, 13);
assert_eq!(&buf[..13], b"Gdday, mate!!");
operations::close(file);
operations::unlink(newpath2).await.unwrap();
operations::rmdir(root).await.unwrap();
}
#[cfg(feature = "virtual-clock")]
mod virtual_clock_tests {
use crate::configuration::Configuration;
use crate::operations;
use crate::{Runtime, TimeoutError};
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;
use std::time::Duration;
// Drives `test` on a fresh `Runtime` with the virtual clock switched on first.
// A panic payload reported by `block_on` is re-raised so the surrounding
// `#[test]` fails.
fn run_virtual<Fut>(test: Fut)
where
    Fut: std::future::Future<Output = ()> + 'static,
{
    let mut runtime = Runtime::new(0, Configuration::new());
    let wrapped = async move {
        operations::virtual_clock_enable(true);
        test.await;
    };
    let outcome = runtime.block_on(wrapped);
    let Some(Err(payload)) = outcome else {
        return;
    };
    std::panic::resume_unwind(payload);
}
// A 60 s virtual sleep is pending after its first poll and completes once the
// clock has been advanced by the full 60 s.
#[test]
fn virtual_sleep_completes_on_advance() {
run_virtual(async {
use std::pin::pin;
let mut sleep = pin!(operations::sleep(Duration::from_secs(60)));
// Poll exactly once and report whether the sleep finished.
let completed = futures::future::poll_fn(|cx| match sleep.as_mut().poll(cx) {
Poll::Pending => Poll::Ready(false),
Poll::Ready(_) => Poll::Ready(true),
})
.await;
assert!(!completed, "should be pending before advance");
operations::virtual_clock_advance(Duration::from_secs(60));
sleep.await.unwrap();
});
}
// A 10 s virtual sleep stays pending across repeated polls and after a partial
// 5 s advance; it only completes after the second 5 s advance.
#[test]
fn virtual_sleep_stays_pending_without_advance() {
run_virtual(async {
use std::pin::pin;
let mut sleep = pin!(operations::sleep(Duration::from_secs(10)));
// Repeated polling alone must never complete the sleep.
for _ in 0..3 {
let completed = futures::future::poll_fn(|cx| match sleep.as_mut().poll(cx) {
Poll::Pending => Poll::Ready(false),
Poll::Ready(_) => Poll::Ready(true),
})
.await;
assert!(!completed);
}
// Half of the duration is not enough either.
operations::virtual_clock_advance(Duration::from_secs(5));
let completed = futures::future::poll_fn(|cx| match sleep.as_mut().poll(cx) {
Poll::Pending => Poll::Ready(false),
Poll::Ready(_) => Poll::Ready(true),
})
.await;
assert!(!completed);
operations::virtual_clock_advance(Duration::from_secs(5));
sleep.await.unwrap();
});
}
// Three tasks sleeping 30 s, 10 s and 20 s (spawned in that order) must record
// their wake-ups in deadline order after a single 30 s advance.
#[test]
fn multiple_virtual_sleeps_wake_in_order() {
run_virtual(async {
let order = Rc::new(std::cell::RefCell::new(Vec::new()));
let o1 = order.clone();
let o2 = order.clone();
let o3 = order.clone();
operations::spawn_task(async move {
operations::sleep(Duration::from_secs(30)).await.unwrap();
o1.borrow_mut().push(30);
});
operations::spawn_task(async move {
operations::sleep(Duration::from_secs(10)).await.unwrap();
o2.borrow_mut().push(10);
});
operations::spawn_task(async move {
operations::sleep(Duration::from_secs(20)).await.unwrap();
o3.borrow_mut().push(20);
});
// Yield once before advancing, presumably so the tasks can start their sleeps.
operations::yield_io().await;
operations::virtual_clock_advance(Duration::from_secs(30));
// Yield a few more times so the woken tasks get to run.
for _ in 0..5 {
operations::yield_io().await;
}
assert_eq!(*order.borrow(), vec![10, 20, 30]);
});
}
// A 60 s virtual sleep followed by a 60 s advance must take less than one
// second of wall-clock time.
#[test]
fn virtual_sleep_60s_completes_fast() {
let wall_start = std::time::Instant::now();
run_virtual(async {
use std::pin::pin;
let mut sleep = pin!(operations::sleep(Duration::from_secs(60)));
// Poll once, ignoring the result, before the clock is advanced.
futures::future::poll_fn(|cx| {
let _ = sleep.as_mut().poll(cx);
Poll::Ready(())
})
.await;
operations::virtual_clock_advance(Duration::from_secs(60));
sleep.await.unwrap();
});
assert!(wall_start.elapsed() < Duration::from_secs(1));
}
// A sleep inside a spawned task stays pending until the clock is advanced and
// the task has been given a chance to run again.
#[test]
fn spawned_virtual_sleep_completes_on_advance() {
run_virtual(async {
let done = Rc::new(Cell::new(false));
let d = done.clone();
operations::spawn_task(async move {
operations::sleep(Duration::from_secs(60)).await.unwrap();
d.set(true);
});
operations::yield_io().await;
assert!(!done.get());
operations::virtual_clock_advance(Duration::from_secs(60));
// Two yields are given for the woken task to run to completion.
operations::yield_io().await;
operations::yield_io().await;
assert!(done.get());
});
}
// `sleep_until` with a deadline 5 s after the virtual "now" is pending at
// first and completes once the clock has been advanced by 5 s.
#[test]
fn sleep_until_completes_at_deadline() {
run_virtual(async {
use std::pin::pin;
let deadline = operations::virtual_clock_now() + Duration::from_secs(5);
let mut sleep = pin!(operations::sleep_until(deadline));
// Poll exactly once and report whether the sleep finished.
let completed = futures::future::poll_fn(|cx| match sleep.as_mut().poll(cx) {
Poll::Pending => Poll::Ready(false),
Poll::Ready(_) => Poll::Ready(true),
})
.await;
assert!(!completed);
operations::virtual_clock_advance(Duration::from_secs(5));
sleep.await.unwrap();
});
}
// `sleep_until` with a deadline one second before the virtual "now" must
// complete without the clock being advanced.
#[test]
fn sleep_until_past_deadline_completes_immediately() {
    run_virtual(async {
        let now = operations::virtual_clock_now();
        let already_elapsed = now - Duration::from_secs(1);
        operations::sleep_until(already_elapsed).await.unwrap();
    });
}
// When the inner future is immediately ready, `timeout_at` must return its
// value wrapped in `Ok`.
#[test]
fn timeout_at_returns_inner_result_on_fast_future() {
    run_virtual(async {
        let now = operations::virtual_clock_now();
        let deadline = now + Duration::from_secs(10);
        let outcome = operations::timeout_at(deadline, async { 42 }).await;
        assert_eq!(outcome, Ok(42));
    });
}
// `timeout_at` around a future that never completes must report
// `TimeoutError::Timeout` once the clock has been advanced to the deadline.
#[test]
fn timeout_at_returns_timeout_on_advance() {
run_virtual(async {
let deadline = operations::virtual_clock_now() + Duration::from_secs(5);
let done = Rc::new(Cell::new(None::<Result<(), TimeoutError>>));
let d = done.clone();
operations::spawn_task(async move {
let result =
operations::timeout_at(deadline, std::future::pending::<()>()).await;
d.set(Some(result));
});
operations::yield_io().await;
assert!(done.get().is_none(), "should still be pending");
operations::virtual_clock_advance(Duration::from_secs(5));
// Two yields are given for the woken task to store its result.
operations::yield_io().await;
operations::yield_io().await;
assert_eq!(done.get(), Some(Err(TimeoutError::Timeout)));
});
}
// With a deadline that already lies in the past, `timeout_at` around a
// never-ready future must time out without any clock advance.
#[test]
fn timeout_at_past_deadline_immediate_timeout() {
    run_virtual(async {
        let now = operations::virtual_clock_now();
        let past_deadline = now - Duration::from_secs(1);
        let never_ready = std::future::pending::<()>();
        let outcome = operations::timeout_at(past_deadline, never_ready).await;
        assert_eq!(outcome, Err(TimeoutError::Timeout));
    });
}
// Polling a virtual sleep registers one pending timer; dropping the sleep
// must remove it again.
#[test]
fn dropped_virtual_sleep_cancels_timer() {
run_virtual(async {
assert_eq!(operations::virtual_clock_pending_timers(), 0);
{
use std::pin::pin;
let mut sleep = pin!(operations::sleep(Duration::from_secs(100)));
// Poll once, ignoring the result; the timer count below shows the effect.
futures::future::poll_fn(|cx| {
let _ = sleep.as_mut().poll(cx);
Poll::Ready(())
})
.await;
assert_eq!(operations::virtual_clock_pending_timers(), 1);
}
// `sleep` went out of scope at the end of the block above.
assert_eq!(operations::virtual_clock_pending_timers(), 0);
});
}
// After a polled sleep has been dropped, advancing past its deadline must
// report zero fired timers.
#[test]
fn dropped_virtual_sleep_no_spurious_wakeup_on_advance() {
run_virtual(async {
{
use std::pin::pin;
let mut sleep = pin!(operations::sleep(Duration::from_secs(10)));
// Poll once, ignoring the result, then drop the sleep with the block.
futures::future::poll_fn(|cx| {
let _ = sleep.as_mut().poll(cx);
Poll::Ready(())
})
.await;
}
assert_eq!(operations::virtual_clock_pending_timers(), 0);
let fired = operations::virtual_clock_advance(Duration::from_secs(10));
assert_eq!(fired, 0);
});
}
// Creating and dropping a sleep that was never polled must leave no pending
// timer behind.
#[test]
fn unpolled_virtual_sleep_drops_cleanly() {
    run_virtual(async {
        let never_polled = operations::sleep(Duration::from_secs(10));
        drop(never_polled);
        let remaining = operations::virtual_clock_pending_timers();
        assert_eq!(remaining, 0);
    });
}
// With an idle-advance callback that returns the time remaining until the next
// deadline, a plain awaited sleep must complete without any manual advance.
#[test]
fn callback_completes_sleep() {
    run_virtual(async {
        operations::virtual_clock_set_idle_advance(|now, next| {
            next.map(|deadline| deadline.saturating_duration_since(now))
        });
        let nap = operations::sleep(Duration::from_secs(60));
        nap.await.unwrap();
    });
}
// The callback-driven 60 s sleep must take less than one second of wall-clock
// time.
#[test]
fn callback_60s_completes_fast() {
    let started = std::time::Instant::now();
    run_virtual(async {
        operations::virtual_clock_set_idle_advance(|now, next| {
            next.map(|deadline| deadline.saturating_duration_since(now))
        });
        let nap = operations::sleep(Duration::from_secs(60));
        nap.await.unwrap();
    });
    let wall_elapsed = started.elapsed();
    assert!(wall_elapsed < Duration::from_secs(1));
}
// With the jump-to-next-deadline callback installed, consecutive sleeps of
// 10 s and 20 s must leave the virtual clock at exactly 10 s and then 30 s
// past the epoch.
#[test]
fn callback_fires_timers_in_sequence() {
run_virtual(async {
let epoch = operations::virtual_clock_epoch();
operations::virtual_clock_set_idle_advance(|now, next| {
next.map(|d| d.saturating_duration_since(now))
});
operations::sleep(Duration::from_secs(10)).await.unwrap();
assert_eq!(
operations::virtual_clock_now().duration_since(epoch),
Duration::from_secs(10)
);
operations::sleep(Duration::from_secs(20)).await.unwrap();
assert_eq!(
operations::virtual_clock_now().duration_since(epoch),
Duration::from_secs(30)
);
});
}
// The idle-advance callback must also wake a sleep held by a spawned task:
// after the test body's own 60 s sleep and one yield, the task has finished.
#[test]
fn callback_wakes_spawned_task() {
run_virtual(async {
let done = Rc::new(Cell::new(false));
let d = done.clone();
operations::virtual_clock_set_idle_advance(|now, next| {
next.map(|d| d.saturating_duration_since(now))
});
operations::spawn_task(async move {
operations::sleep(Duration::from_secs(60)).await.unwrap();
d.set(true);
});
operations::sleep(Duration::from_secs(60)).await.unwrap();
// One extra yield in case the spawned task has not been resumed yet.
operations::yield_io().await;
assert!(done.get());
});
}
// A callback that always advances by a fixed 1 ms step must still complete a
// 60 s sleep, and do so in under one second of wall-clock time.
#[test]
fn fixed_callback_completes_sleep() {
    let started = std::time::Instant::now();
    run_virtual(async {
        let step = Duration::from_millis(1);
        operations::virtual_clock_set_idle_advance(move |_, _| Some(step));
        let nap = operations::sleep(Duration::from_secs(60));
        nap.await.unwrap();
    });
    let wall_elapsed = started.elapsed();
    assert!(wall_elapsed < Duration::from_secs(1));
}
// Right after enabling, the virtual clock reads its epoch and has no
// idle-advance callback installed.
#[test]
fn no_callback_is_noop() {
    run_virtual(async {
        let epoch = operations::virtual_clock_epoch();
        let now = operations::virtual_clock_now();
        assert_eq!(now, epoch);
        let has_callback = operations::virtual_clock_has_idle_advance();
        assert!(!has_callback);
    });
}
// Installing a second idle-advance callback must replace the first one: the
// jump-to-deadline callback lands exactly on 60 s, and the fixed 1 ms callback
// then carries a 10 ms sleep to at least 60 s + 10 ms past the epoch.
#[test]
fn replace_callback_changes_behavior() {
run_virtual(async {
let epoch = operations::virtual_clock_epoch();
operations::virtual_clock_set_idle_advance(|now, next| {
next.map(|d| d.saturating_duration_since(now))
});
operations::sleep(Duration::from_secs(60)).await.unwrap();
assert_eq!(
operations::virtual_clock_now().duration_since(epoch),
Duration::from_secs(60)
);
operations::virtual_clock_set_idle_advance(|_, _| Some(Duration::from_millis(1)));
operations::sleep(Duration::from_millis(10)).await.unwrap();
// Only a lower bound is asserted here.
assert!(
operations::virtual_clock_now().duration_since(epoch)
>= Duration::from_secs(60) + Duration::from_millis(10)
);
});
}
}
}