bonsaidb_local/
storage.rs

1use std::borrow::Cow;
2use std::collections::{HashMap, HashSet};
3use std::fmt::{Debug, Display};
4use std::fs::{self, File};
5use std::io::{Read, Write};
6use std::marker::PhantomData;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Weak};
9
10use bonsaidb_core::admin::database::{self, ByName, Database as DatabaseRecord};
11use bonsaidb_core::admin::user::User;
12use bonsaidb_core::admin::{self, Admin, PermissionGroup, Role, ADMIN_DATABASE_NAME};
13use bonsaidb_core::circulate;
14pub use bonsaidb_core::circulate::Relay;
15use bonsaidb_core::connection::{
16    self, Connection, HasSession, Identity, IdentityReference, LowLevelConnection, Session,
17    SessionAuthentication, SessionId, StorageConnection,
18};
19use bonsaidb_core::document::CollectionDocument;
20#[cfg(any(feature = "encryption", feature = "compression"))]
21use bonsaidb_core::document::KeyId;
22use bonsaidb_core::permissions::bonsai::{
23    bonsaidb_resource_name, database_resource_name, role_resource_name, user_resource_name,
24    BonsaiAction, ServerAction,
25};
26use bonsaidb_core::permissions::Permissions;
27use bonsaidb_core::schema::{
28    Nameable, NamedCollection, Schema, SchemaName, SchemaSummary, Schematic,
29};
30use fs2::FileExt;
31use itertools::Itertools;
32use nebari::io::any::{AnyFile, AnyFileManager};
33use nebari::io::FileManager;
34use nebari::{ChunkCache, ThreadPool};
35use parking_lot::{Mutex, RwLock};
36use rand::{thread_rng, Rng};
37
38#[cfg(feature = "compression")]
39use crate::config::Compression;
40use crate::config::{KeyValuePersistence, StorageConfiguration};
41use crate::database::Context;
42use crate::tasks::manager::Manager;
43use crate::tasks::TaskManager;
44#[cfg(feature = "encryption")]
45use crate::vault::{self, LocalVaultKeyStorage, Vault};
46use crate::{Database, Error};
47
48#[cfg(feature = "password-hashing")]
49mod argon;
50#[cfg(feature = "token-authentication")]
51mod token_authentication;
52
53mod backup;
54mod pubsub;
55pub use backup::{AnyBackupLocation, BackupLocation};
56
57/// A file-based, multi-database, multi-user database engine. This type blocks
58/// the current thread when used. See [`AsyncStorage`](crate::AsyncStorage) for
59/// this type's async counterpart.
60///
61/// ## Converting between Blocking and Async Types
62///
63/// [`AsyncStorage`](crate::AsyncStorage) and [`Storage`] can be converted to
64/// and from each other using:
65///
66/// - [`AsyncStorage::into_blocking()`](crate::AsyncStorage::into_blocking)
67/// - [`AsyncStorage::to_blocking()`](crate::AsyncStorage::to_blocking)
68/// - [`AsyncStorage::as_blocking()`](crate::AsyncStorage::as_blocking)
69/// - [`Storage::into_async()`]
70/// - [`Storage::to_async()`]
71/// - [`Storage::into_async_with_runtime()`]
72/// - [`Storage::to_async_with_runtime()`]
73///
74/// ## Converting from `Database::open` to `Storage::open`
75///
76/// [`Database::open`](Database::open) is a simple method that uses `Storage` to
77/// create a database named `default` with the schema provided. These two ways
78/// of opening the database are the same:
79///
80/// ```rust
81/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
82/// use bonsaidb_core::connection::StorageConnection;
83/// use bonsaidb_core::schema::Schema;
84/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
85/// use bonsaidb_local::{
86///     config::{Builder, StorageConfiguration},
87///     Database, Storage,
88/// };
89/// # fn open<MySchema: Schema>() -> anyhow::Result<()> {
90/// // This creates a Storage instance, creates a database, and returns it.
91/// let db = Database::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb"))?;
92///
93/// // This is the equivalent code being executed:
94/// let storage =
95///     Storage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)?;
96/// storage.create_database::<MySchema>("default", true)?;
97/// let db = storage.database::<MySchema>("default")?;
98/// #     Ok(())
99/// # }
100/// ```
101///
102/// ## Using multiple databases
103///
104/// This example shows how to use `Storage` to create and use multiple databases
105/// with multiple schemas:
106///
107/// ```rust
108/// use bonsaidb_core::connection::StorageConnection;
109/// use bonsaidb_core::schema::{Collection, Schema};
110/// use bonsaidb_local::config::{Builder, StorageConfiguration};
111/// use bonsaidb_local::Storage;
112/// use serde::{Deserialize, Serialize};
113///
114/// #[derive(Debug, Schema)]
115/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
116/// # #[schema(core = bonsaidb_core)]
117/// struct MySchema;
118///
119/// #[derive(Debug, Serialize, Deserialize, Collection)]
120/// #[collection(name = "blog-posts")]
121/// # #[collection(core = bonsaidb_core)]
122/// struct BlogPost {
123///     pub title: String,
124///     pub contents: String,
125///     pub author_id: u64,
126/// }
127///
128/// #[derive(Debug, Serialize, Deserialize, Collection)]
/// #[collection(name = "authors")]
130/// # #[collection(core = bonsaidb_core)]
131/// struct Author {
132///     pub name: String,
133/// }
134///
135/// # fn open() -> anyhow::Result<()> {
136/// let storage = Storage::open(
137///     StorageConfiguration::new("my-db.bonsaidb")
138///         .with_schema::<BlogPost>()?
139///         .with_schema::<MySchema>()?,
140/// )?;
141///
142/// storage.create_database::<BlogPost>("ectons-blog", true)?;
143/// let ectons_blog = storage.database::<BlogPost>("ectons-blog")?;
144/// storage.create_database::<MySchema>("another-db", true)?;
145/// let another_db = storage.database::<MySchema>("another-db")?;
146/// # Ok(())
147/// # }
148/// ```
#[derive(Debug, Clone)]
#[must_use]
pub struct Storage {
    /// The shared storage instance this handle operates on.
    pub(crate) instance: StorageInstance,
    /// The authenticated session attached to this handle, if any.
    pub(crate) authentication: Option<Arc<AuthenticatedSession>>,
    /// The session whose permissions are in effect for this handle, if any.
    effective_session: Option<Arc<Session>>,
}
156
/// A session registered with a storage instance. Dropping it deregisters the
/// session id and removes the session's subscribers from the storage.
#[derive(Debug)]
pub struct AuthenticatedSession {
    // TODO: client_data,
    /// Weak handle to the storage's shared data; upgraded on drop to clean up.
    storage: Weak<Data>,
    /// The session state, guarded by a mutex.
    pub session: Mutex<Session>,
}
163
164#[derive(Debug, Default)]
165pub struct SessionSubscribers {
166    pub subscribers: HashMap<u64, SessionSubscriber>,
167    pub subscribers_by_session: HashMap<SessionId, HashSet<u64>>,
168    pub last_id: u64,
169}
170
171impl SessionSubscribers {
172    pub fn unregister(&mut self, subscriber_id: u64) {
173        if let Some(session_id) = self
174            .subscribers
175            .remove(&subscriber_id)
176            .and_then(|sub| sub.session_id)
177        {
178            if let Some(session_subscribers) = self.subscribers_by_session.get_mut(&session_id) {
179                session_subscribers.remove(&subscriber_id);
180            }
181        }
182    }
183}
184
/// A registered subscriber together with the session that owns it, if any.
#[derive(Debug)]
pub struct SessionSubscriber {
    /// The owning session's id, or `None` when not tied to a session.
    pub session_id: Option<SessionId>,
    /// The underlying `circulate` subscriber.
    pub subscriber: circulate::Subscriber,
}
190
191impl Drop for AuthenticatedSession {
192    fn drop(&mut self) {
193        let mut session = self.session.lock();
194        if let Some(id) = session.id.take() {
195            if let Some(storage) = self.storage.upgrade() {
196                // Deregister the session id once dropped.
197                let mut sessions = storage.sessions.write();
198                sessions.sessions.remove(&id);
199
200                // Remove all subscribers.
201                let mut sessions = storage.subscribers.write();
202                for id in sessions
203                    .subscribers_by_session
204                    .remove(&id)
205                    .into_iter()
206                    .flatten()
207                {
208                    sessions.subscribers.remove(&id);
209                }
210            }
211        }
212    }
213}
214
/// The set of sessions currently registered with a storage instance.
#[derive(Debug, Default)]
struct AuthenticatedSessions {
    /// Registered sessions, keyed by session id.
    sessions: HashMap<SessionId, Arc<AuthenticatedSession>>,
    /// The last session id handed out; incremented before each new session
    /// is created.
    last_session_id: u64,
}
220
/// A cheaply cloneable handle to the state shared by every [`Storage`] handle
/// of one opened storage.
#[derive(Debug, Clone)]
pub struct StorageInstance {
    /// The reference-counted shared state.
    data: Arc<Data>,
}
225
226impl From<StorageInstance> for Storage {
227    fn from(instance: StorageInstance) -> Self {
228        Self {
229            instance,
230            authentication: None,
231            effective_session: None,
232        }
233    }
234}
235
/// The state shared between all handles of one opened storage.
struct Data {
    /// The storage's unique id together with the locked id file.
    lock: StorageLock,
    /// The directory that contains the storage's files; each database's roots
    /// are opened at `path.join(name)`.
    path: PathBuf,
    /// The configured `workers.parallelization` value; also used to size
    /// `threadpool`.
    parallelization: usize,
    /// Thread pool shared with every opened database's nebari configuration.
    threadpool: ThreadPool<AnyFile>,
    /// Memory-backed or std file manager, chosen by `memory_only`.
    file_manager: AnyFileManager,
    /// The background task manager.
    pub(crate) tasks: TaskManager,
    /// Registered schemas, keyed by schema name.
    schemas: RwLock<HashMap<SchemaName, Arc<dyn DatabaseOpener>>>,
    /// Known database names mapped to the schema each was created with.
    available_databases: RwLock<HashMap<String, SchemaName>>,
    /// Contexts of databases that have already been opened, keyed by name.
    open_roots: Mutex<HashMap<String, Context>>,
    // cfg check matches `Connection::authenticate`
    /// Permissions from the configuration, passed along when computing the
    /// effective permissions of an assumed user or role.
    authenticated_permissions: Permissions,
    /// The currently registered sessions.
    sessions: RwLock<AuthenticatedSessions>,
    /// The registered pub/sub subscribers.
    pub(crate) subscribers: Arc<RwLock<SessionSubscribers>>,
    /// Password hasher built from the configuration's `argon` settings.
    #[cfg(feature = "password-hashing")]
    argon: argon::Hasher,
    /// The vault initialized for this storage.
    #[cfg(feature = "encryption")]
    pub(crate) vault: Arc<Vault>,
    /// The configured default encryption key, if any.
    #[cfg(feature = "encryption")]
    default_encryption_key: Option<KeyId>,
    /// Vault applied to each database's nebari configuration when present.
    #[cfg(any(feature = "compression", feature = "encryption"))]
    tree_vault: Option<TreeVault>,
    /// Key-value persistence settings handed to every database context.
    pub(crate) key_value_persistence: KeyValuePersistence,
    /// Chunk cache shared by all opened databases.
    chunk_cache: ChunkCache,
    /// The configured `views.check_integrity_on_open` value.
    pub(crate) check_view_integrity_on_database_open: bool,
    /// The pub/sub relay.
    relay: Relay,
}
263
264impl Storage {
    /// Creates or opens a multi-database [`Storage`] using `configuration`.
    ///
    /// Data is stored at `configuration.path`, or in `db.bonsaidb` when no
    /// path is configured. The directory is created unconditionally, including
    /// when `memory_only` is set.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created, the storage id
    /// file cannot be read/created/locked, the vault cannot be initialized
    /// (with the `encryption` feature), the list of databases cannot be
    /// loaded, or the admin database cannot be created.
    pub fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
        let owned_path = configuration
            .path
            .clone()
            .unwrap_or_else(|| PathBuf::from("db.bonsaidb"));
        let file_manager = if configuration.memory_only {
            AnyFileManager::memory()
        } else {
            AnyFileManager::std()
        };

        // Spawn the configured number of background workers before wrapping
        // the manager in a `TaskManager`.
        let manager = Manager::default();
        for _ in 0..configuration.workers.worker_count {
            manager.spawn_worker();
        }
        let tasks = TaskManager::new(manager);

        fs::create_dir_all(&owned_path)?;

        // Determines the storage id and takes the exclusive lock on its file.
        let storage_lock = Self::lookup_or_create_id(&configuration, &owned_path)?;

        #[cfg(feature = "encryption")]
        let vault = {
            // Fall back to key storage in `<path>/vault-keys` when the
            // configuration does not provide one.
            let vault_key_storage = match configuration.vault_key_storage {
                Some(storage) => storage,
                None => Arc::new(
                    LocalVaultKeyStorage::new(owned_path.join("vault-keys"))
                        .map_err(|err| Error::Vault(vault::Error::Initializing(err.to_string())))?,
                ),
            };

            Arc::new(Vault::initialize(
                storage_lock.id(),
                &owned_path,
                vault_key_storage,
            )?)
        };

        let parallelization = configuration.workers.parallelization;
        let check_view_integrity_on_database_open = configuration.views.check_integrity_on_open;
        let key_value_persistence = configuration.key_value_persistence;
        #[cfg(feature = "password-hashing")]
        let argon = argon::Hasher::new(configuration.argon);
        #[cfg(feature = "encryption")]
        let default_encryption_key = configuration.default_encryption_key;
        // Exactly one of the following `tree_vault` bindings is compiled,
        // depending on which of `compression` / `encryption` are enabled.
        #[cfg(all(feature = "compression", feature = "encryption"))]
        let tree_vault = TreeVault::new_if_needed(
            default_encryption_key.clone(),
            &vault,
            configuration.default_compression,
        );
        #[cfg(all(not(feature = "compression"), feature = "encryption"))]
        let tree_vault = TreeVault::new_if_needed(default_encryption_key.clone(), &vault);
        #[cfg(all(feature = "compression", not(feature = "encryption")))]
        let tree_vault = TreeVault::new_if_needed(configuration.default_compression);

        let authenticated_permissions = configuration.authenticated_permissions;

        let storage = Self {
            instance: StorageInstance {
                data: Arc::new(Data {
                    lock: storage_lock,
                    tasks,
                    parallelization,
                    subscribers: Arc::default(),
                    authenticated_permissions,
                    sessions: RwLock::default(),
                    #[cfg(feature = "password-hashing")]
                    argon,
                    #[cfg(feature = "encryption")]
                    vault,
                    #[cfg(feature = "encryption")]
                    default_encryption_key,
                    #[cfg(any(feature = "compression", feature = "encryption"))]
                    tree_vault,
                    path: owned_path,
                    file_manager,
                    // NOTE(review): the cache sizing arguments are hard-coded;
                    // their meaning is defined by `nebari::ChunkCache::new`.
                    chunk_cache: ChunkCache::new(2000, 160_384),
                    threadpool: ThreadPool::new(parallelization),
                    schemas: RwLock::new(configuration.initial_schemas),
                    available_databases: RwLock::default(),
                    open_roots: Mutex::default(),
                    key_value_persistence,
                    check_view_integrity_on_database_open,
                    relay: Relay::default(),
                }),
            },
            authentication: None,
            effective_session: None,
        };

        // Load the known database names from the admin database.
        storage.cache_available_databases()?;

        storage.create_admin_database_if_needed()?;

        Ok(storage)
    }
