cdk_common/pub_sub/
mod.rs1mod error;
23mod pubsub;
24pub mod remote_consumer;
25mod subscriber;
26mod types;
27
28pub use self::error::Error;
29pub use self::pubsub::Pubsub;
30pub use self::subscriber::{Subscriber, SubscriptionRequest};
31pub use self::types::*;
32
33#[cfg(test)]
34mod test {
35 use std::collections::HashMap;
36 use std::sync::{Arc, RwLock};
37
38 use serde::{Deserialize, Serialize};
39
40 use super::subscriber::SubscriptionRequest;
41 use super::{Error, Event, Pubsub, Spec, Subscriber};
42
43 #[derive(Clone, Debug, Serialize, Eq, PartialEq, Deserialize)]
44 pub struct Message {
45 pub foo: u64,
46 pub bar: u64,
47 }
48
49 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
50 pub enum IndexTest {
51 Foo(u64),
52 Bar(u64),
53 }
54
55 impl Event for Message {
56 type Topic = IndexTest;
57
58 fn get_topics(&self) -> Vec<Self::Topic> {
59 vec![IndexTest::Foo(self.foo), IndexTest::Bar(self.bar)]
60 }
61 }
62
63 pub struct CustomPubSub {
64 pub storage: Arc<RwLock<HashMap<IndexTest, Message>>>,
65 }
66
67 #[async_trait::async_trait]
68 impl Spec for CustomPubSub {
69 type Topic = IndexTest;
70
71 type Event = Message;
72
73 type SubscriptionId = String;
74
75 type Context = ();
76
77 fn new_instance(_context: Self::Context) -> Arc<Self>
78 where
79 Self: Sized,
80 {
81 Arc::new(Self {
82 storage: Default::default(),
83 })
84 }
85
86 async fn fetch_events(
87 self: &Arc<Self>,
88 topics: Vec<<Self::Event as Event>::Topic>,
89 reply_to: Subscriber<Self>,
90 ) where
91 Self: Sized,
92 {
93 let storage = self.storage.read().unwrap();
94
95 for index in topics {
96 if let Some(value) = storage.get(&index) {
97 let _ = reply_to.send(value.clone());
98 }
99 }
100 }
101 }
102
/// Subscription request used by the tests; each variant maps to the
/// `IndexTest` topic of the same name.
#[derive(Clone, Debug)]
pub enum SubscriptionReq {
    /// Subscribe to `IndexTest::Foo` with the given value.
    Foo(u64),
    /// Subscribe to `IndexTest::Bar` with the given value.
    Bar(u64),
}
108
109 impl SubscriptionRequest for SubscriptionReq {
110 type Topic = IndexTest;
111
112 type SubscriptionId = String;
113
114 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
115 Ok(vec![match self {
116 SubscriptionReq::Bar(n) => IndexTest::Bar(*n),
117 SubscriptionReq::Foo(n) => IndexTest::Foo(*n),
118 }])
119 }
120
121 fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
122 Arc::new("test".to_owned())
123 }
124 }
125
/// Subscription request whose `try_get_topics` always returns an error.
#[derive(Clone, Debug)]
pub struct FailingSubscriptionReq;
128
129 impl SubscriptionRequest for FailingSubscriptionReq {
130 type Topic = IndexTest;
131
132 type SubscriptionId = String;
133
134 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
135 Err(Error::ParsingError("intentional failure".to_string()))
136 }
137
138 fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
139 Arc::new("failing-sub".to_owned())
140 }
141 }
142
143 #[tokio::test]
144 async fn delivery_twice_realtime() {
145 let pubsub = Pubsub::new(CustomPubSub::new_instance(()));
146
147 assert_eq!(pubsub.active_subscribers(), 0);
148
149 let mut subscriber = pubsub.subscribe(SubscriptionReq::Foo(2)).unwrap();
150
151 assert_eq!(pubsub.active_subscribers(), 1);
152
153 let _ = pubsub.publish_now(Message { foo: 2, bar: 1 });
154 let _ = pubsub.publish_now(Message { foo: 2, bar: 2 });
155
156 assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(1));
157 assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(2));
158 assert!(subscriber.try_recv().is_none());
159
160 drop(subscriber);
161
162 assert_eq!(pubsub.active_subscribers(), 0);
163 }
164
165 #[tokio::test]
166 async fn failed_subscribe_does_not_leak_active_subscribers() {
167 let pubsub = Pubsub::new(CustomPubSub::new_instance(()));
168
169 assert_eq!(pubsub.active_subscribers(), 0);
170
171 let result = pubsub.subscribe(FailingSubscriptionReq);
172
173 assert!(result.is_err());
174 assert_eq!(pubsub.active_subscribers(), 0);
175 }
176
177 #[tokio::test]
178 async fn read_from_storage() {
179 let x = CustomPubSub::new_instance(());
180 let storage = x.storage.clone();
181
182 let pubsub = Pubsub::new(x);
183
184 {
185 let mut s = storage.write().unwrap();
187 s.insert(IndexTest::Bar(2), Message { foo: 3, bar: 2 });
188 }
189
190 let mut subscriber = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
191
192 assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(3));
194
195 let _ = pubsub.publish_now(Message { foo: 1, bar: 2 });
197 assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(1));
198
199 {
200 let mut s = storage.write().unwrap();
202 s.insert(IndexTest::Bar(2), Message { foo: 1, bar: 2 });
203 }
204
205 let mut y = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
207 assert_eq!(y.recv().await.map(|x| x.foo), Some(1));
208 }
209}