// asynchronix/executor/injector.rs
1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Mutex;
3use std::{mem, vec};
4
/// An unfair injector queue which stores batches of tasks in bounded-size
/// buckets.
///
/// This is a simple but effective unfair injector design which, despite being
/// based on a mutex-protected `Vec`, ensures low contention and low latency in
/// most realistic cases.
///
/// This is achieved by enabling the worker to push and pop batches of tasks
/// readily stored in buckets. Since only the handles to the buckets are moved
/// to and from the injector, pushing and popping a bucket is very fast and the
/// lock is therefore only held for a very short time.
///
/// Also, since tasks in a bucket are memory-contiguous, they can be efficiently
/// copied to and from worker queues. The use of buckets also keeps the size of
/// the injector queue small (its size is the number of buckets) so
/// re-allocation is rare and fast.
///
/// As an additional optimization, an `is_empty` atomic flag allows workers
/// seeking for tasks to skip taking the lock if the queue is likely to be
/// empty.
///
/// The queue is not strictly LIFO. While buckets are indeed pushed and popped
/// in LIFO order, individual tasks are stored in a bucket at the front of the
/// queue and this bucket is only moved to the back of the queue when full.
#[derive(Debug)]
pub(crate) struct Injector<T, const BUCKET_CAPACITY: usize> {
    /// A mutex-protected list of task buckets; the front bucket is the one
    /// currently being filled by `insert_task`, while full buckets live at
    /// the back, from where `pop_bucket` takes them in LIFO order.
    inner: Mutex<Vec<Bucket<T, BUCKET_CAPACITY>>>,
    /// A flag indicating whether the injector queue is empty. This is only a
    /// hint: it is read and written with `Relaxed` ordering, so observers may
    /// see a stale value (it is, however, only ever stored while the lock on
    /// `inner` is held).
    is_empty: AtomicBool,
}
36
37impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
38 /// Creates an empty injector queue.
39 ///
40 /// # Panics
41 ///
42 /// Panics if the capacity is 0.
43 pub(crate) const fn new() -> Self {
44 assert!(BUCKET_CAPACITY >= 1);
45
46 Self {
47 inner: Mutex::new(Vec::new()),
48 is_empty: AtomicBool::new(true),
49 }
50 }
51
52 /// Inserts a task.
53 ///
54 /// The task is inserted in a bucket at the front of the queue. Once this
55 /// bucket is full, it is moved to the back of the queue.
56 pub(crate) fn insert_task(&self, task: T) {
57 let mut inner = self.inner.lock().unwrap();
58
59 // Try to push the task onto the first bucket if it has enough capacity left.
60 if let Some(bucket) = inner.first_mut() {
61 if let Err(task) = bucket.push(task) {
62 // The bucket is full: move it to the back of the vector and
63 // replace it with a newly created bucket that contains the
64 // task.
65 let mut new_bucket = Bucket::new();
66 let _ = new_bucket.push(task); // this cannot fail provided the capacity is >=1
67
68 let full_bucket = mem::replace(bucket, new_bucket);
69 inner.push(full_bucket);
70 }
71
72 return;
73 }
74
75 // The queue is empty: create a new bucket.
76 let mut new_bucket = Bucket::new();
77 let _ = new_bucket.push(task); // this cannot fail provided the capacity is >=1
78
79 inner.push(new_bucket);
80
81 // Ordering: this flag is only used as a hint so Relaxed ordering is
82 // sufficient.
83 self.is_empty.store(false, Ordering::Relaxed);
84 }
85
86 /// Appends a bucket to the back of the queue.
87 pub(crate) fn push_bucket(&self, bucket: Bucket<T, BUCKET_CAPACITY>) {
88 let mut inner = self.inner.lock().unwrap();
89
90 let was_empty = inner.is_empty();
91 inner.push(bucket);
92
93 // If the queue was empty before, update the flag.
94 if was_empty {
95 // Ordering: this flag is only used as a hint so Relaxed ordering is
96 // sufficient.
97 self.is_empty.store(false, Ordering::Relaxed);
98 }
99 }
100
101 /// Takes the bucket at the back of the queue, if any.
102 ///
103 /// Note that this can spuriously return `None` even though the queue is
104 /// populated, unless a happens-before relationship exists between the
105 /// thread that populated the queue and the thread calling this method (this
106 /// is obviously the case if they are the same thread).
107 ///
108 /// This is not an issue in practice because it cannot lead to executor
109 /// deadlock. Indeed, if the last task/bucket was inserted by a worker
110 /// thread, that worker thread will always see that the injector queue is
111 /// populated (unless the bucket was already popped). Therefore, if all
112 /// workers exit, then all tasks they have re-injected will necessarily have
113 /// been processed. Likewise, if the last task/bucket was inserted by the
114 /// main executor thread before `Executor::run()` is called, the
115 /// synchronization established when the executor unparks worker threads
116 /// ensures that the task is visible to all unparked workers (there is
117 /// actually an edge case when the executor cannot unpark a thread after
118 /// pushing tasks, but this is taken care of by some extra synchronization
119 /// when deactivating workers).
120 pub(crate) fn pop_bucket(&self) -> Option<Bucket<T, BUCKET_CAPACITY>> {
121 // Ordering: this flag is only used as a hint so Relaxed ordering is
122 // sufficient.
123 if self.is_empty.load(Ordering::Relaxed) {
124 return None;
125 }
126
127 let mut inner = self.inner.lock().unwrap();
128
129 let bucket = inner.pop();
130
131 if inner.is_empty() {
132 // Ordering: this flag is only used as a hint so Relaxed ordering is
133 // sufficient.
134 self.is_empty.store(true, Ordering::Relaxed);
135 }
136
137 bucket
138 }
139
140 /// Checks whether the queue is empty.
141 ///
142 /// Note that this can spuriously return `true` even though the queue is
143 /// populated, unless a happens-before relationship exists between the
144 /// thread that populated the queue and the thread calling this method (this
145 /// is obviously the case if they are the same thread).
146 pub(crate) fn is_empty(&self) -> bool {
147 self.is_empty.load(Ordering::Relaxed)
148 }
149}
150
/// A collection of tasks with a bounded size.
///
/// This is just a very thin wrapper around a `Vec` that ensures that the
/// nominal capacity bound is never exceeded: `push` hands the task back in an
/// `Err` once `CAPACITY` items are stored, and `from_iter` truncates its
/// input to at most `CAPACITY` items.
#[derive(Debug)]
pub(crate) struct Bucket<T, const CAPACITY: usize>(Vec<T>);
157
158impl<T, const CAPACITY: usize> Bucket<T, CAPACITY> {
159 /// Creates a new bucket, allocating the full capacity upfront.
160 pub(crate) fn new() -> Self {
161 Self(Vec::with_capacity(CAPACITY))
162 }
163
164 /// Returns the bucket's nominal capacity.
165 pub(crate) const fn capacity() -> usize {
166 CAPACITY
167 }
168
169 /// Appends one task if capacity allows; otherwise returns the task in the
170 /// error.
171 pub(crate) fn push(&mut self, task: T) -> Result<(), T> {
172 if self.0.len() < CAPACITY {
173 self.0.push(task);
174 Ok(())
175 } else {
176 Err(task)
177 }
178 }
179}
180
/// Consuming iteration: yields the bucket's tasks in insertion order by
/// delegating to the inner `Vec`'s iterator.
impl<T, const CAPACITY: usize> IntoIterator for Bucket<T, CAPACITY> {
    type Item = T;
    type IntoIter = vec::IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}
189
190impl<T, const CAPACITY: usize> FromIterator<T> for Bucket<T, CAPACITY> {
191 fn from_iter<U: IntoIterator<Item = T>>(iter: U) -> Self {
192 Self(Vec::from_iter(iter.into_iter().take(CAPACITY)))
193 }
194}