1use std::borrow::Cow;
37use std::fmt;
38use std::fmt::Debug;
39use std::fmt::Formatter;
40use std::fs;
41use std::fs::File;
42use std::io;
43use std::io::Seek;
44use std::io::SeekFrom;
45use std::io::Write;
46use std::ops::RangeBounds;
47use std::path::Path;
48use std::pin::Pin;
49use std::sync::Arc;
50
51use byteorder::ByteOrder;
52use byteorder::LittleEndian;
53use byteorder::WriteBytesExt;
54use minibytes::Bytes;
55use tracing::debug_span;
56use tracing::trace;
57use vlqencoding::VLQDecodeAt;
58use vlqencoding::VLQEncode;
59
60use crate::change_detect::SharedChangeDetector;
61use crate::config;
62use crate::errors::IoResultExt;
63use crate::errors::ResultExt;
64use crate::index;
65use crate::index::Index;
66use crate::index::InsertKey;
67use crate::index::InsertValue;
68use crate::index::LeafValueIter;
69use crate::index::RangeIter;
70use crate::index::ReadonlyBuffer;
71use crate::lock::ScopedDirLock;
72use crate::lock::READER_LOCK_OPTS;
73use crate::utils;
74use crate::utils::mmap_path;
75use crate::utils::xxhash;
76use crate::utils::xxhash32;
77
78mod fold;
79mod meta;
80mod open_options;
81mod path;
82mod repair;
83#[cfg(test)]
84pub(crate) mod tests;
85mod wait;
86
87pub use open_options::ChecksumType;
88pub use open_options::FlushFilterContext;
89pub use open_options::FlushFilterFunc;
90pub use open_options::FlushFilterOutput;
91pub use open_options::IndexDef;
92pub use open_options::IndexOutput;
93pub use open_options::OpenOptions;
94pub use path::GenericPath;
95
96pub use self::fold::Fold;
97pub use self::fold::FoldDef;
98use self::fold::FoldState;
99pub use self::meta::LogMetadata;
100pub use self::wait::Wait;
101
/// File name of the primary log data file inside a log directory.
pub(crate) const PRIMARY_FILE: &str = "log";

/// File name of the metadata file inside a log directory.
pub(crate) const META_FILE: &str = "meta";

/// Magic header written at the start of a fresh primary log file.
const PRIMARY_HEADER: &[u8] = b"indexedlog0\0";

/// Offset of the first entry in the primary log (right past the header).
const PRIMARY_START_OFFSET: u64 = 12;

/// Entry flag: the entry carries a xxhash64 checksum of its data.
const ENTRY_FLAG_HAS_XXHASH64: u32 = 1;
/// Entry flag: the entry carries a xxhash32 checksum of its data.
const ENTRY_FLAG_HAS_XXHASH32: u32 = 2;

/// log2 of the checksum chunk size used when opening indexes (2^20 = 1 MiB).
const INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM: u32 = 20;
/// Append-only log with integrity checks and user-defined indexes and folds.
///
/// Combines the mmap-ed on-disk content, a pending in-memory buffer of
/// not-yet-synced entries, metadata, and derived structures (indexes, folds).
pub struct Log {
    /// Location of the log: an on-disk directory, or an in-memory marker.
    pub dir: GenericPath,
    /// Read-only buffer over the on-disk primary log content.
    pub(crate) disk_buf: Bytes,
    /// Newly appended entries not yet written to disk.
    /// Pinned because `ExternalKeyBuffer` keeps a raw pointer to it.
    pub(crate) mem_buf: Pin<Box<Vec<u8>>>,
    /// Metadata (primary length, epoch, index lengths) of the loaded state.
    pub(crate) meta: LogMetadata,
    // Indexes, in the same order as `open_options.index_defs`.
    indexes: Vec<Index>,
    // Fold states computed from on-disk entries only.
    disk_folds: Vec<FoldState>,
    // Fold states covering on-disk plus in-memory (dirty) entries.
    all_folds: Vec<FoldState>,
    // Set once an index operation fails; later index use returns errors.
    index_corrupted: bool,
    // The options this log was opened with (index/fold defs, fsync, ...).
    open_options: OpenOptions,
    // Held while this instance is alive, for on-disk logs.
    reader_lock: Option<ScopedDirLock>,
    // Lets `is_changed_on_disk` cheaply detect external modifications.
    change_detector: Option<SharedChangeDetector>,
}
153
/// Iterator over entries of a [`Log`], on-disk entries first, then pending
/// in-memory ones.
pub struct LogIter<'a> {
    // Logical offset of the next entry to read.
    next_offset: u64,
    // Set after the first error; iteration then stops.
    errored: bool,
    log: &'a Log,
}

/// Iterator over entries matching an exact index key lookup.
pub struct LogLookupIter<'a> {
    inner_iter: LeafValueIter<'a>,
    // Set after the first error; iteration then stops.
    errored: bool,
    log: &'a Log,
}

/// Iterator over `(key, entries)` pairs produced by an index range or
/// prefix scan.
pub struct LogRangeIter<'a> {
    inner_iter: RangeIter<'a>,
    // Set after the first error; iteration then stops.
    errored: bool,
    log: &'a Log,
    index: &'a Index,
}
179
/// Buffer used by indexes to resolve key references against log content,
/// spanning both the on-disk portion and the in-memory (dirty) portion.
struct ExternalKeyBuffer {
    // On-disk log content; offsets below `disk_len` resolve here.
    disk_buf: Bytes,
    // Length of the on-disk portion; offsets at or past it resolve into
    // the in-memory buffer.
    disk_len: u64,

    // Raw pointer into the owning `Log`'s pinned `mem_buf`.
    // NOTE(review): soundness relies on `Log` keeping that buffer pinned and
    // replacing this pointer whenever the buffer is swapped — confirm against
    // the `Log` construction/clone paths.
    mem_buf: *const Vec<u8>,
}

