operation_queue/operation_queue.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5//! This module defines the types and data structures for the operation queue.
6//! See the crate's top-level documentation.
7
8use std::{
9 cell::{Cell, RefCell},
10 fmt::Debug,
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14};
15
16use async_channel::{Receiver, Sender};
17
18use crate::error::Error;
19
/// An operation that can be added to an [`OperationQueue`].
///
/// Implementors describe a single unit of asynchronous work; a queue runner
/// calls [`perform`](QueuedOperation::perform) once the operation reaches the
/// front of the queue.
// `async fn` in a public trait triggers a lint because the returned future
// carries no implied `Send` bound. That is intentional here for now (see the
// design considerations on `OperationQueue` about thread safety), so the lint
// is explicitly allowed.
#[allow(async_fn_in_trait)]
pub trait QueuedOperation: Debug {
    /// Performs the operation asynchronously.
    ///
    /// The future's output is `()`: any result the consumer cares about must
    /// be communicated by the implementation itself (e.g. through a channel).
    async fn perform(&self);
}
26
/// A dyn-compatible version of [`QueuedOperation`]. It is implemented for all
/// types that implement [`QueuedOperation`].
///
/// [`ErasedQueuedOperation`] makes [`QueuedOperation`] dyn-compatible by
/// wrapping the opaque [`Future`] returned by `perform` into a [`Box`], which
/// is essentially an owned pointer and which size is known at compile time.
/// This makes `perform` dispatchable from a trait object.
///
/// This return value is further wrapped into a [`Pin`] so that the `Future` can
/// be `await`ed (since the receiver for [`Future::poll`] is `Pin<&mut Self>`).
///
/// In this context, "erased" refers to how this trait "erases" the
/// opaque/generic return type of [`QueuedOperation::perform`] by turning it
/// into a trait object.
pub trait ErasedQueuedOperation: Debug {
    /// Type-erased equivalent of [`QueuedOperation::perform`].
    ///
    /// The `'op` lifetime ties the boxed future to the borrow of `self`, so
    /// the operation must be kept alive until the future completes.
    fn perform<'op>(&'op self) -> Pin<Box<dyn Future<Output = ()> + 'op>>;
}
44
45impl<T> ErasedQueuedOperation for T
46where
47 T: QueuedOperation,
48{
49 fn perform<'op>(&'op self) -> Pin<Box<dyn Future<Output = ()> + 'op>> {
50 Box::pin(self.perform())
51 }
52}
53
/// A queue that performs asynchronous operations in order.
//
// Design considerations:
//
// * A previous approach involved using a `VecDeque` as the queue's inner
//   buffer, but relying on `async_channel` allows simplifying the queue's
//   structure, as well as the logic for waiting for new items to become
//   available.
//
// * `Arc` is used to keep track of runners in a way that ensures memory is
//   properly managed. For compatibility with current Thunderbird code, the
//   queue's item type (`ErasedQueuedOperation`) does not include a bound on
//   `Send` and/or `Sync`, so `Rc` could be used instead. However, we plan to,
//   at a later time, address the current thread safety issues within the
//   Thunderbird code base which currently prevent dispatching runners across
//   multiple threads. In this context, we believe using `Arc` right away will
//   avoid a hefty change in the future (at a negligible performance cost).
pub struct OperationQueue {
    // Producer half of the queue's channel; `enqueue` sends operations here,
    // and closing it (in `stop`) is how runners are told to shut down.
    channel_sender: Sender<Box<dyn ErasedQueuedOperation>>,
    // Consumer half of the channel; each runner receives a clone of it in
    // `start`, so queued operations are shared among all runners.
    channel_receiver: Receiver<Box<dyn ErasedQueuedOperation>>,
    // Strong references to every runner spawned by `start`, used to inspect
    // runner state (`running`/`idle`); cleared by `stop`. `RefCell` is enough
    // because the queue is used from a single thread (see design notes above).
    runners: RefCell<Vec<Arc<Runner>>>,
    // Callback used to spawn a runner's main loop onto an executor, e.g.
    // `tokio::task::spawn_local`. It must not block.
    spawn_task: fn(fut: Pin<Box<dyn Future<Output = ()>>>),
}
77
78impl OperationQueue {
79 /// Creates a new operation queue.
80 ///
81 /// The function provided as argument is used when spawning new runners,
82 /// e.g. `tokio::task::spawn_local`. It must not be blocking.
83 pub fn new(spawn_task: fn(fut: Pin<Box<dyn Future<Output = ()>>>)) -> OperationQueue {
84 let (snd, rcv) = async_channel::unbounded();
85
86 OperationQueue {
87 channel_sender: snd,
88 channel_receiver: rcv,
89 runners: RefCell::new(Vec::new()),
90 spawn_task,
91 }
92 }
93
94 /// Starts the given number of runners that consume new items pushed to the
95 /// queue.
96 ///
97 /// A runner loops infinitely, performing operations as they get queued.
98 ///
99 /// An error can be returned if the queue has previously been stopped.
100 pub fn start(&self, runners: u32) -> Result<(), Error> {
101 if self.channel_sender.is_closed() {
102 return Err(Error::Stopped);
103 }
104
105 for i in 0..runners {
106 let runner = Runner::new(i, self.channel_receiver.clone());
107 (self.spawn_task)(Box::pin(runner.clone().run()));
108 self.runners.borrow_mut().push(runner);
109 }
110
111 Ok(())
112 }
113
114 /// Pushes an operation to the back of the queue.
115 ///
116 /// This function can be used with any type that implements
117 /// [`QueuedOperation`], since [`ErasedQueuedOperation`] is automatically
118 /// implemented for all such implementations.
119 ///
120 /// An error can be returned if the queue has been stopped.
121 pub async fn enqueue(&self, op: Box<dyn ErasedQueuedOperation>) -> Result<(), Error> {
122 self.channel_sender.send(op).await?;
123 Ok(())
124 }
125
126 /// Stops the queue.
127 ///
128 /// Operations that have already been queued up will still be performed, but
129 /// any call to [`start`] or [`enqueue`] following a call to `stop` will fail.
130 ///
131 /// [`start`]: OperationQueue::start
132 /// [`enqueue`]: OperationQueue::enqueue
133 pub async fn stop(&self) {
134 if !self.channel_sender.close() {
135 log::warn!("request queue: attempted to close channel that's already closed");
136 }
137
138 // Clear the references we have on the runners, so they can be dropped
139 // when they finish running.
140 self.runners.borrow_mut().clear();
141 }
142
143 /// Checks whether one or more runner(s) is currently active.
144 ///
145 /// If a runner has been created but isn't running yet, it is still included
146 /// in this count. Thus a runner being active means it's in any state other
147 /// than fully stopped.
148 ///
149 /// This method also returns `false` if there aren't any runners (e.g. if
150 /// the queue hasn't been started yet, or it has been stopped).
151 pub fn running(&self) -> bool {
152 // Count every runner that's not permanently stopped. This should be
153 // fine, since the only places we mutably borrow `self.runners` are
154 // `start` and `stop` and:
155 // * both `start`, `stop` and `running` are expected to be run in the
156 // same thread/routine, and
157 // * both are synchronous functions so there should be no risk of one
158 // happening while the other waits.
159 let active_runners =
160 self.count_matching_runners(|runner| !matches!(runner.state(), RunnerState::Stopped));
161
162 log::debug!("{active_runners} runner(s) currently active");
163
164 // Check if there's at least one runner currently active.
165 active_runners > 0
166 }
167
168 /// Checks whether all runners are currently waiting for a new operation to
169 /// perform.
170 pub fn idle(&self) -> bool {
171 // Count every runner that's waiting for a new operation to perform.
172 // This should be fine, since the only places we mutably borrow
173 // `self.runners` are `start` and `stop` and:
174 // * both `start`, `stop` and `idle` are expected to be run in the
175 // thread/routine, and
176 // * both are synchronous functions so there should be no risk of one
177 // happening while the other waits.
178 let idle_runners =
179 self.count_matching_runners(|runner| matches!(runner.state(), RunnerState::Waiting));
180
181 log::debug!("{idle_runners} runner(s) currently idle");
182
183 // If `self.runner` was being mutably borrowed here, we would have
184 // already panicked when calling `self.count_matching_runners()`.
185 idle_runners == self.runners.borrow().len()
186 }
187
188 /// Counts the number of runners matching the given closure. The type of the
189 /// closure is the same that would be used by [`Iterator::filter`].
190 ///
191 /// # Panics
192 ///
193 /// This method will panic if it's called while `self.runners` is being
194 /// mutably borrowed.
195 fn count_matching_runners<PredicateT>(&self, predicate: PredicateT) -> usize
196 where
197 PredicateT: FnMut(&&Arc<Runner>) -> bool,
198 {
199 self.runners.borrow().iter().filter(predicate).count()
200 }
201}
202
/// The status of a runner.
///
/// Transitions (driven by [`Runner::run`]): a runner starts as `Pending`,
/// alternates between `Waiting` and `Running` while its loop is live, and
/// moves from `Waiting` to `Stopped` once the queue's channel has closed and
/// been drained.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum RunnerState {
    /// The runner has been created but isn't running yet.
    Pending,

    /// The runner is currently waiting for an operation to perform.
    Waiting,

    /// The runner is currently performing an operation.
    Running,

    /// The runner has finished performing its last operation and has exited its
    /// main loop.
    Stopped,
}
219
/// A runner created and run by the [`OperationQueue`].
///
/// Each runner works by entering an infinite loop upon calling [`Runner::run`],
/// which is only exited when the queue's channel is closed and has been
/// emptied.
///
/// The current state of the runner can be checked at any time with
/// [`Runner::state`].
struct Runner {
    // Receiving end of the queue's channel; every runner holds its own clone
    // of the same receiver.
    receiver: Receiver<Box<dyn ErasedQueuedOperation>>,
    // The runner's current state; `Cell` is enough since runners currently
    // stay on a single thread (see the design notes on `OperationQueue`).
    state: Cell<RunnerState>,

