wit_bindgen/rt/async_support/stream_support.rs
1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, DROPPED, ReturnCode};
6use {
7 crate::rt::Cleanup,
8 std::{
9 alloc::Layout,
10 fmt,
11 future::Future,
12 pin::Pin,
13 ptr,
14 sync::atomic::{AtomicU32, Ordering::Relaxed},
15 task::{Context, Poll},
16 vec::Vec,
17 },
18};
19
/// Maximum size of a read/write operation as specified by the canonical ABI.
///
/// Every length handed to the `stream.read`/`stream.write` intrinsics in this
/// module is clamped to this value (`2^28 - 1` items).
const MAX_LENGTH: usize = (1 << 28) - 1;
22
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
///
/// # Safety
///
/// NOTE(review): the precise obligations of implementors are not spelled out
/// in this file. Presumably the layout, lift/lower callbacks and intrinsics
/// must all describe the same stream payload type -- confirm against
/// `future_support.rs` and the generator.
#[doc(hidden)]
pub unsafe trait StreamOps: Clone {
    /// The Rust type that's sent or received on this stream.
    type Payload: 'static;

    /// The `stream.new` intrinsic.
    ///
    /// The returned `u64` packs both handles: the readable end lives in the
    /// low 32 bits and the writable end in the high 32 bits.
    fn new(&mut self) -> u64;

    /// The canonical ABI layout of the type that this stream is
    /// sending/receiving.
    fn elem_layout(&self) -> Layout;

    /// Returns whether `Self::Payload` is natively in its canonical ABI
    /// representation.
    ///
    /// If this returns `true` then no `lift` or `lower` is required and reads
    /// are performed directly into the destination `Vec`'s spare capacity. If
    /// this returns `false` then values must go through `lift`/`lower`.
    fn native_abi_matches_canonical_abi(&self) -> bool;

    /// Returns whether `Self::Payload` has lists that need to be deallocated
    /// with `dealloc_lists`.
    fn contains_lists(&self) -> bool;

