1use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
2use std::ops::Deref;
3use std::sync::atomic::AtomicUsize;
4use std::sync::atomic::Ordering::SeqCst;
5use std::sync::Arc;
6
7pub struct MaybeSingle<T> {
8 data: Arc<RwLock<Option<Arc<T>>>>,
9 lock_mutex: Arc<RwLock<()>>,
10 init: fn() -> T,
11 callers: Arc<Mutex<AtomicUsize>>,
12}
13
14impl<T> MaybeSingle<T> {
15 pub fn new(init: fn() -> T) -> Self {
16 MaybeSingle {
17 data: Arc::new(RwLock::new(None)),
18 init,
19 lock_mutex: Arc::new(RwLock::new(())),
20 callers: Arc::new(Mutex::new(AtomicUsize::new(0))),
21 }
22 }
23
24 pub fn data(&self, serial: bool) -> Data<'_, T> {
25 {
26 let lock = self.callers.lock();
27 let callers = lock.load(SeqCst) + 1;
28 lock.store(callers, SeqCst);
29 }
30 let data_arc = {
31 let mut lock = self.data.read();
32
33 lock = if lock.is_none() {
34 drop(lock);
35 {
36 let mut write_lock = self.data.write();
37 if write_lock.is_none() {
38 *write_lock = Some(Arc::new((self.init)()));
40 }
41 }
42 self.data.read()
43 } else {
44 lock
45 };
46
47 match lock.as_ref() {
49 Some(data) => data.clone(),
50 None => panic!("There should always be data here!"),
51 }
52 };
53
54 let (read_lock, write_lock) = if serial {
55 (None, Some(self.lock_mutex.write()))
56 } else {
57 (Some(self.lock_mutex.read()), None)
58 };
59
60 Data {
61 data_arc,
62 data: self.data.clone(),
63 callers: self.callers.clone(),
64 read_lock,
65 write_lock,
66 }
67 }
68}
69
70pub struct Data<'a, T> {
71 data_arc: Arc<T>,
72 data: Arc<RwLock<Option<Arc<T>>>>,
73 #[allow(dead_code)]
74 read_lock: Option<RwLockReadGuard<'a, ()>>,
75 #[allow(dead_code)]
76 write_lock: Option<RwLockWriteGuard<'a, ()>>,
77 callers: Arc<Mutex<AtomicUsize>>,
78}
79
80impl<'a, T> Drop for Data<'a, T> {
81 fn drop(&mut self) {
82 let lock = self.callers.lock();
84 let callers = lock.load(SeqCst) - 1;
85 lock.store(callers, SeqCst);
86
87 if callers == 0 {
88 println!("MaybeSingle --- Dropping DATA ---");
89 let mut data = self.data.write();
90 *data = None;
91 }
92 }
93}
94
95impl<'a, T> Deref for Data<'a, T> {
96 type Target = T;
97
98 fn deref(&self) -> &Self::Target {
99 self.data_arc.as_ref()
100 }
101}
102
103impl<'a, T> AsRef<T> for Data<'a, T> {
104 fn as_ref(&self) -> &T {
105 self.data_arc.as_ref()
106 }
107}
108
#[cfg(test)]
mod test {

    use super::*;
    use rand::{thread_rng, Rng};
    use std::thread::sleep;
    use std::time::Duration;

    /// Sleeps for a random sub-microsecond interval to vary thread
    /// interleavings between runs.
    fn jitter() {
        sleep(Duration::from_nanos(thread_rng().gen_range(0..1000)));
    }

    /// Non-serial handles: every thread sees a live instance while it holds a
    /// handle, and the instance is released once all handles are gone.
    #[test]
    fn should_execute_in_parallel() {
        let maybe: MaybeSingle<()> = MaybeSingle::new(|| {});
        let maybe = Arc::new(maybe);

        let handles: Vec<_> = (0..100)
            .map(|i| {
                let maybe = maybe.clone();
                std::thread::spawn(move || {
                    let _data = maybe.data(false);
                    assert!(maybe.data.read().is_some());
                    println!(" exec {} start", i);
                    jitter();
                    println!(" exec {} end", i);
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert!(maybe.data.read().is_none());
    }

    /// Serial handles: in addition to the checks above, no two threads may
    /// hold a handle at the same time.
    #[test]
    fn should_execute_serially() {
        let maybe: MaybeSingle<()> = MaybeSingle::new(|| {});
        let maybe = Arc::new(maybe);
        // Number of threads currently inside the serial section.
        let in_flight = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..100)
            .map(|i| {
                let maybe = maybe.clone();
                let in_flight = in_flight.clone();
                std::thread::spawn(move || {
                    let _data = maybe.data(true);
                    assert_eq!(
                        in_flight.fetch_add(1, SeqCst),
                        0,
                        "serial handles must never overlap"
                    );
                    assert!(maybe.data.read().is_some());
                    println!(" exec {} start", i);
                    jitter();
                    println!(" exec {} end", i);
                    // Leave the section before `_data` releases the gate.
                    in_flight.fetch_sub(1, SeqCst);
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(in_flight.load(SeqCst), 0);
        assert!(maybe.data.read().is_none());
    }
}