1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bonsaidb_core::connection::{
5    self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
6    Connection, HasSchema, HasSession, IdentityReference, LowLevelConnection, Range,
7    SerializedQueryKey, Session, Sort, StorageConnection,
8};
9use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10use bonsaidb_core::keyvalue::{AsyncKeyValue, KeyOperation, KeyValue, Output};
11use bonsaidb_core::permissions::Permissions;
12use bonsaidb_core::pubsub::{self, AsyncPubSub, AsyncSubscriber, PubSub, Receiver};
13use bonsaidb_core::schema::view::map::MappedSerializedValue;
14use bonsaidb_core::schema::{
15    self, CollectionName, Nameable, Schema, SchemaName, SchemaSummary, Schematic, ViewName,
16};
17use bonsaidb_core::transaction::{self, OperationResult, Transaction};
18
19use crate::config::StorageConfiguration;
20use crate::database::DatabaseNonBlocking;
21use crate::storage::{AnyBackupLocation, StorageNonBlocking};
22use crate::{Database, Error, Storage, Subscriber};
23
/// Async-facing handle that pairs a blocking [`Storage`] with a Tokio runtime
/// handle.
///
/// Cloning produces another handle holding a clone of the same fields.
#[derive(Debug, Clone)]
#[must_use]
pub struct AsyncStorage {
    /// The wrapped blocking storage that calls are delegated to.
    pub(crate) storage: Storage,
    /// Handle to the Tokio runtime on which blocking work is spawned.
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
}
129
130impl AsyncStorage {
131    pub async fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
133        tokio::task::spawn_blocking(move || Storage::open(configuration))
134            .await?
135            .map(Storage::into_async)
136    }
137
138    pub async fn restore<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
140        let task_self = self.clone();
141        self.runtime
142            .spawn_blocking(move || task_self.storage.restore(&location))
143            .await?
144    }
145
146    pub async fn backup<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
148        let task_self = self.clone();
149        self.runtime
150            .spawn_blocking(move || task_self.storage.backup(&location))
151            .await?
152    }
153
154    #[must_use]
157    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
158        self.storage
159            .with_effective_permissions(effective_permissions)
160            .map(|storage| Self {
161                storage,
162                runtime: self.runtime.clone(),
163            })
164    }
165
166    #[cfg(feature = "internal-apis")]
167    #[doc(hidden)]
168    pub async fn database_without_schema(&self, name: &str) -> Result<AsyncDatabase, Error> {
169        let name = name.to_owned();
170        let task_self = self.clone();
171        self.runtime
172            .spawn_blocking(move || {
173                task_self
174                    .storage
175                    .database_without_schema(&name)
176                    .map(Database::into_async)
177            })
178            .await?
179    }
180
181    pub fn into_blocking(self) -> Storage {
184        self.storage
185    }
186
187    pub fn to_blocking(&self) -> Storage {
190        self.storage.clone()
191    }
192
193    pub fn as_blocking(&self) -> &Storage {
196        &self.storage
197    }
198}
199
200impl<'a> From<&'a AsyncStorage> for Storage {
201    fn from(storage: &'a AsyncStorage) -> Self {
202        storage.to_blocking()
203    }
204}
205
206impl From<AsyncStorage> for Storage {
207    fn from(storage: AsyncStorage) -> Self {
208        storage.into_blocking()
209    }
210}
211
212impl StorageNonBlocking for AsyncStorage {
213    fn path(&self) -> &std::path::Path {
214        self.storage.path()
215    }
216
217    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error> {
218        self.storage.assume_session(session).map(|storage| Self {
219            storage,
220            runtime: self.runtime.clone(),
221        })
222    }
223}
224
/// Async-facing handle that pairs a blocking [`Database`] with a Tokio
/// runtime handle.
///
/// Cloning produces another handle holding a clone of the same fields.
#[derive(Debug, Clone)]
pub struct AsyncDatabase {
    /// The wrapped blocking database that calls are delegated to.
    pub(crate) database: Database,
    /// Handle to the Tokio runtime on which blocking work is spawned.
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
}
282
283impl AsyncDatabase {
284    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
286        tokio::task::spawn_blocking(move || {
287            Database::open::<DB>(configuration).map(Database::into_async)
288        })
289        .await?
290    }
291
292    #[must_use]
295    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
296        self.database
297            .with_effective_permissions(effective_permissions)
298            .map(|database| Self {
299                database,
300                runtime: self.runtime.clone(),
301            })
302    }
303
304    #[must_use]
307    pub fn into_blocking(self) -> Database {
308        self.database
309    }
310
311    #[must_use]
314    pub fn to_blocking(&self) -> Database {
315        self.database.clone()
316    }
317
318    #[must_use]
321    pub fn as_blocking(&self) -> &Database {
322        &self.database
323    }
324}
325
326impl From<AsyncDatabase> for Database {
327    fn from(database: AsyncDatabase) -> Self {
328        database.into_blocking()
329    }
330}
331
332impl<'a> From<&'a AsyncDatabase> for Database {
333    fn from(database: &'a AsyncDatabase) -> Self {
334        database.to_blocking()
335    }
336}
337
338impl DatabaseNonBlocking for AsyncDatabase {
339    fn name(&self) -> &str {
340        self.database.name()
341    }
342}
343
344impl HasSession for AsyncStorage {
345    fn session(&self) -> Option<&Session> {
346        self.storage.session()
347    }
348}
349
350#[async_trait]
351impl AsyncStorageConnection for AsyncStorage {
352    type Authenticated = Self;
353    type Database = AsyncDatabase;
354
355    async fn admin(&self) -> Self::Database {
356        let task_self = self.clone();
357
358        self.runtime
359            .spawn_blocking(move || task_self.storage.admin())
360            .await
361            .unwrap()
362            .into_async()
363    }
364
365    async fn create_database_with_schema(
366        &self,
367        name: &str,
368        schema: SchemaName,
369        only_if_needed: bool,
370    ) -> Result<(), bonsaidb_core::Error> {
371        let task_self = self.clone();
372        let name = name.to_owned();
373        self.runtime
374            .spawn_blocking(move || {
375                StorageConnection::create_database_with_schema(
376                    &task_self.storage,
377                    &name,
378                    schema,
379                    only_if_needed,
380                )
381            })
382            .await
383            .map_err(Error::from)?
384    }
385
386    async fn database<DB: Schema>(
387        &self,
388        name: &str,
389    ) -> Result<Self::Database, bonsaidb_core::Error> {
390        let task_self = self.clone();
391        let name = name.to_owned();
392        self.runtime
393            .spawn_blocking(move || {
394                task_self
395                    .storage
396                    .database::<DB>(&name)
397                    .map(Database::into_async)
398            })
399            .await
400            .map_err(Error::from)?
401    }
402
403    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
404        let task_self = self.clone();
405        let name = name.to_owned();
406        self.runtime
407            .spawn_blocking(move || task_self.storage.delete_database(&name))
408            .await
409            .map_err(Error::from)?
410    }
411
412    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
413        let task_self = self.clone();
414        self.runtime
415            .spawn_blocking(move || task_self.storage.list_databases())
416            .await
417            .map_err(Error::from)?
418    }
419
420    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
421        let task_self = self.clone();
422        self.runtime
423            .spawn_blocking(move || task_self.storage.list_available_schemas())
424            .await
425            .map_err(Error::from)?
426    }
427
428    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
429        let task_self = self.clone();
430        let username = username.to_owned();
431        self.runtime
432            .spawn_blocking(move || task_self.storage.create_user(&username))
433            .await
434            .map_err(Error::from)?
435    }
436
437    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
438        &self,
439        user: U,
440    ) -> Result<(), bonsaidb_core::Error> {
441        let task_self = self.clone();
442        let user = user.name()?.into_owned();
443        self.runtime
444            .spawn_blocking(move || task_self.storage.delete_user(user))
445            .await
446            .map_err(Error::from)?
447    }
448
449    #[cfg(feature = "password-hashing")]
450    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
451        &self,
452        user: U,
453        password: bonsaidb_core::connection::SensitiveString,
454    ) -> Result<(), bonsaidb_core::Error> {
455        let task_self = self.clone();
456        let user = user.name()?.into_owned();
457        self.runtime
458            .spawn_blocking(move || task_self.storage.set_user_password(user, password))
459            .await
460            .map_err(Error::from)?
461    }
462
463    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
464    async fn authenticate(
465        &self,
466        authentication: bonsaidb_core::connection::Authentication,
467    ) -> Result<Self, bonsaidb_core::Error> {
468        let task_self = self.clone();
469        self.runtime
470            .spawn_blocking(move || {
471                task_self
472                    .storage
473                    .authenticate(authentication)
474                    .map(Storage::into_async)
475            })
476            .await
477            .map_err(Error::from)?
478    }
479
480    async fn assume_identity(
481        &self,
482        identity: IdentityReference<'_>,
483    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
484        let task_self = self.clone();
485        let identity = identity.into_owned();
486        self.runtime
487            .spawn_blocking(move || {
488                task_self
489                    .storage
490                    .assume_identity(identity)
491                    .map(Storage::into_async)
492            })
493            .await
494            .map_err(Error::from)?
495    }
496
497    async fn add_permission_group_to_user<
498        'user,
499        'group,
500        U: Nameable<'user, u64> + Send + Sync,
501        G: Nameable<'group, u64> + Send + Sync,
502    >(
503        &self,
504        user: U,
505        permission_group: G,
506    ) -> Result<(), bonsaidb_core::Error> {
507        let task_self = self.clone();
508        let user = user.name()?.into_owned();
509        let group = permission_group.name()?.into_owned();
510        self.runtime
511            .spawn_blocking(move || task_self.storage.add_permission_group_to_user(user, group))
512            .await
513            .map_err(Error::from)?
514    }
515
516    async fn remove_permission_group_from_user<
517        'user,
518        'group,
519        U: Nameable<'user, u64> + Send + Sync,
520        G: Nameable<'group, u64> + Send + Sync,
521    >(
522        &self,
523        user: U,
524        permission_group: G,
525    ) -> Result<(), bonsaidb_core::Error> {
526        let task_self = self.clone();
527        let user = user.name()?.into_owned();
528        let group = permission_group.name()?.into_owned();
529        self.runtime
530            .spawn_blocking(move || {
531                task_self
532                    .storage
533                    .remove_permission_group_from_user(user, group)
534            })
535            .await
536            .map_err(Error::from)?
537    }
538
539    async fn add_role_to_user<
540        'user,
541        'group,
542        U: Nameable<'user, u64> + Send + Sync,
543        G: Nameable<'group, u64> + Send + Sync,
544    >(
545        &self,
546        user: U,
547        role: G,
548    ) -> Result<(), bonsaidb_core::Error> {
549        let task_self = self.clone();
550        let user = user.name()?.into_owned();
551        let role = role.name()?.into_owned();
552        self.runtime
553            .spawn_blocking(move || task_self.storage.add_role_to_user(user, role))
554            .await
555            .map_err(Error::from)?
556    }
557
558    async fn remove_role_from_user<
559        'user,
560        'group,
561        U: Nameable<'user, u64> + Send + Sync,
562        G: Nameable<'group, u64> + Send + Sync,
563    >(
564        &self,
565        user: U,
566        role: G,
567    ) -> Result<(), bonsaidb_core::Error> {
568        let task_self = self.clone();
569        let user = user.name()?.into_owned();
570        let role = role.name()?.into_owned();
571        self.runtime
572            .spawn_blocking(move || task_self.storage.remove_role_from_user(user, role))
573            .await
574            .map_err(Error::from)?
575    }
576}
577
578impl HasSession for AsyncDatabase {
579    fn session(&self) -> Option<&Session> {
580        self.database.session()
581    }
582}
583
584#[async_trait]
585impl AsyncConnection for AsyncDatabase {
586    type Storage = AsyncStorage;
587
588    fn storage(&self) -> Self::Storage {
589        AsyncStorage {
590            storage: self.database.storage(),
591            runtime: self.runtime.clone(),
592        }
593    }
594
595    async fn list_executed_transactions(
596        &self,
597        starting_id: Option<u64>,
598        result_limit: Option<u32>,
599    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
600        let task_self = self.clone();
601        self.runtime
602            .spawn_blocking(move || {
603                task_self
604                    .database
605                    .list_executed_transactions(starting_id, result_limit)
606            })
607            .await
608            .map_err(Error::from)?
609    }
610
611    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
612        Ok(self
613            .database
614            .roots()
615            .transactions()
616            .current_transaction_id())
617    }
618
619    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
620        let task_self = self.clone();
621        self.runtime
622            .spawn_blocking(move || Connection::compact(&task_self.database))
623            .await
624            .map_err(Error::from)?
625    }
626
627    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
628        let task_self = self.clone();
629        self.runtime
630            .spawn_blocking(move || Connection::compact_collection::<C>(&task_self.database))
631            .await
632            .map_err(Error::from)?
633    }
634
635    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
636        let task_self = self.clone();
637        self.runtime
638            .spawn_blocking(move || Connection::compact_key_value_store(&task_self.database))
639            .await
640            .map_err(Error::from)?
641    }
642}
643
644#[async_trait]
645impl AsyncKeyValue for AsyncDatabase {
646    async fn execute_key_operation(
647        &self,
648        op: KeyOperation,
649    ) -> Result<Output, bonsaidb_core::Error> {
650        let task_self = self.clone();
651        self.runtime
652            .spawn_blocking(move || KeyValue::execute_key_operation(&task_self.database, op))
653            .await
654            .map_err(Error::from)?
655    }
656}
657
658#[async_trait]
659impl AsyncPubSub for AsyncDatabase {
660    type Subscriber = Subscriber;
661
662    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
663        PubSub::create_subscriber(&self.database)
664    }
665
666    async fn publish_bytes(
667        &self,
668        topic: Vec<u8>,
669        payload: Vec<u8>,
670    ) -> Result<(), bonsaidb_core::Error> {
671        PubSub::publish_bytes(&self.database, topic, payload)
672    }
673
674    async fn publish_bytes_to_all(
675        &self,
676        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
677        payload: Vec<u8>,
678    ) -> Result<(), bonsaidb_core::Error> {
679        PubSub::publish_bytes_to_all(&self.database, topics, payload)
680    }
681}
682
683#[async_trait]
684impl AsyncSubscriber for Subscriber {
685    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
686        pubsub::Subscriber::subscribe_to_bytes(self, topic)
687    }
688
689    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
690        pubsub::Subscriber::unsubscribe_from_bytes(self, topic)
691    }
692
693    fn receiver(&self) -> &Receiver {
694        pubsub::Subscriber::receiver(self)
695    }
696}
697
698impl HasSchema for AsyncDatabase {
699    fn schematic(&self) -> &Schematic {
700        self.database.schematic()
701    }
702}
703
704#[async_trait]
705impl AsyncLowLevelConnection for AsyncDatabase {
706    async fn apply_transaction(
707        &self,
708        transaction: Transaction,
709    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
710        let task_self = self.clone();
711        self.runtime
712            .spawn_blocking(move || task_self.database.apply_transaction(transaction))
713            .await
714            .map_err(Error::from)?
715    }
716
717    async fn get_from_collection(
718        &self,
719        id: DocumentId,
720        collection: &CollectionName,
721    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
722        let task_self = self.clone();
723        let collection = collection.clone();
724        self.runtime
725            .spawn_blocking(move || task_self.database.get_from_collection(id, &collection))
726            .await
727            .map_err(Error::from)?
728    }
729
730    async fn list_from_collection(
731        &self,
732        ids: Range<DocumentId>,
733        order: Sort,
734        limit: Option<u32>,
735        collection: &CollectionName,
736    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
737        let task_self = self.clone();
738        let collection = collection.clone();
739        self.runtime
740            .spawn_blocking(move || {
741                task_self
742                    .database
743                    .list_from_collection(ids, order, limit, &collection)
744            })
745            .await
746            .map_err(Error::from)?
747    }
748
749    async fn list_headers_from_collection(
750        &self,
751        ids: Range<DocumentId>,
752        order: Sort,
753        limit: Option<u32>,
754        collection: &CollectionName,
755    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
756        let task_self = self.clone();
757        let collection = collection.clone();
758        self.runtime
759            .spawn_blocking(move || {
760                task_self
761                    .database
762                    .list_headers_from_collection(ids, order, limit, &collection)
763            })
764            .await
765            .map_err(Error::from)?
766    }
767
768    async fn count_from_collection(
769        &self,
770        ids: Range<DocumentId>,
771        collection: &CollectionName,
772    ) -> Result<u64, bonsaidb_core::Error> {
773        let task_self = self.clone();
774        let collection = collection.clone();
775        self.runtime
776            .spawn_blocking(move || task_self.database.count_from_collection(ids, &collection))
777            .await
778            .map_err(Error::from)?
779    }
780
781    async fn get_multiple_from_collection(
782        &self,
783        ids: &[DocumentId],
784        collection: &CollectionName,
785    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
786        let task_self = self.clone();
787        let ids = ids.to_vec();
789        let collection = collection.clone();
790        self.runtime
791            .spawn_blocking(move || {
792                task_self
793                    .database
794                    .get_multiple_from_collection(&ids, &collection)
795            })
796            .await
797            .map_err(Error::from)?
798    }
799
800    async fn compact_collection_by_name(
801        &self,
802        collection: CollectionName,
803    ) -> Result<(), bonsaidb_core::Error> {
804        let task_self = self.clone();
805        self.runtime
806            .spawn_blocking(move || task_self.database.compact_collection_by_name(collection))
807            .await
808            .map_err(Error::from)?
809    }
810
811    async fn query_by_name(
812        &self,
813        view: &ViewName,
814        key: Option<SerializedQueryKey>,
815        order: Sort,
816        limit: Option<u32>,
817        access_policy: AccessPolicy,
818    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
819        let task_self = self.clone();
820        let view = view.clone();
821        self.runtime
822            .spawn_blocking(move || {
823                task_self
824                    .database
825                    .query_by_name(&view, key, order, limit, access_policy)
826            })
827            .await
828            .map_err(Error::from)?
829    }
830
831    async fn query_by_name_with_docs(
832        &self,
833        view: &ViewName,
834        key: Option<SerializedQueryKey>,
835        order: Sort,
836        limit: Option<u32>,
837        access_policy: AccessPolicy,
838    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
839        let task_self = self.clone();
840        let view = view.clone();
841        self.runtime
842            .spawn_blocking(move || {
843                task_self
844                    .database
845                    .query_by_name_with_docs(&view, key, order, limit, access_policy)
846            })
847            .await
848            .map_err(Error::from)?
849    }
850
851    async fn reduce_by_name(
852        &self,
853        view: &ViewName,
854        key: Option<SerializedQueryKey>,
855        access_policy: AccessPolicy,
856    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
857        let task_self = self.clone();
858        let view = view.clone();
859        self.runtime
860            .spawn_blocking(move || task_self.database.reduce_by_name(&view, key, access_policy))
861            .await
862            .map_err(Error::from)?
863    }
864
865    async fn reduce_grouped_by_name(
866        &self,
867        view: &ViewName,
868        key: Option<SerializedQueryKey>,
869        access_policy: AccessPolicy,
870    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
871        let task_self = self.clone();
872        let view = view.clone();
873        self.runtime
874            .spawn_blocking(move || {
875                task_self
876                    .database
877                    .reduce_grouped_by_name(&view, key, access_policy)
878            })
879            .await
880            .map_err(Error::from)?
881    }
882
883    async fn delete_docs_by_name(
884        &self,
885        view: &ViewName,
886        key: Option<SerializedQueryKey>,
887        access_policy: AccessPolicy,
888    ) -> Result<u64, bonsaidb_core::Error> {
889        let task_self = self.clone();
890        let view = view.clone();
891        self.runtime
892            .spawn_blocking(move || {
893                task_self
894                    .database
895                    .delete_docs_by_name(&view, key, access_policy)
896            })
897            .await
898            .map_err(Error::from)?
899    }
900}