async_compatibility_layer/async_primitives/
subscribable_rwlock.rs

1use crate::channel::{unbounded, UnboundedReceiver, UnboundedSender};
2use async_lock::{Mutex, RwLock};
3use async_trait::async_trait;
4use std::fmt;
5
6/// read only view of [`SubscribableRwLock`]
7#[async_trait]
8pub trait ReadView<T: Clone> {
9    /// subscribe to state changes. Receive
10    /// the updated state upon state change
11    async fn subscribe(&self) -> UnboundedReceiver<T>;
12    /// async clone the internal state and return it
13    async fn cloned(&self) -> T;
14}
15
16/// read view with requirements on being threadsafe
17pub trait ThreadedReadView<T: Clone + Sync + Send>:
18    Send + Sync + ReadView<T> + std::fmt::Debug
19{
20}
21
22/// A [`RwLock`] that can register subscribers to be notified upon state change.
23#[derive(Default)]
24pub struct SubscribableRwLock<T: Clone> {
25    /// A list of subscribers to the rwlock
26    subscribers: Mutex<Vec<UnboundedSender<T>>>,
27    /// The lock holding the state
28    rw_lock: RwLock<T>,
29}
30
31impl<T: Clone + Sync + Send + std::fmt::Debug> ThreadedReadView<T> for SubscribableRwLock<T> {}
32
33#[async_trait]
34impl<T: Clone + Send + Sync> ReadView<T> for SubscribableRwLock<T> {
35    async fn subscribe(&self) -> UnboundedReceiver<T> {
36        let (sender, receiver) = unbounded();
37        self.subscribers.lock().await.push(sender);
38        receiver
39    }
40
41    async fn cloned(&self) -> T {
42        self.rw_lock.read().await.clone()
43    }
44}
45
46impl<T: Clone> SubscribableRwLock<T> {
47    /// create a new [`SubscribableRwLock`]
48    pub fn new(t: T) -> Self {
49        Self {
50            subscribers: Mutex::new(Vec::new()),
51            rw_lock: RwLock::new(t),
52        }
53    }
54
55    /// subscribe to state changes. Receive
56    /// the updated state upon state change
57    pub async fn modify<F>(&self, cb: F)
58    where
59        F: FnOnce(&mut T),
60    {
61        let mut lock = self.rw_lock.write().await;
62        cb(&mut *lock);
63        let result = lock.clone();
64        drop(lock);
65        self.notify_change_subscribers(result).await;
66    }
67
68    /// send subscribers the updated state
69    async fn notify_change_subscribers(&self, t: T) {
70        let mut lock = self.subscribers.lock().await;
71        let mut idx_to_remove = Vec::new();
72        for (idx, sender) in lock.iter().enumerate() {
73            if sender.send(t.clone()).await.is_err() {
74                idx_to_remove.push(idx);
75            }
76        }
77        for idx in idx_to_remove.into_iter().rev() {
78            lock.remove(idx);
79        }
80    }
81}
82
83impl<T: Copy> SubscribableRwLock<T> {
84    /// Return a copy of the current value of `T`
85    pub async fn copied(&self) -> T {
86        *self.rw_lock.read().await
87    }
88}
89
90impl<T: fmt::Debug + Clone> fmt::Debug for SubscribableRwLock<T> {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        /// Helper struct to be shown when the inner mutex is locked.
93        struct Locked;
94        impl fmt::Debug for Locked {
95            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96                f.write_str("<locked>")
97            }
98        }
99
100        match self.rw_lock.try_read() {
101            None => f
102                .debug_struct("SubscribableRwLock")
103                .field("data", &Locked)
104                .finish(),
105            Some(guard) => f
106                .debug_struct("SubscribableRwLock")
107                .field("data", &&*guard)
108                .finish(),
109        }
110    }
111}