1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
//! For a high-level overview of how this module is implemented see the
//! module documentation in `future_support.rs`.
use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
use crate::rt::async_support::{AbiBuffer, DROPPED, ReturnCode};
use {
crate::rt::Cleanup,
std::{
alloc::Layout,
fmt,
future::Future,
pin::Pin,
ptr,
sync::atomic::{AtomicU32, Ordering::Relaxed},
task::{Context, Poll},
vec::Vec,
},
};
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub unsafe trait StreamOps: Clone {
/// The Rust type that's sent or received on this stream.
type Payload: 'static;
/// The `stream.new` intrinsic.
fn new(&mut self) -> u64;
/// The canonical ABI layout of the type that this stream is
/// sending/receiving.
fn elem_layout(&self) -> Layout;
/// Returns whether `lift` or `lower` is required to create `Self::Payload`.
///
/// If this returns `false` then `Self::Payload` is natively in its
/// canonical ABI representation.
fn native_abi_matches_canonical_abi(&self) -> bool;
/// Returns whether `O::Payload` has lists that need to be deallocated with
/// `dealloc_lists`.
fn contains_lists(&self) -> bool;
/// Converts a Rust type to its canonical ABI representation.
unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
/// Used to deallocate any Rust-owned lists in the canonical ABI
/// representation for when a value is successfully sent but needs to be
/// cleaned up.
unsafe fn dealloc_lists(&mut self, dst: *mut u8);
/// Converts from the canonical ABI representation to a Rust value.
unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
/// The `stream.write` intrinsic
unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32;
/// The `stream.read` intrinsic
unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32;
/// The `stream.cancel-read` intrinsic
unsafe fn cancel_read(&mut self, stream: u32) -> u32;
/// The `stream.cancel-write` intrinsic
unsafe fn cancel_write(&mut self, stream: u32) -> u32;
/// The `stream.drop-readable` intrinsic
unsafe fn drop_readable(&mut self, stream: u32);
/// The `stream.drop-writable` intrinsic
unsafe fn drop_writable(&mut self, stream: u32);
}
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
/// The in-memory canonical ABI layout of a single value of `T`.
pub layout: Layout,
/// An optional callback where if provided will lower an owned `T` value
/// into the `dst` pointer.
///
/// If this is called the ownership of all of `T`'s lists and resources are
/// passed to `dst`, possibly by reallocating if `T`'s layout differs from
/// the canonical ABI layout.
///
/// If this is `None` then it means that `T` has the same layout in-memory
/// in Rust as it does in the canonical ABI. In such a situation the
/// lower/lift operation can be dropped.
pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,
/// Callback used to deallocate any owned lists in `dst` after a value has
/// been successfully sent along a stream.
///
/// `None` means that `T` has no lists internally.
pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,
/// Dual of `lower`, and like `lower` if this is missing then it means that
/// `T` has the same in-memory representation in Rust and the canonical ABI.
pub lift: Option<unsafe fn(dst: *mut u8) -> T>,
/// The raw `stream.write` intrinsic.
pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
/// The raw `stream.read` intrinsic.
pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
/// The raw `stream.cancel-write` intrinsic.
pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
/// The raw `stream.cancel-read` intrinsic.
pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
/// The raw `stream.drop-writable` intrinsic.
pub drop_writable: unsafe extern "C" fn(stream: u32),
/// The raw `stream.drop-readable` intrinsic.
pub drop_readable: unsafe extern "C" fn(stream: u32),
/// The raw `stream.new` intrinsic.
pub new: unsafe extern "C" fn() -> u64,
}
unsafe impl<T: 'static> StreamOps for &StreamVtable<T> {
type Payload = T;
fn new(&mut self) -> u64 {
unsafe { (self.new)() }
}
fn elem_layout(&self) -> Layout {
self.layout
}
fn native_abi_matches_canonical_abi(&self) -> bool {
self.lift.is_none()
}
fn contains_lists(&self) -> bool {
self.dealloc_lists.is_some()
}
unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
if let Some(f) = self.lower {
unsafe { f(payload, dst) }
}
}
unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
if let Some(f) = self.dealloc_lists {
unsafe { f(dst) }
}
}
unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
unsafe { (self.lift.unwrap())(dst) }
}
unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
unsafe { (self.start_write)(stream, val, amt) }
}
unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
unsafe { (self.start_read)(stream, val, amt) }
}
unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
unsafe { (self.cancel_read)(stream) }
}
unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
unsafe { (self.cancel_write)(stream) }
}
unsafe fn drop_readable(&mut self, stream: u32) {
unsafe { (self.drop_readable)(stream) }
}
unsafe fn drop_writable(&mut self, stream: u32) {
unsafe { (self.drop_writable)(stream) }
}
}
/// Helper function to create a new read/write pair for a component model
/// stream.
pub unsafe fn stream_new<T>(
vtable: &'static StreamVtable<T>,
) -> (StreamWriter<T>, StreamReader<T>) {
unsafe { raw_stream_new(vtable) }
}
/// Helper function to create a new read/write pair for a component model
/// stream.
pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>)
where
O: StreamOps + Clone,
{
unsafe {
let handles = ops.new();
let reader = handles as u32;
let writer = (handles >> 32) as u32;
rtdebug!("stream.new() = [{writer}, {reader}]");
(
RawStreamWriter::new(writer, ops.clone()),
RawStreamReader::new(reader, ops),
)
}
}
/// Represents the writable end of a Component Model `stream`.
pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>;
/// Represents the writable end of a Component Model `stream`.
pub struct RawStreamWriter<O: StreamOps> {
handle: u32,
ops: O,
done: bool,
}
impl<O> RawStreamWriter<O>
where
O: StreamOps,
{
#[doc(hidden)]
pub unsafe fn new(handle: u32, ops: O) -> Self {
Self {
handle,
ops,
done: false,
}
}
/// Returns the index of the component-model handle that this stream is
/// using.
pub fn handle(&self) -> u32 {
self.handle
}
/// Initiate a write of the `values` provided into this stream.
///
/// This method is akin to an `async fn` except that the returned
/// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
/// to re-acquire undelivered values.
///
/// This method will perform at most a single write of the `values`
/// provided. The returned future will resolve once the write has completed.
///
/// # Return Values
///
/// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
/// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
/// were sent from `values` into this writer. A result of
/// `StreamResult::Dropped` means that no values were sent and the other side
/// has hung-up and sending values will no longer be possible.
///
/// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
/// original `values` provided here. That can be used to re-acquire `values`
/// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
/// internal cursor of how many values have been written and if the write
/// should be resumed to write the entire buffer then the
/// [`StreamWriter::write_buf`] method can be used to resume writing at the
/// next value in the buffer.
///
/// # Cancellation
///
/// The returned [`StreamWrite`] future can be cancelled like any other Rust
/// future via `drop`, but this means that `values` will be lost within the
/// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
/// in-progress write that is being done with `values`. This is effectively
/// a way of forcing the future to immediately resolve.
///
/// Note that if this future is cancelled via `drop` it does not mean that
/// no values were sent. It may be possible that values were still sent
/// despite being cancelled. Cancelling a write and determining what
/// happened must be done with [`StreamWrite::cancel`].
pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> {
self.write_buf(AbiBuffer::new(values, self.ops.clone()))
}
/// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
/// instead of `Vec<T>`.
pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> {
RawStreamWrite {
op: WaitableOperation::new(StreamWriteOp { writer: self }, values),
}
}
/// Writes all of the `values` provided into this stream.
///
/// This is a higher-level method than [`StreamWriter::write`] and does not
/// expose cancellation for example. This will successively attempt to write
/// all of `values` provided into this stream. Upon completion the same
/// vector will be returned and any remaining elements in the vector were
/// not sent because the stream was dropped.
pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> {
// Perform an initial write which converts `values` into `AbiBuffer`.
let (mut status, mut buf) = self.write(values).await;
// While the previous write completed and there's still remaining items
// in the buffer, perform another write.
while let StreamResult::Complete(_) = status {
if buf.remaining() == 0 {
break;
}
(status, buf) = self.write_buf(buf).await;
// FIXME(WebAssembly/component-model#490)
if status == StreamResult::Cancelled {
status = StreamResult::Complete(0);
}
}
// Return back any values that weren't written by shifting them to the
// front of the returned vector.
assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
buf.into_vec()
}
/// Writes the singular `value` provided
///
/// This is a higher-level method than [`StreamWriter::write`] and does not
/// expose cancellation for example. This will attempt to send `value` on
/// this stream.
///
/// If the other end hangs up then the value is returned back as
/// `Some(value)`, otherwise `None` is returned indicating the value was
/// sent.
pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> {
// TODO: can probably be a bit more efficient about this and avoid
// moving `value` onto the heap in some situations, but that's left as
// an optimization for later.
self.write_all(std::vec![value]).await.pop()
}
}
impl<O> fmt::Debug for RawStreamWriter<O>
where
O: StreamOps,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StreamWriter")
.field("handle", &self.handle)
.finish()
}
}
impl<O> Drop for RawStreamWriter<O>
where
O: StreamOps,
{
fn drop(&mut self) {
rtdebug!("stream.drop-writable({})", self.handle);
unsafe {
self.ops.drop_writable(self.handle);
}
}
}
/// Represents a write operation which may be cancelled prior to completion.
pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>;
/// Represents a write operation which may be cancelled prior to completion.
pub struct RawStreamWrite<'a, O: StreamOps> {
op: WaitableOperation<StreamWriteOp<'a, O>>,
}
struct StreamWriteOp<'a, O: StreamOps> {
writer: &'a mut RawStreamWriter<O>,
}
/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
/// The provided number of values were successfully transferred.
///
/// For writes this is how many items were written, and for reads this is
/// how many items were read.
Complete(usize),
/// No values were written, the other end has dropped its handle.
Dropped,
/// No values were written, the operation was cancelled.
Cancelled,
}
unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O>
where
O: StreamOps,
{
type Start = AbiBuffer<O>;
type InProgress = AbiBuffer<O>;
type Result = (StreamResult, AbiBuffer<O>);
type Cancel = (StreamResult, AbiBuffer<O>);
fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) {
if self.writer.done {
return (DROPPED, buf);
}
let (ptr, len) = buf.abi_ptr_and_len();
// SAFETY: sure hope this is safe, everything in this module and
// `AbiBuffer` is trying to make this safe.
let code = unsafe { self.writer.ops.start_write(self.writer.handle, ptr, len) };
rtdebug!(
"stream.write({}, {ptr:?}, {len}) = {code:#x}",
self.writer.handle
);
(code, buf)
}
fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
(StreamResult::Cancelled, buf)
}
fn in_progress_update(
&mut self,
mut buf: Self::InProgress,
code: u32,
) -> Result<Self::Result, Self::InProgress> {
match ReturnCode::decode(code) {
ReturnCode::Blocked => Err(buf),
ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
code @ (ReturnCode::Completed(amt)
| ReturnCode::Dropped(amt)
| ReturnCode::Cancelled(amt)) => {
let amt = amt.try_into().unwrap();
buf.advance(amt);
if let ReturnCode::Dropped(_) = code {
self.writer.done = true;
}
Ok((StreamResult::Complete(amt), buf))
}
}
}
fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
self.writer.handle
}
fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
// SAFETY: we're managing `writer` and all the various operational bits,
// so this relies on `WaitableOperation` being safe.
let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) };
rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle);
code
}
fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
result
}
}
impl<O: StreamOps> Future for RawStreamWrite<'_, O> {
type Output = (StreamResult, AbiBuffer<O>);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.pin_project().poll_complete(cx)
}
}
impl<'a, O: StreamOps> RawStreamWrite<'a, O> {
fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> {
// SAFETY: we've chosen that when `Self` is pinned that it translates to
// always pinning the inner field, so that's codified here.
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
}
/// Cancel this write if it hasn't already completed.
///
/// This method can be used to cancel a write-in-progress and re-acquire
/// values being sent. Note that the result here may still indicate that
/// some values were written if the race to cancel the write was lost.
///
/// # Panics
///
/// Panics if the operation has already been completed via `Future::poll`,
/// or if this method is called twice.
pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) {
self.pin_project().cancel()
}
}
/// Represents the readable end of a Component Model `stream`.
pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>;
/// Represents the readable end of a Component Model `stream`.
pub struct RawStreamReader<O: StreamOps> {
handle: AtomicU32,
ops: O,
done: bool,
}
impl<O: StreamOps> fmt::Debug for RawStreamReader<O> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StreamReader")
.field("handle", &self.handle)
.finish()
}
}
impl<O: StreamOps> RawStreamReader<O> {
#[doc(hidden)]
pub fn new(handle: u32, ops: O) -> Self {
Self {
handle: AtomicU32::new(handle),
ops,
done: false,
}
}
#[doc(hidden)]
pub fn take_handle(&self) -> u32 {
let ret = self.opt_handle().unwrap();
self.handle.store(u32::MAX, Relaxed);
ret
}
/// Returns the index of the component-model handle that this stream is
/// using.
pub fn handle(&self) -> u32 {
self.opt_handle().unwrap()
}
fn opt_handle(&self) -> Option<u32> {
match self.handle.load(Relaxed) {
u32::MAX => None,
other => Some(other),
}
}
/// Starts a new read operation on this stream into `buf`.
///
/// This method will read values into the spare capacity of the `buf`
/// provided. If `buf` has no spare capacity then this will be equivalent
/// to a zero-length read.
///
/// Upon completion the `buf` will be yielded back to the caller via the
/// completion of the [`StreamRead`] future.
///
/// # Cancellation
///
/// Cancelling the returned future can be done with `drop` like all Rust
/// futures, but it does not mean that no values were read. To accurately
/// determine if values were read the [`StreamRead::cancel`] method must be
/// used.
pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> {
RawStreamRead {
op: WaitableOperation::new(StreamReadOp { reader: self }, buf),
}
}
/// Reads a single item from this stream.
///
/// This is a higher-level method than [`StreamReader::read`] in that it
/// reads only a single item and does not expose control over cancellation.
pub async fn next(&mut self) -> Option<O::Payload> {
// TODO: should amortize this allocation and avoid doing it every time.
// Or somehow perhaps make this more optimal.
let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
buf.pop()
}
/// Reads all items from this stream and returns the list.
///
/// This method will read all remaining items from this stream into a list
/// and await the stream to be dropped.
pub async fn collect(mut self) -> Vec<O::Payload> {
let mut ret = Vec::new();
loop {
// If there's no more spare capacity then reserve room for one item
// which should trigger `Vec`'s built-in resizing logic, which will
// free up likely more capacity than just one slot.
if ret.len() == ret.capacity() {
ret.reserve(1);
}
let (status, buf) = self.read(ret).await;
ret = buf;
match status {
StreamResult::Complete(_) => {}
StreamResult::Dropped => break,
StreamResult::Cancelled => unreachable!(),
}
}
ret
}
}
impl<O: StreamOps> Drop for RawStreamReader<O> {
fn drop(&mut self) {
let Some(handle) = self.opt_handle() else {
return;
};
unsafe {
rtdebug!("stream.drop-readable({})", handle);
self.ops.drop_readable(handle);
}
}
}
/// Represents a read operation which may be cancelled prior to completion.
pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>;
/// Represents a read operation which may be cancelled prior to completion.
pub struct RawStreamRead<'a, O: StreamOps> {
op: WaitableOperation<StreamReadOp<'a, O>>,
}
struct StreamReadOp<'a, O: StreamOps> {
reader: &'a mut RawStreamReader<O>,
}
unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> {
type Start = Vec<O::Payload>;
type InProgress = (Vec<O::Payload>, Option<Cleanup>);
type Result = (StreamResult, Vec<O::Payload>);
type Cancel = (StreamResult, Vec<O::Payload>);
fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) {
if self.reader.done {
return (DROPPED, (buf, None));
}
let cap = buf.spare_capacity_mut();
let ptr;
let cleanup;
// If `T` requires a lifting operation, then allocate a slab of memory
// which will store the canonical ABI read. Otherwise we can use the
// raw capacity in `buf` itself.
if self.reader.ops.native_abi_matches_canonical_abi() {
ptr = cap.as_mut_ptr().cast();
cleanup = None;
} else {
let elem_layout = self.reader.ops.elem_layout();
let layout =
Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align())
.unwrap();
(ptr, cleanup) = Cleanup::new(layout);
}
// SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
// persist with this async operation itself.
let code = unsafe {
self.reader
.ops
.start_read(self.reader.handle(), ptr, cap.len())
};
rtdebug!(
"stream.read({}, {ptr:?}, {}) = {code:#x}",
self.reader.handle(),
cap.len()
);
(code, (buf, cleanup))
}
fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
(StreamResult::Cancelled, buf)
}
fn in_progress_update(
&mut self,
(mut buf, cleanup): Self::InProgress,
code: u32,
) -> Result<Self::Result, Self::InProgress> {
match ReturnCode::decode(code) {
ReturnCode::Blocked => Err((buf, cleanup)),
// Note that the `cleanup`, if any, is discarded here.
ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
// When an in-progress read is successfully cancelled then the
// allocation that was being read into, if any, is just discarded.
//
// TODO: should maybe thread this around like `AbiBuffer` to cache
// the read allocation?
ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
code @ (ReturnCode::Completed(amt)
| ReturnCode::Dropped(amt)
| ReturnCode::Cancelled(amt)) => {
let amt = usize::try_from(amt).unwrap();
let cur_len = buf.len();
assert!(amt <= buf.capacity() - cur_len);
if self.reader.ops.native_abi_matches_canonical_abi() {
// If no `lift` was necessary, then the results of this operation
// were read directly into `buf`, so just update its length now that
// values have been initialized.
unsafe {
buf.set_len(cur_len + amt);
}
} else {
// With a `lift` operation this now requires reading `amt` items
// from `cleanup` and pushing them into `buf`.
let mut ptr = cleanup
.as_ref()
.map(|c| c.ptr.as_ptr())
.unwrap_or(ptr::null_mut());
for _ in 0..amt {
unsafe {
buf.push(self.reader.ops.lift(ptr));
ptr = ptr.add(self.reader.ops.elem_layout().size());
}
}
}
// Intentionally dispose of `cleanup` here as, if it was used, all
// allocations have been read from it and appended to `buf`.
drop(cleanup);
if let ReturnCode::Dropped(_) = code {
self.reader.done = true;
}
Ok((StreamResult::Complete(amt), buf))
}
}
}
fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
self.reader.handle()
}
fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
// SAFETY: we're managing `reader` and all the various operational bits,
// so this relies on `WaitableOperation` being safe.
let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) };
rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle());
code
}
fn result_into_cancel(&mut self, result: Self::Result) -> Self::Cancel {
result
}
}
impl<O: StreamOps> Future for RawStreamRead<'_, O> {
type Output = (StreamResult, Vec<O::Payload>);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.pin_project().poll_complete(cx)
}
}
impl<'a, O> RawStreamRead<'a, O>
where
O: StreamOps,
{
fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> {
// SAFETY: we've chosen that when `Self` is pinned that it translates to
// always pinning the inner field, so that's codified here.
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
}
/// Cancel this read if it hasn't already completed.
///
/// This method will initiate a cancellation operation for this active
/// read. This may race with the actual read itself and so this may actually
/// complete with some results.
///
/// The final result of cancellation is returned, along with the original
/// buffer.
///
/// # Panics
///
/// Panics if the operation has already been completed via `Future::poll`,
/// or if this method is called twice.
pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) {
self.pin_project().cancel()
}
}