wit_bindgen/rt/async_support/future_support.rs
//! Runtime support for `future<T>` in the component model.
//!
//! There are a number of tricky concerns to balance when implementing
//! bindings to `future<T>`, specifically with how it interacts with Rust. This
//! section goes over some of the high-level details of the implementation.
//!
//! ## Leak safety
//!
//! It's safe to leak any value at any time currently in Rust. In other words
//! Rust doesn't have linear types (yet). Typically this isn't really a problem
//! but the component model intrinsics we're working with here operate by being
//! given a pointer and then at some point in the future the pointer may be
//! read. This means that it's our responsibility to keep this pointer alive and
//! valid for the entire duration of an asynchronous operation.
//!
//! Chiefly this means that borrowed values are a no-no in this module. For
//! example if you were to send a `&[u8]` as an implementation of
//! `future<list<u8>>` that would not be sound. Consider this sequence:
//!
//! * The future send operation is started, recording an address of `&[u8]`.
//! * The future is then leaked.
//! * According to rustc, later in code the original `&[u8]` is then no longer
//!   borrowed.
//! * The original source of `&[u8]` could then be deallocated.
//! * Then the component model actually reads the pointer that it was given.
//!
//! This constraint effectively means that all types flowing in-and-out of
//! futures, streams, and async APIs are all "owned values", notably no
//! lifetimes. This requires, for example, that `future<list<u8>>` operates on
//! `Vec<u8>`.
//!
//! This is in stark contrast to bindings generated for `list<u8>` otherwise,
//! however, where for example a synchronous import with a `list<u8>` argument
//! would be bound with a `&[u8]` argument. Until Rust has some form of linear
//! types, however, it's not possible to loosen this restriction soundly because
//! it's generally not safe to leak an active I/O operation. This restriction is
//! similar to why it's so difficult to bind `io_uring` in safe Rust, which
//! operates similarly to the component model where pointers are submitted and
//! read in the future after the original call for submission returns.
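/*!
A minimal sketch of the owned-value rule from this section, using only `std`.
`PendingSend` and `leak_then_read` are hypothetical names invented for this
illustration and are not part of this module; the later read by the component
model is simulated with a raw-pointer read. Because the operation owns its
`Vec<u8>`, leaking the operation leaks the allocation too, so the recorded
address stays valid:

```rust
/// Hypothetical stand-in for an in-flight send: it owns the bytes whose
/// address has been handed out.
pub struct PendingSend {
    buf: Vec<u8>,
}

impl PendingSend {
    /// Takes the buffer by value, so no borrow can end while the operation
    /// is still pending.
    pub fn start(buf: Vec<u8>) -> Self {
        PendingSend { buf }
    }

    /// The address and length that would be recorded for the later read.
    pub fn raw_parts(&self) -> (*const u8, usize) {
        (self.buf.as_ptr(), self.buf.len())
    }
}

/// Leaks the operation, then reads through the recorded pointer the way a
/// delayed reader might.
pub fn leak_then_read(data: Vec<u8>) -> Vec<u8> {
    let op = PendingSend::start(data);
    let (ptr, len) = op.raw_parts();
    // Leaking is safe Rust: the destructor never runs, so the buffer is
    // never freed.
    std::mem::forget(op);
    // SAFETY: the allocation was leaked above, so `ptr` still points at
    // `len` initialized bytes.
    unsafe { std::slice::from_raw_parts(ptr, len).to_vec() }
}
```

With a `&[u8]` field in place of the `Vec<u8>`, the same `forget` would end the
borrow while the recorded address was still in use.
*/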
//!
//! ## Lowering Owned Values
//!
//! According to the above everything with futures/streams operates on owned
//! values already, but this also affects precisely how lifting and lowering is
//! performed. In general any active asynchronous operation could be cancelled
//! at any time, meaning we have to deal with situations such as:
//!
//! * A `write` hasn't even started yet.
//! * A `write` was started and then cancelled.
//! * A `write` was started and then the other end dropped the channel.
//! * A `write` was started and then the other end received the value.
//!
//! In all of these situations regardless of the structure of `T` we can't leak
//! memory. The `future.write` intrinsic, however, takes no ownership of the
//! memory involved which means that we're still responsible for cleaning up
//! lists. It does take ownership, however, of `own<T>` handles and other
//! resources.
//!
//! The way that this is solved for futures/streams is to lean further into
//! processing owned values. Namely lowering a `T` takes `T`-by-value, not `&T`.
//! This means that lowering operates similarly to return values of exported
//! functions, not parameters to imported functions. By lowering an owned value
//! of `T` this preserves a nice property where the lowered value has exclusive
//! ownership of all of its pointers/resources/etc. Lowering `&T` may require a
//! "cleanup list" for example which we avoid here entirely.
//!
//! This then makes the second and third cases above, getting a value back after
//! lowering, much easier. Namely re-acquisition of a value is a simple `lift`
//! operation as if we received a value on the channel.
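/*!
The by-value lowering described in this section can be sketched with a toy
model. Everything here (`Lowered`, `lower`, `lift`, `Outcome`, `finish`) is a
hypothetical illustration over `Vec<u8>` and is not the generated vtable code.
The point is only that a lowered value owns its buffer exclusively, so a
cancelled or dropped write can recover the value with a plain lift:

```rust
use std::mem::ManuallyDrop;

/// Hypothetical lowered form of a `Vec<u8>`: raw parts that exclusively own
/// the allocation. This toy type has no destructor.
pub struct Lowered {
    ptr: *mut u8,
    len: usize,
    cap: usize,
}

/// Lowering consumes the value, so nothing else can still refer to its buffer.
pub fn lower(value: Vec<u8>) -> Lowered {
    let mut value = ManuallyDrop::new(value);
    Lowered {
        ptr: value.as_mut_ptr(),
        len: value.len(),
        cap: value.capacity(),
    }
}

/// Lifting rebuilds the owned value from the lowered parts.
pub fn lift(lowered: Lowered) -> Vec<u8> {
    // SAFETY: the parts were produced by `lower`, which gave up ownership of
    // the buffer, and `lowered` is consumed here so they are used only once.
    unsafe { Vec::from_raw_parts(lowered.ptr, lowered.len, lowered.cap) }
}

/// The ways a started write can end.
pub enum Outcome {
    Completed,
    Dropped,
    Cancelled,
}

/// Returns the value if it was not received, otherwise frees the buffer.
pub fn finish(lowered: Lowered, outcome: Outcome) -> Option<Vec<u8>> {
    match outcome {
        // The peer has its own copy; only our allocation remains to free.
        Outcome::Completed => {
            drop(lift(lowered));
            None
        }
        // Nothing was received, so the lowered form is still complete.
        Outcome::Dropped | Outcome::Cancelled => Some(lift(lowered)),
    }
}
```

In this toy the completed case frees the buffer by lifting and dropping, which
is only a stand-in for whatever list cleanup the real bindings perform.
*/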
//!
//! ## Inefficiencies
//!
//! The above requirements generally mean that this is not a hyper-efficient
//! implementation. All writes and reads, for example, start out with allocating
//! memory on the heap to be owned by the asynchronous operation. Writing a
//! `list<u8>` to a future passes ownership of `Vec<u8>` but in theory doesn't
//! actually require relinquishing ownership of the vector. Furthermore
//! there's no way to re-acquire a `T` after it has been sent, but all of `T` is
//! still valid except for `own<U>` resources.
//!
//! That's all to say that this implementation can probably still be improved
//! upon, but doing so is thought to be pretty nontrivial at this time. It
//! should be noted though that there are other high-level inefficiencies with
//! WIT unrelated to this module. For example `list<T>` is not always
//! represented the same in Rust as it is in the canonical ABI. That means that
//! sending `list<T>` into a future might require copying the entire list and
//! changing its layout. Currently this is par-for-the-course with bindings.
//!
//! ## Linear (exactly once) Writes
//!
//! The component model requires that a writable end of a future must be written
//! to before closing, otherwise the drop operation traps. Ideally usage of
//! this API shouldn't result in traps so this is modeled in the Rust-level API
//! to prevent this trap from occurring. Rust does not support linear types
//! (types that must be used exactly once), instead it only has affine types
//! (types which must be used at most once), meaning that this requires some
//! runtime support.
//!
//! Specifically the `FutureWriter` structure stores two auxiliary Rust-specific
//! pieces of information:
//!
//! * A `should_write_default_value` boolean - if `true` on destruction then a
//!   value has not yet been written and something must be written.
//! * A `default: fn() -> T` constructor to lazily create the default value to
//!   be sent in this situation.
//!
//! This `default` field is provided by the user when the future is initially
//! created. Additionally during `Drop` a new Rust-level task will be spawned to
//! perform the write in the background. That'll keep the component-level task
//! alive until that write completes but otherwise shouldn't hinder anything
//! else.
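/*!
The write-on-drop idea can be sketched without any component model machinery.
`ToyWriter` and the two helper functions below are hypothetical and exist only
for this illustration. As a simplifying assumption the toy writes the default
value synchronously into a shared slot, whereas this module spawns a task to
perform the write in the background:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical writer that guarantees its slot is filled exactly once.
pub struct ToyWriter<T> {
    slot: Rc<RefCell<Option<T>>>,
    should_write_default_value: bool,
    default: fn() -> T,
}

impl<T> ToyWriter<T> {
    pub fn new(slot: Rc<RefCell<Option<T>>>, default: fn() -> T) -> Self {
        ToyWriter {
            slot,
            // Starts as `true`; only a witnessed write may clear it.
            should_write_default_value: true,
            default,
        }
    }

    /// Consumes the writer, so a second write is impossible (affine use).
    pub fn write(mut self, value: T) {
        *self.slot.borrow_mut() = Some(value);
        self.should_write_default_value = false;
    }
}

impl<T> Drop for ToyWriter<T> {
    fn drop(&mut self) {
        // Runtime check that upgrades "at most once" to "exactly once".
        if self.should_write_default_value {
            *self.slot.borrow_mut() = Some((self.default)());
        }
    }
}

/// Dropping an unused writer fills the slot with the default value.
pub fn dropped_without_write() -> Option<u32> {
    let slot: Rc<RefCell<Option<u32>>> = Rc::new(RefCell::new(None));
    drop(ToyWriter::new(slot.clone(), || 7));
    let got = *slot.borrow();
    got
}

/// An explicit write wins and the default constructor is never called.
pub fn written_then_dropped() -> Option<u32> {
    let slot: Rc<RefCell<Option<u32>>> = Rc::new(RefCell::new(None));
    ToyWriter::new(slot.clone(), || 7).write(1);
    let got = *slot.borrow();
    got
}
```
*/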

use {
    crate::rt::async_support::waitable::{WaitableOp, WaitableOperation},
    crate::rt::async_support::ReturnCode,
    crate::rt::Cleanup,
    std::{
        alloc::Layout,
        fmt,
        future::{Future, IntoFuture},
        marker,
        pin::Pin,
        ptr,
        sync::atomic::{AtomicU32, Ordering::Relaxed},
        task::{Context, Poll},
    },
};

/// Function table used for [`FutureWriter`] and [`FutureReader`]
///
/// Instances of this table are generated by `wit_bindgen::generate!`. This is
/// not a trait to enable different `FutureVtable<()>` instances to exist, for
/// example, through different calls to `wit_bindgen::generate!`.
///
/// It's not intended that any user implements this vtable, instead it's
/// intended to only be auto-generated.
#[doc(hidden)]
pub struct FutureVtable<T> {
    /// The Canonical ABI layout of `T` in-memory.
    pub layout: Layout,

    /// A callback to consume a value of `T` and lower it to the canonical ABI
    /// pointed to by `dst`.
    ///
    /// The `dst` pointer should have `self.layout`. This is used to convert
    /// in-memory representations in Rust to their canonical representations in
    /// the component model.
    pub lower: unsafe fn(value: T, dst: *mut u8),

    /// A callback to deallocate any lists within the canonical ABI value `dst`
    /// provided.
    ///
    /// This is used when a value is successfully sent to another component. In
    /// such a situation it may be possible that the canonical lowering of `T`
    /// has lists that are still owned by this component and must be
    /// deallocated. This is akin to a `post-return` callback for returns of
    /// exported functions.
    pub dealloc_lists: unsafe fn(dst: *mut u8),

    /// A callback to lift a value of `T` from the canonical ABI representation
    /// provided.
    pub lift: unsafe fn(dst: *mut u8) -> T,

    /// The raw `future.write` intrinsic.
    pub start_write: unsafe extern "C" fn(future: u32, val: *const u8) -> u32,
    /// The raw `future.read` intrinsic.
    pub start_read: unsafe extern "C" fn(future: u32, val: *mut u8) -> u32,
    /// The raw `future.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(future: u32) -> u32,
    /// The raw `future.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(future: u32) -> u32,
    /// The raw `future.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(future: u32),
    /// The raw `future.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(future: u32),
    /// The raw `future.new` intrinsic.
    pub new: unsafe extern "C" fn() -> u64,
}

/// Helper function to create a new read/write pair for a component model
/// future.
///
/// # Unsafety
///
/// This function is unsafe as it requires the functions within `vtable` to
/// correctly uphold the contracts of the component model.
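/**
# Handle layout

A small sketch of how the `u64` returned by the `new` intrinsic is split by
this function: the reader handle is taken from the low 32 bits and the writer
handle from the high 32 bits. `unpack_handles` and `pack_handles` are
hypothetical helper names used only for illustration, and `pack_handles` is
simply the inverse assumed for the round trip:

```rust
/// Splits a packed pair into `(writer, reader)`.
pub fn unpack_handles(handles: u64) -> (u32, u32) {
    // Truncation keeps the low 32 bits, which hold the reader handle.
    let reader = handles as u32;
    // The high 32 bits hold the writer handle.
    let writer = (handles >> 32) as u32;
    (writer, reader)
}

/// Hypothetical inverse of `unpack_handles`, used here only for testing.
pub fn pack_handles(writer: u32, reader: u32) -> u64 {
    ((writer as u64) << 32) | (reader as u64)
}
```
*/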
pub unsafe fn future_new<T>(
    default: fn() -> T,
    vtable: &'static FutureVtable<T>,
) -> (FutureWriter<T>, FutureReader<T>) {
    unsafe {
        let handles = (vtable.new)();
        let reader = handles as u32;
        let writer = (handles >> 32) as u32;
        rtdebug!("future.new() = [{writer}, {reader}]");
        (
            FutureWriter::new(writer, default, vtable),
            FutureReader::new(reader, vtable),
        )
    }
}

/// Represents the writable end of a Component Model `future`.
///
/// A [`FutureWriter`] can be used to send a single value of `T` to the other
/// end of a `future`. In a sense this is similar to a oneshot channel in Rust.
pub struct FutureWriter<T: 'static> {
    handle: u32,
    vtable: &'static FutureVtable<T>,

    /// Whether or not a value should be written during `drop`.
    ///
    /// This is set to `false` when a value is successfully written or when a
    /// value is written but the future is witnessed as being dropped.
    ///
    /// Note that this is set to `true` on construction to ensure that only
    /// locations which actually witness a completed write set it to `false`.
    should_write_default_value: bool,

    /// Constructor for the default value to write during `drop`, should one
    /// need to be written.
    default: fn() -> T,
}

impl<T> FutureWriter<T> {
    /// Helper function to wrap a handle/vtable into a `FutureWriter`.
    ///
    /// # Unsafety
    ///
    /// This function is unsafe as it requires the functions within `vtable` to
    /// correctly uphold the contracts of the component model.
    #[doc(hidden)]
    pub unsafe fn new(handle: u32, default: fn() -> T, vtable: &'static FutureVtable<T>) -> Self {
        Self {
            handle,
            default,
            should_write_default_value: true,
            vtable,
        }
    }

    /// Write the specified `value` to this `future`.
    ///
    /// This method is equivalent to an `async fn` which sends the `value` into
    /// this future. The asynchronous operation acts as a rendezvous where the
    /// operation does not complete until the other side has successfully
    /// received the value.
    ///
    /// # Return Value
    ///
    /// The returned [`FutureWrite`] is a future that can be `.await`'d. The
    /// return value of this future is:
    ///
    /// * `Ok(())` - the `value` was sent and received. The `self` value was
    ///   consumed along the way and will no longer be accessible.
    /// * `Err(FutureWriteError { value })` - an attempt was made to send
    ///   `value` but the other half of this [`FutureWriter`] was dropped before
    ///   the value was received. This consumes `self` because the channel is
    ///   now dropped, but `value` is returned in case the caller wants to reuse
    ///   it.
    ///
    /// # Cancellation
    ///
    /// The returned future can be cancelled normally via `drop` which means
    /// that the `value` provided here, along with this `FutureWriter` itself,
    /// will be lost. There is also [`FutureWrite::cancel`] which can be used to
    /// possibly re-acquire `value` and `self` if the operation was cancelled.
    /// In such a situation the operation can be retried at a future date.
    pub fn write(self, value: T) -> FutureWrite<T> {
        FutureWrite {
            op: WaitableOperation::new((self, value)),
        }
    }
}

impl<T> fmt::Debug for FutureWriter<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureWriter")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<T> Drop for FutureWriter<T> {
    fn drop(&mut self) {
        // If a value has not yet been written into this writer then that must
        // be done so now. Perform a "clone" of `self` by moving our data into a
        // subtask, but ensure that `should_write_default_value` is set to
        // `false` to avoid infinite loops by accident. Once the task is spawned
        // we're done and the subtask's destructor of the closed-over
        // `FutureWriter` will be responsible for performing the
        // `drop-writable` call below.
        //
        // Note, though, that if `should_write_default_value` is `false` then a
        // write has already happened and we can go ahead and just synchronously
        // drop this writer as we would any other handle.
        if self.should_write_default_value {
            let clone = FutureWriter {
                handle: self.handle,
                default: self.default,
                should_write_default_value: false,
                vtable: self.vtable,
            };
            crate::rt::async_support::spawn(async move {
                let value = (clone.default)();
                let _ = clone.write(value).await;
            });
        } else {
            unsafe {
                rtdebug!("future.drop-writable({})", self.handle);
                (self.vtable.drop_writable)(self.handle);
            }
        }
    }
}

/// Represents a write operation which may be cancelled prior to completion.
///
/// This is returned by [`FutureWriter::write`].
pub struct FutureWrite<T: 'static> {
    op: WaitableOperation<FutureWriteOp<T>>,
}

struct FutureWriteOp<T>(marker::PhantomData<T>);

enum WriteComplete<T> {
    Written,
    Dropped(T),
    Cancelled(T),
}

unsafe impl<T> WaitableOp for FutureWriteOp<T>
where
    T: 'static,
{
    type Start = (FutureWriter<T>, T);
    type InProgress = (FutureWriter<T>, Option<Cleanup>);
    type Result = (WriteComplete<T>, FutureWriter<T>);
    type Cancel = FutureWriteCancel<T>;

    fn start((writer, value): Self::Start) -> (u32, Self::InProgress) {
        // TODO: it should be safe to store the lower-destination in
        // `WaitableOperation` using `Pin` memory and such, but that would
        // require some type-level trickery to get a correctly-sized value
        // plumbed all the way to here. For now just dynamically allocate it and
        // leave the optimization of leaving out this dynamic allocation to the
        // future.
        //
        // In lieu of that a dedicated location on the heap is created for the
        // lowering, and then `value`, as an owned value, is lowered into this
        // pointer to initialize it.
        let (ptr, cleanup) = Cleanup::new(writer.vtable.layout);
        // SAFETY: `ptr` is allocated with `vtable.layout` and should be
        // safe to use here.
        let code = unsafe {
            (writer.vtable.lower)(value, ptr);
            (writer.vtable.start_write)(writer.handle, ptr)
        };
        rtdebug!("future.write({}, {ptr:?}) = {code:#x}", writer.handle);
        (code, (writer, cleanup))
    }

    fn start_cancelled((writer, value): Self::Start) -> Self::Cancel {
        FutureWriteCancel::Cancelled(value, writer)
    }

    fn in_progress_update(
        (mut writer, cleanup): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        let ptr = cleanup
            .as_ref()
            .map(|c| c.ptr.as_ptr())
            .unwrap_or(ptr::null_mut());
        match code {
            super::BLOCKED => Err((writer, cleanup)),

            // The other end has dropped its end, or this write was cancelled.
            //
            // The value was not received by the other end so `ptr` still has
            // all of its resources intact. Use `lift` to construct a new
            // instance of `T` which takes ownership of pointers and resources
            // and such. The allocation of `ptr` is then cleaned up naturally
            // when `cleanup` goes out of scope.
            super::DROPPED | super::CANCELLED => {
                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                let value = unsafe { (writer.vtable.lift)(ptr) };
                let status = if code == super::DROPPED {
                    // This writer has been witnessed to be dropped, meaning that
                    // `writer` is going to get destroyed soon as this return
                    // value propagates up the stack. There's no need to write
                    // the default value, so set this to `false`.
                    writer.should_write_default_value = false;
                    WriteComplete::Dropped(value)
                } else {
                    WriteComplete::Cancelled(value)
                };
                Ok((status, writer))
            }

            // This write has completed.
            //
            // Here we need to clean up our allocations. The `ptr` exclusively
            // owns all of the value being sent and we notably need to clean up
            // the transitive list allocations present in this pointer. Use
            // `dealloc_lists` for that (effectively a post-return lookalike).
            //
            // Afterwards the `cleanup` itself is naturally dropped and cleaned
            // up.
            super::COMPLETED => {
                // A value was written, so no need to write the default value.
                writer.should_write_default_value = false;

                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                unsafe {
                    (writer.vtable.dealloc_lists)(ptr);
                }
                Ok((WriteComplete::Written, writer))
            }

            other => unreachable!("unexpected code {other:?}"),
        }
    }

    fn in_progress_waitable((writer, _): &Self::InProgress) -> u32 {
        writer.handle
    }

    fn in_progress_cancel((writer, _): &Self::InProgress) -> u32 {
        // SAFETY: we're managing `writer` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
        rtdebug!("future.cancel-write({}) = {code:#x}", writer.handle);
        code
    }

    fn result_into_cancel((result, writer): Self::Result) -> Self::Cancel {
        match result {
            // The value was actually sent, meaning we can't yield back the
            // future nor the value.
            WriteComplete::Written => FutureWriteCancel::AlreadySent,

            // The value was not sent because the other end either hung up or we
            // successfully cancelled. In both cases return back the value here,
            // along with the writer when the write was cancelled.
            WriteComplete::Dropped(val) => FutureWriteCancel::Dropped(val),
            WriteComplete::Cancelled(val) => FutureWriteCancel::Cancelled(val, writer),
        }
    }
}

impl<T: 'static> Future for FutureWrite<T> {
    type Output = Result<(), FutureWriteError<T>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project()
            .poll_complete(cx)
            .map(|(result, _writer)| match result {
                WriteComplete::Written => Ok(()),
                WriteComplete::Dropped(value) | WriteComplete::Cancelled(value) => {
                    Err(FutureWriteError { value })
                }
            })
    }
}

impl<T: 'static> FutureWrite<T> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureWriteOp<T>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this write if it hasn't already completed.
    ///
    /// This method can be used to cancel a write-in-progress and re-acquire
    /// the writer and the value being sent. Note that the write operation may
    /// succeed racily or the other end may also drop racily, and these
    /// outcomes are reflected in the returned value here.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice.
    pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel<T> {
        self.pin_project().cancel()
    }
}

/// Error type in the result of [`FutureWrite`], or the error type that is a result of
/// a failure to write a future.
pub struct FutureWriteError<T> {
    /// The value that could not be sent.
    pub value: T,
}

impl<T> fmt::Debug for FutureWriteError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureWriteError").finish_non_exhaustive()
    }
}

impl<T> fmt::Display for FutureWriteError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        "read end dropped".fmt(f)
    }
}

impl<T> std::error::Error for FutureWriteError<T> {}

/// Result of [`FutureWrite::cancel`].
#[derive(Debug)]
pub enum FutureWriteCancel<T: 'static> {
    /// The cancel request raced with the receipt of the sent value, and the
    /// value was actually sent. Neither the value nor the writer are made
    /// available here as both are gone.
    AlreadySent,

    /// The other end was dropped before cancellation happened.
    ///
    /// In this case the original value is returned back to the caller but the
    /// writer itself is no longer accessible as it's no longer usable.
    Dropped(T),

    /// The pending write was successfully cancelled and the value being written
    /// is returned along with the writer to resume again in the future if
    /// necessary.
    Cancelled(T, FutureWriter<T>),
}

/// Represents the readable end of a Component Model `future<T>`.
pub struct FutureReader<T: 'static> {
    handle: AtomicU32,
    vtable: &'static FutureVtable<T>,
}

impl<T> fmt::Debug for FutureReader<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureReader")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<T> FutureReader<T> {
    #[doc(hidden)]
    pub fn new(handle: u32, vtable: &'static FutureVtable<T>) -> Self {
        Self {
            handle: AtomicU32::new(handle),
            vtable,
        }
    }

    #[doc(hidden)]
    pub fn take_handle(&self) -> u32 {
        let ret = self.opt_handle().unwrap();
        self.handle.store(u32::MAX, Relaxed);
        ret
    }

    fn handle(&self) -> u32 {
        self.opt_handle().unwrap()
    }

    fn opt_handle(&self) -> Option<u32> {
        match self.handle.load(Relaxed) {
            u32::MAX => None,
            other => Some(other),
        }
    }
}

impl<T> IntoFuture for FutureReader<T> {
    type Output = T;
    type IntoFuture = FutureRead<T>;

    /// Convert this object into a `Future` which will resolve when a value is
    /// written to the writable end of this `future`.
    fn into_future(self) -> Self::IntoFuture {
        FutureRead {
            op: WaitableOperation::new(self),
        }
    }
}

impl<T> Drop for FutureReader<T> {
    fn drop(&mut self) {
        let Some(handle) = self.opt_handle() else {
            return;
        };
        unsafe {
            rtdebug!("future.drop-readable({handle})");
            (self.vtable.drop_readable)(handle);
        }
    }
}

/// Represents a read operation which may be cancelled prior to completion.
///
/// This represents a read operation on a [`FutureReader`] and is created via
/// `IntoFuture`.
pub struct FutureRead<T: 'static> {
    op: WaitableOperation<FutureReadOp<T>>,
}

struct FutureReadOp<T>(marker::PhantomData<T>);

enum ReadComplete<T> {
    Value(T),
    Cancelled,
}

unsafe impl<T> WaitableOp for FutureReadOp<T>
where
    T: 'static,
{
    type Start = FutureReader<T>;
    type InProgress = (FutureReader<T>, Option<Cleanup>);
    type Result = (ReadComplete<T>, FutureReader<T>);
    type Cancel = Result<T, FutureReader<T>>;

    fn start(reader: Self::Start) -> (u32, Self::InProgress) {
        let (ptr, cleanup) = Cleanup::new(reader.vtable.layout);
        // SAFETY: `ptr` is allocated with `vtable.layout` and should be
        // safe to use here. Its lifetime for the async operation is hinged on
        // `WaitableOperation` being safe.
        let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr) };
        rtdebug!("future.read({}, {ptr:?}) = {code:#x}", reader.handle());
        (code, (reader, cleanup))
    }

    fn start_cancelled(state: Self::Start) -> Self::Cancel {
        Err(state)
    }

    fn in_progress_update(
        (reader, cleanup): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        match ReturnCode::decode(code) {
            ReturnCode::Blocked => Err((reader, cleanup)),

            // Let `cleanup` fall out of scope to clean up its allocation here,
            // and otherwise the reader is plumbed through to possibly restart
            // the read in the future.
            ReturnCode::Cancelled(0) => Ok((ReadComplete::Cancelled, reader)),

            // The read has completed, so lift the value from the stored memory and
            // `cleanup` naturally falls out of scope after transferring ownership of
            // everything to the returned `value`.
            ReturnCode::Completed(0) => {
                let ptr = cleanup
                    .as_ref()
                    .map(|c| c.ptr.as_ptr())
                    .unwrap_or(ptr::null_mut());

                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                let value = unsafe { (reader.vtable.lift)(ptr) };
                Ok((ReadComplete::Value(value), reader))
            }

            other => panic!("unexpected code {other:?}"),
        }
    }

    fn in_progress_waitable((reader, _): &Self::InProgress) -> u32 {
        reader.handle()
    }

    fn in_progress_cancel((reader, _): &Self::InProgress) -> u32 {
        // SAFETY: we're managing `reader` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
        rtdebug!("future.cancel-read({}) = {code:#x}", reader.handle());
        code
    }

    fn result_into_cancel((value, reader): Self::Result) -> Self::Cancel {
        match value {
            // The value was actually read, so thread that through here.
            ReadComplete::Value(value) => Ok(value),

            // The read was successfully cancelled, so thread through the
            // `reader` to possibly restart later on.
            ReadComplete::Cancelled => Err(reader),
        }
    }
}

impl<T: 'static> Future for FutureRead<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project()
            .poll_complete(cx)
            .map(|(result, _reader)| match result {
                ReadComplete::Value(val) => val,
                // This is only possible if, after calling `FutureRead::cancel`,
                // the future is polled again. The `cancel` method is documented
                // as "don't do that" so this is left to panic.
                ReadComplete::Cancelled => panic!("cannot poll after cancelling"),
            })
    }
}

impl<T> FutureRead<T> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureReadOp<T>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this read if it hasn't already completed.
    ///
    /// Return values include:
    ///
    /// * `Ok(value)` - future completed before this cancellation request
    ///   was received.
    /// * `Err(reader)` - read operation was cancelled and it can be retried in
    ///   the future if desired.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice. Additionally if this method completes
    /// then calling `poll` again on `self` will panic.
    pub fn cancel(self: Pin<&mut Self>) -> Result<T, FutureReader<T>> {
        self.pin_project().cancel()
    }
}