bonsaidb_client/client/remote_database/
pubsub.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bonsaidb_core::arc_bytes::serde::Bytes;
5use bonsaidb_core::networking::{
6    CreateSubscriber, Publish, PublishToAll, SubscribeTo, UnsubscribeFrom,
7};
8use bonsaidb_core::pubsub::{AsyncPubSub, AsyncSubscriber, Receiver};
9
10use crate::AsyncClient;
11
12#[async_trait]
13impl AsyncPubSub for super::AsyncRemoteDatabase {
14    type Subscriber = AsyncRemoteSubscriber;
15
16    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
17        let subscriber_id = self
18            .client
19            .send_api_request(&CreateSubscriber {
20                database: self.name.to_string(),
21            })
22            .await?;
23
24        let (sender, receiver) = flume::unbounded();
25        self.client.register_subscriber(subscriber_id, sender);
26        Ok(AsyncRemoteSubscriber {
27            client: self.client.clone(),
28            database: self.name.clone(),
29            id: subscriber_id,
30            receiver: Receiver::new(receiver),
31            #[cfg(not(target_arch = "wasm32"))]
32            tokio: tokio::runtime::Handle::try_current().ok().map(Arc::new),
33        })
34    }
35
36    async fn publish_bytes(
37        &self,
38        topic: Vec<u8>,
39        payload: Vec<u8>,
40    ) -> Result<(), bonsaidb_core::Error> {
41        self.client
42            .send_api_request(&Publish {
43                database: self.name.to_string(),
44                topic: Bytes::from(topic),
45                payload: Bytes::from(payload),
46            })
47            .await?;
48        Ok(())
49    }
50
51    async fn publish_bytes_to_all(
52        &self,
53        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
54        payload: Vec<u8>,
55    ) -> Result<(), bonsaidb_core::Error> {
56        let topics = topics.into_iter().map(Bytes::from).collect();
57        self.client
58            .send_api_request(&PublishToAll {
59                database: self.name.to_string(),
60                topics,
61                payload: Bytes::from(payload),
62            })
63            .await?;
64        Ok(())
65    }
66}
67
68/// A `PubSub` subscriber from a remote server.
69#[derive(Debug)]
70pub struct AsyncRemoteSubscriber {
71    pub(crate) client: AsyncClient,
72    pub(crate) database: Arc<String>,
73    pub(crate) id: u64,
74    pub(crate) receiver: Receiver,
75    #[cfg(not(target_arch = "wasm32"))]
76    pub(crate) tokio: Option<Arc<tokio::runtime::Handle>>,
77}
78
79#[async_trait]
80impl AsyncSubscriber for AsyncRemoteSubscriber {
81    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
82        self.client
83            .send_api_request(&SubscribeTo {
84                database: self.database.to_string(),
85                subscriber_id: self.id,
86                topic: Bytes::from(topic),
87            })
88            .await?;
89        Ok(())
90    }
91
92    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
93        self.client
94            .send_api_request(&UnsubscribeFrom {
95                database: self.database.to_string(),
96                subscriber_id: self.id,
97                topic: Bytes::from(topic),
98            })
99            .await?;
100        Ok(())
101    }
102
103    fn receiver(&self) -> &Receiver {
104        &self.receiver
105    }
106}
107
#[cfg(target_arch = "wasm32")]
impl Drop for AsyncRemoteSubscriber {
    /// Unregisters the subscriber by spawning the asynchronous
    /// unregistration as a local task.
    fn drop(&mut self) {
        // The spawned future must own everything it uses, so copy the pieces
        // out of `self` before moving them into the task.
        let client = self.client.clone();
        let database_name = self.database.to_string();
        let id = self.id;
        wasm_bindgen_futures::spawn_local(async move {
            client.unregister_subscriber_async(database_name, id).await;
        });
    }
}
122
123#[cfg(not(target_arch = "wasm32"))]
124impl Drop for AsyncRemoteSubscriber {
125    fn drop(&mut self) {
126        if let Some(tokio) = &self.tokio {
127            let client = self.client.clone();
128            let database = self.database.to_string();
129            let subscriber_id = self.id;
130            tokio.spawn(async move {
131                client
132                    .unregister_subscriber_async(database, subscriber_id)
133                    .await;
134            });
135        } else {
136            self.client
137                .unregister_subscriber(self.database.to_string(), self.id);
138        }
139    }
140}