avalanche_types/subnet/rpc/database/versiondb/
mod.rs1pub mod batch;
4pub mod iterator;
5
6use std::{
7 collections::HashMap,
8 io,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use crate::subnet::rpc::{
16 database::{self, batch::BoxedBatch, iterator::BoxedIterator, BoxedDatabase},
17 errors::Error,
18};
19
20use tokio::sync::RwLock;
21
22#[derive(Clone)]
28pub struct Database {
29 db: BoxedDatabase,
30 mem: Arc<RwLock<HashMap<Vec<u8>, iterator::ValueDelete>>>,
31 batch: BoxedBatch,
32 closed: Arc<AtomicBool>,
34}
35
36impl Database {
37 pub fn new(db: BoxedDatabase, batch: BoxedBatch) -> Self {
38 Self {
39 db,
40 mem: Arc::new(RwLock::new(HashMap::new())),
41 batch,
42 closed: Arc::new(AtomicBool::new(false)),
43 }
44 }
45}
46
47#[tonic::async_trait]
48impl database::KeyValueReaderWriterDeleter for Database {
49 async fn has(&self, key: &[u8]) -> io::Result<bool> {
51 if self.closed.load(Ordering::Relaxed) {
52 return Err(Error::DatabaseClosed.to_err());
53 }
54
55 let mem = self.mem.read().await;
56 if let Some(value) = mem.get(key) {
57 return Ok(!value.delete);
58 }
59
60 self.db.has(key).await
61 }
62
63 async fn get(&self, key: &[u8]) -> io::Result<Vec<u8>> {
65 if self.closed.load(Ordering::Relaxed) {
66 return Err(Error::DatabaseClosed.to_err());
67 }
68
69 let mem = self.mem.read().await;
70 if let Some(val) = mem.get(key) {
71 return Ok(val.value.clone());
72 }
73
74 self.db.get(key).await
75 }
76
77 async fn put(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
79 if self.closed.load(Ordering::Relaxed) {
80 return Err(Error::DatabaseClosed.to_err());
81 }
82
83 let mut mem = self.mem.write().await;
84 mem.insert(
85 key.to_vec(),
86 iterator::ValueDelete {
87 value: value.to_vec(),
88 delete: false,
89 },
90 );
91
92 Ok(())
93 }
94
95 async fn delete(&mut self, key: &[u8]) -> io::Result<()> {
97 if self.closed.load(Ordering::Relaxed) {
98 return Err(Error::DatabaseClosed.to_err());
99 }
100
101 let mut mem = self.mem.write().await;
102 if let Some(val) = mem.get_mut(key) {
103 val.delete = true;
104 }
105 mem.insert(
106 key.to_vec(),
107 iterator::ValueDelete {
108 value: vec![],
109 delete: true,
110 },
111 );
112
113 Ok(())
114 }
115}
116
117#[tonic::async_trait]
118impl database::Closer for Database {
119 async fn close(&self) -> io::Result<()> {
121 if self.closed.load(Ordering::Relaxed) {
122 return Err(Error::DatabaseClosed.to_err());
123 }
124 self.closed.store(true, Ordering::Relaxed);
125
126 Ok(())
127 }
128}
129
130#[tonic::async_trait]
131impl crate::subnet::rpc::health::Checkable for Database {
132 async fn health_check(&self) -> io::Result<Vec<u8>> {
134 if self.closed.load(Ordering::Relaxed) {
135 return Err(Error::DatabaseClosed.to_err());
136 }
137
138 self.db.health_check().await
139 }
140}
141
142#[tonic::async_trait]
143impl database::iterator::Iteratee for Database {
144 async fn new_iterator(&self) -> io::Result<BoxedIterator> {
146 self.new_iterator_with_start_and_prefix(&[], &[]).await
147 }
148
149 async fn new_iterator_with_start(&self, start: &[u8]) -> io::Result<BoxedIterator> {
151 self.new_iterator_with_start_and_prefix(start, &[]).await
152 }
153
154 async fn new_iterator_with_prefix(&self, prefix: &[u8]) -> io::Result<BoxedIterator> {
156 self.new_iterator_with_start_and_prefix(&[], prefix).await
157 }
158
159 async fn new_iterator_with_start_and_prefix(
161 &self,
162 start: &[u8],
163 prefix: &[u8],
164 ) -> io::Result<BoxedIterator> {
165 if self.closed.load(Ordering::Relaxed) {
166 return Err(Error::DatabaseClosed.to_err());
167 }
168
169 let mem = self.mem.write().await;
170 let mut keys: Vec<Vec<u8>> = Vec::with_capacity(mem.len());
171 for (k, _v) in mem.iter() {
172 if k.starts_with(prefix) && k >= &start.to_vec() {
173 keys.push(k.to_owned());
174 }
175 }
176
177 keys.sort();
179
180 let mut values: Vec<iterator::ValueDelete> = Vec::with_capacity(keys.len());
181 for key in keys.iter() {
182 if let Some(v) = mem.get(key) {
183 values.push(v.to_owned());
184 }
185 }
186
187 Ok(iterator::Iterator::new_boxed(
188 keys,
189 values,
190 Arc::clone(&self.closed),
191 self.db
192 .new_iterator_with_start_and_prefix(start, prefix)
193 .await?,
194 ))
195 }
196}
197
198#[tonic::async_trait]
199impl database::batch::Batcher for Database {
200 async fn new_batch(&self) -> io::Result<BoxedBatch> {
202 Ok(Box::new(batch::Batch::new(
203 Arc::clone(&self.mem),
204 Arc::clone(&self.closed),
205 )))
206 }
207}
208
209#[tonic::async_trait]
210impl database::Commitable for Database {
211 async fn commit(&mut self) -> io::Result<()> {
213 let mut batch = self.commit_batch().await?;
214 batch.write().await?;
215 batch.reset().await;
216 self.abort().await?;
217 Ok(())
218 }
219
220 async fn abort(&self) -> io::Result<()> {
222 let mut mem = self.mem.write().await;
223 mem.clear();
224 Ok(())
225 }
226
227 async fn commit_batch(&mut self) -> io::Result<BoxedBatch> {
229 if self.closed.load(Ordering::Relaxed) {
230 return Err(Error::DatabaseClosed.to_err());
231 }
232
233 let mem = self.mem.read().await;
234 self.batch.reset().await;
235 for (key, value) in mem.iter() {
236 if value.delete {
237 self.batch.delete(key).await?;
238 } else {
239 self.batch.put(key, &value.value).await?;
240 }
241 }
242
243 Ok(self.batch.clone())
244 }
245}
246
247impl database::Database for Database {}
248
249#[tokio::test]
250async fn iterate_test() {
251 use crate::subnet::rpc::database::{
252 iterator::Iteratee, memdb, Commitable, KeyValueReaderWriterDeleter,
253 };
254
255 let base_db = memdb::Database::new_boxed();
256
257 let batch = base_db.new_batch().await.unwrap();
258 let mut db = Database::new(base_db, batch);
259
260 let key1 = "hello1".as_bytes();
261 let value1 = "world1".as_bytes();
262 let key2 = "z".as_bytes();
263 let value2 = "world2".as_bytes();
264
265 db.put(key1, value1).await.unwrap();
266 db.commit().await.unwrap();
267
268 let mut iterator = db.new_iterator().await.unwrap();
269 assert!(iterator.next().await.unwrap());
270 assert_eq!(iterator.key().await.unwrap(), key1);
271 assert_eq!(iterator.value().await.unwrap(), value1);
272 assert!(!iterator.next().await.unwrap());
273 assert!(iterator.key().await.unwrap().is_empty());
274 assert!(iterator.value().await.unwrap().is_empty());
275 assert!(iterator.error().await.is_ok());
276
277 db.put(key2, value2).await.unwrap();
278
279 let mut iterator = db.new_iterator().await.unwrap();
280 assert!(iterator.next().await.unwrap());
281 assert_eq!(iterator.key().await.unwrap(), key1);
282 assert_eq!(iterator.value().await.unwrap(), value1);
283 assert!(iterator.next().await.unwrap());
284 assert_eq!(iterator.key().await.unwrap(), key2);
285 assert_eq!(iterator.value().await.unwrap(), value2);
286 assert!(!iterator.next().await.unwrap());
287 assert!(iterator.key().await.unwrap().is_empty());
288 assert!(iterator.value().await.unwrap().is_empty());
289 assert!(iterator.error().await.is_ok());
290
291 db.delete(key1).await.unwrap();
292
293 let mut iterator = db.new_iterator().await.unwrap();
294 assert!(iterator.next().await.unwrap());
295 assert_eq!(iterator.key().await.unwrap(), key2);
296 assert_eq!(iterator.value().await.unwrap(), value2);
297 assert!(!iterator.next().await.unwrap());
298 assert!(iterator.key().await.unwrap().is_empty());
299 assert!(iterator.value().await.unwrap().is_empty());
300 assert!(iterator.error().await.is_ok());
301
302 db.commit().await.unwrap();
303 db.put(key2, value1).await.unwrap();
304
305 let mut iterator = db.new_iterator().await.unwrap();
306 assert!(iterator.next().await.unwrap());
307 assert_eq!(iterator.key().await.unwrap(), key2);
308 assert_eq!(iterator.value().await.unwrap(), value1);
309 assert!(!iterator.next().await.unwrap());
310 assert!(iterator.key().await.unwrap().is_empty());
311 assert!(iterator.value().await.unwrap().is_empty());
312 assert!(iterator.error().await.is_ok());
313
314 db.commit().await.unwrap();
315 db.put(key1, value2).await.unwrap();
316
317 let mut iterator = db.new_iterator().await.unwrap();
318 assert!(iterator.next().await.unwrap());
319 assert_eq!(iterator.key().await.unwrap(), key1);
320 assert_eq!(iterator.value().await.unwrap(), value2);
321 assert!(iterator.next().await.unwrap());
322 assert_eq!(iterator.key().await.unwrap(), key2);
323 assert_eq!(iterator.value().await.unwrap(), value1);
324 assert!(!iterator.next().await.unwrap());
325 assert!(iterator.key().await.unwrap().is_empty());
326 assert!(iterator.value().await.unwrap().is_empty());
327 assert!(iterator.error().await.is_ok());
328}