obnam/
engine.rs

1//! Engine for doing CPU heavy work in the background.
2
3use crate::workqueue::WorkQueue;
4use futures::stream::{FuturesOrdered, StreamExt};
5use tokio::select;
6use tokio::sync::mpsc;
7
/// Do heavy work in the background.
///
/// An engine takes items of work from a work queue, and does the work
/// in the background, using `tokio` blocking tasks. The background
/// work can be CPU intensive or block on I/O. The number of active
/// concurrent tasks is limited to the size of the queue.
///
/// The actual work is done in a function or closure passed in as a
/// parameter to the engine. The worker function is called with a work
/// item as an argument, inside a `tokio` blocking task, so it does
/// not stall the async executor while it runs.
///
/// Results of the worker function are retrieved one at a time with
/// [`Engine::next`].
///
/// The need to move work items between threads puts some restrictions
/// on the types used as work items: both the work items and the
/// results must be `Send + 'static`.
pub struct Engine<T> {
    // Receiving end of the bounded channel on which the background
    // workers deliver their results.
    rx: mpsc::Receiver<T>,
}
25
26impl<T: Send + 'static> Engine<T> {
27    /// Create a new engine.
28    ///
29    /// Each engine gets work from a queue, and calls the same worker
30    /// function for each item of work. The results are put into
31    /// another, internal queue.
32    pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
33    where
34        F: Send + Copy + 'static + Fn(S) -> T,
35        S: Send + 'static,
36    {
37        let size = queue.size();
38        let (tx, rx) = mpsc::channel(size);
39        tokio::spawn(manage_workers(queue, size, tx, func));
40        Self { rx }
41    }
42
43    /// Get the oldest result of the worker function, if any.
44    ///
45    /// This will block until there is a result, or it's known that no
46    /// more results will be forthcoming.
47    pub async fn next(&mut self) -> Option<T> {
48        self.rx.recv().await
49    }
50}
51
52// This is a normal (non-blocking) background task that retrieves work
53// items, launches blocking background tasks for work to be done, and
54// waits on those tasks. Care is taken to not launch too many worker
55// tasks.
56async fn manage_workers<S, T, F>(
57    mut queue: WorkQueue<S>,
58    queue_size: usize,
59    tx: mpsc::Sender<T>,
60    func: F,
61) where
62    F: Send + 'static + Copy + Fn(S) -> T,
63    S: Send + 'static,
64    T: Send + 'static,
65{
66    let mut workers = FuturesOrdered::new();
67
68    'processing: loop {
69        // Wait for first of various concurrent things to finish.
70        select! {
71            biased;
72
73            // Get work to be done.
74            maybe_work = queue.next() => {
75                if let Some(work) = maybe_work {
76                    // We got a work item. Launch background task to
77                    // work on it.
78                    let tx = tx.clone();
79                    workers.push(do_work(work, tx, func));
80
81                    // If queue is full, wait for at least one
82                    // background task to finish.
83                    while workers.len() >= queue_size {
84                        workers.next().await;
85                    }
86                } else {
87                    // Finished with the input queue. Nothing more to do.
88                    break 'processing;
89                }
90            }
91
92            // Wait for background task to finish, if there are any
93            // background tasks currently running.
94            _ = workers.next(), if !workers.is_empty() => {
95                // nothing to do here
96            }
97        }
98    }
99
100    while workers.next().await.is_some() {
101        // Finish the remaining work items.
102    }
103}
104
105// Work on a work item.
106//
107// This launches a `tokio` blocking background task, and waits for it
108// to finish. The caller spawns a normal (non-blocking) async task for
109// this function, so it's OK for this function to wait on the task it
110// launches.
111async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
112where
113    F: Send + 'static + Fn(S) -> T,
114    S: Send + 'static,
115    T: Send + 'static,
116{
117    let result = tokio::task::spawn_blocking(move || func(item))
118        .await
119        .unwrap();
120    if let Err(err) = tx.send(result).await {
121        panic!("failed to send result to channel: {}", err);
122    }
123}