kaspa_database/
item.rs

1use crate::{
2    db::DB,
3    errors::StoreError,
4    prelude::{DbSetAccess, ReadLock},
5};
6
7use super::prelude::{DbKey, DbWriter};
8use parking_lot::RwLock;
9use serde::{de::DeserializeOwned, Serialize};
10use std::{
11    collections::{hash_map::RandomState, HashSet},
12    hash::BuildHasher,
13    sync::Arc,
14};
15
/// A cached DB item with concurrency support.
///
/// Represents the single value stored under `key` in `db`, together with an
/// in-memory copy of that value. The copy lives behind an `Arc<RwLock<..>>`,
/// so handles produced by the derived `Clone` all share one cache.
#[derive(Clone)]
pub struct CachedDbItem<T> {
    /// Database the item is loaded from on a cache miss.
    db: Arc<DB>,
    /// Raw key bytes identifying the item inside `db`.
    key: Vec<u8>,
    /// Shared cached copy of the item; `None` while nothing is cached.
    cached_item: Arc<RwLock<Option<T>>>,
}
23
24impl<T> CachedDbItem<T> {
25    pub fn new(db: Arc<DB>, key: Vec<u8>) -> Self {
26        Self { db, key, cached_item: Arc::new(RwLock::new(None)) }
27    }
28
29    pub fn read(&self) -> Result<T, StoreError>
30    where
31        T: Clone + DeserializeOwned,
32    {
33        if let Some(item) = self.cached_item.read().clone() {
34            return Ok(item);
35        }
36        if let Some(slice) = self.db.get_pinned(&self.key)? {
37            let item: T = bincode::deserialize(&slice)?;
38            *self.cached_item.write() = Some(item.clone());
39            Ok(item)
40        } else {
41            Err(StoreError::KeyNotFound(DbKey::prefix_only(&self.key)))
42        }
43    }
44
45    pub fn write(&mut self, mut writer: impl DbWriter, item: &T) -> Result<(), StoreError>
46    where
47        T: Clone + Serialize,
48    {
49        *self.cached_item.write() = Some(item.clone());
50        let bin_data = bincode::serialize(item)?;
51        writer.put(&self.key, bin_data)?;
52        Ok(())
53    }
54
55    pub fn remove(&mut self, mut writer: impl DbWriter) -> Result<(), StoreError>
56where {
57        *self.cached_item.write() = None;
58        writer.delete(&self.key)?;
59        Ok(())
60    }
61
62    pub fn update<F>(&mut self, mut writer: impl DbWriter, op: F) -> Result<T, StoreError>
63    where
64        T: Clone + Serialize + DeserializeOwned,
65        F: Fn(T) -> T,
66    {
67        let mut guard = self.cached_item.write();
68        let mut item = if let Some(item) = guard.take() {
69            item
70        } else if let Some(slice) = self.db.get_pinned(&self.key)? {
71            let item: T = bincode::deserialize(&slice)?;
72            item
73        } else {
74            return Err(StoreError::KeyNotFound(DbKey::prefix_only(&self.key)));
75        };
76
77        item = op(item); // Apply the update op
78        *guard = Some(item.clone());
79        let bin_data = bincode::serialize(&item)?;
80        writer.put(&self.key, bin_data)?;
81        Ok(item)
82    }
83}
84
/// Zero-sized key whose byte representation is the empty slice.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
struct EmptyKey;

impl AsRef<[u8]> for EmptyKey {
    /// Always yields an empty byte slice.
    fn as_ref(&self) -> &[u8] {
        const NO_BYTES: &[u8] = &[];
        NO_BYTES
    }
}
93
94type LockedSet<T, S> = Arc<RwLock<HashSet<T, S>>>;
95
96#[derive(Clone)]
97pub struct CachedDbSetItem<T: Clone + Send + Sync, S = RandomState> {
98    access: DbSetAccess<EmptyKey, T>,
99    cached_set: Arc<RwLock<Option<LockedSet<T, S>>>>,
100}
101
102impl<T, S> CachedDbSetItem<T, S>
103where
104    T: Clone + std::hash::Hash + Eq + Send + Sync + DeserializeOwned + Serialize,
105    S: BuildHasher + Default,
106{
107    pub fn new(db: Arc<DB>, key: Vec<u8>) -> Self {
108        Self { access: DbSetAccess::new(db, key), cached_set: Arc::new(RwLock::new(None)) }
109    }
110
111    fn read_locked_set(&self) -> Result<LockedSet<T, S>, StoreError>
112    where
113        T: Clone + DeserializeOwned,
114    {
115        if let Some(item) = self.cached_set.read().clone() {
116            return Ok(item);
117        }
118        let set = self.access.bucket_iterator(EmptyKey).collect::<Result<HashSet<_, _>, _>>()?;
119        let set = Arc::new(RwLock::new(set));
120        self.cached_set.write().replace(set.clone());
121        Ok(set)
122    }
123
124    pub fn read(&self) -> Result<ReadLock<HashSet<T, S>>, StoreError>
125    where
126        T: Clone + DeserializeOwned,
127    {
128        Ok(ReadLock::new(self.read_locked_set()?))
129    }
130
131    pub fn update(
132        &mut self,
133        mut writer: impl DbWriter,
134        added_items: &[T],
135        removed_items: &[T],
136    ) -> Result<ReadLock<HashSet<T, S>>, StoreError>
137    where
138        T: Clone + Serialize,
139    {
140        let set = self.read_locked_set()?;
141        {
142            let mut set_write = set.write();
143            for item in removed_items.iter() {
144                self.access.delete(&mut writer, EmptyKey, item.clone())?;
145                set_write.remove(item);
146            }
147            for item in added_items.iter().cloned() {
148                self.access.write(&mut writer, EmptyKey, item.clone())?;
149                set_write.insert(item);
150            }
151        }
152        Ok(ReadLock::new(set))
153    }
154}