    // A numerical identifier attached to the current runner, used for
    // debugging.
    id: u32,
}
236
237impl Runner {
238 /// Creates a new [`Runner`], wrapped into an [`Arc`].
239 ///
240 /// `id` is a numerical identifier used for debugging.
241 ///
242 /// Since [`Runner::run`] requires the queue to be wrapped inside an
243 /// [`Arc`], this is how this method returns the new queue.
244 //
245 // See the design consideration comment for `OperationQueue` regarding the
246 // use of `Arc`.
247 #[allow(clippy::arc_with_non_send_sync)]
248 fn new(id: u32, receiver: Receiver<Box<dyn ErasedQueuedOperation>>) -> Arc<Runner> {
249 Arc::new(Runner {
250 id,
251 receiver,
252 state: Cell::new(RunnerState::Pending),
253 })
254 }
255
256 /// Starts a loop that waits for new operations to come down the inner
257 /// channel and performs them.
258 ///
259 /// This method does not explicitly take care of sharing the operation's
260 /// response to the consumer; this is expected to be done by
261 /// [`QueuedOperation::perform`].
262 async fn run(self: Arc<Runner>) {
263 loop {
264 self.state.replace(RunnerState::Waiting);
265
266 let op = match self.receiver.recv().await {
267 Ok(op) => op,
268 Err(_) => {
269 log::info!(
270 "request queue: channel has closed (likely due to client shutdown), exiting the loop"
271 );
272 self.state.replace(RunnerState::Stopped);
273 return;
274 }
275 };
276
277 self.state.replace(RunnerState::Running);
278
279 log::info!(
280 "operation_queue::Runner: runner {} performing op: {op:?}",
281 self.id
282 );
283
284 op.perform().await;
285 }
286 }
287
288 /// Gets the runner's current state.
289 fn state(&self) -> RunnerState {
290 self.state.get()
291 }
292}
293
#[cfg(test)]
// For simplicity, we run our async tests using tokio's local runtime using the
// unstable "local" value for the `flavor` argument in `tokio::test`. Because it
// comes from tokio's unstable API, we need to supply the `tokio_unstable` cfg
// condition, which in turn triggers a warning from within the `tokio::test`
// macro about an unexpected cfg condition name.
#[allow(unexpected_cfgs)]
mod tests {
    use super::*;

