asynchronix/executor/
injector.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Mutex;
3use std::{mem, vec};
4
5/// An unfair injector queue which stores batches of tasks in bounded-size
6/// buckets.
7///
8/// This is a simple but effective unfair injector design which, despite being
9/// based on a mutex-protected `Vec`, ensures low contention and low latency in
10/// most realistic cases.
11///
12/// This is achieved by enabling the worker to push and pop batches of tasks
13/// readily stored in buckets. Since only the handles to the buckets are moved
14/// to and from the injector, pushing and popping a bucket is very fast and the
15/// lock is therefore only held for a very short time.
16///
17/// Also, since tasks in a bucket are memory-contiguous, they can be efficiently
18/// copied to and from worker queues. The use of buckets also keeps the size of
19/// the injector queue small (its size is the number of buckets) so
20/// re-allocation is rare and fast.
21///
22/// As an additional optimization, an `is_empty` atomic flag allows workers
23/// seeking for tasks to skip taking the lock if the queue is likely to be
24/// empty.
25///
26/// The queue is not strictly LIFO. While buckets are indeed pushed and popped
27/// in LIFO order, individual tasks are stored in a bucket at the front of the
28/// queue and this bucket is only moved to the back of the queue when full.
29#[derive(Debug)]
30pub(crate) struct Injector<T, const BUCKET_CAPACITY: usize> {
31    /// A mutex-protected list of tasks.
32    inner: Mutex<Vec<Bucket<T, BUCKET_CAPACITY>>>,
33    /// A flag indicating whether the injector queue is empty.
34    is_empty: AtomicBool,
35}
36
37impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
38    /// Creates an empty injector queue.
39    ///
40    /// # Panics
41    ///
42    /// Panics if the capacity is 0.
43    pub(crate) const fn new() -> Self {
44        assert!(BUCKET_CAPACITY >= 1);
45
46        Self {
47            inner: Mutex::new(Vec::new()),
48            is_empty: AtomicBool::new(true),
49        }
50    }
51
52    /// Inserts a task.
53    ///
54    /// The task is inserted in a bucket at the front of the queue. Once this
55    /// bucket is full, it is moved to the back of the queue.
56    pub(crate) fn insert_task(&self, task: T) {
57        let mut inner = self.inner.lock().unwrap();
58
59        // Try to push the task onto the first bucket if it has enough capacity left.
60        if let Some(bucket) = inner.first_mut() {
61            if let Err(task) = bucket.push(task) {
62                // The bucket is full: move it to the back of the vector and
63                // replace it with a newly created bucket that contains the
64                // task.
65                let mut new_bucket = Bucket::new();
66                let _ = new_bucket.push(task); // this cannot fail provided the capacity is >=1
67
68                let full_bucket = mem::replace(bucket, new_bucket);
69                inner.push(full_bucket);
70            }
71
72            return;
73        }
74
75        // The queue is empty: create a new bucket.
76        let mut new_bucket = Bucket::new();
77        let _ = new_bucket.push(task); // this cannot fail provided the capacity is >=1
78
79        inner.push(new_bucket);
80
81        // Ordering: this flag is only used as a hint so Relaxed ordering is
82        // sufficient.
83        self.is_empty.store(false, Ordering::Relaxed);
84    }
85
86    /// Appends a bucket to the back of the queue.
87    pub(crate) fn push_bucket(&self, bucket: Bucket<T, BUCKET_CAPACITY>) {
88        let mut inner = self.inner.lock().unwrap();
89
90        let was_empty = inner.is_empty();
91        inner.push(bucket);
92
93        // If the queue was empty before, update the flag.
94        if was_empty {
95            // Ordering: this flag is only used as a hint so Relaxed ordering is
96            // sufficient.
97            self.is_empty.store(false, Ordering::Relaxed);
98        }
99    }
100
101    /// Takes the bucket at the back of the queue, if any.
102    ///
103    /// Note that this can spuriously return `None` even though the queue is
104    /// populated, unless a happens-before relationship exists between the
105    /// thread that populated the queue and the thread calling this method (this
106    /// is obviously the case if they are the same thread).
107    ///
108    /// This is not an issue in practice because it cannot lead to executor
109    /// deadlock. Indeed, if the last task/bucket was inserted by a worker
110    /// thread, that worker thread will always see that the injector queue is
111    /// populated (unless the bucket was already popped). Therefore, if all
112    /// workers exit, then all tasks they have re-injected will necessarily have
113    /// been processed. Likewise, if the last task/bucket was inserted by the
114    /// main executor thread before `Executor::run()` is called, the
115    /// synchronization established when the executor unparks worker threads
116    /// ensures that the task is visible to all unparked workers (there is
117    /// actually an edge case when the executor cannot unpark a thread after
118    /// pushing tasks, but this is taken care of by some extra synchronization
119    /// when deactivating workers).
120    pub(crate) fn pop_bucket(&self) -> Option<Bucket<T, BUCKET_CAPACITY>> {
121        // Ordering: this flag is only used as a hint so Relaxed ordering is
122        // sufficient.
123        if self.is_empty.load(Ordering::Relaxed) {
124            return None;
125        }
126
127        let mut inner = self.inner.lock().unwrap();
128
129        let bucket = inner.pop();
130
131        if inner.is_empty() {
132            // Ordering: this flag is only used as a hint so Relaxed ordering is
133            // sufficient.
134            self.is_empty.store(true, Ordering::Relaxed);
135        }
136
137        bucket
138    }
139
140    /// Checks whether the queue is empty.
141    ///
142    /// Note that this can spuriously return `true` even though the queue is
143    /// populated, unless a happens-before relationship exists between the
144    /// thread that populated the queue and the thread calling this method (this
145    /// is obviously the case if they are the same thread).
146    pub(crate) fn is_empty(&self) -> bool {
147        self.is_empty.load(Ordering::Relaxed)
148    }
149}
150
151/// A collection of tasks with a bounded size.
152///
153/// This is just a very thin wrapper around a `Vec` that ensures that the
154/// nominal capacity bound is never exceeded.
155#[derive(Debug)]
156pub(crate) struct Bucket<T, const CAPACITY: usize>(Vec<T>);
157
158impl<T, const CAPACITY: usize> Bucket<T, CAPACITY> {
159    /// Creates a new bucket, allocating the full capacity upfront.
160    pub(crate) fn new() -> Self {
161        Self(Vec::with_capacity(CAPACITY))
162    }
163
164    /// Returns the bucket's nominal capacity.
165    pub(crate) const fn capacity() -> usize {
166        CAPACITY
167    }
168
169    /// Appends one task if capacity allows; otherwise returns the task in the
170    /// error.
171    pub(crate) fn push(&mut self, task: T) -> Result<(), T> {
172        if self.0.len() < CAPACITY {
173            self.0.push(task);
174            Ok(())
175        } else {
176            Err(task)
177        }
178    }
179}
180
181impl<T, const CAPACITY: usize> IntoIterator for Bucket<T, CAPACITY> {
182    type Item = T;
183    type IntoIter = vec::IntoIter<T>;
184
185    fn into_iter(self) -> Self::IntoIter {
186        self.0.into_iter()
187    }
188}
189
190impl<T, const CAPACITY: usize> FromIterator<T> for Bucket<T, CAPACITY> {
191    fn from_iter<U: IntoIterator<Item = T>>(iter: U) -> Self {
192        Self(Vec::from_iter(iter.into_iter().take(CAPACITY)))
193    }
194}