obnam/engine.rs
1//! Engine for doing CPU heavy work in the background.
2
3use crate::workqueue::WorkQueue;
4use futures::stream::{FuturesOrdered, StreamExt};
5use tokio::select;
6use tokio::sync::mpsc;
7
/// Do heavy work in the background.
///
/// An engine takes items of work from a work queue, and does the work
/// in the background, using `tokio` blocking tasks. The background
/// work can be CPU intensive or block on I/O. The number of active
/// concurrent tasks is limited to the size of the queue.
///
/// The actual work is done in a function or closure passed in as a
/// parameter to the engine. The worker function is called with a work
/// item as an argument, in a thread dedicated for that worker
/// function.
///
/// The need to move work items between threads puts some restrictions
/// on the types used as work items.
pub struct Engine<T> {
    // Receiving end of the internal result channel. Worker tasks send
    // their results into the other end; [`Engine::next`] drains this.
    rx: mpsc::Receiver<T>,
}
25
26impl<T: Send + 'static> Engine<T> {
27 /// Create a new engine.
28 ///
29 /// Each engine gets work from a queue, and calls the same worker
30 /// function for each item of work. The results are put into
31 /// another, internal queue.
32 pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
33 where
34 F: Send + Copy + 'static + Fn(S) -> T,
35 S: Send + 'static,
36 {
37 let size = queue.size();
38 let (tx, rx) = mpsc::channel(size);
39 tokio::spawn(manage_workers(queue, size, tx, func));
40 Self { rx }
41 }
42
43 /// Get the oldest result of the worker function, if any.
44 ///
45 /// This will block until there is a result, or it's known that no
46 /// more results will be forthcoming.
47 pub async fn next(&mut self) -> Option<T> {
48 self.rx.recv().await
49 }
50}
51
// This is a normal (non-blocking) background task that retrieves work
// items, launches blocking background tasks for work to be done, and
// waits on those tasks. Care is taken to not launch too many worker
// tasks: at most `queue_size` workers are in flight at any time.
//
// NOTE(review): each `do_work` future sends its result into `tx`
// itself, as soon as it completes. `FuturesOrdered` orders only the
// workers' unit outputs here, not the channel sends, so results
// presumably reach the channel in completion order rather than
// submission order — confirm against `Engine::next`'s "oldest result"
// documentation.
async fn manage_workers<S, T, F>(
    mut queue: WorkQueue<S>,
    queue_size: usize,
    tx: mpsc::Sender<T>,
    func: F,
) where
    F: Send + 'static + Copy + Fn(S) -> T,
    S: Send + 'static,
    T: Send + 'static,
{
    // In-flight worker futures; polled concurrently by `select!` and
    // the `while` loops below.
    let mut workers = FuturesOrdered::new();

    'processing: loop {
        // Wait for first of various concurrent things to finish.
        // `biased` makes `select!` always try the queue first, so new
        // work is picked up eagerly; back-pressure is applied by the
        // inner `while` below, not by the branch order.
        select! {
            biased;

            // Get work to be done.
            maybe_work = queue.next() => {
                if let Some(work) = maybe_work {
                    // We got a work item. Launch background task to
                    // work on it. Each worker gets its own clone of
                    // the result sender.
                    let tx = tx.clone();
                    workers.push(do_work(work, tx, func));

                    // If queue is full, wait for at least one
                    // background task to finish. This is what caps
                    // concurrency at `queue_size` workers.
                    while workers.len() >= queue_size {
                        workers.next().await;
                    }
                } else {
                    // Finished with the input queue. Nothing more to do.
                    break 'processing;
                }
            }

            // Wait for background task to finish, if there are any
            // background tasks currently running. (Without the guard,
            // `workers.next()` on an empty set would yield `None`
            // immediately and spin this loop.)
            _ = workers.next(), if !workers.is_empty() => {
                // nothing to do here
            }
        }
    }

    // Drain remaining in-flight workers; their results are still sent
    // through `tx`. When this function returns, the last sender clone
    // is dropped and the receiver sees end-of-stream.
    while workers.next().await.is_some() {
        // Finish the remaining work items.
    }
}
104
105// Work on a work item.
106//
107// This launches a `tokio` blocking background task, and waits for it
108// to finish. The caller spawns a normal (non-blocking) async task for
109// this function, so it's OK for this function to wait on the task it
110// launches.
111async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
112where
113 F: Send + 'static + Fn(S) -> T,
114 S: Send + 'static,
115 T: Send + 'static,
116{
117 let result = tokio::task::spawn_blocking(move || func(item))
118 .await
119 .unwrap();
120 if let Err(err) = tx.send(result).await {
121 panic!("failed to send result to channel: {}", err);
122 }
123}