    use async_channel::Sender;
    use tokio::time::Duration;

    /// Builds a queue whose runners are spawned onto tokio's local task set.
    fn new_queue() -> OperationQueue {
        OperationQueue::new(|fut| {
            _ = tokio::task::spawn_local(fut);
        })
    }

    /// Starting the queue creates the requested number of runners, all of
    /// which become idle (waiting on the channel) once given a chance to run.
    #[tokio::test(flavor = "local")]
    async fn start_queue() {
        let queue = new_queue();

        queue.start(5).unwrap();
        assert_eq!(queue.runners.borrow().len(), 5);

        // We need to await something to give the runners a chance to start
        // their loops.
        tokio::time::sleep(Duration::from_millis(0)).await;
        assert!(queue.idle());
    }

    /// Stopping the queue closes the channel, deactivates runners, and makes
    /// both `start` and `enqueue` fail afterwards.
    #[tokio::test(flavor = "local")]
    async fn stop_queue() {
        let queue = new_queue();

        queue.start(5).unwrap();

        // We need to await something to give the runners a chance to start
        // their loops.
        tokio::time::sleep(Duration::from_millis(0)).await;
        assert!(queue.idle());

        queue.stop().await;
        assert!(!queue.running());
        assert!(queue.channel_receiver.is_closed());

        // Restarting a stopped queue must fail with `Error::Stopped`.
        match queue.start(1) {
            Ok(_) => panic!("we should not be able to start the queue after stopping it"),
            Err(Error::Stopped) => (),
            Err(_) => panic!("unexpected error"),
        }

        // Try to enqueue a dummy operation to make sure it fails.
        #[derive(Debug)]
        struct Operation {}
        impl QueuedOperation for Operation {
            async fn perform(&self) {}
        }

        let op = Box::new(Operation {});
        match queue.enqueue(op).await {
            Ok(_) => panic!("we should not be able to enqueue operations after stopping the queue"),
            Err(Error::Sender) => (),
            Err(_) => panic!("unexpected error"),
        }
    }

