cxx_async/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10// cxx-async/src/lib.rs
11//
12//! `cxx-async` is a Rust crate that extends the [`cxx`](http://cxx.rs/) library to provide
13//! seamless interoperability between asynchronous Rust code using `async`/`await` and [C++20
14//! coroutines] using `co_await`. If your C++ code is asynchronous, `cxx-async` can provide a more
15//! convenient, and potentially more efficient, alternative to callbacks. You can freely convert
16//! between C++ coroutines and Rust futures and await one from the other.
17//!
18//! It's important to emphasize what `cxx-async` isn't: it isn't a C++ binding to Tokio or any
19//! other Rust I/O library. Nor is it a Rust binding to `boost::asio` or similar. Such bindings
20//! could in principle be layered on top of `cxx-async` if desired, but this crate doesn't provide
21//! them out of the box. (Note that this is a tricky problem even in theory, since Rust async I/O
22//! code is generally tightly coupled to a single library such as Tokio, in much the same way C++
23//! async I/O code tends to be tightly coupled to libraries like `boost::asio`.) If you're writing
24//! server code, you can still use `cxx-async`, but you will need to ensure that both the Rust and
25//! C++ sides run separate I/O executors.
26//!
27//! `cxx-async` aims for compatibility with popular C++ coroutine support libraries. Right now,
28//! both the lightweight [`cppcoro`](https://github.com/lewissbaker/cppcoro) and the more
29//! comprehensive [Folly](https://github.com/facebook/folly/) are supported. Pull requests are
30//! welcome to support others.
31//!
32//! ## Quick tutorial
33//!
34//! To use `cxx-async`, first start by adding `cxx` to your project. Then add the following to your
35//! `Cargo.toml`:
36//!
37//! ```toml
38//! [dependencies]
39//! cxx-async = "0.1"
40//! ```
41//!
42//! Now, inside your `#[cxx::bridge]` module, declare a future type and some methods like so:
43//!
44//! ```ignore
45//! #[cxx::bridge]
46//! mod ffi {
47//!     // Declare type aliases for each of the future types you wish to use here. Then declare
48//!     // async C++ methods that you wish Rust to call. Make sure they return one of the future
49//!     // types you declared.
50//!     unsafe extern "C++" {
51//!         type RustFutureString = crate::RustFutureString;
52//!
53//!         fn hello_from_cpp() -> RustFutureString;
54//!     }
55//!
56//!     // Async Rust methods that you wish C++ to call go here. Again, make sure they return one of
57//!     // the future types you declared above.
58//!     extern "Rust" {
59//!         fn hello_from_rust() -> RustFutureString;
60//!     }
61//! }
62//! ```
63//!
64//! After the `#[cxx::bridge]` block, define the future types using the
65//! `#[cxx_async::bridge]` attribute:
66//!
67//! ```ignore
68//! // The `Output` type is the Rust type that this future yields.
69//! #[cxx_async::bridge]
70//! unsafe impl Future for RustFutureString {
71//!     type Output = String;
72//! }
73//! ```
74//!
75//! Note that it's your responsibility to ensure that the type you specify for Output actually
76//! matches the type of the value that your future resolves to. Otherwise, undefined behavior can
77//! result.
78//!
79//! Next, in your C++ header, make sure to `#include` the right headers:
80//!
81//! ```cpp
82//! #include "rust/cxx.h"
83//! #include "rust/cxx_async.h"
84//! #include "rust/cxx_async_cppcoro.h"  // Or cxx_async_folly.h, as appropriate.
85//! ```
86//!
87//! And add a call to the `CXXASYNC_DEFINE_FUTURE` macro in your headers to define the C++ side of
88//! the future:
89//!
90//! ```cpp
//! // The first argument is the C++ type that the future yields, and the remaining arguments are
//! // the fully-qualified name of the future, with `::` namespace separators replaced with commas.
//! // (For instance, if your future is named `mycompany::myproject::RustFutureString`, you might
//! // write `CXXASYNC_DEFINE_FUTURE(rust::String, mycompany, myproject, RustFutureString);`.) The
//! // first argument is the C++ type that `cxx` maps your Rust type to: in this case, `String` maps
//! // to `rust::String`, so we supply `rust::String` here.
97//! //
98//! // Note that, because the C preprocessor doesn't know about the `<` and `>` brackets that
99//! // surround template arguments, a template type that contains multiple arguments (e.g.
100//! // `std::pair<int, std::string>`) will need to be factored out into a `typedef` to be used
101//! // inside `CXXASYNC_DEFINE_FUTURE`. Otherwise, the C preprocessor won't parse it properly.
102//! //
103//! // This macro must be invoked at the top level, not in a namespace.
104//! CXXASYNC_DEFINE_FUTURE(rust::String, RustFutureString);
105//! ```
106//!
107//! You're done! Now you can define asynchronous C++ code that Rust can call:
108//!
109//! ```cpp
110//! RustFutureString hello_from_cpp() {
111//!     co_return std::string("Hello world!");
112//! }
113//! ```
114//!
115//! On the Rust side:
116//!
117//! ```ignore
118//! async fn call_cpp() -> String {
119//!     // This returns a Result (with the error variant populated if C++ threw an exception), so
120//!     // you need to unwrap it:
121//!     ffi::hello_from_cpp().await.unwrap()
122//! }
123//! ```
124//!
125//! And likewise, define some asynchronous Rust code that C++ can call:
126//!
127//! ```ignore
128//! use cxx_async::CxxAsyncResult;
129//! fn hello_from_rust() -> RustFutureString {
130//!     // You can instead use `fallible` if your async block returns a Result.
131//!     RustFutureString::infallible(async { "Hello world!".to_owned() })
132//! }
133//! ```
134//!
135//! Over on the C++ side:
136//!
137//! ```cpp
138//! cppcoro::task<rust::String> call_rust() {
//!     co_return co_await hello_from_rust();
140//! }
141//! ```
142//!
143//! In this way, you should now be able to freely await futures on either side.
144//!
145//! [C++20 coroutines]: https://en.cppreference.com/w/cpp/language/coroutines
146
147#![warn(missing_docs)]
148
149extern crate link_cplusplus;
150
151use crate::execlet::Execlet;
152use crate::execlet::ExecletReaper;
153use crate::execlet::RustExeclet;
154use futures::Stream;
155use futures::StreamExt;
156use std::convert::From;
157use std::error::Error;
158use std::ffi::CStr;
159use std::fmt::Debug;
160use std::fmt::Display;
161use std::fmt::Formatter;
162use std::fmt::Result as FmtResult;
163use std::future::Future;
164use std::io;
165use std::io::Write;
166use std::os::raw::c_char;
167use std::panic;
168use std::panic::AssertUnwindSafe;
169use std::pin::Pin;
170use std::process;
171use std::ptr;
172use std::sync::Arc;
173use std::sync::Mutex;
174use std::task::Context;
175use std::task::Poll;
176use std::task::RawWaker;
177use std::task::RawWakerVTable;
178use std::task::Waker;
179
180const FUTURE_STATUS_PENDING: u32 = 0;
181const FUTURE_STATUS_COMPLETE: u32 = 1;
182const FUTURE_STATUS_ERROR: u32 = 2;
183const FUTURE_STATUS_RUNNING: u32 = 3;
184
185const SEND_RESULT_WAIT: u32 = 0;
186const SEND_RESULT_SENT: u32 = 1;
187const SEND_RESULT_FINISHED: u32 = 2;
188
189pub use cxx_async_macro::bridge;
190
191#[doc(hidden)]
192pub use pin_utils::unsafe_pinned;
193
// Replacements for the standard panicking macros. These are guaranteed to abort the process
// rather than panic, so that we never unwind across C++ frames.
196
// Prints the formatted message to stderr, followed by the source location, and then aborts the
// process. Unlike `panic!`, this never unwinds.
macro_rules! safe_panic {
    ($($args:expr),*) => {{
        use ::std::io::Write;
        // Write failures are deliberately ignored: the process is about to abort regardless.
        let _ = write!(::std::io::stderr(), $($args),*);
        let _ = writeln!(::std::io::stderr(), " at {}:{}", file!(), line!());
        ::std::process::abort();
    }};
}
207
// Debug builds: evaluates the condition and, if it is false, reports the failed assertion on
// stderr and aborts instead of unwinding.
#[cfg(debug_assertions)]
macro_rules! safe_debug_assert {
    ($cond:expr) => {{
        use ::std::io::Write;
        let holds = $cond;
        if !holds {
            // Write failures are ignored: the process is about to abort regardless.
            let _ = writeln!(
                ::std::io::stderr(),
                "assertion failed: {}",
                stringify!($cond)
            );
            ::std::process::abort();
        }
    }};
}
// Builds without debug assertions: expands to nothing, so the condition is not evaluated.
#[cfg(not(debug_assertions))]
macro_rules! safe_debug_assert {
    ($cond:expr) => {};
}
226
// Abort-based stand-in for `unreachable!()`; delegates to `safe_panic!`.
macro_rules! safe_unreachable {
    () => {{
        safe_panic!("unreachable code executed")
    }};
}
232
233trait SafeExpect {
234    type Output;
235    fn safe_expect(self, message: &str) -> Self::Output;
236}
237
238impl<T> SafeExpect for Option<T> {
239    type Output = T;
240    fn safe_expect(self, message: &str) -> T {
241        match self {
242            Some(value) => value,
243            None => safe_panic!("{}", message),
244        }
245    }
246}
247
248trait SafeUnwrap {
249    type Output;
250    fn safe_unwrap(self) -> Self::Output;
251}
252
253impl<T, E> SafeUnwrap for Result<T, E>
254where
255    E: Debug,
256{
257    type Output = T;
258    fn safe_unwrap(self) -> Self::Output {
259        match self {
260            Ok(value) => value,
261            Err(error) => safe_panic!("unexpected Result::Err: {:?}", error),
262        }
263    }
264}
265
266#[doc(hidden)]
267pub mod execlet;
268
// Bridged glue functions.
//
// These are only declared here (presumably they are implemented by the C++ side of the bridge --
// confirm against `cxx_async.h`). The `rust_suspended_coroutine_*` functions in this file forward
// the waker's data pointer to them as `waker_data`.
extern "C" {
    // Invoked when a waker is cloned; the returned pointer becomes the data of the new waker.
    fn cxxasync_suspended_coroutine_clone(waker_data: *mut u8) -> *mut u8;
    // Invoked when a waker is woken by value.
    fn cxxasync_suspended_coroutine_wake(waker_data: *mut u8);
    // Invoked when a waker is woken by reference.
    fn cxxasync_suspended_coroutine_wake_by_ref(waker_data: *mut u8);
    // Invoked when a waker is dropped.
    fn cxxasync_suspended_coroutine_drop(waker_data: *mut u8);
}
276
// A suspended C++ coroutine needs to act as a waker if it awaits a Rust future. This vtable
// provides that glue.
//
// The entries are, in the order `RawWakerVTable::new` expects them: clone, wake, wake-by-ref, and
// drop. Every `Waker` this file builds from a raw `waker_data` pointer uses this table.
static CXXASYNC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    rust_suspended_coroutine_clone,
    rust_suspended_coroutine_wake,
    rust_suspended_coroutine_wake_by_ref,
    rust_suspended_coroutine_drop,
);
285
/// The error type into which any exception thrown by a C++ coroutine is automatically converted.
///
/// It is a thin wrapper around the message reported by `std::exception::what()`.
#[derive(Debug)]
pub struct CxxAsyncException {
    what: Box<str>,
}