// SAFETY: presumably sound because `mem_buf` points into a `Pin<Box<Vec<u8>>>`
// owned by the same `Log`, which outlives the indexes holding this buffer —
// TODO(review): confirm this invariant holds on all construction paths.
unsafe impl Send for ExternalKeyBuffer {}
// SAFETY: see the `Send` impl above; concurrent access is read-only here.
unsafe impl Sync for ExternalKeyBuffer {}
226
227impl Log {
234 pub fn open<P: AsRef<Path>>(dir: P, index_defs: Vec<IndexDef>) -> crate::Result<Self> {
241 OpenOptions::new()
242 .index_defs(index_defs)
243 .create(true)
244 .open(dir.as_ref())
245 }
246
    /// Append an entry. It becomes visible to this `Log` instance right away
    /// (in the in-memory buffer) but is only persisted by `sync`.
    ///
    /// May trigger a `sync` when `auto_sync_threshold` is configured and the
    /// dirty buffer has grown past it.
    pub fn append<T: AsRef<[u8]>>(&mut self, data: T) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let data = data.as_ref();

            // Resolve `Auto`: xxhash64 for larger payloads, xxhash32 for
            // small ones (smaller per-entry overhead).
            let checksum_type = if self.open_options.checksum_type == ChecksumType::Auto {
                const XXHASH64_THRESHOLD: usize = 88;
                if data.len() >= XXHASH64_THRESHOLD {
                    ChecksumType::Xxhash64
                } else {
                    ChecksumType::Xxhash32
                }
            } else {
                self.open_options.checksum_type
            };

            // Logical offset of the new entry: on-disk length + dirty bytes.
            let offset = self.meta.primary_len + self.mem_buf.len() as u64;

            let mut entry_flags = 0;
            entry_flags |= match checksum_type {
                ChecksumType::Xxhash64 => ENTRY_FLAG_HAS_XXHASH64,
                ChecksumType::Xxhash32 => ENTRY_FLAG_HAS_XXHASH32,
                // `Auto` was resolved to a concrete type above.
                ChecksumType::Auto => unreachable!(),
            };

            // Entry layout: flags (vlq), data length (vlq), checksum, data.
            self.mem_buf.write_vlq(entry_flags).infallible()?;
            self.mem_buf.write_vlq(data.len()).infallible()?;

            match checksum_type {
                ChecksumType::Xxhash64 => {
                    self.mem_buf
                        .write_u64::<LittleEndian>(xxhash(data))
                        .infallible()?;
                }
                ChecksumType::Xxhash32 => {
                    self.mem_buf
                        .write_u32::<LittleEndian>(xxhash32(data))
                        .infallible()?;
                }
                ChecksumType::Auto => unreachable!(),
            };
            // Logical offset where the raw entry data starts.
            let data_offset = self.meta.primary_len + self.mem_buf.len() as u64;

            self.mem_buf.write_all(data).infallible()?;
            // Keep indexes and folds in sync with the new in-memory entry.
            self.update_indexes_for_in_memory_entry(data, offset, data_offset)?;
            self.update_fold_for_in_memory_entry(data, offset, data_offset)?;

            // Optionally flush once enough dirty bytes accumulate.
            if let Some(threshold) = self.open_options.auto_sync_threshold {
                if self.mem_buf.len() as u64 >= threshold {
                    self.sync()
                        .context("sync triggered by auto_sync_threshold")?;
                }
            }

            Ok(())
        })();

        result
            .context(|| {
                let data = data.as_ref();
                // Avoid dumping huge payloads into the error message.
                if data.len() < 128 {
                    format!("in Log::append({:?})", data)
                } else {
                    format!("in Log::append(<a {}-byte long slice>)", data.len())
                }
            })
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
343
344 pub fn clear_dirty(&mut self) -> crate::Result<()> {
347 let result: crate::Result<_> = (|| {
348 self.maybe_return_index_error()?;
349 for index in self.indexes.iter_mut() {
350 index.clear_dirty();
351 }
352 self.mem_buf.clear();
353 self.all_folds = self.disk_folds.clone();
354 self.update_indexes_for_on_disk_entries()?;
355 Ok(())
356 })();
357 result
358 .context("in Log::clear_dirty")
359 .context(|| format!(" Log.dir = {:?}", self.dir))
360 }
361
362 pub fn try_clone(&self) -> crate::Result<Self> {
364 self.try_clone_internal(true)
365 .context("in Log:try_clone")
366 .context(|| format!(" Log.dir = {:?}", self.dir))
367 }
368
369 pub fn try_clone_without_dirty(&self) -> crate::Result<Self> {
374 self.try_clone_internal(false)
375 .context("in Log:try_clone_without_dirty")
376 }
377
    /// Shared implementation of `try_clone` / `try_clone_without_dirty`.
    ///
    /// Clones buffers, indexes, folds and meta; rewires the cloned indexes'
    /// key buffer to point at the cloned `mem_buf`; and acquires a fresh
    /// reader lock for on-disk logs.
    fn try_clone_internal(&self, copy_dirty: bool) -> crate::Result<Self> {
        self.maybe_return_index_error()?;

        let mut indexes = self
            .indexes
            .iter()
            .map(|i| i.try_clone_internal(copy_dirty))
            .collect::<Result<Vec<Index>, _>>()?;
        let disk_buf = self.disk_buf.clone();
        let mem_buf = if copy_dirty {
            self.mem_buf.clone()
        } else {
            // Dropping dirty data: start with an empty pinned buffer.
            Box::pin(Vec::new())
        };

        {
            // Point the cloned indexes at the *cloned* mem_buf. The pointer
            // stays valid because `mem_buf` is pinned and moved into the new
            // `Log` below.
            let mem_buf: &Vec<u8> = &mem_buf;
            let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
            let index_key_buf = Arc::new(ExternalKeyBuffer {
                disk_buf: disk_buf.clone(),
                disk_len: self.meta.primary_len,
                mem_buf,
            });
            for index in indexes.iter_mut() {
                index.key_buf = index_key_buf.clone();
            }
        }

        let reader_lock = match self.dir.as_opt_path() {
            Some(d) => Some(ScopedDirLock::new_with_options(d, &READER_LOCK_OPTS)?),
            None => None,
        };

        let mut log = Log {
            dir: self.dir.clone(),
            disk_buf,
            mem_buf,
            meta: self.meta.clone(),
            indexes,
            disk_folds: self.disk_folds.clone(),
            // Without dirty data, "all" folds equal the on-disk folds.
            all_folds: if copy_dirty {
                &self.all_folds
            } else {
                &self.disk_folds
            }
            .clone(),
            index_corrupted: false,
            open_options: self.open_options.clone(),
            reader_lock,
            change_detector: self.change_detector.clone(),
        };

        if !copy_dirty {
            // Indexes cloned without dirty state may lag behind the on-disk
            // entries; catch them up.
            log.update_indexes_for_on_disk_entries()?;
        }

        Ok(log)
    }
