refreshable/
lib.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//! A simple wrapper around a value that changes over time.
15//!
16//! A `Refreshable` provides access to both the current value and also ways to be notified of changes made in the
17//! future. Users can *subscribe* to the refreshable, registering a callback which is invoked whenever the value
18//! changes. Additionally, users can *map* a refreshable of one type to a refreshable of another type, with the new
19//! refreshable being updated based on changes to the original refreshable. For example, a caching component of a
20//! service may map a `Refreshable<ServiceConfiguration>` down to a `Refreshable<CacheConfiguration>` so it can
21//! subscribe specifically to only the configuration changes that matter to it.
22//!
23//! A `Subscription` is returned when subscribing to a refreshable which acts as a guard type, unregistering the
24//! subscription when dropped. If you intend the subscription to last for the lifetime of the refreshable, you can use
25//! the `Subscription::leak` method to allow the `Subscription` to fall out of scope without unregistering.
26//!
27//! A `RefreshHandle` is returned when creating a new `Refreshable` which is used to update its value. Subscriptions
28//! are fallible, and all errors encountered when running subscriptions in response to an update are reported through
29//! the `RefreshHandle::refresh` method.
30//!
31//! # Examples
32//!
33//! ```
34//! use refreshable::Refreshable;
35//!
36//! #[derive(PartialEq)]
37//! struct ServiceConfiguration {
38//!     cache: CacheConfiguration,
39//!     some_other_thing: u32,
40//! }
41//!
42//! #[derive(PartialEq, Clone)]
43//! struct CacheConfiguration {
44//!     size: usize,
45//! }
46//!
47//! let initial_config = ServiceConfiguration {
48//!     cache: CacheConfiguration {
49//!         size: 10,
50//!     },
51//!     some_other_thing: 5,
52//! };
53//! let (refreshable, mut handle) = Refreshable::new(initial_config);
54//!
55//! let cache_refreshable = refreshable.map(|config| config.cache.clone());
56//!
57//! let subscription = cache_refreshable.try_subscribe(|cache| {
58//!     if cache.size == 0 {
59//!         Err("cache size must be positive")
60//!     } else {
61//!         println!("new cache size is {}", cache.size);
62//!         Ok(())
63//!     }
64//! }).unwrap();
65//!
66//! let new_config = ServiceConfiguration {
67//!     cache: CacheConfiguration {
68//!         size: 20,
69//!     },
70//!     some_other_thing: 5,
71//! };
72//! // "new cache size is 20" is printed.
73//! handle.refresh(new_config).unwrap();
74//!
75//! let new_config = ServiceConfiguration {
76//!     cache: CacheConfiguration {
77//!         size: 20,
78//!     },
79//!     some_other_thing: 10,
80//! };
81//! // nothing is printed since the cache configuration did not change.
82//! handle.refresh(new_config).unwrap();
83//!
84//! drop(subscription);
85//! let new_config = ServiceConfiguration {
86//!     cache: CacheConfiguration {
87//!         size: 0,
88//!     },
89//!     some_other_thing: 10,
90//! };
//! // nothing is printed since the cache subscription was dropped.
92//! handle.refresh(new_config).unwrap();
93//! ```
94#![doc(html_root_url = "https://docs.rs/refreshable/1")]
95#![warn(clippy::all, missing_docs)]
96
97use arc_swap::ArcSwap;
98use parking_lot::Mutex;
99use std::collections::HashMap;
100use std::marker::PhantomData;
101use std::ops::Deref;
102use std::sync::atomic::{AtomicU64, Ordering};
103use std::sync::Arc;
104
105#[cfg(test)]
106mod test;
107
// Marker trait used purely for type erasure: it declares no methods, so a `dyn Cleanup` is only useful for whatever
// happens when the value behind it is dropped.
trait Cleanup {}

