bonsaidb_local/
database.rs

1use std::borrow::{Borrow, Cow};
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::convert::Infallible;
4use std::ops::{self, Deref};
5use std::sync::Arc;
6use std::u8;
7
8use bonsaidb_core::arc_bytes::serde::CowBytes;
9use bonsaidb_core::arc_bytes::ArcBytes;
10use bonsaidb_core::connection::{
11    self, AccessPolicy, Connection, HasSchema, HasSession, LowLevelConnection, Range,
12    SerializedQueryKey, Session, Sort, StorageConnection,
13};
14#[cfg(any(feature = "encryption", feature = "compression"))]
15use bonsaidb_core::document::KeyId;
16use bonsaidb_core::document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision};
17use bonsaidb_core::keyvalue::{KeyOperation, Output, Timestamp};
18use bonsaidb_core::limits::{
19    LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS,
20};
21use bonsaidb_core::permissions::bonsai::{
22    collection_resource_name, database_resource_name, document_resource_name, kv_resource_name,
23    view_resource_name, BonsaiAction, DatabaseAction, DocumentAction, TransactionAction,
24    ViewAction,
25};
26use bonsaidb_core::permissions::Permissions;
27use bonsaidb_core::schema::view::map::MappedSerializedValue;
28use bonsaidb_core::schema::view::{self};
29use bonsaidb_core::schema::{self, CollectionName, Schema, Schematic, ViewName};
30use bonsaidb_core::transaction::{
31    self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
32    Transaction,
33};
34use itertools::Itertools;
35use nebari::io::any::AnyFile;
36use nebari::tree::{
37    AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
38    Unversioned, Versioned,
39};
40use nebari::{AbortError, ExecutingTransaction, Roots, Tree};
41use parking_lot::Mutex;
42use serde::{Deserialize, Serialize};
43use watchable::Watchable;
44
45use crate::config::{Builder, KeyValuePersistence, StorageConfiguration};
46use crate::database::keyvalue::BackgroundWorkerProcessTarget;
47use crate::error::Error;
48use crate::open_trees::OpenTrees;
49use crate::storage::StorageLock;
50#[cfg(feature = "encryption")]
51use crate::storage::TreeVault;
52use crate::views::{
53    mapper, view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
54    ViewEntry,
55};
56use crate::Storage;
57
58pub mod keyvalue;
59
60pub(crate) mod compat;
61pub mod pubsub;
62
63/// A database stored in BonsaiDb. This type blocks the current thread when
64/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
65///
66/// ## Converting between Blocking and Async Types
67///
68/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
69/// using:
70///
71/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
72/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
73/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
74/// - [`Database::into_async()`]
75/// - [`Database::to_async()`]
76/// - [`Database::into_async_with_runtime()`]
77/// - [`Database::to_async_with_runtime()`]
78///
79/// ## Using `Database` to create a single database
80///
81/// `Database`provides an easy mechanism to open and access a single database:
82///
83/// ```rust
84/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
85/// use bonsaidb_core::schema::Collection;
86/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
87/// use bonsaidb_local::{
88///     config::{Builder, StorageConfiguration},
89///     Database,
90/// };
91/// use serde::{Deserialize, Serialize};
92///
93/// #[derive(Debug, Serialize, Deserialize, Collection)]
94/// #[collection(name = "blog-posts")]
95/// # #[collection(core = bonsaidb_core)]
96/// struct BlogPost {
97///     pub title: String,
98///     pub contents: String,
99/// }
100///
101/// # fn test() -> Result<(), bonsaidb_local::Error> {
102/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
103/// # Ok(())
104/// # }
105/// ```
106///
107/// Under the hood, this initializes a [`Storage`] instance pointing at
108/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
109/// with the schema `BlogPost`.
110///
111/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
112/// collections can be used as a [`Schema`].
113#[derive(Debug, Clone)]
114pub struct Database {
115    pub(crate) data: Arc<Data>,
116    pub(crate) storage: Storage,
117}
118
119#[derive(Debug)]
120pub struct Data {
121    pub name: Arc<Cow<'static, str>>,
122    context: Context,
123    pub(crate) schema: Arc<Schematic>,
124}
125
126impl Database {
127    /// Opens a local file as a bonsaidb.
128    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
129        name: S,
130        context: Context,
131        storage: &Storage,
132    ) -> Result<Self, Error> {
133        let name = name.into();
134        let schema = Arc::new(DB::schematic()?);
135        let db = Self {
136            storage: storage.clone(),
137            data: Arc::new(Data {
138                name: Arc::new(name),
139                context,
140                schema,
141            }),
142        };
143
144        if storage.instance.check_view_integrity_on_database_open() {
145            for view in db.data.schema.views() {
146                storage.instance.tasks().spawn_integrity_check(view, &db);
147            }
148        }
149
150        storage
151            .instance
152            .tasks()
153            .spawn_key_value_expiration_loader(&db);
154
155        Ok(db)
156    }
157
158    /// Restricts an unauthenticated instance to having `effective_permissions`.
159    /// Returns `None` if a session has already been established.
160    #[must_use]
161    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
162        self.storage
163            .with_effective_permissions(effective_permissions)
164            .map(|storage| Self {
165                storage,
166                data: self.data.clone(),
167            })
168    }
169
170    /// Creates a `Storage` with a single-database named "default" with its data
171    /// stored at `path`. This requires exclusive access to the storage location
172    /// configured. Attempting to open the same path multiple times concurrently
173    /// will lead to errors.
174    ///
175    /// Using this method is perfect if only one database is being used.
176    /// However, if multiple databases are needed, it is much better to store
177    /// multiple databases in a single [`Storage`] instance rather than creating
178    /// multiple independent databases using this method.
179    ///
180    /// When opening multiple databases using this function, each database will
181    /// have its own thread pool, cache, task worker pool, and more. By using a
182    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
183    /// perform better.
184    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
185        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
186
187        Ok(storage.create_database::<DB>("default", true)?)
188    }
189
190    /// Returns the [`Schematic`] for the schema for this database.
191    #[must_use]
192    pub fn schematic(&self) -> &'_ Schematic {
193        &self.data.schema
194    }
195
196    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
197        &self.data.context.roots
198    }
199
200    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
201        &self,
202        view: &dyn view::Serialized,
203        key: Option<SerializedQueryKey>,
204        order: Sort,
205        limit: Option<u32>,
206        access_policy: AccessPolicy,
207        mut callback: F,
208    ) -> Result<(), bonsaidb_core::Error> {
209        if matches!(access_policy, AccessPolicy::UpdateBefore) {
210            self.storage
211                .instance
212                .tasks()
213                .update_view_if_needed(view, self, true)?;
214        } else if let Some(integrity_check) = self
215            .storage
216            .instance
217            .tasks()
218            .spawn_integrity_check(view, self)
219        {
220            integrity_check
221                .receive()
222                .map_err(Error::from)?
223                .map_err(Error::from)?;
224        }
225
226        let view_entries = self
227            .roots()
228            .tree(self.collection_tree(
229                &view.collection(),
230                view_entries_tree_name(&view.view_name()),
231            )?)
232            .map_err(Error::from)?;
233
234        {
235            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
236                callback(entry)?;
237            }
238        }
239
240        if matches!(access_policy, AccessPolicy::UpdateAfter) {
241            let db = self.clone();
242            let view_name = view.view_name();
243            let view = db
244                .data
245                .schema
246                .view_by_name(&view_name)
247                .expect("query made with view that isn't registered with this database");
248            db.storage
249                .instance
250                .tasks()
251                .update_view_if_needed(view, &db, false)?;
252        }
253
254        Ok(())
255    }
256
257    fn open_trees_for_transaction(&self, transaction: &Transaction) -> Result<OpenTrees, Error> {
258        let mut open_trees = OpenTrees::default();
259        for op in &transaction.operations {
260            if self
261                .data
262                .schema
263                .collection_primary_key_description(&op.collection)
264                .is_none()
265            {
266                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267            }
268
269            #[cfg(any(feature = "encryption", feature = "compression"))]
270            let vault = if let Some(encryption_key) =
271                self.collection_encryption_key(&op.collection).cloned()
272            {
273                #[cfg(feature = "encryption")]
274                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275                    vault.key = Some(encryption_key);
276                    Some(vault)
277                } else {
278                    TreeVault::new_if_needed(
279                        Some(encryption_key),
280                        self.storage().vault(),
281                        #[cfg(feature = "compression")]
282                        None,
283                    )
284                }
285
286                #[cfg(not(feature = "encryption"))]
287                {
288                    drop(encryption_key);
289                    return Err(Error::EncryptionDisabled);
290                }
291            } else {
292                self.storage().tree_vault().cloned()
293            };
294
295            open_trees.open_trees_for_document_change(
296                &op.collection,
297                &self.data.schema,
298                #[cfg(any(feature = "encryption", feature = "compression"))]
299                vault,
300            );
301        }
302
303        Ok(open_trees)
304    }
305
306    fn apply_transaction_to_roots(
307        &self,
308        transaction: &Transaction,
309    ) -> Result<Vec<OperationResult>, Error> {
310        let open_trees = self.open_trees_for_transaction(transaction)?;
311
312        let mut roots_transaction = self
313            .data
314            .context
315            .roots
316            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
317
318        let mut results = Vec::new();
319        let mut changed_documents = Vec::new();
320        let mut collection_indexes = HashMap::new();
321        let mut collections = Vec::new();
322        for op in &transaction.operations {
323            let result = self.execute_operation(
324                op,
325                &mut roots_transaction,
326                &open_trees.trees_index_by_name,
327            )?;
328
329            if let Some((collection, id, deleted)) = match &result {
330                OperationResult::DocumentUpdated { header, collection } => {
331                    Some((collection, header.id.clone(), false))
332                }
333                OperationResult::DocumentDeleted { id, collection } => {
334                    Some((collection, id.clone(), true))
335                }
336                OperationResult::Success => None,
337            } {
338                let collection = match collection_indexes.get(collection) {
339                    Some(index) => *index,
340                    None => {
341                        if let Ok(id) = u16::try_from(collections.len()) {
342                            collection_indexes.insert(collection.clone(), id);
343                            collections.push(collection.clone());
344                            id
345                        } else {
346                            return Err(Error::TransactionTooLarge);
347                        }
348                    }
349                };
350                changed_documents.push(ChangedDocument {
351                    collection,
352                    id,
353                    deleted,
354                });
355            }
356            results.push(result);
357        }
358
359        self.invalidate_changed_documents(
360            &mut roots_transaction,
361            &open_trees,
362            &collections,
363            &changed_documents,
364        )?;
365
366        roots_transaction
367            .entry_mut()
368            .set_data(compat::serialize_executed_transaction_changes(
369                &Changes::Documents(DocumentChanges {
370                    collections,
371                    documents: changed_documents,
372                }),
373            )?)?;
374
375        roots_transaction.commit()?;
376
377        Ok(results)
378    }
379
380    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
381    fn invalidate_changed_documents(
382        &self,
383        roots_transaction: &mut ExecutingTransaction<AnyFile>,
384        open_trees: &OpenTrees,
385        collections: &[CollectionName],
386        changed_documents: &[ChangedDocument],
387    ) -> Result<(), Error> {
388        for (collection, changed_documents) in &changed_documents
389            .iter()
390            .group_by(|doc| &collections[usize::from(doc.collection)])
391        {
392            let mut views = self
393                .data
394                .schema
395                .views_in_collection(collection)
396                .filter(|view| !view.update_policy().is_eager())
397                .peekable();
398            if views.peek().is_some() {
399                let changed_documents = changed_documents.collect::<Vec<_>>();
400                for view in views {
401                    let view_name = view.view_name();
402                    let tree_name = view_invalidated_docs_tree_name(&view_name);
403                    for changed_document in &changed_documents {
404                        let mut invalidated_docs = roots_transaction
405                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
406                            .unwrap();
407                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
408                    }
409                }
410            }
411        }
412        Ok(())
413    }
414
415    fn execute_operation(
416        &self,
417        operation: &Operation,
418        transaction: &mut ExecutingTransaction<AnyFile>,
419        tree_index_map: &HashMap<String, usize>,
420    ) -> Result<OperationResult, Error> {
421        match &operation.command {
422            Command::Insert { id, contents } => {
423                self.execute_insert(operation, transaction, tree_index_map, id.clone(), contents)
424            }
425            Command::Update { header, contents } => self.execute_update(
426                operation,
427                transaction,
428                tree_index_map,
429                &header.id,
430                Some(&header.revision),
431                contents,
432            ),
433            Command::Overwrite { id, contents } => {
434                self.execute_update(operation, transaction, tree_index_map, id, None, contents)
435            }
436            Command::Delete { header } => {
437                self.execute_delete(operation, transaction, tree_index_map, header)
438            }
439            Command::Check { id, revision } => Self::execute_check(
440                operation,
441                transaction,
442                tree_index_map,
443                id.clone(),
444                *revision,
445            ),
446        }
447    }
448
449    #[cfg_attr(
450        feature = "tracing",
451        tracing::instrument(
452            level = "trace",
453            skip(self, operation, transaction, tree_index_map, contents),
454            fields(
455                database = self.name(),
456                collection.name = operation.collection.name.as_ref(),
457                collection.authority = operation.collection.authority.as_ref()
458            )
459        )
460    )]
461    fn execute_update(
462        &self,
463        operation: &Operation,
464        transaction: &mut ExecutingTransaction<AnyFile>,
465        tree_index_map: &HashMap<String, usize>,
466        id: &DocumentId,
467        check_revision: Option<&Revision>,
468        contents: &[u8],
469    ) -> Result<OperationResult, crate::Error> {
470        let mut documents = transaction
471            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
472            .unwrap();
473        let document_id = ArcBytes::from(id.to_vec());
474        let mut result = None;
475        let mut updated = false;
476        documents.modify(
477            vec![document_id.clone()],
478            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
479                                                                        value: Option<
480                ArcBytes<'_>,
481            >| {
482                if let Some(old) = value {
483                    let doc = match deserialize_document(&old) {
484                        Ok(doc) => doc,
485                        Err(err) => {
486                            result = Some(Err(err));
487                            return nebari::tree::KeyOperation::Skip;
488                        }
489                    };
490                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
491                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
492                        {
493                            let updated_header = Header {
494                                id: id.clone(),
495                                revision: updated_revision,
496                            };
497                            let serialized_doc = match serialize_document(&BorrowedDocument {
498                                header: updated_header.clone(),
499                                contents: CowBytes::from(contents),
500                            }) {
501                                Ok(bytes) => bytes,
502                                Err(err) => {
503                                    result = Some(Err(Error::from(err)));
504                                    return nebari::tree::KeyOperation::Skip;
505                                }
506                            };
507                            result = Some(Ok(OperationResult::DocumentUpdated {
508                                collection: operation.collection.clone(),
509                                header: updated_header,
510                            }));
511                            updated = true;
512                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
513                        }
514
515                        // If no new revision was made, it means an attempt to
516                        // save a document with the same contents was made.
517                        // We'll return a success but not actually give a new
518                        // version
519                        result = Some(Ok(OperationResult::DocumentUpdated {
520                            collection: operation.collection.clone(),
521                            header: doc.header,
522                        }));
523                    } else {
524                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
525                            operation.collection.clone(),
526                            Box::new(doc.header),
527                        ))));
528                    }
529                } else if check_revision.is_none() {
530                    let doc = BorrowedDocument::new(id.clone(), contents);
531                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
532                        Ok((doc, serialized)) => {
533                            result = Some(Ok(OperationResult::DocumentUpdated {
534                                collection: operation.collection.clone(),
535                                header: doc.header,
536                            }));
537                            updated = true;
538                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
539                        }
540                        Err(err) => {
541                            result = Some(Err(Error::from(err)));
542                        }
543                    }
544                } else {
545                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
546                        operation.collection.clone(),
547                        Box::new(id.clone()),
548                    ))));
549                }
550                nebari::tree::KeyOperation::Skip
551            })),
552        )?;
553        drop(documents);
554
555        if updated {
556            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
557        }
558
559        result.expect("nebari should invoke the callback even when the key isn't found")
560    }
561
562    #[cfg_attr(
563        feature = "tracing",
564        tracing::instrument(
565            level = "trace",
566            skip(self, operation, transaction, tree_index_map, contents),
567            fields(
568                database = self.name(),
569                collection.name = operation.collection.name.as_ref(),
570                collection.authority = operation.collection.authority.as_ref()
571            )
572        )
573    )]
574    fn execute_insert(
575        &self,
576        operation: &Operation,
577        transaction: &mut ExecutingTransaction<AnyFile>,
578        tree_index_map: &HashMap<String, usize>,
579        id: Option<DocumentId>,
580        contents: &[u8],
581    ) -> Result<OperationResult, Error> {
582        let mut documents = transaction
583            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
584            .unwrap();
585        let id = if let Some(id) = id {
586            id
587        } else if let Some(last_key) = documents.last_key()? {
588            let id = DocumentId::try_from(last_key.as_slice())?;
589            self.data
590                .schema
591                .next_id_for_collection(&operation.collection, Some(id))?
592        } else {
593            self.data
594                .schema
595                .next_id_for_collection(&operation.collection, None)?
596        };
597
598        let doc = BorrowedDocument::new(id, contents);
599        let serialized: Vec<u8> = serialize_document(&doc)?;
600        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
601        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
602            let doc = deserialize_document(&document)?;
603            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
604                operation.collection.clone(),
605                Box::new(doc.header),
606            )))
607        } else {
608            drop(documents);
609            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
610
611            Ok(OperationResult::DocumentUpdated {
612                collection: operation.collection.clone(),
613                header: doc.header,
614            })
615        }
616    }
617
618    #[cfg_attr(feature = "tracing", tracing::instrument(
619        level = "trace",
620        skip(self, operation, transaction, tree_index_map),
621        fields(
622            database = self.name(),
623            collection.name = operation.collection.name.as_ref(),
624            collection.authority = operation.collection.authority.as_ref()
625        )
626    ))]
627    fn execute_delete(
628        &self,
629        operation: &Operation,
630        transaction: &mut ExecutingTransaction<AnyFile>,
631        tree_index_map: &HashMap<String, usize>,
632        header: &Header,
633    ) -> Result<OperationResult, Error> {
634        let mut documents = transaction
635            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
636            .unwrap();
637        if let Some(vec) = documents.remove(header.id.as_ref())? {
638            drop(documents);
639            let doc = deserialize_document(&vec)?;
640            if &doc.header == header {
641                self.update_eager_views(
642                    &ArcBytes::from(doc.header.id.to_vec()),
643                    operation,
644                    transaction,
645                    tree_index_map,
646                )?;
647
648                Ok(OperationResult::DocumentDeleted {
649                    collection: operation.collection.clone(),
650                    id: header.id.clone(),
651                })
652            } else {
653                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
654                    operation.collection.clone(),
655                    Box::new(header.clone()),
656                )))
657            }
658        } else {
659            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
660                operation.collection.clone(),
661                Box::new(header.id.clone()),
662            )))
663        }
664    }
665
666    #[cfg_attr(feature = "tracing", tracing::instrument(
667        level = "trace",
668        skip(self, operation, transaction, tree_index_map),
669        fields(
670            database = self.name(),
671            collection.name = operation.collection.name.as_ref(),
672            collection.authority = operation.collection.authority.as_ref()
673        )
674    ))]
675    fn update_eager_views(
676        &self,
677        document_id: &ArcBytes<'static>,
678        operation: &Operation,
679        transaction: &mut ExecutingTransaction<AnyFile>,
680        tree_index_map: &HashMap<String, usize>,
681    ) -> Result<(), Error> {
682        let mut eager_views = self
683            .data
684            .schema
685            .eager_views_in_collection(&operation.collection)
686            .peekable();
687        if eager_views.peek().is_some() {
688            let documents = transaction
689                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
690                .unwrap();
691            for view in eager_views {
692                let name = view.view_name();
693                let document_map = transaction
694                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
695                    .unwrap();
696                let view_entries = transaction
697                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
698                    .unwrap();
699                mapper::DocumentRequest {
700                    database: self,
701                    document_ids: vec![document_id.clone()],
702                    map_request: &mapper::Map {
703                        database: self.data.name.clone(),
704                        collection: operation.collection.clone(),
705                        view_name: name.clone(),
706                    },
707                    document_map,
708                    documents,
709                    view_entries,
710                    view,
711                }
712                .map()?;
713            }
714        }
715
716        Ok(())
717    }
718
719    #[cfg_attr(feature = "tracing", tracing::instrument(
720        level = "trace",
721        skip(operation, transaction, tree_index_map),
722        fields(
723            collection.name = operation.collection.name.as_ref(),
724            collection.authority = operation.collection.authority.as_ref(),
725        ),
726    ))]
727    fn execute_check(
728        operation: &Operation,
729        transaction: &mut ExecutingTransaction<AnyFile>,
730        tree_index_map: &HashMap<String, usize>,
731        id: DocumentId,
732        revision: Option<Revision>,
733    ) -> Result<OperationResult, Error> {
734        let mut documents = transaction
735            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
736            .unwrap();
737        if let Some(vec) = documents.get(id.as_ref())? {
738            drop(documents);
739
740            if let Some(revision) = revision {
741                let doc = deserialize_document(&vec)?;
742                if doc.header.revision != revision {
743                    return Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
744                        operation.collection.clone(),
745                        Box::new(Header { id, revision }),
746                    )));
747                }
748            }
749
750            Ok(OperationResult::Success)
751        } else {
752            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
753                operation.collection.clone(),
754                Box::new(id),
755            )))
756        }
757    }
758
759    fn create_view_iterator(
760        view_entries: &Tree<Unversioned, AnyFile>,
761        key: Option<SerializedQueryKey>,
762        order: Sort,
763        limit: Option<u32>,
764    ) -> Result<Vec<ViewEntry>, Error> {
765        let mut values = Vec::new();
766        let forwards = match order {
767            Sort::Ascending => true,
768            Sort::Descending => false,
769        };
770        let mut values_read = 0;
771        if let Some(key) = key {
772            match key {
773                SerializedQueryKey::Range(range) => {
774                    view_entries.scan::<Infallible, _, _, _, _>(
775                        &range.map_ref(|bytes| &bytes[..]),
776                        forwards,
777                        |_, _, _| ScanEvaluation::ReadData,
778                        |_, _| {
779                            if let Some(limit) = limit {
780                                if values_read >= limit {
781                                    return ScanEvaluation::Stop;
782                                }
783                                values_read += 1;
784                            }
785                            ScanEvaluation::ReadData
786                        },
787                        |_key, _index, value| {
788                            values.push(value);
789                            Ok(())
790                        },
791                    )?;
792                }
793                SerializedQueryKey::Matches(key) => {
794                    values.extend(view_entries.get(&key)?);
795                }
796                SerializedQueryKey::Multiple(mut list) => {
797                    list.sort();
798
799                    values.extend(
800                        view_entries
801                            .get_multiple(list.iter().map(|bytes| bytes.as_slice()))?
802                            .into_iter()
803                            .map(|(_, value)| value),
804                    );
805                }
806            }
807        } else {
808            view_entries.scan::<Infallible, _, _, _, _>(
809                &(..),
810                forwards,
811                |_, _, _| ScanEvaluation::ReadData,
812                |_, _| {
813                    if let Some(limit) = limit {
814                        if values_read >= limit {
815                            return ScanEvaluation::Stop;
816                        }
817                        values_read += 1;
818                    }
819                    ScanEvaluation::ReadData
820                },
821                |_, _, value| {
822                    values.push(value);
823                    Ok(())
824                },
825            )?;
826        }
827
828        values
829            .into_iter()
830            .map(|value| bincode::deserialize(&value).map_err(Error::from))
831            .collect::<Result<Vec<_>, Error>>()
832    }
833
834    #[cfg(any(feature = "encryption", feature = "compression"))]
835    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
836        self.schematic()
837            .encryption_key_for_collection(collection)
838            .or_else(|| self.storage.default_encryption_key())
839    }
840
841    #[cfg_attr(
842        not(feature = "encryption"),
843        allow(
844            unused_mut,
845            unused_variables,
846            clippy::unused_self,
847            clippy::let_and_return
848        )
849    )]
850    #[allow(clippy::unnecessary_wraps)]
851    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
852        &self,
853        collection: &CollectionName,
854        name: S,
855    ) -> Result<TreeRoot<R, AnyFile>, Error> {
856        let mut tree = R::tree(name);
857
858        #[cfg(any(feature = "encryption", feature = "compression"))]
859        match (
860            self.collection_encryption_key(collection),
861            self.storage().tree_vault().cloned(),
862        ) {
863            (Some(override_key), Some(mut vault)) => {
864                #[cfg(feature = "encryption")]
865                {
866                    vault.key = Some(override_key.clone());
867                    tree = tree.with_vault(vault);
868                }
869
870                #[cfg(not(feature = "encryption"))]
871                {
872                    return Err(Error::EncryptionDisabled);
873                }
874            }
875            (None, Some(vault)) => {
876                tree = tree.with_vault(vault);
877            }
878            (key, None) => {
879                #[cfg(feature = "encryption")]
880                if let Some(vault) = TreeVault::new_if_needed(
881                    key.cloned(),
882                    self.storage().vault(),
883                    #[cfg(feature = "compression")]
884                    None,
885                ) {
886                    tree = tree.with_vault(vault);
887                }
888
889                #[cfg(not(feature = "encryption"))]
890                if key.is_some() {
891                    return Err(Error::EncryptionDisabled);
892                }
893            }
894        }
895
896        Ok(tree)
897    }
898
899    pub(crate) fn update_key_expiration<'key>(
900        &self,
901        tree_key: impl Into<Cow<'key, str>>,
902        expiration: Option<Timestamp>,
903    ) {
904        self.data
905            .context
906            .update_key_expiration(tree_key, expiration);
907    }
908
909    /// Converts this instance into its blocking version, which is able to be
910    /// used without async. The returned instance uses the current Tokio runtime
911    /// handle to spawn blocking tasks.
912    ///
913    /// # Panics
914    ///
915    /// Panics if called outside the context of a Tokio runtime.
916    #[cfg(feature = "async")]
917    #[must_use]
918    pub fn into_async(self) -> crate::AsyncDatabase {
919        self.into_async_with_runtime(tokio::runtime::Handle::current())
920    }
921
922    /// Converts this instance into its blocking version, which is able to be
923    /// used without async. The returned instance uses the provided runtime
924    /// handle to spawn blocking tasks.
925    #[cfg(feature = "async")]
926    #[must_use]
927    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
928        crate::AsyncDatabase {
929            database: self,
930            runtime: Arc::new(runtime),
931        }
932    }
933
934    /// Converts this instance into its blocking version, which is able to be
935    /// used without async. The returned instance uses the current Tokio runtime
936    /// handle to spawn blocking tasks.
937    ///
938    /// # Panics
939    ///
940    /// Panics if called outside the context of a Tokio runtime.
941    #[cfg(feature = "async")]
942    #[must_use]
943    pub fn to_async(&self) -> crate::AsyncDatabase {
944        self.clone().into_async()
945    }
946
947    /// Converts this instance into its blocking version, which is able to be
948    /// used without async. The returned instance uses the provided runtime
949    /// handle to spawn blocking tasks.
950    #[cfg(feature = "async")]
951    #[must_use]
952    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
953        self.clone().into_async_with_runtime(runtime)
954    }
955}
956#[derive(Serialize, Deserialize)]
957struct LegacyHeader {
958    id: u64,
959    revision: Revision,
960}
961#[derive(Serialize, Deserialize)]
962struct LegacyDocument<'a> {
963    header: LegacyHeader,
964    #[serde(borrow)]
965    contents: &'a [u8],
966}
967
968pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
969    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
970        Ok(document) => Ok(document),
971        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
972            Ok(legacy_doc) => Ok(BorrowedDocument {
973                header: Header {
974                    id: DocumentId::from_u64(legacy_doc.header.id),
975                    revision: legacy_doc.header.revision,
976                },
977                contents: CowBytes::from(legacy_doc.contents),
978            }),
979            Err(_) => Err(Error::from(err)),
980        },
981    }
982}
983
984fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
985    pot::to_vec(document)
986        .map_err(Error::from)
987        .map_err(bonsaidb_core::Error::from)
988}
989
990impl HasSession for Database {
991    fn session(&self) -> Option<&Session> {
992        self.storage.session()
993    }
994}
995
996impl Connection for Database {
997    type Storage = Storage;
998
999    fn storage(&self) -> Self::Storage {
1000        self.storage.clone()
1001    }
1002
1003    #[cfg_attr(feature = "tracing", tracing::instrument(
1004        level = "trace",
1005        skip(self),
1006        fields(
1007            database = self.name(),
1008        )
1009    ))]
1010    fn list_executed_transactions(
1011        &self,
1012        starting_id: Option<u64>,
1013        result_limit: Option<u32>,
1014    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
1015        self.check_permission(
1016            database_resource_name(self.name()),
1017            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
1018        )?;
1019        let result_limit = usize::try_from(
1020            result_limit
1021                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
1022                .min(LIST_TRANSACTIONS_MAX_RESULTS),
1023        )
1024        .unwrap();
1025        if result_limit > 0 {
1026            let range = if let Some(starting_id) = starting_id {
1027                Range::from(starting_id..)
1028            } else {
1029                Range::from(..)
1030            };
1031
1032            let mut entries = Vec::new();
1033            self.roots()
1034                .transactions()
1035                .scan(range, |entry| {
1036                    if entry.data().is_some() {
1037                        entries.push(entry);
1038                    }
1039                    entries.len() < result_limit
1040                })
1041                .map_err(Error::from)?;
1042
1043            entries
1044                .into_iter()
1045                .map(|entry| {
1046                    if let Some(data) = entry.data() {
1047                        let changes = compat::deserialize_executed_transaction_changes(data)?;
1048                        Ok(Some(transaction::Executed {
1049                            id: entry.id,
1050                            changes,
1051                        }))
1052                    } else {
1053                        Ok(None)
1054                    }
1055                })
1056                .filter_map(Result::transpose)
1057                .collect::<Result<Vec<_>, Error>>()
1058                .map_err(bonsaidb_core::Error::from)
1059        } else {
1060            // A request was made to return an empty result? This should probably be
1061            // an error, but technically this is a correct response.
1062            Ok(Vec::default())
1063        }
1064    }
1065
1066    #[cfg_attr(feature = "tracing", tracing::instrument(
1067        level = "trace",
1068        skip(self),
1069        fields(
1070            database = self.name(),
1071        )
1072    ))]
1073    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
1074        self.check_permission(
1075            database_resource_name(self.name()),
1076            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
1077        )?;
1078        Ok(self.roots().transactions().current_transaction_id())
1079    }
1080
1081    #[cfg_attr(feature = "tracing", tracing::instrument(
1082        level = "trace",
1083        skip(self),
1084        fields(
1085            database = self.name(),
1086        )
1087    ))]
1088    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
1089        self.check_permission(
1090            database_resource_name(self.name()),
1091            &BonsaiAction::Database(DatabaseAction::Compact),
1092        )?;
1093        self.storage()
1094            .instance
1095            .tasks()
1096            .compact_database(self.clone())?;
1097        Ok(())
1098    }
1099
1100    #[cfg_attr(feature = "tracing", tracing::instrument(
1101        level = "trace",
1102        skip(self),
1103        fields(
1104            database = self.name(),
1105        )
1106    ))]
1107    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
1108        self.check_permission(
1109            kv_resource_name(self.name()),
1110            &BonsaiAction::Database(DatabaseAction::Compact),
1111        )?;
1112        self.storage()
1113            .instance
1114            .tasks()
1115            .compact_key_value_store(self.clone())?;
1116        Ok(())
1117    }
1118}
1119
1120impl LowLevelConnection for Database {
1121    #[cfg_attr(feature = "tracing", tracing::instrument(
1122        level = "trace",
1123        skip(self,  transaction),
1124        fields(
1125            database = self.name(),
1126        )
1127    ))]
1128    fn apply_transaction(
1129        &self,
1130        transaction: Transaction,
1131    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1132        for op in &transaction.operations {
1133            let (resource, action) = match &op.command {
1134                Command::Insert { .. } => (
1135                    collection_resource_name(self.name(), &op.collection),
1136                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
1137                ),
1138                Command::Update { header, .. } => (
1139                    document_resource_name(self.name(), &op.collection, &header.id),
1140                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
1141                ),
1142                Command::Overwrite { id, .. } => (
1143                    document_resource_name(self.name(), &op.collection, id),
1144                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
1145                ),
1146                Command::Delete { header } => (
1147                    document_resource_name(self.name(), &op.collection, &header.id),
1148                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
1149                ),
1150                Command::Check { id, .. } => (
1151                    document_resource_name(self.name(), &op.collection, id),
1152                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
1153                ),
1154            };
1155            self.check_permission(resource, &action)?;
1156        }
1157
1158        let mut eager_view_tasks = Vec::new();
1159        for collection_name in transaction
1160            .operations
1161            .iter()
1162            .map(|op| &op.collection)
1163            .collect::<HashSet<_>>()
1164        {
1165            for view in self.data.schema.eager_views_in_collection(collection_name) {
1166                if let Some(task) = self
1167                    .storage
1168                    .instance
1169                    .tasks()
1170                    .spawn_integrity_check(view, self)
1171                {
1172                    eager_view_tasks.push(task);
1173                }
1174            }
1175        }
1176
1177        let mut eager_view_mapping_tasks = Vec::new();
1178        for task in eager_view_tasks {
1179            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
1180                eager_view_mapping_tasks.push(spawned_task);
1181            }
1182        }
1183
1184        for task in eager_view_mapping_tasks {
1185            let mut task = task.lock();
1186            if let Some(task) = task.take() {
1187                task.receive().map_err(Error::from)?.map_err(Error::from)?;
1188            }
1189        }
1190
1191        self.apply_transaction_to_roots(&transaction)
1192            .map_err(bonsaidb_core::Error::from)
1193    }
1194
1195    #[cfg_attr(feature = "tracing", tracing::instrument(
1196        level = "trace",
1197        skip(self, collection),
1198        fields(
1199            database = self.name(),
1200            collection.name = collection.name.as_ref(),
1201            collection.authority = collection.authority.as_ref(),
1202        )
1203    ))]
1204    fn get_from_collection(
1205        &self,
1206        id: DocumentId,
1207        collection: &CollectionName,
1208    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
1209        self.check_permission(
1210            document_resource_name(self.name(), collection, &id),
1211            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
1212        )?;
1213        let tree = self
1214            .data
1215            .context
1216            .roots
1217            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
1218            .map_err(Error::from)?;
1219        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
1220            Ok(Some(deserialize_document(&vec)?.into_owned()))
1221        } else {
1222            Ok(None)
1223        }
1224    }
1225
1226    #[cfg_attr(feature = "tracing", tracing::instrument(
1227        level = "trace",
1228        skip(self, collection),
1229        fields(
1230            database = self.name(),
1231            collection.name = collection.name.as_ref(),
1232            collection.authority = collection.authority.as_ref(),
1233        )
1234    ))]
1235    fn list_from_collection(
1236        &self,
1237        ids: Range<DocumentId>,
1238        sort: Sort,
1239        limit: Option<u32>,
1240        collection: &CollectionName,
1241    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
1242        self.check_permission(
1243            collection_resource_name(self.name(), collection),
1244            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
1245        )?;
1246        let tree = self
1247            .data
1248            .context
1249            .roots
1250            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
1251            .map_err(Error::from)?;
1252        let mut found_docs = Vec::new();
1253        let mut keys_read = 0;
1254        let ids = DocumentIdRange(ids);
1255        tree.scan(
1256            &ids.borrow_as_bytes(),
1257            match sort {
1258                Sort::Ascending => true,
1259                Sort::Descending => false,
1260            },
1261            |_, _, _| ScanEvaluation::ReadData,
1262            |_, _| {
1263                if let Some(limit) = limit {
1264                    if keys_read >= limit {
1265                        return ScanEvaluation::Stop;
1266                    }
1267
1268                    keys_read += 1;
1269                }
1270                ScanEvaluation::ReadData
1271            },
1272            |_, _, doc| {
1273                found_docs.push(
1274                    deserialize_document(&doc)
1275                        .map(BorrowedDocument::into_owned)
1276                        .map_err(AbortError::Other)?,
1277                );
1278                Ok(())
1279            },
1280        )
1281        .map_err(|err| match err {
1282            AbortError::Other(err) => err,
1283            AbortError::Nebari(err) => crate::Error::from(err),
1284        })?;
1285
1286        Ok(found_docs)
1287    }
1288
1289    #[cfg_attr(feature = "tracing", tracing::instrument(
1290        level = "trace",
1291        skip(self, collection),
1292        fields(
1293            database = self.name(),
1294            collection.name = collection.name.as_ref(),
1295            collection.authority = collection.authority.as_ref(),
1296        )
1297    ))]
1298    fn list_headers_from_collection(
1299        &self,
1300        ids: Range<DocumentId>,
1301        sort: Sort,
1302        limit: Option<u32>,
1303        collection: &CollectionName,
1304    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
1305        self.check_permission(
1306            collection_resource_name(self.name(), collection),
1307            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
1308        )?;
1309        let tree = self
1310            .data
1311            .context
1312            .roots
1313            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
1314            .map_err(Error::from)?;
1315        let mut found_headers = Vec::new();
1316        let mut keys_read = 0;
1317        let ids = DocumentIdRange(ids);
1318        tree.scan(
1319            &ids.borrow_as_bytes(),
1320            match sort {
1321                Sort::Ascending => true,
1322                Sort::Descending => false,
1323            },
1324            |_, _, _| ScanEvaluation::ReadData,
1325            |_, _| {
1326                if let Some(limit) = limit {
1327                    if keys_read >= limit {
1328                        return ScanEvaluation::Stop;
1329                    }
1330
1331                    keys_read += 1;
1332                }
1333                ScanEvaluation::ReadData
1334            },
1335            |_, _, doc| {
1336                found_headers.push(
1337                    deserialize_document(&doc)
1338                        .map(|doc| doc.header)
1339                        .map_err(AbortError::Other)?,
1340                );
1341                Ok(())
1342            },
1343        )
1344        .map_err(|err| match err {
1345            AbortError::Other(err) => err,
1346            AbortError::Nebari(err) => crate::Error::from(err),
1347        })?;
1348
1349        Ok(found_headers)
1350    }
1351
1352    #[cfg_attr(feature = "tracing", tracing::instrument(
1353        level = "trace",
1354        skip(self, collection),
1355        fields(
1356            database = self.name(),
1357            collection.name = collection.name.as_ref(),
1358            collection.authority = collection.authority.as_ref(),
1359        )
1360    ))]
1361    fn count_from_collection(
1362        &self,
1363        ids: Range<DocumentId>,
1364        collection: &CollectionName,
1365    ) -> Result<u64, bonsaidb_core::Error> {
1366        self.check_permission(
1367            collection_resource_name(self.name(), collection),
1368            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
1369        )?;
1370        let tree = self
1371            .data
1372            .context
1373            .roots
1374            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
1375            .map_err(Error::from)?;
1376        let ids = DocumentIdRange(ids);
1377        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;
1378
1379        Ok(stats.alive_keys)
1380    }
1381
1382    #[cfg_attr(feature = "tracing", tracing::instrument(
1383        level = "trace",
1384        skip(self, collection),
1385        fields(
1386            database = self.name(),
1387            collection.name = collection.name.as_ref(),
1388            collection.authority = collection.authority.as_ref(),
1389        )
1390    ))]
1391    fn get_multiple_from_collection(
1392        &self,
1393        ids: &[DocumentId],
1394        collection: &CollectionName,
1395    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
1396        for id in ids {
1397            self.check_permission(
1398                document_resource_name(self.name(), collection, id),
1399                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
1400            )?;
1401        }
1402        let mut ids = ids.to_vec();
1403        let collection = collection.clone();
1404        let tree = self
1405            .data
1406            .context
1407            .roots
1408            .tree(
1409                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
1410            )
1411            .map_err(Error::from)?;
1412        ids.sort();
1413        let keys_and_values = tree
1414            .get_multiple(ids.iter().map(|id| id.as_ref()))
1415            .map_err(Error::from)?;
1416
1417        keys_and_values
1418            .into_iter()
1419            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
1420            .collect::<Result<Vec<_>, Error>>()
1421            .map_err(bonsaidb_core::Error::from)
1422    }
1423
1424    #[cfg_attr(feature = "tracing", tracing::instrument(
1425        level = "trace",
1426        skip(self, collection),
1427        fields(
1428            database = self.name(),
1429            collection.name = collection.name.as_ref(),
1430            collection.authority = collection.authority.as_ref(),
1431        )
1432    ))]
1433    fn compact_collection_by_name(
1434        &self,
1435        collection: CollectionName,
1436    ) -> Result<(), bonsaidb_core::Error> {
1437        self.check_permission(
1438            collection_resource_name(self.name(), &collection),
1439            &BonsaiAction::Database(DatabaseAction::Compact),
1440        )?;
1441        self.storage()
1442            .instance
1443            .tasks()
1444            .compact_collection(self.clone(), collection)?;
1445        Ok(())
1446    }
1447
1448    #[cfg_attr(feature = "tracing", tracing::instrument(
1449        level = "trace",
1450        skip(self, view),
1451        fields(
1452            database = self.name(),
1453            view.collection.name = view.collection.name.as_ref(),
1454            view.collection.authority = view.collection.authority.as_ref(),
1455            view.name = view.name.as_ref(),
1456        )
1457    ))]
1458    fn query_by_name(
1459        &self,
1460        view: &ViewName,
1461        key: Option<SerializedQueryKey>,
1462        order: Sort,
1463        limit: Option<u32>,
1464        access_policy: AccessPolicy,
1465    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
1466        let view = self.schematic().view_by_name(view)?;
1467        self.check_permission(
1468            view_resource_name(self.name(), &view.view_name()),
1469            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
1470        )?;
1471        let mut results = Vec::new();
1472        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
1473            for mapping in entry.mappings {
1474                results.push(bonsaidb_core::schema::view::map::Serialized {
1475                    source: mapping.source,
1476                    key: entry.key.clone(),
1477                    value: mapping.value,
1478                });
1479            }
1480            Ok(())
1481        })?;
1482
1483        Ok(results)
1484    }
1485
1486    #[cfg_attr(feature = "tracing", tracing::instrument(
1487        level = "trace",
1488        skip(self, view),
1489        fields(
1490            database = self.name(),
1491            view.collection.name = view.collection.name.as_ref(),
1492            view.collection.authority = view.collection.authority.as_ref(),
1493            view.name = view.name.as_ref(),
1494        )
1495    ))]
1496    fn query_by_name_with_docs(
1497        &self,
1498        view: &ViewName,
1499        key: Option<SerializedQueryKey>,
1500        order: Sort,
1501        limit: Option<u32>,
1502        access_policy: AccessPolicy,
1503    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
1504        let results = self.query_by_name(view, key, order, limit, access_policy)?;
1505        let view = self.schematic().view_by_name(view).unwrap(); // query() will fail if it's not present
1506
1507        let documents = self
1508            .get_multiple_from_collection(
1509                &results
1510                    .iter()
1511                    .map(|m| m.source.id.clone())
1512                    .collect::<Vec<_>>(),
1513                &view.collection(),
1514            )?
1515            .into_iter()
1516            .map(|doc| (doc.header.id.clone(), doc))
1517            .collect::<BTreeMap<_, _>>();
1518
1519        Ok(
1520            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
1521                mappings: results,
1522                documents,
1523            },
1524        )
1525    }
1526
1527    #[cfg_attr(feature = "tracing", tracing::instrument(
1528        level = "trace",
1529        skip(self, view_name),
1530        fields(
1531            database = self.name(),
1532            view.collection.name = view_name.collection.name.as_ref(),
1533            view.collection.authority = view_name.collection.authority.as_ref(),
1534            view.name = view_name.name.as_ref(),
1535        )
1536    ))]
1537    fn reduce_by_name(
1538        &self,
1539        view_name: &ViewName,
1540        key: Option<SerializedQueryKey>,
1541        access_policy: AccessPolicy,
1542    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
1543        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;
1544
1545        let result = if mappings.len() == 1 {
1546            mappings.pop().unwrap().value.into_vec()
1547        } else {
1548            let view = self.data.schema.view_by_name(view_name)?;
1549            view.reduce(
1550                &mappings
1551                    .iter()
1552                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
1553                    .collect::<Vec<_>>(),
1554                true,
1555            )
1556            .map_err(Error::from)?
1557        };
1558
1559        Ok(result)
1560    }
1561
1562    #[cfg_attr(feature = "tracing", tracing::instrument(
1563        level = "trace",
1564        skip(self, view_name),
1565        fields(
1566            database = self.name(),
1567            view.collection.name = view_name.collection.name.as_ref(),
1568            view.collection.authority = view_name.collection.authority.as_ref(),
1569            view.name = view_name.name.as_ref(),
1570        )
1571    ))]
1572    fn reduce_grouped_by_name(
1573        &self,
1574        view_name: &ViewName,
1575        key: Option<SerializedQueryKey>,
1576        access_policy: AccessPolicy,
1577    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
1578        let view = self.data.schema.view_by_name(view_name)?;
1579        self.check_permission(
1580            view_resource_name(self.name(), &view.view_name()),
1581            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
1582        )?;
1583        let mut mappings = Vec::new();
1584        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
1585            mappings.push(MappedSerializedValue {
1586                key: entry.key,
1587                value: entry.reduced_value,
1588            });
1589            Ok(())
1590        })?;
1591
1592        Ok(mappings)
1593    }
1594
1595    #[cfg_attr(feature = "tracing", tracing::instrument(
1596        level = "trace",
1597        skip(self, view),
1598        fields(
1599            database = self.name(),
1600            view.collection.name = view.collection.name.as_ref(),
1601            view.collection.authority = view.collection.authority.as_ref(),
1602            view.name = view.name.as_ref(),
1603        )
1604    ))]
1605    fn delete_docs_by_name(
1606        &self,
1607        view: &ViewName,
1608        key: Option<SerializedQueryKey>,
1609        access_policy: AccessPolicy,
1610    ) -> Result<u64, bonsaidb_core::Error> {
1611        let view = self.data.schema.view_by_name(view)?;
1612        let collection = view.collection();
1613        let mut transaction = Transaction::default();
1614        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
1615            for mapping in entry.mappings {
1616                transaction.push(Operation::delete(collection.clone(), mapping.source));
1617            }
1618
1619            Ok(())
1620        })?;
1621
1622        let results = LowLevelConnection::apply_transaction(self, transaction)?;
1623
1624        Ok(results.len() as u64)
1625    }
1626}
1627
1628impl HasSchema for Database {
1629    fn schematic(&self) -> &Schematic {
1630        &self.data.schema
1631    }
1632}
1633
1634type ViewIterator<'a> =
1635    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;
1636
1637struct ViewEntryCollectionIterator<'a> {
1638    iterator: ViewIterator<'a>,
1639}
1640
1641impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
1642    type Item = Result<ViewEntry, crate::Error>;
1643
1644    fn next(&mut self) -> Option<Self::Item> {
1645        self.iterator.next().map(|item| {
1646            item.map_err(crate::Error::from)
1647                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
1648        })
1649    }
1650}
1651
1652#[derive(Debug, Clone)]
1653pub(crate) struct Context {
1654    data: Arc<ContextData>,
1655}
1656
1657impl Deref for Context {
1658    type Target = ContextData;
1659
1660    fn deref(&self) -> &Self::Target {
1661        &self.data
1662    }
1663}
1664
1665#[derive(Debug)]
1666pub(crate) struct ContextData {
1667    pub(crate) roots: Roots<AnyFile>,
1668    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
1669}
1670
1671impl Borrow<Roots<AnyFile>> for Context {
1672    fn borrow(&self) -> &Roots<AnyFile> {
1673        &self.data.roots
1674    }
1675}
1676
1677impl Context {
1678    pub(crate) fn new(
1679        roots: Roots<AnyFile>,
1680        key_value_persistence: KeyValuePersistence,
1681        storage_lock: Option<StorageLock>,
1682    ) -> Self {
1683        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
1684        let mut background_worker_target_watcher = background_worker_target.watch();
1685        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
1686            key_value_persistence,
1687            roots.clone(),
1688            background_worker_target,
1689        )));
1690        let background_worker_state = Arc::downgrade(&key_value_state);
1691        let context = Self {
1692            data: Arc::new(ContextData {
1693                roots,
1694                key_value_state,
1695            }),
1696        };
1697        std::thread::Builder::new()
1698            .name(String::from("keyvalue-worker"))
1699            .spawn(move || {
1700                keyvalue::background_worker(
1701                    &background_worker_state,
1702                    &mut background_worker_target_watcher,
1703                    storage_lock,
1704                );
1705            })
1706            .unwrap();
1707        context
1708    }
1709
1710    pub(crate) fn perform_kv_operation(
1711        &self,
1712        op: KeyOperation,
1713    ) -> Result<Output, bonsaidb_core::Error> {
1714        let mut state = self.data.key_value_state.lock();
1715        state.perform_kv_operation(op, &self.data.key_value_state)
1716    }
1717
1718    pub(crate) fn update_key_expiration<'key>(
1719        &self,
1720        tree_key: impl Into<Cow<'key, str>>,
1721        expiration: Option<Timestamp>,
1722    ) {
1723        let mut state = self.data.key_value_state.lock();
1724        state.update_key_expiration(tree_key, expiration);
1725    }
1726
1727    #[cfg(test)]
1728    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
1729        let state = self.data.key_value_state.lock();
1730        state.persistence_watcher()
1731    }
1732}
1733
1734impl Drop for ContextData {
1735    fn drop(&mut self) {
1736        if let Some(shutdown) = {
1737            let mut state = self.key_value_state.lock();
1738            state.shutdown(&self.key_value_state)
1739        } {
1740            let _: Result<_, _> = shutdown.recv();
1741        }
1742    }
1743}
1744
1745pub fn document_tree_name(collection: &CollectionName) -> String {
1746    format!("collection.{collection:#}")
1747}
1748
1749pub struct DocumentIdRange(Range<DocumentId>);
1750
1751impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1752    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1753        BorrowedRange {
1754            start: match &self.0.start {
1755                connection::Bound::Unbounded => ops::Bound::Unbounded,
1756                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
1757                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
1758            },
1759            end: match &self.0.end {
1760                connection::Bound::Unbounded => ops::Bound::Unbounded,
1761                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
1762                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
1763            },
1764        }
1765    }
1766}
1767
1768/// Operations that can be performed on both [`Database`] and
1769/// [`AsyncDatabase`](crate::AsyncDatabase).
1770pub trait DatabaseNonBlocking {
1771    /// Returns the name of the database.
1772    #[must_use]
1773    fn name(&self) -> &str;
1774}
1775
1776impl DatabaseNonBlocking for Database {
1777    fn name(&self) -> &str {
1778        self.data.name.as_ref()
1779    }
1780}