441
    /// Write pending (in-memory) entries to disk and pick up changes made by
    /// other processes. Returns the new on-disk primary log length.
    ///
    /// Holds the directory lock for the read-modify-write of the primary
    /// file and meta. For in-memory logs this returns `Ok(0)` immediately.
    pub fn sync(&mut self) -> crate::Result<u64> {
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Log::sync", dirty_bytes = self.mem_buf.len());
            if let Some(dir) = &self.dir.as_opt_path() {
                span.record("dir", dir.to_string_lossy().as_ref());
            }
            let _guard = span.enter();

            // Pure in-memory log: nothing to persist.
            if self.dir.as_opt_path().is_none() {
                return Ok(0);
            }

            // Unless the epoch changed (log truncated or recreated), the
            // on-disk primary log must only ever grow.
            fn check_append_only(this: &Log, new_meta: &LogMetadata) -> crate::Result<()> {
                let old_meta = &this.meta;
                if old_meta.primary_len > new_meta.primary_len {
                    Err(crate::Error::path(
                        this.dir.as_opt_path().unwrap(),
                        format!(
                            "on-disk log is unexpectedly smaller ({} bytes) than its previous version ({} bytes)",
                            new_meta.primary_len, old_meta.primary_len
                        ),
                    ))
                } else {
                    Ok(())
                }
            }

            // Fast path: nothing dirty. Reload (without the dir lock) if
            // another process changed the on-disk log.
            if self.mem_buf.is_empty() {
                if let Ok(meta) = Self::load_or_create_meta(&self.dir, false) {
                    let changed = self.meta != meta;
                    let truncated = self.meta.epoch != meta.epoch;
                    if !truncated {
                        check_append_only(self, &meta)?;
                    }
                    if changed {
                        // Reuse in-memory indexes only when not truncated.
                        *self = self.open_options.clone().open_internal(
                            &self.dir,
                            if truncated { None } else { Some(&self.indexes) },
                            None,
                        )?;
                    }
                } else {
                    // Could not read meta: skip the reload (best effort).
                }
                self.update_change_detector_to_match_meta();
                return Ok(self.meta.primary_len);
            }

            // Slow path: there is dirty data. Take the directory lock for
            // the whole read-modify-write.
            let dir = self.dir.as_opt_path().unwrap().to_path_buf();
            let lock = ScopedDirLock::new(&dir)?;

            let mut meta = Self::load_or_create_meta(&self.dir, false)?;
            let changed = self.meta != meta;
            let truncated = self.meta.epoch != meta.epoch;
            if !truncated {
                check_append_only(self, &meta)?;
            }

            if changed && self.open_options.flush_filter.is_some() {
                // The on-disk log moved under us and a flush filter is set:
                // replay our dirty entries through the filter on a freshly
                // opened log, then adopt that log.
                let filter = self.open_options.flush_filter.unwrap();

                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context("re-open to run flush_filter")?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    let context = FlushFilterContext { log: &log };
                    match filter(&context, content)
                        .map_err(|err| crate::Error::wrap(err, "failed to run filter function"))?
                    {
                        FlushFilterOutput::Drop => {}
                        FlushFilterOutput::Keep => log.append(content)?,
                        FlushFilterOutput::Replace(content) => log.append(content)?,
                    }
                }

                *self = log;
            } else if truncated {
                // Epoch changed: the log was truncated or replaced. Re-open
                // and replay our dirty entries on top of the new log.
                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context(|| {
                        format!(
                            "re-open since epoch has changed ({} to {})",
                            self.meta.epoch, meta.epoch
                        )
                    })?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    log.append(content)?;
                }

                *self = log;
            }

            // Append the dirty buffer at the current on-disk end.
            let primary_path = self.dir.as_opt_path().unwrap().join(PRIMARY_FILE);
            let mut primary_file = fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&primary_path)
                .context(&primary_path, "cannot open for read-write")?;

            let pos = primary_file
                .seek(SeekFrom::Start(meta.primary_len))
                .context(&primary_path, || {
                    format!("cannot seek to {}", meta.primary_len)
                })?;
            // Defensive: `seek` normally echoes the requested offset; a
            // mismatch indicates the file is shorter than meta claims.
            if pos != meta.primary_len {
                let msg = format!(
                    "log file {} has {} bytes, expect at least {} bytes",
                    primary_path.to_string_lossy(),
                    pos,
                    meta.primary_len
                );
                let err = crate::Error::path(&primary_path, msg);
                return Err(err);
            }

            primary_file
                .write_all(&self.mem_buf)
                .context(&primary_path, || {
                    format!("cannot write data ({} bytes)", self.mem_buf.len())
                })?;

            if self.open_options.fsync || config::get_global_fsync() {
                primary_file
                    .sync_all()
                    .context(&primary_path, "cannot fsync")?;
            }

            meta.primary_len += self.mem_buf.len() as u64;
            self.mem_buf.clear();

            // Re-mmap the primary log to cover the freshly written bytes.
            let (disk_buf, indexes) = Self::load_log_and_indexes(
                &self.dir,
                &meta,
                &self.open_options.index_defs,
                &self.mem_buf,
                if changed {
                    // On-disk data changed under us: reload indexes fresh.
                    None
                } else {
                    // Our indexes are current; mark them as covering the new
                    // primary length and reuse them.
                    Self::set_index_log_len(self.indexes.iter_mut(), meta.primary_len);
                    Some(&self.indexes)
                },
                self.open_options.fsync,
            )?;

            self.disk_buf = disk_buf;
            self.indexes = indexes;
            self.meta = meta;

            self.update_indexes_for_on_disk_entries()?;
            // Persist indexes that lag too far behind, then folds and meta.
            let lagging_index_ids = self.lagging_index_ids();
            self.flush_lagging_indexes(&lagging_index_ids, &lock)?;
            self.update_and_flush_disk_folds()?;
            self.all_folds = self.disk_folds.clone();

            self.dir.write_meta(&self.meta, self.open_options.fsync)?;

            self.update_change_detector_to_match_meta();

            Ok(self.meta.primary_len)
        })();

        result
            .context("in Log::sync")
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
684
685 pub(crate) fn update_change_detector_to_match_meta(&self) {
686 if let Some(detector) = &self.change_detector {
687 detector.set(self.meta.primary_len ^ self.meta.epoch);
688 }
689 }
690
    /// Flush the indexes identified by `index_ids` to disk and record their
    /// new lengths in `self.meta` (the meta file itself is not written here).
    ///
    /// The caller must hold the directory lock (`_lock`).
    pub(crate) fn flush_lagging_indexes(
        &mut self,
        index_ids: &[usize],
        _lock: &ScopedDirLock,
    ) -> crate::Result<()> {
        for &index_id in index_ids.iter() {
            let metaname = self.open_options.index_defs[index_id].metaname();
            let new_length = self.indexes[index_id].flush();
            // A failed flush marks the whole index state as corrupted.
            let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
            self.meta.indexes.insert(metaname, new_length);
            trace!(
                name = "Log::flush_lagging_index",
                index_name = self.open_options.index_defs[index_id].name.as_str(),
                new_index_length = new_length,
            );
        }
        Ok(())
    }
713
714 pub(crate) fn lagging_index_ids(&self) -> Vec<usize> {
717 let log_bytes = self.meta.primary_len;
718 self.open_options
719 .index_defs
720 .iter()
721 .enumerate()
722 .filter(|(i, def)| {
723 let indexed_bytes = Self::get_index_log_len(&self.indexes[*i], false).unwrap_or(0);
724 let lag_bytes = log_bytes.max(indexed_bytes) - indexed_bytes;
725 let lag_threshold = def.lag_threshold;
726 trace!(
727 name = "Log::is_index_lagging",
728 index_name = def.name.as_str(),
729 lag = lag_bytes,
730 threshold = lag_threshold
731 );
732 lag_bytes > lag_threshold
733 })
734 .map(|(i, _def)| i)
735 .collect()
736 }
737
738 pub fn is_changed_on_disk(&self) -> bool {
748 match &self.change_detector {
749 None => false,
750 Some(detector) => detector.is_changed(),
751 }
752 }
753
    /// Write pending entries to disk. Alias of [`Log::sync`]; returns the
    /// new on-disk primary log length.
    pub fn flush(&mut self) -> crate::Result<u64> {
        self.sync()
    }
