1use std::ops::Deref;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use bonsaidb_core::connection::{
6 AccessPolicy, AsyncConnection, AsyncLowLevelConnection, HasSchema, HasSession, Range,
7 SerializedQueryKey, Session, Sort,
8};
9use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10use bonsaidb_core::networking::{
11 ApplyTransaction, Compact, CompactCollection, CompactKeyValueStore, Count, DeleteDocs, Get,
12 GetMultiple, LastTransactionId, List, ListExecutedTransactions, ListHeaders, Query,
13 QueryWithDocs, Reduce, ReduceGrouped,
14};
15use bonsaidb_core::schema::view::map::MappedSerializedValue;
16use bonsaidb_core::schema::{self, CollectionName, Schematic, ViewName};
17use bonsaidb_core::transaction::{Executed, OperationResult, Transaction};
18
19use crate::AsyncClient;
20
21mod pubsub;
22pub use pubsub::*;
23
24mod keyvalue;
25
26#[derive(Debug, Clone)]
28pub struct AsyncRemoteDatabase {
29 pub(crate) client: AsyncClient,
30 pub(crate) name: Arc<String>,
31 pub(crate) schema: Arc<Schematic>,
32}
33impl AsyncRemoteDatabase {
34 #[must_use]
36 pub fn name(&self) -> &str {
37 self.name.as_ref()
38 }
39}
40
41impl Deref for AsyncRemoteDatabase {
42 type Target = AsyncClient;
43
44 fn deref(&self) -> &Self::Target {
45 &self.client
46 }
47}
48
49impl AsyncRemoteDatabase {
50 pub(crate) fn new(client: AsyncClient, name: String, schema: Arc<Schematic>) -> Self {
51 Self {
52 client,
53 name: Arc::new(name),
54 schema,
55 }
56 }
57}
58
59impl HasSession for AsyncRemoteDatabase {
60 fn session(&self) -> Option<&Session> {
61 self.client.session()
62 }
63}
64
65#[async_trait]
66impl AsyncConnection for AsyncRemoteDatabase {
67 type Storage = AsyncClient;
68
69 fn storage(&self) -> Self::Storage {
70 self.client.clone()
71 }
72
73 async fn list_executed_transactions(
74 &self,
75 starting_id: Option<u64>,
76 result_limit: Option<u32>,
77 ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
78 Ok(self
79 .client
80 .send_api_request(&ListExecutedTransactions {
81 database: self.name.to_string(),
82 starting_id,
83 result_limit,
84 })
85 .await?)
86 }
87
88 async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
89 Ok(self
90 .client
91 .send_api_request(&LastTransactionId {
92 database: self.name.to_string(),
93 })
94 .await?)
95 }
96
97 async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
98 self.send_api_request(&Compact {
99 database: self.name.to_string(),
100 })
101 .await?;
102 Ok(())
103 }
104
105 async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
106 self.send_api_request(&CompactKeyValueStore {
107 database: self.name.to_string(),
108 })
109 .await?;
110 Ok(())
111 }
112}
113
114#[async_trait]
115impl AsyncLowLevelConnection for AsyncRemoteDatabase {
116 async fn apply_transaction(
117 &self,
118 transaction: Transaction,
119 ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
120 Ok(self
121 .client
122 .send_api_request(&ApplyTransaction {
123 database: self.name.to_string(),
124 transaction,
125 })
126 .await?)
127 }
128
129 async fn get_from_collection(
130 &self,
131 id: DocumentId,
132 collection: &CollectionName,
133 ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
134 Ok(self
135 .client
136 .send_api_request(&Get {
137 database: self.name.to_string(),
138 collection: collection.clone(),
139 id,
140 })
141 .await?)
142 }
143
144 async fn get_multiple_from_collection(
145 &self,
146 ids: &[DocumentId],
147 collection: &CollectionName,
148 ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
149 Ok(self
150 .client
151 .send_api_request(&GetMultiple {
152 database: self.name.to_string(),
153 collection: collection.clone(),
154 ids: ids.to_vec(),
155 })
156 .await?)
157 }
158
159 async fn list_from_collection(
160 &self,
161 ids: Range<DocumentId>,
162 order: Sort,
163 limit: Option<u32>,
164 collection: &CollectionName,
165 ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
166 Ok(self
167 .client
168 .send_api_request(&List {
169 database: self.name.to_string(),
170 collection: collection.clone(),
171 ids,
172 order,
173 limit,
174 })
175 .await?)
176 }
177
178 async fn list_headers_from_collection(
179 &self,
180 ids: Range<DocumentId>,
181 order: Sort,
182 limit: Option<u32>,
183 collection: &CollectionName,
184 ) -> Result<Vec<Header>, bonsaidb_core::Error> {
185 Ok(self
186 .client
187 .send_api_request(&ListHeaders(List {
188 database: self.name.to_string(),
189 collection: collection.clone(),
190 ids,
191 order,
192 limit,
193 }))
194 .await?)
195 }
196
197 async fn count_from_collection(
198 &self,
199 ids: Range<DocumentId>,
200 collection: &CollectionName,
201 ) -> Result<u64, bonsaidb_core::Error> {
202 Ok(self
203 .client
204 .send_api_request(&Count {
205 database: self.name.to_string(),
206 collection: collection.clone(),
207 ids,
208 })
209 .await?)
210 }
211
212 async fn compact_collection_by_name(
213 &self,
214 collection: CollectionName,
215 ) -> Result<(), bonsaidb_core::Error> {
216 self.send_api_request(&CompactCollection {
217 database: self.name.to_string(),
218 name: collection,
219 })
220 .await?;
221 Ok(())
222 }
223
224 async fn query_by_name(
225 &self,
226 view: &ViewName,
227 key: Option<SerializedQueryKey>,
228 order: Sort,
229 limit: Option<u32>,
230 access_policy: AccessPolicy,
231 ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
232 Ok(self
233 .client
234 .send_api_request(&Query {
235 database: self.name.to_string(),
236 view: view.clone(),
237 key,
238 order,
239 limit,
240 access_policy,
241 })
242 .await?)
243 }
244
245 async fn query_by_name_with_docs(
246 &self,
247 view: &ViewName,
248 key: Option<SerializedQueryKey>,
249 order: Sort,
250 limit: Option<u32>,
251 access_policy: AccessPolicy,
252 ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
253 Ok(self
254 .client
255 .send_api_request(&QueryWithDocs(Query {
256 database: self.name.to_string(),
257 view: view.clone(),
258 key,
259 order,
260 limit,
261 access_policy,
262 }))
263 .await?)
264 }
265
266 async fn reduce_by_name(
267 &self,
268 view: &ViewName,
269 key: Option<SerializedQueryKey>,
270 access_policy: AccessPolicy,
271 ) -> Result<Vec<u8>, bonsaidb_core::Error> {
272 Ok(self
273 .client
274 .send_api_request(&Reduce {
275 database: self.name.to_string(),
276 view: view.clone(),
277 key,
278 access_policy,
279 })
280 .await?
281 .into_vec())
282 }
283
284 async fn reduce_grouped_by_name(
285 &self,
286 view: &ViewName,
287 key: Option<SerializedQueryKey>,
288 access_policy: AccessPolicy,
289 ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
290 Ok(self
291 .client
292 .send_api_request(&ReduceGrouped(Reduce {
293 database: self.name.to_string(),
294 view: view.clone(),
295 key,
296 access_policy,
297 }))
298 .await?)
299 }
300
301 async fn delete_docs_by_name(
302 &self,
303 view: &ViewName,
304 key: Option<SerializedQueryKey>,
305 access_policy: AccessPolicy,
306 ) -> Result<u64, bonsaidb_core::Error> {
307 Ok(self
308 .client
309 .send_api_request(&DeleteDocs {
310 database: self.name.to_string(),
311 view: view.clone(),
312 key,
313 access_policy,
314 })
315 .await?)
316 }
317}
318
319impl HasSchema for AsyncRemoteDatabase {
320 fn schematic(&self) -> &Schematic {
321 &self.schema
322 }
323}