1use std::borrow::Cow;
37use std::fmt;
38use std::fmt::Debug;
39use std::fmt::Formatter;
40use std::fs;
41use std::fs::File;
42use std::io;
43use std::io::Seek;
44use std::io::SeekFrom;
45use std::io::Write;
46use std::ops::RangeBounds;
47use std::path::Path;
48use std::pin::Pin;
49use std::sync::Arc;
50
51use byteorder::ByteOrder;
52use byteorder::LittleEndian;
53use byteorder::WriteBytesExt;
54use minibytes::Bytes;
55use tracing::debug_span;
56use tracing::trace;
57use vlqencoding::VLQDecodeAt;
58use vlqencoding::VLQEncode;
59
60use crate::config;
61use crate::errors::IoResultExt;
62use crate::errors::ResultExt;
63use crate::index;
64use crate::index::Index;
65use crate::index::InsertKey;
66use crate::index::InsertValue;
67use crate::index::LeafValueIter;
68use crate::index::RangeIter;
69use crate::index::ReadonlyBuffer;
70use crate::lock::ScopedDirLock;
71use crate::lock::READER_LOCK_OPTS;
72use crate::utils;
73use crate::utils::mmap_path;
74use crate::utils::xxhash;
75use crate::utils::xxhash32;
76
77mod fold;
78mod meta;
79mod open_options;
80mod path;
81mod repair;
82#[cfg(test)]
83pub(crate) mod tests;
84
85pub use open_options::ChecksumType;
86pub use open_options::FlushFilterContext;
87pub use open_options::FlushFilterFunc;
88pub use open_options::FlushFilterOutput;
89pub use open_options::IndexDef;
90pub use open_options::IndexOutput;
91pub use open_options::OpenOptions;
92pub use path::GenericPath;
93
94pub use self::fold::Fold;
95pub use self::fold::FoldDef;
96use self::fold::FoldState;
97pub use self::meta::LogMetadata;
98
/// File name of the primary log inside the log directory.
pub(crate) const PRIMARY_FILE: &str = "log";

/// File name of the metadata file inside the log directory.
pub(crate) const META_FILE: &str = "meta";

/// Magic header written at the start of a freshly created primary log file.
const PRIMARY_HEADER: &[u8] = b"indexedlog0\0";

/// Offset of the first entry in the primary log (the 12-byte header ends here).
const PRIMARY_START_OFFSET: u64 = 12;

/// Entry flag bit: the entry stores an 8-byte xxhash64 checksum.
const ENTRY_FLAG_HAS_XXHASH64: u32 = 1;
/// Entry flag bit: the entry stores a 4-byte xxhash32 checksum.
const ENTRY_FLAG_HAS_XXHASH32: u32 = 2;

/// log2 of the chunk size used for index checksumming (2^20 = 1 MiB).
const INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM: u32 = 20;
110
/// An append-only storage with indexes and folds, backed by a directory
/// on disk (or purely in-memory).
pub struct Log {
    /// Location of the log: a directory, or an in-memory placeholder.
    pub dir: GenericPath,
    /// Read-only buffer of the on-disk primary log (mmap, or empty for
    /// in-memory logs).
    pub(crate) disk_buf: Bytes,
    /// Entries appended but not yet written to disk ("dirty" data).
    /// Pinned because `ExternalKeyBuffer` holds a raw pointer to it.
    pub(crate) mem_buf: Pin<Box<Vec<u8>>>,
    /// Metadata describing the on-disk state (primary length, epoch,
    /// per-index lengths).
    pub(crate) meta: LogMetadata,
    /// Indexes, parallel to `open_options.index_defs`.
    indexes: Vec<Index>,
    /// Fold states computed from on-disk entries only.
    disk_folds: Vec<FoldState>,
    /// Fold states including in-memory (dirty) entries.
    all_folds: Vec<FoldState>,
    /// Set once an index operation fails; further index use is rejected
    /// (see `maybe_set_index_error` / `maybe_return_index_error`).
    index_corrupted: bool,
    /// Options this log was opened with (index defs, fsync, filters, ...).
    open_options: OpenOptions,
    /// Lock taken with `READER_LOCK_OPTS` while the log is open;
    /// `None` for in-memory logs.
    reader_lock: Option<ScopedDirLock>,
}
148
/// Iterator over entries of a [`Log`], driven by entry offsets.
pub struct LogIter<'a> {
    /// Offset of the next entry to read (see `Log::read_entry`).
    next_offset: u64,
    /// Error flag; presumably used by the `Iterator` impl (not in view)
    /// to stop after the first error.
    errored: bool,
    log: &'a Log,
}
155
/// Iterator over entries matching a single key, returned by `Log::lookup`.
pub struct LogLookupIter<'a> {
    /// Underlying index leaf-value iterator.
    inner_iter: LeafValueIter<'a>,
    /// Error flag; presumably used by the `Iterator` impl (not in view)
    /// to stop after the first error.
    errored: bool,
    log: &'a Log,
}
164
/// Iterator over (key, entries) pairs in an index range, returned by
/// `Log::lookup_prefix` / `lookup_range` / `lookup_prefix_hex`.
pub struct LogRangeIter<'a> {
    /// Underlying index range iterator.
    inner_iter: RangeIter<'a>,
    /// Error flag; presumably used by the `Iterator` impl (not in view)
    /// to stop after the first error.
    errored: bool,
    log: &'a Log,
    /// Index the range was produced from (needed to resolve values).
    index: &'a Index,
}
174
/// Key buffer shared with indexes so index keys can reference bytes
/// stored in the log instead of copying them.
struct ExternalKeyBuffer {
    /// The on-disk (mmap) portion of the log.
    disk_buf: Bytes,
    /// Length of the on-disk portion; presumably offsets past this are
    /// resolved against `mem_buf` — resolution code not in view.
    disk_len: u64,