758
    /// Convert `slice` to `Bytes` via the on-disk log buffer (zero-copy when
    /// `slice` borrows from that buffer — see `Bytes::slice_to_bytes`).
    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
        self.disk_buf.slice_to_bytes(slice)
    }

    /// Like `slice_to_bytes`, but resolves against the buffer of the index
    /// identified by `index_id`. Panics if `index_id` is out of range.
    pub fn index_slice_to_bytes(&self, index_id: usize, slice: &[u8]) -> Bytes {
        self.indexes[index_id].slice_to_bytes(slice)
    }
770
    /// Flush all indexes and write the updated meta file.
    ///
    /// Requires no dirty entries (call `sync` first) and no concurrent meta
    /// changes; both conditions are checked and reported as programming
    /// errors. No-op for in-memory logs.
    pub(crate) fn finalize_indexes(&mut self, _lock: &ScopedDirLock) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let dir = self.dir.clone();
            if let Some(dir) = dir.as_opt_path() {
                if !self.mem_buf.is_empty() {
                    return Err(crate::Error::programming(
                        "sync() should be called before finalize_indexes()",
                    ));
                }

                let _lock = ScopedDirLock::new(dir)?;

                // Reload meta under the lock and verify nothing raced us.
                let meta = Self::load_or_create_meta(&self.dir, false)?;
                if self.meta.primary_len != meta.primary_len || self.meta.epoch != meta.epoch {
                    return Err(crate::Error::programming(format!(
                        "race detected, callsite responsible for preventing races (old meta: {:?}, new meta: {:?})",
                        &self.meta, &meta
                    )));
                }
                self.meta = meta;

                // Flush every index and record its new length in meta.
                for i in 0..self.indexes.len() {
                    let new_length = self.indexes[i].flush();
                    let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
                    let name = self.open_options.index_defs[i].metaname();
                    self.meta.indexes.insert(name, new_length);
                }

                self.dir.write_meta(&self.meta, self.open_options.fsync)?;
            }
            Ok(())
        })();
        result
            .context("in Log::finalize_indexes")
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
828
    /// Rebuild indexes from the on-disk primary log.
    ///
    /// With `force`, rebuild unconditionally; otherwise indexes that pass an
    /// integrity check are kept. Consumes `self`; returns a human-readable
    /// summary of what was done. No-op (empty summary) for in-memory logs.
    pub fn rebuild_indexes(self, force: bool) -> crate::Result<String> {
        // Clone the path first: `self` is moved into the closure below but
        // the error context still needs it afterwards.
        let dir = self.dir.clone();
        let result: crate::Result<_> = (|this: Log| {
            if let Some(dir) = this.dir.clone().as_opt_path() {
                let lock = ScopedDirLock::new(dir)?;
                this.rebuild_indexes_with_lock(force, &lock)
            } else {
                Ok(String::new())
            }
        })(self);

        result
            .context(|| format!("in Log::rebuild_indexes(force={})", force))
            .context(|| format!(" Log.dir = {:?}", dir))
    }
856
    /// Rebuild indexes with the directory lock held. See `rebuild_indexes`.
    fn rebuild_indexes_with_lock(
        mut self,
        force: bool,
        _lock: &ScopedDirLock,
    ) -> crate::Result<String> {
        let mut message = String::new();
        {
            if let Some(ref dir) = self.dir.as_opt_path() {
                for (i, def) in self.open_options.index_defs.iter().enumerate() {
                    let name = def.name.as_str();

                    if let Some(index) = &self.indexes.get(i) {
                        // Decide whether the existing index can be kept.
                        let should_skip = if force {
                            false
                        } else {
                            match Self::get_index_log_len(index, true) {
                                Err(_) => false,
                                Ok(len) => {
                                    if len > self.meta.primary_len {
                                        // Index claims to cover more data
                                        // than the (truncated) log has.
                                        message += &format!(
                                            "Index {:?} is incompatible with (truncated) log\n",
                                            name
                                        );
                                        false
                                    } else if index.verify().is_ok() {
                                        message +=
                                            &format!("Index {:?} passed integrity check\n", name);
                                        true
                                    } else {
                                        message +=
                                            &format!("Index {:?} failed integrity check\n", name);
                                        false
                                    }
                                }
                            }
                        };
                        if should_skip {
                            continue;
                        } else {
                            // Replace the old (possibly corrupted) index with
                            // a throwaway in-memory one during the rebuild.
                            self.indexes[i] = index::OpenOptions::new().create_in_memory()?;
                        }
                    }

                    // Build the replacement index into a temp file, then swap
                    // it into place via an atomic rename.
                    let tmp = tempfile::NamedTempFile::new_in(dir).context(dir, || {
                        format!("cannot create tempfile for rebuilding index {:?}", name)
                    })?;
                    let index_len = {
                        let mut index = index::OpenOptions::new()
                            .key_buf(Some(Arc::new(self.disk_buf.clone())))
                            .open(tmp.path())?;
                        Self::update_index_for_on_disk_entry_unchecked(
                            &self.dir,
                            &mut index,
                            def,
                            &self.disk_buf,
                            self.meta.primary_len,
                        )?;
                        index.flush()?
                    };

                    // Record length 0 in meta before the rename so a crash in
                    // between does not leave meta pointing into a mismatched
                    // index file.
                    let meta_path = dir.join(META_FILE);
                    self.meta.indexes.insert(def.metaname(), 0);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!(" before replacing index {:?})", name))?;

                    // Permission fixup is best-effort.
                    let _ = utils::fix_perm_file(tmp.as_file(), false);

                    let path = dir.join(def.filename());
                    tmp.persist(&path).map_err(|e| {
                        crate::Error::wrap(Box::new(e), || {
                            format!("cannot persist tempfile to replace index {:?}", name)
                        })
                    })?;

                    // Now the file is in place, record its real length.
                    self.meta.indexes.insert(def.metaname(), index_len);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!(" after replacing index {:?}", name))?;
                    message += &format!("Rebuilt index {:?}\n", name);
                }
            }
        }

        Ok(message)
    }
