#![warn(missing_docs)]
extern crate link_cplusplus;
use crate::execlet::Execlet;
use crate::execlet::ExecletReaper;
use crate::execlet::RustExeclet;
use futures::Stream;
use futures::StreamExt;
use std::convert::From;
use std::error::Error;
use std::ffi::CStr;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::fmt::Result as FmtResult;
use std::future::Future;
use std::io;
use std::io::Write;
use std::os::raw::c_char;
use std::panic;
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::process;
use std::ptr;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use std::task::RawWaker;
use std::task::RawWakerVTable;
use std::task::Waker;
// Status codes exchanged across the `extern "C"` entry points in this file.
// NOTE(review): presumably these must stay in sync with matching values on the C++ side — confirm.

// Returned by `future_poll` when the polled future is not ready yet.
const FUTURE_STATUS_PENDING: u32 = 0;
// Returned by `future_poll` when a value was produced; passed to the `sender_*_send` functions
// to signal successful completion.
const FUTURE_STATUS_COMPLETE: u32 = 1;
// Returned by `future_poll` when the future resolved to an error; passed to the `sender_*_send`
// functions to deliver an exception message.
const FUTURE_STATUS_ERROR: u32 = 2;
// Passed to `sender_stream_send` when a stream item is being delivered.
const FUTURE_STATUS_RUNNING: u32 = 3;
// Returned by `sender_stream_send` when the item could not be queued.
const SEND_RESULT_WAIT: u32 = 0;
// Returned by `sender_stream_send` when the item was queued.
const SEND_RESULT_SENT: u32 = 1;
// Returned by the `sender_*_send` functions after they have closed the channel.
const SEND_RESULT_FINISHED: u32 = 2;
pub use cxx_async_macro::bridge;
// Writes the formatted message followed by " at <file>:<line>" to stderr and aborts the
// process. Unlike `panic!`, this never unwinds. Failures while writing to stderr are ignored.
macro_rules! safe_panic {
    ($($args:expr),*) => {{
        use ::std::io::Write;
        let mut stderr = ::std::io::stderr();
        // The write results are deliberately discarded: we are about to abort anyway.
        drop(write!(stderr, $($args),*));
        drop(writeln!(stderr, " at {}:{}", file!(), line!()));
        ::std::process::abort();
    }};
}
// Debug-build assertion that aborts instead of unwinding: if the condition is false, prints
// "assertion failed: <condition>" to stderr and aborts the process.
#[cfg(debug_assertions)]
macro_rules! safe_debug_assert {
    ($cond:expr) => {{
        if !$cond {
            use ::std::io::Write;
            let mut stderr = ::std::io::stderr();
            // The write result is deliberately discarded: we are about to abort anyway.
            drop(writeln!(stderr, "assertion failed: {}", stringify!($cond)));
            ::std::process::abort();
        }
    }};
}
// Release-build variant of `safe_debug_assert!`: expands to nothing, so the condition is
// neither evaluated nor type-checked.
#[cfg(not(debug_assertions))]
macro_rules! safe_debug_assert {
($cond:expr) => {};
}
// Marks a code path that must never execute; reaching it aborts the process via `safe_panic!`.
macro_rules! safe_unreachable {
() => {
safe_panic!("unreachable code executed")
};
}
/// Abort-on-failure counterpart of `expect`.
trait SafeExpect {
/// The value produced when extraction succeeds.
type Output;
/// Returns the contained value, or reports `message` and aborts if there is none.
fn safe_expect(self, message: &str) -> Self::Output;
}
impl<T> SafeExpect for Option<T> {
    type Output = T;