    /// Raw pointer to the owning `Log`'s pinned `mem_buf`.
    /// NOTE(review): validity relies on `Log::mem_buf` being
    /// `Pin<Box<...>>` (stable address) and on the `Log` outliving the
    /// indexes holding this buffer — confirm.
    mem_buf: *const Vec<u8>,
}
214
// SAFETY: `mem_buf` points into a `Pin<Box<Vec<u8>>>` owned by the `Log`
// that created this buffer, so the pointee's address is stable across
// moves of the `Log`. NOTE(review): soundness also requires that the
// `Log` (and thus the pinned buffer) outlives every index holding this
// buffer — confirm against `Log`'s clone/drop paths.
unsafe impl Send for ExternalKeyBuffer {}
unsafe impl Sync for ExternalKeyBuffer {}
221
222impl Log {
229 pub fn open<P: AsRef<Path>>(dir: P, index_defs: Vec<IndexDef>) -> crate::Result<Self> {
236 OpenOptions::new()
237 .index_defs(index_defs)
238 .create(true)
239 .open(dir.as_ref())
240 }
241
    /// Append `data` as a new in-memory (dirty) entry.
    ///
    /// The entry is serialized into `mem_buf` as:
    /// `vlq(flags) + vlq(data_len) + checksum + data`, where the checksum
    /// is xxhash64 (8 bytes) or xxhash32 (4 bytes) depending on `flags`.
    /// Indexes and folds are updated for the new entry. If
    /// `auto_sync_threshold` is set and the dirty buffer grows past it,
    /// `sync` is called automatically.
    pub fn append<T: AsRef<[u8]>>(&mut self, data: T) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let data = data.as_ref();

            // Resolve `Auto` to a concrete checksum type: the cheaper
            // 32-bit hash for short entries, 64-bit otherwise.
            let checksum_type = if self.open_options.checksum_type == ChecksumType::Auto {
                const XXHASH64_THRESHOLD: usize = 88;
                if data.len() >= XXHASH64_THRESHOLD {
                    ChecksumType::Xxhash64
                } else {
                    ChecksumType::Xxhash32
                }
            } else {
                self.open_options.checksum_type
            };

            // Logical offset of the new entry: on-disk length plus
            // current dirty length.
            let offset = self.meta.primary_len + self.mem_buf.len() as u64;

            let mut entry_flags = 0;
            entry_flags |= match checksum_type {
                ChecksumType::Xxhash64 => ENTRY_FLAG_HAS_XXHASH64,
                ChecksumType::Xxhash32 => ENTRY_FLAG_HAS_XXHASH32,
                // `Auto` was resolved above.
                ChecksumType::Auto => unreachable!(),
            };

            self.mem_buf.write_vlq(entry_flags).infallible()?;
            self.mem_buf.write_vlq(data.len()).infallible()?;

            match checksum_type {
                ChecksumType::Xxhash64 => {
                    self.mem_buf
                        .write_u64::<LittleEndian>(xxhash(data))
                        .infallible()?;
                }
                ChecksumType::Xxhash32 => {
                    self.mem_buf
                        .write_u32::<LittleEndian>(xxhash32(data))
                        .infallible()?;
                }
                ChecksumType::Auto => unreachable!(),
            };
            // Logical offset of the payload (after flags, length, checksum).
            let data_offset = self.meta.primary_len + self.mem_buf.len() as u64;

            self.mem_buf.write_all(data).infallible()?;
            self.update_indexes_for_in_memory_entry(data, offset, data_offset)?;
            self.update_fold_for_in_memory_entry(data, offset, data_offset)?;

            // Optionally flush once the dirty buffer is large enough.
            if let Some(threshold) = self.open_options.auto_sync_threshold {
                if self.mem_buf.len() as u64 >= threshold {
                    self.sync()
                        .context("sync triggered by auto_sync_threshold")?;
                }
            }

            Ok(())
        })();

        result
            .context(|| {
                let data = data.as_ref();
                // Keep error messages bounded for large payloads.
                if data.len() < 128 {
                    format!("in Log::append({:?})", data)
                } else {
                    format!("in Log::append(<a {}-byte long slice>)", data.len())
                }
            })
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
338
339 pub fn clear_dirty(&mut self) -> crate::Result<()> {
342 let result: crate::Result<_> = (|| {
343 self.maybe_return_index_error()?;
344 for index in self.indexes.iter_mut() {
345 index.clear_dirty();
346 }
347 self.mem_buf.clear();
348 self.all_folds = self.disk_folds.clone();
349 self.update_indexes_for_on_disk_entries()?;
350 Ok(())
351 })();
352 result
353 .context("in Log::clear_dirty")
354 .context(|| format!(" Log.dir = {:?}", self.dir))
355 }
356
357 pub fn try_clone(&self) -> crate::Result<Self> {
359 self.try_clone_internal(true)
360 .context("in Log:try_clone")
361 .context(|| format!(" Log.dir = {:?}", self.dir))
362 }
363
364 pub fn try_clone_without_dirty(&self) -> crate::Result<Self> {
369 self.try_clone_internal(false)
370 .context("in Log:try_clone_without_dirty")
371 }
372
    /// Shared implementation of `try_clone` / `try_clone_without_dirty`.
    ///
    /// When `copy_dirty` is false the clone starts with an empty
    /// in-memory buffer and only the on-disk fold states.
    fn try_clone_internal(&self, copy_dirty: bool) -> crate::Result<Self> {
        self.maybe_return_index_error()?;

        let mut indexes = self
            .indexes
            .iter()
            .map(|i| i.try_clone_internal(copy_dirty))
            .collect::<Result<Vec<Index>, _>>()?;
        let disk_buf = self.disk_buf.clone();
        let mem_buf = if copy_dirty {
            self.mem_buf.clone()
        } else {
            Box::pin(Vec::new())
        };

        {
            // Re-point the cloned indexes at the clone's own buffers. The
            // raw pointer targets the clone's pinned `mem_buf`, whose
            // address stays stable after the move into `log` below.
            let mem_buf: &Vec<u8> = &mem_buf;
            let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
            let index_key_buf = Arc::new(ExternalKeyBuffer {
                disk_buf: disk_buf.clone(),
                disk_len: self.meta.primary_len,
                mem_buf,
            });
            for index in indexes.iter_mut() {
                index.key_buf = index_key_buf.clone();
            }
        }

        // Directory-backed clones take their own reader lock.
        let reader_lock = match self.dir.as_opt_path() {
            Some(d) => Some(ScopedDirLock::new_with_options(d, &READER_LOCK_OPTS)?),
            None => None,
        };

        let mut log = Log {
            dir: self.dir.clone(),
            disk_buf,
            mem_buf,
            meta: self.meta.clone(),
            indexes,
            disk_folds: self.disk_folds.clone(),
            all_folds: if copy_dirty {
                &self.all_folds
            } else {
                &self.disk_folds
            }
            .clone(),
            index_corrupted: false,
            open_options: self.open_options.clone(),
            reader_lock,
        };

        if !copy_dirty {
            // Without dirty index state the cloned indexes may not cover
            // all on-disk entries yet; catch them up.
            log.update_indexes_for_on_disk_entries()?;
        }

        Ok(log)
    }
