1use std::fmt;
11use std::fs;
12use std::io;
13use std::path::Path;
14use std::path::PathBuf;
15use std::sync::atomic::AtomicUsize;
16use std::sync::atomic::Ordering::SeqCst;
17
18use minibytes::Bytes;
19use once_cell::sync::OnceCell;
20use tracing::debug;
21use tracing::debug_span;
22use tracing::trace;
23
24use crate::change_detect::SharedChangeDetector;
25use crate::errors::IoResultExt;
26use crate::errors::ResultExt;
27use crate::lock::ScopedDirLock;
28use crate::lock::READER_LOCK_OPTS;
29use crate::log;
30use crate::log::FlushFilterContext;
31use crate::log::FlushFilterFunc;
32use crate::log::FlushFilterOutput;
33use crate::log::IndexDef;
34use crate::log::Log;
35use crate::repair::OpenOptionsOutput;
36use crate::repair::OpenOptionsRepair;
37use crate::repair::RepairMessage;
38use crate::utils;
39
40pub struct RotateLog {
45 dir: Option<PathBuf>,
46 open_options: OpenOptions,
47 logs: Vec<OnceCell<Log>>,
48 logs_len: AtomicUsize,
51 latest: u8,
52 reader_lock: Option<ScopedDirLock>,
54 change_detector: Option<SharedChangeDetector>,
55 #[cfg(test)]
57 hook_after_log_sync: Option<Box<dyn Fn()>>,
58}
59
60const LATEST_FILE: &str = "latest";
65
66#[derive(Clone)]
68pub struct OpenOptions {
69 pub(crate) max_bytes_per_log: u64,
70 pub(crate) max_log_count: u8,
71 pub(crate) log_open_options: log::OpenOptions,
72 pub(crate) auto_sync_threshold: Option<u64>,
73}
74
75impl OpenOptions {
76 #[allow(clippy::new_without_default)]
77 pub fn new() -> Self {
86 let max_log_count = 2;
88 let max_bytes_per_log = 2_000_000_000; Self {
90 max_bytes_per_log,
91 max_log_count,
92 log_open_options: log::OpenOptions::new(),
93 auto_sync_threshold: None,
94 }
95 }
96
97 pub fn max_log_count(mut self, count: u8) -> Self {
101 assert!(count >= 1);
102 self.max_log_count = count;
103 self
104 }
105
106 pub fn max_bytes_per_log(mut self, bytes: u64) -> Self {
108 assert!(bytes > 0);
109 self.max_bytes_per_log = bytes;
110 self
111 }
112
113 pub fn checksum_type(mut self, checksum_type: log::ChecksumType) -> Self {
117 self.log_open_options = self.log_open_options.checksum_type(checksum_type);
118 self
119 }
120
121 pub fn create(mut self, create: bool) -> Self {
123 self.log_open_options = self.log_open_options.create(create);
124 self
125 }
126
127 pub fn index(mut self, name: &'static str, func: fn(&[u8]) -> Vec<log::IndexOutput>) -> Self {
129 self.log_open_options = self.log_open_options.index(name, func);
130 self
131 }
132
133 pub fn index_defs(mut self, index_defs: Vec<IndexDef>) -> Self {
137 self.log_open_options = self.log_open_options.index_defs(index_defs);
138 self
139 }
140
141 pub fn flush_filter(mut self, flush_filter: Option<FlushFilterFunc>) -> Self {
149 self.log_open_options = self.log_open_options.flush_filter(flush_filter);
150 self
151 }
152
153 pub fn auto_sync_threshold(mut self, threshold: impl Into<Option<u64>>) -> Self {
158 self.auto_sync_threshold = threshold.into();
159 self
160 }
161
162 pub fn open(&self, dir: impl AsRef<Path>) -> crate::Result<RotateLog> {
164 let dir = dir.as_ref();
165 let result: crate::Result<_> = (|| {
166 let reader_lock = ScopedDirLock::new_with_options(dir, &READER_LOCK_OPTS)?;
167 let change_detector = reader_lock.shared_change_detector()?;
168 let span = debug_span!("RotateLog::open", dir = &dir.to_string_lossy().as_ref());
169 let _guard = span.enter();
170
171 let latest_and_log = read_latest_and_logs(dir, self);
172
173 let (latest, logs) = match latest_and_log {
174 Ok((latest, logs)) => (latest, logs),
175 Err(e) => {
176 if !self.log_open_options.create {
177 return Err(e)
178 .context("not creating new logs since OpenOption::create is not set");
179 } else {
180 utils::mkdir_p(dir)?;
181 let lock = ScopedDirLock::new(dir)?;
182
183 match read_latest_raw(dir) {
184 Ok(latest) => {
185 match read_logs(dir, self, latest) {
186 Ok(logs) => {
187 (latest, logs)
189 }
190 Err(err) => {
191 let latest = latest.wrapping_add(1);
194 match create_empty_log(Some(dir), self, latest, &lock) {
195 Ok(new_log) => {
196 if let Ok(logs) = read_logs(dir, self, latest) {
197 (latest, logs)
198 } else {
199 (latest, vec![create_log_cell(new_log)])
200 }
201 }
202 Err(new_log_err) => {
203 let msg = "cannot create new empty log after failing to read existing logs";
204 return Err(new_log_err.message(msg).source(err));
205 }
206 }
207 }
208 }
209 }
210 Err(err) => {
211 if err.kind() == io::ErrorKind::NotFound {
212 let latest = 0;
216 let new_log = create_empty_log(Some(dir), self, latest, &lock)?;
217 (latest, vec![create_log_cell(new_log)])
218 } else {
219 let corrupted = err.kind() == io::ErrorKind::InvalidData;
224 let mut result = Err(err).context(dir, "cannot read 'latest'");
225 if corrupted {
226 result = result.corruption();
227 }
228 return result;
229 }
230 }
231 }
232 }
233 }
234 };
235
236 let logs_len = AtomicUsize::new(logs.len());
237 let mut rotate_log = RotateLog {
238 dir: Some(dir.into()),
239 open_options: self.clone(),
240 logs,
241 logs_len,
242 latest,
243 reader_lock: Some(reader_lock),
244 change_detector: Some(change_detector),
245 #[cfg(test)]
246 hook_after_log_sync: None,
247 };
248 rotate_log.update_change_detector_to_match_meta();
249 Ok(rotate_log)
250 })();
251
252 result.context(|| format!("in rotate::OpenOptions::open({:?})", dir))
253 }
254
255 pub fn create_in_memory(&self) -> crate::Result<RotateLog> {
257 let result: crate::Result<_> = (|| {
258 let cell = create_log_cell(self.log_open_options.open(())?);
259 let mut logs = Vec::with_capacity(1);
260 logs.push(cell);
261 let logs_len = AtomicUsize::new(logs.len());
262 Ok(RotateLog {
263 dir: None,
264 open_options: self.clone(),
265 logs,
266 logs_len,
267 latest: 0,
268 reader_lock: None,
269 change_detector: None,
270 #[cfg(test)]
271 hook_after_log_sync: None,
272 })
273 })();
274 result.context("in rotate::OpenOptions::create_in_memory")
275 }
276
277 pub fn repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
281 let dir = dir.as_ref();
282 (|| -> crate::Result<_> {
283 let _lock = ScopedDirLock::new(dir)?;
284
285 let mut message = RepairMessage::new(dir);
286 message += &format!("Processing RotateLog: {:?}\n", dir);
287 let read_dir = dir.read_dir().context(dir, "cannot readdir")?;
288 let mut ids = Vec::new();
289
290 for entry in read_dir {
291 let entry = entry.context(dir, "cannot readdir")?;
292 let name = entry.file_name();
293 if let Some(name) = name.to_str() {
294 if let Ok(id) = name.parse::<u8>() {
295 ids.push(id);
296 }
297 }
298 }
299
300 ids.sort_unstable();
301 for &id in ids.iter() {
302 let name = id.to_string();
303 message += &format!("Attempt to repair log {:?}\n", name);
304 match self.log_open_options.repair(&dir.join(name)) {
305 Ok(log) => message += &log,
306 Err(err) => message += &format!("Failed: {}\n", err),
307 }
308 }
309
310 let latest_path = dir.join(LATEST_FILE);
311 match read_latest_raw(dir) {
312 Ok(latest) => message += &format!("Latest = {}\n", latest),
313 Err(err) => match err.kind() {
314 io::ErrorKind::NotFound
315 | io::ErrorKind::InvalidData
316 | io::ErrorKind::UnexpectedEof => {
317 let latest = guess_latest(ids);
318 let content = format!("{}", latest);
319 let fsync = false;
320 utils::atomic_write(&latest_path, content, fsync)?;
321 message += &format!("Reset latest to {}\n", latest);
322 }
323 _ => return Err(err).context(&latest_path, "cannot read or parse"),
324 },
325 };
326
327 Ok(message.into_string())
328 })()
329 .context(|| format!("in rotate::OpenOptions::repair({:?})", dir))
330 }
331}
332
333impl OpenOptionsRepair for OpenOptions {
334 fn open_options_repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
335 OpenOptions::repair(self, dir.as_ref())
336 }
337}
338
339impl OpenOptionsOutput for OpenOptions {
340 type Output = RotateLog;
341
342 fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
343 self.open(path)
344 }
345}
346
347impl fmt::Debug for OpenOptions {
348 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
349 write!(f, "OpenOptions {{ ")?;
350 write!(f, "max_bytes_per_log: {}, ", self.max_bytes_per_log)?;
351 write!(f, "max_log_count: {}, ", self.max_log_count)?;
352 write!(f, "auto_sync_threshold: {:?}, ", self.auto_sync_threshold)?;
353 write!(f, "log_open_options: {:?} }}", &self.log_open_options)?;
354 Ok(())
355 }
356}
357
358impl RotateLog {
359 pub fn append(&mut self, data: impl AsRef<[u8]>) -> crate::Result<()> {
361 (|| -> crate::Result<_> {
362 let threshold = self.open_options.auto_sync_threshold;
363 let log = self.writable_log();
364 log.append(data)?;
365 if let Some(threshold) = threshold {
366 if log.mem_buf.len() as u64 >= threshold {
367 self.sync()
368 .context("sync triggered by auto_sync_threshold")?;
369 }
370 }
371 Ok(())
372 })()
373 .context("in RotateLog::append")
374 }
375
376 pub fn lookup(
379 &self,
380 index_id: usize,
381 key: impl Into<Bytes>,
382 ) -> crate::Result<RotateLogLookupIter> {
383 let key = key.into();
384 let result: crate::Result<_> = (|| {
385 Ok(RotateLogLookupIter {
386 inner_iter: self.logs[0].get().unwrap().lookup(index_id, &key)?,
387 end: false,
388 log_rotate: self,
389 log_index: 0,
390 index_id,
391 key: key.clone(),
392 })
393 })();
394 result
395 .context(|| format!("in RotateLog::lookup({}, {:?})", index_id, key.as_ref()))
396 .context(|| format!(" RotateLog.dir = {:?}", self.dir))
397 }
398
399 pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
404 for log in &self.logs {
405 if let Some(log) = log.get() {
406 if log.disk_buf.range_of_slice(slice).is_some() {
407 return log.slice_to_bytes(slice);
408 }
409 }
410 }
411 Bytes::copy_from_slice(slice)
412 }
413
414 pub fn lookup_latest(
426 &self,
427 index_id: usize,
428 key: impl AsRef<[u8]>,
429 ) -> crate::Result<log::LogLookupIter> {
430 let key = key.as_ref();
431 assert!(
432 self.open_options.log_open_options.flush_filter.is_some(),
433 "programming error: flush_filter should also be set"
434 );
435 self.logs[0]
436 .get()
437 .unwrap()
438 .lookup(index_id, key)
439 .context(|| format!("in RotateLog::lookup_latest({}, {:?})", index_id, key))
440 .context(|| format!(" RotateLog.dir = {:?}", self.dir))
441 }
442
443 pub fn sync(&mut self) -> crate::Result<u8> {
449 let result: crate::Result<_> = (|| {
450 let span = debug_span!("RotateLog::sync", latest = self.latest as u32);
451 if let Some(dir) = &self.dir {
452 span.record("dir", dir.to_string_lossy().as_ref());
453 }
454 let _guard = span.enter();
455
456 if self.dir.is_none() {
457 return Ok(0);
458 }
459
460 if self.writable_log().iter_dirty().next().is_none() {
461 if let Ok(latest) = read_latest(self.dir.as_ref().unwrap()) {
463 if latest != self.latest {
464 self.set_logs(read_logs(
467 self.dir.as_ref().unwrap(),
468 &self.open_options,
469 latest,
470 )?);
471 self.latest = latest;
472 }
473 self.writable_log().sync()?;
474 } else {
475 }
478 } else {
479 let dir = self.dir.clone().unwrap();
481 let lock = ScopedDirLock::new(&dir)?;
482
483 let latest = read_latest(self.dir.as_ref().unwrap())?;
485 if latest != self.latest {
486 let mut new_logs =
494 read_logs(self.dir.as_ref().unwrap(), &self.open_options, latest)?;
495 if let Some(filter) = self.open_options.log_open_options.flush_filter {
496 let log = new_logs[0].get_mut().unwrap();
497 for entry in self.writable_log().iter_dirty() {
498 let content = entry?;
499 let context = FlushFilterContext { log };
500 match filter(&context, content).map_err(|err| {
501 crate::Error::wrap(err, "failed to run filter function")
502 })? {
503 FlushFilterOutput::Drop => {}
504 FlushFilterOutput::Keep => log.append(content)?,
505 FlushFilterOutput::Replace(content) => log.append(content)?,
506 }
507 }
508 } else {
509 let log = new_logs[0].get_mut().unwrap();
510 for entry in self.writable_log().iter_dirty() {
512 let bytes = entry?;
513 log.append(bytes)?;
514 }
515 }
516 self.set_logs(new_logs);
517 self.latest = latest;
518 }
519
520 let size = self.writable_log().flush()?;
521
522 #[cfg(test)]
523 if let Some(func) = self.hook_after_log_sync.as_ref() {
524 func();
525 }
526
527 if size >= self.open_options.max_bytes_per_log {
528 self.writable_log().finalize_indexes(&lock)?;
532 self.rotate_internal(&lock)?;
533 }
534 }
535
536 self.update_change_detector_to_match_meta();
537 Ok(self.latest)
538 })();
539
540 result
541 .context("in RotateLog::sync")
542 .context(|| format!(" RotateLog.dir = {:?}", self.dir))
543 }
544
545 fn update_change_detector_to_match_meta(&mut self) {
546 let meta = &self.writable_log().meta;
547 let value = meta.primary_len ^ meta.epoch ^ ((self.latest as u64) << 56);
548 if let Some(detector) = &self.change_detector {
549 detector.set(value);
550 }
551 }
552
553 pub fn remove_old_logs(&mut self) -> crate::Result<()> {
559 if let Some(dir) = &self.dir {
560 let lock = ScopedDirLock::new(dir)?;
561 let latest = read_latest(dir)?;
562 if latest == self.latest {
563 self.try_remove_old_logs(&lock);
564 }
565 }
566 Ok(())
567 }
568
569 pub fn is_changed_on_disk(&self) -> bool {
577 match &self.change_detector {
578 Some(detector) => detector.is_changed(),
579 None => false,
580 }
581 }
582
583 fn rotate_internal(&mut self, lock: &ScopedDirLock) -> crate::Result<()> {
589 let span = debug_span!("RotateLog::rotate", latest = self.latest as u32);
590 if let Some(dir) = &self.dir {
591 span.record("dir", dir.to_string_lossy().as_ref());
592 }
593 let _guard = span.enter();
594
595 let next = self.latest.wrapping_add(1);
597 let log = create_empty_log(
598 Some(self.dir.as_ref().unwrap()),
599 &self.open_options,
600 next,
601 lock,
602 )?;
603 if self.logs.len() >= self.open_options.max_log_count as usize {
604 self.logs.pop();
605 }
606 self.logs.insert(0, create_log_cell(log));
607 self.logs_len = AtomicUsize::new(self.logs.len());
608 self.latest = next;
609 self.try_remove_old_logs(lock);
610 Ok(())
611 }
612
613 pub fn flush(&mut self) -> crate::Result<u8> {
615 self.sync()
616 }
617
618 fn set_logs(&mut self, logs: Vec<OnceCell<Log>>) {
619 self.logs_len = AtomicUsize::new(logs.len());
620 self.logs = logs;
621 }
622
623 #[allow(clippy::nonminimal_bool)]
624 fn try_remove_old_logs(&self, _lock: &ScopedDirLock) {
625 if let Ok(read_dir) = self.dir.as_ref().unwrap().read_dir() {
626 let latest = self.latest;
627 let earliest = latest.wrapping_sub(self.open_options.max_log_count - 1);
628 for entry in read_dir {
629 if let Ok(entry) = entry {
630 let name = entry.file_name();
631 debug!("Inspecting {:?} for rotate log removal", name);
632 if let Some(name) = name.to_str() {
633 if let Ok(id) = name.parse::<u8>() {
634 if (latest >= earliest && (id > latest || id < earliest))
635 || (latest < earliest && (id > latest && id < earliest))
636 {
637 match fs::remove_file(entry.path().join(log::META_FILE)) {
646 Ok(()) => {}
647 Err(e) if e.kind() == io::ErrorKind::NotFound => {
648 }
650 Err(e) => {
651 debug!(
654 "Error removing rotate log meta: {:?} {:?}",
655 name, e
656 );
657 continue;
658 }
659 }
660
661 let res = fs::remove_dir_all(entry.path());
663 match res {
664 Ok(_) => debug!("Removed rotate log: {:?}", name),
665 Err(err) => {
666 debug!("Error removing rotate log directory: {:?}", err)
667 }
668 };
669 } else {
670 debug!(
671 "Not removing rotate log: {:?} (latest: {:?}, earliest: {:?})",
672 name, latest, earliest
673 );
674 }
675 }
676 }
677 }
678 }
679 }
680 }
681
682 fn writable_log(&mut self) -> &mut Log {
684 self.logs[0].get_mut().unwrap()
685 }
686
687 fn load_log(&self, index: usize) -> crate::Result<Option<&Log>> {
689 if index >= self.logs_len.load(SeqCst) {
690 return Ok(None);
691 }
692 match self.logs.get(index) {
693 Some(cell) => {
694 let id = self.latest.wrapping_sub(index as u8);
695 if let Some(dir) = &self.dir {
696 let log = cell.get_or_try_init(|| {
697 let mut open_options = self.open_options.log_open_options.clone();
698 if index > 0 {
699 open_options = open_options.with_zero_index_lag();
700 }
701 let log = load_log(dir, id, open_options);
702 trace!(
703 name = "RotateLog::load_log",
704 index = index,
705 success = log.is_ok()
706 );
707 log
708 });
709 match log {
710 Ok(log) => Ok(Some(log)),
711 Err(err) => {
712 self.logs_len.store(index, SeqCst);
714 Err(err)
715 }
716 }
717 } else {
718 Ok(cell.get())
719 }
720 }
721 None => unreachable!(),
722 }
723 }
724
725 pub fn iter(&self) -> impl Iterator<Item = crate::Result<&[u8]>> {
729 let logs = self.logs();
730 logs.into_iter().rev().flat_map(|log| log.iter())
731 }
732
733 pub fn iter_dirty(&self) -> impl Iterator<Item = crate::Result<&[u8]>> {
735 self.logs[0].get().unwrap().iter_dirty()
736 }
737}
738
739fn create_log_cell(log: Log) -> OnceCell<Log> {
741 let cell = OnceCell::new();
742 cell.set(log)
743 .expect("cell is empty so cell.set cannot fail");
744 cell
745}
746
747fn load_log(dir: &Path, id: u8, open_options: log::OpenOptions) -> crate::Result<Log> {
749 let name = format!("{}", id);
750 let log_path = dir.join(name);
751 open_options.create(false).open(log_path)
752}
753
754pub trait RotateLowLevelExt {
760 fn logs(&self) -> Vec<&Log>;
762
763 fn force_rotate(&mut self) -> crate::Result<()>;
768}
769
770impl RotateLowLevelExt for RotateLog {
771 fn logs(&self) -> Vec<&Log> {
772 (0..)
773 .map(|i| self.load_log(i))
774 .take_while(|res| match res {
775 Ok(Some(_)) => true,
776 _ => false,
777 })
778 .map(|res| res.unwrap().unwrap())
779 .collect()
780 }
781
782 fn force_rotate(&mut self) -> crate::Result<()> {
783 if self.dir.is_none() {
784 return Ok(());
786 }
787 let dir = self.dir.clone().unwrap();
789 let lock = ScopedDirLock::new(&dir)?;
790 self.latest = read_latest(self.dir.as_ref().unwrap())?;
791 self.rotate_internal(&lock)?;
792 self.set_logs(read_logs(
793 self.dir.as_ref().unwrap(),
794 &self.open_options,
795 self.latest,
796 )?);
797 Ok(())
798 }
799}
800
801pub struct RotateLogLookupIter<'a> {
803 inner_iter: log::LogLookupIter<'a>,
804 end: bool,
805 log_rotate: &'a RotateLog,
806 log_index: usize,
807 index_id: usize,
808 key: Bytes,
809}
810
811impl<'a> RotateLogLookupIter<'a> {
812 fn load_next_log(&mut self) -> crate::Result<()> {
813 if self.log_index + 1 >= self.log_rotate.logs.len() {
814 self.end = true;
815 Ok(())
816 } else {
817 self.log_index += 1;
819 match self.log_rotate.load_log(self.log_index) {
820 Ok(None) => {
821 self.end = true;
822 Ok(())
823 }
824 Err(_err) => {
825 self.end = true;
826 Ok(())
829 }
830 Ok(Some(log)) => match log.lookup(self.index_id, &self.key) {
831 Err(err) => {
832 self.end = true;
833 Err(err)
834 }
835 Ok(iter) => {
836 self.inner_iter = iter;
837 Ok(())
838 }
839 },
840 }
841 }
842 }
843
844 pub fn is_empty(mut self) -> crate::Result<bool> {
846 while !self.end {
847 if !self.inner_iter.is_empty() {
848 return Ok(false);
849 }
850 self.load_next_log()?;
851 }
852 Ok(true)
853 }
854}
855
856impl<'a> Iterator for RotateLogLookupIter<'a> {
857 type Item = crate::Result<&'a [u8]>;
858
859 fn next(&mut self) -> Option<Self::Item> {
860 if self.end {
861 return None;
862 }
863 match self.inner_iter.next() {
864 None => {
865 if let Err(err) = self.load_next_log() {
866 return Some(Err(err));
867 }
868
869 if self.end {
870 return None;
871 }
872
873 self.next()
874 }
875 Some(Err(err)) => {
876 self.end = true;
877 Some(Err(err))
878 }
879 Some(Ok(slice)) => Some(Ok(slice)),
880 }
881 }
882}
883
884fn create_empty_log(
885 dir: Option<&Path>,
886 open_options: &OpenOptions,
887 latest: u8,
888 _lock: &ScopedDirLock,
889) -> crate::Result<Log> {
890 Ok(match dir {
891 Some(dir) => {
892 let latest_path = dir.join(LATEST_FILE);
893 let latest_str = format!("{}", latest);
894 let log_path = dir.join(&latest_str);
895 let opts = open_options.log_open_options.clone().create(true);
896 opts.delete_content(&log_path)?;
897 let log = opts.open(&log_path)?;
898 utils::atomic_write(latest_path, latest_str.as_bytes(), false)?;
899 log
900 }
901 None => open_options.log_open_options.clone().open(())?,
902 })
903}
904
905fn read_latest(dir: &Path) -> crate::Result<u8> {
906 read_latest_raw(dir).context(dir, "cannot read latest")
907}
908
909fn read_latest_raw(dir: &Path) -> io::Result<u8> {
911 let latest_path = dir.join(LATEST_FILE);
912 let data = utils::atomic_read(&latest_path)?;
913 let content: String = String::from_utf8(data).map_err(|_e| {
914 io::Error::new(
915 io::ErrorKind::InvalidData,
916 format!("{:?}: failed to read as utf8 string", latest_path),
917 )
918 })?;
919 let id: u8 = content.parse().map_err(|_e| {
920 io::Error::new(
921 io::ErrorKind::InvalidData,
922 format!(
923 "{:?}: failed to parse {:?} as u8 integer",
924 latest_path, content
925 ),
926 )
927 })?;
928 Ok(id)
929}
930
931fn read_logs(
932 dir: &Path,
933 open_options: &OpenOptions,
934 latest: u8,
935) -> crate::Result<Vec<OnceCell<Log>>> {
936 let mut logs = Vec::with_capacity(open_options.max_log_count as usize);
937
938 let log = load_log(dir, latest, open_options.log_open_options.clone())?;
940 logs.push(create_log_cell(log));
941
942 for index in 1..open_options.max_log_count {
944 let id = latest.wrapping_sub(index);
945 let name = format!("{}", id);
948 let log_path = dir.join(&name);
949 if !log_path.is_dir() {
950 break;
951 }
952 logs.push(OnceCell::new());
953 }
954 trace!(
955 name = "RotateLog::read_logs",
956 max_log_count = open_options.max_log_count,
957 logs_len = logs.len()
958 );
959
960 Ok(logs)
961}
962
963fn read_latest_and_logs(
964 dir: &Path,
965 open_options: &OpenOptions,
966) -> crate::Result<(u8, Vec<OnceCell<Log>>)> {
967 let latest = read_latest(dir)?;
968 Ok((latest, read_logs(dir, open_options, latest)?))
969}
970
971fn guess_latest(mut ids: Vec<u8>) -> u8 {
973 ids.sort_unstable();
975
976 let mut id_to_ignore = 255;
977 loop {
978 match ids.pop() {
979 Some(id) => {
980 if id == id_to_ignore {
983 id_to_ignore -= 1;
984 if id_to_ignore == 0 {
985 break 0;
987 }
988 continue;
989 } else {
990 break id;
993 }
994 }
995 None => {
996 break 0;
998 }
999 }
1000 }
1001}
1002
1003#[cfg(test)]
1004mod tests {
1005 use log::IndexOutput;
1006 use tempfile::tempdir;
1007
1008 use super::*;
1009
1010 #[test]
1011 fn test_open() {
1012 let dir = tempdir().unwrap();
1013 let path = dir.path().join("rotate");
1014
1015 assert!(OpenOptions::new().create(false).open(&path).is_err());
1016 assert!(OpenOptions::new().create(true).open(&path).is_ok());
1017 assert!(
1018 OpenOptions::new()
1019 .checksum_type(log::ChecksumType::Xxhash64)
1020 .create(false)
1021 .open(&path)
1022 .is_ok()
1023 );
1024 }
1025
1026 fn lookup<'a>(rotate: &'a RotateLog, key: &[u8]) -> Vec<&'a [u8]> {
1028 let values = rotate
1029 .lookup(0, key.to_vec())
1030 .unwrap()
1031 .collect::<crate::Result<Vec<&[u8]>>>()
1032 .unwrap();
1033 for value in &values {
1034 let b1 = rotate.slice_to_bytes(value);
1035 let b2 = rotate.slice_to_bytes(value);
1036 if rotate
1038 .iter_dirty()
1039 .any(|i| i.unwrap().as_ptr() == value.as_ptr())
1040 {
1041 continue;
1042 }
1043 assert_eq!(
1044 b1.as_ptr(),
1045 b2.as_ptr(),
1046 "slice_to_bytes should return zero-copy"
1047 );
1048 }
1049 values
1050 }
1051
1052 fn iter(rotate: &RotateLog) -> Vec<&[u8]> {
1053 rotate
1054 .iter()
1055 .collect::<crate::Result<Vec<&[u8]>>>()
1056 .unwrap()
1057 }
1058
1059 #[test]
1060 fn test_trivial_append_lookup() {
1061 let dir = tempdir().unwrap();
1062 let opts = OpenOptions::new()
1063 .create(true)
1064 .index_defs(vec![IndexDef::new("two-bytes", |_| {
1065 vec![IndexOutput::Reference(0..2)]
1066 })]);
1067
1068 let rotate = opts.open(&dir).unwrap();
1069 let rotate_mem = opts.create_in_memory().unwrap();
1070
1071 for rotate in &mut [rotate, rotate_mem] {
1072 rotate.append(b"aaa").unwrap();
1073 rotate.append(b"abbb").unwrap();
1074 rotate.append(b"abc").unwrap();
1075
1076 assert_eq!(lookup(rotate, b"aa"), vec![b"aaa"]);
1077 assert_eq!(lookup(rotate, b"ab"), vec![&b"abc"[..], b"abbb"]);
1078 assert_eq!(lookup(rotate, b"ac"), Vec::<&[u8]>::new());
1079 }
1080 }
1081
1082 #[test]
1083 fn test_simple_rotate() {
1084 let dir = tempdir().unwrap();
1085 let mut rotate = OpenOptions::new()
1086 .create(true)
1087 .max_bytes_per_log(100)
1088 .max_log_count(2)
1089 .index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
1090 .open(&dir)
1091 .unwrap();
1092
1093 rotate.append(b"a").unwrap();
1095 assert_eq!(rotate.sync().unwrap(), 0);
1096 rotate.append(b"a").unwrap();
1097 assert_eq!(rotate.sync().unwrap(), 0);
1098
1099 rotate.append(vec![b'b'; 100]).unwrap();
1101 assert_eq!(rotate.sync().unwrap(), 1);
1102 assert_eq!(lookup(&rotate, b"a").len(), 2);
1103
1104 rotate.append(vec![b'c'; 50]).unwrap();
1107 assert_eq!(rotate.sync().unwrap(), 1);
1108 rotate.append(vec![b'd'; 50]).unwrap();
1109 assert_eq!(rotate.sync().unwrap(), 2);
1110 assert_eq!(lookup(&rotate, b"a").len(), 0);
1111 assert_eq!(lookup(&rotate, b"b").len(), 0);
1112 assert_eq!(lookup(&rotate, b"c").len(), 1);
1113 assert_eq!(lookup(&rotate, b"d").len(), 1);
1114 assert!(!dir.path().join("0").exists());
1115 }
1116
1117 #[test]
1118 fn test_manual_remove_old_logs() {
1119 let dir = tempdir().unwrap();
1120 let dir = &dir;
1121 let open = |n: u8| -> RotateLog {
1122 OpenOptions::new()
1123 .create(true)
1124 .max_bytes_per_log(1)
1125 .max_log_count(n)
1126 .open(dir)
1127 .unwrap()
1128 };
1129 let read_all =
1130 |log: &RotateLog| -> Vec<Vec<u8>> { log.iter().map(|v| v.unwrap().to_vec()).collect() };
1131
1132 {
1134 let mut rotate = open(5);
1135 for i in 0..5 {
1136 rotate.append(vec![i]).unwrap();
1137 rotate.sync().unwrap();
1138 }
1139 }
1140
1141 {
1143 let rotate = open(4);
1144 assert_eq!(read_all(&rotate), [[2], [3], [4]]);
1145 let rotate = open(3);
1146 assert_eq!(read_all(&rotate), [[3], [4]]);
1147 }
1148
1149 {
1151 let mut rotate = open(3);
1152 rotate.remove_old_logs().unwrap();
1153 }
1154
1155 {
1157 let rotate = open(4);
1158 assert_eq!(read_all(&rotate), [[3], [4]]);
1159 }
1160 }
1161
1162 fn test_wrapping_rotate(max_log_count: u8) {
1163 let dir = tempdir().unwrap();
1164 let mut rotate = OpenOptions::new()
1165 .create(true)
1166 .max_bytes_per_log(10)
1167 .max_log_count(max_log_count)
1168 .open(&dir)
1169 .unwrap();
1170
1171 let count = || {
1172 fs::read_dir(&dir)
1173 .unwrap()
1174 .map(|entry| entry.unwrap().file_name().into_string().unwrap())
1175 .filter(|name| name != "lock" && name != "rlock")
1177 .count()
1178 };
1179
1180 for i in 1..=(max_log_count - 1) {
1181 rotate.append(b"abcdefghijklmn").unwrap();
1182 assert_eq!(rotate.sync().unwrap(), i);
1183 assert_eq!(count(), (i as usize) + 2);
1184 }
1185
1186 for i in max_log_count..=255 {
1187 rotate.append(b"abcdefghijklmn").unwrap();
1188 assert_eq!(rotate.sync().unwrap(), i);
1189 assert_eq!(count(), (max_log_count as usize) + 1);
1190 }
1191
1192 for _ in 0..=max_log_count {
1193 rotate.append(b"abcdefghijklmn").unwrap();
1194 assert_eq!(count(), (max_log_count as usize) + 1);
1195 }
1196 }
1197
1198 #[test]
1199 fn test_wrapping_rotate_10() {
1200 test_wrapping_rotate(10)
1201 }
1202
1203 #[test]
1204 fn test_wrapping_rotate_255() {
1205 test_wrapping_rotate(255)
1206 }
1207
1208 #[test]
1209 fn test_force_rotate() {
1210 let dir = tempdir().unwrap();
1211 let mut rotate = OpenOptions::new()
1212 .create(true)
1213 .max_bytes_per_log(1 << 30)
1214 .max_log_count(3)
1215 .open(&dir)
1216 .unwrap();
1217
1218 use super::RotateLowLevelExt;
1219 assert_eq!(rotate.logs().len(), 1);
1220 rotate.force_rotate().unwrap();
1221 assert_eq!(rotate.logs().len(), 2);
1222 rotate.force_rotate().unwrap();
1223 assert_eq!(rotate.logs().len(), 3);
1224 rotate.force_rotate().unwrap();
1225 assert_eq!(rotate.logs().len(), 3);
1226 }
1227
1228 #[test]
1229 fn test_lookup_rotated() {
1230 let dir = tempdir().unwrap();
1232 let open_opts = OpenOptions::new()
1233 .create(true)
1234 .max_bytes_per_log(1)
1235 .max_log_count(3)
1236 .index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
1237
1238 let mut rotate1 = open_opts.open(&dir).unwrap();
1240 rotate1.append(b"a1").unwrap();
1241 assert_eq!(rotate1.sync().unwrap(), 1);
1242 rotate1.append(b"a2").unwrap();
1243 assert_eq!(rotate1.sync().unwrap(), 2);
1244
1245 assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1247 assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1248
1249 let rotate2 = open_opts.open(&dir).unwrap();
1251
1252 let mut rotate3 = open_opts.open(&dir).unwrap();
1254 rotate3.append(b"a3").unwrap();
1255 assert_eq!(rotate3.sync().unwrap(), 3);
1256
1257 assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1260 assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1261
1262 assert_eq!(lookup(&rotate2, b"a"), vec![b"a2"]);
1265 assert_eq!(iter(&rotate2), vec![b"a2"]);
1266 }
1267
1268 #[test]
1269 fn test_is_empty() -> crate::Result<()> {
1270 let dir = tempdir().unwrap();
1271 let open_opts = OpenOptions::new()
1272 .create(true)
1273 .max_bytes_per_log(2)
1274 .max_log_count(4)
1275 .index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
1276
1277 let mut rotate = open_opts.open(&dir)?;
1278 rotate.append(b"a1")?;
1279 assert_eq!(rotate.sync()?, 1);
1280
1281 rotate.append(b"a2")?;
1282 assert_eq!(rotate.sync()?, 2);
1283
1284 rotate.append(b"b1")?;
1285 assert_eq!(rotate.sync()?, 3);
1286
1287 assert_eq!(lookup(&rotate, b"a"), vec![b"a2", b"a1"]);
1288 assert_eq!(lookup(&rotate, b"b"), vec![b"b1"]);
1289
1290 assert!(!rotate.lookup(0, b"a".to_vec())?.is_empty()?);
1291 assert!(!rotate.lookup(0, b"b".to_vec())?.is_empty()?);
1292 assert!(rotate.lookup(0, b"c".to_vec())?.is_empty()?);
1293
1294 Ok(())
1295 }
1296
1297 #[test]
1298 fn test_lookup_truncated_meta() {
1299 let dir = tempdir().unwrap();
1301 let open_opts = OpenOptions::new()
1302 .create(true)
1303 .max_bytes_per_log(1)
1304 .max_log_count(3)
1305 .index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
1306
1307 let mut rotate1 = open_opts.open(&dir).unwrap();
1309 rotate1.append(b"a1").unwrap();
1310 assert_eq!(rotate1.sync().unwrap(), 1);
1311 rotate1.append(b"a2").unwrap();
1312 assert_eq!(rotate1.sync().unwrap(), 2);
1313
1314 assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1316 assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1317
1318 let rotate2 = open_opts.open(&dir).unwrap();
1320
1321 utils::atomic_write(dir.path().join("0").join(log::META_FILE), "", false).unwrap();
1323
1324 assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1327 assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1328
1329 assert_eq!(lookup(&rotate2, b"a"), vec![b"a2"]);
1331 assert_eq!(iter(&rotate2), vec![b"a2"]);
1332 }
1333
1334 #[test]
1335 fn test_concurrent_writes() {
1336 let dir = tempdir().unwrap();
1337 let mut rotate1 = OpenOptions::new()
1338 .create(true)
1339 .max_bytes_per_log(100)
1340 .max_log_count(2)
1341 .open(&dir)
1342 .unwrap();
1343 let mut rotate2 = OpenOptions::new()
1344 .max_bytes_per_log(100)
1345 .max_log_count(2)
1346 .open(&dir)
1347 .unwrap();
1348
1349 rotate1.append(vec![b'a'; 100]).unwrap();
1351 assert_eq!(rotate1.sync().unwrap(), 1);
1352
1353 let size = |log_index: u64| {
1354 dir.path()
1355 .join(format!("{}", log_index))
1356 .join(log::PRIMARY_FILE)
1357 .metadata()
1358 .unwrap()
1359 .len()
1360 };
1361
1362 let size1 = size(1);
1363
1364 rotate2.append(vec![b'b'; 100]).unwrap();
1366 assert_eq!(rotate2.sync().unwrap(), 2);
1367
1368 #[cfg(unix)]
1369 {
1370 assert!(!dir.path().join("0").exists());
1371 }
1372 assert!(size(1) > size1 + 100);
1373 assert!(size(2) > 0);
1374 }
1375
1376 #[test]
1377 fn test_flush_filter() {
1378 let dir = tempdir().unwrap();
1379
1380 let read_log = |name: &str| -> Vec<Vec<u8>> {
1381 let log = Log::open(dir.path().join(name), Vec::new()).unwrap();
1382 log.iter().map(|v| v.unwrap().to_vec()).collect()
1383 };
1384
1385 let mut rotate1 = OpenOptions::new()
1386 .create(true)
1387 .max_bytes_per_log(100)
1388 .flush_filter(Some(|ctx, bytes| {
1389 assert!(!ctx.log.iter().any(|x| x.unwrap() == b"aa"));
1391 Ok(match bytes.len() {
1392 1 => FlushFilterOutput::Replace(b"xx".to_vec()),
1393 _ => FlushFilterOutput::Keep,
1394 })
1395 }))
1396 .open(&dir)
1397 .unwrap();
1398
1399 let mut rotate2 = OpenOptions::new()
1400 .max_bytes_per_log(100)
1401 .open(&dir)
1402 .unwrap();
1403
1404 rotate2.append(vec![b'a'; 3]).unwrap();
1405 rotate2.sync().unwrap();
1406
1407 rotate1.append(vec![b'a'; 1]).unwrap(); rotate1.append(vec![b'a'; 2]).unwrap();
1409 assert_eq!(rotate1.sync().unwrap(), 0); assert_eq!(read_log("0"), vec![&b"aaa"[..], b"xx", b"aa"]);
1411
1412 rotate1.append(vec![b'a'; 1]).unwrap(); assert_eq!(rotate1.sync().unwrap(), 0); assert_eq!(read_log("0").last().unwrap(), b"a");
1415
1416 rotate1.append(vec![b'a'; 1]).unwrap(); rotate1.append(vec![b'a'; 2]).unwrap();
1418
1419 rotate2.append(vec![b'a'; 100]).unwrap(); assert_eq!(rotate2.sync().unwrap(), 1);
1421
1422 assert_eq!(rotate1.sync().unwrap(), 1); assert_eq!(read_log("1"), vec![b"xx", b"aa"]);
1424 }
1425
1426 #[test]
1427 fn test_is_changed_on_disk() {
1428 let dir = tempdir().unwrap();
1429 let open_opts = OpenOptions::new()
1430 .create(true)
1431 .max_bytes_per_log(5000)
1432 .max_log_count(2);
1433
1434 for _ in 0..10 {
1436 let mut rotate1 = open_opts.open(&dir).unwrap();
1437 let mut rotate2 = open_opts.open(&dir).unwrap();
1438
1439 assert!(!rotate1.is_changed_on_disk());
1440 assert!(!rotate2.is_changed_on_disk());
1441
1442 rotate1.sync().unwrap();
1444 assert!(!rotate2.is_changed_on_disk());
1445
1446 rotate1.append([b'a'; 1000]).unwrap();
1448
1449 assert!(!rotate1.is_changed_on_disk());
1450 assert!(!rotate2.is_changed_on_disk());
1451
1452 rotate1.sync().unwrap();
1454 assert!(!rotate1.is_changed_on_disk());
1455
1456 assert!(rotate2.is_changed_on_disk());
1458
1459 assert!(rotate2.is_changed_on_disk());
1461
1462 rotate2.sync().unwrap();
1464 assert!(!rotate2.is_changed_on_disk());
1465 assert!(!rotate1.is_changed_on_disk());
1467
1468 rotate2.append([b'a'; 1000]).unwrap();
1469 rotate2.sync().unwrap();
1470
1471 assert!(rotate1.is_changed_on_disk());
1473
1474 rotate1.append([b'a'; 1000]).unwrap();
1476 rotate1.sync().unwrap();
1477 assert!(!rotate1.is_changed_on_disk());
1478 }
1479 }
1480
1481 #[test]
1482 fn test_lookup_latest() {
1483 let dir = tempdir().unwrap();
1484 let mut rotate = OpenOptions::new()
1485 .create(true)
1486 .max_bytes_per_log(100)
1487 .flush_filter(Some(|_, _| panic!()))
1488 .index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
1489 .open(&dir)
1490 .unwrap();
1491
1492 rotate.append(vec![b'a'; 101]).unwrap();
1493 rotate.sync().unwrap(); rotate.append(vec![b'b'; 10]).unwrap();
1495
1496 assert_eq!(rotate.lookup_latest(0, b"b").unwrap().count(), 1);
1497 assert_eq!(rotate.lookup_latest(0, b"a").unwrap().count(), 0);
1498
1499 rotate.append(vec![b'c'; 101]).unwrap();
1500 rotate.sync().unwrap(); rotate.append(vec![b'd'; 10]).unwrap();
1503 rotate.sync().unwrap(); rotate.append(vec![b'e'; 10]).unwrap();
1505
1506 assert_eq!(rotate.lookup_latest(0, b"c").unwrap().count(), 0);
1507 assert_eq!(rotate.lookup_latest(0, b"d").unwrap().count(), 1);
1508 assert_eq!(rotate.lookup_latest(0, b"e").unwrap().count(), 1);
1509 }
1510
1511 #[test]
1512 #[should_panic]
1513 fn test_lookup_latest_panic() {
1514 let dir = tempdir().unwrap();
1515 let rotate = OpenOptions::new()
1516 .create(true)
1517 .index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
1518 .open(&dir)
1519 .unwrap();
1520 rotate.lookup_latest(0, b"a").unwrap(); }
1522
1523 #[test]
1524 fn test_iter() {
1525 let dir = tempdir().unwrap();
1526 let mut rotate = OpenOptions::new()
1527 .create(true)
1528 .max_bytes_per_log(100)
1529 .open(&dir)
1530 .unwrap();
1531
1532 let a = vec![b'a'; 101];
1533 let b = vec![b'b'; 10];
1534
1535 rotate.append(a.clone()).unwrap();
1536 assert_eq!(
1537 rotate.iter_dirty().collect::<Result<Vec<_>, _>>().unwrap(),
1538 vec![&a[..]]
1539 );
1540
1541 rotate.sync().unwrap(); rotate.append(b.clone()).unwrap();
1543 rotate.append(a.clone()).unwrap();
1544 rotate.append(a.clone()).unwrap();
1545 assert_eq!(
1546 rotate.iter_dirty().collect::<Result<Vec<_>, _>>().unwrap(),
1547 vec![&b[..], &a, &a]
1548 );
1549
1550 assert_eq!(
1551 rotate.iter().map(|e| e.unwrap()).collect::<Vec<&[u8]>>(),
1552 vec![&a[..], &b, &a, &a],
1553 );
1554
1555 rotate.sync().unwrap(); assert_eq!(
1557 rotate.iter().map(|e| e.unwrap()).collect::<Vec<&[u8]>>(),
1558 vec![&b[..], &a, &a],
1559 );
1560 }
1561
1562 #[test]
1563 fn test_recover_from_empty_logs() {
1564 let dir = tempdir().unwrap();
1565 let rotate = OpenOptions::new().create(true).open(&dir).unwrap();
1566 drop(rotate);
1567
1568 for dirent in fs::read_dir(&dir).unwrap() {
1570 let dirent = dirent.unwrap();
1571 let path = dirent.path();
1572 if path.is_dir() {
1573 fs::remove_dir_all(path).unwrap();
1574 }
1575 }
1576
1577 let _ = OpenOptions::new().create(true).open(&dir).unwrap();
1578 }
1579
1580 #[test]
1581 fn test_recover_from_occupied_logs() {
1582 let dir = tempdir().unwrap();
1583
1584 {
1587 let mut log = log::OpenOptions::new()
1588 .create(true)
1589 .open(dir.path().join("1"))
1590 .unwrap();
1591 log.append(&[b'b'; 100][..]).unwrap();
1592 log.append(&[b'c'; 100][..]).unwrap();
1593 log.sync().unwrap();
1594 }
1595
1596 let mut rotate = OpenOptions::new()
1598 .create(true)
1599 .max_bytes_per_log(100)
1600 .max_log_count(3)
1601 .open(&dir)
1602 .unwrap();
1603 for i in [1, 2] {
1604 rotate.append(vec![b'a'; 101]).unwrap();
1605 assert_eq!(rotate.sync().unwrap(), i); }
1607
1608 assert_eq!(
1610 rotate.iter().map(|b| b.unwrap()[0]).collect::<Vec<_>>(),
1611 vec![b'a'; 2]
1612 );
1613 }
1614
1615 #[test]
1616 fn test_index_lag() {
1617 let dir = tempdir().unwrap();
1618 let opts = OpenOptions::new()
1619 .create(true)
1620 .index_defs(vec![
1621 IndexDef::new("idx", |_| vec![IndexOutput::Reference(0..2)])
1622 .lag_threshold(u64::max_value()),
1623 ])
1624 .max_bytes_per_log(100)
1625 .max_log_count(3);
1626
1627 let size = |name: &str| dir.path().join(name).metadata().unwrap().len();
1628
1629 let mut rotate = opts.open(&dir).unwrap();
1630 rotate.append(vec![b'x'; 200]).unwrap();
1631 rotate.sync().unwrap();
1632 rotate.append(vec![b'y'; 200]).unwrap();
1633 rotate.sync().unwrap();
1634 rotate.append(vec![b'z'; 10]).unwrap();
1635 rotate.sync().unwrap();
1636
1637 assert!(size("0/index2-idx") > 0);
1640 assert!(size("0/log") > 100);
1641
1642 assert!(size("1/index2-idx") > 0);
1643 assert!(size("1/log") > 100);
1644
1645 assert_eq!(size("2/index2-idx"), 25);
1649 assert!(size("2/log") < 100);
1650 }
1651
1652 #[test]
1653 fn test_sync_missing_latest() {
1654 let dir = tempdir().unwrap();
1655 let opts = OpenOptions::new()
1656 .max_bytes_per_log(10000)
1657 .max_log_count(10);
1658 let mut rotate = opts.clone().create(true).open(&dir).unwrap();
1659 rotate.append(vec![b'x'; 200]).unwrap();
1660 rotate.sync().unwrap();
1661
1662 let mut rotate2 = opts.open(&dir).unwrap();
1663 fs::remove_file(dir.path().join(LATEST_FILE)).unwrap();
1664 rotate2.sync().unwrap(); rotate2.append(vec![b'y'; 200]).unwrap();
1666 rotate2.sync().unwrap_err(); }
1668
1669 #[test]
1670 fn test_auto_sync_threshold() {
1671 let dir = tempdir().unwrap();
1672 let opts = OpenOptions::new().auto_sync_threshold(100).create(true);
1673
1674 let mut rotate = opts.create(true).open(&dir).unwrap();
1675 rotate.append(vec![b'x'; 50]).unwrap();
1676 assert_eq!(rotate.logs()[0].iter_dirty().count(), 1);
1677 rotate.append(vec![b'x'; 50]).unwrap(); assert_eq!(rotate.logs()[0].iter_dirty().count(), 0);
1679 }
1680
1681 #[test]
1682 fn test_auto_sync_threshold_with_racy_index_update_on_open() {
1683 fn index_defs(lag_threshold: u64) -> Vec<IndexDef> {
1684 let index_names = ["a"];
1685 (0..index_names.len())
1686 .map(|i| {
1687 IndexDef::new(index_names[i], |_| vec![IndexOutput::Reference(0..1)])
1688 .lag_threshold(lag_threshold)
1689 })
1690 .collect()
1691 }
1692
1693 fn open_opts(lag_threshold: u64) -> OpenOptions {
1694 let index_defs = index_defs(lag_threshold);
1695 OpenOptions::new()
1696 .auto_sync_threshold(1000)
1697 .max_bytes_per_log(400)
1698 .max_log_count(10)
1699 .create(true)
1700 .index_defs(index_defs)
1701 }
1702
1703 let dir = tempdir().unwrap();
1704 let path = dir.path();
1705 let data: &[u8] = &[b'x'; 100];
1706 let n = 10;
1707 for _i in 0..n {
1708 let mut rotate1 = open_opts(300).open(path).unwrap();
1709 rotate1.hook_after_log_sync = Some({
1710 let path = path.to_path_buf();
1711 Box::new(move || {
1712 let rotate2 = open_opts(100).open(&path).unwrap();
1714 let _all = rotate2.iter().collect::<Result<Vec<_>, _>>().unwrap();
1716 })
1717 });
1718 rotate1.append(data).unwrap();
1719 rotate1.sync().unwrap();
1720 }
1721
1722 let rotate1 = open_opts(300).open(path).unwrap();
1724 let mut count = 0;
1725 for entry in rotate1.lookup(0, b"x" as &[u8]).unwrap() {
1726 let entry = entry.unwrap();
1727 assert_eq!(entry, data);
1728 count += 1;
1729 }
1730 assert_eq!(count, n);
1731 }
1732
1733 #[test]
1734 fn test_reindex_old_logs() {
1735 let dir = tempdir().unwrap();
1736 let opts = OpenOptions::new()
1737 .max_bytes_per_log(10)
1738 .max_log_count(10)
1739 .create(true);
1740
1741 let mut rotate = opts.clone().create(true).open(&dir).unwrap();
1742 for i in 0..2u8 {
1743 rotate.append(vec![i; 50]).unwrap();
1744 rotate.sync().unwrap(); }
1746
1747 let opts = opts.index("a", |_data| vec![IndexOutput::Reference(0..1)]);
1749
1750 let rotate = opts.create(true).open(&dir).unwrap();
1752
1753 assert!(!dir.path().join("1/index2-a").exists());
1755 assert!(!dir.path().join("0/index2-a").exists());
1756
1757 let mut iter = rotate.lookup(0, b"\x00".to_vec()).unwrap();
1759
1760 assert!(!dir.path().join("1/index2-a").exists());
1762
1763 assert_eq!(iter.next().unwrap().unwrap(), &[0; 50][..]);
1765
1766 assert!(dir.path().join("1/index2-a").exists());
1768 assert!(dir.path().join("0/index2-a").exists());
1769 }
1770
1771 #[test]
1772 fn test_repair_latest() {
1773 assert_eq!(guess_latest(vec![]), 0);
1774 assert_eq!(guess_latest(vec![3, 4, 5]), 5);
1775 assert_eq!(guess_latest(vec![0, 1, 2, 254, 255]), 2);
1776 assert_eq!(guess_latest((0..=255).collect::<Vec<_>>()), 0);
1777
1778 let dir = tempdir().unwrap();
1779 let opts = OpenOptions::new().max_bytes_per_log(100).max_log_count(10);
1780 let mut rotate = opts.clone().create(true).open(&dir).unwrap();
1781 for i in 1..=2 {
1782 rotate.append(vec![b'x'; 200]).unwrap();
1783 assert_eq!(rotate.sync().unwrap(), i);
1784 }
1785
1786 let latest_path = dir.path().join(LATEST_FILE);
1788 utils::atomic_write(latest_path, "NaN", false).unwrap();
1789 assert!(opts.open(&dir).is_err());
1790 assert_eq!(
1791 opts.repair(&dir)
1792 .unwrap()
1793 .lines()
1794 .filter(|l| !l.contains("Processing"))
1795 .collect::<Vec<_>>()
1796 .join("\n"),
1797 r#"Attempt to repair log "0"
1798Verified 1 entries, 223 bytes in log
1799Attempt to repair log "1"
1800Verified 1 entries, 223 bytes in log
1801Attempt to repair log "2"
1802Verified 0 entries, 12 bytes in log
1803Reset latest to 2"#
1804 );
1805 opts.open(&dir).unwrap();
1806
1807 fs::remove_file(dir.path().join(LATEST_FILE)).unwrap();
1809 assert!(opts.open(&dir).is_err());
1810
1811 assert_eq!(
1813 opts.repair(&dir)
1814 .unwrap()
1815 .lines()
1816 .filter(|l| !l.contains("Processing"))
1817 .collect::<Vec<_>>()
1818 .join("\n"),
1819 r#"Attempt to repair log "0"
1820Verified 1 entries, 223 bytes in log
1821Attempt to repair log "1"
1822Verified 1 entries, 223 bytes in log
1823Attempt to repair log "2"
1824Verified 0 entries, 12 bytes in log
1825Reset latest to 2"#
1826 );
1827 opts.open(&dir).unwrap();
1828 }
1829
1830 #[test]
1831 fn test_load_broken_logs_once() {
1832 let dir = tempdir().unwrap();
1833 let open_opts = OpenOptions::new()
1834 .create(true)
1835 .max_log_count(10)
1836 .max_bytes_per_log(100);
1837 let mut log = open_opts.open(dir.path()).unwrap();
1838
1839 for i in 0..4 {
1841 log.append(&[i; 200][..]).unwrap();
1842 log.sync().unwrap();
1843 }
1844
1845 utils::atomic_write(dir.path().join("1").join("meta"), "foo", false).unwrap();
1847 let log = open_opts.open(dir.path()).unwrap();
1848
1849 assert!(log.load_log(3).is_err()); assert!(log.load_log(3).is_ok()); assert_eq!(
1855 log.iter().map(|i| i.unwrap()[0]).collect::<Vec<_>>(),
1856 [2, 3]
1857 );
1858 }
1859
1860 #[test]
1861 fn test_multithread_sync() {
1862 let dir = tempdir().unwrap();
1863
1864 const THREAD_COUNT: u8 = if cfg!(debug_assertions) { 10 } else { 30 };
1866 const WRITE_COUNT_PER_THREAD: u8 = if cfg!(debug_assertions) { 10 } else { 50 };
1867
1868 fn index_ref(data: &[u8]) -> Vec<IndexOutput> {
1870 vec![IndexOutput::Reference(0..data.len() as u64)]
1871 }
1872 fn index_copy(data: &[u8]) -> Vec<IndexOutput> {
1873 vec![IndexOutput::Owned(data.to_vec().into_boxed_slice())]
1874 }
1875 let indexes = vec![
1876 IndexDef::new("key1", index_ref).lag_threshold(1),
1877 IndexDef::new("key2", index_ref).lag_threshold(50),
1878 IndexDef::new("key3", index_ref).lag_threshold(1000),
1879 IndexDef::new("key4", index_copy).lag_threshold(1),
1880 IndexDef::new("key5", index_copy).lag_threshold(50),
1881 IndexDef::new("key6", index_copy).lag_threshold(1000),
1882 ];
1883 let index_len = indexes.len();
1884 let open_opts = OpenOptions::new()
1885 .create(true)
1886 .max_log_count(200)
1887 .max_bytes_per_log(200)
1888 .index_defs(indexes);
1889
1890 use std::sync::Arc;
1891 use std::sync::Barrier;
1892 let barrier = Arc::new(Barrier::new(THREAD_COUNT as usize));
1893 let threads: Vec<_> = (0..THREAD_COUNT)
1894 .map(|i| {
1895 let barrier = barrier.clone();
1896 let open_opts = open_opts.clone();
1897 let path = dir.path().to_path_buf();
1898 std::thread::spawn(move || {
1899 barrier.wait();
1900 let mut log = open_opts.open(path).unwrap();
1901 for j in 1..=WRITE_COUNT_PER_THREAD {
1902 let buf = [i, j];
1903 log.append(buf).unwrap();
1904 if j % (i + 1) == 0 || j == WRITE_COUNT_PER_THREAD {
1905 log.sync().unwrap();
1906 for entry in log.iter().map(|d| d.unwrap()) {
1908 for index_id in 0..index_len {
1909 for index_value in log.lookup(index_id, entry.to_vec()).unwrap()
1910 {
1911 assert_eq!(index_value.unwrap(), entry);
1912 }
1913 }
1914 }
1915 }
1916 }
1917 })
1918 })
1919 .collect();
1920
1921 for thread in threads {
1923 thread.join().expect("joined");
1924 }
1925
1926 let log = open_opts.open(dir.path()).unwrap();
1928 let count = log.iter().count() as u64;
1929 assert_eq!(count, THREAD_COUNT as u64 * WRITE_COUNT_PER_THREAD as u64);
1930 }
1931}