rama_hyper/ffi/
task.rs

1use std::ffi::c_void;
2use std::future::Future;
3use std::pin::Pin;
4use std::ptr;
5use std::sync::{
6    atomic::{AtomicBool, Ordering},
7    Arc, Mutex, Weak,
8};
9use std::task::{Context, Poll};
10
11use futures_util::stream::{FuturesUnordered, Stream};
12use libc::c_int;
13
14use super::error::hyper_code;
15use super::UserDataPointer;
16
17type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
18type BoxAny = Box<dyn AsTaskType + Send + Sync>;
19
/// Return in a poll function to indicate it was ready.
pub const HYPER_POLL_READY: c_int = 0;
/// Return in a poll function to indicate it is still pending.
///
/// The passed in `hyper_waker` should be registered to wake up the task at
/// some later point.
pub const HYPER_POLL_PENDING: c_int = 1;
/// Return in a poll function to indicate an error.
pub const HYPER_POLL_ERROR: c_int = 3;
29
/// A task executor for `hyper_task`s.
///
/// A task is a unit of work that may be blocked on IO, and can be polled to
/// make progress on that work.
///
/// An executor can hold many tasks, including from unrelated HTTP connections.
/// An executor is single threaded. Typically you might have one executor per
/// thread. Or, for simplicity, you may choose one executor per connection.
///
/// Progress on tasks happens only when `hyper_executor_poll` is called, and only
/// on tasks whose corresponding `hyper_waker` has been called to indicate they
/// are ready to make progress (for instance, because the OS has indicated there
/// is more data to read or more buffer space available to write).
///
/// Deadlock potential: `hyper_executor_poll` must not be called from within a task's
/// callback. Doing so will result in a deadlock.
///
/// Methods:
///
/// - hyper_executor_new:  Creates a new task executor.
/// - hyper_executor_push: Push a task onto the executor.
/// - hyper_executor_poll: Polls the executor, trying to make progress on any tasks that have notified that they are ready again.
/// - hyper_executor_free: Frees an executor and any incomplete tasks still part of it.
pub struct hyper_executor {
    /// The executor of all task futures.
    ///
    /// There should never be contention on the mutex, as it is only locked
    /// to drive the futures. However, we cannot guarantee proper usage from
    /// `hyper_executor_poll()`, which in C could potentially be called inside
    /// one of the stored futures. The mutex isn't re-entrant, so doing so
    /// would result in a deadlock, but that's better than data corruption.
    driver: Mutex<FuturesUnordered<TaskFuture>>,

    /// The queue of futures that need to be pushed into the `driver`.
    ///
    /// This has a separate mutex since `spawn` could be called from inside
    /// a future, which would mean the driver's mutex is already locked.
    spawn_queue: Mutex<Vec<TaskFuture>>,

