ethrex_storage/backend/
in_memory.rs1use crate::api::{
2 PrefixResult, StorageBackend, StorageLockedView, StorageReadView, StorageWriteBatch,
3};
4use crate::error::StoreError;
5use rustc_hash::FxHashMap;
6use std::path::Path;
7use std::sync::{Arc, RwLock};
8
/// A single table: raw key bytes mapped to raw value bytes.
type Table = FxHashMap<Vec<u8>, Vec<u8>>;
/// The whole database: a table name mapped to that table's contents.
type Database = FxHashMap<&'static str, Table>;
11
/// Storage backend that keeps every table in process memory.
///
/// The database is stored as `Arc<RwLock<Arc<Database>>>`: read paths clone the inner
/// `Arc` to obtain a snapshot, while write paths take the write lock and mutate through
/// `Arc::make_mut`.
#[derive(Debug)]
pub struct InMemoryBackend {
    /// Lock-protected handle to the current database contents.
    inner: Arc<RwLock<Arc<Database>>>,
}
19
20impl InMemoryBackend {
21 pub fn open() -> Result<Self, StoreError> {
22 Ok(Self {
23 inner: Arc::new(RwLock::new(Arc::new(Database::default()))),
24 })
25 }
26}
27
28impl StorageBackend for InMemoryBackend {
29 fn clear_table(&self, table: &str) -> Result<(), StoreError> {
30 let mut db = self
31 .inner
32 .write()
33 .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
34
35 let db_mut = Arc::make_mut(&mut *db);
36 if let Some(table_ref) = db_mut.get_mut(table) {
37 table_ref.clear();
38 }
39 Ok(())
40 }
41
42 fn begin_read(&self) -> Result<Arc<dyn StorageReadView>, StoreError> {
43 let snapshot = self
44 .inner
45 .read()
46 .map_err(|_| StoreError::Custom("Failed to acquire read lock".to_string()))?
47 .clone();
48 Ok(Arc::new(InMemoryReadTx { snapshot }))
49 }
50
51 fn begin_write(&self) -> Result<Box<dyn StorageWriteBatch + 'static>, StoreError> {
52 Ok(Box::new(InMemoryWriteTx {
53 backend: self.inner.clone(),
54 }))
55 }
56
57 fn begin_locked(
58 &self,
59 table_name: &'static str,
60 ) -> Result<Box<dyn StorageLockedView>, StoreError> {
61 let snapshot = self
62 .inner
63 .read()
64 .map_err(|_| StoreError::Custom("Failed to acquire read lock".to_string()))?
65 .clone();
66 Ok(Box::new(InMemoryLocked {
67 snapshot,
68 table_name,
69 }))
70 }
71
72 fn create_checkpoint(&self, _path: &Path) -> Result<(), StoreError> {
73 Ok(())
76 }
77}
78
/// View of a single table inside a database snapshot.
pub struct InMemoryLocked {
    /// Database contents captured when the view was created.
    snapshot: Arc<Database>,
    /// Name of the table every lookup is directed at.
    table_name: &'static str,
}
83
/// Iterator over prefix-scan results that were collected up front.
pub struct InMemoryPrefixIter {
    /// Owning iterator over the already-built results.
    results: std::vec::IntoIter<PrefixResult>,
}
87
88impl Iterator for InMemoryPrefixIter {
89 type Item = PrefixResult;
90
91 fn next(&mut self) -> Option<Self::Item> {
92 self.results.next()
93 }
94}
95
96impl StorageLockedView for InMemoryLocked {
97 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
98 Ok(self
99 .snapshot
100 .get(&self.table_name)
101 .and_then(|table_ref| table_ref.get(key))
102 .cloned())
103 }
104}
105
/// Read view over a database snapshot.
pub struct InMemoryReadTx {
    /// Database contents captured when the view was created.
    snapshot: Arc<Database>,
}
109
110impl StorageReadView for InMemoryReadTx {
111 fn get(&self, table: &str, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
112 Ok(self
113 .snapshot
114 .get(table)
115 .and_then(|table_ref| table_ref.get(key))
116 .cloned())
117 }
118
119 fn prefix_iterator(
120 &self,
121 table: &str,
122 prefix: &[u8],
123 ) -> Result<Box<dyn Iterator<Item = PrefixResult> + '_>, StoreError> {
124 let table_data = self.snapshot.get(table).cloned().unwrap_or_default();
125 let prefix_vec = prefix.to_vec();
126
127 let mut entries: Vec<(Vec<u8>, Vec<u8>)> = table_data
128 .into_iter()
129 .filter(|(key, _)| key.starts_with(&prefix_vec))
130 .collect();
131 entries.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
132
133 let results: Vec<PrefixResult> = entries
134 .into_iter()
135 .map(|(k, v)| Ok((k.into_boxed_slice(), v.into_boxed_slice())))
136 .collect();
137
138 let iter = InMemoryPrefixIter {
139 results: results.into_iter(),
140 };
141 Ok(Box::new(iter))
142 }
143}
144
/// Write handle that operates on the backend's shared, lock-protected database.
pub struct InMemoryWriteTx {
    /// Same lock-protected database handle the backend holds.
    backend: Arc<RwLock<Arc<Database>>>,
}
148
149impl StorageWriteBatch for InMemoryWriteTx {
150 fn put_batch(
151 &mut self,
152 table: &'static str,
153 batch: Vec<(Vec<u8>, Vec<u8>)>,
154 ) -> Result<(), StoreError> {
155 let mut db = self
156 .backend
157 .write()
158 .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
159
160 let db_mut = Arc::make_mut(&mut *db);
162 let table_ref = db_mut.entry(table).or_default();
163
164 for (key, value) in batch {
165 table_ref.insert(key, value);
166 }
167
168 Ok(())
169 }
170
171 fn delete(&mut self, table: &str, key: &[u8]) -> Result<(), StoreError> {
172 let mut db = self
173 .backend
174 .write()
175 .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
176
177 let db_mut = Arc::make_mut(&mut *db);
178 if let Some(table_ref) = db_mut.get_mut(table) {
179 table_ref.remove(key);
180 }
181 Ok(())
182 }
183
184 fn merge(&mut self, table: &'static str, key: &[u8], operand: &[u8]) -> Result<(), StoreError> {
185 if table != crate::api::tables::TRANSACTION_LOCATIONS {
188 return Err(StoreError::Custom(format!(
189 "merge not supported for table {table}"
190 )));
191 }
192 let mut db = self
193 .backend
194 .write()
195 .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?;
196 let db_mut = Arc::make_mut(&mut *db);
197 let table_ref = db_mut.entry(table).or_default();
198 let existing = table_ref.get(key).map(|v| v.as_slice());
199 let merged = crate::store::tx_locations_merge(existing, std::iter::once(operand))
200 .ok_or_else(|| StoreError::Custom("tx_locations_merge returned None".to_string()))?;
201 table_ref.insert(key.to_vec(), merged);
202 Ok(())
203 }
204
205 fn commit(&mut self) -> Result<(), StoreError> {
206 Ok(())
208 }
209}