bonsaidb_client/client/remote_database/
pubsub.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bonsaidb_core::arc_bytes::serde::Bytes;
5use bonsaidb_core::networking::{
6 CreateSubscriber, Publish, PublishToAll, SubscribeTo, UnsubscribeFrom,
7};
8use bonsaidb_core::pubsub::{AsyncPubSub, AsyncSubscriber, Receiver};
9
10use crate::AsyncClient;
11
12#[async_trait]
13impl AsyncPubSub for super::AsyncRemoteDatabase {
14 type Subscriber = AsyncRemoteSubscriber;
15
16 async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
17 let subscriber_id = self
18 .client
19 .send_api_request(&CreateSubscriber {
20 database: self.name.to_string(),
21 })
22 .await?;
23
24 let (sender, receiver) = flume::unbounded();
25 self.client.register_subscriber(subscriber_id, sender);
26 Ok(AsyncRemoteSubscriber {
27 client: self.client.clone(),
28 database: self.name.clone(),
29 id: subscriber_id,
30 receiver: Receiver::new(receiver),
31 #[cfg(not(target_arch = "wasm32"))]
32 tokio: tokio::runtime::Handle::try_current().ok().map(Arc::new),
33 })
34 }
35
36 async fn publish_bytes(
37 &self,
38 topic: Vec<u8>,
39 payload: Vec<u8>,
40 ) -> Result<(), bonsaidb_core::Error> {
41 self.client
42 .send_api_request(&Publish {
43 database: self.name.to_string(),
44 topic: Bytes::from(topic),
45 payload: Bytes::from(payload),
46 })
47 .await?;
48 Ok(())
49 }
50
51 async fn publish_bytes_to_all(
52 &self,
53 topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
54 payload: Vec<u8>,
55 ) -> Result<(), bonsaidb_core::Error> {
56 let topics = topics.into_iter().map(Bytes::from).collect();
57 self.client
58 .send_api_request(&PublishToAll {
59 database: self.name.to_string(),
60 topics,
61 payload: Bytes::from(payload),
62 })
63 .await?;
64 Ok(())
65 }
66}
67
68#[derive(Debug)]
70pub struct AsyncRemoteSubscriber {
71 pub(crate) client: AsyncClient,
72 pub(crate) database: Arc<String>,
73 pub(crate) id: u64,
74 pub(crate) receiver: Receiver,
75 #[cfg(not(target_arch = "wasm32"))]
76 pub(crate) tokio: Option<Arc<tokio::runtime::Handle>>,
77}
78
79#[async_trait]
80impl AsyncSubscriber for AsyncRemoteSubscriber {
81 async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
82 self.client
83 .send_api_request(&SubscribeTo {
84 database: self.database.to_string(),
85 subscriber_id: self.id,
86 topic: Bytes::from(topic),
87 })
88 .await?;
89 Ok(())
90 }
91
92 async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
93 self.client
94 .send_api_request(&UnsubscribeFrom {
95 database: self.database.to_string(),
96 subscriber_id: self.id,
97 topic: Bytes::from(topic),
98 })
99 .await?;
100 Ok(())
101 }
102
103 fn receiver(&self) -> &Receiver {
104 &self.receiver
105 }
106}
107
108#[cfg(target_arch = "wasm32")]
109impl Drop for AsyncRemoteSubscriber {
110 fn drop(&mut self) {
111 let client = self.client.clone();
112 let database = self.database.to_string();
113 let subscriber_id = self.id;
114 let drop_future = async move {
115 client
116 .unregister_subscriber_async(database, subscriber_id)
117 .await;
118 };
119 wasm_bindgen_futures::spawn_local(drop_future);
120 }
121}
122
123#[cfg(not(target_arch = "wasm32"))]
124impl Drop for AsyncRemoteSubscriber {
125 fn drop(&mut self) {
126 if let Some(tokio) = &self.tokio {
127 let client = self.client.clone();
128 let database = self.database.to_string();
129 let subscriber_id = self.id;
130 tokio.spawn(async move {
131 client
132 .unregister_subscriber_async(database, subscriber_id)
133 .await;
134 });
135 } else {
136 self.client
137 .unregister_subscriber(self.database.to_string(), self.id);
138 }
139 }
140}