363
364    #[cfg(feature = "internal-apis")]
365    #[doc(hidden)]
366    pub fn database_without_schema(&self, name: &str) -> Result<Database, Error> {
367        let name = name.to_owned();
368        self.instance
369            .database_without_schema(&name, Some(self), None)
370    }
371
372    fn lookup_or_create_id(
373        configuration: &StorageConfiguration,
374        path: &Path,
375    ) -> Result<StorageLock, Error> {
376        let id_path = {
377            let storage_id = path.join("server-id");
378            if storage_id.exists() {
379                storage_id
380            } else {
381                path.join("storage-id")
382            }
383        };
384
385        let (id, file) = if let Some(id) = configuration.unique_id {
386            // The configuraiton id override is not persisted to disk. This is
387            // mostly to prevent someone from accidentally adding this
388            // configuration, realizing it breaks things, and then wanting to
389            // revert. This makes reverting to the old value easier.
390            let file = if id_path.exists() {
391                File::open(id_path)?
392            } else {
393                let mut file = File::create(id_path)?;
394                let id = id.to_string();
395                file.write_all(id.as_bytes())?;
396                file
397            };
398            file.lock_exclusive()?;
399            (id, file)
400        } else {
401            // Load/Store a randomly generated id into a file. While the value
402            // is numerical, the file contents are the ascii decimal, making it
403            // easier for a human to view, and if needed, edit.
404
405            if id_path.exists() {
406                // This value is important enought to not allow launching the
407                // server if the file can't be read or contains unexpected data.
408                let mut file = File::open(id_path)?;
409                file.lock_exclusive()?;
410                let mut bytes = Vec::new();
411                file.read_to_end(&mut bytes)?;
412                let existing_id =
413                    String::from_utf8(bytes).expect("server-id contains invalid data");
414
415                (existing_id.parse().expect("server-id isn't numeric"), file)
416            } else {
417                let id = { thread_rng().gen::<u64>() };
418                let mut file = File::create(id_path)?;
419                file.lock_exclusive()?;
420
421                file.write_all(id.to_string().as_bytes())?;
422
423                (id, file)
424            }
425        };
426        Ok(StorageLock::new(StorageId(id), file))
427    }
428
429    fn cache_available_databases(&self) -> Result<(), Error> {
430        let available_databases = self
431            .admin()
432            .view::<ByName>()
433            .query()?
434            .into_iter()
435            .map(|map| (map.key, map.value))
436            .collect();
437        let mut storage_databases = self.instance.data.available_databases.write();
438        *storage_databases = available_databases;
439        Ok(())
440    }
441
442    fn create_admin_database_if_needed(&self) -> Result<(), Error> {
443        self.register_schema::<Admin>()?;
444        match self.database::<Admin>(ADMIN_DATABASE_NAME) {
445            Ok(_) => {}
446            Err(bonsaidb_core::Error::DatabaseNotFound(_)) => {
447                drop(self.create_database::<Admin>(ADMIN_DATABASE_NAME, true)?);
448            }
449            Err(err) => return Err(Error::Core(err)),
450        }
451        Ok(())
452    }
453
454    /// Returns the unique id of the server.
455    ///
456    /// This value is set from the [`StorageConfiguration`] or randomly
457    /// generated when creating a server. It shouldn't be changed after a server
458    /// is in use, as doing can cause issues. For example, the vault that
459    /// manages encrypted storage uses the server ID to store the vault key. If
460    /// the server ID changes, the vault key storage will need to be updated
461    /// with the new server ID.
462    #[must_use]
463    pub fn unique_id(&self) -> StorageId {
464        self.instance.data.lock.id()
465    }
466
467    #[must_use]
468    pub(crate) fn parallelization(&self) -> usize {
469        self.instance.data.parallelization
470    }
471
472    #[must_use]
473    #[cfg(feature = "encryption")]
474    pub(crate) fn vault(&self) -> &Arc<Vault> {
475        &self.instance.data.vault
476    }
477
478    #[must_use]
479    #[cfg(any(feature = "encryption", feature = "compression"))]
480    pub(crate) fn tree_vault(&self) -> Option<&TreeVault> {
481        self.instance.data.tree_vault.as_ref()
482    }
483
484    #[must_use]
485    #[cfg(feature = "encryption")]
486    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
487        self.instance.data.default_encryption_key.as_ref()
488    }
489
    /// Variant compiled when `compression` is enabled without `encryption`:
    /// there is never a default encryption key, so this always returns `None`.
    #[must_use]
    #[cfg(all(feature = "compression", not(feature = "encryption")))]
    // `self` is unused here but kept so the signature matches the
    // `encryption`-enabled variant of this method.
    #[allow(clippy::unused_self)]
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
        None
    }
