1use bytes::BytesMut;
2
3use super::prelude::*;
4use crate::filter::{BloomDataProvider, CombinedFilter, FilterTrait};
5use std::mem::size_of;
6
/// `IndexStruct` whose on-disk representation is `BPTreeFileIndex`.
pub(crate) type Index<K> = IndexStruct<BPTreeFileIndex<K>, K>;
8
/// Version number of the index header format.
pub(crate) const HEADER_VERSION: u8 = 6;
/// Magic value of the index header.
pub(crate) const INDEX_HEADER_MAGIC_BYTE: u64 = 0xACDC_BCDE;
11
/// Switches derived from an `IndexConfig` when an index is created or opened.
#[derive(Debug)]
struct IndexParams {
    /// `true` when a bloom config was supplied, i.e. a bloom filter is maintained.
    bloom_is_on: bool,
    /// Forwarded as `recreate_index_file` to the file index when the index is dumped.
    recreate_file: bool,
}

impl IndexParams {
    /// Bundles the two switches; no validation is performed.
    fn new(bloom_is_on: bool, recreate_file: bool) -> Self {
        IndexParams {
            recreate_file,
            bloom_is_on,
        }
    }
}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct IndexConfig {
29 pub bloom_config: Option<BloomConfig>,
30 pub recreate_index_file: bool,
31}
32
33impl Default for IndexConfig {
34 fn default() -> Self {
35 Self {
36 bloom_config: None,
37 recreate_index_file: true,
38 }
39 }
40}
41
/// Index of a single blob: maps keys to record headers and keeps bloom/range filters over them.
///
/// The index lives either in memory (where new headers can be pushed) or in an on-disk file
/// index; see `State`.
#[derive(Debug)]
pub(crate) struct IndexStruct<FileIndex, K>
where
    for<'a> K: Key<'a>,
{
    /// Combined bloom + range filter over the indexed keys.
    filter: CombinedFilter<K>,
    /// Offset of the serialized bloom filter inside the file index meta section.
    /// Set to `Some` when the index is opened from a file or dumped to one;
    /// `new` and loading back into memory set it to `None`.
    bloom_offset: Option<u64>,
    /// Switches derived from the `IndexConfig`.
    params: IndexParams,
    /// Current storage of the index data (in memory or on disk).
    inner: State<FileIndex, K>,
    /// Name of the index file; its path is used when the file index is written.
    name: FileName,
    /// I/O driver handed to the file index when it is opened or created.
    iodriver: IoDriver,
}
54
/// Counters used to estimate the heap footprint of an in-memory index.
#[derive(Debug, Default)]
struct MemoryAttrs<K> {
    /// Number of record headers stored (incremented once per push).
    records_count: usize,
    /// Total capacity of all per-key `Vec<RecordHeader>`s, i.e. allocated header slots.
    records_allocated: usize,
    marker: PhantomData<K>,
}
61
// Branching parameters used only for the memory estimate of the in-memory `BTreeMap`.
// They mirror std's `BTreeMap` node shape: B = 6, so each node holds up to
// 2B - 1 keys and an internal node has up to 2B children.
const BTREE_B_FACTOR: usize = 6;
const BTREE_EDGES_LEN: usize = 2 * BTREE_B_FACTOR;
const BTREE_VALUES_LEN: usize = BTREE_EDGES_LEN - 1;
65
/// Compile-time size figures used by `InMemoryData::memory_used` to estimate the memory
/// taken by the in-memory `BTreeMap<K, Vec<RecordHeader>>`. All values are approximations.
impl<K> MemoryAttrs<K>
where
    for<'a> K: Key<'a>,
{
    /// Bytes of one map entry stored inside a node: the key plus the `Vec` handle.
    const BTREE_ENTRY_SIZE: usize = K::MEM_SIZE + size_of::<Vec<RecordHeader>>();
    /// Bytes charged per allocated header slot: the header plus `K::LEN` extra bytes.
    /// NOTE(review): the extra `K::LEN` presumably models the key bytes a live header
    /// carries on the heap — `memory_used` subtracts it for unused slots; confirm.
    const RECORD_HEADER_SIZE: usize = size_of::<RecordHeader>() + K::LEN as usize;
    /// Approximate leaf-node size: a nullable pointer, two `u16` fields and
    /// `BTREE_VALUES_LEN` entries (appears to model std's B-tree leaf node layout).
    const BTREE_DATA_NODE_SIZE: usize = size_of::<Option<std::ptr::NonNull<()>>>() + size_of::<u16>() * 2 + MemoryAttrs::<K>::BTREE_ENTRY_SIZE * BTREE_VALUES_LEN;
    /// Leaf nodes per entry when every leaf is full.
    const BTREE_DATA_NODE_RATIO: f64 = 1.0 / BTREE_VALUES_LEN as f64;
    /// Extra bytes of an internal node over a leaf: its `BTREE_EDGES_LEN` child pointers.
    const BTREE_INTERNAL_NODE_OVERHEAD: usize = size_of::<std::ptr::NonNull<()>>() * BTREE_EDGES_LEN;
    /// Internal nodes per entry, estimated for a complete tree with E^5 full leaves
    /// (V entries each) and 1 + E + E^2 + E^3 + E^4 internal nodes, where E is
    /// `BTREE_EDGES_LEN` and V is `BTREE_VALUES_LEN`.
    const BTREE_INTERNAL_NODE_RATIO: f64 = (1 +
        BTREE_EDGES_LEN +
        BTREE_EDGES_LEN.pow(2) +
        BTREE_EDGES_LEN.pow(3) +
        BTREE_EDGES_LEN.pow(4)) as f64
        / (BTREE_VALUES_LEN * BTREE_EDGES_LEN.pow(5)) as f64;
    /// Estimated tree bytes per map entry: the leaf share plus the internal-node pointer share.
    const BTREE_SIZE_MULTIPLIER: f64 =
        (MemoryAttrs::<K>::BTREE_DATA_NODE_SIZE as f64 * MemoryAttrs::<K>::BTREE_DATA_NODE_RATIO) +
        (MemoryAttrs::<K>::BTREE_INTERNAL_NODE_OVERHEAD as f64 * MemoryAttrs::<K>::BTREE_INTERNAL_NODE_RATIO);
}
90
/// In-memory index contents: every record header grouped by key.
/// `IndexStruct::push` inserts new headers in ascending timestamp order.
pub type InMemoryIndex<K> = BTreeMap<K, Vec<RecordHeader>>;

