cxx_async/lib.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10// cxx-async/src/lib.rs
11//
12//! `cxx-async` is a Rust crate that extends the [`cxx`](http://cxx.rs/) library to provide
13//! seamless interoperability between asynchronous Rust code using `async`/`await` and [C++20
14//! coroutines] using `co_await`. If your C++ code is asynchronous, `cxx-async` can provide a more
15//! convenient, and potentially more efficient, alternative to callbacks. You can freely convert
16//! between C++ coroutines and Rust futures and await one from the other.
17//!
18//! It's important to emphasize what `cxx-async` isn't: it isn't a C++ binding to Tokio or any
19//! other Rust I/O library. Nor is it a Rust binding to `boost::asio` or similar. Such bindings
20//! could in principle be layered on top of `cxx-async` if desired, but this crate doesn't provide
21//! them out of the box. (Note that this is a tricky problem even in theory, since Rust async I/O
22//! code is generally tightly coupled to a single library such as Tokio, in much the same way C++
23//! async I/O code tends to be tightly coupled to libraries like `boost::asio`.) If you're writing
24//! server code, you can still use `cxx-async`, but you will need to ensure that both the Rust and
25//! C++ sides run separate I/O executors.
26//!
27//! `cxx-async` aims for compatibility with popular C++ coroutine support libraries. Right now,
28//! both the lightweight [`cppcoro`](https://github.com/lewissbaker/cppcoro) and the more
29//! comprehensive [Folly](https://github.com/facebook/folly/) are supported. Pull requests are
30//! welcome to support others.
31//!
32//! ## Quick tutorial
33//!
34//! To use `cxx-async`, first start by adding `cxx` to your project. Then add the following to your
35//! `Cargo.toml`:
36//!
37//! ```toml
38//! [dependencies]
39//! cxx-async = "0.1"
40//! ```
41//!
42//! Now, inside your `#[cxx::bridge]` module, declare a future type and some methods like so:
43//!
44//! ```ignore
45//! #[cxx::bridge]
46//! mod ffi {
47//! // Declare type aliases for each of the future types you wish to use here. Then declare
48//! // async C++ methods that you wish Rust to call. Make sure they return one of the future
49//! // types you declared.
50//! unsafe extern "C++" {
51//! type RustFutureString = crate::RustFutureString;
52//!
53//! fn hello_from_cpp() -> RustFutureString;
54//! }
55//!
56//! // Async Rust methods that you wish C++ to call go here. Again, make sure they return one of
57//! // the future types you declared above.
58//! extern "Rust" {
59//! fn hello_from_rust() -> RustFutureString;
60//! }
61//! }
62//! ```
63//!
64//! After the `#[cxx::bridge]` block, define the future types using the
65//! `#[cxx_async::bridge]` attribute:
66//!
67//! ```ignore
68//! // The `Output` type is the Rust type that this future yields.
69//! #[cxx_async::bridge]
70//! unsafe impl Future for RustFutureString {
71//! type Output = String;
72//! }
73//! ```
74//!
75//! Note that it's your responsibility to ensure that the type you specify for Output actually
76//! matches the type of the value that your future resolves to. Otherwise, undefined behavior can
77//! result.
78//!
79//! Next, in your C++ header, make sure to `#include` the right headers:
80//!
81//! ```cpp
82//! #include "rust/cxx.h"
83//! #include "rust/cxx_async.h"
84//! #include "rust/cxx_async_cppcoro.h" // Or cxx_async_folly.h, as appropriate.
85//! ```
86//!
87//! And add a call to the `CXXASYNC_DEFINE_FUTURE` macro in your headers to define the C++ side of
88//! the future:
89//!
90//! ```cpp
91//! // The first argument is the C++ type that the future yields, and the second argument is the
92//! // fully-qualified name of the future, with `::` namespace separators replaced with commas. (For
93//! // instance, if your future is named `mycompany::myproject::RustFutureString`, you might write
94//! // `CXXASYNC_DEFINE_FUTURE(rust::String, mycompany, myproject, RustFutureString);`. The first
95//! // argument is the C++ type that `cxx` maps your Rust type to: in this case, `String` maps to
96//! // `rust::String`, so we supply `rust::String` here.
97//! //
98//! // Note that, because the C preprocessor doesn't know about the `<` and `>` brackets that
99//! // surround template arguments, a template type that contains multiple arguments (e.g.
100//! // `std::pair<int, std::string>`) will need to be factored out into a `typedef` to be used
101//! // inside `CXXASYNC_DEFINE_FUTURE`. Otherwise, the C preprocessor won't parse it properly.
102//! //
103//! // This macro must be invoked at the top level, not in a namespace.
104//! CXXASYNC_DEFINE_FUTURE(rust::String, RustFutureString);
105//! ```
106//!
107//! You're done! Now you can define asynchronous C++ code that Rust can call:
108//!
109//! ```cpp
110//! RustFutureString hello_from_cpp() {
111//! co_return std::string("Hello world!");
112//! }
113//! ```
114//!
115//! On the Rust side:
116//!
117//! ```ignore
118//! async fn call_cpp() -> String {
119//! // This returns a Result (with the error variant populated if C++ threw an exception), so
120//! // you need to unwrap it:
121//! ffi::hello_from_cpp().await.unwrap()
122//! }
123//! ```
124//!
125//! And likewise, define some asynchronous Rust code that C++ can call:
126//!
127//! ```ignore
128//! use cxx_async::CxxAsyncResult;
129//! fn hello_from_rust() -> RustFutureString {
130//! // You can instead use `fallible` if your async block returns a Result.
131//! RustFutureString::infallible(async { "Hello world!".to_owned() })
132//! }
133//! ```
134//!
135//! Over on the C++ side:
136//!
137//! ```cpp
138//! cppcoro::task<rust::String> call_rust() {
139//! co_return hello_from_rust();
140//! }
141//! ```
142//!
143//! In this way, you should now be able to freely await futures on either side.
144//!
145//! [C++20 coroutines]: https://en.cppreference.com/w/cpp/language/coroutines
146
147#![warn(missing_docs)]
148
149extern crate link_cplusplus;
150
151use crate::execlet::Execlet;
152use crate::execlet::ExecletReaper;
153use crate::execlet::RustExeclet;
154use futures::Stream;
155use futures::StreamExt;
156use std::convert::From;
157use std::error::Error;
158use std::ffi::CStr;
159use std::fmt::Debug;
160use std::fmt::Display;
161use std::fmt::Formatter;
162use std::fmt::Result as FmtResult;
163use std::future::Future;
164use std::io;
165use std::io::Write;
166use std::os::raw::c_char;
167use std::panic;
168use std::panic::AssertUnwindSafe;
169use std::pin::Pin;
170use std::process;
171use std::ptr;
172use std::sync::Arc;
173use std::sync::Mutex;
174use std::task::Context;
175use std::task::Poll;
176use std::task::RawWaker;
177use std::task::RawWakerVTable;
178use std::task::Waker;
179
180const FUTURE_STATUS_PENDING: u32 = 0;
181const FUTURE_STATUS_COMPLETE: u32 = 1;
182const FUTURE_STATUS_ERROR: u32 = 2;
183const FUTURE_STATUS_RUNNING: u32 = 3;
184
185const SEND_RESULT_WAIT: u32 = 0;
186const SEND_RESULT_SENT: u32 = 1;
187const SEND_RESULT_FINISHED: u32 = 2;
188
189pub use cxx_async_macro::bridge;
190
191#[doc(hidden)]
192pub use pin_utils::unsafe_pinned;
193
194// Replacements for macros that panic that are guaranteed to cause an abort, so that we don't unwind
195// across C++ frames.
196
197macro_rules! safe_panic {
198 ($($args:expr),*) => {
199 {
200 use ::std::io::Write;
201 drop(write!(::std::io::stderr(), $($args),*));
202 drop(writeln!(::std::io::stderr(), " at {}:{}", file!(), line!()));
203 ::std::process::abort();
204 }
205 }
206}
207
208#[cfg(debug_assertions)]
209macro_rules! safe_debug_assert {
210 ($cond:expr) => {{
211 use ::std::io::Write;
212 if !$cond {
213 drop(writeln!(
214 ::std::io::stderr(),
215 "assertion failed: {}",
216 stringify!($cond)
217 ));
218 ::std::process::abort();
219 }
220 }};
221}
222#[cfg(not(debug_assertions))]
223macro_rules! safe_debug_assert {
224 ($cond:expr) => {};
225}
226
227macro_rules! safe_unreachable {
228 () => {
229 safe_panic!("unreachable code executed")
230 };
231}
232
233trait SafeExpect {
234 type Output;
235 fn safe_expect(self, message: &str) -> Self::Output;
236}
237
238impl<T> SafeExpect for Option<T> {
239 type Output = T;
240 fn safe_expect(self, message: &str) -> T {
241 match self {
242 Some(value) => value,
243 None => safe_panic!("{}", message),
244 }
245 }
246}
247
248trait SafeUnwrap {
249 type Output;
250 fn safe_unwrap(self) -> Self::Output;
251}
252
253impl<T, E> SafeUnwrap for Result<T, E>
254where
255 E: Debug,
256{
257 type Output = T;
258 fn safe_unwrap(self) -> Self::Output {
259 match self {
260 Ok(value) => value,
261 Err(error) => safe_panic!("unexpected Result::Err: {:?}", error),
262 }
263 }
264}
265
266#[doc(hidden)]
267pub mod execlet;
268
269// Bridged glue functions.
270extern "C" {
271 fn cxxasync_suspended_coroutine_clone(waker_data: *mut u8) -> *mut u8;
272 fn cxxasync_suspended_coroutine_wake(waker_data: *mut u8);
273 fn cxxasync_suspended_coroutine_wake_by_ref(waker_data: *mut u8);
274 fn cxxasync_suspended_coroutine_drop(waker_data: *mut u8);
275}
276
277// A suspended C++ coroutine needs to act as a waker if it awaits a Rust future. This vtable
278// provides that glue.
279static CXXASYNC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
280 rust_suspended_coroutine_clone,
281 rust_suspended_coroutine_wake,
282 rust_suspended_coroutine_wake_by_ref,
283 rust_suspended_coroutine_drop,
284);
285
286/// Any exception that a C++ coroutine throws is automatically caught and converted into this error
287/// type.
288///
289/// This is just a wrapper around the result of `std::exception::what()`.
290#[derive(Debug)]
291pub struct CxxAsyncException {
292 what: Box<str>,
293}
294
295impl CxxAsyncException {
296 /// Creates a new exception with the given error message.
297 pub fn new(what: Box<str>) -> Self {
298 Self { what }
299 }
300
301 /// The value returned by `std::exception::what()`.
302 pub fn what(&self) -> &str {
303 &self.what
304 }
305}
306
307impl Display for CxxAsyncException {
308 fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
309 formatter.write_str(&self.what)
310 }
311}
312
313impl Error for CxxAsyncException {}
314
315/// A convenient shorthand for `Result<T, CxxAsyncException>`.
316pub type CxxAsyncResult<T> = Result<T, CxxAsyncException>;
317
318// A table of functions that the `bridge` macro emits for the C++ bridge to use.
319//
320// This must match the definition in `cxx_async.h`.
321#[repr(C)]
322#[doc(hidden)]
323pub struct CxxAsyncVtable {
324 pub channel: *mut u8,
325 pub sender_send: *mut u8,
326 pub sender_drop: *mut u8,
327 pub future_poll: *mut u8,
328 pub future_drop: *mut u8,
329}
330
331unsafe impl Send for CxxAsyncVtable {}
332unsafe impl Sync for CxxAsyncVtable {}
333
334// A sender/receiver pair for the return value of a wrapped one-shot C++ coroutine.
335//
336// This is an implementation detail and is not exposed to the programmer. It must match the
337// definition in `cxx_async.h`.
338#[repr(C)]
339#[doc(hidden)]
340pub struct CxxAsyncFutureChannel<Fut, Out> {
341 // The receiving end.
342 future: Fut,
343 // The sending end.
344 sender: CxxAsyncSender<Out>,
345}
346
347// A sender/receiver pair for the return value of a wrapped C++ multi-shot coroutine.
348//
349// This is an implementation detail and is not exposed to the programmer. It must match the
350// definition in `cxx_async.h`.
351#[repr(C)]
352#[doc(hidden)]
353pub struct CxxAsyncStreamChannel<Stm, Item>
354where
355 Stm: Stream<Item = CxxAsyncResult<Item>>,
356{
357 // The receiving end.
358 future: Stm,
359 // The sending end.
360 sender: CxxAsyncSender<Item>,
361}
362
363// The single-producer/single-consumer channel type that future and stream implementations use to
364// pass values between the two languages.
365//
366// We can't use a `futures::channel::mpsc` channel because it can deadlock. With a standard MPSC
367// channel, if we try to send a value when the buffer is full and the receiving end is woken up and
368// then tries to receive the value, a deadlock occurs, as the MPSC channel doesn't drop locks before
369// calling the waker.
370struct SpscChannel<T>(Arc<Mutex<SpscChannelImpl<T>>>);
371
372// Data for each SPSC channel.
373struct SpscChannelImpl<T> {
374 // The waker waiting on the value.
375 //
376 // This can either be the sending end waiting for the receiving end to receive a previously-sent
377 // value (for streams only) or the receiving end waiting for the sending end to post a value.
378 waiter: Option<Waker>,
379 // The value waiting to be read.
380 value: Option<T>,
381 // An exception from the C++ side that is to be delivered over to the Rust side.
382 exception: Option<CxxAsyncException>,
383 // True if the channel is closed; false otherwise.
384 closed: bool,
385}
386
387impl<T> SpscChannel<T> {
388 // Creates a new SPSC channel.
389 fn new() -> SpscChannel<T> {
390 SpscChannel(Arc::new(Mutex::new(SpscChannelImpl {
391 waiter: None,
392 value: None,
393 exception: None,
394 closed: false,
395 })))
396 }
397
398 // Marks the channel as closed. Only the sending end may call this.
399 fn close(&self) {
400 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
401 let waiter;
402 {
403 let mut this = self.0.lock().safe_unwrap();
404 if this.closed {
405 safe_panic!("Attempted to close an `SpscChannel` that's already closed!")
406 }
407 this.closed = true;
408 waiter = this.waiter.take();
409 }
410
411 if let Some(waiter) = waiter {
412 waiter.wake();
413 }
414 }
415
416 // Attempts to send a value. If this channel already has a value yet to be read, this function
417 // returns false. Otherwise, it calls the provided closure to retrieve the value and returns
418 // true.
419 //
420 // This callback-based design eliminates the requirement to return the original value if the
421 // send fails.
422 fn try_send_value_with<F>(&self, context: Option<&Context>, getter: F) -> bool
423 where
424 F: FnOnce() -> T,
425 {
426 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
427 let waiter;
428 {
429 let mut this = self.0.lock().safe_unwrap();
430 if this.value.is_none() {
431 this.value = Some(getter());
432 waiter = this.waiter.take();
433 } else if context.is_some() && this.waiter.is_some() {
434 safe_panic!("Only one task may block on a `SpscChannel`!")
435 } else {
436 if let Some(context) = context {
437 this.waiter = Some((*context.waker()).clone());
438 }
439 return false;
440 }
441 }
442
443 if let Some(waiter) = waiter {
444 waiter.wake();
445 }
446 true
447 }
448
449 // Raises an exception. This is synchronous and thus should never fail. It must only be called
450 // once, or not at all, for a given `SpscSender`.
451 fn send_exception(&self, exception: CxxAsyncException) {
452 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
453 let waiter = {
454 let mut this = self.0.lock().safe_unwrap();
455 safe_debug_assert!(this.exception.is_none());
456 this.exception = Some(exception);
457 this.waiter.take()
458 };
459
460 if let Some(waiter) = waiter {
461 waiter.wake();
462 }
463 }
464
465 // Attempts to receive a value. Returns `Poll::Pending` if no value is available and the channel
466 // isn't closed. Returns `Poll::Ready(None)` if the channel is closed. Otherwise, receives a
467 // value and returns `Poll::Ready(Some)`.
468 fn recv(&self, cx: &Context) -> Poll<Option<CxxAsyncResult<T>>> {
469 // Drop the lock before possibly calling the waiter because we could deadlock otherwise.
470 let (result, waiter);
471 {
472 let mut this = self.0.lock().safe_unwrap();
473 match this.value.take() {
474 Some(value) => {
475 result = Ok(value);
476 waiter = this.waiter.take();
477 }
478 None => match this.exception.take() {
479 Some(exception) => {
480 result = Err(exception);
481 waiter = this.waiter.take();
482 }
483 None if this.closed => return Poll::Ready(None),
484 None => {
485 this.waiter = Some((*cx.waker()).clone());
486 return Poll::Pending;
487 }
488 },
489 }
490 }
491
492 if let Some(waiter) = waiter {
493 waiter.wake();
494 }
495 Poll::Ready(Some(result))
496 }
497}
498
499impl<T> Clone for SpscChannel<T> {
500 fn clone(&self) -> Self {
501 Self(self.0.clone())
502 }
503}
504
505// The concrete type of the stream that wraps a C++ coroutine, either one-shot (future) or
506// multi-shot (stream).
507//
508// The programmer only interacts with this abstractly behind a `Box<dyn Future>` or
509// `Box<dyn Stream>` trait object, so this type is considered an implementation detail. It must be
510// public because the `bridge_stream` macro needs to name it.
511#[doc(hidden)]
512pub struct CxxAsyncReceiver<Item> {
513 // The SPSC channel to receive on.
514 receiver: SpscChannel<Item>,
515 // Any execlet that must be driven when receiving.
516 execlet: Option<Execlet>,
517}
518
519// The concrete type of the sending end of a stream.
520//
521// This must be public because the `bridge_stream` macro needs to name it.
522#[doc(hidden)]
523#[repr(transparent)]
524pub struct CxxAsyncSender<Item>(*mut SpscChannel<Item>);
525
526impl<Item> Drop for CxxAsyncSender<Item> {
527 fn drop(&mut self) {
528 unsafe { drop(Box::from_raw(self.0)) }
529 }
530}
531
532// This is a little weird in that `CxxAsyncReceiver` behaves as a oneshot if it's treated as a
533// Future and an SPSC stream if it's treated as a Stream. But since the programmer only ever
534// interacts with these objects behind boxed trait objects that only expose one of the two traits,
535// it's not a problem.
536impl<Item> Stream for CxxAsyncReceiver<Item> {
537 type Item = CxxAsyncResult<Item>;
538
539 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
540 if let Some(ref execlet) = self.execlet {
541 execlet.run(cx);
542 }
543 self.receiver.recv(cx)
544 }
545}
546
547impl<Output> Future for CxxAsyncReceiver<Output> {
548 type Output = CxxAsyncResult<Output>;
549
550 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
551 if let Some(ref execlet) = self.execlet {
552 execlet.run(cx);
553 }
554 match self.receiver.recv(cx) {
555 Poll::Ready(Some(Ok(value))) => Poll::Ready(Ok(value)),
556 Poll::Ready(Some(Err(exception))) => Poll::Ready(Err(exception)),
557 Poll::Ready(None) => {
558 // This should never happen, because a future should never be polled again after
559 // returning `Ready`.
560 safe_panic!("Attempted to use a stream as a future!")
561 }
562 Poll::Pending => Poll::Pending,
563 }
564 }
565}
566
567impl<Item> From<SpscChannel<Item>> for CxxAsyncReceiver<Item> {
568 fn from(receiver: SpscChannel<Item>) -> Self {
569 CxxAsyncReceiver {
570 receiver,
571 execlet: None,
572 }
573 }
574}
575
576impl<Item> Drop for CxxAsyncReceiver<Item> {
577 fn drop(&mut self) {
578 let execlet = match self.execlet {
579 Some(ref execlet) => execlet,
580 None => return,
581 };
582 let receiver = self.receiver.0.lock().safe_unwrap();
583 if receiver.closed {
584 return;
585 }
586 ExecletReaper::get().add((*execlet).clone());
587 }
588}
589
590// The sending end that the C++ bridge uses to return a value to a Rust future.
591//
592// This is an implementation detail.
593#[doc(hidden)]
594pub trait RustSender {
595 type Output;
596 fn send(&mut self, value: CxxAsyncResult<Self::Output>);
597}
598
599/// Wraps an arbitrary Rust Future in a boxed `cxx-async` future so that it can be returned to C++.
600///
601/// You should not need to implement this manually; it's automatically implemented by the `bridge`
602/// macro.
603pub trait IntoCxxAsyncFuture: Sized {
604 /// The type of the value yielded by the future.
605 type Output;
606
607 /// Wraps a Rust Future that directly returns the output type.
608 ///
609 /// Use this when you aren't interested in propagating errors to C++ as exceptions.
610 fn infallible<Fut>(future: Fut) -> Self
611 where
612 Fut: Future<Output = Self::Output> + Send + 'static,
613 {
614 Self::fallible(async move { Ok(future.await) })
615 }
616
617 /// Wraps a Rust Future that returns the output type, wrapped in a `CxxAsyncResult`.
618 ///
619 /// Use this when you have error values that you want to turn into exceptions on the C++ side.
620 fn fallible<Fut>(future: Fut) -> Self
621 where
622 Fut: Future<Output = CxxAsyncResult<Self::Output>> + Send + 'static;
623}
624
625/// Wraps an arbitrary Rust Stream in a boxed `cxx-async` stream so that it can be returned to C++.
626///
627/// You should not need to implement this manually; it's automatically implemented by the
628/// `bridge_stream` macro.
629pub trait IntoCxxAsyncStream: Sized {
630 /// The type of the values yielded by the stream.
631 type Item;
632
633 /// Wraps a Rust Stream that directly yields items of the output type.
634 ///
635 /// Use this when you aren't interested in propagating errors to C++ as exceptions.
636 fn infallible<Stm>(stream: Stm) -> Self
637 where
638 Stm: Stream<Item = Self::Item> + Send + 'static,
639 Stm::Item: 'static,
640 {
641 Self::fallible(stream.map(Ok))
642 }
643
644 /// Wraps a Rust Stream that yields items of the output type, wrapped in `CxxAsyncResult`s.
645 ///
646 /// Use this when you have error values that you want to turn into exceptions on the C++ side.
647 fn fallible<Stm>(stream: Stm) -> Self
648 where
649 Stm: Stream<Item = CxxAsyncResult<Self::Item>> + Send + 'static;
650}
651
652// Creates a new oneshot sender/receiver pair for a future.
653//
654// SAFETY: This is a raw FFI function called by our C++ code.
655//
656// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
657#[doc(hidden)]
658pub unsafe extern "C" fn future_channel<Fut, Out>(
659 out_oneshot: *mut CxxAsyncFutureChannel<Fut, Out>,
660 execlet: *mut RustExeclet,
661) where
662 Fut: From<CxxAsyncReceiver<Out>> + Future<Output = CxxAsyncResult<Out>>,
663{
664 let channel = SpscChannel::new();
665 let oneshot = CxxAsyncFutureChannel {
666 sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
667 future: CxxAsyncReceiver::<Out> {
668 receiver: channel,
669 execlet: Some(Execlet::from_raw_ref(execlet)),
670 }
671 .into(),
672 };
673 ptr::write(out_oneshot, oneshot);
674}
675
676// Creates a new multi-shot sender/receiver pair for a stream.
677//
678// SAFETY: This is a raw FFI function called by our C++ code.
679//
680// This needs an out pointer because of https://github.com/rust-lang/rust-bindgen/issues/778
681#[doc(hidden)]
682pub unsafe extern "C" fn stream_channel<Stm, Item>(
683 out_stream: *mut CxxAsyncStreamChannel<Stm, Item>,
684 execlet: *mut RustExeclet,
685) where
686 Stm: From<CxxAsyncReceiver<Item>> + Stream<Item = CxxAsyncResult<Item>>,
687{
688 let channel = SpscChannel::new();
689 let stream = CxxAsyncStreamChannel {
690 sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
691 future: CxxAsyncReceiver {
692 receiver: channel,
693 execlet: Some(Execlet::from_raw_ref(execlet)),
694 }
695 .into(),
696 };
697 ptr::write(out_stream, stream);
698}
699
700// C++ calls this to yield a value for a one-shot coroutine (future).
701//
702// SAFETY: This is a low-level function called by our C++ code.
703//
704// Takes ownership of the value. The caller must not call its destructor.
705//
706// This function always closes the channel and returns `SEND_RESULT_FINISHED`.
707//
708// If `status` is `FUTURE_STATUS_COMPLETE`, then the given value is sent; otherwise, if `status` is
709// `FUTURE_STATUS_ERROR`, `value` must point to an exception string. `FUTURE_STATUS_RUNNING` is
710// illegal, because that value is only for streams, not futures.
711//
712// The `waker_data` parameter should always be null, because a one-shot coroutine should never
713// block on yielding a value.
714//
715// Any errors when sending are dropped on the floor. This is the right behavior because futures
716// can be legally dropped in Rust to signal cancellation.
717#[doc(hidden)]
718pub unsafe extern "C" fn sender_future_send<Item>(
719 this: &mut CxxAsyncSender<Item>,
720 status: u32,
721 value: *const u8,
722 waker_data: *const u8,
723) -> u32 {
724 safe_debug_assert!(waker_data.is_null());
725
726 let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
727 match status {
728 FUTURE_STATUS_COMPLETE => {
729 // This is a one-shot sender, so sending must always succeed.
730 let sent = this.try_send_value_with(None, || ptr::read(value as *const Item));
731 safe_debug_assert!(sent);
732 }
733 FUTURE_STATUS_ERROR => this.send_exception(unpack_exception(value)),
734 _ => safe_unreachable!(),
735 }
736
737 this.close();
738 SEND_RESULT_FINISHED
739}
740
741// C++ calls this to yield a value for a multi-shot coroutine (stream).
742//
743// SAFETY: This is a low-level function called by our C++ code.
744//
745// Takes ownership of the value if and only if it was successfully sent (otherwise, leaves it
746// alone). The caller must not call its destructor on a successful send.
747//
748// This function returns `SEND_RESULT_SENT` if the value was successfully sent, `SEND_RESULT_WAIT`
749// if the value wasn't sent because another value was already in the slot (in which case the task
750// will need to go to sleep), and `SEND_RESULT_FINISHED` if the channel was closed.
751//
752// If `status` is `FUTURE_STATUS_COMPLETE`, then the channel is closed, the value is ignored, and
753// this function immediately returns `SEND_RESULT_FINISHED`. Note that this behavior is different
754// from `sender_future_send`, which actually sends the value if `status` is
755// `FUTURE_STATUS_COMPLETE`.
756//
757// If `waker_data` is present, this identifies the coroutine handle that will be awakened if the
758// channel is currently full.
759//
760// Any errors when sending are dropped on the floor. This is because futures can be legally dropped
761// in Rust to signal cancellation.
762#[doc(hidden)]
763pub unsafe extern "C" fn sender_stream_send<Item>(
764 this: &mut CxxAsyncSender<Item>,
765 status: u32,
766 value: *const u8,
767 waker_data: *const u8,
768) -> u32 {
769 let (waker, context);
770 if waker_data.is_null() {
771 context = None;
772 } else {
773 waker = Waker::from_raw(RawWaker::new(
774 waker_data as *const (),
775 &CXXASYNC_WAKER_VTABLE,
776 ));
777 context = Some(Context::from_waker(&waker));
778 }
779
780 let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
781 match status {
782 FUTURE_STATUS_COMPLETE => {
783 this.close();
784 SEND_RESULT_FINISHED
785 }
786 FUTURE_STATUS_RUNNING => {
787 let sent =
788 this.try_send_value_with(context.as_ref(), || ptr::read(value as *const Item));
789 if sent {
790 SEND_RESULT_SENT
791 } else {
792 SEND_RESULT_WAIT
793 }
794 }
795 FUTURE_STATUS_ERROR => {
796 this.send_exception(unpack_exception(value));
797 this.close();
798 SEND_RESULT_FINISHED
799 }
800 _ => safe_unreachable!(),
801 }
802}
803
804// C++ calls this to destroy a sender.
805//
806// SAFETY: This is a low-level function called by our C++ code.
807#[doc(hidden)]
808pub unsafe extern "C" fn sender_drop<Item>(_: CxxAsyncSender<Item>) {
809 // Destructor automatically runs.
810}
811
812unsafe fn unpack_exception(value: *const u8) -> CxxAsyncException {
813 let string = CStr::from_ptr(value as *const c_char);
814 CxxAsyncException::new(string.to_string_lossy().into_owned().into_boxed_str())
815}
816
817// C++ calls this to poll a wrapped Rust future.
818//
819// SAFETY:
820// * This is a low-level function called by our C++ code.
821// * `Pin<&mut Future>` is marked `#[repr(transparent)]`, so it's FFI-safe.
822// * We catch all panics inside `poll` so that they don't unwind into C++.
823#[doc(hidden)]
824pub unsafe extern "C" fn future_poll<Fut, Out>(
825 this: Pin<&mut Fut>,
826 result: *mut u8,
827 waker_data: *const u8,
828) -> u32
829where
830 Fut: Future<Output = CxxAsyncResult<Out>>,
831{
832 let waker = Waker::from_raw(RawWaker::new(
833 waker_data as *const (),
834 &CXXASYNC_WAKER_VTABLE,
835 ));
836
837 let result = panic::catch_unwind(AssertUnwindSafe(move || {
838 let mut context = Context::from_waker(&waker);
839 match this.poll(&mut context) {
840 Poll::Ready(Ok(value)) => {
841 ptr::write(result as *mut Out, value);
842 FUTURE_STATUS_COMPLETE
843 }
844 Poll::Ready(Err(error)) => {
845 let error = error.what().to_owned();
846 ptr::write(result as *mut String, error);
847 FUTURE_STATUS_ERROR
848 }
849 Poll::Pending => FUTURE_STATUS_PENDING,
850 }
851 }));
852
853 match result {
854 Ok(result) => result,
855 Err(error) => {
856 drop(writeln!(
857 io::stderr(),
858 "Rust async code panicked when awaited from C++: {:?}",
859 error
860 ));
861 process::abort();
862 }
863 }
864}
865
866// C++ calls this to drop a Rust future.
867//
868// SAFETY:
869// * This is a low-level function called by our C++ code.
870#[doc(hidden)]
871pub unsafe extern "C" fn future_drop<Fut>(future: *mut Fut) {
872 ptr::drop_in_place(future);
873}
874
875// Bumps the reference count on a suspended C++ coroutine.
876//
877// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
878unsafe fn rust_suspended_coroutine_clone(address: *const ()) -> RawWaker {
879 RawWaker::new(
880 cxxasync_suspended_coroutine_clone(address as *mut () as *mut u8) as *mut () as *const (),
881 &CXXASYNC_WAKER_VTABLE,
882 )
883}
884
885// Resumes a suspended C++ coroutine and decrements its reference count.
886//
887// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
888unsafe fn rust_suspended_coroutine_wake(address: *const ()) {
889 cxxasync_suspended_coroutine_wake(address as *mut () as *mut u8)
890}
891
892// Resumes a suspended C++ coroutine without decrementing its reference count.
893//
894// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
895unsafe fn rust_suspended_coroutine_wake_by_ref(address: *const ()) {
896 cxxasync_suspended_coroutine_wake_by_ref(address as *mut () as *mut u8)
897}
898
899// Decrements the reference count on a suspended C++ coroutine.
900//
901// SAFETY: This is a raw FFI function called by the currently-running Rust executor.
902unsafe fn rust_suspended_coroutine_drop(address: *const ()) {
903 cxxasync_suspended_coroutine_drop(address as *mut () as *mut u8)
904}
905
906// Reexports for the `#[bridge]` macro to use internally. Users of this crate shouldn't use these;
907// they should import the `futures` crate directly.
908#[doc(hidden)]
909pub mod private {
910 pub use futures::future::BoxFuture;
911 pub use futures::stream::BoxStream;
912}