avalanche_types/subnet/rpc/database/versiondb/
mod.rs

//! An in-memory database which persists mutations to the underlying database on
//! commit().
3pub mod batch;
4pub mod iterator;
5
6use std::{
7    collections::HashMap,
8    io,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use crate::subnet::rpc::{
16    database::{self, batch::BoxedBatch, iterator::BoxedIterator, BoxedDatabase},
17    errors::Error,
18};
19
20use tokio::sync::RwLock;
21
22/// Database implements the [`crate::subnet::rpc::database::Database`] interface
23/// by living on top of another database, writing changes to the underlying
24/// database only when commit is called.
25///
26/// ref. <https://pkg.go.dev/github.com/ava-labs/avalanchego/database/versiondb#Database>
27#[derive(Clone)]
28pub struct Database {
29    db: BoxedDatabase,
30    mem: Arc<RwLock<HashMap<Vec<u8>, iterator::ValueDelete>>>,
31    batch: BoxedBatch,
32    /// True if the database is closed.
33    closed: Arc<AtomicBool>,
34}
35
36impl Database {
37    pub fn new(db: BoxedDatabase, batch: BoxedBatch) -> Self {
38        Self {
39            db,
40            mem: Arc::new(RwLock::new(HashMap::new())),
41            batch,
42            closed: Arc::new(AtomicBool::new(false)),
43        }
44    }
45}
46
47#[tonic::async_trait]
48impl database::KeyValueReaderWriterDeleter for Database {
49    /// Implements the [`crate::subnet::rpc::database::KeyValueReaderWriterDeleter`] trait.
50    async fn has(&self, key: &[u8]) -> io::Result<bool> {
51        if self.closed.load(Ordering::Relaxed) {
52            return Err(Error::DatabaseClosed.to_err());
53        }
54
55        let mem = self.mem.read().await;
56        if let Some(value) = mem.get(key) {
57            return Ok(!value.delete);
58        }
59
60        self.db.has(key).await
61    }
62
63    /// Implements the [`crate::subnet::rpc::database::KeyValueReaderWriterDeleter`] trait.
64    async fn get(&self, key: &[u8]) -> io::Result<Vec<u8>> {
65        if self.closed.load(Ordering::Relaxed) {
66            return Err(Error::DatabaseClosed.to_err());
67        }
68
69        let mem = self.mem.read().await;
70        if let Some(val) = mem.get(key) {
71            return Ok(val.value.clone());
72        }
73
74        self.db.get(key).await
75    }
76
77    /// Implements the [`crate::subnet::rpc::database::KeyValueReaderWriterDeleter`] trait.
78    async fn put(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
79        if self.closed.load(Ordering::Relaxed) {
80            return Err(Error::DatabaseClosed.to_err());
81        }
82
83        let mut mem = self.mem.write().await;
84        mem.insert(
85            key.to_vec(),
86            iterator::ValueDelete {
87                value: value.to_vec(),
88                delete: false,
89            },
90        );
91
92        Ok(())
93    }
94
95    /// Implements the [`crate::subnet::rpc::database::KeyValueReaderWriterDeleter`] trait.
96    async fn delete(&mut self, key: &[u8]) -> io::Result<()> {
97        if self.closed.load(Ordering::Relaxed) {
98            return Err(Error::DatabaseClosed.to_err());
99        }
100
101        let mut mem = self.mem.write().await;
102        if let Some(val) = mem.get_mut(key) {
103            val.delete = true;
104        }
105        mem.insert(
106            key.to_vec(),
107            iterator::ValueDelete {
108                value: vec![],
109                delete: true,
110            },
111        );
112
113        Ok(())
114    }
115}
116
117#[tonic::async_trait]
118impl database::Closer for Database {
119    /// Implements the [`crate::subnet::rpc::database::Closer`] trait.
120    async fn close(&self) -> io::Result<()> {
121        if self.closed.load(Ordering::Relaxed) {
122            return Err(Error::DatabaseClosed.to_err());
123        }
124        self.closed.store(true, Ordering::Relaxed);
125
126        Ok(())
127    }
128}
129
130#[tonic::async_trait]
131impl crate::subnet::rpc::health::Checkable for Database {
132    /// Implements the [`crate::subnet::rpc::health::Checkable`] trait.
133    async fn health_check(&self) -> io::Result<Vec<u8>> {
134        if self.closed.load(Ordering::Relaxed) {
135            return Err(Error::DatabaseClosed.to_err());
136        }
137
138        self.db.health_check().await
139    }
140}
141
142#[tonic::async_trait]
143impl database::iterator::Iteratee for Database {
144    /// Implements the [`crate::subnet::rpc::database::iterator::Iteratee`] trait.
145    async fn new_iterator(&self) -> io::Result<BoxedIterator> {
146        self.new_iterator_with_start_and_prefix(&[], &[]).await
147    }
148
149    /// Implements the [`crate::subnet::rpc::database::iterator::Iteratee`] trait.
150    async fn new_iterator_with_start(&self, start: &[u8]) -> io::Result<BoxedIterator> {
151        self.new_iterator_with_start_and_prefix(start, &[]).await
152    }
153
154    /// Implements the [`crate::subnet::rpc::database::iterator::Iteratee`] trait.
155    async fn new_iterator_with_prefix(&self, prefix: &[u8]) -> io::Result<BoxedIterator> {
156        self.new_iterator_with_start_and_prefix(&[], prefix).await
157    }
158
159    /// Implements the [`crate::subnet::rpc::database::iterator::Iteratee`] trait.
160    async fn new_iterator_with_start_and_prefix(
161        &self,
162        start: &[u8],
163        prefix: &[u8],
164    ) -> io::Result<BoxedIterator> {
165        if self.closed.load(Ordering::Relaxed) {
166            return Err(Error::DatabaseClosed.to_err());
167        }
168
169        let mem = self.mem.write().await;
170        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(mem.len());
171        for (k, _v) in mem.iter() {
172            if k.starts_with(prefix) && k >= &start.to_vec() {
173                keys.push(k.to_owned());
174            }
175        }
176
177        // keys need to be in sorted order
178        keys.sort();
179
180        let mut values: Vec<iterator::ValueDelete> = Vec::with_capacity(keys.len());
181        for key in keys.iter() {
182            if let Some(v) = mem.get(key) {
183                values.push(v.to_owned());
184            }
185        }
186
187        Ok(iterator::Iterator::new_boxed(
188            keys,
189            values,
190            Arc::clone(&self.closed),
191            self.db
192                .new_iterator_with_start_and_prefix(start, prefix)
193                .await?,
194        ))
195    }
196}
197
198#[tonic::async_trait]
199impl database::batch::Batcher for Database {
200    /// Implements the [`crate::subnet::rpc::database::batch::Batcher`] trait.
201    async fn new_batch(&self) -> io::Result<BoxedBatch> {
202        Ok(Box::new(batch::Batch::new(
203            Arc::clone(&self.mem),
204            Arc::clone(&self.closed),
205        )))
206    }
207}
208
209#[tonic::async_trait]
210impl database::Commitable for Database {
211    /// Implements the [`crate::subnet::rpc::database::Commitable`] trait.
212    async fn commit(&mut self) -> io::Result<()> {
213        let mut batch = self.commit_batch().await?;
214        batch.write().await?;
215        batch.reset().await;
216        self.abort().await?;
217        Ok(())
218    }
219
220    /// Implements the [`crate::subnet::rpc::database::Commitable`] trait.
221    async fn abort(&self) -> io::Result<()> {
222        let mut mem = self.mem.write().await;
223        mem.clear();
224        Ok(())
225    }
226
227    /// Implements the [`crate::subnet::rpc::database::Commitable`] trait.
228    async fn commit_batch(&mut self) -> io::Result<BoxedBatch> {
229        if self.closed.load(Ordering::Relaxed) {
230            return Err(Error::DatabaseClosed.to_err());
231        }
232
233        let mem = self.mem.read().await;
234        self.batch.reset().await;
235        for (key, value) in mem.iter() {
236            if value.delete {
237                self.batch.delete(key).await?;
238            } else {
239                self.batch.put(key, &value.value).await?;
240            }
241        }
242
243        Ok(self.batch.clone())
244    }
245}
246
247impl database::Database for Database {}
248
249#[tokio::test]
250async fn iterate_test() {
251    use crate::subnet::rpc::database::{
252        iterator::Iteratee, memdb, Commitable, KeyValueReaderWriterDeleter,
253    };
254
255    let base_db = memdb::Database::new_boxed();
256
257    let batch = base_db.new_batch().await.unwrap();
258    let mut db = Database::new(base_db, batch);
259
260    let key1 = "hello1".as_bytes();
261    let value1 = "world1".as_bytes();
262    let key2 = "z".as_bytes();
263    let value2 = "world2".as_bytes();
264
265    db.put(key1, value1).await.unwrap();
266    db.commit().await.unwrap();
267
268    let mut iterator = db.new_iterator().await.unwrap();
269    assert!(iterator.next().await.unwrap());
270    assert_eq!(iterator.key().await.unwrap(), key1);
271    assert_eq!(iterator.value().await.unwrap(), value1);
272    assert!(!iterator.next().await.unwrap());
273    assert!(iterator.key().await.unwrap().is_empty());
274    assert!(iterator.value().await.unwrap().is_empty());
275    assert!(iterator.error().await.is_ok());
276
277    db.put(key2, value2).await.unwrap();
278
279    let mut iterator = db.new_iterator().await.unwrap();
280    assert!(iterator.next().await.unwrap());
281    assert_eq!(iterator.key().await.unwrap(), key1);
282    assert_eq!(iterator.value().await.unwrap(), value1);
283    assert!(iterator.next().await.unwrap());
284    assert_eq!(iterator.key().await.unwrap(), key2);
285    assert_eq!(iterator.value().await.unwrap(), value2);
286    assert!(!iterator.next().await.unwrap());
287    assert!(iterator.key().await.unwrap().is_empty());
288    assert!(iterator.value().await.unwrap().is_empty());
289    assert!(iterator.error().await.is_ok());
290
291    db.delete(key1).await.unwrap();
292
293    let mut iterator = db.new_iterator().await.unwrap();
294    assert!(iterator.next().await.unwrap());
295    assert_eq!(iterator.key().await.unwrap(), key2);
296    assert_eq!(iterator.value().await.unwrap(), value2);
297    assert!(!iterator.next().await.unwrap());
298    assert!(iterator.key().await.unwrap().is_empty());
299    assert!(iterator.value().await.unwrap().is_empty());
300    assert!(iterator.error().await.is_ok());
301
302    db.commit().await.unwrap();
303    db.put(key2, value1).await.unwrap();
304
305    let mut iterator = db.new_iterator().await.unwrap();
306    assert!(iterator.next().await.unwrap());
307    assert_eq!(iterator.key().await.unwrap(), key2);
308    assert_eq!(iterator.value().await.unwrap(), value1);
309    assert!(!iterator.next().await.unwrap());
310    assert!(iterator.key().await.unwrap().is_empty());
311    assert!(iterator.value().await.unwrap().is_empty());
312    assert!(iterator.error().await.is_ok());
313
314    db.commit().await.unwrap();
315    db.put(key1, value2).await.unwrap();
316
317    let mut iterator = db.new_iterator().await.unwrap();
318    assert!(iterator.next().await.unwrap());
319    assert_eq!(iterator.key().await.unwrap(), key1);
320    assert_eq!(iterator.value().await.unwrap(), value2);
321    assert!(iterator.next().await.unwrap());
322    assert_eq!(iterator.key().await.unwrap(), key2);
323    assert_eq!(iterator.value().await.unwrap(), value1);
324    assert!(!iterator.next().await.unwrap());
325    assert!(iterator.key().await.unwrap().is_empty());
326    assert!(iterator.value().await.unwrap().is_empty());
327    assert!(iterator.error().await.is_ok());
328}