Skip to main content

ethrex_storage/backend/
in_memory.rs

1use crate::api::{
2    PrefixResult, StorageBackend, StorageLockedView, StorageReadView, StorageWriteBatch,
3};
4use crate::error::StoreError;
5use rustc_hash::FxHashMap;
6use std::path::Path;
7use std::sync::{Arc, RwLock};
8
9type Table = FxHashMap<Vec<u8>, Vec<u8>>;
10type Database = FxHashMap<&'static str, Table>;
11
12#[derive(Debug)]
13pub struct InMemoryBackend {
14    // RCU-style snapshot store: readers clone the inner Arc and then read lock-free.
15    // Writes run under the outer write lock and use Arc::make_mut for copy-on-write.
16    // If read snapshots are still alive, writes may clone the full Database.
17    inner: Arc<RwLock<Arc<Database>>>,
18}
19
20impl InMemoryBackend {
21    pub fn open() -> Result<Self, StoreError> {
22        Ok(Self {
23            inner: Arc::new(RwLock::new(Arc::new(Database::default()))),
24        })
25    }
26}
27
28impl StorageBackend for InMemoryBackend {
29    fn clear_table(&self, table: &str) -> Result<(), StoreError> {
30        let mut db = self
31            .inner
32            .write()
33            .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
34
35        let db_mut = Arc::make_mut(&mut *db);
36        if let Some(table_ref) = db_mut.get_mut(table) {
37            table_ref.clear();
38        }
39        Ok(())
40    }
41
42    fn begin_read(&self) -> Result<Arc<dyn StorageReadView>, StoreError> {
43        let snapshot = self
44            .inner
45            .read()
46            .map_err(|_| StoreError::Custom("Failed to acquire read lock".to_string()))?
47            .clone();
48        Ok(Arc::new(InMemoryReadTx { snapshot }))
49    }
50
51    fn begin_write(&self) -> Result<Box<dyn StorageWriteBatch + 'static>, StoreError> {
52        Ok(Box::new(InMemoryWriteTx {
53            backend: self.inner.clone(),
54        }))
55    }
56
57    fn begin_locked(
58        &self,
59        table_name: &'static str,
60    ) -> Result<Box<dyn StorageLockedView>, StoreError> {
61        let snapshot = self
62            .inner
63            .read()
64            .map_err(|_| StoreError::Custom("Failed to acquire read lock".to_string()))?
65            .clone();
66        Ok(Box::new(InMemoryLocked {
67            snapshot,
68            table_name,
69        }))
70    }
71
72    fn create_checkpoint(&self, _path: &Path) -> Result<(), StoreError> {
73        // Checkpoints are not supported for the InMemory DB
74        // Silently ignoring the request to create a checkpoint is harmless
75        Ok(())
76    }
77}
78
79pub struct InMemoryLocked {
80    snapshot: Arc<Database>,
81    table_name: &'static str,
82}
83
84pub struct InMemoryPrefixIter {
85    results: std::vec::IntoIter<PrefixResult>,
86}
87
88impl Iterator for InMemoryPrefixIter {
89    type Item = PrefixResult;
90
91    fn next(&mut self) -> Option<Self::Item> {
92        self.results.next()
93    }
94}
95
96impl StorageLockedView for InMemoryLocked {
97    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
98        Ok(self
99            .snapshot
100            .get(&self.table_name)
101            .and_then(|table_ref| table_ref.get(key))
102            .cloned())
103    }
104}
105
106pub struct InMemoryReadTx {
107    snapshot: Arc<Database>,
108}
109
110impl StorageReadView for InMemoryReadTx {
111    fn get(&self, table: &str, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
112        Ok(self
113            .snapshot
114            .get(table)
115            .and_then(|table_ref| table_ref.get(key))
116            .cloned())
117    }
118
119    fn prefix_iterator(
120        &self,
121        table: &str,
122        prefix: &[u8],
123    ) -> Result<Box<dyn Iterator<Item = PrefixResult> + '_>, StoreError> {
124        let table_data = self.snapshot.get(table).cloned().unwrap_or_default();
125        let prefix_vec = prefix.to_vec();
126
127        let mut entries: Vec<(Vec<u8>, Vec<u8>)> = table_data
128            .into_iter()
129            .filter(|(key, _)| key.starts_with(&prefix_vec))
130            .collect();
131        entries.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
132
133        let results: Vec<PrefixResult> = entries
134            .into_iter()
135            .map(|(k, v)| Ok((k.into_boxed_slice(), v.into_boxed_slice())))
136            .collect();
137
138        let iter = InMemoryPrefixIter {
139            results: results.into_iter(),
140        };
141        Ok(Box::new(iter))
142    }
143}
144
145pub struct InMemoryWriteTx {
146    backend: Arc<RwLock<Arc<Database>>>,
147}
148
149impl StorageWriteBatch for InMemoryWriteTx {
150    fn put_batch(
151        &mut self,
152        table: &'static str,
153        batch: Vec<(Vec<u8>, Vec<u8>)>,
154    ) -> Result<(), StoreError> {
155        let mut db = self
156            .backend
157            .write()
158            .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
159
160        // Copy-on-write update of the current snapshot.
161        let db_mut = Arc::make_mut(&mut *db);
162        let table_ref = db_mut.entry(table).or_default();
163
164        for (key, value) in batch {
165            table_ref.insert(key, value);
166        }
167
168        Ok(())
169    }
170
171    fn delete(&mut self, table: &str, key: &[u8]) -> Result<(), StoreError> {
172        let mut db = self
173            .backend
174            .write()
175            .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
176
177        let db_mut = Arc::make_mut(&mut *db);
178        if let Some(table_ref) = db_mut.get_mut(table) {
179            table_ref.remove(key);
180        }
181        Ok(())
182    }
183
184    fn merge(&mut self, table: &'static str, key: &[u8], operand: &[u8]) -> Result<(), StoreError> {
185        // InMemory has no native merge operator, so apply the merge inline.
186        // Only TRANSACTION_LOCATIONS uses merge today; dispatch by table.
187        if table != crate::api::tables::TRANSACTION_LOCATIONS {
188            return Err(StoreError::Custom(format!(
189                "merge not supported for table {table}"
190            )));
191        }
192        let mut db = self
193            .backend
194            .write()
195            .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
196        let db_mut = Arc::make_mut(&mut *db);
197        let table_ref = db_mut.entry(table).or_default();
198        let existing = table_ref.get(key).map(|v| v.as_slice());
199        let merged = crate::store::tx_locations_merge(existing, std::iter::once(operand))
200            .ok_or_else(|| StoreError::Custom("tx_locations_merge returned None".to_string()))?;
201        table_ref.insert(key.to_vec(), merged);
202        Ok(())
203    }
204
205    fn commit(&mut self) -> Result<(), StoreError> {
206        // FIXME: in-memory writes aren't atomic
207        Ok(())
208    }
209}