1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::{mem, vec};

/// An unfair injector queue which stores batches of tasks in bounded-size
/// buckets.
///
/// This is a simple but effective unfair injector design which, despite being
/// based on a mutex-protected `Vec`, ensures low contention and low latency in
/// most realistic cases.
///
/// This is achieved by enabling the worker to push and pop batches of tasks
/// readily stored in buckets. Since only the handles to the buckets are moved
/// to and from the injector, pushing and popping a bucket is very fast and the
/// lock is therefore only held for a very short time.
///
/// Also, since tasks in a bucket are memory-contiguous, they can be efficiently
/// copied to and from worker queues. The use of buckets also keeps the size of
/// the injector queue small (its size is the number of buckets) so
/// re-allocation is rare and fast.
///
/// As an additional optimization, an `is_empty` atomic flag allows workers
/// seeking for tasks to skip taking the lock if the queue is likely to be
/// empty.
///
/// The queue is not strictly LIFO. While buckets are indeed pushed and popped
/// in LIFO order, individual tasks are stored in a bucket at the front of the
/// queue and this bucket is only moved to the back of the queue when full.
#[derive(Debug)]
pub(crate) struct Injector<T, const BUCKET_CAPACITY: usize> {
    /// A mutex-protected list of buckets of tasks; the front bucket receives
    /// new tasks, buckets are popped from the back.
    inner: Mutex<Vec<Bucket<T, BUCKET_CAPACITY>>>,
    /// A flag indicating whether the injector queue is empty. It is only ever
    /// written while `inner`'s lock is held, but is read lock-free as a hint,
    /// so readers may observe a stale value.
    is_empty: AtomicBool,
}

impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
    /// Creates an empty injector queue.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is 0.
    pub(crate) const fn new() -> Self {
        assert!(BUCKET_CAPACITY >= 1);

        Self {
            inner: Mutex::new(Vec::new()),
            is_empty: AtomicBool::new(true),
        }
    }

    /// Inserts a task.
    ///
    /// The task lands in the bucket at the front of the queue; whenever that
    /// bucket fills up, it is migrated to the back of the queue and a fresh
    /// front bucket is started.
    pub(crate) fn insert_task(&self, task: T) {
        let mut queue = self.inner.lock().unwrap();

        match queue.first_mut() {
            // Common case: a front bucket exists; try to push onto it.
            Some(front) => {
                if let Err(task) = front.push(task) {
                    // The front bucket is full: replace it with a fresh bucket
                    // holding the task, and move the full one to the back.
                    let mut fresh = Bucket::new();
                    let _ = fresh.push(task); // cannot fail since the capacity is >=1

                    let full = mem::replace(front, fresh);
                    queue.push(full);
                }
            }
            // The queue is empty: seed it with a new bucket.
            None => {
                let mut fresh = Bucket::new();
                let _ = fresh.push(task); // cannot fail since the capacity is >=1

                queue.push(fresh);

                // Ordering: the flag is a mere hint so Relaxed ordering
                // suffices.
                self.is_empty.store(false, Ordering::Relaxed);
            }
        }
    }

    /// Appends a bucket to the back of the queue.
    pub(crate) fn push_bucket(&self, bucket: Bucket<T, BUCKET_CAPACITY>) {
        let mut queue = self.inner.lock().unwrap();

        let flag_needs_clearing = queue.is_empty();
        queue.push(bucket);

        // Only clear the flag when this push made the queue non-empty.
        if flag_needs_clearing {
            // Ordering: the flag is a mere hint so Relaxed ordering suffices.
            self.is_empty.store(false, Ordering::Relaxed);
        }
    }

    /// Takes the bucket at the back of the queue, if any.
    ///
    /// This may spuriously return `None` even when the queue holds buckets,
    /// unless a happens-before relationship links the thread that populated
    /// the queue to the calling thread (trivially the case when both are the
    /// same thread).
    ///
    /// In practice this cannot deadlock the executor. If the last task/bucket
    /// was inserted by a worker thread, that worker will always observe a
    /// populated queue (unless the bucket was already popped), so once all
    /// workers exit, all tasks they re-injected have necessarily been
    /// processed. Likewise, if the last task/bucket was inserted by the main
    /// executor thread before `Executor::run()` is called, the synchronization
    /// established when the executor unparks worker threads makes the task
    /// visible to all unparked workers (the edge case where the executor
    /// cannot unpark a thread after pushing tasks is covered by extra
    /// synchronization when deactivating workers).
    pub(crate) fn pop_bucket(&self) -> Option<Bucket<T, BUCKET_CAPACITY>> {
        // Fast path: skip the lock when the queue is likely empty.
        // Ordering: the flag is a mere hint so Relaxed ordering suffices.
        if self.is_empty.load(Ordering::Relaxed) {
            return None;
        }

        let mut queue = self.inner.lock().unwrap();

        let popped = queue.pop();

        if queue.is_empty() {
            // Ordering: the flag is a mere hint so Relaxed ordering suffices.
            self.is_empty.store(true, Ordering::Relaxed);
        }

        popped
    }

    /// Checks whether the queue is empty.
    ///
    /// This may spuriously report `true` for a populated queue, unless a
    /// happens-before relationship links the thread that populated the queue
    /// to the calling thread (trivially the case when both are the same
    /// thread).
    pub(crate) fn is_empty(&self) -> bool {
        self.is_empty.load(Ordering::Relaxed)
    }
}

/// A collection of tasks with a bounded size.
///
/// This is just a very thin wrapper around a `Vec` which guarantees that the
/// nominal capacity bound is never exceeded.
#[derive(Debug)]
pub(crate) struct Bucket<T, const CAPACITY: usize>(Vec<T>);

impl<T, const CAPACITY: usize> Bucket<T, CAPACITY> {
    /// Creates a new bucket, allocating the full capacity upfront.
    pub(crate) fn new() -> Self {
        Self(Vec::with_capacity(CAPACITY))
    }

    /// Returns the bucket's nominal capacity.
    pub(crate) const fn capacity() -> usize {
        CAPACITY
    }

    /// Appends one task if capacity allows; otherwise returns the task in the
    /// error.
    pub(crate) fn push(&mut self, task: T) -> Result<(), T> {
        // Guard clause: refuse the task when the bucket is already full.
        if self.0.len() >= CAPACITY {
            return Err(task);
        }

        self.0.push(task);
        Ok(())
    }
}

impl<T, const CAPACITY: usize> IntoIterator for Bucket<T, CAPACITY> {
    type Item = T;
    type IntoIter = vec::IntoIter<T>;

    /// Consumes the bucket, yielding its tasks in insertion order.
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

impl<T, const CAPACITY: usize> FromIterator<T> for Bucket<T, CAPACITY> {
    /// Builds a bucket from at most `CAPACITY` items; any items the iterator
    /// would yield beyond that are never pulled.
    fn from_iter<U: IntoIterator<Item = T>>(iter: U) -> Self {
        Self(iter.into_iter().take(CAPACITY).collect())
    }
}