background_runner/lib.rs
1//! This crate provides the [`BackgroundRunner`] struct, which runs a
2//! given task in the background once [`BackgroundRunner::update()`] is called.
3//!
4//! ```rust
5//! use background_runner::BackgroundRunner;
6//! use std::thread::sleep;
7//! use std::time::Duration;
8//! use std::time::Instant;
9//!
10//! let mut runner_iter = 0;
11//!
12//! // Set up the runner, giving it a task to run
13//! let runner = BackgroundRunner::new(move |iter| {
14//! // Simulate some heavy work
15//! println!("runner_iter = {runner_iter}, iter = {iter}");
16//! runner_iter += 1;
17//! sleep(Duration::from_millis(10));
18//! });
19//!
20//! let start = Instant::now();
21//! let mut iter = 0;
22//! while start.elapsed().as_millis() < 100 {
23//! // Update the runner with the current loop iteration
24//! runner.update(&iter);
25//! iter += 1;
26//! }
27//! ```
28
29#[cfg(test)]
30mod test;
31
32use std::sync::Arc;
33use std::sync::Condvar;
34use std::sync::Mutex;
35use std::sync::TryLockError;
36use std::sync::atomic;
37use std::sync::atomic::AtomicBool;
38
39/// Runner for a background task to be invoked periodically.
40#[derive(Debug)]
41pub struct BackgroundRunner<T> {
42 sync: Arc<Sync<T>>,
43 background_thread: Option<std::thread::JoinHandle<()>>,
44}
45
46impl<T: 'static + Send> BackgroundRunner<T> {
47 /// Create a new background runner with the given task
48 ///
49 /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
50 pub fn new(task: impl 'static + Send + FnMut(&T)) -> Self
51 where
52 T: Default,
53 {
54 Self::with_init_data(T::default(), task)
55 }
56
57 /// Create a new background runner with the given initial data and task
58 ///
59 /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
60 pub fn with_init_data(init_data: T, mut task: impl 'static + Send + FnMut(&T)) -> Self {
61 let sync = Arc::new(Sync::new(init_data));
62 let cloned_sync = Arc::clone(&sync);
63 let background_thread = Some(std::thread::spawn(move || {
64 loop {
65 let sync_status = cloned_sync.wait_read(|data| task(data));
66 if sync_status == SyncStatus::Disconnected {
67 break;
68 }
69 }
70 }));
71
72 Self {
73 sync,
74 background_thread,
75 }
76 }
77
78 /// Trigger the runner's task if it's not currently running.
79 ///
80 /// Also see [`Self::update_with()`] for details.
81 ///
82 /// `T::clone_from()` is invoked to update the data that is passed to the task.
83 pub fn update(&self, new_data: &T)
84 where
85 T: Clone,
86 {
87 self.update_with(|data| data.clone_from(new_data));
88 }
89
90 /// Trigger the runner's task if it's not currently running.
91 ///
92 /// This will never block the current thread. If the task is currently running
93 /// and thus not able to process this update request, this update is dicarded
94 /// and this method returns immediately without any effect (also `f` is not
95 /// invoked in this case).
96 ///
97 /// The given closure is invoked with a mutable reference to the currently
98 /// stored data to allow for its modification (e.g. via `clone_from`).
99 pub fn update_with(&self, f: impl FnOnce(&mut T)) {
100 self.sync.try_write(f);
101 }
102
103 /// Trigger the runner's task, blocking until it can be executed.
104 ///
105 /// Also see [`Self::wait_and_update_with()`] for details.
106 ///
107 /// `T::clone_from()` is invoked to update the data that is passed to the task.
108 pub fn wait_and_update(&self, new_data: &T)
109 where
110 T: Clone,
111 {
112 self.wait_and_update_with(|data| data.clone_from(new_data));
113 }
114
115 /// Trigger the runner's task, blocking until it can be executed.
116 ///
117 /// Unlike [`Self::update_with()`], this method will block the current
118 /// thread until the `new_data` can be handed over to the background task.
119 /// Thus, it is ensured that the task will see this or a later submitted
120 /// datum, given that is does not panic and the program is not terminated.
121 ///
122 /// This function is particularly useful in conjunction with
123 /// [`Self::join()`] to ensure that the last update is processed completely.
124 ///
125 /// The given closure is invoked with a mutable reference to the currently
126 /// stored data to allow for its modification.
127 pub fn wait_and_update_with(&self, f: impl FnOnce(&mut T)) {
128 self.sync.wait_write(f);
129 }
130
131 /// Wait for the background thread to finish.
132 ///
133 /// This function will wait until the background thread has finished executing
134 /// it's last task and exited. This is useful to ensure that the last task
135 /// has completed, in particular, before terminating the program.
136 ///
137 /// Notice, if the program is terminating, without calling this method,
138 /// the background thread might be terminated abruptly, possibly
139 /// in the middle of executing the task.
140 pub fn join(mut self) {
141 // Signal the background thread to exit
142 self.sync.disconnect();
143
144 // Wait for the background thread to finish
145 if let Some(thread) = self.background_thread.take() {
146 let _ = thread.join();
147 };
148 }
149}
150
151impl<T> Drop for BackgroundRunner<T> {
152 fn drop(&mut self) {
153 // Notify the runner thread without blocking here
154 self.sync.disconnect();
155 }
156}
157
/// Connection state reported to the runner thread after each wait.
#[must_use]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SyncStatus {
    /// The owning handle is still alive; keep waiting for updates.
    Connected,
    /// The owning handle signalled a disconnect; the runner thread should exit.
    Disconnected,
}
164
165#[derive(Debug)]
166struct Sync<T> {
167 mutex: Mutex<State<T>>,
168 cvar: Condvar,
169 is_disconnected: AtomicBool,
170}
171
172impl<T> Sync<T> {
173 fn new(value: T) -> Self {
174 let state = State::new(value);
175 let mutex = Mutex::new(state);
176 let cvar = Condvar::new();
177 let is_disconnected = AtomicBool::new(false);
178
179 Self {
180 mutex,
181 cvar,
182 is_disconnected,
183 }
184 }
185
186 /// Used by the runner thread to wait for a data update
187 ///
188 /// Returns `SyncStatus::Disconnected` if the background runner is disconnected.
189 fn wait_read(&self, f: impl FnOnce(&T)) -> SyncStatus {
190 let guard = self.mutex.lock().unwrap();
191 let mut state = self
192 .cvar
193 .wait_while(guard, |state| {
194 !state.is_dirty && !self.is_disconnected.load(atomic::Ordering::SeqCst)
195 })
196 .unwrap();
197
198 if state.is_dirty {
199 f(state.read());
200 }
201
202 if self.is_disconnected.load(atomic::Ordering::SeqCst) {
203 return SyncStatus::Disconnected;
204 }
205
206 SyncStatus::Connected
207 }
208
209 /// Invokes the given closure if the background runner is not currently working.
210 fn try_write(&self, f: impl FnOnce(&mut T)) {
211 let mut state = match self.mutex.try_lock() {
212 Ok(state) => state,
213 Err(TryLockError::Poisoned(p)) => panic!("Runner panicked: {p}"),
214 Err(TryLockError::WouldBlock) => return,
215 };
216
217 f(state.write());
218 self.cvar.notify_all();
219 }
220
221 /// Invokes the given closure, blocking until the lock is acquired.
222 fn wait_write(&self, f: impl FnOnce(&mut T)) {
223 let mut state = self.mutex.lock().expect("Runner panicked");
224 f(state.write());
225 self.cvar.notify_all();
226 }
227
228 /// Marks this sync object as disconnected
229 ///
230 /// This is used to signal the runner thread about the background runner being dropped.
231 fn disconnect(&self) {
232 self.is_disconnected.store(true, atomic::Ordering::SeqCst);
233 self.cvar.notify_all();
234 }
235}
236
/// Payload guarded by the mutex: the user data plus a freshness marker.
#[derive(Debug)]
struct State<T> {
    /// Actual data stored by calls to [`BackgroundRunner::update()`]
    data: T,

    /// `true` when the data has been written since it was last read
    is_dirty: bool,
}

impl<T> State<T> {
    /// Wraps `data` in a state that starts out clean (not dirty).
    fn new(data: T) -> Self {
        let is_dirty = false;
        Self { data, is_dirty }
    }

    /// Clears the dirty flag and hands out shared access to the data.
    fn read(&mut self) -> &T {
        let Self { data, is_dirty } = self;
        *is_dirty = false;
        data
    }

    /// Sets the dirty flag and hands out exclusive access to the data.
    fn write(&mut self) -> &mut T {
        let Self { data, is_dirty } = self;
        *is_dirty = true;
        data
    }
}