952
    /// Look up entries by an exact index key, using the index identified by
    /// `index_id` (a position in `index_defs`). Returns an iterator over the
    /// matching entries.
    pub fn lookup<K: AsRef<[u8]>>(&self, index_id: usize, key: K) -> crate::Result<LogLookupIter> {
        let result: crate::Result<_> = (|| {
            // Refuse to use indexes after a recorded corruption.
            self.maybe_return_index_error()?;
            if let Some(index) = self.indexes.get(index_id) {
                // Non-empty keys are a caller requirement.
                assert!(!key.as_ref().is_empty());
                let link_offset = index.get(&key)?;
                let inner_iter = link_offset.values(index);
                Ok(LogLookupIter {
                    inner_iter,
                    errored: false,
                    log: self,
                })
            } else {
                let msg = format!(
                    "invalid index_id {} (len={}, path={:?})",
                    index_id,
                    self.indexes.len(),
                    &self.dir
                );
                Err(crate::Error::programming(msg))
            }
        })();
        result
            .context(|| format!("in Log::lookup({}, {:?})", index_id, key.as_ref()))
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
983
984 pub fn lookup_prefix<K: AsRef<[u8]>>(
991 &self,
992 index_id: usize,
993 prefix: K,
994 ) -> crate::Result<LogRangeIter> {
995 let prefix = prefix.as_ref();
996 let result: crate::Result<_> = (|| {
997 let index = self.indexes.get(index_id).unwrap();
998 let inner_iter = index.scan_prefix(prefix)?;
999 Ok(LogRangeIter {
1000 inner_iter,
1001 errored: false,
1002 log: self,
1003 index,
1004 })
1005 })();
1006 result
1007 .context(|| format!("in Log::lookup_prefix({}, {:?})", index_id, prefix))
1008 .context(|| format!(" Log.dir = {:?}", self.dir))
1009 }
1010
1011 pub fn lookup_range<'a>(
1020 &self,
1021 index_id: usize,
1022 range: impl RangeBounds<&'a [u8]>,
1023 ) -> crate::Result<LogRangeIter> {
1024 let start = range.start_bound();
1025 let end = range.end_bound();
1026 let result: crate::Result<_> = (|| {
1027 let index = self.indexes.get(index_id).unwrap();
1028 let inner_iter = index.range((start, end))?;
1029 Ok(LogRangeIter {
1030 inner_iter,
1031 errored: false,
1032 log: self,
1033 index,
1034 })
1035 })();
1036 result
1037 .context(|| {
1038 format!(
1039 "in Log::lookup_range({}, {:?} to {:?})",
1040 index_id, start, end,
1041 )
1042 })
1043 .context(|| format!(" Log.dir = {:?}", self.dir))
1044 }
1045
1046 pub fn lookup_prefix_hex<K: AsRef<[u8]>>(
1053 &self,
1054 index_id: usize,
1055 hex_prefix: K,
1056 ) -> crate::Result<LogRangeIter> {
1057 let prefix = hex_prefix.as_ref();
1058 let result: crate::Result<_> = (|| {
1059 let index = self.indexes.get(index_id).unwrap();
1060 let inner_iter = index.scan_prefix_hex(prefix)?;
1061 Ok(LogRangeIter {
1062 inner_iter,
1063 errored: false,
1064 log: self,
1065 index,
1066 })
1067 })();
1068 result
1069 .context(|| format!("in Log::lookup_prefix_hex({}, {:?})", index_id, prefix))
1070 .context(|| format!(" Log.dir = {:?}", self.dir))
1071 }
1072
1073 pub fn iter(&self) -> LogIter {
1075 LogIter {
1076 log: self,
1077 next_offset: PRIMARY_START_OFFSET,
1078 errored: false,
1079 }
1080 }
1081
1082 pub fn iter_dirty(&self) -> LogIter {
1086 LogIter {
1087 log: self,
1088 next_offset: self.meta.primary_len,
1089 errored: false,
1090 }
1091 }
1092
1093 pub fn index_func<'a>(
1095 &self,
1096 index_id: usize,
1097 entry: &'a [u8],
1098 ) -> crate::Result<Vec<Cow<'a, [u8]>>> {
1099 let index_def = self.get_index_def(index_id)?;
1100 let mut result = vec![];
1101 for output in (index_def.func)(entry).into_iter() {
1102 result.push(
1103 output
1104 .into_cow(entry)
1105 .context(|| format!("index_id = {}", index_id))?,
1106 );
1107 }
1108
1109 Ok(result)
1110 }
1111
1112 pub fn fold(&self, fold_id: usize) -> crate::Result<&dyn Fold> {
1118 match self.all_folds.get(fold_id) {
1119 Some(f) => Ok(f.fold.as_ref()),
1120 None => Err(self.fold_out_of_bound(fold_id)),
1121 }
1122 }
1123
1124 fn fold_out_of_bound(&self, fold_id: usize) -> crate::Error {
1125 let msg = format!(
1126 "fold_id {} is out of bound (len={}, dir={:?})",
1127 fold_id,
1128 self.open_options.fold_defs.len(),
1129 &self.dir
1130 );
1131 crate::Error::programming(msg)
1132 }
1133
    /// Update all indexes for a newly appended in-memory entry, recording
    /// any failure in `index_corrupted` (via `maybe_set_index_error`).
    fn update_indexes_for_in_memory_entry(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        let result = self.update_indexes_for_in_memory_entry_unchecked(data, offset, data_offset);
        self.maybe_set_index_error(result)
    }
1148
1149 fn update_fold_for_in_memory_entry(
1151 &mut self,
1152 data: &[u8],
1153 offset: u64,
1154 data_offset: u64,
1155 ) -> crate::Result<()> {
1156 for fold_state in self.all_folds.iter_mut() {
1157 fold_state.process_entry(data, offset, data_offset + data.len() as u64)?;
1158 }
1159 Ok(())
1160 }
1161
    /// Apply every index function to one in-memory entry and update the
    /// corresponding indexes. Does not touch `index_corrupted`.
    ///
    /// `offset` is the logical offset of the entry header; `data_offset` is
    /// where the raw entry data starts.
    fn update_indexes_for_in_memory_entry_unchecked(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        // Key is a slice of the entry data; store it as a
                        // (start, len) reference into the log buffer.
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        let start = range.start + data_offset;
                        let end = range.end + data_offset;
                        let key = InsertKey::Reference((start, end - start));
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        // Key bytes are embedded into the index itself.
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
        }
        Ok(())
    }
1193
1194 fn update_and_flush_disk_folds(&mut self) -> crate::Result<()> {
1200 let mut folds = self.open_options.empty_folds();
1201 std::mem::swap(&mut self.disk_folds, &mut folds);
1203 let result = (|| -> crate::Result<()> {
1204 for fold_state in folds.iter_mut() {
1205 fold_state.catch_up_with_log_on_disk_entries(self)?;
1206 }
1207 Ok(())
1208 })();
1209 self.disk_folds = folds;
1210 result
1211 }
1212
    /// Catch all indexes up with on-disk entries, recording any failure in
    /// `index_corrupted` (via `maybe_set_index_error`).
    fn update_indexes_for_on_disk_entries(&mut self) -> crate::Result<()> {
        let result = self.update_indexes_for_on_disk_entries_unchecked();
        self.maybe_set_index_error(result)
    }
