bonsaidb_core/
pubsub.rs

1use async_trait::async_trait;
2use circulate::{flume, Message};
3use serde::Serialize;
4
5use crate::Error;
6
7/// Publishes and Subscribes to messages on topics.
8pub trait PubSub {
9    /// The Subscriber type for this `PubSub` connection.
10    type Subscriber: Subscriber;
11
12    /// Create a new [`Subscriber`] for this relay.
13    fn create_subscriber(&self) -> Result<Self::Subscriber, Error>;
14
15    /// Publishes a `payload` to all subscribers of `topic`.
16    fn publish<Topic: Serialize, Payload: Serialize>(
17        &self,
18        topic: &Topic,
19        payload: &Payload,
20    ) -> Result<(), Error> {
21        self.publish_bytes(pot::to_vec(topic)?, pot::to_vec(payload)?)
22    }
23
24    /// Publishes a `payload` to all subscribers of `topic`.
25    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), Error>;
26
27    /// Publishes a `payload` to all subscribers of all `topics`.
28    fn publish_to_all<
29        'topics,
30        Topics: IntoIterator<Item = &'topics Topic> + 'topics,
31        Topic: Serialize + 'topics,
32        Payload: Serialize,
33    >(
34        &self,
35        topics: Topics,
36        payload: &Payload,
37    ) -> Result<(), Error> {
38        let topics = topics
39            .into_iter()
40            .map(pot::to_vec)
41            .collect::<Result<Vec<_>, _>>()?;
42        self.publish_bytes_to_all(topics, pot::to_vec(payload)?)
43    }
44
45    /// Publishes a `payload` to all subscribers of all `topics`.
46    fn publish_bytes_to_all(
47        &self,
48        topics: impl IntoIterator<Item = Vec<u8>> + Send,
49        payload: Vec<u8>,
50    ) -> Result<(), Error>;
51}
52
53/// A subscriber to one or more topics.
54pub trait Subscriber {
55    /// Subscribe to [`Message`]s published to `topic`.
56    fn subscribe_to<Topic: Serialize>(&self, topic: &Topic) -> Result<(), Error> {
57        self.subscribe_to_bytes(pot::to_vec(topic)?)
58    }
59
60    /// Subscribe to [`Message`]s published to `topic`.
61    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error>;
62
63    /// Unsubscribe from [`Message`]s published to `topic`.
64    fn unsubscribe_from<Topic: Serialize>(&self, topic: &Topic) -> Result<(), Error> {
65        self.unsubscribe_from_bytes(&pot::to_vec(topic)?)
66    }
67
68    /// Unsubscribe from [`Message`]s published to `topic`.
69    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error>;
70
71    /// Returns the receiver to receive [`Message`]s.
72    fn receiver(&self) -> &Receiver;
73}
74
75/// Publishes and Subscribes to messages on topics.
76#[async_trait]
77pub trait AsyncPubSub: Send + Sync {
78    /// The Subscriber type for this `PubSub` connection.
79    type Subscriber: AsyncSubscriber;
80
81    /// Create a new [`Subscriber`] for this relay.
82    async fn create_subscriber(&self) -> Result<Self::Subscriber, Error>;
83
84    /// Publishes a `payload` to all subscribers of `topic`.
85    async fn publish<Topic: Serialize + Send + Sync, Payload: Serialize + Send + Sync>(
86        &self,
87        topic: &Topic,
88        payload: &Payload,
89    ) -> Result<(), Error> {
90        let topic = pot::to_vec(topic)?;
91        let payload = pot::to_vec(payload)?;
92        self.publish_bytes(topic, payload).await
93    }
94
95    /// Publishes a `payload` to all subscribers of `topic`.
96    async fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), Error>;
97
98    /// Publishes a `payload` to all subscribers of all `topics`.
99    async fn publish_to_all<
100        'topics,
101        Topics: IntoIterator<Item = &'topics Topic> + Send + 'topics,
102        Topic: Serialize + Send + 'topics,
103        Payload: Serialize + Send + Sync,
104    >(
105        &self,
106        topics: Topics,
107        payload: &Payload,
108    ) -> Result<(), Error> {
109        let topics = topics
110            .into_iter()
111            .map(|topic| pot::to_vec(topic))
112            .collect::<Result<Vec<_>, _>>()?;
113        self.publish_bytes_to_all(topics, pot::to_vec(payload)?)
114            .await
115    }
116
117    /// Publishes a `payload` to all subscribers of all `topics`.
118    async fn publish_bytes_to_all(
119        &self,
120        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
121        payload: Vec<u8>,
122    ) -> Result<(), Error>;
123}
124
125/// A subscriber to one or more topics.
126#[async_trait]
127pub trait AsyncSubscriber: Send + Sync {
128    /// Subscribe to [`Message`]s published to `topic`.
129    async fn subscribe_to<Topic: Serialize + Send + Sync>(
130        &self,
131        topic: &Topic,
132    ) -> Result<(), Error> {
133        self.subscribe_to_bytes(pot::to_vec(topic)?).await
134    }
135
136    /// Subscribe to [`Message`]s published to `topic`.
137    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error>;
138
139    /// Unsubscribe from [`Message`]s published to `topic`.
140    async fn unsubscribe_from<Topic: Serialize + Send + Sync>(
141        &self,
142        topic: &Topic,
143    ) -> Result<(), Error> {
144        self.unsubscribe_from_bytes(&pot::to_vec(topic)?).await
145    }
146
147    /// Unsubscribe from [`Message`]s published to `topic`.
148    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error>;
149
150    /// Returns the receiver to receive [`Message`]s.
151    fn receiver(&self) -> &Receiver;
152}
153
154/// Receiver of PubSub [`Message`]s.
155#[derive(Clone, Debug)]
156#[must_use]
157pub struct Receiver {
158    receiver: flume::Receiver<Message>,
159    strip_database: bool,
160}
161
162impl Receiver {
163    #[doc(hidden)]
164    pub fn new_stripping_prefixes(receiver: flume::Receiver<Message>) -> Self {
165        Self {
166            receiver,
167            strip_database: true,
168        }
169    }
170
171    #[doc(hidden)]
172    pub fn new(receiver: flume::Receiver<Message>) -> Self {
173        Self {
174            receiver,
175            strip_database: false,
176        }
177    }
178
179    /// Receive the next [`Message`]. Blocks the current thread until a message
180    /// is available. If the receiver becomes disconnected, an error will be
181    /// returned.
182    pub fn receive(&self) -> Result<Message, Disconnected> {
183        self.receiver
184            .recv()
185            .map(|message| self.remove_database_prefix(message))
186            .map_err(|_| Disconnected)
187    }
188
189    /// Receive the next [`Message`]. Blocks the current task until a new
190    /// message is available. If the receiver becomes disconnected, an error
191    /// will be returned.
192    pub async fn receive_async(&self) -> Result<Message, Disconnected> {
193        self.receiver
194            .recv_async()
195            .await
196            .map(|message| self.remove_database_prefix(message))
197            .map_err(|_| Disconnected)
198    }
199
200    /// Try to receive the next [`Message`]. This function will not block, and
201    /// only returns a message if one is already available.
202    pub fn try_receive(&self) -> Result<Message, TryReceiveError> {
203        self.receiver
204            .try_recv()
205            .map(|message| self.remove_database_prefix(message))
206            .map_err(TryReceiveError::from)
207    }
208
209    fn remove_database_prefix(&self, mut message: Message) -> Message {
210        if self.strip_database {
211            if let Some(database_length) = message.topic.iter().position(|b| b == 0) {
212                message.topic.0.read_bytes(database_length + 1).unwrap();
213            }
214        }
215
216        message
217    }
218}
219
220impl Iterator for Receiver {
221    type Item = Message;
222
223    fn next(&mut self) -> Option<Self::Item> {
224        self.receive().ok()
225    }
226}
227
228/// The [`Receiver`] was disconnected
229#[derive(thiserror::Error, Debug, Clone, Eq, PartialEq)]
230#[error("the receiver is disconnected")]
231pub struct Disconnected;
232
233/// An error occurred trying to receive a message.
234#[derive(thiserror::Error, Debug, Clone, Eq, PartialEq)]
235pub enum TryReceiveError {
236    /// The receiver was disconnected
237    #[error("the receiver is disconnected")]
238    Disconnected,
239    /// No message was avaiable
240    #[error("the receiver was empty")]
241    Empty,
242}
243
244impl From<flume::TryRecvError> for TryReceiveError {
245    fn from(err: flume::TryRecvError) -> Self {
246        match err {
247            flume::TryRecvError::Empty => Self::Empty,
248            flume::TryRecvError::Disconnected => Self::Disconnected,
249        }
250    }
251}
252
/// Builds the namespaced topic used by a server: the bytes of `database`, a
/// single NUL separator byte, then `topic`.
///
/// This is an internal API, which is why the documentation is hidden. The
/// format is an implementation detail, but Client and Server must agree on
/// it, which is why it lives in core.
#[doc(hidden)]
#[must_use]
pub fn database_topic(database: &str, topic: &[u8]) -> Vec<u8> {
    // `concat` allocates once for the combined length of all three parts.
    let parts: [&[u8]; 3] = [database.as_bytes(), b"\0", topic];
    parts.concat()
}
267
/// Expands into a suite of async pubsub unit tests using the passed type as
/// the test harness.
///
/// The generated tests await `$harness::new(..)` and then `harness.connect()`,
/// and drive the connection through the `AsyncPubSub` and `AsyncSubscriber`
/// traits. The expansion refers to `tokio` and `anyhow` by name.
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! define_async_pubsub_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod r#async_pubsub {
            use $crate::pubsub::{AsyncPubSub, AsyncSubscriber};

            use super::$harness;

            #[tokio::test]
            async fn simple_pubsub_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubSimple).await?;
                let pubsub = harness.connect().await?;
                let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"mytopic").await?;

                // Only the first of these two topics has a subscription.
                let payload = String::from("test");
                AsyncPubSub::publish(&pubsub, &"mytopic", &payload).await?;
                AsyncPubSub::publish(&pubsub, &"othertopic", &payload).await?;

                let receiver = subscriber.receiver().clone();
                let received = receiver.receive_async().await.expect("No message received");
                assert_eq!(received.topic::<String>()?, "mytopic");
                assert_eq!(received.payload::<String>()?, "test");

                // The message should only be received once.
                let next = receiver.try_receive();
                assert!(matches!(
                    next,
                    Err($crate::pubsub::TryReceiveError::Empty)
                ));
                Ok(())
            }

            #[tokio::test]
            async fn multiple_subscribers_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubMultipleSubscribers)
                        .await?;
                let pubsub = harness.connect().await?;
                let subscriber_a = AsyncPubSub::create_subscriber(&pubsub).await?;
                let subscriber_ab = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber_a, &"a").await?;
                AsyncSubscriber::subscribe_to(&subscriber_ab, &"a").await?;
                AsyncSubscriber::subscribe_to(&subscriber_ab, &"b").await?;

                let mut messages_a = Vec::new();
                let mut messages_ab = Vec::new();

                // "a" reaches both subscribers.
                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
                let message = subscriber_a.receiver().receive_async().await?;
                messages_a.push(message.payload::<String>()?);
                let message = subscriber_ab.receiver().receive_async().await?;
                messages_ab.push(message.payload::<String>()?);

                // "b" reaches only the subscriber that asked for it.
                AsyncPubSub::publish(&pubsub, &"b", &String::from("b1")).await?;
                let message = subscriber_ab.receiver().receive_async().await?;
                messages_ab.push(message.payload::<String>()?);

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a2")).await?;
                let message = subscriber_a.receiver().receive_async().await?;
                messages_a.push(message.payload::<String>()?);
                let message = subscriber_ab.receiver().receive_async().await?;
                messages_ab.push(message.payload::<String>()?);

                assert_eq!(messages_a, ["a1", "a2"]);
                assert_eq!(messages_ab, ["a1", "b1", "a2"]);

                Ok(())
            }

            #[tokio::test]
            async fn unsubscribe_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubUnsubscribe).await?;
                let pubsub = harness.connect().await?;
                let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
                AsyncSubscriber::unsubscribe_from(&subscriber, &"a").await?;
                AsyncPubSub::publish(&pubsub, &"a", &String::from("a2")).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;
                AsyncPubSub::publish(&pubsub, &"a", &String::from("a3")).await?;

                // a2 was published while unsubscribed, so only a1 and a3
                // should arrive.
                for expected in ["a1", "a3"] {
                    let message = subscriber.receiver().receive_async().await?;
                    assert_eq!(message.payload::<String>()?, expected);
                }

                Ok(())
            }

            #[tokio::test]
            async fn pubsub_drop_cleanup_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubDropCleanup).await?;
                let pubsub = harness.connect().await?;
                let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
                let receiver = subscriber.receiver().clone();
                drop(subscriber);

                // The receiver should now be disconnected, but after receiving the
                // first message. For when we're testing network connections, we
                // need to insert a little delay here to allow the server to process
                // the drop.
                let grace_period = std::time::Duration::from_millis(100);
                tokio::time::sleep(grace_period).await;

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;

                let message = receiver.receive_async().await?;
                assert_eq!(message.payload::<String>()?, "a1");
                assert_eq!(
                    receiver.receive_async().await.unwrap_err(),
                    $crate::pubsub::Disconnected
                );

                Ok(())
            }

            #[tokio::test]
            async fn publish_to_all_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubPublishAll).await?;
                let pubsub = harness.connect().await?;
                let subscriber_a = AsyncPubSub::create_subscriber(&pubsub).await?;
                let subscriber_b = AsyncPubSub::create_subscriber(&pubsub).await?;
                let subscriber_c = AsyncPubSub::create_subscriber(&pubsub).await?;

                // Every subscriber listens to exactly two of the three topics.
                let subscriptions = [
                    (&subscriber_a, "1"),
                    (&subscriber_b, "1"),
                    (&subscriber_b, "2"),
                    (&subscriber_c, "2"),
                    (&subscriber_a, "3"),
                    (&subscriber_c, "3"),
                ];
                for (subscriber, topic) in subscriptions {
                    AsyncSubscriber::subscribe_to(subscriber, &topic).await?;
                }

                AsyncPubSub::publish_to_all(&pubsub, [&"1", &"2", &"3"], &String::from("1"))
                    .await?;

                // Each subscriber should get "1" twice on separate topics
                for subscriber in &[subscriber_a, subscriber_b, subscriber_c] {
                    let first = subscriber.receiver().receive_async().await?;
                    assert_eq!(first.payload::<String>()?, "1");
                    let second = subscriber.receiver().receive_async().await?;
                    assert_eq!(second.payload::<String>()?, "1");

                    assert!(matches!(
                        subscriber.receiver().try_receive(),
                        Err($crate::pubsub::TryReceiveError::Empty)
                    ));
                    assert!(first.topic != second.topic);
                }

                Ok(())
            }
        }
    };
}
451
/// Expands into a suite of blocking pubsub unit tests using the passed type
/// as the test harness.
///
/// The generated tests call `$harness::new(..)` and then `harness.connect()`,
/// and drive the connection through the `PubSub` and `Subscriber` traits.
/// The expansion refers to `anyhow` by name.
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! define_blocking_pubsub_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod blocking_pubsub {
            use $crate::pubsub::{PubSub, Subscriber};

            use super::$harness;

            #[test]
            fn simple_pubsub_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubSimple)?;
                let pubsub = harness.connect()?;
                let subscriber = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber, &"mytopic")?;

                // Only the first of these two topics has a subscription.
                let payload = String::from("test");
                PubSub::publish(&pubsub, &"mytopic", &payload)?;
                PubSub::publish(&pubsub, &"othertopic", &payload)?;

                let receiver = subscriber.receiver().clone();
                let received = receiver.receive().expect("No message received");
                assert_eq!(received.topic::<String>()?, "mytopic");
                assert_eq!(received.payload::<String>()?, "test");

                // The message should only be received once.
                let next = receiver.try_receive();
                assert!(matches!(
                    next,
                    Err($crate::pubsub::TryReceiveError::Empty)
                ));
                Ok(())
            }

            #[test]
            fn multiple_subscribers_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubMultipleSubscribers)?;
                let pubsub = harness.connect()?;
                let subscriber_a = PubSub::create_subscriber(&pubsub)?;
                let subscriber_ab = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber_a, &"a")?;
                Subscriber::subscribe_to(&subscriber_ab, &"a")?;
                Subscriber::subscribe_to(&subscriber_ab, &"b")?;

                let mut messages_a = Vec::new();
                let mut messages_ab = Vec::new();

                // "a" reaches both subscribers.
                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
                let message = subscriber_a.receiver().receive()?;
                messages_a.push(message.payload::<String>()?);
                let message = subscriber_ab.receiver().receive()?;
                messages_ab.push(message.payload::<String>()?);

                // "b" reaches only the subscriber that asked for it.
                PubSub::publish(&pubsub, &"b", &String::from("b1"))?;
                let message = subscriber_ab.receiver().receive()?;
                messages_ab.push(message.payload::<String>()?);

                PubSub::publish(&pubsub, &"a", &String::from("a2"))?;
                let message = subscriber_a.receiver().receive()?;
                messages_a.push(message.payload::<String>()?);
                let message = subscriber_ab.receiver().receive()?;
                messages_ab.push(message.payload::<String>()?);

                assert_eq!(messages_a, ["a1", "a2"]);
                assert_eq!(messages_ab, ["a1", "b1", "a2"]);

                Ok(())
            }

            #[test]
            fn unsubscribe_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubUnsubscribe)?;
                let pubsub = harness.connect()?;
                let subscriber = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber, &"a")?;

                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
                Subscriber::unsubscribe_from(&subscriber, &"a")?;
                PubSub::publish(&pubsub, &"a", &String::from("a2"))?;
                Subscriber::subscribe_to(&subscriber, &"a")?;
                PubSub::publish(&pubsub, &"a", &String::from("a3"))?;

                // a2 was published while unsubscribed, so only a1 and a3
                // should arrive.
                for expected in ["a1", "a3"] {
                    let message = subscriber.receiver().receive()?;
                    assert_eq!(message.payload::<String>()?, expected);
                }

                Ok(())
            }

            #[test]
            fn pubsub_drop_cleanup_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubDropCleanup)?;
                let pubsub = harness.connect()?;
                let subscriber = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber, &"a")?;

                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
                let receiver = subscriber.receiver().clone();
                drop(subscriber);

                // The receiver should now be disconnected, but after receiving the
                // first message. For when we're testing network connections, we
                // need to insert a little delay here to allow the server to process
                // the drop.
                let grace_period = std::time::Duration::from_millis(100);
                std::thread::sleep(grace_period);

                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;

                let message = receiver.receive()?;
                assert_eq!(message.payload::<String>()?, "a1");
                assert_eq!(
                    receiver.receive().unwrap_err(),
                    $crate::pubsub::Disconnected
                );

                Ok(())
            }

            #[test]
            fn publish_to_all_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubPublishAll)?;
                let pubsub = harness.connect()?;
                let subscriber_a = PubSub::create_subscriber(&pubsub)?;
                let subscriber_b = PubSub::create_subscriber(&pubsub)?;
                let subscriber_c = PubSub::create_subscriber(&pubsub)?;

                // Every subscriber listens to exactly two of the three topics.
                let subscriptions = [
                    (&subscriber_a, "1"),
                    (&subscriber_b, "1"),
                    (&subscriber_b, "2"),
                    (&subscriber_c, "2"),
                    (&subscriber_a, "3"),
                    (&subscriber_c, "3"),
                ];
                for (subscriber, topic) in subscriptions {
                    Subscriber::subscribe_to(subscriber, &topic)?;
                }

                PubSub::publish_to_all(&pubsub, [&"1", &"2", &"3"], &String::from("1"))?;

                // Each subscriber should get "1" twice on separate topics
                for subscriber in &[subscriber_a, subscriber_b, subscriber_c] {
                    let first = subscriber.receiver().receive()?;
                    assert_eq!(first.payload::<String>()?, "1");
                    let second = subscriber.receiver().receive()?;
                    assert_eq!(second.payload::<String>()?, "1");

                    assert!(matches!(
                        subscriber.receiver().try_receive(),
                        Err($crate::pubsub::TryReceiveError::Empty)
                    ));
                    assert!(first.topic != second.topic);
                }

                Ok(())
            }
        }
    };
}