impl CxxAsyncException {
    /// Builds an exception that carries the supplied error message.
    pub fn new(what: Box<str>) -> Self {
        CxxAsyncException { what }
    }

    /// Returns the error message, i.e. the value returned by `std::exception::what()`.
    pub fn what(&self) -> &str {
        &*self.what
    }
}

impl Display for CxxAsyncException {
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        // The message is emitted verbatim, with no decoration.
        let message: &str = self.what();
        formatter.write_str(message)
    }
}

impl Error for CxxAsyncException {}

/// Shorthand for a `Result` whose error type is [`CxxAsyncException`].
pub type CxxAsyncResult<T> = Result<T, CxxAsyncException>;
317
// A table of functions that the `bridge` macro emits for the C++ bridge to use.
//
// This must match the definition in `cxx_async.h`, so the fields are `#[repr(C)]` and their
// order matters. Each entry is stored as an untyped `*mut u8`; presumably each one points at the
// monomorphized function of the same name in this file (e.g. `future_poll`) -- confirm against
// the macro's output.
#[repr(C)]
#[doc(hidden)]
pub struct CxxAsyncVtable {
    // Entry used to create a channel.
    pub channel: *mut u8,
    // Entry used to send a value through a sender.
    pub sender_send: *mut u8,
    // Entry used to destroy a sender.
    pub sender_drop: *mut u8,
    // Entry used to poll a future.
    pub future_poll: *mut u8,
    // Entry used to drop a future.
    pub future_drop: *mut u8,
}

