1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
// Unless explicitly stated otherwise all files in this repository are licensed // under the MIT/Apache-2.0 License, at your convenience // // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc. // use core::{fmt, future::Future, marker::PhantomData, mem, ptr::NonNull}; use crate::task::{header::Header, raw::RawTask, state::*, JoinHandle}; /// Creates a new local task. /// /// This constructor returns a [`Task`] reference that runs the future and a /// [`JoinHandle`] that awaits its result. /// /// When run, the task polls `future`. When woken up, it gets scheduled for /// running by the `schedule` function. /// /// [`Task`]: struct.Task.html /// [`JoinHandle`]: struct.JoinHandle.html pub(crate) fn spawn_local<F, R, S>( executor_id: usize, future: F, schedule: S, ) -> (Task, JoinHandle<R>) where F: Future<Output = R>, S: Fn(Task), { // Allocate large futures on the heap. let raw_task = if mem::size_of::<F>() >= 2048 { let future = alloc::boxed::Box::pin(future); RawTask::<_, R, S>::allocate(future, schedule, executor_id) } else { RawTask::<_, R, S>::allocate(future, schedule, executor_id) }; let task = Task { raw_task }; let handle = JoinHandle { raw_task, _marker: PhantomData, }; (task, handle) } /// A task reference that runs its future. /// /// At any moment in time, there is at most one [`Task`] reference associated /// with a particular task. Running consumes the [`Task`] reference and polls /// its internal future. If the future is still pending after getting polled, /// the [`Task`] reference simply won't exist until a [`Waker`] notifies the /// task. If the future completes, its result becomes available to the /// [`JoinHandle`]. /// /// When a task is woken up, its [`Task`] reference is recreated and passed to /// the schedule function. In most executors, scheduling simply pushes the /// [`Task`] reference into a queue of runnable tasks. /// /// If the [`Task`] reference is dropped without getting run, the task is /// automatically canceled. When canceled, the task won't be scheduled again /// even if a [`Waker`] wakes it. It is possible for the [`JoinHandle`] to /// cancel while the [`Task`] reference exists, in which case an attempt /// to run the task won't do anything. /// /// [`run()`]: struct.Task.html#method.run /// [`JoinHandle`]: struct.JoinHandle.html /// [`Task`]: struct.Task.html /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html pub struct Task { /// A pointer to the heap-allocated task. pub(crate) raw_task: NonNull<()>, } impl Task { /// Schedules the task. /// /// This is a convenience method that simply reschedules the task by passing /// it to its schedule function. /// /// If the task is canceled, this method won't do anything. pub fn schedule(self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.schedule)(ptr); } } /// Runs the task. /// /// Returns `true` if the task was woken while running, in which case it /// gets rescheduled at the end of this method invocation. /// /// This method polls the task's future. If the future completes, its result /// will become available to the [`JoinHandle`]. And if the future is /// still pending, the task will have to be woken up in order to be /// rescheduled and run again. /// /// If the task was canceled by a [`JoinHandle`] before it gets run, then /// this method won't do anything. /// /// It is possible that polling the future panics, in which case the panic /// will be propagated into the caller. It is advised that invocations /// of this method are wrapped inside [`catch_unwind`]. If a panic /// occurs, the task is automatically canceled. /// /// [`JoinHandle`]: struct.JoinHandle.html /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html pub fn run(self) -> bool { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.run)(ptr) } } } impl Drop for Task { fn drop(&mut self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *mut Header; unsafe { // Cancel the task. (*header).cancel(); // Drop the future. ((*header).vtable.drop_future)(ptr); // Mark the task as unscheduled. (*header).state &= !SCHEDULED; // Notify the awaiter that the future has been dropped. (*header).notify(None); // Drop the task reference. ((*header).vtable.drop_task)(ptr); } } } impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; f.debug_struct("Task") .field("header", unsafe { &(*header) }) .finish() } }