1use async_trait::async_trait;
2use circulate::{flume, Message};
3use serde::Serialize;
4
5use crate::Error;
6
7pub trait PubSub {
9 type Subscriber: Subscriber;
11
12 fn create_subscriber(&self) -> Result<Self::Subscriber, Error>;
14
15 fn publish<Topic: Serialize, Payload: Serialize>(
17 &self,
18 topic: &Topic,
19 payload: &Payload,
20 ) -> Result<(), Error> {
21 self.publish_bytes(pot::to_vec(topic)?, pot::to_vec(payload)?)
22 }
23
24 fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), Error>;
26
27 fn publish_to_all<
29 'topics,
30 Topics: IntoIterator<Item = &'topics Topic> + 'topics,
31 Topic: Serialize + 'topics,
32 Payload: Serialize,
33 >(
34 &self,
35 topics: Topics,
36 payload: &Payload,
37 ) -> Result<(), Error> {
38 let topics = topics
39 .into_iter()
40 .map(pot::to_vec)
41 .collect::<Result<Vec<_>, _>>()?;
42 self.publish_bytes_to_all(topics, pot::to_vec(payload)?)
43 }
44
45 fn publish_bytes_to_all(
47 &self,
48 topics: impl IntoIterator<Item = Vec<u8>> + Send,
49 payload: Vec<u8>,
50 ) -> Result<(), Error>;
51}
52
53pub trait Subscriber {
55 fn subscribe_to<Topic: Serialize>(&self, topic: &Topic) -> Result<(), Error> {
57 self.subscribe_to_bytes(pot::to_vec(topic)?)
58 }
59
60 fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error>;
62
63 fn unsubscribe_from<Topic: Serialize>(&self, topic: &Topic) -> Result<(), Error> {
65 self.unsubscribe_from_bytes(&pot::to_vec(topic)?)
66 }
67
68 fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error>;
70
71 fn receiver(&self) -> &Receiver;
73}
74
75#[async_trait]
77pub trait AsyncPubSub: Send + Sync {
78 type Subscriber: AsyncSubscriber;
80
81 async fn create_subscriber(&self) -> Result<Self::Subscriber, Error>;
83
84 async fn publish<Topic: Serialize + Send + Sync, Payload: Serialize + Send + Sync>(
86 &self,
87 topic: &Topic,
88 payload: &Payload,
89 ) -> Result<(), Error> {
90 let topic = pot::to_vec(topic)?;
91 let payload = pot::to_vec(payload)?;
92 self.publish_bytes(topic, payload).await
93 }
94
95 async fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), Error>;
97
98 async fn publish_to_all<
100 'topics,
101 Topics: IntoIterator<Item = &'topics Topic> + Send + 'topics,
102 Topic: Serialize + Send + 'topics,
103 Payload: Serialize + Send + Sync,
104 >(
105 &self,
106 topics: Topics,
107 payload: &Payload,
108 ) -> Result<(), Error> {
109 let topics = topics
110 .into_iter()
111 .map(|topic| pot::to_vec(topic))
112 .collect::<Result<Vec<_>, _>>()?;
113 self.publish_bytes_to_all(topics, pot::to_vec(payload)?)
114 .await
115 }
116
117 async fn publish_bytes_to_all(
119 &self,
120 topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
121 payload: Vec<u8>,
122 ) -> Result<(), Error>;
123}
124
125#[async_trait]
127pub trait AsyncSubscriber: Send + Sync {
128 async fn subscribe_to<Topic: Serialize + Send + Sync>(
130 &self,
131 topic: &Topic,
132 ) -> Result<(), Error> {
133 self.subscribe_to_bytes(pot::to_vec(topic)?).await
134 }
135
136 async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error>;
138
139 async fn unsubscribe_from<Topic: Serialize + Send + Sync>(
141 &self,
142 topic: &Topic,
143 ) -> Result<(), Error> {
144 self.unsubscribe_from_bytes(&pot::to_vec(topic)?).await
145 }
146
147 async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error>;
149
150 fn receiver(&self) -> &Receiver;
152}
153
154#[derive(Clone, Debug)]
156#[must_use]
157pub struct Receiver {
158 receiver: flume::Receiver<Message>,
159 strip_database: bool,
160}
161
162impl Receiver {
163 #[doc(hidden)]
164 pub fn new_stripping_prefixes(receiver: flume::Receiver<Message>) -> Self {
165 Self {
166 receiver,
167 strip_database: true,
168 }
169 }
170
171 #[doc(hidden)]
172 pub fn new(receiver: flume::Receiver<Message>) -> Self {
173 Self {
174 receiver,
175 strip_database: false,
176 }
177 }
178
179 pub fn receive(&self) -> Result<Message, Disconnected> {
183 self.receiver
184 .recv()
185 .map(|message| self.remove_database_prefix(message))
186 .map_err(|_| Disconnected)
187 }
188
189 pub async fn receive_async(&self) -> Result<Message, Disconnected> {
193 self.receiver
194 .recv_async()
195 .await
196 .map(|message| self.remove_database_prefix(message))
197 .map_err(|_| Disconnected)
198 }
199
200 pub fn try_receive(&self) -> Result<Message, TryReceiveError> {
203 self.receiver
204 .try_recv()
205 .map(|message| self.remove_database_prefix(message))
206 .map_err(TryReceiveError::from)
207 }
208
209 fn remove_database_prefix(&self, mut message: Message) -> Message {
210 if self.strip_database {
211 if let Some(database_length) = message.topic.iter().position(|b| b == 0) {
212 message.topic.0.read_bytes(database_length + 1).unwrap();
213 }
214 }
215
216 message
217 }
218}
219
220impl Iterator for Receiver {
221 type Item = Message;
222
223 fn next(&mut self) -> Option<Self::Item> {
224 self.receive().ok()
225 }
226}
227
228#[derive(thiserror::Error, Debug, Clone, Eq, PartialEq)]
230#[error("the receiver is disconnected")]
231pub struct Disconnected;
232
233#[derive(thiserror::Error, Debug, Clone, Eq, PartialEq)]
235pub enum TryReceiveError {
236 #[error("the receiver is disconnected")]
238 Disconnected,
239 #[error("the receiver was empty")]
241 Empty,
242}
243
244impl From<flume::TryRecvError> for TryReceiveError {
245 fn from(err: flume::TryRecvError) -> Self {
246 match err {
247 flume::TryRecvError::Empty => Self::Empty,
248 flume::TryRecvError::Disconnected => Self::Disconnected,
249 }
250 }
251}
252
253#[doc(hidden)]
257#[must_use]
258pub fn database_topic(database: &str, topic: &[u8]) -> Vec<u8> {
259 let mut namespaced_topic = Vec::with_capacity(database.len() + topic.len() + 1);
260
261 namespaced_topic.extend(database.bytes());
262 namespaced_topic.push(b'\0');
263 namespaced_topic.extend(topic);
264
265 namespaced_topic
266}
267
268#[cfg(feature = "test-util")]
270#[macro_export]
271macro_rules! define_async_pubsub_test_suite {
272 ($harness:ident) => {
273 #[cfg(test)]
274 mod r#async_pubsub {
275 use $crate::pubsub::{AsyncPubSub, AsyncSubscriber};
276
277 use super::$harness;
278 #[tokio::test]
279 async fn simple_pubsub_test() -> anyhow::Result<()> {
280 let harness = $harness::new($crate::test_util::HarnessTest::PubSubSimple).await?;
281 let pubsub = harness.connect().await?;
282 let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
283 AsyncSubscriber::subscribe_to(&subscriber, &"mytopic").await?;
284 AsyncPubSub::publish(&pubsub, &"mytopic", &String::from("test")).await?;
285 AsyncPubSub::publish(&pubsub, &"othertopic", &String::from("test")).await?;
286 let receiver = subscriber.receiver().clone();
287 let message = receiver.receive_async().await.expect("No message received");
288 assert_eq!(message.topic::<String>()?, "mytopic");
289 assert_eq!(message.payload::<String>()?, "test");
290 assert!(matches!(
292 receiver.try_receive(),
293 Err($crate::pubsub::TryReceiveError::Empty)
294 ));
295 Ok(())
296 }
297
298 #[tokio::test]
299 async fn multiple_subscribers_test() -> anyhow::Result<()> {
300 let harness =
301 $harness::new($crate::test_util::HarnessTest::PubSubMultipleSubscribers)
302 .await?;
303 let pubsub = harness.connect().await?;
304 let subscriber_a = AsyncPubSub::create_subscriber(&pubsub).await?;
305 let subscriber_ab = AsyncPubSub::create_subscriber(&pubsub).await?;
306 AsyncSubscriber::subscribe_to(&subscriber_a, &"a").await?;
307 AsyncSubscriber::subscribe_to(&subscriber_ab, &"a").await?;
308 AsyncSubscriber::subscribe_to(&subscriber_ab, &"b").await?;
309
310 let mut messages_a = Vec::new();
311 let mut messages_ab = Vec::new();
312 AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
313 messages_a.push(
314 subscriber_a
315 .receiver()
316 .receive_async()
317 .await?
318 .payload::<String>()?,
319 );
320 messages_ab.push(
321 subscriber_ab
322 .receiver()
323 .receive_async()
324 .await?
325 .payload::<String>()?,
326 );
327
328 AsyncPubSub::publish(&pubsub, &"b", &String::from("b1")).await?;
329 messages_ab.push(
330 subscriber_ab
331 .receiver()
332 .receive_async()
333 .await?
334 .payload::<String>()?,
335 );
336
337 AsyncPubSub::publish(&pubsub, &"a", &String::from("a2")).await?;
338 messages_a.push(
339 subscriber_a
340 .receiver()
341 .receive_async()
342 .await?
343 .payload::<String>()?,
344 );
345 messages_ab.push(
346 subscriber_ab
347 .receiver()
348 .receive_async()
349 .await?
350 .payload::<String>()?,
351 );
352
353 assert_eq!(&messages_a[0], "a1");
354 assert_eq!(&messages_a[1], "a2");
355
356 assert_eq!(&messages_ab[0], "a1");
357 assert_eq!(&messages_ab[1], "b1");
358 assert_eq!(&messages_ab[2], "a2");
359
360 Ok(())
361 }
362
363 #[tokio::test]
364 async fn unsubscribe_test() -> anyhow::Result<()> {
365 let harness =
366 $harness::new($crate::test_util::HarnessTest::PubSubUnsubscribe).await?;
367 let pubsub = harness.connect().await?;
368 let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
369 AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;
370
371 AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
372 AsyncSubscriber::unsubscribe_from(&subscriber, &"a").await?;
373 AsyncPubSub::publish(&pubsub, &"a", &String::from("a2")).await?;
374 AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;
375 AsyncPubSub::publish(&pubsub, &"a", &String::from("a3")).await?;
376
377 let message = subscriber.receiver().receive_async().await?;
379 assert_eq!(message.payload::<String>()?, "a1");
380 let message = subscriber.receiver().receive_async().await?;
381 assert_eq!(message.payload::<String>()?, "a3");
382
383 Ok(())
384 }
385
386 #[tokio::test]
387 async fn pubsub_drop_cleanup_test() -> anyhow::Result<()> {
388 let harness =
389 $harness::new($crate::test_util::HarnessTest::PubSubDropCleanup).await?;
390 let pubsub = harness.connect().await?;
391 let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
392 AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;
393
394 AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
395 let receiver = subscriber.receiver().clone();
396 drop(subscriber);
397
398 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
403
404 AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
405
406 let message = receiver.receive_async().await?;
407 assert_eq!(message.payload::<String>()?, "a1");
408 let $crate::pubsub::Disconnected = receiver.receive_async().await.unwrap_err();
409
410 Ok(())
411 }
412
413 #[tokio::test]
414 async fn publish_to_all_test() -> anyhow::Result<()> {
415 let harness =
416 $harness::new($crate::test_util::HarnessTest::PubSubPublishAll).await?;
417 let pubsub = harness.connect().await?;
418 let subscriber_a = AsyncPubSub::create_subscriber(&pubsub).await?;
419 let subscriber_b = AsyncPubSub::create_subscriber(&pubsub).await?;
420 let subscriber_c = AsyncPubSub::create_subscriber(&pubsub).await?;
421 AsyncSubscriber::subscribe_to(&subscriber_a, &"1").await?;
422 AsyncSubscriber::subscribe_to(&subscriber_b, &"1").await?;
423 AsyncSubscriber::subscribe_to(&subscriber_b, &"2").await?;
424 AsyncSubscriber::subscribe_to(&subscriber_c, &"2").await?;
425 AsyncSubscriber::subscribe_to(&subscriber_a, &"3").await?;
426 AsyncSubscriber::subscribe_to(&subscriber_c, &"3").await?;
427
428 AsyncPubSub::publish_to_all(&pubsub, [&"1", &"2", &"3"], &String::from("1"))
429 .await?;
430
431 for subscriber in &[subscriber_a, subscriber_b, subscriber_c] {
433 let mut message_topics = Vec::new();
434 for _ in 0..2_u8 {
435 let message = subscriber.receiver().receive_async().await?;
436 assert_eq!(message.payload::<String>()?, "1");
437 message_topics.push(message.topic.clone());
438 }
439 assert!(matches!(
440 subscriber.receiver().try_receive(),
441 Err($crate::pubsub::TryReceiveError::Empty)
442 ));
443 assert!(message_topics[0] != message_topics[1]);
444 }
445
446 Ok(())
447 }
448 }
449 };
450}
451
452#[cfg(feature = "test-util")]
454#[macro_export]
455macro_rules! define_blocking_pubsub_test_suite {
456 ($harness:ident) => {
457 #[cfg(test)]
458 mod blocking_pubsub {
459 use $crate::pubsub::{PubSub, Subscriber};
460
461 use super::$harness;
462 #[test]
463 fn simple_pubsub_test() -> anyhow::Result<()> {
464 let harness = $harness::new($crate::test_util::HarnessTest::PubSubSimple)?;
465 let pubsub = harness.connect()?;
466 let subscriber = PubSub::create_subscriber(&pubsub)?;
467 Subscriber::subscribe_to(&subscriber, &"mytopic")?;
468 PubSub::publish(&pubsub, &"mytopic", &String::from("test"))?;
469 PubSub::publish(&pubsub, &"othertopic", &String::from("test"))?;
470 let receiver = subscriber.receiver().clone();
471 let message = receiver.receive().expect("No message received");
472 assert_eq!(message.topic::<String>()?, "mytopic");
473 assert_eq!(message.payload::<String>()?, "test");
474 assert!(matches!(
476 receiver.try_receive(),
477 Err($crate::pubsub::TryReceiveError::Empty)
478 ));
479 Ok(())
480 }
481
482 #[test]
483 fn multiple_subscribers_test() -> anyhow::Result<()> {
484 let harness =
485 $harness::new($crate::test_util::HarnessTest::PubSubMultipleSubscribers)?;
486 let pubsub = harness.connect()?;
487 let subscriber_a = PubSub::create_subscriber(&pubsub)?;
488 let subscriber_ab = PubSub::create_subscriber(&pubsub)?;
489 Subscriber::subscribe_to(&subscriber_a, &"a")?;
490 Subscriber::subscribe_to(&subscriber_ab, &"a")?;
491 Subscriber::subscribe_to(&subscriber_ab, &"b")?;
492
493 let mut messages_a = Vec::new();
494 let mut messages_ab = Vec::new();
495 PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
496 messages_a.push(subscriber_a.receiver().receive()?.payload::<String>()?);
497 messages_ab.push(subscriber_ab.receiver().receive()?.payload::<String>()?);
498
499 PubSub::publish(&pubsub, &"b", &String::from("b1"))?;
500 messages_ab.push(subscriber_ab.receiver().receive()?.payload::<String>()?);
501
502 PubSub::publish(&pubsub, &"a", &String::from("a2"))?;
503 messages_a.push(subscriber_a.receiver().receive()?.payload::<String>()?);
504 messages_ab.push(subscriber_ab.receiver().receive()?.payload::<String>()?);
505
506 assert_eq!(&messages_a[0], "a1");
507 assert_eq!(&messages_a[1], "a2");
508
509 assert_eq!(&messages_ab[0], "a1");
510 assert_eq!(&messages_ab[1], "b1");
511 assert_eq!(&messages_ab[2], "a2");
512
513 Ok(())
514 }
515
516 #[test]
517 fn unsubscribe_test() -> anyhow::Result<()> {
518 let harness = $harness::new($crate::test_util::HarnessTest::PubSubUnsubscribe)?;
519 let pubsub = harness.connect()?;
520 let subscriber = PubSub::create_subscriber(&pubsub)?;
521 Subscriber::subscribe_to(&subscriber, &"a")?;
522
523 PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
524 Subscriber::unsubscribe_from(&subscriber, &"a")?;
525 PubSub::publish(&pubsub, &"a", &String::from("a2"))?;
526 Subscriber::subscribe_to(&subscriber, &"a")?;
527 PubSub::publish(&pubsub, &"a", &String::from("a3"))?;
528
529 let message = subscriber.receiver().receive()?;
531 assert_eq!(message.payload::<String>()?, "a1");
532 let message = subscriber.receiver().receive()?;
533 assert_eq!(message.payload::<String>()?, "a3");
534
535 Ok(())
536 }
537
538 #[test]
539 fn pubsub_drop_cleanup_test() -> anyhow::Result<()> {
540 let harness = $harness::new($crate::test_util::HarnessTest::PubSubDropCleanup)?;
541 let pubsub = harness.connect()?;
542 let subscriber = PubSub::create_subscriber(&pubsub)?;
543 Subscriber::subscribe_to(&subscriber, &"a")?;
544
545 PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
546 let receiver = subscriber.receiver().clone();
547 drop(subscriber);
548
549 std::thread::sleep(std::time::Duration::from_millis(100));
554
555 PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
556
557 let message = receiver.receive()?;
558 assert_eq!(message.payload::<String>()?, "a1");
559 let $crate::pubsub::Disconnected = receiver.receive().unwrap_err();
560
561 Ok(())
562 }
563
564 #[test]
565 fn publish_to_all_test() -> anyhow::Result<()> {
566 let harness = $harness::new($crate::test_util::HarnessTest::PubSubPublishAll)?;
567 let pubsub = harness.connect()?;
568 let subscriber_a = PubSub::create_subscriber(&pubsub)?;
569 let subscriber_b = PubSub::create_subscriber(&pubsub)?;
570 let subscriber_c = PubSub::create_subscriber(&pubsub)?;
571 Subscriber::subscribe_to(&subscriber_a, &"1")?;
572 Subscriber::subscribe_to(&subscriber_b, &"1")?;
573 Subscriber::subscribe_to(&subscriber_b, &"2")?;
574 Subscriber::subscribe_to(&subscriber_c, &"2")?;
575 Subscriber::subscribe_to(&subscriber_a, &"3")?;
576 Subscriber::subscribe_to(&subscriber_c, &"3")?;
577
578 PubSub::publish_to_all(&pubsub, [&"1", &"2", &"3"], &String::from("1"))?;
579
580 for subscriber in &[subscriber_a, subscriber_b, subscriber_c] {
582 let mut message_topics = Vec::new();
583 for _ in 0..2_u8 {
584 let message = subscriber.receiver().receive()?;
585 assert_eq!(message.payload::<String>()?, "1");
586 message_topics.push(message.topic.clone());
587 }
588 assert!(matches!(
589 subscriber.receiver().try_receive(),
590 Err($crate::pubsub::TryReceiveError::Empty)
591 ));
592 assert!(message_topics[0] != message_topics[1]);
593 }
594
595 Ok(())
596 }
597 }
598 };
599}