// wit_bindgen/rt/async_support/future_support.rs

//! Runtime support for `future<T>` in the component model.
//!
//! There are a number of tricky concerns to balance when implementing
//! bindings to `future<T>`, specifically with how it interacts with Rust. This
//! module documentation goes over some of the high-level details of the
//! implementation.
//!
//! ## Leak safety
//!
//! It's currently safe to leak any value at any time in Rust. In other words
//! Rust doesn't have linear types (yet). Typically this isn't really a problem
//! but the component model intrinsics we're working with here operate by being
//! given a pointer and then at some point in the future the pointer may be
//! read. This means that it's our responsibility to keep this pointer alive and
//! valid for the entire duration of an asynchronous operation.
//!
//! Chiefly this means that borrowed values are a no-no in this module. For
//! example if you were to send a `&[u8]` as an implementation of
//! `future<list<u8>>` that would not be sound. Consider the following sequence:
//!
//! * The future send operation is started, recording an address of `&[u8]`.
//! * The future is then leaked.
//! * According to rustc, later in code the original `&[u8]` is then no longer
//!   borrowed.
//! * The original source of `&[u8]` could then be deallocated.
//! * Then the component model actually reads the pointer that it was given.
//!
//! This constraint effectively means that all types flowing in-and-out of
//! futures, streams, and async APIs are "owned values", notably with no
//! lifetimes. This requires, for example, that `future<list<u8>>` operates on
//! `Vec<u8>`.
//!
//! This is in stark contrast to bindings generated for `list<u8>` otherwise,
//! where for example a synchronous import with a `list<u8>` argument
//! would be bound with a `&[u8]` argument. Until Rust has some form of linear
//! types, however, it's not possible to loosen this restriction soundly because
//! it's generally not safe to leak an active I/O operation. This restriction is
//! similar to why it's so difficult to bind `io_uring` in safe Rust, which
//! operates similarly to the component model where pointers are submitted and
//! read in the future after the original call for submission returns.
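//!
//! As a rough illustration of why owned values sidestep this problem, the
//! sketch below models an in-flight operation using only the standard library.
//! `PendingSend`, `start_send`, and `leak_then_read` are hypothetical names
//! that do not exist in this module, and the raw pointer merely stands in for
//! the address the component model would record. Because the operation owns
//! its buffer, leaking the operation leaks the buffer too, so the recorded
//! address stays valid.
//!
//! ```rust
//! use std::{mem, slice};
//!
//! /// Hypothetical stand-in for an in-flight send which owns its buffer.
//! struct PendingSend {
//!     _buf: Vec<u8>,
//! }
//!
//! /// Starts a "send", returning the operation along with the address and
//! /// length that a host would have recorded for a later read.
//! fn start_send(buf: Vec<u8>) -> (PendingSend, *const u8, usize) {
//!     let (ptr, len) = (buf.as_ptr(), buf.len());
//!     (PendingSend { _buf: buf }, ptr, len)
//! }
//!
//! /// Leaks the operation and then reads through the recorded address.
//! fn leak_then_read(data: Vec<u8>) -> Vec<u8> {
//!     let (op, ptr, len) = start_send(data);
//!     // Leaking is always possible in safe Rust. Because `op` owns the
//!     // buffer the allocation is never freed, so the address stays valid.
//!     mem::forget(op);
//!     // SAFETY: the buffer was leaked above and is therefore still allocated.
//!     unsafe { slice::from_raw_parts(ptr, len).to_vec() }
//! }
//! ```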
//!
//! ## Lowering Owned Values
//!
//! According to the above everything with futures/streams operates on owned
//! values already, but this also affects precisely how lifting and lowering are
//! performed. In general any active asynchronous operation could be cancelled
//! at any time, meaning we have to deal with situations such as:
//!
//! * A `write` hasn't even started yet.
//! * A `write` was started and then cancelled.
//! * A `write` was started and then the other end dropped the channel.
//! * A `write` was started and then the other end received the value.
//!
//! In all of these situations regardless of the structure of `T` we can't leak
//! memory. The `future.write` intrinsic, however, takes no ownership of the
//! memory involved which means that we're still responsible for cleaning up
//! lists. It does take ownership, however, of `own<T>` handles and other
//! resources.
//!
//! The way that this is solved for futures/streams is to lean further into
//! processing owned values. Namely lowering a `T` takes `T`-by-value, not `&T`.
//! This means that lowering operates similarly to return values of exported
//! functions, not parameters to imported functions. By lowering an owned value
//! of `T` this preserves a nice property where the lowered value has exclusive
//! ownership of all of its pointers/resources/etc. Lowering `&T` may require a
//! "cleanup list" for example which we avoid here entirely.
//!
//! This then makes the second and third cases above, getting a value back after
//! lowering, much easier. Namely re-acquisition of a value is a simple `lift`
//! operation as if we received a value on the channel.
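//!
//! A minimal sketch of this by-value lowering, assuming a simplified
//! pointer-and-length layout for `Vec<u8>`. The `Lowered` type and the free
//! functions here are illustrative stand-ins for the generated `lower`, `lift`,
//! and `dealloc_lists` callbacks, which actually operate on raw `*mut u8`
//! destinations in canonical ABI layout:
//!
//! ```rust
//! /// Hypothetical lowered form of a `Vec<u8>`: a pointer and a length.
//! #[repr(C)]
//! struct Lowered {
//!     ptr: *mut u8,
//!     len: usize,
//! }
//!
//! /// Lowers `value` by value, so `dst` becomes the exclusive owner of the
//! /// bytes and no separate cleanup list is needed.
//! fn lower(value: Vec<u8>, dst: &mut Lowered) {
//!     let boxed: Box<[u8]> = value.into_boxed_slice();
//!     dst.len = boxed.len();
//!     dst.ptr = Box::into_raw(boxed) as *mut u8;
//! }
//!
//! /// Re-acquires the value, as done when a write is cancelled or the other
//! /// end is dropped.
//! ///
//! /// # Safety
//! ///
//! /// `src` must have been initialized by `lower` and not yet lifted or freed.
//! unsafe fn lift(src: &Lowered) -> Vec<u8> {
//!     let slice = std::ptr::slice_from_raw_parts_mut(src.ptr, src.len);
//!     // SAFETY: the caller guarantees `src` still owns a boxed slice.
//!     let boxed = unsafe { Box::from_raw(slice) };
//!     boxed.into_vec()
//! }
//!
//! /// Frees the list after a successful write, when no value is handed back.
//! ///
//! /// # Safety
//! ///
//! /// Same contract as `lift`.
//! unsafe fn dealloc_lists(src: &Lowered) {
//!     // SAFETY: forwarded from the caller.
//!     drop(unsafe { lift(src) });
//! }
//!
//! /// Lowers a value and then lifts it back out, as a cancelled write would.
//! fn round_trip(value: Vec<u8>) -> Vec<u8> {
//!     let mut slot = Lowered { ptr: std::ptr::null_mut(), len: 0 };
//!     lower(value, &mut slot);
//!     // SAFETY: `slot` was just initialized by `lower`.
//!     unsafe { lift(&slot) }
//! }
//! ```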
//!
//! ## Inefficiencies
//!
//! The above requirements generally mean that this is not a hyper-efficient
//! implementation. All writes and reads, for example, start out by allocating
//! memory on the heap to be owned by the asynchronous operation. Writing a
//! `list<u8>` to a future passes ownership of `Vec<u8>` but in theory doesn't
//! actually require relinquishing ownership of the vector. Furthermore
//! there's no way to re-acquire a `T` after it has been sent, but all of `T` is
//! still valid except for `own<U>` resources.
//!
//! That's all to say that this implementation can probably still be improved
//! upon, but doing so is thought to be pretty nontrivial at this time. It
//! should be noted though that there are other high-level inefficiencies with
//! WIT unrelated to this module. For example `list<T>` is not always
//! represented the same in Rust as it is in the canonical ABI. That means that
//! sending `list<T>` into a future might require copying the entire list and
//! changing its layout. Currently this is par-for-the-course with bindings.
//!
//! ## Linear (exactly once) Writes
//!
//! The component model requires that a writable end of a future must be written
//! to before closing, otherwise the drop operation traps. Ideally usage of
//! this API shouldn't result in traps so this is modeled in the Rust-level API
//! to prevent this trap from occurring. Rust does not support linear types
//! (types that must be used exactly once), instead it only has affine types
//! (types which must be used at most once), meaning that this requires some
//! runtime support.
//!
//! Specifically the `FutureWriter` structure stores two auxiliary Rust-specific
//! pieces of information:
//!
//! * A `should_write_default_value` boolean - if `true` on destruction then a
//!   value has not yet been written and something must be written.
//! * A `default: fn() -> T` constructor to lazily create the default value to
//!   be sent in this situation.
//!
//! This `default` field is provided by the user when the future is initially
//! created. Additionally during `Drop` a new Rust-level task will be spawned to
//! perform the write in the background. That'll keep the component-level task
//! alive until that write completes but otherwise shouldn't hinder anything
//! else.
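//!
//! The flag-plus-default idea can be sketched with a standard library channel.
//! This is only an approximation under stated assumptions: `OneShotWriter` and
//! `pair` are hypothetical, and the sketch sends the default synchronously in
//! `Drop` whereas this module spawns a task to perform the write.
//!
//! ```rust
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! /// Hypothetical writer which must send exactly one value before going away.
//! struct OneShotWriter<T> {
//!     tx: Sender<T>,
//!     should_write_default_value: bool,
//!     default: fn() -> T,
//! }
//!
//! impl<T> OneShotWriter<T> {
//!     /// Sends `value` and records that the default is no longer needed.
//!     fn write(mut self, value: T) {
//!         // Ignore the error from a receiver which has already gone away.
//!         let _ = self.tx.send(value);
//!         self.should_write_default_value = false;
//!     }
//! }
//!
//! impl<T> Drop for OneShotWriter<T> {
//!     fn drop(&mut self) {
//!         // Nothing was written, so fall back to the default value.
//!         if self.should_write_default_value {
//!             let _ = self.tx.send((self.default)());
//!         }
//!     }
//! }
//!
//! /// Creates a writer/receiver pair with `default` as the fallback value.
//! fn pair<T>(default: fn() -> T) -> (OneShotWriter<T>, Receiver<T>) {
//!     let (tx, rx) = channel();
//!     let writer = OneShotWriter {
//!         tx,
//!         should_write_default_value: true,
//!         default,
//!     };
//!     (writer, rx)
//! }
//! ```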

use {
    crate::rt::async_support::waitable::{WaitableOp, WaitableOperation},
    crate::rt::async_support::ReturnCode,
    crate::rt::Cleanup,
    std::{
        alloc::Layout,
        fmt,
        future::{Future, IntoFuture},
        marker,
        pin::Pin,
        ptr,
        sync::atomic::{AtomicU32, Ordering::Relaxed},
        task::{Context, Poll},
    },
};

/// Function table used for [`FutureWriter`] and [`FutureReader`]
///
/// Instances of this table are generated by `wit_bindgen::generate!`. This is
/// not a trait to enable different `FutureVtable<()>` instances to exist, for
/// example, through different calls to `wit_bindgen::generate!`.
///
/// It's not intended that any user implements this vtable, instead it's
/// intended to only be auto-generated.
#[doc(hidden)]
pub struct FutureVtable<T> {
    /// The Canonical ABI layout of `T` in-memory.
    pub layout: Layout,

    /// A callback to consume a value of `T` and lower it to the canonical ABI
    /// pointed to by `dst`.
    ///
    /// The `dst` pointer should have `self.layout`. This is used to convert
    /// in-memory representations in Rust to their canonical representations in
    /// the component model.
    pub lower: unsafe fn(value: T, dst: *mut u8),

    /// A callback to deallocate any lists within the canonical ABI value `dst`
    /// provided.
    ///
    /// This is used when a value is successfully sent to another component. In
    /// such a situation it may be possible that the canonical lowering of `T`
    /// has lists that are still owned by this component and must be
    /// deallocated. This is akin to a `post-return` callback for returns of
    /// exported functions.
    pub dealloc_lists: unsafe fn(dst: *mut u8),

    /// A callback to lift a value of `T` from the canonical ABI representation
    /// provided.
    pub lift: unsafe fn(dst: *mut u8) -> T,

    /// The raw `future.write` intrinsic.
    pub start_write: unsafe extern "C" fn(future: u32, val: *const u8) -> u32,
    /// The raw `future.read` intrinsic.
    pub start_read: unsafe extern "C" fn(future: u32, val: *mut u8) -> u32,
    /// The raw `future.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(future: u32) -> u32,
    /// The raw `future.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(future: u32) -> u32,
    /// The raw `future.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(future: u32),
    /// The raw `future.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(future: u32),
    /// The raw `future.new` intrinsic.
    pub new: unsafe extern "C" fn() -> u64,
}

/// Helper function to create a new read/write pair for a component model
/// future.
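///
/// The `future.new` intrinsic returns both handles packed into one `u64`, with
/// the reader in the low 32 bits and the writer in the high 32 bits. A small
/// sketch of that split, where `unpack` and `pack` are hypothetical helpers
/// that are not part of this module:
///
/// ```rust
/// /// Hypothetical helper splitting a packed `u64` into `(writer, reader)`.
/// fn unpack(handles: u64) -> (u32, u32) {
///     // The truncating cast keeps only the low 32 bits.
///     let reader = handles as u32;
///     let writer = (handles >> 32) as u32;
///     (writer, reader)
/// }
///
/// /// Hypothetical inverse of `unpack`, used only to build example inputs.
/// fn pack(writer: u32, reader: u32) -> u64 {
///     (u64::from(writer) << 32) | u64::from(reader)
/// }
/// ```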
///
/// # Unsafety
///
/// This function is unsafe as it requires the functions within `vtable` to
/// correctly uphold the contracts of the component model.
pub unsafe fn future_new<T>(
    default: fn() -> T,
    vtable: &'static FutureVtable<T>,
) -> (FutureWriter<T>, FutureReader<T>) {
    unsafe {
        let handles = (vtable.new)();
        let reader = handles as u32;
        let writer = (handles >> 32) as u32;
        rtdebug!("future.new() = [{writer}, {reader}]");
        (
            FutureWriter::new(writer, default, vtable),
            FutureReader::new(reader, vtable),
        )
    }
}

/// Represents the writable end of a Component Model `future`.
///
/// A [`FutureWriter`] can be used to send a single value of `T` to the other
/// end of a `future`. In a sense this is similar to a oneshot channel in Rust.
pub struct FutureWriter<T: 'static> {
    handle: u32,
    vtable: &'static FutureVtable<T>,

    /// Whether or not a value should be written during `drop`.
    ///
    /// This is set to `false` when a value is successfully written or when a
    /// value is written but the future is witnessed as being dropped.
    ///
    /// Note that this is set to `true` on construction to ensure that only
    /// locations which actually witness a completed write set it to `false`.
    should_write_default_value: bool,

    /// Constructor for the default value to write during `drop`, should one
    /// need to be written.
    default: fn() -> T,
}

impl<T> FutureWriter<T> {
    /// Helper function to wrap a handle/vtable into a `FutureWriter`.
    ///
    /// # Unsafety
    ///
    /// This function is unsafe as it requires the functions within `vtable` to
    /// correctly uphold the contracts of the component model.
    #[doc(hidden)]
    pub unsafe fn new(handle: u32, default: fn() -> T, vtable: &'static FutureVtable<T>) -> Self {
        Self {
            handle,
            default,
            should_write_default_value: true,
            vtable,
        }
    }

    /// Write the specified `value` to this `future`.
    ///
    /// This method is equivalent to an `async fn` which sends the `value` into
    /// this future. The asynchronous operation acts as a rendezvous where the
    /// operation does not complete until the other side has successfully
    /// received the value.
    ///
    /// # Return Value
    ///
    /// The returned [`FutureWrite`] is a future that can be `.await`'d. The
    /// return value of this future is:
    ///
    /// * `Ok(())` - the `value` was sent and received. The `self` value was
    ///   consumed along the way and will no longer be accessible.
    /// * `Err(FutureWriteError { value })` - an attempt was made to send
    ///   `value` but the other half of this [`FutureWriter`] was dropped before
    ///   the value was received. This consumes `self` because the channel is
    ///   now dropped, but `value` is returned in case the caller wants to reuse
    ///   it.
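    ///
    /// As an analogy only, a zero-capacity `std::sync::mpsc::sync_channel`
    /// has a similar shape: `send` completes once the receiver takes the
    /// value, and a failed send hands the value back. The helper functions
    /// below are hypothetical and do not use this module's API:
    ///
    /// ```rust
    /// use std::sync::mpsc::{sync_channel, SendError};
    /// use std::thread;
    ///
    /// /// Sends `value` over a zero-capacity channel whose receiver was
    /// /// already dropped, returning the value handed back by the failed send.
    /// fn send_to_dropped_receiver(value: String) -> Option<String> {
    ///     let (tx, rx) = sync_channel::<String>(0);
    ///     drop(rx);
    ///     match tx.send(value) {
    ///         Ok(()) => None,
    ///         Err(SendError(value)) => Some(value),
    ///     }
    /// }
    ///
    /// /// Sends `value` to a live receiver on another thread. The `send` call
    /// /// only returns once the receiver has taken the value.
    /// fn send_to_live_receiver(value: String) -> String {
    ///     let (tx, rx) = sync_channel::<String>(0);
    ///     let receiver = thread::spawn(move || rx.recv().unwrap());
    ///     tx.send(value).unwrap();
    ///     receiver.join().unwrap()
    /// }
    /// ```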
    ///
    /// # Cancellation
    ///
    /// The returned future can be cancelled normally via `drop` which means
    /// that the `value` provided here, along with this `FutureWriter` itself,
    /// will be lost. There is also [`FutureWrite::cancel`] which can be used to
    /// possibly re-acquire `value` and `self` if the operation was cancelled.
    /// In such a situation the operation can be retried at a future date.
    pub fn write(self, value: T) -> FutureWrite<T> {
        FutureWrite {
            op: WaitableOperation::new((self, value)),
        }
    }
}

impl<T> fmt::Debug for FutureWriter<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureWriter")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<T> Drop for FutureWriter<T> {
    fn drop(&mut self) {
        // If a value has not yet been written into this writer then that must
        // be done so now. Perform a "clone" of `self` by moving our data into a
        // subtask, but ensure that `should_write_default_value` is set to
        // `false` to avoid infinite loops by accident. Once the task is spawned
        // we're done and the subtask's destructor of the closed-over
        // `FutureWriter` will be responsible for performing the
        // `drop-writable` call below.
        //
        // Note, though, that if `should_write_default_value` is `false` then a
        // write has already happened and we can go ahead and just synchronously
        // drop this writer as we would any other handle.
        if self.should_write_default_value {
            let clone = FutureWriter {
                handle: self.handle,
                default: self.default,
                should_write_default_value: false,
                vtable: self.vtable,
            };
            crate::rt::async_support::spawn(async move {
                let value = (clone.default)();
                let _ = clone.write(value).await;
            });
        } else {
            unsafe {
                rtdebug!("future.drop-writable({})", self.handle);
                (self.vtable.drop_writable)(self.handle);
            }
        }
    }
}

/// Represents a write operation which may be cancelled prior to completion.
///
/// This is returned by [`FutureWriter::write`].
pub struct FutureWrite<T: 'static> {
    op: WaitableOperation<FutureWriteOp<T>>,
}

struct FutureWriteOp<T>(marker::PhantomData<T>);

enum WriteComplete<T> {
    Written,
    Dropped(T),
    Cancelled(T),
}

unsafe impl<T> WaitableOp for FutureWriteOp<T>
where
    T: 'static,
{
    type Start = (FutureWriter<T>, T);
    type InProgress = (FutureWriter<T>, Option<Cleanup>);
    type Result = (WriteComplete<T>, FutureWriter<T>);
    type Cancel = FutureWriteCancel<T>;

    fn start((writer, value): Self::Start) -> (u32, Self::InProgress) {
        // TODO: it should be safe to store the lower-destination in
        // `WaitableOperation` using `Pin` memory and such, but that would
        // require some type-level trickery to get a correctly-sized value
        // plumbed all the way to here. For now just dynamically allocate it and
        // leave the optimization of leaving out this dynamic allocation to the
        // future.
        //
        // In lieu of that a dedicated location on the heap is created for the
        // lowering, and then `value`, as an owned value, is lowered into this
        // pointer to initialize it.
        let (ptr, cleanup) = Cleanup::new(writer.vtable.layout);
        // SAFETY: `ptr` is allocated with `vtable.layout` and should be
        // safe to use here.
        let code = unsafe {
            (writer.vtable.lower)(value, ptr);
            (writer.vtable.start_write)(writer.handle, ptr)
        };
        rtdebug!("future.write({}, {ptr:?}) = {code:#x}", writer.handle);
        (code, (writer, cleanup))
    }

    fn start_cancelled((writer, value): Self::Start) -> Self::Cancel {
        FutureWriteCancel::Cancelled(value, writer)
    }

    fn in_progress_update(
        (mut writer, cleanup): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        let ptr = cleanup
            .as_ref()
            .map(|c| c.ptr.as_ptr())
            .unwrap_or(ptr::null_mut());
        match code {
            super::BLOCKED => Err((writer, cleanup)),

            // The other end has dropped its end, or the write was cancelled.
            //
            // The value was not received by the other end so `ptr` still has
            // all of its resources intact. Use `lift` to construct a new
            // instance of `T` which takes ownership of pointers and resources
            // and such. The allocation of `ptr` is then cleaned up naturally
            // when `cleanup` goes out of scope.
            super::DROPPED | super::CANCELLED => {
                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                let value = unsafe { (writer.vtable.lift)(ptr) };
                let status = if code == super::DROPPED {
                    // The other end has been witnessed to be dropped, meaning
                    // that `writer` is going to get destroyed soon as this
                    // return value propagates up the stack. There's no need to
                    // write the default value, so set this to `false`.
                    writer.should_write_default_value = false;
                    WriteComplete::Dropped(value)
                } else {
                    WriteComplete::Cancelled(value)
                };
                Ok((status, writer))
            }

            // This write has completed.
            //
            // Here we need to clean up our allocations. The `ptr` exclusively
            // owns all of the value being sent and we notably need to cleanup
            // the transitive list allocations present in this pointer. Use
            // `dealloc_lists` for that (effectively a post-return lookalike).
            //
            // Afterwards the `cleanup` itself is naturally dropped and cleaned
            // up.
            super::COMPLETED => {
                // A value was written, so no need to write the default value.
                writer.should_write_default_value = false;

                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                unsafe {
                    (writer.vtable.dealloc_lists)(ptr);
                }
                Ok((WriteComplete::Written, writer))
            }

            other => unreachable!("unexpected code {other:?}"),
        }
    }

    fn in_progress_waitable((writer, _): &Self::InProgress) -> u32 {
        writer.handle
    }

    fn in_progress_cancel((writer, _): &Self::InProgress) -> u32 {
        // SAFETY: we're managing `writer` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
        rtdebug!("future.cancel-write({}) = {code:#x}", writer.handle);
        code
    }

    fn result_into_cancel((result, writer): Self::Result) -> Self::Cancel {
        match result {
            // The value was actually sent, meaning we can't yield back the
            // future nor the value.
            WriteComplete::Written => FutureWriteCancel::AlreadySent,

            // The value was not sent because the other end either hung up or we
            // successfully cancelled. In both cases return back the value here
            // with the writer.
            WriteComplete::Dropped(val) => FutureWriteCancel::Dropped(val),
            WriteComplete::Cancelled(val) => FutureWriteCancel::Cancelled(val, writer),
        }
    }
}

impl<T: 'static> Future for FutureWrite<T> {
    type Output = Result<(), FutureWriteError<T>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project()
            .poll_complete(cx)
            .map(|(result, _writer)| match result {
                WriteComplete::Written => Ok(()),
                WriteComplete::Dropped(value) | WriteComplete::Cancelled(value) => {
                    Err(FutureWriteError { value })
                }
            })
    }
}

impl<T: 'static> FutureWrite<T> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureWriteOp<T>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this write if it hasn't already completed.
    ///
    /// This method can be used to cancel a write-in-progress and re-acquire
    /// the writer and the value being sent. Note that the write operation may
    /// succeed racily or the other end may also drop racily, and these
    /// outcomes are reflected in the returned value here.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice.
    pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel<T> {
        self.pin_project().cancel()
    }
}

/// Error type in the result of [`FutureWrite`], or the error type that is a result of
/// a failure to write a future.
pub struct FutureWriteError<T> {
    /// The value that could not be sent.
    pub value: T,
}

impl<T> fmt::Debug for FutureWriteError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureWriteError").finish_non_exhaustive()
    }
}

impl<T> fmt::Display for FutureWriteError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        "read end dropped".fmt(f)
    }
}

impl<T> std::error::Error for FutureWriteError<T> {}

/// Result of [`FutureWrite::cancel`].
#[derive(Debug)]
pub enum FutureWriteCancel<T: 'static> {
    /// The cancel request raced with the receipt of the sent value, and the
    /// value was actually sent. Neither the value nor the writer are made
    /// available here as both are gone.
    AlreadySent,

    /// The other end was dropped before cancellation happened.
    ///
    /// In this case the original value is returned back to the caller but the
    /// writer itself is no longer accessible as it's no longer usable.
    Dropped(T),

    /// The pending write was successfully cancelled and the value being written
    /// is returned along with the writer to resume again in the future if
    /// necessary.
    Cancelled(T, FutureWriter<T>),
}

/// Represents the readable end of a Component Model `future<T>`.
pub struct FutureReader<T: 'static> {
    handle: AtomicU32,
    vtable: &'static FutureVtable<T>,
}

impl<T> fmt::Debug for FutureReader<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureReader")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<T> FutureReader<T> {
    #[doc(hidden)]
    pub fn new(handle: u32, vtable: &'static FutureVtable<T>) -> Self {
        Self {
            handle: AtomicU32::new(handle),
            vtable,
        }
    }

    /// Takes the raw handle out of this reader, leaving the `u32::MAX`
    /// sentinel behind so that `Drop` does not close the handle.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken.
    #[doc(hidden)]
    pub fn take_handle(&self) -> u32 {
        let ret = self.opt_handle().unwrap();
        self.handle.store(u32::MAX, Relaxed);
        ret
    }

    fn handle(&self) -> u32 {
        self.opt_handle().unwrap()
    }

    // Returns `None` if the handle was previously taken via `take_handle`.
    fn opt_handle(&self) -> Option<u32> {
        match self.handle.load(Relaxed) {
            u32::MAX => None,
            other => Some(other),
        }
    }
}

impl<T> IntoFuture for FutureReader<T> {
    type Output = T;
    type IntoFuture = FutureRead<T>;

    /// Convert this object into a `Future` which will resolve when a value is
    /// written to the writable end of this `future`.
    fn into_future(self) -> Self::IntoFuture {
        FutureRead {
            op: WaitableOperation::new(self),
        }
    }
}

impl<T> Drop for FutureReader<T> {
    fn drop(&mut self) {
        let Some(handle) = self.opt_handle() else {
            return;
        };
        unsafe {
            rtdebug!("future.drop-readable({handle})");
            (self.vtable.drop_readable)(handle);
        }
    }
}

/// Represents a read operation which may be cancelled prior to completion.
///
/// This represents a read operation on a [`FutureReader`] and is created via
/// `IntoFuture`.
pub struct FutureRead<T: 'static> {
    op: WaitableOperation<FutureReadOp<T>>,
}

struct FutureReadOp<T>(marker::PhantomData<T>);

enum ReadComplete<T> {
    Value(T),
    Cancelled,
}

unsafe impl<T> WaitableOp for FutureReadOp<T>
where
    T: 'static,
{
    type Start = FutureReader<T>;
    type InProgress = (FutureReader<T>, Option<Cleanup>);
    type Result = (ReadComplete<T>, FutureReader<T>);
    type Cancel = Result<T, FutureReader<T>>;

    fn start(reader: Self::Start) -> (u32, Self::InProgress) {
        let (ptr, cleanup) = Cleanup::new(reader.vtable.layout);
        // SAFETY: `ptr` is allocated with `vtable.layout` and should be
        // safe to use here. Its lifetime for the async operation is hinged on
        // `WaitableOperation` being safe.
        let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr) };
        rtdebug!("future.read({}, {ptr:?}) = {code:#x}", reader.handle());
        (code, (reader, cleanup))
    }

    fn start_cancelled(state: Self::Start) -> Self::Cancel {
        Err(state)
    }

    fn in_progress_update(
        (reader, cleanup): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        match ReturnCode::decode(code) {
            ReturnCode::Blocked => Err((reader, cleanup)),

            // Let `cleanup` fall out of scope to clean up its allocation here,
            // and otherwise the reader is plumbed through to possibly restart
            // the read in the future.
            ReturnCode::Cancelled(0) => Ok((ReadComplete::Cancelled, reader)),

            // The read has completed, so lift the value from the stored memory and
            // `cleanup` naturally falls out of scope after transferring ownership of
            // everything to the returned `value`.
            ReturnCode::Completed(0) => {
                let ptr = cleanup
                    .as_ref()
                    .map(|c| c.ptr.as_ptr())
                    .unwrap_or(ptr::null_mut());

                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                let value = unsafe { (reader.vtable.lift)(ptr) };
                Ok((ReadComplete::Value(value), reader))
            }

            other => panic!("unexpected code {other:?}"),
        }
    }

    fn in_progress_waitable((reader, _): &Self::InProgress) -> u32 {
        reader.handle()
    }

    fn in_progress_cancel((reader, _): &Self::InProgress) -> u32 {
        // SAFETY: we're managing `reader` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
        rtdebug!("future.cancel-read({}) = {code:#x}", reader.handle());
        code
    }

    fn result_into_cancel((value, reader): Self::Result) -> Self::Cancel {
        match value {
            // The value was actually read, so thread that through here.
            ReadComplete::Value(value) => Ok(value),

            // The read was successfully cancelled, so thread through the
            // `reader` to possibly restart later on.
            ReadComplete::Cancelled => Err(reader),
        }
    }
}

impl<T: 'static> Future for FutureRead<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project()
            .poll_complete(cx)
            .map(|(result, _reader)| match result {
                ReadComplete::Value(val) => val,
                // This is only possible if, after calling `FutureRead::cancel`,
                // the future is polled again. The `cancel` method is documented
                // as "don't do that" so this is left to panic.
                ReadComplete::Cancelled => panic!("cannot poll after cancelling"),
            })
    }
}

impl<T> FutureRead<T> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureReadOp<T>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this read if it hasn't already completed.
    ///
    /// Return values include:
    ///
    /// * `Ok(value)` - future completed before this cancellation request
    ///   was received.
    /// * `Err(reader)` - read operation was cancelled and it can be retried in
    ///   the future if desired.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice. Additionally if this method completes
    /// then calling `poll` again on `self` will panic.
    pub fn cancel(self: Pin<&mut Self>) -> Result<T, FutureReader<T>> {
        self.pin_project().cancel()
    }
}
732        self.pin_project().cancel()
733    }
734}