pearl/blob/index/
core.rs

1use bytes::BytesMut;
2
3use super::prelude::*;
4use crate::filter::{BloomDataProvider, CombinedFilter, FilterTrait};
5use std::mem::size_of;
6
/// Index type used by the blob: an [`IndexStruct`] backed by a B+-tree file index.
pub(crate) type Index<K> = IndexStruct<BPTreeFileIndex<K>, K>;

/// Version of the on-disk index header layout; must be bumped on format changes.
pub(crate) const HEADER_VERSION: u8 = 6;
/// Magic value written into the index header to detect corrupt or foreign files.
pub(crate) const INDEX_HEADER_MAGIC_BYTE: u64 = 0xacdc_bcde;
11
/// Runtime flags derived from [`IndexConfig`] when an index is created.
#[derive(Debug)]
struct IndexParams {
    // true when a bloom filter config was supplied, i.e. bloom filtering is enabled
    bloom_is_on: bool,
    // whether the index file should be recreated when dumping to disk
    recreate_file: bool,
}
17
18impl IndexParams {
19    fn new(bloom_is_on: bool, recreate_file: bool) -> Self {
20        Self {
21            bloom_is_on,
22            recreate_file,
23        }
24    }
25}
26
/// User-facing configuration of the blob index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexConfig {
    /// Bloom filter settings; `None` disables the bloom filter.
    pub bloom_config: Option<BloomConfig>,
    /// Recreate the index file when the in-memory index is dumped.
    pub recreate_index_file: bool,
}
32
33impl Default for IndexConfig {
34    fn default() -> Self {
35        Self {
36            bloom_config: None,
37            recreate_index_file: true,
38        }
39    }
40}
41
/// Blob index: holds the bloom/range filters and either the in-memory
/// headers map or a handle to the dumped on-disk index file.
#[derive(Debug)]
pub(crate) struct IndexStruct<FileIndex, K>
where
    for<'a> K: Key<'a>,
{
    // bloom + range filter pair used for fast negative lookups
    filter: CombinedFilter<K>,
    // byte offset of the serialized bloom filter inside the index meta section;
    // `Some` only after a dump or a restore from file, `None` while in memory
    bloom_offset: Option<u64>,
    params: IndexParams,
    // current state: in-memory headers or on-disk file index
    inner: State<FileIndex, K>,
    name: FileName,
    iodriver: IoDriver,
}
54
/// Bookkeeping used to estimate memory consumed by the in-memory index.
#[derive(Debug, Default)] // Default can be used to initialize structure with 0
struct MemoryAttrs<K> {
    // number of record headers stored (one increment per push)
    records_count: usize,
    // summed capacity of all header vectors; always >= records_count
    records_allocated: usize,
    marker: PhantomData<K>,
}
61
// Assumed branching factor of BTreeMap nodes (see the node-size comments below).
const BTREE_B_FACTOR: usize = 6;
// Maximum number of values stored in a single node: 2B - 1 = 11.
const BTREE_VALUES_LEN: usize = BTREE_B_FACTOR * 2 - 1;
// Maximum number of child edges of an internal node: 2B = 12.
const BTREE_EDGES_LEN: usize = BTREE_B_FACTOR * 2;
65
66impl<K> MemoryAttrs<K>
67where
68    for<'a> K: Key<'a>,
69{
70    const BTREE_ENTRY_SIZE: usize = K::MEM_SIZE + size_of::<Vec<RecordHeader>>();
71    const RECORD_HEADER_SIZE: usize = size_of::<RecordHeader>() + K::LEN as usize;
72    // Each node in BTreeMap contains preallocated vectors of 11 values and 12 edges.
73    // Although count of nodes can't be determined without reimplementing insertion algorithm,
74    // we can use approximation of overhead size added per one key
75    const BTREE_DATA_NODE_SIZE: usize = size_of::<Option<std::ptr::NonNull<()>>>() +                     // ptr to parent
76                                        size_of::<u16>() * 2 +                                           // metadata
77                                        MemoryAttrs::<K>::BTREE_ENTRY_SIZE * BTREE_VALUES_LEN;           // data
78    const BTREE_DATA_NODE_RATIO: f64 = 1.0 / BTREE_VALUES_LEN as f64;
79    const BTREE_INTERNAL_NODE_OVERHEAD: usize = size_of::<std::ptr::NonNull<()>>() * BTREE_EDGES_LEN;    // edges
80    const BTREE_INTERNAL_NODE_RATIO: f64 = (1 +
81                                            BTREE_EDGES_LEN +
82                                            BTREE_EDGES_LEN.pow(2) +
83                                            BTREE_EDGES_LEN.pow(3) +
84                                            BTREE_EDGES_LEN.pow(4)) as f64
85                                           / (BTREE_VALUES_LEN * BTREE_EDGES_LEN.pow(5)) as f64;
86    const BTREE_SIZE_MULTIPLIER: f64 =
87        (MemoryAttrs::<K>::BTREE_DATA_NODE_SIZE as f64 * MemoryAttrs::<K>::BTREE_DATA_NODE_RATIO) +
88        (MemoryAttrs::<K>::BTREE_INTERNAL_NODE_OVERHEAD as f64 * MemoryAttrs::<K>::BTREE_INTERNAL_NODE_RATIO);
89}
90
/// In-memory index representation: each key maps to its record headers,
/// which `push` keeps ordered by ascending timestamp.
pub type InMemoryIndex<K> = BTreeMap<K, Vec<RecordHeader>>;
92
/// In-memory index data together with its memory-usage accounting.
#[derive(Debug, Default)]
pub(crate) struct InMemoryData<K> {
    headers: InMemoryIndex<K>,
    mem: MemoryAttrs<K>,
}
98
impl<K> InMemoryData<K>
where
    for<'a> K: Key<'a>,
{
    /// Builds accounting data for an existing headers map: `records_allocated`
    /// is the summed capacity of all header vectors, `records_count` is `count`.
    fn new(headers: InMemoryIndex<K>, count: usize) -> Self {
        let mem = MemoryAttrs {
            records_allocated: headers.values().fold(0, |acc, v| acc + v.capacity()),
            records_count: count,
            marker: PhantomData,
        };

        Self { headers, mem }
    }

    /// Estimates heap memory used by the in-memory index:
    /// headers + per-key BTreeMap overhead - unused allocated slots' key bytes.
    fn memory_used(&self) -> usize {
        let Self { mem, .. } = &self;
        let MemoryAttrs {
            records_count,
            records_allocated,
            ..
        } = &mem;
        let len = self.headers.len();
        trace!("len: {}, records_allocated: {}, records_count: {}",
                len, records_allocated, records_count);
        // last minus is necessary, because allocated but not initialized record
        // headers don't have key allocated on heap
        // (no underflow: records_allocated >= records_count since capacity >= len)
        MemoryAttrs::<K>::RECORD_HEADER_SIZE * records_allocated
            + (len as f64 * MemoryAttrs::<K>::BTREE_SIZE_MULTIPLIER) as usize
            - (records_allocated - records_count) * K::LEN as usize
    }

    /// Number of record headers currently stored.
    fn records_count(&self) -> usize {
        self.mem.records_count
    }

    /// Accounts for one inserted header plus any vector-capacity growth it caused.
    fn register_record_allocation(&mut self, records_allocated: usize) {
        self.mem.records_allocated += records_allocated;
        self.mem.records_count += 1;
    }
}
139
/// Index state: mutable in-memory headers map or immutable dumped file index.
#[derive(Debug)]
pub(crate) enum State<FileIndex, K> {
    // headers still in memory, guarded by a std RwLock
    InMemory(SRwLock<InMemoryData<K>>),
    // headers dumped into the on-disk index file
    OnDisk(FileIndex),
}
145
146impl<FileIndex, K> IndexStruct<FileIndex, K>
147where
148    FileIndex: FileIndexTrait<K>,
149    for<'a> K: Key<'a>,
150{
151    pub(crate) fn new(name: FileName, iodriver: IoDriver, config: IndexConfig) -> Self {
152        let params = IndexParams::new(config.bloom_config.is_some(), config.recreate_index_file);
153        let bloom_filter = config.bloom_config.map(|cfg| Bloom::new(cfg));
154        Self {
155            params,
156            filter: CombinedFilter::new(bloom_filter, RangeFilter::new()),
157            bloom_offset: None,
158            inner: State::InMemory(SRwLock::default()),
159            name,
160            iodriver,
161        }
162    }
163
164    pub(crate) fn clear(&mut self) {
165        self.inner = State::InMemory(SRwLock::default());
166        self.filter.clear_filter();
167    }
168
169    pub fn offload_filter(&mut self) -> usize {
170        if self.on_disk() {
171            self.filter.offload_filter()
172        } else {
173            0
174        }
175    }
176
177    pub fn get_filter(&self) -> &CombinedFilter<K> {
178        &self.filter
179    }
180
181    pub(crate) fn name(&self) -> &FileName {
182        &self.name
183    }
184
185    /// Fast check for key presence. None - can't perform fast check (disk access required)
186    pub(crate) fn contains_key_fast(&self, key: &K) -> Option<bool> {
187        match &self.inner {
188            State::InMemory(index) => Some(index.read().expect("read lock acquired").headers.contains_key(key)),
189            State::OnDisk(_) => None
190        }
191    }
192
193    pub(crate) async fn from_file(
194        name: FileName,
195        config: IndexConfig,
196        iodriver: IoDriver,
197        blob_size: u64,
198    ) -> Result<Self> {
199        let findex = FileIndex::from_file(name.clone(), iodriver.clone()).await?;
200        findex
201            .validate(blob_size)
202            .with_context(|| "Header is corrupt")?;
203        let meta_buf = findex.read_meta().await.map_err(|err| err.into_bincode_if_unexpected_eof())?;
204        let (bloom_filter, range_filter, bloom_offset) = Self::deserialize_filters(&meta_buf)?;
205        let params = IndexParams::new(config.bloom_config.is_some(), config.recreate_index_file);
206        let bloom_filter = if params.bloom_is_on {
207            Some(bloom_filter)
208        } else {
209            None
210        };
211        trace!("index restored successfuly");
212        let index = Self {
213            inner: State::OnDisk(findex),
214            name,
215            filter: CombinedFilter::new(bloom_filter, range_filter),
216            bloom_offset: Some(bloom_offset as u64),
217            params,
218            iodriver,
219        };
220        Ok(index)
221    }
222
223    pub(crate) fn on_disk(&self) -> bool {
224        matches!(&self.inner, State::OnDisk(_))
225    }
226
227    async fn dump_in_memory(&mut self, blob_size: u64) -> Result<usize> {
228        if let State::InMemory(headers) = &self.inner {
229            let headers = {
230                let mut headers = headers.write().expect("rwlock");
231                std::mem::take(&mut *headers).headers
232            };
233            if headers.len() == 0 {
234                return Ok(0);
235            }
236            debug!("blob index simple in memory headers {}", headers.len());
237            let (meta_buf, bloom_offset) = self.serialize_filters()?;
238            self.bloom_offset = Some(bloom_offset as u64);
239            let findex = FileIndex::from_records(
240                self.name.as_path(),
241                self.iodriver.clone(),
242                &headers,
243                meta_buf,
244                self.params.recreate_file,
245                blob_size,
246            )
247            .await?;
248            let size = findex.file_size() as usize;
249            self.inner = State::OnDisk(findex);
250            return Ok(size);
251        }
252        Ok(0)
253    }
254
255    fn serialize_filters(&self) -> Result<(Vec<u8>, usize)> {
256        let range_buf = self.filter.range().to_raw()?;
257        let range_buf_size = range_buf.len() as u64;
258        let bloom_buf = self
259            .filter
260            .bloom()
261            .as_ref()
262            .unwrap_or(&Bloom::empty())
263            .to_raw()?;
264        let mut buf = Vec::with_capacity(size_of::<u64>() + range_buf.len() + bloom_buf.len());
265        let bloom_offset = size_of::<u64>() + range_buf.len();
266        buf.extend_from_slice(&serialize(&range_buf_size)?);
267        buf.extend_from_slice(&range_buf);
268        buf.extend_from_slice(&bloom_buf);
269        Ok((buf, bloom_offset))
270    }
271
272    fn deserialize_filters(buf: &[u8]) -> Result<(Bloom, RangeFilter<K>, usize)> {
273        let (range_size_buf, rest_buf) = buf.split_at(size_of::<u64>());
274        let range_size = deserialize(&range_size_buf)?;
275        let (range_buf, bloom_buf) = rest_buf.split_at(range_size);
276        let bloom = Bloom::from_raw(bloom_buf)?;
277        let range = RangeFilter::<K>::from_raw(range_buf)?;
278        Ok((bloom, range, range_size + size_of::<u64>()))
279    }
280
281    async fn load_in_memory(&mut self, findex: FileIndex, blob_size: u64) -> Result<()> {
282        let (record_headers, records_count) = findex.get_records_headers(blob_size).await?;
283        self.inner = State::InMemory(SRwLock::new(InMemoryData::new(record_headers, records_count)));
284        let meta_buf = findex.read_meta().await.map_err(|err| err.into_bincode_if_unexpected_eof())?;
285        let (bloom_filter, range_filter, _) = Self::deserialize_filters(&meta_buf)?;
286        let bloom_filter = if self.params.bloom_is_on {
287            Some(bloom_filter)
288        } else {
289            None
290        };
291        self.filter = CombinedFilter::new(bloom_filter, range_filter);
292        self.bloom_offset = None;
293        Ok(())
294    }
295
296    pub(crate) fn memory_used(&self) -> usize {
297        match &self.inner {
298            State::InMemory(data) => data.read().expect("rwlock").memory_used(),
299            State::OnDisk(file) => file.memory_used(),
300        }
301    }
302
303    pub(crate) fn disk_used(&self) -> u64 {
304        if let State::OnDisk(file) = &self.inner {
305            file.file_size()
306        } else {
307            0
308        }
309    }
310}
311
312#[async_trait::async_trait]
313impl<FileIndex, K> IndexTrait<K> for IndexStruct<FileIndex, K>
314where
315    FileIndex: FileIndexTrait<K> + Clone,
316    for<'a> K: Key<'a>,
317{
318    async fn contains_key(&self, key: &K) -> Result<ReadResult<BlobRecordTimestamp>> {
319        self.get_latest(key)
320            .await
321            .map(|h| h.map(|h| BlobRecordTimestamp::new(h.timestamp())))
322    }
323
324    fn push(&self, key: &K, h: RecordHeader) -> Result<()> {
325        debug!("blob index simple push");
326        match &self.inner {
327            State::InMemory(headers) => {
328                let mut data = headers.write().expect("rwlock");
329                debug!("blob index simple push bloom filter add");
330                self.filter.add(key);
331                debug!("blob index simple push key: {:?}", h.key());
332                let records_allocated;
333                if let Some(v) = data.headers.get_mut(key) {
334                    let old_capacity = v.capacity();
335                    // Keep ordered by timestamp
336                    let mut pos = 0;
337                    if v.len() > 4 {
338                        // Use binary search when len > 4. For smaller len sequential search will be faster
339                        pos = v.binary_search_by(|item| item.timestamp().cmp(&h.timestamp())).unwrap_or_else(|e| e);
340                    }
341                    // Skip records with timestamp less or equal to our (our should be the latest)
342                    while pos < v.len() && v[pos].timestamp() <= h.timestamp() {
343                        pos += 1;
344                    }
345                    v.insert(pos, h);
346                    trace!("capacity growth: {}", v.capacity() - old_capacity);
347                    records_allocated = v.capacity() - old_capacity;
348                } else {
349                    let v = vec![h];
350                    records_allocated = v.capacity(); // capacity == 1
351                    data.headers.insert(key.clone(), v);
352                }
353                data.register_record_allocation(records_allocated);
354                Ok(())
355            }
356            State::OnDisk(_) => Err(Error::from(ErrorKind::Index(
357                "Index is closed, push is unavalaible".to_string(),
358            ))
359            .into()),
360        }
361    }
362
363    async fn get_all(&self, key: &K) -> Result<Vec<RecordHeader>> {
364        let mut with_deletion = self.get_all_with_deletion_marker(key).await?;
365        if let Some(h) = with_deletion.last() {
366            if h.is_deleted() {
367                with_deletion.truncate(with_deletion.len() - 1);
368            }
369        }
370        Ok(with_deletion)
371    }
372
373    async fn get_all_with_deletion_marker(&self, key: &K) -> Result<Vec<RecordHeader>> {
374        let headers = match &self.inner {
375            State::InMemory(data) => {
376                let data = data.read().expect("rwlock");
377                Ok(data.headers.get(key).cloned().map(|mut hs| {
378                    if hs.len() > 1 {
379                        hs.reverse();
380                    }
381                    hs
382                }))
383            }
384            State::OnDisk(findex) => findex.find_by_key(key).await,
385        }?;
386        if let Some(mut hs) = headers {
387            let first_del = hs.iter().position(|h| h.is_deleted());
388            if let Some(first_del) = first_del {
389                hs.truncate(first_del + 1);
390            }
391            Ok(hs)
392        } else {
393            Ok(vec![])
394        }
395    }
396
397    async fn get_latest(&self, key: &K) -> Result<ReadResult<RecordHeader>> {
398        debug!("index get any");
399        let result = match &self.inner {
400            State::InMemory(headers) => {
401                let data = headers.read().expect("rwlock");
402                debug!("index get any in memory headers: {}", data.headers.len());
403                // in memory indexes with same key are stored in ascending order, so the last
404                // by adding time record is last in list (in b+tree disk index it's first)
405                data.headers.get(key).and_then(|h| h.last()).cloned()
406            }
407            State::OnDisk(findex) => {
408                debug!("index get any on disk");
409                findex.get_latest(key).await?
410            }
411        };
412        Ok(match result {
413            Some(header) if header.is_deleted() => {
414                ReadResult::Deleted(BlobRecordTimestamp::new(header.timestamp()))
415            }
416            Some(header) => ReadResult::Found(header),
417            None => ReadResult::NotFound,
418        })
419    }
420
421    async fn dump(&mut self, blob_size: u64) -> Result<usize> {
422        self.dump_in_memory(blob_size).await
423    }
424
425    async fn load(&mut self, blob_size: u64) -> Result<()> {
426        match &self.inner {
427            State::InMemory(_) => Ok(()),
428            State::OnDisk(findex) => {
429                let findex = findex.clone();
430                self.load_in_memory(findex, blob_size).await
431            }
432        }
433    }
434
435    fn count(&self) -> usize {
436        match &self.inner {
437            State::OnDisk(ref findex) => findex.records_count(),
438            State::InMemory(d) => d.read().expect("rwlock").records_count(),
439        }
440    }
441
442    fn push_deletion(&mut self, key: &K, header: RecordHeader) -> Result<()> {
443        debug!("mark all as deleted by {:?} key", key);
444        assert!(header.is_deleted());
445        assert!(header.data_size() == 0);
446        self.push(key, header)
447    }
448}
449
/// Abstraction over the on-disk index file format.
#[async_trait::async_trait]
pub(crate) trait FileIndexTrait<K>: Sized + Send + Sync {
    /// Opens an existing index file.
    async fn from_file(name: FileName, iodriver: IoDriver) -> Result<Self>;
    /// Creates an index file from in-memory headers and a serialized meta buffer.
    async fn from_records(
        path: &Path,
        iodriver: IoDriver,
        headers: &InMemoryIndex<K>,
        meta: Vec<u8>,
        recreate_index_file: bool,
        blob_size: u64,
    ) -> Result<Self>;
    /// Size of the index file in bytes.
    fn file_size(&self) -> u64;
    /// Total number of record headers stored in the file.
    fn records_count(&self) -> usize;
    fn blob_size(&self) -> u64;
    /// Reads the whole meta (serialized filters) section.
    async fn read_meta(&self) -> Result<BytesMut>;
    /// Reads a single byte of the meta section at offset `i`.
    async fn read_meta_at(&self, i: u64) -> Result<u8>;
    /// Returns all headers for `key`, if any.
    async fn find_by_key(&self, key: &K) -> Result<Option<Vec<RecordHeader>>>;
    /// Loads all record headers: (headers map, records count).
    async fn get_records_headers(&self, blob_size: u64) -> Result<(InMemoryIndex<K>, usize)>;
    /// Returns the latest header for `key`, if any.
    async fn get_latest(&self, key: &K) -> Result<Option<RecordHeader>>;
    /// Validates the file header against the expected blob size.
    fn validate(&self, blob_size: u64) -> Result<()>;
    /// Estimated heap memory used by this file index handle.
    fn memory_used(&self) -> usize;
}
472
473#[async_trait::async_trait]
474impl<FileIndex, K> BloomDataProvider for IndexStruct<FileIndex, K>
475where
476    FileIndex: FileIndexTrait<K>,
477    for<'a> K: Key<'a>,
478{
479    async fn read_byte(&self, index: u64) -> Result<u8> {
480        match &self.inner {
481            State::OnDisk(findex) => {
482                findex
483                    .read_meta_at(index + self.bloom_offset.expect("should be set after dump"))
484                    .await
485            }
486            _ => Err(anyhow::anyhow!("Can't read from in-memory index")),
487        }
488    }
489}