496
497    /// Registers a schema for use within the server.
498    pub fn register_schema<DB: Schema>(&self) -> Result<(), Error> {
499        let mut schemas = self.instance.data.schemas.write();
500        if schemas
501            .insert(
502                DB::schema_name(),
503                Arc::new(StorageSchemaOpener::<DB>::new()?),
504            )
505            .is_none()
506        {
507            Ok(())
508        } else {
509            Err(Error::Core(bonsaidb_core::Error::SchemaAlreadyRegistered(
510                DB::schema_name(),
511            )))
512        }
513    }
514
515    fn validate_name(name: &str) -> Result<(), Error> {
516        if name.chars().enumerate().all(|(index, c)| {
517            c.is_ascii_alphanumeric()
518                || (index == 0 && c == '_')
519                || (index > 0 && (c == '.' || c == '-'))
520        }) {
521            Ok(())
522        } else {
523            Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(
524                name.to_owned(),
525            )))
526        }
527    }
528
529    /// Restricts an unauthenticated instance to having `effective_permissions`.
530    /// Returns `None` if a session has already been established.
531    #[must_use]
532    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
533        if self.effective_session.is_some() {
534            None
535        } else {
536            Some(Self {
537                instance: self.instance.clone(),
538                authentication: self.authentication.clone(),
539                effective_session: Some(Arc::new(Session {
540                    id: None,
541                    authentication: SessionAuthentication::None,
542                    permissions: effective_permissions,
543                })),
544            })
545        }
546    }
547
548    /// Converts this instance into its blocking version, which is able to be
549    /// used without async. The returned instance uses the current Tokio runtime
550    /// handle to spawn blocking tasks.
551    ///
552    /// # Panics
553    ///
554    /// Panics if called outside the context of a Tokio runtime.
555    #[cfg(feature = "async")]
556    pub fn into_async(self) -> crate::AsyncStorage {
557        self.into_async_with_runtime(tokio::runtime::Handle::current())
558    }
559
560    /// Converts this instance into its blocking version, which is able to be
561    /// used without async. The returned instance uses the provided runtime
562    /// handle to spawn blocking tasks.
563    #[cfg(feature = "async")]
564    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
565        crate::AsyncStorage {
566            storage: self,
567            runtime: Arc::new(runtime),
568        }
569    }
570
571    /// Converts this instance into its blocking version, which is able to be
572    /// used without async. The returned instance uses the current Tokio runtime
573    /// handle to spawn blocking tasks.
574    ///
575    /// # Panics
576    ///
577    /// Panics if called outside the context of a Tokio runtime.
578    #[cfg(feature = "async")]
579    pub fn to_async(&self) -> crate::AsyncStorage {
580        self.clone().into_async()
581    }
582
583    /// Converts this instance into its blocking version, which is able to be
584    /// used without async. The returned instance uses the provided runtime
585    /// handle to spawn blocking tasks.
586    #[cfg(feature = "async")]
587    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
588        self.clone().into_async_with_runtime(runtime)
589    }
590}
591
592impl Debug for Data {
593    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594        let mut f = f.debug_struct("Data");
595        f.field("lock", &self.lock)
596            .field("path", &self.path)
597            .field("parallelization", &self.parallelization)
598            .field("threadpool", &self.threadpool)
599            .field("file_manager", &self.file_manager)
600            .field("tasks", &self.tasks)
601            .field("available_databases", &self.available_databases)
602            .field("open_roots", &self.open_roots)
603            .field("authenticated_permissions", &self.authenticated_permissions)
604            .field("sessions", &self.sessions)
605            .field("subscribers", &self.subscribers)
606            .field("key_value_persistence", &self.key_value_persistence)
607            .field("chunk_cache", &self.chunk_cache)
608            .field(
609                "check_view_integrity_on_database_open",
610                &self.check_view_integrity_on_database_open,
611            )
612            .field("relay", &self.relay);
613
614        if let Some(schemas) = self.schemas.try_read() {
615            let mut schemas = schemas.keys().collect::<Vec<_>>();
616            schemas.sort();
617            f.field("schemas", &schemas);
618        } else {
619            f.field("schemas", &"RwLock locked");
620        }
621
622        #[cfg(feature = "password-hashing")]
623        f.field("argon", &self.argon);
624        #[cfg(feature = "encryption")]
625        {
626            f.field("vault", &self.vault)
627                .field("default_encryption_key", &self.default_encryption_key);
628        }
629        #[cfg(any(feature = "compression", feature = "encryption"))]
630        f.field("tree_vault", &self.tree_vault);
631
632        f.finish()
633    }
634}
635
636impl StorageInstance {
637    #[cfg_attr(
638        not(any(feature = "encryption", feature = "compression")),
639        allow(unused_mut)
640    )]
641    pub(crate) fn open_roots(&self, name: &str) -> Result<Context, Error> {
642        let mut open_roots = self.data.open_roots.lock();
643        if let Some(roots) = open_roots.get(name) {
644            Ok(roots.clone())
645        } else {
646            let task_name = name.to_string();
647
648            let mut config = nebari::Config::new(self.data.path.join(task_name))
649                .file_manager(self.data.file_manager.clone())
650                .cache(self.data.chunk_cache.clone())
651                .shared_thread_pool(&self.data.threadpool);
652
653            #[cfg(any(feature = "encryption", feature = "compression"))]
654            if let Some(vault) = self.data.tree_vault.clone() {
655                config = config.vault(vault);
656            }
657
658            let roots = config.open().map_err(Error::from)?;
659            let context = Context::new(
660                roots,
661                self.data.key_value_persistence.clone(),
662                Some(self.data.lock.clone()),
663            );
664
665            open_roots.insert(name.to_owned(), context.clone());
666
667            Ok(context)
668        }
669    }
670
671    pub(crate) fn tasks(&self) -> &'_ TaskManager {
672        &self.data.tasks
673    }
674
675    pub(crate) fn check_view_integrity_on_database_open(&self) -> bool {
676        self.data.check_view_integrity_on_database_open
677    }
678
679    pub(crate) fn relay(&self) -> &'_ Relay {
680        &self.data.relay
681    }
682
683    /// Opens a database through a generic-free trait.
684    pub(crate) fn database_without_schema(
685        &self,
686        name: &str,
687        storage: Option<&Storage>,
688        expected_schema: Option<SchemaName>,
689    ) -> Result<Database, Error> {
690        // TODO switch to upgradable read now that we are on parking_lot
691        let stored_schema = {
692            let available_databases = self.data.available_databases.read();
693            available_databases
694                .get(name)
695                .ok_or_else(|| {
696                    Error::Core(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
697                })?
698                .clone()
699        };
700
701        if let Some(expected_schema) = expected_schema {
702            if stored_schema != expected_schema {
703                return Err(Error::Core(bonsaidb_core::Error::SchemaMismatch {
704                    database_name: name.to_owned(),
705                    schema: expected_schema,
706                    stored_schema,
707                }));
708            }
709        }
710
711        let mut schemas = self.data.schemas.write();
712        let storage =
713            storage.map_or_else(|| Cow::Owned(Storage::from(self.clone())), Cow::Borrowed);
714        if let Some(schema) = schemas.get_mut(&stored_schema) {
715            let db = schema.open(name.to_string(), storage.as_ref())?;
716            Ok(db)
717        } else {
718            // The schema was stored, the user is requesting the same schema,
719            // but it isn't registerd with the storage currently.
720            Err(Error::Core(bonsaidb_core::Error::SchemaNotRegistered(
721                stored_schema,
722            )))
723        }
724    }
725
726    fn update_user_with_named_id<
727        'user,
728        'other,
729        Col: NamedCollection<PrimaryKey = u64>,
730        U: Nameable<'user, u64> + Send + Sync,
731        O: Nameable<'other, u64> + Send + Sync,
732        F: FnOnce(&mut CollectionDocument<User>, u64) -> Result<bool, bonsaidb_core::Error>,
733    >(
734        &self,
735        user: U,
736        other: O,
737        callback: F,
738    ) -> Result<(), bonsaidb_core::Error> {
739        let admin = self.admin();
740        let other = other.name()?;
741        let user = User::load(user.name()?, &admin)?;
742        let other = other.id::<Col, _>(&admin)?;
743        match (user, other) {
744            (Some(mut user), Some(other)) => {
745                if callback(&mut user, other)? {
746                    user.update(&admin)?;
747                }
748                Ok(())
749            }
750            // TODO make this a generic not found with a name parameter.
751            _ => Err(bonsaidb_core::Error::UserNotFound),
752        }
753    }
754
755    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
756    #[cfg_attr(
757        any(
758            not(feature = "token-authentication"),
759            not(feature = "password-hashing")
760        ),
761        allow(unused_variables, clippy::needless_pass_by_value)
762    )]
763    fn authenticate_inner(
764        &self,
765        authentication: bonsaidb_core::connection::Authentication,
766        loaded_user: Option<CollectionDocument<User>>,
767        current_session_id: Option<SessionId>,
768        admin: &Database,
769    ) -> Result<Storage, bonsaidb_core::Error> {
770        use bonsaidb_core::connection::Authentication;
771        match authentication {
772            #[cfg(feature = "token-authentication")]
773            Authentication::Token {
774                id,
775                now,
776                now_hash,
777                algorithm,
778            } => self.begin_token_authentication(id, now, &now_hash, algorithm, admin),
779            #[cfg(feature = "token-authentication")]
780            Authentication::TokenChallengeResponse(hash) => {
781                let session_id =
782                    current_session_id.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
783                self.finish_token_authentication(session_id, &hash, admin)
784            }
785            #[cfg(feature = "password-hashing")]
786            Authentication::Password { user, password } => {
787                let user = match loaded_user {
788                    Some(user) => user,
789                    None => {
790                        User::load(user, admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?
791                    }
792                };
793                let saved_hash = user
794                    .contents
795                    .argon_hash
796                    .clone()
797                    .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
798
799                self.data
800                    .argon
801                    .verify(user.header.id, password, saved_hash)?;
802                self.assume_user(user, admin)
803            }
804        }
805    }
806
807    fn assume_user(
808        &self,
809        user: CollectionDocument<User>,
810        admin: &Database,
811    ) -> Result<Storage, bonsaidb_core::Error> {
812        let permissions = user.contents.effective_permissions(
813            admin,
814            &admin.storage().instance.data.authenticated_permissions,
815        )?;
816
817        let mut sessions = self.data.sessions.write();
818        sessions.last_session_id += 1;
819        let session_id = SessionId(sessions.last_session_id);
820        let session = Session {
821            id: Some(session_id),
822            authentication: SessionAuthentication::Identity(Arc::new(Identity::User {
823                id: user.header.id,
824                username: user.contents.username,
825            })),
826            permissions,
827        };
828        let authentication = Arc::new(AuthenticatedSession {
829            storage: Arc::downgrade(&self.data),
830            session: Mutex::new(session.clone()),
831        });
832        sessions.sessions.insert(session_id, authentication.clone());
833
834        Ok(Storage {
835            instance: self.clone(),
836            authentication: Some(authentication),
837            effective_session: Some(Arc::new(session)),
838        })
839    }
840
841    fn assume_role(
842        &self,
843        role: CollectionDocument<Role>,
844        admin: &Database,
845    ) -> Result<Storage, bonsaidb_core::Error> {
846        let permissions = role.contents.effective_permissions(
847            admin,
848            &admin.storage().instance.data.authenticated_permissions,
849        )?;
850
851        let mut sessions = self.data.sessions.write();
852        sessions.last_session_id += 1;
853        let session_id = SessionId(sessions.last_session_id);
854        let session = Session {
855            id: Some(session_id),
856            authentication: SessionAuthentication::Identity(Arc::new(Identity::Role {
857                id: role.header.id,
858                name: role.contents.name,
859            })),
860            permissions,
861        };
862        let authentication = Arc::new(AuthenticatedSession {
863            storage: Arc::downgrade(&self.data),
864            session: Mutex::new(session.clone()),
865        });
866        sessions.sessions.insert(session_id, authentication.clone());
867
868        Ok(Storage {
869            instance: self.clone(),
870            authentication: Some(authentication),
871            effective_session: Some(Arc::new(session)),
872        })
873    }
874
875    fn add_permission_group_to_user_inner(
876        user: &mut CollectionDocument<User>,
877        permission_group_id: u64,
878    ) -> bool {
879        if user.contents.groups.contains(&permission_group_id) {
880            false
881        } else {
882            user.contents.groups.push(permission_group_id);
883            true
884        }
885    }
886
887    fn remove_permission_group_from_user_inner(
888        user: &mut CollectionDocument<User>,
889        permission_group_id: u64,
890    ) -> bool {
891        let old_len = user.contents.groups.len();
892        user.contents.groups.retain(|id| id != &permission_group_id);
893        old_len != user.contents.groups.len()
894    }
895
896    fn add_role_to_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
897        if user.contents.roles.contains(&role_id) {
898            false
899        } else {
900            user.contents.roles.push(role_id);
901            true
902        }
903    }
904
905    fn remove_role_from_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
906        let old_len = user.contents.roles.len();
907        user.contents.roles.retain(|id| id != &role_id);
908        old_len != user.contents.roles.len()
909    }
910}
911
/// Type-erased entry point for opening databases of one registered schema.
pub trait DatabaseOpener: Send + Sync {
    /// Returns the [`Schematic`] describing the schema this opener handles.
    fn schematic(&self) -> &'_ Schematic;
    /// Opens the database called `name` within `storage`.
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error>;
}
916
/// A [`DatabaseOpener`] for databases that use the schema `DB`.
pub struct StorageSchemaOpener<DB: Schema> {
    /// The schematic produced by `DB::schematic()` when the opener was created.
    schematic: Schematic,
    /// Ties the opener to `DB` without storing a value of that type.
    _phantom: PhantomData<DB>,
}
921
922impl<DB> StorageSchemaOpener<DB>
923where
924    DB: Schema,
925{
926    pub fn new() -> Result<Self, Error> {
927        let schematic = DB::schematic()?;
928        Ok(Self {
929            schematic,
930            _phantom: PhantomData,
931        })
932    }
933}
934
935impl<DB> DatabaseOpener for StorageSchemaOpener<DB>
936where
937    DB: Schema,
938{
939    fn schematic(&self) -> &'_ Schematic {
940        &self.schematic
941    }
942
943    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error> {
944        let roots = storage.instance.open_roots(&name)?;
945        let db = Database::new::<DB, _>(name, roots, storage)?;
946        Ok(db)
947    }
948}
949
impl HasSession for StorageInstance {
    /// Always returns `None`: this implementation never reports a session.
    fn session(&self) -> Option<&Session> {
        None
    }
}
955
956impl StorageConnection for StorageInstance {
957    type Authenticated = Storage;
958    type Database = Database;
959
960    fn admin(&self) -> Self::Database {
961        Database::new::<Admin, _>(
962            ADMIN_DATABASE_NAME,
963            self.open_roots(ADMIN_DATABASE_NAME).unwrap(),
964            &Storage::from(self.clone()),
965        )
966        .unwrap()
967    }
968
969    #[cfg_attr(feature = "tracing", tracing::instrument(
970        level = "trace",
971        skip(self, schema),
972        fields(
973            schema.authority = schema.authority.as_ref(),
974            schema.name = schema.name.as_ref(),
975        )
976    ))]
977    fn create_database_with_schema(
978        &self,
979        name: &str,
980        schema: SchemaName,
981        only_if_needed: bool,
982    ) -> Result<(), bonsaidb_core::Error> {
983        Storage::validate_name(name)?;
984
985        {
986            let schemas = self.data.schemas.read();
987            if !schemas.contains_key(&schema) {
988                return Err(bonsaidb_core::Error::SchemaNotRegistered(schema));
989            }
990        }
991
992        let mut available_databases = self.data.available_databases.write();
993        let admin = self.admin();
994        if !available_databases.contains_key(name) {
995            admin
996                .collection::<DatabaseRecord>()
997                .push(&admin::Database {
998                    name: name.to_string(),
999                    schema: schema.clone(),
1000                })?;
1001            available_databases.insert(name.to_string(), schema);
1002        } else if !only_if_needed {
1003            return Err(bonsaidb_core::Error::DatabaseNameAlreadyTaken(
1004                name.to_string(),
1005            ));
1006        }
1007
1008        Ok(())
1009    }
1010
1011    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
1012        self.database_without_schema(name, None, Some(DB::schema_name()))
1013            .map_err(bonsaidb_core::Error::from)
1014    }
1015
1016    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
1017    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
1018        let admin = self.admin();
1019        let mut available_databases = self.data.available_databases.write();
1020        available_databases.remove(name);
1021
1022        let mut open_roots = self.data.open_roots.lock();
1023        open_roots.remove(name);
1024
1025        let database_folder = self.data.path.join(name);
1026        if database_folder.exists() {
1027            let file_manager = self.data.file_manager.clone();
1028            file_manager
1029                .delete_directory(&database_folder)
1030                .map_err(Error::Nebari)?;
1031        }
1032
1033        if let Some(entry) = admin
1034            .view::<database::ByName>()
1035            .with_key(name)
1036            .query()?
1037            .first()
1038        {
1039            admin.delete::<DatabaseRecord, _>(&entry.source)?;
1040
1041            Ok(())
1042        } else {
1043            Err(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
1044        }
1045    }
1046
1047    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1048    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
1049        let available_databases = self.data.available_databases.read();
1050        Ok(available_databases
1051            .iter()
1052            .map(|(name, schema)| connection::Database {
1053                name: name.to_string(),
1054                schema: schema.clone(),
1055            })
1056            .collect())
1057    }
1058
1059    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1060    fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
1061        let available_databases = self.data.available_databases.read();
1062        let schemas = self.data.schemas.read();
1063
1064        Ok(available_databases
1065            .values()
1066            .unique()
1067            .filter_map(|name| {
1068                schemas
1069                    .get(name)
1070                    .map(|opener| SchemaSummary::from(opener.schematic()))
1071            })
1072            .collect())
1073    }
1074
1075    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1076    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
1077        let result = self
1078            .admin()
1079            .collection::<User>()
1080            .push(&User::default_with_username(username))?;
1081        Ok(result.id)
1082    }
1083
1084    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1085    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
1086        &self,
1087        user: U,
1088    ) -> Result<(), bonsaidb_core::Error> {
1089        let admin = self.admin();
1090        let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
1091        user.delete(&admin)?;
1092
1093        Ok(())
1094    }
1095
1096    #[cfg(feature = "password-hashing")]
1097    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1098    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
1099        &self,
1100        user: U,
1101        password: bonsaidb_core::connection::SensitiveString,
1102    ) -> Result<(), bonsaidb_core::Error> {
1103        let admin = self.admin();
1104        let mut user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
1105        user.contents.argon_hash = Some(self.data.argon.hash(user.header.id, password)?);
1106        user.update(&admin)
1107    }
1108
1109    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1110    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
1111    fn authenticate(
1112        &self,
1113        authentication: bonsaidb_core::connection::Authentication,
1114    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
1115        let admin = self.admin();
1116        self.authenticate_inner(authentication, None, None, &admin)
1117            .map(Storage::from)
1118    }
1119
1120    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1121    fn assume_identity(
1122        &self,
1123        identity: IdentityReference<'_>,
1124    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
1125        let admin = self.admin();
1126        match identity {
1127            IdentityReference::User(user) => {
1128                let user =
1129                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1130                self.assume_user(user, &admin).map(Storage::from)
1131            }
1132            IdentityReference::Role(role) => {
1133                let role =
1134                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1135                self.assume_role(role, &admin).map(Storage::from)
1136            }
1137            _ => Err(bonsaidb_core::Error::InvalidCredentials),
1138        }
1139    }
1140
1141    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1142    fn add_permission_group_to_user<
1143        'user,
1144        'group,
1145        U: Nameable<'user, u64> + Send + Sync,
1146        G: Nameable<'group, u64> + Send + Sync,
1147    >(
1148        &self,
1149        user: U,
1150        permission_group: G,
1151    ) -> Result<(), bonsaidb_core::Error> {
1152        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
1153            user,
1154            permission_group,
1155            |user, permission_group_id| {
1156                Ok(Self::add_permission_group_to_user_inner(
1157                    user,
1158                    permission_group_id,
1159                ))
1160            },
1161        )
1162    }
1163
1164    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1165    fn remove_permission_group_from_user<
1166        'user,
1167        'group,
1168        U: Nameable<'user, u64> + Send + Sync,
1169        G: Nameable<'group, u64> + Send + Sync,
1170    >(
1171        &self,
1172        user: U,
1173        permission_group: G,
1174    ) -> Result<(), bonsaidb_core::Error> {
1175        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
1176            user,
1177            permission_group,
1178            |user, permission_group_id| {
1179                Ok(Self::remove_permission_group_from_user_inner(
1180                    user,
1181                    permission_group_id,
1182                ))
1183            },
1184        )
1185    }
1186
1187    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1188    fn add_role_to_user<
1189        'user,
1190        'group,
1191        U: Nameable<'user, u64> + Send + Sync,
1192        G: Nameable<'group, u64> + Send + Sync,
1193    >(
1194        &self,
1195        user: U,
1196        role: G,
1197    ) -> Result<(), bonsaidb_core::Error> {
1198        self.update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
1199            Ok(Self::add_role_to_user_inner(user, role_id))
1200        })
1201    }
1202
1203    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
1204    fn remove_role_from_user<
1205        'user,
1206        'group,
1207        U: Nameable<'user, u64> + Send + Sync,
1208        G: Nameable<'group, u64> + Send + Sync,
1209    >(
1210        &self,
1211        user: U,
1212        role: G,
1213    ) -> Result<(), bonsaidb_core::Error> {
1214        self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
1215            Ok(Self::remove_role_from_user_inner(user, role_id))
1216        })
1217    }
1218}
1219
1220impl HasSession for Storage {
1221    fn session(&self) -> Option<&Session> {
1222        self.effective_session.as_deref()
1223    }
1224}
1225
1226impl StorageConnection for Storage {
1227    type Authenticated = Self;
1228    type Database = Database;
1229
    /// Returns the admin database by delegating to the underlying storage
    /// instance.
    fn admin(&self) -> Self::Database {
        self.instance.admin()
    }
1233
1234    fn create_database_with_schema(
1235        &self,
1236        name: &str,
1237        schema: SchemaName,
1238        only_if_needed: bool,
1239    ) -> Result<(), bonsaidb_core::Error> {
1240        self.check_permission(
1241            database_resource_name(name),
1242            &BonsaiAction::Server(ServerAction::CreateDatabase),
1243        )?;
1244        self.instance
1245            .create_database_with_schema(name, schema, only_if_needed)
1246    }
1247
    /// Opens the database called `name` by delegating to the underlying
    /// storage instance. No permission check is performed in this method.
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
        self.instance.database::<DB>(name)
    }
