wit-bindgen 0.46.0

Rust bindings generator and runtime support for WIT and the component model. Used when compiling Rust programs to the component model.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
//! For a high-level overview of how this module is implemented see the
//! module documentation in `future_support.rs`.

use crate::rt::async_support::waitable::{WaitableOp, WaitableOperation};
use crate::rt::async_support::{AbiBuffer, ReturnCode, DROPPED};
use {
    crate::rt::Cleanup,
    std::{
        alloc::Layout,
        fmt,
        future::Future,
        marker,
        pin::Pin,
        ptr,
        sync::atomic::{AtomicU32, Ordering::Relaxed},
        task::{Context, Poll},
        vec::Vec,
    },
};

/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {
    /// The in-memory canonical ABI layout of a single value of `T`.
    pub layout: Layout,

    /// An optional callback where if provided will lower an owned `T` value
    /// into the `dst` pointer.
    ///
    /// If this is called the ownership of all of `T`'s lists and resources are
    /// passed to `dst`, possibly by reallocating if `T`'s layout differs from
    /// the canonical ABI layout.
    ///
    /// If this is `None` then it means that `T` has the same layout in-memory
    /// in Rust as it does in the canonical ABI. In such a situation the
    /// lower/lift operation can be dropped.
    pub lower: Option<unsafe fn(value: T, dst: *mut u8)>,

    /// Callback used to deallocate any owned lists in `dst` after a value has
    /// been successfully sent along a stream.
    ///
    /// `None` means that `T` has no lists internally.
    pub dealloc_lists: Option<unsafe fn(dst: *mut u8)>,

    /// Dual of `lower`, and like `lower` if this is missing then it means that
    /// `T` has the same in-memory representation in Rust and the canonical ABI.
    pub lift: Option<unsafe fn(dst: *mut u8) -> T>,

    /// The raw `stream.write` intrinsic.
    pub start_write: unsafe extern "C" fn(stream: u32, val: *const u8, amt: usize) -> u32,
    /// The raw `stream.read` intrinsic.
    pub start_read: unsafe extern "C" fn(stream: u32, val: *mut u8, amt: usize) -> u32,
    /// The raw `stream.cancel-write` intrinsic.
    pub cancel_write: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.cancel-read` intrinsic.
    pub cancel_read: unsafe extern "C" fn(stream: u32) -> u32,
    /// The raw `stream.drop-writable` intrinsic.
    pub drop_writable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.drop-readable` intrinsic.
    pub drop_readable: unsafe extern "C" fn(stream: u32),
    /// The raw `stream.new` intrinsic.
    pub new: unsafe extern "C" fn() -> u64,
}

/// Helper function to create a new read/write pair for a component model
/// stream.
pub unsafe fn stream_new<T>(
    vtable: &'static StreamVtable<T>,
) -> (StreamWriter<T>, StreamReader<T>) {
    unsafe {
        let handles = (vtable.new)();
        let reader = handles as u32;
        let writer = (handles >> 32) as u32;
        rtdebug!("stream.new() = [{writer}, {reader}]");
        (
            StreamWriter::new(writer, vtable),
            StreamReader::new(reader, vtable),
        )
    }
}

/// Represents the writable end of a Component Model `stream`.
pub struct StreamWriter<T: 'static> {
    handle: u32,
    vtable: &'static StreamVtable<T>,
    done: bool,
}