    /// Returns the `Some` payload; on `None`, prints `message` and aborts via `safe_panic!`.
    fn safe_expect(self, message: &str) -> T {
        if let Some(value) = self {
            return value;
        }
        safe_panic!("{}", message)
    }
}
/// Abort-on-failure counterpart of `unwrap`.
trait SafeUnwrap {
/// The value produced when extraction succeeds.
type Output;
/// Returns the success value, or aborts the process if there is none.
fn safe_unwrap(self) -> Self::Output;
}
impl<T, E> SafeUnwrap for Result<T, E>
where
E: Debug,
{
type Output = T;
fn safe_unwrap(self) -> Self::Output {
match self {
Ok(value) => value,
Err(error) => safe_panic!("unexpected Result::Err: {:?}", error),
}
}
}
#[doc(hidden)]
pub mod execlet;
// Externally defined hooks that back the entries of `CXXASYNC_WAKER_VTABLE`.
// NOTE(review): presumably implemented by the C++ half of the bridge — confirm.
extern "C" {
// Called by the vtable's `clone` entry; its return value becomes the data pointer of the new
// `RawWaker`.
fn cxxasync_suspended_coroutine_clone(waker_data: *mut u8) -> *mut u8;
// Called by the vtable's `wake` entry.
fn cxxasync_suspended_coroutine_wake(waker_data: *mut u8);
// Called by the vtable's `wake_by_ref` entry.
fn cxxasync_suspended_coroutine_wake_by_ref(waker_data: *mut u8);
// Called by the vtable's `drop` entry.
fn cxxasync_suspended_coroutine_drop(waker_data: *mut u8);
}
/// Waker vtable whose clone/wake/wake_by_ref/drop entries forward to the
/// `rust_suspended_coroutine_*` shims, which in turn call the `cxxasync_suspended_coroutine_*`
/// foreign functions.
static CXXASYNC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
rust_suspended_coroutine_clone,
rust_suspended_coroutine_wake,
rust_suspended_coroutine_wake_by_ref,
rust_suspended_coroutine_drop,
);
/// An error carrying a textual message.
///
/// Within this file it is built by `unpack_exception` from a C string received over the FFI
/// boundary, and it is the error type of [`CxxAsyncResult`].
#[derive(Debug)]
pub struct CxxAsyncException {
// The message text returned by `what()` and printed by `Display`.
what: Box<str>,
}
impl CxxAsyncException {
pub fn new(what: Box<str>) -> Self {
Self { what }
}
pub fn what(&self) -> &str {
&self.what
}
}
// Displays the exception as its bare message text.
impl Display for CxxAsyncException {
    /// Writes the message verbatim; formatter flags such as width are not applied.
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        let message = self.what();
        formatter.write_str(message)
    }
}
// Uses the default `Error` methods; the exception has no underlying source error.
impl Error for CxxAsyncException {}
/// A `Result` whose error type is [`CxxAsyncException`].
pub type CxxAsyncResult<T> = Result<T, CxxAsyncException>;
/// A `#[repr(C)]` table of five type-erased pointers.
// NOTE(review): the field names suggest each pointer refers to a monomorphized instance of the
// corresponding `extern "C"` function in this file (`future_channel`/`stream_channel`,
// `sender_*_send`, `sender_drop`, `future_poll`, `future_drop`), handed to C++ — the code that
// fills the table is not in this file, so confirm.
#[repr(C)]
#[doc(hidden)]
pub struct CxxAsyncVtable {
pub channel: *mut u8,
pub sender_send: *mut u8,
pub sender_drop: *mut u8,
pub future_poll: *mut u8,
pub future_drop: *mut u8,
}
// Raw pointers are neither `Send` nor `Sync` by default, so the table opts in explicitly.
// NOTE(review): soundness presumably relies on the pointers referring to immutable code/data
// that is valid from any thread — confirm against where the table is populated.
unsafe impl Send for CxxAsyncVtable {}
unsafe impl Sync for CxxAsyncVtable {}
/// The two halves of a one-shot channel, as written through the out-pointer by
/// `future_channel`: the receiving future first, then the sender. The layout is `#[repr(C)]`.
#[repr(C)]
#[doc(hidden)]
pub struct CxxAsyncFutureChannel<Fut, Out> {
// Receiving half, converted from a `CxxAsyncReceiver<Out>`.
future: Fut,
// Sending half; owns a boxed clone of the shared channel handle.
sender: CxxAsyncSender<Out>,
}
/// The two halves of a stream channel, as written through the out-pointer by
/// `stream_channel`: the receiving stream first, then the sender. The layout is `#[repr(C)]`.
#[repr(C)]
#[doc(hidden)]
pub struct CxxAsyncStreamChannel<Stm, Item>
where
Stm: Stream<Item = CxxAsyncResult<Item>>,
{
// Receiving half (a stream, despite the field name), converted from a `CxxAsyncReceiver<Item>`.
future: Stm,
// Sending half; owns a boxed clone of the shared channel handle.
sender: CxxAsyncSender<Item>,
}
/// Cloneable handle to channel state shared behind `Arc<Mutex<..>>`. The state holds at most
/// one pending value, one pending exception and one parked waker at a time.
struct SpscChannel<T>(Arc<Mutex<SpscChannelImpl<T>>>);
/// The mutex-protected state of an `SpscChannel`.
struct SpscChannelImpl<T> {
// Waker of the side currently waiting on the channel: set by `recv` when nothing is available
// and by `try_send_value_with` when the value slot is full.
waiter: Option<Waker>,
// The single buffered value, if any.
value: Option<T>,
// A pending exception, if any; `recv` delivers it only when no value is buffered.
exception: Option<CxxAsyncException>,
// Set once by `close`; closing a second time aborts the process.
closed: bool,
}
impl<T> SpscChannel<T> {
    /// Creates an open channel with no buffered value, no exception and no parked waker.
    fn new() -> SpscChannel<T> {
        let state = SpscChannelImpl {
            waiter: None,
            value: None,
            exception: None,
            closed: false,
        };
        SpscChannel(Arc::new(Mutex::new(state)))
    }

