Skip to main content

operation_queue/
operation_queue.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5//! This module defines the types and data structures for the operation queue.
6//! See the crate's top-level documentation.
7
8use std::{
9    cell::{Cell, RefCell},
10    fmt::Debug,
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14};
15
16use async_channel::{Receiver, Sender};
17
18use crate::error::Error;
19
/// A unit of asynchronous work that can be handed to an [`OperationQueue`].
///
/// Implementors must also implement [`Debug`]: runners log the operation
/// (using its `Debug` representation) right before performing it.
//
// The `async_fn_in_trait` lint warns that callers cannot require auto traits
// (such as `Send`) on the future returned by an `async fn` declared in a public
// trait. It is silenced here because, as explained in the design notes on
// `OperationQueue`, queued operations are not required to be `Send`/`Sync`.
#[allow(async_fn_in_trait)]
pub trait QueuedOperation: Debug {
    /// Runs the operation to completion.
    ///
    /// The returned future borrows `self` for as long as it is alive.
    async fn perform(&self);
}
26
/// The dyn-compatible counterpart of [`QueuedOperation`], automatically
/// implemented for every type that implements [`QueuedOperation`].
///
/// [`QueuedOperation::perform`] returns an opaque [`Future`], which prevents
/// the trait from being used as a trait object. [`ErasedQueuedOperation`] works
/// around this by putting that future into a [`Box`]: a `Box` is essentially an
/// owned pointer whose size is known at compile time, so `perform` becomes
/// dispatchable from a trait object.
///
/// The boxed future is additionally wrapped into a [`Pin`] so it can be
/// `await`ed (the receiver of [`Future::poll`] is `Pin<&mut Self>`).
///
/// "Erased" refers to how this trait erases the opaque/generic return type of
/// [`QueuedOperation::perform`] by turning it into a trait object.
pub trait ErasedQueuedOperation: Debug {
    /// Performs the operation, returning its future boxed and pinned.
    ///
    /// The returned future borrows `self`, hence the `'_` bound on the trait
    /// object.
    fn perform(&self) -> Pin<Box<dyn Future<Output = ()> + '_>>;
}
44
45impl<T> ErasedQueuedOperation for T
46where
47    T: QueuedOperation,
48{
49    fn perform<'op>(&'op self) -> Pin<Box<dyn Future<Output = ()> + 'op>> {
50        Box::pin(self.perform())
51    }
52}
53
54/// A queue that performs asynchronous operations in order.
55//
56// Design considerations:
57//
58//  * A previous approach involved using a `VecDeque` as the queue's inner
59//    buffer, but relying on `async_channel` allows simplifying the queue's
60//    structure, as well as the logic for waiting for new items to become
61//    available.
62//
63//  * `Arc` is used to keep track of runners in a way that ensures memory is
64//    properly managed. For compatibility with current Thunderbird code, the
65//    queue's item type (`ErasedQueuedOperation`) does not include a bound on
66//    `Send` and/or `Sync`, so `Rc` could be used instead. However, we plan to,
67//    at a later time, address the current thread safety issues within the
68//    Thunderbird code base which currently prevent dispatching runners across
69//    multiple threads. In this context, we believe using `Arc` right away will
70//    avoid a hefty change in the future (at a negligible performance cost).
71pub struct OperationQueue {
72    channel_sender: Sender<Box<dyn ErasedQueuedOperation>>,
73    channel_receiver: Receiver<Box<dyn ErasedQueuedOperation>>,
74    runners: RefCell<Vec<Arc<Runner>>>,
75    spawn_task: fn(fut: Pin<Box<dyn Future<Output = ()>>>),
76}
77
78impl OperationQueue {
79    /// Creates a new operation queue.
80    ///
81    /// The function provided as argument is used when spawning new runners,
82    /// e.g. `tokio::task::spawn_local`. It must not be blocking.
83    pub fn new(spawn_task: fn(fut: Pin<Box<dyn Future<Output = ()>>>)) -> OperationQueue {
84        let (snd, rcv) = async_channel::unbounded();
85
86        OperationQueue {
87            channel_sender: snd,
88            channel_receiver: rcv,
89            runners: RefCell::new(Vec::new()),
90            spawn_task,
91        }
92    }
93
94    /// Starts the given number of runners that consume new items pushed to the
95    /// queue.
96    ///
97    /// A runner loops infinitely, performing operations as they get queued.
98    ///
99    /// An error can be returned if the queue has previously been stopped.
100    pub fn start(&self, runners: u32) -> Result<(), Error> {
101        if self.channel_sender.is_closed() {
102            return Err(Error::Stopped);
103        }
104
105        for i in 0..runners {
106            let runner = Runner::new(i, self.channel_receiver.clone());
107            (self.spawn_task)(Box::pin(runner.clone().run()));
108            self.runners.borrow_mut().push(runner);
109        }
110
111        Ok(())
112    }
113
114    /// Pushes an operation to the back of the queue.
115    ///
116    /// This function can be used with any type that implements
117    /// [`QueuedOperation`], since [`ErasedQueuedOperation`] is automatically
118    /// implemented for all such implementations.
119    ///
120    /// An error can be returned if the queue has been stopped.
121    pub async fn enqueue(&self, op: Box<dyn ErasedQueuedOperation>) -> Result<(), Error> {
122        self.channel_sender.send(op).await?;
123        Ok(())
124    }
125
126    /// Stops the queue.
127    ///
128    /// Operations that have already been queued up will still be performed, but
129    /// any call to [`start`] or [`enqueue`] following a call to `stop` will fail.
130    ///
131    /// [`start`]: OperationQueue::start
132    /// [`enqueue`]: OperationQueue::enqueue
133    pub async fn stop(&self) {
134        if !self.channel_sender.close() {
135            log::warn!("request queue: attempted to close channel that's already closed");
136        }
137
138        // Clear the references we have on the runners, so they can be dropped
139        // when they finish running.
140        self.runners.borrow_mut().clear();
141    }
142
143    /// Checks whether one or more runner(s) is currently active.
144    ///
145    /// If a runner has been created but isn't running yet, it is still included
146    /// in this count. Thus a runner being active means it's in any state other
147    /// than fully stopped.
148    ///
149    /// This method also returns `false` if there aren't any runners (e.g. if
150    /// the queue hasn't been started yet, or it has been stopped).
151    pub fn running(&self) -> bool {
152        // Count every runner that's not permanently stopped. This should be
153        // fine, since the only places we mutably borrow `self.runners` are
154        // `start` and `stop` and:
155        //  * both `start`, `stop` and `running` are expected to be run in the
156        //    same thread/routine, and
157        //  * both are synchronous functions so there should be no risk of one
158        //    happening while the other waits.
159        let active_runners =
160            self.count_matching_runners(|runner| !matches!(runner.state(), RunnerState::Stopped));
161
162        log::debug!("{active_runners} runner(s) currently active");
163
164        // Check if there's at least one runner currently active.
165        active_runners > 0
166    }
167
168    /// Checks whether all runners are currently waiting for a new operation to
169    /// perform.
170    pub fn idle(&self) -> bool {
171        // Count every runner that's waiting for a new operation to perform.
172        // This should be fine, since the only places we mutably borrow
173        // `self.runners` are `start` and `stop` and:
174        //  * both `start`, `stop` and `idle` are expected to be run in the
175        //    thread/routine, and
176        //  * both are synchronous functions so there should be no risk of one
177        //    happening while the other waits.
178        let idle_runners =
179            self.count_matching_runners(|runner| matches!(runner.state(), RunnerState::Waiting));
180
181        log::debug!("{idle_runners} runner(s) currently idle");
182
183        // If `self.runner` was being mutably borrowed here, we would have
184        // already panicked when calling `self.count_matching_runners()`.
185        idle_runners == self.runners.borrow().len()
186    }
187
188    /// Counts the number of runners matching the given closure. The type of the
189    /// closure is the same that would be used by [`Iterator::filter`].
190    ///
191    /// # Panics
192    ///
193    /// This method will panic if it's called while `self.runners` is being
194    /// mutably borrowed.
195    fn count_matching_runners<PredicateT>(&self, predicate: PredicateT) -> usize
196    where
197        PredicateT: FnMut(&&Arc<Runner>) -> bool,
198    {
199        self.runners.borrow().iter().filter(predicate).count()
200    }
201}
202
/// The state a runner is currently in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunnerState {
    /// The runner exists, but its loop hasn't started yet.
    Pending,

    /// The runner is waiting for the next operation to come down the channel.
    Waiting,

    /// The runner is in the middle of performing an operation.
    Running,

    /// The runner is done: it has performed its last operation and has left
    /// its main loop.
    Stopped,
}
219
220/// A runner created and run by the [`OperationQueue`].
221///
222/// Each runner works by entering an infinite loop upon calling [`Runner::run`],
223/// which is only exited when the queue's channel is closed and has been
224/// emptied.
225///
226/// The current state of the runner can be checked at any time with
227/// [`Runner::state`].
228struct Runner {
229    receiver: Receiver<Box<dyn ErasedQueuedOperation>>,
230    state: Cell<RunnerState>,
231
232    // A numerical identifier attached to the current runner, used for
233    // debugging.
234    id: u32,
235}
236
237impl Runner {
238    /// Creates a new [`Runner`], wrapped into an [`Arc`].
239    ///
240    /// `id` is a numerical identifier used for debugging.
241    ///
242    /// Since [`Runner::run`] requires the queue to be wrapped inside an
243    /// [`Arc`], this is how this method returns the new queue.
244    //
245    // See the design consideration comment for `OperationQueue` regarding the
246    // use of `Arc`.
247    #[allow(clippy::arc_with_non_send_sync)]
248    fn new(id: u32, receiver: Receiver<Box<dyn ErasedQueuedOperation>>) -> Arc<Runner> {
249        Arc::new(Runner {
250            id,
251            receiver,
252            state: Cell::new(RunnerState::Pending),
253        })
254    }
255
256    /// Starts a loop that waits for new operations to come down the inner
257    /// channel and performs them.
258    ///
259    /// This method does not explicitly take care of sharing the operation's
260    /// response to the consumer; this is expected to be done by
261    /// [`QueuedOperation::perform`].
262    async fn run(self: Arc<Runner>) {
263        loop {
264            self.state.replace(RunnerState::Waiting);
265
266            let op = match self.receiver.recv().await {
267                Ok(op) => op,
268                Err(_) => {
269                    log::info!(
270                        "request queue: channel has closed (likely due to client shutdown), exiting the loop"
271                    );
272                    self.state.replace(RunnerState::Stopped);
273                    return;
274                }
275            };
276
277            self.state.replace(RunnerState::Running);
278
279            log::info!(
280                "operation_queue::Runner: runner {} performing op: {op:?}",
281                self.id
282            );
283
284            op.perform().await;
285        }
286    }
287
288    /// Gets the runner's current state.
289    fn state(&self) -> RunnerState {
290        self.state.get()
291    }
292}
293
#[cfg(test)]
// For simplicity, the async tests run on tokio's local runtime, selected with
// the unstable "local" value for the `flavor` argument of `tokio::test`. Since
// this is part of tokio's unstable API, the `tokio_unstable` cfg condition has
// to be supplied, which in turn makes the `tokio::test` macro trigger a warning
// about an unexpected cfg condition name.
#[allow(unexpected_cfgs)]
mod tests {
    use super::*;

