// kaspa_database/access.rs

use crate::{cache::CachePolicy, db::DB, errors::StoreError};

use super::prelude::{Cache, DbKey, DbWriter};
use kaspa_utils::mem_size::MemSizeEstimator;
use rocksdb::{Direction, IterateBounds, IteratorMode, ReadOptions};
use serde::{de::DeserializeOwned, Serialize};
use std::{collections::hash_map::RandomState, error::Error, hash::BuildHasher, sync::Arc};

/// A concurrent DB store access with typed caching.
#[derive(Clone)]
pub struct CachedDbAccess<TKey, TData, S = RandomState>
where
    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
    TData: Clone + Send + Sync + MemSizeEstimator,
{
    db: Arc<DB>,

    // Cache
    cache: Cache<TKey, TData, S>,

    // DB bucket/path
    prefix: Vec<u8>,
}

impl<TKey, TData, S> CachedDbAccess<TKey, TData, S>
where
    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
    TData: Clone + Send + Sync + MemSizeEstimator,
    S: BuildHasher + Default,
{
    pub fn new(db: Arc<DB>, cache_policy: CachePolicy, prefix: Vec<u8>) -> Self {
        Self { db, cache: Cache::new(cache_policy), prefix }
    }
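
    // Construction sketch (illustrative, not upstream docs): given a shared `db: Arc<DB>`,
    // a store keyed by `Hash` with a count-bounded cache can be built as below; the `prefix`
    // bytes namespace this store's keys inside the shared RocksDB instance:
    //
    //     let access = CachedDbAccess::<Hash, u64>::new(db.clone(), CachePolicy::Count(1024), vec![1, 2]);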

    pub fn read_from_cache(&self, key: TKey) -> Option<TData>
    where
        TKey: Copy + AsRef<[u8]>,
    {
        self.cache.get(&key)
    }

    pub fn has(&self, key: TKey) -> Result<bool, StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
    {
        Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(&self.prefix, key))?.is_some())
    }

    pub fn read(&self, key: TKey) -> Result<TData, StoreError>
    where
        TKey: Clone + AsRef<[u8]> + ToString,
        TData: DeserializeOwned, // We need `DeserializeOwned` since the slice coming from `db.get_pinned` has a short lifetime
    {
        if let Some(data) = self.cache.get(&key) {
            Ok(data)
        } else {
            let db_key = DbKey::new(&self.prefix, key.clone());
            if let Some(slice) = self.db.get_pinned(&db_key)? {
                let data: TData = bincode::deserialize(&slice)?;
                self.cache.insert(key, data.clone());
                Ok(data)
            } else {
                Err(StoreError::KeyNotFound(db_key))
            }
        }
    }
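
    // Read-through sketch: a cache miss falls back to RocksDB and populates the cache, so an
    // immediate second read of the same key is served from memory (`some_hash` is hypothetical):
    //
    //     let value = access.read(some_hash)?; // miss: deserializes from the DB and caches
    //     let again = access.read(some_hash)?; // hit: no DB access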

    pub fn iterator(&self) -> impl Iterator<Item = Result<(Box<[u8]>, TData), Box<dyn Error>>> + '_
    where
        TKey: Clone + AsRef<[u8]>,
        TData: DeserializeOwned, // We need `DeserializeOwned` since the slice coming from `db.get_pinned` has a short lifetime
    {
        let prefix_key = DbKey::prefix_only(&self.prefix);
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(rocksdb::PrefixRange(prefix_key.as_ref()));
        self.db.iterator_opt(IteratorMode::From(prefix_key.as_ref(), Direction::Forward), read_opts).map(move |iter_result| {
            match iter_result {
                Ok((key, data_bytes)) => match bincode::deserialize(&data_bytes) {
                    Ok(data) => Ok((key[prefix_key.prefix_len()..].into(), data)),
                    Err(e) => Err(e.into()),
                },
                Err(e) => Err(e.into()),
            }
        })
    }
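
    // Iteration sketch: the iterator is bounded to this store's prefix via `PrefixRange`, and
    // each yielded key has the prefix bytes stripped:
    //
    //     for entry in access.iterator() {
    //         let (key_bytes, data) = entry?; // key_bytes excludes the store prefix
    //     }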

    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
        TData: Serialize,
    {
        let bin_data = bincode::serialize(&data)?;
        self.cache.insert(key.clone(), data);
        writer.put(DbKey::new(&self.prefix, key), bin_data)?;
        Ok(())
    }
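
    // Write sketch: the `DbWriter` argument lets callers pick the write strategy. With the
    // direct writer the put is applied immediately (`some_hash`/`some_value` are hypothetical):
    //
    //     access.write(DirectDbWriter::new(&db), some_hash, some_value)?;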

    pub fn write_many(
        &self,
        mut writer: impl DbWriter,
        iter: &mut (impl Iterator<Item = (TKey, TData)> + Clone),
    ) -> Result<(), StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
        TData: Serialize,
    {
        let iter_clone = iter.clone();
        self.cache.insert_many(iter);
        for (key, data) in iter_clone {
            let bin_data = bincode::serialize(&data)?;
            writer.put(DbKey::new(&self.prefix, key.clone()), bin_data)?;
        }
        Ok(())
    }
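
    // Batched-write sketch: pairing `write_many` with `BatchDbWriter` stages the puts in a
    // `WriteBatch`, which is applied atomically only when the batch itself is written
    // (`pairs` stands for any cloneable `(TKey, TData)` iterator):
    //
    //     let mut batch = WriteBatch::default();
    //     access.write_many(BatchDbWriter::new(&mut batch), &mut pairs)?;
    //     db.write(batch)?; // nothing is visible to readers before this point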

    /// Writes directly from an iterator without caching the data. NOTE: this operation also clears the cache
    pub fn write_many_without_cache(
        &self,
        mut writer: impl DbWriter,
        iter: &mut impl Iterator<Item = (TKey, TData)>,
    ) -> Result<(), StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
        TData: Serialize,
    {
        for (key, data) in iter {
            let bin_data = bincode::serialize(&data)?;
            writer.put(DbKey::new(&self.prefix, key), bin_data)?;
        }
        // We must clear the cache in order to avoid stale (invalidated) entries
        self.cache.remove_all();
        Ok(())
    }
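
    // Why clear? The cache cannot tell which of the freshly written keys it already holds, so
    // overwritten entries could keep serving stale values; dropping the whole cache is the
    // conservative way to keep cache and DB consistent.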

    pub fn delete(&self, mut writer: impl DbWriter, key: TKey) -> Result<(), StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
    {
        self.cache.remove(&key);
        writer.delete(DbKey::new(&self.prefix, key))?;
        Ok(())
    }

    pub fn delete_many(&self, mut writer: impl DbWriter, key_iter: &mut (impl Iterator<Item = TKey> + Clone)) -> Result<(), StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
    {
        let key_iter_clone = key_iter.clone();
        self.cache.remove_many(key_iter);
        for key in key_iter_clone {
            writer.delete(DbKey::new(&self.prefix, key.clone()))?;
        }
        Ok(())
    }

    /// Deletes all entries in the store using the underlying rocksdb `delete_range` operation
    pub fn delete_all(&self, mut writer: impl DbWriter) -> Result<(), StoreError>
    where
        TKey: Clone + AsRef<[u8]>,
    {
        self.cache.remove_all();
        let db_key = DbKey::prefix_only(&self.prefix);
        let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds();
        writer.delete_range(from.unwrap(), to.unwrap())?;
        Ok(())
    }

    /// A dynamic iterator that can iterate through a specific prefix / bucket, or from a certain start point.
    // TODO: loop and chain iterators for multi-prefix / bucket iterator.
    pub fn seek_iterator(
        &self,
        bucket: Option<&[u8]>,   // iterate over self.prefix if None, else append these bytes to self.prefix.
        seek_from: Option<TKey>, // iterate the whole range if None.
        limit: usize,            // maximum number of entries to take.
        skip_first: bool,        // skips the first value (useful in conjunction with the seek key, so as not to re-retrieve it).
    ) -> impl Iterator<Item = Result<(Box<[u8]>, TData), Box<dyn Error>>> + '_
    where
        TKey: Clone + AsRef<[u8]>,
        TData: DeserializeOwned,
    {
        let db_key = bucket.map_or_else(
            move || DbKey::prefix_only(&self.prefix),
            move |bucket| {
                let mut key = DbKey::prefix_only(&self.prefix);
                key.add_bucket(bucket);
                key
            },
        );

        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));

        let mut db_iterator = match seek_from {
            Some(seek_key) => {
                self.db.iterator_opt(IteratorMode::From(DbKey::new(&self.prefix, seek_key).as_ref(), Direction::Forward), read_opts)
            }
            None => self.db.iterator_opt(IteratorMode::Start, read_opts),
        };

        if skip_first {
            db_iterator.next();
        }

        db_iterator.take(limit).map(move |item| match item {
            Ok((key_bytes, value_bytes)) => match bincode::deserialize::<TData>(value_bytes.as_ref()) {
                Ok(value) => Ok((key_bytes[db_key.prefix_len()..].into(), value)),
                Err(err) => Err(err.into()),
            },
            Err(err) => Err(err.into()),
        })
    }
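
    // Paging sketch: `seek_from` plus `skip_first` give cursor-style pagination, with the
    // cursor being the last key of the previous page (`last_key` is hypothetical):
    //
    //     let page: Vec<_> = access.seek_iterator(None, Some(last_key), 100, true).collect();
    //     // the page resumes right after `last_key` without re-yielding it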

    pub fn prefix(&self) -> &[u8] {
        &self.prefix
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        create_temp_db,
        prelude::{BatchDbWriter, ConnBuilder, DirectDbWriter},
    };
    use kaspa_hashes::Hash;
    use rocksdb::WriteBatch;

    #[test]
    fn test_delete_all() {
        let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
        let access = CachedDbAccess::<Hash, u64>::new(db.clone(), CachePolicy::Count(2), vec![1, 2]);

        access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap();
        assert_eq!(16, access.iterator().count());
        access.delete_all(DirectDbWriter::new(&db)).unwrap();
        assert_eq!(0, access.iterator().count());

        access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap();
        assert_eq!(16, access.iterator().count());
        let mut batch = WriteBatch::default();
        access.delete_all(BatchDbWriter::new(&mut batch)).unwrap();
        assert_eq!(16, access.iterator().count());
        db.write(batch).unwrap();
        assert_eq!(0, access.iterator().count());
    }
}