    /// Marks the channel closed and wakes the parked waker, if any.
    ///
    /// Aborts the process if the channel is already closed.
    fn close(&self) {
        // The guard is confined to this block so the lock is released before waking.
        let parked = {
            let mut state = self.0.lock().safe_unwrap();
            if state.closed {
                safe_panic!("Attempted to close an `SpscChannel` that's already closed!")
            }
            state.closed = true;
            state.waiter.take()
        };
        if let Some(waker) = parked {
            waker.wake();
        }
    }

    /// Tries to place a value, produced lazily by `getter`, into the single value slot.
    ///
    /// Returns `true` if the slot was empty and the value was stored; any parked waker is then
    /// woken. Returns `false` if the slot is occupied, in which case `getter` is never called
    /// and, when `context` is provided, its waker is parked on the channel. Aborts the process
    /// if the slot is occupied, `context` is provided, and a waker is already parked.
    fn try_send_value_with<F>(&self, context: Option<&Context>, getter: F) -> bool
    where
        F: FnOnce() -> T,
    {
        // The guard is confined to this block so the lock is released before waking.
        let parked = {
            let mut state = self.0.lock().safe_unwrap();
            if state.value.is_some() {
                match context {
                    Some(_) if state.waiter.is_some() => {
                        safe_panic!("Only one task may block on a `SpscChannel`!")
                    }
                    Some(context) => state.waiter = Some(context.waker().clone()),
                    None => {}
                }
                return false;
            }
            state.value = Some(getter());
            state.waiter.take()
        };
        if let Some(waker) = parked {
            waker.wake();
        }
        true
    }

    /// Stores `exception` for the receiver and wakes the parked waker, if any.
    ///
    /// In debug builds, aborts if an exception is already pending.
    fn send_exception(&self, exception: CxxAsyncException) {
        let mut state = self.0.lock().safe_unwrap();
        safe_debug_assert!(state.exception.is_none());
        state.exception = Some(exception);
        let parked = state.waiter.take();
        // Release the lock before waking.
        drop(state);
        if let Some(waker) = parked {
            waker.wake();
        }
    }