1251
1252    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
1253        self.check_permission(
1254            database_resource_name(name),
1255            &BonsaiAction::Server(ServerAction::DeleteDatabase),
1256        )?;
1257        self.instance.delete_database(name)
1258    }
1259
1260    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
1261        self.check_permission(
1262            bonsaidb_resource_name(),
1263            &BonsaiAction::Server(ServerAction::ListDatabases),
1264        )?;
1265        self.instance.list_databases()
1266    }
1267
1268    fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
1269        self.check_permission(
1270            bonsaidb_resource_name(),
1271            &BonsaiAction::Server(ServerAction::ListAvailableSchemas),
1272        )?;
1273        self.instance.list_available_schemas()
1274    }
1275
1276    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
1277        self.check_permission(
1278            bonsaidb_resource_name(),
1279            &BonsaiAction::Server(ServerAction::CreateUser),
1280        )?;
1281        self.instance.create_user(username)
1282    }
1283
1284    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
1285        &self,
1286        user: U,
1287    ) -> Result<(), bonsaidb_core::Error> {
1288        let admin = self.admin();
1289        let user = user.name()?;
1290        let user_id = user
1291            .id::<User, _>(&admin)?
1292            .ok_or(bonsaidb_core::Error::UserNotFound)?;
1293        self.check_permission(
1294            user_resource_name(user_id),
1295            &BonsaiAction::Server(ServerAction::DeleteUser),
1296        )?;
1297        self.instance.delete_user(user)
1298    }
1299
1300    #[cfg(feature = "password-hashing")]
1301    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
1302        &self,
1303        user: U,
1304        password: bonsaidb_core::connection::SensitiveString,
1305    ) -> Result<(), bonsaidb_core::Error> {
1306        let admin = self.admin();
1307        let user = user.name()?;
1308        let user_id = user
1309            .id::<User, _>(&admin)?
1310            .ok_or(bonsaidb_core::Error::UserNotFound)?;
1311        self.check_permission(
1312            user_resource_name(user_id),
1313            &BonsaiAction::Server(ServerAction::SetPassword),
1314        )?;
1315        self.instance.set_user_password(user, password)
1316    }
1317
    /// Authenticates against this storage after verifying that the current
    /// session is permitted to attempt the requested authentication method.
    ///
    /// For password authentication, the user document loaded for the
    /// permission check is handed to `authenticate_inner` so it does not have
    /// to be loaded again. The id of the current authenticated session, if
    /// any, is passed along as well.
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
    #[cfg_attr(not(feature = "token-authentication"), allow(unused_assignments))]
    #[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
    fn authenticate(
        &self,
        authentication: bonsaidb_core::connection::Authentication,
    ) -> Result<Self, bonsaidb_core::Error> {
        let admin = self.admin();
        // Only assigned by the password arm below.
        let mut loaded_user = None;
        match &authentication {
            #[cfg(feature = "token-authentication")]
            bonsaidb_core::connection::Authentication::Token { id, .. } => {
                // The permission is checked against the token's resource name.
                self.check_permission(
                    bonsaidb_core::permissions::bonsai::authentication_token_resource_name(*id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        bonsaidb_core::connection::AuthenticationMethod::Token,
                    )),
                )?;
            }
            #[cfg(feature = "password-hashing")]
            bonsaidb_core::connection::Authentication::Password { user, .. } => {
                // The user must be loaded first because the permission is
                // checked against the user's id.
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        bonsaidb_core::connection::AuthenticationMethod::PasswordHash,
                    )),
                )?;
                loaded_user = Some(user);
            }
            // No permission check is performed here for a challenge response.
            #[cfg(feature = "token-authentication")]
            bonsaidb_core::connection::Authentication::TokenChallengeResponse(_) => {}
        }
        self.instance.authenticate_inner(
            authentication,
            loaded_user,
            self.authentication
                .as_ref()
                .and_then(|auth| auth.session.lock().id),
            &admin,
        )
    }
