async_map/single_writer_versioned/
mod.rs

1//! The `single_writer_versioned` module implements a versioning mechanism for immutable data-structures
2//! which allows many concurrent readers. All changes to the contained data structure - which, since they
3//! are immtuable, means a creating a new instance - are delegated to a single task, and hence occur
4//! sequentially. Each new update is appended to a linked list of versions, and an atomic integer is used to
5//! indicate which is the latest version, so readers can retrieve the latest version when they read. This integer
6//! acts in place of a lock on the linked list element, so that no actual locks are required and reads can always
7//! proceed without waiting.
8mod private {
9    use std::cell::Cell;
10    use std::ops::Deref;
11    use std::sync::atomic::{AtomicU32, Ordering};
12    use std::sync::Arc;
13
14    pub trait Data: Send + Sync + std::fmt::Debug + 'static {}
15    impl<T: Send + Sync + std::fmt::Debug + 'static> Data for T {}
16
17    pub struct Version<T>
18    where
19        T: Data,
20    {
21        version: u32,
22        data: T,
23        next: Cell<Option<Arc<Version<T>>>>,
24        latest_version: Arc<AtomicU32>,
25    }
26
27    impl<T: Data> std::fmt::Debug for Version<T> {
28        fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result {
29            f.write_fmt(format_args!("Version[{}]", self.version))
30        }
31    }
32
33    impl<T> Version<T>
34    where
35        T: Data,
36    {
37        pub fn initial(data: T) -> (Arc<Version<T>>, Updater<T>) {
38            let initial_version =
39                Arc::new(Version::new_version(data, 0, Arc::new(AtomicU32::new(0))));
40            let updater = Updater {
41                version: initial_version.clone(),
42            };
43            (initial_version, updater)
44        }
45
46        pub fn as_ref(&self) -> &T {
47            &self.data
48        }
49
50        pub fn latest<'a>(self: &'a Arc<Version<T>>) -> Option<&'a Arc<Version<T>>> {
51            let latest_version = self.latest_version.load(Ordering::Acquire);
52
53            if self.version == latest_version {
54                None
55            } else {
56                // This is safe because above we ensured that
57                // a) The requested version exists and is the latest one
58                // b) It is not self, so self must be in the chain after self.
59                Some(self.get_version(latest_version))
60            }
61        }
62
63        /// This method is unsafe because it does not check whether there is a next; it
64        /// should only be called in cases where that check has been performed
65        fn next<'a>(self: &'a Arc<Version<T>>) -> &'a Arc<Version<T>> {
66            unsafe { &*self.next.as_ptr() }.as_ref().unwrap()
67        }
68
69        /// This method is unsafed because it does not check whether the requested version exists,
70        /// and is in the chain following self; it should only be called in cases where that check
71        /// has been performed
72        fn get_version<'a>(self: &'a Arc<Version<T>>, version: u32) -> &'a Arc<Version<T>> {
73            if self.version == version {
74                self
75            } else {
76                self.next().get_version(version)
77            }
78        }
79
80        fn set_next(&self, data: T) -> Result<(Arc<Version<T>>, Updater<T>), T> {
81            let latest_version = self.latest_version.load(Ordering::Acquire);
82
83            // If this instance is not the latest version...
84            if latest_version != self.version {
85                // ...then next must already be set, so return data as
86                return Err(data);
87            }
88
89            let new_version = latest_version + 1;
90
91            let next = Arc::new(Version::new_version(
92                data,
93                new_version,
94                self.latest_version.clone(),
95            ));
96
97            // First, set the next version on self
98            self.next.replace(Some(next.clone()));
99
100            // Now that next has been set we can update the latest version to reflect
101            // the new reality
102            self.latest_version.store(new_version, Ordering::Release);
103            let updater = Updater {
104                version: next.clone(),
105            };
106            Ok((next, updater))
107        }
108
109        fn new_version(data: T, version: u32, latest_version: Arc<AtomicU32>) -> Version<T> {
110            let result = Version {
111                version,
112                data,
113                next: Cell::new(None),
114                latest_version,
115            };
116            result
117        }
118    }
119
120    unsafe impl<T> Send for Version<T> where T: Data {}
121    unsafe impl<T> Sync for Version<T> where T: Data {}
122
123    impl<T> Deref for Version<T>
124    where
125        T: Data,
126    {
127        type Target = T;
128        fn deref(&self) -> &T {
129            &self.data
130        }
131    }
132
133    pub struct Updater<T>
134    where
135        T: Data,
136    {
137        version: Arc<Version<T>>,
138    }
139
140    impl<T> Updater<T>
141    where
142        T: Data,
143    {
144        pub fn update(self, new_data: T) -> (Arc<Version<T>>, Updater<T>) {
145            self.version.set_next(new_data).expect("Illegal State") // Is recovery possible?
146        }
147    }
148
149    #[cfg(test)]
150    mod test {
151        #![allow(mutable_transmutes)]
152        use super::Version;
153        #[test]
154        fn it_creates_sensible_initial() {
155            let version = Version::initial("hello").0;
156            assert_eq!("hello", version.data);
157            assert_eq!(0, version.version);
158        }
159
160        #[test]
161        fn it_accepts_a_next_version() {
162            let (first, _) = Version::initial("hello");
163            let (second, _) = first.set_next("goodbye").unwrap();
164
165            assert_eq!("hello", first.data);
166            assert_eq!(0, first.version);
167            assert_eq!(second.version, first.next().as_ref().version);
168
169            assert_eq!("goodbye", second.data);
170            assert_eq!(1, second.version);
171        }
172
173        #[test]
174        fn it_does_not_update_next_version() {
175            let (first, _) = Version::initial("hello");
176            let (_, _) = first.set_next("goodbye").unwrap();
177            let result = first.set_next("au revoir");
178
179            assert_eq!(true, result.is_err());
180        }
181
182        #[tokio::test]
183        async fn it_can_be_used_across_tasks() {
184            let version = Version::initial("hello").0;
185
186            version.set_next("goodbye").unwrap();
187
188            tokio::task::spawn(async move {
189                assert_eq!("goodbye", version.next().data);
190            })
191            .await
192            .unwrap();
193        }
194
195        #[test]
196        fn latest_returns_none_on_latest() {
197            let first = Version::initial("hello").0;
198
199            assert_eq!(true, first.latest().is_none());
200
201            let second = first.set_next("goodbye").unwrap().0;
202            assert_eq!(true, second.latest().is_none());
203        }
204
205        #[test]
206        fn latest_returns_latest() {
207            let first = Version::initial("hello").0;
208
209            let second = first.set_next("goodbye").unwrap().0;
210            assert_eq!("goodbye", first.latest().unwrap().data);
211
212            let third = second.set_next("servus").unwrap().0;
213            assert_eq!("servus", first.latest().unwrap().data);
214            assert_eq!("servus", second.latest().unwrap().data);
215            assert_eq!(true, third.latest().is_none());
216        }
217
218        #[test]
219        fn updater_updates() {
220            let (first, updater) = Version::initial("hello");
221
222            let (second, updater) = updater.update("goodbye");
223            assert_eq!("goodbye", first.latest().unwrap().data);
224
225            let third = updater.update("servus").0;
226            assert_eq!("servus", first.latest().unwrap().data);
227            assert_eq!("servus", second.latest().unwrap().data);
228            assert_eq!(true, third.latest().is_none());
229        }
230    }
231}
232
233use self::private::{Data, Updater, Version};
234use std::cell::RefCell;
235use std::sync::Arc;
236use tokio::sync::mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender};
237
238pub trait DataUpdater<T>: (FnOnce(&T) -> Option<T>) + Send + 'static
239where
240    T: Data,
241{
242}
243
244impl<T, S: (FnOnce(&T) -> Option<T>) + Send + 'static> DataUpdater<T> for S where T: Data {}
245
246enum VersionedUpdaterAction<T>
247where
248    T: Data,
249{
250    Update(Box<dyn DataUpdater<T>>),
251    Quit,
252}
253
254/// The core structure of this package, which provides synchronous read-access to the latest version and
255/// aysnchronous writes, delegated to the update task.
256///
257/// The struct is not Sync, but it is Send; in order to share between tasks it should be cloned and Sent.
258/// The contained data is not cloned; clones share the same backing linked list of versions and data.
259///
260/// Old versions are not actively purged, but will be dropped as long as there are no more instances holding
261/// that version. This means that instances that are held for a long duration without being accessed will prevent
262/// the old version from being purged. This situation should be avoided.
263#[derive(Clone, Debug)]
264pub struct Versioned<T>
265where
266    T: Data,
267{
268    current_holder: RefCell<Arc<Version<T>>>,
269    update_sender: UnboundedSender<VersionedUpdaterAction<T>>,
270}
271
272/// Quits the updated task backing the data structure. Though not necessary (see notes) it allows
273/// the the data structure to become invalidated for updates, which may speed up dropping of
274/// references to it by acting as a signal to reference holders.
275///
276/// Some notes:
277///
278/// 1. Any previously dispatched updates will be
279/// processed before the map task is quit
280/// 1. As long as references to the Versioned itself, the data will not
281/// be dropped. In other words, this does not free any memory.
282/// 1. Conversely, when all references are dropped the memory and the task
283/// will also be dropped. Thus, quitting is not necessary.
284pub struct Quitter<T>
285where
286    T: Data,
287{
288    update_sender: UnboundedSender<VersionedUpdaterAction<T>>,
289}
290
291impl<T> Quitter<T>
292where
293    T: Data,
294{
295    pub fn quit(self) {
296        if let Err(_) = self.update_sender.send(VersionedUpdaterAction::Quit) {
297            // Probably already quit
298        }
299    }
300}
301
302impl<T> Versioned<T>
303where
304    T: Data,
305{
306    /// Creates the Versioned from the initial data, returning both the Versioned instance
307    /// and a Quitter which can be used to stop the backing update task.
308    pub fn from_initial(data: T) -> (Self, Quitter<T>) {
309        let (initial_version, update_sender) = VersionedUpdater::start_from_initial(data);
310
311        (
312            Versioned {
313                current_holder: RefCell::from(initial_version),
314                update_sender: update_sender.clone(),
315            },
316            Quitter { update_sender },
317        )
318    }
319
320    /// Passes a reference to the latest version of the contained data to the provided
321    /// function and returns it result.
322    ///
323    /// This is the mechanism for read access to the data.
324    pub fn with_latest<U, F: FnOnce(&T) -> U>(&self, action: F) -> U {
325        self.ensure_latest();
326        let the_ref = self.current_holder.borrow();
327        action(&***the_ref)
328    }
329
330    fn ensure_latest(&self) {
331        let current = self.current_holder.borrow();
332
333        if let Some(new_version) = current.latest() {
334            let new_version = new_version.clone();
335            drop(current); // drop existing borrow.
336            self.current_holder.replace(new_version);
337        }
338    }
339
340    /// Allows the data to be upated by passing the latest version to the provided DataUpdater,
341    /// and storing the result if one is provided. The update is delegated to the update task,
342    /// which is also where the DataUpdater will be called.
343    pub fn update(
344        &self,
345        update_fn: Box<dyn DataUpdater<T>>,
346    ) -> Result<(), Box<dyn DataUpdater<T>>> {
347        self.update_sender
348            .send(VersionedUpdaterAction::Update(update_fn))
349            .map_err(|action| match action {
350                mpsc::error::SendError(VersionedUpdaterAction::Update(update_fn)) => update_fn,
351                _ => panic!("Received illegal error"),
352            })
353    }
354}
355
356/// This is the backing task which receives the updates and performs them sequentially.
357struct VersionedUpdater<T>
358where
359    T: Data,
360{
361    current: (Arc<Version<T>>, Updater<T>),
362    update_receiver: UnboundedReceiver<VersionedUpdaterAction<T>>,
363}
364
365impl<T> VersionedUpdater<T>
366where
367    T: Data,
368{
369    fn start_from_initial(
370        data: T,
371    ) -> (Arc<Version<T>>, UnboundedSender<VersionedUpdaterAction<T>>) {
372        let (initial_version, updater) = Version::initial(data);
373
374        let (update_sender, update_receiver) = unbounded_channel();
375
376        let current = (initial_version.clone(), updater);
377
378        VersionedUpdater {
379            current,
380            update_receiver,
381        }
382        .run();
383
384        (initial_version, update_sender)
385    }
386
387    fn run(mut self) {
388        tokio::task::spawn(async move {
389            while let Some(action) = self.update_receiver.recv().await {
390                match action {
391                    VersionedUpdaterAction::Update(update_fn) => {
392                        if let Some(new_data) = update_fn(self.current.0.as_ref().as_ref()) {
393                            self.current = self.current.1.update(new_data);
394                        }
395                    }
396                    VersionedUpdaterAction::Quit => {
397                        break;
398                    }
399                }
400            }
401        });
402    }
403}
404
405#[cfg(test)]
406mod test {
407    use super::*;
408    use std::sync::atomic::{AtomicU32, Ordering};
409
410    #[tokio::test]
411    async fn intial_holds_passed_data() {
412        let versioned = Versioned::from_initial(String::from("Hello")).0;
413
414        versioned.with_latest(|data| assert_eq!("Hello", data));
415    }
416
417    #[tokio::test]
418    async fn updates_are_processed() {
419        let versioned = Versioned::from_initial(String::from("Hello")).0;
420
421        //updates affect versioned
422        versioned
423            .update(Box::new(|old| Some(old.clone() + ", World")))
424            .map_err(|_| ())
425            .expect("Should be ok");
426
427        tokio::task::yield_now().await;
428
429        versioned.with_latest(|data| assert_eq!("Hello, World", data));
430    }
431
432    #[tokio::test]
433    async fn updates_are_shared() {
434        let versioned = Versioned::from_initial(String::from("Hello")).0;
435        let clone = versioned.clone();
436        //updates affect versioned
437        versioned
438            .update(Box::new(|old| Some(old.clone() + ", World")))
439            .map_err(|_| ())
440            .expect("Should be ok");
441
442        tokio::task::yield_now().await;
443
444        versioned.with_latest(|data| assert_eq!("Hello, World", data));
445        clone.with_latest(|data| assert_eq!("Hello, World", data));
446    }
447
448    #[tokio::test]
449    async fn quitter_quits() {
450        let tuple = Versioned::from_initial(String::from("Hello"));
451        let versioned = tuple.0;
452        let quitter = tuple.1;
453
454        //updates affect versioned
455        versioned
456            .update(Box::new(|old| Some(old.clone() + ", World")))
457            .map_err(|_| ())
458            .expect("Should be ok");
459        tokio::task::yield_now().await;
460
461        quitter.quit();
462        tokio::task::yield_now().await;
463
464        let res = versioned.update(Box::new(|old| Some(old.clone() + "! And Moon!")));
465
466        assert_eq!(true, res.is_err());
467
468        tokio::task::yield_now().await;
469        // The second update did not take.
470        versioned.with_latest(|data| assert_eq!("Hello, World", data));
471    }
472
473    #[derive(Debug)]
474    struct TestData {
475        drop_counter: Arc<AtomicU32>,
476    }
477
478    impl Drop for TestData {
479        fn drop(&mut self) {
480            self.drop_counter.fetch_add(1, Ordering::Release);
481        }
482    }
483
484    #[tokio::test]
485    async fn old_versions_are_purged() {
486        let counter = Arc::<AtomicU32>::default();
487        let drop_counter = counter.clone();
488
489        let versioned: Versioned<Arc<TestData>> = Versioned::from_initial(Arc::new(TestData {
490            drop_counter: drop_counter,
491        }))
492        .0;
493        let clone = versioned.clone();
494
495        assert_eq!(0, counter.load(Ordering::Acquire));
496
497        let drop_counter = counter.clone();
498
499        //updates affect versioned
500        versioned
501            .update(Box::new(|_| {
502                Some(Arc::new(TestData {
503                    drop_counter: drop_counter,
504                }))
505            }))
506            .map_err(|_| ())
507            .expect("Should be ok");
508
509        tokio::task::yield_now().await;
510        // update Versioned to latest
511        versioned.with_latest(Box::new(|_: &Arc<TestData>| ()));
512        tokio::task::yield_now().await;
513
514        // Nothing dropped because clone still has initial version
515        assert_eq!(0, counter.load(Ordering::Acquire));
516
517        // update cloned to latest
518        clone.with_latest(Box::new(|_: &Arc<TestData>| ()));
519        tokio::task::yield_now().await;
520        // First verion dropped, everyone has latest
521        assert_eq!(1, counter.load(Ordering::Acquire));
522
523        drop(versioned);
524        drop(clone);
525
526        tokio::task::yield_now().await;
527
528        // all Versioned instances have been dropped, so the update task should have ended
529        // and the latest version dropped too
530        assert_eq!(2, counter.load(Ordering::Acquire));
531    }
532
533    fn any_test<T: std::any::Any + Send + 'static>(func: Box<dyn FnOnce() -> Box<T>>) -> Box<T> {
534        func()
535    }
536
537    #[tokio::test]
538    async fn test_any() {
539        let foo = any_test(Box::new(|| Box::new(String::from("Hello"))));
540
541        assert_eq!("Hello", *foo);
542
543        let bar = any_test(Box::new(|| {
544            Box::new(im::HashMap::new().update("key", "secret"))
545        }));
546
547        assert_eq!("secret", *bar.get("key").unwrap());
548    }
549}