indexedlog/
rotate.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Rotation support for a set of [`Log`]s.
9
10use std::fmt;
11use std::fs;
12use std::io;
13use std::path::Path;
14use std::path::PathBuf;
15use std::sync::atomic::AtomicUsize;
16use std::sync::atomic::Ordering::SeqCst;
17
18use minibytes::Bytes;
19use once_cell::sync::OnceCell;
20use tracing::debug;
21use tracing::debug_span;
22use tracing::trace;
23
24use crate::change_detect::SharedChangeDetector;
25use crate::errors::IoResultExt;
26use crate::errors::ResultExt;
27use crate::lock::ScopedDirLock;
28use crate::lock::READER_LOCK_OPTS;
29use crate::log;
30use crate::log::FlushFilterContext;
31use crate::log::FlushFilterFunc;
32use crate::log::FlushFilterOutput;
33use crate::log::IndexDef;
34use crate::log::Log;
35use crate::repair::OpenOptionsOutput;
36use crate::repair::OpenOptionsRepair;
37use crate::repair::RepairMessage;
38use crate::utils;
39
/// A collection of [`Log`]s that get rotated or deleted automatically when they
/// exceed size or count limits.
///
/// Writes go to the active [`Log`]. Reads scan through all [`Log`]s.
pub struct RotateLog {
    // Directory backing this RotateLog. `None` for an in-memory RotateLog.
    dir: Option<PathBuf>,
    // Options this RotateLog was opened with.
    open_options: OpenOptions,
    // Lazily loaded logs. Index 0 is the latest (writable) log.
    logs: Vec<OnceCell<Log>>,
    // Logical length of `logs`. It can be smaller than `logs.len()` if some Log
    // fails to load.
    logs_len: AtomicUsize,
    // Id of the latest log, i.e. the name of its directory.
    latest: u8,
    // Indicate an active reader. Destructive writes (repair) are unsafe.
    reader_lock: Option<ScopedDirLock>,
    // Backs `is_changed_on_disk`. `None` for an in-memory RotateLog.
    change_detector: Option<SharedChangeDetector>,
    // Run after log.sync(). For testing purpose only.
    #[cfg(test)]
    hook_after_log_sync: Option<Box<dyn Fn()>>,
}
59
60// On disk, a RotateLog is a directory containing:
61// - 0/, 1/, 2/, 3/, ...: one Log per directory.
62// - latest: a file, the name of the directory that is considered "active".
63
/// Name of the file, inside the RotateLog directory, that stores the name of
/// the log directory considered "active".
const LATEST_FILE: &str = "latest";
65
/// Options used to configure how a [`RotateLog`] is opened.
#[derive(Clone)]
pub struct OpenOptions {
    // A log is rotated by `sync` once its flushed size reaches this value.
    pub(crate) max_bytes_per_log: u64,
    // Maximum number of logs kept; older ones are candidates for removal.
    pub(crate) max_log_count: u8,
    // Options forwarded to every individual `Log`.
    pub(crate) log_open_options: log::OpenOptions,
    // When set, `append` triggers `sync` once the in-memory buffer reaches
    // this size.
    pub(crate) auto_sync_threshold: Option<u64>,
}
74
75impl OpenOptions {
76    #[allow(clippy::new_without_default)]
77    /// Creates a default new set of options ready for configuration.
78    ///
79    /// The default values are:
80    /// - Keep 2 logs.
81    /// - A log gets rotated when it exceeds 2GB.
82    /// - No indexes.
83    /// - Do not create on demand.
84    /// - Do not sync automatically on append().
85    pub fn new() -> Self {
86        // Some "seemingly reasonable" default values. Not scientifically chosen.
87        let max_log_count = 2;
88        let max_bytes_per_log = 2_000_000_000; // 2 GB
89        Self {
90            max_bytes_per_log,
91            max_log_count,
92            log_open_options: log::OpenOptions::new(),
93            auto_sync_threshold: None,
94        }
95    }
96
97    /// Set the maximum [`Log`] count.
98    ///
99    /// A larger value would hurt lookup performance.
100    pub fn max_log_count(mut self, count: u8) -> Self {
101        assert!(count >= 1);
102        self.max_log_count = count;
103        self
104    }
105
106    /// Set the maximum bytes per [`Log`].
107    pub fn max_bytes_per_log(mut self, bytes: u64) -> Self {
108        assert!(bytes > 0);
109        self.max_bytes_per_log = bytes;
110        self
111    }
112
113    /// Sets the checksum type.
114    ///
115    /// See [log::ChecksumType] for details.
116    pub fn checksum_type(mut self, checksum_type: log::ChecksumType) -> Self {
117        self.log_open_options = self.log_open_options.checksum_type(checksum_type);
118        self
119    }
120
121    /// Set whether create the [`RotateLog`] structure if it does not exist.
122    pub fn create(mut self, create: bool) -> Self {
123        self.log_open_options = self.log_open_options.create(create);
124        self
125    }
126
127    /// Add an index function.
128    pub fn index(mut self, name: &'static str, func: fn(&[u8]) -> Vec<log::IndexOutput>) -> Self {
129        self.log_open_options = self.log_open_options.index(name, func);
130        self
131    }
132
133    /// Set the index definitions.
134    ///
135    /// See [`IndexDef`] for details.
136    pub fn index_defs(mut self, index_defs: Vec<IndexDef>) -> Self {
137        self.log_open_options = self.log_open_options.index_defs(index_defs);
138        self
139    }
140
141    /// Sets the flush filter function.
142    ///
143    /// The function will be called at [`RotateLog::sync`] time, if there are
144    /// changes since `open` (or last `sync`) time.
145    ///
146    /// The filter function can be used to avoid writing content that already
147    /// exists in the latest [`Log`], or rewrite content as needed.
148    pub fn flush_filter(mut self, flush_filter: Option<FlushFilterFunc>) -> Self {
149        self.log_open_options = self.log_open_options.flush_filter(flush_filter);
150        self
151    }
152
153    /// Call `sync` automatically if the in-memory buffer size has exceeded
154    /// the given size threshold.
155    ///
156    /// This is useful to make in-memory buffer size bounded.
157    pub fn auto_sync_threshold(mut self, threshold: impl Into<Option<u64>>) -> Self {
158        self.auto_sync_threshold = threshold.into();
159        self
160    }
161
    /// Open [`RotateLog`] at given location.
    ///
    /// A reader lock is taken on `dir` and kept by the returned [`RotateLog`].
    ///
    /// If the existing `latest` file and logs can be read, they are used as-is.
    /// Otherwise the behavior depends on the `create` option:
    /// - Not set: the read error is returned.
    /// - Set: the directory is created if needed and, under the directory
    ///   lock, the state is re-read. A missing `latest` file results in a
    ///   fresh log `0`. A readable `latest` with unreadable logs results in a
    ///   new empty log at `latest + 1` (wrapping).
    ///
    /// # Errors
    ///
    /// Besides the cases above, an error is returned if `latest` exists but
    /// cannot be read. If it holds invalid data, the error is marked as a
    /// corruption.
    pub fn open(&self, dir: impl AsRef<Path>) -> crate::Result<RotateLog> {
        let dir = dir.as_ref();
        let result: crate::Result<_> = (|| {
            let reader_lock = ScopedDirLock::new_with_options(dir, &READER_LOCK_OPTS)?;
            let change_detector = reader_lock.shared_change_detector()?;
            let span = debug_span!("RotateLog::open", dir = &dir.to_string_lossy().as_ref());
            let _guard = span.enter();

            // First attempt: read without taking the directory (write) lock.
            let latest_and_log = read_latest_and_logs(dir, self);

            let (latest, logs) = match latest_and_log {
                Ok((latest, logs)) => (latest, logs),
                Err(e) => {
                    if !self.log_open_options.create {
                        return Err(e)
                            .context("not creating new logs since OpenOption::create is not set");
                    } else {
                        // Creation is allowed. Take the directory lock and re-read,
                        // creating whatever is missing.
                        utils::mkdir_p(dir)?;
                        let lock = ScopedDirLock::new(dir)?;

                        match read_latest_raw(dir) {
                            Ok(latest) => {
                                match read_logs(dir, self, latest) {
                                    Ok(logs) => {
                                        // Both latest and logs are read properly.
                                        (latest, logs)
                                    }
                                    Err(err) => {
                                        // latest is fine, but logs cannot be read.
                                        // Try auto recover by creating an empty log.
                                        let latest = latest.wrapping_add(1);
                                        match create_empty_log(Some(dir), self, latest, &lock) {
                                            Ok(new_log) => {
                                                // Prefer the full set of logs. Fall back to
                                                // just the newly created one.
                                                if let Ok(logs) = read_logs(dir, self, latest) {
                                                    (latest, logs)
                                                } else {
                                                    (latest, vec![create_log_cell(new_log)])
                                                }
                                            }
                                            Err(new_log_err) => {
                                                let msg = "cannot create new empty log after failing to read existing logs";
                                                return Err(new_log_err.message(msg).source(err));
                                            }
                                        }
                                    }
                                }
                            }
                            Err(err) => {
                                if err.kind() == io::ErrorKind::NotFound {
                                    // latest does not exist.
                                    // Most likely, it is a new empty directory.
                                    // Create an empty log and update latest.
                                    let latest = 0;
                                    let new_log = create_empty_log(Some(dir), self, latest, &lock)?;
                                    (latest, vec![create_log_cell(new_log)])
                                } else {
                                    // latest cannot be read for other reasons.
                                    //
                                    // Mark as corrupted, if 'latest' contains a number that cannot be
                                    // parsed.
                                    let corrupted = err.kind() == io::ErrorKind::InvalidData;
                                    let mut result = Err(err).context(dir, "cannot read 'latest'");
                                    if corrupted {
                                        result = result.corruption();
                                    }
                                    return result;
                                }
                            }
                        }
                    }
                }
            };

            let logs_len = AtomicUsize::new(logs.len());
            let mut rotate_log = RotateLog {
                dir: Some(dir.into()),
                open_options: self.clone(),
                logs,
                logs_len,
                latest,
                reader_lock: Some(reader_lock),
                change_detector: Some(change_detector),
                #[cfg(test)]
                hook_after_log_sync: None,
            };
            // Bring the change detector in line with the state just loaded.
            rotate_log.update_change_detector_to_match_meta();
            Ok(rotate_log)
        })();

        result.context(|| format!("in rotate::OpenOptions::open({:?})", dir))
    }
254
255    /// Open an-empty [`RotateLog`] in memory. The [`RotateLog`] cannot [`RotateLog::sync`].
256    pub fn create_in_memory(&self) -> crate::Result<RotateLog> {
257        let result: crate::Result<_> = (|| {
258            let cell = create_log_cell(self.log_open_options.open(())?);
259            let mut logs = Vec::with_capacity(1);
260            logs.push(cell);
261            let logs_len = AtomicUsize::new(logs.len());
262            Ok(RotateLog {
263                dir: None,
264                open_options: self.clone(),
265                logs,
266                logs_len,
267                latest: 0,
268                reader_lock: None,
269                change_detector: None,
270                #[cfg(test)]
271                hook_after_log_sync: None,
272            })
273        })();
274        result.context("in rotate::OpenOptions::create_in_memory")
275    }
276
277    /// Try repair all logs in the specified directory.
278    ///
279    /// This just calls into [`log::OpenOptions::repair`] recursively.
280    pub fn repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
281        let dir = dir.as_ref();
282        (|| -> crate::Result<_> {
283            let _lock = ScopedDirLock::new(dir)?;
284
285            let mut message = RepairMessage::new(dir);
286            message += &format!("Processing RotateLog: {:?}\n", dir);
287            let read_dir = dir.read_dir().context(dir, "cannot readdir")?;
288            let mut ids = Vec::new();
289
290            for entry in read_dir {
291                let entry = entry.context(dir, "cannot readdir")?;
292                let name = entry.file_name();
293                if let Some(name) = name.to_str() {
294                    if let Ok(id) = name.parse::<u8>() {
295                        ids.push(id);
296                    }
297                }
298            }
299
300            ids.sort_unstable();
301            for &id in ids.iter() {
302                let name = id.to_string();
303                message += &format!("Attempt to repair log {:?}\n", name);
304                match self.log_open_options.repair(&dir.join(name)) {
305                    Ok(log) => message += &log,
306                    Err(err) => message += &format!("Failed: {}\n", err),
307                }
308            }
309
310            let latest_path = dir.join(LATEST_FILE);
311            match read_latest_raw(dir) {
312                Ok(latest) => message += &format!("Latest = {}\n", latest),
313                Err(err) => match err.kind() {
314                    io::ErrorKind::NotFound
315                    | io::ErrorKind::InvalidData
316                    | io::ErrorKind::UnexpectedEof => {
317                        let latest = guess_latest(ids);
318                        let content = format!("{}", latest);
319                        let fsync = false;
320                        utils::atomic_write(&latest_path, content, fsync)?;
321                        message += &format!("Reset latest to {}\n", latest);
322                    }
323                    _ => return Err(err).context(&latest_path, "cannot read or parse"),
324                },
325            };
326
327            Ok(message.into_string())
328        })()
329        .context(|| format!("in rotate::OpenOptions::repair({:?})", dir))
330    }
331}
332
333impl OpenOptionsRepair for OpenOptions {
334    fn open_options_repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
335        OpenOptions::repair(self, dir.as_ref())
336    }
337}
338
339impl OpenOptionsOutput for OpenOptions {
340    type Output = RotateLog;
341
342    fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
343        self.open(path)
344    }
345}
346
347impl fmt::Debug for OpenOptions {
348    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
349        write!(f, "OpenOptions {{ ")?;
350        write!(f, "max_bytes_per_log: {}, ", self.max_bytes_per_log)?;
351        write!(f, "max_log_count: {}, ", self.max_log_count)?;
352        write!(f, "auto_sync_threshold: {:?}, ", self.auto_sync_threshold)?;
353        write!(f, "log_open_options: {:?} }}", &self.log_open_options)?;
354        Ok(())
355    }
356}
357
358impl RotateLog {
359    /// Append data to the writable [`Log`].
360    pub fn append(&mut self, data: impl AsRef<[u8]>) -> crate::Result<()> {
361        (|| -> crate::Result<_> {
362            let threshold = self.open_options.auto_sync_threshold;
363            let log = self.writable_log();
364            log.append(data)?;
365            if let Some(threshold) = threshold {
366                if log.mem_buf.len() as u64 >= threshold {
367                    self.sync()
368                        .context("sync triggered by auto_sync_threshold")?;
369                }
370            }
371            Ok(())
372        })()
373        .context("in RotateLog::append")
374    }
375
376    /// Look up an entry using the given index. The `index_id` is the index of
377    /// `index_defs` stored in [`OpenOptions`].
378    pub fn lookup(
379        &self,
380        index_id: usize,
381        key: impl Into<Bytes>,
382    ) -> crate::Result<RotateLogLookupIter> {
383        let key = key.into();
384        let result: crate::Result<_> = (|| {
385            Ok(RotateLogLookupIter {
386                inner_iter: self.logs[0].get().unwrap().lookup(index_id, &key)?,
387                end: false,
388                log_rotate: self,
389                log_index: 0,
390                index_id,
391                key: key.clone(),
392            })
393        })();
394        result
395            .context(|| format!("in RotateLog::lookup({}, {:?})", index_id, key.as_ref()))
396            .context(|| format!("  RotateLog.dir = {:?}", self.dir))
397    }
398
399    /// Convert a slice to [`Bytes`].
400    ///
401    /// Do not copy the slice if it's from the main on-disk buffer of
402    /// one of the loaded logs.
403    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
404        for log in &self.logs {
405            if let Some(log) = log.get() {
406                if log.disk_buf.range_of_slice(slice).is_some() {
407                    return log.slice_to_bytes(slice);
408                }
409            }
410        }
411        Bytes::copy_from_slice(slice)
412    }
413
414    /// Look up an entry using the given index. The `index_id` is the index of
415    /// `index_defs` stored in [`OpenOptions`].
416    ///
417    /// Unlike [`RotateLog::lookup`], this function only checks the "latest"
418    /// (i.e. "writable") [`Log`] without checking others. It is useful to make
419    /// sure certain contents depending on other entries are inserted into
420    /// the same [`Log`].
421    ///
422    /// Practically, a `flush_filter` should also be used to make sure dependent
423    /// entries are stored in a same [`Log`]. So this function will panic if
424    /// `flush_filter` is not set on [`OpenOptions`].
425    pub fn lookup_latest(
426        &self,
427        index_id: usize,
428        key: impl AsRef<[u8]>,
429    ) -> crate::Result<log::LogLookupIter> {
430        let key = key.as_ref();
431        assert!(
432            self.open_options.log_open_options.flush_filter.is_some(),
433            "programming error: flush_filter should also be set"
434        );
435        self.logs[0]
436            .get()
437            .unwrap()
438            .lookup(index_id, key)
439            .context(|| format!("in RotateLog::lookup_latest({}, {:?})", index_id, key))
440            .context(|| format!("  RotateLog.dir = {:?}", self.dir))
441    }
442
443    /// Read latest data from disk. Write in-memory entries to disk.
444    ///
445    /// Return the index of the latest [`Log`].
446    ///
447    /// For in-memory [`RotateLog`], this function always returns 0.
448    pub fn sync(&mut self) -> crate::Result<u8> {
449        let result: crate::Result<_> = (|| {
450            let span = debug_span!("RotateLog::sync", latest = self.latest as u32);
451            if let Some(dir) = &self.dir {
452                span.record("dir", dir.to_string_lossy().as_ref());
453            }
454            let _guard = span.enter();
455
456            if self.dir.is_none() {
457                return Ok(0);
458            }
459
460            if self.writable_log().iter_dirty().next().is_none() {
461                // Read-only path, no need to take directory lock.
462                if let Ok(latest) = read_latest(self.dir.as_ref().unwrap()) {
463                    if latest != self.latest {
464                        // Latest changed. Re-load and write to the real latest Log.
465                        // PERF(minor): This can be smarter by avoiding reloading some logs.
466                        self.set_logs(read_logs(
467                            self.dir.as_ref().unwrap(),
468                            &self.open_options,
469                            latest,
470                        )?);
471                        self.latest = latest;
472                    }
473                    self.writable_log().sync()?;
474                } else {
475                    // If latest can not be read, do not error out.
476                    // This RotateLog can still be used to answer queries.
477                }
478            } else {
479                // Read-write path. Take the directory lock.
480                let dir = self.dir.clone().unwrap();
481                let lock = ScopedDirLock::new(&dir)?;
482
483                // Re-read latest, since it might have changed after taking the lock.
484                let latest = read_latest(self.dir.as_ref().unwrap())?;
485                if latest != self.latest {
486                    // Latest changed. Re-load and write to the real latest Log.
487                    //
488                    // This is needed because RotateLog assumes non-latest logs
489                    // are read-only. Other processes using RotateLog won't reload
490                    // non-latest logs automatically.
491
492                    // PERF(minor): This can be smarter by avoiding reloading some logs.
493                    let mut new_logs =
494                        read_logs(self.dir.as_ref().unwrap(), &self.open_options, latest)?;
495                    if let Some(filter) = self.open_options.log_open_options.flush_filter {
496                        let log = new_logs[0].get_mut().unwrap();
497                        for entry in self.writable_log().iter_dirty() {
498                            let content = entry?;
499                            let context = FlushFilterContext { log };
500                            match filter(&context, content).map_err(|err| {
501                                crate::Error::wrap(err, "failed to run filter function")
502                            })? {
503                                FlushFilterOutput::Drop => {}
504                                FlushFilterOutput::Keep => log.append(content)?,
505                                FlushFilterOutput::Replace(content) => log.append(content)?,
506                            }
507                        }
508                    } else {
509                        let log = new_logs[0].get_mut().unwrap();
510                        // Copy entries to new Logs.
511                        for entry in self.writable_log().iter_dirty() {
512                            let bytes = entry?;
513                            log.append(bytes)?;
514                        }
515                    }
516                    self.set_logs(new_logs);
517                    self.latest = latest;
518                }
519
520                let size = self.writable_log().flush()?;
521
522                #[cfg(test)]
523                if let Some(func) = self.hook_after_log_sync.as_ref() {
524                    func();
525                }
526
527                if size >= self.open_options.max_bytes_per_log {
528                    // `self.writable_log()` will be rotated (i.e., becomes immutable).
529                    // Make sure indexes are up-to-date so reading it would not require
530                    // building missing indexes in-memory.
531                    self.writable_log().finalize_indexes(&lock)?;
532                    self.rotate_internal(&lock)?;
533                }
534            }
535
536            self.update_change_detector_to_match_meta();
537            Ok(self.latest)
538        })();
539
540        result
541            .context("in RotateLog::sync")
542            .context(|| format!("  RotateLog.dir = {:?}", self.dir))
543    }
544
545    fn update_change_detector_to_match_meta(&mut self) {
546        let meta = &self.writable_log().meta;
547        let value = meta.primary_len ^ meta.epoch ^ ((self.latest as u64) << 56);
548        if let Some(detector) = &self.change_detector {
549            detector.set(value);
550        }
551    }
552
553    /// Attempt to remove outdated logs.
554    ///
555    /// Does nothing if the content of the 'latest' file has changed on disk,
556    /// which indicates rotation was triggered elsewhere, or the [`RotateLog`]
557    /// is in-memory.
558    pub fn remove_old_logs(&mut self) -> crate::Result<()> {
559        if let Some(dir) = &self.dir {
560            let lock = ScopedDirLock::new(dir)?;
561            let latest = read_latest(dir)?;
562            if latest == self.latest {
563                self.try_remove_old_logs(&lock);
564            }
565        }
566        Ok(())
567    }
568
569    /// Returns `true` if `sync` will load more data on disk.
570    ///
571    /// This function is optimized to be called frequently. It does not access
572    /// the filesystem directly, but communicate using a shared mmap buffer.
573    ///
574    /// This is not about testing buffered pending changes. To access buffered
575    /// pending changes, use [`RotateLog::iter_dirty`] instead.
576    pub fn is_changed_on_disk(&self) -> bool {
577        match &self.change_detector {
578            Some(detector) => detector.is_changed(),
579            None => false,
580        }
581    }
582
583    /// Force create a new [`Log`]. Bump latest.
584    ///
585    /// This function requires it's protected by a directory lock, and the
586    /// callsite makes sure that [`Log`]s are consistent (ex. up-to-date,
587    /// and do not have dirty entries in non-writable logs).
588    fn rotate_internal(&mut self, lock: &ScopedDirLock) -> crate::Result<()> {
589        let span = debug_span!("RotateLog::rotate", latest = self.latest as u32);
590        if let Some(dir) = &self.dir {
591            span.record("dir", dir.to_string_lossy().as_ref());
592        }
593        let _guard = span.enter();
594
595        // Create a new Log. Bump latest.
596        let next = self.latest.wrapping_add(1);
597        let log = create_empty_log(
598            Some(self.dir.as_ref().unwrap()),
599            &self.open_options,
600            next,
601            lock,
602        )?;
603        if self.logs.len() >= self.open_options.max_log_count as usize {
604            self.logs.pop();
605        }
606        self.logs.insert(0, create_log_cell(log));
607        self.logs_len = AtomicUsize::new(self.logs.len());
608        self.latest = next;
609        self.try_remove_old_logs(lock);
610        Ok(())
611    }
612
613    /// Renamed. Use [`RotateLog::sync`] instead.
614    pub fn flush(&mut self) -> crate::Result<u8> {
615        self.sync()
616    }
617
618    fn set_logs(&mut self, logs: Vec<OnceCell<Log>>) {
619        self.logs_len = AtomicUsize::new(logs.len());
620        self.logs = logs;
621    }
622
623    #[allow(clippy::nonminimal_bool)]
624    fn try_remove_old_logs(&self, _lock: &ScopedDirLock) {
625        if let Ok(read_dir) = self.dir.as_ref().unwrap().read_dir() {
626            let latest = self.latest;
627            let earliest = latest.wrapping_sub(self.open_options.max_log_count - 1);
628            for entry in read_dir {
629                if let Ok(entry) = entry {
630                    let name = entry.file_name();
631                    debug!("Inspecting {:?} for rotate log removal", name);
632                    if let Some(name) = name.to_str() {
633                        if let Ok(id) = name.parse::<u8>() {
634                            if (latest >= earliest && (id > latest || id < earliest))
635                                || (latest < earliest && (id > latest && id < earliest))
636                            {
637                                // Explicitly delete the `meta` file first. This marks
638                                // the log as "deleted" in an atomic way.
639                                //
640                                // Errors are not fatal. On Windows, this can fail if
641                                // other processes have files in entry.path() mmap-ed.
642                                // Newly opened or flushed RotateLog will unmap files.
643                                // New rotation would trigger remove_dir_all to try
644                                // remove old logs again.
645                                match fs::remove_file(entry.path().join(log::META_FILE)) {
646                                    Ok(()) => {}
647                                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
648                                        // Meta file is already deleted.
649                                    }
650                                    Err(e) => {
651                                        // Don't delete the log if we were unable to delete the
652                                        // meta file.
653                                        debug!(
654                                            "Error removing rotate log meta: {:?} {:?}",
655                                            name, e
656                                        );
657                                        continue;
658                                    }
659                                }
660
661                                // Delete the rest of the directory.
662                                let res = fs::remove_dir_all(entry.path());
663                                match res {
664                                    Ok(_) => debug!("Removed rotate log: {:?}", name),
665                                    Err(err) => {
666                                        debug!("Error removing rotate log directory: {:?}", err)
667                                    }
668                                };
669                            } else {
670                                debug!(
671                                    "Not removing rotate log: {:?} (latest: {:?}, earliest: {:?})",
672                                    name, latest, earliest
673                                );
674                            }
675                        }
676                    }
677                }
678            }
679        }
680    }
681
682    /// Get the writable [`Log`].
683    fn writable_log(&mut self) -> &mut Log {
684        self.logs[0].get_mut().unwrap()
685    }
686
687    /// Lazily load a log. The 'latest' (or 'writable') log has index 0.
688    fn load_log(&self, index: usize) -> crate::Result<Option<&Log>> {
689        if index >= self.logs_len.load(SeqCst) {
690            return Ok(None);
691        }
692        match self.logs.get(index) {
693            Some(cell) => {
694                let id = self.latest.wrapping_sub(index as u8);
695                if let Some(dir) = &self.dir {
696                    let log = cell.get_or_try_init(|| {
697                        let mut open_options = self.open_options.log_open_options.clone();
698                        if index > 0 {
699                            open_options = open_options.with_zero_index_lag();
700                        }
701                        let log = load_log(dir, id, open_options);
702                        trace!(
703                            name = "RotateLog::load_log",
704                            index = index,
705                            success = log.is_ok()
706                        );
707                        log
708                    });
709                    match log {
710                        Ok(log) => Ok(Some(log)),
711                        Err(err) => {
712                            // Logically truncate self.logs. This avoids loading broken Logs again.
713                            self.logs_len.store(index, SeqCst);
714                            Err(err)
715                        }
716                    }
717                } else {
718                    Ok(cell.get())
719                }
720            }
721            None => unreachable!(),
722        }
723    }
724
725    /// Iterate over all the entries.
726    ///
727    /// The entries are returned in FIFO order.
728    pub fn iter(&self) -> impl Iterator<Item = crate::Result<&[u8]>> {
729        let logs = self.logs();
730        logs.into_iter().rev().flat_map(|log| log.iter())
731    }
732
733    /// Iterate over all dirty entries.
734    pub fn iter_dirty(&self) -> impl Iterator<Item = crate::Result<&[u8]>> {
735        self.logs[0].get().unwrap().iter_dirty()
736    }
737}
738
739/// Wrap `Log` in a `OnceCell`.
740fn create_log_cell(log: Log) -> OnceCell<Log> {
741    let cell = OnceCell::new();
742    cell.set(log)
743        .expect("cell is empty so cell.set cannot fail");
744    cell
745}
746
747/// Load a single log at the given location.
748fn load_log(dir: &Path, id: u8, open_options: log::OpenOptions) -> crate::Result<Log> {
749    let name = format!("{}", id);
750    let log_path = dir.join(name);
751    open_options.create(false).open(log_path)
752}
753
/// Get access to internals of [`RotateLog`].
///
/// This can be useful when there are low-level needs. For example:
/// - Get access to individual logs for things like range queries.
/// - Rotate logs manually.
pub trait RotateLowLevelExt {
    /// Get a view of all individual logs, newest first.
    fn logs(&self) -> Vec<&Log>;