435
    /// Write dirty entries to disk, then reload on-disk state (which may
    /// include entries written by other processes).
    ///
    /// Returns the new length of the primary log on disk. In-memory logs
    /// return 0 without doing anything.
    pub fn sync(&mut self) -> crate::Result<u64> {
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Log::sync", dirty_bytes = self.mem_buf.len());
            if let Some(dir) = &self.dir.as_opt_path() {
                span.record("dir", &dir.to_string_lossy().as_ref());
            }
            let _guard = span.enter();

            // In-memory log: nothing to write.
            if self.dir.as_opt_path().is_none() {
                return Ok(0);
            }

            // Within one epoch the on-disk log must only grow; a shrink
            // indicates unexpected external modification.
            fn check_append_only(this: &Log, new_meta: &LogMetadata) -> crate::Result<()> {
                let old_meta = &this.meta;
                if old_meta.primary_len > new_meta.primary_len {
                    Err(crate::Error::path(
                        this.dir.as_opt_path().unwrap(),
                        format!(
                            "on-disk log is unexpectedly smaller ({} bytes) than its previous version ({} bytes)",
                            new_meta.primary_len, old_meta.primary_len
                        ),
                    ))
                } else {
                    Ok(())
                }
            }

            // Nothing dirty: just pick up external changes (no write lock
            // is taken on this path).
            if self.mem_buf.is_empty() {
                if let Ok(meta) = Self::load_or_create_meta(&self.dir, false) {
                    let changed = self.meta != meta;
                    let truncated = self.meta.epoch != meta.epoch;
                    if !truncated {
                        check_append_only(self, &meta)?;
                    }
                    if changed {
                        // Reopen to load the new on-disk state. In-memory
                        // indexes are reusable unless the log was
                        // truncated (epoch changed).
                        *self = self.open_options.clone().open_internal(
                            &self.dir,
                            if truncated { None } else { Some(&self.indexes) },
                            None,
                        )?;
                    }
                } else {
                    // Meta could not be read; keep the current state.
                }
                return Ok(self.meta.primary_len);
            }

            // Dirty entries exist: take the directory lock, then re-read
            // meta under the lock.
            let dir = self.dir.as_opt_path().unwrap().to_path_buf();
            let lock = ScopedDirLock::new(&dir)?;

            let mut meta = Self::load_or_create_meta(&self.dir, false)?;
            let changed = self.meta != meta;
            let truncated = self.meta.epoch != meta.epoch;
            if !truncated {
                check_append_only(self, &meta)?;
            }

            if changed && self.open_options.flush_filter.is_some() {
                // The on-disk log changed since we loaded it. Re-append
                // our dirty entries onto a freshly opened Log, running
                // each through the flush filter.
                let filter = self.open_options.flush_filter.unwrap();

                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context("re-open to run flush_filter")?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    let context = FlushFilterContext { log: &log };
                    match filter(&context, content)
                        .map_err(|err| crate::Error::wrap(err, "failed to run filter function"))?
                    {
                        FlushFilterOutput::Drop => {}
                        FlushFilterOutput::Keep => log.append(content)?,
                        FlushFilterOutput::Replace(content) => log.append(content)?,
                    }
                }

                *self = log;
            } else if truncated {
                // Epoch changed: the log was truncated by another process.
                // Re-append our dirty entries onto the truncated log.
                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context(|| {
                        format!(
                            "re-open since epoch has changed ({} to {})",
                            self.meta.epoch, meta.epoch
                        )
                    })?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    log.append(content)?;
                }

                *self = log;
            }

            // Append the dirty buffer to the end of the primary log file.
            let primary_path = self.dir.as_opt_path().unwrap().join(PRIMARY_FILE);
            let mut primary_file = fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&primary_path)
                .context(&primary_path, "cannot open for read-write")?;

            let pos = primary_file
                .seek(SeekFrom::Start(meta.primary_len))
                .context(&primary_path, || {
                    format!("cannot seek to {}", meta.primary_len)
                })?;
            if pos != meta.primary_len {
                // NOTE(review): `seek(Start(n))` normally reports `n` even
                // past EOF, so it is unclear when this branch triggers —
                // confirm the intended "file too short" detection.
                let msg = format!(
                    "log file {} has {} bytes, expect at least {} bytes",
                    primary_path.to_string_lossy(),
                    pos,
                    meta.primary_len
                );
                let err = crate::Error::path(&primary_path, msg);
                return Err(err);
            }

            primary_file
                .write_all(&self.mem_buf)
                .context(&primary_path, || {
                    format!("cannot write data ({} bytes)", self.mem_buf.len())
                })?;

            if self.open_options.fsync || config::get_global_fsync() {
                primary_file
                    .sync_all()
                    .context(&primary_path, "cannot fsync")?;
            }

            meta.primary_len += self.mem_buf.len() as u64;
            self.mem_buf.clear();

            // Reload the (now longer) primary log and the indexes.
            let (disk_buf, indexes) = Self::load_log_and_indexes(
                &self.dir,
                &meta,
                &self.open_options.index_defs,
                &self.mem_buf,
                if changed {
                    // On-disk log changed externally; the in-memory
                    // indexes cannot be reused.
                    None
                } else {
                    // The in-memory indexes already cover the entries just
                    // written; record the new length so they are reused.
                    Self::set_index_log_len(self.indexes.iter_mut(), meta.primary_len);
                    Some(&self.indexes)
                },
                self.open_options.fsync,
            )?;

            self.disk_buf = disk_buf;
            self.indexes = indexes;
            self.meta = meta;

            self.update_indexes_for_on_disk_entries()?;
            // Persist indexes that lag too far behind the log, and refresh
            // fold states, while still holding the lock.
            let lagging_index_ids = self.lagging_index_ids();
            self.flush_lagging_indexes(&lagging_index_ids, &lock)?;
            self.update_and_flush_disk_folds()?;
            self.all_folds = self.disk_folds.clone();

            // Publish the new meta.
            self.dir.write_meta(&self.meta, self.open_options.fsync)?;

            Ok(self.meta.primary_len)
        })();

        result
            .context("in Log::sync")
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
674
    /// Flush the given indexes to disk and record their new lengths in
    /// `self.meta`. The caller must hold the directory lock (`_lock`).
    pub(crate) fn flush_lagging_indexes(
        &mut self,
        index_ids: &[usize],
        _lock: &ScopedDirLock,
    ) -> crate::Result<()> {
        for &index_id in index_ids.iter() {
            let metaname = self.open_options.index_defs[index_id].metaname();
            let new_length = self.indexes[index_id].flush();
            // A flush failure marks the indexes as corrupted.
            let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
            self.meta.indexes.insert(metaname, new_length);
            trace!(
                name = "Log::flush_lagging_index",
                index_name = self.open_options.index_defs[index_id].name.as_str(),
                new_index_length = new_length,
            );
        }
        Ok(())
    }
