Skip to main content

mqdb_core/storage/
mod.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4mod backend;
5#[cfg(feature = "fjall-backend")]
6mod encrypted_backend;
7#[cfg(feature = "fjall-backend")]
8mod fjall_backend;
9mod memory_backend;
10
11pub use backend::{AsyncBatchOperations, AsyncStorageBackend, BatchOperations, StorageBackend};
12#[cfg(feature = "fjall-backend")]
13pub use encrypted_backend::EncryptedBackend;
14#[cfg(feature = "fjall-backend")]
15pub use fjall_backend::FjallBackend;
16pub use memory_backend::MemoryBackend;
17
18use crate::error::Result;
19use std::sync::Arc;
20
21pub struct Storage {
22    backend: Arc<dyn StorageBackend>,
23}
24
25impl Storage {
26    /// # Errors
27    /// Returns an error if the storage backend fails to open.
28    #[cfg(feature = "fjall-backend")]
29    pub fn open<P: AsRef<std::path::Path>>(
30        path: P,
31        durability: crate::config::DurabilityMode,
32    ) -> Result<Self> {
33        let backend = FjallBackend::open(path, durability)?;
34        Ok(Self {
35            backend: Arc::new(backend),
36        })
37    }
38
39    /// # Errors
40    /// Returns an error if the storage backend fails to open or passphrase is invalid.
41    #[cfg(feature = "fjall-backend")]
42    pub fn open_encrypted<P: AsRef<std::path::Path>>(
43        path: P,
44        passphrase: &str,
45        durability: crate::config::DurabilityMode,
46    ) -> Result<Self> {
47        let inner = Arc::new(FjallBackend::open(path, durability)?);
48        let encrypted = EncryptedBackend::open(inner, passphrase)?;
49        Ok(Self {
50            backend: Arc::new(encrypted),
51        })
52    }
53
54    #[allow(clippy::must_use_candidate)]
55    pub fn memory() -> Self {
56        Self {
57            backend: Arc::new(MemoryBackend::new()),
58        }
59    }
60
61    pub fn with_backend(backend: Arc<dyn StorageBackend>) -> Self {
62        Self { backend }
63    }
64
65    /// # Errors
66    /// Returns an error if the storage operation fails.
67    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
68        self.backend.get(key)
69    }
70
71    /// # Errors
72    /// Returns an error if the storage operation fails.
73    pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
74        self.backend.insert(key, value)
75    }
76
77    /// # Errors
78    /// Returns an error if the storage operation fails.
79    pub fn remove(&self, key: &[u8]) -> Result<()> {
80        self.backend.remove(key)
81    }
82
83    /// # Errors
84    /// Returns an error if the storage operation fails.
85    pub fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
86        self.backend.prefix_scan(prefix)
87    }
88
89    /// # Errors
90    /// Returns an error if the storage operation fails.
91    pub fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
92        self.backend.prefix_count(prefix)
93    }
94
95    /// # Errors
96    /// Returns an error if the storage operation fails.
97    pub fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
98        self.backend.prefix_scan_keys(prefix)
99    }
100
101    /// # Errors
102    /// Returns an error if the storage operation fails.
103    pub fn prefix_scan_batch(
104        &self,
105        prefix: &[u8],
106        batch_size: usize,
107        after_key: Option<&[u8]>,
108    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
109        self.backend
110            .prefix_scan_batch(prefix, batch_size, after_key)
111    }
112
113    /// # Errors
114    /// Returns an error if the storage operation fails.
115    pub fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
116        self.backend.range_scan(start, end)
117    }
118
119    #[allow(clippy::must_use_candidate)]
120    pub fn batch(&self) -> BatchWriter {
121        BatchWriter {
122            inner: self.backend.batch(),
123        }
124    }
125
126    /// # Errors
127    /// Returns an error if the storage operation fails.
128    pub fn flush(&self) -> Result<()> {
129        self.backend.flush()
130    }
131}
132
133pub struct BatchWriter {
134    inner: Box<dyn BatchOperations>,
135}
136
137impl BatchWriter {
138    pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
139        self.inner.insert(key, value);
140    }
141
142    pub fn remove(&mut self, key: Vec<u8>) {
143        self.inner.remove(key);
144    }
145
146    pub fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
147        self.inner.expect_value(key, expected_value);
148    }
149
150    /// Commits all queued operations atomically.
151    ///
152    /// # Errors
153    /// Returns an error if the commit fails or expected values don't match.
154    pub fn commit(self) -> Result<()> {
155        self.inner.commit()
156    }
157}
158
159pub struct AsyncStorage<B: AsyncStorageBackend> {
160    backend: B,
161}
162
163impl<B: AsyncStorageBackend> AsyncStorage<B> {
164    #[allow(clippy::must_use_candidate)]
165    pub fn new(backend: B) -> Self {
166        Self { backend }
167    }
168
169    /// # Errors
170    /// Returns an error if the storage operation fails.
171    pub async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
172        self.backend.get(key).await
173    }
174
175    /// # Errors
176    /// Returns an error if the storage operation fails.
177    pub async fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
178        self.backend.insert(key, value).await
179    }
180
181    /// # Errors
182    /// Returns an error if the storage operation fails.
183    pub async fn remove(&self, key: &[u8]) -> Result<()> {
184        self.backend.remove(key).await
185    }
186
187    /// # Errors
188    /// Returns an error if the storage operation fails.
189    pub async fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
190        self.backend.prefix_scan(prefix).await
191    }
192
193    /// # Errors
194    /// Returns an error if the storage operation fails.
195    pub async fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
196        self.backend.prefix_count(prefix).await
197    }
198
199    /// # Errors
200    /// Returns an error if the storage operation fails.
201    pub async fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
202        self.backend.prefix_scan_keys(prefix).await
203    }
204
205    /// # Errors
206    /// Returns an error if the storage operation fails.
207    pub async fn prefix_scan_batch(
208        &self,
209        prefix: &[u8],
210        batch_size: usize,
211        after_key: Option<&[u8]>,
212    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
213        self.backend
214            .prefix_scan_batch(prefix, batch_size, after_key)
215            .await
216    }
217
218    /// # Errors
219    /// Returns an error if the storage operation fails.
220    pub async fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
221        self.backend.range_scan(start, end).await
222    }
223
224    #[allow(clippy::must_use_candidate)]
225    pub fn batch(&self) -> AsyncBatchWriter<B::Batch> {
226        AsyncBatchWriter {
227            inner: self.backend.batch(),
228        }
229    }
230
231    /// # Errors
232    /// Returns an error if the flush operation fails.
233    pub async fn flush(&self) -> Result<()> {
234        self.backend.flush().await
235    }
236}
237
238pub struct AsyncBatchWriter<B: AsyncBatchOperations> {
239    inner: B,
240}
241
242impl<B: AsyncBatchOperations> AsyncBatchWriter<B> {
243    pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
244        self.inner.insert(key, value);
245    }
246
247    pub fn remove(&mut self, key: Vec<u8>) {
248        self.inner.remove(key);
249    }
250
251    pub fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
252        self.inner.expect_value(key, expected_value);
253    }
254
255    /// # Errors
256    /// Returns an error if the commit fails.
257    pub async fn commit(self) -> Result<()> {
258        self.inner.commit().await
259    }
260}