1361
1362    fn assume_identity(
1363        &self,
1364        identity: IdentityReference<'_>,
1365    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
1366        match identity {
1367            IdentityReference::User(user) => {
1368                let admin = self.admin();
1369                let user =
1370                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1371                self.check_permission(
1372                    user_resource_name(user.header.id),
1373                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
1374                )?;
1375                self.instance.assume_user(user, &admin)
1376            }
1377            IdentityReference::Role(role) => {
1378                let admin = self.admin();
1379                let role =
1380                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1381                self.check_permission(
1382                    role_resource_name(role.header.id),
1383                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
1384                )?;
1385                self.instance.assume_role(role, &admin)
1386            }
1387
1388            _ => Err(bonsaidb_core::Error::InvalidCredentials),
1389        }
1390    }
1391
1392    fn add_permission_group_to_user<
1393        'user,
1394        'group,
1395        U: Nameable<'user, u64> + Send + Sync,
1396        G: Nameable<'group, u64> + Send + Sync,
1397    >(
1398        &self,
1399        user: U,
1400        permission_group: G,
1401    ) -> Result<(), bonsaidb_core::Error> {
1402        self.instance
1403            .update_user_with_named_id::<PermissionGroup, _, _, _>(
1404                user,
1405                permission_group,
1406                |user, permission_group_id| {
1407                    self.check_permission(
1408                        user_resource_name(user.header.id),
1409                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
1410                    )?;
1411                    Ok(StorageInstance::add_permission_group_to_user_inner(
1412                        user,
1413                        permission_group_id,
1414                    ))
1415                },
1416            )
1417    }
1418
1419    fn remove_permission_group_from_user<
1420        'user,
1421        'group,
1422        U: Nameable<'user, u64> + Send + Sync,
1423        G: Nameable<'group, u64> + Send + Sync,
1424    >(
1425        &self,
1426        user: U,
1427        permission_group: G,
1428    ) -> Result<(), bonsaidb_core::Error> {
1429        self.instance
1430            .update_user_with_named_id::<PermissionGroup, _, _, _>(
1431                user,
1432                permission_group,
1433                |user, permission_group_id| {
1434                    self.check_permission(
1435                        user_resource_name(user.header.id),
1436                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
1437                    )?;
1438                    Ok(StorageInstance::remove_permission_group_from_user_inner(
1439                        user,
1440                        permission_group_id,
1441                    ))
1442                },
1443            )
1444    }
1445
1446    fn add_role_to_user<
1447        'user,
1448        'group,
1449        U: Nameable<'user, u64> + Send + Sync,
1450        G: Nameable<'group, u64> + Send + Sync,
1451    >(
1452        &self,
1453        user: U,
1454        role: G,
1455    ) -> Result<(), bonsaidb_core::Error> {
1456        self.instance
1457            .update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
1458                self.check_permission(
1459                    user_resource_name(user.header.id),
1460                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
1461                )?;
1462                Ok(StorageInstance::add_role_to_user_inner(user, role_id))
1463            })
1464    }
1465
1466    fn remove_role_from_user<
1467        'user,
1468        'group,
1469        U: Nameable<'user, u64> + Send + Sync,
1470        G: Nameable<'group, u64> + Send + Sync,
1471    >(
1472        &self,
1473        user: U,
1474        role: G,
1475    ) -> Result<(), bonsaidb_core::Error> {
1476        self.instance
1477            .update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
1478                self.check_permission(
1479                    user_resource_name(user.header.id),
1480                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
1481                )?;
1482                Ok(StorageInstance::remove_role_from_user_inner(user, role_id))
1483            })
1484    }
1485}
1486
#[test]
fn name_validation_tests() {
    // Names that must be accepted.
    for accepted in ["azAZ09.-", "_internal-names-work"] {
        assert!(matches!(Storage::validate_name(accepted), Ok(())));
    }

    // Names that must be rejected as invalid database names.
    for rejected in ["-alphaunmericfirstrequired", "\u{2661}"] {
        assert!(matches!(
            Storage::validate_name(rejected),
            Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
        ));
    }
}
1503
/// The unique id of a [`Storage`] instance.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct StorageId(u64);