697
698 pub(crate) fn lagging_index_ids(&self) -> Vec<usize> {
701 let log_bytes = self.meta.primary_len;
702 self.open_options
703 .index_defs
704 .iter()
705 .enumerate()
706 .filter(|(i, def)| {
707 let indexed_bytes = Self::get_index_log_len(&self.indexes[*i], false).unwrap_or(0);
708 let lag_bytes = log_bytes.max(indexed_bytes) - indexed_bytes;
709 let lag_threshold = def.lag_threshold;
710 trace!(
711 name = "Log::is_index_lagging",
712 index_name = def.name.as_str(),
713 lag = lag_bytes,
714 threshold = lag_threshold
715 );
716 lag_bytes > lag_threshold
717 })
718 .map(|(i, _def)| i)
719 .collect()
720 }
721
722 pub fn is_changed(&self) -> bool {
724 match self.dir.read_meta() {
725 Ok(meta) => meta != self.meta,
726 Err(_) => true,
727 }
728 }
729
    /// Alias of [`Log::sync`].
    pub fn flush(&mut self) -> crate::Result<u64> {
        self.sync()
    }
734
    /// Convert `slice` to `Bytes` via the on-disk buffer.
    /// NOTE(review): presumably zero-copy when `slice` points into
    /// `disk_buf` — see `minibytes::Bytes::slice_to_bytes`.
    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
        self.disk_buf.slice_to_bytes(slice)
    }
740
    /// Convert `slice` to `Bytes` via the buffer of index `index_id`.
    /// Panics if `index_id` is out of range (direct indexing).
    pub fn index_slice_to_bytes(&self, index_id: usize, slice: &[u8]) -> Bytes {
        self.indexes[index_id].slice_to_bytes(slice)
    }
746
    /// Flush all indexes to disk and persist their lengths in meta.
    ///
    /// Requires a clean (already synced) log; the callsite is responsible
    /// for preventing concurrent log changes between its `sync()` and
    /// this call.
    pub(crate) fn finalize_indexes(&mut self, _lock: &ScopedDirLock) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let dir = self.dir.clone();
            if let Some(dir) = dir.as_opt_path() {
                if !self.mem_buf.is_empty() {
                    return Err(crate::Error::programming(
                        "sync() should be called before finalize_indexes()",
                    ));
                }

                let _lock = ScopedDirLock::new(&dir)?;

                // Under the lock, verify nobody changed the log since the
                // caller's sync().
                let meta = Self::load_or_create_meta(&self.dir, false)?;
                if self.meta.primary_len != meta.primary_len || self.meta.epoch != meta.epoch {
                    return Err(crate::Error::programming(format!(
                        "race detected, callsite responsible for preventing races (old meta: {:?}, new meta: {:?})",
                        &self.meta, &meta
                    )));
                }
                self.meta = meta;

                // Flush every index and record its new on-disk length.
                for i in 0..self.indexes.len() {
                    let new_length = self.indexes[i].flush();
                    let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
                    let name = self.open_options.index_defs[i].metaname();
                    self.meta.indexes.insert(name, new_length);
                }

                self.dir.write_meta(&self.meta, self.open_options.fsync)?;
            }
            Ok(())
        })();
        result
            .context("in Log::finalize_indexes")
            .context(|| format!(" Log.dir = {:?}", self.dir))
    }
804
    /// Rebuild indexes on disk. With `force`, rebuild unconditionally;
    /// otherwise indexes passing an integrity check are skipped.
    /// Consumes `self`; returns a human-readable report.
    pub fn rebuild_indexes(self, force: bool) -> crate::Result<String> {
        // Cloned up-front so the error contexts below can still use the
        // directory after `self` is consumed by the closure.
        let dir = self.dir.clone();
        let result: crate::Result<_> = (|this: Log| {
            if let Some(dir) = this.dir.clone().as_opt_path() {
                let lock = ScopedDirLock::new(&dir)?;
                this.rebuild_indexes_with_lock(force, &lock)
            } else {
                // In-memory logs have no on-disk indexes to rebuild.
                Ok(String::new())
            }
        })(self);

        result
            .context(|| format!("in Log::rebuild_indexes(force={})", force))
            .context(|| format!(" Log.dir = {:?}", dir))
    }
832
    /// Rebuild indexes while holding the directory lock. Each new index is
    /// built in a temp file, then renamed into place, with meta updated
    /// before and after the rename to keep crash states consistent.
    fn rebuild_indexes_with_lock(
        mut self,
        force: bool,
        _lock: &ScopedDirLock,
    ) -> crate::Result<String> {
        let mut message = String::new();
        {
            if let Some(ref dir) = self.dir.as_opt_path() {
                for (i, def) in self.open_options.index_defs.iter().enumerate() {
                    let name = def.name.as_str();

                    if let Some(index) = &self.indexes.get(i) {
                        // Decide whether this index can be kept as-is.
                        let should_skip = if force {
                            false
                        } else {
                            match Self::get_index_log_len(index, true) {
                                Err(_) => false,
                                Ok(len) => {
                                    if len > self.meta.primary_len {
                                        // Index claims more coverage than
                                        // the log has: log was truncated.
                                        message += &format!(
                                            "Index {:?} is incompatible with (truncated) log\n",
                                            name
                                        );
                                        false
                                    } else if index.verify().is_ok() {
                                        message +=
                                            &format!("Index {:?} passed integrity check\n", name);
                                        true
                                    } else {
                                        message +=
                                            &format!("Index {:?} failed integrity check\n", name);
                                        false
                                    }
                                }
                            }
                        };
                        if should_skip {
                            continue;
                        } else {
                            // Replace the possibly-corrupted index with a
                            // throwaway in-memory one during the rebuild.
                            self.indexes[i] = index::OpenOptions::new().create_in_memory()?;
                        }
                    }

                    // Build the replacement index into a temp file.
                    let tmp = tempfile::NamedTempFile::new_in(dir).context(&dir, || {
                        format!("cannot create tempfile for rebuilding index {:?}", name)
                    })?;
                    let index_len = {
                        let mut index = index::OpenOptions::new()
                            .key_buf(Some(Arc::new(self.disk_buf.clone())))
                            .open(&tmp.path())?;
                        Self::update_index_for_on_disk_entry_unchecked(
                            &self.dir,
                            &mut index,
                            def,
                            &self.disk_buf,
                            self.meta.primary_len,
                        )?;
                        index.flush()?
                    };

                    // Record length 0 in meta before replacing the index
                    // file, so a crash mid-replace leaves meta consistent
                    // with "index must be rebuilt".
                    let meta_path = dir.join(META_FILE);
                    self.meta.indexes.insert(def.metaname(), 0);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!(" before replacing index {:?})", name))?;

                    // Best-effort permission fixup; failure is not fatal.
                    let _ = utils::fix_perm_file(tmp.as_file(), false);

                    let path = dir.join(def.filename());
                    tmp.persist(&path).map_err(|e| {
                        crate::Error::wrap(Box::new(e), || {
                            format!("cannot persist tempfile to replace index {:?}", name)
                        })
                    })?;

                    // Publish the real index length after the rename.
                    self.meta.indexes.insert(def.metaname(), index_len);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!(" after replacing index {:?}", name))?;
                    message += &format!("Rebuilt index {:?}\n", name);
                }
            }
        }

        Ok(message)
    }
