use std::borrow::{Borrow, Cow};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::Infallible;
use std::ops::{self, Deref};
use std::sync::Arc;
use std::u8;

use bonsaidb_core::arc_bytes::serde::CowBytes;
use bonsaidb_core::arc_bytes::ArcBytes;
use bonsaidb_core::connection::{
    self, AccessPolicy, Connection, HasSchema, HasSession, LowLevelConnection, Range,
    SerializedQueryKey, Session, Sort, StorageConnection,
};
#[cfg(any(feature = "encryption", feature = "compression"))]
use bonsaidb_core::document::KeyId;
use bonsaidb_core::document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision};
use bonsaidb_core::keyvalue::{KeyOperation, Output, Timestamp};
use bonsaidb_core::limits::{
    LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS,
};
use bonsaidb_core::permissions::bonsai::{
    collection_resource_name, database_resource_name, document_resource_name, kv_resource_name,
    view_resource_name, BonsaiAction, DatabaseAction, DocumentAction, TransactionAction,
    ViewAction,
};
use bonsaidb_core::permissions::Permissions;
use bonsaidb_core::schema::view::map::MappedSerializedValue;
use bonsaidb_core::schema::view::{self};
use bonsaidb_core::schema::{self, CollectionName, Schema, Schematic, ViewName};
use bonsaidb_core::transaction::{
    self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
    Transaction,
};
use itertools::Itertools;
use nebari::io::any::AnyFile;
use nebari::tree::{
    AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
    Unversioned, Versioned,
};
use nebari::{AbortError, ExecutingTransaction, Roots, Tree};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use watchable::Watchable;

use crate::config::{Builder, KeyValuePersistence, StorageConfiguration};
use crate::database::keyvalue::BackgroundWorkerProcessTarget;
use crate::error::Error;
use crate::open_trees::OpenTrees;
use crate::storage::StorageLock;
#[cfg(feature = "encryption")]
use crate::storage::TreeVault;
use crate::views::{
    mapper, view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
    ViewEntry,
};
use crate::Storage;

pub mod keyvalue;

pub(crate) mod compat;
pub mod pubsub;

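/// A database stored locally on disk, paired with the [`Storage`] instance it
/// was opened from.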
#[derive(Debug, Clone)]
pub struct Database {
    pub(crate) data: Arc<Data>,
    pub(crate) storage: Storage,
}

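/// State shared between all clones of a [`Database`].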
#[derive(Debug)]
pub struct Data {
    pub name: Arc<Cow<'static, str>>,
    context: Context,
    pub(crate) schema: Arc<Schematic>,
}

impl Database {
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
        name: S,
        context: Context,
        storage: &Storage,
    ) -> Result<Self, Error> {
        let name = name.into();
        let schema = Arc::new(DB::schematic()?);
        let db = Self {
            storage: storage.clone(),
            data: Arc::new(Data {
                name: Arc::new(name),
                context,
                schema,
            }),
        };

        if storage.instance.check_view_integrity_on_database_open() {
            for view in db.data.schema.views() {
                storage.instance.tasks().spawn_integrity_check(view, &db);
            }
        }

        storage
            .instance
            .tasks()
            .spawn_key_value_expiration_loader(&db);

        Ok(db)
    }

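    /// Returns a clone of this database whose operations are checked against
    /// `effective_permissions`. This delegates to
    /// `Storage::with_effective_permissions`, which returns `None` when the
    /// restricted permissions cannot be applied.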
    #[must_use]
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
        self.storage
            .with_effective_permissions(effective_permissions)
            .map(|storage| Self {
                storage,
                data: self.data.clone(),
            })
    }

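    /// Opens the storage described by `configuration` and returns its
    /// "default" database using the schema `DB`, creating the database if it
    /// does not already exist.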
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;

        Ok(storage.create_database::<DB>("default", true)?)
    }

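    /// Returns the [`Schematic`] for the schema this database was opened with.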
    #[must_use]
    pub fn schematic(&self) -> &'_ Schematic {
        &self.data.schema
    }

    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
        &self.data.context.roots
    }

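    // Ensures the view is up to date according to `access_policy`, then
    // invokes `callback` for every matching entry in the view's entries tree.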
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
        &self,
        view: &dyn view::Serialized,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
        mut callback: F,
    ) -> Result<(), bonsaidb_core::Error> {
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
            self.storage
                .instance
                .tasks()
                .update_view_if_needed(view, self, true)?;
        } else if let Some(integrity_check) = self
            .storage
            .instance
            .tasks()
            .spawn_integrity_check(view, self)
        {
            integrity_check
                .receive()
                .map_err(Error::from)?
                .map_err(Error::from)?;
        }

        let view_entries = self
            .roots()
            .tree(self.collection_tree(
                &view.collection(),
                view_entries_tree_name(&view.view_name()),
            )?)
            .map_err(Error::from)?;

        {
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
                callback(entry)?;
            }
        }

        if matches!(access_policy, AccessPolicy::UpdateAfter) {
            let db = self.clone();
            let view_name = view.view_name();
            let view = db
                .data
                .schema
                .view_by_name(&view_name)
                .expect("query made with view that isn't registered with this database");
            db.storage
                .instance
                .tasks()
                .update_view_if_needed(view, &db, false)?;
        }

        Ok(())
    }

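    // Opens every tree the transaction's operations may touch, returning
    // `CollectionNotFound` if an operation references a collection that is not
    // part of this database's schema.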
    fn open_trees_for_transaction(&self, transaction: &Transaction) -> Result<OpenTrees, Error> {
        let mut open_trees = OpenTrees::default();
        for op in &transaction.operations {
            if self
                .data
                .schema
                .collection_primary_key_description(&op.collection)
                .is_none()
            {
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
            }

            #[cfg(any(feature = "encryption", feature = "compression"))]
            let vault = if let Some(encryption_key) =
                self.collection_encryption_key(&op.collection).cloned()
            {
                #[cfg(feature = "encryption")]
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
                    vault.key = Some(encryption_key);
                    Some(vault)
                } else {
                    TreeVault::new_if_needed(
                        Some(encryption_key),
                        self.storage().vault(),
                        #[cfg(feature = "compression")]
                        None,
                    )
                }

                #[cfg(not(feature = "encryption"))]
                {
                    drop(encryption_key);
                    return Err(Error::EncryptionDisabled);
                }
            } else {
                self.storage().tree_vault().cloned()
            };

            open_trees.open_trees_for_document_change(
                &op.collection,
                &self.data.schema,
                #[cfg(any(feature = "encryption", feature = "compression"))]
                vault,
            );
        }

        Ok(open_trees)
    }

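    // Executes all operations inside a single nebari transaction, records the
    // changed documents in the transaction log entry, and invalidates lazy
    // views before committing.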
    fn apply_transaction_to_roots(
        &self,
        transaction: &Transaction,
    ) -> Result<Vec<OperationResult>, Error> {
        let open_trees = self.open_trees_for_transaction(transaction)?;

        let mut roots_transaction = self
            .data
            .context
            .roots
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;

        let mut results = Vec::new();
        let mut changed_documents = Vec::new();
        let mut collection_indexes = HashMap::new();
        let mut collections = Vec::new();
        for op in &transaction.operations {
            let result = self.execute_operation(
                op,
                &mut roots_transaction,
                &open_trees.trees_index_by_name,
            )?;

            if let Some((collection, id, deleted)) = match &result {
                OperationResult::DocumentUpdated { header, collection } => {
                    Some((collection, header.id.clone(), false))
                }
                OperationResult::DocumentDeleted { id, collection } => {
                    Some((collection, id.clone(), true))
                }
                OperationResult::Success => None,
            } {
                let collection = match collection_indexes.get(collection) {
                    Some(index) => *index,
                    None => {
                        if let Ok(id) = u16::try_from(collections.len()) {
                            collection_indexes.insert(collection.clone(), id);
                            collections.push(collection.clone());
                            id
                        } else {
                            return Err(Error::TransactionTooLarge);
                        }
                    }
                };
                changed_documents.push(ChangedDocument {
                    collection,
                    id,
                    deleted,
                });
            }
            results.push(result);
        }

        self.invalidate_changed_documents(
            &mut roots_transaction,
            &open_trees,
            &collections,
            &changed_documents,
        )?;

        roots_transaction
            .entry_mut()
            .set_data(compat::serialize_executed_transaction_changes(
                &Changes::Documents(DocumentChanges {
                    collections,
                    documents: changed_documents,
                }),
            )?)?;

        roots_transaction.commit()?;

        Ok(results)
    }

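    // Marks each changed document in the invalidation tree of every lazy
    // (non-eager) view of its collection so the view is remapped on its next
    // access.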
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn invalidate_changed_documents(
        &self,
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
        open_trees: &OpenTrees,
        collections: &[CollectionName],
        changed_documents: &[ChangedDocument],
    ) -> Result<(), Error> {
        for (collection, changed_documents) in &changed_documents
            .iter()
            .group_by(|doc| &collections[usize::from(doc.collection)])
        {
            let mut views = self
                .data
                .schema
                .views_in_collection(collection)
                .filter(|view| !view.update_policy().is_eager())
                .peekable();
            if views.peek().is_some() {
                let changed_documents = changed_documents.collect::<Vec<_>>();
                for view in views {
                    let view_name = view.view_name();
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
                    for changed_document in &changed_documents {
                        let mut invalidated_docs = roots_transaction
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
                            .unwrap();
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
                    }
                }
            }
        }
        Ok(())
    }

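    // Dispatches a single transaction operation to the matching handler below.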
    fn execute_operation(
        &self,
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
    ) -> Result<OperationResult, Error> {
        match &operation.command {
            Command::Insert { id, contents } => {
                self.execute_insert(operation, transaction, tree_index_map, id.clone(), contents)
            }
            Command::Update { header, contents } => self.execute_update(
                operation,
                transaction,
                tree_index_map,
                &header.id,
                Some(&header.revision),
                contents,
            ),
            Command::Overwrite { id, contents } => {
                self.execute_update(operation, transaction, tree_index_map, id, None, contents)
            }
            Command::Delete { header } => {
                self.execute_delete(operation, transaction, tree_index_map, header)
            }
            Command::Check { id, revision } => Self::execute_check(
                operation,
                transaction,
                tree_index_map,
                id.clone(),
                *revision,
            ),
        }
    }

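    // Handles both `Update` (revision-checked) and `Overwrite`
    // (`check_revision` of `None`): the document is compare-swapped in place
    // and, when the contents change, eager views are remapped in this
    // transaction.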
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(
            level = "trace",
            skip(self, operation, transaction, tree_index_map, contents),
            fields(
                database = self.name(),
                collection.name = operation.collection.name.as_ref(),
                collection.authority = operation.collection.authority.as_ref()
            )
        )
    )]
    fn execute_update(
        &self,
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
        id: &DocumentId,
        check_revision: Option<&Revision>,
        contents: &[u8],
    ) -> Result<OperationResult, crate::Error> {
        let mut documents = transaction
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
            .unwrap();
        let document_id = ArcBytes::from(id.to_vec());
        let mut result = None;
        let mut updated = false;
        documents.modify(
            vec![document_id.clone()],
            nebari::tree::Operation::CompareSwap(CompareSwap::new(
                &mut |_key, value: Option<ArcBytes<'_>>| {
                    if let Some(old) = value {
                        let doc = match deserialize_document(&old) {
                            Ok(doc) => doc,
                            Err(err) => {
                                result = Some(Err(err));
                                return nebari::tree::KeyOperation::Skip;
                            }
                        };
                        if check_revision.is_none() || Some(&doc.header.revision) == check_revision
                        {
                            if let Some(updated_revision) =
                                doc.header.revision.next_revision(contents)
                            {
                                let updated_header = Header {
                                    id: id.clone(),
                                    revision: updated_revision,
                                };
                                let serialized_doc = match serialize_document(&BorrowedDocument {
                                    header: updated_header.clone(),
                                    contents: CowBytes::from(contents),
                                }) {
                                    Ok(bytes) => bytes,
                                    Err(err) => {
                                        result = Some(Err(Error::from(err)));
                                        return nebari::tree::KeyOperation::Skip;
                                    }
                                };
                                result = Some(Ok(OperationResult::DocumentUpdated {
                                    collection: operation.collection.clone(),
                                    header: updated_header,
                                }));
                                updated = true;
                                return nebari::tree::KeyOperation::Set(ArcBytes::from(
                                    serialized_doc,
                                ));
                            }

                            result = Some(Ok(OperationResult::DocumentUpdated {
                                collection: operation.collection.clone(),
                                header: doc.header,
                            }));
                        } else {
                            result = Some(Err(Error::Core(
                                bonsaidb_core::Error::DocumentConflict(
                                    operation.collection.clone(),
                                    Box::new(doc.header),
                                ),
                            )));
                        }
                    } else if check_revision.is_none() {
                        let doc = BorrowedDocument::new(id.clone(), contents);
                        match serialize_document(&doc).map(|bytes| (doc, bytes)) {
                            Ok((doc, serialized)) => {
                                result = Some(Ok(OperationResult::DocumentUpdated {
                                    collection: operation.collection.clone(),
                                    header: doc.header,
                                }));
                                updated = true;
                                return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
                            }
                            Err(err) => {
                                result = Some(Err(Error::from(err)));
                            }
                        }
                    } else {
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
                            operation.collection.clone(),
                            Box::new(id.clone()),
                        ))));
                    }
                    nebari::tree::KeyOperation::Skip
                },
            )),
        )?;
        drop(documents);

        if updated {
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
        }

        result.expect("nebari should invoke the callback even when the key isn't found")
    }

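    // Inserts a new document. When `id` is `None`, the next id is derived from
    // the collection's highest existing key; inserting over an existing key
    // results in a `DocumentConflict` error.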
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(
            level = "trace",
            skip(self, operation, transaction, tree_index_map, contents),
            fields(
                database = self.name(),
                collection.name = operation.collection.name.as_ref(),
                collection.authority = operation.collection.authority.as_ref()
            )
        )
    )]
    fn execute_insert(
        &self,
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
        id: Option<DocumentId>,
        contents: &[u8],
    ) -> Result<OperationResult, Error> {
        let mut documents = transaction
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
            .unwrap();
        let id = if let Some(id) = id {
            id
        } else if let Some(last_key) = documents.last_key()? {
            let id = DocumentId::try_from(last_key.as_slice())?;
            self.data
                .schema
                .next_id_for_collection(&operation.collection, Some(id))?
        } else {
            self.data
                .schema
                .next_id_for_collection(&operation.collection, None)?
        };

        let doc = BorrowedDocument::new(id, contents);
        let serialized: Vec<u8> = serialize_document(&doc)?;
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
            let doc = deserialize_document(&document)?;
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
                operation.collection.clone(),
                Box::new(doc.header),
            )))
        } else {
            drop(documents);
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;

            Ok(OperationResult::DocumentUpdated {
                collection: operation.collection.clone(),
                header: doc.header,
            })
        }
    }

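    // Removes a document, but only if the stored header matches the header the
    // caller supplied; otherwise the delete is rejected as a conflict.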
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, operation, transaction, tree_index_map),
        fields(
            database = self.name(),
            collection.name = operation.collection.name.as_ref(),
            collection.authority = operation.collection.authority.as_ref()
        )
    ))]
    fn execute_delete(
        &self,
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
        header: &Header,
    ) -> Result<OperationResult, Error> {
        let mut documents = transaction
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
            .unwrap();
        if let Some(vec) = documents.remove(header.id.as_ref())? {
            drop(documents);
            let doc = deserialize_document(&vec)?;
            if &doc.header == header {
                self.update_eager_views(
                    &ArcBytes::from(doc.header.id.to_vec()),
                    operation,
                    transaction,
                    tree_index_map,
                )?;

                Ok(OperationResult::DocumentDeleted {
                    collection: operation.collection.clone(),
                    id: header.id.clone(),
                })
            } else {
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
                    operation.collection.clone(),
                    Box::new(header.clone()),
                )))
            }
        } else {
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
                operation.collection.clone(),
                Box::new(header.id.clone()),
            )))
        }
    }

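    // Re-maps the affected document into every eager view of its collection
    // within the current transaction, so eager view updates commit atomically
    // with the document change.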
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, operation, transaction, tree_index_map),
        fields(
            database = self.name(),
            collection.name = operation.collection.name.as_ref(),
            collection.authority = operation.collection.authority.as_ref()
        )
    ))]
    fn update_eager_views(
        &self,
        document_id: &ArcBytes<'static>,
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
    ) -> Result<(), Error> {
        let mut eager_views = self
            .data
            .schema
            .eager_views_in_collection(&operation.collection)
            .peekable();
        if eager_views.peek().is_some() {
            let documents = transaction
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
                .unwrap();
            for view in eager_views {
                let name = view.view_name();
                let document_map = transaction
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
                    .unwrap();
                let view_entries = transaction
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
                    .unwrap();
                mapper::DocumentRequest {
                    database: self,
                    document_ids: vec![document_id.clone()],
                    map_request: &mapper::Map {
                        database: self.data.name.clone(),
                        collection: operation.collection.clone(),
                        view_name: name.clone(),
                    },
                    document_map,
                    documents,
                    view_entries,
                    view,
                }
                .map()?;
            }
        }

        Ok(())
    }

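    // Verifies that a document exists, and optionally that it is at a specific
    // revision, without modifying it.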
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(operation, transaction, tree_index_map),
        fields(
            collection.name = operation.collection.name.as_ref(),
            collection.authority = operation.collection.authority.as_ref(),
        ),
    ))]
    fn execute_check(
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
        id: DocumentId,
        revision: Option<Revision>,
    ) -> Result<OperationResult, Error> {
        let mut documents = transaction
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
            .unwrap();
        if let Some(vec) = documents.get(id.as_ref())? {
            drop(documents);

            if let Some(revision) = revision {
                let doc = deserialize_document(&vec)?;
                if doc.header.revision != revision {
                    return Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
                        operation.collection.clone(),
                        Box::new(Header { id, revision }),
                    )));
                }
            }

            Ok(OperationResult::Success)
        } else {
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
                operation.collection.clone(),
                Box::new(id),
            )))
        }
    }

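    // Collects the raw view entries matching `key` (a range, a single key, or
    // multiple keys), honoring `order` and `limit`, and deserializes them into
    // `ViewEntry` values.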
    fn create_view_iterator(
        view_entries: &Tree<Unversioned, AnyFile>,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
    ) -> Result<Vec<ViewEntry>, Error> {
        let mut values = Vec::new();
        let forwards = match order {
            Sort::Ascending => true,
            Sort::Descending => false,
        };
        let mut values_read = 0;
        if let Some(key) = key {
            match key {
                SerializedQueryKey::Range(range) => {
                    view_entries.scan::<Infallible, _, _, _, _>(
                        &range.map_ref(|bytes| &bytes[..]),
                        forwards,
                        |_, _, _| ScanEvaluation::ReadData,
                        |_, _| {
                            if let Some(limit) = limit {
                                if values_read >= limit {
                                    return ScanEvaluation::Stop;
                                }
                                values_read += 1;
                            }
                            ScanEvaluation::ReadData
                        },
                        |_key, _index, value| {
                            values.push(value);
                            Ok(())
                        },
                    )?;
                }
                SerializedQueryKey::Matches(key) => {
                    values.extend(view_entries.get(&key)?);
                }
                SerializedQueryKey::Multiple(mut list) => {
                    list.sort();

                    values.extend(
                        view_entries
                            .get_multiple(list.iter().map(|bytes| bytes.as_slice()))?
                            .into_iter()
                            .map(|(_, value)| value),
                    );
                }
            }
        } else {
            view_entries.scan::<Infallible, _, _, _, _>(
                &(..),
                forwards,
                |_, _, _| ScanEvaluation::ReadData,
                |_, _| {
                    if let Some(limit) = limit {
                        if values_read >= limit {
                            return ScanEvaluation::Stop;
                        }
                        values_read += 1;
                    }
                    ScanEvaluation::ReadData
                },
                |_, _, value| {
                    values.push(value);
                    Ok(())
                },
            )?;
        }

        values
            .into_iter()
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
            .collect::<Result<Vec<_>, Error>>()
    }

    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        self.schematic()
            .encryption_key_for_collection(collection)
            .or_else(|| self.storage.default_encryption_key())
    }

    #[cfg_attr(
        not(feature = "encryption"),
        allow(
            unused_mut,
            unused_variables,
            clippy::unused_self,
            clippy::let_and_return
        )
    )]
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
        &self,
        collection: &CollectionName,
        name: S,
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
        let mut tree = R::tree(name);

        #[cfg(any(feature = "encryption", feature = "compression"))]
        match (
            self.collection_encryption_key(collection),
            self.storage().tree_vault().cloned(),
        ) {
            (Some(override_key), Some(mut vault)) => {
                #[cfg(feature = "encryption")]
                {
                    vault.key = Some(override_key.clone());
                    tree = tree.with_vault(vault);
                }

                #[cfg(not(feature = "encryption"))]
                {
                    return Err(Error::EncryptionDisabled);
                }
            }
            (None, Some(vault)) => {
                tree = tree.with_vault(vault);
            }
            (key, None) => {
                #[cfg(feature = "encryption")]
                if let Some(vault) = TreeVault::new_if_needed(
                    key.cloned(),
                    self.storage().vault(),
                    #[cfg(feature = "compression")]
                    None,
                ) {
                    tree = tree.with_vault(vault);
                }

                #[cfg(not(feature = "encryption"))]
                if key.is_some() {
                    return Err(Error::EncryptionDisabled);
                }
            }
        }

        Ok(tree)
    }

    pub(crate) fn update_key_expiration<'key>(
        &self,
        tree_key: impl Into<Cow<'key, str>>,
        expiration: Option<Timestamp>,
    ) {
        self.data
            .context
            .update_key_expiration(tree_key, expiration);
    }

    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async(self) -> crate::AsyncDatabase {
        self.into_async_with_runtime(tokio::runtime::Handle::current())
    }

    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        crate::AsyncDatabase {
            database: self,
            runtime: Arc::new(runtime),
        }
    }

    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async(&self) -> crate::AsyncDatabase {
        self.clone().into_async()
    }

    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        self.clone().into_async_with_runtime(runtime)
    }
}
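
// Older on-disk document layout, kept so that documents written by earlier
// versions can still be read via the bincode fallback in
// `deserialize_document`.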
#[derive(Serialize, Deserialize)]
struct LegacyHeader {
    id: u64,
    revision: Revision,
}
#[derive(Serialize, Deserialize)]
struct LegacyDocument<'a> {
    header: LegacyHeader,
    #[serde(borrow)]
    contents: &'a [u8],
}

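// Attempts the current pot-encoded format first, falling back to the legacy
// bincode representation for older documents.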
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
        Ok(document) => Ok(document),
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
            Ok(legacy_doc) => Ok(BorrowedDocument {
                header: Header {
                    id: DocumentId::from_u64(legacy_doc.header.id),
                    revision: legacy_doc.header.revision,
                },
                contents: CowBytes::from(legacy_doc.contents),
            }),
            Err(_) => Err(Error::from(err)),
        },
    }
}

fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
    pot::to_vec(document)
        .map_err(Error::from)
        .map_err(bonsaidb_core::Error::from)
}

impl HasSession for Database {
    fn session(&self) -> Option<&Session> {
        self.storage.session()
    }
}

impl Connection for Database {
    type Storage = Storage;

    fn storage(&self) -> Self::Storage {
        self.storage.clone()
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<u32>,
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(self.name()),
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
        )?;
        let result_limit = usize::try_from(
            result_limit
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
        )
        .unwrap();
        if result_limit > 0 {
            let range = if let Some(starting_id) = starting_id {
                Range::from(starting_id..)
            } else {
                Range::from(..)
            };

            let mut entries = Vec::new();
            self.roots()
                .transactions()
                .scan(range, |entry| {
                    if entry.data().is_some() {
                        entries.push(entry);
                    }
                    entries.len() < result_limit
                })
                .map_err(Error::from)?;

            entries
                .into_iter()
                .map(|entry| {
                    if let Some(data) = entry.data() {
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
                        Ok(Some(transaction::Executed {
                            id: entry.id,
                            changes,
                        }))
                    } else {
                        Ok(None)
                    }
                })
                .filter_map(Result::transpose)
                .collect::<Result<Vec<_>, Error>>()
                .map_err(bonsaidb_core::Error::from)
        } else {
            Ok(Vec::default())
        }
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(self.name()),
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
        )?;
        Ok(self.roots().transactions().current_transaction_id())
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(self.name()),
            &BonsaiAction::Database(DatabaseAction::Compact),
        )?;
        self.storage()
            .instance
            .tasks()
            .compact_database(self.clone())?;
        Ok(())
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            kv_resource_name(self.name()),
            &BonsaiAction::Database(DatabaseAction::Compact),
        )?;
        self.storage()
            .instance
            .tasks()
            .compact_key_value_store(self.clone())?;
        Ok(())
    }
}

impl LowLevelConnection for Database {
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, transaction),
        fields(
            database = self.name(),
        )
    ))]
    fn apply_transaction(
        &self,
        transaction: Transaction,
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
        for op in &transaction.operations {
            let (resource, action) = match &op.command {
                Command::Insert { .. } => (
                    collection_resource_name(self.name(), &op.collection),
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
                ),
                Command::Update { header, .. } => (
                    document_resource_name(self.name(), &op.collection, &header.id),
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
                ),
                Command::Overwrite { id, .. } => (
                    document_resource_name(self.name(), &op.collection, id),
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
                ),
                Command::Delete { header } => (
                    document_resource_name(self.name(), &op.collection, &header.id),
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
                ),
                Command::Check { id, .. } => (
                    document_resource_name(self.name(), &op.collection, id),
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
                ),
            };
            self.check_permission(resource, &action)?;
        }

        let mut eager_view_tasks = Vec::new();
        for collection_name in transaction
            .operations
            .iter()
            .map(|op| &op.collection)
            .collect::<HashSet<_>>()
        {
            for view in self.data.schema.eager_views_in_collection(collection_name) {
                if let Some(task) = self
                    .storage
                    .instance
                    .tasks()
                    .spawn_integrity_check(view, self)
                {
                    eager_view_tasks.push(task);
                }
            }
        }

        let mut eager_view_mapping_tasks = Vec::new();
        for task in eager_view_tasks {
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
                eager_view_mapping_tasks.push(spawned_task);
            }
        }

        for task in eager_view_mapping_tasks {
            let mut task = task.lock();
            if let Some(task) = task.take() {
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
            }
        }

        self.apply_transaction_to_roots(&transaction)
            .map_err(bonsaidb_core::Error::from)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
        self.check_permission(
            document_resource_name(self.name(), collection, &id),
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
        )?;
        let tree = self
            .data
            .context
            .roots
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
            .map_err(Error::from)?;
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
            Ok(None)
        }
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
        self.check_permission(
            collection_resource_name(self.name(), collection),
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
        )?;
        let tree = self
            .data
            .context
            .roots
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
            .map_err(Error::from)?;
        let mut found_docs = Vec::new();
        let mut keys_read = 0;
        let ids = DocumentIdRange(ids);
        tree.scan(
            &ids.borrow_as_bytes(),
            match sort {
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
            |_, _| {
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

                    keys_read += 1;
                }
                ScanEvaluation::ReadData
            },
            |_, _, doc| {
                found_docs.push(
                    deserialize_document(&doc)
                        .map(BorrowedDocument::into_owned)
                        .map_err(AbortError::Other)?,
                );
                Ok(())
            },
        )
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
        })?;

        Ok(found_docs)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
        self.check_permission(
            collection_resource_name(self.name(), collection),
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
        )?;
        let tree = self
            .data
            .context
            .roots
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
            .map_err(Error::from)?;
        let mut found_headers = Vec::new();
        let mut keys_read = 0;
        let ids = DocumentIdRange(ids);
        tree.scan(
            &ids.borrow_as_bytes(),
            match sort {
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
            |_, _| {
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

                    keys_read += 1;
                }
                ScanEvaluation::ReadData
            },
            |_, _, doc| {
                found_headers.push(
                    deserialize_document(&doc)
                        .map(|doc| doc.header)
                        .map_err(AbortError::Other)?,
                );
                Ok(())
            },
        )
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
        })?;

        Ok(found_headers)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, bonsaidb_core::Error> {
        self.check_permission(
            collection_resource_name(self.name(), collection),
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
        )?;
        let tree = self
            .data
            .context
            .roots
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
            .map_err(Error::from)?;
        let ids = DocumentIdRange(ids);
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

        Ok(stats.alive_keys)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn get_multiple_from_collection(
        &self,
        ids: &[DocumentId],
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
        for id in ids {
            self.check_permission(
                document_resource_name(self.name(), collection, id),
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
            )?;
        }
        let mut ids = ids.to_vec();
        let collection = collection.clone();
        let tree = self
            .data
            .context
            .roots
            .tree(
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
            .map_err(Error::from)?;
        ids.sort();
        let keys_and_values = tree
            .get_multiple(ids.iter().map(|id| id.as_ref()))
            .map_err(Error::from)?;

        keys_and_values
            .into_iter()
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
            .collect::<Result<Vec<_>, Error>>()
            .map_err(bonsaidb_core::Error::from)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            collection_resource_name(self.name(), &collection),
            &BonsaiAction::Database(DatabaseAction::Compact),
        )?;
        self.storage()
            .instance
            .tasks()
            .compact_collection(self.clone(), collection)?;
        Ok(())
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
    fn query_by_name(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
        let view = self.schematic().view_by_name(view)?;
        self.check_permission(
            view_resource_name(self.name(), &view.view_name()),
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
        )?;
        let mut results = Vec::new();
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
            for mapping in entry.mappings {
                results.push(bonsaidb_core::schema::view::map::Serialized {
                    source: mapping.source,
                    key: entry.key.clone(),
                    value: mapping.value,
                });
            }
            Ok(())
        })?;

        Ok(results)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let results = self.query_by_name(view, key, order, limit, access_policy)?;
        let view = self.schematic().view_by_name(view).unwrap();

        let documents = self
            .get_multiple_from_collection(
                &results
                    .iter()
                    .map(|m| m.source.id.clone())
                    .collect::<Vec<_>>(),
                &view.collection(),
            )?
            .into_iter()
            .map(|doc| (doc.header.id.clone(), doc))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings: results,
                documents,
            },
        )
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
    fn reduce_by_name(
        &self,
        view_name: &ViewName,
        key: Option<SerializedQueryKey>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

        let result = if mappings.len() == 1 {
            mappings.pop().unwrap().value.into_vec()
        } else {
            let view = self.data.schema.view_by_name(view_name)?;
            view.reduce(
                &mappings
                    .iter()
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
                    .collect::<Vec<_>>(),
                true,
            )
            .map_err(Error::from)?
        };

        Ok(result)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
    fn reduce_grouped_by_name(
        &self,
        view_name: &ViewName,
        key: Option<SerializedQueryKey>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
        let view = self.data.schema.view_by_name(view_name)?;
        self.check_permission(
            view_resource_name(self.name(), &view.view_name()),
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
        )?;
        let mut mappings = Vec::new();
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
            mappings.push(MappedSerializedValue {
                key: entry.key,
                value: entry.reduced_value,
            });
            Ok(())
        })?;

        Ok(mappings)
    }

    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
    fn delete_docs_by_name(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        access_policy: AccessPolicy,
    ) -> Result<u64, bonsaidb_core::Error> {
        let view = self.data.schema.view_by_name(view)?;
        let collection = view.collection();
        let mut transaction = Transaction::default();
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
            for mapping in entry.mappings {
                transaction.push(Operation::delete(collection.clone(), mapping.source));
            }

            Ok(())
        })?;

        let results = LowLevelConnection::apply_transaction(self, transaction)?;

        Ok(results.len() as u64)
    }
}

impl HasSchema for Database {
    fn schematic(&self) -> &Schematic {
        &self.data.schema
    }
}

type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

struct ViewEntryCollectionIterator<'a> {
    iterator: ViewIterator<'a>,
}

impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

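/// Shared runtime state for a database: the nebari [`Roots`] plus the
/// key-value store state and its background persistence worker.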
#[derive(Debug, Clone)]
pub(crate) struct Context {
    data: Arc<ContextData>,
}

impl Deref for Context {
    type Target = ContextData;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

#[derive(Debug)]
pub(crate) struct ContextData {
    pub(crate) roots: Roots<AnyFile>,
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

impl Context {
    pub(crate) fn new(
        roots: Roots<AnyFile>,
        key_value_persistence: KeyValuePersistence,
        storage_lock: Option<StorageLock>,
    ) -> Self {
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
        let mut background_worker_target_watcher = background_worker_target.watch();
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
            key_value_persistence,
            roots.clone(),
            background_worker_target,
        )));
        let background_worker_state = Arc::downgrade(&key_value_state);
        let context = Self {
            data: Arc::new(ContextData {
                roots,
                key_value_state,
            }),
        };
        std::thread::Builder::new()
            .name(String::from("keyvalue-worker"))
            .spawn(move || {
                keyvalue::background_worker(
                    &background_worker_state,
                    &mut background_worker_target_watcher,
                    storage_lock,
                );
            })
            .unwrap();
        context
    }

    pub(crate) fn perform_kv_operation(
        &self,
        op: KeyOperation,
    ) -> Result<Output, bonsaidb_core::Error> {
        let mut state = self.data.key_value_state.lock();
        state.perform_kv_operation(op, &self.data.key_value_state)
    }

    pub(crate) fn update_key_expiration<'key>(
        &self,
        tree_key: impl Into<Cow<'key, str>>,
        expiration: Option<Timestamp>,
    ) {
        let mut state = self.data.key_value_state.lock();
        state.update_key_expiration(tree_key, expiration);
    }

    #[cfg(test)]
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
        let state = self.data.key_value_state.lock();
        state.persistence_watcher()
    }
}

impl Drop for ContextData {
    fn drop(&mut self) {
        if let Some(shutdown) = {
            let mut state = self.key_value_state.lock();
            state.shutdown(&self.key_value_state)
        } {
            let _: Result<_, _> = shutdown.recv();
        }
    }
}

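/// Returns the name of the nebari tree that stores documents for `collection`.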
pub fn document_tree_name(collection: &CollectionName) -> String {
    format!("collection.{collection:#}")
}

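/// A [`Range`] of [`DocumentId`]s that can be borrowed as a raw byte range for
/// nebari scans.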
pub struct DocumentIdRange(Range<DocumentId>);

impl<'a> BorrowByteRange<'a> for DocumentIdRange {
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
        BorrowedRange {
            start: match &self.0.start {
                connection::Bound::Unbounded => ops::Bound::Unbounded,
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
            end: match &self.0.end {
                connection::Bound::Unbounded => ops::Bound::Unbounded,
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
    }
}

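/// Database functionality that does not perform I/O and therefore never
/// blocks.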
pub trait DatabaseNonBlocking {
    #[must_use]
    fn name(&self) -> &str;
}

impl DatabaseNonBlocking for Database {
    fn name(&self) -> &str {
        self.data.name.as_ref()
    }
}