1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use std::sync::Arc;

use async_trait::async_trait;
pub use bonsaidb_core::circulate::Relay;
use bonsaidb_core::{
    circulate,
    pubsub::{self, database_topic, PubSub},
    Error,
};

#[async_trait]
impl PubSub for super::Database {
    type Subscriber = Subscriber;

    /// Creates a new [`Subscriber`] on the storage's relay.
    ///
    /// The subscriber remembers this database's name so that topics it
    /// subscribes to can be passed through `database_topic` with that name.
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
        let database_name = self.data.name.to_string();
        let subscriber = self.data.storage.relay().create_subscriber().await;
        Ok(Subscriber {
            database_name,
            subscriber,
        })
    }

    /// Publishes `payload` to a single `topic`.
    ///
    /// The topic is combined with this database's name via `database_topic`
    /// before being handed to the relay.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the relay's `publish` call, converted
    /// into `bonsaidb_core::Error`.
    async fn publish<S: Into<String> + Send, P: serde::Serialize + Sync>(
        &self,
        topic: S,
        payload: &P,
    ) -> Result<(), bonsaidb_core::Error> {
        let relay = self.data.storage.relay();
        let namespaced_topic = database_topic(&self.data.name, &topic.into());
        relay.publish(namespaced_topic, payload).await?;
        Ok(())
    }

    /// Publishes `payload` to every topic in `topics`.
    ///
    /// Each topic is combined with this database's name via `database_topic`
    /// before the whole list is handed to the relay in a single call.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the relay's `publish_to_all` call,
    /// converted into `bonsaidb_core::Error`.
    async fn publish_to_all<P: serde::Serialize + Sync>(
        &self,
        topics: Vec<String>,
        payload: &P,
    ) -> Result<(), bonsaidb_core::Error> {
        let relay = self.data.storage.relay();
        // The collection type is inferred from the relay's `publish_to_all` signature.
        let namespaced_topics = topics
            .iter()
            .map(|topic| database_topic(&self.data.name, topic))
            .collect();
        relay.publish_to_all(namespaced_topics, payload).await?;
        Ok(())
    }
}

/// A subscriber for `PubSub` messages.
///
/// Wraps a `circulate::Subscriber` together with the name of the database it
/// was created for; the name is combined with each topic before the topic is
/// forwarded to the wrapped subscriber.
pub struct Subscriber {
    /// Name of the database, combined with topics when subscribing and unsubscribing.
    database_name: String,
    /// The wrapped relay subscriber that subscription calls are forwarded to.
    subscriber: circulate::Subscriber,
}

#[async_trait]
impl pubsub::Subscriber for Subscriber {
    /// Subscribes to `topic` within this subscriber's database.
    ///
    /// The topic is combined with the database name via `database_topic`
    /// before being forwarded to the wrapped subscriber. Always returns
    /// `Ok(())`.
    async fn subscribe_to<S: Into<String> + Send>(&self, topic: S) -> Result<(), Error> {
        self.subscriber
            .subscribe_to(database_topic(&self.database_name, &topic.into()))
            .await;
        Ok(())
    }

    /// Unsubscribes from `topic` within this subscriber's database.
    ///
    /// Always returns `Ok(())`.
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), Error> {
        // Build the namespaced topic with the same helper `subscribe_to` uses,
        // so the two can never disagree on the topic format.
        let topic = database_topic(&self.database_name, topic);
        self.subscriber.unsubscribe_from(&topic).await;
        Ok(())
    }

    /// Returns the wrapped subscriber's channel receiver for incoming messages.
    fn receiver(&self) -> &'_ flume::Receiver<Arc<circulate::Message>> {
        self.subscriber.receiver()
    }
}