bonsaidb_local/database/
pubsub.rs1use bonsaidb_core::arc_bytes::OwnedBytes;
2pub use bonsaidb_core::circulate::Relay;
3use bonsaidb_core::connection::{Connection, HasSession};
4use bonsaidb_core::permissions::bonsai::{
5 database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction, PubSubAction,
6};
7use bonsaidb_core::pubsub::{self, database_topic, PubSub, Receiver};
8use bonsaidb_core::{circulate, Error};
9
10use crate::{Database, DatabaseNonBlocking};
11
12impl PubSub for super::Database {
13 type Subscriber = Subscriber;
14
15 fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
16 self.check_permission(
17 database_resource_name(self.name()),
18 &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber)),
19 )?;
20 Ok(self
21 .storage()
22 .instance
23 .register_subscriber(self.session().and_then(|session| session.id), self.clone()))
24 }
25
26 fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
27 self.check_permission(
28 pubsub_topic_resource_name(self.name(), &topic),
29 &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
30 )?;
31 self.storage
32 .instance
33 .relay()
34 .publish_raw(database_topic(&self.data.name, &topic), payload);
35 Ok(())
36 }
37
38 fn publish_bytes_to_all(
39 &self,
40 topics: impl IntoIterator<Item = Vec<u8>> + Send,
41 payload: Vec<u8>,
42 ) -> Result<(), bonsaidb_core::Error> {
43 self.storage.instance.relay().publish_raw_to_all(
44 topics
45 .into_iter()
46 .map(|topic| {
47 self.check_permission(
48 pubsub_topic_resource_name(self.name(), &topic),
49 &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
50 )
51 .map(|_| OwnedBytes::from(database_topic(&self.data.name, &topic)))
52 })
53 .collect::<Result<Vec<_>, _>>()?,
54 payload,
55 );
56 Ok(())
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct Subscriber {
63 pub(crate) id: u64,
64 pub(crate) database: Database,
65 pub(crate) subscriber: circulate::Subscriber,
66 pub(crate) receiver: Receiver,
67}
68
69impl Subscriber {
70 #[must_use]
72 pub const fn id(&self) -> u64 {
73 self.id
74 }
75}
76
77impl Drop for Subscriber {
78 fn drop(&mut self) {
79 self.database.storage().instance.unregister_subscriber(self);
80 }
81}
82
83impl pubsub::Subscriber for Subscriber {
84 fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
85 self.database.check_permission(
86 pubsub_topic_resource_name(self.database.name(), &topic),
87 &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo)),
88 )?;
89 self.subscriber
90 .subscribe_to_raw(database_topic(self.database.name(), &topic));
91 Ok(())
92 }
93
94 fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
95 self.database.check_permission(
96 pubsub_topic_resource_name(self.database.name(), topic),
97 &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom)),
98 )?;
99 self.subscriber
100 .unsubscribe_from_raw(&database_topic(self.database.name(), topic));
101 Ok(())
102 }
103
104 fn receiver(&self) -> &Receiver {
105 &self.receiver
106 }
107}