1220
    /// Catch every index up with the on-disk entries it has not yet covered.
    /// Does not touch `index_corrupted`.
    fn update_indexes_for_on_disk_entries_unchecked(&mut self) -> crate::Result<()> {
        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
            Self::update_index_for_on_disk_entry_unchecked(
                &self.dir,
                index,
                def,
                &self.disk_buf,
                self.meta.primary_len,
            )?;
        }
        Ok(())
    }
1234
    /// Feed on-disk entries not yet covered by `index` through `def`'s index
    /// function, then mark the index as covering `primary_len` bytes.
    ///
    /// Returns the number of entries processed.
    fn update_index_for_on_disk_entry_unchecked(
        path: &GenericPath,
        index: &mut Index,
        def: &IndexDef,
        disk_buf: &Bytes,
        primary_len: u64,
    ) -> crate::Result<usize> {
        // Resume from the offset the index has already covered.
        let mut offset = Self::get_index_log_len(index, true)?;
        let mut count = 0;
        while let Some(entry_result) =
            Self::read_entry_from_buf(path, disk_buf, offset).context(|| {
                format!(
                    "while updating index {:?} for on-disk entry at {}",
                    def.name, offset
                )
            })?
        {
            count += 1;
            let data = entry_result.data;
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        // Key is a slice of the entry data; store it as a
                        // (start, len) reference into the log buffer.
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        let start = range.start + entry_result.data_offset;
                        let end = range.end + entry_result.data_offset;
                        let key = InsertKey::Reference((start, end - start));

                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        // Key bytes are embedded into the index itself.
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
            offset = entry_result.next_offset;
        }
        // Record the new coverage in the index metadata.
        Self::set_index_log_len(std::iter::once(index), primary_len);

        Ok(count)
    }
1286
    /// Read the meta file for `path`. When it is missing and `create` is
    /// set, create an empty log (primary file with header, plus meta) first.
    pub(crate) fn load_or_create_meta(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        Self::load_or_create_meta_internal(path, create)
    }
1298
    /// Implementation of `load_or_create_meta`.
    pub(crate) fn load_or_create_meta_internal(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        match path.read_meta() {
            Err(err) => {
                // Only a missing meta file triggers creation; other errors
                // propagate unchanged.
                if err.io_error_kind() == io::ErrorKind::NotFound && create {
                    // Write a fresh primary file containing just the magic
                    // header, then a matching meta file.
                    let dir = path.as_opt_path().unwrap();
                    let primary_path = dir.join(PRIMARY_FILE);
                    let mut primary_file =
                        File::create(&primary_path).context(&primary_path, "cannot create")?;
                    primary_file
                        .write_all(PRIMARY_HEADER)
                        .context(&primary_path, "cannot write")?;
                    // Permission fixup is best-effort.
                    let _ = utils::fix_perm_file(&primary_file, false);
                    let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
                    path.write_meta(&meta, false)?;
                    Ok(meta)
                } else {
                    Err(err)
                }
            }
            Ok(meta) => Ok(meta),
        }
    }
1327
    /// Load (mmap) the primary log buffer and the indexes described by
    /// `meta`.
    ///
    /// `mem_buf` must be the owning `Log`'s pinned in-memory buffer: a raw
    /// pointer to it is stored in the indexes' `ExternalKeyBuffer` so key
    /// references can resolve into not-yet-flushed data. `reuse_indexes`
    /// allows reusing already-loaded indexes instead of re-opening them.
    fn load_log_and_indexes(
        dir: &GenericPath,
        meta: &LogMetadata,
        index_defs: &[IndexDef],
        mem_buf: &Pin<Box<Vec<u8>>>,
        reuse_indexes: Option<&Vec<Index>>,
        fsync: bool,
    ) -> crate::Result<(Bytes, Vec<Index>)> {
        let primary_buf = match dir.as_opt_path() {
            Some(dir) => mmap_path(&dir.join(PRIMARY_FILE), meta.primary_len)?,
            None => Bytes::new(),
        };

        // Raw pointer to the pinned mem_buf for dirty-key resolution.
        let mem_buf: &Vec<u8> = mem_buf;
        let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
        let key_buf = Arc::new(ExternalKeyBuffer {
            disk_buf: primary_buf.clone(),
            disk_len: meta.primary_len,
            mem_buf,
        });

        let indexes = match reuse_indexes {
            None => {
                // Load every index from scratch at the length meta records.
                let mut indexes = Vec::with_capacity(index_defs.len());
                for def in index_defs.iter() {
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    indexes.push(Self::load_index(
                        dir,
                        def,
                        index_len,
                        key_buf.clone(),
                        fsync,
                    )?);
                }
                indexes
            }
            Some(indexes) => {
                assert_eq!(index_defs.len(), indexes.len());
                let mut new_indexes = Vec::with_capacity(indexes.len());
                for (index, def) in indexes.iter().zip(index_defs) {
                    // Reload from disk when the on-disk index is ahead of
                    // the in-memory one; otherwise clone the in-memory index
                    // and just swap in the new key buffer.
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    let index = if index_len > Self::get_index_log_len(index, true).unwrap_or(0) {
                        Self::load_index(dir, def, index_len, key_buf.clone(), fsync)?
                    } else {
                        let mut index = index.try_clone()?;
                        index.key_buf = key_buf.clone();
                        index
                    };
                    new_indexes.push(index);
                }
                new_indexes
            }
        };
        Ok((primary_buf, indexes))
    }
1394
    /// The location of this log (a directory, or an in-memory marker).
    pub fn path(&self) -> &GenericPath {
        &self.dir
    }

    /// Version of the loaded on-disk state as `(epoch, primary_len)`.
    ///
    /// `primary_len` grows on appends; `epoch` changes when the log is
    /// truncated or recreated.
    pub fn version(&self) -> (u64, u64) {
        (self.meta.epoch, self.meta.primary_len)
    }
1410
    /// Open the index file for `def` (or an in-memory index when the log
    /// itself is in-memory), clamped to logical length `len` and using `buf`
    /// to resolve key references.
    fn load_index(
        dir: &GenericPath,
        def: &IndexDef,
        len: u64,
        buf: Arc<dyn ReadonlyBuffer + Send + Sync>,
        fsync: bool,
    ) -> crate::Result<Index> {
        match dir.as_opt_path() {
            Some(dir) => {
                let path = dir.join(def.filename());
                index::OpenOptions::new()
                    .checksum_chunk_size_logarithm(INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM)
                    .logical_len(Some(len))
                    .key_buf(Some(buf))
                    .fsync(fsync)
                    .open(path)
            }
            None => index::OpenOptions::new()
                .logical_len(Some(len))
                .key_buf(Some(buf))
                .fsync(fsync)
                .create_in_memory(),
        }
    }