    /// Tries to take the next result out of the channel.
    ///
    /// A buffered value takes priority over a pending exception. When either is taken, any
    /// parked waker is woken and the result is returned as `Poll::Ready(Some(..))`. When the
    /// channel is empty, returns `Poll::Ready(None)` if it has been closed; otherwise parks the
    /// waker from `cx` and returns `Poll::Pending`.
    fn recv(&self, cx: &Context) -> Poll<Option<CxxAsyncResult<T>>> {
        // The guard is confined to this block so the lock is released before waking.
        let (outcome, parked) = {
            let mut state = self.0.lock().safe_unwrap();
            let outcome = if let Some(value) = state.value.take() {
                Ok(value)
            } else if let Some(exception) = state.exception.take() {
                Err(exception)
            } else if state.closed {
                return Poll::Ready(None);
            } else {
                state.waiter = Some(cx.waker().clone());
                return Poll::Pending;
            };
            (outcome, state.waiter.take())
        };
        if let Some(waker) = parked {
            waker.wake();
        }
        Poll::Ready(Some(outcome))
    }
}
impl<T> Clone for SpscChannel<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// Receiving end of an `SpscChannel`. It implements both `Future` and `Stream`.
#[doc(hidden)]
pub struct CxxAsyncReceiver<Item> {
// Handle to the shared channel state.
receiver: SpscChannel<Item>,
// When present, the execlet is run at the start of every poll and, if the receiver is dropped
// before the channel is closed, handed to the `ExecletReaper`.
execlet: Option<Execlet>,
}
/// Sending end of an `SpscChannel`: a pointer-sized (`#[repr(transparent)]`) owner of a
/// heap-allocated channel handle. The allocation is freed when the sender is dropped.
#[doc(hidden)]
#[repr(transparent)]
pub struct CxxAsyncSender<Item>(*mut SpscChannel<Item>);
impl<Item> Drop for CxxAsyncSender<Item> {
    /// Reclaims and frees the boxed channel handle owned by this sender.
    fn drop(&mut self) {
        // SAFETY: within this file the pointer always comes from `Box::into_raw` (see
        // `future_channel` and `stream_channel`). This assumes no other code constructs a sender
        // from a different pointer or frees it beforehand.
        let channel = unsafe { Box::from_raw(self.0) };
        drop(channel);
    }
}
// Lets a receiver be consumed as a stream of results.
impl<Item> Stream for CxxAsyncReceiver<Item> {
    type Item = CxxAsyncResult<Item>;

    /// Runs the attached execlet (if any), then asks the channel for the next result.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(execlet) = self.execlet.as_ref() {
            execlet.run(cx);
        }
        self.receiver.recv(cx)
    }
}
impl<Output> Future for CxxAsyncReceiver<Output> {
type Output = CxxAsyncResult<Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref execlet) = self.execlet {
execlet.run(cx);
}
match self.receiver.recv(cx) {
Poll::Ready(Some(Ok(value))) => Poll::Ready(Ok(value)),
Poll::Ready(Some(Err(exception))) => Poll::Ready(Err(exception)),
Poll::Ready(None) => {
safe_panic!("Attempted to use a stream as a future!")
}
Poll::Pending => Poll::Pending,
}
}
}
impl<Item> From<SpscChannel<Item>> for CxxAsyncReceiver<Item> {
fn from(receiver: SpscChannel<Item>) -> Self {
CxxAsyncReceiver {
receiver,
execlet: None,
}
}
}
impl<Item> Drop for CxxAsyncReceiver<Item> {
    /// If an execlet is attached and the channel has not been closed yet, hands a clone of the
    /// execlet to the `ExecletReaper`. Otherwise does nothing.
    fn drop(&mut self) {
        if let Some(execlet) = self.execlet.as_ref() {
            // The channel lock stays held until the end of this block, i.e. across the hand-off.
            let state = self.receiver.0.lock().safe_unwrap();
            if !state.closed {
                ExecletReaper::get().add(execlet.clone());
            }
        }
    }
}
/// A sink that accepts the result of an asynchronous computation.
// NOTE(review): no implementor is visible in this file; presumably implemented by generated
// bridge code — confirm.
#[doc(hidden)]
pub trait RustSender {
/// The success type carried by the results this sender accepts.
type Output;
/// Delivers `value`, either a success value or an exception.
fn send(&mut self, value: CxxAsyncResult<Self::Output>);
}
/// Conversion of Rust futures into the implementing future type.
pub trait IntoCxxAsyncFuture: Sized {
    /// The success value produced by the wrapped future.
    type Output;