928
929 pub fn lookup<K: AsRef<[u8]>>(&self, index_id: usize, key: K) -> crate::Result<LogLookupIter> {
934 let result: crate::Result<_> = (|| {
935 self.maybe_return_index_error()?;
936 if let Some(index) = self.indexes.get(index_id) {
937 assert!(!key.as_ref().is_empty());
938 let link_offset = index.get(&key)?;
939 let inner_iter = link_offset.values(index);
940 Ok(LogLookupIter {
941 inner_iter,
942 errored: false,
943 log: self,
944 })
945 } else {
946 let msg = format!(
947 "invalid index_id {} (len={}, path={:?})",
948 index_id,
949 self.indexes.len(),
950 &self.dir
951 );
952 Err(crate::Error::programming(msg))
953 }
954 })();
955 result
956 .context(|| format!("in Log::lookup({}, {:?})", index_id, key.as_ref()))
957 .context(|| format!(" Log.dir = {:?}", self.dir))
958 }
959
960 pub fn lookup_prefix<K: AsRef<[u8]>>(
967 &self,
968 index_id: usize,
969 prefix: K,
970 ) -> crate::Result<LogRangeIter> {
971 let prefix = prefix.as_ref();
972 let result: crate::Result<_> = (|| {
973 let index = self.indexes.get(index_id).unwrap();
974 let inner_iter = index.scan_prefix(prefix)?;
975 Ok(LogRangeIter {
976 inner_iter,
977 errored: false,
978 log: self,
979 index,
980 })
981 })();
982 result
983 .context(|| format!("in Log::lookup_prefix({}, {:?})", index_id, prefix))
984 .context(|| format!(" Log.dir = {:?}", self.dir))
985 }
986
987 pub fn lookup_range<'a>(
996 &self,
997 index_id: usize,
998 range: impl RangeBounds<&'a [u8]>,
999 ) -> crate::Result<LogRangeIter> {
1000 let start = range.start_bound();
1001 let end = range.end_bound();
1002 let result: crate::Result<_> = (|| {
1003 let index = self.indexes.get(index_id).unwrap();
1004 let inner_iter = index.range((start, end))?;
1005 Ok(LogRangeIter {
1006 inner_iter,
1007 errored: false,
1008 log: self,
1009 index,
1010 })
1011 })();
1012 result
1013 .context(|| {
1014 format!(
1015 "in Log::lookup_range({}, {:?} to {:?})",
1016 index_id, start, end,
1017 )
1018 })
1019 .context(|| format!(" Log.dir = {:?}", self.dir))
1020 }
1021
1022 pub fn lookup_prefix_hex<K: AsRef<[u8]>>(
1029 &self,
1030 index_id: usize,
1031 hex_prefix: K,
1032 ) -> crate::Result<LogRangeIter> {
1033 let prefix = hex_prefix.as_ref();
1034 let result: crate::Result<_> = (|| {
1035 let index = self.indexes.get(index_id).unwrap();
1036 let inner_iter = index.scan_prefix_hex(prefix)?;
1037 Ok(LogRangeIter {
1038 inner_iter,
1039 errored: false,
1040 log: self,
1041 index,
1042 })
1043 })();
1044 result
1045 .context(|| format!("in Log::lookup_prefix_hex({}, {:?})", index_id, prefix))
1046 .context(|| format!(" Log.dir = {:?}", self.dir))
1047 }
1048
1049 pub fn iter(&self) -> LogIter {
1051 LogIter {
1052 log: self,
1053 next_offset: PRIMARY_START_OFFSET,
1054 errored: false,
1055 }
1056 }
1057
1058 pub fn iter_dirty(&self) -> LogIter {
1062 LogIter {
1063 log: self,
1064 next_offset: self.meta.primary_len,
1065 errored: false,
1066 }
1067 }
1068
1069 pub fn index_func<'a>(
1071 &self,
1072 index_id: usize,
1073 entry: &'a [u8],
1074 ) -> crate::Result<Vec<Cow<'a, [u8]>>> {
1075 let index_def = self.get_index_def(index_id)?;
1076 let mut result = vec![];
1077 for output in (index_def.func)(entry).into_iter() {
1078 result.push(
1079 output
1080 .into_cow(&entry)
1081 .context(|| format!("index_id = {}", index_id))?,
1082 );
1083 }
1084
1085 Ok(result)
1086 }
1087
1088 pub fn fold(&self, fold_id: usize) -> crate::Result<&dyn Fold> {
1094 match self.all_folds.get(fold_id) {
1095 Some(f) => Ok(f.fold.as_ref()),
1096 None => Err(self.fold_out_of_bound(fold_id)),
1097 }
1098 }
1099
1100 fn fold_out_of_bound(&self, fold_id: usize) -> crate::Error {
1101 let msg = format!(
1102 "fold_id {} is out of bound (len={}, dir={:?})",
1103 fold_id,
1104 self.open_options.fold_defs.len(),
1105 &self.dir
1106 );
1107 crate::Error::programming(msg)
1108 }
1109
    /// Update indexes for a newly appended in-memory entry, marking the
    /// indexes corrupted if the update fails.
    fn update_indexes_for_in_memory_entry(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        let result = self.update_indexes_for_in_memory_entry_unchecked(data, offset, data_offset);
        self.maybe_set_index_error(result)
    }
1124
1125 fn update_fold_for_in_memory_entry(
1127 &mut self,
1128 data: &[u8],
1129 offset: u64,
1130 data_offset: u64,
1131 ) -> crate::Result<()> {
1132 for fold_state in self.all_folds.iter_mut() {
1133 fold_state.process_entry(data, offset, data_offset + data.len() as u64)?;
1134 }
1135 Ok(())
1136 }
1137
    /// Apply every index function to an in-memory entry and update the
    /// indexes. Does not set the corruption flag on error.
    fn update_indexes_for_in_memory_entry_unchecked(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        // Key is a sub-slice of the entry; store it as a
                        // (start, len) reference into the shared key
                        // buffer instead of copying the bytes.
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        let start = range.start + data_offset;
                        let end = range.end + data_offset;
                        let key = InsertKey::Reference((start, end - start));
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        // Key bytes are embedded into the index itself.
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
        }
        Ok(())
    }
1169
    /// Bring `disk_folds` up to date with on-disk entries.
    ///
    /// The fold states are swapped out of `self` first because catching
    /// up needs `&self` (reading the log) while the folds are mutated;
    /// they are restored afterwards even if an error occurred.
    fn update_and_flush_disk_folds(&mut self) -> crate::Result<()> {
        let mut folds = self.open_options.empty_folds();
        std::mem::swap(&mut self.disk_folds, &mut folds);
        let result = (|| -> crate::Result<()> {
            for fold_state in folds.iter_mut() {
                fold_state.catch_up_with_log_on_disk_entries(self)?;
            }
            Ok(())
        })();
        self.disk_folds = folds;
        result
    }
