futures_signals/signal/
mutable.rs

1use super::Signal;
2use std;
3use std::fmt;
4use std::pin::Pin;
5use std::marker::Unpin;
6use std::ops::{Deref, DerefMut};
7// TODO use parking_lot ?
8use std::sync::{Arc, Weak, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
9// TODO use parking_lot ?
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11use std::task::{Poll, Waker, Context};
12
13
14#[derive(Debug)]
15pub(crate) struct ChangedWaker {
16    changed: AtomicBool,
17    waker: Mutex<Option<Waker>>,
18}
19
20impl ChangedWaker {
21    pub(crate) fn new() -> Self {
22        Self {
23            changed: AtomicBool::new(true),
24            waker: Mutex::new(None),
25        }
26    }
27
28    pub(crate) fn wake(&self, changed: bool) {
29        let waker = {
30            let mut lock = self.waker.lock().unwrap();
31
32            if changed {
33                self.changed.store(true, Ordering::SeqCst);
34            }
35
36            lock.take()
37        };
38
39        if let Some(waker) = waker {
40            waker.wake();
41        }
42    }
43
44    pub(crate) fn set_waker(&self, cx: &Context) {
45        *self.waker.lock().unwrap() = Some(cx.waker().clone());
46    }
47
48    pub(crate) fn is_changed(&self) -> bool {
49        self.changed.swap(false, Ordering::SeqCst)
50    }
51}
52
53
54#[derive(Debug)]
55struct MutableLockState<A> {
56    value: A,
57    // TODO use HashMap or BTreeMap instead ?
58    signals: Vec<Weak<ChangedWaker>>,
59}
60
61impl<A> MutableLockState<A> {
62    fn push_signal(&mut self, state: &Arc<ChangedWaker>) {
63        self.signals.push(Arc::downgrade(state));
64    }
65
66    fn notify(&mut self, has_changed: bool) {
67        self.signals.retain(|signal| {
68            if let Some(signal) = signal.upgrade() {
69                signal.wake(has_changed);
70                true
71
72            } else {
73                false
74            }
75        });
76    }
77}
78
79
80#[derive(Debug)]
81struct MutableState<A> {
82    senders: AtomicUsize,
83    lock: RwLock<MutableLockState<A>>,
84}
85
86
87#[derive(Debug)]
88struct MutableSignalState<A> {
89    waker: Arc<ChangedWaker>,
90    // TODO change this to Weak ?
91    state: Arc<MutableState<A>>,
92}
93
94impl<A> MutableSignalState<A> {
95    fn new(state: Arc<MutableState<A>>) -> Self {
96        MutableSignalState {
97            waker: Arc::new(ChangedWaker::new()),
98            state,
99        }
100    }
101
102    fn poll_change<B, F>(&self, cx: &mut Context, f: F) -> Poll<Option<B>> where F: FnOnce(&A) -> B {
103        if self.waker.is_changed() {
104            let value = {
105                let lock = self.state.lock.read().unwrap();
106                f(&lock.value)
107            };
108            Poll::Ready(Some(value))
109
110        } else if self.state.senders.load(Ordering::SeqCst) == 0 {
111            Poll::Ready(None)
112
113        } else {
114            self.waker.set_waker(cx);
115            Poll::Pending
116        }
117    }
118}
119
120
121#[derive(Debug)]
122pub struct MutableLockMut<'a, A> where A: 'a {
123    mutated: bool,
124    lock: RwLockWriteGuard<'a, MutableLockState<A>>,
125}
126
127impl<'a, A> Deref for MutableLockMut<'a, A> {
128    type Target = A;
129
130    #[inline]
131    fn deref(&self) -> &Self::Target {
132        &self.lock.value
133    }
134}
135
136impl<'a, A> DerefMut for MutableLockMut<'a, A> {
137    #[inline]
138    fn deref_mut(&mut self) -> &mut Self::Target {
139        self.mutated = true;
140        &mut self.lock.value
141    }
142}
143
144impl<'a, A> Drop for MutableLockMut<'a, A> {
145    #[inline]
146    fn drop(&mut self) {
147        if self.mutated {
148            self.lock.notify(true);
149        }
150    }
151}
152
153
154#[derive(Debug)]
155pub struct MutableLockRef<'a, A> where A: 'a {
156    lock: RwLockReadGuard<'a, MutableLockState<A>>,
157}
158
159impl<'a, A> Deref for MutableLockRef<'a, A> {
160    type Target = A;
161
162    #[inline]
163    fn deref(&self) -> &Self::Target {
164        &self.lock.value
165    }
166}
167
168
169#[repr(transparent)]
170pub struct ReadOnlyMutable<A>(Arc<MutableState<A>>);
171
172impl<A> ReadOnlyMutable<A> {
173    // TODO return Result ?
174    #[inline]
175    pub fn lock_ref(&self) -> MutableLockRef<A> {
176        MutableLockRef {
177            lock: self.0.lock.read().unwrap(),
178        }
179    }
180
181    fn signal_state(&self) -> MutableSignalState<A> {
182        let signal = MutableSignalState::new(self.0.clone());
183
184        if self.0.senders.load(Ordering::SeqCst) != 0 {
185            self.0.lock.write().unwrap().push_signal(&signal.waker);
186        }
187
188        signal
189    }
190
191    #[inline]
192    pub fn signal_ref<B, F>(&self, f: F) -> MutableSignalRef<A, F> where F: FnMut(&A) -> B {
193        MutableSignalRef(self.signal_state(), f)
194    }
195}
196
197impl<A: Copy> ReadOnlyMutable<A> {
198    #[inline]
199    pub fn get(&self) -> A {
200        self.0.lock.read().unwrap().value
201    }
202
203    #[inline]
204    pub fn signal(&self) -> MutableSignal<A> {
205        MutableSignal(self.signal_state())
206    }
207}
208
209impl<A: Clone> ReadOnlyMutable<A> {
210    #[inline]
211    pub fn get_cloned(&self) -> A {
212        self.0.lock.read().unwrap().value.clone()
213    }
214
215    #[inline]
216    pub fn signal_cloned(&self) -> MutableSignalCloned<A> {
217        MutableSignalCloned(self.signal_state())
218    }
219}
220
221impl<A> Clone for ReadOnlyMutable<A> {
222    #[inline]
223    fn clone(&self) -> Self {
224        ReadOnlyMutable(self.0.clone())
225    }
226}
227
228impl<A> fmt::Debug for ReadOnlyMutable<A> where A: fmt::Debug {
229    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
230        let state = self.0.lock.read().unwrap();
231
232        fmt.debug_tuple("ReadOnlyMutable")
233            .field(&state.value)
234            .finish()
235    }
236}
237
238
239#[repr(transparent)]
240pub struct Mutable<A>(ReadOnlyMutable<A>);
241
242impl<A> Mutable<A> {
243    // TODO should this inline ?
244    pub fn new(value: A) -> Self {
245        Self::from(value)
246    }
247
248    #[inline]
249    fn state(&self) -> &Arc<MutableState<A>> {
250        &(self.0).0
251    }
252
253    #[inline]
254    pub fn read_only(&self) -> ReadOnlyMutable<A> {
255        self.0.clone()
256    }
257
258    pub fn replace(&self, value: A) -> A {
259        let mut state = self.state().lock.write().unwrap();
260
261        let value = std::mem::replace(&mut state.value, value);
262
263        state.notify(true);
264
265        value
266    }
267
268    pub fn replace_with<F>(&self, f: F) -> A where F: FnOnce(&mut A) -> A {
269        let mut state = self.state().lock.write().unwrap();
270
271        let new_value = f(&mut state.value);
272        let value = std::mem::replace(&mut state.value, new_value);
273
274        state.notify(true);
275
276        value
277    }
278
279    pub fn swap(&self, other: &Mutable<A>) {
280        // TODO can this dead lock ?
281        let mut state1 = self.state().lock.write().unwrap();
282        let mut state2 = other.state().lock.write().unwrap();
283
284        std::mem::swap(&mut state1.value, &mut state2.value);
285
286        state1.notify(true);
287        state2.notify(true);
288    }
289
290    pub fn set(&self, value: A) {
291        let mut state = self.state().lock.write().unwrap();
292
293        state.value = value;
294
295        state.notify(true);
296    }
297
298    pub fn set_if<F>(&self, value: A, f: F) where F: FnOnce(&A, &A) -> bool {
299        let mut state = self.state().lock.write().unwrap();
300
301        if f(&state.value, &value) {
302            state.value = value;
303            state.notify(true);
304        }
305    }
306
307    // TODO lots of unit tests to verify that it only notifies when the object is mutated
308    // TODO return Result ?
309    // TODO should this inline ?
310    pub fn lock_mut(&self) -> MutableLockMut<A> {
311        MutableLockMut {
312            mutated: false,
313            lock: self.state().lock.write().unwrap(),
314        }
315    }
316}
317
318impl<A> From<A> for Mutable<A> {
319    #[inline]
320    fn from(value: A) -> Self {
321        Mutable(ReadOnlyMutable(Arc::new(MutableState {
322            senders: AtomicUsize::new(1),
323            lock: RwLock::new(MutableLockState {
324                value: value.into(),
325                signals: vec![],
326            }),
327        })))
328    }
329}
330
331impl<A> ::std::ops::Deref for Mutable<A> {
332    type Target = ReadOnlyMutable<A>;
333
334    #[inline]
335    fn deref(&self) -> &Self::Target {
336        &self.0
337    }
338}
339
340impl<A: PartialEq> Mutable<A> {
341    #[inline]
342    pub fn set_neq(&self, value: A) {
343        self.set_if(value, PartialEq::ne);
344    }
345}
346
347impl<A> fmt::Debug for Mutable<A> where A: fmt::Debug {
348    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
349        let state = self.state().lock.read().unwrap();
350
351        fmt.debug_tuple("Mutable")
352            .field(&state.value)
353            .finish()
354    }
355}
356
357#[cfg(feature = "serde")]
358impl<T> serde::Serialize for Mutable<T> where T: serde::Serialize {
359    #[inline]
360    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
361        self.state().lock.read().unwrap().value.serialize(serializer)
362    }
363}
364
365#[cfg(feature = "serde")]
366impl<'de, T> serde::Deserialize<'de> for Mutable<T> where T: serde::Deserialize<'de> {
367    #[inline]
368    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer<'de> {
369        T::deserialize(deserializer).map(Mutable::new)
370    }
371}
372
373// TODO can this be derived ?
374impl<T: Default> Default for Mutable<T> {
375    #[inline]
376    fn default() -> Self {
377        Mutable::new(Default::default())
378    }
379}
380
381impl<A> Clone for Mutable<A> {
382    #[inline]
383    fn clone(&self) -> Self {
384        self.state().senders.fetch_add(1, Ordering::SeqCst);
385        Mutable(self.0.clone())
386    }
387}
388
389impl<A> Drop for Mutable<A> {
390    #[inline]
391    fn drop(&mut self) {
392        let state = self.state();
393
394        let old_senders = state.senders.fetch_sub(1, Ordering::SeqCst);
395
396        if old_senders == 1 {
397            let mut lock = state.lock.write().unwrap();
398
399            if lock.signals.len() > 0 {
400                lock.notify(false);
401                // TODO is this necessary ?
402                lock.signals = vec![];
403            }
404        }
405    }
406}
407
408
409// TODO remove it from signals when it's dropped
410#[derive(Debug)]
411#[repr(transparent)]
412#[must_use = "Signals do nothing unless polled"]
413pub struct MutableSignal<A>(MutableSignalState<A>);
414
415impl<A> Unpin for MutableSignal<A> {}
416
417impl<A: Copy> Signal for MutableSignal<A> {
418    type Item = A;
419
420    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
421        self.0.poll_change(cx, |value| *value)
422    }
423}
424
425
426// TODO remove it from signals when it's dropped
427#[derive(Debug)]
428#[must_use = "Signals do nothing unless polled"]
429pub struct MutableSignalRef<A, F>(MutableSignalState<A>, F);
430
431impl<A, F> Unpin for MutableSignalRef<A, F> {}
432
433impl<A, B, F> Signal for MutableSignalRef<A, F> where F: FnMut(&A) -> B {
434    type Item = B;
435
436    fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
437        let this = &mut *self;
438        let state = &this.0;
439        let callback = &mut this.1;
440        state.poll_change(cx, callback)
441    }
442}
443
444
445// TODO it should have a single MutableSignal implementation for both Copy and Clone
446// TODO remove it from signals when it's dropped
447#[derive(Debug)]
448#[repr(transparent)]
449#[must_use = "Signals do nothing unless polled"]
450pub struct MutableSignalCloned<A>(MutableSignalState<A>);
451
452impl<A> Unpin for MutableSignalCloned<A> {}
453
454impl<A: Clone> Signal for MutableSignalCloned<A> {
455    type Item = A;
456
457    // TODO code duplication with MutableSignal::poll
458    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
459        self.0.poll_change(cx, |value| value.clone())
460    }
461}