bonsaidb_local/
async.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bonsaidb_core::connection::{
5    self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
6    Connection, HasSchema, HasSession, IdentityReference, LowLevelConnection, Range,
7    SerializedQueryKey, Session, Sort, StorageConnection,
8};
9use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10use bonsaidb_core::keyvalue::{AsyncKeyValue, KeyOperation, KeyValue, Output};
11use bonsaidb_core::permissions::Permissions;
12use bonsaidb_core::pubsub::{self, AsyncPubSub, AsyncSubscriber, PubSub, Receiver};
13use bonsaidb_core::schema::view::map::MappedSerializedValue;
14use bonsaidb_core::schema::{
15    self, CollectionName, Nameable, Schema, SchemaName, SchemaSummary, Schematic, ViewName,
16};
17use bonsaidb_core::transaction::{self, OperationResult, Transaction};
18
19use crate::config::StorageConfiguration;
20use crate::database::DatabaseNonBlocking;
21use crate::storage::{AnyBackupLocation, StorageNonBlocking};
22use crate::{Database, Error, Storage, Subscriber};
23
24/// A file-based, multi-database, multi-user database engine. This type is
25/// designed for use with [Tokio](https://tokio.rs). For blocking
26/// (non-asynchronous) code, see the [`Storage`] type instead.
27///
28/// ## Converting between Blocking and Async Types
29///
30/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
31/// using:
32///
33/// - [`AsyncStorage::into_blocking()`]
34/// - [`AsyncStorage::to_blocking()`]
35/// - [`AsyncStorage::as_blocking()`]
36/// - [`Storage::into_async()`]
37/// - [`Storage::to_async()`]
38/// - [`Storage::into_async_with_runtime()`]
39/// - [`Storage::to_async_with_runtime()`]
40///
41/// ## Converting from `AsyncDatabase::open` to `AsyncStorage::open`
42///
43/// [`AsyncDatabase::open`](AsyncDatabase::open) is a simple method that uses
44/// `AsyncStorage` to create a database named `default` with the schema
45/// provided. These two ways of opening the database are the same:
46///
47/// ```rust
48/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
49/// use bonsaidb_core::connection::AsyncStorageConnection;
50/// use bonsaidb_core::schema::Schema;
51/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
52/// use bonsaidb_local::{
53///     config::{Builder, StorageConfiguration},
54///     AsyncDatabase, AsyncStorage,
55/// };
56/// # async fn open<MySchema: Schema>() -> anyhow::Result<()> {
57/// // This creates a Storage instance, creates a database, and returns it.
58/// let db = AsyncDatabase::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb")).await?;
59///
60/// // This is the equivalent code being executed:
61/// let storage =
62///     AsyncStorage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)
63///         .await?;
64/// storage.create_database::<MySchema>("default", true).await?;
65/// let db = storage.database::<MySchema>("default").await?;
66/// #     Ok(())
67/// # }
68/// ```
69///
70/// ## Using multiple databases
71///
72/// This example shows how to use `AsyncStorage` to create and use multiple
73/// databases with multiple schemas:
74///
75/// ```rust
76/// use bonsaidb_core::connection::AsyncStorageConnection;
77/// use bonsaidb_core::schema::{Collection, Schema};
78/// use bonsaidb_local::config::{Builder, StorageConfiguration};
79/// use bonsaidb_local::AsyncStorage;
80/// use serde::{Deserialize, Serialize};
81///
82/// #[derive(Debug, Schema)]
83/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
84/// # #[schema(core = bonsaidb_core)]
85/// struct MySchema;
86///
87/// #[derive(Debug, Serialize, Deserialize, Collection)]
88/// #[collection(name = "blog-posts")]
89/// # #[collection(core = bonsaidb_core)]
90/// struct BlogPost {
91///     pub title: String,
92///     pub contents: String,
93///     pub author_id: u64,
94/// }
95///
96/// #[derive(Debug, Serialize, Deserialize, Collection)]
97/// #[collection(name = "blog-posts")]
98/// # #[collection(core = bonsaidb_core)]
99/// struct Author {
100///     pub name: String,
101/// }
102///
103/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
104/// let storage = AsyncStorage::open(
105///     StorageConfiguration::new("my-db.bonsaidb")
106///         .with_schema::<BlogPost>()?
107///         .with_schema::<MySchema>()?,
108/// )
109/// .await?;
110///
111/// storage
112///     .create_database::<BlogPost>("ectons-blog", true)
113///     .await?;
114/// let ectons_blog = storage.database::<BlogPost>("ectons-blog").await?;
115/// storage
116///     .create_database::<MySchema>("another-db", true)
117///     .await?;
118/// let another_db = storage.database::<MySchema>("another-db").await?;
119///
120/// #     Ok(())
121/// # }
122/// ```
123#[derive(Debug, Clone)]
124#[must_use]
125pub struct AsyncStorage {
126    pub(crate) storage: Storage,
127    pub(crate) runtime: Arc<tokio::runtime::Handle>,
128}
129
130impl AsyncStorage {
131    /// Creates or opens a multi-database [`AsyncStorage`] with its data stored in `directory`.
132    pub async fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
133        tokio::task::spawn_blocking(move || Storage::open(configuration))
134            .await?
135            .map(Storage::into_async)
136    }
137
138    /// Restores all data from a previously stored backup `location`.
139    pub async fn restore<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
140        let task_self = self.clone();
141        self.runtime
142            .spawn_blocking(move || task_self.storage.restore(&location))
143            .await?
144    }
145
146    /// Stores a copy of all data in this instance to `location`.
147    pub async fn backup<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
148        let task_self = self.clone();
149        self.runtime
150            .spawn_blocking(move || task_self.storage.backup(&location))
151            .await?
152    }
153
154    /// Restricts an unauthenticated instance to having `effective_permissions`.
155    /// Returns `None` if a session has already been established.
156    #[must_use]
157    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
158        self.storage
159            .with_effective_permissions(effective_permissions)
160            .map(|storage| Self {
161                storage,
162                runtime: self.runtime.clone(),
163            })
164    }
165
166    #[cfg(feature = "internal-apis")]
167    #[doc(hidden)]
168    pub async fn database_without_schema(&self, name: &str) -> Result<AsyncDatabase, Error> {
169        let name = name.to_owned();
170        let task_self = self.clone();
171        self.runtime
172            .spawn_blocking(move || {
173                task_self
174                    .storage
175                    .database_without_schema(&name)
176                    .map(Database::into_async)
177            })
178            .await?
179    }
180
181    /// Converts this instance into its blocking version, which is able to be
182    /// used without async.
183    pub fn into_blocking(self) -> Storage {
184        self.storage
185    }
186
187    /// Converts this instance into its blocking version, which is able to be
188    /// used without async.
189    pub fn to_blocking(&self) -> Storage {
190        self.storage.clone()
191    }
192
193    /// Returns a reference to this instance's blocking version, which is able
194    /// to be used without async.
195    pub fn as_blocking(&self) -> &Storage {
196        &self.storage
197    }
198}
199
200impl<'a> From<&'a AsyncStorage> for Storage {
201    fn from(storage: &'a AsyncStorage) -> Self {
202        storage.to_blocking()
203    }
204}
205
206impl From<AsyncStorage> for Storage {
207    fn from(storage: AsyncStorage) -> Self {
208        storage.into_blocking()
209    }
210}
211
212impl StorageNonBlocking for AsyncStorage {
213    fn path(&self) -> &std::path::Path {
214        self.storage.path()
215    }
216
217    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error> {
218        self.storage.assume_session(session).map(|storage| Self {
219            storage,
220            runtime: self.runtime.clone(),
221        })
222    }
223}
224
225/// A database stored in BonsaiDb. This type is designed for use with
226/// [Tokio](https://tokio.rs). For blocking (non-asynchronous) code, see the
227/// [`Database`] type instead.
228///
229/// ## Converting between Async and Blocking Types
230///
231/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
232/// using:
233///
234/// - [`AsyncDatabase::into_blocking()`]
235/// - [`AsyncDatabase::to_blocking()`]
236/// - [`AsyncDatabase::as_blocking()`]
237/// - [`Database::into_async()`]
238/// - [`Database::to_async()`]
239/// - [`Database::into_async_with_runtime()`]
240/// - [`Database::to_async_with_runtime()`]
241///
242/// ## Using `Database` to create a single database
243///
244/// `Database`provides an easy mechanism to open and access a single database:
245///
246/// ```rust
247/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
248/// use bonsaidb_core::schema::Collection;
249/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
250/// use bonsaidb_local::{
251///     config::{Builder, StorageConfiguration},
252///     AsyncDatabase,
253/// };
254/// use serde::{Deserialize, Serialize};
255///
256/// #[derive(Debug, Serialize, Deserialize, Collection)]
257/// #[collection(name = "blog-posts")]
258/// # #[collection(core = bonsaidb_core)]
259/// struct BlogPost {
260///     pub title: String,
261///     pub contents: String,
262/// }
263///
264/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
265/// let db = AsyncDatabase::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
266/// #     Ok(())
267/// # }
268/// ```
269///
270/// Under the hood, this initializes a [`AsyncStorage`] instance pointing at
271/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
272/// with the schema `BlogPost`.
273///
274/// In this example, `BlogPost` implements the
275/// [`Collection`](schema::Collection) trait, and all collections can be used as
276/// a [`Schema`].
277#[derive(Debug, Clone)]
278pub struct AsyncDatabase {
279    pub(crate) database: Database,
280    pub(crate) runtime: Arc<tokio::runtime::Handle>,
281}
282
283impl AsyncDatabase {
284    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
285    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
286        tokio::task::spawn_blocking(move || {
287            Database::open::<DB>(configuration).map(Database::into_async)
288        })
289        .await?
290    }
291
292    /// Restricts an unauthenticated instance to having `effective_permissions`.
293    /// Returns `None` if a session has already been established.
294    #[must_use]
295    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
296        self.database
297            .with_effective_permissions(effective_permissions)
298            .map(|database| Self {
299                database,
300                runtime: self.runtime.clone(),
301            })
302    }
303
304    /// Converts this instance into its blocking version, which is able to be
305    /// used without async.
306    #[must_use]
307    pub fn into_blocking(self) -> Database {
308        self.database
309    }
310
311    /// Converts this instance into its blocking version, which is able to be
312    /// used without async.
313    #[must_use]
314    pub fn to_blocking(&self) -> Database {
315        self.database.clone()
316    }
317
318    /// Returns a reference to this instance's blocking version, which is able
319    /// to be used without async.
320    #[must_use]
321    pub fn as_blocking(&self) -> &Database {
322        &self.database
323    }
324}
325
326impl From<AsyncDatabase> for Database {
327    fn from(database: AsyncDatabase) -> Self {
328        database.into_blocking()
329    }
330}
331
332impl<'a> From<&'a AsyncDatabase> for Database {
333    fn from(database: &'a AsyncDatabase) -> Self {
334        database.to_blocking()
335    }
336}
337
338impl DatabaseNonBlocking for AsyncDatabase {
339    fn name(&self) -> &str {
340        self.database.name()
341    }
342}
343
344impl HasSession for AsyncStorage {
345    fn session(&self) -> Option<&Session> {
346        self.storage.session()
347    }
348}
349
350#[async_trait]
351impl AsyncStorageConnection for AsyncStorage {
352    type Authenticated = Self;
353    type Database = AsyncDatabase;
354
355    async fn admin(&self) -> Self::Database {
356        let task_self = self.clone();
357
358        self.runtime
359            .spawn_blocking(move || task_self.storage.admin())
360            .await
361            .unwrap()
362            .into_async()
363    }
364
365    async fn create_database_with_schema(
366        &self,
367        name: &str,
368        schema: SchemaName,
369        only_if_needed: bool,
370    ) -> Result<(), bonsaidb_core::Error> {
371        let task_self = self.clone();
372        let name = name.to_owned();
373        self.runtime
374            .spawn_blocking(move || {
375                StorageConnection::create_database_with_schema(
376                    &task_self.storage,
377                    &name,
378                    schema,
379                    only_if_needed,
380                )
381            })
382            .await
383            .map_err(Error::from)?
384    }
385
386    async fn database<DB: Schema>(
387        &self,
388        name: &str,
389    ) -> Result<Self::Database, bonsaidb_core::Error> {
390        let task_self = self.clone();
391        let name = name.to_owned();
392        self.runtime
393            .spawn_blocking(move || {
394                task_self
395                    .storage
396                    .database::<DB>(&name)
397                    .map(Database::into_async)
398            })
399            .await
400            .map_err(Error::from)?
401    }
402
403    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
404        let task_self = self.clone();
405        let name = name.to_owned();
406        self.runtime
407            .spawn_blocking(move || task_self.storage.delete_database(&name))
408            .await
409            .map_err(Error::from)?
410    }
411
412    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
413        let task_self = self.clone();
414        self.runtime
415            .spawn_blocking(move || task_self.storage.list_databases())
416            .await
417            .map_err(Error::from)?
418    }
419
420    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
421        let task_self = self.clone();
422        self.runtime
423            .spawn_blocking(move || task_self.storage.list_available_schemas())
424            .await
425            .map_err(Error::from)?
426    }
427
428    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
429        let task_self = self.clone();
430        let username = username.to_owned();
431        self.runtime
432            .spawn_blocking(move || task_self.storage.create_user(&username))
433            .await
434            .map_err(Error::from)?
435    }
436
437    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
438        &self,
439        user: U,
440    ) -> Result<(), bonsaidb_core::Error> {
441        let task_self = self.clone();
442        let user = user.name()?.into_owned();
443        self.runtime
444            .spawn_blocking(move || task_self.storage.delete_user(user))
445            .await
446            .map_err(Error::from)?
447    }
448
449    #[cfg(feature = "password-hashing")]
450    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
451        &self,
452        user: U,
453        password: bonsaidb_core::connection::SensitiveString,
454    ) -> Result<(), bonsaidb_core::Error> {
455        let task_self = self.clone();
456        let user = user.name()?.into_owned();
457        self.runtime
458            .spawn_blocking(move || task_self.storage.set_user_password(user, password))
459            .await
460            .map_err(Error::from)?
461    }
462
463    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
464    async fn authenticate(
465        &self,
466        authentication: bonsaidb_core::connection::Authentication,
467    ) -> Result<Self, bonsaidb_core::Error> {
468        let task_self = self.clone();
469        self.runtime
470            .spawn_blocking(move || {
471                task_self
472                    .storage
473                    .authenticate(authentication)
474                    .map(Storage::into_async)
475            })
476            .await
477            .map_err(Error::from)?
478    }
479
480    async fn assume_identity(
481        &self,
482        identity: IdentityReference<'_>,
483    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
484        let task_self = self.clone();
485        let identity = identity.into_owned();
486        self.runtime
487            .spawn_blocking(move || {
488                task_self
489                    .storage
490                    .assume_identity(identity)
491                    .map(Storage::into_async)
492            })
493            .await
494            .map_err(Error::from)?
495    }
496
497    async fn add_permission_group_to_user<
498        'user,
499        'group,
500        U: Nameable<'user, u64> + Send + Sync,
501        G: Nameable<'group, u64> + Send + Sync,
502    >(
503        &self,
504        user: U,
505        permission_group: G,
506    ) -> Result<(), bonsaidb_core::Error> {
507        let task_self = self.clone();
508        let user = user.name()?.into_owned();
509        let group = permission_group.name()?.into_owned();
510        self.runtime
511            .spawn_blocking(move || task_self.storage.add_permission_group_to_user(user, group))
512            .await
513            .map_err(Error::from)?
514    }
515
516    async fn remove_permission_group_from_user<
517        'user,
518        'group,
519        U: Nameable<'user, u64> + Send + Sync,
520        G: Nameable<'group, u64> + Send + Sync,
521    >(
522        &self,
523        user: U,
524        permission_group: G,
525    ) -> Result<(), bonsaidb_core::Error> {
526        let task_self = self.clone();
527        let user = user.name()?.into_owned();
528        let group = permission_group.name()?.into_owned();
529        self.runtime
530            .spawn_blocking(move || {
531                task_self
532                    .storage
533                    .remove_permission_group_from_user(user, group)
534            })
535            .await
536            .map_err(Error::from)?
537    }
538
539    async fn add_role_to_user<
540        'user,
541        'group,
542        U: Nameable<'user, u64> + Send + Sync,
543        G: Nameable<'group, u64> + Send + Sync,
544    >(
545        &self,
546        user: U,
547        role: G,
548    ) -> Result<(), bonsaidb_core::Error> {
549        let task_self = self.clone();
550        let user = user.name()?.into_owned();
551        let role = role.name()?.into_owned();
552        self.runtime
553            .spawn_blocking(move || task_self.storage.add_role_to_user(user, role))
554            .await
555            .map_err(Error::from)?
556    }
557
558    async fn remove_role_from_user<
559        'user,
560        'group,
561        U: Nameable<'user, u64> + Send + Sync,
562        G: Nameable<'group, u64> + Send + Sync,
563    >(
564        &self,
565        user: U,
566        role: G,
567    ) -> Result<(), bonsaidb_core::Error> {
568        let task_self = self.clone();
569        let user = user.name()?.into_owned();
570        let role = role.name()?.into_owned();
571        self.runtime
572            .spawn_blocking(move || task_self.storage.remove_role_from_user(user, role))
573            .await
574            .map_err(Error::from)?
575    }
576}
577
578impl HasSession for AsyncDatabase {
579    fn session(&self) -> Option<&Session> {
580        self.database.session()
581    }
582}
583
584#[async_trait]
585impl AsyncConnection for AsyncDatabase {
586    type Storage = AsyncStorage;
587
588    fn storage(&self) -> Self::Storage {
589        AsyncStorage {
590            storage: self.database.storage(),
591            runtime: self.runtime.clone(),
592        }
593    }
594
595    async fn list_executed_transactions(
596        &self,
597        starting_id: Option<u64>,
598        result_limit: Option<u32>,
599    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
600        let task_self = self.clone();
601        self.runtime
602            .spawn_blocking(move || {
603                task_self
604                    .database
605                    .list_executed_transactions(starting_id, result_limit)
606            })
607            .await
608            .map_err(Error::from)?
609    }
610
611    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
612        Ok(self
613            .database
614            .roots()
615            .transactions()
616            .current_transaction_id())
617    }
618
619    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
620        let task_self = self.clone();
621        self.runtime
622            .spawn_blocking(move || Connection::compact(&task_self.database))
623            .await
624            .map_err(Error::from)?
625    }
626
627    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
628        let task_self = self.clone();
629        self.runtime
630            .spawn_blocking(move || Connection::compact_collection::<C>(&task_self.database))
631            .await
632            .map_err(Error::from)?
633    }
634
635    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
636        let task_self = self.clone();
637        self.runtime
638            .spawn_blocking(move || Connection::compact_key_value_store(&task_self.database))
639            .await
640            .map_err(Error::from)?
641    }
642}
643
644#[async_trait]
645impl AsyncKeyValue for AsyncDatabase {
646    async fn execute_key_operation(
647        &self,
648        op: KeyOperation,
649    ) -> Result<Output, bonsaidb_core::Error> {
650        let task_self = self.clone();
651        self.runtime
652            .spawn_blocking(move || KeyValue::execute_key_operation(&task_self.database, op))
653            .await
654            .map_err(Error::from)?
655    }
656}
657
658#[async_trait]
659impl AsyncPubSub for AsyncDatabase {
660    type Subscriber = Subscriber;
661
662    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
663        PubSub::create_subscriber(&self.database)
664    }
665
666    async fn publish_bytes(
667        &self,
668        topic: Vec<u8>,
669        payload: Vec<u8>,
670    ) -> Result<(), bonsaidb_core::Error> {
671        PubSub::publish_bytes(&self.database, topic, payload)
672    }
673
674    async fn publish_bytes_to_all(
675        &self,
676        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
677        payload: Vec<u8>,
678    ) -> Result<(), bonsaidb_core::Error> {
679        PubSub::publish_bytes_to_all(&self.database, topics, payload)
680    }
681}
682
683#[async_trait]
684impl AsyncSubscriber for Subscriber {
685    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
686        pubsub::Subscriber::subscribe_to_bytes(self, topic)
687    }
688
689    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
690        pubsub::Subscriber::unsubscribe_from_bytes(self, topic)
691    }
692
693    fn receiver(&self) -> &Receiver {
694        pubsub::Subscriber::receiver(self)
695    }
696}
697
698impl HasSchema for AsyncDatabase {
699    fn schematic(&self) -> &Schematic {
700        self.database.schematic()
701    }
702}
703
704#[async_trait]
705impl AsyncLowLevelConnection for AsyncDatabase {
706    async fn apply_transaction(
707        &self,
708        transaction: Transaction,
709    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
710        let task_self = self.clone();
711        self.runtime
712            .spawn_blocking(move || task_self.database.apply_transaction(transaction))
713            .await
714            .map_err(Error::from)?
715    }
716
717    async fn get_from_collection(
718        &self,
719        id: DocumentId,
720        collection: &CollectionName,
721    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
722        let task_self = self.clone();
723        let collection = collection.clone();
724        self.runtime
725            .spawn_blocking(move || task_self.database.get_from_collection(id, &collection))
726            .await
727            .map_err(Error::from)?
728    }
729
730    async fn list_from_collection(
731        &self,
732        ids: Range<DocumentId>,
733        order: Sort,
734        limit: Option<u32>,
735        collection: &CollectionName,
736    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
737        let task_self = self.clone();
738        let collection = collection.clone();
739        self.runtime
740            .spawn_blocking(move || {
741                task_self
742                    .database
743                    .list_from_collection(ids, order, limit, &collection)
744            })
745            .await
746            .map_err(Error::from)?
747    }
748
749    async fn list_headers_from_collection(
750        &self,
751        ids: Range<DocumentId>,
752        order: Sort,
753        limit: Option<u32>,
754        collection: &CollectionName,
755    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
756        let task_self = self.clone();
757        let collection = collection.clone();
758        self.runtime
759            .spawn_blocking(move || {
760                task_self
761                    .database
762                    .list_headers_from_collection(ids, order, limit, &collection)
763            })
764            .await
765            .map_err(Error::from)?
766    }
767
768    async fn count_from_collection(
769        &self,
770        ids: Range<DocumentId>,
771        collection: &CollectionName,
772    ) -> Result<u64, bonsaidb_core::Error> {
773        let task_self = self.clone();
774        let collection = collection.clone();
775        self.runtime
776            .spawn_blocking(move || task_self.database.count_from_collection(ids, &collection))
777            .await
778            .map_err(Error::from)?
779    }
780
781    async fn get_multiple_from_collection(
782        &self,
783        ids: &[DocumentId],
784        collection: &CollectionName,
785    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
786        let task_self = self.clone();
787        // TODO avoid the allocation here, switch to IntoIterator.
788        let ids = ids.to_vec();
789        let collection = collection.clone();
790        self.runtime
791            .spawn_blocking(move || {
792                task_self
793                    .database
794                    .get_multiple_from_collection(&ids, &collection)
795            })
796            .await
797            .map_err(Error::from)?
798    }
799
800    async fn compact_collection_by_name(
801        &self,
802        collection: CollectionName,
803    ) -> Result<(), bonsaidb_core::Error> {
804        let task_self = self.clone();
805        self.runtime
806            .spawn_blocking(move || task_self.database.compact_collection_by_name(collection))
807            .await
808            .map_err(Error::from)?
809    }
810
811    async fn query_by_name(
812        &self,
813        view: &ViewName,
814        key: Option<SerializedQueryKey>,
815        order: Sort,
816        limit: Option<u32>,
817        access_policy: AccessPolicy,
818    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
819        let task_self = self.clone();
820        let view = view.clone();
821        self.runtime
822            .spawn_blocking(move || {
823                task_self
824                    .database
825                    .query_by_name(&view, key, order, limit, access_policy)
826            })
827            .await
828            .map_err(Error::from)?
829    }
830
831    async fn query_by_name_with_docs(
832        &self,
833        view: &ViewName,
834        key: Option<SerializedQueryKey>,
835        order: Sort,
836        limit: Option<u32>,
837        access_policy: AccessPolicy,
838    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
839        let task_self = self.clone();
840        let view = view.clone();
841        self.runtime
842            .spawn_blocking(move || {
843                task_self
844                    .database
845                    .query_by_name_with_docs(&view, key, order, limit, access_policy)
846            })
847            .await
848            .map_err(Error::from)?
849    }
850
851    async fn reduce_by_name(
852        &self,
853        view: &ViewName,
854        key: Option<SerializedQueryKey>,
855        access_policy: AccessPolicy,
856    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
857        let task_self = self.clone();
858        let view = view.clone();
859        self.runtime
860            .spawn_blocking(move || task_self.database.reduce_by_name(&view, key, access_policy))
861            .await
862            .map_err(Error::from)?
863    }
864
865    async fn reduce_grouped_by_name(
866        &self,
867        view: &ViewName,
868        key: Option<SerializedQueryKey>,
869        access_policy: AccessPolicy,
870    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
871        let task_self = self.clone();
872        let view = view.clone();
873        self.runtime
874            .spawn_blocking(move || {
875                task_self
876                    .database
877                    .reduce_grouped_by_name(&view, key, access_policy)
878            })
879            .await
880            .map_err(Error::from)?
881    }
882
883    async fn delete_docs_by_name(
884        &self,
885        view: &ViewName,
886        key: Option<SerializedQueryKey>,
887        access_policy: AccessPolicy,
888    ) -> Result<u64, bonsaidb_core::Error> {
889        let task_self = self.clone();
890        let view = view.clone();
891        self.runtime
892            .spawn_blocking(move || {
893                task_self
894                    .database
895                    .delete_docs_by_name(&view, key, access_policy)
896            })
897            .await
898            .map_err(Error::from)?
899    }
900}