use std::ffi::c_void;
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, Weak,
};
use std::task::{Context, Poll};
use futures_util::stream::{FuturesUnordered, Stream};
use libc::c_int;
use super::error::hyper_code;
use super::UserDataPointer;
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
type BoxAny = Box<dyn AsTaskType + Send + Sync>;
pub const HYPER_POLL_READY: c_int = 0;
pub const HYPER_POLL_PENDING: c_int = 1;
pub const HYPER_POLL_ERROR: c_int = 3;
pub struct hyper_executor {
driver: Mutex<FuturesUnordered<TaskFuture>>,
spawn_queue: Mutex<Vec<TaskFuture>>,
is_woken: Arc<ExecWaker>,
}
#[derive(Clone)]
pub(crate) struct WeakExec(Weak<hyper_executor>);
struct ExecWaker(AtomicBool);
pub struct hyper_task {
future: BoxFuture<BoxAny>,
output: Option<BoxAny>,
userdata: UserDataPointer,
}
struct TaskFuture {
task: Option<Box<hyper_task>>,
}
pub struct hyper_context<'a>(Context<'a>);
pub struct hyper_waker {
waker: std::task::Waker,
}
#[repr(C)]
pub enum hyper_task_return_type {
HYPER_TASK_EMPTY,
HYPER_TASK_ERROR,
HYPER_TASK_CLIENTCONN,
HYPER_TASK_RESPONSE,
HYPER_TASK_BUF,
}
pub(crate) unsafe trait AsTaskType {
fn as_task_type(&self) -> hyper_task_return_type;
}
pub(crate) trait IntoDynTaskType {
fn into_dyn_task_type(self) -> BoxAny;
}
impl hyper_executor {
fn new() -> Arc<hyper_executor> {
Arc::new(hyper_executor {
driver: Mutex::new(FuturesUnordered::new()),
spawn_queue: Mutex::new(Vec::new()),
is_woken: Arc::new(ExecWaker(AtomicBool::new(false))),
})
}
pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec {
WeakExec(Arc::downgrade(exec))
}
fn spawn(&self, task: Box<hyper_task>) {
self.spawn_queue
.lock()
.unwrap()
.push(TaskFuture { task: Some(task) });
}
fn poll_next(&self) -> Option<Box<hyper_task>> {
self.drain_queue();
let waker = futures_util::task::waker_ref(&self.is_woken);
let mut cx = Context::from_waker(&waker);
loop {
match Pin::new(&mut *self.driver.lock().unwrap()).poll_next(&mut cx) {
Poll::Ready(val) => return val,
Poll::Pending => {
if self.drain_queue() {
continue;
}
if self.is_woken.0.swap(false, Ordering::SeqCst) {
continue;
}
return None;
}
}
}
}
fn drain_queue(&self) -> bool {
let mut queue = self.spawn_queue.lock().unwrap();
if queue.is_empty() {
return false;
}
let driver = self.driver.lock().unwrap();
for task in queue.drain(..) {
driver.push(task);
}
true
}
}
impl futures_util::task::ArcWake for ExecWaker {
fn wake_by_ref(me: &Arc<ExecWaker>) {
me.0.store(true, Ordering::SeqCst);
}
}
impl WeakExec {
pub(crate) fn new() -> Self {
WeakExec(Weak::new())
}
}
impl crate::rt::Executor<BoxFuture<()>> for WeakExec {
fn execute(&self, fut: BoxFuture<()>) {
if let Some(exec) = self.0.upgrade() {
exec.spawn(hyper_task::boxed(fut));
}
}
}
ffi_fn! {
fn hyper_executor_new() -> *const hyper_executor {
Arc::into_raw(hyper_executor::new())
} ?= ptr::null()
}
ffi_fn! {
fn hyper_executor_free(exec: *const hyper_executor) {
drop(non_null!(Arc::from_raw(exec) ?= ()));
}
}
ffi_fn! {
fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code {
let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG);
let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG);
exec.spawn(task);
hyper_code::HYPERE_OK
}
}
ffi_fn! {
fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task {
let exec = non_null!(&*exec ?= ptr::null_mut());
match exec.poll_next() {
Some(task) => Box::into_raw(task),
None => ptr::null_mut(),
}
} ?= ptr::null_mut()
}
impl hyper_task {
pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task>
where
F: Future + Send + 'static,
F::Output: IntoDynTaskType + Send + Sync + 'static,
{
Box::new(hyper_task {
future: Box::pin(async move { fut.await.into_dyn_task_type() }),
output: None,
userdata: UserDataPointer(ptr::null_mut()),
})
}
fn output_type(&self) -> hyper_task_return_type {
match self.output {
None => hyper_task_return_type::HYPER_TASK_EMPTY,
Some(ref val) => val.as_task_type(),
}
}
}
impl Future for TaskFuture {
type Output = Box<hyper_task>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) {
Poll::Ready(val) => {
let mut task = self.task.take().unwrap();
task.output = Some(val);
Poll::Ready(task)
}
Poll::Pending => Poll::Pending,
}
}
}
ffi_fn! {
fn hyper_task_free(task: *mut hyper_task) {
drop(non_null!(Box::from_raw(task) ?= ()));
}
}
ffi_fn! {
fn hyper_task_value(task: *mut hyper_task) -> *mut c_void {
let task = non_null!(&mut *task ?= ptr::null_mut());
if let Some(val) = task.output.take() {
let p = Box::into_raw(val) as *mut c_void;
if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() {
ptr::null_mut()
} else {
p
}
} else {
ptr::null_mut()
}
} ?= ptr::null_mut()
}
ffi_fn! {
fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type {
non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type()
}
}
ffi_fn! {
fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) {
if task.is_null() {
return;
}
unsafe { (*task).userdata = UserDataPointer(userdata) };
}
}
ffi_fn! {
fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void {
non_null!(&*task ?= ptr::null_mut()).userdata.0
} ?= ptr::null_mut()
}
unsafe impl AsTaskType for () {
fn as_task_type(&self) -> hyper_task_return_type {
hyper_task_return_type::HYPER_TASK_EMPTY
}
}
unsafe impl AsTaskType for crate::Error {
fn as_task_type(&self) -> hyper_task_return_type {
hyper_task_return_type::HYPER_TASK_ERROR
}
}
impl<T> IntoDynTaskType for T
where
T: AsTaskType + Send + Sync + 'static,
{
fn into_dyn_task_type(self) -> BoxAny {
Box::new(self)
}
}
impl<T> IntoDynTaskType for crate::Result<T>
where
T: IntoDynTaskType + Send + Sync + 'static,
{
fn into_dyn_task_type(self) -> BoxAny {
match self {
Ok(val) => val.into_dyn_task_type(),
Err(err) => Box::new(err),
}
}
}
impl<T> IntoDynTaskType for Option<T>
where
T: IntoDynTaskType + Send + Sync + 'static,
{
fn into_dyn_task_type(self) -> BoxAny {
match self {
Some(val) => val.into_dyn_task_type(),
None => ().into_dyn_task_type(),
}
}
}
impl hyper_context<'_> {
pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> {
unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) }
}
}
ffi_fn! {
fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker {
let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone();
Box::into_raw(Box::new(hyper_waker { waker }))
} ?= ptr::null_mut()
}
ffi_fn! {
fn hyper_waker_free(waker: *mut hyper_waker) {
drop(non_null!(Box::from_raw(waker) ?= ()));
}
}
ffi_fn! {
fn hyper_waker_wake(waker: *mut hyper_waker) {
let waker = non_null!(Box::from_raw(waker) ?= ());
waker.waker.wake();
}
}