1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4use std::time::Duration;
5
6use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
7use bonsaidb_core::api;
8use bonsaidb_core::arc_bytes::serde::Bytes;
9use bonsaidb_core::connection::{
10 AccessPolicy, Connection, Database, HasSchema, HasSession, IdentityReference,
11 LowLevelConnection, Range, SerializedQueryKey, Sort, StorageConnection,
12};
13use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
14use bonsaidb_core::keyvalue::KeyValue;
15use bonsaidb_core::networking::{
16 AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
17 Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
18 CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
19 LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
20 ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo,
21 UnsubscribeFrom, CURRENT_PROTOCOL_VERSION,
22};
23use bonsaidb_core::pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber};
24use bonsaidb_core::schema::view::map;
25use bonsaidb_core::schema::{CollectionName, ViewName};
26use futures::Future;
27use tokio::runtime::{Handle, Runtime};
28use tokio::sync::oneshot;
29use tokio::task::JoinHandle;
30use url::Url;
31
32use crate::builder::Blocking;
33use crate::client::ClientSession;
34use crate::{ApiError, AsyncClient, AsyncRemoteDatabase, AsyncRemoteSubscriber, Builder, Error};
35
36#[derive(Debug, Clone)]
38pub struct BlockingClient(pub(crate) AsyncClient);
39
40impl BlockingClient {
41 pub fn build(url: Url) -> Builder<Blocking> {
43 Builder::new(url)
44 }
45
46 pub fn new(url: Url) -> Result<Self, Error> {
62 AsyncClient::new_from_parts(
63 url,
64 CURRENT_PROTOCOL_VERSION,
65 HashMap::default(),
66 None,
67 None,
68 #[cfg(not(target_arch = "wasm32"))]
69 None,
70 #[cfg(not(target_arch = "wasm32"))]
71 Handle::try_current().ok(),
72 )
73 .map(Self)
74 }
75
76 pub fn send_api_request<Api: api::Api>(
78 &self,
79 request: &Api,
80 ) -> Result<Api::Response, ApiError<Api::Error>> {
81 self.0.send_blocking_api_request(request)
82 }
83
84 pub fn invoke_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
87 let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
88 self.0
89 .send_request_without_confirmation(Api::name(), request)
90 .map(|_| ())
91 }
92
93 #[must_use]
95 pub fn as_async(&self) -> &AsyncClient {
96 &self.0
97 }
98
99 pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
104 self.0.request_timeout = timeout.into();
105 }
106}
107
108impl From<AsyncClient> for BlockingClient {
109 fn from(client: AsyncClient) -> Self {
110 Self(client)
111 }
112}
113
114impl From<BlockingClient> for AsyncClient {
115 fn from(client: BlockingClient) -> Self {
116 client.0
117 }
118}
119
120impl StorageConnection for BlockingClient {
121 type Authenticated = Self;
122 type Database = BlockingRemoteDatabase;
123
124 fn admin(&self) -> Self::Database {
125 BlockingRemoteDatabase(
126 self.0
127 .remote_database::<Admin>(ADMIN_DATABASE_NAME)
128 .unwrap(),
129 )
130 }
131
132 fn database<DB: bonsaidb_core::schema::Schema>(
133 &self,
134 name: &str,
135 ) -> Result<Self::Database, bonsaidb_core::Error> {
136 self.0
137 .remote_database::<DB>(name)
138 .map(BlockingRemoteDatabase)
139 }
140
141 fn create_database_with_schema(
142 &self,
143 name: &str,
144 schema: bonsaidb_core::schema::SchemaName,
145 only_if_needed: bool,
146 ) -> Result<(), bonsaidb_core::Error> {
147 self.send_api_request(&CreateDatabase {
148 database: Database {
149 name: name.to_string(),
150 schema,
151 },
152 only_if_needed,
153 })?;
154 Ok(())
155 }
156
157 fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
158 self.send_api_request(&DeleteDatabase {
159 name: name.to_string(),
160 })?;
161 Ok(())
162 }
163
164 fn list_databases(
165 &self,
166 ) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
167 Ok(self.send_api_request(&ListDatabases)?)
168 }
169
170 fn list_available_schemas(
171 &self,
172 ) -> Result<Vec<bonsaidb_core::schema::SchemaSummary>, bonsaidb_core::Error> {
173 Ok(self.send_api_request(&ListAvailableSchemas)?)
174 }
175
176 fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
177 Ok(self.send_api_request(&CreateUser {
178 username: username.to_string(),
179 })?)
180 }
181
182 fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
183 &self,
184 user: U,
185 ) -> Result<(), bonsaidb_core::Error> {
186 Ok(self.send_api_request(&DeleteUser {
187 user: user.name()?.into_owned(),
188 })?)
189 }
190
191 #[cfg(feature = "password-hashing")]
192 fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
193 &self,
194 user: U,
195 password: bonsaidb_core::connection::SensitiveString,
196 ) -> Result<(), bonsaidb_core::Error> {
197 use bonsaidb_core::networking::SetUserPassword;
198
199 Ok(self.send_api_request(&SetUserPassword {
200 user: user.name()?.into_owned(),
201 password,
202 })?)
203 }
204
205 #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
206 fn authenticate(
207 &self,
208 authentication: bonsaidb_core::connection::Authentication,
209 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
210 let session =
211 self.send_api_request(&bonsaidb_core::networking::Authenticate { authentication })?;
212 Ok(Self(AsyncClient {
213 data: self.0.data.clone(),
214 session: ClientSession {
215 session: Arc::new(session),
216 connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
217 },
218 request_timeout: self.0.request_timeout,
219 }))
220 }
221
222 fn assume_identity(
223 &self,
224 identity: IdentityReference<'_>,
225 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
226 let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
227 Ok(Self(AsyncClient {
228 data: self.0.data.clone(),
229 session: ClientSession {
230 session: Arc::new(session),
231 connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
232 },
233 request_timeout: self.0.request_timeout,
234 }))
235 }
236
237 fn add_permission_group_to_user<
238 'user,
239 'group,
240 U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
241 G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
242 >(
243 &self,
244 user: U,
245 permission_group: G,
246 ) -> Result<(), bonsaidb_core::Error> {
247 self.send_api_request(&AlterUserPermissionGroupMembership {
248 user: user.name()?.into_owned(),
249 group: permission_group.name()?.into_owned(),
250 should_be_member: true,
251 })?;
252 Ok(())
253 }
254
255 fn remove_permission_group_from_user<
256 'user,
257 'group,
258 U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
259 G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
260 >(
261 &self,
262 user: U,
263 permission_group: G,
264 ) -> Result<(), bonsaidb_core::Error> {
265 self.send_api_request(&AlterUserPermissionGroupMembership {
266 user: user.name()?.into_owned(),
267 group: permission_group.name()?.into_owned(),
268 should_be_member: false,
269 })?;
270 Ok(())
271 }
272
273 fn add_role_to_user<
274 'user,
275 'role,
276 U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
277 R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
278 >(
279 &self,
280 user: U,
281 role: R,
282 ) -> Result<(), bonsaidb_core::Error> {
283 self.send_api_request(&AlterUserRoleMembership {
284 user: user.name()?.into_owned(),
285 role: role.name()?.into_owned(),
286 should_be_member: true,
287 })?;
288 Ok(())
289 }
290
291 fn remove_role_from_user<
292 'user,
293 'role,
294 U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
295 R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
296 >(
297 &self,
298 user: U,
299 role: R,
300 ) -> Result<(), bonsaidb_core::Error> {
301 self.send_api_request(&AlterUserRoleMembership {
302 user: user.name()?.into_owned(),
303 role: role.name()?.into_owned(),
304 should_be_member: false,
305 })?;
306 Ok(())
307 }
308}
309
310impl HasSession for BlockingClient {
311 fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
312 self.0.session()
313 }
314}
315
316#[derive(Debug, Clone)]
319pub struct BlockingRemoteDatabase(AsyncRemoteDatabase);
320
321impl Connection for BlockingRemoteDatabase {
322 type Storage = BlockingClient;
323
324 fn storage(&self) -> Self::Storage {
325 BlockingClient(self.0.client.clone())
326 }
327
328 fn list_executed_transactions(
329 &self,
330 starting_id: Option<u64>,
331 result_limit: Option<u32>,
332 ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
333 Ok(self
334 .0
335 .client
336 .send_blocking_api_request(&ListExecutedTransactions {
337 database: self.0.name.to_string(),
338 starting_id,
339 result_limit,
340 })?)
341 }
342
343 fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
344 Ok(self
345 .0
346 .client
347 .send_blocking_api_request(&LastTransactionId {
348 database: self.0.name.to_string(),
349 })?)
350 }
351
352 fn compact(&self) -> Result<(), bonsaidb_core::Error> {
353 self.0.send_blocking_api_request(&Compact {
354 database: self.0.name.to_string(),
355 })?;
356 Ok(())
357 }
358
359 fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
360 self.0.send_blocking_api_request(&CompactKeyValueStore {
361 database: self.0.name.to_string(),
362 })?;
363 Ok(())
364 }
365}
366
367impl LowLevelConnection for BlockingRemoteDatabase {
368 fn apply_transaction(
369 &self,
370 transaction: bonsaidb_core::transaction::Transaction,
371 ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
372 Ok(self.0.client.send_blocking_api_request(&ApplyTransaction {
373 database: self.0.name.to_string(),
374 transaction,
375 })?)
376 }
377
378 fn get_from_collection(
379 &self,
380 id: bonsaidb_core::document::DocumentId,
381 collection: &CollectionName,
382 ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
383 Ok(self.0.client.send_blocking_api_request(&Get {
384 database: self.0.name.to_string(),
385 collection: collection.clone(),
386 id,
387 })?)
388 }
389
390 fn get_multiple_from_collection(
391 &self,
392 ids: &[bonsaidb_core::document::DocumentId],
393 collection: &CollectionName,
394 ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
395 Ok(self.0.client.send_blocking_api_request(&GetMultiple {
396 database: self.0.name.to_string(),
397 collection: collection.clone(),
398 ids: ids.to_vec(),
399 })?)
400 }
401
402 fn list_from_collection(
403 &self,
404 ids: Range<bonsaidb_core::document::DocumentId>,
405 order: Sort,
406 limit: Option<u32>,
407 collection: &CollectionName,
408 ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
409 Ok(self.0.client.send_blocking_api_request(&List {
410 database: self.0.name.to_string(),
411 collection: collection.clone(),
412 ids,
413 order,
414 limit,
415 })?)
416 }
417
418 fn list_headers_from_collection(
419 &self,
420 ids: Range<DocumentId>,
421 order: Sort,
422 limit: Option<u32>,
423 collection: &CollectionName,
424 ) -> Result<Vec<Header>, bonsaidb_core::Error> {
425 Ok(self.0.client.send_blocking_api_request(&ListHeaders(List {
426 database: self.0.name.to_string(),
427 collection: collection.clone(),
428 ids,
429 order,
430 limit,
431 }))?)
432 }
433
434 fn count_from_collection(
435 &self,
436 ids: Range<bonsaidb_core::document::DocumentId>,
437 collection: &CollectionName,
438 ) -> Result<u64, bonsaidb_core::Error> {
439 Ok(self.0.client.send_blocking_api_request(&Count {
440 database: self.0.name.to_string(),
441 collection: collection.clone(),
442 ids,
443 })?)
444 }
445
446 fn compact_collection_by_name(
447 &self,
448 collection: CollectionName,
449 ) -> Result<(), bonsaidb_core::Error> {
450 self.0.send_blocking_api_request(&CompactCollection {
451 database: self.0.name.to_string(),
452 name: collection,
453 })?;
454 Ok(())
455 }
456
457 fn query_by_name(
458 &self,
459 view: &ViewName,
460 key: Option<SerializedQueryKey>,
461 order: Sort,
462 limit: Option<u32>,
463 access_policy: AccessPolicy,
464 ) -> Result<Vec<map::Serialized>, bonsaidb_core::Error> {
465 Ok(self.0.client.send_blocking_api_request(&Query {
466 database: self.0.name.to_string(),
467 view: view.clone(),
468 key,
469 order,
470 limit,
471 access_policy,
472 })?)
473 }
474
475 fn query_by_name_with_docs(
476 &self,
477 view: &bonsaidb_core::schema::ViewName,
478 key: Option<SerializedQueryKey>,
479 order: Sort,
480 limit: Option<u32>,
481 access_policy: AccessPolicy,
482 ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
483 {
484 Ok(self
485 .0
486 .client
487 .send_blocking_api_request(&QueryWithDocs(Query {
488 database: self.0.name.to_string(),
489 view: view.clone(),
490 key,
491 order,
492 limit,
493 access_policy,
494 }))?)
495 }
496
497 fn reduce_by_name(
498 &self,
499 view: &bonsaidb_core::schema::ViewName,
500 key: Option<SerializedQueryKey>,
501 access_policy: AccessPolicy,
502 ) -> Result<Vec<u8>, bonsaidb_core::Error> {
503 Ok(self
504 .0
505 .client
506 .send_blocking_api_request(&Reduce {
507 database: self.0.name.to_string(),
508 view: view.clone(),
509 key,
510 access_policy,
511 })?
512 .into_vec())
513 }
514
515 fn reduce_grouped_by_name(
516 &self,
517 view: &bonsaidb_core::schema::ViewName,
518 key: Option<SerializedQueryKey>,
519 access_policy: AccessPolicy,
520 ) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
521 {
522 Ok(self
523 .0
524 .client
525 .send_blocking_api_request(&ReduceGrouped(Reduce {
526 database: self.0.name.to_string(),
527 view: view.clone(),
528 key,
529 access_policy,
530 }))?)
531 }
532
533 fn delete_docs_by_name(
534 &self,
535 view: &bonsaidb_core::schema::ViewName,
536 key: Option<SerializedQueryKey>,
537 access_policy: AccessPolicy,
538 ) -> Result<u64, bonsaidb_core::Error> {
539 Ok(self.0.client.send_blocking_api_request(&DeleteDocs {
540 database: self.0.name.to_string(),
541 view: view.clone(),
542 key,
543 access_policy,
544 })?)
545 }
546}
547
548impl HasSession for BlockingRemoteDatabase {
549 fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
550 self.0.session()
551 }
552}
553
554impl HasSchema for BlockingRemoteDatabase {
555 fn schematic(&self) -> &bonsaidb_core::schema::Schematic {
556 self.0.schematic()
557 }
558}
559
560impl PubSub for BlockingRemoteDatabase {
561 type Subscriber = BlockingRemoteSubscriber;
562
563 fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
564 let subscriber_id = self.0.client.send_blocking_api_request(&CreateSubscriber {
565 database: self.0.name.to_string(),
566 })?;
567
568 let (sender, receiver) = flume::unbounded();
569 self.0.client.register_subscriber(subscriber_id, sender);
570 Ok(BlockingRemoteSubscriber(AsyncRemoteSubscriber {
571 client: self.0.client.clone(),
572 database: self.0.name.clone(),
573 id: subscriber_id,
574 receiver: Receiver::new(receiver),
575 tokio: None,
576 }))
577 }
578
579 fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
580 self.0.client.send_blocking_api_request(&Publish {
581 database: self.0.name.to_string(),
582 topic: Bytes::from(topic),
583 payload: Bytes::from(payload),
584 })?;
585 Ok(())
586 }
587
588 fn publish_bytes_to_all(
589 &self,
590 topics: impl IntoIterator<Item = Vec<u8>> + Send,
591 payload: Vec<u8>,
592 ) -> Result<(), bonsaidb_core::Error> {
593 let topics = topics.into_iter().map(Bytes::from).collect();
594 self.0.client.send_blocking_api_request(&PublishToAll {
595 database: self.0.name.to_string(),
596 topics,
597 payload: Bytes::from(payload),
598 })?;
599 Ok(())
600 }
601}
602
603#[derive(Debug)]
606pub struct BlockingRemoteSubscriber(AsyncRemoteSubscriber);
607
608impl Subscriber for BlockingRemoteSubscriber {
609 fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
610 self.0.client.send_blocking_api_request(&SubscribeTo {
611 database: self.0.database.to_string(),
612 subscriber_id: self.0.id,
613 topic: Bytes::from(topic),
614 })?;
615 Ok(())
616 }
617
618 fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
619 self.0.client.send_blocking_api_request(&UnsubscribeFrom {
620 database: self.0.database.to_string(),
621 subscriber_id: self.0.id,
622 topic: Bytes::from(topic),
623 })?;
624 Ok(())
625 }
626
627 fn receiver(&self) -> &Receiver {
628 AsyncSubscriber::receiver(&self.0)
629 }
630}
631
632impl KeyValue for BlockingRemoteDatabase {
633 fn execute_key_operation(
634 &self,
635 op: bonsaidb_core::keyvalue::KeyOperation,
636 ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
637 Ok(self
638 .0
639 .client
640 .send_blocking_api_request(&ExecuteKeyOperation {
641 database: self.0.name.to_string(),
642
643 op,
644 })?)
645 }
646}
647
648pub enum Tokio {
649 Runtime(Runtime),
650 Handle(Handle),
651}
652
653impl Tokio {
654 pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
655 self,
656 task: F,
657 ) -> JoinHandle<Result<(), crate::Error>> {
658 match self {
659 Self::Runtime(tokio) => {
660 let (completion_sender, completion_receiver) = oneshot::channel();
667 let task = async move {
668 task.await?;
669 let _: Result<_, _> = completion_sender.send(());
670 Ok(())
671 };
672 let task = tokio.spawn(task);
673
674 std::thread::spawn(move || {
675 tokio.block_on(async move {
676 let _: Result<_, _> = completion_receiver.await;
677 });
678 });
679 task
680 }
681 Self::Handle(tokio) => tokio.spawn(task),
682 }
683 }
684}
685
686pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
687 task: F,
688 handle: Option<Handle>,
689) -> JoinHandle<Result<(), crate::Error>> {
690 let tokio = if let Some(handle) = handle {
692 Tokio::Handle(handle)
693 } else {
694 Tokio::Runtime(Runtime::new().unwrap())
695 };
696 tokio.spawn(task)
697}