pearl/blob/core.rs

use std::time::SystemTime;

use bytes::{BufMut, BytesMut};
use tokio::time::Instant;

use crate::error::ValidationErrorKind;
use crate::filter::{CombinedFilter, FilterTrait};
use crate::storage::{BlobRecordTimestamp, ReadResult};

use super::prelude::*;

use super::{header::Header, index::IndexTrait};

pub(crate) const BLOB_INDEX_FILE_EXTENSION: &str = "index";

/// A [`Blob`] represents a file with records and
/// provides methods for read/write access by key.
///
/// [`Blob`]: struct.Blob.html
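///
/// A minimal usage sketch with hypothetical placeholder names
/// (`KeyType`, `file_name`, `iodriver`, `blob_config`); the API is
/// crate-internal, so this is illustrative rather than a doctest:
///
/// ```ignore
/// let blob = Blob::<KeyType>::open_new(file_name, iodriver, blob_config).await?;
/// let blob = ASRwLock::new(blob);
/// let write_result = Blob::write(&blob, &key, record).await?;
/// ```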
#[derive(Debug)]
pub struct Blob<K>
where
    for<'a> K: Key<'a>,
{
    header: Header,
    index: Index<K>,
    name: Arc<FileName>,
    file: File,
    created_at: SystemTime,
    validate_data_during_index_regen: bool,
}

pub(crate) struct WriteResult {
    pub dirty_bytes: u64,
}

pub(crate) struct DeleteResult {
    pub dirty_bytes: u64,
    pub deleted: bool,
}

impl<K> Blob<K>
where
    for<'a> K: Key<'a> + 'static,
{
    /// # Description
    /// Creates a new blob file with the given [`FileName`]
    /// and a fresh index for it.
    /// # Panics
    /// Panics if a file with the same path already exists.
    ///
    /// [`FileName`]: struct.FileName.html
    pub(crate) async fn open_new(
        name: FileName,
        iodriver: IoDriver,
        config: BlobConfig,
    ) -> Result<Self> {
        let BlobConfig {
            index: index_config,
            validate_data_during_index_regen,
        } = config;
        let file = iodriver.create(name.as_path()).await?;
        let index = Self::create_index(&name, iodriver, index_config);
        let header = Header::new();
        let mut blob = Self {
            header,
            index,
            name: Arc::new(name),
            file,
            created_at: SystemTime::now(),
            validate_data_during_index_regen,
        };
        blob.write_header().await?;
        Ok(blob)
    }

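    /// Returns the [`FileName`] of the underlying blob file.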
    pub fn name(&self) -> &FileName {
        &self.name
    }

    pub(crate) fn created_at(&self) -> SystemTime {
        self.created_at
    }

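    /// Serializes the blob header into a buffer, appends it to the
    /// blob file, and fsyncs the data to disk.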
    async fn write_header(&mut self) -> Result<()> {
        let size = self.header.serialized_size();
        let mut buf = BytesMut::with_capacity(size as usize);
        serialize_into((&mut buf).writer(), &self.header)?;
        self.file.write_append_all(buf.freeze()).await?;
        self.file.fsyncdata().await?;
        Ok(())
    }

    #[inline]
    fn create_index(name: &FileName, iodriver: IoDriver, index_config: IndexConfig) -> Index<K> {
        Index::new(name.with_extension(BLOB_INDEX_FILE_EXTENSION), iodriver, index_config)
    }

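    /// Syncs blob data to disk and dumps the index file; returns the
    /// number of bytes dumped (0 if the index is already on disk).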
    pub(crate) async fn dump(&mut self) -> Result<usize> {
        if self.index.on_disk() {
            Ok(0) // 0 bytes dumped
        } else {
            self.fsyncdata()
                .await
                .with_context(|| format!("blob file dump failed: {:?}", self.name.as_path()))?;

            self.index.dump(self.file_size()).await.with_context(|| {
                format!(
                    "index file dump failed, associated blob file: {:?}",
                    self.name.as_path()
                )
            })
        }
    }

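    /// Loads the index from its `.index` file; if loading fails, clears
    /// the in-memory index and regenerates it from the blob file.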
    pub(crate) async fn load_index(&mut self) -> Result<()> {
        if let Err(e) = self.index.load(self.file_size()).await {
            warn!("error loading index: {}, regenerating", e);
            self.index.clear();
            self.try_regenerate_index().await?;
        }
        Ok(())
    }

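    /// Opens an existing blob file, reads and validates its header, and
    /// initializes the index from the `.index` file, regenerating it by
    /// scanning the blob when the index file is missing or corrupted.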
    pub(crate) async fn from_file(
        path: PathBuf,
        iodriver: IoDriver,
        config: BlobConfig,
    ) -> Result<Self> {
        let now = Instant::now();
        let file = iodriver.open(&path).await?;
        let name = FileName::from_path(&path)?;
        info!("{} blob init started", name);
        let size = file.size();

        let header = Header::from_file(&file, &path)
            .await
            .with_context(|| format!("failed to read blob header. Blob file: {:?}", path))?;

        let index_name = name.with_extension(BLOB_INDEX_FILE_EXTENSION);
        let BlobConfig {
            index: index_config,
            validate_data_during_index_regen,
        } = config;
        trace!("looking for index file: [{}]", index_name);
        let mut is_index_corrupted = false;
        let index = if index_name.exists() {
            trace!("file exists");
            Index::from_file(
                index_name.clone(),
                index_config.clone(),
                iodriver.clone(),
                size,
            )
            .await
            .or_else(|error| {
                if let Some(io_error) = error.downcast_ref::<IOError>() {
                    match io_error.kind() {
                        IOErrorKind::PermissionDenied | IOErrorKind::Other => {
                            warn!(
                                "index for file '{:?}' cannot be regenerated due to an error: {}",
                                path, io_error
                            );
                            return Err(error);
                        }
                        _ => {}
                    }
                }
                is_index_corrupted = true;
                Ok(Index::new(index_name, iodriver, index_config))
            })?
        } else {
            trace!("file not found, create new");
            Index::new(index_name, iodriver, index_config)
        };
        trace!("index initialized");
        let header_size = bincode::serialized_size(&header)?;
        let created_at = file.created_at()?;
        let mut blob = Self {
            header,
            file,
            name: Arc::new(name),
            index,
            created_at,
            validate_data_during_index_regen,
        };
        trace!("call update index");
        if is_index_corrupted || size as u64 > header_size {
            blob.try_regenerate_index()
                .await
                .with_context(|| format!("failed to regenerate index for blob file: {:?}", path))?;
        } else {
            warn!("empty or corrupted blob: {:?}", path);
        }
        trace!("check data consistency");
        Self::check_data_consistency();
        info!(
            "{} init finished: {}ms",
            blob.name(),
            now.elapsed().as_millis()
        );
        Ok(blob)
    }

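    /// Creates a [`RawRecords`] reader positioned right after the blob
    /// header, optionally validating record data checksums while scanning.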
    async fn raw_records(&self, validate_data: bool) -> Result<RawRecords> {
        RawRecords::start(
            self.file.clone(),
            bincode::serialized_size(&self.header)?,
            K::LEN as usize,
            validate_data,
        )
        .await
        .context("failed to create iterator for raw records")
    }

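    /// Rebuilds the in-memory index by scanning all record headers in the
    /// blob file; a no-op if the index is already on disk.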
    pub(crate) async fn try_regenerate_index(&mut self) -> Result<()> {
        info!("try regenerate index for blob: {}", self.name);
        if self.index.on_disk() {
            debug!("index already updated");
            return Ok(());
        }
        debug!("index file missing");
        let raw_r = self
            .raw_records(self.validate_data_during_index_regen)
            .await
            .with_context(|| {
                format!(
                    "failed to read raw records from blob {:?}",
                    self.name.as_path()
                )
            })?;
        debug!("raw records loaded");
        if let Some(headers) = raw_r.load().await.with_context(|| {
            format!(
                "load headers from blob file failed, {:?}",
                self.name.as_path()
            )
        })? {
            for header in headers {
                let key = header.key().into();
                self.index.push(&key, header).context("index push failed")?;
            }
        }
        debug!("index successfully generated: {}", self.index.name());
        Ok(())
    }

    pub(crate) fn check_data_consistency() {
        // @TODO implement
    }

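    /// Writes a record under an upgradable read lock so only one write to
    /// the blob file is in flight at a time, pushes the resulting header
    /// into the index, and reports the file's dirty byte count.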
    pub(crate) async fn write(blob: &ASRwLock<Self>, key: &K, record: Record) -> Result<WriteResult> {
        debug!("blob write");
        let (partially_serialized, mut header) = record.to_partially_serialized_and_header()?;
        // Only one upgradable_read lock is allowed at a time. This is critical:
        // it guarantees that only one write operation is running at a time.
        let blob = blob.upgradable_read().await;
        let write_result = partially_serialized.write_to_file(&blob.file).await?;
        header.set_offset_checksum(write_result.blob_offset(), write_result.header_checksum());
        blob.index.push(key, header)?;
        Ok(WriteResult { dirty_bytes: blob.file.dirty_bytes() })
    }

    async fn write_mut(&mut self, key: &K, record: Record) -> Result<WriteResult> {
        debug!("blob write");
        let (record, mut header) = record.to_partially_serialized_and_header()?;
        let write_result = record.write_to_file(&self.file).await?;
        header.set_offset_checksum(write_result.blob_offset(), write_result.header_checksum());
        self.index.push(key, header)?;
        Ok(WriteResult { dirty_bytes: self.file.dirty_bytes() })
    }

    #[inline]
    pub(crate) async fn read_all_entries_with_deletion_marker(
        &self,
        key: &K,
    ) -> Result<Vec<Entry>> {
        let headers = self.index.get_all_with_deletion_marker(key).await?;
        // headers must be sorted by timestamp in descending order
        debug_assert!(headers
            .iter()
            .zip(headers.iter().skip(1))
            .all(|(x, y)| x.timestamp() >= y.timestamp()));
        Ok(Self::headers_to_entries(headers, &self.file, &self.name))
    }

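    /// Writes a deletion record for `key`. With `only_if_presented` set,
    /// the deletion record is written only when the key currently resolves
    /// to a live record; `deleted` in the result reflects whether it was written.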
    pub(crate) async fn delete(
        &mut self,
        key: &K,
        timestamp: BlobRecordTimestamp,
        meta: Option<Meta>,
        only_if_presented: bool,
    ) -> Result<DeleteResult> {
        if !only_if_presented || self.index.get_latest(key).await?.is_found() {
            let record = Record::deleted(key, timestamp.into(), meta)?;
            self.push_deletion_record(key, record).await
        } else {
            Ok(DeleteResult { dirty_bytes: self.file.dirty_bytes(), deleted: false })
        }
    }

    async fn push_deletion_record(&mut self, key: &K, record: Record) -> Result<DeleteResult> {
        let on_disk = self.index.on_disk();
        if on_disk {
            self.load_index().await?;
        }
        let result = self.write_mut(key, record).await?;
        Ok(DeleteResult { dirty_bytes: result.dirty_bytes, deleted: true })
    }

    fn headers_to_entries(headers: Vec<RecordHeader>, file: &File, file_name: &Arc<FileName>) -> Vec<Entry> {
        headers
            .into_iter()
            .map(|header| Entry::new(header, file.clone(), file_name.clone()))
            .collect()
    }

    /// Returns the latest [`Entry`] from the blob for the specified key and meta.
    pub(crate) async fn get_latest_entry(
        &self,
        key: &K,
        meta: Option<&Meta>,
        check_filters: bool,
    ) -> Result<ReadResult<Entry>> {
        debug!("blob get any entry {:?}, {:?}", key, meta);
        if check_filters && self.check_filter(key).await == FilterResult::NotContains {
            debug!("key was filtered out by filters");
            Ok(ReadResult::NotFound)
        } else if let Some(meta) = meta {
            debug!("blob get any entry meta: {:?}", meta);
            self.get_entry_with_meta(key, meta).await
        } else {
            debug!("blob get any entry bloom true no meta");
            Ok(self
                .index
                .get_latest(key)
                .await
                .with_context(|| {
                    format!("index get any failed for blob: {:?}", self.name.as_path())
                })?
                .map(|header| {
                    let entry = Entry::new(header, self.file.clone(), self.name.clone());
                    debug!("blob, get any entry, bloom true no meta, entry found");
                    entry
                }))
        }
    }

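    /// Looks up all versions of `key`, strips a trailing deletion marker
    /// if present, and returns the first entry whose meta matches;
    /// reports `Deleted` with the deletion timestamp when nothing matches
    /// but the key was deleted.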
    async fn get_entry_with_meta(&self, key: &K, meta: &Meta) -> Result<ReadResult<Entry>> {
        let mut headers = self.index.get_all_with_deletion_marker(key).await?;
        let deleted_ts = headers
            .last()
            .filter(|h| h.is_deleted())
            .map(|h| BlobRecordTimestamp::new(h.timestamp()));
        if deleted_ts.is_some() {
            headers.truncate(headers.len() - 1);
        }
        let entries = Self::headers_to_entries(headers, &self.file, &self.name);
        if let Some(entry) = self.filter_entries(entries, meta).await? {
            Ok(ReadResult::Found(entry))
        } else if let Some(ts) = deleted_ts {
            Ok(ReadResult::Deleted(ts))
        } else {
            Ok(ReadResult::NotFound)
        }
    }

    async fn filter_entries(&self, entries: Vec<Entry>, meta: &Meta) -> Result<Option<Entry>> {
        for mut entry in entries {
            if Some(meta) == entry.load_meta().await? {
                return Ok(Some(entry));
            }
        }
        Ok(None)
    }

    #[inline]
    pub(crate) fn file_size(&self) -> u64 {
        self.file.size()
    }

    pub(crate) fn records_count(&self) -> usize {
        self.index.count()
    }

    pub(crate) fn file_dirty_bytes(&self) -> u64 {
        self.file.dirty_bytes()
    }

    pub(crate) async fn fsyncdata(&self) -> IOResult<()> {
        self.file.fsyncdata().await
    }

    #[inline]
    pub(crate) fn id(&self) -> usize {
        self.name.id()
    }

    pub(crate) fn index_memory(&self) -> usize {
        self.index.memory_used()
    }

    pub(crate) fn disk_used(&self) -> u64 {
        self.file_size() + self.index.disk_used()
    }
}

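/// Sequential reader over the raw records of a blob file, used to
/// regenerate the index. It starts right after the blob header and walks
/// `record header -> meta -> data` for each record, optionally validating
/// data checksums along the way.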
struct RawRecords {
    current_offset: u64,
    record_header_size: u64,
    file: File,
    validate_data: bool,
}

impl RawRecords {
    async fn start(
        file: File,
        blob_header_size: u64,
        key_size: usize,
        validate_data: bool,
    ) -> Result<Self> {
        let current_offset = blob_header_size;
        debug!("blob raw records start, current offset: {}", current_offset);
        let size_of_len = bincode::serialized_size(&(0_usize))? as usize;
        let size_of_magic_byte = bincode::serialized_size(&RECORD_MAGIC_BYTE)? as usize;
        debug!(
            "blob raw records start, read at: size {}, offset: {}",
            size_of_len,
            current_offset + size_of_len as u64
        );
        // read the magic byte plus the key length: a serialized vector
        // carries its usize length in front of the data
        let buf = file
            .read_exact_at_allocate(size_of_magic_byte + size_of_len, current_offset)
            .await
            .map_err(|err| err.into_bincode_if_unexpected_eof())
            .context("Can't read BLOB header from file")?;
        let (magic_byte_buf, key_len_buf) = buf.split_at(size_of_magic_byte);
        debug!("blob raw records start, read at {} bytes", buf.len());
        let magic_byte = bincode::deserialize::<u64>(magic_byte_buf)
            .map_err(Error::from)
            .context("failed to deserialize magic byte")?;
        Self::check_record_header_magic_byte(magic_byte)?;
        let key_len = bincode::deserialize::<usize>(key_len_buf)
            .map_err(Error::from)
            .context("failed to deserialize key length")?;
        if key_len != key_size {
            let msg = "blob key_size is not equal to pearl compile-time key size";
            return Err(Error::validation(ValidationErrorKind::BlobKeySize, msg).into());
        }
        let record_header_size = RecordHeader::default().serialized_size() + key_len as u64;
        debug!(
            "blob raw records start, record header size: {}",
            record_header_size
        );
        Ok(Self {
            current_offset,
            record_header_size,
            file,
            validate_data,
        })
    }

    fn check_record_header_magic_byte(magic_byte: u64) -> Result<()> {
        if magic_byte == RECORD_MAGIC_BYTE {
            Ok(())
        } else {
            let param = ValidationErrorKind::RecordMagicByte;
            Err(Error::validation(param, "First record's magic byte is wrong").into())
        }
    }

    async fn load(mut self) -> Result<Option<Vec<RecordHeader>>> {
        debug!("blob raw records load");
        let mut headers = Vec::new();
        while self.current_offset < self.file.size() {
            let (header, data) = self
                .read_current_record(self.validate_data)
                .await
                .with_context(|| {
                    format!(
                        "read record header or data failed, at {}",
                        self.current_offset
                    )
                })?;
            if let Some(data) = data {
                header.data_checksum_audit(&data)
                    .with_context(|| format!("bad data checksum, at {}", self.current_offset))?;
            }
            headers.push(header);
        }
        if headers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(headers))
        }
    }

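    /// Reads the record header at the current offset, validates it, and
    /// advances the offset past the header, meta, and data sections;
    /// returns the record data as well when `read_data` is set.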
    async fn read_current_record(
        &mut self,
        read_data: bool,
    ) -> Result<(RecordHeader, Option<BytesMut>)> {
        let mut buf = self
            .file
            .read_exact_at_allocate(self.record_header_size as usize, self.current_offset)
            .await
            .map_err(|err| err.into_bincode_if_unexpected_eof())
            .with_context(|| format!("read at offset {} failed", self.current_offset))?;
        let header = RecordHeader::from_raw(&buf)
            .map_err(|e| Error::from(ErrorKind::Bincode(e.to_string())))
            .with_context(|| {
                format!(
                    "header deserialization from raw failed, buf len: {}",
                    buf.len()
                )
            })?;
        header.validate()?;
        self.current_offset += self.record_header_size;
        self.current_offset += header.meta_size();
        let data = if read_data {
            buf.resize(header.data_size() as usize, 0);
            buf = self
                .file
                .read_exact_at(buf, self.current_offset)
                .await
                .map_err(|err| err.into_bincode_if_unexpected_eof())
                .with_context(|| format!("read at offset {} failed", self.current_offset))?;
            Some(buf)
        } else {
            None
        };
        self.current_offset += header.data_size();
        Ok((header, data))
    }
}

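// BloomProvider implementation: the index's fast key check is consulted
// first and short-circuits the combined filter when it can answer
// definitively; otherwise the query falls through to the filter itself.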
#[async_trait::async_trait]
impl<K> BloomProvider<K> for Blob<K>
where
    for<'a> K: Key<'a> + 'static,
{
    type Filter = CombinedFilter<K>;

    async fn check_filter(&self, item: &K) -> FilterResult {
        match self.index.contains_key_fast(item) {
            Some(true) => return FilterResult::NeedAdditionalCheck,
            Some(false) => return FilterResult::NotContains,
            None => {}
        }

        self.index.get_filter().contains(&self.index, item).await
    }

    fn check_filter_fast(&self, item: &K) -> FilterResult {
        match self.index.contains_key_fast(item) {
            Some(true) => return FilterResult::NeedAdditionalCheck,
            Some(false) => return FilterResult::NotContains,
            None => {}
        }

        self.index.get_filter().contains_fast(item)
    }

    async fn offload_buffer(&mut self, _: usize, _: usize) -> usize {
        self.index.offload_filter()
    }

    async fn get_filter(&self) -> Option<Self::Filter> {
        Some(self.index.get_filter().clone())
    }

    fn get_filter_fast(&self) -> Option<&Self::Filter> {
        Some(self.index.get_filter())
    }

    async fn filter_memory_allocated(&self) -> usize {
        self.index.get_filter().memory_allocated()
    }
}