//! Raw task implementation for the karx kernel executor (kernel/raw.rs).

use alloc::alloc::Layout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use super::header::Header;
use super::state::*;
use super::utils::{abort, abort_on_panic, extend};
use super::Task;
15
/// The vtable for a task.
///
/// Every entry is a type-erased function: it receives a raw pointer to the
/// task allocation and internally reconstructs the concrete
/// `RawTask<F, R, S, T>` to operate on it.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task.
    pub(crate) drop_task: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
}
39
/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
///
/// The task is a single allocation holding, in order: the `Header`, the tag,
/// the schedule function, and a union-like slot shared by the future and its
/// output (hence `offset_f == offset_r`).
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: Layout,

    /// Offset into the task at which the tag is stored.
    pub(crate) offset_t: usize,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}
63
/// Raw pointers to the fields inside a task.
///
/// All pointers point into the same single allocation, at the offsets
/// described by [`TaskLayout`]. Note that `future` and `output` alias the
/// same slot — only one of them is live at any given time.
pub(crate) struct RawTask<F, R, S, T> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The tag inside the task.
    pub(crate) tag: *mut T,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut R,
}
81
// `RawTask` is only a bundle of raw pointers, so it is trivially copyable
// regardless of whether `F`, `R`, `S`, or `T` are.
impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}
83
84impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
85    fn clone(&self) -> Self {
86        *self
87    }
88}
89
90impl<F, R, S, T> RawTask<F, R, S, T>
91where
92    F: Future<Output = R> + 'static,
93    S: Fn(Task<T>) + Send + Sync + 'static,
94{
    /// The waker vtable for this task type: `clone`, `wake`, `wake_by_ref`,
    /// and `drop` all dispatch to the type-erased functions below.
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );
101
    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
    ///
    /// Aborts the process if the allocation fails or if computing the layout
    /// panics — there is no way to report failure to the caller here.
    pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        let task_layout = abort_on_panic(|| Self::task_layout());

        unsafe {
            // Allocate enough space for the entire task.
            let raw_task = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(raw_task.as_ptr());

            // Write the header as the first field of the task.
            // Initial state: scheduled, with a live `JoinHandle` and one reference.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_task: Self::drop_task,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                },
            });

            // Write the tag as the second field of the task.
            (raw.tag as *mut T).write(tag);

            // Write the schedule function as the third field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the fourth field of the task.
            raw.future.write(future);

            raw_task
        }
    }
145
    /// Creates a `RawTask` from a raw task pointer.
    ///
    /// `ptr` is expected to be the base of an allocation produced by
    /// [`RawTask::allocate`]; the field pointers are derived from it using the
    /// offsets computed by [`RawTask::task_layout`].
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                tag: p.add(task_layout.offset_t) as *mut T,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut R,
            }
        }
    }
162
    /// Returns the memory layout for a task.
    ///
    /// The future and the output share one union-like slot (sized and aligned
    /// for whichever of `F`/`R` is larger), which is why `offset_f` and
    /// `offset_r` are identical.
    #[inline]
    fn task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
        let layout_header = Layout::new::<Header>();
        let layout_t = Layout::new::<T>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<R>();

        // Compute the layout for `union { F, R }`.
        let size_union = layout_f.size().max(layout_r.size());
        let align_union = layout_f.align().max(layout_r.align());
        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };

        // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
        let layout = layout_header;
        let (layout, offset_t) = extend(layout, layout_t);
        let (layout, offset_s) = extend(layout, layout_s);
        let (layout, offset_union) = extend(layout, layout_union);
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout,
            offset_t,
            offset_s,
            offset_f,
            offset_r,
        }
    }
194
    /// Wakes a waker.
    ///
    /// Consumes the waker: exactly one reference is released, either via
    /// `drop_waker` or by handing the reference over to the schedule function.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`].
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task. Our waker reference is reused as the reference
                            // held by the scheduled `Task`, so it is not dropped here.
                            Self::schedule(ptr);
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
260
    /// Wakes a waker by reference.
    ///
    /// Unlike [`RawTask::wake`], this does not consume a reference; when the
    /// task needs to be scheduled, a new reference is minted for it by bumping
    /// the reference count together with setting `SCHEDULED`.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`].
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away, so create a new
                // reference (+ REFERENCE) for the scheduled `Task` in the same CAS.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Task {
                                raw_task: NonNull::new_unchecked(ptr as *mut ()),
                                _marker: PhantomData,
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
326
    /// Clones a waker.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`].
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::max_value() as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }
342
    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If it drops down to zero, the associated
    /// join handle has been dropped too, and the task has not been completed, then it will get
    /// scheduled one more time so that its future gets dropped by the executor.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`], and the caller must
    /// own one waker reference, which is released here.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then we need to decide how to destroy the task.
        // `new & !(REFERENCE - 1)` masks off the flag bits below `REFERENCE`, leaving only the
        // reference count, so the first condition checks the count reached zero.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }
371
    /// Drops a task.
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated join handle has been dropped too, then the task gets destroyed.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`], and the caller must
    /// own one task reference, which is released here.
    #[inline]
    unsafe fn drop_task(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `JoinHandle` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
            Self::destroy(ptr);
        }
    }
389
    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`]; the caller's task
    /// reference is transferred into the `Task` handed to the schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Task {
            raw_task: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule)(task);
    }
410
    /// Drops the future inside a task.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`] whose future slot still
    /// holds an initialized future (not yet replaced by the output).
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }
421
422    /// Returns a pointer to the output inside a task.
423    unsafe fn get_output(ptr: *const ()) -> *const () {
424        let raw = Self::from_ptr(ptr);
425        raw.output as *const ()
426    }
427
    /// Cleans up task's resources and deallocates it.
    ///
    /// The schedule function and the tag will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a task allocated by [`RawTask::allocate`] with no other outstanding
    /// references; the future/output slot must already have been cleaned up by the caller.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();

            // Drop the tag.
            (raw.tag as *mut T).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }
449
    /// Runs a task.
    ///
    /// Returns `true` only when the future returned `Pending` and the task was woken while
    /// running, in which case it has been rescheduled; returns `false` otherwise.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live task allocated by [`RawTask::allocate`]; the caller must own
    /// the scheduled task reference, which is consumed here.
    unsafe fn run(ptr: *const ()) -> bool {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        // `ManuallyDrop` prevents the borrowed waker from decrementing the reference count.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Drop the future.
                Self::drop_future(ptr);

                // Mark the task as unscheduled.
                let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                // Notify the awaiter that the future has been dropped.
                if state & AWAITER != 0 {
                    (*raw.header).notify(None);
                }

                // Drop the task reference.
                Self::drop_task(ptr);
                return false;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics. On a normal return the guard is defused with `mem::forget`.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output (they share the same slot).
                Self::drop_future(ptr);
                raw.output.write(out);

                // A place where the output will be stored in case it needs to be dropped.
                let mut output = None;

                // The task is now completed.
                loop {
                    // If the handle is dropped, we'll need to close it and drop the output.
                    let new = if state & HANDLE == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the handle is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & HANDLE == 0 || state & CLOSED != 0 {
                                // Read the output so it gets dropped outside the loop.
                                output = Some(raw.output.read());
                            }

                            // Notify the awaiter that the task has been completed.
                            if state & AWAITER != 0 {
                                (*raw.header).notify(None);
                            }

                            // Drop the task reference.
                            Self::drop_task(ptr);
                            break;
                        }
                        Err(s) => state = s,
                    }
                }

                // Drop the output if it was taken out of the task.
                drop(output);
            }
            Poll::Pending => {
                let mut future_dropped = false;

                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    if state & CLOSED != 0 && !future_dropped {
                        // The thread that closed the task didn't drop the future because it was
                        // running so now it's our responsibility to do so.
                        Self::drop_future(ptr);
                        future_dropped = true;
                    }

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to notify the awaiter.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // Notify the awaiter that the future has been dropped.
                                if state & AWAITER != 0 {
                                    (*raw.header).notify(None);
                                }
                                // Drop the task reference.
                                Self::drop_task(ptr);
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running so now it's our responsibility to do so.
                                Self::schedule(ptr);
                                return true;
                            } else {
                                // Drop the task reference.
                                Self::drop_task(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        return false;

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static;

        impl<F, R, S, T> Drop for Guard<F, R, S, T>
        where
            F: Future<Output = R> + 'static,
            S: Fn(Task<T>) + Send + Sync + 'static,
        {
            // Runs only when polling panicked (the happy path `mem::forget`s the guard).
            fn drop(&mut self) {
                let raw = self.0;
                // The header sits at the base of the allocation, so this recovers the task pointer.
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // The thread that closed the task didn't drop the future because it
                            // was running so now it's our responsibility to do so.
                            RawTask::<F, R, S, T>::drop_future(ptr);

                            // Mark the task as not running and not scheduled.
                            (*raw.header)
                                .state
                                .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);

                            // Notify the awaiter that the future has been dropped.
                            if state & AWAITER != 0 {
                                (*raw.header).notify(None);
                            }

                            // Drop the task reference.
                            RawTask::<F, R, S, T>::drop_task(ptr);
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, R, S, T>::drop_future(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if state & AWAITER != 0 {
                                    (*raw.header).notify(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, R, S, T>::drop_task(ptr);
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
679}