impl StorageId {
    /// Returns the id as a u64.
    #[must_use]
    pub const fn as_u64(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}

impl Debug for StorageId {
    /// Formats the id as 16 zero-padded lowercase hexadecimal digits.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:016x}", self.0))
    }
}

impl Display for StorageId {
    /// Formats the id exactly like its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <Self as Debug>::fmt(self, f)
    }
}
1528
/// Payload transformer for a tree. Depending on the enabled features it
/// carries a compression setting, an encryption key and vault, or both.
#[derive(Debug, Clone)]
#[cfg(any(feature = "compression", feature = "encryption"))]
pub(crate) struct TreeVault {
    /// Compression algorithm to apply to payloads, if any.
    #[cfg(feature = "compression")]
    compression: Option<Compression>,
    /// Id of the key used to encrypt payloads. When `None`, payloads are not
    /// encrypted.
    #[cfg(feature = "encryption")]
    pub key: Option<KeyId>,
    /// Vault that performs encryption and decryption of payloads.
    #[cfg(feature = "encryption")]
    pub vault: Arc<Vault>,
}
1539
#[cfg(all(feature = "compression", feature = "encryption"))]
impl TreeVault {
    /// Builds a vault for a tree, or returns `None` when neither an
    /// encryption key nor a compression algorithm is configured.
    pub(crate) fn new_if_needed(
        key: Option<KeyId>,
        vault: &Arc<Vault>,
        compression: Option<Compression>,
    ) -> Option<Self> {
        let needed = key.is_some() || compression.is_some();
        needed.then(|| Self {
            key,
            compression,
            vault: Arc::clone(vault),
        })
    }

