cxx_async/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10// cxx-async/src/lib.rs
11//
12//! `cxx-async` is a Rust crate that extends the [`cxx`](http://cxx.rs/) library to provide
13//! seamless interoperability between asynchronous Rust code using `async`/`await` and [C++20
14//! coroutines] using `co_await`. If your C++ code is asynchronous, `cxx-async` can provide a more
15//! convenient, and potentially more efficient, alternative to callbacks. You can freely convert
16//! between C++ coroutines and Rust futures and await one from the other.
17//!
18//! It's important to emphasize what `cxx-async` isn't: it isn't a C++ binding to Tokio or any
19//! other Rust I/O library. Nor is it a Rust binding to `boost::asio` or similar. Such bindings
20//! could in principle be layered on top of `cxx-async` if desired, but this crate doesn't provide
21//! them out of the box. (Note that this is a tricky problem even in theory, since Rust async I/O
22//! code is generally tightly coupled to a single library such as Tokio, in much the same way C++
23//! async I/O code tends to be tightly coupled to libraries like `boost::asio`.) If you're writing
24//! server code, you can still use `cxx-async`, but you will need to ensure that both the Rust and
25//! C++ sides run separate I/O executors.
26//!
27//! `cxx-async` aims for compatibility with popular C++ coroutine support libraries. Right now,
28//! both the lightweight [`cppcoro`](https://github.com/lewissbaker/cppcoro) and the more
29//! comprehensive [Folly](https://github.com/facebook/folly/) are supported. Pull requests are
30//! welcome to support others.
31//!
32//! ## Quick tutorial
33//!
34//! To use `cxx-async`, first start by adding `cxx` to your project. Then add the following to your
35//! `Cargo.toml`:
36//!
37//! ```toml
38//! [dependencies]
39//! cxx-async = "0.1"
40//! ```
41//!
42//! Now, inside your `#[cxx::bridge]` module, declare a future type and some methods like so:
43//!
44//! ```ignore
45//! #[cxx::bridge]
46//! mod ffi {
47//!     // Declare type aliases for each of the future types you wish to use here. Then declare
48//!     // async C++ methods that you wish Rust to call. Make sure they return one of the future
49//!     // types you declared.
50//!     unsafe extern "C++" {
51//!         type RustFutureString = crate::RustFutureString;
52//!
53//!         fn hello_from_cpp() -> RustFutureString;
54//!     }
55//!
56//!     // Async Rust methods that you wish C++ to call go here. Again, make sure they return one of
57//!     // the future types you declared above.
58//!     extern "Rust" {
59//!         fn hello_from_rust() -> RustFutureString;
60//!     }
61//! }
62//! ```
63//!
64//! After the `#[cxx::bridge]` block, define the future types using the
65//! `#[cxx_async::bridge]` attribute:
66//!
67//! ```ignore
68//! // The `Output` type is the Rust type that this future yields.
69//! #[cxx_async::bridge]
70//! unsafe impl Future for RustFutureString {
71//!     type Output = String;
72//! }
73//! ```
74//!
75//! Note that it's your responsibility to ensure that the type you specify for Output actually
76//! matches the type of the value that your future resolves to. Otherwise, undefined behavior can
77//! result.
78//!
79//! Next, in your C++ header, make sure to `#include` the right headers:
80//!
81//! ```cpp
82//! #include "rust/cxx.h"
83//! #include "rust/cxx_async.h"
84//! #include "rust/cxx_async_cppcoro.h"  // Or cxx_async_folly.h, as appropriate.
85//! ```
86//!
87//! And add a call to the `CXXASYNC_DEFINE_FUTURE` macro in your headers to define the C++ side of
88//! the future:
89//!
90//! ```cpp
91//! // The first argument is the C++ type that the future yields, and the second argument is the
92//! // fully-qualified name of the future, with `::` namespace separators replaced with commas. (For
93//! // instance, if your future is named `mycompany::myproject::RustFutureString`, you might write
94//! // `CXXASYNC_DEFINE_FUTURE(rust::String, mycompany, myproject, RustFutureString);`. The first
95//! // argument is the C++ type that `cxx` maps your Rust type to: in this case, `String` maps to
96//! // `rust::String`, so we supply `rust::String` here.
97//! //
98//! // Note that, because the C preprocessor doesn't know about the `<` and `>` brackets that
99//! // surround template arguments, a template type that contains multiple arguments (e.g.
100//! // `std::pair<int, std::string>`) will need to be factored out into a `typedef` to be used
101//! // inside `CXXASYNC_DEFINE_FUTURE`. Otherwise, the C preprocessor won't parse it properly.
102//! //
103//! // This macro must be invoked at the top level, not in a namespace.
104//! CXXASYNC_DEFINE_FUTURE(rust::String, RustFutureString);
105//! ```
106//!
107//! You're done! Now you can define asynchronous C++ code that Rust can call:
108//!
109//! ```cpp
110//! RustFutureString hello_from_cpp() {
111//!     co_return std::string("Hello world!");
112//! }
113//! ```
114//!
115//! On the Rust side:
116//!
117//! ```ignore
118//! async fn call_cpp() -> String {
119//!     // This returns a Result (with the error variant populated if C++ threw an exception), so
120//!     // you need to unwrap it:
121//!     ffi::hello_from_cpp().await.unwrap()
122//! }
123//! ```
124//!
125//! And likewise, define some asynchronous Rust code that C++ can call:
126//!
127//! ```ignore
128//! use cxx_async::CxxAsyncResult;
129//! fn hello_from_rust() -> RustFutureString {
130//!     // You can instead use `fallible` if your async block returns a Result.
131//!     RustFutureString::infallible(async { "Hello world!".to_owned() })
132//! }
133//! ```
134//!
135//! Over on the C++ side:
136//!
137//! ```cpp
138//! cppcoro::task<rust::String> call_rust() {
139//!     co_return hello_from_rust();
140//! }
141//! ```
142//!
143//! In this way, you should now be able to freely await futures on either side.
144//!
145//! [C++20 coroutines]: https://en.cppreference.com/w/cpp/language/coroutines
146
147#![warn(missing_docs)]
148
149extern crate link_cplusplus;
150
151use crate::execlet::Execlet;
152use crate::execlet::ExecletReaper;
153use crate::execlet::RustExeclet;
154use futures::Stream;
155use futures::StreamExt;
156use std::convert::From;
157use std::error::Error;
158use std::ffi::CStr;
159use std::fmt::Debug;
160use std::fmt::Display;
161use std::fmt::Formatter;
162use std::fmt::Result as FmtResult;
163use std::future::Future;
164use std::io;
165use std::io::Write;
166use std::os::raw::c_char;
167use std::panic;
168use std::panic::AssertUnwindSafe;
169use std::pin::Pin;
170use std::process;
171use std::ptr;
172use std::sync::Arc;
173use std::sync::Mutex;
174use std::task::Context;
175use std::task::Poll;
176use std::task::RawWaker;
177use std::task::RawWakerVTable;
178use std::task::Waker;
179
180const FUTURE_STATUS_PENDING: u32 = 0;
181const FUTURE_STATUS_COMPLETE: u32 = 1;
182const FUTURE_STATUS_ERROR: u32 = 2;
183const FUTURE_STATUS_RUNNING: u32 = 3;
184
185const SEND_RESULT_WAIT: u32 = 0;
186const SEND_RESULT_SENT: u32 = 1;
187const SEND_RESULT_FINISHED: u32 = 2;
188
189pub use cxx_async_macro::bridge;
190
191// Replacements for macros that panic that are guaranteed to cause an abort, so that we don't unwind
192// across C++ frames.
193
194macro_rules! safe_panic {
195    ($($args:expr),*) => {
196        {
197            use ::std::io::Write;
198            drop(write!(::std::io::stderr(), $($args),*));
199            drop(writeln!(::std::io::stderr(), " at {}:{}", file!(), line!()));
200            ::std::process::abort();
201        }
202    }
203}
204
205#[cfg(debug_assertions)]
206macro_rules! safe_debug_assert {
207    ($cond:expr) => {{
208        use ::std::io::Write;
209        if !$cond {
210            drop(writeln!(
211                ::std::io::stderr(),
212                "assertion failed: {}",
213                stringify!($cond)
214            ));
215            ::std::process::abort();
216        }
217    }};
218}
219#[cfg(not(debug_assertions))]
220macro_rules! safe_debug_assert {
221    ($cond:expr) => {};
222}
223
224macro_rules! safe_unreachable {
225    () => {
226        safe_panic!("unreachable code executed")
227    };
228}
229
230trait SafeExpect {
231    type Output;
232    fn safe_expect(self, message: &str) -> Self::Output;
233}
234
235impl<T> SafeExpect for Option<T> {
236    type Output = T;
237    fn safe_expect(self, message: &str) -> T {
238        match self {
239            Some(value) => value,
240            None => safe_panic!("{}", message),
241        }
242    }
243}
244
245trait SafeUnwrap {
246    type Output;
247    fn safe_unwrap(self) -> Self::Output;
248}
249
250impl<T, E> SafeUnwrap for Result<T, E>
251where
252    E: Debug,
253{
254    type Output = T;
255    fn safe_unwrap(self) -> Self::Output {
256        match self {
257            Ok(value) => value,
258            Err(error) => safe_panic!("unexpected Result::Err: {:?}", error),
259        }
260    }
261}
262
263#[doc(hidden)]
264pub mod execlet;
265
266// Bridged glue functions.
267extern "C" {
268    fn cxxasync_suspended_coroutine_clone(waker_data: *mut u8) -> *mut u8;
269    fn cxxasync_suspended_coroutine_wake(waker_data: *mut u8);
270    fn cxxasync_suspended_coroutine_wake_by_ref(waker_data: *mut u8);
271    fn cxxasync_suspended_coroutine_drop(waker_data: *mut u8);
272}
273
274// A suspended C++ coroutine needs to act as a waker if it awaits a Rust future. This vtable
275// provides that glue.
276static CXXASYNC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
277    rust_suspended_coroutine_clone,
278    rust_suspended_coroutine_wake,
279    rust_suspended_coroutine_wake_by_ref,
280    rust_suspended_coroutine_drop,
281);
282
283/// Any exception that a C++ coroutine throws is automatically caught and converted into this error
284/// type.
285///
286/// This is just a wrapper around the result of `std::exception::what()`.
287#[derive(Debug)]
288pub struct CxxAsyncException {
289    what: Box<str>,
290}
291
292impl CxxAsyncException {
293    /// Creates a new exception with the given error message.
294    pub fn new(what: Box<str>) -> Self {
295        Self { what }
296    }
297
298    /// The value returned by `std::exception::what()`.
299    pub fn what(&self) -> &str {
300        &self.what
301    }
302}
303
304impl Display for CxxAsyncException {
305    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
306        formatter.write_str(&self.what)
307    }
308}
309
310impl Error for CxxAsyncException {}
311
312/// A convenient shorthand for `Result<T, CxxAsyncException>`.
313pub type CxxAsyncResult<T> = Result<T, CxxAsyncException>;
314
315// A table of functions that the `bridge` macro emits for the C++ bridge to use.
316//
317// This must match the definition in `cxx_async.h`.
318#[repr(C)]
319#[doc(hidden)]
320pub struct CxxAsyncVtable {
321    pub channel: *mut u8,
322    pub sender_send: *mut u8,
323    pub sender_drop: *mut u8,
324    pub future_poll: *mut u8,
325    pub future_drop: *mut u8,
326}
327
328unsafe impl Send for CxxAsyncVtable {}
329unsafe impl Sync for CxxAsyncVtable {}
330
331// A sender/receiver pair for the return value of a wrapped one-shot C++ coroutine.
332//
333// This is an implementation detail and is not exposed to the programmer. It must match the
334// definition in `cxx_async.h`.
335#[repr(C)]
336#[doc(hidden)]
337pub struct CxxAsyncFutureChannel<Fut, Out> {
338    // The receiving end.
339    future: Fut,
340    // The sending end.
341    sender: CxxAsyncSender<Out>,
342}
343
344// A sender/receiver pair for the return value of a wrapped C++ multi-shot coroutine.
345//
346// This is an implementation detail and is not exposed to the programmer. It must match the
347// definition in `cxx_async.h`.
348#[repr(C)]
349#[doc(hidden)]
350pub struct CxxAsyncStreamChannel<Stm, Item>
351where
352    Stm: Stream<Item = CxxAsyncResult<Item>>,
353{
354    // The receiving end.
355    future: Stm,
356    // The sending end.
357    sender: CxxAsyncSender<Item>,
358}
359
360// The single-producer/single-consumer channel type that future and stream implementations use to
361// pass values between the two languages.
362//
363// We can't use a `futures::channel::mpsc` channel because it can deadlock. With a standard MPSC
364// channel, if we try to send a value when the buffer is full and the receiving end is woken up and
365// then tries to receive the value, a deadlock occurs, as the MPSC channel doesn't drop locks before
366// calling the waker.
367struct SpscChannel<T>(Arc<Mutex<SpscChannelImpl<T>>>);
368
369// Data for each SPSC channel.
370struct SpscChannelImpl<T> {
371    // The waker waiting on the value.
372    //
373    // This can either be the sending end waiting for the receiving end to receive a previously-sent
374    // value (for streams only) or the receiving end waiting for the sending end to post a value.
375    waiter: Option<Waker>,
376    // The value waiting to be read.
377    value: Option<T>,
378    // An exception from the C++ side that is to be delivered over to the Rust side.
379    exception: Option<CxxAsyncException>,
380    // True if the channel is closed; false otherwise.
381    closed: bool,
382}
383
384impl<T> SpscChannel<T> {
385    // Creates a new SPSC channel.
386    fn new() -> SpscChannel<T> {
387        SpscChannel(Arc::new(Mutex::new(SpscChannelImpl {
388            waiter: None,
389            value: None,
390            exception: None,
391            closed: false,
392        })))
393    }
394
395    // Marks the channel as closed. Only the sending end may call this.
396    fn close(&self) {
397        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
398        let waiter;
399        {
400            let mut this = self.0.lock().safe_unwrap();
401            if this.closed {
402                safe_panic!("Attempted to close an `SpscChannel` that's already closed!")
403            }
404            this.closed = true;
405            waiter = this.waiter.take();
406        }
407
408        if let Some(waiter) = waiter {
409            waiter.wake();
410        }
411    }
412
413    // Attempts to send a value. If this channel already has a value yet to be read, this function
414    // returns false. Otherwise, it calls the provided closure to retrieve the value and returns
415    // true.
416    //
417    // This callback-based design eliminates the requirement to return the original value if the
418    // send fails.
419    fn try_send_value_with<F>(&self, context: Option<&Context>, getter: F) -> bool
420    where
421        F: FnOnce() -> T,
422    {
423        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
424        let waiter;
425        {
426            let mut this = self.0.lock().safe_unwrap();
427            if this.value.is_none() {
428                this.value = Some(getter());
429                waiter = this.waiter.take();
430            } else if context.is_some() && this.waiter.is_some() {
431                safe_panic!("Only one task may block on a `SpscChannel`!")
432            } else {
433                if let Some(context) = context {
434                    this.waiter = Some((*context.waker()).clone());
435                }
436                return false;
437            }
438        }
439
440        if let Some(waiter) = waiter {
441            waiter.wake();
442        }
443        true
444    }
445
446    // Raises an exception. This is synchronous and thus should never fail. It must only be called
447    // once, or not at all, for a given `SpscSender`.
448    fn send_exception(&self, exception: CxxAsyncException) {
449        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
450        let waiter = {
451            let mut this = self.0.lock().safe_unwrap();
452            safe_debug_assert!(this.exception.is_none());
453            this.exception = Some(exception);
454            this.waiter.take()
455        };
456
457        if let Some(waiter) = waiter {
458            waiter.wake();
459        }
460    }
461
462    // Attempts to receive a value. Returns `Poll::Pending` if no value is available and the channel
463    // isn't closed. Returns `Poll::Ready(None)` if the channel is closed. Otherwise, receives a
464    // value and returns `Poll::Ready(Some)`.
465    fn recv(&self, cx: &Context) -> Poll<Option<CxxAsyncResult<T>>> {
466        // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
467        let (result, waiter);
468        {
469            let mut this = self.0.lock().safe_unwrap();
470            match this.value.take() {
471                Some(value) => {
472                    result = Ok(value);
473                    waiter = this.waiter.take();
474                }
475                None => match this.exception.take() {
476                    Some(exception) => {
477                        result = Err(exception);
478                        waiter = this.waiter.take();
479                    }
480                    None if this.closed => return Poll::Ready(None),
481                    None => {
482                        this.waiter = Some((*cx.waker()).clone());
483                        return Poll::Pending;
484                    }
485                },
486            }
487        }
488
489        if let Some(waiter) = waiter {
490            waiter.wake();
491        }
492        Poll::Ready(Some(result))
493    }
494}
495
496impl<T> Clone for SpscChannel<T> {
497    fn clone(&self) -> Self {
498        Self(self.0.clone())
499    }
500}
501
502// The concrete type of the stream that wraps a C++ coroutine, either one-shot (future) or
503// multi-shot (stream).
504//
505// The programmer only interacts with this abstractly behind a `Box<dyn Future>` or
506// `Box<dyn Stream>` trait object, so this type is considered an implementation detail. It must be
507// public because the `bridge_stream` macro needs to name it.
508#[doc(hidden)]
509pub struct CxxAsyncReceiver<Item> {
510    // The SPSC channel to receive on.
511    receiver: SpscChannel<Item>,
512    // Any execlet that must be driven when receiving.
513    execlet: Option<Execlet>,
514}
515
516// The concrete type of the sending end of a stream.
517//
518// This must be public because the `bridge_stream` macro needs to name it.
519#[doc(hidden)]
520#[repr(transparent)]
521pub struct CxxAsyncSender<Item>(*mut SpscChannel<Item>);
522
523impl<Item> Drop for CxxAsyncSender<Item> {
524    fn drop(&mut self) {
525        unsafe { drop(Box::from_raw(self.0)) }
526    }
527}
528
529// This is a little weird in that `CxxAsyncReceiver` behaves as a oneshot if it's treated as a
530// Future and an SPSC stream if it's treated as a Stream. But since the programmer only ever
531// interacts with these objects behind boxed trait objects that only expose one of the two traits,
532// it's not a problem.
533impl<Item> Stream for CxxAsyncReceiver<Item> {
534    type Item = CxxAsyncResult<Item>;
535
536    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
537        if let Some(ref execlet) = self.execlet {
538            execlet.run(cx);
539        }
540        self.receiver.recv(cx)
541    }
542}
543
544impl<Output> Future for CxxAsyncReceiver<Output> {
545    type Output = CxxAsyncResult<Output>;
546
547    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
548        if let Some(ref execlet) = self.execlet {
549            execlet.run(cx);
550        }
551        match self.receiver.recv(cx) {
552            Poll::Ready(Some(Ok(value))) => Poll::Ready(Ok(value)),
553            Poll::Ready(Some(Err(exception))) => Poll::Ready(Err(exception)),
554            Poll::Ready(None) => {
555                // This should never happen, because a future should never be polled again after
556                // returning `Ready`.
557                safe_panic!("Attempted to use a stream as a future!")
558            }
559            Poll::Pending => Poll::Pending,
560        }
561    }
562}
563
564impl<Item> From<SpscChannel<Item>> for CxxAsyncReceiver<Item> {
565    fn from(receiver: SpscChannel<Item>) -> Self {
566        CxxAsyncReceiver {
567            receiver,
568            execlet: None,
569        }
570    }
571}
572
573impl<Item> Drop for CxxAsyncReceiver<Item> {
574    fn drop(&mut self) {
575        let execlet = match self.execlet {
576            Some(ref execlet) => execlet,
577            None => return,
578        };
579        let receiver = self.receiver.0.lock().safe_unwrap();
580        if receiver.closed {
581            return;
582        }
583        ExecletReaper::get().add((*execlet).clone());
584    }
585}
586
587// The sending end that the C++ bridge uses to return a value to a Rust future.
588//
589// This is an implementation detail.
590#[doc(hidden)]
591pub trait RustSender {
592    type Output;
593    fn send(&mut self, value: CxxAsyncResult<Self::Output>);
594}
595
596/// Wraps an arbitrary Rust Future in a boxed `cxx-async` future so that it can be returned to C++.
597///
598/// You should not need to implement this manually; it's automatically implemented by the `bridge`
599/// macro.
600pub trait IntoCxxAsyncFuture: Sized {
601    /// The type of the value yielded by the future.
602    type Output;
603
604    /// Wraps a Rust Future that directly returns the output type.
605    ///
606    /// Use this when you aren't interested in propagating errors to C++ as exceptions.
607    fn infallible<Fut>(future: Fut) -> Self
608    where
609        Fut: Future<Output = Self::Output> + Send + 'static,
610    {
611        Self::fallible(async move { Ok(future.await) })
612    }
613
614    /// Wraps a Rust Future that returns the output type, wrapped in a `CxxAsyncResult`.
615    ///
616    /// Use this when you have error values that you want to turn into exceptions on the C++ side.
617    fn fallible<Fut>(future: Fut) -> Self
618    where
619        Fut: Future<Output = CxxAsyncResult<Self::Output>> + Send + 'static;
620}
621
622/// Wraps an arbitrary Rust Stream in a boxed `cxx-async` stream so that it can be returned to C++.
623///
624/// You should not need to implement this manually; it's automatically implemented by the
625/// `bridge_stream` macro.
626pub trait IntoCxxAsyncStream: Sized {
627    /// The type of the values yielded by the stream.
628    type Item;
629
630    /// Wraps a Rust Stream that directly yields items of the output type.
631    ///
632    /// Use this when you aren't interested in propagating errors to C++ as exceptions.
633    fn infallible<Stm>(stream: Stm) -> Self
634    where
635        Stm: Stream<Item = Self::Item> + Send + 'static,
636        Stm::Item: 'static,
637    {
638        Self::fallible(stream.map(Ok))
639    }
640
641    /// Wraps a Rust Stream that yields items of the output type, wrapped in `CxxAsyncResult`s.
642    ///
643    /// Use this when you have error values that you want to turn into exceptions on the C++ side.
644    fn fallible<Stm>(stream: Stm) -> Self
645    where
646        Stm: Stream<Item = CxxAsyncResult<Self::Item>> + Send + 'static;
647}
648
649// Creates a new oneshot sender/receiver pair for a future.
650//
651// SAFETY: This is a raw FFI function called by our C++ code.
652//
653// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
654#[doc(hidden)]
655pub unsafe extern "C" fn future_channel<Fut, Out>(
656    out_oneshot: *mut CxxAsyncFutureChannel<Fut, Out>,
657    execlet: *mut RustExeclet,
658) where
659    Fut: From<CxxAsyncReceiver<Out>> + Future<Output = CxxAsyncResult<Out>>,
660{
661    let channel = SpscChannel::new();
662    let oneshot = CxxAsyncFutureChannel {
663        sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
664        future: CxxAsyncReceiver::<Out> {
665            receiver: channel,
666            execlet: Some(Execlet::from_raw_ref(execlet)),
667        }
668        .into(),
669    };
670    ptr::write(out_oneshot, oneshot);
671}
672
673// Creates a new multi-shot sender/receiver pair for a stream.
674//
675// SAFETY: This is a raw FFI function called by our C++ code.
676//
677// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
678#[doc(hidden)]
679pub unsafe extern "C" fn stream_channel<Stm, Item>(
680    out_stream: *mut CxxAsyncStreamChannel<Stm, Item>,
681    execlet: *mut RustExeclet,
682) where
683    Stm: From<CxxAsyncReceiver<Item>> + Stream<Item = CxxAsyncResult<Item>>,
684{
685    let channel = SpscChannel::new();
686    let stream = CxxAsyncStreamChannel {
687        sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
688        future: CxxAsyncReceiver {
689            receiver: channel,
690            execlet: Some(Execlet::from_raw_ref(execlet)),
691        }
692        .into(),
693    };
694    ptr::write(out_stream, stream);
695}
696
697// C++ calls this to yield a value for a one-shot coroutine (future).
698//
699// SAFETY: This is a low-level function called by our C++ code.
700//
701// Takes ownership of the value. The caller must not call its destructor.
702//
703// This function always closes the channel and returns `SEND_RESULT_FINISHED`.
704//
705// If `status` is `FUTURE_STATUS_COMPLETE`, then the given value is sent; otherwise, if `status` is
706// `FUTURE_STATUS_ERROR`, `value` must point to an exception string. `FUTURE_STATUS_RUNNING` is
707// illegal, because that value is only for streams, not futures.
708//
709// The `waker_data` parameter should always be null, because a one-shot coroutine should never
710// block on yielding a value.
711//
712// Any errors when sending are dropped on the floor. This is the right behavior because futures
713// can be legally dropped in Rust to signal cancellation.
714#[doc(hidden)]
715pub unsafe extern "C" fn sender_future_send<Item>(
716    this: &mut CxxAsyncSender<Item>,
717    status: u32,
718    value: *const u8,
719    waker_data: *const u8,
720) -> u32 {
721    safe_debug_assert!(waker_data.is_null());
722
723    let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
724    match status {
725        FUTURE_STATUS_COMPLETE => {
726            // This is a one-shot sender, so sending must always succeed.
727            let sent = this.try_send_value_with(None, || ptr::read(value as *const Item));
728            safe_debug_assert!(sent);
729        }
730        FUTURE_STATUS_ERROR => this.send_exception(unpack_exception(value)),
731        _ => safe_unreachable!(),
732    }
733
734    this.close();
735    SEND_RESULT_FINISHED
736}
737
738// C++ calls this to yield a value for a multi-shot coroutine (stream).
739//
740// SAFETY: This is a low-level function called by our C++ code.
741//
742// Takes ownership of the value if and only if it was successfully sent (otherwise, leaves it
743// alone). The caller must not call its destructor on a successful send.
744//
745// This function returns `SEND_RESULT_SENT` if the value was successfully sent, `SEND_RESULT_WAIT`
746// if the value wasn't sent because another value was already in the slot (in which case the task
747// will need to go to sleep), and `SEND_RESULT_FINISHED` if the channel was closed.
748//
749// If `status` is `FUTURE_STATUS_COMPLETE`, then the channel is closed, the value is ignored, and
750// this function immediately returns `SEND_RESULT_FINISHED`. Note that this behavior is different
751// from `sender_future_send`, which actually sends the value if `status` is
752// `FUTURE_STATUS_COMPLETE`.
753//
754// If `waker_data` is present, this identifies the coroutine handle that will be awakened if the
755// channel is currently full.
756//
757// Any errors when sending are dropped on the floor. This is because futures can be legally dropped
758// in Rust to signal cancellation.
759#[doc(hidden)]
760pub unsafe extern "C" fn sender_stream_send<Item>(
761    this: &mut CxxAsyncSender<Item>,
762    status: u32,
763    value: *const u8,
764    waker_data: *const u8,
765) -> u32 {
766    let (waker, context);
767    if waker_data.is_null() {
768        context = None;
769    } else {
770        waker = Waker::from_raw(RawWaker::new(
771            waker_data as *const (),
772            &CXXASYNC_WAKER_VTABLE,
773        ));
774        context = Some(Context::from_waker(&waker));
775    }
776
777    let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
778    match status {
779        FUTURE_STATUS_COMPLETE => {
780            this.close();
781            SEND_RESULT_FINISHED
782        }
783        FUTURE_STATUS_RUNNING => {
784            let sent =
785                this.try_send_value_with(context.as_ref(), || ptr::read(value as *const Item));
786            if sent {
787                SEND_RESULT_SENT
788            } else {
789                SEND_RESULT_WAIT
790            }
791        }
792        FUTURE_STATUS_ERROR => {
793            this.send_exception(unpack_exception(value));
794            this.close();
795            SEND_RESULT_FINISHED
796        }
797        _ => safe_unreachable!(),
798    }
799}
800
801// C++ calls this to destroy a sender.
802//
803// SAFETY: This is a low-level function called by our C++ code.
804#[doc(hidden)]
805pub unsafe extern "C" fn sender_drop<Item>(_: CxxAsyncSender<Item>) {
806    // Destructor automatically runs.
807}
808
809unsafe fn unpack_exception(value: *const u8) -> CxxAsyncException {
810    let string = CStr::from_ptr(value as *const c_char);
811    CxxAsyncException::new(string.to_string_lossy().into_owned().into_boxed_str())
812}
813
814// C++ calls this to poll a wrapped Rust future.
815//
816// SAFETY:
817// * This is a low-level function called by our C++ code.
818// * `Pin<&mut Future>` is marked `#[repr(transparent)]`, so it's FFI-safe.
819// * We catch all panics inside `poll` so that they don't unwind into C++.
820#[doc(hidden)]
821pub unsafe extern "C" fn future_poll<Fut, Out>(
822    this: Pin<&mut Fut>,
823    result: *mut u8,
824    waker_data: *const u8,
825) -> u32
826where
827    Fut: Future<Output = CxxAsyncResult<Out>>,
828{
829    let waker = Waker::from_raw(RawWaker::new(
830        waker_data as *const (),
831        &CXXASYNC_WAKER_VTABLE,
832    ));
833
834    let result = panic::catch_unwind(AssertUnwindSafe(move || {
835        let mut context = Context::from_waker(&waker);
836        match this.poll(&mut context) {
837            Poll::Ready(Ok(value)) => {
838                ptr::write(result as *mut Out, value);
839                FUTURE_STATUS_COMPLETE
840            }
841            Poll::Ready(Err(error)) => {
842                let error = error.what().to_owned();
843                ptr::write(result as *mut String, error);
844                FUTURE_STATUS_ERROR
845            }
846            Poll::Pending => FUTURE_STATUS_PENDING,
847        }
848    }));
849
850    match result {
851        Ok(result) => result,
852        Err(error) => {
853            drop(writeln!(
854                io::stderr(),
855                "Rust async code panicked when awaited from C++: {error:?}"
856            ));
857            process::abort();
858        }
859    }
860}
861
862// C++ calls this to drop a Rust future.
863//
864// SAFETY:
865// * This is a low-level function called by our C++ code.
866#[doc(hidden)]
867pub unsafe extern "C" fn future_drop<Fut>(future: *mut Fut) {
868    ptr::drop_in_place(future);
869}
870
871// Bumps the reference count on a suspended C++ coroutine.
872//
873// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
874unsafe fn rust_suspended_coroutine_clone(address: *const ()) -> RawWaker {
875    RawWaker::new(
876        cxxasync_suspended_coroutine_clone(address as *mut () as *mut u8) as *mut () as *const (),
877        &CXXASYNC_WAKER_VTABLE,
878    )
879}
880
881// Resumes a suspended C++ coroutine and decrements its reference count.
882//
883// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
884unsafe fn rust_suspended_coroutine_wake(address: *const ()) {
885    cxxasync_suspended_coroutine_wake(address as *mut () as *mut u8)
886}
887
888// Resumes a suspended C++ coroutine without decrementing its reference count.
889//
890// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
891unsafe fn rust_suspended_coroutine_wake_by_ref(address: *const ()) {
892    cxxasync_suspended_coroutine_wake_by_ref(address as *mut () as *mut u8)
893}
894
895// Decrements the reference count on a suspended C++ coroutine.
896//
897// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
898unsafe fn rust_suspended_coroutine_drop(address: *const ()) {
899    cxxasync_suspended_coroutine_drop(address as *mut () as *mut u8)
900}
901
902// Reexports for the `#[bridge]` macro to use internally. Users of this crate shouldn't use these;
903// they should import the `futures` crate directly.
904#[doc(hidden)]
905pub mod private {
906    pub use futures::future::BoxFuture;
907    pub use futures::stream::BoxStream;
908}