use core::cell::RefCell;
use core::{
cell::{Cell, UnsafeCell},
future::Future,
mem,
pin::Pin,
task::{Context, Poll, Waker},
};
use pin_list::{InitializedNode, NodeData, PinList, id::Unchecked};
use pin_project::{pin_project, pinned_drop};
/// A byte container whose current contents can be viewed as a `&[u8]`.
pub trait Buffer {
    /// Returns the bytes currently held by this buffer.
    fn as_slice(&self) -> &[u8];
}

/// Fixed-size arrays expose all `N` of their bytes.
impl<const N: usize> Buffer for [u8; N] {
    #[inline]
    fn as_slice(&self) -> &[u8] {
        // `&[u8; N]` unsizes to `&[u8]` at the return site.
        self
    }
}

/// A shared slice is its own view.
impl Buffer for &[u8] {
    #[inline]
    fn as_slice(&self) -> &[u8] {
        *self
    }
}

/// A mutable slice is exposed read-only through a shared reborrow.
impl Buffer for &mut [u8] {
    #[inline]
    fn as_slice(&self) -> &[u8] {
        &**self
    }
}
// Type configuration shared by both waiter lists: ids are `Unchecked`, the
// list-protected payload is the waiter's `Waker`, removal stores nothing (`()`),
// and the unprotected payload is a `usize` (the epoch at which the waiter was
// registered).
type WaiterTypes =
dyn pin_list::Types<Id = Unchecked, Protected = Waker, Removed = (), Unprotected = usize>;
/// Tracks where the loader currently is.
///
/// With `F` set to the loader type this stores the loader itself while idle;
/// with `F = ()` it is used as a payload-free phase tag.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum LoadState<F> {
/// No load is running; the payload is parked here.
Idle(F),
/// The loader has been checked out and has not been returned yet.
Loading,
}
/// Holds the checked-out loader and gives it back to the shared state if it is
/// dropped while still holding it.
struct LoaderGuard<'s, B: Buffer, F: AsyncFnMut(&mut B)> {
// Shared state the loader is returned to.
shared: &'s SharedBufData<B, F>,
// The checked-out loader; `None` once it has been handed back manually.
loader: Option<F>,
}
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> Drop for LoaderGuard<'s, B, F> {
    /// Returns a still-held loader to the shared state.
    ///
    /// If the guard no longer owns the loader this is a no-op. Otherwise the
    /// state goes back to `Idle`, and when no handles remain one waiter is
    /// woken so it can take over the load.
    fn drop(&mut self) {
        let Some(loader) = self.loader.take() else {
            return;
        };
        let shared = self.shared;
        shared.load_state.set(LoadState::Idle(loader));
        if shared.remaining.get() == 0 {
            shared.wake_one();
        }
    }
}
/// State shared by every handle onto one buffer.
struct SharedBufData<B: Buffer, F: AsyncFnMut(&mut B)> {
// The buffer itself: read through `Handle::buf`, written by the loader.
buf: UnsafeCell<B>,
// Handles that have not yet left the current epoch; a load runs when this hits 0.
remaining: Cell<usize>,
// Number of live handles (including ones currently inside `next()`).
total: Cell<usize>,
// Counter bumped after every completed load; starts at 1.
epoch: Cell<usize>,
// Next id to hand out from `fork_helper`.
handle_id: Cell<usize>,
// Holds the loader while idle, `Loading` while it is checked out.
load_state: Cell<LoadState<F>>,
// Waiters registered during the current epoch.
active: RefCell<PinList<WaiterTypes>>,
// After a load the lists are swapped, so this holds the previous epoch's
// waiters while they are being drained and woken.
clear: RefCell<PinList<WaiterTypes>>,
}
/// Copyable reference to the shared state, used to create the first handles.
pub struct SharedBuf<'s, B: Buffer, F: AsyncFnMut(&mut B)>(&'s SharedBufData<B, F>);
// Implemented by hand rather than derived; the impl carries only the bounds
// written here, and the struct holds nothing but a shared reference.
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> Clone for SharedBuf<'s, B, F> {
/// Copies the inner reference.
fn clone(&self) -> Self {
*self
}
}
// `SharedBuf` wraps a single shared reference, so it can be copied freely.
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> Copy for SharedBuf<'s, B, F> {}
// SAFETY NOTE(review): this asserts that moving a whole `SharedBufData` to
// another thread is fine whenever `B` and `F` are `Send`. Nothing visible in
// this file proves that for the `PinList` fields. It presumably relies on the
// lists holding only `Waker`s and being empty whenever the value can still be
// moved (waiters borrow it). Confirm before relying on it.
unsafe impl<B: Send + Buffer, F: Send + AsyncFnMut(&mut B)> Send for SharedBufData<B, F> {}
impl<B: Buffer, F: AsyncFnMut(&mut B)> SharedBufData<B, F> {
fn new(initial: B, loader: F) -> Self {
Self {
buf: UnsafeCell::new(initial),
remaining: Cell::new(0),
total: Cell::new(0),
epoch: Cell::new(1),
handle_id: Cell::new(0),
load_state: Cell::new(LoadState::Idle(loader)),
active: RefCell::new(PinList::new(unsafe { Unchecked::new() })),
clear: RefCell::new(PinList::new(unsafe { Unchecked::new() })),
}
}
}
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> SharedBuf<'s, B, F> {
pub fn with<R>(initial: B, loader: F, f: impl FnOnce(SharedBuf<'_, B, F>) -> R) -> R {
let shared = SharedBufData::new(initial, loader);
f(SharedBuf(&shared))
}
pub async fn with_async<R>(
initial: B,
loader: F,
f: impl AsyncFnOnce(SharedBuf<'_, B, F>) -> R,
) -> R {
let shared = SharedBufData::new(initial, loader);
f(SharedBuf(&shared)).await
}
pub fn fork(self) -> Handle<'s, B, F> {
self.0.fork_helper()
}
#[cfg(test)]
fn inner(&self) -> &SharedBufData<B, F> {
self.0
}
}
// Internal operations on the shared state: waking waiters, running the loader,
// and creating handles.
impl<B: Buffer, F: AsyncFnMut(&mut B)> SharedBufData<B, F> {
/// Removes the front waiter of the `active` list and wakes it.
///
/// Returns `true` if a waiter was woken, `false` if none could be removed.
#[inline]
fn wake_one(&self) -> bool {
let Some(waker) = self.remove_active_front() else {
return false;
};
waker.wake();
true
}
/// Runs the loader and then starts a new epoch.
///
/// After the loader finishes: `epoch` is incremented, the `active` and
/// `clear` lists are swapped, `remaining` is reset to `total`, and every
/// waiter now in `clear` is removed and woken.
async fn load_and_wake(&self) {
self.run_loader().await;
self.epoch.set(self.epoch.get() + 1);
// The waiters of the epoch that just ended move to `clear`; `active`
// becomes the (previous) `clear` list.
self.active.swap(&self.clear);
self.remaining.set(self.total.get());
// `remove_clear_front` releases its `RefCell` borrow before returning, so
// no borrow of `clear` is held while `wake` runs.
while let Some(waker) = self.remove_clear_front() {
waker.wake();
}
}
/// Returns `true` when no handle remains in the current epoch.
fn needs_load(&self) -> bool {
self.remaining.get() == 0
}
/// Unlinks the front node of `active`, returning its waker if there was one.
fn remove_active_front(&self) -> Option<Waker> {
self.active
.borrow_mut()
.cursor_front_mut()
.remove_current(())
.ok()
}
/// Unlinks the front node of `clear`, returning its waker if there was one.
fn remove_clear_front(&self) -> Option<Waker> {
self.clear
.borrow_mut()
.cursor_front_mut()
.remove_current(())
.ok()
}
/// Resets `initialized` against the list chosen by its stored epoch.
///
/// A node whose stored epoch equals the current `epoch` is reset against
/// `active`, any other node against `clear`. Returns the node's data together
/// with its stored epoch.
fn reset_node<'a, 'b: 'c, 'c>(
&'a self,
initialized: Pin<&'b mut InitializedNode<'c, WaiterTypes>>,
) -> (NodeData<WaiterTypes>, usize) {
if *initialized.unprotected() == self.epoch.get() {
initialized.reset(&mut *self.active.borrow_mut())
} else {
initialized.reset(&mut *self.clear.borrow_mut())
}
}
/// Reports whether the loader is idle or checked out, without needing `F: Copy`.
#[inline]
fn phase(&self) -> LoadState<()> {
// `Cell` cannot be peeked for a non-`Copy` payload: take the state out,
// classify it, and put the very same value back.
let state = self.load_state.replace(LoadState::Loading);
let phase = match state {
LoadState::Idle(_) => LoadState::Idle(()),
LoadState::Loading => LoadState::Loading,
};
self.load_state.set(state);
phase
}
/// Takes the loader out of `load_state`, leaving `Loading` behind, and wraps
/// it in a guard.
///
/// # Panics
///
/// Panics (via `unreachable!`) if the loader is already checked out.
fn checkout_loader(&self) -> LoaderGuard<'_, B, F> {
let prev = self.load_state.replace(LoadState::Loading);
let loader = match prev {
LoadState::Idle(f) => f,
LoadState::Loading => unreachable!("loader checked out twice"),
};
LoaderGuard {
shared: self,
loader: Some(loader),
}
}
/// Checks out the loader, awaits it on the buffer, and parks it again.
///
/// If this future is dropped while the loader is pending, the guard's `Drop`
/// returns the loader instead.
async fn run_loader(&self) {
let mut guard = self.checkout_loader();
// SAFETY NOTE(review): creates a `&mut B` from the `UnsafeCell`. This
// presumably relies on no `Handle::buf` borrow being live while a load is
// running (loads start only when `remaining == 0`). Confirm.
let b = unsafe { &mut *self.buf.get() };
(guard.loader.as_mut().unwrap())(b).await;
// Success path: hand the loader back directly. The guard is now empty, so
// forgetting it only skips a `Drop` that would do nothing.
self.load_state
.set(LoadState::Idle(guard.loader.take().unwrap()));
mem::forget(guard);
}
/// Registers one more handle: bumps `remaining` and `total` and assigns the
/// next handle id.
///
/// In debug builds, asserts that no load is in progress.
fn fork_helper(&self) -> Handle<'_, B, F> {
debug_assert_eq!(self.phase(), LoadState::Idle(()));
self.remaining.set(self.remaining.get() + 1);
self.total.set(self.total.get() + 1);
let id = self.handle_id.get();
self.handle_id.set(id + 1);
Handle { shared: self, id }
}
}
/// One participant's view of the shared buffer.
///
/// Dropping a handle, or passing it to `next`, decrements the shared
/// `remaining` count.
pub struct Handle<'s, B: Buffer, F: AsyncFnMut(&mut B)> {
// Shared state this handle is registered with.
shared: &'s SharedBufData<B, F>,
// Id assigned at fork time; carried over unchanged by `next`.
id: usize,
}
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> Handle<'s, B, F> {
/// Returns the current contents of the shared buffer.
pub fn buf(&self) -> &[u8] {
// SAFETY NOTE(review): reads through the `UnsafeCell` while the loader may
// hold `&mut B` during a load. Presumably sound because a load only starts
// once `remaining == 0`, i.e. no live `Handle` can call this. Confirm.
unsafe { (*self.shared.buf.get()).as_slice() }
}
/// Creates another handle onto the same shared buffer.
pub fn fork(&mut self) -> Handle<'s, B, F> {
self.shared.fork_helper()
}
/// Gives up this handle's claim on the current buffer contents and resolves
/// to a fresh handle (same id) once the next load has completed.
///
/// The caller that brings `remaining` to zero runs the loader itself; every
/// other caller waits. If a waiter is woken while `remaining` is still zero,
/// it runs the loader.
pub async fn next(self) -> Handle<'s, B, F> {
let shared = self.shared;
let id = self.id;
// Skip `Handle::drop`: the accounting is done by hand below.
mem::forget(self);
let remaining = shared.remaining.get() - 1;
shared.remaining.set(remaining);
// If this future is dropped before completing, the handle is gone for
// good, so `total` must shrink by one.
struct TotalGuard<'a>(&'a Cell<usize>);
impl<'a> Drop for TotalGuard<'a> {
fn drop(&mut self) {
let count = self.0.get();
debug_assert_ne!(count, 0);
self.0.set(count.saturating_sub(1));
}
}
let guard = TotalGuard(&shared.total);
let handle = if remaining == 0 {
// Last handle of the epoch: run the load.
debug_assert_eq!(shared.phase(), LoadState::Idle(()));
shared.load_and_wake().await;
Handle { shared, id }
} else {
let shared = WaitFuture::wait(shared).await;
// Woken with `remaining` still zero: no load has reset it, so this
// waiter runs the load.
if shared.remaining.get() == 0 {
debug_assert_eq!(shared.phase(), LoadState::Idle(()));
shared.load_and_wake().await;
}
Handle { shared, id }
};
// Completed normally: the new handle keeps the `total` slot.
mem::forget(guard);
handle
}
}
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> Drop for Handle<'s, B, F> {
    /// Releases this handle's slot in both `remaining` and `total`.
    ///
    /// If that leaves nobody in the current epoch while other handles are
    /// still alive, one waiter is woken so the load can proceed.
    fn drop(&mut self) {
        let shared = self.shared;

        let remaining_before = shared.remaining.get();
        debug_assert_ne!(remaining_before, 0);
        let remaining_after = remaining_before.saturating_sub(1);
        shared.remaining.set(remaining_after);

        let total_before = shared.total.get();
        debug_assert_ne!(total_before, 0);
        let total_after = total_before.saturating_sub(1);
        shared.total.set(total_after);

        if remaining_after == 0 && total_after > 0 {
            shared.wake_one();
        }
    }
}
/// Future that parks the caller in the shared `active` list until it is woken.
#[pin_project(PinnedDrop)]
struct WaitFuture<'s, B: Buffer, F: AsyncFnMut(&mut B)> {
// Shared state whose lists this waiter links into.
shared: &'s SharedBufData<B, F>,
// Intrusive list node; pinned because the list points at it while linked.
#[pin]
node: pin_list::Node<WaiterTypes>,
}
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> WaitFuture<'s, B, F> {
    /// Waits on `shared` with a fresh, unlinked node and resolves to the same
    /// reference once the waiter has been released.
    async fn wait(shared: &'s SharedBufData<B, F>) -> &'s SharedBufData<B, F> {
        let node = pin_list::Node::new();
        let waiter = Self { shared, node };
        waiter.await
    }
}
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> Future for WaitFuture<'s, B, F> {
    type Output = &'s SharedBufData<B, F>;

    /// Registers the waiter on first poll and completes once its node has been
    /// removed from its list.
    ///
    /// * First poll: resolves immediately if `remaining` is already zero;
    ///   otherwise links the node at the front of `active`, tagged with the
    ///   current epoch, and returns `Pending`.
    /// * Later polls: resolves if the node has been removed; otherwise
    ///   refreshes the stored waker and returns `Pending`.
    ///
    /// # Panics
    ///
    /// Panics if the node reports that it is neither initial nor initialized,
    /// or if a node reported as still linked has no protected data.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let shared: &'s SharedBufData<B, F> = *this.shared;

        if !this.node.is_initial() {
            let initialized = this
                .node
                .initialized_mut()
                .expect("node must be initialized when waiting");

            // A load swaps `active` and `clear`, so a node registered in the
            // previous epoch belongs to the list now stored in `clear`. Pick
            // the list by the node's stored epoch, exactly as `reset_node`
            // does, so the node is never presented to a list it was not
            // linked into (the lists use `Unchecked` ids).
            let owning_list = if *initialized.unprotected() == shared.epoch.get() {
                &shared.active
            } else {
                &shared.clear
            };
            let list = &mut *owning_list.borrow_mut();

            return match initialized.take_removed(list) {
                Ok((_removed, _epoch)) => Poll::Ready(shared),
                Err(still_linked) => {
                    // `clone_from` skips the clone when the stored waker
                    // already wakes the same task.
                    still_linked
                        .protected_mut(list)
                        .expect("linked node must expose its waker")
                        .clone_from(cx.waker());
                    Poll::Pending
                }
            };
        }

        let epoch = shared.epoch.get();
        if shared.remaining.get() == 0 {
            return Poll::Ready(shared);
        }

        let waker = cx.waker().clone();
        shared
            .active
            .borrow_mut()
            .push_front(this.node, waker, epoch);
        Poll::Pending
    }
}
// Cancellation handling for a waiter: unlink the node (if it was ever linked)
// and repair the shared counters depending on how far the waiter got.
#[pinned_drop]
impl<'s, B: Buffer, F: AsyncFnMut(&mut B)> PinnedDrop for WaitFuture<'s, B, F> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
// Never polled far enough to link the node: nothing to undo.
if this.node.is_initial() {
return;
}
let initialized = this
.node
.initialized_mut()
.expect("node must be initialized when waiting");
match this.shared.reset_node(initialized) {
// Still linked in the current epoch: `next()` already decremented
// `remaining` for this waiter, so there is nothing to adjust here.
(pin_list::NodeData::Linked(_waker), epoch) if epoch == this.shared.epoch.get() => {}
// Still linked but registered in an earlier epoch: a load has reset
// `remaining` to `total` in the meantime, so give this waiter's slot back.
(pin_list::NodeData::Linked(_waker), _epoch) => {
let remaining = this.shared.remaining.get();
debug_assert_ne!(remaining, 0);
this.shared.remaining.set(remaining.saturating_sub(1));
}
// Already removed (woken) while a load is still needed: pass the wake-up
// on to the next waiter in `active`.
(pin_list::NodeData::Removed(()), _epoch) if this.shared.needs_load() => {
debug_assert_eq!(_epoch, this.shared.epoch.get());
this.shared.wake_one();
}
// Already removed and `remaining` is non-zero: give this waiter's slot
// in `remaining` back.
(pin_list::NodeData::Removed(()), _epoch) => {
debug_assert_ne!(
this.shared.phase(),
LoadState::Loading,
"WaitFuture::drop: Phase::Loading should be unreachable"
);
let remaining = this.shared.remaining.get();
debug_assert_ne!(remaining, 0);
this.shared.remaining.set(remaining.saturating_sub(1));
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Polls `f` exactly once with the test utility's no-op waker.
fn poll_once<F: Future>(f: Pin<&mut F>) -> Poll<F::Output> {
    let waker = strede_test_util::noop_waker();
    f.poll(&mut Context::from_waker(&waker))
}
/// Returns a future that is `Pending` on its first poll and `Ready` afterwards.
fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future behind [`yield_now`]; `yielded` records whether the single
/// `Pending` has already been returned.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    /// Does not register the waker: callers must poll again themselves.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if this.yielded {
            return Poll::Ready(());
        }
        this.yielded = true;
        Poll::Pending
    }
}
/// A sole handle over a fixed-size array buffer: each `next()` resolves on its
/// first poll and exposes the bytes the loader wrote in place.
#[test]
fn rewrite() {
let values: &[&[u8; 5]] = &[b"world", b"xxxxx"];
let mut i = 0usize;
SharedBuf::with(
*b"hello",
async |b: &mut [u8; 5]| {
*b = *values[i];
i += 1;
},
|shared| {
let h = shared.fork();
assert_eq!(h.buf(), b"hello");
let mut fut = core::pin::pin!(h.next());
let h2 = match poll_once(fut.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("expected Ready for sole handle"),
};
assert_eq!(h2.buf(), b"world");
let mut fut = core::pin::pin!(h2.next());
let h3 = match poll_once(fut.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("expected Ready"),
};
assert_eq!(h3.buf(), b"xxxxx");
},
);
}
/// A sole handle over a `&[u8]` buffer: the loader swaps in slices of
/// different lengths and each `next()` resolves on its first poll.
#[test]
fn n1_single_cycle() {
let bufs: &[&[u8]] = &[b"hello", b"world!!", b"x"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let h = shared.fork();
assert_eq!(h.buf(), b"hello");
let mut fut = core::pin::pin!(h.next());
let h2 = match poll_once(fut.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("expected Ready for sole handle"),
};
assert_eq!(h2.buf(), b"world!!");
let mut fut = core::pin::pin!(h2.next());
let h3 = match poll_once(fut.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("expected Ready"),
};
assert_eq!(h3.buf(), b"x");
},
);
}
/// With two handles, the first `next()` pends and the second one runs the
/// load; afterwards both observe the new buffer.
#[test]
fn n2_second_handle_is_loader() {
let bufs: &[&[u8]] = &[b"abc", b"defghi"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let h1 = h0.fork();
let mut fut0 = core::pin::pin!(h0.next());
match poll_once(fut0.as_mut()) {
Poll::Pending => {}
Poll::Ready(_) => panic!("h0 should pend"),
}
let mut fut1 = core::pin::pin!(h1.next());
let h1_new = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("h1 should be Ready"),
};
assert_eq!(h1_new.buf(), b"defghi");
let h0_new = match poll_once(fut0.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("fut0 should be Ready after load"),
};
assert_eq!(h0_new.buf(), b"defghi");
},
);
}
/// With three handles, the first two `next()` calls pend, the third runs the
/// load, and all three then observe the new (longer) buffer.
#[test]
fn n3_round_trip_non_uniform_sizes() {
let bufs: &[&[u8]] = &[b"one", b"two-but-longer"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
let mut fut0 = core::pin::pin!(h0.next());
let mut fut1 = core::pin::pin!(h1.next());
let mut fut2 = core::pin::pin!(h2.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
let h_loader_new = match poll_once(fut2.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("h2 should be Ready (last)"),
};
assert_eq!(h_loader_new.buf(), b"two-but-longer");
let h0_new = match poll_once(fut0.as_mut()) {
Poll::Ready(h) => h,
_ => panic!("fut0 should be Ready"),
};
let h1_new = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
_ => panic!("fut1 should be Ready"),
};
assert_eq!(h0_new.buf(), b"two-but-longer");
assert_eq!(h1_new.buf(), b"two-but-longer");
},
);
}
/// A waiter that is dropped while pending stops counting as a participant:
/// the remaining handle's `next()` then runs the load on its first poll.
#[test]
fn wait_future_drop_reduces_total() {
let bufs: &[&[u8]] = &[b"data", b"next"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork(); let h1 = h0.fork();
{
// The pinned future is dropped at the end of this scope, while
// it is still waiting.
let mut fut0 = core::pin::pin!(h0.next());
match poll_once(fut0.as_mut()) {
Poll::Pending => {}
Poll::Ready(_) => panic!(),
}
}
let mut fut1 = core::pin::pin!(h1.next());
let h1_new = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("h1 should be loader after fut0 dropped"),
};
assert_eq!(h1_new.buf(), b"next");
},
);
}
/// Dropping a handle without calling `next()` releases its slot, so the other
/// handle's `next()` runs the load on its first poll.
#[test]
fn handle_drop_without_next_still_reaches_loader() {
let bufs: &[&[u8]] = &[b"drop", b"ok"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork(); let h1 = h0.fork();
drop(h0);
let mut fut1 = core::pin::pin!(h1.next());
let h = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("should be loader"),
};
assert_eq!(h.buf(), b"ok");
},
);
}
/// When the last outstanding handle is dropped while another participant is
/// waiting, the waiter runs the load on its next poll; afterwards both
/// counters equal 1.
#[test]
fn all_handles_dropped_with_waiters_pending() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork(); let h1 = h0.fork();
let mut fut0 = core::pin::pin!(h0.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
drop(h1);
let h0_new = match poll_once(fut0.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("fut0 should load and resolve"),
};
assert_eq!(h0_new.buf(), b"loaded");
assert_eq!(h0_new.shared.total.get(), 1);
assert_eq!(h0_new.shared.remaining.get(), 1);
},
);
}
/// Dropping a `next()` future while its loader is suspended must leave the
/// shared state usable: a freshly forked handle still sees the old buffer and
/// can complete a load.
#[test]
fn cancel_during_loader_does_not_wedge_state() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
// Suspend before touching anything, so the first poll leaves the
// loader in flight.
yield_now().await;
i += 1;
*b = bufs[i];
},
|shared| {
let h = shared.fork();
{
let mut fut = core::pin::pin!(h.next());
assert!(matches!(poll_once(fut.as_mut()), Poll::Pending));
}
let h2 = shared.fork();
assert_eq!(h2.buf(), b"initial");
let mut fut = core::pin::pin!(h2.next());
// The loader yields once, so allow one `Pending` before `Ready`.
let h3 = match poll_once(fut.as_mut()) {
Poll::Pending => match poll_once(fut.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("loader stuck after cancel"),
},
Poll::Ready(h) => h,
};
assert_eq!(h3.buf(), b"loaded");
},
);
}
/// Two waiters are pending when the third handle is dropped. The waiter polled
/// first afterwards (`fut1`) drives the yielding loader; both waiters end up
/// with the loaded buffer.
#[test]
fn two_waiters_second_polls_first_gets_correct_buf() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
yield_now().await;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
let mut fut0 = core::pin::pin!(h0.next());
let mut fut1 = core::pin::pin!(h1.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
drop(h2);
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
let h1_new = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("fut1 should be ready after load"),
};
assert_eq!(h1_new.buf(), b"loaded");
let h0_new = match poll_once(fut0.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("fut0 should be ready after wake_all"),
};
assert_eq!(h0_new.buf(), b"loaded");
},
);
}
/// After every handle and pending waiter has been dropped without a load ever
/// running, both counters are back at zero and a new fork sees the original
/// buffer.
#[test]
fn fork_after_all_waiters_drop_during_needs_load() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
{
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
{
let mut fut0 = core::pin::pin!(h0.next());
let mut fut1 = core::pin::pin!(h1.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
drop(h2);
}
}
assert_eq!(shared.inner().total.get(), 0, "total leaked waiter slots");
assert_eq!(shared.inner().remaining.get(), 0);
let h = shared.fork();
assert_eq!(h.buf(), b"initial");
},
);
}
/// A waiter that has started the load and is then dropped must not strand the
/// other waiter: `fut0` has to reach `Ready` within a bounded number of polls.
#[test]
fn cancelled_waiter_during_needs_load_hands_off_to_next_waiter() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
yield_now().await;
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
let mut fut0 = core::pin::pin!(h0.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
{
let mut fut1 = core::pin::pin!(h1.next());
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
drop(h2);
// This poll starts the (yielding) loader; `fut1` is then dropped
// at the end of the scope with the load unfinished.
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
}
let mut h0_new = None;
for _ in 0..8 {
if let Poll::Ready(h) = poll_once(fut0.as_mut()) {
h0_new = Some(h);
break;
}
}
let h0_new =
h0_new.expect("fut0 deadlocked: baton not handed off after fut1 cancel");
assert_eq!(h0_new.buf(), b"loaded");
},
);
}
/// A waiter registered before a load (`fut0`) is never polled again after the
/// load completes; dropping it must not panic.
#[test]
fn drop_stale_epoch_waiter_resets_against_clear_list() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
let mut fut0 = core::pin::pin!(h0.next());
let mut fut1 = core::pin::pin!(h1.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
let mut fut2 = core::pin::pin!(h2.next());
let _h2_new = match poll_once(fut2.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("h2 should be Ready (last handle)"),
};
let _h1_new = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("fut1 should be Ready after wake"),
};
// NOTE: `fut0` is a `Pin<&mut _>` produced by `pin!`, so this only drops
// the pinned reference; the future itself is dropped when the closure
// body ends.
#[allow(clippy::drop_non_drop)]
drop(fut0);
},
);
}
/// Like the previous test, but the two refreshed handles are dropped before
/// the stale waiter `fut0` goes away; the whole sequence must not panic.
#[test]
fn drop_stale_epoch_waiter_needs_load_hands_off_baton() {
let bufs: &[&[u8]] = &[b"initial", b"loaded", b"third"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
let mut fut0 = core::pin::pin!(h0.next());
let mut fut1 = core::pin::pin!(h1.next());
assert!(matches!(poll_once(fut0.as_mut()), Poll::Pending));
assert!(matches!(poll_once(fut1.as_mut()), Poll::Pending));
let mut fut2 = core::pin::pin!(h2.next());
let h2_new = match poll_once(fut2.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("h2 should be Ready"),
};
let h1_new = match poll_once(fut1.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("fut1 should be Ready"),
};
drop(h1_new);
drop(h2_new);
// NOTE: `fut0` is a `Pin<&mut _>` produced by `pin!`, so this only drops
// the pinned reference; the future itself is dropped when the closure
// body ends.
#[allow(clippy::drop_non_drop)]
drop(fut0);
},
);
}
/// Re-entrancy test: `fut0`'s waker is given an action that drops `fut1` in
/// place, and `h2.next()` then runs the load that wakes the waiters.
///
/// NOTE(review): `strede_test_util::reentrant_waker` presumably runs the
/// `RawAction` stored in the cell when the waker is woken; confirm against
/// that crate.
#[test]
fn reentrant_wake_drops_linked_stale_node() {
let bufs: &[&[u8]] = &[b"initial", b"loaded"];
let mut i = 0usize;
SharedBuf::with(
bufs[0],
async |b: &mut &[u8]| {
i += 1;
*b = bufs[i];
},
|shared| {
let mut h0 = shared.fork();
let mut h1 = h0.fork();
let h2 = h1.fork();
let fut0_storage: Cell<Option<strede_test_util::RawAction>> = Cell::new(None);
// NOTE(review): `_fut1_storage` is never used after this line.
let _fut1_storage: Cell<Option<strede_test_util::RawAction>> = Cell::new(None);
// `ManuallyDrop` keeps the scope from dropping `fut1` a second time after
// the stored action has dropped it in place.
let mut fut1 = core::pin::pin!(mem::ManuallyDrop::new(h1.next()));
// Projects the pin through `ManuallyDrop` to the future inside it.
macro_rules! fut1_pin {
($f:expr) => {
unsafe { $f.as_mut().map_unchecked_mut(|md| &mut **md) }
};
}
assert!(matches!(poll_once(fut1_pin!(fut1)), Poll::Pending));
let mut fut0 = core::pin::pin!(h0.next());
{
let w = strede_test_util::reentrant_waker(&fut0_storage);
let mut cx = Context::from_waker(&w);
assert!(matches!(fut0.as_mut().poll(&mut cx), Poll::Pending));
}
// The future's type cannot be named, so capture it in a `PhantomData`
// token and use that to drop the future through a type-erased pointer.
fn type_token<T>(_: &T) -> core::marker::PhantomData<fn(*const T)> {
core::marker::PhantomData
}
fn drop_erased<T>(_: core::marker::PhantomData<fn(*const T)>, ptr: *mut ()) {
unsafe { core::ptr::drop_in_place(ptr as *mut T) };
}
let token = type_token(unsafe { &**fut1.as_mut().get_unchecked_mut() });
let fut1_raw: *mut () =
unsafe { &mut **fut1.as_mut().get_unchecked_mut() } as *mut _ as *mut ();
fut0_storage.set(Some(strede_test_util::RawAction::new(move || {
drop_erased(token, fut1_raw);
})));
let mut fut2 = core::pin::pin!(h2.next());
let _h2_new = match poll_once(fut2.as_mut()) {
Poll::Ready(h) => h,
Poll::Pending => panic!("h2 should be Ready (last handle)"),
};
},
);
}
}