    /// Computes the header byte for a payload: the high bit is set when an
    /// encryption key is configured, and the compression algorithm's value is
    /// OR-ed in when the payload was actually `compressed`.
    fn header(&self, compressed: bool) -> u8 {
        let encryption_bit: u8 = if self.key.is_some() { 0b1000_0000 } else { 0 };
        let compression_bits = match self.compression {
            Some(compression) if compressed => compression as u8,
            _ => 0,
        };

        encryption_bit | compression_bits
    }
}
1570
#[cfg(all(feature = "compression", feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;

    /// Optionally compresses, then optionally encrypts `payload`, and
    /// prefixes the result with `b"trv"` plus a header byte whenever that
    /// header byte is non-zero.
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        // TODO this allocates too much. The vault should be able to do an
        // in-place encryption operation so that we can use a single buffer.
        // Only payloads of at least 128 bytes are compressed.
        let (body, was_compressed) = match self.compression {
            Some(Compression::Lz4) if payload.len() >= 128 => (
                Cow::Owned(lz4_flex::block::compress_prepend_size(payload)),
                true,
            ),
            _ => (Cow::Borrowed(payload), false),
        };

        let mut output = match &self.key {
            Some(key) => self.vault.encrypt_payload(key, &body, None)?,
            None => body.into_owned(),
        };

        let flags = self.header(was_compressed);
        if flags != 0 {
            output.splice(0..0, [b't', b'r', b'v', flags]);
        }

        Ok(output)
    }

    /// Reverses [`Self::encrypt`]. Payloads without the `b"trv"` prefix are
    /// handed directly to the vault for decryption.
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        let [b't', b'r', b'v', flags, body @ ..] = payload else {
            return self.vault.decrypt_payload(payload, None);
        };

        let is_encrypted = (flags & 0b1000_0000) != 0;
        let algorithm = flags & 0b0111_1111;
        let plaintext = if is_encrypted {
            Cow::Owned(self.vault.decrypt_payload(body, None)?)
        } else {
            Cow::Borrowed(body)
        };

        #[allow(clippy::single_match)] // Make it an error when we add a new algorithm
        let restored = match Compression::from_u8(algorithm) {
            Some(Compression::Lz4) => {
                lz4_flex::block::decompress_size_prepended(&plaintext).map_err(Error::from)?
            }
            None => plaintext.into_owned(),
        };
        Ok(restored)
    }
}
1624
/// Functionality that is available on both [`Storage`] and
/// [`AsyncStorage`](crate::AsyncStorage).
pub trait StorageNonBlocking: Sized {
    /// Returns the path of the database storage.
    #[must_use]
    fn path(&self) -> &Path;

