cdk_common/pub_sub/
subscriber.rs1use std::fmt::Debug;
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Mutex};
5
6use tokio::sync::mpsc;
7
8use super::pubsub::{SubReceiver, TopicTree};
9use super::{Error, Spec};
10
11pub trait SubscriptionRequest {
13 type Topic;
15
16 type SubscriptionId;
18
19 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;
21
22 fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
24}
25
26pub struct ActiveSubscription<S>
28where
29 S: Spec + 'static,
30{
31 id: usize,
32 name: Arc<S::SubscriptionId>,
33 active_subscribers: Arc<AtomicUsize>,
34 topics: TopicTree<S>,
35 subscribed_to: Vec<S::Topic>,
36 receiver: Option<SubReceiver<S>>,
37}
38
39impl<S> ActiveSubscription<S>
40where
41 S: Spec + 'static,
42{
43 pub fn new(
45 id: usize,
46 name: Arc<S::SubscriptionId>,
47 active_subscribers: Arc<AtomicUsize>,
48 topics: TopicTree<S>,
49 subscribed_to: Vec<S::Topic>,
50 receiver: Option<SubReceiver<S>>,
51 ) -> Self {
52 Self {
53 id,
54 name,
55 active_subscribers,
56 subscribed_to,
57 topics,
58 receiver,
59 }
60 }
61
62 pub async fn recv(&mut self) -> Option<S::Event> {
64 self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
65 }
66
67 pub fn try_recv(&mut self) -> Option<S::Event> {
69 self.receiver
70 .as_mut()?
71 .try_recv()
72 .ok()
73 .map(|(_, event)| event)
74 }
75
76 pub fn name(&self) -> &S::SubscriptionId {
78 &self.name
79 }
80}
81
82impl<S> Drop for ActiveSubscription<S>
83where
84 S: Spec + 'static,
85{
86 fn drop(&mut self) {
87 let mut topics = self.topics.write();
89 for index in self.subscribed_to.drain(..) {
90 topics.remove(&(index, self.id));
91 }
92
93 self.active_subscribers
95 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
96 }
97}
98
99#[derive(Debug)]
104pub struct Subscriber<S>
105where
106 S: Spec + 'static,
107{
108 subscription: Arc<S::SubscriptionId>,
109 inner: mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
110 latest: Arc<Mutex<Option<S::Event>>>,
111}
112
113impl<S> Clone for Subscriber<S>
114where
115 S: Spec + 'static,
116{
117 fn clone(&self) -> Self {
118 Self {
119 subscription: self.subscription.clone(),
120 inner: self.inner.clone(),
121 latest: self.latest.clone(),
122 }
123 }
124}
125
126impl<S> Subscriber<S>
127where
128 S: Spec + 'static,
129{
130 pub fn new(
132 subscription: Arc<S::SubscriptionId>,
133 inner: &mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
134 ) -> Self {
135 Self {
136 inner: inner.clone(),
137 subscription,
138 latest: Arc::new(Mutex::new(None)),
139 }
140 }
141
142 pub fn send(&self, event: S::Event) {
144 let mut latest = if let Ok(reader) = self.latest.lock() {
145 reader
146 } else {
147 let _ = self.inner.try_send((self.subscription.to_owned(), event));
148 return;
149 };
150
151 if let Some(last_event) = latest.replace(event.clone()) {
152 if last_event == event {
153 return;
154 }
155 }
156
157 let _ = self.inner.try_send((self.subscription.to_owned(), event));
158 }
159}