Skip to main content

moltendb_core/engine/
open.rs

1// ─── engine/open.rs ───────────────────────────────────────────────────────────
2// Native (non-WASM) constructor for the Db struct.
3// Opens or creates a database at the given file path.
4// ─────────────────────────────────────────────────────────────────────────────
5
6#[cfg(not(target_arch = "wasm32"))]
7use std::sync::Arc;
8#[cfg(not(target_arch = "wasm32"))]
9use dashmap::{DashMap, DashSet};
10#[cfg(not(target_arch = "wasm32"))]
11use tokio::sync::broadcast;
12
13use crate::engine::Db;
14#[cfg(not(target_arch = "wasm32"))]
15use crate::engine::DbError;
16#[cfg(not(target_arch = "wasm32"))]
17use crate::engine::config::DbConfig;
18#[cfg(not(target_arch = "wasm32"))]
19use crate::engine::storage;
20
21impl Db {
22    /// Open (or create) a database at the given file path.
23    /// Only available on native (non-WASM) builds.
24    ///
25    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write).
26    ///                    if false, use AsyncDiskStorage (flush every 50ms).
27    ///                    Ignored when `tiered_mode` is true.
28    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
29    ///                    Hot writes go to the active log; cold data is archived and
30    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
31    ///                    Enable with STORAGE_MODE=tiered environment variable.
32    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage.
33    ///                    if None, data is stored in plaintext (not recommended).
34    #[cfg(not(target_arch = "wasm32"))]
35    pub fn open(config: DbConfig) -> Result<Self, DbError> {
36        let path = &config.path;
37        let sync_mode = config.sync_mode;
38        let tiered_mode = config.tiered_mode;
39        let hot_threshold = config.hot_threshold;
40        let rate_limit_requests = config.rate_limit_requests.unwrap_or(1000);
41        let rate_limit_window = config.rate_limit_window.unwrap_or(60);
42        let max_body_size = config.max_body_size;
43        let max_keys_per_request = config.max_keys_per_request;
44        let encryption_key = config.encryption_key;
45        let post_backup_script = config.post_backup_script;
46        let in_memory = config.in_memory;
47
48        // Create the shared in-memory state containers.
49        let state = Arc::new(DashMap::new());
50        // Create the broadcast channel with a buffer of 100 messages.
51        // If the buffer fills up (no subscribers reading), old messages are dropped.
52        let (tx, _rx) = broadcast::channel(1000);
53        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
54            Arc::new(Default::default());
55        let query_heatmap = Arc::new(Default::default());
56        #[cfg(feature = "schema")]
57        let schemas = Arc::new(DashMap::new());
58
59        // Ensure the parent directory exists (skipped in in-memory mode — no file is created).
60        if !in_memory {
61            if let Some(parent) = std::path::Path::new(path).parent() {
62                std::fs::create_dir_all(parent)?;
63            }
64        }
65
66        // Choose the base storage backend based on the configured mode.
67        //
68        //   in_memory = true    → InMemoryStorage: all data lives in the DashMap only.
69        //                         No disk I/O at all. Data is lost on exit.
70        //                         Ideal for ephemeral caches and CI environments.
71        //
72        //   tiered_mode = true  → TieredStorage: hot log (async writes) + cold log
73        //                         (mmap reads). Best for large datasets. The cold log
74        //                         accumulates promoted hot data and is paged by the OS.
75        //
76        //   sync_mode = true    → SyncDiskStorage: every write is flushed to disk
77        //                         immediately. Zero data loss, lower throughput.
78        //
79        //   default             → AsyncDiskStorage: writes buffered in memory, flushed
80        //                         every 50ms. Highest throughput, up to 50ms data loss.
81        let base_storage: Arc<dyn crate::engine::storage::StorageBackend> = if in_memory {
82            Arc::new(storage::InMemoryStorage)
83        } else if tiered_mode {
84            Arc::new(storage::TieredStorage::new(path)?)
85        } else if sync_mode {
86            Arc::new(storage::SyncDiskStorage::new(path)?)
87        } else {
88            Arc::new(storage::AsyncDiskStorage::new(path)?)
89        };
90
91        // Optionally wrap the base storage in EncryptedStorage.
92        // Encryption is skipped in in-memory mode — there is nothing to encrypt on disk.
93        // EncryptedStorage is transparent — it encrypts on write and decrypts
94        // on read, so the rest of the engine doesn't know encryption is happening.
95        let storage: Arc<dyn crate::engine::storage::StorageBackend> = if !in_memory {
96            if let Some(key) = encryption_key {
97                Arc::new(storage::EncryptedStorage::new(base_storage, &key))
98            } else {
99                base_storage
100            }
101        } else {
102            base_storage
103        };
104
105        // Replay the log (or snapshot + delta) into the in-memory state.
106        // After this call, `state` and `indexes` reflect the persisted data.
107        storage::stream_into_state(
108            &*storage,
109            &state,
110            &indexes,
111            #[cfg(feature = "schema")] &schemas,
112        )?;
113
114        Ok(Self {
115            state,
116            storage,
117            tx,
118            indexes,
119            query_heatmap,
120            hot_threshold,
121            rate_limit_requests,
122            rate_limit_window,
123            max_body_size,
124            max_keys_per_request,
125            #[cfg(feature = "schema")]
126            schemas,
127            post_backup_script,
128            tiered_mode,
129            #[cfg(not(target_arch = "wasm32"))]
130            started_at: std::time::Instant::now(),
131        })
132    }
133}