// Raw pointers are neither `Send` nor `Sync` by default, so both are asserted manually here.
// NOTE(review): this is sound only if the pointers refer to immutable data such as functions --
// confirm against how the `bridge` macro populates the table.
unsafe impl Send for CxxAsyncVtable {}
unsafe impl Sync for CxxAsyncVtable {}
333
// A sender/receiver pair for the return value of a wrapped one-shot C++ coroutine.
//
// This is an implementation detail and is not exposed to the programmer. It must match the
// definition in `cxx_async.h`, hence `#[repr(C)]` and the fixed field order. `future_channel`
// writes a value of this type through an out pointer.
#[repr(C)]
#[doc(hidden)]
pub struct CxxAsyncFutureChannel<Fut, Out> {
    // The receiving end.
    future: Fut,
    // The sending end.
    sender: CxxAsyncSender<Out>,
}
346
// A sender/receiver pair for the return value of a wrapped C++ multi-shot coroutine.
//
// This is an implementation detail and is not exposed to the programmer. It must match the
// definition in `cxx_async.h`, hence `#[repr(C)]` and the fixed field order. `stream_channel`
// writes a value of this type through an out pointer.
#[repr(C)]
#[doc(hidden)]
pub struct CxxAsyncStreamChannel<Stm, Item>
where
    Stm: Stream<Item = CxxAsyncResult<Item>>,
{
    // The receiving end. (It is named `future` even though it holds a stream.)
    future: Stm,
    // The sending end.
    sender: CxxAsyncSender<Item>,
}
362
// The single-producer/single-consumer channel type that future and stream implementations use to
// pass values between the two languages.
//
// We can't use a `futures::channel::mpsc` channel because it can deadlock. With a standard MPSC
// channel, if we try to send a value when the buffer is full and the receiving end is woken up and
// then tries to receive the value, a deadlock occurs, as the MPSC channel doesn't drop locks before
// calling the waker.
//
// The state lives behind an `Arc<Mutex<..>>`, so the sending and receiving ends each hold a clone
// of this handle and share the same state.
struct SpscChannel<T>(Arc<Mutex<SpscChannelImpl<T>>>);
371
// Data for each SPSC channel. It is only accessed while holding the mutex in `SpscChannel`.
struct SpscChannelImpl<T> {
    // The waker waiting on the value.
    //
    // This can either be the sending end waiting for the receiving end to receive a previously-sent
    // value (for streams only) or the receiving end waiting for the sending end to post a value.
    waiter: Option<Waker>,
    // The value waiting to be read. The channel buffers at most one value at a time.
    value: Option<T>,
    // An exception from the C++ side that is to be delivered over to the Rust side.
    exception: Option<CxxAsyncException>,
    // True if the channel is closed; false otherwise.
    closed: bool,
}
386
387impl<T> SpscChannel<T> {
388    // Creates a new SPSC channel.
389    fn new() -> SpscChannel<T> {
390        SpscChannel(Arc::new(Mutex::new(SpscChannelImpl {
391            waiter: None,
392            value: None,
393            exception: None,
394            closed: false,
395        })))
396    }
397
398    // Marks the channel as closed. Only the sending end may call this.
399    fn close(&self) {
400        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
401        let waiter;
402        {
403            let mut this = self.0.lock().safe_unwrap();
404            if this.closed {
405                safe_panic!("Attempted to close an `SpscChannel` that's already closed!")
406            }
407            this.closed = true;
408            waiter = this.waiter.take();
409        }
410
411        if let Some(waiter) = waiter {
412            waiter.wake();
413        }
414    }
415
416    // Attempts to send a value. If this channel already has a value yet to be read, this function
417    // returns false. Otherwise, it calls the provided closure to retrieve the value and returns
418    // true.
419    //
420    // This callback-based design eliminates the requirement to return the original value if the
421    // send fails.
422    fn try_send_value_with<F>(&self, context: Option<&Context>, getter: F) -> bool
423    where
424        F: FnOnce() -> T,
425    {
426        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
427        let waiter;
428        {
429            let mut this = self.0.lock().safe_unwrap();
430            if this.value.is_none() {
431                this.value = Some(getter());
432                waiter = this.waiter.take();
433            } else if context.is_some() && this.waiter.is_some() {
434                safe_panic!("Only one task may block on a `SpscChannel`!")
435            } else {
436                if let Some(context) = context {
437                    this.waiter = Some((*context.waker()).clone());
438                }
439                return false;
440            }
441        }
442
443        if let Some(waiter) = waiter {
444            waiter.wake();
445        }
446        true
447    }
448
449    // Raises an exception. This is synchronous and thus should never fail. It must only be called
450    // once, or not at all, for a given `SpscSender`.
451    fn send_exception(&self, exception: CxxAsyncException) {
452        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
453        let waiter = {
454            let mut this = self.0.lock().safe_unwrap();
455            safe_debug_assert!(this.exception.is_none());
456            this.exception = Some(exception);
457            this.waiter.take()
458        };
459
460        if let Some(waiter) = waiter {
461            waiter.wake();
462        }
463    }
464
465    // Attempts to receive a value. Returns `Poll::Pending` if no value is available and the channel
466    // isn't closed. Returns `Poll::Ready(None)` if the channel is closed. Otherwise, receives a
467    // value and returns `Poll::Ready(Some)`.
468    fn recv(&self, cx: &Context) -> Poll<Option<CxxAsyncResult<T>>> {
469        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
470        let (result, waiter);
471        {
472            let mut this = self.0.lock().safe_unwrap();
473            match this.value.take() {
474                Some(value) => {
475                    result = Ok(value);
476                    waiter = this.waiter.take();
477                }
478                None => match this.exception.take() {
479                    Some(exception) => {
480                        result = Err(exception);
481                        waiter = this.waiter.take();
482                    }
483                    None if this.closed => return Poll::Ready(None),
484                    None => {
485                        this.waiter = Some((*cx.waker()).clone());
486                        return Poll::Pending;
487                    }
488                },
489            }
490        }
491
492        if let Some(waiter) = waiter {
493            waiter.wake();
494        }
495        Poll::Ready(Some(result))
496    }
497}
498
499impl<T> Clone for SpscChannel<T> {
500    fn clone(&self) -> Self {
501        Self(self.0.clone())
502    }
503}
504
// The concrete type of the stream that wraps a C++ coroutine, either one-shot (future) or
// multi-shot (stream).
//
// The programmer only interacts with this abstractly behind a `Box<dyn Future>` or
// `Box<dyn Stream>` trait object, so this type is considered an implementation detail. It must be
// public because macro-generated code needs to name it.
#[doc(hidden)]
pub struct CxxAsyncReceiver<Item> {
    // The SPSC channel to receive on.
    receiver: SpscChannel<Item>,
    // Any execlet that must be driven when receiving. When present, it is run on every poll
    // before the channel is checked.
    execlet: Option<Execlet>,
}
518
// The concrete type of the sending end of a stream.
//
// This must be public because macro-generated code needs to name it.
//
// The wrapped pointer comes from `Box::into_raw` (see `future_channel` and `stream_channel`) and
// is released again by this type's `Drop` impl. `#[repr(transparent)]` gives the wrapper the same
// layout as the raw pointer itself.
#[doc(hidden)]
#[repr(transparent)]
pub struct CxxAsyncSender<Item>(*mut SpscChannel<Item>);
525
526impl<Item> Drop for CxxAsyncSender<Item> {
527    fn drop(&mut self) {
528        unsafe { drop(Box::from_raw(self.0)) }
529    }
530}
531
532// This is a little weird in that `CxxAsyncReceiver` behaves as a oneshot if it's treated as a
533// Future and an SPSC stream if it's treated as a Stream. But since the programmer only ever
534// interacts with these objects behind boxed trait objects that only expose one of the two traits,
535// it's not a problem.
536impl<Item> Stream for CxxAsyncReceiver<Item> {
537    type Item = CxxAsyncResult<Item>;
538
539    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
540        if let Some(ref execlet) = self.execlet {
541            execlet.run(cx);
542        }
543        self.receiver.recv(cx)
544    }
545}
546
547impl<Output> Future for CxxAsyncReceiver<Output> {
548    type Output = CxxAsyncResult<Output>;
549
550    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
551        if let Some(ref execlet) = self.execlet {
552            execlet.run(cx);
553        }
554        match self.receiver.recv(cx) {
555            Poll::Ready(Some(Ok(value))) => Poll::Ready(Ok(value)),
556            Poll::Ready(Some(Err(exception))) => Poll::Ready(Err(exception)),
557            Poll::Ready(None) => {
558                // This should never happen, because a future should never be polled again after
559                // returning `Ready`.
560                safe_panic!("Attempted to use a stream as a future!")
561            }
562            Poll::Pending => Poll::Pending,
563        }
564    }
565}
566
567impl<Item> From<SpscChannel<Item>> for CxxAsyncReceiver<Item> {
568    fn from(receiver: SpscChannel<Item>) -> Self {
569        CxxAsyncReceiver {
570            receiver,
571            execlet: None,
572        }
573    }
574}
575
576impl<Item> Drop for CxxAsyncReceiver<Item> {
577    fn drop(&mut self) {
578        let execlet = match self.execlet {
579            Some(ref execlet) => execlet,
580            None => return,
581        };
582        let receiver = self.receiver.0.lock().safe_unwrap();
583        if receiver.closed {
584            return;
585        }
586        ExecletReaper::get().add((*execlet).clone());
587    }
588}
589
// The sending end that the C++ bridge uses to return a value to a Rust future.
//
// This is an implementation detail.
#[doc(hidden)]
pub trait RustSender {
    // The type of the value carried on success.
    type Output;
    // Delivers the final result: either the value or the exception.
    fn send(&mut self, value: CxxAsyncResult<Self::Output>);
}
598
599/// Wraps an arbitrary Rust Future in a boxed `cxx-async` future so that it can be returned to C++.
600///
601/// You should not need to implement this manually; it's automatically implemented by the `bridge`
602/// macro.
603pub trait IntoCxxAsyncFuture: Sized {
604    /// The type of the value yielded by the future.
605    type Output;
606
607    /// Wraps a Rust Future that directly returns the output type.
608    ///
609    /// Use this when you aren't interested in propagating errors to C++ as exceptions.
610    fn infallible<Fut>(future: Fut) -> Self
611    where
612        Fut: Future<Output = Self::Output> + Send + 'static,
613    {
614        Self::fallible(async move { Ok(future.await) })
615    }
616
617    /// Wraps a Rust Future that returns the output type, wrapped in a `CxxAsyncResult`.
618    ///
619    /// Use this when you have error values that you want to turn into exceptions on the C++ side.
620    fn fallible<Fut>(future: Fut) -> Self
621    where
622        Fut: Future<Output = CxxAsyncResult<Self::Output>> + Send + 'static;
623}
624
625/// Wraps an arbitrary Rust Stream in a boxed `cxx-async` stream so that it can be returned to C++.
626///
627/// You should not need to implement this manually; it's automatically implemented by the
628/// `bridge_stream` macro.
629pub trait IntoCxxAsyncStream: Sized {
630    /// The type of the values yielded by the stream.
631    type Item;
632
633    /// Wraps a Rust Stream that directly yields items of the output type.
634    ///
635    /// Use this when you aren't interested in propagating errors to C++ as exceptions.
636    fn infallible<Stm>(stream: Stm) -> Self
637    where
638        Stm: Stream<Item = Self::Item> + Send + 'static,
639        Stm::Item: 'static,
640    {
641        Self::fallible(stream.map(Ok))
642    }
643
644    /// Wraps a Rust Stream that yields items of the output type, wrapped in `CxxAsyncResult`s.
645    ///
646    /// Use this when you have error values that you want to turn into exceptions on the C++ side.
647    fn fallible<Stm>(stream: Stm) -> Self
648    where
649        Stm: Stream<Item = CxxAsyncResult<Self::Item>> + Send + 'static;
650}
651
652// Creates a new oneshot sender/receiver pair for a future.
653//
654// SAFETY: This is a raw FFI function called by our C++ code.
655//
656// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
657#[doc(hidden)]
658pub unsafe extern "C" fn future_channel<Fut, Out>(
659    out_oneshot: *mut CxxAsyncFutureChannel<Fut, Out>,
660    execlet: *mut RustExeclet,
661) where
662    Fut: From<CxxAsyncReceiver<Out>> + Future<Output = CxxAsyncResult<Out>>,
663{
664    let channel = SpscChannel::new();
665    let oneshot = CxxAsyncFutureChannel {
666        sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
667        future: CxxAsyncReceiver::<Out> {
668            receiver: channel,
669            execlet: Some(Execlet::from_raw_ref(execlet)),
670        }
671        .into(),
672    };
673    ptr::write(out_oneshot, oneshot);
674}
675
676// Creates a new multi-shot sender/receiver pair for a stream.
677//
678// SAFETY: This is a raw FFI function called by our C++ code.
679//
680// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
681#[doc(hidden)]
682pub unsafe extern "C" fn stream_channel<Stm, Item>(
683    out_stream: *mut CxxAsyncStreamChannel<Stm, Item>,
684    execlet: *mut RustExeclet,
685) where
686    Stm: From<CxxAsyncReceiver<Item>> + Stream<Item = CxxAsyncResult<Item>>,
687{
688    let channel = SpscChannel::new();
689    let stream = CxxAsyncStreamChannel {
690        sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
691        future: CxxAsyncReceiver {
692            receiver: channel,
693            execlet: Some(Execlet::from_raw_ref(execlet)),
694        }
695        .into(),
696    };
697    ptr::write(out_stream, stream);
698}
699
700// C++ calls this to yield a value for a one-shot coroutine (future).
701//
702// SAFETY: This is a low-level function called by our C++ code.
703//
704// Takes ownership of the value. The caller must not call its destructor.
705//
706// This function always closes the channel and returns `SEND_RESULT_FINISHED`.
707//
708// If `status` is `FUTURE_STATUS_COMPLETE`, then the given value is sent; otherwise, if `status` is
709// `FUTURE_STATUS_ERROR`, `value` must point to an exception string. `FUTURE_STATUS_RUNNING` is
710// illegal, because that value is only for streams, not futures.
711//
712// The `waker_data` parameter should always be null, because a one-shot coroutine should never
713// block on yielding a value.
714//
715// Any errors when sending are dropped on the floor. This is the right behavior because futures
716// can be legally dropped in Rust to signal cancellation.
717#[doc(hidden)]
718pub unsafe extern "C" fn sender_future_send<Item>(
719    this: &mut CxxAsyncSender<Item>,
720    status: u32,
721    value: *const u8,
722    waker_data: *const u8,
723) -> u32 {
724    safe_debug_assert!(waker_data.is_null());
725
726    let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
727    match status {
728        FUTURE_STATUS_COMPLETE => {
729            // This is a one-shot sender, so sending must always succeed.
730            let sent = this.try_send_value_with(None, || ptr::read(value as *const Item));
731            safe_debug_assert!(sent);
732        }
733        FUTURE_STATUS_ERROR => this.send_exception(unpack_exception(value)),
734        _ => safe_unreachable!(),
735    }
736
737    this.close();
738    SEND_RESULT_FINISHED
739}
740
741// C++ calls this to yield a value for a multi-shot coroutine (stream).
742//
743// SAFETY: This is a low-level function called by our C++ code.
744//
745// Takes ownership of the value if and only if it was successfully sent (otherwise, leaves it
746// alone). The caller must not call its destructor on a successful send.
747//
748// This function returns `SEND_RESULT_SENT` if the value was successfully sent, `SEND_RESULT_WAIT`
749// if the value wasn't sent because another value was already in the slot (in which case the task
750// will need to go to sleep), and `SEND_RESULT_FINISHED` if the channel was closed.
751//
752// If `status` is `FUTURE_STATUS_COMPLETE`, then the channel is closed, the value is ignored, and
753// this function immediately returns `SEND_RESULT_FINISHED`. Note that this behavior is different
754// from `sender_future_send`, which actually sends the value if `status` is
755// `FUTURE_STATUS_COMPLETE`.
756//
757// If `waker_data` is present, this identifies the coroutine handle that will be awakened if the
758// channel is currently full.
759//
760// Any errors when sending are dropped on the floor. This is because futures can be legally dropped
761// in Rust to signal cancellation.
762#[doc(hidden)]
763pub unsafe extern "C" fn sender_stream_send<Item>(
764    this: &mut CxxAsyncSender<Item>,
765    status: u32,
766    value: *const u8,
767    waker_data: *const u8,
768) -> u32 {
769    let (waker, context);
770    if waker_data.is_null() {
771        context = None;
772    } else {
773        waker = Waker::from_raw(RawWaker::new(
774            waker_data as *const (),
775            &CXXASYNC_WAKER_VTABLE,
776        ));
777        context = Some(Context::from_waker(&waker));
778    }
779
780    let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
781    match status {
782        FUTURE_STATUS_COMPLETE => {
783            this.close();
784            SEND_RESULT_FINISHED
785        }
786        FUTURE_STATUS_RUNNING => {
787            let sent =
788                this.try_send_value_with(context.as_ref(), || ptr::read(value as *const Item));
789            if sent {
790                SEND_RESULT_SENT
791            } else {
792                SEND_RESULT_WAIT
793            }
794        }
795        FUTURE_STATUS_ERROR => {
796            this.send_exception(unpack_exception(value));
797            this.close();
798            SEND_RESULT_FINISHED
799        }
800        _ => safe_unreachable!(),
801    }
802}
803
804// C++ calls this to destroy a sender.
805//
806// SAFETY: This is a low-level function called by our C++ code.
807#[doc(hidden)]
808pub unsafe extern "C" fn sender_drop<Item>(_: CxxAsyncSender<Item>) {
809    // Destructor automatically runs.
810}
811
812unsafe fn unpack_exception(value: *const u8) -> CxxAsyncException {
813    let string = CStr::from_ptr(value as *const c_char);
814    CxxAsyncException::new(string.to_string_lossy().into_owned().into_boxed_str())
815}
816
817// C++ calls this to poll a wrapped Rust future.
818//
819// SAFETY:
820// * This is a low-level function called by our C++ code.
821// * `Pin<&mut Future>` is marked `#[repr(transparent)]`, so it's FFI-safe.
822// * We catch all panics inside `poll` so that they don't unwind into C++.
823#[doc(hidden)]
824pub unsafe extern "C" fn future_poll<Fut, Out>(
825    this: Pin<&mut Fut>,
826    result: *mut u8,
827    waker_data: *const u8,
828) -> u32
829where
830    Fut: Future<Output = CxxAsyncResult<Out>>,
831{
832    let waker = Waker::from_raw(RawWaker::new(
833        waker_data as *const (),
834        &CXXASYNC_WAKER_VTABLE,
835    ));
836
837    let result = panic::catch_unwind(AssertUnwindSafe(move || {
838        let mut context = Context::from_waker(&waker);
839        match this.poll(&mut context) {
840            Poll::Ready(Ok(value)) => {
841                ptr::write(result as *mut Out, value);
842                FUTURE_STATUS_COMPLETE
843            }
844            Poll::Ready(Err(error)) => {
845                let error = error.what().to_owned();
846                ptr::write(result as *mut String, error);
847                FUTURE_STATUS_ERROR
848            }
849            Poll::Pending => FUTURE_STATUS_PENDING,
850        }
851    }));
852
853    match result {
854        Ok(result) => result,
855        Err(error) => {
856            drop(writeln!(
857                io::stderr(),
858                "Rust async code panicked when awaited from C++: {:?}",
859                error
860            ));
861            process::abort();
862        }
863    }
864}
865
// Entry point that C++ uses to destroy a Rust future. The destructor runs in place; the memory
// behind the pointer is not deallocated here.
//
// SAFETY:
// * This is a low-level function called by our C++ code.
#[doc(hidden)]
pub unsafe extern "C" fn future_drop<Fut>(future: *mut Fut) {
    future.drop_in_place();
}
874
875// Bumps the reference count on a suspended C++ coroutine.
876//
877// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
878unsafe fn rust_suspended_coroutine_clone(address: *const ()) -> RawWaker {
879    RawWaker::new(
880        cxxasync_suspended_coroutine_clone(address as *mut () as *mut u8) as *mut () as *const (),
881        &CXXASYNC_WAKER_VTABLE,
882    )
883}
884
885// Resumes a suspended C++ coroutine and decrements its reference count.
886//
887// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
888unsafe fn rust_suspended_coroutine_wake(address: *const ()) {
889    cxxasync_suspended_coroutine_wake(address as *mut () as *mut u8)
890}
891
892// Resumes a suspended C++ coroutine without decrementing its reference count.
893//
894// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
895unsafe fn rust_suspended_coroutine_wake_by_ref(address: *const ()) {
896    cxxasync_suspended_coroutine_wake_by_ref(address as *mut () as *mut u8)
897}
898
899// Decrements the reference count on a suspended C++ coroutine.
900//
901// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
902unsafe fn rust_suspended_coroutine_drop(address: *const ()) {
903    cxxasync_suspended_coroutine_drop(address as *mut () as *mut u8)
904}
905
// Reexports for the `#[bridge]` macro to use internally. Users of this crate shouldn't use these;
// they should import the `futures` crate directly.
//
// The module is hidden from the generated documentation and is not part of the supported API.
#[doc(hidden)]
pub mod private {
    // Boxed future type from the `futures` crate.
    pub use futures::future::BoxFuture;
    // Boxed stream type from the `futures` crate.
    pub use futures::stream::BoxStream;
}