use std::{
future::Future,
pin::Pin,
sync::{Arc, Condvar, Mutex},
task::{Context, Waker},
};
use futures::task::ArcWake;
use super::{ffi_result::FFIResult, FFIResultStatus};
pub struct FFIFuture {
inner: Mutex<Option<FFIFutureInner>>,
}
pub struct FFIFutureInner {
future: Pin<Box<dyn Future<Output = FFIResult> + Send + Sync>>,
waker: Option<Waker>,
}
impl FFIFuture {
pub(super) fn new(future: impl Future<Output = FFIResult> + Send + Sync + 'static) -> Arc<FFIFuture> {
Arc::new(FFIFuture { inner: Mutex::new(Some(FFIFutureInner { future: Box::pin(future), waker: None })) })
}
pub(super) fn close(self: &Arc<Self>) {
let mut inner = self.inner.lock().unwrap();
if let Some(mut inner) = inner.take() {
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
}
pub(super) fn empty() -> Arc<FFIFuture> {
Arc::new(FFIFuture { inner: Mutex::new(None) })
}
fn poll(ffi_future: &Arc<FFIFuture>, new_waker: Arc<CondvarWaiter>) -> FFIResult {
let mut inner = ffi_future.inner.lock().unwrap();
if let Some(FFIFutureInner { ref mut future, ref mut waker }) = inner.as_mut() {
*waker = Some(futures::task::waker(new_waker));
let mut cx = Context::from_waker(waker.as_ref().unwrap());
match future.as_mut().poll(&mut cx) {
std::task::Poll::Ready(result) => {
*waker = None;
result
}
std::task::Poll::Pending => FFIResult::pending(),
}
} else {
FFIResult::end_of_stream()
}
}
}
#[no_mangle]
pub extern "C" fn hakuban_future_clone(ffi_future: *mut Arc<FFIFuture>) -> *mut Arc<FFIFuture> {
let ffi_future = unsafe { ffi_future.as_mut().unwrap() };
Box::into_raw(Box::new(ffi_future.clone()))
}
struct CondvarWaiter {
woken: Mutex<bool>,
condvar: Condvar,
}
impl ArcWake for CondvarWaiter {
fn wake_by_ref(arc_self: &Arc<Self>) {
let mut woken = arc_self.woken.lock().unwrap();
*woken = true;
arc_self.condvar.notify_one();
}
}
#[no_mangle]
pub extern "C" fn hakuban_future_await(ffi_future: *mut Arc<FFIFuture>) -> FFIResult {
let ffi_future = unsafe { Box::from_raw(ffi_future) };
loop {
let waker = Arc::new(CondvarWaiter { woken: Mutex::new(false), condvar: Condvar::new() });
let result = FFIFuture::poll(&ffi_future, waker.clone());
if result.status != FFIResultStatus::Pending {
return result;
};
let waiter_mutex_lock = waker.woken.lock().unwrap();
let mut woken = waker.condvar.wait(waiter_mutex_lock).unwrap();
*woken = false;
}
}
#[no_mangle]
pub extern "C" fn hakuban_future_drop(ffi_future: *mut Arc<FFIFuture>) {
let ffi_future = unsafe { Box::from_raw(ffi_future) };
ffi_future.close();
}