background_runner/lib.rs
//! This crate provides the [`BackgroundRunner`] struct, which runs a given
//! task on a background thread whenever [`BackgroundRunner::update()`] is
//! called and the task is not already running.
//!
//! ```rust
//! use background_runner::BackgroundRunner;
//! use std::thread::sleep;
//! use std::time::Duration;
//! use std::time::Instant;
//!
//! let mut runner_iter = 0;
//!
//! // Set up the runner, giving it a task to run
//! let runner = BackgroundRunner::new(move |iter| {
//!     // Simulate some heavy work
//!     println!("runner_iter = {runner_iter}, iter = {iter}");
//!     runner_iter += 1;
//!     sleep(Duration::from_millis(10));
//! });
//!
//! let start = Instant::now();
//! let mut iter = 0;
//! while start.elapsed().as_millis() < 100 {
//!     // Update the runner with the current loop iteration
//!     runner.update(&iter);
//!     iter += 1;
//! }
//! ```
28
29#[cfg(test)]
30mod test;
31
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::TryLockError;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
38
39/// Runner for a background task to be invoked periodically.
40#[derive(Debug)]
41pub struct BackgroundRunner<T> {
42 sync: Arc<Sync<T>>,
43}
44
45impl<T: 'static + Send> BackgroundRunner<T> {
46 /// Create a new background runner with the given task
47 ///
48 /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
49 pub fn new(task: impl 'static + Send + FnMut(&T)) -> Self
50 where
51 T: Default,
52 {
53 Self::with_init_data(T::default(), task)
54 }
55
56 /// Create a new background runner with the given initial data and task
57 ///
58 /// This spawns a new thread to execute the task periodically, triggered by the [`Self::update()`] method.
59 pub fn with_init_data(init_data: T, mut task: impl 'static + Send + FnMut(&T)) -> Self {
60 let sync = Arc::new(Sync::new(init_data));
61 let cloned_sync = Arc::clone(&sync);
62 std::thread::spawn(move || {
63 loop {
64 let sync_status = cloned_sync.wait_read(|data| task(data));
65 if sync_status == SyncStatus::Disconnected {
66 break;
67 }
68 }
69 });
70
71 Self { sync }
72 }
73
74 /// Trigger the runner's task if it's not currently running.
75 ///
76 /// This will never block the current thread. If the task is currently running
77 /// and thus not able to process this update request, nothing will be done and this method returns immediately.
78 ///
79 /// `T::clone_from()` is invoked to update the data that is passed to the task.
80 pub fn update(&self, new_data: &T)
81 where
82 T: Clone,
83 {
84 self.update_with(|data| data.clone_from(new_data));
85 }
86
87 /// Trigger the runner's task if it's not currently running.
88 ///
89 /// This will never block the current thread. If the task is currently running
90 /// and thus not able to process this update request, nothing will be done and this method returns immediately.
91 ///
92 /// The given closure is invoked with a mutable reference to the currently stored data to allow for its modification
93 pub fn update_with(&self, f: impl FnOnce(&mut T)) {
94 self.sync.try_write(f);
95 }
96}
97
98impl<T> Drop for BackgroundRunner<T> {
99 fn drop(&mut self) {
100 // Notify the runner thread without blocking here
101 self.sync.disconnect();
102 }
103}
104
/// Outcome of a wait on the shared state, as seen by the runner thread.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncStatus {
    /// The background runner is still alive; keep waiting for updates.
    Connected,
    /// The background runner has been dropped; the runner thread should exit.
    Disconnected,
}
111
112#[derive(Debug)]
113struct Sync<T> {
114 mutex: Mutex<State<T>>,
115 cvar: Condvar,
116 is_disconnected: AtomicBool,
117}
118
119impl<T> Sync<T> {
120 fn new(value: T) -> Self {
121 let state = State::new(value);
122 let mutex = Mutex::new(state);
123 let cvar = Condvar::new();
124 let is_disconnected = AtomicBool::new(false);
125
126 Self {
127 mutex,
128 cvar,
129 is_disconnected,
130 }
131 }
132
133 /// Used by the runner thread to wait for a data update
134 ///
135 /// Returns `SyncStatus::Disconnected` if the background runner is disconnected.
136 fn wait_read(&self, f: impl FnOnce(&T)) -> SyncStatus {
137 let guard = self.mutex.lock().unwrap();
138 let mut state = self
139 .cvar
140 .wait_while(guard, |state| {
141 !state.is_dirty && !self.is_disconnected.load(atomic::Ordering::SeqCst)
142 })
143 .unwrap();
144
145 if self.is_disconnected.load(atomic::Ordering::SeqCst) {
146 return SyncStatus::Disconnected;
147 }
148
149 f(state.read());
150
151 SyncStatus::Connected
152 }
153
154 /// Invokes the given closure if the background runner is not currently working.
155 fn try_write(&self, f: impl FnOnce(&mut T)) {
156 let mut state = match self.mutex.try_lock() {
157 Ok(state) => state,
158 Err(TryLockError::Poisoned(p)) => panic!("Runner panicked: {p}"),
159 Err(TryLockError::WouldBlock) => return,
160 };
161
162 f(state.write());
163 self.cvar.notify_all();
164 }
165
166 /// Marks this sync object as disconnected
167 ///
168 /// This is used to signal the runner thread about the background runner being dropped.
169 fn disconnect(&self) {
170 self.is_disconnected.store(true, atomic::Ordering::SeqCst);
171 self.cvar.notify_all();
172 }
173}
174
/// Data guarded by the mutex of the shared sync object
#[derive(Debug)]
struct State<T> {
    /// Most recent value stored through [`BackgroundRunner::update()`]
    data: T,

    /// `true` while `data` holds an update that has not been read yet
    is_dirty: bool,
}

impl<T> State<T> {
    /// Wraps `data` in a state that starts out clean.
    fn new(data: T) -> Self {
        let is_dirty = false;
        Self { data, is_dirty }
    }

    /// Clears the dirty flag and hands out shared access to the data.
    fn read(&mut self) -> &T {
        let Self { data, is_dirty } = self;
        *is_dirty = false;
        data
    }

    /// Raises the dirty flag and hands out exclusive access to the data.
    fn write(&mut self) -> &mut T {
        let Self { data, is_dirty } = self;
        *is_dirty = true;
        data
    }
}