    /// Wraps a future that cannot fail by mapping its output to `Ok` and delegating to
    /// [`fallible`](Self::fallible).
    fn infallible<Fut>(future: Fut) -> Self
    where
        Fut: Future<Output = Self::Output> + Send + 'static,
    {
        let wrapped = async move {
            let output = future.await;
            Ok(output)
        };
        Self::fallible(wrapped)
    }

    /// Wraps a future that resolves to either a value or a [`CxxAsyncException`].
    fn fallible<Fut>(future: Fut) -> Self
    where
        Fut: Future<Output = CxxAsyncResult<Self::Output>> + Send + 'static;
}
/// Conversion of Rust streams into the implementing stream type.
pub trait IntoCxxAsyncStream: Sized {
    /// The success value of each element yielded by the wrapped stream.
    type Item;

    /// Wraps a stream that cannot fail by mapping every item to `Ok` and delegating to
    /// [`fallible`](Self::fallible).
    fn infallible<Stm>(stream: Stm) -> Self
    where
        Stm: Stream<Item = Self::Item> + Send + 'static,
        Stm::Item: 'static,
    {
        let wrapped = stream.map(Ok);
        Self::fallible(wrapped)
    }

    /// Wraps a stream whose items are either values or [`CxxAsyncException`]s.
    fn fallible<Stm>(stream: Stm) -> Self
    where
        Stm: Stream<Item = CxxAsyncResult<Self::Item>> + Send + 'static;
}
#[doc(hidden)]
pub unsafe extern "C" fn future_channel<Fut, Out>(
out_oneshot: *mut CxxAsyncFutureChannel<Fut, Out>,
execlet: *mut RustExeclet,
) where
Fut: From<CxxAsyncReceiver<Out>> + Future<Output = CxxAsyncResult<Out>>,
{
let channel = SpscChannel::new();
let oneshot = CxxAsyncFutureChannel {
sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
future: CxxAsyncReceiver::<Out> {
receiver: channel,
execlet: Some(Execlet::from_raw_ref(execlet)),
}
.into(),
};
ptr::write(out_oneshot, oneshot);
}
#[doc(hidden)]
pub unsafe extern "C" fn stream_channel<Stm, Item>(
out_stream: *mut CxxAsyncStreamChannel<Stm, Item>,
execlet: *mut RustExeclet,
) where
Stm: From<CxxAsyncReceiver<Item>> + Stream<Item = CxxAsyncResult<Item>>,
{
let channel = SpscChannel::new();
let stream = CxxAsyncStreamChannel {
sender: CxxAsyncSender(Box::into_raw(Box::new(channel.clone()))),
future: CxxAsyncReceiver {
receiver: channel,
execlet: Some(Execlet::from_raw_ref(execlet)),
}
.into(),
};
ptr::write(out_stream, stream);
}
#[doc(hidden)]
pub unsafe extern "C" fn sender_future_send<Item>(
this: &mut CxxAsyncSender<Item>,
status: u32,
value: *const u8,
waker_data: *const u8,
) -> u32 {
safe_debug_assert!(waker_data.is_null());
let this = this.0.as_mut().safe_expect("Where's the SPSC sender?");
match status {
FUTURE_STATUS_COMPLETE => {
let sent = this.try_send_value_with(None, || ptr::read(value as *const Item));
safe_debug_assert!(sent);
}
FUTURE_STATUS_ERROR => this.send_exception(unpack_exception(value)),
_ => safe_unreachable!(),
}
this.close();
SEND_RESULT_FINISHED
}
/// Delivers one stream event through `this`.
///
/// * `FUTURE_STATUS_RUNNING`: tries to queue the `Item` read from `value`. Returns
///   `SEND_RESULT_SENT` on success. If the slot is occupied, returns `SEND_RESULT_WAIT` without
///   reading `value`; when `waker_data` is non-null, a waker built from it is parked first.
/// * `FUTURE_STATUS_ERROR`: queues the C string at `value` as an exception, closes the channel
///   and returns `SEND_RESULT_FINISHED`.
/// * `FUTURE_STATUS_COMPLETE`: closes the channel and returns `SEND_RESULT_FINISHED`.
/// * Any other status aborts the process.
///
/// # Safety
///
/// `this` must hold a valid channel pointer, and `value` must point to data matching `status`
/// as described above. A non-null `waker_data` must be usable with `CXXASYNC_WAKER_VTABLE`; the
/// waker built from it is dropped (invoking the vtable's drop entry) before this returns.
#[doc(hidden)]
pub unsafe extern "C" fn sender_stream_send<Item>(
    this: &mut CxxAsyncSender<Item>,
    status: u32,
    value: *const u8,
    waker_data: *const u8,
) -> u32 {
    // `waker` is declared before `context` so that it outlives the context borrowing it.
    let waker = if waker_data.is_null() {
        None
    } else {
        Some(Waker::from_raw(RawWaker::new(
            waker_data as *const (),
            &CXXASYNC_WAKER_VTABLE,
        )))
    };
    let context = waker.as_ref().map(Context::from_waker);
    let channel = this.0.as_mut().safe_expect("Where's the SPSC sender?");
    match status {
        FUTURE_STATUS_RUNNING => {
            let read_item = || ptr::read(value as *const Item);
            if channel.try_send_value_with(context.as_ref(), read_item) {
                SEND_RESULT_SENT
            } else {
                SEND_RESULT_WAIT
            }
        }
        FUTURE_STATUS_ERROR => {
            channel.send_exception(unpack_exception(value));
            channel.close();
            SEND_RESULT_FINISHED
        }
        FUTURE_STATUS_COMPLETE => {
            channel.close();
            SEND_RESULT_FINISHED
        }
        _ => safe_unreachable!(),
    }
}
/// Destroys a sender: the by-value argument is dropped when the function returns, which runs
/// `CxxAsyncSender`'s `Drop` impl and frees its boxed channel handle.
///
/// # Safety
///
/// The sender must hold a pointer that is valid to free (see `Drop for CxxAsyncSender`) and
/// must not be used again after this call.
#[doc(hidden)]
pub unsafe extern "C" fn sender_drop<Item>(_: CxxAsyncSender<Item>) {
// Intentionally empty: dropping the argument is the whole effect.
}
/// Builds a `CxxAsyncException` from the NUL-terminated C string at `value`.
///
/// Invalid UTF-8 sequences are replaced with U+FFFD. The text is copied, so the exception does
/// not borrow from `value`.
///
/// # Safety
///
/// `value` must point to a valid NUL-terminated string that stays alive for this call.
unsafe fn unpack_exception(value: *const u8) -> CxxAsyncException {
    let message = CStr::from_ptr(value.cast::<c_char>()).to_string_lossy();
    let what = message.into_owned().into_boxed_str();
    CxxAsyncException::new(what)
}
/// Polls the Rust future `this` once, using a waker built from `waker_data` and
/// `CXXASYNC_WAKER_VTABLE`.
///
/// Returns:
/// * `FUTURE_STATUS_COMPLETE` after writing the produced `Out` value to `result`;
/// * `FUTURE_STATUS_ERROR` after writing the exception message to `result` as a `String`;
/// * `FUTURE_STATUS_PENDING` if the future is not ready (`result` is left untouched).
///
/// A panic raised while polling is caught, reported on stderr together with its message, and
/// turned into a process abort so that it never unwinds across the `extern "C"` boundary.
///
/// # Safety
///
/// `result` must be valid for a write of either an `Out` or a `String`, depending on the
/// outcome; its previous contents are overwritten without being dropped. `waker_data` must be
/// usable with `CXXASYNC_WAKER_VTABLE`; the waker built from it is dropped (invoking the
/// vtable's drop entry) before this function returns.
#[doc(hidden)]
pub unsafe extern "C" fn future_poll<Fut, Out>(
    this: Pin<&mut Fut>,
    result: *mut u8,
    waker_data: *const u8,
) -> u32
where
    Fut: Future<Output = CxxAsyncResult<Out>>,
{
    let waker = Waker::from_raw(RawWaker::new(
        waker_data as *const (),
        &CXXASYNC_WAKER_VTABLE,
    ));
    let result = panic::catch_unwind(AssertUnwindSafe(move || {
        let mut context = Context::from_waker(&waker);
        match this.poll(&mut context) {
            Poll::Ready(Ok(value)) => {
                ptr::write(result as *mut Out, value);
                FUTURE_STATUS_COMPLETE
            }
            Poll::Ready(Err(error)) => {
                let error = error.what().to_owned();
                ptr::write(result as *mut String, error);
                FUTURE_STATUS_ERROR
            }
            Poll::Pending => FUTURE_STATUS_PENDING,
        }
    }));
    match result {
        Ok(result) => result,
        Err(payload) => {
            // The payload is a `Box<dyn Any + Send>`, whose `Debug` output is just `Any { .. }`.
            // Panics raised by `panic!` carry either a `&'static str` or a `String`, so recover
            // the actual message by downcasting.
            let message: &str = if let Some(text) = payload.downcast_ref::<&str>() {
                *text
            } else if let Some(text) = payload.downcast_ref::<String>() {
                text.as_str()
            } else {
                "<non-string panic payload>"
            };
            drop(writeln!(
                io::stderr(),
                "Rust async code panicked when awaited from C++: {message}"
            ));
            process::abort();
        }
    }
}
/// Runs the destructor of the future stored at `future` in place, without freeing the memory
/// that holds it.
///
/// # Safety
///
/// `future` must point to a valid, initialized `Fut`, and the pointee must not be used or
/// dropped again afterwards.
#[doc(hidden)]
pub unsafe extern "C" fn future_drop<Fut>(future: *mut Fut) {
    future.drop_in_place();
}
/// `clone` entry of `CXXASYNC_WAKER_VTABLE`: forwards to the foreign clone hook and wraps the
/// pointer it returns in a new `RawWaker` that uses the same vtable.
unsafe fn rust_suspended_coroutine_clone(address: *const ()) -> RawWaker {
    let cloned = cxxasync_suspended_coroutine_clone(address as *mut u8);
    RawWaker::new(cloned as *const (), &CXXASYNC_WAKER_VTABLE)
}
/// `wake` entry of `CXXASYNC_WAKER_VTABLE`: forwards the waker data to the foreign wake hook.
unsafe fn rust_suspended_coroutine_wake(address: *const ()) {
    let waker_data = address as *mut u8;
    cxxasync_suspended_coroutine_wake(waker_data)
}
/// `wake_by_ref` entry of `CXXASYNC_WAKER_VTABLE`: forwards the waker data to the foreign
/// wake-by-reference hook.
unsafe fn rust_suspended_coroutine_wake_by_ref(address: *const ()) {
    let waker_data = address as *mut u8;
    cxxasync_suspended_coroutine_wake_by_ref(waker_data)
}
/// `drop` entry of `CXXASYNC_WAKER_VTABLE`: forwards the waker data to the foreign drop hook.
unsafe fn rust_suspended_coroutine_drop(address: *const ()) {
    let waker_data = address as *mut u8;
    cxxasync_suspended_coroutine_drop(waker_data)
}
/// Re-exports of the boxed future and stream aliases from the `futures` crate.
// NOTE(review): presumably consumed by code generated by the `bridge` macro rather than by
// end users — confirm.
#[doc(hidden)]
pub mod private {
pub use futures::future::BoxFuture;
pub use futures::stream::BoxStream;
}