kayrx_karx/
task.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
7use core::task::Waker;
8
9use crate::header::Header;
10use crate::raw::RawTask;
11use crate::state::*;
12use crate::JoinHandle;
13
14/// Creates a new task.
15///
16/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
17/// awaits its result.
18///
19/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
20/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
21///
22/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
23/// push the task into some kind of queue so that it can be processed later.
24///
25/// If you need to spawn a future that does not implement [`Send`], consider using the
26/// [`spawn_local`] function instead.
27///
28/// [`Task`]: struct.Task.html
29/// [`JoinHandle`]: struct.JoinHandle.html
30/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
31/// [`spawn_local`]: fn.spawn_local.html
32///
33/// # Examples
34///
35/// ```
36/// use crossbeam::channel;
37///
38/// // The future inside the task.
39/// let future = async {
40///     println!("Hello, world!");
41/// };
42///
43/// // If the task gets woken up, it will be sent into this channel.
44/// let (s, r) = channel::unbounded();
45/// let schedule = move |task| s.send(task).unwrap();
46///
47/// // Create a task with the future and the schedule function.
48/// let (task, handle) = kayrx_karx::spawn(future, schedule, ());
49/// ```
50pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
51where
52    F: Future<Output = R> + Send + 'static,
53    R: Send + 'static,
54    S: Fn(Task<T>) + Send + Sync + 'static,
55    T: Send + Sync + 'static,
56{
57    // Allocate large futures on the heap.
58    let raw_task = if mem::size_of::<F>() >= 2048 {
59        let future = alloc::boxed::Box::pin(future);
60        RawTask::<_, R, S, T>::allocate(future, schedule, tag)
61    } else {
62        RawTask::<F, R, S, T>::allocate(future, schedule, tag)
63    };
64
65    let task = Task {
66        raw_task,
67        _marker: PhantomData,
68    };
69    let handle = JoinHandle {
70        raw_task,
71        _marker: PhantomData,
72    };
73    (task, handle)
74}
75
76/// Creates a new local task.
77///
78/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
79/// awaits its result.
80///
81/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
82/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
83///
84/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
85/// push the task into some kind of queue so that it can be processed later.
86///
87/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
88/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
89///
90/// **NOTE:** This function is only available when the `std` feature for this crate is enabled (it
91/// is by default).
92///
93/// [`Task`]: struct.Task.html
94/// [`JoinHandle`]: struct.JoinHandle.html
95/// [`spawn`]: fn.spawn.html
96/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
97///
98/// # Examples
99///
100/// ```
101/// use crossbeam::channel;
102///
103/// // The future inside the task.
104/// let future = async {
105///     println!("Hello, world!");
106/// };
107///
108/// // If the task gets woken up, it will be sent into this channel.
109/// let (s, r) = channel::unbounded();
110/// let schedule = move |task| s.send(task).unwrap();
111///
112/// // Create a task with the future and the schedule function.
113/// let (task, handle) = kayrx_karx::spawn_local(future, schedule, ());
114/// ```
115pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
116where
117    F: Future<Output = R> + 'static,
118    R: 'static,
119    S: Fn(Task<T>) + Send + Sync + 'static,
120    T: Send + Sync + 'static,
121{
122    extern crate std;
123
124    use std::mem::ManuallyDrop;
125    use std::pin::Pin;
126    use std::task::{Context, Poll};
127    use std::thread::{self, ThreadId};
128    use std::thread_local;
129
130    #[inline]
131    fn thread_id() -> ThreadId {
132        thread_local! {
133            static ID: ThreadId = thread::current().id();
134        }
135        ID.try_with(|id| *id)
136            .unwrap_or_else(|_| thread::current().id())
137    }
138
139    struct Checked<F> {
140        id: ThreadId,
141        inner: ManuallyDrop<F>,
142    }
143
144    impl<F> Drop for Checked<F> {
145        fn drop(&mut self) {
146            assert!(
147                self.id == thread_id(),
148                "local task dropped by a thread that didn't spawn it"
149            );
150            unsafe {
151                ManuallyDrop::drop(&mut self.inner);
152            }
153        }
154    }
155
156    impl<F: Future> Future for Checked<F> {
157        type Output = F::Output;
158
159        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160            assert!(
161                self.id == thread_id(),
162                "local task polled by a thread that didn't spawn it"
163            );
164            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
165        }
166    }
167
168    // Wrap the future into one that which thread it's on.
169    let future = Checked {
170        id: thread_id(),
171        inner: ManuallyDrop::new(future),
172    };
173
174    // Allocate large futures on the heap.
175    let raw_task = if mem::size_of::<F>() >= 2048 {
176        let future = alloc::boxed::Box::pin(future);
177        RawTask::<_, R, S, T>::allocate(future, schedule, tag)
178    } else {
179        RawTask::<_, R, S, T>::allocate(future, schedule, tag)
180    };
181
182    let task = Task {
183        raw_task,
184        _marker: PhantomData,
185    };
186    let handle = JoinHandle {
187        raw_task,
188        _marker: PhantomData,
189    };
190    (task, handle)
191}
192
193/// A task reference that runs its future.
194///
195/// At any moment in time, there is at most one [`Task`] reference associated with a particular
196/// task. Running consumes the [`Task`] reference and polls its internal future. If the future is
197/// still pending after getting polled, the [`Task`] reference simply won't exist until a [`Waker`]
198/// notifies the task. If the future completes, its result becomes available to the [`JoinHandle`].
199///
200/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
201/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
202/// runnable tasks.
203///
204/// If the [`Task`] reference is dropped without getting run, the task is automatically canceled.
205/// When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible
206/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt
207/// to run the task won't do anything.
208///
209/// [`run()`]: struct.Task.html#method.run
210/// [`JoinHandle`]: struct.JoinHandle.html
211/// [`Task`]: struct.Task.html
212/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
213pub struct Task<T> {
214    /// A pointer to the heap-allocated task.
215    pub(crate) raw_task: NonNull<()>,
216
217    /// A marker capturing the generic type `T`.
218    pub(crate) _marker: PhantomData<T>,
219}
220
221unsafe impl<T> Send for Task<T> {}
222unsafe impl<T> Sync for Task<T> {}
223
224impl<T> Task<T> {
225    /// Schedules the task.
226    ///
227    /// This is a convenience method that simply reschedules the task by passing it to its schedule
228    /// function.
229    ///
230    /// If the task is canceled, this method won't do anything.
231    pub fn schedule(self) {
232        let ptr = self.raw_task.as_ptr();
233        let header = ptr as *const Header;
234        mem::forget(self);
235
236        unsafe {
237            ((*header).vtable.schedule)(ptr);
238        }
239    }
240
241    /// Runs the task.
242    ///
243    /// Returns `true` if the task was woken while running, in which case it gets rescheduled at
244    /// the end of this method invocation.
245    ///
246    /// This method polls the task's future. If the future completes, its result will become
247    /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
248    /// be woken up in order to be rescheduled and run again.
249    ///
250    /// If the task was canceled by a [`JoinHandle`] before it gets run, then this method won't do
251    /// anything.
252    ///
253    /// It is possible that polling the future panics, in which case the panic will be propagated
254    /// into the caller. It is advised that invocations of this method are wrapped inside
255    /// [`catch_unwind`]. If a panic occurs, the task is automatically canceled.
256    ///
257    /// [`JoinHandle`]: struct.JoinHandle.html
258    /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
259    pub fn run(self) -> bool {
260        let ptr = self.raw_task.as_ptr();
261        let header = ptr as *const Header;
262        mem::forget(self);
263
264        unsafe { ((*header).vtable.run)(ptr) }
265    }
266
267    /// Cancels the task.
268    ///
269    /// When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
270    /// to run it won't do anything.
271    ///
272    /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
273    pub fn cancel(&self) {
274        let ptr = self.raw_task.as_ptr();
275        let header = ptr as *const Header;
276
277        unsafe {
278            (*header).cancel();
279        }
280    }
281
282    /// Returns a reference to the tag stored inside the task.
283    pub fn tag(&self) -> &T {
284        let offset = Header::offset_tag::<T>();
285        let ptr = self.raw_task.as_ptr();
286
287        unsafe {
288            let raw = (ptr as *mut u8).add(offset) as *const T;
289            &*raw
290        }
291    }
292
293    /// Converts this task into a raw pointer to the tag.
294    pub fn into_raw(self) -> *const T {
295        let offset = Header::offset_tag::<T>();
296        let ptr = self.raw_task.as_ptr();
297        mem::forget(self);
298
299        unsafe { (ptr as *mut u8).add(offset) as *const T }
300    }
301
302    /// Converts a raw pointer to the tag into a task.
303    ///
304    /// This method should only be used with raw pointers returned from [`into_raw`].
305    ///
306    /// [`into_raw`]: #method.into_raw
307    pub unsafe fn from_raw(raw: *const T) -> Task<T> {
308        let offset = Header::offset_tag::<T>();
309        let ptr = (raw as *mut u8).sub(offset) as *mut ();
310
311        Task {
312            raw_task: NonNull::new_unchecked(ptr),
313            _marker: PhantomData,
314        }
315    }
316
317    /// Returns a waker associated with this task.
318    pub fn waker(&self) -> Waker {
319        let ptr = self.raw_task.as_ptr();
320        let header = ptr as *const Header;
321
322        unsafe {
323            let raw_waker = ((*header).vtable.clone_waker)(ptr);
324            Waker::from_raw(raw_waker)
325        }
326    }
327}
328
329impl<T> Drop for Task<T> {
330    fn drop(&mut self) {
331        let ptr = self.raw_task.as_ptr();
332        let header = ptr as *const Header;
333
334        unsafe {
335            // Cancel the task.
336            (*header).cancel();
337
338            // Drop the future.
339            ((*header).vtable.drop_future)(ptr);
340
341            // Mark the task as unscheduled.
342            let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
343
344            // Notify the awaiter that the future has been dropped.
345            if state & AWAITER != 0 {
346                (*header).notify(None);
347            }
348
349            // Drop the task reference.
350            ((*header).vtable.drop_task)(ptr);
351        }
352    }
353}
354
355impl<T: fmt::Debug> fmt::Debug for Task<T> {
356    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
357        let ptr = self.raw_task.as_ptr();
358        let header = ptr as *const Header;
359
360        f.debug_struct("Task")
361            .field("header", unsafe { &(*header) })
362            .field("tag", self.tag())
363            .finish()
364    }
365}