use std::ffi::c_void;
use std::mem::ManuallyDrop;
use std::ptr;
use std::task::{Context, Poll};
use http::HeaderMap;
use libc::{c_int, size_t};
use super::task::{fluxio_context, fluxio_task, fluxio_task_return_type, AsTaskType};
use super::{UserDataPointer, FLUXIO_ITER_CONTINUE};
use crate::body::{Body, Bytes, HttpBody as _};
pub struct fluxio_body(pub(super) Body);
pub struct fluxio_buf(pub(crate) Bytes);
pub(crate) struct UserBody {
data_func: fluxio_body_data_callback,
userdata: *mut c_void,
}
type fluxio_body_foreach_callback = extern "C" fn(*mut c_void, *const fluxio_buf) -> c_int;
type fluxio_body_data_callback =
extern "C" fn(*mut c_void, *mut fluxio_context<'_>, *mut *mut fluxio_buf) -> c_int;
ffi_fn! {
fn fluxio_body_new() -> *mut fluxio_body {
Box::into_raw(Box::new(fluxio_body(Body::empty())))
} ?= ptr::null_mut()
}
ffi_fn! {
fn fluxio_body_free(body: *mut fluxio_body) {
drop(non_null!(Box::from_raw(body) ?= ()));
}
}
ffi_fn! {
fn fluxio_body_data(body: *mut fluxio_body) -> *mut fluxio_task {
let mut body = ManuallyDrop::new(non_null!(Box::from_raw(body) ?= ptr::null_mut()));
Box::into_raw(fluxio_task::boxed(async move {
body.0.data().await.map(|res| res.map(fluxio_buf))
}))
} ?= ptr::null_mut()
}
ffi_fn! {
fn fluxio_body_foreach(body: *mut fluxio_body, func: fluxio_body_foreach_callback, userdata: *mut c_void) -> *mut fluxio_task {
let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
let userdata = UserDataPointer(userdata);
Box::into_raw(fluxio_task::boxed(async move {
while let Some(item) = body.0.data().await {
let chunk = item?;
if FLUXIO_ITER_CONTINUE != func(userdata.0, &fluxio_buf(chunk)) {
return Err(crate::Error::new_user_aborted_by_callback());
}
}
Ok(())
}))
} ?= ptr::null_mut()
}
ffi_fn! {
fn fluxio_body_set_userdata(body: *mut fluxio_body, userdata: *mut c_void) {
let b = non_null!(&mut *body ?= ());
b.0.as_ffi_mut().userdata = userdata;
}
}
ffi_fn! {
fn fluxio_body_set_data_func(body: *mut fluxio_body, func: fluxio_body_data_callback) {
let b = non_null!{ &mut *body ?= () };
b.0.as_ffi_mut().data_func = func;
}
}
impl UserBody {
pub(crate) fn new() -> UserBody {
UserBody {
data_func: data_noop,
userdata: std::ptr::null_mut(),
}
}
pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
let mut out = std::ptr::null_mut();
match (self.data_func)(self.userdata, fluxio_context::wrap(cx), &mut out) {
super::task::FLUXIO_POLL_READY => {
if out.is_null() {
Poll::Ready(None)
} else {
let buf = unsafe { Box::from_raw(out) };
Poll::Ready(Some(Ok(buf.0)))
}
}
super::task::FLUXIO_POLL_PENDING => Poll::Pending,
super::task::FLUXIO_POLL_ERROR => {
Poll::Ready(Some(Err(crate::Error::new_body_write_aborted())))
}
unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!(
"unexpected fluxio_body_data_func return code {}",
unexpected
))))),
}
}
pub(crate) fn poll_trailers(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<crate::Result<Option<HeaderMap>>> {
Poll::Ready(Ok(None))
}
}
extern "C" fn data_noop(
_userdata: *mut c_void,
_: *mut fluxio_context<'_>,
_: *mut *mut fluxio_buf,
) -> c_int {
super::task::FLUXIO_POLL_READY
}
unsafe impl Send for UserBody {}
unsafe impl Sync for UserBody {}
ffi_fn! {
fn fluxio_buf_copy(buf: *const u8, len: size_t) -> *mut fluxio_buf {
let slice = unsafe {
std::slice::from_raw_parts(buf, len)
};
Box::into_raw(Box::new(fluxio_buf(Bytes::copy_from_slice(slice))))
} ?= ptr::null_mut()
}
ffi_fn! {
fn fluxio_buf_bytes(buf: *const fluxio_buf) -> *const u8 {
unsafe { (*buf).0.as_ptr() }
} ?= ptr::null()
}
ffi_fn! {
fn fluxio_buf_len(buf: *const fluxio_buf) -> size_t {
unsafe { (*buf).0.len() }
}
}
ffi_fn! {
fn fluxio_buf_free(buf: *mut fluxio_buf) {
drop(unsafe { Box::from_raw(buf) });
}
}
unsafe impl AsTaskType for fluxio_buf {
fn as_task_type(&self) -> fluxio_task_return_type {
fluxio_task_return_type::FLUXIO_TASK_BUF
}
}