// Lets a `Subscription` be stored behind an `Arc<dyn Cleanup + Sync + Send>` without naming its type parameters.
impl<T, E> Cleanup for Subscription<T, E> {}
111
// A registered callback plus the bookkeeping stored alongside it. `F` may be unsized so that callbacks can be kept as
// trait objects.
struct RawCallback<F: ?Sized> {
    // A copy of the owning refreshable's `cleanup` handle, taken when the callback is registered. It is `None` for
    // refreshables created with `Refreshable::new` and set for refreshables produced by `try_map`.
    _cleanup: Option<Arc<dyn Cleanup + Sync + Send>>,
    // We need to run failing callbacks again on a refresh even if the value didn't change. Otherwise, you can "lose"
    // errors when the refreshable updates to the same value.
    ok: bool,
    // The callback itself. It must be the last field since it is the one that may be unsized.
    callback: F,
}
119
// The type-erased form in which callbacks are stored. A callback is handed the value along with a buffer to which it
// appends any errors it produces.
type Callback<T, E> = RawCallback<dyn FnMut(&T, &mut Vec<E>) + Sync + Send>;
121
// State shared between a `Refreshable`, its `RefreshHandle`, and every `Subscription` registered on it.
struct Shared<T, E> {
    // The current value. Readers take snapshots of it; the refresh handle swaps in replacements.
    value: ArcSwap<T>,
    // This guards subscription registration against concurrent refreshes. If that happened, the subscription callback
    // could miss updates that happen after the subscription is called with the "current" value but before it is
    // inserted in the callbacks map.
    update_lock: Mutex<()>,
    // This is a nested mess of mutexes and arcs to allow us to process callbacks while allowing subscription
    // modifications. Otherwise, a subscription that tried to unsubscribe itself would deadlock.
    //
    // The map is keyed by subscription ID. A refresh clones the outer `Arc` and iterates over that snapshot, while
    // registration and unregistration go through `Arc::make_mut` on the copy held here.
    #[allow(clippy::type_complexity)]
    callbacks: Mutex<Arc<HashMap<u64, Arc<Mutex<Callback<T, E>>>>>>,
}
133
/// A wrapper around a live-refreshable value.
pub struct Refreshable<T, E> {
    // State shared with the refresh handle and with subscriptions.
    shared: Arc<Shared<T, E>>,
    // Source of IDs for new subscriptions; each ID is the subscription's key in `shared.callbacks`.
    next_id: AtomicU64,
    // This is used to unsubscribe a mapped refreshable from its parent refreshable. A copy of the Arc is held in the
    // refreshable itself, along with every subscription of the mapped refreshable. The inner dyn Cleanup is a
    // Subscription type.
    cleanup: Option<Arc<dyn Cleanup + Sync + Send>>,
}
143
144impl<T, E> Refreshable<T, E>
145where
146    T: PartialEq + 'static + Sync + Send,
147    E: 'static,
148{
149    /// Creates a new `Refreshable` with an initial value, returning it along with a `RefreshHandle` used to update it
150    /// with new values.
151    pub fn new(value: T) -> (Refreshable<T, E>, RefreshHandle<T, E>) {
152        let shared = Arc::new(Shared {
153            value: ArcSwap::new(Arc::new(value)),
154            update_lock: Mutex::new(()),
155            callbacks: Mutex::new(Arc::new(HashMap::new())),
156        });
157
158        (
159            Refreshable {
160                shared: shared.clone(),
161                next_id: AtomicU64::new(0),
162                cleanup: None,
163            },
164            RefreshHandle { shared },
165        )
166    }
167
168    /// Returns a guard type providing access to a snapshot of the refreshable's current value.
169    #[inline]
170    pub fn get(&self) -> Guard<'_, T> {
171        Guard {
172            inner: self.shared.value.load(),
173            _p: PhantomData,
174        }
175    }
176
177    /// Subscribes to the refreshable with an infallible callback.
178    ///
179    /// The callback will be invoked every time the refreshable's value changes, and is also called synchronously when
180    /// this method is called with the current value.
181    pub fn subscribe<F>(&self, mut callback: F) -> Subscription<T, E>
182    where
183        F: FnMut(&T) + 'static + Sync + Send,
184    {
185        self.try_subscribe(move |value| {
186            callback(value);
187            Ok(())
188        })
189        .ok()
190        .unwrap()
191    }
192
193    /// Subscribes to the refreshable with a fallible callback.
194    ///
195    /// The callback will be invoked every time the refreshable's value changes, and is also called synchronously when
196    /// this method is called with the current value. If the callback returns `Ok`, a `Subscription` object is returned
197    /// that will unsubscribe from the refreshable when it drops. If the callback returns `Err`, this method will return
198    /// the error and the callback will *not* be invoked on updates to the value. Errors in subsequent invocations will
199    /// be propagated to the originating [`RefreshHandle::refresh`] call.
200    pub fn try_subscribe<F>(&self, mut callback: F) -> Result<Subscription<T, E>, E>
201    where
202        F: FnMut(&T) -> Result<(), E> + 'static + Sync + Send,
203    {
204        let _guard = self.shared.update_lock.lock();
205        callback(&self.get())?;
206
207        let subscription = self.subscribe_raw(move |value, errors| {
208            if let Err(e) = callback(value) {
209                errors.push(e);
210            }
211        });
212
213        Ok(subscription)
214    }
215
216    fn subscribe_raw<F>(&self, callback: F) -> Subscription<T, E>
217    where
218        F: FnMut(&T, &mut Vec<E>) + 'static + Sync + Send,
219    {
220        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
221        let callback = Arc::new(Mutex::new(RawCallback {
222            _cleanup: self.cleanup.clone(),
223            ok: true,
224            callback,
225        }));
226        Arc::make_mut(&mut *self.shared.callbacks.lock()).insert(id, callback);
227
228        Subscription {
229            shared: self.shared.clone(),
230            id,
231            live: true,
232        }
233    }
234
235    /// Creates a new refreshable from this one by applying a mapping function to the value.
236    ///
237    /// This can be used to narrow the scope of the refreshable value. Updates to the initial refreshable value will
238    /// propagate to the mapped refreshable value, but the mapped refreshable's subscriptions will only be invoked if
239    /// the mapped value actually changed.
240    pub fn map<F, R>(&self, mut map: F) -> Refreshable<R, E>
241    where
242        F: FnMut(&T) -> R + 'static + Sync + Send,
243        R: PartialEq + 'static + Sync + Send,
244    {
245        self.try_map(move |v| Ok(map(v))).ok().unwrap()
246    }
247
248    /// Creates a new refreshable from this one by applying a fallible mapping function to the value.
249    ///
250    /// This can be used to narrow the scope of the refreshable value. Updates to the initial refreshable value will
251    /// propagate to the mapped refreshable value, but the mapped refreshable's subscriptions will only be invoked if
252    /// the mapped value actually changed.
253    pub fn try_map<F, R>(&self, mut map: F) -> Result<Refreshable<R, E>, E>
254    where
255        F: FnMut(&T) -> Result<R, E> + 'static + Sync + Send,
256        R: PartialEq + 'static + Sync + Send,
257    {
258        let _guard = self.shared.update_lock.lock();
259        let (mut refreshable, mut handle) = Refreshable::new(map(&self.get())?);
260        let subscription = self.subscribe_raw(move |value, errors| match map(value) {
261            Ok(value) => handle.refresh_raw(value, errors),
262            Err(e) => errors.push(e),
263        });
264        refreshable.cleanup = Some(Arc::new(subscription));
265        Ok(refreshable)
266    }
267}
268
/// A subscription to a `Refreshable` value.
///
/// The associated subscription is unregistered when this value is dropped, unless the `Subscription::leak` method is
/// used.
#[must_use = "the associated subscription is unregistered when this value is dropped"]
pub struct Subscription<T, E> {
    // State of the refreshable the callback was registered on.
    shared: Arc<Shared<T, E>>,
    // Key of the callback in `shared.callbacks`.
    id: u64,
    // Whether dropping this value should unregister the callback. Cleared by `leak`.
    live: bool,
}
279
280impl<T, E> Drop for Subscription<T, E> {
281    fn drop(&mut self) {
282        if self.live {
283            Arc::make_mut(&mut *self.shared.callbacks.lock()).remove(&self.id);
284        }
285    }
286}
287
288impl<T, E> Subscription<T, E> {
289    /// Destroys the guard without unregistering its associated subscription.
290    pub fn leak(mut self) {
291        self.live = false;
292    }
293}
294
/// A handle that can update the value associated with a refreshable.
pub struct RefreshHandle<T, E> {
    // The same shared state that is held by the `Refreshable` this handle was created with.
    shared: Arc<Shared<T, E>>,
}
299
300impl<T, E> RefreshHandle<T, E>
301where
302    T: PartialEq + 'static + Sync + Send,
303{
304    /// Updates the refreshable's value.
305    ///
306    /// If the new value is equal to the refreshable's current value, the method returns immediately. Otherwise, it
307    /// runs all registered subscriptions, collecting any errors and returning them all when finished.
308    // NB: It's important that this takes &mut self. That way, all the way down through the tree of mapped refreshables,
309    // we don't need to worry about concurrent refreshes.
310    pub fn refresh(&mut self, new_value: T) -> Result<(), Vec<E>> {
311        let mut errors = vec![];
312
313        self.refresh_raw(new_value, &mut errors);
314
315        if errors.is_empty() {
316            Ok(())
317        } else {
318            Err(errors)
319        }
320    }
321
322    fn refresh_raw(&mut self, new_value: T, errors: &mut Vec<E>) {
323        // We could avoid updating the inner value when it hasn't changed but the complexity doesn't seem worth it.
324        let value_changed = new_value != **self.shared.value.load();
325
326        let guard = self.shared.update_lock.lock();
327        self.shared.value.store(Arc::new(new_value));
328        let value = self.shared.value.load();
329        let callbacks = self.shared.callbacks.lock().clone();
330        drop(guard);
331
332        for callback in callbacks.values() {
333            let mut callback = callback.lock();
334            if value_changed || !callback.ok {
335                let nerrors = errors.len();
336                (callback.callback)(&value, errors);
337                callback.ok = errors.len() == nerrors;
338            }
339        }
340    }
341}
342
/// A guard type providing access to a snapshot of a refreshable's current value.
pub struct Guard<'a, T> {
    // The underlying snapshot, as loaded from the refreshable's `ArcSwap`.
    inner: arc_swap::Guard<Arc<T>>,
    // the arc_swap guard doesn't borrow from its ArcSwap, but we don't want to expose that fact in the public API
    _p: PhantomData<&'a ()>,
}
349
350impl<T> Deref for Guard<'_, T> {
351    type Target = T;
352
353    #[inline]
354    fn deref(&self) -> &T {
355        &self.inner
356    }
357}