    /// Converts a Rust type to its canonical ABI representation.
    unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
    /// Used to deallocate any Rust-owned lists in the canonical ABI
    /// representation for when a value is successfully sent but needs to be
    /// cleaned up.
    unsafe fn dealloc_lists(&mut self, dst: *mut u8);
    /// Converts from the canonical ABI representation to a Rust value.
    unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
    /// The `stream.write` intrinsic
    ///
    /// Returns the raw status code reported by the intrinsic.
    unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32;
    /// The `stream.read` intrinsic
    ///
    /// Returns the raw status code reported by the intrinsic.
    unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32;
    /// The `stream.cancel-read` intrinsic
    unsafe fn cancel_read(&mut self, stream: u32) -> u32;
    /// The `stream.cancel-write` intrinsic
    unsafe fn cancel_write(&mut self, stream: u32) -> u32;
    /// The `stream.drop-readable` intrinsic
    unsafe fn drop_readable(&mut self, stream: u32);
    /// The `stream.drop-writable` intrinsic
    unsafe fn drop_writable(&mut self, stream: u32);
}
/// Table of function pointers describing how values of `T` travel over a
/// stream.
///
/// A shared reference to this table implements [`StreamOps`], which is how
/// the rest of this module consumes it.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// The in-memory canonical ABI layout of a single value of `T`.
    pub layout: Layout,

    /// An optional callback where if provided will lower an owned `T` value
    /// into the `dst` pointer.
    ///
    /// If this is called the ownership of all of `T`'s lists and resources are
    /// passed to `dst`, possibly by reallocating if `T`'s layout differs from
    /// the canonical ABI layout.
    ///
    /// If this is `None` then it means that `T` has the same layout in-memory
    /// in Rust as it does in the canonical ABI. In such a situation the
    /// lower/lift operation can be dropped.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback used to deallocate any owned lists in `dst` after a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means that `T` has no lists internally.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Dual of `lower`, and like `lower` if this is missing then it means that
    /// `T` has the same in-memory representation in Rust and the canonical ABI.
    ///
    /// The presence of this callback is what the [`StreamOps`] implementation
    /// consults to decide whether lifting/lowering is needed at all.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// The raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// The raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// The raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.new` intrinsic.
    pub new: unsafe extern "C" fn() -> u64,
}
114
115unsafe impl<T: 'static> StreamOps for &StreamVtable<T> {
116 type Payload = T;
117
118 fn new(&mut self) -> u64 {
119 unsafe { (self.new)() }
120 }
121 fn elem_layout(&self) -> Layout {
122 self.layout
123 }
124 fn native_abi_matches_canonical_abi(&self) -> bool {
125 self.lift.is_none()
126 }
127 fn contains_lists(&self) -> bool {
128 self.dealloc_lists.is_some()
129 }
130 unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
131 if let Some(f) = self.lower {
132 unsafe { f(payload, dst) }
133 }
134 }
135 unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
136 if let Some(f) = self.dealloc_lists {
137 unsafe { f(dst) }
138 }
139 }
140 unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
141 unsafe { (self.lift.unwrap())(dst) }
142 }
143 unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
144 unsafe { (self.start_write)(stream, val, amt) }
145 }
146 unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
147 unsafe { (self.start_read)(stream, val, amt) }
148 }
149 unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
150 unsafe { (self.cancel_read)(stream) }
151 }
152 unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
153 unsafe { (self.cancel_write)(stream) }
154 }
155 unsafe fn drop_readable(&mut self, stream: u32) {
156 unsafe { (self.drop_readable)(stream) }
157 }
158 unsafe fn drop_writable(&mut self, stream: u32) {
159 unsafe { (self.drop_writable)(stream) }
160 }
161}
162
/// Helper function to create a new read/write pair for a component model
/// stream.
///
/// This is the vtable-based entry point; it forwards directly to
/// [`raw_stream_new`].
///
/// # Safety
///
/// NOTE(review): the caller's obligations are not documented in this file;
/// presumably `vtable` must accurately describe `T` and its intrinsics --
/// confirm against the generator.
pub unsafe fn stream_new<T>(
    vtable: &'static StreamVtable<T>,
) -> (StreamWriter<T>, StreamReader<T>) {
    unsafe { raw_stream_new(vtable) }
}
170
171/// Helper function to create a new read/write pair for a component model
172/// stream.
173pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>)
174where
175 O: StreamOps + Clone,
176{
177 unsafe {
178 let handles = ops.new();
179 let reader = handles as u32;
180 let writer = (handles >> 32) as u32;
181 rtdebug!("stream.new() = [{writer}, {reader}]");
182 (
183 RawStreamWriter::new(writer, ops.clone()),
184 RawStreamReader::new(reader, ops),
185 )
186 }
187}
188
/// Represents the writable end of a Component Model `stream`.
///
/// This is [`RawStreamWriter`] specialised to a `'static` [`StreamVtable`].
pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>;
191
/// Represents the writable end of a Component Model `stream`.
///
/// Dropping a writer invokes the `stream.drop-writable` intrinsic on its
/// handle.
pub struct RawStreamWriter<O: StreamOps> {
    // Index of the component-model handle for the writable end.
    handle: u32,
    // Operations used to lower values and call the stream intrinsics.
    ops: O,
    // Set once a write has observed that the other end dropped; from then on
    // writes report `DROPPED` without calling the intrinsic again.
    done: bool,
}
198
199impl<O> RawStreamWriter<O>
200where
201 O: StreamOps,
202{
203 #[doc(hidden)]
204 pub unsafe fn new(handle: u32, ops: O) -> Self {
205 Self {
206 handle,
207 ops,
208 done: false,
209 }
210 }
211
212 /// Returns the index of the component-model handle that this stream is
213 /// using.
214 pub fn handle(&self) -> u32 {
215 self.handle
216 }
217
218 /// Initiate a write of the `values` provided into this stream.
219 ///
220 /// This method is akin to an `async fn` except that the returned
221 /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
222 /// to re-acquire undelivered values.
223 ///
224 /// This method will perform at most a single write of the `values`
225 /// provided. The returned future will resolve once the write has completed.
226 ///
227 /// # Return Values
228 ///
229 /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
230 /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
231 /// were sent from `values` into this writer. A result of
232 /// `StreamResult::Dropped` means that no values were sent and the other side
233 /// has hung-up and sending values will no longer be possible.
234 ///
235 /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
236 /// original `values` provided here. That can be used to re-acquire `values`
237 /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
238 /// internal cursor of how many values have been written and if the write
239 /// should be resumed to write the entire buffer then the
240 /// [`StreamWriter::write_buf`] method can be used to resume writing at the
241 /// next value in the buffer.
242 ///
243 /// # Cancellation
244 ///
245 /// The returned [`StreamWrite`] future can be cancelled like any other Rust
246 /// future via `drop`, but this means that `values` will be lost within the
247 /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
248 /// in-progress write that is being done with `values`. This is effectively
249 /// a way of forcing the future to immediately resolve.
250 ///
251 /// Note that if this future is cancelled via `drop` it does not mean that
252 /// no values were sent. It may be possible that values were still sent
253 /// despite being cancelled. Cancelling a write and determining what
254 /// happened must be done with [`StreamWrite::cancel`].
255 pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> {
256 self.write_buf(AbiBuffer::new(values, self.ops.clone()))
257 }
258
259 /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
260 /// instead of `Vec<T>`.
261 pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> {
262 RawStreamWrite {
263 op: WaitableOperation::new(StreamWriteOp { writer: self }, values),
264 }
265 }
266
267 /// Writes all of the `values` provided into this stream.
268 ///
269 /// This is a higher-level method than [`StreamWriter::write`] and does not
270 /// expose cancellation for example. This will successively attempt to write
271 /// all of `values` provided into this stream. Upon completion the same
272 /// vector will be returned and any remaining elements in the vector were
273 /// not sent because the stream was dropped.
274 pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> {
275 // Perform an initial write which converts `values` into `AbiBuffer`.
276 let (mut status, mut buf) = self.write(values).await;
277
278 // While the previous write completed and there's still remaining items
279 // in the buffer, perform another write.
280 while let StreamResult::Complete(_) = status {
281 if buf.remaining() == 0 {
282 break;
283 }
284 (status, buf) = self.write_buf(buf).await;
285
286 // FIXME(WebAssembly/component-model#490)
287 if status == StreamResult::Cancelled {
288 status = StreamResult::Complete(0);
289 }
290 }
291
292 // Return back any values that weren't written by shifting them to the
293 // front of the returned vector.
294 assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
295 buf.into_vec()
296 }
297
298 /// Writes the singular `value` provided
299 ///
300 /// This is a higher-level method than [`StreamWriter::write`] and does not
301 /// expose cancellation for example. This will attempt to send `value` on
302 /// this stream.
303 ///
304 /// If the other end hangs up then the value is returned back as
305 /// `Some(value)`, otherwise `None` is returned indicating the value was
306 /// sent.
307 pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> {
308 // TODO: can probably be a bit more efficient about this and avoid
309 // moving `value` onto the heap in some situations, but that's left as
310 // an optimization for later.
311 self.write_all(std::vec![value]).await.pop()
312 }
313}
314
315impl<O> fmt::Debug for RawStreamWriter<O>
316where
317 O: StreamOps,
318{
319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320 f.debug_struct("StreamWriter")
321 .field("handle", &self.handle)
322 .finish()
323 }
324}
325
326impl<O> Drop for RawStreamWriter<O>
327where
328 O: StreamOps,
329{
330 fn drop(&mut self) {
331 rtdebug!("stream.drop-writable({})", self.handle);
332 unsafe {
333 self.ops.drop_writable(self.handle);
334 }
335 }
336}
337
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is [`RawStreamWrite`] specialised to a `'static` [`StreamVtable`].
pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>;
340
/// Represents a write operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, AbiBuffer<O>)`.
pub struct RawStreamWrite<'a, O: StreamOps> {
    // The generic waitable-operation state machine specialised for writes.
    op: WaitableOperation<StreamWriteOp<'a, O>>,
}
345
// The `WaitableOp` implementation backing `RawStreamWrite`.
struct StreamWriteOp<'a, O: StreamOps> {
    // Writer that this operation sends values through.
    writer: &'a mut RawStreamWriter<O>,
}
349
/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
    /// The provided number of values were successfully transferred.
    ///
    /// For writes this is how many items were written, and for reads this is
    /// how many items were read.
    Complete(usize),
    /// No values were transferred, the other end has dropped its handle.
    Dropped,
    /// No values were transferred, the operation was cancelled.
    Cancelled,
}
364
365unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O>
366where
367 O: StreamOps,
368{
369 type Start = AbiBuffer<O>;
370 type InProgress = AbiBuffer<O>;
371 type Result = (StreamResult, AbiBuffer<O>);
372 type Cancel = (StreamResult, AbiBuffer<O>);
373
374 fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) {
375 if self.writer.done {
376 return (DROPPED, buf);
377 }
378
379 let (ptr, len) = buf.abi_ptr_and_len();
380 // SAFETY: sure hope this is safe, everything in this module and
381 // `AbiBuffer` is trying to make this safe.
382 let code = unsafe {
383 self.writer
384 .ops
385 .start_write(self.writer.handle, ptr, len.min(MAX_LENGTH))
386 };
387 rtdebug!(
388 "stream.write({}, {ptr:?}, {len}) = {code:#x}",
389 self.writer.handle
390 );
391 (code, buf)
392 }
393
394 fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
395 (StreamResult::Cancelled, buf)
396 }
397
398 fn in_progress_update(
399 &mut self,
400 mut buf: Self::InProgress,
401 code: u32,
402 ) -> Result<Self::Result, Self::InProgress> {
403 match ReturnCode::decode(code) {
404 ReturnCode::Blocked => Err(buf),
405 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
406 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
407 code @ (ReturnCode::Completed(amt)
408 | ReturnCode::Dropped(amt)
409 | ReturnCode::Cancelled(amt)) => {
410 let amt = amt.try_into().unwrap();
411 buf.advance(amt);
412 if let ReturnCode::Dropped(_) = code {
413 self.writer.done = true;
414 }
415 Ok((StreamResult::Complete(amt), buf))
416 }
417 }
418 }
419
420 fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
421 self.writer.handle
422 }
423
424 fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
425 // SAFETY: we're managing `writer` and all the various operational bits,
426 // so this relies on `WaitableOperation` being safe.
427 let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) };
428 rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle);
429 code
430 }
431
432 fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
433 result
434 }
435}
436
// Polling a write simply drives the wrapped `WaitableOperation`.
impl<O: StreamOps> Future for RawStreamWrite<'_, O> {
    // Final status of the write plus the buffer that was being written.
    type Output = (StreamResult, AbiBuffer<O>);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project the pin onto the inner operation and let it do the work.
        self.pin_project().poll_complete(cx)
    }
}
444
445impl<'a, O: StreamOps> RawStreamWrite<'a, O> {
446 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> {
447 // SAFETY: we've chosen that when `Self` is pinned that it translates to
448 // always pinning the inner field, so that's codified here.
449 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
450 }
451
452 /// Cancel this write if it hasn't already completed.
453 ///
454 /// This method can be used to cancel a write-in-progress and re-acquire
455 /// values being sent. Note that the result here may still indicate that
456 /// some values were written if the race to cancel the write was lost.
457 ///
458 /// # Panics
459 ///
460 /// Panics if the operation has already been completed via `Future::poll`,
461 /// or if this method is called twice.
462 pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) {
463 self.pin_project().cancel()
464 }
465}
466
/// Represents the readable end of a Component Model `stream`.
///
/// This is [`RawStreamReader`] specialised to a `'static` [`StreamVtable`].
pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>;
469
/// Represents the readable end of a Component Model `stream`.
///
/// Dropping a reader invokes the `stream.drop-readable` intrinsic on its
/// handle, unless the handle has been taken out beforehand.
pub struct RawStreamReader<O: StreamOps> {
    // Index of the component-model handle for the readable end. The value
    // `u32::MAX` is a sentinel meaning the handle has been taken.
    handle: AtomicU32,
    // Operations used to lift values and call the stream intrinsics.
    ops: O,
    // Set once a read has observed that the other end dropped; from then on
    // reads report `DROPPED` without calling the intrinsic again.
    done: bool,
}
476
477impl<O: StreamOps> fmt::Debug for RawStreamReader<O> {
478 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
479 f.debug_struct("StreamReader")
480 .field("handle", &self.handle)
481 .finish()
482 }
483}
484
485impl<O: StreamOps> RawStreamReader<O> {
486 #[doc(hidden)]
487 pub fn new(handle: u32, ops: O) -> Self {
488 Self {
489 handle: AtomicU32::new(handle),
490 ops,
491 done: false,
492 }
493 }
494
495 #[doc(hidden)]
496 pub fn take_handle(&self) -> u32 {
497 let ret = self.opt_handle().unwrap();
498 self.handle.store(u32::MAX, Relaxed);
499 ret
500 }
501
502 /// Returns the index of the component-model handle that this stream is
503 /// using.
504 pub fn handle(&self) -> u32 {
505 self.opt_handle().unwrap()
506 }
507
508 fn opt_handle(&self) -> Option<u32> {
509 match self.handle.load(Relaxed) {
510 u32::MAX => None,
511 other => Some(other),
512 }
513 }
514
515 /// Starts a new read operation on this stream into `buf`.
516 ///
517 /// This method will read values into the spare capacity of the `buf`
518 /// provided. If `buf` has no spare capacity then this will be equivalent
519 /// to a zero-length read.
520 ///
521 /// Upon completion the `buf` will be yielded back to the caller via the
522 /// completion of the [`StreamRead`] future.
523 ///
524 /// # Cancellation
525 ///
526 /// Cancelling the returned future can be done with `drop` like all Rust
527 /// futures, but it does not mean that no values were read. To accurately
528 /// determine if values were read the [`StreamRead::cancel`] method must be
529 /// used.
530 pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> {
531 RawStreamRead {
532 op: WaitableOperation::new(StreamReadOp { reader: self }, buf),
533 }
534 }
535
536 /// Reads a single item from this stream.
537 ///
538 /// This is a higher-level method than [`StreamReader::read`] in that it
539 /// reads only a single item and does not expose control over cancellation.
540 pub async fn next(&mut self) -> Option<O::Payload> {
541 // TODO: should amortize this allocation and avoid doing it every time.
542 // Or somehow perhaps make this more optimal.
543 let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
544 buf.pop()
545 }
546
547 /// Reads all items from this stream and returns the list.
548 ///
549 /// This method will read all remaining items from this stream into a list
550 /// and await the stream to be dropped.
551 pub async fn collect(mut self) -> Vec<O::Payload> {
552 let mut ret = Vec::new();
553 loop {
554 // If there's no more spare capacity then reserve room for one item
555 // which should trigger `Vec`'s built-in resizing logic, which will
556 // free up likely more capacity than just one slot.
557 if ret.len() == ret.capacity() {
558 ret.reserve(1);
559 }
560 let (status, buf) = self.read(ret).await;
561 ret = buf;
562 match status {
563 StreamResult::Complete(_) => {}
564 StreamResult::Dropped => break,
565 StreamResult::Cancelled => unreachable!(),
566 }
567 }
568 ret
569 }
570}
571
572impl<O: StreamOps> Drop for RawStreamReader<O> {
573 fn drop(&mut self) {
574 let Some(handle) = self.opt_handle() else {
575 return;
576 };
577 unsafe {
578 rtdebug!("stream.drop-readable({})", handle);
579 self.ops.drop_readable(handle);
580 }
581 }
582}
583
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is [`RawStreamRead`] specialised to a `'static` [`StreamVtable`].
pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>;
586
/// Represents a read operation which may be cancelled prior to completion.
///
/// This is a future resolving to `(StreamResult, Vec<O::Payload>)`.
pub struct RawStreamRead<'a, O: StreamOps> {
    // The generic waitable-operation state machine specialised for reads.
    op: WaitableOperation<StreamReadOp<'a, O>>,
}
591
// The `WaitableOp` implementation backing `RawStreamRead`.
struct StreamReadOp<'a, O: StreamOps> {
    // Reader that this operation receives values through.
    reader: &'a mut RawStreamReader<O>,
}
595
596unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> {
597 type Start = Vec<O::Payload>;
598 type InProgress = (Vec<O::Payload>, Option<Cleanup>);
599 type Result = (StreamResult, Vec<O::Payload>);
600 type Cancel = (StreamResult, Vec<O::Payload>);
601
602 fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) {
603 if self.reader.done {
604 return (DROPPED, (buf, None));
605 }
606
607 let cap = buf.spare_capacity_mut();
608 let ptr;
609 let cleanup;
610 // If `T` requires a lifting operation, then allocate a slab of memory
611 // which will store the canonical ABI read. Otherwise we can use the
612 // raw capacity in `buf` itself.
613 if self.reader.ops.native_abi_matches_canonical_abi() {
614 ptr = cap.as_mut_ptr().cast();
615 cleanup = None;
616 } else {
617 let elem_layout = self.reader.ops.elem_layout();
618 let layout =
619 Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align())
620 .unwrap();
621 (ptr, cleanup) = Cleanup::new(layout);
622 }
623 // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
624 // persist with this async operation itself.
625 let code = unsafe {
626 self.reader
627 .ops
628 .start_read(self.reader.handle(), ptr, cap.len().min(MAX_LENGTH))
629 };
630 rtdebug!(
631 "stream.read({}, {ptr:?}, {}) = {code:#x}",
632 self.reader.handle(),
633 cap.len()
634 );
635 (code, (buf, cleanup))
636 }
637
638 fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
639 (StreamResult::Cancelled, buf)
640 }
641
642 fn in_progress_update(
643 &mut self,
644 (mut buf, cleanup): Self::InProgress,
645 code: u32,
646 ) -> Result<Self::Result, Self::InProgress> {
647 match ReturnCode::decode(code) {
648 ReturnCode::Blocked => Err((buf, cleanup)),
649
650 // Note that the `cleanup`, if any, is discarded here.
651 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
652
653 // When an in-progress read is successfully cancelled then the
654 // allocation that was being read into, if any, is just discarded.
655 //
656 // TODO: should maybe thread this around like `AbiBuffer` to cache
657 // the read allocation?
658 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
659
660 code @ (ReturnCode::Completed(amt)
661 | ReturnCode::Dropped(amt)
662 | ReturnCode::Cancelled(amt)) => {
663 let amt = usize::try_from(amt).unwrap();
664 let cur_len = buf.len();
665 assert!(amt <= buf.capacity() - cur_len);
666
667 if self.reader.ops.native_abi_matches_canonical_abi() {
668 // If no `lift` was necessary, then the results of this operation
669 // were read directly into `buf`, so just update its length now that
670 // values have been initialized.
671 unsafe {
672 buf.set_len(cur_len + amt);
673 }
674 } else {
675 // With a `lift` operation this now requires reading `amt` items
676 // from `cleanup` and pushing them into `buf`.
677 let mut ptr = cleanup
678 .as_ref()
679 .map(|c| c.ptr.as_ptr())
680 .unwrap_or(ptr::null_mut());
681 for _ in 0..amt {
682 unsafe {
683 buf.push(self.reader.ops.lift(ptr));
684 ptr = ptr.add(self.reader.ops.elem_layout().size());
685 }
686 }
687 }
688
689 // Intentionally dispose of `cleanup` here as, if it was used, all
690 // allocations have been read from it and appended to `buf`.
691 drop(cleanup);
692 if let ReturnCode::Dropped(_) = code {
693 self.reader.done = true;
694 }
695 Ok((StreamResult::Complete(amt), buf))
696 }
697 }
698 }
699
700 fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
701 self.reader.handle()
702 }
703
704 fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
705 // SAFETY: we're managing `reader` and all the various operational bits,
706 // so this relies on `WaitableOperation` being safe.
707 let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) };
708 rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle());
709 code
710 }
711
712 fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
713 result
714 }
715}
716
// Polling a read simply drives the wrapped `WaitableOperation`.
impl<O: StreamOps> Future for RawStreamRead<'_, O> {
    // Final status of the read plus the buffer that was being read into.
    type Output = (StreamResult, Vec<O::Payload>);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project the pin onto the inner operation and let it do the work.
        self.pin_project().poll_complete(cx)
    }
}
724
725impl<'a, O> RawStreamRead<'a, O>
726where
727 O: StreamOps,
728{
729 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> {
730 // SAFETY: we've chosen that when `Self` is pinned that it translates to
731 // always pinning the inner field, so that's codified here.
732 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
733 }
734
735 /// Cancel this read if it hasn't already completed.
736 ///
737 /// This method will initiate a cancellation operation for this active
738 /// read. This may race with the actual read itself and so this may actually
739 /// complete with some results.
740 ///
741 /// The final result of cancellation is returned, along with the original
742 /// buffer.
743 ///
744 /// # Panics
745 ///
746 /// Panics if the operation has already been completed via `Future::poll`,
747 /// or if this method is called twice.
748 pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) {
749 self.pin_project().cancel()
750 }
751}