maybe_single/
blocking.rs

1use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
2use std::ops::Deref;
3use std::sync::atomic::AtomicUsize;
4use std::sync::atomic::Ordering::SeqCst;
5use std::sync::Arc;
6
7pub struct MaybeSingle<T> {
8    data: Arc<RwLock<Option<Arc<T>>>>,
9    lock_mutex: Arc<RwLock<()>>,
10    init: fn() -> T,
11    callers: Arc<Mutex<AtomicUsize>>,
12}
13
14impl<T> MaybeSingle<T> {
15    pub fn new(init: fn() -> T) -> Self {
16        MaybeSingle {
17            data: Arc::new(RwLock::new(None)),
18            init,
19            lock_mutex: Arc::new(RwLock::new(())),
20            callers: Arc::new(Mutex::new(AtomicUsize::new(0))),
21        }
22    }
23
24    pub fn data(&self, serial: bool) -> Data<'_, T> {
25        {
26            let lock = self.callers.lock();
27            let callers = lock.load(SeqCst) + 1;
28            lock.store(callers, SeqCst);
29        }
30        let data_arc = {
31            let mut lock = self.data.read();
32
33            lock = if lock.is_none() {
34                drop(lock);
35                {
36                    let mut write_lock = self.data.write();
37                    if write_lock.is_none() {
38                        //  println!("--- INIT ---");
39                        *write_lock = Some(Arc::new((self.init)()));
40                    }
41                }
42                self.data.read()
43            } else {
44                lock
45            };
46
47            //println!("---- Exec {}", rnd);
48            match lock.as_ref() {
49                Some(data) => data.clone(),
50                None => panic!("There should always be data here!"),
51            }
52        };
53
54        let (read_lock, write_lock) = if serial {
55            (None, Some(self.lock_mutex.write()))
56        } else {
57            (Some(self.lock_mutex.read()), None)
58        };
59
60        Data {
61            data_arc,
62            data: self.data.clone(),
63            callers: self.callers.clone(),
64            read_lock,
65            write_lock,
66        }
67    }
68}
69
70pub struct Data<'a, T> {
71    data_arc: Arc<T>,
72    data: Arc<RwLock<Option<Arc<T>>>>,
73    #[allow(dead_code)]
74    read_lock: Option<RwLockReadGuard<'a, ()>>,
75    #[allow(dead_code)]
76    write_lock: Option<RwLockWriteGuard<'a, ()>>,
77    callers: Arc<Mutex<AtomicUsize>>,
78}
79
80impl<'a, T> Drop for Data<'a, T> {
81    fn drop(&mut self) {
82        //println!("--- Dropping DATA ---");
83        let lock = self.callers.lock();
84        let callers = lock.load(SeqCst) - 1;
85        lock.store(callers, SeqCst);
86
87        if callers == 0 {
88            println!("MaybeSingle --- Dropping DATA ---");
89            let mut data = self.data.write();
90            *data = None;
91        }
92    }
93}
94
95impl<'a, T> Deref for Data<'a, T> {
96    type Target = T;
97
98    fn deref(&self) -> &Self::Target {
99        self.data_arc.as_ref()
100    }
101}
102
103impl<'a, T> AsRef<T> for Data<'a, T> {
104    fn as_ref(&self) -> &T {
105        self.data_arc.as_ref()
106    }
107}
108
#[cfg(test)]
mod test {

    use super::*;
    use rand::{thread_rng, Rng};
    use std::thread::sleep;
    use std::time::Duration;

    /// Spawns 100 threads that each take a handle (serial or not), check that
    /// a value is stored while the handle is alive, and sleep for a random
    /// sub-microsecond duration. After all threads have finished, the value
    /// must have been cleared again.
    fn run_hundred_workers(serial: bool) {
        let shared: Arc<MaybeSingle<()>> = Arc::new(MaybeSingle::new(|| {}));

        let workers: Vec<_> = (0..100)
            .map(|i| {
                let shared = Arc::clone(&shared);
                std::thread::spawn(move || {
                    let _data = shared.data(serial);
                    assert!(shared.data.read().is_some());
                    println!(" exec {} start", i);
                    sleep(Duration::from_nanos(thread_rng().gen_range(0..1000)));
                    println!(" exec {} end", i);
                })
            })
            .collect();

        // `unwrap` re-raises a panic from any worker so the test fails loudly.
        for worker in workers {
            worker.join().unwrap();
        }

        assert!(shared.data.read().is_none());
    }

    #[test]
    fn should_execute_in_parallel() {
        run_hundred_workers(false);
    }

    #[test]
    fn should_execute_serially() {
        run_hundred_workers(true);
    }
}