use std::time::SystemTime;

use bytes::{BufMut, BytesMut};
use tokio::time::Instant;

use crate::error::ValidationErrorKind;
use crate::filter::{CombinedFilter, FilterTrait};
use crate::storage::{BlobRecordTimestamp, ReadResult};

use super::prelude::*;

use super::{header::Header, index::IndexTrait};

pub(crate) const BLOB_INDEX_FILE_EXTENSION: &str = "index";

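/// A single blob: one data file on disk plus its associated record index.
/// `validate_data_during_index_regen` controls whether record data checksums
/// are verified when the index is rebuilt from the blob file.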
#[derive(Debug)]
pub struct Blob<K>
where
    for<'a> K: Key<'a>,
{
    header: Header,
    index: Index<K>,
    name: Arc<FileName>,
    file: File,
    created_at: SystemTime,
    validate_data_during_index_regen: bool,
}

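/// Outcome of a write: `dirty_bytes` mirrors `File::dirty_bytes` after the write.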
pub(crate) struct WriteResult {
    pub dirty_bytes: u64,
}

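/// Outcome of a delete: the dirty byte count after the operation and whether
/// a deletion record was actually written.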
pub(crate) struct DeleteResult {
    pub dirty_bytes: u64,
    pub deleted: bool,
}

impl<K> Blob<K>
where
    for<'a> K: Key<'a> + 'static,
{
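    /// Creates a new empty blob: creates the data file, builds a fresh
    /// in-memory index, and writes the blob header to disk with an fsync.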
    pub(crate) async fn open_new(
        name: FileName,
        iodriver: IoDriver,
        config: BlobConfig,
    ) -> Result<Self> {
        let BlobConfig {
            index: index_config,
            validate_data_during_index_regen,
        } = config;
        let file = iodriver.create(name.as_path()).await?;
        let index = Self::create_index(&name, iodriver, index_config);
        let header = Header::new();
        let mut blob = Self {
            header,
            index,
            name: Arc::new(name),
            file,
            created_at: SystemTime::now(),
            validate_data_during_index_regen,
        };
        blob.write_header().await?;
        Ok(blob)
    }

    pub fn name(&self) -> &FileName {
        &self.name
    }

    pub(crate) fn created_at(&self) -> SystemTime {
        self.created_at
    }

    async fn write_header(&mut self) -> Result<()> {
        let size = self.header.serialized_size();
        let mut buf = BytesMut::with_capacity(size as usize);
        serialize_into((&mut buf).writer(), &self.header)?;
        self.file.write_append_all(buf.freeze()).await?;
        self.file.fsyncdata().await?;
        Ok(())
    }

    #[inline]
    fn create_index(name: &FileName, iodriver: IoDriver, index_config: IndexConfig) -> Index<K> {
        Index::new(name.with_extension(BLOB_INDEX_FILE_EXTENSION), iodriver, index_config)
    }

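    /// Persists the blob to disk: fsyncs the data file and dumps the
    /// in-memory index to its file. Returns the result of the index dump,
    /// or `Ok(0)` immediately if the index is already on disk.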
    pub(crate) async fn dump(&mut self) -> Result<usize> {
        if self.index.on_disk() {
            Ok(0)
        } else {
            self.fsyncdata()
                .await
                .with_context(|| format!("blob file dump failed: {:?}", self.name.as_path()))?;

            self.index.dump(self.file_size()).await.with_context(|| {
                format!(
                    "index file dump failed, associated blob file: {:?}",
                    self.name.as_path()
                )
            })
        }
    }

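    /// Loads the index from its file; if loading fails, the index is
    /// cleared and regenerated from the blob's raw records.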
    pub(crate) async fn load_index(&mut self) -> Result<()> {
        if let Err(e) = self.index.load(self.file_size()).await {
            warn!("error loading index: {}, regenerating", e);
            self.index.clear();
            self.try_regenerate_index().await?;
        }
        Ok(())
    }

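    /// Opens an existing blob file, reading its header and loading the index.
    /// A missing or corrupted index file is regenerated from the blob data,
    /// except for permission or other unclassified I/O errors, which are
    /// propagated.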
    pub(crate) async fn from_file(
        path: PathBuf,
        iodriver: IoDriver,
        config: BlobConfig,
    ) -> Result<Self> {
        let now = Instant::now();
        let file = iodriver.open(&path).await?;
        let name = FileName::from_path(&path)?;
        info!("{} blob init started", name);
        let size = file.size();

        let header = Header::from_file(&file, &path)
            .await
            .with_context(|| format!("failed to read blob header. Blob file: {:?}", path))?;

        let index_name = name.with_extension(BLOB_INDEX_FILE_EXTENSION);
        let BlobConfig {
            index: index_config,
            validate_data_during_index_regen,
        } = config;
        trace!("looking for index file: [{}]", index_name);
        let mut is_index_corrupted = false;
        let index = if index_name.exists() {
            trace!("index file exists");
            Index::from_file(
                index_name.clone(),
                index_config.clone(),
                iodriver.clone(),
                size,
            )
            .await
            .or_else(|error| {
                if let Some(io_error) = error.downcast_ref::<IOError>() {
                    match io_error.kind() {
                        IOErrorKind::PermissionDenied | IOErrorKind::Other => {
                            warn!(
                                "index for file '{:?}' cannot be regenerated due to an error: {}",
                                path, io_error
                            );
                            return Err(error);
                        }
                        _ => {}
                    }
                }
                is_index_corrupted = true;
                Ok(Index::new(index_name, iodriver, index_config))
            })?
        } else {
            trace!("index file not found, creating a new one");
            Index::new(index_name, iodriver, index_config)
        };
        trace!("index initialized");
        let header_size = bincode::serialized_size(&header)?;
        let created_at = file.created_at()?;
        let mut blob = Self {
            header,
            file,
            name: Arc::new(name),
            index,
            created_at,
            validate_data_during_index_regen,
        };
        trace!("call update index");
        if is_index_corrupted || size > header_size {
            blob.try_regenerate_index()
                .await
                .with_context(|| format!("failed to regenerate index for blob file: {:?}", path))?;
        } else {
            warn!("empty or corrupted blob: {:?}", path);
        }
        trace!("check data consistency");
        Self::check_data_consistency();
        info!(
            "{} init finished: {}ms",
            blob.name(),
            now.elapsed().as_millis()
        );
        Ok(blob)
    }

    async fn raw_records(&self, validate_data: bool) -> Result<RawRecords> {
        RawRecords::start(
            self.file.clone(),
            bincode::serialized_size(&self.header)?,
            K::LEN as usize,
            validate_data,
        )
        .await
        .context("failed to create iterator for raw records")
    }

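    /// Rebuilds the in-memory index by scanning all raw records in the blob
    /// file. A no-op when the index is already dumped on disk. Data checksums
    /// are verified during the scan when `validate_data_during_index_regen`
    /// is set.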
    pub(crate) async fn try_regenerate_index(&mut self) -> Result<()> {
        info!("try regenerate index for blob: {}", self.name);
        if self.index.on_disk() {
            debug!("index already updated");
            return Ok(());
        }
        debug!("index file missing");
        let raw_r = self
            .raw_records(self.validate_data_during_index_regen)
            .await
            .with_context(|| {
                format!(
                    "failed to read raw records from blob {:?}",
                    self.name.as_path()
                )
            })?;
        debug!("raw records loaded");
        if let Some(headers) = raw_r.load().await.with_context(|| {
            format!(
                "load headers from blob file failed, {:?}",
                self.name.as_path()
            )
        })? {
            for header in headers {
                let key = header.key().into();
                self.index.push(&key, header).context("index push failed")?;
            }
        }
        debug!("index successfully generated: {}", self.index.name());
        Ok(())
    }

    pub(crate) fn check_data_consistency() {
        // Currently a no-op.
    }

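    /// Appends `record` to the blob file and registers its header in the
    /// index, holding only an upgradable read lock on the blob.
    ///
    /// A minimal caller sketch (hypothetical names, assuming the blob is
    /// shared behind an `ASRwLock` as in this signature):
    ///
    /// ```ignore
    /// let result = Blob::write(&shared_blob, &key, record).await?;
    /// if result.dirty_bytes > flush_threshold {
    ///     shared_blob.read().await.fsyncdata().await?;
    /// }
    /// ```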
    pub(crate) async fn write(
        blob: &ASRwLock<Self>,
        key: &K,
        record: Record,
    ) -> Result<WriteResult> {
        debug!("blob write");
        let (partially_serialized, mut header) = record.to_partially_serialized_and_header()?;
        // An upgradable read lock lets concurrent readers proceed while
        // serializing writers against each other.
        let blob = blob.upgradable_read().await;
        let write_result = partially_serialized.write_to_file(&blob.file).await?;
        header.set_offset_checksum(write_result.blob_offset(), write_result.header_checksum());
        blob.index.push(key, header)?;
        Ok(WriteResult { dirty_bytes: blob.file.dirty_bytes() })
    }

    async fn write_mut(&mut self, key: &K, record: Record) -> Result<WriteResult> {
        debug!("blob write mut");
        let (record, mut header) = record.to_partially_serialized_and_header()?;
        let write_result = record.write_to_file(&self.file).await?;
        header.set_offset_checksum(write_result.blob_offset(), write_result.header_checksum());
        self.index.push(key, header)?;
        Ok(WriteResult { dirty_bytes: self.file.dirty_bytes() })
    }

    #[inline]
    pub(crate) async fn read_all_entries_with_deletion_marker(
        &self,
        key: &K,
    ) -> Result<Vec<Entry>> {
        let headers = self.index.get_all_with_deletion_marker(key).await?;
        debug_assert!(headers
            .iter()
            .zip(headers.iter().skip(1))
            .all(|(x, y)| x.timestamp() >= y.timestamp()));
        Ok(Self::headers_to_entries(headers, &self.file, &self.name))
    }

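    /// Writes a deletion record for `key` with the given timestamp. When
    /// `only_if_presented` is set, the marker is written only if the index
    /// currently resolves the key to a live record.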
    pub(crate) async fn delete(
        &mut self,
        key: &K,
        timestamp: BlobRecordTimestamp,
        meta: Option<Meta>,
        only_if_presented: bool,
    ) -> Result<DeleteResult> {
        if !only_if_presented || self.index.get_latest(key).await?.is_found() {
            let record = Record::deleted(key, timestamp.into(), meta)?;
            self.push_deletion_record(key, record).await
        } else {
            Ok(DeleteResult { dirty_bytes: self.file.dirty_bytes(), deleted: false })
        }
    }

    async fn push_deletion_record(&mut self, key: &K, record: Record) -> Result<DeleteResult> {
        let on_disk = self.index.on_disk();
        if on_disk {
            self.load_index().await?;
        }
        let result = self.write_mut(key, record).await?;
        Ok(DeleteResult { dirty_bytes: result.dirty_bytes, deleted: true })
    }

    fn headers_to_entries(
        headers: Vec<RecordHeader>,
        file: &File,
        file_name: &Arc<FileName>,
    ) -> Vec<Entry> {
        headers
            .into_iter()
            .map(|header| Entry::new(header, file.clone(), file_name.clone()))
            .collect()
    }

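    /// Returns the most recent entry for `key`, optionally filtered by
    /// `meta`. With `check_filters` set, the blob's filters are consulted
    /// first to short-circuit definite misses.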
    pub(crate) async fn get_latest_entry(
        &self,
        key: &K,
        meta: Option<&Meta>,
        check_filters: bool,
    ) -> Result<ReadResult<Entry>> {
        debug!("blob get latest entry {:?}, {:?}", key, meta);
        if check_filters && self.check_filter(key).await == FilterResult::NotContains {
            debug!("key was filtered out by filters");
            Ok(ReadResult::NotFound)
        } else if let Some(meta) = meta {
            debug!("blob get latest entry with meta: {:?}", meta);
            self.get_entry_with_meta(key, meta).await
        } else {
            debug!("blob get latest entry, no meta");
            Ok(self
                .index
                .get_latest(key)
                .await
                .with_context(|| {
                    format!("index get latest failed for blob: {:?}", self.name.as_path())
                })?
                .map(|header| {
                    debug!("blob get latest entry, entry found");
                    Entry::new(header, self.file.clone(), self.name.clone())
                }))
        }
    }

    async fn get_entry_with_meta(&self, key: &K, meta: &Meta) -> Result<ReadResult<Entry>> {
        let mut headers = self.index.get_all_with_deletion_marker(key).await?;
        let deleted_ts = headers
            .last()
            .filter(|h| h.is_deleted())
            .map(|h| BlobRecordTimestamp::new(h.timestamp()));
        if deleted_ts.is_some() {
            headers.truncate(headers.len() - 1);
        }
        let entries = Self::headers_to_entries(headers, &self.file, &self.name);
        if let Some(entry) = self.filter_entries(entries, meta).await? {
            Ok(ReadResult::Found(entry))
        } else if let Some(ts) = deleted_ts {
            Ok(ReadResult::Deleted(ts))
        } else {
            Ok(ReadResult::NotFound)
        }
    }

    async fn filter_entries(&self, entries: Vec<Entry>, meta: &Meta) -> Result<Option<Entry>> {
        for mut entry in entries {
            if Some(meta) == entry.load_meta().await? {
                return Ok(Some(entry));
            }
        }
        Ok(None)
    }

    #[inline]
    pub(crate) fn file_size(&self) -> u64 {
        self.file.size()
    }

    pub(crate) fn records_count(&self) -> usize {
        self.index.count()
    }

    pub(crate) fn file_dirty_bytes(&self) -> u64 {
        self.file.dirty_bytes()
    }

    pub(crate) async fn fsyncdata(&self) -> IOResult<()> {
        self.file.fsyncdata().await
    }

    #[inline]
    pub(crate) fn id(&self) -> usize {
        self.name.id()
    }

    pub(crate) fn index_memory(&self) -> usize {
        self.index.memory_used()
    }

    pub(crate) fn disk_used(&self) -> u64 {
        self.file_size() + self.index.disk_used()
    }
}

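/// Sequential scanner over the raw records of a blob file, used to
/// regenerate the index. Tracks the current byte offset and optionally
/// validates record data checksums while scanning.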
struct RawRecords {
    current_offset: u64,
    record_header_size: u64,
    file: File,
    validate_data: bool,
}

impl RawRecords {
    async fn start(
        file: File,
        blob_header_size: u64,
        key_size: usize,
        validate_data: bool,
    ) -> Result<Self> {
        let current_offset = blob_header_size;
        debug!("blob raw records start, current offset: {}", current_offset);
        let size_of_len = bincode::serialized_size(&(0_usize))? as usize;
        let size_of_magic_byte = bincode::serialized_size(&RECORD_MAGIC_BYTE)? as usize;
        debug!(
            "blob raw records start, read {} bytes at offset {}",
            size_of_magic_byte + size_of_len,
            current_offset
        );
        let buf = file
            .read_exact_at_allocate(size_of_magic_byte + size_of_len, current_offset)
            .await
            .map_err(|err| err.into_bincode_if_unexpected_eof())
            .context("failed to read the magic byte and key length of the first record")?;
        let (magic_byte_buf, key_len_buf) = buf.split_at(size_of_magic_byte);
        debug!("blob raw records start, read {} bytes", buf.len());
        let magic_byte = bincode::deserialize::<u64>(magic_byte_buf)
            .map_err(Error::from)
            .context("failed to deserialize magic byte")?;
        Self::check_record_header_magic_byte(magic_byte)?;
        let key_len = bincode::deserialize::<usize>(key_len_buf)
            .map_err(Error::from)
            .context("failed to deserialize key length")?;
        if key_len != key_size {
            let msg = "blob key_size is not equal to pearl compile-time key size";
            return Err(Error::validation(ValidationErrorKind::BlobKeySize, msg).into());
        }
        let record_header_size = RecordHeader::default().serialized_size() + key_len as u64;
        debug!(
            "blob raw records start, record header size: {}",
            record_header_size
        );
        Ok(Self {
            current_offset,
            record_header_size,
            file,
            validate_data,
        })
    }

    fn check_record_header_magic_byte(magic_byte: u64) -> Result<()> {
        if magic_byte == RECORD_MAGIC_BYTE {
            Ok(())
        } else {
            let param = ValidationErrorKind::RecordMagicByte;
            Err(Error::validation(param, "First record's magic byte is wrong").into())
        }
    }

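    /// Reads record headers sequentially until the end of the file,
    /// verifying data checksums when `validate_data` is set. Returns
    /// `None` if the blob contains no records.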
    async fn load(mut self) -> Result<Option<Vec<RecordHeader>>> {
        debug!("blob raw records load");
        let mut headers = Vec::new();
        while self.current_offset < self.file.size() {
            let (header, data) = self
                .read_current_record(self.validate_data)
                .await
                .with_context(|| {
                    format!(
                        "read record header or data failed, at {}",
                        self.current_offset
                    )
                })?;
            if let Some(data) = data {
                header
                    .data_checksum_audit(&data)
                    .with_context(|| format!("bad data checksum, at {}", self.current_offset))?;
            }
            headers.push(header);
        }
        if headers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(headers))
        }
    }

    async fn read_current_record(
        &mut self,
        read_data: bool,
    ) -> Result<(RecordHeader, Option<BytesMut>)> {
        let mut buf = self
            .file
            .read_exact_at_allocate(self.record_header_size as usize, self.current_offset)
            .await
            .map_err(|err| err.into_bincode_if_unexpected_eof())
            .with_context(|| format!("read at call failed, offset {}", self.current_offset))?;
        let header = RecordHeader::from_raw(&buf)
            .map_err(|e| Error::from(ErrorKind::Bincode(e.to_string())))
            .with_context(|| {
                format!(
                    "header deserialization from raw failed, buf len: {}",
                    buf.len()
                )
            })?;
        header.validate()?;
        self.current_offset += self.record_header_size;
        self.current_offset += header.meta_size();
        let data = if read_data {
            buf.resize(header.data_size() as usize, 0);
            buf = self
                .file
                .read_exact_at(buf, self.current_offset)
                .await
                .map_err(|err| err.into_bincode_if_unexpected_eof())
                .with_context(|| format!("read at call failed, offset {}", self.current_offset))?;
            Some(buf)
        } else {
            None
        };
        self.current_offset += header.data_size();
        Ok((header, data))
    }
}

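/// Filter support for a blob: an exact answer from the in-memory index is
/// preferred; otherwise the query falls through to the index's combined
/// filter.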
#[async_trait::async_trait]
impl<K> BloomProvider<K> for Blob<K>
where
    for<'a> K: Key<'a> + 'static,
{
    type Filter = CombinedFilter<K>;

    async fn check_filter(&self, item: &K) -> FilterResult {
        match self.index.contains_key_fast(item) {
            Some(true) => return FilterResult::NeedAdditionalCheck,
            Some(false) => return FilterResult::NotContains,
            None => {}
        }

        self.index.get_filter().contains(&self.index, item).await
    }

    fn check_filter_fast(&self, item: &K) -> FilterResult {
        match self.index.contains_key_fast(item) {
            Some(true) => return FilterResult::NeedAdditionalCheck,
            Some(false) => return FilterResult::NotContains,
            None => {}
        }

        self.index.get_filter().contains_fast(item)
    }

    async fn offload_buffer(&mut self, _: usize, _: usize) -> usize {
        self.index.offload_filter()
    }

    async fn get_filter(&self) -> Option<Self::Filter> {
        Some(self.index.get_filter().clone())
    }

    fn get_filter_fast(&self) -> Option<&Self::Filter> {
        Some(self.index.get_filter())
    }

    async fn filter_memory_allocated(&self) -> usize {
        self.index.get_filter().memory_allocated()
    }
}