/// In-memory index contents together with the counters used for memory estimation.
#[derive(Debug, Default)]
pub(crate) struct InMemoryData<K> {
    /// Record headers grouped by key.
    headers: InMemoryIndex<K>,
    /// Record count and allocated header slots, consumed by `memory_used`.
    mem: MemoryAttrs<K>,
}
98
99impl<K> InMemoryData<K>
100where
101 for<'a> K: Key<'a>,
102{
103 fn new(headers: InMemoryIndex<K>, count: usize) -> Self {
104 let mem = MemoryAttrs {
105 records_allocated: headers.values().fold(0, |acc, v| acc + v.capacity()),
106 records_count: count,
107 marker: PhantomData,
108 };
109
110 Self { headers, mem }
111 }
112
113 fn memory_used(&self) -> usize {
114 let Self { mem, .. } = &self;
115 let MemoryAttrs {
116 records_count,
117 records_allocated,
118 ..
119 } = &mem;
120 let len = self.headers.len();
121 trace!("len: {}, records_allocated: {}, records_count: {}",
122 len, records_allocated, records_count);
123 MemoryAttrs::<K>::RECORD_HEADER_SIZE * records_allocated
126 + (len as f64 * MemoryAttrs::<K>::BTREE_SIZE_MULTIPLIER) as usize
127 - (records_allocated - records_count) * K::LEN as usize
128 }
129
130 fn records_count(&self) -> usize {
131 self.mem.records_count
132 }
133
134 fn register_record_allocation(&mut self, records_allocated: usize) {
135 self.mem.records_allocated += records_allocated;
136 self.mem.records_count += 1;
137 }
138}
139
/// Where the index data currently lives.
#[derive(Debug)]
pub(crate) enum State<FileIndex, K> {
    /// Lock-protected in-memory map; the only state in which `push` succeeds.
    InMemory(SRwLock<InMemoryData<K>>),
    /// Persisted file index; `push` returns an error in this state.
    OnDisk(FileIndex),
}
145
146impl<FileIndex, K> IndexStruct<FileIndex, K>
147where
148 FileIndex: FileIndexTrait<K>,
149 for<'a> K: Key<'a>,
150{
151 pub(crate) fn new(name: FileName, iodriver: IoDriver, config: IndexConfig) -> Self {
152 let params = IndexParams::new(config.bloom_config.is_some(), config.recreate_index_file);
153 let bloom_filter = config.bloom_config.map(|cfg| Bloom::new(cfg));
154 Self {
155 params,
156 filter: CombinedFilter::new(bloom_filter, RangeFilter::new()),
157 bloom_offset: None,
158 inner: State::InMemory(SRwLock::default()),
159 name,
160 iodriver,
161 }
162 }
163
164 pub(crate) fn clear(&mut self) {
165 self.inner = State::InMemory(SRwLock::default());
166 self.filter.clear_filter();
167 }
168
169 pub fn offload_filter(&mut self) -> usize {
170 if self.on_disk() {
171 self.filter.offload_filter()
172 } else {
173 0
174 }
175 }
176
177 pub fn get_filter(&self) -> &CombinedFilter<K> {
178 &self.filter
179 }
180
181 pub(crate) fn name(&self) -> &FileName {
182 &self.name
183 }
184
185 pub(crate) fn contains_key_fast(&self, key: &K) -> Option<bool> {
187 match &self.inner {
188 State::InMemory(index) => Some(index.read().expect("read lock acquired").headers.contains_key(key)),
189 State::OnDisk(_) => None
190 }
191 }
192
193 pub(crate) async fn from_file(
194 name: FileName,
195 config: IndexConfig,
196 iodriver: IoDriver,
197 blob_size: u64,
198 ) -> Result<Self> {
199 let findex = FileIndex::from_file(name.clone(), iodriver.clone()).await?;
200 findex
201 .validate(blob_size)
202 .with_context(|| "Header is corrupt")?;
203 let meta_buf = findex.read_meta().await.map_err(|err| err.into_bincode_if_unexpected_eof())?;
204 let (bloom_filter, range_filter, bloom_offset) = Self::deserialize_filters(&meta_buf)?;
205 let params = IndexParams::new(config.bloom_config.is_some(), config.recreate_index_file);
206 let bloom_filter = if params.bloom_is_on {
207 Some(bloom_filter)
208 } else {
209 None
210 };
211 trace!("index restored successfuly");
212 let index = Self {
213 inner: State::OnDisk(findex),
214 name,
215 filter: CombinedFilter::new(bloom_filter, range_filter),
216 bloom_offset: Some(bloom_offset as u64),
217 params,
218 iodriver,
219 };
220 Ok(index)
221 }
222
223 pub(crate) fn on_disk(&self) -> bool {
224 matches!(&self.inner, State::OnDisk(_))
225 }
226
227 async fn dump_in_memory(&mut self, blob_size: u64) -> Result<usize> {
228 if let State::InMemory(headers) = &self.inner {
229 let headers = {
230 let mut headers = headers.write().expect("rwlock");
231 std::mem::take(&mut *headers).headers
232 };
233 if headers.len() == 0 {
234 return Ok(0);
235 }
236 debug!("blob index simple in memory headers {}", headers.len());
237 let (meta_buf, bloom_offset) = self.serialize_filters()?;
238 self.bloom_offset = Some(bloom_offset as u64);
239 let findex = FileIndex::from_records(
240 self.name.as_path(),
241 self.iodriver.clone(),
242 &headers,
243 meta_buf,
244 self.params.recreate_file,
245 blob_size,
246 )
247 .await?;
248 let size = findex.file_size() as usize;
249 self.inner = State::OnDisk(findex);
250 return Ok(size);
251 }
252 Ok(0)
253 }
254
255 fn serialize_filters(&self) -> Result<(Vec<u8>, usize)> {
256 let range_buf = self.filter.range().to_raw()?;
257 let range_buf_size = range_buf.len() as u64;
258 let bloom_buf = self
259 .filter
260 .bloom()
261 .as_ref()
262 .unwrap_or(&Bloom::empty())
263 .to_raw()?;
264 let mut buf = Vec::with_capacity(size_of::<u64>() + range_buf.len() + bloom_buf.len());
265 let bloom_offset = size_of::<u64>() + range_buf.len();
266 buf.extend_from_slice(&serialize(&range_buf_size)?);
267 buf.extend_from_slice(&range_buf);
268 buf.extend_from_slice(&bloom_buf);
269 Ok((buf, bloom_offset))
270 }
271
272 fn deserialize_filters(buf: &[u8]) -> Result<(Bloom, RangeFilter<K>, usize)> {
273 let (range_size_buf, rest_buf) = buf.split_at(size_of::<u64>());
274 let range_size = deserialize(&range_size_buf)?;
275 let (range_buf, bloom_buf) = rest_buf.split_at(range_size);
276 let bloom = Bloom::from_raw(bloom_buf)?;
277 let range = RangeFilter::<K>::from_raw(range_buf)?;
278 Ok((bloom, range, range_size + size_of::<u64>()))
279 }
280
281 async fn load_in_memory(&mut self, findex: FileIndex, blob_size: u64) -> Result<()> {
282 let (record_headers, records_count) = findex.get_records_headers(blob_size).await?;
283 self.inner = State::InMemory(SRwLock::new(InMemoryData::new(record_headers, records_count)));
284 let meta_buf = findex.read_meta().await.map_err(|err| err.into_bincode_if_unexpected_eof())?;
285 let (bloom_filter, range_filter, _) = Self::deserialize_filters(&meta_buf)?;
286 let bloom_filter = if self.params.bloom_is_on {
287 Some(bloom_filter)
288 } else {
289 None
290 };
291 self.filter = CombinedFilter::new(bloom_filter, range_filter);
292 self.bloom_offset = None;
293 Ok(())
294 }
295
296 pub(crate) fn memory_used(&self) -> usize {
297 match &self.inner {
298 State::InMemory(data) => data.read().expect("rwlock").memory_used(),
299 State::OnDisk(file) => file.memory_used(),
300 }
301 }
302
303 pub(crate) fn disk_used(&self) -> u64 {
304 if let State::OnDisk(file) = &self.inner {
305 file.file_size()
306 } else {
307 0
308 }
309 }
310}
311
312#[async_trait::async_trait]
313impl<FileIndex, K> IndexTrait<K> for IndexStruct<FileIndex, K>
314where
315 FileIndex: FileIndexTrait<K> + Clone,
316 for<'a> K: Key<'a>,
317{
318 async fn contains_key(&self, key: &K) -> Result<ReadResult<BlobRecordTimestamp>> {
319 self.get_latest(key)
320 .await
321 .map(|h| h.map(|h| BlobRecordTimestamp::new(h.timestamp())))
322 }
323
324 fn push(&self, key: &K, h: RecordHeader) -> Result<()> {
325 debug!("blob index simple push");
326 match &self.inner {
327 State::InMemory(headers) => {
328 let mut data = headers.write().expect("rwlock");
329 debug!("blob index simple push bloom filter add");
330 self.filter.add(key);
331 debug!("blob index simple push key: {:?}", h.key());
332 let records_allocated;
333 if let Some(v) = data.headers.get_mut(key) {
334 let old_capacity = v.capacity();
335 let mut pos = 0;
337 if v.len() > 4 {
338 pos = v.binary_search_by(|item| item.timestamp().cmp(&h.timestamp())).unwrap_or_else(|e| e);
340 }
341 while pos < v.len() && v[pos].timestamp() <= h.timestamp() {
343 pos += 1;
344 }
345 v.insert(pos, h);
346 trace!("capacity growth: {}", v.capacity() - old_capacity);
347 records_allocated = v.capacity() - old_capacity;
348 } else {
349 let v = vec![h];
350 records_allocated = v.capacity(); data.headers.insert(key.clone(), v);
352 }
353 data.register_record_allocation(records_allocated);
354 Ok(())
355 }
356 State::OnDisk(_) => Err(Error::from(ErrorKind::Index(
357 "Index is closed, push is unavalaible".to_string(),
358 ))
359 .into()),
360 }
361 }
362
363 async fn get_all(&self, key: &K) -> Result<Vec<RecordHeader>> {
364 let mut with_deletion = self.get_all_with_deletion_marker(key).await?;
365 if let Some(h) = with_deletion.last() {
366 if h.is_deleted() {
367 with_deletion.truncate(with_deletion.len() - 1);
368 }
369 }
370 Ok(with_deletion)
371 }
372
373 async fn get_all_with_deletion_marker(&self, key: &K) -> Result<Vec<RecordHeader>> {
374 let headers = match &self.inner {
375 State::InMemory(data) => {
376 let data = data.read().expect("rwlock");
377 Ok(data.headers.get(key).cloned().map(|mut hs| {
378 if hs.len() > 1 {
379 hs.reverse();
380 }
381 hs
382 }))
383 }
384 State::OnDisk(findex) => findex.find_by_key(key).await,
385 }?;
386 if let Some(mut hs) = headers {
387 let first_del = hs.iter().position(|h| h.is_deleted());
388 if let Some(first_del) = first_del {
389 hs.truncate(first_del + 1);
390 }
391 Ok(hs)
392 } else {
393 Ok(vec![])
394 }
395 }
396
397 async fn get_latest(&self, key: &K) -> Result<ReadResult<RecordHeader>> {
398 debug!("index get any");
399 let result = match &self.inner {
400 State::InMemory(headers) => {
401 let data = headers.read().expect("rwlock");
402 debug!("index get any in memory headers: {}", data.headers.len());
403 data.headers.get(key).and_then(|h| h.last()).cloned()
406 }
407 State::OnDisk(findex) => {
408 debug!("index get any on disk");
409 findex.get_latest(key).await?
410 }
411 };
412 Ok(match result {
413 Some(header) if header.is_deleted() => {
414 ReadResult::Deleted(BlobRecordTimestamp::new(header.timestamp()))
415 }
416 Some(header) => ReadResult::Found(header),
417 None => ReadResult::NotFound,
418 })
419 }
420
421 async fn dump(&mut self, blob_size: u64) -> Result<usize> {
422 self.dump_in_memory(blob_size).await
423 }
424
425 async fn load(&mut self, blob_size: u64) -> Result<()> {
426 match &self.inner {
427 State::InMemory(_) => Ok(()),
428 State::OnDisk(findex) => {
429 let findex = findex.clone();
430 self.load_in_memory(findex, blob_size).await
431 }
432 }
433 }
434
435 fn count(&self) -> usize {
436 match &self.inner {
437 State::OnDisk(ref findex) => findex.records_count(),
438 State::InMemory(d) => d.read().expect("rwlock").records_count(),
439 }
440 }
441
442 fn push_deletion(&mut self, key: &K, header: RecordHeader) -> Result<()> {
443 debug!("mark all as deleted by {:?} key", key);
444 assert!(header.is_deleted());
445 assert!(header.data_size() == 0);
446 self.push(key, header)
447 }
448}
449
450#[async_trait::async_trait]
451pub(crate) trait FileIndexTrait<K>: Sized + Send + Sync {
452 async fn from_file(name: FileName, iodriver: IoDriver) -> Result<Self>;
453 async fn from_records(
454 path: &Path,
455 iodriver: IoDriver,
456 headers: &InMemoryIndex<K>,
457 meta: Vec<u8>,
458 recreate_index_file: bool,
459 blob_size: u64,
460 ) -> Result<Self>;
461 fn file_size(&self) -> u64;
462 fn records_count(&self) -> usize;
463 fn blob_size(&self) -> u64;
464 async fn read_meta(&self) -> Result<BytesMut>;
465 async fn read_meta_at(&self, i: u64) -> Result<u8>;
466 async fn find_by_key(&self, key: &K) -> Result<Option<Vec<RecordHeader>>>;
467 async fn get_records_headers(&self, blob_size: u64) -> Result<(InMemoryIndex<K>, usize)>;
468 async fn get_latest(&self, key: &K) -> Result<Option<RecordHeader>>;
469 fn validate(&self, blob_size: u64) -> Result<()>;
470 fn memory_used(&self) -> usize;
471}
472
473#[async_trait::async_trait]
474impl<FileIndex, K> BloomDataProvider for IndexStruct<FileIndex, K>
475where
476 FileIndex: FileIndexTrait<K>,
477 for<'a> K: Key<'a>,
478{
479 async fn read_byte(&self, index: u64) -> Result<u8> {
480 match &self.inner {
481 State::OnDisk(findex) => {
482 findex
483 .read_meta_at(index + self.bloom_offset.expect("should be set after dump"))
484 .await
485 }
486 _ => Err(anyhow::anyhow!("Can't read from in-memory index")),
487 }
488 }
489}