bonsaidb_server/server/
database.rs

1use std::ops::Deref;
2
3use async_trait::async_trait;
4use bonsaidb_core::connection::{
5    AccessPolicy, AsyncLowLevelConnection, HasSchema, HasSession, Range, SerializedQueryKey, Sort,
6};
7use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
8use bonsaidb_core::keyvalue::AsyncKeyValue;
9use bonsaidb_core::permissions::Permissions;
10use bonsaidb_core::pubsub::AsyncPubSub;
11use bonsaidb_core::schema::view::map::MappedSerializedValue;
12use bonsaidb_core::schema::{self, CollectionName, Schematic, ViewName};
13use bonsaidb_core::transaction::{OperationResult, Transaction};
14use bonsaidb_local::{AsyncDatabase, Database};
15use derive_where::derive_where;
16
17use crate::{Backend, CustomServer, NoBackend};
18
19/// A database belonging to a [`CustomServer`].
20#[derive_where(Debug, Clone)]
21pub struct ServerDatabase<B: Backend = NoBackend> {
22    pub(crate) server: CustomServer<B>,
23    pub(crate) db: AsyncDatabase,
24}
25
26impl<B: Backend> From<ServerDatabase<B>> for Database {
27    fn from(server: ServerDatabase<B>) -> Self {
28        Self::from(server.db)
29    }
30}
31
32impl<'a, B: Backend> From<&'a ServerDatabase<B>> for Database {
33    fn from(server: &'a ServerDatabase<B>) -> Self {
34        Self::from(server.db.clone())
35    }
36}
37
38impl<B: Backend> ServerDatabase<B> {
39    /// Restricts an unauthenticated instance to having `effective_permissions`.
40    /// Returns `None` if a session has already been established.
41    #[must_use]
42    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
43        self.db
44            .with_effective_permissions(effective_permissions)
45            .map(|db| Self {
46                db,
47                server: self.server.clone(),
48            })
49    }
50}
51
52impl<B: Backend> Deref for ServerDatabase<B> {
53    type Target = AsyncDatabase;
54
55    fn deref(&self) -> &Self::Target {
56        &self.db
57    }
58}
59
60/// Uses `CustomServer`'s `PubSub` relay.
61#[async_trait]
62impl<B: Backend> AsyncPubSub for ServerDatabase<B> {
63    type Subscriber = bonsaidb_local::Subscriber;
64
65    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
66        let subscriber = self.db.create_subscriber().await?;
67        Ok(subscriber)
68    }
69
70    async fn publish_bytes(
71        &self,
72        topic: Vec<u8>,
73        payload: Vec<u8>,
74    ) -> Result<(), bonsaidb_core::Error> {
75        self.db.publish_bytes(topic, payload).await
76    }
77
78    async fn publish_bytes_to_all(
79        &self,
80        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
81        payload: Vec<u8>,
82    ) -> Result<(), bonsaidb_core::Error> {
83        self.db.publish_bytes_to_all(topics, payload).await
84    }
85}
86
87impl<B: Backend> HasSession for ServerDatabase<B> {
88    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
89        self.server.session()
90    }
91}
92
93/// Pass-through implementation
94#[async_trait]
95impl<B: Backend> bonsaidb_core::connection::AsyncConnection for ServerDatabase<B> {
96    type Storage = CustomServer<B>;
97
98    fn storage(&self) -> Self::Storage {
99        self.server.clone()
100    }
101
102    async fn list_executed_transactions(
103        &self,
104        starting_id: Option<u64>,
105        result_limit: Option<u32>,
106    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
107        self.db
108            .list_executed_transactions(starting_id, result_limit)
109            .await
110    }
111
112    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
113        self.db.last_transaction_id().await
114    }
115
116    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
117        self.db.compact_collection::<C>().await
118    }
119
120    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
121        self.db.compact().await
122    }
123
124    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
125        self.db.compact_key_value_store().await
126    }
127}
128
129/// Pass-through implementation
130#[async_trait]
131impl<B: Backend> AsyncKeyValue for ServerDatabase<B> {
132    async fn execute_key_operation(
133        &self,
134        op: bonsaidb_core::keyvalue::KeyOperation,
135    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
136        self.db.execute_key_operation(op).await
137    }
138}
139
140#[async_trait]
141impl<B: Backend> AsyncLowLevelConnection for ServerDatabase<B> {
142    async fn get_from_collection(
143        &self,
144        id: DocumentId,
145        collection: &CollectionName,
146    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
147        self.db.get_from_collection(id, collection).await
148    }
149
150    async fn list_from_collection(
151        &self,
152        ids: Range<DocumentId>,
153        order: Sort,
154        limit: Option<u32>,
155        collection: &CollectionName,
156    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
157        self.db
158            .list_from_collection(ids, order, limit, collection)
159            .await
160    }
161
162    async fn list_headers_from_collection(
163        &self,
164        ids: Range<DocumentId>,
165        order: Sort,
166        limit: Option<u32>,
167        collection: &CollectionName,
168    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
169        self.db
170            .list_headers_from_collection(ids, order, limit, collection)
171            .await
172    }
173
174    async fn count_from_collection(
175        &self,
176        ids: Range<DocumentId>,
177        collection: &CollectionName,
178    ) -> Result<u64, bonsaidb_core::Error> {
179        self.db.count_from_collection(ids, collection).await
180    }
181
182    async fn get_multiple_from_collection(
183        &self,
184        ids: &[DocumentId],
185        collection: &CollectionName,
186    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
187        self.db.get_multiple_from_collection(ids, collection).await
188    }
189
190    async fn compact_collection_by_name(
191        &self,
192        collection: CollectionName,
193    ) -> Result<(), bonsaidb_core::Error> {
194        self.db.compact_collection_by_name(collection).await
195    }
196
197    async fn query_by_name(
198        &self,
199        view: &ViewName,
200        key: Option<SerializedQueryKey>,
201        order: Sort,
202        limit: Option<u32>,
203        access_policy: AccessPolicy,
204    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
205        self.db
206            .query_by_name(view, key, order, limit, access_policy)
207            .await
208    }
209
210    async fn query_by_name_with_docs(
211        &self,
212        view: &ViewName,
213        key: Option<SerializedQueryKey>,
214        order: Sort,
215        limit: Option<u32>,
216        access_policy: AccessPolicy,
217    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
218        self.db
219            .query_by_name_with_docs(view, key, order, limit, access_policy)
220            .await
221    }
222
223    async fn reduce_by_name(
224        &self,
225        view: &ViewName,
226        key: Option<SerializedQueryKey>,
227        access_policy: AccessPolicy,
228    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
229        self.db.reduce_by_name(view, key, access_policy).await
230    }
231
232    async fn reduce_grouped_by_name(
233        &self,
234        view: &ViewName,
235        key: Option<SerializedQueryKey>,
236        access_policy: AccessPolicy,
237    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
238        self.db
239            .reduce_grouped_by_name(view, key, access_policy)
240            .await
241    }
242
243    async fn delete_docs_by_name(
244        &self,
245        view: &ViewName,
246        key: Option<SerializedQueryKey>,
247        access_policy: AccessPolicy,
248    ) -> Result<u64, bonsaidb_core::Error> {
249        self.db.delete_docs_by_name(view, key, access_policy).await
250    }
251
252    async fn apply_transaction(
253        &self,
254        transaction: Transaction,
255    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
256        self.db.apply_transaction(transaction).await
257    }
258}
259
260impl<B: Backend> HasSchema for ServerDatabase<B> {
261    fn schematic(&self) -> &Schematic {
262        self.db.schematic()
263    }
264}