    /// This is used to track when a future calls `wake` while we are within
    /// `hyper_executor::poll_next`.
    is_woken: Arc<ExecWaker>,
}
73
/// A non-owning handle to a `hyper_executor`.
///
/// Used by the `crate::rt::Executor` impl below to spawn sub-tasks onto the
/// executor if (and only if) it is still alive.
#[derive(Clone)]
pub(crate) struct WeakExec(Weak<hyper_executor>);
76
/// Flag set by the executor's waker when any task requests a wake-up; it is
/// checked (and cleared) inside `hyper_executor::poll_next`.
struct ExecWaker(AtomicBool);
78
/// An async task.
///
/// A task represents a chunk of work that will eventually yield exactly one
/// `hyper_task_value`. Tasks are pushed onto an executor, and that executor is
/// responsible for calling the necessary private functions on the task to make
/// progress. In most cases those private functions will eventually cause read
/// or write callbacks on a `hyper_io` object to be called.
///
/// Tasks are created by various functions:
///
/// - hyper_clientconn_handshake: Creates an HTTP client handshake task.
/// - hyper_clientconn_send:      Creates a task to send a request on the client connection.
/// - hyper_body_data:            Creates a task that will poll a response body for the next buffer of data.
/// - hyper_body_foreach:         Creates a task to execute the callback with each body chunk received.
///
/// Tasks then have a userdata associated with them using `hyper_task_set_userdata`. This
/// is important, for instance, to associate a request id with a given request. When multiple
/// tasks are running on the same executor, this allows distinguishing tasks for different
/// requests.
///
/// Tasks are then pushed onto an executor, and eventually yielded from hyper_executor_poll:
///
/// - hyper_executor_push:        Push a task onto the executor.
/// - hyper_executor_poll:        Polls the executor, trying to make progress on any tasks that have notified that they are ready again.
///
/// Once a task is yielded from poll, retrieve its userdata, check its type,
/// and extract its value. This will require a cast from void* to the appropriate type.
///
/// Methods on hyper_task:
///
/// - hyper_task_type:            Query the return type of this task.
/// - hyper_task_value:           Takes the output value of this task.
/// - hyper_task_set_userdata:    Set a user data pointer to be associated with this task.
/// - hyper_task_userdata:        Retrieve the userdata that has been set via hyper_task_set_userdata.
/// - hyper_task_free:            Free a task.
pub struct hyper_task {
    /// The type-erased future driving this task.
    future: BoxFuture<BoxAny>,
    /// Set once the future resolves; taken (once) by `hyper_task_value`.
    output: Option<BoxAny>,
    /// Opaque pointer supplied by the C caller via `hyper_task_set_userdata`.
    userdata: UserDataPointer,
}
119
/// Adapter that lets a boxed `hyper_task` be driven inside the executor's
/// `FuturesUnordered`, yielding the task itself back once it completes.
struct TaskFuture {
    /// Always `Some` while the task is in flight; taken when it resolves.
    task: Option<Box<hyper_task>>,
}
123
/// An async context for a task that contains the related waker.
///
/// This is provided to `hyper_io`'s read and write callbacks. Currently
/// its only purpose is to provide access to the waker. See `hyper_waker`.
///
/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Context.html>
// Newtype over `Context`; see `hyper_context::wrap` for how references are
// converted across the FFI boundary.
pub struct hyper_context<'a>(Context<'a>);
131
/// A waker that is saved and used to waken a pending task.
///
/// This is provided to `hyper_io`'s read and write callbacks via `hyper_context`
/// and `hyper_context_waker`.
///
/// When nonblocking I/O in one of those callbacks can't make progress (returns
/// `EAGAIN` or `EWOULDBLOCK`), the callback has to return to avoid blocking the
/// executor. But it also has to arrange to get called in the future when more
/// data is available. That's the role of the async context and the waker. The
/// waker can be used to tell the executor "this task is ready to make progress."
///
/// The read or write callback, upon finding it can't make progress, must get a
/// waker from the context (`hyper_context_waker`), arrange for that waker to be
/// called in the future, and then return `HYPER_POLL_PENDING`.
///
/// The arrangements for the waker to be called in the future are up to the
/// application, but usually it will involve one big `select(2)` loop that checks which
/// FDs are ready, and a correspondence between FDs and waker objects. For each
/// FD that is ready, the corresponding waker must be called. Then `hyper_executor_poll`
/// must be called. That will cause the executor to attempt to make progress on each
/// woken task.
///
/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Waker.html>
pub struct hyper_waker {
    /// Cloned from the task's `Context`; consumed by `hyper_waker_wake`.
    waker: std::task::Waker,
}
158
/// A descriptor for what type a `hyper_task` value is.
// `#[repr(C)]` so the discriminants are stable and usable from C.
#[repr(C)]
pub enum hyper_task_return_type {
    /// The value of this task is null (does not imply an error).
    HYPER_TASK_EMPTY,
    /// The value of this task is `hyper_error *`.
    HYPER_TASK_ERROR,
    /// The value of this task is `hyper_clientconn *`.
    HYPER_TASK_CLIENTCONN,
    /// The value of this task is `hyper_response *`.
    HYPER_TASK_RESPONSE,
    /// The value of this task is `hyper_buf *`.
    HYPER_TASK_BUF,
}
173
/// Maps a Rust value to the `hyper_task_return_type` tag describing it.
///
/// # Safety
///
/// The returned tag must accurately describe `Self`: C callers use it to
/// decide which concrete type to cast the task's `void *` value to, so a
/// wrong tag leads to an invalid cast on the C side.
pub(crate) unsafe trait AsTaskType {
    fn as_task_type(&self) -> hyper_task_return_type;
}

/// Conversion of a task's output into the type-erased `BoxAny` stored on it.
pub(crate) trait IntoDynTaskType {
    fn into_dyn_task_type(self) -> BoxAny;
}
181
182// ===== impl hyper_executor =====
183
impl hyper_executor {
    /// Allocates a fresh executor with no tasks and a cleared wake flag.
    fn new() -> Arc<hyper_executor> {
        Arc::new(hyper_executor {
            driver: Mutex::new(FuturesUnordered::new()),
            spawn_queue: Mutex::new(Vec::new()),
            is_woken: Arc::new(ExecWaker(AtomicBool::new(false))),
        })
    }