    /// With a single runner, operations must be performed in FIFO order.
    #[tokio::test(flavor = "local")]
    async fn operation_order() {
        // A simple operation with a numerical ID that sends its own ID through
        // a channel.
        #[derive(Debug)]
        struct Operation {
            id: u8,
            sender: Sender<u8>,
        }
        impl QueuedOperation for Operation {
            async fn perform(&self) {
                self.sender.send(self.id).await.unwrap();
            }
        }

        let queue = new_queue();

        // Create a channel the operations can use to send us their ID.
        let (sender, receiver) = async_channel::unbounded();

        // Enqueue a couple of operations.
        queue
            .enqueue(Box::new(Operation {
                id: 1,
                sender: sender.clone(),
            }))
            .await
            .unwrap();

        queue
            .enqueue(Box::new(Operation {
                id: 2,
                sender: sender.clone(),
            }))
            .await
            .unwrap();

        // Start exactly one runner so we can check that operations run in
        // order.
        queue.start(1).unwrap();

        // We need to await something to give the runner a chance to start and
        // perform operations.
        tokio::time::sleep(Duration::from_millis(0)).await;

        // Check that we got both IDs in order.
        let id = receiver.recv().await.unwrap();
        assert_eq!(id, 1);
        let id = receiver.recv().await.unwrap();
        assert_eq!(id, 2);

        // For bonus points: the queue should be fully idle now.
        assert!(queue.idle());
    }
}
416}