1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT/Apache-2.0 License, at your convenience
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use core::{fmt, future::Future, marker::PhantomData, mem, ptr::NonNull};

use crate::task::{header::Header, raw::RawTask, state::*, JoinHandle};

/// Builds a new local task out of `future`.
///
/// Two handles to the same underlying allocation are returned: a [`Task`]
/// reference, used to run the future, and a [`JoinHandle`], used to await
/// its result.
///
/// Running the task polls `future`; waking the task hands it to `schedule`
/// so it can be queued for another run. `executor_id` is forwarded unchanged
/// to the task allocation.
///
/// Futures whose size is 2048 bytes or more are boxed and pinned first, so
/// only the box (rather than the future itself) is handed to the allocation.
///
/// [`Task`]: struct.Task.html
/// [`JoinHandle`]: struct.JoinHandle.html
pub(crate) fn spawn_local<F, R, S>(
    executor_id: usize,
    future: F,
    schedule: S,
) -> (Task, JoinHandle<R>)
where
    F: Future<Output = R>,
    S: Fn(Task),
{
    /// Futures at least this many bytes large are moved behind a `Box`.
    const BOXING_THRESHOLD: usize = 2048;

    let raw_task = if mem::size_of::<F>() < BOXING_THRESHOLD {
        // Small future: store it directly.
        RawTask::<_, R, S>::allocate(future, schedule, executor_id)
    } else {
        // Large future: store a pinned box pointing at it instead.
        RawTask::<_, R, S>::allocate(alloc::boxed::Box::pin(future), schedule, executor_id)
    };

    // Both handles share the same raw pointer.
    (
        Task { raw_task },
        JoinHandle {
            raw_task,
            _marker: PhantomData,
        },
    )
}

/// A task reference that runs its future.
///
/// At any moment in time, there is at most one [`Task`] reference associated
/// with a particular task. Calling [`run()`] consumes the [`Task`] reference
/// and polls its internal future. If the future is still pending after
/// getting polled, the [`Task`] reference simply won't exist until a
/// [`Waker`] notifies the task. If the future completes, its result becomes
/// available to the [`JoinHandle`].
///
/// When a task is woken up, its [`Task`] reference is recreated and passed to
/// the schedule function. In most executors, scheduling simply pushes the
/// [`Task`] reference into a queue of runnable tasks.
///
/// If the [`Task`] reference is dropped without getting run, the task is
/// automatically canceled. When canceled, the task won't be scheduled again
/// even if a [`Waker`] wakes it. It is possible for the [`JoinHandle`] to
/// cancel the task while the [`Task`] reference exists, in which case an
/// attempt to run the task won't do anything.
///
/// [`run()`]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`Task`]: struct.Task.html
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
pub struct Task {
    /// A type-erased pointer to the heap-allocated task.
    ///
    /// The methods in this file cast it to a `Header` pointer and read the
    /// task's vtable through it, so the allocation is expected to start with
    /// a `Header`.
    pub(crate) raw_task: NonNull<()>,
}

impl Task {
    /// Hands the task to its schedule function.
    ///
    /// This is a shortcut for rescheduling: the task's own `schedule` entry
    /// in its vtable is invoked with the task pointer.
    ///
    /// If the task is canceled, this method won't do anything.
    pub fn schedule(self) {
        // `Drop` must not run for `self`: it would cancel the task.
        let this = mem::ManuallyDrop::new(self);
        let raw = this.raw_task.as_ptr();

        // SAFETY (assumed, confirm against the allocator): `raw_task` points
        // at a live task allocation that starts with a `Header`.
        unsafe {
            let schedule_fn = (*(raw as *const Header)).vtable.schedule;
            schedule_fn(raw);
        }
    }

    /// Polls the task's future once.
    ///
    /// The return value is `true` when the task was woken while it was
    /// running; in that case it is rescheduled before this method returns.
    ///
    /// If the future completes, its output becomes available to the
    /// [`JoinHandle`]. If it is still pending, the task has to be woken up
    /// before it is rescheduled and run again.
    ///
    /// If a [`JoinHandle`] canceled the task before it gets run, this method
    /// won't do anything.
    ///
    /// A panic raised while polling the future propagates to the caller, and
    /// the task is automatically canceled. Wrapping calls to this method in
    /// [`catch_unwind`] is advised.
    ///
    /// [`JoinHandle`]: struct.JoinHandle.html
    /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
    pub fn run(self) -> bool {
        // `Drop` must not run for `self`: it would cancel the task.
        let this = mem::ManuallyDrop::new(self);
        let raw = this.raw_task.as_ptr();

        // SAFETY (assumed, confirm against the allocator): `raw_task` points
        // at a live task allocation that starts with a `Header`.
        unsafe {
            let run_fn = (*(raw as *const Header)).vtable.run;
            run_fn(raw)
        }
    }
}

/// Cancels the task when its [`Task`] reference is dropped.
///
/// `Task::schedule` and `Task::run` both `mem::forget` their receiver, so
/// this destructor only runs for a reference that was discarded without
/// going through either of them.
impl Drop for Task {
    fn drop(&mut self) {
        let ptr = self.raw_task.as_ptr();
        // The same pointer viewed as the task's `Header`; mutable because the
        // state word is modified below.
        let header = ptr as *mut Header;

        // NOTE(review): soundness relies on `raw_task` pointing at a live task
        // allocation that starts with a `Header` — confirm against the
        // allocator. The steps below are order-sensitive; do not reorder.
        unsafe {
            // Cancel the task.
            (*header).cancel();

            // Drop the future.
            ((*header).vtable.drop_future)(ptr);

            // Mark the task as unscheduled.
            (*header).state &= !SCHEDULED;

            // Notify the awaiter that the future has been dropped.
            (*header).notify(None);

            // Drop the task reference. This is the last use of `header`/`ptr`.
            ((*header).vtable.drop_task)(ptr);
        }
    }
}

/// Formats a [`Task`] as `Task { header: .. }`, delegating to the `Debug`
/// output of the task's `Header`.
impl fmt::Debug for Task {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // SAFETY (assumed, confirm against the allocator): `raw_task` points
        // at a live task allocation that starts with a `Header`.
        let header_ref = unsafe { &*(self.raw_task.as_ptr() as *const Header) };

        let mut builder = f.debug_struct("Task");
        builder.field("header", header_ref);
        builder.finish()
    }
}