    /// Returns a non-owning handle that can be stored inside tasks without
    /// keeping the executor alive.
    pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec {
        WeakExec(Arc::downgrade(exec))
    }

    /// Queues a task to be moved into the `driver` on the next poll.
    ///
    /// Only the `spawn_queue` lock is taken, so this may be called while the
    /// `driver` lock is held (e.g. from inside a future being polled).
    fn spawn(&self, task: Box<hyper_task>) {
        self.spawn_queue
            .lock()
            .unwrap()
            .push(TaskFuture { task: Some(task) });
    }

    /// Polls until one task completes (returning it), or until no task can
    /// currently make progress (returning `None`).
    fn poll_next(&self) -> Option<Box<hyper_task>> {
        // Drain the queue first.
        self.drain_queue();

        let waker = futures_util::task::waker_ref(&self.is_woken);
        let mut cx = Context::from_waker(&waker);

        loop {
            {
                // Scope the lock on the driver to ensure it is dropped before
                // calling drain_queue below.
                let mut driver = self.driver.lock().unwrap();
                match Pin::new(&mut *driver).poll_next(&mut cx) {
                    Poll::Ready(val) => return val,
                    Poll::Pending => {}
                };
            }

            // poll_next returned Pending.
            // Check if any of the pending tasks tried to spawn
            // some new tasks. If so, drain into the driver and loop.
            if self.drain_queue() {
                continue;
            }

            // If the driver called `wake` while we were polling,
            // we should poll again immediately!
            if self.is_woken.0.swap(false, Ordering::SeqCst) {
                continue;
            }

            return None;
        }
    }

