1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
use std::ffi::c_void;
use std::mem::ManuallyDrop;
use std::ptr;
use std::task::{Context, Poll};

use http::HeaderMap;
use libc::{c_int, size_t};

use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType};
use super::{UserDataPointer, HYPER_ITER_CONTINUE};
use crate::body::{Body, Bytes, HttpBody as _};

/// A streaming HTTP body.
pub struct hyper_body(pub(super) Body);

/// A buffer of bytes that is sent or received on a `hyper_body`.
pub struct hyper_buf(pub(crate) Bytes);

pub(crate) struct UserBody {
    data_func: hyper_body_data_callback,
    userdata: *mut c_void,
}

// ===== Body =====

type hyper_body_foreach_callback = extern "C" fn(*mut c_void, *const hyper_buf) -> c_int;

type hyper_body_data_callback =
    extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut *mut hyper_buf) -> c_int;

ffi_fn! {
    /// Create a new "empty" body.
    ///
    /// If not configured, this body acts as an empty payload.
    fn hyper_body_new() -> *mut hyper_body {
        Box::into_raw(Box::new(hyper_body(Body::empty())))
    } ?= ptr::null_mut()
}

ffi_fn! {
    /// Free a `hyper_body *`.
    fn hyper_body_free(body: *mut hyper_body) {
        drop(non_null!(Box::from_raw(body) ?= ()));
    }
}

ffi_fn! {
    /// Return a task that will poll the body for the next buffer of data.
    ///
    /// The task value may have different types depending on the outcome:
    ///
    /// - `HYPER_TASK_BUF`: Success, and more data was received.
    /// - `HYPER_TASK_ERROR`: An error retrieving the data.
    /// - `HYPER_TASK_EMPTY`: The body has finished streaming data.
    ///
    /// This does not consume the `hyper_body *`, so it may be used to again.
    /// However, it MUST NOT be used or freed until the related task completes.
    fn hyper_body_data(body: *mut hyper_body) -> *mut hyper_task {
        // This doesn't take ownership of the Body, so don't allow destructor
        let mut body = ManuallyDrop::new(non_null!(Box::from_raw(body) ?= ptr::null_mut()));

        Box::into_raw(hyper_task::boxed(async move {
            body.0.data().await.map(|res| res.map(hyper_buf))
        }))
    } ?= ptr::null_mut()
}

ffi_fn! {
    /// Return a task that will poll the body and execute the callback with each
    /// body chunk that is received.
    ///
    /// The `hyper_buf` pointer is only a borrowed reference, it cannot live outside
    /// the execution of the callback. You must make a copy to retain it.
    ///
    /// The callback should return `HYPER_ITER_CONTINUE` to continue iterating
    /// chunks as they are received, or `HYPER_ITER_BREAK` to cancel.
    ///
    /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it.
    fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task {
        let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
        let userdata = UserDataPointer(userdata);

        Box::into_raw(hyper_task::boxed(async move {
            while let Some(item) = body.0.data().await {
                let chunk = item?;
                if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) {
                    return Err(crate::Error::new_user_aborted_by_callback());
                }
            }
            Ok(())
        }))
    } ?= ptr::null_mut()
}

ffi_fn! {
    /// Set userdata on this body, which will be passed to callback functions.
    fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) {
        let b = non_null!(&mut *body ?= ());
        b.0.as_ffi_mut().userdata = userdata;
    }
}

ffi_fn! {
    /// Set the data callback for this body.
    ///
    /// The callback is called each time hyper needs to send more data for the
    /// body. It is passed the value from `hyper_body_set_userdata`.
    ///
    /// If there is data available, the `hyper_buf **` argument should be set
    /// to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should
    /// be returned.
    ///
    /// Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points
    /// to `NULL` will indicate the body has completed all data.
    ///
    /// If there is more data to send, but it isn't yet available, a
    /// `hyper_waker` should be saved from the `hyper_context *` argument, and
    /// `HYPER_POLL_PENDING` should be returned. You must wake the saved waker
    /// to signal the task when data is available.
    ///
    /// If some error has occurred, you can return `HYPER_POLL_ERROR` to abort
    /// the body.
    fn hyper_body_set_data_func(body: *mut hyper_body, func: hyper_body_data_callback) {
        let b = non_null!{ &mut *body ?= () };
        b.0.as_ffi_mut().data_func = func;
    }
}

// ===== impl UserBody =====

impl UserBody {
    pub(crate) fn new() -> UserBody {
        UserBody {
            data_func: data_noop,
            userdata: std::ptr::null_mut(),
        }
    }

    pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
        let mut out = std::ptr::null_mut();
        match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) {
            super::task::HYPER_POLL_READY => {
                if out.is_null() {
                    Poll::Ready(None)
                } else {
                    let buf = unsafe { Box::from_raw(out) };
                    Poll::Ready(Some(Ok(buf.0)))
                }
            }
            super::task::HYPER_POLL_PENDING => Poll::Pending,
            super::task::HYPER_POLL_ERROR => {
                Poll::Ready(Some(Err(crate::Error::new_body_write_aborted())))
            }
            unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!(
                "unexpected hyper_body_data_func return code {}",
                unexpected
            ))))),
        }
    }

    pub(crate) fn poll_trailers(
        &mut self,
        _cx: &mut Context<'_>,
    ) -> Poll<crate::Result<Option<HeaderMap>>> {
        Poll::Ready(Ok(None))
    }
}

/// cbindgen:ignore
extern "C" fn data_noop(
    _userdata: *mut c_void,
    _: *mut hyper_context<'_>,
    _: *mut *mut hyper_buf,
) -> c_int {
    super::task::HYPER_POLL_READY
}

unsafe impl Send for UserBody {}
unsafe impl Sync for UserBody {}

// ===== Bytes =====

ffi_fn! {
    /// Create a new `hyper_buf *` by copying the provided bytes.
    ///
    /// This makes an owned copy of the bytes, so the `buf` argument can be
    /// freed or changed afterwards.
    ///
    /// This returns `NULL` if allocating a new buffer fails.
    fn hyper_buf_copy(buf: *const u8, len: size_t) -> *mut hyper_buf {
        let slice = unsafe {
            std::slice::from_raw_parts(buf, len)
        };
        Box::into_raw(Box::new(hyper_buf(Bytes::copy_from_slice(slice))))
    } ?= ptr::null_mut()
}

ffi_fn! {
    /// Get a pointer to the bytes in this buffer.
    ///
    /// This should be used in conjunction with `hyper_buf_len` to get the length
    /// of the bytes data.
    ///
    /// This pointer is borrowed data, and not valid once the `hyper_buf` is
    /// consumed/freed.
    fn hyper_buf_bytes(buf: *const hyper_buf) -> *const u8 {
        unsafe { (*buf).0.as_ptr() }
    } ?= ptr::null()
}

ffi_fn! {
    /// Get the length of the bytes this buffer contains.
    fn hyper_buf_len(buf: *const hyper_buf) -> size_t {
        unsafe { (*buf).0.len() }
    }
}

ffi_fn! {
    /// Free this buffer.
    fn hyper_buf_free(buf: *mut hyper_buf) {
        drop(unsafe { Box::from_raw(buf) });
    }
}

unsafe impl AsTaskType for hyper_buf {
    fn as_task_type(&self) -> hyper_task_return_type {
        hyper_task_return_type::HYPER_TASK_BUF
    }
}