background_runner/
lib.rs

1//! This crate provides the [`BackgroundRunner`] struct, which runs a
2//! given task in the background once [`BackgroundRunner::update()`] is called.
3//!
4//! ```rust
5//! use background_runner::BackgroundRunner;
6//! use std::thread::sleep;
7//! use std::time::Duration;
8//! use std::time::Instant;
9//!
10//! let mut runner_iter = 0;
11//!
12//! // Set up the runner, giving it a task to run
13//! let runner = BackgroundRunner::new(move |iter| {
14//!     // Simulate some heavy work
15//!     println!("runner_iter = {runner_iter}, iter = {iter}");
16//!     runner_iter += 1;
17//!     sleep(Duration::from_millis(10));
18//! });
19//!
20//! let start = Instant::now();
21//! let mut iter = 0;
22//! while start.elapsed().as_millis() < 100 {
23//!     // Update the runner with the current loop iteration
24//!     runner.update(&iter);
25//!     iter += 1;
26//! }
27//! ```
28
29#[cfg(test)]
30mod test;
31
32use std::sync::Arc;
33use std::sync::Condvar;
34use std::sync::Mutex;
35use std::sync::TryLockError;
36use std::sync::atomic;
37use std::sync::atomic::AtomicBool;
38
39/// Runner for a background task to be invoked periodically.
40#[derive(Debug)]
41pub struct BackgroundRunner<T> {
42    sync: Arc<Sync<T>>,
43}
44
45impl<T: 'static + Send> BackgroundRunner<T> {
46    /// Create a new background runner with the given task
47    ///
48    /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
49    pub fn new(task: impl 'static + Send + FnMut(&T)) -> Self
50    where
51        T: Default,
52    {
53        Self::with_init_data(T::default(), task)
54    }
55
56    /// Create a new background runner with the given initial data and task
57    ///
58    /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
59    pub fn with_init_data(init_data: T, mut task: impl 'static + Send + FnMut(&T)) -> Self {
60        let sync = Arc::new(Sync::new(init_data));
61        let cloned_sync = Arc::clone(&sync);
62        std::thread::spawn(move || {
63            loop {
64                let sync_status = cloned_sync.wait_read(|data| task(data));
65                if sync_status == SyncStatus::Disconnected {
66                    break;
67                }
68            }
69        });
70
71        Self { sync }
72    }
73
74    /// Trigger the runner's task if it's not currently running.
75    ///
76    /// This will never block the current thread. If the task is currently running
77    /// and thus not able to process this update request, nothing will be done and this method returns immediately.
78    ///
79    /// `T::clone_from()` is invoked to update the data that is passed to the task.
80    pub fn update(&self, new_data: &T)
81    where
82        T: Clone,
83    {
84        self.update_with(|data| data.clone_from(new_data));
85    }
86
87    /// Trigger the runner's task if it's not currently running.
88    ///
89    /// This will never block the current thread. If the task is currently running
90    /// and thus not able to process this update request, nothing will be done and this method returns immediately.
91    ///
92    /// The given closure is invoked with a mutable reference to the currently stored data to allow for its modification
93    pub fn update_with(&self, f: impl FnOnce(&mut T)) {
94        self.sync.try_write(f);
95    }
96}
97
98impl<T> Drop for BackgroundRunner<T> {
99    fn drop(&mut self) {
100        // Notify the runner thread without blocking here
101        self.sync.disconnect();
102    }
103}
104
/// Outcome of one wait performed by the runner thread.
#[must_use]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SyncStatus {
    /// The runner is still connected; the thread should keep waiting for more work.
    Connected,
    /// The runner has been disconnected; the thread should stop.
    Disconnected,
}
111
112#[derive(Debug)]
113struct Sync<T> {
114    mutex: Mutex<State<T>>,
115    cvar: Condvar,
116    is_disconnected: AtomicBool,
117}
118
119impl<T> Sync<T> {
120    fn new(value: T) -> Self {
121        let state = State::new(value);
122        let mutex = Mutex::new(state);
123        let cvar = Condvar::new();
124        let is_disconnected = AtomicBool::new(false);
125
126        Self {
127            mutex,
128            cvar,
129            is_disconnected,
130        }
131    }
132
133    /// Used by the runner thread to wait for a data update
134    ///
135    /// Returns `SyncStatus::Disconnected` if the background runner is disconnected.
136    fn wait_read(&self, f: impl FnOnce(&T)) -> SyncStatus {
137        let guard = self.mutex.lock().unwrap();
138        let mut state = self
139            .cvar
140            .wait_while(guard, |state| {
141                !state.is_dirty && !self.is_disconnected.load(atomic::Ordering::SeqCst)
142            })
143            .unwrap();
144
145        if self.is_disconnected.load(atomic::Ordering::SeqCst) {
146            return SyncStatus::Disconnected;
147        }
148
149        f(state.read());
150
151        SyncStatus::Connected
152    }
153
154    /// Invokes the given closure if the background runner is not currently working.
155    fn try_write(&self, f: impl FnOnce(&mut T)) {
156        let mut state = match self.mutex.try_lock() {
157            Ok(state) => state,
158            Err(TryLockError::Poisoned(p)) => panic!("Runner panicked: {p}"),
159            Err(TryLockError::WouldBlock) => return,
160        };
161
162        f(state.write());
163        self.cvar.notify_all();
164    }
165
166    /// Marks this sync object as disconnected
167    ///
168    /// This is used to signal the runner thread about the background runner being dropped.
169    fn disconnect(&self) {
170        self.is_disconnected.store(true, atomic::Ordering::SeqCst);
171        self.cvar.notify_all();
172    }
173}
174
/// Everything that lives behind the mutex: the payload and its freshness flag.
#[derive(Debug)]
struct State<T> {
    /// Payload most recently stored through [`BackgroundRunner::update()`]
    data: T,

    /// `true` while the payload has been written but not yet read again
    is_dirty: bool,
}

impl<T> State<T> {
    /// Wraps `data` in a state that starts out clean (not dirty).
    fn new(data: T) -> Self {
        let is_dirty = false;
        Self { data, is_dirty }
    }

    /// Clears the dirty flag and hands out shared access to the payload.
    fn read(&mut self) -> &T {
        let Self { data, is_dirty } = self;
        *is_dirty = false;
        data
    }

    /// Sets the dirty flag and hands out exclusive access to the payload.
    fn write(&mut self) -> &mut T {
        let Self { data, is_dirty } = self;
        *is_dirty = true;
        data
    }
}