    /// drain_queue locks both self.spawn_queue and self.driver, so it requires
    /// that neither of them be locked already.
    fn drain_queue(&self) -> bool {
        let mut queue = self.spawn_queue.lock().unwrap();
        if queue.is_empty() {
            return false;
        }

        // `FuturesUnordered::push` takes `&self`, hence no `mut` binding here.
        let driver = self.driver.lock().unwrap();

        for task in queue.drain(..) {
            driver.push(task);
        }

        true
    }
}
256
impl futures_util::task::ArcWake for ExecWaker {
    fn wake_by_ref(me: &Arc<ExecWaker>) {
        // Record that a wake happened; `poll_next` swaps this flag back to
        // false and, if it was set, polls the driver again.
        me.0.store(true, Ordering::SeqCst);
    }
}
262
263// ===== impl WeakExec =====
264
265impl WeakExec {
266    pub(crate) fn new() -> Self {
267        WeakExec(Weak::new())
268    }
269}
270
271impl<F> crate::rt::Executor<F> for WeakExec
272where
273    F: Future + Send + 'static,
274    F::Output: Send + Sync + AsTaskType,
275{
276    fn execute(&self, fut: F) {
277        if let Some(exec) = self.0.upgrade() {
278            exec.spawn(hyper_task::boxed(fut));
279        }
280    }
281}
282
ffi_fn! {
    /// Creates a new task executor.
    ///
    /// To avoid a memory leak, the executor must eventually be consumed by
    /// `hyper_executor_free`.
    fn hyper_executor_new() -> *const hyper_executor {
        // Leak the Arc to C; `hyper_executor_free` reconstitutes and drops it.
        Arc::into_raw(hyper_executor::new())
    } ?= ptr::null()
}
292
ffi_fn! {
    /// Frees an executor and any incomplete tasks still part of it.
    ///
    /// This should be used for any executor once it is no longer needed.
    fn hyper_executor_free(exec: *const hyper_executor) {
        // Rebuild the Arc from the raw pointer handed out by
        // `hyper_executor_new` and drop it, releasing the executor (and any
        // tasks it still holds) if this was the last strong reference.
        drop(non_null!(Arc::from_raw(exec) ?= ()));
    }
}
301
ffi_fn! {
    /// Push a task onto the executor.
    ///
    /// The executor takes ownership of the task, which must not be accessed
    /// again.
    ///
    /// Ownership of the task will eventually be returned to the user from
    /// `hyper_executor_poll`.
    ///
    /// To distinguish multiple tasks running on the same executor, use
    /// hyper_task_set_userdata.
    fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code {
        let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG);
        // Reclaim ownership of the task box from C before queuing it.
        let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG);
        exec.spawn(task);
        hyper_code::HYPERE_OK
    }
}
320
321ffi_fn! {
322    /// Polls the executor, trying to make progress on any tasks that can do so.
323    ///
324    /// If any task from the executor is ready, returns one of them. The way
325    /// tasks signal being finished is internal to Hyper. The order in which tasks
326    /// are returned is not guaranteed. Use userdata to distinguish between tasks.
327    ///
328    /// To avoid a memory leak, the task must eventually be consumed by
329    /// `hyper_task_free`.
330    ///
331    /// If there are no ready tasks, this returns `NULL`.
332    fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task {
333        let exec = non_null!(&*exec ?= ptr::null_mut());
334        match exec.poll_next() {
335            Some(task) => Box::into_raw(task),
336            None => ptr::null_mut(),
337        }
338    } ?= ptr::null_mut()
339}
340
341// ===== impl hyper_task =====
342
343impl hyper_task {
344    pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task>
345    where
346        F: Future + Send + 'static,
347        F::Output: IntoDynTaskType + Send + Sync + 'static,
348    {
349        Box::new(hyper_task {
350            future: Box::pin(async move { fut.await.into_dyn_task_type() }),
351            output: None,
352            userdata: UserDataPointer(ptr::null_mut()),
353        })
354    }
355
356    fn output_type(&self) -> hyper_task_return_type {
357        match self.output {
358            None => hyper_task_return_type::HYPER_TASK_EMPTY,
359            Some(ref val) => val.as_task_type(),
360        }
361    }
362}
363
364impl Future for TaskFuture {
365    type Output = Box<hyper_task>;
366
367    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
368        match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) {
369            Poll::Ready(val) => {
370                let mut task = self.task.take().unwrap();
371                task.output = Some(val);
372                Poll::Ready(task)
373            }
374            Poll::Pending => Poll::Pending,
375        }
376    }
377}
378
ffi_fn! {
    /// Free a task.
    ///
    /// This should only be used if the task isn't consumed by
    /// `hyper_clientconn_handshake` or taken ownership of by
    /// `hyper_executor_push`.
    fn hyper_task_free(task: *mut hyper_task) {
        // Reclaim the box from C and drop it, along with any un-taken output.
        drop(non_null!(Box::from_raw(task) ?= ()));
    }
}
389
ffi_fn! {
    /// Takes the output value of this task.
    ///
    /// This must only be called once polling the task on an executor has finished
    /// this task.
    ///
    /// Use `hyper_task_type` to determine the type of the `void *` return value.
    ///
    /// To avoid a memory leak, a non-empty return value must eventually be
    /// consumed by a function appropriate for its type, one of
    /// `hyper_error_free`, `hyper_clientconn_free`, `hyper_response_free`, or
    /// `hyper_buf_free`.
    fn hyper_task_value(task: *mut hyper_task) -> *mut c_void {
        let task = non_null!(&mut *task ?= ptr::null_mut());

        if let Some(val) = task.output.take() {
            let p = Box::into_raw(val) as *mut c_void;
            // protect from returning fake pointers to empty types:
            // boxing a zero-sized value (such as `()`) does not allocate, so
            // `Box::into_raw` yields the aligned dangling sentinel rather than
            // a real heap address. Give C a NULL instead of that bogus pointer.
            if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() {
                ptr::null_mut()
            } else {
                p
            }
        } else {
            // No output: either the task isn't finished, or the value was
            // already taken by an earlier call.
            ptr::null_mut()
        }
    } ?= ptr::null_mut()
}
418
ffi_fn! {
    /// Query the return type of this task.
    fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type {
        // instead of blowing up spectacularly, just say this null task
        // doesn't have a value to retrieve.
        non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type()
    }
}
427
ffi_fn! {
    /// Set a user data pointer to be associated with this task.
    ///
    /// This value will be passed to task callbacks, and can be checked later
    /// with `hyper_task_userdata`.
    ///
    /// This is useful for telling apart tasks for different requests that are
    /// running on the same executor.
    fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) {
        // NOTE(review): sibling FFI functions use the `non_null!` macro for
        // this null check; confirm whether the manual check here is deliberate
        // (e.g. to avoid a debug assertion on NULL) or just an inconsistency.
        if task.is_null() {
            return;
        }

        unsafe { (*task).userdata = UserDataPointer(userdata) };
    }
}
444
ffi_fn! {
    /// Retrieve the userdata that has been set via `hyper_task_set_userdata`.
    fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void {
        // A NULL task yields a NULL userdata, matching the initial state.
        non_null!(&*task ?= ptr::null_mut()).userdata.0
    } ?= ptr::null_mut()
}
451
452// ===== impl AsTaskType =====
453
// SAFETY: `()` is the "no value" output; `hyper_task_value` returns NULL for
// it, which matches what `HYPER_TASK_EMPTY` tells the C caller to expect.
unsafe impl AsTaskType for () {
    fn as_task_type(&self) -> hyper_task_return_type {
        hyper_task_return_type::HYPER_TASK_EMPTY
    }
}
459
// SAFETY: the boxed value really is a `crate::Error`, so the C caller may
// treat the task's value as a `hyper_error *` as `HYPER_TASK_ERROR` promises.
unsafe impl AsTaskType for crate::Error {
    fn as_task_type(&self) -> hyper_task_return_type {
        hyper_task_return_type::HYPER_TASK_ERROR
    }
}
465
/// Any concrete `AsTaskType` value can be type-erased by simply boxing it.
impl<T> IntoDynTaskType for T
where
    T: AsTaskType + Send + Sync + 'static,
{
    fn into_dyn_task_type(self) -> BoxAny {
        Box::new(self)
    }
}
474
475impl<T> IntoDynTaskType for crate::Result<T>
476where
477    T: IntoDynTaskType + Send + Sync + 'static,
478{
479    fn into_dyn_task_type(self) -> BoxAny {
480        match self {
481            Ok(val) => val.into_dyn_task_type(),
482            Err(err) => Box::new(err),
483        }
484    }
485}
486
487impl<T> IntoDynTaskType for Option<T>
488where
489    T: IntoDynTaskType + Send + Sync + 'static,
490{
491    fn into_dyn_task_type(self) -> BoxAny {
492        match self {
493            Some(val) => val.into_dyn_task_type(),
494            None => ().into_dyn_task_type(),
495        }
496    }
497}
498
499// ===== impl hyper_context =====
500
impl hyper_context<'_> {
    /// Reinterprets a `&mut Context` as a `&mut hyper_context` so it can be
    /// handed across the FFI boundary without copying.
    pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> {
        // SAFETY: A struct with only one field has the same layout as that field.
        // NOTE(review): this relies on the single-field newtype having the same
        // layout as its field; a `#[repr(transparent)]` on `hyper_context`
        // would make that guarantee explicit — confirm.
        unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) }
    }
}
507
ffi_fn! {
    /// Creates a waker associated with the task context.
    ///
    /// The waker can be used to inform the task's executor that the task is
    /// ready to make progress (using `hyper_waker_wake`).
    ///
    /// Typically this only needs to be called once, but it can be called
    /// multiple times, returning a new waker each time.
    ///
    /// To avoid a memory leak, the waker must eventually be consumed by
    /// `hyper_waker_free` or `hyper_waker_wake`.
    fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker {
        // Clone the std waker out of the context; the clone is independent,
        // so each call hands C a separately-owned `hyper_waker`.
        let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone();
        Box::into_raw(Box::new(hyper_waker { waker }))
    } ?= ptr::null_mut()
}
524
525// ===== impl hyper_waker =====
526
ffi_fn! {
    /// Free a waker.
    ///
    /// This should only be used if the request isn't consumed by
    /// `hyper_waker_wake`.
    fn hyper_waker_free(waker: *mut hyper_waker) {
        // Reclaim the box from C and drop it without waking anything.
        drop(non_null!(Box::from_raw(waker) ?= ()));
    }
}
536
537ffi_fn! {
538    /// Wake up the task associated with a waker.
539    ///
540    /// This does not do work towards associated task. Instead, it signals
541    /// to the task's executor that the task is ready to make progress. The
542    /// application is responsible for calling hyper_executor_poll, which
543    /// will in turn do work on all tasks that are ready to make progress.
544    ///
545    /// NOTE: This consumes the waker. You should not use or free the waker afterwards.
546    fn hyper_waker_wake(waker: *mut hyper_waker) {
547        let waker = non_null!(Box::from_raw(waker) ?= ());
548        waker.waker.wake();
549    }
550}