impl<T> StreamWriter<T> {
    #[doc(hidden)]
    pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
        Self {
            handle,
            vtable,
            done: false,
        }
    }

    /// Initiate a write of the `values` provided into this stream.
    ///
    /// This method is akin to an `async fn` except that the returned
    /// [`StreamWrite`] future can also be cancelled via [`StreamWrite::cancel`]
    /// to re-acquire undelivered values.
    ///
    /// This method will perform at most a single write of the `values`
    /// provided. The returned future will resolve once the write has completed.
    ///
    /// # Return Values
    ///
    /// The returned [`StreamWrite`] future returns a tuple of `(result, buf)`.
    /// The `result` can be `StreamResult::Complete(n)` meaning that `n` values
    /// were sent from `values` into this writer. A result of
    /// `StreamResult::Dropped` means that no values were sent and the other side
    /// has hung-up and sending values will no longer be possible.
    ///
    /// The `buf` returned is an [`AbiBuffer<T>`] which retains ownership of the
    /// original `values` provided here. That can be used to re-acquire `values`
    /// through the [`AbiBuffer::into_vec`] method. The `buf` maintains an
    /// internal cursor of how many values have been written and if the write
    /// should be resumed to write the entire buffer then the
    /// [`StreamWriter::write_buf`] method can be used to resume writing at the
    /// next value in the buffer.
    ///
    /// # Cancellation
    ///
    /// The returned [`StreamWrite`] future can be cancelled like any other Rust
    /// future via `drop`, but this means that `values` will be lost within the
    /// future. The [`StreamWrite::cancel`] method can be used to re-acquire the
    /// in-progress write that is being done with `values`. This is effectively
    /// a way of forcing the future to immediately resolve.
    ///
    /// Note that if this future is cancelled via `drop` it does not mean that
    /// no values were sent. It may be possible that values were still sent
    /// despite being cancelled. Cancelling a write and determining what
    /// happened must be done with [`StreamWrite::cancel`].
    pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> {
        self.write_buf(AbiBuffer::new(values, self.vtable))
    }

    /// Same as [`StreamWriter::write`], except this takes [`AbiBuffer<T>`]
    /// instead of `Vec<T>`.
    pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> {
        StreamWrite {
            op: WaitableOperation::new((self, values)),
        }
    }

    /// Writes all of the `values` provided into this stream.
    ///
    /// This is a higher-level method than [`StreamWriter::write`] and does not
    /// expose cancellation for example. This will successively attempt to write
    /// all of `values` provided into this stream. Upon completion the same
    /// vector will be returned and any remaining elements in the vector were
    /// not sent because the stream was dropped.
    pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> {
        // Perform an initial write which converts `values` into `AbiBuffer`.
        let (mut status, mut buf) = self.write(values).await;

        // While the previous write completed and there's still remaining items
        // in the buffer, perform another write.
        while let StreamResult::Complete(_) = status {
            if buf.remaining() == 0 {
                break;
            }
            (status, buf) = self.write_buf(buf).await;

            // FIXME(WebAssembly/component-model#490)
            if status == StreamResult::Cancelled {
                status = StreamResult::Complete(0);
            }
        }

        // Return back any values that weren't written by shifting them to the
        // front of the returned vector.
        assert!(buf.remaining() == 0 || matches!(status, StreamResult::Dropped));
        buf.into_vec()
    }

    /// Writes the singular `value` provided
    ///
    /// This is a higher-level method than [`StreamWriter::write`] and does not
    /// expose cancellation for example. This will attempt to send `value` on
    /// this stream.
    ///
    /// If the other end hangs up then the value is returned back as
    /// `Some(value)`, otherwise `None` is returned indicating the value was
    /// sent.
    pub async fn write_one(&mut self, value: T) -> Option<T> {
        // TODO: can probably be a bit more efficient about this and avoid
        // moving `value` onto the heap in some situations, but that's left as
        // an optimization for later.
        self.write_all(std::vec![value]).await.pop()
    }
}

impl<T> fmt::Debug for StreamWriter<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("StreamWriter")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<T> Drop for StreamWriter<T> {
    fn drop(&mut self) {
        rtdebug!("stream.drop-writable({})", self.handle);
        unsafe {
            (self.vtable.drop_writable)(self.handle);
        }
    }
}

/// Represents a write operation which may be cancelled prior to completion.
pub struct StreamWrite<'a, T: 'static> {
    op: WaitableOperation<StreamWriteOp<'a, T>>,
}

struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>);

/// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,
/// yielded by the [`StreamWrite`] or [`StreamRead`] futures.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum StreamResult {
    /// The provided number of values were successfully transferred.
    ///
    /// For writes this is how many items were written, and for reads this is
    /// how many items were read.
    Complete(usize),
    /// No values were written, the other end has dropped its handle.
    Dropped,
    /// No values were written, the operation was cancelled.
    Cancelled,
}

unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T>
where
    T: 'static,
{
    type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>);
    type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>);
    type Result = (StreamResult, AbiBuffer<T>);
    type Cancel = (StreamResult, AbiBuffer<T>);

    fn start((writer, buf): Self::Start) -> (u32, Self::InProgress) {
        if writer.done {
            return (DROPPED, (writer, buf));
        }

        let (ptr, len) = buf.abi_ptr_and_len();
        // SAFETY: sure hope this is safe, everything in this module and
        // `AbiBuffer` is trying to make this safe.
        let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) };
        rtdebug!(
            "stream.write({}, {ptr:?}, {len}) = {code:#x}",
            writer.handle
        );
        (code, (writer, buf))
    }

    fn start_cancelled((_writer, buf): Self::Start) -> Self::Cancel {
        (StreamResult::Cancelled, buf)
    }

    fn in_progress_update(
        (writer, mut buf): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        match ReturnCode::decode(code) {
            ReturnCode::Blocked => Err((writer, buf)),
            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),
            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),
            code @ (ReturnCode::Completed(amt)
            | ReturnCode::Dropped(amt)
            | ReturnCode::Cancelled(amt)) => {
                let amt = amt.try_into().unwrap();
                buf.advance(amt);
                if let ReturnCode::Dropped(_) = code {
                    writer.done = true;
                }
                Ok((StreamResult::Complete(amt), buf))
            }
        }
    }

    fn in_progress_waitable((writer, _): &Self::InProgress) -> u32 {
        writer.handle
    }

    fn in_progress_cancel((writer, _): &Self::InProgress) -> u32 {
        // SAFETY: we're managing `writer` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
        rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle);
        code
    }

    fn result_into_cancel(result: Self::Result) -> Self::Cancel {
        result
    }
}

