wit_bindgen/rt/async_support/future_support.rs
1//! Runtime support for `future<T>` in the component model.
2//!
//! There are a number of tricky concerns to balance when implementing bindings
//! to `future<T>`, specifically in how it interacts with Rust. This module
//! documentation goes over some of the high-level details of the
//! implementation.
7//!
8//! ## Leak safety
9//!
10//! It's safe to leak any value at any time currently in Rust. In other words
11//! Rust doesn't have linear types (yet). Typically this isn't really a problem
12//! but the component model intrinsics we're working with here operate by being
13//! given a pointer and then at some point in the future the pointer may be
14//! read. This means that it's our responsibility to keep this pointer alive and
15//! valid for the entire duration of an asynchronous operation.
16//!
//! Chiefly this means that borrowed values are a no-no in this module. For
//! example, sending a `&[u8]` as an implementation of `future<list<u8>>` would
//! not be sound. Consider the following sequence:
//!
//! * The future send operation is started, recording an address of `&[u8]`.
//! * The future is then leaked.
//! * According to rustc, later in code the original `&[u8]` is then no longer
//!   borrowed.
//! * The original source of `&[u8]` could then be deallocated.
//! * Then the component model actually reads the pointer that it was given.
27//!
28//! This constraint effectively means that all types flowing in-and-out of
29//! futures, streams, and async APIs are all "owned values", notably no
30//! lifetimes. This requires, for example, that `future<list<u8>>` operates on
31//! `Vec<u8>`.
32//!
//! This is in stark contrast to bindings generated for `list<u8>` elsewhere,
//! where for example a synchronous import with a `list<u8>` argument would be
//! bound with a `&[u8]` argument. Until Rust has some form of linear types it's
//! not possible to loosen this restriction soundly because it's generally not
//! safe to leak an active I/O operation. This restriction is similar to why
//! it's so difficult to bind `io_uring` in safe Rust, which operates similarly
//! to the component model: pointers are submitted and then read at some point
//! after the original submission call returns.
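//!
//! As a minimal sketch of the underlying problem (the `PendingSend` type and
//! the `cancel_ran` function are hypothetical and not part of this module, and
//! `std` is used for brevity), the following shows that safe Rust can skip a
//! destructor with `mem::forget`. A `Drop` implementation therefore cannot be
//! relied upon to cancel an in-flight operation before a borrow ends:
//!
//! ```rust
//! use std::cell::Cell;
//! use std::mem;
//!
//! /// Hypothetical stand-in for an in-flight send that borrows its buffer and
//! /// hopes to cancel itself in `Drop`.
//! struct PendingSend<'a> {
//!     _buf: &'a [u8],
//!     cancelled: &'a Cell<bool>,
//! }
//!
//! impl Drop for PendingSend<'_> {
//!     fn drop(&mut self) {
//!         self.cancelled.set(true);
//!     }
//! }
//!
//! /// Returns whether the "cancel on drop" logic ran. If `leak` is true the
//! /// operation is forgotten instead of dropped.
//! fn cancel_ran(leak: bool) -> bool {
//!     let cancelled = Cell::new(false);
//!     let buf = vec![1u8, 2, 3];
//!     let op = PendingSend { _buf: &buf, cancelled: &cancelled };
//!     if leak {
//!         mem::forget(op);
//!     } else {
//!         drop(op);
//!     }
//!     // rustc now considers `buf` unborrowed, so it may be freed even though
//!     // a leaked operation never had the chance to cancel itself.
//!     drop(buf);
//!     cancelled.get()
//! }
//! ```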
41//!
42//! ## Lowering Owned Values
43//!
//! According to the above everything with futures/streams operates on owned
//! values already, but this also affects precisely how lifting and lowering are
//! performed. In general any active asynchronous operation could be cancelled
//! at any time, meaning we have to deal with situations such as:
48//!
49//! * A `write` hasn't even started yet.
50//! * A `write` was started and then cancelled.
51//! * A `write` was started and then the other end dropped the channel.
52//! * A `write` was started and then the other end received the value.
53//!
54//! In all of these situations regardless of the structure of `T` we can't leak
55//! memory. The `future.write` intrinsic, however, takes no ownership of the
56//! memory involved which means that we're still responsible for cleaning up
57//! lists. It does take ownership, however, of `own<T>` handles and other
58//! resources.
59//!
60//! The way that this is solved for futures/streams is to lean further into
61//! processing owned values. Namely lowering a `T` takes `T`-by-value, not `&T`.
62//! This means that lowering operates similarly to return values of exported
63//! functions, not parameters to imported functions. By lowering an owned value
64//! of `T` this preserves a nice property where the lowered value has exclusive
65//! ownership of all of its pointers/resources/etc. Lowering `&T` may require a
66//! "cleanup list" for example which we avoid here entirely.
67//!
//! This then makes the second and third cases above, getting a value back after
//! lowering, much easier. Namely re-acquisition of a value is a simple `lift`
//! operation, as if we had received the value on the channel.
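//!
//! The idea can be sketched outside of the component model as follows. This is
//! only an illustration under stated assumptions: `LoweredList`, `lower`,
//! `lift`, and `lower_then_reacquire` are hypothetical names, and the lowered
//! form here uses a native pointer and `usize` rather than the canonical ABI's
//! actual layout. Lowering consumes the `Vec<u8>`, so the lowered slot has
//! exclusive ownership, and lifting from that same slot hands ownership back:
//!
//! ```rust
//! use std::mem::MaybeUninit;
//!
//! /// Hypothetical lowered form of `list<u8>`: a pointer and a length.
//! #[repr(C)]
//! struct LoweredList {
//!     ptr: *mut u8,
//!     len: usize,
//! }
//!
//! /// Consumes `value` and stores its raw parts in `dst`, which now
//! /// exclusively owns the heap allocation.
//! unsafe fn lower(value: Vec<u8>, dst: *mut LoweredList) {
//!     let boxed = value.into_boxed_slice();
//!     let len = boxed.len();
//!     let ptr = Box::into_raw(boxed) as *mut u8;
//!     unsafe { dst.write(LoweredList { ptr, len }) }
//! }
//!
//! /// Takes ownership back out of `src`, as if the value had been received.
//! unsafe fn lift(src: *mut LoweredList) -> Vec<u8> {
//!     unsafe {
//!         let LoweredList { ptr, len } = src.read();
//!         Vec::from_raw_parts(ptr, len, len)
//!     }
//! }
//!
//! /// Models a write that was cancelled: lower, then lift to re-acquire.
//! fn lower_then_reacquire(value: Vec<u8>) -> Vec<u8> {
//!     let mut slot = MaybeUninit::<LoweredList>::uninit();
//!     unsafe {
//!         lower(value, slot.as_mut_ptr());
//!         lift(slot.as_mut_ptr())
//!     }
//! }
//! ```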
71//!
72//! ## Inefficiencies
73//!
//! The above requirements generally mean that this is not a hyper-efficient
//! implementation. All writes and reads, for example, start out by allocating
//! memory on the heap to be owned by the asynchronous operation. Writing a
//! `list<u8>` to a future passes ownership of `Vec<u8>` but in theory doesn't
//! actually require relinquishing ownership of the vector. Furthermore
//! there's no way to re-acquire a `T` after it has been sent, even though all
//! of `T` is still valid except for `own<U>` resources.
81//!
82//! That's all to say that this implementation can probably still be improved
83//! upon, but doing so is thought to be pretty nontrivial at this time. It
84//! should be noted though that there are other high-level inefficiencies with
85//! WIT unrelated to this module. For example `list<T>` is not always
86//! represented the same in Rust as it is in the canonical ABI. That means that
87//! sending `list<T>` into a future might require copying the entire list and
88//! changing its layout. Currently this is par-for-the-course with bindings.
89//!
90//! ## Linear (exactly once) Writes
91//!
//! The component model requires that a writable end of a future must be written
//! to before closing, otherwise the drop operation traps. Ideally usage of
//! this API shouldn't result in traps so this is modeled in the Rust-level API
//! to prevent this trap from occurring. Rust does not support linear types
//! (types that must be used exactly once). Instead it only has affine types
//! (types that can be used at most once), meaning that this requires some
//! runtime support.
99//!
100//! Specifically the `FutureWriter` structure stores two auxiliary Rust-specific
101//! pieces of information:
102//!
103//! * A `should_write_default_value` boolean - if `true` on destruction then a
104//! value has not yet been written and something must be written.
105//! * A `default: fn() -> T` constructor to lazily create the default value to
106//! be sent in this situation.
107//!
108//! This `default` field is provided by the user when the future is initially
109//! created. Additionally during `Drop` a new Rust-level task will be spawned to
110//! perform the write in the background. That'll keep the component-level task
111//! alive until that write completes but otherwise shouldn't hinder anything
112//! else.
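//!
//! The two fields described above can be illustrated with a simplified,
//! synchronous sketch. The `OnceSender` type and the `observe` function are
//! hypothetical, and unlike the real `FutureWriter` nothing is spawned in the
//! background here; the default value is simply stored directly during `Drop`:
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::rc::Rc;
//!
//! /// Hypothetical oneshot sender that must deliver exactly one value.
//! struct OnceSender<T> {
//!     slot: Rc<RefCell<Option<T>>>,
//!     should_write_default_value: bool,
//!     default: fn() -> T,
//! }
//!
//! impl<T> OnceSender<T> {
//!     fn new(slot: Rc<RefCell<Option<T>>>, default: fn() -> T) -> Self {
//!         OnceSender { slot, should_write_default_value: true, default }
//!     }
//!
//!     /// Consumes the sender and delivers `value`.
//!     fn send(mut self, value: T) {
//!         *self.slot.borrow_mut() = Some(value);
//!         self.should_write_default_value = false;
//!     }
//! }
//!
//! impl<T> Drop for OnceSender<T> {
//!     fn drop(&mut self) {
//!         // Nothing was sent, so fall back to the lazily-created default.
//!         if self.should_write_default_value {
//!             *self.slot.borrow_mut() = Some((self.default)());
//!         }
//!     }
//! }
//!
//! /// Returns what the receiving side observes once the sender is gone.
//! fn observe(explicit: Option<u32>) -> Option<u32> {
//!     let slot = Rc::new(RefCell::new(None));
//!     let tx = OnceSender::new(slot.clone(), || 0);
//!     match explicit {
//!         Some(v) => tx.send(v),
//!         None => drop(tx),
//!     }
//!     slot.take()
//! }
//! ```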
113
114use crate::rt::Cleanup;
115use crate::rt::async_support::ReturnCode;
116use crate::rt::async_support::try_lock::TryLock;
117use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
118use alloc::sync::Arc;
119use alloc::task::Wake;
120use core::alloc::Layout;
121use core::fmt;
122use core::future::{Future, IntoFuture};
123use core::marker;
124use core::mem::{self, ManuallyDrop};
125use core::pin::Pin;
126use core::ptr;
127use core::sync::atomic::{AtomicU32, Ordering::Relaxed};
128use core::task::{Context, Poll, Waker};
129
130/// Helper trait which encapsulates the various operations which can happen
131/// with a future.
132pub trait FutureOps {
133 /// The Rust type that's sent or received on this future.
134 type Payload;
135
136 /// The `future.new` intrinsic.
137 fn new(&mut self) -> u64;
138 /// The canonical ABI layout of the type that this future is
139 /// sending/receiving.
140 fn elem_layout(&mut self) -> Layout;
141 /// Converts a Rust type to its canonical ABI representation.
142 unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
143 /// Used to deallocate any Rust-owned lists in the canonical ABI
144 /// representation for when a value is successfully sent but needs to be
145 /// cleaned up.
146 unsafe fn dealloc_lists(&mut self, dst: *mut u8);
147 /// Converts from the canonical ABI representation to a Rust value.
148 unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
149 /// The `future.write` intrinsic
150 unsafe fn start_write(&mut self, future: u32, val: *const u8) -> u32;
151 /// The `future.read` intrinsic
152 unsafe fn start_read(&mut self, future: u32, val: *mut u8) -> u32;
153 /// The `future.cancel-read` intrinsic
154 unsafe fn cancel_read(&mut self, future: u32) -> u32;
155 /// The `future.cancel-write` intrinsic
156 unsafe fn cancel_write(&mut self, future: u32) -> u32;
157 /// The `future.drop-readable` intrinsic
158 unsafe fn drop_readable(&mut self, future: u32);
159 /// The `future.drop-writable` intrinsic
160 unsafe fn drop_writable(&mut self, future: u32);
161}
162
163/// Function table used for [`FutureWriter`] and [`FutureReader`]
164///
165/// Instances of this table are generated by `wit_bindgen::generate!`. This is
166/// not a trait to enable different `FutureVtable<()>` instances to exist, for
167/// example, through different calls to `wit_bindgen::generate!`.
168///
169/// It's not intended that any user implements this vtable, instead it's
170/// intended to only be auto-generated.
171#[doc(hidden)]
172pub struct FutureVtable<T> {
173 /// The Canonical ABI layout of `T` in-memory.
174 pub layout: Layout,
175
176 /// A callback to consume a value of `T` and lower it to the canonical ABI
177 /// pointed to by `dst`.
178 ///
179 /// The `dst` pointer should have `self.layout`. This is used to convert
180 /// in-memory representations in Rust to their canonical representations in
181 /// the component model.
182 pub lower: unsafe fn(value: T, dst: *mut u8),
183
184 /// A callback to deallocate any lists within the canonical ABI value `dst`
185 /// provided.
186 ///
187 /// This is used when a value is successfully sent to another component. In
188 /// such a situation it may be possible that the canonical lowering of `T`
189 /// has lists that are still owned by this component and must be
190 /// deallocated. This is akin to a `post-return` callback for returns of
191 /// exported functions.
192 pub dealloc_lists: unsafe fn(dst: *mut u8),
193
194 /// A callback to lift a value of `T` from the canonical ABI representation
195 /// provided.
196 pub lift: unsafe fn(dst: *mut u8) -> T,
197
198 /// The raw `future.write` intrinsic.
199 pub start_write: unsafe extern "C" fn(future: u32, val: *const u8) -> u32,
200 /// The raw `future.read` intrinsic.
201 pub start_read: unsafe extern "C" fn(future: u32, val: *mut u8) -> u32,
202 /// The raw `future.cancel-write` intrinsic.
203 pub cancel_write: unsafe extern "C" fn(future: u32) -> u32,
204 /// The raw `future.cancel-read` intrinsic.
205 pub cancel_read: unsafe extern "C" fn(future: u32) -> u32,
206 /// The raw `future.drop-writable` intrinsic.
207 pub drop_writable: unsafe extern "C" fn(future: u32),
208 /// The raw `future.drop-readable` intrinsic.
209 pub drop_readable: unsafe extern "C" fn(future: u32),
210 /// The raw `future.new` intrinsic.
211 pub new: unsafe extern "C" fn() -> u64,
212}
213
214impl<T> FutureOps for &FutureVtable<T> {
215 type Payload = T;
216
217 fn new(&mut self) -> u64 {
218 unsafe { (self.new)() }
219 }
220 fn elem_layout(&mut self) -> Layout {
221 self.layout
222 }
223 unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
224 unsafe { (self.lower)(payload, dst) }
225 }
226 unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
227 unsafe { (self.dealloc_lists)(dst) }
228 }
229 unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
230 unsafe { (self.lift)(dst) }
231 }
232 unsafe fn start_write(&mut self, future: u32, val: *const u8) -> u32 {
233 unsafe { (self.start_write)(future, val) }
234 }
235 unsafe fn start_read(&mut self, future: u32, val: *mut u8) -> u32 {
236 unsafe { (self.start_read)(future, val) }
237 }
238 unsafe fn cancel_read(&mut self, future: u32) -> u32 {
239 unsafe { (self.cancel_read)(future) }
240 }
241 unsafe fn cancel_write(&mut self, future: u32) -> u32 {
242 unsafe { (self.cancel_write)(future) }
243 }
244 unsafe fn drop_readable(&mut self, future: u32) {
245 unsafe { (self.drop_readable)(future) }
246 }
247 unsafe fn drop_writable(&mut self, future: u32) {
248 unsafe { (self.drop_writable)(future) }
249 }
250}
251
252/// Helper function to create a new read/write pair for a component model
253/// future.
254///
/// # Safety
256///
257/// This function is unsafe as it requires the functions within `vtable` to
258/// correctly uphold the contracts of the component model.
259pub unsafe fn future_new<T>(
260 default: fn() -> T,
261 vtable: &'static FutureVtable<T>,
262) -> (FutureWriter<T>, FutureReader<T>) {
263 let (tx, rx) = unsafe { raw_future_new(vtable) };
264 (unsafe { FutureWriter::new(tx, default) }, rx)
265}
266
/// Raw version of [`future_new`] which creates a new read/write pair for a
/// component model future from any [`FutureOps`] implementation.
///
/// # Safety
///
/// This function is unsafe as it requires the functions within `ops` to
/// correctly uphold the contracts of the component model.
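///
/// The two handles come back from `future.new` packed into a single `u64`. As
/// a small illustrative sketch (the `split_handles` helper is hypothetical and
/// only mirrors the unpacking done in this function's body), the reader is in
/// the low 32 bits and the writer is in the high 32 bits:
///
/// ```rust
/// /// Splits a packed handle pair into `(writer, reader)`.
/// fn split_handles(handles: u64) -> (u32, u32) {
///     let reader = handles as u32;
///     let writer = (handles >> 32) as u32;
///     (writer, reader)
/// }
/// ```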
274pub unsafe fn raw_future_new<O>(mut ops: O) -> (RawFutureWriter<O>, RawFutureReader<O>)
275where
276 O: FutureOps + Clone,
277{
278 unsafe {
279 let handles = ops.new();
280 let reader = handles as u32;
281 let writer = (handles >> 32) as u32;
282 rtdebug!("future.new() = [{writer}, {reader}]");
283 (
284 RawFutureWriter::new(writer, ops.clone()),
285 RawFutureReader::new(reader, ops),
286 )
287 }
288}
289
290/// Represents the writable end of a Component Model `future`.
291///
292/// A [`FutureWriter`] can be used to send a single value of `T` to the other
293/// end of a `future`. In a sense this is similar to a oneshot channel in Rust.
294pub struct FutureWriter<T: 'static> {
295 raw: ManuallyDrop<RawFutureWriter<&'static FutureVtable<T>>>,
296
297 /// Whether or not a value should be written during `drop`.
298 ///
299 /// This is set to `false` when a value is successfully written or when a
300 /// value is written but the future is witnessed as being dropped.
301 ///
    /// Note that this is set to `true` on construction to ensure that only
    /// locations which actually witness a completed write set it to `false`.
304 should_write_default_value: bool,
305
306 /// Constructor for the default value to write during `drop`, should one
307 /// need to be written.
308 default: fn() -> T,
309}
310
311impl<T> FutureWriter<T> {
312 /// Helper function to wrap a handle/vtable into a `FutureWriter`.
313 ///
    /// # Safety
315 ///
316 /// This function is unsafe as it requires the functions within `vtable` to
317 /// correctly uphold the contracts of the component model.
318 unsafe fn new(raw: RawFutureWriter<&'static FutureVtable<T>>, default: fn() -> T) -> Self {
319 Self {
320 raw: ManuallyDrop::new(raw),
321 default,
322 should_write_default_value: true,
323 }
324 }
325
326 /// Write the specified `value` to this `future`.
327 ///
328 /// This method is equivalent to an `async fn` which sends the `value` into
329 /// this future. The asynchronous operation acts as a rendezvous where the
330 /// operation does not complete until the other side has successfully
331 /// received the value.
332 ///
333 /// # Return Value
334 ///
335 /// The returned [`FutureWrite`] is a future that can be `.await`'d. The
336 /// return value of this future is:
337 ///
338 /// * `Ok(())` - the `value` was sent and received. The `self` value was
339 /// consumed along the way and will no longer be accessible.
340 /// * `Err(FutureWriteError { value })` - an attempt was made to send
341 /// `value` but the other half of this [`FutureWriter`] was dropped before
342 /// the value was received. This consumes `self` because the channel is
343 /// now dropped, but `value` is returned in case the caller wants to reuse
344 /// it.
345 ///
346 /// # Cancellation
347 ///
348 /// The returned future can be cancelled normally via `drop` which means
349 /// that the `value` provided here, along with this `FutureWriter` itself,
350 /// will be lost. There is also [`FutureWrite::cancel`] which can be used to
351 /// possibly re-acquire `value` and `self` if the operation was cancelled.
352 /// In such a situation the operation can be retried at a future date.
353 pub fn write(mut self, value: T) -> FutureWrite<T> {
354 let raw = unsafe { ManuallyDrop::take(&mut self.raw).write(value) };
355 let default = self.default;
356 mem::forget(self);
357 FutureWrite { raw, default }
358 }
359}
360
361impl<T> fmt::Debug for FutureWriter<T> {
362 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
363 f.debug_struct("FutureWriter")
364 .field("handle", &self.raw.handle)
365 .finish()
366 }
367}
368
369impl<T> Drop for FutureWriter<T> {
370 fn drop(&mut self) {
        // If a value has not yet been written into this writer then that must
        // be done now. Take the `raw` writer and perform the write via a
        // waker that drives the future.
374 //
375 // If `should_write_default_value` is `false` then a write has already
376 // happened and we can go ahead and just synchronously drop this writer
377 // as we would any other handle.
378 if self.should_write_default_value {
379 let raw = unsafe { ManuallyDrop::take(&mut self.raw) };
380 let value = (self.default)();
381 raw.write_and_forget(value);
382 } else {
383 unsafe { ManuallyDrop::drop(&mut self.raw) }
384 }
385 }
386}
387
388/// Represents a write operation which may be cancelled prior to completion.
389///
390/// This is returned by [`FutureWriter::write`].
391pub struct FutureWrite<T: 'static> {
392 raw: RawFutureWrite<&'static FutureVtable<T>>,
393 default: fn() -> T,
394}
395
396/// Result of [`FutureWrite::cancel`].
397#[derive(Debug)]
398pub enum FutureWriteCancel<T: 'static> {
399 /// The cancel request raced with the receipt of the sent value, and the
400 /// value was actually sent. Neither the value nor the writer are made
401 /// available here as both are gone.
402 AlreadySent,
403
404 /// The other end was dropped before cancellation happened.
405 ///
    /// In this case the original value is returned to the caller but the
    /// writer itself is no longer accessible as it's no longer usable.
408 Dropped(T),
409
410 /// The pending write was successfully cancelled and the value being written
411 /// is returned along with the writer to resume again in the future if
412 /// necessary.
413 Cancelled(T, FutureWriter<T>),
414}
415
416impl<T: 'static> Future for FutureWrite<T> {
417 type Output = Result<(), FutureWriteError<T>>;
418
419 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
420 self.pin_project().poll(cx)
421 }
422}
423
424impl<T: 'static> FutureWrite<T> {
425 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut RawFutureWrite<&'static FutureVtable<T>>> {
426 // SAFETY: we've chosen that when `Self` is pinned that it translates to
427 // always pinning the inner field, so that's codified here.
428 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().raw) }
429 }
430
431 /// Cancel this write if it hasn't already completed.
432 ///
433 /// This method can be used to cancel a write-in-progress and re-acquire
434 /// the writer and the value being sent. Note that the write operation may
435 /// succeed racily or the other end may also drop racily, and these
436 /// outcomes are reflected in the returned value here.
437 ///
438 /// # Panics
439 ///
440 /// Panics if the operation has already been completed via `Future::poll`,
441 /// or if this method is called twice.
442 pub fn cancel(self: Pin<&mut Self>) -> FutureWriteCancel<T> {
443 let default = self.default;
444 match self.pin_project().cancel() {
445 RawFutureWriteCancel::AlreadySent => FutureWriteCancel::AlreadySent,
446 RawFutureWriteCancel::Dropped(val) => FutureWriteCancel::Dropped(val),
447 RawFutureWriteCancel::Cancelled(val, raw) => FutureWriteCancel::Cancelled(
448 val,
449 FutureWriter {
450 raw: ManuallyDrop::new(raw),
451 default,
452 should_write_default_value: true,
453 },
454 ),
455 }
456 }
457}
458
459impl<T: 'static> Drop for FutureWrite<T> {
460 fn drop(&mut self) {
461 if self.raw.op.is_done() {
462 return;
463 }
464
465 // Although the underlying `WaitableOperation` will already
466 // auto-cancel-on-drop we need to specially handle that here because if
467 // the cancellation goes through then it means that no value will have
468 // been written to this future which will cause a trap. By using
469 // `Self::cancel` it's ensured that if cancellation succeeds a
470 // `FutureWriter` is created. In `Drop for FutureWriter` that'll handle
471 // the last-ditch write-default logic.
472 //
473 // SAFETY: we're in the destructor here so the value `self` is about
474 // to go away and we can guarantee we're not moving out of it.
475 let pin = unsafe { Pin::new_unchecked(self) };
476 pin.cancel();
477 }
478}
479
480/// Raw version of [`FutureWriter`].
481pub struct RawFutureWriter<O: FutureOps> {
482 handle: u32,
483 ops: O,
484}
485
486impl<O: FutureOps> RawFutureWriter<O> {
487 unsafe fn new(handle: u32, ops: O) -> Self {
488 Self { handle, ops }
489 }
490
491 /// Same as [`FutureWriter::write`], but the raw version.
492 pub fn write(self, value: O::Payload) -> RawFutureWrite<O> {
493 RawFutureWrite {
494 op: WaitableOperation::new(FutureWriteOp(marker::PhantomData), (self, value)),
495 }
496 }
497
498 /// Writes `value` in the background.
499 ///
500 /// This does not block and is not cancellable.
501 pub fn write_and_forget(self, value: O::Payload)
502 where
503 O: 'static,
504 {
505 return Arc::new(DeferredWrite {
506 write: TryLock::new(self.write(value)),
507 })
508 .wake();
509
510 /// Helper structure which behaves both as a future of sorts and an
511 /// executor of sorts.
512 ///
513 /// This type is constructed in `Drop for FutureWriter<T>` to send out a
514 /// default value when no other has been written. This manages the
515 /// `FutureWrite` operation happening internally through a `Wake`
516 /// implementation. That means that this is a sort of cyclical future
517 /// which, when woken, will complete the write operation.
518 ///
519 /// The purpose of this is to be a "lightweight" way of "spawn"-ing a
520 /// future write to happen in the background. Crucially, however, this
521 /// doesn't require the `async-spawn` feature and instead works with the
522 /// `wasip3_task` C ABI structures (which spawn doesn't support).
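        ///
        /// The general "future that is its own waker" pattern can be sketched
        /// in isolation as below. This is a simplified illustration, not the
        /// implementation used here: `SelfDriven` and `run_ready_future` are
        /// hypothetical names, and a `std` `Mutex` stands in for `TryLock`.
        ///
        /// ```rust
        /// use std::future::Future;
        /// use std::pin::Pin;
        /// use std::sync::atomic::{AtomicBool, Ordering};
        /// use std::sync::{Arc, Mutex};
        /// use std::task::{Context, Wake, Waker};
        ///
        /// /// Hypothetical task that owns its future and polls it when woken.
        /// struct SelfDriven {
        ///     fut: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
        /// }
        ///
        /// impl Wake for SelfDriven {
        ///     fn wake(self: Arc<Self>) {
        ///         // The waker handed to the future is a clone of this task.
        ///         let waker = Waker::from(self.clone());
        ///         let mut cx = Context::from_waker(&waker);
        ///         let mut fut = self.fut.lock().unwrap();
        ///         let _ = fut.as_mut().poll(&mut cx);
        ///     }
        /// }
        ///
        /// /// Kicks off an immediately-ready future with an initial `wake` and
        /// /// reports whether it finished by the time `wake` returned.
        /// fn run_ready_future() -> bool {
        ///     let done = Arc::new(AtomicBool::new(false));
        ///     let flag = done.clone();
        ///     let fut: Pin<Box<dyn Future<Output = ()> + Send>> =
        ///         Box::pin(async move { flag.store(true, Ordering::SeqCst) });
        ///     let task = Arc::new(SelfDriven { fut: Mutex::new(fut) });
        ///     task.wake();
        ///     done.load(Ordering::SeqCst)
        /// }
        /// ```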
523 struct DeferredWrite<O: FutureOps> {
524 write: TryLock<RawFutureWrite<O>>,
525 }
526
527 // SAFETY: Needed to satisfy `Waker::from` but otherwise should be ok
528 // because wasm doesn't have threads anyway right now.
529 unsafe impl<O: FutureOps> Send for DeferredWrite<O> {}
530 unsafe impl<O: FutureOps> Sync for DeferredWrite<O> {}
531
532 impl<O: FutureOps + 'static> Wake for DeferredWrite<O> {
533 fn wake(self: Arc<Self>) {
                // A `wake` signal should arrive in two situations:
536 //
537 // 1. When `DeferredWrite` is initially constructed.
538 // 2. When an event comes in indicating that the internal write
539 // has completed.
540 //
541 // The implementation here is the same in both cases. A clone of
542 // `self` is converted to a `Waker`, and this `Waker` notably
543 // owns the internal future itself. The internal write operation
544 // is then pushed forward (e.g. it's issued in (1) or checked up
545 // on in (2)).
546 //
547 // If `Pending` is returned then `waker` should have been stored
548 // away within the `wasip3_task` C ABI structure. Otherwise it
549 // should not have been stored away and `self` should be the
550 // sole reference which means everything will get cleaned up
551 // when this function returns.
552 let poll = {
553 let waker = Waker::from(self.clone());
554 let mut cx = Context::from_waker(&waker);
555 let mut write = self.write.try_lock().unwrap();
556 unsafe { Pin::new_unchecked(&mut *write).poll(&mut cx) }
557 };
558 if poll.is_ready() {
559 assert_eq!(Arc::strong_count(&self), 1);
560 } else {
561 assert!(Arc::strong_count(&self) > 1);
562 }
563 assert_eq!(Arc::weak_count(&self), 0);
564 }
565 }
566 }
567}
568
569impl<O: FutureOps> fmt::Debug for RawFutureWriter<O> {
570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571 f.debug_struct("RawFutureWriter")
572 .field("handle", &self.handle)
573 .finish()
574 }
575}
576
577impl<O: FutureOps> Drop for RawFutureWriter<O> {
578 fn drop(&mut self) {
579 unsafe {
580 rtdebug!("future.drop-writable({})", self.handle);
581 self.ops.drop_writable(self.handle);
582 }
583 }
584}
585
586/// Represents a write operation which may be cancelled prior to completion.
587///
/// This is returned by [`RawFutureWriter::write`].
589pub struct RawFutureWrite<O: FutureOps> {
590 op: WaitableOperation<FutureWriteOp<O>>,
591}
592
593struct FutureWriteOp<O>(marker::PhantomData<O>);
594
595enum WriteComplete<T> {
596 Written,
597 Dropped(T),
598 Cancelled(T),
599}
600
601unsafe impl<O: FutureOps> WaitableOp for FutureWriteOp<O> {
602 type Start = (RawFutureWriter<O>, O::Payload);
603 type InProgress = (RawFutureWriter<O>, Option<Cleanup>);
604 type Result = (WriteComplete<O::Payload>, RawFutureWriter<O>);
605 type Cancel = RawFutureWriteCancel<O>;
606
607 fn start(&mut self, (mut writer, value): Self::Start) -> (u32, Self::InProgress) {
608 // TODO: it should be safe to store the lower-destination in
609 // `WaitableOperation` using `Pin` memory and such, but that would
610 // require some type-level trickery to get a correctly-sized value
611 // plumbed all the way to here. For now just dynamically allocate it and
612 // leave the optimization of leaving out this dynamic allocation to the
613 // future.
614 //
615 // In lieu of that a dedicated location on the heap is created for the
616 // lowering, and then `value`, as an owned value, is lowered into this
617 // pointer to initialize it.
618 let (ptr, cleanup) = Cleanup::new(writer.ops.elem_layout());
        // SAFETY: `ptr` is allocated with `ops.elem_layout()` and should be
        // safe to use here.
621 let code = unsafe {
622 writer.ops.lower(value, ptr);
623 writer.ops.start_write(writer.handle, ptr)
624 };
625 rtdebug!("future.write({}, {ptr:?}) = {code:#x}", writer.handle);
626 (code, (writer, cleanup))
627 }
628
629 fn start_cancelled(&mut self, (writer, value): Self::Start) -> Self::Cancel {
630 RawFutureWriteCancel::Cancelled(value, writer)
631 }
632
633 fn in_progress_update(
634 &mut self,
635 (mut writer, cleanup): Self::InProgress,
636 code: u32,
637 ) -> Result<Self::Result, Self::InProgress> {
638 let ptr = cleanup
639 .as_ref()
640 .map(|c| c.ptr.as_ptr())
641 .unwrap_or(ptr::null_mut());
642 match code {
643 super::BLOCKED => Err((writer, cleanup)),
644
            // The other end has dropped its end, or this write was cancelled.
            //
            // Either way the value was not received by the other end so `ptr`
            // still has all of its resources intact. Use `lift` to construct a
            // new instance of `T` which takes ownership of pointers and
            // resources and such. The allocation of `ptr` is then cleaned up
            // naturally when `cleanup` goes out of scope.
652 super::DROPPED | super::CANCELLED => {
653 // SAFETY: we're the ones managing `ptr` so we know it's safe to
654 // pass here.
655 let value = unsafe { writer.ops.lift(ptr) };
656 let status = if code == super::DROPPED {
657 WriteComplete::Dropped(value)
658 } else {
659 WriteComplete::Cancelled(value)
660 };
661 Ok((status, writer))
662 }
663
664 // This write has completed.
665 //
666 // Here we need to clean up our allocations. The `ptr` exclusively
667 // owns all of the value being sent and we notably need to cleanup
668 // the transitive list allocations present in this pointer. Use
669 // `dealloc_lists` for that (effectively a post-return lookalike).
670 //
671 // Afterwards the `cleanup` itself is naturally dropped and cleaned
672 // up.
673 super::COMPLETED => {
674 // SAFETY: we're the ones managing `ptr` so we know it's safe to
675 // pass here.
676 unsafe {
677 writer.ops.dealloc_lists(ptr);
678 }
679 Ok((WriteComplete::Written, writer))
680 }
681
682 other => unreachable!("unexpected code {other:?}"),
683 }
684 }
685
686 fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 {
687 writer.handle
688 }
689
690 fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 {
691 // SAFETY: we're managing `writer` and all the various operational bits,
692 // so this relies on `WaitableOperation` being safe.
693 let code = unsafe { writer.ops.cancel_write(writer.handle) };
694 rtdebug!("future.cancel-write({}) = {code:#x}", writer.handle);
695 code
696 }
697
698 fn result_into_cancel(&mut self, (result, writer): Self::Result) -> Self::Cancel {
699 match result {
700 // The value was actually sent, meaning we can't yield back the
701 // future nor the value.
702 WriteComplete::Written => RawFutureWriteCancel::AlreadySent,
703
            // The value was not sent because the other end either hung up or we
            // successfully cancelled. In both cases the value is returned, and
            // on cancellation the writer is returned as well.
707 WriteComplete::Dropped(val) => RawFutureWriteCancel::Dropped(val),
708 WriteComplete::Cancelled(val) => RawFutureWriteCancel::Cancelled(val, writer),
709 }
710 }
711}
712
713impl<O: FutureOps> Future for RawFutureWrite<O> {
714 type Output = Result<(), FutureWriteError<O::Payload>>;
715
716 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
717 self.pin_project()
718 .poll_complete(cx)
719 .map(|(result, _writer)| match result {
720 WriteComplete::Written => Ok(()),
721 WriteComplete::Dropped(value) | WriteComplete::Cancelled(value) => {
722 Err(FutureWriteError { value })
723 }
724 })
725 }
726}
727
728impl<O: FutureOps> RawFutureWrite<O> {
729 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureWriteOp<O>>> {
730 // SAFETY: we've chosen that when `Self` is pinned that it translates to
731 // always pinning the inner field, so that's codified here.
732 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
733 }
734
735 /// Same as [`FutureWrite::cancel`], but returns a [`RawFutureWriteCancel`]
736 /// instead.
737 pub fn cancel(self: Pin<&mut Self>) -> RawFutureWriteCancel<O> {
738 self.pin_project().cancel()
739 }
740}
741
/// Error type in the result of [`FutureWrite`], returned when a value could
/// not be written to a future.
744pub struct FutureWriteError<T> {
745 /// The value that could not be sent.
746 pub value: T,
747}
748
749impl<T> fmt::Debug for FutureWriteError<T> {
750 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
751 f.debug_struct("FutureWriteError").finish_non_exhaustive()
752 }
753}
754
755impl<T> fmt::Display for FutureWriteError<T> {
756 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
757 "read end dropped".fmt(f)
758 }
759}
760
761impl<T> core::error::Error for FutureWriteError<T> {}
762
/// Result of [`RawFutureWrite::cancel`].
764#[derive(Debug)]
765pub enum RawFutureWriteCancel<O: FutureOps> {
766 /// The cancel request raced with the receipt of the sent value, and the
767 /// value was actually sent. Neither the value nor the writer are made
768 /// available here as both are gone.
769 AlreadySent,
770
771 /// The other end was dropped before cancellation happened.
772 ///
    /// In this case the original value is returned to the caller but the
    /// writer itself is no longer accessible as it's no longer usable.
775 Dropped(O::Payload),
776
777 /// The pending write was successfully cancelled and the value being written
778 /// is returned along with the writer to resume again in the future if
779 /// necessary.
780 Cancelled(O::Payload, RawFutureWriter<O>),
781}

/// Represents the readable end of a Component Model `future<T>`.
pub type FutureReader<T> = RawFutureReader<&'static FutureVtable<T>>;

/// Represents the readable end of a Component Model `future<T>`.
pub struct RawFutureReader<O: FutureOps> {
    handle: AtomicU32,
    ops: O,
}

impl<O: FutureOps> fmt::Debug for RawFutureReader<O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RawFutureReader")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<O: FutureOps> RawFutureReader<O> {
    /// Raw constructor for a future reader.
    ///
    /// Takes ownership of the `handle` provided.
    ///
    /// # Safety
    ///
    /// The `ops` specified must be both valid and well-typed for `handle`.
    pub unsafe fn new(handle: u32, ops: O) -> Self {
        Self {
            handle: AtomicU32::new(handle),
            ops,
        }
    }

    #[doc(hidden)]
    pub fn take_handle(&self) -> u32 {
        let ret = self.opt_handle().unwrap();
        // `u32::MAX` marks the handle as taken, which `opt_handle` reports as
        // `None` and which turns `Drop` into a no-op.
        self.handle.store(u32::MAX, Relaxed);
        ret
    }

    fn handle(&self) -> u32 {
        self.opt_handle().unwrap()
    }

    fn opt_handle(&self) -> Option<u32> {
        match self.handle.load(Relaxed) {
            u32::MAX => None,
            other => Some(other),
        }
    }
}

impl<O: FutureOps> IntoFuture for RawFutureReader<O> {
    type Output = O::Payload;
    type IntoFuture = RawFutureRead<O>;

    /// Convert this object into a `Future` which will resolve when a value is
    /// written to the writable end of this `future`.
    fn into_future(self) -> Self::IntoFuture {
        RawFutureRead {
            op: WaitableOperation::new(FutureReadOp(marker::PhantomData), self),
        }
    }
}

impl<O: FutureOps> Drop for RawFutureReader<O> {
    fn drop(&mut self) {
        let Some(handle) = self.opt_handle() else {
            return;
        };
        unsafe {
            rtdebug!("future.drop-readable({handle})");
            self.ops.drop_readable(handle);
        }
    }
}
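
/*
The handle bookkeeping above (`take_handle`, `opt_handle`, and the `Drop` impl)
can be modeled with std only. This is a sketch of the sentinel idea under
stated assumptions, not the runtime's implementation: `OwnedHandle` and its
`released` counter are hypothetical, and the counter stands in for the
`drop_readable` call.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

/// Hypothetical stand-in for `RawFutureReader`: owns a raw handle and
/// "releases" it on drop by bumping a shared counter.
pub struct OwnedHandle {
    handle: AtomicU32,
    released: Rc<Cell<u32>>,
}

impl OwnedHandle {
    pub fn new(handle: u32, released: Rc<Cell<u32>>) -> Self {
        Self {
            handle: AtomicU32::new(handle),
            released,
        }
    }

    /// Transfers ownership of the raw handle to the caller.
    pub fn take_handle(&self) -> u32 {
        let ret = self.opt_handle().expect("handle already taken");
        // `u32::MAX` is the sentinel meaning "no handle owned anymore".
        self.handle.store(u32::MAX, Relaxed);
        ret
    }

    fn opt_handle(&self) -> Option<u32> {
        match self.handle.load(Relaxed) {
            u32::MAX => None,
            other => Some(other),
        }
    }
}

impl Drop for OwnedHandle {
    fn drop(&mut self) {
        // Nothing to release if the handle was already taken.
        if self.opt_handle().is_some() {
            self.released.set(self.released.get() + 1);
        }
    }
}
```

Dropping an `OwnedHandle` releases it exactly once, while a value whose handle
was taken first releases nothing, since ownership has moved to the caller.
*/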

/// Represents a read operation which may be cancelled prior to completion.
///
/// This represents a read operation on a [`FutureReader`] and is created via
/// `IntoFuture`.
pub type FutureRead<T> = RawFutureRead<&'static FutureVtable<T>>;

/// Represents a read operation which may be cancelled prior to completion.
///
/// This represents a read operation on a [`FutureReader`] and is created via
/// `IntoFuture`.
pub struct RawFutureRead<O: FutureOps> {
    op: WaitableOperation<FutureReadOp<O>>,
}

struct FutureReadOp<O>(marker::PhantomData<O>);

enum ReadComplete<T> {
    Value(T),
    Cancelled,
}

unsafe impl<O: FutureOps> WaitableOp for FutureReadOp<O> {
    type Start = RawFutureReader<O>;
    type InProgress = (RawFutureReader<O>, Option<Cleanup>);
    type Result = (ReadComplete<O::Payload>, RawFutureReader<O>);
    type Cancel = Result<O::Payload, RawFutureReader<O>>;

    fn start(&mut self, mut reader: Self::Start) -> (u32, Self::InProgress) {
        let (ptr, cleanup) = Cleanup::new(reader.ops.elem_layout());
        // SAFETY: `ptr` is allocated with `reader.ops.elem_layout()` and
        // should be safe to use here. Its lifetime for the async operation is
        // hinged on `WaitableOperation` being safe.
        let code = unsafe { reader.ops.start_read(reader.handle(), ptr) };
        rtdebug!("future.read({}, {ptr:?}) = {code:#x}", reader.handle());
        (code, (reader, cleanup))
    }

    fn start_cancelled(&mut self, state: Self::Start) -> Self::Cancel {
        Err(state)
    }

    fn in_progress_update(
        &mut self,
        (mut reader, cleanup): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        match ReturnCode::decode(code) {
            ReturnCode::Blocked => Err((reader, cleanup)),

            // Let `cleanup` fall out of scope to clean up its allocation here,
            // and otherwise the reader is plumbed through to possibly restart
            // the read in the future.
            ReturnCode::Cancelled(0) => Ok((ReadComplete::Cancelled, reader)),

            // The read has completed, so lift the value from the stored memory and
            // `cleanup` naturally falls out of scope after transferring ownership of
            // everything to the returned `value`.
            ReturnCode::Completed(0) => {
                let ptr = cleanup
                    .as_ref()
                    .map(|c| c.ptr.as_ptr())
                    .unwrap_or(ptr::null_mut());

                // SAFETY: we're the ones managing `ptr` so we know it's safe to
                // pass here.
                let value = unsafe { reader.ops.lift(ptr) };
                Ok((ReadComplete::Value(value), reader))
            }

            other => panic!("unexpected code {other:?}"),
        }
    }

    fn in_progress_waitable(&mut self, (reader, _): &Self::InProgress) -> u32 {
        reader.handle()
    }

    fn in_progress_cancel(&mut self, (reader, _): &mut Self::InProgress) -> u32 {
        // SAFETY: we're managing `reader` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { reader.ops.cancel_read(reader.handle()) };
        rtdebug!("future.cancel-read({}) = {code:#x}", reader.handle());
        code
    }

    fn result_into_cancel(&mut self, (value, reader): Self::Result) -> Self::Cancel {
        match value {
            // The value was actually read, so thread that through here.
            ReadComplete::Value(value) => Ok(value),

            // The read was successfully cancelled, so thread through the
            // `reader` to possibly restart later on.
            ReadComplete::Cancelled => Err(reader),
        }
    }
}

impl<O: FutureOps> Future for RawFutureRead<O> {
    type Output = O::Payload;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project()
            .poll_complete(cx)
            .map(|(result, _reader)| match result {
                ReadComplete::Value(val) => val,
                // This is only possible if, after calling `FutureRead::cancel`,
                // the future is polled again. The `cancel` method is documented
                // as "don't do that" so this is left to panic.
                ReadComplete::Cancelled => panic!("cannot poll after cancelling"),
            })
    }
}

impl<O: FutureOps> RawFutureRead<O> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<FutureReadOp<O>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this read if it hasn't already completed.
    ///
    /// Return values include:
    ///
    /// * `Ok(value)` - future completed before this cancellation request
    ///   was received.
    /// * `Err(reader)` - read operation was cancelled and it can be retried in
    ///   the future if desired.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice. Additionally if this method completes
    /// then calling `poll` again on `self` will panic.
    pub fn cancel(self: Pin<&mut Self>) -> Result<O::Payload, RawFutureReader<O>> {
        self.pin_project().cancel()
    }
}
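
/*
The `pin_project` helper above relies on structural pinning: once the outer
type is pinned, its inner operation is treated as pinned too. The following
std-only sketch shows that projection in isolation. `Inner`, `Outer`, and
`bump` are hypothetical names for this illustration, and `Inner` merely stands
in for `WaitableOperation`.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical inner operation; `PhantomPinned` makes it `!Unpin`.
pub struct Inner {
    count: u32,
    _pin: PhantomPinned,
}

impl Inner {
    pub fn bump(self: Pin<&mut Self>) -> u32 {
        // SAFETY: only a plain integer is mutated; `self` is never moved.
        let this = unsafe { self.get_unchecked_mut() };
        this.count += 1;
        this.count
    }
}

/// Hypothetical wrapper, analogous to a type wrapping a pinned operation.
pub struct Outer {
    op: Inner,
}

impl Outer {
    pub fn new() -> Self {
        Outer {
            op: Inner {
                count: 0,
                _pin: PhantomPinned,
            },
        }
    }

    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut Inner> {
        // SAFETY: `op` is never moved out of a pinned `Outer`, `Outer` has no
        // `Drop` impl that could move it, and `Outer` is `!Unpin` because
        // `Inner` is.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Public methods take `Pin<&mut Self>` and delegate via the projection.
    pub fn bump(self: Pin<&mut Self>) -> u32 {
        self.pin_project().bump()
    }
}
```

Keeping the `unsafe` projection in one private helper means every public
method, like `cancel` above, can stay free of `unsafe` and simply delegate.
*/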