wit_bindgen/rt/async_support/stream_support.rs
1//! For a high-level overview of how this module is implemented see the
2//! module documentation in `future_support.rs`.
3
4use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
5use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
6use {
7 crate::rt::Cleanup,
8 std::{
9 alloc::Layout,
10 fmt,
11 future::Future,
12 marker,
13 pin::Pin,
14 ptr,
15 sync::atomic::{AtomicU32, Ordering::Relaxed},
16 task::{Context, Poll},
17 vec::Vec,
18 },
19};
20
/// Table of type-specific operations that back a component model `stream<T>`.
///
/// Values of this type are primarily emitted by `wit_bindgen::generate!`; the
/// rest of this module dispatches through it for every intrinsic call and for
/// every conversion to or from the canonical ABI representation of `T`.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// Canonical ABI memory layout of one value of `T`.
    pub layout: Layout,

    /// Optional callback that lowers an owned `T` into the memory at `dst`.
    ///
    /// When invoked, ownership of every list and resource held by the value
    /// moves into `dst`; this may involve reallocation when the Rust layout of
    /// `T` differs from its canonical ABI layout.
    ///
    /// `None` means `T` has the same in-memory layout in Rust as in the
    /// canonical ABI, in which case no lower/lift step is needed.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Optional callback that frees any owned lists stored at `dst` once a
    /// value has been successfully sent along a stream.
    ///
    /// `None` means `T` contains no lists.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Counterpart of `lower`: reads a `T` back out of canonical ABI memory.
    ///
    /// As with `lower`, `None` means the Rust and canonical ABI
    /// representations of `T` coincide.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// Raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// Raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// Raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// Raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// Raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// Raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// Raw `stream.new` intrinsic; returns both handles packed into a `u64`.
    pub new: unsafe extern "C" fn() -> u64,
}
66
67/// Helper function to create a new read/write pair for a component model
68/// stream.
69pub unsafe fn stream_new<T>(
70 vtable: &'static StreamVtable<T>,
71) -> (StreamWriter<T>, StreamReader<T>) {
72 unsafe {
73 let handles = (vtable.new)();
74 let reader = handles as u32;
75 let writer = (handles >> 32) as u32;
76 rtdebug!("stream.new() = [{writer}, {reader}]");
77 (
78 StreamWriter::new(writer, vtable),
79 StreamReader::new(reader, vtable),
80 )
81 }
82}
83
84/// Represents the writable end of a Component Model `stream`.
85pub struct StreamWriter<T: 'static> {
86 handle: u32,
87 vtable: &'static StreamVtable<T>,
88 done: bool,
89}
90
91impl<T> StreamWriter<T> {
92 #[doc(hidden)]
93 pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
94 Self {
95 handle,
96 vtable,
97 done: false,
98 }
99 }
100
101 /// Initiate a write of the `values` provided into this stream.
102 ///
103 /// This method is akin to an `async fn` except that the returned
104 /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
105 /// to re-acquire undelivered values.
106 ///
107 /// This method will perform at most a single write of the `values`
108 /// provided. The returned future will resolve once the write has completed.
109 ///
110 /// # Return Values
111 ///
112 /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
113 /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
114 /// were sent from `values` into this writer. A result of
115 /// `StreamResult::Dropped` means that no values were sent and the other side
116 /// has hung-up and sending values will no longer be possible.
117 ///
118 /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
119 /// original `values` provided here. That can be used to re-acquire `values`
120 /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
121 /// internal cursor of how many values have been written and if the write
122 /// should be resumed to write the entire buffer then the
123 /// [`StreamWriter::write_buf`] method can be used to resume writing at the
124 /// next value in the buffer.
125 ///
126 /// # Cancellation
127 ///
128 /// The returned [`StreamWrite`] future can be cancelled like any other Rust
129 /// future via `drop`, but this means that `values` will be lost within the
130 /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
131 /// in-progress write that is being done with `values`. This is effectively
132 /// a way of forcing the future to immediately resolve.
133 ///
134 /// Note that if this future is cancelled via `drop` it does not mean that
135 /// no values were sent. It may be possible that values were still sent
136 /// despite being cancelled. Cancelling a write and determining what
137 /// happened must be done with [`StreamWrite::cancel`].
138 pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> {
139 self.write_buf(AbiBuffer::new(values, self.vtable))
140 }
141
142 /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
143 /// instead of `Vec<T>`.
144 pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> {
145 StreamWrite {
146 op: WaitableOperation::new(StreamWriteOp(marker::PhantomData), (self, values)),
147 }
148 }
149
150 /// Writes all of the `values` provided into this stream.
151 ///
152 /// This is a higher-level method than [`StreamWriter::write`] and does not
153 /// expose cancellation for example. This will successively attempt to write
154 /// all of `values` provided into this stream. Upon completion the same
155 /// vector will be returned and any remaining elements in the vector were
156 /// not sent because the stream was dropped.
157 pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> {
158 // Perform an initial write which converts `values` into `AbiBuffer`.
159 let (mut status, mut buf) = self.write(values).await;
160
161 // While the previous write completed and there's still remaining items
162 // in the buffer, perform another write.
163 while let StreamResult::Complete(_) = status {
164 if buf.remaining() == 0 {
165 break;
166 }
167 (status, buf) = self.write_buf(buf).await;
168
169 // FIXME(WebAssembly/component-model#490)
170 if status == StreamResult::Cancelled {
171 status = StreamResult::Complete(0);
172 }
173 }
174
175 // Return back any values that weren't written by shifting them to the
176 // front of the returned vector.
177 assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
178 buf.into_vec()
179 }
180
181 /// Writes the singular `value` provided
182 ///
183 /// This is a higher-level method than [`StreamWriter::write`] and does not
184 /// expose cancellation for example. This will attempt to send `value` on
185 /// this stream.
186 ///
187 /// If the other end hangs up then the value is returned back as
188 /// `Some(value)`, otherwise `None` is returned indicating the value was
189 /// sent.
190 pub async fn write_one(&mut self, value: T) -> Option<T> {
191 // TODO: can probably be a bit more efficient about this and avoid
192 // moving `value` onto the heap in some situations, but that's left as
193 // an optimization for later.
194 self.write_all(std::vec![value]).await.pop()
195 }
196}
197
198impl<T> fmt::Debug for StreamWriter<T> {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 f.debug_struct("StreamWriter")
201 .field("handle", &self.handle)
202 .finish()
203 }
204}
205
206impl<T> Drop for StreamWriter<T> {
207 fn drop(&mut self) {
208 rtdebug!("stream.drop-writable({})", self.handle);
209 unsafe {
210 (self.vtable.drop_writable)(self.handle);
211 }
212 }
213}
214
215/// Represents a write operation which may be cancelled prior to completion.
216pub struct StreamWrite<'a, T: 'static> {
217 op: WaitableOperation<StreamWriteOp<'a, T>>,
218}
219
220struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>);
221
/// Outcome of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// as yielded by the [`StreamWrite`] and [`StreamRead`] futures.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamResult {
    /// The given number of values were successfully transferred.
    ///
    /// For a write this is the number of items written; for a read it is the
    /// number of items read.
    Complete(usize),
    /// No values were transferred because the other end has dropped its
    /// handle.
    Dropped,
    /// No values were transferred because the operation was cancelled.
    Cancelled,
}
236
237unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T>
238where
239 T: 'static,
240{
241 type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>);
242 type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>);
243 type Result = (StreamResult, AbiBuffer<T>);
244 type Cancel = (StreamResult, AbiBuffer<T>);
245
246 fn start(&mut self, (writer, buf): Self::Start) -> (u32, Self::InProgress) {
247 if writer.done {
248 return (DROPPED, (writer, buf));
249 }
250
251 let (ptr, len) = buf.abi_ptr_and_len();
252 // SAFETY: sure hope this is safe, everything in this module and
253 // `AbiBuffer` is trying to make this safe.
254 let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) };
255 rtdebug!(
256 "stream.write({}, {ptr:?}, {len}) = {code:#x}",
257 writer.handle
258 );
259 (code, (writer, buf))
260 }
261
262 fn start_cancelled(&mut self, (_writer, buf): Self::Start) -> Self::Cancel {
263 (StreamResult::Cancelled, buf)
264 }
265
266 fn in_progress_update(
267 &mut self,
268 (writer, mut buf): Self::InProgress,
269 code: u32,
270 ) -> Result<Self::Result, Self::InProgress> {
271 match ReturnCode::decode(code) {
272 ReturnCode::Blocked => Err((writer, buf)),
273 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
274 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
275 code @ (ReturnCode::Completed(amt)
276 | ReturnCode::Dropped(amt)
277 | ReturnCode::Cancelled(amt)) => {
278 let amt = amt.try_into().unwrap();
279 buf.advance(amt);
280 if let ReturnCode::Dropped(_) = code {
281 writer.done = true;
282 }
283 Ok((StreamResult::Complete(amt), buf))
284 }
285 }
286 }
287
288 fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 {
289 writer.handle
290 }
291
292 fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 {
293 // SAFETY: we're managing `writer` and all the various operational bits,
294 // so this relies on `WaitableOperation` being safe.
295 let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
296 rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle);
297 code
298 }
299
300 fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
301 result
302 }
303}
304
305impl<T: 'static> Future for StreamWrite<'_, T> {
306 type Output = (StreamResult, AbiBuffer<T>);
307
308 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
309 self.pin_project().poll_complete(cx)
310 }
311}
312
313impl<'a, T: 'static> StreamWrite<'a, T> {
314 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> {
315 // SAFETY: we've chosen that when `Self` is pinned that it translates to
316 // always pinning the inner field, so that's codified here.
317 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
318 }
319
320 /// Cancel this write if it hasn't already completed.
321 ///
322 /// This method can be used to cancel a write-in-progress and re-acquire
323 /// values being sent. Note that the result here may still indicate that
324 /// some values were written if the race to cancel the write was lost.
325 ///
326 /// # Panics
327 ///
328 /// Panics if the operation has already been completed via `Future::poll`,
329 /// or if this method is called twice.
330 pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) {
331 self.pin_project().cancel()
332 }
333}
334
335/// Represents the readable end of a Component Model `stream`.
336pub struct StreamReader<T: 'static> {
337 handle: AtomicU32,
338 vtable: &'static StreamVtable<T>,
339 done: bool,
340}
341
342impl<T> fmt::Debug for StreamReader<T> {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 f.debug_struct("StreamReader")
345 .field("handle", &self.handle)
346 .finish()
347 }
348}
349
350impl<T> StreamReader<T> {
351 #[doc(hidden)]
352 pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
353 Self {
354 handle: AtomicU32::new(handle),
355 vtable,
356 done: false,
357 }
358 }
359
360 #[doc(hidden)]
361 pub fn take_handle(&self) -> u32 {
362 let ret = self.opt_handle().unwrap();
363 self.handle.store(u32::MAX, Relaxed);
364 ret
365 }
366
367 fn handle(&self) -> u32 {
368 self.opt_handle().unwrap()
369 }
370
371 fn opt_handle(&self) -> Option<u32> {
372 match self.handle.load(Relaxed) {
373 u32::MAX => None,
374 other => Some(other),
375 }
376 }
377
378 /// Starts a new read operation on this stream into `buf`.
379 ///
380 /// This method will read values into the spare capacity of the `buf`
381 /// provided. If `buf` has no spare capacity then this will be equivalent
382 /// to a zero-length read.
383 ///
384 /// Upon completion the `buf` will be yielded back to the caller via the
385 /// completion of the [`StreamRead`] future.
386 ///
387 /// # Cancellation
388 ///
389 /// Cancelling the returned future can be done with `drop` like all Rust
390 /// futures, but it does not mean that no values were read. To accurately
391 /// determine if values were read the [`StreamRead::cancel`] method must be
392 /// used.
393 pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> {
394 StreamRead {
395 op: WaitableOperation::new(StreamReadOp(marker::PhantomData), (self, buf)),
396 }
397 }
398
399 /// Reads a single item from this stream.
400 ///
401 /// This is a higher-level method than [`StreamReader::read`] in that it
402 /// reads only a single item and does not expose control over cancellation.
403 pub async fn next(&mut self) -> Option<T> {
404 // TODO: should amortize this allocation and avoid doing it every time.
405 // Or somehow perhaps make this more optimal.
406 let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
407 buf.pop()
408 }
409
410 /// Reads all items from this stream and returns the list.
411 ///
412 /// This method will read all remaining items from this stream into a list
413 /// and await the stream to be dropped.
414 pub async fn collect(mut self) -> Vec<T> {
415 let mut ret = Vec::new();
416 loop {
417 // If there's no more spare capacity then reserve room for one item
418 // which should trigger `Vec`'s built-in resizing logic, which will
419 // free up likely more capacity than just one slot.
420 if ret.len() == ret.capacity() {
421 ret.reserve(1);
422 }
423 let (status, buf) = self.read(ret).await;
424 ret = buf;
425 match status {
426 StreamResult::Complete(_) => {}
427 StreamResult::Dropped => break,
428 StreamResult::Cancelled => unreachable!(),
429 }
430 }
431 ret
432 }
433}
434
435impl<T> Drop for StreamReader<T> {
436 fn drop(&mut self) {
437 let Some(handle) = self.opt_handle() else {
438 return;
439 };
440 unsafe {
441 rtdebug!("stream.drop-readable({})", handle);
442 (self.vtable.drop_readable)(handle);
443 }
444 }
445}
446
447/// Represents a read operation which may be cancelled prior to completion.
448pub struct StreamRead<'a, T: 'static> {
449 op: WaitableOperation<StreamReadOp<'a, T>>,
450}
451
452struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>);
453
454unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T>
455where
456 T: 'static,
457{
458 type Start = (&'a mut StreamReader<T>, Vec<T>);
459 type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>);
460 type Result = (StreamResult, Vec<T>);
461 type Cancel = (StreamResult, Vec<T>);
462
463 fn start(&mut self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) {
464 if reader.done {
465 return (DROPPED, (reader, buf, None));
466 }
467
468 let cap = buf.spare_capacity_mut();
469 let ptr;
470 let cleanup;
471 // If `T` requires a lifting operation, then allocate a slab of memory
472 // which will store the canonical ABI read. Otherwise we can use the
473 // raw capacity in `buf` itself.
474 if reader.vtable.lift.is_some() {
475 let layout = Layout::from_size_align(
476 reader.vtable.layout.size() * cap.len(),
477 reader.vtable.layout.align(),
478 )
479 .unwrap();
480 (ptr, cleanup) = Cleanup::new(layout);
481 } else {
482 ptr = cap.as_mut_ptr().cast();
483 cleanup = None;
484 }
485 // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
486 // persist with this async operation itself.
487 let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) };
488 rtdebug!(
489 "stream.read({}, {ptr:?}, {}) = {code:#x}",
490 reader.handle(),
491 cap.len()
492 );
493 (code, (reader, buf, cleanup))
494 }
495
496 fn start_cancelled(&mut self, (_, buf): Self::Start) -> Self::Cancel {
497 (StreamResult::Cancelled, buf)
498 }
499
500 fn in_progress_update(
501 &mut self,
502 (reader, mut buf, cleanup): Self::InProgress,
503 code: u32,
504 ) -> Result<Self::Result, Self::InProgress> {
505 match ReturnCode::decode(code) {
506 ReturnCode::Blocked => Err((reader, buf, cleanup)),
507
508 // Note that the `cleanup`, if any, is discarded here.
509 ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
510
511 // When an in-progress read is successfully cancelled then the
512 // allocation that was being read into, if any, is just discarded.
513 //
514 // TODO: should maybe thread this around like `AbiBuffer` to cache
515 // the read allocation?
516 ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
517
518 code @ (ReturnCode::Completed(amt)
519 | ReturnCode::Dropped(amt)
520 | ReturnCode::Cancelled(amt)) => {
521 let amt = usize::try_from(amt).unwrap();
522 let cur_len = buf.len();
523 assert!(amt <= buf.capacity() - cur_len);
524
525 match reader.vtable.lift {
526 // With a `lift` operation this now requires reading `amt` items
527 // from `cleanup` and pushing them into `buf`.
528 Some(lift) => {
529 let mut ptr = cleanup
530 .as_ref()
531 .map(|c| c.ptr.as_ptr())
532 .unwrap_or(ptr::null_mut());
533 for _ in 0..amt {
534 unsafe {
535 buf.push(lift(ptr));
536 ptr = ptr.add(reader.vtable.layout.size());
537 }
538 }
539 }
540
541 // If no `lift` was necessary, then the results of this operation
542 // were read directly into `buf`, so just update its length now that
543 // values have been initialized.
544 None => unsafe { buf.set_len(cur_len + amt) },
545 }
546
547 // Intentionally dispose of `cleanup` here as, if it was used, all
548 // allocations have been read from it and appended to `buf`.
549 drop(cleanup);
550 if let ReturnCode::Dropped(_) = code {
551 reader.done = true;
552 }
553 Ok((StreamResult::Complete(amt), buf))
554 }
555 }
556 }
557
558 fn in_progress_waitable(&mut self, (reader, ..): &Self::InProgress) -> u32 {
559 reader.handle()
560 }
561
562 fn in_progress_cancel(&mut self, (reader, ..): &mut Self::InProgress) -> u32 {
563 // SAFETY: we're managing `reader` and all the various operational bits,
564 // so this relies on `WaitableOperation` being safe.
565 let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
566 rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle());
567 code
568 }
569
570 fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
571 result
572 }
573}
574
575impl<T: 'static> Future for StreamRead<'_, T> {
576 type Output = (StreamResult, Vec<T>);
577
578 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
579 self.pin_project().poll_complete(cx)
580 }
581}
582
583impl<'a, T> StreamRead<'a, T> {
584 fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> {
585 // SAFETY: we've chosen that when `Self` is pinned that it translates to
586 // always pinning the inner field, so that's codified here.
587 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
588 }
589
590 /// Cancel this read if it hasn't already completed.
591 ///
592 /// This method will initiate a cancellation operation for this active
593 /// read. This may race with the actual read itself and so this may actually
594 /// complete with some results.
595 ///
596 /// The final result of cancellation is returned, along with the original
597 /// buffer.
598 ///
599 /// # Panics
600 ///
601 /// Panics if the operation has already been completed via `Future::poll`,
602 /// or if this method is called twice.
603 pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) {
604 self.pin_project().cancel()
605 }
606}