// hakuban 0.8.5
//
// Data-object sharing library
// Documentation
use std::{
	future::Future,
	pin::Pin,
	sync::{Arc, Condvar, Mutex},
	task::{Context, Waker},
};

use futures::task::ArcWake;

use super::{ffi_result::FFIResult, FFIResultStatus};

/// Shareable handle around a boxed future, handed across the FFI boundary as `Arc<FFIFuture>`.
///
/// All state lives behind a single mutex so that polling and closing are serialized.
pub struct FFIFuture {
	/// `Some` while the future can still be polled; `None` once the handle has been
	/// closed or when it was created through `FFIFuture::empty`.
	inner: Mutex<Option<FFIFutureInner>>,
}

/// Live state of an [`FFIFuture`]: the future itself plus the waker of the most recent poll.
pub struct FFIFutureInner {
	/// The wrapped future, pinned on the heap so it can be polled through a trait object.
	future: Pin<Box<dyn Future<Output = FFIResult> + Send + Sync>>,
	/// Waker installed by the latest poll; cleared once the future has returned `Ready`.
	/// `close` wakes it so that whoever is parked on it gets a chance to re-poll.
	waker: Option<Waker>,
}

impl FFIFuture {
	/// Wraps `future` into a shareable handle. The future is boxed and pinned, and no
	/// waker is registered until the first poll.
	pub(super) fn new(future: impl Future<Output = FFIResult> + Send + Sync + 'static) -> Arc<FFIFuture> {
		let state = FFIFutureInner { future: Box::pin(future), waker: None };
		let handle = FFIFuture { inner: Mutex::new(Some(state)) };
		Arc::new(handle)
	}

	/// Discards the wrapped future (if it is still present) and wakes the waker stored
	/// by the last poll, so a party parked on that waker can poll again and observe
	/// the end-of-stream result.
	///
	/// Panics if the internal mutex is poisoned.
	pub(super) fn close(self: &Arc<Self>) {
		let mut slot = self.inner.lock().unwrap();
		// Declared after the guard so it is dropped first: the waker fires, then the
		// future is destroyed, and only then is the lock released.
		let mut detached = slot.take();
		if let Some(waker) = detached.as_mut().and_then(|state| state.waker.take()) {
			waker.wake();
		}
	}

	/// Creates a handle that holds no future at all; polling it yields the
	/// end-of-stream result straight away.
	pub(super) fn empty() -> Arc<FFIFuture> {
		let handle = FFIFuture { inner: Mutex::new(None) };
		Arc::new(handle)
	}

	/// Polls the wrapped future once, registering `new_waker` as the waker to notify.
	///
	/// Returns the future's output when it is ready, `FFIResult::pending()` when it is
	/// not, and `FFIResult::end_of_stream()` when the handle holds no future.
	/// The mutex stays locked for the whole poll. Panics if the mutex is poisoned.
	fn poll(ffi_future: &Arc<FFIFuture>, new_waker: Arc<CondvarWaiter>) -> FFIResult {
		let mut slot = ffi_future.inner.lock().unwrap();
		let state = match slot.as_mut() {
			Some(state) => state,
			None => return FFIResult::end_of_stream(),
		};

		// Replace whatever waker an earlier poll left behind before polling again.
		let installed = state.waker.insert(futures::task::waker(new_waker));
		let outcome = state.future.as_mut().poll(&mut Context::from_waker(installed));

		if let std::task::Poll::Ready(result) = outcome {
			// Nothing is left to be notified about once the future has completed.
			state.waker = None;
			result
		} else {
			FFIResult::pending()
		}
	}
}

/// Produces a new heap-allocated `Arc` handle referring to the same `FFIFuture`.
///
/// The input pointer is only borrowed and remains valid afterwards. The returned
/// pointer is a fresh `Box` allocation that the caller owns.
/// Panics when `ffi_future` is null.
#[no_mangle]
pub extern "C" fn hakuban_future_clone(ffi_future: *mut Arc<FFIFuture>) -> *mut Arc<FFIFuture> {
	// SAFETY: assumes a non-null pointer refers to a live `Arc<FFIFuture>` obtained
	// from this library — TODO confirm against the C-side contract.
	let original: &Arc<FFIFuture> = unsafe { ffi_future.as_ref() }.unwrap();
	let duplicate = Arc::clone(original);
	Box::into_raw(Box::new(duplicate))
}

/// Waker backend that lets a blocked OS thread be woken through a condition variable.
struct CondvarWaiter {
	/// Set to `true` by `wake_by_ref`; guarded by the mutex paired with `condvar`.
	woken: Mutex<bool>,
	/// Notified (one waiter) every time the waker fires.
	condvar: Condvar,
}

impl ArcWake for CondvarWaiter {
	fn wake_by_ref(arc_self: &Arc<Self>) {
		let mut woken = arc_self.woken.lock().unwrap();
		*woken = true;
		arc_self.condvar.notify_one();
	}
}

/// Blocks the calling thread until the future behind `ffi_future` produces a result
/// whose status is not `Pending`, and returns that result.
///
/// The pointer is reclaimed with `Box::from_raw`, so this call takes ownership of the
/// handle: the boxed `Arc` is released when the function returns and the pointer
/// must not be used afterwards.
///
/// Panics if one of the involved mutexes is poisoned.
#[no_mangle]
pub extern "C" fn hakuban_future_await(ffi_future: *mut Arc<FFIFuture>) -> FFIResult {
	// SAFETY: assumes the pointer came from `Box::into_raw` on a `Box<Arc<FFIFuture>>`
	// and has not been reclaimed yet — TODO confirm against the C-side contract.
	let ffi_future = unsafe { Box::from_raw(ffi_future) };
	loop {
		// A fresh waiter per poll: its flag starts out `false`, so nothing needs
		// resetting between iterations.
		let waker = Arc::new(CondvarWaiter { woken: Mutex::new(false), condvar: Condvar::new() });
		let result = FFIFuture::poll(&ffi_future, waker.clone());
		if result.status != FFIResultStatus::Pending {
			return result;
		}

		// The waker may already have fired between `poll` returning and this lock
		// being taken. Waiting unconditionally would then miss the notification and
		// block forever, so the flag is checked before (and after) every wait. The
		// loop also absorbs spurious condvar wakeups.
		let mut woken = waker.woken.lock().unwrap();
		while !*woken {
			woken = waker.condvar.wait(woken).unwrap();
		}
	}
}

#[no_mangle]
pub extern "C" fn hakuban_future_drop(ffi_future: *mut Arc<FFIFuture>) {
	let ffi_future = unsafe { Box::from_raw(ffi_future) };
	ffi_future.close();
}