    use async_channel::Sender;
    use tokio::time::Duration;

    /// Builds a queue whose runners are spawned with
    /// `tokio::task::spawn_local`.
    fn new_queue() -> OperationQueue {
        fn spawn(fut: Pin<Box<dyn Future<Output = ()>>>) {
            let _ = tokio::task::spawn_local(fut);
        }

        OperationQueue::new(spawn)
    }

    /// Awaits a zero-length sleep. We need to await something to give the
    /// runners a chance to start their loops (and perform pending operations).
    async fn let_runners_progress() {
        tokio::time::sleep(Duration::from_millis(0)).await;
    }

    #[tokio::test(flavor = "local")]
    async fn start_queue() {
        let queue = new_queue();

        queue.start(5).unwrap();
        assert_eq!(queue.runners.borrow().len(), 5);

        let_runners_progress().await;
        assert!(queue.idle());
    }

    #[tokio::test(flavor = "local")]
    async fn stop_queue() {
        // A dummy operation that does nothing, only used to check that
        // enqueueing fails once the queue is stopped.
        #[derive(Debug)]
        struct Operation {}
        impl QueuedOperation for Operation {
            async fn perform(&self) {}
        }

        let queue = new_queue();

        queue.start(5).unwrap();

        let_runners_progress().await;
        assert!(queue.idle());

        queue.stop().await;
        assert!(!queue.running());
        assert!(queue.channel_receiver.is_closed());

        // Starting new runners must be refused.
        match queue.start(1) {
            Ok(_) => panic!("we should not be able to start the queue after stopping it"),
            Err(Error::Stopped) => (),
            Err(_) => panic!("unexpected error"),
        }

        // So must enqueueing new operations.
        match queue.enqueue(Box::new(Operation {})).await {
            Ok(_) => panic!("we should not be able to enqueue operations after stopping the queue"),
            Err(Error::Sender) => (),
            Err(_) => panic!("unexpected error"),
        }
    }

    #[tokio::test(flavor = "local")]
    async fn operation_order() {
        // A simple operation with a numerical ID that sends its own ID through
        // a channel.
        #[derive(Debug)]
        struct Operation {
            id: u8,
            sender: Sender<u8>,
        }
        impl QueuedOperation for Operation {
            async fn perform(&self) {
                self.sender.send(self.id).await.unwrap();
            }
        }

        let queue = new_queue();

        // The channel through which the operations report their ID back to us.
        let (sender, receiver) = async_channel::unbounded();

        // Enqueue a couple of operations before any runner exists.
        for id in [1, 2] {
            let op = Operation {
                id,
                sender: sender.clone(),
            };
            queue.enqueue(Box::new(op)).await.unwrap();
        }

        // Start exactly one runner so we can check that operations run in
        // order.
        queue.start(1).unwrap();

        let_runners_progress().await;

        // Both IDs must come back in the order they were enqueued.
        for expected in [1, 2] {
            let id = receiver.recv().await.unwrap();
            assert_eq!(id, expected);
        }

        // For bonus points: the queue should be fully idle now.
        assert!(queue.idle());
    }
}