    /// Returns a new instance of [`Storage`] with `session` as the effective
    /// authentication session. This call will only succeed if there is no
    /// current session.
    ///
    /// # Errors
    ///
    /// The [`Storage`] implementation returns
    /// [`bonsaidb_core::Error::InvalidCredentials`] when it already has an
    /// authenticated session, or when `session` carries an id that is not a
    /// known session.
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error>;
}
1637
1638impl StorageNonBlocking for Storage {
1639    fn path(&self) -> &Path {
1640        &self.instance.data.path
1641    }
1642
1643    fn assume_session(&self, session: Session) -> Result<Storage, bonsaidb_core::Error> {
1644        if self.authentication.is_some() {
1645            // TODO better error
1646            return Err(bonsaidb_core::Error::InvalidCredentials);
1647        }
1648
1649        let Some(session_id) = session.id else {
1650            return Ok(Self {
1651                instance: self.instance.clone(),
1652                authentication: None,
1653                effective_session: Some(Arc::new(session)),
1654            });
1655        };
1656
1657        let session_data = self.instance.data.sessions.read();
1658        // TODO better error
1659        let authentication = session_data
1660            .sessions
1661            .get(&session_id)
1662            .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
1663
1664        let authentication_session = authentication.session.lock();
1665        let effective_permissions =
1666            Permissions::merged([&session.permissions, &authentication_session.permissions]);
1667        let effective_session = Session {
1668            id: authentication_session.id,
1669            authentication: authentication_session.authentication.clone(),
1670            permissions: effective_permissions,
1671        };
1672
1673        Ok(Self {
1674            instance: self.instance.clone(),
1675            authentication: Some(authentication.clone()),
1676            effective_session: Some(Arc::new(effective_session)),
1677        })
1678    }
1679}
1680
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl TreeVault {
    /// Builds a vault only when a compression algorithm is configured.
    pub(crate) fn new_if_needed(compression: Option<Compression>) -> Option<Self> {
        let algorithm = compression?;
        Some(Self {
            compression: Some(algorithm),
        })
    }
}
1689
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl nebari::Vault for TreeVault {
    type Error = Error;

    /// Compresses `payload` when it is at least 128 bytes long and LZ4 is
    /// configured; otherwise returns a copy of `payload` unchanged.
    ///
    /// Compressed output layout:
    /// - bytes `0..3`: the magic `trv`
    /// - byte `3`: the compression algorithm id
    /// - bytes `4..8`: the uncompressed length as little-endian `u32`
    /// - bytes `8..`: the LZ4 block
    ///
    /// # Panics
    ///
    /// Panics if LZ4 compression fails or if `payload` is longer than
    /// `u32::MAX` bytes.
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        const MINIMUM_COMPRESSED_LEN: usize = 128;
        const HEADER_LEN: usize = 8;

        let should_compress = payload.len() >= MINIMUM_COMPRESSED_LEN
            && matches!(self.compression, Some(Compression::Lz4));
        if !should_compress {
            // TODO this shouldn't copy
            return Ok(payload.to_vec());
        }

        let capacity = lz4_flex::block::get_maximum_output_size(payload.len()) + HEADER_LEN;
        let mut output = vec![0_u8; capacity];
        let (header, body) = output.split_at_mut(HEADER_LEN);

        let compressed_length = lz4_flex::block::compress_into(payload, body)
            .expect("lz4-flex documents this shouldn't fail");
        // to_le_bytes() makes it compatible with lz4-flex decompress_size_prepended.
        let uncompressed_length =
            u32::try_from(payload.len()).expect("nebari doesn't support >32 bit blocks");

        header[..4].copy_from_slice(&[b't', b'r', b'v', Compression::Lz4 as u8]);
        header[4..].copy_from_slice(&uncompressed_length.to_le_bytes());

        // Drop the unused tail of the worst-case sized buffer.
        output.truncate(HEADER_LEN + compressed_length);
        Ok(output)
    }

    /// Reverses [`encrypt`](Self::encrypt).
    ///
    /// Payloads that are shorter than four bytes or do not start with `trv`
    /// are returned as a plain copy. Otherwise the fourth byte is a header:
    /// its high bit flags encryption and its low seven bits identify the
    /// compression algorithm. When the algorithm id is not recognized, the
    /// bytes following the header are returned as-is.
    ///
    /// # Errors
    ///
    /// Returns `Error::EncryptionDisabled` when the header's encryption bit is
    /// set, or the converted LZ4 error when decompression fails.
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        const ENCRYPTED_FLAG: u8 = 0b1000_0000;
        const COMPRESSION_MASK: u8 = 0b0111_1111;

        let Some((&header, body)) = payload
            .strip_prefix(b"trv")
            .and_then(|after_magic| after_magic.split_first())
        else {
            return Ok(payload.to_vec());
        };

        if header & ENCRYPTED_FLAG != 0 {
            return Err(Error::EncryptionDisabled);
        }

        #[allow(clippy::single_match)] // Make it an error when we add a new algorithm
        let decoded = match Compression::from_u8(header & COMPRESSION_MASK) {
            Some(Compression::Lz4) => {
                lz4_flex::block::decompress_size_prepended(body).map_err(Error::from)?
            }
            None => body.to_vec(),
        };
        Ok(decoded)
    }
}
1736
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl TreeVault {
    /// Creates a vault only when an encryption key is supplied.
    ///
    /// Returns `None` when `key` is `None`; otherwise returns a vault holding
    /// that key and a clone of the shared `vault` handle.
    pub(crate) fn new_if_needed(key: Option<KeyId>, vault: &Arc<Vault>) -> Option<Self> {
        let key = key?;
        Some(Self {
            key: Some(key),
            vault: Arc::clone(vault),
        })
    }

    /// Returns the header byte for this vault: the high bit is set when a key
    /// is present, and the byte is zero otherwise.
    #[allow(dead_code)] // This implementation is sort of documentation for what it would be. But our Vault payload already can detect if a parsing error occurs, so we don't need a header if only encryption is enabled.
    fn header(&self) -> u8 {
        match self.key {
            Some(_) => 0b1000_0000,
            None => 0,
        }
    }
}
1755
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;

    /// Encrypts `payload` with the configured key through the shared vault.
    ///
    /// When no key is configured, a plain copy of `payload` is returned.
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        match &self.key {
            Some(key) => self.vault.encrypt_payload(key, payload, None),
            // TODO does this need to copy?
            None => Ok(payload.to_vec()),
        }
    }

    /// Decrypts `payload` through the shared vault.
    ///
    /// Unlike `encrypt`, this always delegates to the vault, whether or not a
    /// key is configured.
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        let vault = &self.vault;
        vault.decrypt_payload(payload, None)
    }
}
1773
1774#[derive(Clone, Debug)]
1775pub struct StorageLock(StorageId, Arc<LockData>);
1776
1777impl StorageLock {
1778    pub const fn id(&self) -> StorageId {
1779        self.0
1780    }
1781}
1782
1783#[derive(Debug)]
1784struct LockData(File);
1785
1786impl StorageLock {
1787    fn new(id: StorageId, file: File) -> Self {
1788        Self(id, Arc::new(LockData(file)))
1789    }
1790}
1791
1792impl Drop for LockData {
1793    fn drop(&mut self) {
1794        drop(self.0.unlock());
1795    }
1796}