1188
    /// Update indexes for on-disk entries they do not cover yet, marking
    /// the indexes corrupted if the update fails.
    fn update_indexes_for_on_disk_entries(&mut self) -> crate::Result<()> {
        let result = self.update_indexes_for_on_disk_entries_unchecked();
        self.maybe_set_index_error(result)
    }
1196
    /// See `update_indexes_for_on_disk_entries`; does not set the
    /// corruption flag on error.
    fn update_indexes_for_on_disk_entries_unchecked(&mut self) -> crate::Result<()> {
        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
            Self::update_index_for_on_disk_entry_unchecked(
                &self.dir,
                index,
                def,
                &self.disk_buf,
                self.meta.primary_len,
            )?;
        }
        Ok(())
    }
1210
    /// Feed on-disk entries not yet covered by `index` (per the index's
    /// recorded covered length) into it, then record `primary_len` as the
    /// new covered length. Returns the number of entries processed.
    fn update_index_for_on_disk_entry_unchecked(
        path: &GenericPath,
        index: &mut Index,
        def: &IndexDef,
        disk_buf: &Bytes,
        primary_len: u64,
    ) -> crate::Result<usize> {
        // Resume from where the index stopped last time.
        let mut offset = Self::get_index_log_len(index, true)?;
        let mut count = 0;
        while let Some(entry_result) =
            Self::read_entry_from_buf(&path, disk_buf, offset).context(|| {
                format!(
                    "while updating index {:?} for on-disk entry at {}",
                    def.name, offset
                )
            })?
        {
            count += 1;
            let data = entry_result.data;
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        // Store the key as a reference into the log
                        // instead of copying (see the in-memory variant).
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        let start = range.start + entry_result.data_offset;
                        let end = range.end + entry_result.data_offset;
                        let key = InsertKey::Reference((start, end - start));

                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
            offset = entry_result.next_offset;
        }
        // Remember how much of the log this index now covers.
        Self::set_index_log_len(std::iter::once(index), primary_len);

        Ok(count)
    }
1262
    /// Read meta from `path`. With `create`, a missing meta causes a
    /// fresh log (primary file + meta) to be created.
    pub(crate) fn load_or_create_meta(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        Self::load_or_create_meta_internal(path, create)
    }
1274
    /// Implementation of `load_or_create_meta`.
    pub(crate) fn load_or_create_meta_internal(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        match path.read_meta() {
            Err(err) => {
                // Only a missing meta (with `create` set) triggers
                // creation; other errors propagate.
                if err.io_error_kind() == io::ErrorKind::NotFound && create {
                    let dir = path.as_opt_path().unwrap();
                    let primary_path = dir.join(PRIMARY_FILE);
                    // Write a fresh primary log containing just the magic
                    // header, then a meta pointing right past it.
                    let mut primary_file =
                        File::create(&primary_path).context(&primary_path, "cannot create")?;
                    primary_file
                        .write_all(PRIMARY_HEADER)
                        .context(&primary_path, "cannot write")?;
                    // Best-effort permission fixup; failure is not fatal.
                    let _ = utils::fix_perm_file(&primary_file, false);
                    let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
                    path.write_meta(&meta, false)?;
                    Ok(meta)
                } else {
                    Err(err)
                }
            }
            Ok(meta) => Ok(meta),
        }
    }
1303
    /// mmap the primary log (up to `meta.primary_len`) and load indexes.
    ///
    /// `reuse_indexes`, when given, allows still-valid in-memory indexes
    /// to be cloned instead of re-read from disk.
    fn load_log_and_indexes(
        dir: &GenericPath,
        meta: &LogMetadata,
        index_defs: &[IndexDef],
        mem_buf: &Pin<Box<Vec<u8>>>,
        reuse_indexes: Option<&Vec<Index>>,
        fsync: bool,
    ) -> crate::Result<(Bytes, Vec<Index>)> {
        let primary_buf = match dir.as_opt_path() {
            Some(dir) => mmap_path(&dir.join(PRIMARY_FILE), meta.primary_len)?,
            None => Bytes::new(),
        };

        // Raw pointer into the pinned mem_buf; the pin keeps the address
        // stable while indexes hold this pointer.
        let mem_buf: &Vec<u8> = &mem_buf;
        let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
        let key_buf = Arc::new(ExternalKeyBuffer {
            disk_buf: primary_buf.clone(),
            disk_len: meta.primary_len,
            mem_buf,
        });

        let indexes = match reuse_indexes {
            None => {
                // Load every index from scratch at its meta-recorded length.
                let mut indexes = Vec::with_capacity(index_defs.len());
                for def in index_defs.iter() {
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    indexes.push(Self::load_index(
                        dir,
                        &def,
                        index_len,
                        key_buf.clone(),
                        fsync,
                    )?);
                }
                indexes
            }
            Some(indexes) => {
                assert_eq!(index_defs.len(), indexes.len());
                let mut new_indexes = Vec::with_capacity(indexes.len());
                for (index, def) in indexes.iter().zip(index_defs) {
                    // Reload from disk only if the on-disk index is ahead
                    // of the in-memory one; otherwise clone and re-point
                    // the key buffer.
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    let index = if index_len > Self::get_index_log_len(index, true).unwrap_or(0) {
                        Self::load_index(dir, &def, index_len, key_buf.clone(), fsync)?
                    } else {
                        let mut index = index.try_clone()?;
                        index.key_buf = key_buf.clone();
                        index
                    };
                    new_indexes.push(index);
                }
                new_indexes
            }
        };
        Ok((primary_buf, indexes))
    }
1370
    /// The directory (or in-memory placeholder) backing this log.
    pub fn path(&self) -> &GenericPath {
        &self.dir
    }