1436
    /// Read the entry at logical `offset`, dispatching between the on-disk
    /// buffer and the in-memory buffer. Returns `None` at end of data.
    fn read_entry(&self, offset: u64) -> crate::Result<Option<EntryResult>> {
        let result = if offset < self.meta.primary_len {
            let entry = Self::read_entry_from_buf(&self.dir, &self.disk_buf, offset)?;
            if let Some(ref entry) = entry {
                // Account for bytes paged in from the mmap-ed buffer.
                crate::page_out::adjust_available(-(entry.data.len() as i64));
            }
            entry
        } else {
            // In-memory entries are parsed with buffer-local offsets, then
            // translated back to logical offsets via `.offset(...)`.
            let offset = offset - self.meta.primary_len;
            if offset >= self.mem_buf.len() as u64 {
                return Ok(None);
            }
            Self::read_entry_from_buf(&self.dir, &self.mem_buf, offset)?
                .map(|entry_result| entry_result.offset(self.meta.primary_len))
        };
        Ok(result)
    }
1457
    /// Parse the entry starting at `offset` within `buf`.
    ///
    /// Returns `Ok(None)` when `offset` is exactly the end of the buffer,
    /// an error on malformed or corrupt data, and otherwise the parsed
    /// entry (data slice, data offset, next entry offset).
    fn read_entry_from_buf<'a>(
        path: &GenericPath,
        buf: &'a [u8],
        offset: u64,
    ) -> crate::Result<Option<EntryResult<'a>>> {
        // Corruption errors for on-disk logs; plain path errors in-memory.
        let data_error = |msg: String| -> crate::Error {
            match path.as_opt_path() {
                Some(path) => crate::Error::corruption(path, msg),
                None => crate::Error::path(Path::new("<memory>"), msg),
            }
        };

        use std::cmp::Ordering::Equal;
        use std::cmp::Ordering::Greater;
        match offset.cmp(&(buf.len() as u64)) {
            // Exactly at the end: no more entries.
            Equal => return Ok(None),
            Greater => {
                let msg = format!("read offset {} exceeds buffer size {}", offset, buf.len());
                return Err(data_error(msg));
            }
            _ => {}
        }

        // Entry layout: flags (vlq), data_len (vlq), checksum, data.
        let (entry_flags, vlq_len): (u32, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
            crate::Error::wrap(Box::new(e), || {
                format!("cannot read entry_flags at {}", offset)
            })
            .mark_corruption()
        })?;
        let offset = offset + vlq_len as u64;

        let (data_len, vlq_len): (u64, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
            crate::Error::wrap(Box::new(e), || {
                format!("cannot read data_len at {}", offset)
            })
            .mark_corruption()
        })?;
        let offset = offset + vlq_len as u64;

        // Exactly one checksum flag must be set; 0 or both is malformed.
        let checksum_flags = entry_flags & (ENTRY_FLAG_HAS_XXHASH64 | ENTRY_FLAG_HAS_XXHASH32);
        let (checksum, offset) = match checksum_flags {
            ENTRY_FLAG_HAS_XXHASH64 => {
                let checksum = LittleEndian::read_u64(
                    buf.get(offset as usize..offset as usize + 8)
                        .ok_or_else(|| {
                            data_error(format!("xxhash cannot be read at {}", offset))
                        })?,
                );
                (checksum, offset + 8)
            }
            ENTRY_FLAG_HAS_XXHASH32 => {
                let checksum = LittleEndian::read_u32(
                    buf.get(offset as usize..offset as usize + 4)
                        .ok_or_else(|| {
                            data_error(format!("xxhash32 cannot be read at {}", offset))
                        })?,
                ) as u64;
                (checksum, offset + 4)
            }
            _ => {
                return Err(data_error(format!(
                    "entry at {} has malformed checksum metadata",
                    offset
                )));
            }
        };

        let end = offset + data_len;
        if end > buf.len() as u64 {
            return Err(data_error(format!("incomplete entry data at {}", offset)));
        }
        let data = &buf[offset as usize..end as usize];

        // Verify the checksum over the raw entry data.
        let verified = match checksum_flags {
            // Flag value 0 was already rejected above.
            0 => true,
            ENTRY_FLAG_HAS_XXHASH64 => xxhash(data) == checksum,
            ENTRY_FLAG_HAS_XXHASH32 => xxhash32(data) as u64 == checksum,
            _ => unreachable!(),
        };
        if verified {
            Ok(Some(EntryResult {
                data,
                data_offset: offset,
                next_offset: end,
            }))
        } else {
            Err(data_error(format!("integrity check failed at {}", offset)))
        }
    }
