1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::mem; use std::ptr::NonNull; use crate::header::Header; use crate::raw::RawTask; use crate::JoinHandle; /// Creates a new task. /// /// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that /// awaits its result. /// /// When run, the task polls `future`. When woken, it gets scheduled for running by the `schedule` /// function. Argument `tag` is an arbitrary piece of data stored inside the task. /// /// [`Task`]: struct.Task.html /// [`JoinHandle`]: struct.JoinHandle.html /// /// # Examples /// /// ``` /// # #![feature(async_await)] /// # /// use crossbeam::channel; /// /// // The future inside the task. /// let future = async { /// println!("Hello, world!"); /// }; /// /// // If the task gets woken, it will be sent into this channel. /// let (s, r) = channel::unbounded(); /// let schedule = move |task| s.send(task).unwrap(); /// /// // Create a task with the future and the schedule function. /// let (task, handle) = async_task::spawn(future, schedule, ()); /// ``` pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) where F: Future<Output = R> + Send + 'static, R: Send + 'static, S: Fn(Task<T>) + Send + Sync + 'static, T: Send + Sync + 'static, { let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule); let task = Task { raw_task, _marker: PhantomData, }; let handle = JoinHandle { raw_task, _marker: PhantomData, }; (task, handle) } /// A task reference that runs its future. /// /// The [`Task`] reference "owns" the task itself and is able to run it. Running consumes the /// [`Task`] reference and polls its internal future. If the future is still pending after getting /// polled, the [`Task`] reference simply won't exist until a [`Waker`] notifies the task. If the /// future completes, its result becomes available to the [`JoinHandle`]. /// /// When the task is woken, the [`Task`] reference is recreated and passed to the schedule /// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of /// runnable tasks. /// /// If the [`Task`] reference is dropped without being run, the task is cancelled. When cancelled, /// the task won't be scheduled again even if a [`Waker`] wakes it. It is possible for the /// [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt to run /// the task won't do anything. /// /// [`run()`]: struct.Task.html#method.run /// [`JoinHandle`]: struct.JoinHandle.html /// [`Task`]: struct.Task.html /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html pub struct Task<T> { /// A pointer to the heap-allocated task. pub(crate) raw_task: NonNull<()>, /// A marker capturing the generic type `T`. pub(crate) _marker: PhantomData<T>, } unsafe impl<T> Send for Task<T> {} unsafe impl<T> Sync for Task<T> {} impl<T> Task<T> { /// Schedules the task. /// /// This is a convenience method that simply reschedules the task by passing it to its schedule /// function. /// /// If the task is cancelled, this method won't do anything. pub fn schedule(self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.schedule)(ptr); } } /// Runs the task. /// /// This method polls the task's future. If the future completes, its result will become /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to /// be woken in order to be rescheduled and then run again. /// /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do /// anything. /// /// It is possible that polling the future panics, in which case the panic will be propagated /// into the caller. It is advised that invocations of this method are wrapped inside /// [`catch_unwind`]. /// /// If a panic occurs, the task is automatically cancelled. /// /// [`JoinHandle`]: struct.JoinHandle.html /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html pub fn run(self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.run)(ptr); } } /// Cancels the task. /// /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt /// to run it won't do anything. /// /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html pub fn cancel(&self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; unsafe { (*header).cancel(); } } /// Returns a reference to the tag stored inside the task. pub fn tag(&self) -> &T { let offset = Header::offset_tag::<T>(); let ptr = self.raw_task.as_ptr(); unsafe { let raw = (ptr as *mut u8).add(offset) as *const T; &*raw } } } impl<T> Drop for Task<T> { fn drop(&mut self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; unsafe { // Cancel the task. (*header).cancel(); // Drop the future. ((*header).vtable.drop_future)(ptr); // Drop the task reference. ((*header).vtable.decrement)(ptr); } } } impl<T: fmt::Debug> fmt::Debug for Task<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; f.debug_struct("Task") .field("header", unsafe { &(*header) }) .field("tag", self.tag()) .finish() } }