1#![doc = include_str!("../README.md")]
2
3use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path};
4
5use brk_error::Result;
6use brk_types::{Height, Version};
7use byteview::ByteView;
8use fjall::{Database, Keyspace, KeyspaceCreateOptions, config::*};
9use rustc_hash::{FxHashMap, FxHashSet};
10
11mod any;
12mod item;
13mod kind;
14mod meta;
15mod mode;
16
17pub use any::*;
18pub use item::*;
19pub use kind::*;
20pub use meta::*;
21pub use mode::*;
22
/// Base version that `import_inner` adds to each store's own `version`
/// before handing the sum to `StoreMeta::checked_open`.
///
/// NOTE(review): presumably tracks the fjall major version so that a fjall
/// upgrade changes every store's effective version — confirm against
/// `StoreMeta::checked_open`.
const MAJOR_FJALL_VERSION: Version = Version::new(3);
24
25pub fn open_database(path: &Path) -> fjall::Result<Database> {
26 Database::builder(path.join("fjall"))
27 .cache_size(3 * 1024 * 1024 * 1024)
28 .max_cached_files(Some(512))
29 .open()
30}
31
/// A typed key/value store backed by a fjall [`Keyspace`], with writes
/// buffered in memory until `commit` ingests them.
#[derive(Clone)]
pub struct Store<K, V> {
    /// Persisted metadata; queried for the store's version and height.
    meta: StoreMeta,
    /// Store name as passed to `import*`, leaked to obtain a `'static` lifetime.
    name: &'static str,
    /// Underlying fjall keyspace holding the committed data.
    keyspace: Keyspace,
    /// Pending inserts that have not been committed yet.
    puts: FxHashMap<K, V>,
    /// Pending deletions that have not been committed yet.
    dels: FxHashSet<K>,
    /// Put batches of previous commits, newest first; consulted by `get`
    /// after `puts` and before the keyspace. Empty when caching is disabled.
    caches: Vec<FxHashMap<K, V>>,
}
41
42impl<K, V> Store<K, V>
43where
44 K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
45 V: Debug + Clone + From<ByteView>,
46 ByteView: From<K> + From<V>,
47 Self: Send + Sync,
48{
49 pub fn import(
50 db: &Database,
51 path: &Path,
52 name: &str,
53 version: Version,
54 mode: Mode,
55 kind: Kind,
56 ) -> Result<Self> {
57 Self::import_inner(db, path, name, version, mode, kind, 0)
58 }
59
60 pub fn import_cached(
61 db: &Database,
62 path: &Path,
63 name: &str,
64 version: Version,
65 mode: Mode,
66 kind: Kind,
67 max_batches: u8,
68 ) -> Result<Self> {
69 Self::import_inner(db, path, name, version, mode, kind, max_batches)
70 }
71
72 fn import_inner(
73 db: &Database,
74 path: &Path,
75 name: &str,
76 version: Version,
77 mode: Mode,
78 kind: Kind,
79 max_batches: u8,
80 ) -> Result<Self> {
81 fs::create_dir_all(path)?;
82
83 let (meta, keyspace) = StoreMeta::checked_open(
84 &path.join(format!("meta/{name}")),
85 MAJOR_FJALL_VERSION + version,
86 || {
87 Self::open_keyspace(db, name, mode, kind).inspect_err(|e| {
88 eprintln!("{e}");
89 eprintln!("Delete {path:?} and try again");
90 })
91 },
92 )?;
93
94 let mut caches = vec![];
95 for _ in 0..max_batches {
96 caches.push(FxHashMap::default());
97 }
98
99 Ok(Self {
100 meta,
101 name: Box::leak(Box::new(name.to_string())),
102 keyspace,
103 puts: FxHashMap::default(),
104 dels: FxHashSet::default(),
105 caches,
106 })
107 }
108
109 fn open_keyspace(database: &Database, name: &str, _mode: Mode, kind: Kind) -> Result<Keyspace> {
110 let mut options = KeyspaceCreateOptions::default()
111 .manual_journal_persist(true)
112 .filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true]))
113 .index_block_partitioning_policy(PartitioningPolicy::new([false, false, true]));
114
115 match kind {
116 Kind::Random => {
117 options = options
118 .filter_block_pinning_policy(PinningPolicy::new([true, true, true, false]))
119 .filter_policy(FilterPolicy::new([
120 FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
121 0.0001,
122 )),
123 FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
124 FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
125 FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(9.0)),
126 ]));
127 }
128 Kind::Recent => {
129 options = options
130 .expect_point_read_hits(true)
131 .filter_policy(FilterPolicy::new([
132 FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
133 0.0001,
134 )),
135 FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
136 FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)),
137 FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)),
138 ]));
139 }
140 Kind::Sequential => {
141 options = options
142 .filter_block_partitioning_policy(PartitioningPolicy::all(true))
143 .index_block_partitioning_policy(PartitioningPolicy::all(true))
144 .filter_block_pinning_policy(PinningPolicy::all(false))
145 .index_block_pinning_policy(PinningPolicy::all(false));
146 }
147 Kind::Vec => {
148 options = options
149 .max_memtable_size(8 * 1024 * 1024)
150 .filter_policy(FilterPolicy::disabled())
151 .filter_block_pinning_policy(PinningPolicy::all(false))
152 .index_block_pinning_policy(PinningPolicy::all(false));
153 }
154 }
155
156 database.keyspace(name, || options).map_err(|e| e.into())
157 }
158
159 #[inline]
160 pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
161 where
162 ByteView: From<&'a K>,
163 {
164 if let Some(v) = self.puts.get(key) {
165 return Ok(Some(Cow::Borrowed(v)));
166 }
167
168 for cache in &self.caches {
169 if let Some(v) = cache.get(key) {
170 return Ok(Some(Cow::Borrowed(v)));
171 }
172 }
173
174 if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
175 Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
176 } else {
177 Ok(None)
178 }
179 }
180
181 #[inline]
182 pub fn is_empty(&self) -> Result<bool> {
183 self.keyspace.is_empty().map_err(|e| e.into())
184 }
185
186 #[inline]
187 pub fn insert(&mut self, key: K, value: V) {
188 let _ = self.dels.is_empty() || self.dels.remove(&key);
189 self.puts.insert(key, value);
190 }
191
192 #[inline]
193 pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
194 if self.needs(height) {
195 self.insert(key, value);
196 }
197 }
198
199 #[inline]
200 pub fn remove(&mut self, key: K) {
201 if self.puts.remove(&key).is_some() {
202 return;
203 }
204 let newly_inserted = self.dels.insert(key);
205 debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
206 }
207
208 #[inline]
209 pub fn remove_if_needed(&mut self, key: K, height: Height) {
210 if self.needs(height) {
211 self.remove(key)
212 }
213 }
214
215 #[inline]
217 pub fn clear_caches(&mut self) {
218 for cache in &mut self.caches {
219 cache.clear();
220 }
221 }
222
223 #[inline]
224 pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
225 self.keyspace
226 .iter()
227 .map(|res| res.into_inner().unwrap())
228 .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
229 }
230
231 #[inline]
232 pub fn prefix<P: AsRef<[u8]>>(
233 &self,
234 prefix: P,
235 ) -> impl DoubleEndedIterator<Item = (K, V)> + '_ {
236 self.keyspace
237 .prefix(prefix)
238 .map(|res| res.into_inner().unwrap())
239 .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
240 }
241
242 pub fn approximate_len(&self) -> usize {
243 self.keyspace.approximate_len()
244 }
245
246 #[inline]
247 fn has(&self, height: Height) -> bool {
248 self.meta.has(height)
249 }
250
251 #[inline]
252 pub fn needs(&self, height: Height) -> bool {
253 self.meta.needs(height)
254 }
255
256 fn export_meta(&mut self, height: Height) -> Result<()> {
257 self.meta.export(height)?;
258 Ok(())
259 }
260
261 fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
262 if !self.has(height) {
263 self.export_meta(height)?;
264 }
265 Ok(())
266 }
267
268 fn ingest<'a>(
269 keyspace: &Keyspace,
270 puts: impl Iterator<Item = (&'a K, &'a V)>,
271 dels: impl Iterator<Item = &'a K>,
272 ) -> Result<()>
273 where
274 ByteView: From<&'a K> + From<&'a V>,
275 K: 'a,
276 V: 'a,
277 {
278 let mut items: Vec<Item<&'a K, &'a V>> = puts
279 .map(|(key, value)| Item::Value { key, value })
280 .chain(dels.map(Item::Tomb))
281 .collect();
282
283 items.sort_unstable();
284
285 let mut ingestion = keyspace.start_ingestion()?;
286 for item in items {
287 match item {
288 Item::Value { key, value } => {
289 ingestion.write(ByteView::from(key), ByteView::from(value))?;
290 }
291 Item::Tomb(key) => {
292 ingestion.write_weak_tombstone(ByteView::from(key))?;
293 }
294 }
295 }
296 ingestion.finish()?;
297
298 Ok(())
299 }
300}
301
302impl<K, V> AnyStore for Store<K, V>
303where
304 K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
305 V: Debug + Clone + From<ByteView>,
306 for<'a> ByteView: From<K> + From<V> + From<&'a K> + From<&'a V>,
307 Self: Send + Sync,
308{
309 fn keyspace(&self) -> &Keyspace {
310 &self.keyspace
311 }
312
313 fn export_meta(&mut self, height: Height) -> Result<()> {
314 self.export_meta(height)
315 }
316
317 fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
318 self.export_meta_if_needed(height)
319 }
320
321 fn name(&self) -> &'static str {
322 self.name
323 }
324
325 fn height(&self) -> Option<Height> {
326 self.meta.height()
327 }
328
329 fn has(&self, height: Height) -> bool {
330 self.has(height)
331 }
332
333 fn needs(&self, height: Height) -> bool {
334 self.needs(height)
335 }
336
337 fn version(&self) -> Version {
338 self.meta.version()
339 }
340
341 fn commit(&mut self, height: Height) -> Result<()> {
342 self.export_meta_if_needed(height)?;
343
344 let puts = mem::take(&mut self.puts);
345 let dels = mem::take(&mut self.dels);
346
347 if puts.is_empty() && dels.is_empty() {
348 return Ok(());
349 }
350
351 Self::ingest(&self.keyspace, puts.iter(), dels.iter())?;
352
353 if !self.caches.is_empty() {
354 self.caches.pop();
355 self.caches.insert(0, puts);
356 }
357
358 Ok(())
359 }
360
361 fn reset(&mut self) -> Result<()> {
362 self.meta.reset()?;
363 self.puts.clear();
364 self.dels.clear();
365 for cache in &mut self.caches {
366 cache.clear();
367 }
368 self.keyspace.clear()?;
369 Ok(())
370 }
371}