1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bonsaidb_core::connection::{
5 self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
6 Connection, HasSchema, HasSession, IdentityReference, LowLevelConnection, Range,
7 SerializedQueryKey, Session, Sort, StorageConnection,
8};
9use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10use bonsaidb_core::keyvalue::{AsyncKeyValue, KeyOperation, KeyValue, Output};
11use bonsaidb_core::permissions::Permissions;
12use bonsaidb_core::pubsub::{self, AsyncPubSub, AsyncSubscriber, PubSub, Receiver};
13use bonsaidb_core::schema::view::map::MappedSerializedValue;
14use bonsaidb_core::schema::{
15 self, CollectionName, Nameable, Schema, SchemaName, SchemaSummary, Schematic, ViewName,
16};
17use bonsaidb_core::transaction::{self, OperationResult, Transaction};
18
19use crate::config::StorageConfiguration;
20use crate::database::DatabaseNonBlocking;
21use crate::storage::{AnyBackupLocation, StorageNonBlocking};
22use crate::{Database, Error, Storage, Subscriber};
23
24#[derive(Debug, Clone)]
124#[must_use]
125pub struct AsyncStorage {
126 pub(crate) storage: Storage,
127 pub(crate) runtime: Arc<tokio::runtime::Handle>,
128}
129
130impl AsyncStorage {
131 pub async fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
133 tokio::task::spawn_blocking(move || Storage::open(configuration))
134 .await?
135 .map(Storage::into_async)
136 }
137
138 pub async fn restore<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
140 let task_self = self.clone();
141 self.runtime
142 .spawn_blocking(move || task_self.storage.restore(&location))
143 .await?
144 }
145
146 pub async fn backup<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
148 let task_self = self.clone();
149 self.runtime
150 .spawn_blocking(move || task_self.storage.backup(&location))
151 .await?
152 }
153
154 #[must_use]
157 pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
158 self.storage
159 .with_effective_permissions(effective_permissions)
160 .map(|storage| Self {
161 storage,
162 runtime: self.runtime.clone(),
163 })
164 }
165
166 #[cfg(feature = "internal-apis")]
167 #[doc(hidden)]
168 pub async fn database_without_schema(&self, name: &str) -> Result<AsyncDatabase, Error> {
169 let name = name.to_owned();
170 let task_self = self.clone();
171 self.runtime
172 .spawn_blocking(move || {
173 task_self
174 .storage
175 .database_without_schema(&name)
176 .map(Database::into_async)
177 })
178 .await?
179 }
180
181 pub fn into_blocking(self) -> Storage {
184 self.storage
185 }
186
187 pub fn to_blocking(&self) -> Storage {
190 self.storage.clone()
191 }
192
193 pub fn as_blocking(&self) -> &Storage {
196 &self.storage
197 }
198}
199
200impl<'a> From<&'a AsyncStorage> for Storage {
201 fn from(storage: &'a AsyncStorage) -> Self {
202 storage.to_blocking()
203 }
204}
205
206impl From<AsyncStorage> for Storage {
207 fn from(storage: AsyncStorage) -> Self {
208 storage.into_blocking()
209 }
210}
211
212impl StorageNonBlocking for AsyncStorage {
213 fn path(&self) -> &std::path::Path {
214 self.storage.path()
215 }
216
217 fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error> {
218 self.storage.assume_session(session).map(|storage| Self {
219 storage,
220 runtime: self.runtime.clone(),
221 })
222 }
223}
224
225#[derive(Debug, Clone)]
278pub struct AsyncDatabase {
279 pub(crate) database: Database,
280 pub(crate) runtime: Arc<tokio::runtime::Handle>,
281}
282
283impl AsyncDatabase {
284 pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
286 tokio::task::spawn_blocking(move || {
287 Database::open::<DB>(configuration).map(Database::into_async)
288 })
289 .await?
290 }
291
292 #[must_use]
295 pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
296 self.database
297 .with_effective_permissions(effective_permissions)
298 .map(|database| Self {
299 database,
300 runtime: self.runtime.clone(),
301 })
302 }
303
304 #[must_use]
307 pub fn into_blocking(self) -> Database {
308 self.database
309 }
310
311 #[must_use]
314 pub fn to_blocking(&self) -> Database {
315 self.database.clone()
316 }
317
318 #[must_use]
321 pub fn as_blocking(&self) -> &Database {
322 &self.database
323 }
324}
325
326impl From<AsyncDatabase> for Database {
327 fn from(database: AsyncDatabase) -> Self {
328 database.into_blocking()
329 }
330}
331
332impl<'a> From<&'a AsyncDatabase> for Database {
333 fn from(database: &'a AsyncDatabase) -> Self {
334 database.to_blocking()
335 }
336}
337
338impl DatabaseNonBlocking for AsyncDatabase {
339 fn name(&self) -> &str {
340 self.database.name()
341 }
342}
343
344impl HasSession for AsyncStorage {
345 fn session(&self) -> Option<&Session> {
346 self.storage.session()
347 }
348}
349
350#[async_trait]
351impl AsyncStorageConnection for AsyncStorage {
352 type Authenticated = Self;
353 type Database = AsyncDatabase;
354
355 async fn admin(&self) -> Self::Database {
356 let task_self = self.clone();
357
358 self.runtime
359 .spawn_blocking(move || task_self.storage.admin())
360 .await
361 .unwrap()
362 .into_async()
363 }
364
365 async fn create_database_with_schema(
366 &self,
367 name: &str,
368 schema: SchemaName,
369 only_if_needed: bool,
370 ) -> Result<(), bonsaidb_core::Error> {
371 let task_self = self.clone();
372 let name = name.to_owned();
373 self.runtime
374 .spawn_blocking(move || {
375 StorageConnection::create_database_with_schema(
376 &task_self.storage,
377 &name,
378 schema,
379 only_if_needed,
380 )
381 })
382 .await
383 .map_err(Error::from)?
384 }
385
386 async fn database<DB: Schema>(
387 &self,
388 name: &str,
389 ) -> Result<Self::Database, bonsaidb_core::Error> {
390 let task_self = self.clone();
391 let name = name.to_owned();
392 self.runtime
393 .spawn_blocking(move || {
394 task_self
395 .storage
396 .database::<DB>(&name)
397 .map(Database::into_async)
398 })
399 .await
400 .map_err(Error::from)?
401 }
402
403 async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
404 let task_self = self.clone();
405 let name = name.to_owned();
406 self.runtime
407 .spawn_blocking(move || task_self.storage.delete_database(&name))
408 .await
409 .map_err(Error::from)?
410 }
411
412 async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
413 let task_self = self.clone();
414 self.runtime
415 .spawn_blocking(move || task_self.storage.list_databases())
416 .await
417 .map_err(Error::from)?
418 }
419
420 async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
421 let task_self = self.clone();
422 self.runtime
423 .spawn_blocking(move || task_self.storage.list_available_schemas())
424 .await
425 .map_err(Error::from)?
426 }
427
428 async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
429 let task_self = self.clone();
430 let username = username.to_owned();
431 self.runtime
432 .spawn_blocking(move || task_self.storage.create_user(&username))
433 .await
434 .map_err(Error::from)?
435 }
436
437 async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
438 &self,
439 user: U,
440 ) -> Result<(), bonsaidb_core::Error> {
441 let task_self = self.clone();
442 let user = user.name()?.into_owned();
443 self.runtime
444 .spawn_blocking(move || task_self.storage.delete_user(user))
445 .await
446 .map_err(Error::from)?
447 }
448
449 #[cfg(feature = "password-hashing")]
450 async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
451 &self,
452 user: U,
453 password: bonsaidb_core::connection::SensitiveString,
454 ) -> Result<(), bonsaidb_core::Error> {
455 let task_self = self.clone();
456 let user = user.name()?.into_owned();
457 self.runtime
458 .spawn_blocking(move || task_self.storage.set_user_password(user, password))
459 .await
460 .map_err(Error::from)?
461 }
462
463 #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
464 async fn authenticate(
465 &self,
466 authentication: bonsaidb_core::connection::Authentication,
467 ) -> Result<Self, bonsaidb_core::Error> {
468 let task_self = self.clone();
469 self.runtime
470 .spawn_blocking(move || {
471 task_self
472 .storage
473 .authenticate(authentication)
474 .map(Storage::into_async)
475 })
476 .await
477 .map_err(Error::from)?
478 }
479
480 async fn assume_identity(
481 &self,
482 identity: IdentityReference<'_>,
483 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
484 let task_self = self.clone();
485 let identity = identity.into_owned();
486 self.runtime
487 .spawn_blocking(move || {
488 task_self
489 .storage
490 .assume_identity(identity)
491 .map(Storage::into_async)
492 })
493 .await
494 .map_err(Error::from)?
495 }
496
497 async fn add_permission_group_to_user<
498 'user,
499 'group,
500 U: Nameable<'user, u64> + Send + Sync,
501 G: Nameable<'group, u64> + Send + Sync,
502 >(
503 &self,
504 user: U,
505 permission_group: G,
506 ) -> Result<(), bonsaidb_core::Error> {
507 let task_self = self.clone();
508 let user = user.name()?.into_owned();
509 let group = permission_group.name()?.into_owned();
510 self.runtime
511 .spawn_blocking(move || task_self.storage.add_permission_group_to_user(user, group))
512 .await
513 .map_err(Error::from)?
514 }
515
516 async fn remove_permission_group_from_user<
517 'user,
518 'group,
519 U: Nameable<'user, u64> + Send + Sync,
520 G: Nameable<'group, u64> + Send + Sync,
521 >(
522 &self,
523 user: U,
524 permission_group: G,
525 ) -> Result<(), bonsaidb_core::Error> {
526 let task_self = self.clone();
527 let user = user.name()?.into_owned();
528 let group = permission_group.name()?.into_owned();
529 self.runtime
530 .spawn_blocking(move || {
531 task_self
532 .storage
533 .remove_permission_group_from_user(user, group)
534 })
535 .await
536 .map_err(Error::from)?
537 }
538
539 async fn add_role_to_user<
540 'user,
541 'group,
542 U: Nameable<'user, u64> + Send + Sync,
543 G: Nameable<'group, u64> + Send + Sync,
544 >(
545 &self,
546 user: U,
547 role: G,
548 ) -> Result<(), bonsaidb_core::Error> {
549 let task_self = self.clone();
550 let user = user.name()?.into_owned();
551 let role = role.name()?.into_owned();
552 self.runtime
553 .spawn_blocking(move || task_self.storage.add_role_to_user(user, role))
554 .await
555 .map_err(Error::from)?
556 }
557
558 async fn remove_role_from_user<
559 'user,
560 'group,
561 U: Nameable<'user, u64> + Send + Sync,
562 G: Nameable<'group, u64> + Send + Sync,
563 >(
564 &self,
565 user: U,
566 role: G,
567 ) -> Result<(), bonsaidb_core::Error> {
568 let task_self = self.clone();
569 let user = user.name()?.into_owned();
570 let role = role.name()?.into_owned();
571 self.runtime
572 .spawn_blocking(move || task_self.storage.remove_role_from_user(user, role))
573 .await
574 .map_err(Error::from)?
575 }
576}
577
578impl HasSession for AsyncDatabase {
579 fn session(&self) -> Option<&Session> {
580 self.database.session()
581 }
582}
583
584#[async_trait]
585impl AsyncConnection for AsyncDatabase {
586 type Storage = AsyncStorage;
587
588 fn storage(&self) -> Self::Storage {
589 AsyncStorage {
590 storage: self.database.storage(),
591 runtime: self.runtime.clone(),
592 }
593 }
594
595 async fn list_executed_transactions(
596 &self,
597 starting_id: Option<u64>,
598 result_limit: Option<u32>,
599 ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
600 let task_self = self.clone();
601 self.runtime
602 .spawn_blocking(move || {
603 task_self
604 .database
605 .list_executed_transactions(starting_id, result_limit)
606 })
607 .await
608 .map_err(Error::from)?
609 }
610
611 async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
612 Ok(self
613 .database
614 .roots()
615 .transactions()
616 .current_transaction_id())
617 }
618
619 async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
620 let task_self = self.clone();
621 self.runtime
622 .spawn_blocking(move || Connection::compact(&task_self.database))
623 .await
624 .map_err(Error::from)?
625 }
626
627 async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
628 let task_self = self.clone();
629 self.runtime
630 .spawn_blocking(move || Connection::compact_collection::<C>(&task_self.database))
631 .await
632 .map_err(Error::from)?
633 }
634
635 async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
636 let task_self = self.clone();
637 self.runtime
638 .spawn_blocking(move || Connection::compact_key_value_store(&task_self.database))
639 .await
640 .map_err(Error::from)?
641 }
642}
643
644#[async_trait]
645impl AsyncKeyValue for AsyncDatabase {
646 async fn execute_key_operation(
647 &self,
648 op: KeyOperation,
649 ) -> Result<Output, bonsaidb_core::Error> {
650 let task_self = self.clone();
651 self.runtime
652 .spawn_blocking(move || KeyValue::execute_key_operation(&task_self.database, op))
653 .await
654 .map_err(Error::from)?
655 }
656}
657
658#[async_trait]
659impl AsyncPubSub for AsyncDatabase {
660 type Subscriber = Subscriber;
661
662 async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
663 PubSub::create_subscriber(&self.database)
664 }
665
666 async fn publish_bytes(
667 &self,
668 topic: Vec<u8>,
669 payload: Vec<u8>,
670 ) -> Result<(), bonsaidb_core::Error> {
671 PubSub::publish_bytes(&self.database, topic, payload)
672 }
673
674 async fn publish_bytes_to_all(
675 &self,
676 topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
677 payload: Vec<u8>,
678 ) -> Result<(), bonsaidb_core::Error> {
679 PubSub::publish_bytes_to_all(&self.database, topics, payload)
680 }
681}
682
683#[async_trait]
684impl AsyncSubscriber for Subscriber {
685 async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
686 pubsub::Subscriber::subscribe_to_bytes(self, topic)
687 }
688
689 async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
690 pubsub::Subscriber::unsubscribe_from_bytes(self, topic)
691 }
692
693 fn receiver(&self) -> &Receiver {
694 pubsub::Subscriber::receiver(self)
695 }
696}
697
698impl HasSchema for AsyncDatabase {
699 fn schematic(&self) -> &Schematic {
700 self.database.schematic()
701 }
702}
703
704#[async_trait]
705impl AsyncLowLevelConnection for AsyncDatabase {
706 async fn apply_transaction(
707 &self,
708 transaction: Transaction,
709 ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
710 let task_self = self.clone();
711 self.runtime
712 .spawn_blocking(move || task_self.database.apply_transaction(transaction))
713 .await
714 .map_err(Error::from)?
715 }
716
717 async fn get_from_collection(
718 &self,
719 id: DocumentId,
720 collection: &CollectionName,
721 ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
722 let task_self = self.clone();
723 let collection = collection.clone();
724 self.runtime
725 .spawn_blocking(move || task_self.database.get_from_collection(id, &collection))
726 .await
727 .map_err(Error::from)?
728 }
729
730 async fn list_from_collection(
731 &self,
732 ids: Range<DocumentId>,
733 order: Sort,
734 limit: Option<u32>,
735 collection: &CollectionName,
736 ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
737 let task_self = self.clone();
738 let collection = collection.clone();
739 self.runtime
740 .spawn_blocking(move || {
741 task_self
742 .database
743 .list_from_collection(ids, order, limit, &collection)
744 })
745 .await
746 .map_err(Error::from)?
747 }
748
749 async fn list_headers_from_collection(
750 &self,
751 ids: Range<DocumentId>,
752 order: Sort,
753 limit: Option<u32>,
754 collection: &CollectionName,
755 ) -> Result<Vec<Header>, bonsaidb_core::Error> {
756 let task_self = self.clone();
757 let collection = collection.clone();
758 self.runtime
759 .spawn_blocking(move || {
760 task_self
761 .database
762 .list_headers_from_collection(ids, order, limit, &collection)
763 })
764 .await
765 .map_err(Error::from)?
766 }
767
768 async fn count_from_collection(
769 &self,
770 ids: Range<DocumentId>,
771 collection: &CollectionName,
772 ) -> Result<u64, bonsaidb_core::Error> {
773 let task_self = self.clone();
774 let collection = collection.clone();
775 self.runtime
776 .spawn_blocking(move || task_self.database.count_from_collection(ids, &collection))
777 .await
778 .map_err(Error::from)?
779 }
780
781 async fn get_multiple_from_collection(
782 &self,
783 ids: &[DocumentId],
784 collection: &CollectionName,
785 ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
786 let task_self = self.clone();
787 let ids = ids.to_vec();
789 let collection = collection.clone();
790 self.runtime
791 .spawn_blocking(move || {
792 task_self
793 .database
794 .get_multiple_from_collection(&ids, &collection)
795 })
796 .await
797 .map_err(Error::from)?
798 }
799
800 async fn compact_collection_by_name(
801 &self,
802 collection: CollectionName,
803 ) -> Result<(), bonsaidb_core::Error> {
804 let task_self = self.clone();
805 self.runtime
806 .spawn_blocking(move || task_self.database.compact_collection_by_name(collection))
807 .await
808 .map_err(Error::from)?
809 }
810
811 async fn query_by_name(
812 &self,
813 view: &ViewName,
814 key: Option<SerializedQueryKey>,
815 order: Sort,
816 limit: Option<u32>,
817 access_policy: AccessPolicy,
818 ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
819 let task_self = self.clone();
820 let view = view.clone();
821 self.runtime
822 .spawn_blocking(move || {
823 task_self
824 .database
825 .query_by_name(&view, key, order, limit, access_policy)
826 })
827 .await
828 .map_err(Error::from)?
829 }
830
831 async fn query_by_name_with_docs(
832 &self,
833 view: &ViewName,
834 key: Option<SerializedQueryKey>,
835 order: Sort,
836 limit: Option<u32>,
837 access_policy: AccessPolicy,
838 ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
839 let task_self = self.clone();
840 let view = view.clone();
841 self.runtime
842 .spawn_blocking(move || {
843 task_self
844 .database
845 .query_by_name_with_docs(&view, key, order, limit, access_policy)
846 })
847 .await
848 .map_err(Error::from)?
849 }
850
851 async fn reduce_by_name(
852 &self,
853 view: &ViewName,
854 key: Option<SerializedQueryKey>,
855 access_policy: AccessPolicy,
856 ) -> Result<Vec<u8>, bonsaidb_core::Error> {
857 let task_self = self.clone();
858 let view = view.clone();
859 self.runtime
860 .spawn_blocking(move || task_self.database.reduce_by_name(&view, key, access_policy))
861 .await
862 .map_err(Error::from)?
863 }
864
865 async fn reduce_grouped_by_name(
866 &self,
867 view: &ViewName,
868 key: Option<SerializedQueryKey>,
869 access_policy: AccessPolicy,
870 ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
871 let task_self = self.clone();
872 let view = view.clone();
873 self.runtime
874 .spawn_blocking(move || {
875 task_self
876 .database
877 .reduce_grouped_by_name(&view, key, access_policy)
878 })
879 .await
880 .map_err(Error::from)?
881 }
882
883 async fn delete_docs_by_name(
884 &self,
885 view: &ViewName,
886 key: Option<SerializedQueryKey>,
887 access_policy: AccessPolicy,
888 ) -> Result<u64, bonsaidb_core::Error> {
889 let task_self = self.clone();
890 let view = view.clone();
891 self.runtime
892 .spawn_blocking(move || {
893 task_self
894 .database
895 .delete_docs_by_name(&view, key, access_policy)
896 })
897 .await
898 .map_err(Error::from)?
899 }
900}