background_runner/
lib.rs

1//! This crate provides the [`BackgroundRunner`] struct, which runs a
2//! given task in the background once [`BackgroundRunner::update()`] is called.
3//!
4//! ```rust
5//! use background_runner::BackgroundRunner;
6//! use std::thread::sleep;
7//! use std::time::Duration;
8//! use std::time::Instant;
9//!
10//! let mut runner_iter = 0;
11//!
12//! // Set up the runner, giving it a task to run
13//! let runner = BackgroundRunner::new(move |iter| {
14//!     // Simulate some heavy work
15//!     println!("runner_iter = {runner_iter}, iter = {iter}");
16//!     runner_iter += 1;
17//!     sleep(Duration::from_millis(10));
18//! });
19//!
20//! let start = Instant::now();
21//! let mut iter = 0;
22//! while start.elapsed().as_millis() < 100 {
23//!     // Update the runner with the current loop iteration
24//!     runner.update(&iter);
25//!     iter += 1;
26//! }
27//! ```
28
29#[cfg(test)]
30mod test;
31
32use std::sync::Arc;
33use std::sync::Condvar;
34use std::sync::Mutex;
35use std::sync::TryLockError;
36use std::sync::atomic;
37use std::sync::atomic::AtomicBool;
38
39/// Runner for a background task to be invoked periodically.
40#[derive(Debug)]
41pub struct BackgroundRunner<T> {
42    sync: Arc<Sync<T>>,
43    background_thread: Option<std::thread::JoinHandle<()>>,
44}
45
46impl<T: 'static + Send> BackgroundRunner<T> {
47    /// Create a new background runner with the given task
48    ///
49    /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
50    pub fn new(task: impl 'static + Send + FnMut(&T)) -> Self
51    where
52        T: Default,
53    {
54        Self::with_init_data(T::default(), task)
55    }
56
57    /// Create a new background runner with the given initial data and task
58    ///
59    /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
60    pub fn with_init_data(init_data: T, mut task: impl 'static + Send + FnMut(&T)) -> Self {
61        let sync = Arc::new(Sync::new(init_data));
62        let cloned_sync = Arc::clone(&sync);
63        let background_thread = Some(std::thread::spawn(move || {
64            loop {
65                let sync_status = cloned_sync.wait_read(|data| task(data));
66                if sync_status == SyncStatus::Disconnected {
67                    break;
68                }
69            }
70        }));
71
72        Self {
73            sync,
74            background_thread,
75        }
76    }
77
78    /// Trigger the runner's task if it's not currently running.
79    ///
80    /// Also see [`Self::update_with()`] for details.
81    ///
82    /// `T::clone_from()` is invoked to update the data that is passed to the task.
83    pub fn update(&self, new_data: &T)
84    where
85        T: Clone,
86    {
87        self.update_with(|data| data.clone_from(new_data));
88    }
89
90    /// Trigger the runner's task if it's not currently running.
91    ///
92    /// This will never block the current thread. If the task is currently running
93    /// and thus not able to process this update request, this update is dicarded
94    /// and this method returns immediately without any effect (also `f` is not
95    /// invoked in this case).
96    ///
97    /// The given closure is invoked with a mutable reference to the currently
98    /// stored data to allow for its modification (e.g. via `clone_from`).
99    pub fn update_with(&self, f: impl FnOnce(&mut T)) {
100        self.sync.try_write(f);
101    }
102
103    /// Trigger the runner's task, blocking until it can be executed.
104    ///
105    /// Also see [`Self::wait_and_update_with()`] for details.
106    ///
107    /// `T::clone_from()` is invoked to update the data that is passed to the task.
108    pub fn wait_and_update(&self, new_data: &T)
109    where
110        T: Clone,
111    {
112        self.wait_and_update_with(|data| data.clone_from(new_data));
113    }
114
115    /// Trigger the runner's task, blocking until it can be executed.
116    ///
117    /// Unlike [`Self::update_with()`], this method will block the current
118    /// thread until the `new_data` can be handed over to the background task.
119    /// Thus, it is ensured that the task will see this or a later submitted
120    /// datum, given that is does not panic and the program is not terminated.
121    ///
122    /// This function is particularly useful in conjunction with
123    /// [`Self::join()`] to ensure that the last update is processed completely.
124    ///
125    /// The given closure is invoked with a mutable reference to the currently
126    /// stored data to allow for its modification.
127    pub fn wait_and_update_with(&self, f: impl FnOnce(&mut T)) {
128        self.sync.wait_write(f);
129    }
130
131    /// Wait for the background thread to finish.
132    ///
133    /// This function will wait until the background thread has finished executing
134    /// it's last task and exited. This is useful to ensure that the last task
135    /// has completed, in particular, before terminating the program.
136    ///
137    /// Notice, if the program is terminating, without calling this method,
138    /// the background thread might be terminated abruptly, possibly
139    /// in the middle of executing the task.
140    pub fn join(mut self) {
141        // Signal the background thread to exit
142        self.sync.disconnect();
143
144        // Wait for the background thread to finish
145        if let Some(thread) = self.background_thread.take() {
146            let _ = thread.join();
147        };
148    }
149}
150
151impl<T> Drop for BackgroundRunner<T> {
152    fn drop(&mut self) {
153        // Notify the runner thread without blocking here
154        self.sync.disconnect();
155    }
156}
157
/// Result of one wait cycle of the runner thread, telling it whether to keep looping.
#[must_use]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SyncStatus {
    /// No disconnect was observed; the runner thread should wait for the next update.
    Connected,
    /// The disconnect flag was observed; the runner thread should exit.
    Disconnected,
}
164
165#[derive(Debug)]
166struct Sync<T> {
167    mutex: Mutex<State<T>>,
168    cvar: Condvar,
169    is_disconnected: AtomicBool,
170}
171
172impl<T> Sync<T> {
173    fn new(value: T) -> Self {
174        let state = State::new(value);
175        let mutex = Mutex::new(state);
176        let cvar = Condvar::new();
177        let is_disconnected = AtomicBool::new(false);
178
179        Self {
180            mutex,
181            cvar,
182            is_disconnected,
183        }
184    }
185
186    /// Used by the runner thread to wait for a data update
187    ///
188    /// Returns `SyncStatus::Disconnected` if the background runner is disconnected.
189    fn wait_read(&self, f: impl FnOnce(&T)) -> SyncStatus {
190        let guard = self.mutex.lock().unwrap();
191        let mut state = self
192            .cvar
193            .wait_while(guard, |state| {
194                !state.is_dirty && !self.is_disconnected.load(atomic::Ordering::SeqCst)
195            })
196            .unwrap();
197
198        if state.is_dirty {
199            f(state.read());
200        }
201
202        if self.is_disconnected.load(atomic::Ordering::SeqCst) {
203            return SyncStatus::Disconnected;
204        }
205
206        SyncStatus::Connected
207    }
208
209    /// Invokes the given closure if the background runner is not currently working.
210    fn try_write(&self, f: impl FnOnce(&mut T)) {
211        let mut state = match self.mutex.try_lock() {
212            Ok(state) => state,
213            Err(TryLockError::Poisoned(p)) => panic!("Runner panicked: {p}"),
214            Err(TryLockError::WouldBlock) => return,
215        };
216
217        f(state.write());
218        self.cvar.notify_all();
219    }
220
221    /// Invokes the given closure, blocking until the lock is acquired.
222    fn wait_write(&self, f: impl FnOnce(&mut T)) {
223        let mut state = self.mutex.lock().expect("Runner panicked");
224        f(state.write());
225        self.cvar.notify_all();
226    }
227
228    /// Marks this sync object as disconnected
229    ///
230    /// This is used to signal the runner thread about the background runner being dropped.
231    fn disconnect(&self) {
232        self.is_disconnected.store(true, atomic::Ordering::SeqCst);
233        self.cvar.notify_all();
234    }
235}
236
/// Payload guarded by the mutex: the stored data plus its update marker
#[derive(Debug)]
struct State<T> {
    /// Value most recently stored via [`BackgroundRunner::update()`] and friends
    data: T,

    /// `true` while `data` holds an update that has not been read yet
    is_dirty: bool,
}
246
247impl<T> State<T> {
248    fn new(data: T) -> Self {
249        Self {
250            data,
251            is_dirty: false,
252        }
253    }
254
255    /// Marks this state as not dirty and returns a reference to the data
256    fn read(&mut self) -> &T {
257        self.is_dirty = false;
258        &self.data
259    }
260
261    /// Marks this state as dirty and returns a mutable reference to the data
262    fn write(&mut self) -> &mut T {
263        self.is_dirty = true;
264        &mut self.data
265    }
266}