cdk_common/pub_sub/
subscriber.rs1use std::fmt::Debug;
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Mutex};
5
6use tokio::sync::mpsc;
7
8use super::pubsub::{SubReceiver, TopicTree};
9use super::{Error, Spec};
10
11pub trait SubscriptionRequest {
13 type Topic;
15
16 type SubscriptionId;
18
19 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;
21
22 fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
24}
25
26#[allow(missing_debug_implementations)]
28pub struct ActiveSubscription<S>
29where
30 S: Spec + 'static,
31{
32 id: usize,
33 name: Arc<S::SubscriptionId>,
34 active_subscribers: Arc<AtomicUsize>,
35 topics: TopicTree<S>,
36 subscribed_to: Vec<S::Topic>,
37 receiver: Option<SubReceiver<S>>,
38}
39
40impl<S> ActiveSubscription<S>
41where
42 S: Spec + 'static,
43{
44 pub fn new(
46 id: usize,
47 name: Arc<S::SubscriptionId>,
48 active_subscribers: Arc<AtomicUsize>,
49 topics: TopicTree<S>,
50 subscribed_to: Vec<S::Topic>,
51 receiver: Option<SubReceiver<S>>,
52 ) -> Self {
53 Self {
54 id,
55 name,
56 active_subscribers,
57 subscribed_to,
58 topics,
59 receiver,
60 }
61 }
62
63 pub async fn recv(&mut self) -> Option<S::Event> {
65 self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
66 }
67
68 pub fn try_recv(&mut self) -> Option<S::Event> {
70 self.receiver
71 .as_mut()?
72 .try_recv()
73 .ok()
74 .map(|(_, event)| event)
75 }
76
77 pub fn name(&self) -> &S::SubscriptionId {
79 &self.name
80 }
81}
82
83impl<S> Drop for ActiveSubscription<S>
84where
85 S: Spec + 'static,
86{
87 fn drop(&mut self) {
88 let mut topics = self.topics.write();
90 for index in self.subscribed_to.drain(..) {
91 topics.remove(&(index, self.id));
92 }
93
94 self.active_subscribers
96 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
97 }
98}
99
100#[derive(Debug)]
105pub struct Subscriber<S>
106where
107 S: Spec + 'static,
108{
109 subscription: Arc<S::SubscriptionId>,
110 inner: mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
111 latest: Arc<Mutex<Option<S::Event>>>,
112}
113
114impl<S> Clone for Subscriber<S>
115where
116 S: Spec + 'static,
117{
118 fn clone(&self) -> Self {
119 Self {
120 subscription: self.subscription.clone(),
121 inner: self.inner.clone(),
122 latest: self.latest.clone(),
123 }
124 }
125}
126
127impl<S> Subscriber<S>
128where
129 S: Spec + 'static,
130{
131 pub fn new(
133 subscription: Arc<S::SubscriptionId>,
134 inner: &mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
135 ) -> Self {
136 Self {
137 inner: inner.clone(),
138 subscription,
139 latest: Arc::new(Mutex::new(None)),
140 }
141 }
142
143 pub fn send(&self, event: S::Event) {
145 let mut latest = if let Ok(reader) = self.latest.lock() {
146 reader
147 } else {
148 let _ = self.inner.try_send((self.subscription.to_owned(), event));
149 return;
150 };
151
152 if let Some(last_event) = latest.replace(event.clone()) {
153 if last_event == event {
154 return;
155 }
156 }
157
158 let _ = self.inner.try_send((self.subscription.to_owned(), event));
159 }
160}