1375
1376 fn load_index(
1378 dir: &GenericPath,
1379 def: &IndexDef,
1380 len: u64,
1381 buf: Arc<dyn ReadonlyBuffer + Send + Sync>,
1382 fsync: bool,
1383 ) -> crate::Result<Index> {
1384 match dir.as_opt_path() {
1385 Some(dir) => {
1386 let path = dir.join(def.filename());
1387 index::OpenOptions::new()
1388 .checksum_chunk_size_logarithm(INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM)
1389 .logical_len(Some(len))
1390 .key_buf(Some(buf))
1391 .fsync(fsync)
1392 .open(path)
1393 }
1394 None => index::OpenOptions::new()
1395 .logical_len(Some(len))
1396 .key_buf(Some(buf))
1397 .fsync(fsync)
1398 .create_in_memory(),
1399 }
1400 }
1401
    /// Read the entry at the logical `offset`.
    ///
    /// Offsets below `meta.primary_len` address the on-disk buffer;
    /// larger offsets address the in-memory buffer. Returns `None` at the
    /// end of the in-memory buffer.
    fn read_entry(&self, offset: u64) -> crate::Result<Option<EntryResult>> {
        let result = if offset < self.meta.primary_len {
            Self::read_entry_from_buf(&self.dir, &self.disk_buf, offset)?
        } else {
            let offset = offset - self.meta.primary_len;
            if offset >= self.mem_buf.len() as u64 {
                return Ok(None);
            }
            Self::read_entry_from_buf(&self.dir, &self.mem_buf, offset)?
                // Translate buffer-relative offsets back to logical ones.
                .map(|entry_result| entry_result.offset(self.meta.primary_len))
        };
        Ok(result)
    }
1418
    /// Parse the entry at `offset` within `buf`.
    ///
    /// Entry layout: `vlq(flags) + vlq(data_len) + checksum + data`,
    /// where exactly one checksum flag selects an xxhash64 (8-byte) or
    /// xxhash32 (4-byte) checksum. Returns `None` when `offset` is
    /// exactly at the end of `buf`; parse failures are reported as
    /// corruption errors.
    fn read_entry_from_buf<'a>(
        path: &GenericPath,
        buf: &'a [u8],
        offset: u64,
    ) -> crate::Result<Option<EntryResult<'a>>> {
        // Attribute data errors to the log path, or "<memory>".
        let data_error = |msg: String| -> crate::Error {
            match path.as_opt_path() {
                Some(path) => crate::Error::corruption(path, msg),
                None => crate::Error::path(Path::new("<memory>"), msg),
            }
        };

        use std::cmp::Ordering::Equal;
        use std::cmp::Ordering::Greater;
        match offset.cmp(&(buf.len() as u64)) {
            // Exactly at the end: no more entries.
            Equal => return Ok(None),
            Greater => {
                let msg = format!("read offset {} exceeds buffer size {}", offset, buf.len());
                return Err(data_error(msg));
            }
            _ => {}
        }

        let (entry_flags, vlq_len): (u32, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
            crate::Error::wrap(Box::new(e), || {
                format!("cannot read entry_flags at {}", offset)
            })
            .mark_corruption()
        })?;
        let offset = offset + vlq_len as u64;

        let (data_len, vlq_len): (u64, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
            crate::Error::wrap(Box::new(e), || {
                format!("cannot read data_len at {}", offset)
            })
            .mark_corruption()
        })?;
        let offset = offset + vlq_len as u64;

        // Exactly one of the two checksum flags must be set; 0 (no
        // checksum) and 3 (both) are rejected as malformed below.
        let checksum_flags = entry_flags & (ENTRY_FLAG_HAS_XXHASH64 | ENTRY_FLAG_HAS_XXHASH32);
        let (checksum, offset) = match checksum_flags {
            ENTRY_FLAG_HAS_XXHASH64 => {
                let checksum = LittleEndian::read_u64(
                    &buf.get(offset as usize..offset as usize + 8)
                        .ok_or_else(|| {
                            data_error(format!("xxhash cannot be read at {}", offset))
                        })?,
                );
                (checksum, offset + 8)
            }
            ENTRY_FLAG_HAS_XXHASH32 => {
                let checksum = LittleEndian::read_u32(
                    &buf.get(offset as usize..offset as usize + 4)
                        .ok_or_else(|| {
                            data_error(format!("xxhash32 cannot be read at {}", offset))
                        })?,
                ) as u64;
                (checksum, offset + 4)
            }
            _ => {
                return Err(data_error(format!(
                    "entry at {} has malformed checksum metadata",
                    offset
                )));
            }
        };

        let end = offset + data_len;
        if end > buf.len() as u64 {
            return Err(data_error(format!("incomplete entry data at {}", offset)));
        }
        let data = &buf[offset as usize..end as usize];

        let verified = match checksum_flags {
            // Unreachable in practice: checksum_flags == 0 was already
            // rejected by the match above.
            0 => true,
            ENTRY_FLAG_HAS_XXHASH64 => xxhash(&data) == checksum,
            ENTRY_FLAG_HAS_XXHASH32 => xxhash32(&data) as u64 == checksum,
            _ => unreachable!(),
        };
        if verified {
            Ok(Some(EntryResult {
                data,
                data_offset: offset,
                next_offset: end,
            }))
        } else {
            Err(data_error(format!("integrity check failed at {}", offset)))
        }
    }
