Skip to main content

brk_store/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path};
4
5use brk_error::Result;
6use brk_types::{Height, Version};
7use byteview::ByteView;
8use fjall::{Database, Keyspace, KeyspaceCreateOptions, config::*};
9use rustc_hash::{FxHashMap, FxHashSet};
10
11mod any;
12mod item;
13mod kind;
14mod meta;
15mod mode;
16
17pub use any::*;
18pub use item::*;
19pub use kind::*;
20pub use meta::*;
21pub use mode::*;
22
23const MAJOR_FJALL_VERSION: Version = Version::new(3);
24
25pub fn open_database(path: &Path) -> fjall::Result<Database> {
26    Database::builder(path.join("fjall"))
27        .cache_size(3 * 1024 * 1024 * 1024)
28        .max_cached_files(Some(512))
29        .open()
30}
31
32#[derive(Clone)]
33pub struct Store<K, V> {
34    meta: StoreMeta,
35    name: &'static str,
36    keyspace: Keyspace,
37    puts: FxHashMap<K, V>,
38    dels: FxHashSet<K>,
39    caches: Vec<FxHashMap<K, V>>,
40}
41
42impl<K, V> Store<K, V>
43where
44    K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
45    V: Debug + Clone + From<ByteView>,
46    ByteView: From<K> + From<V>,
47    Self: Send + Sync,
48{
49    pub fn import(
50        db: &Database,
51        path: &Path,
52        name: &str,
53        version: Version,
54        mode: Mode,
55        kind: Kind,
56    ) -> Result<Self> {
57        Self::import_inner(db, path, name, version, mode, kind, 0)
58    }
59
60    pub fn import_cached(
61        db: &Database,
62        path: &Path,
63        name: &str,
64        version: Version,
65        mode: Mode,
66        kind: Kind,
67        max_batches: u8,
68    ) -> Result<Self> {
69        Self::import_inner(db, path, name, version, mode, kind, max_batches)
70    }
71
72    fn import_inner(
73        db: &Database,
74        path: &Path,
75        name: &str,
76        version: Version,
77        mode: Mode,
78        kind: Kind,
79        max_batches: u8,
80    ) -> Result<Self> {
81        fs::create_dir_all(path)?;
82
83        let (meta, keyspace) = StoreMeta::checked_open(
84            &path.join(format!("meta/{name}")),
85            MAJOR_FJALL_VERSION + version,
86            || {
87                Self::open_keyspace(db, name, mode, kind).inspect_err(|e| {
88                    eprintln!("{e}");
89                    eprintln!("Delete {path:?} and try again");
90                })
91            },
92        )?;
93
94        let mut caches = vec![];
95        for _ in 0..max_batches {
96            caches.push(FxHashMap::default());
97        }
98
99        Ok(Self {
100            meta,
101            name: Box::leak(Box::new(name.to_string())),
102            keyspace,
103            puts: FxHashMap::default(),
104            dels: FxHashSet::default(),
105            caches,
106        })
107    }
108
109    fn open_keyspace(database: &Database, name: &str, _mode: Mode, kind: Kind) -> Result<Keyspace> {
110        let mut options = KeyspaceCreateOptions::default()
111            .manual_journal_persist(true)
112            .filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true]))
113            .index_block_partitioning_policy(PartitioningPolicy::new([false, false, true]));
114
115        match kind {
116            Kind::Random => {
117                options = options
118                    .filter_block_pinning_policy(PinningPolicy::new([true, true, true, false]))
119                    .filter_policy(FilterPolicy::new([
120                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
121                            0.0001,
122                        )),
123                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
124                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
125                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(9.0)),
126                    ]));
127            }
128            Kind::Recent => {
129                options = options
130                    .expect_point_read_hits(true)
131                    .filter_policy(FilterPolicy::new([
132                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
133                            0.0001,
134                        )),
135                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
136                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)),
137                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)),
138                    ]));
139            }
140            Kind::Sequential => {
141                options = options
142                    .filter_block_partitioning_policy(PartitioningPolicy::all(true))
143                    .index_block_partitioning_policy(PartitioningPolicy::all(true))
144                    .filter_block_pinning_policy(PinningPolicy::all(false))
145                    .index_block_pinning_policy(PinningPolicy::all(false));
146            }
147            Kind::Vec => {
148                options = options
149                    .max_memtable_size(8 * 1024 * 1024)
150                    .filter_policy(FilterPolicy::disabled())
151                    .filter_block_pinning_policy(PinningPolicy::all(false))
152                    .index_block_pinning_policy(PinningPolicy::all(false));
153            }
154        }
155
156        database.keyspace(name, || options).map_err(|e| e.into())
157    }
158
159    #[inline]
160    pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
161    where
162        ByteView: From<&'a K>,
163    {
164        if let Some(v) = self.puts.get(key) {
165            return Ok(Some(Cow::Borrowed(v)));
166        }
167
168        for cache in &self.caches {
169            if let Some(v) = cache.get(key) {
170                return Ok(Some(Cow::Borrowed(v)));
171            }
172        }
173
174        if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
175            Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
176        } else {
177            Ok(None)
178        }
179    }
180
181    #[inline]
182    pub fn is_empty(&self) -> Result<bool> {
183        self.keyspace.is_empty().map_err(|e| e.into())
184    }
185
186    #[inline]
187    pub fn insert(&mut self, key: K, value: V) {
188        let _ = self.dels.is_empty() || self.dels.remove(&key);
189        self.puts.insert(key, value);
190    }
191
192    #[inline]
193    pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
194        if self.needs(height) {
195            self.insert(key, value);
196        }
197    }
198
199    #[inline]
200    pub fn remove(&mut self, key: K) {
201        if self.puts.remove(&key).is_some() {
202            return;
203        }
204        let newly_inserted = self.dels.insert(key);
205        debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
206    }
207
208    #[inline]
209    pub fn remove_if_needed(&mut self, key: K, height: Height) {
210        if self.needs(height) {
211            self.remove(key)
212        }
213    }
214
215    /// Clear all caches. Call after bulk removals (e.g., rollback) to prevent stale reads.
216    #[inline]
217    pub fn clear_caches(&mut self) {
218        for cache in &mut self.caches {
219            cache.clear();
220        }
221    }
222
223    #[inline]
224    pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
225        self.keyspace
226            .iter()
227            .map(|res| res.into_inner().unwrap())
228            .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
229    }
230
231    #[inline]
232    pub fn prefix<P: AsRef<[u8]>>(
233        &self,
234        prefix: P,
235    ) -> impl DoubleEndedIterator<Item = (K, V)> + '_ {
236        self.keyspace
237            .prefix(prefix)
238            .map(|res| res.into_inner().unwrap())
239            .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
240    }
241
242    pub fn approximate_len(&self) -> usize {
243        self.keyspace.approximate_len()
244    }
245
246    #[inline]
247    fn has(&self, height: Height) -> bool {
248        self.meta.has(height)
249    }
250
251    #[inline]
252    pub fn needs(&self, height: Height) -> bool {
253        self.meta.needs(height)
254    }
255
256    fn export_meta(&mut self, height: Height) -> Result<()> {
257        self.meta.export(height)?;
258        Ok(())
259    }
260
261    fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
262        if !self.has(height) {
263            self.export_meta(height)?;
264        }
265        Ok(())
266    }
267
268    fn ingest<'a>(
269        keyspace: &Keyspace,
270        puts: impl Iterator<Item = (&'a K, &'a V)>,
271        dels: impl Iterator<Item = &'a K>,
272    ) -> Result<()>
273    where
274        ByteView: From<&'a K> + From<&'a V>,
275        K: 'a,
276        V: 'a,
277    {
278        let mut items: Vec<Item<&'a K, &'a V>> = puts
279            .map(|(key, value)| Item::Value { key, value })
280            .chain(dels.map(Item::Tomb))
281            .collect();
282
283        items.sort_unstable();
284
285        let mut ingestion = keyspace.start_ingestion()?;
286        for item in items {
287            match item {
288                Item::Value { key, value } => {
289                    ingestion.write(ByteView::from(key), ByteView::from(value))?;
290                }
291                Item::Tomb(key) => {
292                    ingestion.write_weak_tombstone(ByteView::from(key))?;
293                }
294            }
295        }
296        ingestion.finish()?;
297
298        Ok(())
299    }
300}
301
302impl<K, V> AnyStore for Store<K, V>
303where
304    K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
305    V: Debug + Clone + From<ByteView>,
306    for<'a> ByteView: From<K> + From<V> + From<&'a K> + From<&'a V>,
307    Self: Send + Sync,
308{
309    fn keyspace(&self) -> &Keyspace {
310        &self.keyspace
311    }
312
313    fn export_meta(&mut self, height: Height) -> Result<()> {
314        self.export_meta(height)
315    }
316
317    fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
318        self.export_meta_if_needed(height)
319    }
320
321    fn name(&self) -> &'static str {
322        self.name
323    }
324
325    fn height(&self) -> Option<Height> {
326        self.meta.height()
327    }
328
329    fn has(&self, height: Height) -> bool {
330        self.has(height)
331    }
332
333    fn needs(&self, height: Height) -> bool {
334        self.needs(height)
335    }
336
337    fn version(&self) -> Version {
338        self.meta.version()
339    }
340
341    fn commit(&mut self, height: Height) -> Result<()> {
342        self.export_meta_if_needed(height)?;
343
344        let puts = mem::take(&mut self.puts);
345        let dels = mem::take(&mut self.dels);
346
347        if puts.is_empty() && dels.is_empty() {
348            return Ok(());
349        }
350
351        Self::ingest(&self.keyspace, puts.iter(), dels.iter())?;
352
353        if !self.caches.is_empty() {
354            self.caches.pop();
355            self.caches.insert(0, puts);
356        }
357
358        Ok(())
359    }
360
361    fn reset(&mut self) -> Result<()> {
362        self.meta.reset()?;
363        self.puts.clear();
364        self.dels.clear();
365        for cache in &mut self.caches {
366            cache.clear();
367        }
368        self.keyspace.clear()?;
369        Ok(())
370    }
371}