    /// Force a rotation. This can be useful as a quick way to ensure new
    /// data can be written when data corruption happens.
    ///
    /// Data that has not been written will be lost.
    fn force_rotate(&mut self) -> crate::Result<()>;
}
769
770impl RotateLowLevelExt for RotateLog {
771    fn logs(&self) -> Vec<&Log> {
772        (0..)
773            .map(|i| self.load_log(i))
774            .take_while(|res| match res {
775                Ok(Some(_)) => true,
776                _ => false,
777            })
778            .map(|res| res.unwrap().unwrap())
779            .collect()
780    }
781
782    fn force_rotate(&mut self) -> crate::Result<()> {
783        if self.dir.is_none() {
784            // rotate does not make sense for an in-memory RotateLog.
785            return Ok(());
786        }
787        // Read-write path. Take the directory lock.
788        let dir = self.dir.clone().unwrap();
789        let lock = ScopedDirLock::new(&dir)?;
790        self.latest = read_latest(self.dir.as_ref().unwrap())?;
791        self.rotate_internal(&lock)?;
792        self.set_logs(read_logs(
793            self.dir.as_ref().unwrap(),
794            &self.open_options,
795            self.latest,
796        )?);
797        Ok(())
798    }
799}
800
/// Iterator over [`RotateLog`] entries selected by an index lookup.
///
/// Yields the matches of one log, then moves on to the log at the next
/// position until no further log can be loaded.
pub struct RotateLogLookupIter<'a> {
    // Lookup iterator of the log currently being walked.
    inner_iter: log::LogLookupIter<'a>,
    // Set once iteration is over: no more logs to try, or an error was hit.
    end: bool,
    // The `RotateLog` whose logs are searched.
    log_rotate: &'a RotateLog,
    // Position of the most recently attempted log; advanced by `load_next_log`.
    log_index: usize,
    // Index id handed to `Log::lookup` for each log.
    index_id: usize,
    // Key handed to `Log::lookup` for each log.
    key: Bytes,
}
810
811impl<'a> RotateLogLookupIter<'a> {
812    fn load_next_log(&mut self) -> crate::Result<()> {
813        if self.log_index + 1 >= self.log_rotate.logs.len() {
814            self.end = true;
815            Ok(())
816        } else {
817            // Try the next log
818            self.log_index += 1;
819            match self.log_rotate.load_log(self.log_index) {
820                Ok(None) => {
821                    self.end = true;
822                    Ok(())
823                }
824                Err(_err) => {
825                    self.end = true;
826                    // Not fatal (since RotateLog is designed to be able
827                    // to drop data).
828                    Ok(())
829                }
830                Ok(Some(log)) => match log.lookup(self.index_id, &self.key) {
831                    Err(err) => {
832                        self.end = true;
833                        Err(err)
834                    }
835                    Ok(iter) => {
836                        self.inner_iter = iter;
837                        Ok(())
838                    }
839                },
840            }
841        }
842    }
843
844    /// Consume iterator, returning whether the iterator has any data.
845    pub fn is_empty(mut self) -> crate::Result<bool> {
846        while !self.end {
847            if !self.inner_iter.is_empty() {
848                return Ok(false);
849            }
850            self.load_next_log()?;
851        }
852        Ok(true)
853    }
854}
855
856impl<'a> Iterator for RotateLogLookupIter<'a> {
857    type Item = crate::Result<&'a [u8]>;
858
859    fn next(&mut self) -> Option<Self::Item> {
860        if self.end {
861            return None;
862        }
863        match self.inner_iter.next() {
864            None => {
865                if let Err(err) = self.load_next_log() {
866                    return Some(Err(err));
867                }
868
869                if self.end {
870                    return None;
871                }
872
873                self.next()
874            }
875            Some(Err(err)) => {
876                self.end = true;
877                Some(Err(err))
878            }
879            Some(Ok(slice)) => Some(Ok(slice)),
880        }
881    }
882}
883
884fn create_empty_log(
885    dir: Option<&Path>,
886    open_options: &OpenOptions,
887    latest: u8,
888    _lock: &ScopedDirLock,
889) -> crate::Result<Log> {
890    Ok(match dir {
891        Some(dir) => {
892            let latest_path = dir.join(LATEST_FILE);
893            let latest_str = format!("{}", latest);
894            let log_path = dir.join(&latest_str);
895            let opts = open_options.log_open_options.clone().create(true);
896            opts.delete_content(&log_path)?;
897            let log = opts.open(&log_path)?;
898            utils::atomic_write(latest_path, latest_str.as_bytes(), false)?;
899            log
900        }
901        None => open_options.log_open_options.clone().open(())?,
902    })
903}
904
905fn read_latest(dir: &Path) -> crate::Result<u8> {
906    read_latest_raw(dir).context(dir, "cannot read latest")
907}
908
909// Unlike read_latest, this function returns io::Result.
910fn read_latest_raw(dir: &Path) -> io::Result<u8> {
911    let latest_path = dir.join(LATEST_FILE);
912    let data = utils::atomic_read(&latest_path)?;
913    let content: String = String::from_utf8(data).map_err(|_e| {
914        io::Error::new(
915            io::ErrorKind::InvalidData,
916            format!("{:?}: failed to read as utf8 string", latest_path),
917        )
918    })?;
919    let id: u8 = content.parse().map_err(|_e| {
920        io::Error::new(
921            io::ErrorKind::InvalidData,
922            format!(
923                "{:?}: failed to parse {:?} as u8 integer",
924                latest_path, content
925            ),
926        )
927    })?;
928    Ok(id)
929}
930
931fn read_logs(
932    dir: &Path,
933    open_options: &OpenOptions,
934    latest: u8,
935) -> crate::Result<Vec<OnceCell<Log>>> {
936    let mut logs = Vec::with_capacity(open_options.max_log_count as usize);
937
938    // Make sure the first log (latest) can be loaded.
939    let log = load_log(dir, latest, open_options.log_open_options.clone())?;
940    logs.push(create_log_cell(log));
941
942    // Lazily load the rest of logs.
943    for index in 1..open_options.max_log_count {
944        let id = latest.wrapping_sub(index);
945        // Do a quick check about whether the log exists or not so we
946        // can avoid unnecessary `Log::open`.
947        let name = format!("{}", id);
948        let log_path = dir.join(&name);
949        if !log_path.is_dir() {
950            break;
951        }
952        logs.push(OnceCell::new());
953    }
954    trace!(
955        name = "RotateLog::read_logs",
956        max_log_count = open_options.max_log_count,
957        logs_len = logs.len()
958    );
959
960    Ok(logs)
961}
962
963fn read_latest_and_logs(
964    dir: &Path,
965    open_options: &OpenOptions,
966) -> crate::Result<(u8, Vec<OnceCell<Log>>)> {
967    let latest = read_latest(dir)?;
968    Ok((latest, read_logs(dir, open_options, latest)?))
969}
970
/// Given a list of ids, guess a `latest`.
///
/// The ids are examined from largest to smallest. A run of 255, 254, ...
/// at the top is skipped, since those ids might predate a wrap-around, and
/// the first id outside that run is returned. For example:
/// - `guess([3, 4, 5])` is 5.
/// - `guess([0, 1, 2, 254, 255])` is 2.
/// - `guess([])` is 0.
fn guess_latest(mut ids: Vec<u8>) -> u8 {
    ids.sort_unstable();

    // The id that would continue the run of wrapped ids at the top.
    let mut wrapped_tail = 255u8;
    for &id in ids.iter().rev() {
        if id != wrapped_tail {
            // This is probably the desirable id.
            return id;
        }
        wrapped_tail -= 1;
        if wrapped_tail == 0 {
            // All 255 logs exist - rare.
            return 0;
        }
    }

    // Nothing left once the wrapped run is removed.
    0
}
1002
1003#[cfg(test)]
1004mod tests {
1005    use log::IndexOutput;
1006    use tempfile::tempdir;
1007
1008    use super::*;
1009
1010    #[test]
1011    fn test_open() {
1012        let dir = tempdir().unwrap();
1013        let path = dir.path().join("rotate");
1014
1015        assert!(OpenOptions::new().create(false).open(&path).is_err());
1016        assert!(OpenOptions::new().create(true).open(&path).is_ok());
1017        assert!(
1018            OpenOptions::new()
1019                .checksum_type(log::ChecksumType::Xxhash64)
1020                .create(false)
1021                .open(&path)
1022                .is_ok()
1023        );
1024    }
1025
1026    // lookup via index 0
1027    fn lookup<'a>(rotate: &'a RotateLog, key: &[u8]) -> Vec<&'a [u8]> {
1028        let values = rotate
1029            .lookup(0, key.to_vec())
1030            .unwrap()
1031            .collect::<crate::Result<Vec<&[u8]>>>()
1032            .unwrap();
1033        for value in &values {
1034            let b1 = rotate.slice_to_bytes(value);
1035            let b2 = rotate.slice_to_bytes(value);
1036            // Dirty entires cannot be zero-copied.
1037            if rotate
1038                .iter_dirty()
1039                .any(|i| i.unwrap().as_ptr() == value.as_ptr())
1040            {
1041                continue;
1042            }
1043            assert_eq!(
1044                b1.as_ptr(),
1045                b2.as_ptr(),
1046                "slice_to_bytes should return zero-copy"
1047            );
1048        }
1049        values
1050    }
1051
1052    fn iter(rotate: &RotateLog) -> Vec<&[u8]> {
1053        rotate
1054            .iter()
1055            .collect::<crate::Result<Vec<&[u8]>>>()
1056            .unwrap()
1057    }
1058
1059    #[test]
1060    fn test_trivial_append_lookup() {
1061        let dir = tempdir().unwrap();
1062        let opts = OpenOptions::new()
1063            .create(true)
1064            .index_defs(vec![IndexDef::new("two-bytes", |_| {
1065                vec![IndexOutput::Reference(0..2)]
1066            })]);
1067
1068        let rotate = opts.open(&dir).unwrap();
1069        let rotate_mem = opts.create_in_memory().unwrap();
1070
1071        for rotate in &mut [rotate, rotate_mem] {
1072            rotate.append(b"aaa").unwrap();
1073            rotate.append(b"abbb").unwrap();
1074            rotate.append(b"abc").unwrap();
1075
1076            assert_eq!(lookup(rotate, b"aa"), vec![b"aaa"]);
1077            assert_eq!(lookup(rotate, b"ab"), vec![&b"abc"[..], b"abbb"]);
1078            assert_eq!(lookup(rotate, b"ac"), Vec::<&[u8]>::new());
1079        }
1080    }
1081
1082    #[test]
1083    fn test_simple_rotate() {
1084        let dir = tempdir().unwrap();
1085        let mut rotate = OpenOptions::new()
1086            .create(true)
1087            .max_bytes_per_log(100)
1088            .max_log_count(2)
1089            .index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
1090            .open(&dir)
1091            .unwrap();
1092
1093        // No rotate.
1094        rotate.append(b"a").unwrap();
1095        assert_eq!(rotate.sync().unwrap(), 0);
1096        rotate.append(b"a").unwrap();
1097        assert_eq!(rotate.sync().unwrap(), 0);
1098
1099        // Trigger rotate. "a" is still accessible.
1100        rotate.append(vec![b'b'; 100]).unwrap();
1101        assert_eq!(rotate.sync().unwrap(), 1);
1102        assert_eq!(lookup(&rotate, b"a").len(), 2);
1103
1104        // Trigger rotate again. Only new entries are accessible.
1105        // Older directories should be deleted automatically.
1106        rotate.append(vec![b'c'; 50]).unwrap();
1107        assert_eq!(rotate.sync().unwrap(), 1);
1108        rotate.append(vec![b'd'; 50]).unwrap();
1109        assert_eq!(rotate.sync().unwrap(), 2);
1110        assert_eq!(lookup(&rotate, b"a").len(), 0);
1111        assert_eq!(lookup(&rotate, b"b").len(), 0);
1112        assert_eq!(lookup(&rotate, b"c").len(), 1);
1113        assert_eq!(lookup(&rotate, b"d").len(), 1);
1114        assert!(!dir.path().join("0").exists());
1115    }
1116
1117    #[test]
1118    fn test_manual_remove_old_logs() {
1119        let dir = tempdir().unwrap();
1120        let dir = &dir;
1121        let open = |n: u8| -> RotateLog {
1122            OpenOptions::new()
1123                .create(true)
1124                .max_bytes_per_log(1)
1125                .max_log_count(n)
1126                .open(dir)
1127                .unwrap()
1128        };
1129        let read_all =
1130            |log: &RotateLog| -> Vec<Vec<u8>> { log.iter().map(|v| v.unwrap().to_vec()).collect() };
1131
1132        // Create 5 logs
1133        {
1134            let mut rotate = open(5);
1135            for i in 0..5 {
1136                rotate.append(vec![i]).unwrap();
1137                rotate.sync().unwrap();
1138            }
1139        }
1140
1141        // Content depends on max_log_count.
1142        {
1143            let rotate = open(4);
1144            assert_eq!(read_all(&rotate), [[2], [3], [4]]);
1145            let rotate = open(3);
1146            assert_eq!(read_all(&rotate), [[3], [4]]);
1147        }
1148
1149        // Remove old logs.
1150        {
1151            let mut rotate = open(3);
1152            rotate.remove_old_logs().unwrap();
1153        }
1154
1155        // Verify that [2] is removed.
1156        {
1157            let rotate = open(4);
1158            assert_eq!(read_all(&rotate), [[3], [4]]);
1159        }
1160    }
1161
1162    fn test_wrapping_rotate(max_log_count: u8) {
1163        let dir = tempdir().unwrap();
1164        let mut rotate = OpenOptions::new()
1165            .create(true)
1166            .max_bytes_per_log(10)
1167            .max_log_count(max_log_count)
1168            .open(&dir)
1169            .unwrap();
1170
1171        let count = || {
1172            fs::read_dir(&dir)
1173                .unwrap()
1174                .map(|entry| entry.unwrap().file_name().into_string().unwrap())
1175                // On Windows, the "lock" file was created by open_dir.
1176                .filter(|name| name != "lock" && name != "rlock")
1177                .count()
1178        };
1179
1180        for i in 1..=(max_log_count - 1) {
1181            rotate.append(b"abcdefghijklmn").unwrap();
1182            assert_eq!(rotate.sync().unwrap(), i);
1183            assert_eq!(count(), (i as usize) + 2);
1184        }
1185
1186        for i in max_log_count..=255 {
1187            rotate.append(b"abcdefghijklmn").unwrap();
1188            assert_eq!(rotate.sync().unwrap(), i);
1189            assert_eq!(count(), (max_log_count as usize) + 1);
1190        }
1191
1192        for _ in 0..=max_log_count {
1193            rotate.append(b"abcdefghijklmn").unwrap();
1194            assert_eq!(count(), (max_log_count as usize) + 1);
1195        }
1196    }
1197
1198    #[test]
1199    fn test_wrapping_rotate_10() {
1200        test_wrapping_rotate(10)
1201    }
1202
1203    #[test]
1204    fn test_wrapping_rotate_255() {
1205        test_wrapping_rotate(255)
1206    }
1207
1208    #[test]
1209    fn test_force_rotate() {
1210        let dir = tempdir().unwrap();
1211        let mut rotate = OpenOptions::new()
1212            .create(true)
1213            .max_bytes_per_log(1 << 30)
1214            .max_log_count(3)
1215            .open(&dir)
1216            .unwrap();
1217
1218        use super::RotateLowLevelExt;
1219        assert_eq!(rotate.logs().len(), 1);
1220        rotate.force_rotate().unwrap();
1221        assert_eq!(rotate.logs().len(), 2);
1222        rotate.force_rotate().unwrap();
1223        assert_eq!(rotate.logs().len(), 3);
1224        rotate.force_rotate().unwrap();
1225        assert_eq!(rotate.logs().len(), 3);
1226    }
1227
1228    #[test]
1229    fn test_lookup_rotated() {
1230        // Look up or iteration should work with rotated logs.
1231        let dir = tempdir().unwrap();
1232        let open_opts = OpenOptions::new()
1233            .create(true)
1234            .max_bytes_per_log(1)
1235            .max_log_count(3)
1236            .index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
1237
1238        // Prepare test data
1239        let mut rotate1 = open_opts.open(&dir).unwrap();
1240        rotate1.append(b"a1").unwrap();
1241        assert_eq!(rotate1.sync().unwrap(), 1);
1242        rotate1.append(b"a2").unwrap();
1243        assert_eq!(rotate1.sync().unwrap(), 2);
1244
1245        // Warm up rotate1.
1246        assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1247        assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1248
1249        // rotate2 has lazy logs
1250        let rotate2 = open_opts.open(&dir).unwrap();
1251
1252        // Trigger rotate from another RotateLog
1253        let mut rotate3 = open_opts.open(&dir).unwrap();
1254        rotate3.append(b"a3").unwrap();
1255        assert_eq!(rotate3.sync().unwrap(), 3);
1256
1257        // rotate1 can still use its existing indexes even if "a1"
1258        // might have been deleted (on Unix).
1259        assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1260        assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1261
1262        // rotate2 does not have access to the deleted "a1".
1263        // (on Windows, 'meta' can be deleted, while mmap-ed files cannot)
1264        assert_eq!(lookup(&rotate2, b"a"), vec![b"a2"]);
1265        assert_eq!(iter(&rotate2), vec![b"a2"]);
1266    }
1267
1268    #[test]
1269    fn test_is_empty() -> crate::Result<()> {
1270        let dir = tempdir().unwrap();
1271        let open_opts = OpenOptions::new()
1272            .create(true)
1273            .max_bytes_per_log(2)
1274            .max_log_count(4)
1275            .index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
1276
1277        let mut rotate = open_opts.open(&dir)?;
1278        rotate.append(b"a1")?;
1279        assert_eq!(rotate.sync()?, 1);
1280
1281        rotate.append(b"a2")?;
1282        assert_eq!(rotate.sync()?, 2);
1283
1284        rotate.append(b"b1")?;
1285        assert_eq!(rotate.sync()?, 3);
1286
1287        assert_eq!(lookup(&rotate, b"a"), vec![b"a2", b"a1"]);
1288        assert_eq!(lookup(&rotate, b"b"), vec![b"b1"]);
1289
1290        assert!(!rotate.lookup(0, b"a".to_vec())?.is_empty()?);
1291        assert!(!rotate.lookup(0, b"b".to_vec())?.is_empty()?);
1292        assert!(rotate.lookup(0, b"c".to_vec())?.is_empty()?);
1293
1294        Ok(())
1295    }
1296
1297    #[test]
1298    fn test_lookup_truncated_meta() {
1299        // Look up or iteration should work with rotated logs.
1300        let dir = tempdir().unwrap();
1301        let open_opts = OpenOptions::new()
1302            .create(true)
1303            .max_bytes_per_log(1)
1304            .max_log_count(3)
1305            .index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
1306
1307        // Prepare test data
1308        let mut rotate1 = open_opts.open(&dir).unwrap();
1309        rotate1.append(b"a1").unwrap();
1310        assert_eq!(rotate1.sync().unwrap(), 1);
1311        rotate1.append(b"a2").unwrap();
1312        assert_eq!(rotate1.sync().unwrap(), 2);
1313
1314        // Warm up rotate1
1315        assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1316        assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1317
1318        // rotate2 has lazy logs
1319        let rotate2 = open_opts.open(&dir).unwrap();
1320
1321        // Break logs by truncating "meta".
1322        utils::atomic_write(dir.path().join("0").join(log::META_FILE), "", false).unwrap();
1323
1324        // rotate1 can still use its existing indexes even if "a1"
1325        // might have been deleted (on Unix).
1326        assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
1327        assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
1328
1329        // rotate2 does not have access to the deleted "a1".
1330        assert_eq!(lookup(&rotate2, b"a"), vec![b"a2"]);
1331        assert_eq!(iter(&rotate2), vec![b"a2"]);
1332    }
1333
1334    #[test]
1335    fn test_concurrent_writes() {
1336        let dir = tempdir().unwrap();
1337        let mut rotate1 = OpenOptions::new()
1338            .create(true)
1339            .max_bytes_per_log(100)
1340            .max_log_count(2)
1341            .open(&dir)
1342            .unwrap();
1343        let mut rotate2 = OpenOptions::new()
1344            .max_bytes_per_log(100)
1345            .max_log_count(2)
1346            .open(&dir)
1347            .unwrap();
1348
1349        // rotate1 triggers a rotation.
1350        rotate1.append(vec![b'a'; 100]).unwrap();
1351        assert_eq!(rotate1.sync().unwrap(), 1);
1352
1353        let size = |log_index: u64| {
1354            dir.path()
1355                .join(format!("{}", log_index))
1356                .join(log::PRIMARY_FILE)
1357                .metadata()
1358                .unwrap()
1359                .len()
1360        };
1361
1362        let size1 = size(1);
1363
1364        // rotate2 writes to the right location ("1"), not "0";
1365        rotate2.append(vec![b'b'; 100]).unwrap();
1366        assert_eq!(rotate2.sync().unwrap(), 2);
1367
1368        #[cfg(unix)]
1369        {
1370            assert!(!dir.path().join("0").exists());
1371        }
1372        assert!(size(1) > size1 + 100);
1373        assert!(size(2) > 0);
1374    }
1375
1376    #[test]
1377    fn test_flush_filter() {
1378        let dir = tempdir().unwrap();
1379
1380        let read_log = |name: &str| -> Vec<Vec<u8>> {
1381            let log = Log::open(dir.path().join(name), Vec::new()).unwrap();
1382            log.iter().map(|v| v.unwrap().to_vec()).collect()
1383        };
1384
1385        let mut rotate1 = OpenOptions::new()
1386            .create(true)
1387            .max_bytes_per_log(100)
1388            .flush_filter(Some(|ctx, bytes| {
1389                // 'aa' is not inserted yet. It should not exist in the log.
1390                assert!(!ctx.log.iter().any(|x| x.unwrap() == b"aa"));
1391                Ok(match bytes.len() {
1392                    1 => FlushFilterOutput::Replace(b"xx".to_vec()),
1393                    _ => FlushFilterOutput::Keep,
1394                })
1395            }))
1396            .open(&dir)
1397            .unwrap();
1398
1399        let mut rotate2 = OpenOptions::new()
1400            .max_bytes_per_log(100)
1401            .open(&dir)
1402            .unwrap();
1403
1404        rotate2.append(vec![b'a'; 3]).unwrap();
1405        rotate2.sync().unwrap();
1406
1407        rotate1.append(vec![b'a'; 1]).unwrap(); // replaced to 'xx'
1408        rotate1.append(vec![b'a'; 2]).unwrap();
1409        assert_eq!(rotate1.sync().unwrap(), 0); // trigger flush filter by Log
1410        assert_eq!(read_log("0"), vec![&b"aaa"[..], b"xx", b"aa"]);
1411
1412        rotate1.append(vec![b'a'; 1]).unwrap(); // not replaced
1413        assert_eq!(rotate1.sync().unwrap(), 0); // do not trigger flush filter
1414        assert_eq!(read_log("0").last().unwrap(), b"a");
1415
1416        rotate1.append(vec![b'a'; 1]).unwrap(); // replaced to 'xx'
1417        rotate1.append(vec![b'a'; 2]).unwrap();
1418
1419        rotate2.append(vec![b'a'; 100]).unwrap(); // rotate
1420        assert_eq!(rotate2.sync().unwrap(), 1);
1421
1422        assert_eq!(rotate1.sync().unwrap(), 1); // trigger flush filter by RotateLog
1423        assert_eq!(read_log("1"), vec![b"xx", b"aa"]);
1424    }
1425
1426    #[test]
1427    fn test_is_changed_on_disk() {
1428        let dir = tempdir().unwrap();
1429        let open_opts = OpenOptions::new()
1430            .create(true)
1431            .max_bytes_per_log(5000)
1432            .max_log_count(2);
1433
1434        // Repeat a few times to trigger rotation.
1435        for _ in 0..10 {
1436            let mut rotate1 = open_opts.open(&dir).unwrap();
1437            let mut rotate2 = open_opts.open(&dir).unwrap();
1438
1439            assert!(!rotate1.is_changed_on_disk());
1440            assert!(!rotate2.is_changed_on_disk());
1441
1442            // no-op sync() does not set is_changed().
1443            rotate1.sync().unwrap();
1444            assert!(!rotate2.is_changed_on_disk());
1445
1446            // change before flush does not set is_changed().
1447            rotate1.append([b'a'; 1000]).unwrap();
1448
1449            assert!(!rotate1.is_changed_on_disk());
1450            assert!(!rotate2.is_changed_on_disk());
1451
1452            // sync() does not set is_changed().
1453            rotate1.sync().unwrap();
1454            assert!(!rotate1.is_changed_on_disk());
1455
1456            // rotate2 should be able to detect the on-disk change from rotate1.
1457            assert!(rotate2.is_changed_on_disk());
1458
1459            // is_changed() does not clear is_changed().
1460            assert!(rotate2.is_changed_on_disk());
1461
1462            // read-only sync() should clear is_changed().
1463            rotate2.sync().unwrap();
1464            assert!(!rotate2.is_changed_on_disk());
1465            // ... and not set other Logs' is_changed().
1466            assert!(!rotate1.is_changed_on_disk());
1467
1468            rotate2.append([b'a'; 1000]).unwrap();
1469            rotate2.sync().unwrap();
1470
1471            // rotate1 should be able to detect the on-disk change from rotate2.
1472            assert!(rotate1.is_changed_on_disk());
1473
1474            // read-write sync() should clear is_changed().
1475            rotate1.append([b'a'; 1000]).unwrap();
1476            rotate1.sync().unwrap();
1477            assert!(!rotate1.is_changed_on_disk());
1478        }
1479    }
1480
1481    #[test]
1482    fn test_lookup_latest() {
1483        let dir = tempdir().unwrap();
1484        let mut rotate = OpenOptions::new()
1485            .create(true)
1486            .max_bytes_per_log(100)
1487            .flush_filter(Some(|_, _| panic!()))
1488            .index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
1489            .open(&dir)
1490            .unwrap();
1491
1492        rotate.append(vec![b'a'; 101]).unwrap();
1493        rotate.sync().unwrap(); // trigger rotate
1494        rotate.append(vec![b'b'; 10]).unwrap();
1495
1496        assert_eq!(rotate.lookup_latest(0, b"b").unwrap().count(), 1);
1497        assert_eq!(rotate.lookup_latest(0, b"a").unwrap().count(), 0);
1498
1499        rotate.append(vec![b'c'; 101]).unwrap();
1500        rotate.sync().unwrap(); // trigger rotate again
1501
1502        rotate.append(vec![b'd'; 10]).unwrap();
1503        rotate.sync().unwrap(); // not trigger rotate
1504        rotate.append(vec![b'e'; 10]).unwrap();
1505
1506        assert_eq!(rotate.lookup_latest(0, b"c").unwrap().count(), 0);
1507        assert_eq!(rotate.lookup_latest(0, b"d").unwrap().count(), 1);
1508        assert_eq!(rotate.lookup_latest(0, b"e").unwrap().count(), 1);
1509    }
1510
1511    #[test]
1512    #[should_panic]
1513    fn test_lookup_latest_panic() {
1514        let dir = tempdir().unwrap();
1515        let rotate = OpenOptions::new()
1516            .create(true)
1517            .index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
1518            .open(&dir)
1519            .unwrap();
1520        rotate.lookup_latest(0, b"a").unwrap(); // flush_filter is not set
1521    }
1522
1523    #[test]
1524    fn test_iter() {
1525        let dir = tempdir().unwrap();
1526        let mut rotate = OpenOptions::new()
1527            .create(true)
1528            .max_bytes_per_log(100)
1529            .open(&dir)
1530            .unwrap();
1531
1532        let a = vec![b'a'; 101];
1533        let b = vec![b'b'; 10];
1534
1535        rotate.append(a.clone()).unwrap();
1536        assert_eq!(
1537            rotate.iter_dirty().collect::<Result<Vec<_>, _>>().unwrap(),
1538            vec![&a[..]]
1539        );
1540
1541        rotate.sync().unwrap(); // trigger rotate
1542        rotate.append(b.clone()).unwrap();
1543        rotate.append(a.clone()).unwrap();
1544        rotate.append(a.clone()).unwrap();
1545        assert_eq!(
1546            rotate.iter_dirty().collect::<Result<Vec<_>, _>>().unwrap(),
1547            vec![&b[..], &a, &a]
1548        );
1549
1550        assert_eq!(
1551            rotate.iter().map(|e| e.unwrap()).collect::<Vec<&[u8]>>(),
1552            vec![&a[..], &b, &a, &a],
1553        );
1554
1555        rotate.sync().unwrap(); // trigger rotate
1556        assert_eq!(
1557            rotate.iter().map(|e| e.unwrap()).collect::<Vec<&[u8]>>(),
1558            vec![&b[..], &a, &a],
1559        );
1560    }
1561
1562    #[test]
1563    fn test_recover_from_empty_logs() {
1564        let dir = tempdir().unwrap();
1565        let rotate = OpenOptions::new().create(true).open(&dir).unwrap();
1566        drop(rotate);
1567
1568        // Delete all logs, but keep "latest".
1569        for dirent in fs::read_dir(&dir).unwrap() {
1570            let dirent = dirent.unwrap();
1571            let path = dirent.path();
1572            if path.is_dir() {
1573                fs::remove_dir_all(path).unwrap();
1574            }
1575        }
1576
1577        let _ = OpenOptions::new().create(true).open(&dir).unwrap();
1578    }
1579
1580    #[test]
1581    fn test_recover_from_occupied_logs() {
1582        let dir = tempdir().unwrap();
1583
1584        // Take the "1" spot.
1585        // Note: Cannot use "2" - it will be deleted by rotate -> try_remove_old_logs.
1586        {
1587            let mut log = log::OpenOptions::new()
1588                .create(true)
1589                .open(dir.path().join("1"))
1590                .unwrap();
1591            log.append(&[b'b'; 100][..]).unwrap();
1592            log.append(&[b'c'; 100][..]).unwrap();
1593            log.sync().unwrap();
1594        }
1595
1596        // Rotate to "1" and "2".
1597        let mut rotate = OpenOptions::new()
1598            .create(true)
1599            .max_bytes_per_log(100)
1600            .max_log_count(3)
1601            .open(&dir)
1602            .unwrap();
1603        for i in [1, 2] {
1604            rotate.append(vec![b'a'; 101]).unwrap();
1605            assert_eq!(rotate.sync().unwrap(), i); // trigger rotate
1606        }
1607
1608        // Content in the old "1" log should not appear here.
1609        assert_eq!(
1610            rotate.iter().map(|b| b.unwrap()[0]).collect::<Vec<_>>(),
1611            vec![b'a'; 2]
1612        );
1613    }
1614
1615    #[test]
1616    fn test_index_lag() {
1617        let dir = tempdir().unwrap();
1618        let opts = OpenOptions::new()
1619            .create(true)
1620            .index_defs(vec![
1621                IndexDef::new("idx", |_| vec![IndexOutput::Reference(0..2)])
1622                    .lag_threshold(u64::max_value()),
1623            ])
1624            .max_bytes_per_log(100)
1625            .max_log_count(3);
1626
1627        let size = |name: &str| dir.path().join(name).metadata().unwrap().len();
1628
1629        let mut rotate = opts.open(&dir).unwrap();
1630        rotate.append(vec![b'x'; 200]).unwrap();
1631        rotate.sync().unwrap();
1632        rotate.append(vec![b'y'; 200]).unwrap();
1633        rotate.sync().unwrap();
1634        rotate.append(vec![b'z'; 10]).unwrap();
1635        rotate.sync().unwrap();
1636
1637        // First 2 logs become immutable, indexes are written regardless of
1638        // lag_threshold.
1639        assert!(size("0/index2-idx") > 0);
1640        assert!(size("0/log") > 100);
1641
1642        assert!(size("1/index2-idx") > 0);
1643        assert!(size("1/log") > 100);
1644
1645        // The "current" log is still mutable. Its index respects lag_threshold,
1646        // and is logically empty (because side effect of delete_content, the
1647        // index has some bytes in it).
1648        assert_eq!(size("2/index2-idx"), 25);
1649        assert!(size("2/log") < 100);
1650    }
1651
1652    #[test]
1653    fn test_sync_missing_latest() {
1654        let dir = tempdir().unwrap();
1655        let opts = OpenOptions::new()
1656            .max_bytes_per_log(10000)
1657            .max_log_count(10);
1658        let mut rotate = opts.clone().create(true).open(&dir).unwrap();
1659        rotate.append(vec![b'x'; 200]).unwrap();
1660        rotate.sync().unwrap();
1661
1662        let mut rotate2 = opts.open(&dir).unwrap();
1663        fs::remove_file(dir.path().join(LATEST_FILE)).unwrap();
1664        rotate2.sync().unwrap(); // not a failure
1665        rotate2.append(vec![b'y'; 200]).unwrap();
1666        rotate2.sync().unwrap_err(); // a failure
1667    }
1668
1669    #[test]
1670    fn test_auto_sync_threshold() {
1671        let dir = tempdir().unwrap();
1672        let opts = OpenOptions::new().auto_sync_threshold(100).create(true);
1673
1674        let mut rotate = opts.create(true).open(&dir).unwrap();
1675        rotate.append(vec![b'x'; 50]).unwrap();
1676        assert_eq!(rotate.logs()[0].iter_dirty().count(), 1);
1677        rotate.append(vec![b'x'; 50]).unwrap(); // trigger sync
1678        assert_eq!(rotate.logs()[0].iter_dirty().count(), 0);
1679    }
1680
1681    #[test]
1682    fn test_auto_sync_threshold_with_racy_index_update_on_open() {
1683        fn index_defs(lag_threshold: u64) -> Vec<IndexDef> {
1684            let index_names = ["a"];
1685            (0..index_names.len())
1686                .map(|i| {
1687                    IndexDef::new(index_names[i], |_| vec![IndexOutput::Reference(0..1)])
1688                        .lag_threshold(lag_threshold)
1689                })
1690                .collect()
1691        }
1692
1693        fn open_opts(lag_threshold: u64) -> OpenOptions {
1694            let index_defs = index_defs(lag_threshold);
1695            OpenOptions::new()
1696                .auto_sync_threshold(1000)
1697                .max_bytes_per_log(400)
1698                .max_log_count(10)
1699                .create(true)
1700                .index_defs(index_defs)
1701        }
1702
1703        let dir = tempdir().unwrap();
1704        let path = dir.path();
1705        let data: &[u8] = &[b'x'; 100];
1706        let n = 10;
1707        for _i in 0..n {
1708            let mut rotate1 = open_opts(300).open(path).unwrap();
1709            rotate1.hook_after_log_sync = Some({
1710                let path = path.to_path_buf();
1711                Box::new(move || {
1712                    // This might updating indexes (see D20042046 and D20286509).
1713                    let rotate2 = open_opts(100).open(&path).unwrap();
1714                    // Force loading "lazy" indexes.
1715                    let _all = rotate2.iter().collect::<Result<Vec<_>, _>>().unwrap();
1716                })
1717            });
1718            rotate1.append(data).unwrap();
1719            rotate1.sync().unwrap();
1720        }
1721
1722        // Verify that data can be read through index.
1723        let rotate1 = open_opts(300).open(path).unwrap();
1724        let mut count = 0;
1725        for entry in rotate1.lookup(0, b"x" as &[u8]).unwrap() {
1726            let entry = entry.unwrap();
1727            assert_eq!(entry, data);
1728            count += 1;
1729        }
1730        assert_eq!(count, n);
1731    }
1732
1733    #[test]
1734    fn test_reindex_old_logs() {
1735        let dir = tempdir().unwrap();
1736        let opts = OpenOptions::new()
1737            .max_bytes_per_log(10)
1738            .max_log_count(10)
1739            .create(true);
1740
1741        let mut rotate = opts.clone().create(true).open(&dir).unwrap();
1742        for i in 0..2u8 {
1743            rotate.append(vec![i; 50]).unwrap();
1744            rotate.sync().unwrap(); // rotate
1745        }
1746
1747        // New OpenOptions: With indexes.
1748        let opts = opts.index("a", |_data| vec![IndexOutput::Reference(0..1)]);
1749
1750        // Triggers rebuilding indexes.
1751        let rotate = opts.create(true).open(&dir).unwrap();
1752
1753        // Because older log is lazy. It hasn't been loaded yet. So it does not have the index.
1754        assert!(!dir.path().join("1/index2-a").exists());
1755        assert!(!dir.path().join("0/index2-a").exists());
1756
1757        // Do an index lookup. This will trigger loading old logs.
1758        let mut iter = rotate.lookup(0, b"\x00".to_vec()).unwrap();
1759
1760        // The iterator is lazy. So it does not build the index immediately.
1761        assert!(!dir.path().join("1/index2-a").exists());
1762
1763        // Iterate through all logs.
1764        assert_eq!(iter.next().unwrap().unwrap(), &[0; 50][..]);
1765
1766        // Now the index is built for older logs.
1767        assert!(dir.path().join("1/index2-a").exists());
1768        assert!(dir.path().join("0/index2-a").exists());
1769    }
1770
1771    #[test]
1772    fn test_repair_latest() {
1773        assert_eq!(guess_latest(vec![]), 0);
1774        assert_eq!(guess_latest(vec![3, 4, 5]), 5);
1775        assert_eq!(guess_latest(vec![0, 1, 2, 254, 255]), 2);
1776        assert_eq!(guess_latest((0..=255).collect::<Vec<_>>()), 0);
1777
1778        let dir = tempdir().unwrap();
1779        let opts = OpenOptions::new().max_bytes_per_log(100).max_log_count(10);
1780        let mut rotate = opts.clone().create(true).open(&dir).unwrap();
1781        for i in 1..=2 {
1782            rotate.append(vec![b'x'; 200]).unwrap();
1783            assert_eq!(rotate.sync().unwrap(), i);
1784        }
1785
1786        // Corrupt "latest".
1787        let latest_path = dir.path().join(LATEST_FILE);
1788        utils::atomic_write(latest_path, "NaN", false).unwrap();
1789        assert!(opts.open(&dir).is_err());
1790        assert_eq!(
1791            opts.repair(&dir)
1792                .unwrap()
1793                .lines()
1794                .filter(|l| !l.contains("Processing"))
1795                .collect::<Vec<_>>()
1796                .join("\n"),
1797            r#"Attempt to repair log "0"
1798Verified 1 entries, 223 bytes in log
1799Attempt to repair log "1"
1800Verified 1 entries, 223 bytes in log
1801Attempt to repair log "2"
1802Verified 0 entries, 12 bytes in log
1803Reset latest to 2"#
1804        );
1805        opts.open(&dir).unwrap();
1806
1807        // Delete "latest".
1808        fs::remove_file(dir.path().join(LATEST_FILE)).unwrap();
1809        assert!(opts.open(&dir).is_err());
1810
1811        // Repair can fix it.
1812        assert_eq!(
1813            opts.repair(&dir)
1814                .unwrap()
1815                .lines()
1816                .filter(|l| !l.contains("Processing"))
1817                .collect::<Vec<_>>()
1818                .join("\n"),
1819            r#"Attempt to repair log "0"
1820Verified 1 entries, 223 bytes in log
1821Attempt to repair log "1"
1822Verified 1 entries, 223 bytes in log
1823Attempt to repair log "2"
1824Verified 0 entries, 12 bytes in log
1825Reset latest to 2"#
1826        );
1827        opts.open(&dir).unwrap();
1828    }
1829
1830    #[test]
1831    fn test_load_broken_logs_once() {
1832        let dir = tempdir().unwrap();
1833        let open_opts = OpenOptions::new()
1834            .create(true)
1835            .max_log_count(10)
1836            .max_bytes_per_log(100);
1837        let mut log = open_opts.open(dir.path()).unwrap();
1838
1839        // Create 0, 1, 2, 3 logs
1840        for i in 0..4 {
1841            log.append(&[i; 200][..]).unwrap();
1842            log.sync().unwrap();
1843        }
1844
1845        // Break 1/
1846        utils::atomic_write(dir.path().join("1").join("meta"), "foo", false).unwrap();
1847        let log = open_opts.open(dir.path()).unwrap();
1848
1849        // The broken log should only be loaded once.
1850        assert!(log.load_log(3).is_err()); // Reports error loading the broken Log.
1851        assert!(log.load_log(3).is_ok()); // The error is "cached" - not loading the Log again.
1852
1853        // Logs iteration will only have 2, no 0 or 1.
1854        assert_eq!(
1855            log.iter().map(|i| i.unwrap()[0]).collect::<Vec<_>>(),
1856            [2, 3]
1857        );
1858    }
1859
1860    #[test]
1861    fn test_multithread_sync() {
1862        let dir = tempdir().unwrap();
1863
1864        // Release mode runs much faster.
1865        const THREAD_COUNT: u8 = if cfg!(debug_assertions) { 10 } else { 30 };
1866        const WRITE_COUNT_PER_THREAD: u8 = if cfg!(debug_assertions) { 10 } else { 50 };
1867
1868        // Some indexes. They have different lag_threshold.
1869        fn index_ref(data: &[u8]) -> Vec<IndexOutput> {
1870            vec![IndexOutput::Reference(0..data.len() as u64)]
1871        }
1872        fn index_copy(data: &[u8]) -> Vec<IndexOutput> {
1873            vec![IndexOutput::Owned(data.to_vec().into_boxed_slice())]
1874        }
1875        let indexes = vec![
1876            IndexDef::new("key1", index_ref).lag_threshold(1),
1877            IndexDef::new("key2", index_ref).lag_threshold(50),
1878            IndexDef::new("key3", index_ref).lag_threshold(1000),
1879            IndexDef::new("key4", index_copy).lag_threshold(1),
1880            IndexDef::new("key5", index_copy).lag_threshold(50),
1881            IndexDef::new("key6", index_copy).lag_threshold(1000),
1882        ];
1883        let index_len = indexes.len();
1884        let open_opts = OpenOptions::new()
1885            .create(true)
1886            .max_log_count(200)
1887            .max_bytes_per_log(200)
1888            .index_defs(indexes);
1889
1890        use std::sync::Arc;
1891        use std::sync::Barrier;
1892        let barrier = Arc::new(Barrier::new(THREAD_COUNT as usize));
1893        let threads: Vec<_> = (0..THREAD_COUNT)
1894            .map(|i| {
1895                let barrier = barrier.clone();
1896                let open_opts = open_opts.clone();
1897                let path = dir.path().to_path_buf();
1898                std::thread::spawn(move || {
1899                    barrier.wait();
1900                    let mut log = open_opts.open(path).unwrap();
1901                    for j in 1..=WRITE_COUNT_PER_THREAD {
1902                        let buf = [i, j];
1903                        log.append(buf).unwrap();
1904                        if j % (i + 1) == 0 || j == WRITE_COUNT_PER_THREAD {
1905                            log.sync().unwrap();
1906                            // Verify that the indexes match the entries.
1907                            for entry in log.iter().map(|d| d.unwrap()) {
1908                                for index_id in 0..index_len {
1909                                    for index_value in log.lookup(index_id, entry.to_vec()).unwrap()
1910                                    {
1911                                        assert_eq!(index_value.unwrap(), entry);
1912                                    }
1913                                }
1914                            }
1915                        }
1916                    }
1917                })
1918            })
1919            .collect();
1920
1921        // Wait for them.
1922        for thread in threads {
1923            thread.join().expect("joined");
1924        }
1925
1926        // Check how many entries were written.
1927        let log = open_opts.open(dir.path()).unwrap();
1928        let count = log.iter().count() as u64;
1929        assert_eq!(count, THREAD_COUNT as u64 * WRITE_COUNT_PER_THREAD as u64);
1930    }
1931}