1use crate::{cache::CachePolicy, db::DB, errors::StoreError};
2
3use super::prelude::{Cache, DbKey, DbWriter};
4use kaspa_utils::mem_size::MemSizeEstimator;
5use rocksdb::{Direction, IterateBounds, IteratorMode, ReadOptions};
6use serde::{de::DeserializeOwned, Serialize};
7use std::{collections::hash_map::RandomState, error::Error, hash::BuildHasher, sync::Arc};
8
/// Typed access to a key/value store in the database, fronted by a cache.
///
/// `TKey` is the logical key type, `TData` the stored value type and `S` the
/// hasher builder handed to the cache (defaults to [`RandomState`]).
#[derive(Clone)]
pub struct CachedDbAccess<TKey, TData, S = RandomState>
where
    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
    TData: Clone + Send + Sync + MemSizeEstimator,
{
    /// Shared handle to the underlying database.
    db: Arc<DB>,

    /// Cache consulted before the database on reads and updated on writes.
    cache: Cache<TKey, TData, S>,

    /// Byte prefix combined with each key to build the full database key.
    prefix: Vec<u8>,
}
24
25impl<TKey, TData, S> CachedDbAccess<TKey, TData, S>
26where
27 TKey: Clone + std::hash::Hash + Eq + Send + Sync,
28 TData: Clone + Send + Sync + MemSizeEstimator,
29 S: BuildHasher + Default,
30{
31 pub fn new(db: Arc<DB>, cache_policy: CachePolicy, prefix: Vec<u8>) -> Self {
32 Self { db, cache: Cache::new(cache_policy), prefix }
33 }
34
35 pub fn read_from_cache(&self, key: TKey) -> Option<TData>
36 where
37 TKey: Copy + AsRef<[u8]>,
38 {
39 self.cache.get(&key)
40 }
41
42 pub fn has(&self, key: TKey) -> Result<bool, StoreError>
43 where
44 TKey: Clone + AsRef<[u8]>,
45 {
46 Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(&self.prefix, key))?.is_some())
47 }
48
49 pub fn read(&self, key: TKey) -> Result<TData, StoreError>
50 where
51 TKey: Clone + AsRef<[u8]> + ToString,
52 TData: DeserializeOwned, {
54 if let Some(data) = self.cache.get(&key) {
55 Ok(data)
56 } else {
57 let db_key = DbKey::new(&self.prefix, key.clone());
58 if let Some(slice) = self.db.get_pinned(&db_key)? {
59 let data: TData = bincode::deserialize(&slice)?;
60 self.cache.insert(key, data.clone());
61 Ok(data)
62 } else {
63 Err(StoreError::KeyNotFound(db_key))
64 }
65 }
66 }
67
68 pub fn iterator(&self) -> impl Iterator<Item = Result<(Box<[u8]>, TData), Box<dyn Error>>> + '_
69 where
70 TKey: Clone + AsRef<[u8]>,
71 TData: DeserializeOwned, {
73 let prefix_key = DbKey::prefix_only(&self.prefix);
74 let mut read_opts = ReadOptions::default();
75 read_opts.set_iterate_range(rocksdb::PrefixRange(prefix_key.as_ref()));
76 self.db.iterator_opt(IteratorMode::From(prefix_key.as_ref(), Direction::Forward), read_opts).map(move |iter_result| {
77 match iter_result {
78 Ok((key, data_bytes)) => match bincode::deserialize(&data_bytes) {
79 Ok(data) => Ok((key[prefix_key.prefix_len()..].into(), data)),
80 Err(e) => Err(e.into()),
81 },
82 Err(e) => Err(e.into()),
83 }
84 })
85 }
86
87 pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError>
88 where
89 TKey: Clone + AsRef<[u8]>,
90 TData: Serialize,
91 {
92 let bin_data = bincode::serialize(&data)?;
93 self.cache.insert(key.clone(), data);
94 writer.put(DbKey::new(&self.prefix, key), bin_data)?;
95 Ok(())
96 }
97
98 pub fn write_many(
99 &self,
100 mut writer: impl DbWriter,
101 iter: &mut (impl Iterator<Item = (TKey, TData)> + Clone),
102 ) -> Result<(), StoreError>
103 where
104 TKey: Clone + AsRef<[u8]>,
105 TData: Serialize,
106 {
107 let iter_clone = iter.clone();
108 self.cache.insert_many(iter);
109 for (key, data) in iter_clone {
110 let bin_data = bincode::serialize(&data)?;
111 writer.put(DbKey::new(&self.prefix, key.clone()), bin_data)?;
112 }
113 Ok(())
114 }
115
116 pub fn write_many_without_cache(
118 &self,
119 mut writer: impl DbWriter,
120 iter: &mut impl Iterator<Item = (TKey, TData)>,
121 ) -> Result<(), StoreError>
122 where
123 TKey: Clone + AsRef<[u8]>,
124 TData: Serialize,
125 {
126 for (key, data) in iter {
127 let bin_data = bincode::serialize(&data)?;
128 writer.put(DbKey::new(&self.prefix, key), bin_data)?;
129 }
130 self.cache.remove_all();
132 Ok(())
133 }
134
135 pub fn delete(&self, mut writer: impl DbWriter, key: TKey) -> Result<(), StoreError>
136 where
137 TKey: Clone + AsRef<[u8]>,
138 {
139 self.cache.remove(&key);
140 writer.delete(DbKey::new(&self.prefix, key))?;
141 Ok(())
142 }
143
144 pub fn delete_many(&self, mut writer: impl DbWriter, key_iter: &mut (impl Iterator<Item = TKey> + Clone)) -> Result<(), StoreError>
145 where
146 TKey: Clone + AsRef<[u8]>,
147 {
148 let key_iter_clone = key_iter.clone();
149 self.cache.remove_many(key_iter);
150 for key in key_iter_clone {
151 writer.delete(DbKey::new(&self.prefix, key.clone()))?;
152 }
153 Ok(())
154 }
155
156 pub fn delete_all(&self, mut writer: impl DbWriter) -> Result<(), StoreError>
158 where
159 TKey: Clone + AsRef<[u8]>,
160 {
161 self.cache.remove_all();
162 let db_key = DbKey::prefix_only(&self.prefix);
163 let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds();
164 writer.delete_range(from.unwrap(), to.unwrap())?;
165 Ok(())
166 }
167
168 pub fn seek_iterator(
171 &self,
172 bucket: Option<&[u8]>, seek_from: Option<TKey>, limit: usize, skip_first: bool, ) -> impl Iterator<Item = Result<(Box<[u8]>, TData), Box<dyn Error>>> + '_
177 where
178 TKey: Clone + AsRef<[u8]>,
179 TData: DeserializeOwned,
180 {
181 let db_key = bucket.map_or_else(
182 move || DbKey::prefix_only(&self.prefix),
183 move |bucket| {
184 let mut key = DbKey::prefix_only(&self.prefix);
185 key.add_bucket(bucket);
186 key
187 },
188 );
189
190 let mut read_opts = ReadOptions::default();
191 read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));
192
193 let mut db_iterator = match seek_from {
194 Some(seek_key) => {
195 self.db.iterator_opt(IteratorMode::From(DbKey::new(&self.prefix, seek_key).as_ref(), Direction::Forward), read_opts)
196 }
197 None => self.db.iterator_opt(IteratorMode::Start, read_opts),
198 };
199
200 if skip_first {
201 db_iterator.next();
202 }
203
204 db_iterator.take(limit).map(move |item| match item {
205 Ok((key_bytes, value_bytes)) => match bincode::deserialize::<TData>(value_bytes.as_ref()) {
206 Ok(value) => Ok((key_bytes[db_key.prefix_len()..].into(), value)),
207 Err(err) => Err(err.into()),
208 },
209 Err(err) => Err(err.into()),
210 })
211 }
212
213 pub fn prefix(&self) -> &[u8] {
214 &self.prefix
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        create_temp_db,
        prelude::{BatchDbWriter, ConnBuilder, DirectDbWriter},
    };
    use kaspa_hashes::Hash;
    use rocksdb::WriteBatch;

    /// Checks that `delete_all` removes every entry under the prefix, both
    /// when applied directly and when staged in a write batch.
    #[test]
    fn test_delete_all() {
        let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
        let access = CachedDbAccess::<Hash, u64>::new(db.clone(), CachePolicy::Count(2), vec![1, 2]);

        // Helpers: fill the store with 16 entries, and count what the database holds.
        let populate = || access.write_many(DirectDbWriter::new(&db), &mut (0..16).map(|i| (i.into(), 2))).unwrap();
        let stored = || access.iterator().count();

        // Direct writer: the deletion is visible immediately.
        populate();
        assert_eq!(16, stored());
        access.delete_all(DirectDbWriter::new(&db)).unwrap();
        assert_eq!(0, stored());

        // Batch writer: nothing changes until the batch is written to the database.
        populate();
        assert_eq!(16, stored());
        let mut pending = WriteBatch::default();
        access.delete_all(BatchDbWriter::new(&mut pending)).unwrap();
        assert_eq!(16, stored());
        db.write(pending).unwrap();
        assert_eq!(0, stored());
    }
}