1use super::Signal;
2use std;
3use std::fmt;
4use std::pin::Pin;
5use std::marker::Unpin;
6use std::ops::{Deref, DerefMut};
7use std::sync::{Arc, Weak, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11use std::task::{Poll, Waker, Context};
12
13
14#[derive(Debug)]
15pub(crate) struct ChangedWaker {
16 changed: AtomicBool,
17 waker: Mutex<Option<Waker>>,
18}
19
20impl ChangedWaker {
21 pub(crate) fn new() -> Self {
22 Self {
23 changed: AtomicBool::new(true),
24 waker: Mutex::new(None),
25 }
26 }
27
28 pub(crate) fn wake(&self, changed: bool) {
29 let waker = {
30 let mut lock = self.waker.lock().unwrap();
31
32 if changed {
33 self.changed.store(true, Ordering::SeqCst);
34 }
35
36 lock.take()
37 };
38
39 if let Some(waker) = waker {
40 waker.wake();
41 }
42 }
43
44 pub(crate) fn set_waker(&self, cx: &Context) {
45 *self.waker.lock().unwrap() = Some(cx.waker().clone());
46 }
47
48 pub(crate) fn is_changed(&self) -> bool {
49 self.changed.swap(false, Ordering::SeqCst)
50 }
51}
52
53
54#[derive(Debug)]
55struct MutableLockState<A> {
56 value: A,
57 signals: Vec<Weak<ChangedWaker>>,
59}
60
61impl<A> MutableLockState<A> {
62 fn push_signal(&mut self, state: &Arc<ChangedWaker>) {
63 self.signals.push(Arc::downgrade(state));
64 }
65
66 fn notify(&mut self, has_changed: bool) {
67 self.signals.retain(|signal| {
68 if let Some(signal) = signal.upgrade() {
69 signal.wake(has_changed);
70 true
71
72 } else {
73 false
74 }
75 });
76 }
77}
78
79
80#[derive(Debug)]
81struct MutableState<A> {
82 senders: AtomicUsize,
83 lock: RwLock<MutableLockState<A>>,
84}
85
86
87#[derive(Debug)]
88struct MutableSignalState<A> {
89 waker: Arc<ChangedWaker>,
90 state: Arc<MutableState<A>>,
92}
93
94impl<A> MutableSignalState<A> {
95 fn new(state: Arc<MutableState<A>>) -> Self {
96 MutableSignalState {
97 waker: Arc::new(ChangedWaker::new()),
98 state,
99 }
100 }
101
102 fn poll_change<B, F>(&self, cx: &mut Context, f: F) -> Poll<Option<B>> where F: FnOnce(&A) -> B {
103 if self.waker.is_changed() {
104 let value = {
105 let lock = self.state.lock.read().unwrap();
106 f(&lock.value)
107 };
108 Poll::Ready(Some(value))
109
110 } else if self.state.senders.load(Ordering::SeqCst) == 0 {
111 Poll::Ready(None)
112
113 } else {
114 self.waker.set_waker(cx);
115 Poll::Pending
116 }
117 }
118}
119
120
121#[derive(Debug)]
122pub struct MutableLockMut<'a, A> where A: 'a {
123 mutated: bool,
124 lock: RwLockWriteGuard<'a, MutableLockState<A>>,
125}
126
127impl<'a, A> Deref for MutableLockMut<'a, A> {
128 type Target = A;
129
130 #[inline]
131 fn deref(&self) -> &Self::Target {
132 &self.lock.value
133 }
134}
135
136impl<'a, A> DerefMut for MutableLockMut<'a, A> {
137 #[inline]
138 fn deref_mut(&mut self) -> &mut Self::Target {
139 self.mutated = true;
140 &mut self.lock.value
141 }
142}
143
144impl<'a, A> Drop for MutableLockMut<'a, A> {
145 #[inline]
146 fn drop(&mut self) {
147 if self.mutated {
148 self.lock.notify(true);
149 }
150 }
151}
152
153
154#[derive(Debug)]
155pub struct MutableLockRef<'a, A> where A: 'a {
156 lock: RwLockReadGuard<'a, MutableLockState<A>>,
157}
158
159impl<'a, A> Deref for MutableLockRef<'a, A> {
160 type Target = A;
161
162 #[inline]
163 fn deref(&self) -> &Self::Target {
164 &self.lock.value
165 }
166}
167
168
169#[repr(transparent)]
170pub struct ReadOnlyMutable<A>(Arc<MutableState<A>>);
171
172impl<A> ReadOnlyMutable<A> {
173 #[inline]
175 pub fn lock_ref(&self) -> MutableLockRef<A> {
176 MutableLockRef {
177 lock: self.0.lock.read().unwrap(),
178 }
179 }
180
181 fn signal_state(&self) -> MutableSignalState<A> {
182 let signal = MutableSignalState::new(self.0.clone());
183
184 if self.0.senders.load(Ordering::SeqCst) != 0 {
185 self.0.lock.write().unwrap().push_signal(&signal.waker);
186 }
187
188 signal
189 }
190
191 #[inline]
192 pub fn signal_ref<B, F>(&self, f: F) -> MutableSignalRef<A, F> where F: FnMut(&A) -> B {
193 MutableSignalRef(self.signal_state(), f)
194 }
195}
196
197impl<A: Copy> ReadOnlyMutable<A> {
198 #[inline]
199 pub fn get(&self) -> A {
200 self.0.lock.read().unwrap().value
201 }
202
203 #[inline]
204 pub fn signal(&self) -> MutableSignal<A> {
205 MutableSignal(self.signal_state())
206 }
207}
208
209impl<A: Clone> ReadOnlyMutable<A> {
210 #[inline]
211 pub fn get_cloned(&self) -> A {
212 self.0.lock.read().unwrap().value.clone()
213 }
214
215 #[inline]
216 pub fn signal_cloned(&self) -> MutableSignalCloned<A> {
217 MutableSignalCloned(self.signal_state())
218 }
219}
220
221impl<A> Clone for ReadOnlyMutable<A> {
222 #[inline]
223 fn clone(&self) -> Self {
224 ReadOnlyMutable(self.0.clone())
225 }
226}
227
228impl<A> fmt::Debug for ReadOnlyMutable<A> where A: fmt::Debug {
229 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
230 let state = self.0.lock.read().unwrap();
231
232 fmt.debug_tuple("ReadOnlyMutable")
233 .field(&state.value)
234 .finish()
235 }
236}
237
238
239#[repr(transparent)]
240pub struct Mutable<A>(ReadOnlyMutable<A>);
241
242impl<A> Mutable<A> {
243 pub fn new(value: A) -> Self {
245 Self::from(value)
246 }
247
248 #[inline]
249 fn state(&self) -> &Arc<MutableState<A>> {
250 &(self.0).0
251 }
252
253 #[inline]
254 pub fn read_only(&self) -> ReadOnlyMutable<A> {
255 self.0.clone()
256 }
257
258 pub fn replace(&self, value: A) -> A {
259 let mut state = self.state().lock.write().unwrap();
260
261 let value = std::mem::replace(&mut state.value, value);
262
263 state.notify(true);
264
265 value
266 }
267
268 pub fn replace_with<F>(&self, f: F) -> A where F: FnOnce(&mut A) -> A {
269 let mut state = self.state().lock.write().unwrap();
270
271 let new_value = f(&mut state.value);
272 let value = std::mem::replace(&mut state.value, new_value);
273
274 state.notify(true);
275
276 value
277 }
278
279 pub fn swap(&self, other: &Mutable<A>) {
280 let mut state1 = self.state().lock.write().unwrap();
282 let mut state2 = other.state().lock.write().unwrap();
283
284 std::mem::swap(&mut state1.value, &mut state2.value);
285
286 state1.notify(true);
287 state2.notify(true);
288 }
289
290 pub fn set(&self, value: A) {
291 let mut state = self.state().lock.write().unwrap();
292
293 state.value = value;
294
295 state.notify(true);
296 }
297
298 pub fn set_if<F>(&self, value: A, f: F) where F: FnOnce(&A, &A) -> bool {
299 let mut state = self.state().lock.write().unwrap();
300
301 if f(&state.value, &value) {
302 state.value = value;
303 state.notify(true);
304 }
305 }
306
307 pub fn lock_mut(&self) -> MutableLockMut<A> {
311 MutableLockMut {
312 mutated: false,
313 lock: self.state().lock.write().unwrap(),
314 }
315 }
316}
317
318impl<A> From<A> for Mutable<A> {
319 #[inline]
320 fn from(value: A) -> Self {
321 Mutable(ReadOnlyMutable(Arc::new(MutableState {
322 senders: AtomicUsize::new(1),
323 lock: RwLock::new(MutableLockState {
324 value: value.into(),
325 signals: vec![],
326 }),
327 })))
328 }
329}
330
331impl<A> ::std::ops::Deref for Mutable<A> {
332 type Target = ReadOnlyMutable<A>;
333
334 #[inline]
335 fn deref(&self) -> &Self::Target {
336 &self.0
337 }
338}
339
340impl<A: PartialEq> Mutable<A> {
341 #[inline]
342 pub fn set_neq(&self, value: A) {
343 self.set_if(value, PartialEq::ne);
344 }
345}
346
347impl<A> fmt::Debug for Mutable<A> where A: fmt::Debug {
348 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
349 let state = self.state().lock.read().unwrap();
350
351 fmt.debug_tuple("Mutable")
352 .field(&state.value)
353 .finish()
354 }
355}
356
357#[cfg(feature = "serde")]
358impl<T> serde::Serialize for Mutable<T> where T: serde::Serialize {
359 #[inline]
360 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
361 self.state().lock.read().unwrap().value.serialize(serializer)
362 }
363}
364
365#[cfg(feature = "serde")]
366impl<'de, T> serde::Deserialize<'de> for Mutable<T> where T: serde::Deserialize<'de> {
367 #[inline]
368 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer<'de> {
369 T::deserialize(deserializer).map(Mutable::new)
370 }
371}
372
373impl<T: Default> Default for Mutable<T> {
375 #[inline]
376 fn default() -> Self {
377 Mutable::new(Default::default())
378 }
379}
380
381impl<A> Clone for Mutable<A> {
382 #[inline]
383 fn clone(&self) -> Self {
384 self.state().senders.fetch_add(1, Ordering::SeqCst);
385 Mutable(self.0.clone())
386 }
387}
388
389impl<A> Drop for Mutable<A> {
390 #[inline]
391 fn drop(&mut self) {
392 let state = self.state();
393
394 let old_senders = state.senders.fetch_sub(1, Ordering::SeqCst);
395
396 if old_senders == 1 {
397 let mut lock = state.lock.write().unwrap();
398
399 if lock.signals.len() > 0 {
400 lock.notify(false);
401 lock.signals = vec![];
403 }
404 }
405 }
406}
407
408
409#[derive(Debug)]
411#[repr(transparent)]
412#[must_use = "Signals do nothing unless polled"]
413pub struct MutableSignal<A>(MutableSignalState<A>);
414
415impl<A> Unpin for MutableSignal<A> {}
416
417impl<A: Copy> Signal for MutableSignal<A> {
418 type Item = A;
419
420 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
421 self.0.poll_change(cx, |value| *value)
422 }
423}
424
425
426#[derive(Debug)]
428#[must_use = "Signals do nothing unless polled"]
429pub struct MutableSignalRef<A, F>(MutableSignalState<A>, F);
430
431impl<A, F> Unpin for MutableSignalRef<A, F> {}
432
433impl<A, B, F> Signal for MutableSignalRef<A, F> where F: FnMut(&A) -> B {
434 type Item = B;
435
436 fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
437 let this = &mut *self;
438 let state = &this.0;
439 let callback = &mut this.1;
440 state.poll_change(cx, callback)
441 }
442}
443
444
445#[derive(Debug)]
448#[repr(transparent)]
449#[must_use = "Signals do nothing unless polled"]
450pub struct MutableSignalCloned<A>(MutableSignalState<A>);
451
452impl<A> Unpin for MutableSignalCloned<A> {}
453
454impl<A: Clone> Signal for MutableSignalCloned<A> {
455 type Item = A;
456
457 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
459 self.0.poll_change(cx, |value| value.clone())
460 }
461}