1554
1555 #[inline]
1558 fn maybe_set_index_error<T>(&mut self, result: crate::Result<T>) -> crate::Result<T> {
1559 if result.is_err() && !self.index_corrupted {
1560 self.index_corrupted = true;
1561 }
1562 result
1563 }
1564
1565 #[inline]
1568 fn maybe_return_index_error(&self) -> crate::Result<()> {
1569 if self.index_corrupted {
1570 let msg = "index is corrupted".to_string();
1571 Err(self.corruption(msg))
1572 } else {
1573 Ok(())
1574 }
1575 }
1576
1577 fn get_index_log_len(index: &Index, consider_dirty: bool) -> crate::Result<u64> {
1587 let index_meta = if consider_dirty {
1588 index.get_meta()
1589 } else {
1590 index.get_original_meta()
1591 };
1592 Ok(if index_meta.is_empty() {
1593 PRIMARY_START_OFFSET
1595 } else {
1596 index_meta
1597 .read_vlq_at(0)
1598 .context(&index.path, || {
1599 format!(
1600 "index metadata cannot be parsed as an integer: {:?}",
1601 &index_meta
1602 )
1603 })?
1604 .0
1605 })
1606 }
1607
1608 fn set_index_log_len<'a>(indexes: impl Iterator<Item = &'a mut Index>, len: u64) {
1612 let mut index_meta = Vec::new();
1613 index_meta.write_vlq(len).unwrap();
1614 for index in indexes {
1615 index.set_meta(&index_meta);
1616 }
1617 }
1618}
1619
1620impl Log {
1623 fn get_index(&self, index_id: usize) -> crate::Result<&Index> {
1625 self.indexes.get(index_id).ok_or_else(|| {
1626 let msg = format!(
1627 "index_id {} is out of bound (len={}, dir={:?})",
1628 index_id,
1629 self.indexes.len(),
1630 &self.dir
1631 );
1632 crate::Error::programming(msg)
1633 })
1634 }
1635
1636 fn get_index_def(&self, index_id: usize) -> crate::Result<&IndexDef> {
1638 self.open_options.index_defs.get(index_id).ok_or_else(|| {
1639 let msg = format!(
1640 "index_id {} is out of bound (len={}, dir={:?})",
1641 index_id,
1642 self.indexes.len(),
1643 &self.dir
1644 );
1645 crate::Error::programming(msg)
1646 })
1647 }
1648
1649 fn corruption(&self, message: String) -> crate::Error {
1650 let path: &Path = match self.dir.as_opt_path() {
1651 Some(path) => path,
1652 None => Path::new("<memory>"),
1653 };
1654 crate::Error::corruption(path, message)
1655 }
1656}
1657
/// Result of reading a single entry out of a byte buffer.
struct EntryResult<'a> {
    // The entry payload, borrowed from the buffer it was read from.
    data: &'a [u8],
    // Offset of `data` within that buffer (past flags, length and checksum).
    data_offset: u64,
    // Offset just past this entry — where the next entry starts.
    next_offset: u64,
}
1664
impl<'a> EntryResult<'a> {
    /// Shift `next_offset` forward by `offset`, consuming `self`.
    fn offset(self, offset: u64) -> EntryResult<'a> {
        EntryResult {
            data: self.data,
            // NOTE(review): `data_offset` is intentionally left unshifted
            // while `next_offset` is rebased — presumably `data_offset`
            // stays relative to its own backing buffer whereas
            // `next_offset` becomes a log-wide offset. Confirm against
            // the callers that apply this rebase.
            data_offset: self.data_offset,
            next_offset: self.next_offset + offset,
        }
    }
}
1677
1678impl<'a> Iterator for LogLookupIter<'a> {
1679 type Item = crate::Result<&'a [u8]>;
1680
1681 fn next(&mut self) -> Option<Self::Item> {
1682 if self.errored {
1683 return None;
1684 }
1685 match self.inner_iter.next() {
1686 None => None,
1687 Some(Err(err)) => {
1688 self.errored = true;
1689 Some(Err(err))
1690 }
1691 Some(Ok(offset)) => match self
1692 .log
1693 .read_entry(offset)
1694 .context("in LogLookupIter::next")
1695 {
1696 Ok(Some(entry)) => Some(Ok(entry.data)),
1697 Ok(None) => None,
1698 Err(err) => {
1699 Some(Err(err))
1707 }
1708 },
1709 }
1710 }
1711}
1712
1713impl<'a> LogLookupIter<'a> {
1714 pub fn into_vec(self) -> crate::Result<Vec<&'a [u8]>> {
1716 self.collect()
1717 }
1718
1719 pub fn is_empty(&self) -> bool {
1720 self.inner_iter.is_empty()
1721 }
1722}
1723
1724impl<'a> Iterator for LogIter<'a> {
1725 type Item = crate::Result<&'a [u8]>;
1726
1727 fn next(&mut self) -> Option<Self::Item> {
1728 if self.errored {
1729 return None;
1730 }
1731 match self
1732 .log
1733 .read_entry(self.next_offset)
1734 .context("in LogIter::next")
1735 {
1736 Err(e) => {
1737 self.errored = true;
1738 Some(Err(e))
1739 }
1740 Ok(Some(entry_result)) => {
1741 assert!(entry_result.next_offset > self.next_offset);
1742 self.next_offset = entry_result.next_offset;
1743 Some(Ok(entry_result.data))
1744 }
1745 Ok(None) => None,
1746 }
1747 }
1748}
1749
1750impl<'a> LogRangeIter<'a> {
1751 fn wrap_inner_next_result(
1753 &mut self,
1754 item: Option<crate::Result<(Cow<'a, [u8]>, index::LinkOffset)>>,
1755 ) -> Option<crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>> {
1756 match item {
1757 None => None,
1758 Some(Err(err)) => {
1759 self.errored = true;
1760 Some(Err(err))
1761 }
1762 Some(Ok((key, link_offset))) => {
1763 let iter = LogLookupIter {
1764 inner_iter: link_offset.values(self.index),
1765 errored: false,
1766 log: self.log,
1767 };
1768 Some(Ok((key, iter)))
1769 }
1770 }
1771 }
1772}
1773
1774impl<'a> Iterator for LogRangeIter<'a> {
1775 type Item = crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>;
1776
1777 fn next(&mut self) -> Option<Self::Item> {
1778 if self.errored {
1779 return None;
1780 }
1781 let inner = self.inner_iter.next();
1782 self.wrap_inner_next_result(inner)
1783 }
1784}
1785
1786impl<'a> DoubleEndedIterator for LogRangeIter<'a> {
1787 fn next_back(&mut self) -> Option<Self::Item> {
1788 if self.errored {
1789 return None;
1790 }
1791 let inner = self.inner_iter.next_back();
1792 self.wrap_inner_next_result(inner)
1793 }
1794}
1795
1796impl Debug for Log {
1797 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
1798 let mut count = 0;
1799 let mut iter = self.iter();
1800 let bytes_per_line = 16;
1801 loop {
1802 let offset = iter.next_offset;
1803 count += 1;
1804 match iter.next() {
1805 None => break,
1806 Some(Ok(bytes)) => {
1807 if count > 1 {
1808 write!(f, "\n")?;
1809 }
1810 write!(f, "# Entry {}:\n", count)?;
1811 for (i, chunk) in bytes.chunks(bytes_per_line).enumerate() {
1812 write!(f, "{:08x}:", offset as usize + i * bytes_per_line)?;
1813 for b in chunk {
1814 write!(f, " {:02x}", b)?;
1815 }
1816 for _ in chunk.len()..bytes_per_line {
1817 write!(f, " ")?;
1818 }
1819 write!(f, " ")?;
1820 for &b in chunk {
1821 let ch = match b {
1822 0x20..=0x7e => b as char, _ => '.',
1824 };
1825 write!(f, "{}", ch)?;
1826 }
1827 write!(f, "\n")?;
1828 }
1829 }
1830 Some(Err(err)) => writeln!(f, "# Error: {:?}", err)?,
1831 }
1832 }
1833 Ok(())
1834 }
1835}
1836
impl ReadonlyBuffer for ExternalKeyBuffer {
    // Resolve `start..start+len` across the two backing buffers: offsets
    // below `disk_len` read the on-disk buffer; offsets at or above it are
    // rebased into the in-memory buffer. Returns None if the requested
    // range is out of bounds of the selected buffer.
    #[inline]
    fn slice(&self, start: u64, len: u64) -> Option<&[u8]> {
        if start < self.disk_len {
            self.disk_buf.get((start as usize)..(start + len) as usize)
        } else {
            // Rebase into the in-memory buffer's coordinate space.
            let start = start - self.disk_len;
            // SAFETY assumption (review): `mem_buf` is a raw pointer whose
            // target must outlive this borrow and not be mutated while the
            // returned slice is alive — that invariant is established
            // wherever ExternalKeyBuffer is constructed/updated; confirm
            // there, it cannot be verified from this block alone.
            let mem_buf = unsafe { &*self.mem_buf };
            mem_buf.get((start as usize)..(start + len) as usize)
        }
    }
}