cxx_async/lib.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10// cxx-async/src/lib.rs
11//
12//! `cxx-async` is a Rust crate that extends the [`cxx`](http://cxx.rs/) library to provide
13//! seamless interoperability between asynchronous Rust code using `async`/`await` and [C++20
14//! coroutines] using `co_await`. If your C++ code is asynchronous, `cxx-async` can provide a more
15//! convenient, and potentially more efficient, alternative to callbacks. You can freely convert
16//! between C++ coroutines and Rust futures and await one from the other.
17//!
18//! It's important to emphasize what `cxx-async` isn't: it isn't a C++ binding to Tokio or any
19//! other Rust I/O library. Nor is it a Rust binding to `boost::asio` or similar. Such bindings
20//! could in principle be layered on top of `cxx-async` if desired, but this crate doesn't provide
21//! them out of the box. (Note that this is a tricky problem even in theory, since Rust async I/O
22//! code is generally tightly coupled to a single library such as Tokio, in much the same way C++
23//! async I/O code tends to be tightly coupled to libraries like `boost::asio`.) If you're writing
24//! server code, you can still use `cxx-async`, but you will need to ensure that both the Rust and
25//! C++ sides run separate I/O executors.
26//!
27//! `cxx-async` aims for compatibility with popular C++ coroutine support libraries. Right now,
28//! both the lightweight [`cppcoro`](https://github.com/lewissbaker/cppcoro) and the more
29//! comprehensive [Folly](https://github.com/facebook/folly/) are supported. Pull requests are
30//! welcome to support others.
31//!
32//! ## Quick tutorial
33//!
34//! To use `cxx-async`, first start by adding `cxx` to your project. Then add the following to your
35//! `Cargo.toml`:
36//!
37//! ```toml
38//! [dependencies]
39//! cxx-async = "0.1"
40//! ```
41//!
42//! Now, inside your `#[cxx::bridge]` module, declare a future type and some methods like so:
43//!
44//! ```ignore
45//! #[cxx::bridge]
46//! mod ffi {
47//! // Declare type aliases for each of the future types you wish to use here. Then declare
48//! // async C++ methods that you wish Rust to call. Make sure they return one of the future
49//! // types you declared.
50//! unsafe extern "C++" {
51//! type RustFutureString = crate::RustFutureString;
52//!
53//! fn hello_from_cpp() -> RustFutureString;
54//! }
55//!
56//! // Async Rust methods that you wish C++ to call go here. Again, make sure they return one of
57//! // the future types you declared above.
58//! extern "Rust" {
59//! fn hello_from_rust() -> RustFutureString;
60//! }
61//! }
62//! ```
63//!
64//! After the `#[cxx::bridge]` block, define the future types using the
65//! `#[cxx_async::bridge]` attribute:
66//!
67//! ```ignore
68//! // The `Output` type is the Rust type that this future yields.
69//! #[cxx_async::bridge]
70//! unsafe impl Future for RustFutureString {
71//! type Output = String;
72//! }
73//! ```
74//!
75//! Note that it's your responsibility to ensure that the type you specify for Output actually
76//! matches the type of the value that your future resolves to. Otherwise, undefined behavior can
77//! result.
78//!
79//! Next, in your C++ header, make sure to `#include` the right headers:
80//!
81//! ```cpp
82//! #include "rust/cxx.h"
83//! #include "rust/cxx_async.h"
84//! #include "rust/cxx_async_cppcoro.h" // Or cxx_async_folly.h, as appropriate.
85//! ```
86//!
87//! And add a call to the `CXXASYNC_DEFINE_FUTURE` macro in your headers to define the C++ side of
88//! the future:
89//!
90//! ```cpp
91//! // The first argument is the C++ type that the future yields, and the second argument is the
92//! // fully-qualified name of the future, with `::` namespace separators replaced with commas. (For
93//! // instance, if your future is named `mycompany::myproject::RustFutureString`, you might write
94//! // `CXXASYNC_DEFINE_FUTURE(rust::String, mycompany, myproject, RustFutureString);`. The first
95//! // argument is the C++ type that `cxx` maps your Rust type to: in this case, `String` maps to
96//! // `rust::String`, so we supply `rust::String` here.
97//! //
98//! // Note that, because the C preprocessor doesn't know about the `<` and `>` brackets that
99//! // surround template arguments, a template type that contains multiple arguments (e.g.
100//! // `std::pair<int, std::string>`) will need to be factored out into a `typedef` to be used
101//! // inside `CXXASYNC_DEFINE_FUTURE`. Otherwise, the C preprocessor won't parse it properly.
102//! //
103//! // This macro must be invoked at the top level, not in a namespace.
104//! CXXASYNC_DEFINE_FUTURE(rust::String, RustFutureString);
105//! ```
106//!
107//! You're done! Now you can define asynchronous C++ code that Rust can call:
108//!
109//! ```cpp
110//! RustFutureString hello_from_cpp() {
111//! co_return std::string("Hello world!");
112//! }
113//! ```
114//!
115//! On the Rust side:
116//!
117//! ```ignore
118//! async fn call_cpp() -> String {
119//! // This returns a Result (with the error variant populated if C++ threw an exception), so
120//! // you need to unwrap it:
121//! ffi::hello_from_cpp().await.unwrap()
122//! }
123//! ```
124//!
125//! And likewise, define some asynchronous Rust code that C++ can call:
126//!
127//! ```ignore
128//! use cxx_async::CxxAsyncResult;
129//! fn hello_from_rust() -> RustFutureString {
130//! // You can instead use `fallible` if your async block returns a Result.
131//! RustFutureString::infallible(async { "Hello world!".to_owned() })
132//! }
133//! ```
134//!
135//! Over on the C++ side:
136//!
137//! ```cpp
138//! cppcoro::task<rust::String> call_rust() {
139//! co_return hello_from_rust();
140//! }
141//! ```
142//!
143//! In this way, you should now be able to freely await futures on either side.
144//!
145//! [C++20 coroutines]: https://en.cppreference.com/w/cpp/language/coroutines
146
147#![warn(missing_docs)]
148
149extern crate link_cplusplus;
150
151use crate::execlet::Execlet;
152use crate::execlet::ExecletReaper;
153use crate::execlet::RustExeclet;
154use futures::Stream;
155use futures::StreamExt;
156use std::convert::From;
157use std::error::Error;
158use std::ffi::CStr;
159use std::fmt::Debug;
160use std::fmt::Display;
161use std::fmt::Formatter;
162use std::fmt::Result as FmtResult;
163use std::future::Future;
164use std::io;
165use std::io::Write;
166use std::os::raw::c_char;
167use std::panic;
168use std::panic::AssertUnwindSafe;
169use std::pin::Pin;
170use std::process;
171use std::ptr;
172use std::sync::Arc;
173use std::sync::Mutex;
174use std::task::Context;
175use std::task::Poll;
176use std::task::RawWaker;
177use std::task::RawWakerVTable;
178use std::task::Waker;
179
180const FUTURE_STATUS_PENDING: u32 = 0;
181const FUTURE_STATUS_COMPLETE: u32 = 1;
182const FUTURE_STATUS_ERROR: u32 = 2;
183const FUTURE_STATUS_RUNNING: u32 = 3;
184
185const SEND_RESULT_WAIT: u32 = 0;
186const SEND_RESULT_SENT: u32 = 1;
187const SEND_RESULT_FINISHED: u32 = 2;
188
189pub use cxx_async_macro::bridge;
190
191// Replacements for macros that panic that are guaranteed to cause an abort, so that we don't unwind
192// across C++ frames.
193
194macro_rules! safe_panic {
195 ($($args:expr),*) => {
196 {
197 use ::std::io::Write;
198 drop(write!(::std::io::stderr(), $($args),*));
199 drop(writeln!(::std::io::stderr(), " at {}:{}", file!(), line!()));
200 ::std::process::abort();
201 }
202 }
203}
204
205#[cfg(debug_assertions)]
206macro_rules! safe_debug_assert {
207 ($cond:expr) => {{
208 use ::std::io::Write;
209 if !$cond {
210 drop(writeln!(
211 ::std::io::stderr(),
212 "assertion failed: {}",
213 stringify!($cond)
214 ));
215 ::std::process::abort();
216 }
217 }};
218}
219#[cfg(not(debug_assertions))]
220macro_rules! safe_debug_assert {
221 ($cond:expr) => {};
222}
223
224macro_rules! safe_unreachable {
225 () => {
226 safe_panic!("unreachable code executed")
227 };
228}
229
230trait SafeExpect {
231 type Output;
232 fn safe_expect(self, message: &str) -> Self::Output;
233}
234
235impl<T> SafeExpect for Option<T> {
236 type Output = T;
237 fn safe_expect(self, message: &str) -> T {
238 match self {
239 Some(value) => value,
240 None => safe_panic!("{}", message),
241 }
242 }
243}
244
245trait SafeUnwrap {
246 type Output;
247 fn safe_unwrap(self) -> Self::Output;
248}
249
250impl<T, E> SafeUnwrap for Result<T, E>
251where
252 E: Debug,
253{
254 type Output = T;
255 fn safe_unwrap(self) -> Self::Output {
256 match self {
257 Ok(value) => value,
258 Err(error) => safe_panic!("unexpected Result::Err: {:?}", error),
259 }
260 }
261}
262
263#[doc(hidden)]
264pub mod execlet;
265
266// Bridged glue functions.
267extern "C" {
268 fn cxxasync_suspended_coroutine_clone(waker_data: *mut u8) -> *mut u8;
269 fn cxxasync_suspended_coroutine_wake(waker_data: *mut u8);
270 fn cxxasync_suspended_coroutine_wake_by_ref(waker_data: *mut u8);
271 fn cxxasync_suspended_coroutine_drop(waker_data: *mut u8);
272}
273
274// A suspended C++ coroutine needs to act as a waker if it awaits a Rust future. This vtable
275// provides that glue.
276static CXXASYNC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
277 rust_suspended_coroutine_clone,
278 rust_suspended_coroutine_wake,
279 rust_suspended_coroutine_wake_by_ref,
280 rust_suspended_coroutine_drop,
281);
282
283/// Any exception that a C++ coroutine throws is automatically caught and converted into this error
284/// type.
285///
286/// This is just a wrapper around the result of `std::exception::what()`.
287#[derive(Debug)]
288pub struct CxxAsyncException {
289 what: Box<str>,
290}
291
292impl CxxAsyncException {
293 /// Creates a new exception with the given error message.
294 pub fn new(what: Box<str>) -> Self {
295 Self { what }
296 }
297
298 /// The value returned by `std::exception::what()`.
299 pub fn what(&self) -> &str {
300 &self.what
301 }
302}
303
304impl Display for CxxAsyncException {
305 fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
306 formatter.write_str(&self.what)
307 }
308}
309
310impl Error for CxxAsyncException {}
311
312/// A convenient shorthand for `Result<T, CxxAsyncException>`.
313pub type CxxAsyncResult<T> = Result<T, CxxAsyncException>;
314
315// A table of functions that the `bridge` macro emits for the C++ bridge to use.
316//
317// This must match the definition in `cxx_async.h`.
318#[repr(C)]
319#[doc(hidden)]
320pub struct CxxAsyncVtable {
321 pub channel: *mut u8,
322 pub sender_send: *mut u8,
323 pub sender_drop: *mut u8,
324 pub future_poll: *mut u8,
325 pub future_drop: *mut u8,
326}
327
328unsafe impl Send for CxxAsyncVtable {}
329unsafe impl Sync for CxxAsyncVtable {}
330
331// A sender/receiver pair for the return value of a wrapped one-shot C++ coroutine.
332//
333// This is an implementation detail and is not exposed to the programmer. It must match the
334// definition in `cxx_async.h`.
335#[repr(C)]
336#[doc(hidden)]
337pub struct CxxAsyncFutureChannel<Fut, Out> {
338 // The receiving end.
339 future: Fut,
340 // The sending end.
341 sender: CxxAsyncSender<Out>,
342}
343
344// A sender/receiver pair for the return value of a wrapped C++ multi-shot coroutine.
345//
346// This is an implementation detail and is not exposed to the programmer. It must match the
347// definition in `cxx_async.h`.
348#[repr(C)]
349#[doc(hidden)]
350pub struct CxxAsyncStreamChannel<Stm, Item>
351where
352 Stm: Stream<Item = CxxAsyncResult<Item>>,
353{
354 // The receiving end.
355 future: Stm,
356 // The sending end.
357 sender: CxxAsyncSender<Item>,
358}
359
360// The single-producer/single-consumer channel type that future and stream implementations use to
361// pass values between the two languages.
362//
363// We can't use a `futures::channel::mpsc` channel because it can deadlock. With a standard MPSC
364// channel, if we try to send a value when the buffer is full and the receiving end is woken up and
365// then tries to receive the value, a deadlock occurs, as the MPSC channel doesn't drop locks before
366// calling the waker.
367struct SpscChannel<T>(Arc<Mutex<SpscChannelImpl<T>>>);
368
369// Data for each SPSC channel.
370struct SpscChannelImpl<T> {
371 // The waker waiting on the value.
372 //
373 // This can either be the sending end waiting for the receiving end to receive a previously-sent
374 // value (for streams only) or the receiving end waiting for the sending end to post a value.
375 waiter: Option<Waker>,
376 // The value waiting to be read.
377 value: Option<T>,
378 // An exception from the C++ side that is to be delivered over to the Rust side.
379 exception: Option<CxxAsyncException>,
380 // True if the channel is closed; false otherwise.
381 closed: bool,
382}
383
384impl<T> SpscChannel<T> {
385 // Creates a new SPSC channel.
386 fn new() -> SpscChannel<T> {
387 SpscChannel(Arc::new(Mutex::new(SpscChannelImpl {
388 waiter: None,
389 value: None,
390 exception: None,
391 closed: false,
392 })))
393 }
394
395 // Marks the channel as closed. Only the sending end may call this.
396 fn close(&self) {
397 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
398 let waiter;
399 {
400 let mut this = self.0.lock().safe_unwrap();
401 if this.closed {
402 safe_panic!("Attempted to close an `SpscChannel` that's already closed!")
403 }
404 this.closed = true;
405 waiter = this.waiter.take();
406 }
407
408 if let Some(waiter) = waiter {
409 waiter.wake();
410 }
411 }
412
413 // Attempts to send a value. If this channel already has a value yet to be read, this function
414 // returns false. Otherwise, it calls the provided closure to retrieve the value and returns
415 // true.
416 //
417 // This callback-based design eliminates the requirement to return the original value if the
418 // send fails.
419 fn try_send_value_with<F>(&self, context: Option<&Context>, getter: F) -> bool
420 where
421 F: FnOnce() -> T,
422 {
423 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
424 let waiter;
425 {
426 let mut this = self.0.lock().safe_unwrap();
427 if this.value.is_none() {
428 this.value = Some(getter());
429 waiter = this.waiter.take();
430 } else if context.is_some() && this.waiter.is_some() {
431 safe_panic!("Only one task may block on a `SpscChannel`!")
432 } else {
433 if let Some(context) = context {
434 this.waiter = Some((*context.waker()).clone());
435 }
436 return false;
437 }
438 }
439
440 if let Some(waiter) = waiter {
441 waiter.wake();
442 }
443 true
444 }
445
446 // Raises an exception. This is synchronous and thus should never fail. It must only be called
447 // once, or not at all, for a given `SpscSender`.
448 fn send_exception(&self, exception: CxxAsyncException) {
449 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
450 let waiter = {
451 let mut this = self.0.lock().safe_unwrap();
452 safe_debug_assert!(this.exception.is_none());
453 this.exception = Some(exception);
454 this.waiter.take()
455 };
456
457 if let Some(waiter) = waiter {
458 waiter.wake();
459 }
460 }
461
462 // Attempts to receive a value. Returns `Poll::Pending` if no value is available and the channel
463 // isn't closed. Returns `Poll::Ready(None)` if the channel is closed. Otherwise, receives a
464 // value and returns `Poll::Ready(Some)`.
465 fn recv(&self, cx: &Context) -> Poll<Option<CxxAsyncResult<T>>> {
466 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
467 let (result, waiter);
468 {
469 let mut this = self.0.lock().safe_unwrap();
470 match this.value.take() {
471 Some(value) => {
472 result = Ok(value);
473 waiter = this.waiter.take();
474 }
475 None => match this.exception.take() {
476 Some(exception) => {
477 result = Err(exception);
478 waiter = this.waiter.take();
479 }
480 None if this.closed => return Poll::Ready(None),
481 None => {
482 this.waiter = Some((*cx.waker()).clone());
483 return Poll::Pending;
484 }
485 },
486 }
487 }
488
489 if let Some(waiter) = waiter {
490 waiter.wake();
491 }
492 Poll::Ready(Some(result))
493 }
494}
495
496impl<T> Clone for SpscChannel<T> {
497 fn clone(&self) -> Self {
498 Self(self.0.clone())
499 }
500}
501
502// The concrete type of the stream that wraps a C++ coroutine, either one-shot (future) or
503// multi-shot (stream).
504//
505// The programmer only interacts with this abstractly behind a `Box<dyn Future>` or
506// `Box<dyn Stream>` trait object, so this type is considered an implementation detail. It must be
507// public because the `bridge_stream` macro needs to name it.
508#[doc(hidden)]
509pub struct CxxAsyncReceiver<Item> {
510 // The SPSC channel to receive on.
511 receiver: SpscChannel<Item>,
512 // Any execlet that must be driven when receiving.
513 execlet: Option<Execlet>,
514}
515
516// The concrete type of the sending end of a stream.
517//
518// This must be public because the `bridge_stream` macro needs to name it.
519#[doc(hidden)]
520#[repr(transparent)]
521pub struct CxxAsyncSender<Item>(*mut SpscChannel<Item>);
522
523impl<Item> Drop for CxxAsyncSender<Item> {
524 fn drop(&mut self) {
525 unsafe { drop(Box::from_raw(self.0)) }
526 }
527}
528
529// This is a little weird in that `CxxAsyncReceiver` behaves as a oneshot if it's treated as a
530// Future and an SPSC stream if it's treated as a Stream. But since the programmer only ever
531// interacts with these objects behind boxed trait objects that only expose one of the two traits,
532// it's not a problem.
533impl<Item> Stream for CxxAsyncReceiver<Item> {
534 type Item = CxxAsyncResult<Item>;
535
536 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
537 if let Some(ref execlet) = self.execlet {
538 execlet.run(cx);
539 }
540 self.receiver.recv(cx)
541 }
542}
543
544impl<Output> Future for CxxAsyncReceiver<Output> {
545 type Output = CxxAsyncResult<Output>;
546
547 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
548 if let Some(ref execlet) = self.execlet {
549 execlet.run(cx);
550 }
551 match self.receiver.recv(cx) {
552 Poll::Ready(Some(Ok(value))) => Poll::Ready(Ok(value)),
553 Poll::Ready(Some(Err(exception))) => Poll::Ready(Err(exception)),
554 Poll::Ready(None) => {
555 // This should never happen, because a future should never be polled again after
556 // returning `Ready`.
557 safe_panic!("Attempted to use a stream as a future!")
558 }
559 Poll::Pending => Poll::Pending,
560 }
561 }
562}
563
564impl<Item> From<SpscChannel<Item>> for CxxAsyncReceiver<Item> {
565 fn from(receiver: SpscChannel<Item>) -> Self {
566 CxxAsyncReceiver {
567 receiver,
568 execlet: None,
569 }
570 }
571}
572
573impl<Item> Drop for CxxAsyncReceiver<Item> {
574 fn drop(&mut self) {
575 let execlet = match self.execlet {
576 Some(ref execlet) => execlet,
577 None => return,
578 };
579 let receiver = self.receiver.0.lock().safe_unwrap();
580 if receiver.closed {
581 return;
582 }
583 ExecletReaper::get().add((*execlet).clone());
584 }
585}
586
587// The sending end that the C++ bridge uses to return a value to a Rust future.
588//
589// This is an implementation detail.
590#[doc(hidden)]
591pub trait RustSender {
592 type Output;
593 fn send(&mut self, value: CxxAsyncResult<Self::Output>);
594}
595
596/// Wraps an arbitrary Rust Future in a boxed `cxx-async` future so that it can be returned to C++.
597///
598/// You should not need to implement this manually; it's automatically implemented by the `bridge`
599/// macro.
600pub trait IntoCxxAsyncFuture: Sized {
601 /// The type of the value yielded by the future.
602 type Output;
603
604 /// Wraps a Rust Future that directly returns the output type.
605 ///
606 /// Use this when you aren't interested in propagating errors to C++ as exceptions.
607 fn infallible<Fut>(future: Fut) -> Self
608 where
609 Fut: Future<Output = Self::Output> + Send + 'static,
610 {
611 Self::fallible(async move { Ok(future.await) })
612 }
613
614 /// Wraps a Rust Future that returns the output type, wrapped in a `CxxAsyncResult`.
615 ///
616 /// Use this when you have error values that you want to turn into exceptions on the C++ side.
617 fn fallible<Fut>(future: Fut) -> Self
618 where
619 Fut: Future<Output = CxxAsyncResult<Self::Output>> + Send + 'static;
620}
621
622/// Wraps an arbitrary Rust Stream in a boxed `cxx-async` stream so that it can be returned to C++.
623///
624/// You should not need to implement this manually; it's automatically implemented by the
625/// `bridge_stream` macro.
626pub trait IntoCxxAsyncStream: Sized {
627 /// The type of the values yielded by the stream.
628 type Item;
629
630 /// Wraps a Rust Stream that directly yields items of the output type.
631 ///
632 /// Use this when you aren't interested in propagating errors to C++ as exceptions.
633 fn infallible<Stm>(stream: Stm) -> Self
634 where
635 Stm: Stream<Item = Self::Item> + Send + 'static,
636 Stm::Item: 'static,
637 {
638 Self::fallible(stream.map(Ok))
639 }
640
641 /// Wraps a Rust Stream that yields items of the output type, wrapped in `CxxAsyncResult`s.
642 ///
643 /// Use this when you have error values that you want to turn into exceptions on the C++ side.
644 fn fallible<Stm>(stream: Stm) -> Self
645 where
646 Stm: Stream<Item = CxxAsyncResult<Self::Item>> + Send + 'static;
647}
648
649// Creates a new oneshot sender/receiver pair for a future.
650//
651// SAFETY: This is a raw FFI function called by our C++ code.
652//
653// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
654#[doc(hidden)]
655pub unsafe extern "C" fn future_channel<Fut, Out>(
656 out_oneshot: *mut CxxAsyncFutureChannel<Fut, Out>,
657 execlet: *mut RustExeclet,
658) where
659 Fut: From<CxxAsyncReceiver<Out>> + Future<Output = CxxAsyncResult<Out>>,
660{
661 let channel = SpscChannel::new();
662 let oneshot = CxxAsyncFutureChannel {
663 sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
664 future: CxxAsyncReceiver::<Out> {
665 receiver: channel,
666 execlet: Some(Execlet::from_raw_ref(execlet)),
667 }
668 .into(),
669 };
670 ptr::write(out_oneshot, oneshot);
671}
672
673// Creates a new multi-shot sender/receiver pair for a stream.
674//
675// SAFETY: This is a raw FFI function called by our C++ code.
676//
677// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
678#[doc(hidden)]
679pub unsafe extern "C" fn stream_channel<Stm, Item>(
680 out_stream: *mut CxxAsyncStreamChannel<Stm, Item>,
681 execlet: *mut RustExeclet,
682) where
683 Stm: From<CxxAsyncReceiver<Item>> + Stream<Item = CxxAsyncResult<Item>>,
684{
685 let channel = SpscChannel::new();
686 let stream = CxxAsyncStreamChannel {
687 sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
688 future: CxxAsyncReceiver {
689 receiver: channel,
690 execlet: Some(Execlet::from_raw_ref(execlet)),
691 }
692 .into(),
693 };
694 ptr::write(out_stream, stream);
695}
696
697// C++ calls this to yield a value for a one-shot coroutine (future).
698//
699// SAFETY: This is a low-level function called by our C++ code.
700//
701// Takes ownership of the value. The caller must not call its destructor.
702//
703// This function always closes the channel and returns `SEND_RESULT_FINISHED`.
704//
705// If `status` is `FUTURE_STATUS_COMPLETE`, then the given value is sent; otherwise, if `status` is
706// `FUTURE_STATUS_ERROR`, `value` must point to an exception string. `FUTURE_STATUS_RUNNING` is
707// illegal, because that value is only for streams, not futures.
708//
709// The `waker_data` parameter should always be null, because a one-shot coroutine should never
710// block on yielding a value.
711//
712// Any errors when sending are dropped on the floor. This is the right behavior because futures
713// can be legally dropped in Rust to signal cancellation.
714#[doc(hidden)]
715pub unsafe extern "C" fn sender_future_send<Item>(
716 this: &mut CxxAsyncSender<Item>,
717 status: u32,
718 value: *const u8,
719 waker_data: *const u8,
720) -> u32 {
721 safe_debug_assert!(waker_data.is_null());
722
723 let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
724 match status {
725 FUTURE_STATUS_COMPLETE => {
726 // This is a one-shot sender, so sending must always succeed.
727 let sent = this.try_send_value_with(None, || ptr::read(value as *const Item));
728 safe_debug_assert!(sent);
729 }
730 FUTURE_STATUS_ERROR => this.send_exception(unpack_exception(value)),
731 _ => safe_unreachable!(),
732 }
733
734 this.close();
735 SEND_RESULT_FINISHED
736}
737
738// C++ calls this to yield a value for a multi-shot coroutine (stream).
739//
740// SAFETY: This is a low-level function called by our C++ code.
741//
742// Takes ownership of the value if and only if it was successfully sent (otherwise, leaves it
743// alone). The caller must not call its destructor on a successful send.
744//
745// This function returns `SEND_RESULT_SENT` if the value was successfully sent, `SEND_RESULT_WAIT`
746// if the value wasn't sent because another value was already in the slot (in which case the task
747// will need to go to sleep), and `SEND_RESULT_FINISHED` if the channel was closed.
748//
749// If `status` is `FUTURE_STATUS_COMPLETE`, then the channel is closed, the value is ignored, and
750// this function immediately returns `SEND_RESULT_FINISHED`. Note that this behavior is different
751// from `sender_future_send`, which actually sends the value if `status` is
752// `FUTURE_STATUS_COMPLETE`.
753//
754// If `waker_data` is present, this identifies the coroutine handle that will be awakened if the
755// channel is currently full.
756//
757// Any errors when sending are dropped on the floor. This is because futures can be legally dropped
758// in Rust to signal cancellation.
759#[doc(hidden)]
760pub unsafe extern "C" fn sender_stream_send<Item>(
761 this: &mut CxxAsyncSender<Item>,
762 status: u32,
763 value: *const u8,
764 waker_data: *const u8,
765) -> u32 {
766 let (waker, context);
767 if waker_data.is_null() {
768 context = None;
769 } else {
770 waker = Waker::from_raw(RawWaker::new(
771 waker_data as *const (),
772 &CXXASYNC_WAKER_VTABLE,
773 ));
774 context = Some(Context::from_waker(&waker));
775 }
776
777 let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
778 match status {
779 FUTURE_STATUS_COMPLETE => {
780 this.close();
781 SEND_RESULT_FINISHED
782 }
783 FUTURE_STATUS_RUNNING => {
784 let sent =
785 this.try_send_value_with(context.as_ref(), || ptr::read(value as *const Item));
786 if sent {
787 SEND_RESULT_SENT
788 } else {
789 SEND_RESULT_WAIT
790 }
791 }
792 FUTURE_STATUS_ERROR => {
793 this.send_exception(unpack_exception(value));
794 this.close();
795 SEND_RESULT_FINISHED
796 }
797 _ => safe_unreachable!(),
798 }
799}
800
801// C++ calls this to destroy a sender.
802//
803// SAFETY: This is a low-level function called by our C++ code.
804#[doc(hidden)]
805pub unsafe extern "C" fn sender_drop<Item>(_: CxxAsyncSender<Item>) {
806 // Destructor automatically runs.
807}
808
809unsafe fn unpack_exception(value: *const u8) -> CxxAsyncException {
810 let string = CStr::from_ptr(value as *const c_char);
811 CxxAsyncException::new(string.to_string_lossy().into_owned().into_boxed_str())
812}
813
814// C++ calls this to poll a wrapped Rust future.
815//
816// SAFETY:
817// * This is a low-level function called by our C++ code.
818// * `Pin<&mut Future>` is marked `#[repr(transparent)]`, so it's FFI-safe.
819// * We catch all panics inside `poll` so that they don't unwind into C++.
820#[doc(hidden)]
821pub unsafe extern "C" fn future_poll<Fut, Out>(
822 this: Pin<&mut Fut>,
823 result: *mut u8,
824 waker_data: *const u8,
825) -> u32
826where
827 Fut: Future<Output = CxxAsyncResult<Out>>,
828{
829 let waker = Waker::from_raw(RawWaker::new(
830 waker_data as *const (),
831 &CXXASYNC_WAKER_VTABLE,
832 ));
833
834 let result = panic::catch_unwind(AssertUnwindSafe(move || {
835 let mut context = Context::from_waker(&waker);
836 match this.poll(&mut context) {
837 Poll::Ready(Ok(value)) => {
838 ptr::write(result as *mut Out, value);
839 FUTURE_STATUS_COMPLETE
840 }
841 Poll::Ready(Err(error)) => {
842 let error = error.what().to_owned();
843 ptr::write(result as *mut String, error);
844 FUTURE_STATUS_ERROR
845 }
846 Poll::Pending => FUTURE_STATUS_PENDING,
847 }
848 }));
849
850 match result {
851 Ok(result) => result,
852 Err(error) => {
853 drop(writeln!(
854 io::stderr(),
855 "Rust async code panicked when awaited from C++: {error:?}"
856 ));
857 process::abort();
858 }
859 }
860}
861
862// C++ calls this to drop a Rust future.
863//
864// SAFETY:
865// * This is a low-level function called by our C++ code.
866#[doc(hidden)]
867pub unsafe extern "C" fn future_drop<Fut>(future: *mut Fut) {
868 ptr::drop_in_place(future);
869}
870
871// Bumps the reference count on a suspended C++ coroutine.
872//
873// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
874unsafe fn rust_suspended_coroutine_clone(address: *const ()) -> RawWaker {
875 RawWaker::new(
876 cxxasync_suspended_coroutine_clone(address as *mut () as *mut u8) as *mut () as *const (),
877 &CXXASYNC_WAKER_VTABLE,
878 )
879}
880
881// Resumes a suspended C++ coroutine and decrements its reference count.
882//
883// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
884unsafe fn rust_suspended_coroutine_wake(address: *const ()) {
885 cxxasync_suspended_coroutine_wake(address as *mut () as *mut u8)
886}
887
888// Resumes a suspended C++ coroutine without decrementing its reference count.
889//
890// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
891unsafe fn rust_suspended_coroutine_wake_by_ref(address: *const ()) {
892 cxxasync_suspended_coroutine_wake_by_ref(address as *mut () as *mut u8)
893}
894
895// Decrements the reference count on a suspended C++ coroutine.
896//
897// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
898unsafe fn rust_suspended_coroutine_drop(address: *const ()) {
899 cxxasync_suspended_coroutine_drop(address as *mut () as *mut u8)
900}
901
902// Reexports for the `#[bridge]` macro to use internally. Users of this crate shouldn't use these;
903// they should import the `futures` crate directly.
904#[doc(hidden)]
905pub mod private {
906 pub use futures::future::BoxFuture;
907 pub use futures::stream::BoxStream;
908}