1515
1516 #[inline]
1519 fn maybe_set_index_error<T>(&mut self, result: crate::Result<T>) -> crate::Result<T> {
1520 if result.is_err() && !self.index_corrupted {
1521 self.index_corrupted = true;
1522 }
1523 result
1524 }
1525
1526 #[inline]
1529 fn maybe_return_index_error(&self) -> crate::Result<()> {
1530 if self.index_corrupted {
1531 let msg = "index is corrupted".to_string();
1532 Err(self.corruption(msg))
1533 } else {
1534 Ok(())
1535 }
1536 }
1537
1538 fn get_index_log_len(index: &Index, consider_dirty: bool) -> crate::Result<u64> {
1548 let index_meta = if consider_dirty {
1549 index.get_meta()
1550 } else {
1551 index.get_original_meta()
1552 };
1553 Ok(if index_meta.is_empty() {
1554 PRIMARY_START_OFFSET
1556 } else {
1557 index_meta
1558 .read_vlq_at(0)
1559 .context(&index.path, || {
1560 format!(
1561 "index metadata cannot be parsed as an integer: {:?}",
1562 &index_meta
1563 )
1564 })?
1565 .0
1566 })
1567 }
1568
1569 fn set_index_log_len<'a>(indexes: impl Iterator<Item = &'a mut Index>, len: u64) {
1573 let mut index_meta = Vec::new();
1574 index_meta.write_vlq(len).unwrap();
1575 for index in indexes {
1576 index.set_meta(&index_meta);
1577 }
1578 }
1579}
1580
1581impl Log {
1584 fn get_index(&self, index_id: usize) -> crate::Result<&Index> {
1586 self.indexes.get(index_id).ok_or_else(|| {
1587 let msg = format!(
1588 "index_id {} is out of bound (len={}, dir={:?})",
1589 index_id,
1590 self.indexes.len(),
1591 &self.dir
1592 );
1593 crate::Error::programming(msg)
1594 })
1595 }
1596
1597 fn get_index_def(&self, index_id: usize) -> crate::Result<&IndexDef> {
1599 self.open_options.index_defs.get(index_id).ok_or_else(|| {
1600 let msg = format!(
1601 "index_id {} is out of bound (len={}, dir={:?})",
1602 index_id,
1603 self.indexes.len(),
1604 &self.dir
1605 );
1606 crate::Error::programming(msg)
1607 })
1608 }
1609
1610 fn corruption(&self, message: String) -> crate::Error {
1611 let path: &Path = match self.dir.as_opt_path() {
1612 Some(ref path) => &path,
1613 None => Path::new("<memory>"),
1614 };
1615 crate::Error::corruption(path, message)
1616 }
1617}
1618
/// A successfully parsed log entry: the payload slice plus its location
/// within the buffer it was read from (integrity is checked by the reader
/// before this is constructed).
struct EntryResult<'a> {
    // The entry payload bytes.
    data: &'a [u8],
    // Offset of `data` within the source buffer.
    data_offset: u64,
    // Offset right past this entry, i.e. where the next entry starts.
    next_offset: u64,
}
1625
1626impl<'a> EntryResult<'a> {
1627 fn offset(self, offset: u64) -> EntryResult<'a> {
1629 EntryResult {
1630 data: self.data,
1631 data_offset: self.data_offset,
1634 next_offset: self.next_offset + offset,
1635 }
1636 }
1637}
1638
1639impl<'a> Iterator for LogLookupIter<'a> {
1640 type Item = crate::Result<&'a [u8]>;
1641
1642 fn next(&mut self) -> Option<Self::Item> {
1643 if self.errored {
1644 return None;
1645 }
1646 match self.inner_iter.next() {
1647 None => None,
1648 Some(Err(err)) => {
1649 self.errored = true;
1650 Some(Err(err))
1651 }
1652 Some(Ok(offset)) => match self
1653 .log
1654 .read_entry(offset)
1655 .context("in LogLookupIter::next")
1656 {
1657 Ok(Some(entry)) => Some(Ok(entry.data)),
1658 Ok(None) => None,
1659 Err(err) => {
1660 Some(Err(err))
1668 }
1669 },
1670 }
1671 }
1672}
1673
impl<'a> LogLookupIter<'a> {
    /// Collect all matched entries into a `Vec`, short-circuiting at the
    /// first error (via `Result`'s `FromIterator` impl).
    pub fn into_vec(self) -> crate::Result<Vec<&'a [u8]>> {
        self.collect()
    }
}
1680
1681impl<'a> Iterator for LogIter<'a> {
1682 type Item = crate::Result<&'a [u8]>;
1683
1684 fn next(&mut self) -> Option<Self::Item> {
1685 if self.errored {
1686 return None;
1687 }
1688 match self
1689 .log
1690 .read_entry(self.next_offset)
1691 .context("in LogIter::next")
1692 {
1693 Err(e) => {
1694 self.errored = true;
1695 Some(Err(e))
1696 }
1697 Ok(Some(entry_result)) => {
1698 assert!(entry_result.next_offset > self.next_offset);
1699 self.next_offset = entry_result.next_offset;
1700 Some(Ok(entry_result.data))
1701 }
1702 Ok(None) => None,
1703 }
1704 }
1705}
1706
1707impl<'a> LogRangeIter<'a> {
1708 fn wrap_inner_next_result(
1710 &mut self,
1711 item: Option<crate::Result<(Cow<'a, [u8]>, index::LinkOffset)>>,
1712 ) -> Option<crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>> {
1713 match item {
1714 None => None,
1715 Some(Err(err)) => {
1716 self.errored = true;
1717 Some(Err(err))
1718 }
1719 Some(Ok((key, link_offset))) => {
1720 let iter = LogLookupIter {
1721 inner_iter: link_offset.values(self.index),
1722 errored: false,
1723 log: self.log,
1724 };
1725 Some(Ok((key, iter)))
1726 }
1727 }
1728 }
1729}
1730
1731impl<'a> Iterator for LogRangeIter<'a> {
1732 type Item = crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>;
1733
1734 fn next(&mut self) -> Option<Self::Item> {
1735 if self.errored {
1736 return None;
1737 }
1738 let inner = self.inner_iter.next();
1739 self.wrap_inner_next_result(inner)
1740 }
1741}
1742
1743impl<'a> DoubleEndedIterator for LogRangeIter<'a> {
1744 fn next_back(&mut self) -> Option<Self::Item> {
1745 if self.errored {
1746 return None;
1747 }
1748 let inner = self.inner_iter.next_back();
1749 self.wrap_inner_next_result(inner)
1750 }
1751}
1752
1753impl Debug for Log {
1754 fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
1755 let mut count = 0;
1756 let mut iter = self.iter();
1757 let bytes_per_line = 16;
1758 loop {
1759 let offset = iter.next_offset;
1760 count += 1;
1761 match iter.next() {
1762 None => break,
1763 Some(Ok(bytes)) => {
1764 if count > 1 {
1765 write!(f, "\n")?;
1766 }
1767 write!(f, "# Entry {}:\n", count)?;
1768 for (i, chunk) in bytes.chunks(bytes_per_line).enumerate() {
1769 write!(f, "{:08x}:", offset as usize + i * bytes_per_line)?;
1770 for b in chunk {
1771 write!(f, " {:02x}", b)?;
1772 }
1773 for _ in chunk.len()..bytes_per_line {
1774 write!(f, " ")?;
1775 }
1776 write!(f, " ")?;
1777 for &b in chunk {
1778 let ch = match b {
1779 0x20..=0x7e => b as char, _ => '.',
1781 };
1782 write!(f, "{}", ch)?;
1783 }
1784 write!(f, "\n")?;
1785 }
1786 }
1787 Some(Err(err)) => writeln!(f, "# Error: {:?}", err)?,
1788 }
1789 }
1790 Ok(())
1791 }
1792}
1793
impl ReadonlyBuffer for ExternalKeyBuffer {
    /// Read `len` bytes at logical offset `start`.
    ///
    /// Offsets below `disk_len` address `disk_buf`; offsets at or above it
    /// address the in-memory buffer, rebased by `disk_len`. Returns `None`
    /// when the requested range is out of bounds of the selected buffer.
    #[inline]
    fn slice(&self, start: u64, len: u64) -> Option<&[u8]> {
        if start < self.disk_len {
            self.disk_buf.get((start as usize)..(start + len) as usize)
        } else {
            // Rebase into the in-memory buffer's coordinate space.
            let start = start - self.disk_len;
            // SAFETY(review): `mem_buf` is a raw pointer dereferenced here;
            // soundness depends on the pointee (presumably the owning Log's
            // `mem_buf`) outliving this buffer — that invariant is
            // maintained outside this block. Confirm before changing
            // ownership or lifetimes.
            let mem_buf = unsafe { &*self.mem_buf };
            mem_buf.get((start as usize)..(start + len) as usize)
        }
    }
}