impl<T: 'static> Future for StreamWrite<'_, T> {
    type Output = (StreamResult, AbiBuffer<T>);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project().poll_complete(cx)
    }
}

impl<'a, T: 'static> StreamWrite<'a, T> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this write if it hasn't already completed.
    ///
    /// This method can be used to cancel a write-in-progress and re-acquire
    /// values being sent. Note that the result here may still indicate that
    /// some values were written if the race to cancel the write was lost.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice.
    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) {
        self.pin_project().cancel()
    }
}

/// Represents the readable end of a Component Model `stream`.
pub struct StreamReader<T: 'static> {
    handle: AtomicU32,
    vtable: &'static StreamVtable<T>,
    done: bool,
}

impl<T> fmt::Debug for StreamReader<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("StreamReader")
            .field("handle", &self.handle)
            .finish()
    }
}

impl<T> StreamReader<T> {
    #[doc(hidden)]
    pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
        Self {
            handle: AtomicU32::new(handle),
            vtable,
            done: false,
        }
    }

    #[doc(hidden)]
    pub fn take_handle(&self) -> u32 {
        let ret = self.opt_handle().unwrap();
        self.handle.store(u32::MAX, Relaxed);
        ret
    }

    fn handle(&self) -> u32 {
        self.opt_handle().unwrap()
    }

    fn opt_handle(&self) -> Option<u32> {
        match self.handle.load(Relaxed) {
            u32::MAX => None,
            other => Some(other),
        }
    }

    /// Starts a new read operation on this stream into `buf`.
    ///
    /// This method will read values into the spare capacity of the `buf`
    /// provided. If `buf` has no spare capacity then this will be equivalent
    /// to a zero-length read.
    ///
    /// Upon completion the `buf` will be yielded back to the caller via the
    /// completion of the [`StreamRead`] future.
    ///
    /// # Cancellation
    ///
    /// Cancelling the returned future can be done with `drop` like all Rust
    /// futures, but it does not mean that no values were read. To accurately
    /// determine if values were read the [`StreamRead::cancel`] method must be
    /// used.
    pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> {
        StreamRead {
            op: WaitableOperation::new((self, buf)),
        }
    }

    /// Reads a single item from this stream.
    ///
    /// This is a higher-level method than [`StreamReader::read`] in that it
    /// reads only a single item and does not expose control over cancellation.
    pub async fn next(&mut self) -> Option<T> {
        // TODO: should amortize this allocation and avoid doing it every time.
        // Or somehow perhaps make this more optimal.
        let (_result, mut buf) = self.read(Vec::with_capacity(1)).await;
        buf.pop()
    }

    /// Reads all items from this stream and returns the list.
    ///
    /// This method will read all remaining items from this stream into a list
    /// and await the stream to be dropped.
    pub async fn collect(mut self) -> Vec<T> {
        let mut ret = Vec::new();
        loop {
            // If there's no more spare capacity then reserve room for one item
            // which should trigger `Vec`'s built-in resizing logic, which will
            // free up likely more capacity than just one slot.
            if ret.len() == ret.capacity() {
                ret.reserve(1);
            }
            let (status, buf) = self.read(ret).await;
            ret = buf;
            match status {
                StreamResult::Complete(_) => {}
                StreamResult::Dropped => break,
                StreamResult::Cancelled => unreachable!(),
            }
        }
        ret
    }
}

impl<T> Drop for StreamReader<T> {
    fn drop(&mut self) {
        let Some(handle) = self.opt_handle() else {
            return;
        };
        unsafe {
            rtdebug!("stream.drop-readable({})", handle);
            (self.vtable.drop_readable)(handle);
        }
    }
}

/// Represents a read operation which may be cancelled prior to completion.
pub struct StreamRead<'a, T: 'static> {
    op: WaitableOperation<StreamReadOp<'a, T>>,
}

struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>);

unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T>
where
    T: 'static,
{
    type Start = (&'a mut StreamReader<T>, Vec<T>);
    type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>);
    type Result = (StreamResult, Vec<T>);
    type Cancel = (StreamResult, Vec<T>);

    fn start((reader, mut buf): Self::Start) -> (u32, Self::InProgress) {
        if reader.done {
            return (DROPPED, (reader, buf, None));
        }

        let cap = buf.spare_capacity_mut();
        let ptr;
        let cleanup;
        // If `T` requires a lifting operation, then allocate a slab of memory
        // which will store the canonical ABI read. Otherwise we can use the
        // raw capacity in `buf` itself.
        if reader.vtable.lift.is_some() {
            let layout = Layout::from_size_align(
                reader.vtable.layout.size() * cap.len(),
                reader.vtable.layout.align(),
            )
            .unwrap();
            (ptr, cleanup) = Cleanup::new(layout);
        } else {
            ptr = cap.as_mut_ptr().cast();
            cleanup = None;
        }
        // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
        // persist with this async operation itself.
        let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) };
        rtdebug!(
            "stream.read({}, {ptr:?}, {}) = {code:#x}",
            reader.handle(),
            cap.len()
        );
        (code, (reader, buf, cleanup))
    }

    fn start_cancelled((_, buf): Self::Start) -> Self::Cancel {
        (StreamResult::Cancelled, buf)
    }

    fn in_progress_update(
        (reader, mut buf, cleanup): Self::InProgress,
        code: u32,
    ) -> Result<Self::Result, Self::InProgress> {
        match ReturnCode::decode(code) {
            ReturnCode::Blocked => Err((reader, buf, cleanup)),

            // Note that the `cleanup`, if any, is discarded here.
            ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),

            // When an in-progress read is successfully cancelled then the
            // allocation that was being read into, if any, is just discarded.
            //
            // TODO: should maybe thread this around like `AbiBuffer` to cache
            // the read allocation?
            ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),

            code @ (ReturnCode::Completed(amt)
            | ReturnCode::Dropped(amt)
            | ReturnCode::Cancelled(amt)) => {
                let amt = usize::try_from(amt).unwrap();
                let cur_len = buf.len();
                assert!(amt <= buf.capacity() - cur_len);

                match reader.vtable.lift {
                    // With a `lift` operation this now requires reading `amt` items
                    // from `cleanup` and pushing them into `buf`.
                    Some(lift) => {
                        let mut ptr = cleanup
                            .as_ref()
                            .map(|c| c.ptr.as_ptr())
                            .unwrap_or(ptr::null_mut());
                        for _ in 0..amt {
                            unsafe {
                                buf.push(lift(ptr));
                                ptr = ptr.add(reader.vtable.layout.size());
                            }
                        }
                    }

                    // If no `lift` was necessary, then the results of this operation
                    // were read directly into `buf`, so just update its length now that
                    // values have been initialized.
                    None => unsafe { buf.set_len(cur_len + amt) },
                }

                // Intentionally dispose of `cleanup` here as, if it was used, all
                // allocations have been read from it and appended to `buf`.
                drop(cleanup);
                if let ReturnCode::Dropped(_) = code {
                    reader.done = true;
                }
                Ok((StreamResult::Complete(amt), buf))
            }
        }
    }

    fn in_progress_waitable((reader, ..): &Self::InProgress) -> u32 {
        reader.handle()
    }

    fn in_progress_cancel((reader, ..): &Self::InProgress) -> u32 {
        // SAFETY: we're managing `reader` and all the various operational bits,
        // so this relies on `WaitableOperation` being safe.
        let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
        rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle());
        code
    }

    fn result_into_cancel(result: Self::Result) -> Self::Cancel {
        result
    }
}

impl<T: 'static> Future for StreamRead<'_, T> {
    type Output = (StreamResult, Vec<T>);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pin_project().poll_complete(cx)
    }
}

impl<'a, T> StreamRead<'a, T> {
    fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> {
        // SAFETY: we've chosen that when `Self` is pinned that it translates to
        // always pinning the inner field, so that's codified here.
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().op) }
    }

    /// Cancel this read if it hasn't already completed.
    ///
    /// This method will initiate a cancellation operation for this active
    /// read. This may race with the actual read itself and so this may actually
    /// complete with some results.
    ///
    /// The final result of cancellation is returned, along with the original
    /// buffer.
    ///
    /// # Panics
    ///
    /// Panics if the operation has already been completed via `Future::poll`,
    /// or if this method is called twice.
    pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) {
        self.pin_project().cancel()
    }
}