hyper/ffi/
task.rs

1use std::ffi::{c_int, c_void};
2use std::future::Future;
3use std::pin::Pin;
4use std::ptr;
5use std::sync::{
6    atomic::{AtomicBool, Ordering},
7    Arc, Mutex, Weak,
8};
9use std::task::{Context, Poll};
10
11use futures_util::stream::{FuturesUnordered, Stream};
12
13use super::error::hyper_code;
14use super::UserDataPointer;
15
16type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
17type BoxAny = Box<dyn AsTaskType + Send + Sync>;
18
19/// Return in a poll function to indicate it was ready.
20pub const HYPER_POLL_READY: c_int = 0;
21/// Return in a poll function to indicate it is still pending.
22///
23/// The passed in `hyper_waker` should be registered to wake up the task at
24/// some later point.
25pub const HYPER_POLL_PENDING: c_int = 1;
26/// Return in a poll function indicate an error.
27pub const HYPER_POLL_ERROR: c_int = 3;
28
29/// A task executor for `hyper_task`s.
30///
31/// A task is a unit of work that may be blocked on IO, and can be polled to
32/// make progress on that work.
33///
34/// An executor can hold many tasks, included from unrelated HTTP connections.
35/// An executor is single threaded. Typically you might have one executor per
36/// thread. Or, for simplicity, you may choose one executor per connection.
37///
38/// Progress on tasks happens only when `hyper_executor_poll` is called, and only
39/// on tasks whose corresponding `hyper_waker` has been called to indicate they
40/// are ready to make progress (for instance, because the OS has indicated there
41/// is more data to read or more buffer space available to write).
42///
43/// Deadlock potential: `hyper_executor_poll` must not be called from within a task's
44/// callback. Doing so will result in a deadlock.
45///
46/// Methods:
47///
48/// - hyper_executor_new:  Creates a new task executor.
49/// - hyper_executor_push: Push a task onto the executor.
50/// - hyper_executor_poll: Polls the executor, trying to make progress on any tasks that have notified that they are ready again.
51/// - hyper_executor_free: Frees an executor and any incomplete tasks still part of it.
52pub struct hyper_executor {
53    /// The executor of all task futures.
54    ///
55    /// There should never be contention on the mutex, as it is only locked
56    /// to drive the futures. However, we cannot guarantee proper usage from
57    /// `hyper_executor_poll()`, which in C could potentially be called inside
58    /// one of the stored futures. The mutex isn't re-entrant, so doing so
59    /// would result in a deadlock, but that's better than data corruption.
60    driver: Mutex<FuturesUnordered<TaskFuture>>,
61
62    /// The queue of futures that need to be pushed into the `driver`.
63    ///
64    /// This is has a separate mutex since `spawn` could be called from inside
65    /// a future, which would mean the driver's mutex is already locked.
66    spawn_queue: Mutex<Vec<TaskFuture>>,
67
68    /// This is used to track when a future calls `wake` while we are within
69    /// `hyper_executor::poll_next`.
70    is_woken: Arc<ExecWaker>,
71}
72
73#[derive(Clone)]
74pub(crate) struct WeakExec(Weak<hyper_executor>);
75
76struct ExecWaker(AtomicBool);
77
78/// An async task.
79///
80/// A task represents a chunk of work that will eventually yield exactly one
81/// `hyper_task_value`. Tasks are pushed onto an executor, and that executor is
82/// responsible for calling the necessary private functions on the task to make
83/// progress. In most cases those private functions will eventually cause read
84/// or write callbacks on a `hyper_io` object to be called.
85///
86/// Tasks are created by various functions:
87///
88/// - hyper_clientconn_handshake: Creates an HTTP client handshake task.
89/// - hyper_clientconn_send:      Creates a task to send a request on the client connection.
90/// - hyper_body_data:            Creates a task that will poll a response body for the next buffer of data.
91/// - hyper_body_foreach:         Creates a task to execute the callback with each body chunk received.
92///
93/// Tasks then have a userdata associated with them using `hyper_task_set_userdata`. This
94/// is important, for instance, to associate a request id with a given request. When multiple
95/// tasks are running on the same executor, this allows distinguishing tasks for different
96/// requests.
97///
98/// Tasks are then pushed onto an executor, and eventually yielded from hyper_executor_poll:
99///
100/// - hyper_executor_push:        Push a task onto the executor.
101/// - hyper_executor_poll:        Polls the executor, trying to make progress on any tasks that have notified that they are ready again.
102///
103/// Once a task is yielded from poll, retrieve its userdata, check its type,
104/// and extract its value. This will require a case from void* to the appropriate type.
105///
106/// Methods on hyper_task:
107///
108/// - hyper_task_type:            Query the return type of this task.
109/// - hyper_task_value:           Takes the output value of this task.
110/// - hyper_task_set_userdata:    Set a user data pointer to be associated with this task.
111/// - hyper_task_userdata:        Retrieve the userdata that has been set via hyper_task_set_userdata.
112/// - hyper_task_free:            Free a task.
113pub struct hyper_task {
114    future: BoxFuture<BoxAny>,
115    output: Option<BoxAny>,
116    userdata: UserDataPointer,
117}
118
119struct TaskFuture {
120    task: Option<Box<hyper_task>>,
121}
122
123/// An async context for a task that contains the related waker.
124///
125/// This is provided to `hyper_io`'s read and write callbacks. Currently
126/// its only purpose is to provide access to the waker. See `hyper_waker`.
127///
128/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Context.html>
129pub struct hyper_context<'a>(Context<'a>);
130
131/// A waker that is saved and used to waken a pending task.
132///
133/// This is provided to `hyper_io`'s read and write callbacks via `hyper_context`
134/// and `hyper_context_waker`.
135///
136/// When nonblocking I/O in one of those callbacks can't make progress (returns
137/// `EAGAIN` or `EWOULDBLOCK`), the callback has to return to avoid blocking the
138/// executor. But it also has to arrange to get called in the future when more
139/// data is available. That's the role of the async context and the waker. The
140/// waker can be used to tell the executor "this task is ready to make progress."
141///
142/// The read or write callback, upon finding it can't make progress, must get a
143/// waker from the context (`hyper_context_waker`), arrange for that waker to be
144/// called in the future, and then return `HYPER_POLL_PENDING`.
145///
146/// The arrangements for the waker to be called in the future are up to the
147/// application, but usually it will involve one big `select(2)` loop that checks which
148/// FDs are ready, and a correspondence between FDs and waker objects. For each
149/// FD that is ready, the corresponding waker must be called. Then `hyper_executor_poll`
150/// must be called. That will cause the executor to attempt to make progress on each
151/// woken task.
152///
153/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Waker.html>
154pub struct hyper_waker {
155    waker: std::task::Waker,
156}
157
158/// A descriptor for what type a `hyper_task` value is.
159#[repr(C)]
160pub enum hyper_task_return_type {
161    /// The value of this task is null (does not imply an error).
162    HYPER_TASK_EMPTY,
163    /// The value of this task is `hyper_error *`.
164    HYPER_TASK_ERROR,
165    /// The value of this task is `hyper_clientconn *`.
166    HYPER_TASK_CLIENTCONN,
167    /// The value of this task is `hyper_response *`.
168    HYPER_TASK_RESPONSE,
169    /// The value of this task is `hyper_buf *`.
170    HYPER_TASK_BUF,
171}
172
173pub(crate) unsafe trait AsTaskType {
174    fn as_task_type(&self) -> hyper_task_return_type;
175}
176
177pub(crate) trait IntoDynTaskType {
178    fn into_dyn_task_type(self) -> BoxAny;
179}
180
181// ===== impl hyper_executor =====
182
183impl hyper_executor {
184    fn new() -> Arc<hyper_executor> {
185        Arc::new(hyper_executor {
186            driver: Mutex::new(FuturesUnordered::new()),
187            spawn_queue: Mutex::new(Vec::new()),
188            is_woken: Arc::new(ExecWaker(AtomicBool::new(false))),
189        })
190    }
191
192    pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec {
193        WeakExec(Arc::downgrade(exec))
194    }
195
196    fn spawn(&self, task: Box<hyper_task>) {
197        self.spawn_queue
198            .lock()
199            .unwrap()
200            .push(TaskFuture { task: Some(task) });
201    }
202
203    fn poll_next(&self) -> Option<Box<hyper_task>> {
204        // Drain the queue first.
205        self.drain_queue();
206
207        let waker = futures_util::task::waker_ref(&self.is_woken);
208        let mut cx = Context::from_waker(&waker);
209
210        loop {
211            {
212                // Scope the lock on the driver to ensure it is dropped before
213                // calling drain_queue below.
214                let mut driver = self.driver.lock().unwrap();
215                match Pin::new(&mut *driver).poll_next(&mut cx) {
216                    Poll::Ready(val) => return val,
217                    Poll::Pending => {}
218                };
219            }
220
221            // poll_next returned Pending.
222            // Check if any of the pending tasks tried to spawn
223            // some new tasks. If so, drain into the driver and loop.
224            if self.drain_queue() {
225                continue;
226            }
227
228            // If the driver called `wake` while we were polling,
229            // we should poll again immediately!
230            if self.is_woken.0.swap(false, Ordering::SeqCst) {
231                continue;
232            }
233
234            return None;
235        }
236    }
237
238    /// drain_queue locks both self.spawn_queue and self.driver, so it requires
239    /// that neither of them be locked already.
240    fn drain_queue(&self) -> bool {
241        let mut queue = self.spawn_queue.lock().unwrap();
242        if queue.is_empty() {
243            return false;
244        }
245
246        let driver = self.driver.lock().unwrap();
247
248        for task in queue.drain(..) {
249            driver.push(task);
250        }
251
252        true
253    }
254}
255
256impl futures_util::task::ArcWake for ExecWaker {
257    fn wake_by_ref(me: &Arc<ExecWaker>) {
258        me.0.store(true, Ordering::SeqCst);
259    }
260}
261
262// ===== impl WeakExec =====
263
264impl WeakExec {
265    pub(crate) fn new() -> Self {
266        WeakExec(Weak::new())
267    }
268}
269
270impl<F> crate::rt::Executor<F> for WeakExec
271where
272    F: Future + Send + 'static,
273    F::Output: Send + Sync + AsTaskType,
274{
275    fn execute(&self, fut: F) {
276        if let Some(exec) = self.0.upgrade() {
277            exec.spawn(hyper_task::boxed(fut));
278        }
279    }
280}
281
282ffi_fn! {
283    /// Creates a new task executor.
284    ///
285    /// To avoid a memory leak, the executor must eventually be consumed by
286    /// `hyper_executor_free`.
287    fn hyper_executor_new() -> *const hyper_executor {
288        Arc::into_raw(hyper_executor::new())
289    } ?= ptr::null()
290}
291
292ffi_fn! {
293    /// Frees an executor and any incomplete tasks still part of it.
294    ///
295    /// This should be used for any executor once it is no longer needed.
296    fn hyper_executor_free(exec: *const hyper_executor) {
297        drop(non_null!(Arc::from_raw(exec) ?= ()));
298    }
299}
300
301ffi_fn! {
302    /// Push a task onto the executor.
303    ///
304    /// The executor takes ownership of the task, which must not be accessed
305    /// again.
306    ///
307    /// Ownership of the task will eventually be returned to the user from
308    /// `hyper_executor_poll`.
309    ///
310    /// To distinguish multiple tasks running on the same executor, use
311    /// hyper_task_set_userdata.
312    fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code {
313        let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG);
314        let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG);
315        exec.spawn(task);
316        hyper_code::HYPERE_OK
317    }
318}
319
320ffi_fn! {
321    /// Polls the executor, trying to make progress on any tasks that can do so.
322    ///
323    /// If any task from the executor is ready, returns one of them. The way
324    /// tasks signal being finished is internal to Hyper. The order in which tasks
325    /// are returned is not guaranteed. Use userdata to distinguish between tasks.
326    ///
327    /// To avoid a memory leak, the task must eventually be consumed by
328    /// `hyper_task_free`.
329    ///
330    /// If there are no ready tasks, this returns `NULL`.
331    fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task {
332        let exec = non_null!(&*exec ?= ptr::null_mut());
333        match exec.poll_next() {
334            Some(task) => Box::into_raw(task),
335            None => ptr::null_mut(),
336        }
337    } ?= ptr::null_mut()
338}
339
340// ===== impl hyper_task =====
341
342impl hyper_task {
343    pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task>
344    where
345        F: Future + Send + 'static,
346        F::Output: IntoDynTaskType + Send + Sync + 'static,
347    {
348        Box::new(hyper_task {
349            future: Box::pin(async move { fut.await.into_dyn_task_type() }),
350            output: None,
351            userdata: UserDataPointer(ptr::null_mut()),
352        })
353    }
354
355    fn output_type(&self) -> hyper_task_return_type {
356        match self.output {
357            None => hyper_task_return_type::HYPER_TASK_EMPTY,
358            Some(ref val) => val.as_task_type(),
359        }
360    }
361}
362
363impl Future for TaskFuture {
364    type Output = Box<hyper_task>;
365
366    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367        match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) {
368            Poll::Ready(val) => {
369                let mut task = self.task.take().unwrap();
370                task.output = Some(val);
371                Poll::Ready(task)
372            }
373            Poll::Pending => Poll::Pending,
374        }
375    }
376}
377
378ffi_fn! {
379    /// Free a task.
380    ///
381    /// This should only be used if the task isn't consumed by
382    /// `hyper_clientconn_handshake` or taken ownership of by
383    /// `hyper_executor_push`.
384    fn hyper_task_free(task: *mut hyper_task) {
385        drop(non_null!(Box::from_raw(task) ?= ()));
386    }
387}
388
389ffi_fn! {
390    /// Takes the output value of this task.
391    ///
392    /// This must only be called once polling the task on an executor has finished
393    /// this task.
394    ///
395    /// Use `hyper_task_type` to determine the type of the `void *` return value.
396    ///
397    /// To avoid a memory leak, a non-empty return value must eventually be
398    /// consumed by a function appropriate for its type, one of
399    /// `hyper_error_free`, `hyper_clientconn_free`, `hyper_response_free`, or
400    /// `hyper_buf_free`.
401    fn hyper_task_value(task: *mut hyper_task) -> *mut c_void {
402        let task = non_null!(&mut *task ?= ptr::null_mut());
403
404        if let Some(val) = task.output.take() {
405            let p = Box::into_raw(val) as *mut c_void;
406            // protect from returning fake pointers to empty types
407            if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() {
408                ptr::null_mut()
409            } else {
410                p
411            }
412        } else {
413            ptr::null_mut()
414        }
415    } ?= ptr::null_mut()
416}
417
418ffi_fn! {
419    /// Query the return type of this task.
420    fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type {
421        // instead of blowing up spectacularly, just say this null task
422        // doesn't have a value to retrieve.
423        non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type()
424    }
425}
426
427ffi_fn! {
428    /// Set a user data pointer to be associated with this task.
429    ///
430    /// This value will be passed to task callbacks, and can be checked later
431    /// with `hyper_task_userdata`.
432    ///
433    /// This is useful for telling apart tasks for different requests that are
434    /// running on the same executor.
435    fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) {
436        if task.is_null() {
437            return;
438        }
439
440        unsafe { (*task).userdata = UserDataPointer(userdata) };
441    }
442}
443
444ffi_fn! {
445    /// Retrieve the userdata that has been set via `hyper_task_set_userdata`.
446    fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void {
447        non_null!(&*task ?= ptr::null_mut()).userdata.0
448    } ?= ptr::null_mut()
449}
450
451// ===== impl AsTaskType =====
452
453unsafe impl AsTaskType for () {
454    fn as_task_type(&self) -> hyper_task_return_type {
455        hyper_task_return_type::HYPER_TASK_EMPTY
456    }
457}
458
459unsafe impl AsTaskType for crate::Error {
460    fn as_task_type(&self) -> hyper_task_return_type {
461        hyper_task_return_type::HYPER_TASK_ERROR
462    }
463}
464
465impl<T> IntoDynTaskType for T
466where
467    T: AsTaskType + Send + Sync + 'static,
468{
469    fn into_dyn_task_type(self) -> BoxAny {
470        Box::new(self)
471    }
472}
473
474impl<T> IntoDynTaskType for crate::Result<T>
475where
476    T: IntoDynTaskType + Send + Sync + 'static,
477{
478    fn into_dyn_task_type(self) -> BoxAny {
479        match self {
480            Ok(val) => val.into_dyn_task_type(),
481            Err(err) => Box::new(err),
482        }
483    }
484}
485
486impl<T> IntoDynTaskType for Option<T>
487where
488    T: IntoDynTaskType + Send + Sync + 'static,
489{
490    fn into_dyn_task_type(self) -> BoxAny {
491        match self {
492            Some(val) => val.into_dyn_task_type(),
493            None => ().into_dyn_task_type(),
494        }
495    }
496}
497
498// ===== impl hyper_context =====
499
500impl hyper_context<'_> {
501    pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> {
502        // A struct with only one field has the same layout as that field.
503        unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) }
504    }
505}
506
507ffi_fn! {
508    /// Creates a waker associated with the task context.
509    ///
510    /// The waker can be used to inform the task's executor that the task is
511    /// ready to make progress (using `hyper_waker_wake`).
512    ///
513    /// Typically this only needs to be called once, but it can be called
514    /// multiple times, returning a new waker each time.
515    ///
516    /// To avoid a memory leak, the waker must eventually be consumed by
517    /// `hyper_waker_free` or `hyper_waker_wake`.
518    fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker {
519        let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone();
520        Box::into_raw(Box::new(hyper_waker { waker }))
521    } ?= ptr::null_mut()
522}
523
524// ===== impl hyper_waker =====
525
526ffi_fn! {
527    /// Free a waker.
528    ///
529    /// This should only be used if the request isn't consumed by
530    /// `hyper_waker_wake`.
531    fn hyper_waker_free(waker: *mut hyper_waker) {
532        drop(non_null!(Box::from_raw(waker) ?= ()));
533    }
534}
535
536ffi_fn! {
537    /// Wake up the task associated with a waker.
538    ///
539    /// This does not do work towards associated task. Instead, it signals
540    /// to the task's executor that the task is ready to make progress. The
541    /// application is responsible for calling hyper_executor_poll, which
542    /// will in turn do work on all tasks that are ready to make progress.
543    ///
544    /// NOTE: This consumes the waker. You should not use or free the waker afterwards.
545    fn hyper_waker_wake(waker: *mut hyper_waker) {
546        let waker = non_null!(Box::from_raw(waker) ?= ());
547        waker.waker.wake();
548    }
549}