refreshable/lib.rs
1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! A simple wrapper around a value that changes over time.
15//!
16//! A `Refreshable` provides access to both the current value and also ways to be notified of changes made in the
17//! future. Users can *subscribe* to the refreshable, registering a callback which is invoked whenever the value
18//! changes. Additionally, users can *map* a refreshable of one type to a refreshable of another type, with the new
19//! refreshable being updated based on changes to the original refreshable. For example, a caching component of a
20//! service may map a `Refreshable<ServiceConfiguration>` down to a `Refreshable<CacheConfiguration>` so it can
21//! subscribe specifically to only the configuration changes that matter to it.
22//!
23//! A `Subscription` is returned when subscribing to a refreshable which acts as a guard type, unregistering the
24//! subscription when dropped. If you intend the subscription to last for the lifetime of the refreshable, you can use
25//! the `Subscription::leak` method to allow the `Subscription` to fall out of scope without unregistering.
26//!
27//! A `RefreshHandle` is returned when creating a new `Refreshable` which is used to update its value. Subscriptions
28//! are fallible, and all errors encountered when running subscriptions in response to an update are reported through
29//! the `RefreshHandle::refresh` method.
30//!
31//! # Examples
32//!
33//! ```
34//! use refreshable::Refreshable;
35//!
36//! #[derive(PartialEq)]
37//! struct ServiceConfiguration {
38//! cache: CacheConfiguration,
39//! some_other_thing: u32,
40//! }
41//!
42//! #[derive(PartialEq, Clone)]
43//! struct CacheConfiguration {
44//! size: usize,
45//! }
46//!
47//! let initial_config = ServiceConfiguration {
48//! cache: CacheConfiguration {
49//! size: 10,
50//! },
51//! some_other_thing: 5,
52//! };
53//! let (refreshable, mut handle) = Refreshable::new(initial_config);
54//!
55//! let cache_refreshable = refreshable.map(|config| config.cache.clone());
56//!
57//! let subscription = cache_refreshable.try_subscribe(|cache| {
58//! if cache.size == 0 {
59//! Err("cache size must be positive")
60//! } else {
61//! println!("new cache size is {}", cache.size);
62//! Ok(())
63//! }
64//! }).unwrap();
65//!
66//! let new_config = ServiceConfiguration {
67//! cache: CacheConfiguration {
68//! size: 20,
69//! },
70//! some_other_thing: 5,
71//! };
72//! // "new cache size is 20" is printed.
73//! handle.refresh(new_config).unwrap();
74//!
75//! let new_config = ServiceConfiguration {
76//! cache: CacheConfiguration {
77//! size: 20,
78//! },
79//! some_other_thing: 10,
80//! };
81//! // nothing is printed since the cache configuration did not change.
82//! handle.refresh(new_config).unwrap();
83//!
84//! drop(subscription);
85//! let new_config = ServiceConfiguration {
86//! cache: CacheConfiguration {
87//! size: 0,
88//! },
89//! some_other_thing: 10,
90//! };
91//! // nothing is printed since the cache subscription was dropped.
92//! handle.refresh(new_config).unwrap();
93//! ```
94#![doc(html_root_url = "https://docs.rs/refreshable/1")]
95#![warn(clippy::all, missing_docs)]
96
97use arc_swap::ArcSwap;
98use parking_lot::Mutex;
99use std::collections::HashMap;
100use std::marker::PhantomData;
101use std::ops::Deref;
102use std::sync::atomic::{AtomicU64, Ordering};
103use std::sync::Arc;
104
105#[cfg(test)]
106mod test;
107
108trait Cleanup {}
109
110impl<T, E> Cleanup for Subscription<T, E> {}
111
112struct RawCallback<F: ?Sized> {
113 _cleanup: Option<Arc<dyn Cleanup + Sync + Send>>,
114 // We need to run failing callbacks again on a refresh even if the value didn't change. Otherwise, you can "lose"
115 // errors when the refreshable update to the same value.
116 ok: bool,
117 callback: F,
118}
119
120type Callback<T, E> = RawCallback<dyn FnMut(&T, &mut Vec<E>) + Sync + Send>;
121
122struct Shared<T, E> {
123 value: ArcSwap<T>,
124 // This guards subscription registration against concurrent refreshes. If that happened, the subscription callback
125 // could miss updates that happen after the subscription is called with the "current" value but before it is
126 // inserted in the callbacks map.
127 update_lock: Mutex<()>,
128 // This is a nested mess of mutexes and arcs to allow us to process callbacks while allowing subscription
129 // modifications. Otherwise, a subscription that tried to unsubscribe itself would deadlock.
130 #[allow(clippy::type_complexity)]
131 callbacks: Mutex<Arc<HashMap<u64, Arc<Mutex<Callback<T, E>>>>>>,
132}
133
134/// A wrapper around a live-refreshable value.
135pub struct Refreshable<T, E> {
136 shared: Arc<Shared<T, E>>,
137 next_id: AtomicU64,
138 // This is used to unsubscribe a mapped refreshable from its parent refreshable. A copy of the Arc is held in the
139 // refreshable itself, along with every subscription of the mapped refreshable. The inner dyn Drop is a Subscription
140 // type.
141 cleanup: Option<Arc<dyn Cleanup + Sync + Send>>,
142}
143
144impl<T, E> Refreshable<T, E>
145where
146 T: PartialEq + 'static + Sync + Send,
147 E: 'static,
148{
149 /// Creates a new `Refreshable` with an initial value, returning it along with a `RefreshHandle` used to update it
150 /// with new values.
151 pub fn new(value: T) -> (Refreshable<T, E>, RefreshHandle<T, E>) {
152 let shared = Arc::new(Shared {
153 value: ArcSwap::new(Arc::new(value)),
154 update_lock: Mutex::new(()),
155 callbacks: Mutex::new(Arc::new(HashMap::new())),
156 });
157
158 (
159 Refreshable {
160 shared: shared.clone(),
161 next_id: AtomicU64::new(0),
162 cleanup: None,
163 },
164 RefreshHandle { shared },
165 )
166 }
167
168 /// Returns a guard type providing access to a snapshot of the refreshable's current value.
169 #[inline]
170 pub fn get(&self) -> Guard<'_, T> {
171 Guard {
172 inner: self.shared.value.load(),
173 _p: PhantomData,
174 }
175 }
176
177 /// Subscribes to the refreshable with an infallible callback.
178 ///
179 /// The callback will be invoked every time the refreshable's value changes, and is also called synchronously when
180 /// this method is called with the current value.
181 pub fn subscribe<F>(&self, mut callback: F) -> Subscription<T, E>
182 where
183 F: FnMut(&T) + 'static + Sync + Send,
184 {
185 self.try_subscribe(move |value| {
186 callback(value);
187 Ok(())
188 })
189 .ok()
190 .unwrap()
191 }
192
193 /// Subscribes to the refreshable with a fallible callback.
194 ///
195 /// The callback will be invoked every time the refreshable's value changes, and is also called synchronously when
196 /// this method is called with the current value. If the callback returns `Ok`, a `Subscription` object is returned
197 /// that will unsubscribe from the refreshable when it drops. If the callback returns `Err`, this method will return
198 /// the error and the callback will *not* be invoked on updates to the value. Errors in subsequent invocations will
199 /// be propagated to the originating [`RefreshHandle::refresh`] call.
200 pub fn try_subscribe<F>(&self, mut callback: F) -> Result<Subscription<T, E>, E>
201 where
202 F: FnMut(&T) -> Result<(), E> + 'static + Sync + Send,
203 {
204 let _guard = self.shared.update_lock.lock();
205 callback(&self.get())?;
206
207 let subscription = self.subscribe_raw(move |value, errors| {
208 if let Err(e) = callback(value) {
209 errors.push(e);
210 }
211 });
212
213 Ok(subscription)
214 }
215
216 fn subscribe_raw<F>(&self, callback: F) -> Subscription<T, E>
217 where
218 F: FnMut(&T, &mut Vec<E>) + 'static + Sync + Send,
219 {
220 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
221 let callback = Arc::new(Mutex::new(RawCallback {
222 _cleanup: self.cleanup.clone(),
223 ok: true,
224 callback,
225 }));
226 Arc::make_mut(&mut *self.shared.callbacks.lock()).insert(id, callback);
227
228 Subscription {
229 shared: self.shared.clone(),
230 id,
231 live: true,
232 }
233 }
234
235 /// Creates a new refreshable from this one by applying a mapping function to the value.
236 ///
237 /// This can be used to narrow the scope of the refreshable value. Updates to the initial refreshable value will
238 /// propagate to the mapped refreshable value, but the mapped refreshable's subscriptions will only be invoked if
239 /// the mapped value actually changed.
240 pub fn map<F, R>(&self, mut map: F) -> Refreshable<R, E>
241 where
242 F: FnMut(&T) -> R + 'static + Sync + Send,
243 R: PartialEq + 'static + Sync + Send,
244 {
245 self.try_map(move |v| Ok(map(v))).ok().unwrap()
246 }
247
248 /// Creates a new refreshable from this one by applying a fallible mapping function to the value.
249 ///
250 /// This can be used to narrow the scope of the refreshable value. Updates to the initial refreshable value will
251 /// propagate to the mapped refreshable value, but the mapped refreshable's subscriptions will only be invoked if
252 /// the mapped value actually changed.
253 pub fn try_map<F, R>(&self, mut map: F) -> Result<Refreshable<R, E>, E>
254 where
255 F: FnMut(&T) -> Result<R, E> + 'static + Sync + Send,
256 R: PartialEq + 'static + Sync + Send,
257 {
258 let _guard = self.shared.update_lock.lock();
259 let (mut refreshable, mut handle) = Refreshable::new(map(&self.get())?);
260 let subscription = self.subscribe_raw(move |value, errors| match map(value) {
261 Ok(value) => handle.refresh_raw(value, errors),
262 Err(e) => errors.push(e),
263 });
264 refreshable.cleanup = Some(Arc::new(subscription));
265 Ok(refreshable)
266 }
267}
268
269/// A subscription to a `Refreshable` value.
270///
271/// The associated subscription is unregistered when this value is dropped, unless the `Subscription::leak` method is
272/// used.
273#[must_use = "the associated subscription is unregistered when this value is dropped"]
274pub struct Subscription<T, E> {
275 shared: Arc<Shared<T, E>>,
276 id: u64,
277 live: bool,
278}
279
280impl<T, E> Drop for Subscription<T, E> {
281 fn drop(&mut self) {
282 if self.live {
283 Arc::make_mut(&mut *self.shared.callbacks.lock()).remove(&self.id);
284 }
285 }
286}
287
288impl<T, E> Subscription<T, E> {
289 /// Destroys the guard without unregistering its associated subscription.
290 pub fn leak(mut self) {
291 self.live = false;
292 }
293}
294
295/// A handle that can update the value associated with a refreshable.
296pub struct RefreshHandle<T, E> {
297 shared: Arc<Shared<T, E>>,
298}
299
300impl<T, E> RefreshHandle<T, E>
301where
302 T: PartialEq + 'static + Sync + Send,
303{
304 /// Updates the refreshable's value.
305 ///
306 /// If the new value is equal to the refreshable's current value, the method returns immediately. Otherwise, it
307 /// runs all registered subscriptions, collecting any errors and returning them all when finished.
308 // NB: It's important that this takes &mut self. That way, all the way down through the tree of mapped refreshables,
309 // we don't need to worry about concurrent refreshes.
310 pub fn refresh(&mut self, new_value: T) -> Result<(), Vec<E>> {
311 let mut errors = vec![];
312
313 self.refresh_raw(new_value, &mut errors);
314
315 if errors.is_empty() {
316 Ok(())
317 } else {
318 Err(errors)
319 }
320 }
321
322 fn refresh_raw(&mut self, new_value: T, errors: &mut Vec<E>) {
323 // We could avoid updating the inner value when it hasn't changed but the complexity doesn't seem worth it.
324 let value_changed = new_value != **self.shared.value.load();
325
326 let guard = self.shared.update_lock.lock();
327 self.shared.value.store(Arc::new(new_value));
328 let value = self.shared.value.load();
329 let callbacks = self.shared.callbacks.lock().clone();
330 drop(guard);
331
332 for callback in callbacks.values() {
333 let mut callback = callback.lock();
334 if value_changed || !callback.ok {
335 let nerrors = errors.len();
336 (callback.callback)(&value, errors);
337 callback.ok = errors.len() == nerrors;
338 }
339 }
340 }
341}
342
343/// A guard type providing access to a snapshot of a refreshable's current value.
344pub struct Guard<'a, T> {
345 inner: arc_swap::Guard<Arc<T>>,
346 // the arc_swap guard doesn't borrow from its ArcSwap, but we don't want to expose that fact in the public API
347 _p: PhantomData<&'a ()>,
348}
349
350impl<T> Deref for Guard<'_, T> {
351 type Target = T;
352
353 #[inline]
354 fn deref(&self) -> &T {
355 &self.inner
356 }
357}