wit_bindgen/rt/async_support/future_support.rs
1//! Runtime support for `future<T>` in the component model.
2//!
3//! There are a number of tricky concerns to all balance when implementing
4//! bindings to `future<T>`, specifically with how it interacts with Rust. This
5//! will attempt to go over some of the high-level details of the implementation
6//! here.
7//!
8//! ## Leak safety
9//!
10//! It's safe to leak any value at any time currently in Rust. In other words
11//! Rust doesn't have linear types (yet). Typically this isn't really a problem
12//! but the component model intrinsics we're working with here operate by being
13//! given a pointer and then at some point in the future the pointer may be
14//! read. This means that it's our responsibility to keep this pointer alive and
15//! valid for the entire duration of an asynchronous operation.
16//!
17//! Chiefly this means that borrowed values are a no-no in this module. For
18//! example if you were to send a `&[u8]` as an implementation of
19//! `future<list<u8>>` that would not be sound. For example:
20//!
21//! * The future send operation is started, recording an address of `&[u8]`.
22//! * The future is then leaked.
23//! * According to rustc, later in code the original `&[u8]` is then no longer
24//! borrowed.
25//! * The original source of `&[u8]` could then be deallocated.
26//! * Then the component model actually reads the pointer that it was given.
27//!
28//! This constraint effectively means that all types flowing in-and-out of
29//! futures, streams, and async APIs are all "owned values", notably no
30//! lifetimes. This requires, for example, that `future<list<u8>>` operates on
31//! `Vec<u8>`.
32//!
33//! This is in stark contrast to bindings generated for `list<u8>` otherwise,
34//! however, where for example a synchronous import with a `list<u8>` argument
35//! would be bound with a `&[u8]` argument. Until Rust has some form of linear
36//! types, however, it's not possible to loosen this restriction soundly because
37//! it's generally not safe to leak an active I/O operation. This restriction is
38//! similar to why it's so difficult to bind `io_uring` in safe Rust, which
39//! operates similarly to the component model where pointers are submitted and
40//! read in the future after the original call for submission returns.
41//!
42//! ## Lowering Owned Values
43//!
44//! According to the above everything with futures/streams operates on owned
45//! values already, but this also affects precisely how lifting and lowering is
46//! performed. In general any active asynchronous operation could be cancelled
47//! at any time, meaning we have to deal with situations such as:
48//!
49//! * A `write` hasn't even started yet.
50//! * A `write` was started and then cancelled.
51//! * A `write` was started and then the other end dropped the channel.
52//! * A `write` was started and then the other end received the value.
53//!
54//! In all of these situations regardless of the structure of `T` we can't leak
55//! memory. The `future.write` intrinsic, however, takes no ownership of the
56//! memory involved which means that we're still responsible for cleaning up
57//! lists. It does take ownership, however, of `own<T>` handles and other
58//! resources.
59//!
60//! The way that this is solved for futures/streams is to lean further into
61//! processing owned values. Namely lowering a `T` takes `T`-by-value, not `&T`.
62//! This means that lowering operates similarly to return values of exported
63//! functions, not parameters to imported functions. By lowering an owned value
64//! of `T` this preserves a nice property where the lowered value has exclusive
65//! ownership of all of its pointers/resources/etc. Lowering `&T` may require a
66//! "cleanup list" for example which we avoid here entirely.
67//!
68//! This then makes the second and third cases above, getting a value back after
69//! lowering, much easier. Namely re-acquisition of a value is simple `lift`
70//! operation as if we received a value on the channel.
71//!
72//! ## Inefficiencies
73//!
74//! The above requirements generally mean that this is not a hyper-efficient
75//! implementation. All writes and reads, for example, start out with allocation
76//! memory on the heap to be owned by the asynchronous operation. Writing a
77//! `list<u8>` to a future passes ownership of `Vec<u8>` but in theory doesn't
78//! not actually require relinquishing ownership of the vector. Furthermore
79//! there's no way to re-acquire a `T` after it has been sent, but all of `T` is
80//! still valid except for `own<U>` resources.
81//!
82//! That's all to say that this implementation can probably still be improved
83//! upon, but doing so is thought to be pretty nontrivial at this time. It
84//! should be noted though that there are other high-level inefficiencies with
85//! WIT unrelated to this module. For example `list<T>` is not always
86//! represented the same in Rust as it is in the canonical ABI. That means that
87//! sending `list<T>` into a future might require copying the entire list and
88//! changing its layout. Currently this is par-for-the-course with bindings.
89//!
90//! ## Linear (exactly once) Writes
91//!
92//! The component model requires that a writable end of a future must be written
93//! to before closing, otherwise the drop operation traps. Ideally usage of
94//! this API shouldn't result in traps so this is modeled in the Rust-level API
95//! to prevent this trap from occurring. Rust does not support linear types
96//! (types that must be used exactly once), instead it only has affine types
97//! (types which must be used at most once), meaning that this requires some
98//! runtime support.
99//!
100//! Specifically the `FutureWriter` structure stores two auxiliary Rust-specific
101//! pieces of information:
102//!
103//! * A `should_write_default_value` boolean - if `true` on destruction then a
104//! value has not yet been written and something must be written.
105//! * A `default: fn() -> T` constructor to lazily create the default value to
106//! be sent in this situation.
107//!
108//! This `default` field is provided by the user when the future is initially
109//! created. Additionally during `Drop` a new Rust-level task will be spawned to
110//! perform the write in the background. That'll keep the component-level task
111//! alive until that write completes but otherwise shouldn't hinder anything
112//! else.
113
114use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
115use crate::rt::async_support::ReturnCode;
116use crate::rt::Cleanup;
117use std::alloc::Layout;
118use std::fmt;
119use std::future::{Future, IntoFuture};
120use std::marker;
121use std::pin::Pin;
122use std::ptr;
123use std::sync::atomic::{AtomicU32, Ordering::Relaxed};
124use std::sync::{Arc, Mutex};
125use std::task::{Context, Poll, Wake, Waker};
126
127/// Function table used for [`FutureWriter`] and [`FutureReader`]
128///
129/// Instances of this table are generated by `wit_bindgen::generate!`. This is
130/// not a trait to enable different `FutureVtable<()>` instances to exist, for
131/// example, through different calls to `wit_bindgen::generate!`.
132///
133/// It's not intended that any user implements this vtable, instead it's
134/// intended to only be auto-generated.
135#[doc(hidden)]
136pub struct FutureVtable<T> {
137 /// The Canonical ABI layout of `T` in-memory.
138 pub layout: Layout,
139
140 /// A callback to consume a value of `T` and lower it to the canonical ABI
141 /// pointed to by `dst`.
142 ///
143 /// The `dst` pointer should have `self.layout`. This is used to convert
144 /// in-memory representations in Rust to their canonical representations in
145 /// the component model.
146 pub lower: unsafe fn(value: T, dst: *mut u8),
147
148 /// A callback to deallocate any lists within the canonical ABI value `dst`
149 /// provided.
150 ///
151 /// This is used when a value is successfully sent to another component. In
152 /// such a situation it may be possible that the canonical lowering of `T`
153 /// has lists that are still owned by this component and must be
154 /// deallocated. This is akin to a `post-return` callback for returns of
155 /// exported functions.
156 pub dealloc_lists: unsafe fn(dst: *mut u8),
157
158 /// A callback to lift a value of `T` from the canonical ABI representation
159 /// provided.
160 pub lift: unsafe fn(dst: *mut u8) -> T,
161
162 /// The raw `future.write` intrinsic.
163 pub start_write: unsafe extern "C" fn(future: u32, val: *const u8) -> u32,
164 /// The raw `future.read` intrinsic.
165 pub start_read: unsafe extern "C" fn(future: u32, val: *mut u8) -> u32,
166 /// The raw `future.cancel-write` intrinsic.
167 pub cancel_write: unsafe extern "C" fn(future: u32) -> u32,
168 /// The raw `future.cancel-read` intrinsic.
169 pub cancel_read: unsafe extern "C" fn(future: u32) -> u32,
170 /// The raw `future.drop-writable` intrinsic.
171 pub drop_writable: unsafe extern "C" fn(future: u32),
172 /// The raw `future.drop-readable` intrinsic.
173 pub drop_readable: unsafe extern "C" fn(future: u32),
174 /// The raw `future.new` intrinsic.
175 pub new: unsafe extern "C" fn() -> u64,
176}
177
178/// Helper function to create a new read/write pair for a component model
179/// future.
180///
181/// # Unsafety
182///
183/// This function is unsafe as it requires the functions within `vtable` to
184/// correctly uphold the contracts of the component model.
185pub unsafe fn future_new<T>(
186 default: fn() -> T,
187 vtable: &'static FutureVtable<T>,
188) -> (FutureWriter<T>, FutureReader<T>) {
189 unsafe {
190 let handles = (vtable.new)();
191 let reader = handles as u32;
192 let writer = (handles >> 32) as u32;
193 rtdebug!("future.new() = [{writer}, {reader}]");
194 (
195 FutureWriter::new(writer, default, vtable),
196 FutureReader::new(reader, vtable),
197 )
198 }
199}
200
201/// Represents the writable end of a Component Model `future`.
202///
203/// A [`FutureWriter`] can be used to send a single value of `T` to the other
204/// end of a `future`. In a sense this is similar to a oneshot channel in Rust.
205pub struct FutureWriter<T: 'static> {
206 handle: u32,
207 vtable: &'static FutureVtable<T>,
208
209 /// Whether or not a value should be written during `drop`.
210 ///
211 /// This is set to `false` when a value is successfully written or when a
212 /// value is written but the future is witnessed as being dropped.
213 ///
214 /// Note that this is set to `true` on construction to ensure that only
215 /// location which actually witness a completed write set it to `false`.
216 should_write_default_value: bool,
217
218 /// Constructor for the default value to write during `drop`, should one
219 /// need to be written.
220 default: fn() -> T,
221}
222
223impl<T> FutureWriter<T> {
224 /// Helper function to wrap a handle/vtable into a `FutureWriter`.
225 ///
226 /// # Unsafety
227 ///
228 /// This function is unsafe as it requires the functions within `vtable` to
229 /// correctly uphold the contracts of the component model.
230 #[doc(hidden)]
231 pub unsafe fn new(handle: u32, default: fn() -> T, vtable: &'static FutureVtable<T>) -> Self {
232 Self {
233 handle,
234 default,
235 should_write_default_value: true,
236 vtable,
237 }
238 }
239
240 /// Write the specified `value` to this `future`.
241 ///
242 /// This method is equivalent to an `async fn` which sends the `value` into
243 /// this future. The asynchronous operation acts as a rendezvous where the
244 /// operation does not complete until the other side has successfully
245 /// received the value.
246 ///
247 /// # Return Value
248 ///
249 /// The returned [`FutureWrite`] is a future that can be `.await`'d. The
250 /// return value of this future is:
251 ///
252 /// * `Ok(())` - the `value` was sent and received. The `self` value was
253 /// consumed along the way and will no longer be accessible.
254 /// * `Err(FutureWriteError { value })` - an attempt was made to send
255 /// `value` but the other half of this [`FutureWriter`] was dropped before
256 /// the value was received. This consumes `self` because the channel is
257 /// now dropped, but `value` is returned in case the caller wants to reuse
258 /// it.
259 ///
260 /// # Cancellation
261 ///
262 /// The returned future can be cancelled normally via `drop` which means
263 /// that the `value` provided here, along with this `FutureWriter` itself,
264 /// will be lost. There is also [`FutureWrite::cancel`] which can be used to
265 /// possibly re-acquire `value` and `self` if the operation was cancelled.
266 /// In such a situation the operation can be retried at a future date.
267 pub fn write(self, value: T) -> FutureWrite<T> {
268 FutureWrite {
269 op: WaitableOperation::new(FutureWriteOp(marker::PhantomData), (self, value)),
270 }
271 }
272}
273
274impl<T> fmt::Debug for FutureWriter<T> {
275 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276 f.debug_struct("FutureWriter")
277 .field("handle", &self.handle)
278 .finish()
279 }
280}
281
282impl<T> Drop for FutureWriter<T> {
283 fn drop(&mut self) {
284 // If a value has not yet been written into this writer than that must
285 // be done so now. Perform a "clone" of `self` ensuring that
286 // `should_write_default_value` is set to `false` to avoid infinite
287 // loops by accident. The cloned `FutureWriter` will be responsible for
288 // performing the `drop-writable` call below once the write has
289 // completed.
290 //
291 // Note, though, that if `should_write_default_value` is `false` then a
292 // write has already happened and we can go ahead and just synchronously
293 // drop this writer as we would any other handle.
294 if self.should_write_default_value {
295 let clone = FutureWriter {
296 handle: self.handle,
297 default: self.default,
298 should_write_default_value: false,
299 vtable: self.vtable,
300 };
301 let value = (clone.default)();
302 let write = clone.write(value);
303 Arc::new(DeferredWrite::new(write)).wake();
304 } else {
305 unsafe {
306 rtdebug!("future.drop-writable({})", self.handle);
307 (self.vtable.drop_writable)(self.handle);
308 }
309 }
310 }
311}
312
313/// Helper structure which behaves both as a future of sorts and an executor of
314/// sorts.
315///
316/// This type is constructed in `Drop for FutureWriter<T>` to send out a
317/// default value when no other has been written. This manages the
318/// `FutureWrite` operation happening internally through a `Wake`
319/// implementation. That means that this is a sort of cyclical future which,
320/// when woken, will complete the write operation.
321///
322/// The purpose of this is to be a "lightweight" way of "spawn"-ing a future
323/// write to happen in the background. Crucially, however, this doesn't require
324/// the `async-spawn` feature and instead works with the `wasip3_task` C ABI
325/// structures (which spawn doesn't support).
326struct DeferredWrite<T: 'static> {
327 write: Mutex<FutureWrite<T>>,
328}
329
330// TODO
331unsafe impl<T> Send for DeferredWrite<T> {}
332unsafe impl<T> Sync for DeferredWrite<T> {}
333
334impl<T> DeferredWrite<T> {
335 fn new(write: FutureWrite<T>) -> DeferredWrite<T> {
336 DeferredWrite {
337 write: Mutex::new(write),
338 }
339 }
340}
341
342impl<T> Wake for DeferredWrite<T> {
343 fn wake(self: Arc<Self>) {
344 // When a `wake` signal comes in that should happen in two locations:
345 //
346 // 1. When `DeferredWrite` is initially constructed.
347 // 2. When an event comes in indicating that the internal write has
348 // completed.
349 //
350 // The implementation here is the same in both cases. A clone of `self`
351 // is converted to a `Waker`, and this `Waker` notably owns the
352 // internal future itself. The internal write operation is then pushed
353 // forward (e.g. it's issued in (1) or checked up on in (2)).
354 //
355 // If `Pending` is returned then `waker` should have been stored away
356 // within the `wasip3_task` C ABI structure. Otherwise it should not
357 // have been stored away and `self` should be the sole reference which
358 // means everything will get cleaned up when this function returns.
359 let poll = {
360 let waker = Waker::from(self.clone());
361 let mut cx = Context::from_waker(&waker);
362 let mut write = self.write.lock().unwrap();
363 unsafe { Pin::new_unchecked(&mut *write).poll(&mut cx) }
364 };
365 if poll.is_ready() {
366 assert_eq!(Arc::strong_count(&self), 1);
367 } else {
368 assert!(Arc::strong_count(&self) > 1);
369 }
370 assert_eq!(Arc::weak_count(&self), 0);
371 }
372}
373
374/// Represents a write operation which may be cancelled prior to completion.
375///
376/// This is returned by [`FutureWriter::write`].
377pub struct FutureWrite<T: 'static> {
378 op: WaitableOperation<FutureWriteOp<T>>,
379}
380
381struct FutureWriteOp<T>(marker::PhantomData<T>);
382
383enum WriteComplete<T> {
384 Written,
385 Dropped(T),
386 Cancelled(T),
387}
388
389unsafe impl<T> WaitableOp for FutureWriteOp<T>
390where
391 T: 'static,
392{
393 type Start = (FutureWriter<T>, T);
394 type InProgress = (FutureWriter<T>, Option<Cleanup>);
395 type Result = (WriteComplete<T>, FutureWriter<T>);
396 type Cancel = FutureWriteCancel<T>;
397
398 fn start(&self, (writer, value): Self::Start) -> (u32, Self::InProgress) {
399 // TODO: it should be safe to store the lower-destination in
400 // `WaitableOperation` using `Pin` memory and such, but that would
401 // require some type-level trickery to get a correctly-sized value
402 // plumbed all the way to here. For now just dynamically allocate it and
403 // leave the optimization of leaving out this dynamic allocation to the
404 // future.
405 //
406 // In lieu of that a dedicated location on the heap is created for the
407 // lowering, and then `value`, as an owned value, is lowered into this
408 // pointer to initialize it.
409 let (ptr, cleanup) = Cleanup::new(writer.vtable.layout);
410 // SAFETY: `ptr` is allocated with `vtable.layout` and should be
411 // safe to use here.
412 let code = unsafe {
413 (writer.vtable.lower)(value, ptr);
414 (writer.vtable.start_write)(writer.handle, ptr)
415 };
416 rtdebug!("future.write({}, {ptr:?}) = {code:#x}", writer.handle);
417 (code, (writer, cleanup))
418 }
419
420 fn start_cancelled(&self, (writer, value): Self::Start) -> Self::Cancel {
421 FutureWriteCancel::Cancelled(value, writer)
422 }
423
424 fn in_progress_update(
425 &self,
426 (mut writer, cleanup): Self::InProgress,
427 code: u32,
428 ) -> Result<Self::Result, Self::InProgress> {
429 let ptr = cleanup
430 .as_ref()
431 .map(|c| c.ptr.as_ptr())
432 .unwrap_or(ptr::null_mut());
433 match code {
434 super::BLOCKED => Err((writer, cleanup)),
435
436 // The other end has dropped its end.
437 //
438 // The value was not received by the other end so `ptr` still has
439 // all of its resources intact. Use `lift` to construct a new
440 // instance of `T` which takes ownership of pointers and resources
441 // and such. The allocation of `ptr` is then cleaned up naturally
442 // when `cleanup` goes out of scope.
443 super::DROPPED | super::CANCELLED => {
444 // SAFETY: we're the ones managing `ptr` so we know it's safe to
445 // pass here.
446 let value = unsafe { (writer.vtable.lift)(ptr) };
447 let status = if code == super::DROPPED {
448 // This writer has been witnessed to be dropped, meaning that
449 // `writer` is going to get destroyed soon as this return
450 // value propagates up the stack. There's no need to write
451 // the default value, so set this to `false`.
452 writer.should_write_default_value = false;
453 WriteComplete::Dropped(value)
454 } else {
455 WriteComplete::Cancelled(value)
456 };
457 Ok((status, writer))
458 }
459
460 // This write has completed.
461 //
462 // Here we need to clean up our allocations. The `ptr` exclusively
463 // owns all of the value being sent and we notably need to cleanup
464 // the transitive list allocations present in this pointer. Use
465 // `dealloc_lists` for that (effectively a post-return lookalike).
466 //
467 // Afterwards the `cleanup` itself is naturally dropped and cleaned
468 // up.
469 super::COMPLETED => {
470 // A value was written, so no need to write the default value.
471 writer.should_write_default_value = false;
472
473 // SAFETY: we're the ones managing `ptr` so we know it's safe to
474 // pass here.
475 unsafe {
476 (writer.vtable.dealloc_lists)(ptr);
477 }
478 Ok((WriteComplete::Written, writer))
479 }
480
481 other => unreachable!("unexpected code {other:?}"),
482 }
483 }
484
485 fn in_progress_waitable(&self, (writer, _): &Self::InProgress) -> u32 {
486 writer.handle
487 }
488
489 fn in_progress_cancel(&self, (writer, _): &Self::InProgress) -> u32 {
490 // SAFETY: we're managing `writer` and all the various operational bits,
491 // so this relies on `WaitableOperation` being safe.
492 let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
493 rtdebug!("future.cancel-write({}) = {code:#x}", writer.handle);
494 code
495 }
496
497 fn result_into_cancel(&self, (result, writer): Self::Result) -> Self::Cancel {
498 match result {
499 // The value was actually sent, meaning we can't yield back the
500 // future nor the value.
501 WriteComplete::Written => FutureWriteCancel::AlreadySent,
502
503 // The value was not sent because the other end either hung up or we
504 // successfully cancelled. In both cases return back the value here
505 // with the writer.
506 WriteComplete::Dropped(val) => FutureWriteCancel::Dropped(val),
507 WriteComplete::Cancelled(val) => FutureWriteCancel::Cancelled(val, writer),
508 }
509 }
510}
511
512impl<T: 'static> Future for FutureWrite<T> {
513 type Output = Result<(), FutureWriteError<T>>;
514
515 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
516 self.pin_project()
517 .poll_complete(cx)
518 .map(|(result, _writer)| match result {
519 WriteComplete::Written => Ok(()),
520 WriteComplete::Dropped(value) | WriteComplete::Cancelled(value) => {
521 Err(FutureWriteError { value })
522 }
523 })
524 }
525}
526
527impl<T: 'static> FutureWrite<T> {
528 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureWriteOp<T>>> {
529 // SAFETY: we've chosen that when `Self` is pinned that it translates to
530 // always pinning the inner field, so that's codified here.
531 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
532 }
533
534 /// Cancel this write if it hasn't already completed.
535 ///
536 /// This method can be used to cancel a write-in-progress and re-acquire
537 /// the writer and the value being sent. Note that the write operation may
538 /// succeed racily or the other end may also drop racily, and these
539 /// outcomes are reflected in the returned value here.
540 ///
541 /// # Panics
542 ///
543 /// Panics if the operation has already been completed via `Future::poll`,
544 /// or if this method is called twice.
545 pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel<T> {
546 self.pin_project().cancel()
547 }
548}
549
550/// Error type in the result of [`FutureWrite`], or the error type that is a result of
551/// a failure to write a future.
552pub struct FutureWriteError<T> {
553 /// The value that could not be sent.
554 pub value: T,
555}
556
557impl<T> fmt::Debug for FutureWriteError<T> {
558 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559 f.debug_struct("FutureWriteError").finish_non_exhaustive()
560 }
561}
562
563impl<T> fmt::Display for FutureWriteError<T> {
564 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565 "read end dropped".fmt(f)
566 }
567}
568
569impl<T> std::error::Error for FutureWriteError<T> {}
570
571/// Result of [`FutureWrite::cancel`].
572#[derive(Debug)]
573pub enum FutureWriteCancel<T: 'static> {
574 /// The cancel request raced with the receipt of the sent value, and the
575 /// value was actually sent. Neither the value nor the writer are made
576 /// available here as both are gone.
577 AlreadySent,
578
579 /// The other end was dropped before cancellation happened.
580 ///
581 /// In this case the original value is returned back to the caller but the
582 /// writer itself is not longer accessible as it's no longer usable.
583 Dropped(T),
584
585 /// The pending write was successfully cancelled and the value being written
586 /// is returned along with the writer to resume again in the future if
587 /// necessary.
588 Cancelled(T, FutureWriter<T>),
589}
590
591/// Represents the readable end of a Component Model `future<T>`.
592pub struct FutureReader<T: 'static> {
593 handle: AtomicU32,
594 vtable: &'static FutureVtable<T>,
595}
596
597impl<T> fmt::Debug for FutureReader<T> {
598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599 f.debug_struct("FutureReader")
600 .field("handle", &self.handle)
601 .finish()
602 }
603}
604
605impl<T> FutureReader<T> {
606 #[doc(hidden)]
607 pub fn new(handle: u32, vtable: &'static FutureVtable<T>) -> Self {
608 Self {
609 handle: AtomicU32::new(handle),
610 vtable,
611 }
612 }
613
614 #[doc(hidden)]
615 pub fn take_handle(&self) -> u32 {
616 let ret = self.opt_handle().unwrap();
617 self.handle.store(u32::MAX, Relaxed);
618 ret
619 }
620
621 fn handle(&self) -> u32 {
622 self.opt_handle().unwrap()
623 }
624
625 fn opt_handle(&self) -> Option<u32> {
626 match self.handle.load(Relaxed) {
627 u32::MAX => None,
628 other => Some(other),
629 }
630 }
631}
632
633impl<T> IntoFuture for FutureReader<T> {
634 type Output = T;
635 type IntoFuture = FutureRead<T>;
636
637 /// Convert this object into a `Future` which will resolve when a value is
638 /// written to the writable end of this `future`.
639 fn into_future(self) -> Self::IntoFuture {
640 FutureRead {
641 op: WaitableOperation::new(FutureReadOp(marker::PhantomData), self),
642 }
643 }
644}
645
646impl<T> Drop for FutureReader<T> {
647 fn drop(&mut self) {
648 let Some(handle) = self.opt_handle() else {
649 return;
650 };
651 unsafe {
652 rtdebug!("future.drop-readable({handle})");
653 (self.vtable.drop_readable)(handle);
654 }
655 }
656}
657
658/// Represents a read operation which may be cancelled prior to completion.
659///
660/// This represents a read operation on a [`FutureReader`] and is created via
661/// `IntoFuture`.
662pub struct FutureRead<T: 'static> {
663 op: WaitableOperation<FutureReadOp<T>>,
664}
665
666struct FutureReadOp<T>(marker::PhantomData<T>);
667
668enum ReadComplete<T> {
669 Value(T),
670 Cancelled,
671}
672
673unsafe impl<T> WaitableOp for FutureReadOp<T>
674where
675 T: 'static,
676{
677 type Start = FutureReader<T>;
678 type InProgress = (FutureReader<T>, Option<Cleanup>);
679 type Result = (ReadComplete<T>, FutureReader<T>);
680 type Cancel = Result<T, FutureReader<T>>;
681
682 fn start(&self, reader: Self::Start) -> (u32, Self::InProgress) {
683 let (ptr, cleanup) = Cleanup::new(reader.vtable.layout);
684 // SAFETY: `ptr` is allocated with `vtable.layout` and should be
685 // safe to use here. Its lifetime for the async operation is hinged on
686 // `WaitableOperation` being safe.
687 let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr) };
688 rtdebug!("future.read({}, {ptr:?}) = {code:#x}", reader.handle());
689 (code, (reader, cleanup))
690 }
691
692 fn start_cancelled(&self, state: Self::Start) -> Self::Cancel {
693 Err(state)
694 }
695
696 fn in_progress_update(
697 &self,
698 (reader, cleanup): Self::InProgress,
699 code: u32,
700 ) -> Result<Self::Result, Self::InProgress> {
701 match ReturnCode::decode(code) {
702 ReturnCode::Blocked => Err((reader, cleanup)),
703
704 // Let `cleanup` fall out of scope to clean up its allocation here,
705 // and otherwise tahe reader is plumbed through to possibly restart
706 // the read in the future.
707 ReturnCode::Cancelled(0) => Ok((ReadComplete::Cancelled, reader)),
708
709 // The read has completed, so lift the value from the stored memory and
710 // `cleanup` naturally falls out of scope after transferring ownership of
711 // everything to the returned `value`.
712 ReturnCode::Completed(0) => {
713 let ptr = cleanup
714 .as_ref()
715 .map(|c| c.ptr.as_ptr())
716 .unwrap_or(ptr::null_mut());
717
718 // SAFETY: we're the ones managing `ptr` so we know it's safe to
719 // pass here.
720 let value = unsafe { (reader.vtable.lift)(ptr) };
721 Ok((ReadComplete::Value(value), reader))
722 }
723
724 other => panic!("unexpected code {other:?}"),
725 }
726 }
727
728 fn in_progress_waitable(&self, (reader, _): &Self::InProgress) -> u32 {
729 reader.handle()
730 }
731
732 fn in_progress_cancel(&self, (reader, _): &Self::InProgress) -> u32 {
733 // SAFETY: we're managing `reader` and all the various operational bits,
734 // so this relies on `WaitableOperation` being safe.
735 let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
736 rtdebug!("future.cancel-read({}) = {code:#x}", reader.handle());
737 code
738 }
739
740 fn result_into_cancel(&self, (value, reader): Self::Result) -> Self::Cancel {
741 match value {
742 // The value was actually read, so thread that through here.
743 ReadComplete::Value(value) => Ok(value),
744
745 // The read was successfully cancelled, so thread through the
746 // `reader` to possibly restart later on.
747 ReadComplete::Cancelled => Err(reader),
748 }
749 }
750}
751
752impl<T: 'static> Future for FutureRead<T> {
753 type Output = T;
754
755 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
756 self.pin_project()
757 .poll_complete(cx)
758 .map(|(result, _reader)| match result {
759 ReadComplete::Value(val) => val,
760 // This is only possible if, after calling `FutureRead::cancel`,
761 // the future is polled again. The `cancel` method is documented
762 // as "don't do that" so this is left to panic.
763 ReadComplete::Cancelled => panic!("cannot poll after cancelling"),
764 })
765 }
766}
767
768impl<T> FutureRead<T> {
769 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureReadOp<T>>> {
770 // SAFETY: we've chosen that when `Self` is pinned that it translates to
771 // always pinning the inner field, so that's codified here.
772 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
773 }
774
775 /// Cancel this read if it hasn't already completed.
776 ///
777 /// Return values include:
778 ///
779 /// * `Ok(value)` - future completed before this cancellation request
780 /// was received.
781 /// * `Err(reader)` - read operation was cancelled and it can be retried in
782 /// the future if desired.
783 ///
784 /// # Panics
785 ///
786 /// Panics if the operation has already been completed via `Future::poll`,
787 /// or if this method is called twice. Additionally if this method completes
788 /// then calling `poll` again on `self` will panic.
789 pub fn cancel(self: Pin<&mut Self>) -> Result<T, FutureReader<T>> {
790 self.pin_project().cancel()
791 }
792}