async_compatibility_layer/async_primitives/
subscribable_rwlock.rs1use crate::channel::{unbounded, UnboundedReceiver, UnboundedSender};
2use async_lock::{Mutex, RwLock};
3use async_trait::async_trait;
4use std::fmt;
5
6#[async_trait]
8pub trait ReadView<T: Clone> {
9 async fn subscribe(&self) -> UnboundedReceiver<T>;
12 async fn cloned(&self) -> T;
14}
15
16pub trait ThreadedReadView<T: Clone + Sync + Send>:
18 Send + Sync + ReadView<T> + std::fmt::Debug
19{
20}
21
22#[derive(Default)]
24pub struct SubscribableRwLock<T: Clone> {
25 subscribers: Mutex<Vec<UnboundedSender<T>>>,
27 rw_lock: RwLock<T>,
29}
30
31impl<T: Clone + Sync + Send + std::fmt::Debug> ThreadedReadView<T> for SubscribableRwLock<T> {}
32
33#[async_trait]
34impl<T: Clone + Send + Sync> ReadView<T> for SubscribableRwLock<T> {
35 async fn subscribe(&self) -> UnboundedReceiver<T> {
36 let (sender, receiver) = unbounded();
37 self.subscribers.lock().await.push(sender);
38 receiver
39 }
40
41 async fn cloned(&self) -> T {
42 self.rw_lock.read().await.clone()
43 }
44}
45
46impl<T: Clone> SubscribableRwLock<T> {
47 pub fn new(t: T) -> Self {
49 Self {
50 subscribers: Mutex::new(Vec::new()),
51 rw_lock: RwLock::new(t),
52 }
53 }
54
55 pub async fn modify<F>(&self, cb: F)
58 where
59 F: FnOnce(&mut T),
60 {
61 let mut lock = self.rw_lock.write().await;
62 cb(&mut *lock);
63 let result = lock.clone();
64 drop(lock);
65 self.notify_change_subscribers(result).await;
66 }
67
68 async fn notify_change_subscribers(&self, t: T) {
70 let mut lock = self.subscribers.lock().await;
71 let mut idx_to_remove = Vec::new();
72 for (idx, sender) in lock.iter().enumerate() {
73 if sender.send(t.clone()).await.is_err() {
74 idx_to_remove.push(idx);
75 }
76 }
77 for idx in idx_to_remove.into_iter().rev() {
78 lock.remove(idx);
79 }
80 }
81}
82
83impl<T: Copy> SubscribableRwLock<T> {
84 pub async fn copied(&self) -> T {
86 *self.rw_lock.read().await
87 }
88}
89
90impl<T: fmt::Debug + Clone> fmt::Debug for SubscribableRwLock<T> {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 struct Locked;
94 impl fmt::Debug for Locked {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 f.write_str("<locked>")
97 }
98 }
99
100 match self.rw_lock.try_read() {
101 None => f
102 .debug_struct("SubscribableRwLock")
103 .field("data", &Locked)
104 .finish(),
105 Some(guard) => f
106 .debug_struct("SubscribableRwLock")
107 .field("data", &&*guard)
108 .finish(),
109 }
110 }
111}