Skip to main content

mqdb_core/storage/
memory_backend.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::backend::{BatchOperations, StorageBackend};
5use crate::error::{Error, Result};
6use std::collections::BTreeMap;
7use std::sync::{Arc, RwLock};
8
/// In-memory key/value store backed by an ordered map.
///
/// Keys and values are raw byte vectors held in a [`BTreeMap`], so iteration
/// follows the lexicographic byte order of the keys. The map sits behind an
/// `Arc<RwLock<..>>` so that batches created from the backend can hold a
/// handle to the same map.
#[derive(Debug)]
pub struct MemoryBackend {
    /// Shared, lock-protected map from key bytes to value bytes.
    data: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
}
12
13impl MemoryBackend {
14    #[allow(clippy::must_use_candidate)]
15    pub fn new() -> Self {
16        Self {
17            data: Arc::new(RwLock::new(BTreeMap::new())),
18        }
19    }
20}
21
22impl Default for MemoryBackend {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl StorageBackend for MemoryBackend {
29    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
30        let data = self
31            .data
32            .read()
33            .map_err(|e| Error::Internal(e.to_string()))?;
34        Ok(data.get(key).cloned())
35    }
36
37    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
38        let mut data = self
39            .data
40            .write()
41            .map_err(|e| Error::Internal(e.to_string()))?;
42        data.insert(key.to_vec(), value.to_vec());
43        Ok(())
44    }
45
46    fn remove(&self, key: &[u8]) -> Result<()> {
47        let mut data = self
48            .data
49            .write()
50            .map_err(|e| Error::Internal(e.to_string()))?;
51        data.remove(key);
52        Ok(())
53    }
54
55    fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
56        let data = self
57            .data
58            .read()
59            .map_err(|e| Error::Internal(e.to_string()))?;
60        let results: Vec<_> = data
61            .range(prefix.to_vec()..)
62            .take_while(|(k, _)| k.starts_with(prefix))
63            .map(|(k, v)| (k.clone(), v.clone()))
64            .collect();
65        Ok(results)
66    }
67
68    fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
69        let data = self
70            .data
71            .read()
72            .map_err(|e| Error::Internal(e.to_string()))?;
73        Ok(data
74            .range(prefix.to_vec()..)
75            .take_while(|(k, _)| k.starts_with(prefix))
76            .count())
77    }
78
79    fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
80        let data = self
81            .data
82            .read()
83            .map_err(|e| Error::Internal(e.to_string()))?;
84        Ok(data
85            .range(prefix.to_vec()..)
86            .take_while(|(k, _)| k.starts_with(prefix))
87            .map(|(k, _)| k.clone())
88            .collect())
89    }
90
91    fn prefix_scan_batch(
92        &self,
93        prefix: &[u8],
94        batch_size: usize,
95        after_key: Option<&[u8]>,
96    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
97        let data = self
98            .data
99            .read()
100            .map_err(|e| Error::Internal(e.to_string()))?;
101        let start: Vec<u8> = if let Some(after) = after_key {
102            let mut s = after.to_vec();
103            s.push(0);
104            s
105        } else {
106            prefix.to_vec()
107        };
108        Ok(data
109            .range(start..)
110            .take_while(|(k, _)| k.starts_with(prefix))
111            .take(batch_size)
112            .map(|(k, v)| (k.clone(), v.clone()))
113            .collect())
114    }
115
116    fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
117        let data = self
118            .data
119            .read()
120            .map_err(|e| Error::Internal(e.to_string()))?;
121        let results: Vec<_> = data
122            .range(start.to_vec()..end.to_vec())
123            .map(|(k, v)| (k.clone(), v.clone()))
124            .collect();
125        Ok(results)
126    }
127
128    fn batch(&self) -> Box<dyn BatchOperations> {
129        Box::new(MemoryBatch {
130            data: Arc::clone(&self.data),
131            operations: Vec::new(),
132            preconditions: Vec::new(),
133        })
134    }
135
136    fn flush(&self) -> Result<()> {
137        Ok(())
138    }
139}
140
/// A single mutation queued inside a batch.
enum BatchOp {
    /// Store the second field (value) under the first field (key).
    Insert(Vec<u8>, Vec<u8>),
    /// Delete the given key.
    Remove(Vec<u8>),
}
145
/// A check that must hold at commit time: `key` must currently map to
/// exactly `expected_value`.
struct Precondition {
    /// Key whose current value is inspected.
    key: Vec<u8>,
    /// Value the key is required to hold.
    expected_value: Vec<u8>,
}
150
151pub struct MemoryBatch {
152    data: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
153    operations: Vec<BatchOp>,
154    preconditions: Vec<Precondition>,
155}
156
157impl BatchOperations for MemoryBatch {
158    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
159        self.operations.push(BatchOp::Insert(key, value));
160    }
161
162    fn remove(&mut self, key: Vec<u8>) {
163        self.operations.push(BatchOp::Remove(key));
164    }
165
166    fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
167        self.preconditions.push(Precondition {
168            key,
169            expected_value,
170        });
171    }
172
173    fn commit(self: Box<Self>) -> Result<()> {
174        let mut data = self
175            .data
176            .write()
177            .map_err(|e| Error::Internal(e.to_string()))?;
178
179        for precondition in &self.preconditions {
180            let actual = data.get(&precondition.key);
181            match actual {
182                Some(val) if val.as_slice() == precondition.expected_value.as_slice() => {}
183                _ => {
184                    return Err(Error::Conflict(
185                        "optimistic lock failed: value was modified".into(),
186                    ));
187                }
188            }
189        }
190
191        for op in self.operations {
192            match op {
193                BatchOp::Insert(k, v) => {
194                    data.insert(k, v);
195                }
196                BatchOp::Remove(k) => {
197                    data.remove(&k);
198                }
199            }
200        }
201
202        Ok(())
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Insert a key, read it back, delete it, and confirm it is gone.
    #[test]
    fn test_basic_operations() {
        let store = MemoryBackend::new();
        let key: &[u8] = b"key1";
        let value: &[u8] = b"value1";

        StorageBackend::insert(&store, key, value).unwrap();
        let fetched = StorageBackend::get(&store, key).unwrap();
        assert_eq!(fetched, Some(value.to_vec()));

        StorageBackend::remove(&store, key).unwrap();
        let fetched = StorageBackend::get(&store, key).unwrap();
        assert_eq!(fetched, None);
    }

    /// Only keys under the requested prefix are returned.
    #[test]
    fn test_prefix_scan() {
        let store = MemoryBackend::new();
        let rows: [(&[u8], &[u8]); 3] = [
            (b"users/1", b"alice"),
            (b"users/2", b"bob"),
            (b"posts/1", b"hello"),
        ];
        for &(key, value) in &rows {
            StorageBackend::insert(&store, key, value).unwrap();
        }

        let users = StorageBackend::prefix_scan(&store, b"users/").unwrap();
        assert_eq!(users.len(), 2);
    }

    /// Every insert queued in a batch is visible after commit.
    #[test]
    fn test_batch_commit() {
        let store = MemoryBackend::new();
        let rows: [(&[u8], &[u8]); 2] = [(b"key1", b"value1"), (b"key2", b"value2")];

        let mut pending = StorageBackend::batch(&store);
        for &(key, value) in &rows {
            pending.insert(key.to_vec(), value.to_vec());
        }
        pending.commit().unwrap();

        for &(key, value) in &rows {
            assert_eq!(
                StorageBackend::get(&store, key).unwrap(),
                Some(value.to_vec())
            );
        }
    }

    /// A matching precondition lets the batch overwrite the value.
    #[test]
    fn test_optimistic_lock_success() {
        let store = MemoryBackend::new();
        StorageBackend::insert(&store, b"key1", b"value1").unwrap();

        let mut pending = StorageBackend::batch(&store);
        pending.expect_value(b"key1".to_vec(), b"value1".to_vec());
        pending.insert(b"key1".to_vec(), b"value2".to_vec());
        pending.commit().unwrap();

        let current = StorageBackend::get(&store, b"key1").unwrap();
        assert_eq!(current, Some(b"value2".to_vec()));
    }

    /// A failing precondition rejects the commit and leaves the value as is.
    #[test]
    fn test_optimistic_lock_failure() {
        let store = MemoryBackend::new();
        StorageBackend::insert(&store, b"key1", b"value1").unwrap();

        let mut pending = StorageBackend::batch(&store);
        pending.expect_value(b"key1".to_vec(), b"wrong_value".to_vec());
        pending.insert(b"key1".to_vec(), b"value2".to_vec());

        let outcome = pending.commit();
        assert!(outcome.is_err());

        let current = StorageBackend::get(&store, b"key1").unwrap();
        assert_eq!(current, Some(b"value1".to_vec()));
    }
}