1use std::borrow::Cow;
2use std::collections::{HashMap, HashSet};
3use std::fmt::{Debug, Display};
4use std::fs::{self, File};
5use std::io::{Read, Write};
6use std::marker::PhantomData;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Weak};
9
10use bonsaidb_core::admin::database::{self, ByName, Database as DatabaseRecord};
11use bonsaidb_core::admin::user::User;
12use bonsaidb_core::admin::{self, Admin, PermissionGroup, Role, ADMIN_DATABASE_NAME};
13use bonsaidb_core::circulate;
14pub use bonsaidb_core::circulate::Relay;
15use bonsaidb_core::connection::{
16 self, Connection, HasSession, Identity, IdentityReference, LowLevelConnection, Session,
17 SessionAuthentication, SessionId, StorageConnection,
18};
19use bonsaidb_core::document::CollectionDocument;
20#[cfg(any(feature = "encryption", feature = "compression"))]
21use bonsaidb_core::document::KeyId;
22use bonsaidb_core::permissions::bonsai::{
23 bonsaidb_resource_name, database_resource_name, role_resource_name, user_resource_name,
24 BonsaiAction, ServerAction,
25};
26use bonsaidb_core::permissions::Permissions;
27use bonsaidb_core::schema::{
28 Nameable, NamedCollection, Schema, SchemaName, SchemaSummary, Schematic,
29};
30use fs2::FileExt;
31use itertools::Itertools;
32use nebari::io::any::{AnyFile, AnyFileManager};
33use nebari::io::FileManager;
34use nebari::{ChunkCache, ThreadPool};
35use parking_lot::{Mutex, RwLock};
36use rand::{thread_rng, Rng};
37
38#[cfg(feature = "compression")]
39use crate::config::Compression;
40use crate::config::{KeyValuePersistence, StorageConfiguration};
41use crate::database::Context;
42use crate::tasks::manager::Manager;
43use crate::tasks::TaskManager;
44#[cfg(feature = "encryption")]
45use crate::vault::{self, LocalVaultKeyStorage, Vault};
46use crate::{Database, Error};
47
48#[cfg(feature = "password-hashing")]
49mod argon;
50#[cfg(feature = "token-authentication")]
51mod token_authentication;
52
53mod backup;
54mod pubsub;
55pub use backup::{AnyBackupLocation, BackupLocation};
56
57#[derive(Debug, Clone)]
150#[must_use]
151pub struct Storage {
152 pub(crate) instance: StorageInstance,
153 pub(crate) authentication: Option<Arc<AuthenticatedSession>>,
154 effective_session: Option<Arc<Session>>,
155}
156
157#[derive(Debug)]
158pub struct AuthenticatedSession {
159 storage: Weak<Data>,
161 pub session: Mutex<Session>,
162}
163
164#[derive(Debug, Default)]
165pub struct SessionSubscribers {
166 pub subscribers: HashMap<u64, SessionSubscriber>,
167 pub subscribers_by_session: HashMap<SessionId, HashSet<u64>>,
168 pub last_id: u64,
169}
170
171impl SessionSubscribers {
172 pub fn unregister(&mut self, subscriber_id: u64) {
173 if let Some(session_id) = self
174 .subscribers
175 .remove(&subscriber_id)
176 .and_then(|sub| sub.session_id)
177 {
178 if let Some(session_subscribers) = self.subscribers_by_session.get_mut(&session_id) {
179 session_subscribers.remove(&subscriber_id);
180 }
181 }
182 }
183}
184
185#[derive(Debug)]
186pub struct SessionSubscriber {
187 pub session_id: Option<SessionId>,
188 pub subscriber: circulate::Subscriber,
189}
190
191impl Drop for AuthenticatedSession {
192 fn drop(&mut self) {
193 let mut session = self.session.lock();
194 if let Some(id) = session.id.take() {
195 if let Some(storage) = self.storage.upgrade() {
196 let mut sessions = storage.sessions.write();
198 sessions.sessions.remove(&id);
199
200 let mut sessions = storage.subscribers.write();
202 for id in sessions
203 .subscribers_by_session
204 .remove(&id)
205 .into_iter()
206 .flatten()
207 {
208 sessions.subscribers.remove(&id);
209 }
210 }
211 }
212 }
213}
214
215#[derive(Debug, Default)]
216struct AuthenticatedSessions {
217 sessions: HashMap<SessionId, Arc<AuthenticatedSession>>,
218 last_session_id: u64,
219}
220
221#[derive(Debug, Clone)]
222pub struct StorageInstance {
223 data: Arc<Data>,
224}
225
226impl From<StorageInstance> for Storage {
227 fn from(instance: StorageInstance) -> Self {
228 Self {
229 instance,
230 authentication: None,
231 effective_session: None,
232 }
233 }
234}
235
236struct Data {
237 lock: StorageLock,
238 path: PathBuf,
239 parallelization: usize,
240 threadpool: ThreadPool<AnyFile>,
241 file_manager: AnyFileManager,
242 pub(crate) tasks: TaskManager,
243 schemas: RwLock<HashMap<SchemaName, Arc<dyn DatabaseOpener>>>,
244 available_databases: RwLock<HashMap<String, SchemaName>>,
245 open_roots: Mutex<HashMap<String, Context>>,
246 authenticated_permissions: Permissions,
248 sessions: RwLock<AuthenticatedSessions>,
249 pub(crate) subscribers: Arc<RwLock<SessionSubscribers>>,
250 #[cfg(feature = "password-hashing")]
251 argon: argon::Hasher,
252 #[cfg(feature = "encryption")]
253 pub(crate) vault: Arc<Vault>,
254 #[cfg(feature = "encryption")]
255 default_encryption_key: Option<KeyId>,
256 #[cfg(any(feature = "compression", feature = "encryption"))]
257 tree_vault: Option<TreeVault>,
258 pub(crate) key_value_persistence: KeyValuePersistence,
259 chunk_cache: ChunkCache,
260 pub(crate) check_view_integrity_on_database_open: bool,
261 relay: Relay,
262}
263
264impl Storage {
265 pub fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
267 let owned_path = configuration
268 .path
269 .clone()
270 .unwrap_or_else(|| PathBuf::from("db.bonsaidb"));
271 let file_manager = if configuration.memory_only {
272 AnyFileManager::memory()
273 } else {
274 AnyFileManager::std()
275 };
276
277 let manager = Manager::default();
278 for _ in 0..configuration.workers.worker_count {
279 manager.spawn_worker();
280 }
281 let tasks = TaskManager::new(manager);
282
283 fs::create_dir_all(&owned_path)?;
284
285 let storage_lock = Self::lookup_or_create_id(&configuration, &owned_path)?;
286
287 #[cfg(feature = "encryption")]
288 let vault = {
289 let vault_key_storage = match configuration.vault_key_storage {
290 Some(storage) => storage,
291 None => Arc::new(
292 LocalVaultKeyStorage::new(owned_path.join("vault-keys"))
293 .map_err(|err| Error::Vault(vault::Error::Initializing(err.to_string())))?,
294 ),
295 };
296
297 Arc::new(Vault::initialize(
298 storage_lock.id(),
299 &owned_path,
300 vault_key_storage,
301 )?)
302 };
303
304 let parallelization = configuration.workers.parallelization;
305 let check_view_integrity_on_database_open = configuration.views.check_integrity_on_open;
306 let key_value_persistence = configuration.key_value_persistence;
307 #[cfg(feature = "password-hashing")]
308 let argon = argon::Hasher::new(configuration.argon);
309 #[cfg(feature = "encryption")]
310 let default_encryption_key = configuration.default_encryption_key;
311 #[cfg(all(feature = "compression", feature = "encryption"))]
312 let tree_vault = TreeVault::new_if_needed(
313 default_encryption_key.clone(),
314 &vault,
315 configuration.default_compression,
316 );
317 #[cfg(all(not(feature = "compression"), feature = "encryption"))]
318 let tree_vault = TreeVault::new_if_needed(default_encryption_key.clone(), &vault);
319 #[cfg(all(feature = "compression", not(feature = "encryption")))]
320 let tree_vault = TreeVault::new_if_needed(configuration.default_compression);
321
322 let authenticated_permissions = configuration.authenticated_permissions;
323
324 let storage = Self {
325 instance: StorageInstance {
326 data: Arc::new(Data {
327 lock: storage_lock,
328 tasks,
329 parallelization,
330 subscribers: Arc::default(),
331 authenticated_permissions,
332 sessions: RwLock::default(),
333 #[cfg(feature = "password-hashing")]
334 argon,
335 #[cfg(feature = "encryption")]
336 vault,
337 #[cfg(feature = "encryption")]
338 default_encryption_key,
339 #[cfg(any(feature = "compression", feature = "encryption"))]
340 tree_vault,
341 path: owned_path,
342 file_manager,
343 chunk_cache: ChunkCache::new(2000, 160_384),
344 threadpool: ThreadPool::new(parallelization),
345 schemas: RwLock::new(configuration.initial_schemas),
346 available_databases: RwLock::default(),
347 open_roots: Mutex::default(),
348 key_value_persistence,
349 check_view_integrity_on_database_open,
350 relay: Relay::default(),
351 }),
352 },
353 authentication: None,
354 effective_session: None,
355 };
356
357 storage.cache_available_databases()?;
358
359 storage.create_admin_database_if_needed()?;
360
361 Ok(storage)
362 }
363
364 #[cfg(feature = "internal-apis")]
365 #[doc(hidden)]
366 pub fn database_without_schema(&self, name: &str) -> Result<Database, Error> {
367 let name = name.to_owned();
368 self.instance
369 .database_without_schema(&name, Some(self), None)
370 }
371
372 fn lookup_or_create_id(
373 configuration: &StorageConfiguration,
374 path: &Path,
375 ) -> Result<StorageLock, Error> {
376 let id_path = {
377 let storage_id = path.join("server-id");
378 if storage_id.exists() {
379 storage_id
380 } else {
381 path.join("storage-id")
382 }
383 };
384
385 let (id, file) = if let Some(id) = configuration.unique_id {
386 let file = if id_path.exists() {
391 File::open(id_path)?
392 } else {
393 let mut file = File::create(id_path)?;
394 let id = id.to_string();
395 file.write_all(id.as_bytes())?;
396 file
397 };
398 file.lock_exclusive()?;
399 (id, file)
400 } else {
401 if id_path.exists() {
406 let mut file = File::open(id_path)?;
409 file.lock_exclusive()?;
410 let mut bytes = Vec::new();
411 file.read_to_end(&mut bytes)?;
412 let existing_id =
413 String::from_utf8(bytes).expect("server-id contains invalid data");
414
415 (existing_id.parse().expect("server-id isn't numeric"), file)
416 } else {
417 let id = { thread_rng().gen::<u64>() };
418 let mut file = File::create(id_path)?;
419 file.lock_exclusive()?;
420
421 file.write_all(id.to_string().as_bytes())?;
422
423 (id, file)
424 }
425 };
426 Ok(StorageLock::new(StorageId(id), file))
427 }
428
429 fn cache_available_databases(&self) -> Result<(), Error> {
430 let available_databases = self
431 .admin()
432 .view::<ByName>()
433 .query()?
434 .into_iter()
435 .map(|map| (map.key, map.value))
436 .collect();
437 let mut storage_databases = self.instance.data.available_databases.write();
438 *storage_databases = available_databases;
439 Ok(())
440 }
441
442 fn create_admin_database_if_needed(&self) -> Result<(), Error> {
443 self.register_schema::<Admin>()?;
444 match self.database::<Admin>(ADMIN_DATABASE_NAME) {
445 Ok(_) => {}
446 Err(bonsaidb_core::Error::DatabaseNotFound(_)) => {
447 drop(self.create_database::<Admin>(ADMIN_DATABASE_NAME, true)?);
448 }
449 Err(err) => return Err(Error::Core(err)),
450 }
451 Ok(())
452 }
453
454 #[must_use]
463 pub fn unique_id(&self) -> StorageId {
464 self.instance.data.lock.id()
465 }
466
467 #[must_use]
468 pub(crate) fn parallelization(&self) -> usize {
469 self.instance.data.parallelization
470 }
471
472 #[must_use]
473 #[cfg(feature = "encryption")]
474 pub(crate) fn vault(&self) -> &Arc<Vault> {
475 &self.instance.data.vault
476 }
477
478 #[must_use]
479 #[cfg(any(feature = "encryption", feature = "compression"))]
480 pub(crate) fn tree_vault(&self) -> Option<&TreeVault> {
481 self.instance.data.tree_vault.as_ref()
482 }
483
484 #[must_use]
485 #[cfg(feature = "encryption")]
486 pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
487 self.instance.data.default_encryption_key.as_ref()
488 }
489
490 #[must_use]
491 #[cfg(all(feature = "compression", not(feature = "encryption")))]
492 #[allow(clippy::unused_self)]
493 pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
494 None
495 }
496
497 pub fn register_schema<DB: Schema>(&self) -> Result<(), Error> {
499 let mut schemas = self.instance.data.schemas.write();
500 if schemas
501 .insert(
502 DB::schema_name(),
503 Arc::new(StorageSchemaOpener::<DB>::new()?),
504 )
505 .is_none()
506 {
507 Ok(())
508 } else {
509 Err(Error::Core(bonsaidb_core::Error::SchemaAlreadyRegistered(
510 DB::schema_name(),
511 )))
512 }
513 }
514
515 fn validate_name(name: &str) -> Result<(), Error> {
516 if name.chars().enumerate().all(|(index, c)| {
517 c.is_ascii_alphanumeric()
518 || (index == 0 && c == '_')
519 || (index > 0 && (c == '.' || c == '-'))
520 }) {
521 Ok(())
522 } else {
523 Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(
524 name.to_owned(),
525 )))
526 }
527 }
528
529 #[must_use]
532 pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
533 if self.effective_session.is_some() {
534 None
535 } else {
536 Some(Self {
537 instance: self.instance.clone(),
538 authentication: self.authentication.clone(),
539 effective_session: Some(Arc::new(Session {
540 id: None,
541 authentication: SessionAuthentication::None,
542 permissions: effective_permissions,
543 })),
544 })
545 }
546 }
547
548 #[cfg(feature = "async")]
556 pub fn into_async(self) -> crate::AsyncStorage {
557 self.into_async_with_runtime(tokio::runtime::Handle::current())
558 }
559
560 #[cfg(feature = "async")]
564 pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
565 crate::AsyncStorage {
566 storage: self,
567 runtime: Arc::new(runtime),
568 }
569 }
570
571 #[cfg(feature = "async")]
579 pub fn to_async(&self) -> crate::AsyncStorage {
580 self.clone().into_async()
581 }
582
583 #[cfg(feature = "async")]
587 pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
588 self.clone().into_async_with_runtime(runtime)
589 }
590}
591
592impl Debug for Data {
593 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594 let mut f = f.debug_struct("Data");
595 f.field("lock", &self.lock)
596 .field("path", &self.path)
597 .field("parallelization", &self.parallelization)
598 .field("threadpool", &self.threadpool)
599 .field("file_manager", &self.file_manager)
600 .field("tasks", &self.tasks)
601 .field("available_databases", &self.available_databases)
602 .field("open_roots", &self.open_roots)
603 .field("authenticated_permissions", &self.authenticated_permissions)
604 .field("sessions", &self.sessions)
605 .field("subscribers", &self.subscribers)
606 .field("key_value_persistence", &self.key_value_persistence)
607 .field("chunk_cache", &self.chunk_cache)
608 .field(
609 "check_view_integrity_on_database_open",
610 &self.check_view_integrity_on_database_open,
611 )
612 .field("relay", &self.relay);
613
614 if let Some(schemas) = self.schemas.try_read() {
615 let mut schemas = schemas.keys().collect::<Vec<_>>();
616 schemas.sort();
617 f.field("schemas", &schemas);
618 } else {
619 f.field("schemas", &"RwLock locked");
620 }
621
622 #[cfg(feature = "password-hashing")]
623 f.field("argon", &self.argon);
624 #[cfg(feature = "encryption")]
625 {
626 f.field("vault", &self.vault)
627 .field("default_encryption_key", &self.default_encryption_key);
628 }
629 #[cfg(any(feature = "compression", feature = "encryption"))]
630 f.field("tree_vault", &self.tree_vault);
631
632 f.finish()
633 }
634}
635
636impl StorageInstance {
637 #[cfg_attr(
638 not(any(feature = "encryption", feature = "compression")),
639 allow(unused_mut)
640 )]
641 pub(crate) fn open_roots(&self, name: &str) -> Result<Context, Error> {
642 let mut open_roots = self.data.open_roots.lock();
643 if let Some(roots) = open_roots.get(name) {
644 Ok(roots.clone())
645 } else {
646 let task_name = name.to_string();
647
648 let mut config = nebari::Config::new(self.data.path.join(task_name))
649 .file_manager(self.data.file_manager.clone())
650 .cache(self.data.chunk_cache.clone())
651 .shared_thread_pool(&self.data.threadpool);
652
653 #[cfg(any(feature = "encryption", feature = "compression"))]
654 if let Some(vault) = self.data.tree_vault.clone() {
655 config = config.vault(vault);
656 }
657
658 let roots = config.open().map_err(Error::from)?;
659 let context = Context::new(
660 roots,
661 self.data.key_value_persistence.clone(),
662 Some(self.data.lock.clone()),
663 );
664
665 open_roots.insert(name.to_owned(), context.clone());
666
667 Ok(context)
668 }
669 }
670
671 pub(crate) fn tasks(&self) -> &'_ TaskManager {
672 &self.data.tasks
673 }
674
675 pub(crate) fn check_view_integrity_on_database_open(&self) -> bool {
676 self.data.check_view_integrity_on_database_open
677 }
678
679 pub(crate) fn relay(&self) -> &'_ Relay {
680 &self.data.relay
681 }
682
683 pub(crate) fn database_without_schema(
685 &self,
686 name: &str,
687 storage: Option<&Storage>,
688 expected_schema: Option<SchemaName>,
689 ) -> Result<Database, Error> {
690 let stored_schema = {
692 let available_databases = self.data.available_databases.read();
693 available_databases
694 .get(name)
695 .ok_or_else(|| {
696 Error::Core(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
697 })?
698 .clone()
699 };
700
701 if let Some(expected_schema) = expected_schema {
702 if stored_schema != expected_schema {
703 return Err(Error::Core(bonsaidb_core::Error::SchemaMismatch {
704 database_name: name.to_owned(),
705 schema: expected_schema,
706 stored_schema,
707 }));
708 }
709 }
710
711 let mut schemas = self.data.schemas.write();
712 let storage =
713 storage.map_or_else(|| Cow::Owned(Storage::from(self.clone())), Cow::Borrowed);
714 if let Some(schema) = schemas.get_mut(&stored_schema) {
715 let db = schema.open(name.to_string(), storage.as_ref())?;
716 Ok(db)
717 } else {
718 Err(Error::Core(bonsaidb_core::Error::SchemaNotRegistered(
721 stored_schema,
722 )))
723 }
724 }
725
726 fn update_user_with_named_id<
727 'user,
728 'other,
729 Col: NamedCollection<PrimaryKey = u64>,
730 U: Nameable<'user, u64> + Send + Sync,
731 O: Nameable<'other, u64> + Send + Sync,
732 F: FnOnce(&mut CollectionDocument<User>, u64) -> Result<bool, bonsaidb_core::Error>,
733 >(
734 &self,
735 user: U,
736 other: O,
737 callback: F,
738 ) -> Result<(), bonsaidb_core::Error> {
739 let admin = self.admin();
740 let other = other.name()?;
741 let user = User::load(user.name()?, &admin)?;
742 let other = other.id::<Col, _>(&admin)?;
743 match (user, other) {
744 (Some(mut user), Some(other)) => {
745 if callback(&mut user, other)? {
746 user.update(&admin)?;
747 }
748 Ok(())
749 }
750 _ => Err(bonsaidb_core::Error::UserNotFound),
752 }
753 }
754
755 #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
756 #[cfg_attr(
757 any(
758 not(feature = "token-authentication"),
759 not(feature = "password-hashing")
760 ),
761 allow(unused_variables, clippy::needless_pass_by_value)
762 )]
763 fn authenticate_inner(
764 &self,
765 authentication: bonsaidb_core::connection::Authentication,
766 loaded_user: Option<CollectionDocument<User>>,
767 current_session_id: Option<SessionId>,
768 admin: &Database,
769 ) -> Result<Storage, bonsaidb_core::Error> {
770 use bonsaidb_core::connection::Authentication;
771 match authentication {
772 #[cfg(feature = "token-authentication")]
773 Authentication::Token {
774 id,
775 now,
776 now_hash,
777 algorithm,
778 } => self.begin_token_authentication(id, now, &now_hash, algorithm, admin),
779 #[cfg(feature = "token-authentication")]
780 Authentication::TokenChallengeResponse(hash) => {
781 let session_id =
782 current_session_id.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
783 self.finish_token_authentication(session_id, &hash, admin)
784 }
785 #[cfg(feature = "password-hashing")]
786 Authentication::Password { user, password } => {
787 let user = match loaded_user {
788 Some(user) => user,
789 None => {
790 User::load(user, admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?
791 }
792 };
793 let saved_hash = user
794 .contents
795 .argon_hash
796 .clone()
797 .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
798
799 self.data
800 .argon
801 .verify(user.header.id, password, saved_hash)?;
802 self.assume_user(user, admin)
803 }
804 }
805 }
806
807 fn assume_user(
808 &self,
809 user: CollectionDocument<User>,
810 admin: &Database,
811 ) -> Result<Storage, bonsaidb_core::Error> {
812 let permissions = user.contents.effective_permissions(
813 admin,
814 &admin.storage().instance.data.authenticated_permissions,
815 )?;
816
817 let mut sessions = self.data.sessions.write();
818 sessions.last_session_id += 1;
819 let session_id = SessionId(sessions.last_session_id);
820 let session = Session {
821 id: Some(session_id),
822 authentication: SessionAuthentication::Identity(Arc::new(Identity::User {
823 id: user.header.id,
824 username: user.contents.username,
825 })),
826 permissions,
827 };
828 let authentication = Arc::new(AuthenticatedSession {
829 storage: Arc::downgrade(&self.data),
830 session: Mutex::new(session.clone()),
831 });
832 sessions.sessions.insert(session_id, authentication.clone());
833
834 Ok(Storage {
835 instance: self.clone(),
836 authentication: Some(authentication),
837 effective_session: Some(Arc::new(session)),
838 })
839 }
840
841 fn assume_role(
842 &self,
843 role: CollectionDocument<Role>,
844 admin: &Database,
845 ) -> Result<Storage, bonsaidb_core::Error> {
846 let permissions = role.contents.effective_permissions(
847 admin,
848 &admin.storage().instance.data.authenticated_permissions,
849 )?;
850
851 let mut sessions = self.data.sessions.write();
852 sessions.last_session_id += 1;
853 let session_id = SessionId(sessions.last_session_id);
854 let session = Session {
855 id: Some(session_id),
856 authentication: SessionAuthentication::Identity(Arc::new(Identity::Role {
857 id: role.header.id,
858 name: role.contents.name,
859 })),
860 permissions,
861 };
862 let authentication = Arc::new(AuthenticatedSession {
863 storage: Arc::downgrade(&self.data),
864 session: Mutex::new(session.clone()),
865 });
866 sessions.sessions.insert(session_id, authentication.clone());
867
868 Ok(Storage {
869 instance: self.clone(),
870 authentication: Some(authentication),
871 effective_session: Some(Arc::new(session)),
872 })
873 }
874
875 fn add_permission_group_to_user_inner(
876 user: &mut CollectionDocument<User>,
877 permission_group_id: u64,
878 ) -> bool {
879 if user.contents.groups.contains(&permission_group_id) {
880 false
881 } else {
882 user.contents.groups.push(permission_group_id);
883 true
884 }
885 }
886
887 fn remove_permission_group_from_user_inner(
888 user: &mut CollectionDocument<User>,
889 permission_group_id: u64,
890 ) -> bool {
891 let old_len = user.contents.groups.len();
892 user.contents.groups.retain(|id| id != &permission_group_id);
893 old_len != user.contents.groups.len()
894 }
895
896 fn add_role_to_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
897 if user.contents.roles.contains(&role_id) {
898 false
899 } else {
900 user.contents.roles.push(role_id);
901 true
902 }
903 }
904
905 fn remove_role_from_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
906 let old_len = user.contents.roles.len();
907 user.contents.roles.retain(|id| id != &role_id);
908 old_len != user.contents.roles.len()
909 }
910}
911
912pub trait DatabaseOpener: Send + Sync {
913 fn schematic(&self) -> &'_ Schematic;
914 fn open(&self, name: String, storage: &Storage) -> Result<Database, Error>;
915}
916
917pub struct StorageSchemaOpener<DB: Schema> {
918 schematic: Schematic,
919 _phantom: PhantomData<DB>,
920}
921
922impl<DB> StorageSchemaOpener<DB>
923where
924 DB: Schema,
925{
926 pub fn new() -> Result<Self, Error> {
927 let schematic = DB::schematic()?;
928 Ok(Self {
929 schematic,
930 _phantom: PhantomData,
931 })
932 }
933}
934
935impl<DB> DatabaseOpener for StorageSchemaOpener<DB>
936where
937 DB: Schema,
938{
939 fn schematic(&self) -> &'_ Schematic {
940 &self.schematic
941 }
942
943 fn open(&self, name: String, storage: &Storage) -> Result<Database, Error> {
944 let roots = storage.instance.open_roots(&name)?;
945 let db = Database::new::<DB, _>(name, roots, storage)?;
946 Ok(db)
947 }
948}
949
950impl HasSession for StorageInstance {
951 fn session(&self) -> Option<&Session> {
952 None
953 }
954}
955
956impl StorageConnection for StorageInstance {
957 type Authenticated = Storage;
958 type Database = Database;
959
960 fn admin(&self) -> Self::Database {
961 Database::new::<Admin, _>(
962 ADMIN_DATABASE_NAME,
963 self.open_roots(ADMIN_DATABASE_NAME).unwrap(),
964 &Storage::from(self.clone()),
965 )
966 .unwrap()
967 }
968
969 #[cfg_attr(feature = "tracing", tracing::instrument(
970 level = "trace",
971 skip(self, schema),
972 fields(
973 schema.authority = schema.authority.as_ref(),
974 schema.name = schema.name.as_ref(),
975 )
976 ))]
977 fn create_database_with_schema(
978 &self,
979 name: &str,
980 schema: SchemaName,
981 only_if_needed: bool,
982 ) -> Result<(), bonsaidb_core::Error> {
983 Storage::validate_name(name)?;
984
985 {
986 let schemas = self.data.schemas.read();
987 if !schemas.contains_key(&schema) {
988 return Err(bonsaidb_core::Error::SchemaNotRegistered(schema));
989 }
990 }
991
992 let mut available_databases = self.data.available_databases.write();
993 let admin = self.admin();
994 if !available_databases.contains_key(name) {
995 admin
996 .collection::<DatabaseRecord>()
997 .push(&admin::Database {
998 name: name.to_string(),
999 schema: schema.clone(),
1000 })?;
1001 available_databases.insert(name.to_string(), schema);
1002 } else if !only_if_needed {
1003 return Err(bonsaidb_core::Error::DatabaseNameAlreadyTaken(
1004 name.to_string(),
1005 ));
1006 }
1007
1008 Ok(())
1009 }
1010
1011 fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
1012 self.database_without_schema(name, None, Some(DB::schema_name()))
1013 .map_err(bonsaidb_core::Error::from)
1014 }
1015
1016 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
1017 fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
1018 let admin = self.admin();
1019 let mut available_databases = self.data.available_databases.write();
1020 available_databases.remove(name);
1021
1022 let mut open_roots = self.data.open_roots.lock();
1023 open_roots.remove(name);
1024
1025 let database_folder = self.data.path.join(name);
1026 if database_folder.exists() {
1027 let file_manager = self.data.file_manager.clone();
1028 file_manager
1029 .delete_directory(&database_folder)
1030 .map_err(Error::Nebari)?;
1031 }
1032
1033 if let Some(entry) = admin
1034 .view::<database::ByName>()
1035 .with_key(name)
1036 .query()?
1037 .first()
1038 {
1039 admin.delete::<DatabaseRecord, _>(&entry.source)?;
1040
1041 Ok(())
1042 } else {
1043 Err(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
1044 }
1045 }
1046
1047 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1048 fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
1049 let available_databases = self.data.available_databases.read();
1050 Ok(available_databases
1051 .iter()
1052 .map(|(name, schema)| connection::Database {
1053 name: name.to_string(),
1054 schema: schema.clone(),
1055 })
1056 .collect())
1057 }
1058
1059 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1060 fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
1061 let available_databases = self.data.available_databases.read();
1062 let schemas = self.data.schemas.read();
1063
1064 Ok(available_databases
1065 .values()
1066 .unique()
1067 .filter_map(|name| {
1068 schemas
1069 .get(name)
1070 .map(|opener| SchemaSummary::from(opener.schematic()))
1071 })
1072 .collect())
1073 }
1074
1075 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1076 fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
1077 let result = self
1078 .admin()
1079 .collection::<User>()
1080 .push(&User::default_with_username(username))?;
1081 Ok(result.id)
1082 }
1083
1084 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1085 fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
1086 &self,
1087 user: U,
1088 ) -> Result<(), bonsaidb_core::Error> {
1089 let admin = self.admin();
1090 let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
1091 user.delete(&admin)?;
1092
1093 Ok(())
1094 }
1095
1096 #[cfg(feature = "password-hashing")]
1097 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1098 fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
1099 &self,
1100 user: U,
1101 password: bonsaidb_core::connection::SensitiveString,
1102 ) -> Result<(), bonsaidb_core::Error> {
1103 let admin = self.admin();
1104 let mut user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
1105 user.contents.argon_hash = Some(self.data.argon.hash(user.header.id, password)?);
1106 user.update(&admin)
1107 }
1108
1109 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1110 #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
1111 fn authenticate(
1112 &self,
1113 authentication: bonsaidb_core::connection::Authentication,
1114 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
1115 let admin = self.admin();
1116 self.authenticate_inner(authentication, None, None, &admin)
1117 .map(Storage::from)
1118 }
1119
1120 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1121 fn assume_identity(
1122 &self,
1123 identity: IdentityReference<'_>,
1124 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
1125 let admin = self.admin();
1126 match identity {
1127 IdentityReference::User(user) => {
1128 let user =
1129 User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1130 self.assume_user(user, &admin).map(Storage::from)
1131 }
1132 IdentityReference::Role(role) => {
1133 let role =
1134 Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1135 self.assume_role(role, &admin).map(Storage::from)
1136 }
1137 _ => Err(bonsaidb_core::Error::InvalidCredentials),
1138 }
1139 }
1140
1141 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1142 fn add_permission_group_to_user<
1143 'user,
1144 'group,
1145 U: Nameable<'user, u64> + Send + Sync,
1146 G: Nameable<'group, u64> + Send + Sync,
1147 >(
1148 &self,
1149 user: U,
1150 permission_group: G,
1151 ) -> Result<(), bonsaidb_core::Error> {
1152 self.update_user_with_named_id::<PermissionGroup, _, _, _>(
1153 user,
1154 permission_group,
1155 |user, permission_group_id| {
1156 Ok(Self::add_permission_group_to_user_inner(
1157 user,
1158 permission_group_id,
1159 ))
1160 },
1161 )
1162 }
1163
1164 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1165 fn remove_permission_group_from_user<
1166 'user,
1167 'group,
1168 U: Nameable<'user, u64> + Send + Sync,
1169 G: Nameable<'group, u64> + Send + Sync,
1170 >(
1171 &self,
1172 user: U,
1173 permission_group: G,
1174 ) -> Result<(), bonsaidb_core::Error> {
1175 self.update_user_with_named_id::<PermissionGroup, _, _, _>(
1176 user,
1177 permission_group,
1178 |user, permission_group_id| {
1179 Ok(Self::remove_permission_group_from_user_inner(
1180 user,
1181 permission_group_id,
1182 ))
1183 },
1184 )
1185 }
1186
1187 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1188 fn add_role_to_user<
1189 'user,
1190 'group,
1191 U: Nameable<'user, u64> + Send + Sync,
1192 G: Nameable<'group, u64> + Send + Sync,
1193 >(
1194 &self,
1195 user: U,
1196 role: G,
1197 ) -> Result<(), bonsaidb_core::Error> {
1198 self.update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
1199 Ok(Self::add_role_to_user_inner(user, role_id))
1200 })
1201 }
1202
1203 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1204 fn remove_role_from_user<
1205 'user,
1206 'group,
1207 U: Nameable<'user, u64> + Send + Sync,
1208 G: Nameable<'group, u64> + Send + Sync,
1209 >(
1210 &self,
1211 user: U,
1212 role: G,
1213 ) -> Result<(), bonsaidb_core::Error> {
1214 self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
1215 Ok(Self::remove_role_from_user_inner(user, role_id))
1216 })
1217 }
1218}
1219
1220impl HasSession for Storage {
1221 fn session(&self) -> Option<&Session> {
1222 self.effective_session.as_deref()
1223 }
1224}
1225
1226impl StorageConnection for Storage {
1227 type Authenticated = Self;
1228 type Database = Database;
1229
1230 fn admin(&self) -> Self::Database {
1231 self.instance.admin()
1232 }
1233
1234 fn create_database_with_schema(
1235 &self,
1236 name: &str,
1237 schema: SchemaName,
1238 only_if_needed: bool,
1239 ) -> Result<(), bonsaidb_core::Error> {
1240 self.check_permission(
1241 database_resource_name(name),
1242 &BonsaiAction::Server(ServerAction::CreateDatabase),
1243 )?;
1244 self.instance
1245 .create_database_with_schema(name, schema, only_if_needed)
1246 }
1247
1248 fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
1249 self.instance.database::<DB>(name)
1250 }
1251
1252 fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
1253 self.check_permission(
1254 database_resource_name(name),
1255 &BonsaiAction::Server(ServerAction::DeleteDatabase),
1256 )?;
1257 self.instance.delete_database(name)
1258 }
1259
1260 fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
1261 self.check_permission(
1262 bonsaidb_resource_name(),
1263 &BonsaiAction::Server(ServerAction::ListDatabases),
1264 )?;
1265 self.instance.list_databases()
1266 }
1267
1268 fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
1269 self.check_permission(
1270 bonsaidb_resource_name(),
1271 &BonsaiAction::Server(ServerAction::ListAvailableSchemas),
1272 )?;
1273 self.instance.list_available_schemas()
1274 }
1275
1276 fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
1277 self.check_permission(
1278 bonsaidb_resource_name(),
1279 &BonsaiAction::Server(ServerAction::CreateUser),
1280 )?;
1281 self.instance.create_user(username)
1282 }
1283
1284 fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
1285 &self,
1286 user: U,
1287 ) -> Result<(), bonsaidb_core::Error> {
1288 let admin = self.admin();
1289 let user = user.name()?;
1290 let user_id = user
1291 .id::<User, _>(&admin)?
1292 .ok_or(bonsaidb_core::Error::UserNotFound)?;
1293 self.check_permission(
1294 user_resource_name(user_id),
1295 &BonsaiAction::Server(ServerAction::DeleteUser),
1296 )?;
1297 self.instance.delete_user(user)
1298 }
1299
1300 #[cfg(feature = "password-hashing")]
1301 fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
1302 &self,
1303 user: U,
1304 password: bonsaidb_core::connection::SensitiveString,
1305 ) -> Result<(), bonsaidb_core::Error> {
1306 let admin = self.admin();
1307 let user = user.name()?;
1308 let user_id = user
1309 .id::<User, _>(&admin)?
1310 .ok_or(bonsaidb_core::Error::UserNotFound)?;
1311 self.check_permission(
1312 user_resource_name(user_id),
1313 &BonsaiAction::Server(ServerAction::SetPassword),
1314 )?;
1315 self.instance.set_user_password(user, password)
1316 }
1317
1318 #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
1319 #[cfg_attr(not(feature = "token-authentication"), allow(unused_assignments))]
1320 #[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
1321 fn authenticate(
1322 &self,
1323 authentication: bonsaidb_core::connection::Authentication,
1324 ) -> Result<Self, bonsaidb_core::Error> {
1325 let admin = self.admin();
1326 let mut loaded_user = None;
1327 match &authentication {
1328 #[cfg(feature = "token-authentication")]
1329 bonsaidb_core::connection::Authentication::Token { id, .. } => {
1330 self.check_permission(
1331 bonsaidb_core::permissions::bonsai::authentication_token_resource_name(*id),
1332 &BonsaiAction::Server(ServerAction::Authenticate(
1333 bonsaidb_core::connection::AuthenticationMethod::Token,
1334 )),
1335 )?;
1336 }
1337 #[cfg(feature = "password-hashing")]
1338 bonsaidb_core::connection::Authentication::Password { user, .. } => {
1339 let user =
1340 User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1341 self.check_permission(
1342 user_resource_name(user.header.id),
1343 &BonsaiAction::Server(ServerAction::Authenticate(
1344 bonsaidb_core::connection::AuthenticationMethod::PasswordHash,
1345 )),
1346 )?;
1347 loaded_user = Some(user);
1348 }
1349 #[cfg(feature = "token-authentication")]
1350 bonsaidb_core::connection::Authentication::TokenChallengeResponse(_) => {}
1351 }
1352 self.instance.authenticate_inner(
1353 authentication,
1354 loaded_user,
1355 self.authentication
1356 .as_ref()
1357 .and_then(|auth| auth.session.lock().id),
1358 &admin,
1359 )
1360 }
1361
1362 fn assume_identity(
1363 &self,
1364 identity: IdentityReference<'_>,
1365 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
1366 match identity {
1367 IdentityReference::User(user) => {
1368 let admin = self.admin();
1369 let user =
1370 User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1371 self.check_permission(
1372 user_resource_name(user.header.id),
1373 &BonsaiAction::Server(ServerAction::AssumeIdentity),
1374 )?;
1375 self.instance.assume_user(user, &admin)
1376 }
1377 IdentityReference::Role(role) => {
1378 let admin = self.admin();
1379 let role =
1380 Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1381 self.check_permission(
1382 role_resource_name(role.header.id),
1383 &BonsaiAction::Server(ServerAction::AssumeIdentity),
1384 )?;
1385 self.instance.assume_role(role, &admin)
1386 }
1387
1388 _ => Err(bonsaidb_core::Error::InvalidCredentials),
1389 }
1390 }
1391
1392 fn add_permission_group_to_user<
1393 'user,
1394 'group,
1395 U: Nameable<'user, u64> + Send + Sync,
1396 G: Nameable<'group, u64> + Send + Sync,
1397 >(
1398 &self,
1399 user: U,
1400 permission_group: G,
1401 ) -> Result<(), bonsaidb_core::Error> {
1402 self.instance
1403 .update_user_with_named_id::<PermissionGroup, _, _, _>(
1404 user,
1405 permission_group,
1406 |user, permission_group_id| {
1407 self.check_permission(
1408 user_resource_name(user.header.id),
1409 &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
1410 )?;
1411 Ok(StorageInstance::add_permission_group_to_user_inner(
1412 user,
1413 permission_group_id,
1414 ))
1415 },
1416 )
1417 }
1418
1419 fn remove_permission_group_from_user<
1420 'user,
1421 'group,
1422 U: Nameable<'user, u64> + Send + Sync,
1423 G: Nameable<'group, u64> + Send + Sync,
1424 >(
1425 &self,
1426 user: U,
1427 permission_group: G,
1428 ) -> Result<(), bonsaidb_core::Error> {
1429 self.instance
1430 .update_user_with_named_id::<PermissionGroup, _, _, _>(
1431 user,
1432 permission_group,
1433 |user, permission_group_id| {
1434 self.check_permission(
1435 user_resource_name(user.header.id),
1436 &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
1437 )?;
1438 Ok(StorageInstance::remove_permission_group_from_user_inner(
1439 user,
1440 permission_group_id,
1441 ))
1442 },
1443 )
1444 }
1445
1446 fn add_role_to_user<
1447 'user,
1448 'group,
1449 U: Nameable<'user, u64> + Send + Sync,
1450 G: Nameable<'group, u64> + Send + Sync,
1451 >(
1452 &self,
1453 user: U,
1454 role: G,
1455 ) -> Result<(), bonsaidb_core::Error> {
1456 self.instance
1457 .update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
1458 self.check_permission(
1459 user_resource_name(user.header.id),
1460 &BonsaiAction::Server(ServerAction::ModifyUserRoles),
1461 )?;
1462 Ok(StorageInstance::add_role_to_user_inner(user, role_id))
1463 })
1464 }
1465
1466 fn remove_role_from_user<
1467 'user,
1468 'group,
1469 U: Nameable<'user, u64> + Send + Sync,
1470 G: Nameable<'group, u64> + Send + Sync,
1471 >(
1472 &self,
1473 user: U,
1474 role: G,
1475 ) -> Result<(), bonsaidb_core::Error> {
1476 self.instance
1477 .update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
1478 self.check_permission(
1479 user_resource_name(user.header.id),
1480 &BonsaiAction::Server(ServerAction::ModifyUserRoles),
1481 )?;
1482 Ok(StorageInstance::remove_role_from_user_inner(user, role_id))
1483 })
1484 }
1485}
1486
1487#[test]
1488fn name_validation_tests() {
1489 assert!(matches!(Storage::validate_name("azAZ09.-"), Ok(())));
1490 assert!(matches!(
1491 Storage::validate_name("_internal-names-work"),
1492 Ok(())
1493 ));
1494 assert!(matches!(
1495 Storage::validate_name("-alphaunmericfirstrequired"),
1496 Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
1497 ));
1498 assert!(matches!(
1499 Storage::validate_name("\u{2661}"),
1500 Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
1501 ));
1502}
1503
1504#[derive(Clone, Copy, Eq, PartialEq, Hash)]
1506pub struct StorageId(u64);
1507
1508impl StorageId {
1509 #[must_use]
1511 pub const fn as_u64(self) -> u64 {
1512 self.0
1513 }
1514}
1515
1516impl Debug for StorageId {
1517 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1518 write!(f, "{:016x}", self.0)
1520 }
1521}
1522
1523impl Display for StorageId {
1524 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1525 Debug::fmt(self, f)
1526 }
1527}
1528
1529#[derive(Debug, Clone)]
1530#[cfg(any(feature = "compression", feature = "encryption"))]
1531pub(crate) struct TreeVault {
1532 #[cfg(feature = "compression")]
1533 compression: Option<Compression>,
1534 #[cfg(feature = "encryption")]
1535 pub key: Option<KeyId>,
1536 #[cfg(feature = "encryption")]
1537 pub vault: Arc<Vault>,
1538}
1539
1540#[cfg(all(feature = "compression", feature = "encryption"))]
1541impl TreeVault {
1542 pub(crate) fn new_if_needed(
1543 key: Option<KeyId>,
1544 vault: &Arc<Vault>,
1545 compression: Option<Compression>,
1546 ) -> Option<Self> {
1547 if key.is_none() && compression.is_none() {
1548 None
1549 } else {
1550 Some(Self {
1551 key,
1552 compression,
1553 vault: vault.clone(),
1554 })
1555 }
1556 }
1557
1558 fn header(&self, compressed: bool) -> u8 {
1559 let mut bits = if self.key.is_some() { 0b1000_0000 } else { 0 };
1560
1561 if compressed {
1562 if let Some(compression) = self.compression {
1563 bits |= compression as u8;
1564 }
1565 }
1566
1567 bits
1568 }
1569}
1570
1571#[cfg(all(feature = "compression", feature = "encryption"))]
1572impl nebari::Vault for TreeVault {
1573 type Error = Error;
1574
1575 fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
1576 let mut includes_compression = false;
1579 let compressed = match (payload.len(), self.compression) {
1580 (128..=usize::MAX, Some(Compression::Lz4)) => {
1581 includes_compression = true;
1582 Cow::Owned(lz4_flex::block::compress_prepend_size(payload))
1583 }
1584 _ => Cow::Borrowed(payload),
1585 };
1586
1587 let mut complete = if let Some(key) = &self.key {
1588 self.vault.encrypt_payload(key, &compressed, None)?
1589 } else {
1590 compressed.into_owned()
1591 };
1592
1593 let header = self.header(includes_compression);
1594 if header != 0 {
1595 let header = [b't', b'r', b'v', header];
1596 complete.splice(0..0, header);
1597 }
1598
1599 Ok(complete)
1600 }
1601
1602 fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
1603 if payload.len() >= 4 && &payload[0..3] == b"trv" {
1604 let header = payload[3];
1605 let payload = &payload[4..];
1606 let encrypted = (header & 0b1000_0000) != 0;
1607 let compression = header & 0b0111_1111;
1608 let decrypted = if encrypted {
1609 Cow::Owned(self.vault.decrypt_payload(payload, None)?)
1610 } else {
1611 Cow::Borrowed(payload)
1612 };
1613 #[allow(clippy::single_match)] return Ok(match Compression::from_u8(compression) {
1615 Some(Compression::Lz4) => {
1616 lz4_flex::block::decompress_size_prepended(&decrypted).map_err(Error::from)?
1617 }
1618 None => decrypted.into_owned(),
1619 });
1620 }
1621 self.vault.decrypt_payload(payload, None)
1622 }
1623}
1624
1625pub trait StorageNonBlocking: Sized {
1628 #[must_use]
1630 fn path(&self) -> &Path;
1631
1632 fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error>;
1636}
1637
1638impl StorageNonBlocking for Storage {
1639 fn path(&self) -> &Path {
1640 &self.instance.data.path
1641 }
1642
1643 fn assume_session(&self, session: Session) -> Result<Storage, bonsaidb_core::Error> {
1644 if self.authentication.is_some() {
1645 return Err(bonsaidb_core::Error::InvalidCredentials);
1647 }
1648
1649 let Some(session_id) = session.id else {
1650 return Ok(Self {
1651 instance: self.instance.clone(),
1652 authentication: None,
1653 effective_session: Some(Arc::new(session)),
1654 });
1655 };
1656
1657 let session_data = self.instance.data.sessions.read();
1658 let authentication = session_data
1660 .sessions
1661 .get(&session_id)
1662 .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1663
1664 let authentication_session = authentication.session.lock();
1665 let effective_permissions =
1666 Permissions::merged([&session.permissions, &authentication_session.permissions]);
1667 let effective_session = Session {
1668 id: authentication_session.id,
1669 authentication: authentication_session.authentication.clone(),
1670 permissions: effective_permissions,
1671 };
1672
1673 Ok(Self {
1674 instance: self.instance.clone(),
1675 authentication: Some(authentication.clone()),
1676 effective_session: Some(Arc::new(effective_session)),
1677 })
1678 }
1679}
1680
1681#[cfg(all(feature = "compression", not(feature = "encryption")))]
1682impl TreeVault {
1683 pub(crate) fn new_if_needed(compression: Option<Compression>) -> Option<Self> {
1684 compression.map(|compression| Self {
1685 compression: Some(compression),
1686 })
1687 }
1688}
1689
1690#[cfg(all(feature = "compression", not(feature = "encryption")))]
1691impl nebari::Vault for TreeVault {
1692 type Error = Error;
1693
1694 fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
1695 Ok(match (payload.len(), self.compression) {
1696 (128..=usize::MAX, Some(Compression::Lz4)) => {
1697 let mut destination =
1698 vec![0; lz4_flex::block::get_maximum_output_size(payload.len()) + 8];
1699 let compressed_length =
1700 lz4_flex::block::compress_into(payload, &mut destination[8..])
1701 .expect("lz4-flex documents this shouldn't fail");
1702 destination.truncate(compressed_length + 8);
1703 destination[0..4].copy_from_slice(&[b't', b'r', b'v', Compression::Lz4 as u8]);
1704 let uncompressed_length =
1706 u32::try_from(payload.len()).expect("nebari doesn't support >32 bit blocks");
1707 destination[4..8].copy_from_slice(&uncompressed_length.to_le_bytes());
1708 destination
1709 }
1710 _ => payload.to_vec(),
1712 })
1713 }
1714
1715 fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
1716 if payload.len() >= 4 && &payload[0..3] == b"trv" {
1717 let header = payload[3];
1718 let payload = &payload[4..];
1719 let encrypted = (header & 0b1000_0000) != 0;
1720 let compression = header & 0b0111_1111;
1721 if encrypted {
1722 return Err(Error::EncryptionDisabled);
1723 }
1724
1725 #[allow(clippy::single_match)] return Ok(match Compression::from_u8(compression) {
1727 Some(Compression::Lz4) => {
1728 lz4_flex::block::decompress_size_prepended(payload).map_err(Error::from)?
1729 }
1730 None => payload.to_vec(),
1731 });
1732 }
1733 Ok(payload.to_vec())
1734 }
1735}
1736
1737#[cfg(all(not(feature = "compression"), feature = "encryption"))]
1738impl TreeVault {
1739 pub(crate) fn new_if_needed(key: Option<KeyId>, vault: &Arc<Vault>) -> Option<Self> {
1740 key.map(|key| Self {
1741 key: Some(key),
1742 vault: vault.clone(),
1743 })
1744 }
1745
1746 #[allow(dead_code)] fn header(&self) -> u8 {
1748 if self.key.is_some() {
1749 0b1000_0000
1750 } else {
1751 0
1752 }
1753 }
1754}
1755
1756#[cfg(all(not(feature = "compression"), feature = "encryption"))]
1757impl nebari::Vault for TreeVault {
1758 type Error = Error;
1759
1760 fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
1761 if let Some(key) = &self.key {
1762 self.vault.encrypt_payload(key, payload, None)
1763 } else {
1764 Ok(payload.to_vec())
1766 }
1767 }
1768
1769 fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
1770 self.vault.decrypt_payload(payload, None)
1771 }
1772}
1773
1774#[derive(Clone, Debug)]
1775pub struct StorageLock(StorageId, Arc<LockData>);
1776
1777impl StorageLock {
1778 pub const fn id(&self) -> StorageId {
1779 self.0
1780 }
1781}
1782
1783#[derive(Debug)]
1784struct LockData(File);
1785
1786impl StorageLock {
1787 fn new(id: StorageId, file: File) -> Self {
1788 Self(id, Arc::new(LockData(file)))
1789 }
1790}
1791
1792impl Drop for LockData {
1793 fn drop(&mut self) {
1794 drop(self.0.unlock());
1795 }
1796}