bonsaidb_local/database/
pubsub.rs

1use bonsaidb_core::arc_bytes::OwnedBytes;
2pub use bonsaidb_core::circulate::Relay;
3use bonsaidb_core::connection::{Connection, HasSession};
4use bonsaidb_core::permissions::bonsai::{
5    database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction, PubSubAction,
6};
7use bonsaidb_core::pubsub::{self, database_topic, PubSub, Receiver};
8use bonsaidb_core::{circulate, Error};
9
10use crate::{Database, DatabaseNonBlocking};
11
12impl PubSub for super::Database {
13    type Subscriber = Subscriber;
14
15    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
16        self.check_permission(
17            database_resource_name(self.name()),
18            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber)),
19        )?;
20        Ok(self
21            .storage()
22            .instance
23            .register_subscriber(self.session().and_then(|session| session.id), self.clone()))
24    }
25
26    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
27        self.check_permission(
28            pubsub_topic_resource_name(self.name(), &topic),
29            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
30        )?;
31        self.storage
32            .instance
33            .relay()
34            .publish_raw(database_topic(&self.data.name, &topic), payload);
35        Ok(())
36    }
37
38    fn publish_bytes_to_all(
39        &self,
40        topics: impl IntoIterator<Item = Vec<u8>> + Send,
41        payload: Vec<u8>,
42    ) -> Result<(), bonsaidb_core::Error> {
43        self.storage.instance.relay().publish_raw_to_all(
44            topics
45                .into_iter()
46                .map(|topic| {
47                    self.check_permission(
48                        pubsub_topic_resource_name(self.name(), &topic),
49                        &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
50                    )
51                    .map(|_| OwnedBytes::from(database_topic(&self.data.name, &topic)))
52                })
53                .collect::<Result<Vec<_>, _>>()?,
54            payload,
55        );
56        Ok(())
57    }
58}
59
60/// A subscriber for `PubSub` messages.
61#[derive(Debug, Clone)]
62pub struct Subscriber {
63    pub(crate) id: u64,
64    pub(crate) database: Database,
65    pub(crate) subscriber: circulate::Subscriber,
66    pub(crate) receiver: Receiver,
67}
68
69impl Subscriber {
70    /// Returns the unique id of the subscriber.
71    #[must_use]
72    pub const fn id(&self) -> u64 {
73        self.id
74    }
75}
76
77impl Drop for Subscriber {
78    fn drop(&mut self) {
79        self.database.storage().instance.unregister_subscriber(self);
80    }
81}
82
83impl pubsub::Subscriber for Subscriber {
84    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
85        self.database.check_permission(
86            pubsub_topic_resource_name(self.database.name(), &topic),
87            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo)),
88        )?;
89        self.subscriber
90            .subscribe_to_raw(database_topic(self.database.name(), &topic));
91        Ok(())
92    }
93
94    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
95        self.database.check_permission(
96            pubsub_topic_resource_name(self.database.name(), topic),
97            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom)),
98        )?;
99        self.subscriber
100            .unsubscribe_from_raw(&database_topic(self.database.name(), topic));
101        Ok(())
102    }
103
104    fn receiver(&self) -> &Receiver {
105        &self.receiver
106    }
107}