spacetimedb_commitlog/
commitlog.rs

1use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};
2
3use itertools::Itertools;
4use log::{debug, info, trace, warn};
5
6use crate::{
7    commit::StoredCommit,
8    error,
9    payload::Decoder,
10    repo::{self, Repo},
11    segment::{self, FileLike, Transaction, Writer},
12    Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
13};
14
15pub use crate::segment::Committed;
16
17/// A commitlog generic over the storage backend as well as the type of records
18/// its [`Commit`]s contain.
19#[derive(Debug)]
20pub struct Generic<R: Repo, T> {
21    /// The storage backend.
22    pub(crate) repo: R,
23    /// The segment currently being written to.
24    ///
25    /// If we squint, all segments in a log are a non-empty linked list, the
26    /// head of which is the segment open for writing.
27    pub(crate) head: Writer<R::SegmentWriter>,
28    /// The tail of the non-empty list of segments.
29    ///
30    /// We only retain the min transaction offset of each, from which the
31    /// segments can be opened for reading when needed.
32    ///
33    /// This is a `Vec`, not a linked list, so the last element is the newest
34    /// segment (after `head`).
35    tail: Vec<u64>,
36    /// Configuration options.
37    opts: Options,
38    /// Type of a single record in this log's [`Commit::records`].
39    _record: PhantomData<T>,
40    /// Tracks panics/errors to control what happens on drop.
41    ///
42    /// Set to `true` before any I/O operation, and back to `false` after it
43    /// succeeded. This way, we won't try to perform I/O on drop when it is
44    /// unlikely to succeed, or even has a chance to panic.
45    panicked: bool,
46}
47
48impl<R: Repo, T> Generic<R, T> {
49    pub fn open(repo: R, opts: Options) -> io::Result<Self> {
50        let mut tail = repo.existing_offsets()?;
51        if !tail.is_empty() {
52            debug!("segments: {tail:?}");
53        }
54        let head = if let Some(last) = tail.pop() {
55            debug!("resuming last segment: {last}");
56            repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
57                tail.push(meta.tx_range.start);
58                repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
59            })?
60        } else {
61            debug!("starting fresh log");
62            repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
63        };
64
65        Ok(Self {
66            repo,
67            head,
68            tail,
69            opts,
70            _record: PhantomData,
71            panicked: false,
72        })
73    }
74
75    /// Get the current epoch.
76    ///
77    /// See also: [`Commit::epoch`].
78    pub fn epoch(&self) -> u64 {
79        self.head.commit.epoch
80    }
81
82    /// Update the current epoch.
83    ///
84    /// Calls [`Self::commit`] to flush all data of the previous epoch, and
85    /// returns the result.
86    ///
87    /// Does nothing if the given `epoch` is equal to the current epoch.
88    ///
89    /// # Errors
90    ///
91    /// If `epoch` is smaller than the current epoch, an error of kind
92    /// [`io::ErrorKind::InvalidInput`] is returned.
93    ///
94    /// Also see [`Self::commit`].
95    pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
96        use std::cmp::Ordering::*;
97
98        match epoch.cmp(&self.head.epoch()) {
99            Less => Err(io::Error::new(
100                io::ErrorKind::InvalidInput,
101                "new epoch is smaller than current epoch",
102            )),
103            Equal => Ok(None),
104            Greater => {
105                let res = self.commit()?;
106                self.head.set_epoch(epoch);
107                Ok(res)
108            }
109        }
110    }
111
112    /// Write the currently buffered data to storage and rotate segments as
113    /// necessary.
114    ///
115    /// Note that this does not imply that the data is durable, in particular
116    /// when a filesystem storage backend is used. Call [`Self::sync`] to flush
117    /// any OS buffers to stable storage.
118    ///
119    /// # Errors
120    ///
121    /// If an error occurs writing the data, the current [`Commit`] buffer is
122    /// retained, but a new segment is created. Retrying in case of an `Err`
123    /// return value thus will write the current data to that new segment.
124    ///
125    /// If this fails, however, the next attempt to create a new segment will
126    /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind
127    /// this means that something is seriously wrong underlying storage, and the
128    /// caller should stop writing to the log.
129    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
130        self.panicked = true;
131        let writer = &mut self.head;
132        let sz = writer.commit.encoded_len();
133        // If the segment is empty, but the commit exceeds the max size,
134        // we got a huge commit which needs to be written even if that
135        // results in a huge segment.
136        let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
137        let writer = if should_rotate {
138            self.sync();
139            self.start_new_segment()?
140        } else {
141            writer
142        };
143
144        let ret = writer.commit().or_else(|e| {
145            warn!("Commit failed: {e}");
146            // Nb.: Don't risk a panic by calling `self.sync()`.
147            // We already gave up on the last commit, and will retry it next time.
148            self.start_new_segment()?;
149            Err(e)
150        });
151        self.panicked = false;
152        ret
153    }
154
155    /// Force the currently active segment to be flushed to storage.
156    ///
157    /// Using a filesystem backend, this means to call `fsync(2)`.
158    ///
159    /// # Panics
160    ///
161    /// As an `fsync` failure leaves a file in a more of less undefined state,
162    /// this method panics in this case, thereby preventing any further writes
163    /// to the log and forcing the user to re-read the state from disk.
164    pub fn sync(&mut self) {
165        self.panicked = true;
166        if let Err(e) = self.head.fsync() {
167            panic!("Failed to fsync segment: {e}");
168        }
169        self.panicked = false;
170    }
171
172    /// The last transaction offset written to disk, or `None` if nothing has
173    /// been written yet.
174    ///
175    /// Note that this does not imply durability: [`Self::sync`] may not have
176    /// been called at this offset.
177    pub fn max_committed_offset(&self) -> Option<u64> {
178        // Naming is hard: the segment's `next_tx_offset` indicates how many
179        // txs are already in the log (it's the next commit's min-tx-offset).
180        // If the value is zero, however, the initial commit hasn't been
181        // committed yet.
182        self.head.next_tx_offset().checked_sub(1)
183    }
184
185    // Helper to obtain a list of the segment offsets which include transaction
186    // offset `offset`.
187    //
188    // The returned `Vec` is sorted in **ascending** order, such that the first
189    // element is the segment which contains `offset`.
190    //
191    // The offset of `self.head` is always included, regardless of how many
192    // entries it actually contains.
193    fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
194        if offset >= self.head.min_tx_offset {
195            vec![self.head.min_tx_offset]
196        } else {
197            let mut offs = Vec::with_capacity(self.tail.len() + 1);
198            if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
199                offs.extend_from_slice(&self.tail[pos..]);
200                offs.push(self.head.min_tx_offset);
201            }
202
203            offs
204        }
205    }
206
207    pub fn commits_from(&self, offset: u64) -> Commits<R> {
208        let offsets = self.segment_offsets_from(offset);
209        let segments = Segments {
210            offs: offsets.into_iter(),
211            repo: self.repo.clone(),
212            max_log_format_version: self.opts.log_format_version,
213        };
214        Commits {
215            inner: None,
216            segments,
217            last_commit: CommitInfo::Initial { next_offset: offset },
218            last_error: None,
219        }
220    }
221
222    pub fn reset(mut self) -> io::Result<Self> {
223        info!("hard reset");
224
225        self.panicked = true;
226        self.tail.reserve(1);
227        self.tail.push(self.head.min_tx_offset);
228        for segment in self.tail.iter().rev() {
229            debug!("removing segment {segment}");
230            self.repo.remove_segment(*segment)?;
231        }
232        // Prevent finalizer from running by not updating self.panicked.
233
234        Self::open(self.repo.clone(), self.opts)
235    }
236
237    pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
238        info!("reset to {offset}");
239
240        self.panicked = true;
241        self.tail.reserve(1);
242        self.tail.push(self.head.min_tx_offset);
243        reset_to_internal(&self.repo, &self.tail, offset)?;
244        // Prevent finalizer from running by not updating self.panicked.
245
246        Self::open(self.repo.clone(), self.opts)
247    }
248
249    /// Start a new segment, preserving the current head's `Commit`.
250    ///
251    /// The caller must ensure that the current head is synced to disk as
252    /// appropriate. It is not appropriate to sync after a write error, as that
253    /// is likely to return an error as well: the `Commit` will be written to
254    /// the new segment anyway.
255    fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::SegmentWriter>> {
256        debug!(
257            "starting new segment offset={} prev-offset={}",
258            self.head.next_tx_offset(),
259            self.head.min_tx_offset()
260        );
261        let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
262        let old = mem::replace(&mut self.head, new);
263        self.tail.push(old.min_tx_offset());
264        self.head.commit = old.commit;
265
266        Ok(&mut self.head)
267    }
268}
269
270impl<R: Repo, T: Encode> Generic<R, T> {
271    pub fn append(&mut self, record: T) -> Result<(), T> {
272        self.head.append(record)
273    }
274
275    pub fn transactions_from<'a, D>(
276        &self,
277        offset: u64,
278        decoder: &'a D,
279    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
280    where
281        D: Decoder<Record = T>,
282        D::Error: From<error::Traversal>,
283        R: 'a,
284        T: 'a,
285    {
286        transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
287    }
288
289    pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
290    where
291        D: Decoder,
292        D::Error: From<error::Traversal>,
293    {
294        fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
295    }
296}
297
298impl<R: Repo, T> Drop for Generic<R, T> {
299    fn drop(&mut self) {
300        if !self.panicked {
301            if let Err(e) = self.head.commit() {
302                warn!("failed to commit on drop: {e}");
303            }
304        }
305    }
306}
307
308/// Extract the most recently written [`segment::Metadata`] from the commitlog
309/// in `repo`.
310///
311/// Returns `None` if the commitlog is empty.
312///
313/// Note that this function validates the most recent segment, which entails
314/// traversing it from the start.
315///
316/// The function can be used instead of the pattern:
317///
318/// ```ignore
319/// let log = Commitlog::open(..)?;
320/// let max_offset = log.max_committed_offset();
321/// ```
322///
323/// like so:
324///
325/// ```ignore
326/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
327/// ```
328///
329/// Unlike `open`, no segment will be created in an empty `repo`.
330pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
331    let Some(last) = repo.existing_offsets()?.pop() else {
332        return Ok(None);
333    };
334
335    let mut storage = repo.open_segment_reader(last)?;
336    let offset_index = repo.get_offset_index(last).ok();
337    segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
338}
339
340pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
341    let mut offsets = repo.existing_offsets()?;
342    if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
343        offsets = offsets.split_off(pos);
344    }
345    let segments = Segments {
346        offs: offsets.into_iter(),
347        repo,
348        max_log_format_version,
349    };
350    Ok(Commits {
351        inner: None,
352        segments,
353        last_commit: CommitInfo::Initial { next_offset: offset },
354        last_error: None,
355    })
356}
357
358pub fn transactions_from<'a, R, D, T>(
359    repo: R,
360    max_log_format_version: u8,
361    offset: u64,
362    de: &'a D,
363) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
364where
365    R: Repo + 'a,
366    D: Decoder<Record = T>,
367    D::Error: From<error::Traversal>,
368    T: 'a,
369{
370    commits_from(repo, max_log_format_version, offset)
371        .map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
372}
373
374pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
375where
376    R: Repo,
377    D: Decoder,
378    D::Error: From<error::Traversal> + From<io::Error>,
379{
380    let commits = commits_from(repo, max_log_format_version, offset)?;
381    fold_transactions_internal(commits.with_log_format_version(), de, offset)
382}
383
384fn transactions_from_internal<'a, R, D, T>(
385    commits: CommitsWithVersion<R>,
386    offset: u64,
387    de: &'a D,
388) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
389where
390    R: Repo + 'a,
391    D: Decoder<Record = T>,
392    D::Error: From<error::Traversal>,
393    T: 'a,
394{
395    commits
396        .map(|x| x.map_err(D::Error::from))
397        .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
398        .flatten_ok()
399        .map(|x| x.and_then(|y| y))
400}
401
402fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
403where
404    R: Repo,
405    D: Decoder,
406    D::Error: From<error::Traversal>,
407{
408    while let Some(commit) = commits.next() {
409        let (version, commit) = match commit {
410            Ok(version_and_commit) => version_and_commit,
411            Err(e) => {
412                // Ignore it if the very last commit in the log is broken.
413                // The next `append` will fix the log, but the `decoder`
414                // has no way to tell whether we're at the end or not.
415                // This is unlike the consumer of an iterator, which can
416                // perform below check itself.
417                if commits.next().is_none() {
418                    return Ok(());
419                }
420
421                return Err(e.into());
422            }
423        };
424        trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
425
426        let max_tx_offset = commit.min_tx_offset + commit.n as u64;
427        if max_tx_offset <= from {
428            continue;
429        }
430
431        let records = &mut commit.records.as_slice();
432        for n in 0..commit.n {
433            let tx_offset = commit.min_tx_offset + n as u64;
434            if tx_offset < from {
435                de.skip_record(version, tx_offset, records)?;
436            } else {
437                de.consume_record(version, tx_offset, records)?;
438            }
439        }
440    }
441
442    Ok(())
443}
444
445/// Remove all data past the given transaction `offset`.
446///
447/// The function deletes log segments starting from the newest. As multiple
448/// segments cannot be deleted atomically, the log may be left longer than
449/// `offset` if the function does not return successfully.
450///
451/// If the function returns successfully, the most recent [`Commit`] in the
452/// log will contain the transaction at `offset`.
453///
454/// The log must be re-opened if it is to be used after calling this function.
455pub fn reset_to(repo: &impl Repo, offset: u64) -> io::Result<()> {
456    let segments = repo.existing_offsets()?;
457    reset_to_internal(repo, &segments, offset)
458}
459
460fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Result<()> {
461    for segment in segments.iter().copied().rev() {
462        if segment > offset {
463            // Segment is outside the offset, so remove it wholesale.
464            debug!("removing segment {segment}");
465            repo.remove_segment(segment)?;
466        } else {
467            // Read commit-wise until we find the byte offset.
468            let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;
469
470            let (index_file, mut byte_offset) = repo
471                .get_offset_index(segment)
472                .and_then(|index_file| {
473                    let (key, byte_offset) = index_file.key_lookup(offset).map_err(|e| {
474                        io::Error::new(io::ErrorKind::NotFound, format!("Offset index cannot be used: {e:?}"))
475                    })?;
476
477                    reader.seek_to_offset(&index_file, key).map_err(|e| {
478                        io::Error::new(
479                            io::ErrorKind::InvalidData,
480                            format!("Offset index is not used at offset {key}: {e}"),
481                        )
482                    })?;
483
484                    Ok((Some(index_file), byte_offset))
485                })
486                .inspect_err(|e| {
487                    warn!("commitlog offset index is not used: {e:?}");
488                })
489                .unwrap_or((None, segment::Header::LEN as u64));
490
491            let commits = reader.commits();
492
493            for commit in commits {
494                let commit = commit?;
495                if commit.min_tx_offset > offset {
496                    break;
497                }
498                byte_offset += Commit::from(commit).encoded_len() as u64;
499            }
500
501            if byte_offset == segment::Header::LEN as u64 {
502                // Segment is empty, just remove it.
503                repo.remove_segment(segment)?;
504            } else {
505                debug!("truncating segment {segment} to {offset} at {byte_offset}");
506                let mut file = repo.open_segment_writer(segment)?;
507
508                if let Some(mut index_file) = index_file {
509                    let index_file = index_file.as_mut();
510                    // Note: The offset index truncates equal or greater,
511                    // inclusive. We'd like to retain `offset` in the index, as
512                    // the commit is also retained in the log.
513                    index_file.ftruncate(offset + 1, byte_offset).map_err(|e| {
514                        io::Error::new(
515                            io::ErrorKind::InvalidData,
516                            format!("Failed to truncate offset index: {e}"),
517                        )
518                    })?;
519                    index_file.async_flush()?;
520                }
521
522                file.ftruncate(offset, byte_offset)?;
523                // Some filesystems require fsync after ftruncate.
524                file.fsync()?;
525                break;
526            }
527        }
528    }
529
530    Ok(())
531}
532
/// Iterator that opens the segments at the given offsets, in order, for
/// reading.
pub struct Segments<R> {
    /// Storage backend the segments are opened from.
    repo: R,
    /// Min transaction offsets of the segments still to be opened.
    offs: vec::IntoIter<u64>,
    /// Highest log format version the readers will accept.
    max_log_format_version: u8,
}
538
539impl<R: Repo> Iterator for Segments<R> {
540    type Item = io::Result<segment::Reader<R::SegmentReader>>;
541
542    fn next(&mut self) -> Option<Self::Item> {
543        let off = self.offs.next()?;
544        debug!("iter segment {off}");
545        Some(repo::open_segment_reader(&self.repo, self.max_log_format_version, off))
546    }
547}
548
/// Bookkeeping used by the [`Commits`] iterator to validate commit order.
enum CommitInfo {
    /// No commit seen yet. `next_offset` is the transaction offset that the
    /// caller asked to start from (see [`Generic::commits_from`]).
    Initial { next_offset: u64 },
    /// The most recently yielded commit.
    ///
    /// `tx_range` holds its transaction offsets; `tx_range.end` is the offset
    /// the following commit must start at. `checksum` lets a re-read of the
    /// same commit be recognised as a duplicate.
    LastSeen { tx_range: Range<u64>, checksum: u32 },
}
561
562impl CommitInfo {
563    /// `true` if the last seen commit in self and the provided one have the
564    /// same `min_tx_offset`.
565    fn same_offset_as(&self, commit: &StoredCommit) -> bool {
566        let Self::LastSeen { tx_range, .. } = self else {
567            return false;
568        };
569        tx_range.start == commit.min_tx_offset
570    }
571
572    /// `true` if the last seen commit in self and the provided one have the
573    /// same `checksum`.
574    fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
575        let Some(checksum) = self.checksum() else { return false };
576        checksum == &commit.checksum
577    }
578
579    fn checksum(&self) -> Option<&u32> {
580        match self {
581            Self::Initial { .. } => None,
582            Self::LastSeen { checksum, .. } => Some(checksum),
583        }
584    }
585
586    fn expected_offset(&self) -> &u64 {
587        match self {
588            Self::Initial { next_offset } => next_offset,
589            Self::LastSeen { tx_range, .. } => &tx_range.end,
590        }
591    }
592
593    // If initial offset falls within a commit, adjust it to the commit boundary.
594    //
595    // Returns `true` if the initial offset is past `commit`.
596    // Returns `false` if `self` isn't `Self::Initial`,
597    // or the initial offset has been adjusted to the starting offset of `commit`.
598    //
599    // For iteration, `true` means to skip the commit, `false` to yield it.
600    fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
601        if let Self::Initial { next_offset } = self {
602            let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
603            if *next_offset > last_tx_offset {
604                return true;
605            } else {
606                *next_offset = commit.min_tx_offset;
607            }
608        }
609
610        false
611    }
612}
613
614pub struct Commits<R: Repo> {
615    inner: Option<segment::Commits<R::SegmentReader>>,
616    segments: Segments<R>,
617    last_commit: CommitInfo,
618    last_error: Option<error::Traversal>,
619}
620
621impl<R: Repo> Commits<R> {
622    fn current_segment_header(&self) -> Option<&segment::Header> {
623        self.inner.as_ref().map(|segment| &segment.header)
624    }
625
626    /// Turn `self` into an iterator which pairs the log format version of the
627    /// current segment with the [`Commit`].
628    pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
629        CommitsWithVersion { inner: self }
630    }
631
632    /// Advance the current-segment iterator to yield the next commit.
633    ///
634    /// Checks that the offset sequence is contiguous, and may skip commits
635    /// until the requested offset.
636    ///
637    /// Returns `None` if the segment iterator is exhausted or returns an error.
638    fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
639        loop {
640            match self.inner.as_mut()?.next()? {
641                Ok(commit) => {
642                    // Pop the last error. Either we'll return it below, or it's no longer
643                    // interesting.
644                    let prev_error = self.last_error.take();
645
646                    // Skip entries before the initial commit.
647                    if self.last_commit.adjust_initial_offset(&commit) {
648                        trace!("adjust initial offset");
649                        continue;
650                    // Same offset: ignore if duplicate (same crc), else report a "fork".
651                    } else if self.last_commit.same_offset_as(&commit) {
652                        if !self.last_commit.same_checksum_as(&commit) {
653                            warn!(
654                                "forked: commit={:?} last-error={:?} last-crc={:?}",
655                                commit,
656                                prev_error,
657                                self.last_commit.checksum()
658                            );
659                            return Some(Err(error::Traversal::Forked {
660                                offset: commit.min_tx_offset,
661                            }));
662                        } else {
663                            trace!("ignore duplicate");
664                            continue;
665                        }
666                    // Not the expected offset: report out-of-order.
667                    } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
668                        warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
669                        return Some(Err(error::Traversal::OutOfOrder {
670                            expected_offset: *self.last_commit.expected_offset(),
671                            actual_offset: commit.min_tx_offset,
672                            prev_error: prev_error.map(Box::new),
673                        }));
674                    // Seems legit, record info.
675                    } else {
676                        self.last_commit = CommitInfo::LastSeen {
677                            tx_range: commit.tx_range(),
678                            checksum: commit.checksum,
679                        };
680
681                        return Some(Ok(commit));
682                    }
683                }
684
685                Err(e) => {
686                    warn!("error reading next commit: {e}");
687                    // Stop traversing this segment here.
688                    //
689                    // If this is just a partial write at the end of the segment,
690                    // we may be able to obtain a commit with right offset from
691                    // the next segment.
692                    //
693                    // If we don't, the error here is likely more helpful, but
694                    // would be clobbered by `OutOfOrder`. Therefore we store it
695                    // here.
696                    self.set_last_error(e);
697
698                    return None;
699                }
700            }
701        }
702    }
703
704    /// Store `e` has the last error for delayed reporting.
705    fn set_last_error(&mut self, e: io::Error) {
706        // Recover a checksum mismatch.
707        let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
708            e.into_inner()
709                .unwrap()
710                .downcast::<error::ChecksumMismatch>()
711                .map(|source| error::Traversal::Checksum {
712                    offset: *self.last_commit.expected_offset(),
713                    source: *source,
714                })
715                .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
716        } else {
717            error::Traversal::from(e)
718        };
719        self.last_error = Some(last_error);
720    }
721
722    /// If we're still looking for the initial commit, try to use the offset
723    /// index to advance the segment reader.
724    fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::SegmentReader>) {
725        if let CommitInfo::Initial { next_offset } = &self.last_commit {
726            let _ = self
727                .segments
728                .repo
729                .get_offset_index(segment.min_tx_offset)
730                .map_err(Into::into)
731                .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
732                .inspect_err(|e| {
733                    warn!(
734                        "commitlog offset index is not used at segment {}: {}",
735                        segment.min_tx_offset, e
736                    );
737                });
738        }
739    }
740}
741
742impl<R: Repo> Iterator for Commits<R> {
743    type Item = Result<StoredCommit, error::Traversal>;
744
745    fn next(&mut self) -> Option<Self::Item> {
746        if let Some(item) = self.next_commit() {
747            return Some(item);
748        }
749
750        match self.segments.next() {
751            // When there is no more data, the last commit being bad is an error
752            None => self.last_error.take().map(Err),
753            Some(segment) => segment.map_or_else(
754                |e| Some(Err(e.into())),
755                |mut segment| {
756                    self.try_seek_to_initial_offset(&mut segment);
757                    self.inner = Some(segment.commits());
758                    self.next()
759                },
760            ),
761        }
762    }
763}
764
765pub struct CommitsWithVersion<R: Repo> {
766    inner: Commits<R>,
767}
768
769impl<R: Repo> Iterator for CommitsWithVersion<R> {
770    type Item = Result<(u8, Commit), error::Traversal>;
771
772    fn next(&mut self) -> Option<Self::Item> {
773        let next = self.inner.next()?;
774        match next {
775            Ok(commit) => {
776                let version = self
777                    .inner
778                    .current_segment_header()
779                    .map(|hdr| hdr.log_format_version)
780                    .expect("segment header none even though segment yielded a commit");
781                Some(Ok((version, commit.into())))
782            }
783            Err(e) => Some(Err(e)),
784        }
785    }
786}
787
#[cfg(test)]
mod tests {
    use std::{cell::Cell, iter::repeat};

    use pretty_assertions::assert_matches;

    use super::*;
    use crate::{
        payload::{ArrayDecodeError, ArrayDecoder},
        tests::helpers::{fill_log, mem_log},
    };

    #[test]
    fn rotate_segments_simple() {
        let mut log = mem_log::<[u8; 32]>(128);
        for _ in 0..3 {
            log.append([0; 32]).unwrap();
            log.commit().unwrap();
        }

        // All segments except the newest one are tracked in `tail`; the newest
        // segment is the `head`.
        let offsets = log.repo.existing_offsets().unwrap();
        assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
        assert_eq!(offsets[offsets.len() - 1], 2);
    }

    #[test]
    fn huge_commit() {
        let mut log = mem_log::<[u8; 32]>(32);

        // A single commit larger than `max_segment_size` still goes into the
        // current segment ...
        log.append([0; 32]).unwrap();
        log.append([1; 32]).unwrap();
        log.commit().unwrap();
        assert!(log.head.len() > log.opts.max_segment_size);

        // ... and the next commit starts a new segment.
        log.append([2; 32]).unwrap();
        log.commit().unwrap();

        assert_eq!(&log.tail, &[0]);
        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
    }

    #[test]
    fn traverse_commits() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        // `zip` stops at the shorter iterator, so check the count separately;
        // otherwise missing commits would go unnoticed.
        assert_eq!(10, log.commits_from(0).count());
        for (i, commit) in (0..10).zip(log.commits_from(0)) {
            assert_eq!(i, commit.unwrap().min_tx_offset);
        }
    }

    #[test]
    fn traverse_commits_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        for offset in 0..10 {
            for commit in log.commits_from(offset) {
                let commit = commit.unwrap();
                assert!(commit.min_tx_offset >= offset);
            }
        }
        assert_eq!(0, log.commits_from(10).count());
    }

    #[test]
    fn fold_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        /// A [`Decoder`] which counts the number of records decoded,
        /// and asserts that the `tx_offset` is as expected.
        struct CountDecoder {
            count: Cell<u64>,
            next_tx_offset: Cell<u64>,
        }

        impl Decoder for &CountDecoder {
            type Record = [u8; 32];
            type Error = ArrayDecodeError;

            fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                _version: u8,
                _tx_offset: u64,
                _reader: &mut R,
            ) -> Result<Self::Record, Self::Error> {
                unreachable!("Folding never calls `decode_record`")
            }

            fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                self.count.set(self.count.get() + 1);
                let expected_tx_offset = self.next_tx_offset.get();
                assert_eq!(expected_tx_offset, tx_offset);
                self.next_tx_offset.set(expected_tx_offset + 1);
                Ok(())
            }

            // Skipped records are consumed from the reader but not counted.
            fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                Ok(())
            }
        }

        for offset in 0..10 {
            let decoder = CountDecoder {
                count: Cell::new(0),
                next_tx_offset: Cell::new(offset),
            };

            log.fold_transactions_from(offset, &decoder).unwrap();

            assert_eq!(decoder.count.get(), 10 - offset);
            assert_eq!(decoder.next_tx_offset.get(), 10);
        }
    }

    #[test]
    fn traverse_commits_ignores_duplicates() {
        let mut log = mem_log::<[u8; 32]>(1024);

        // Write `commit1` twice in a row; traversal must yield it only once.
        log.append([42; 32]).unwrap();
        let commit1 = log.head.commit.clone();
        log.commit().unwrap();
        log.head.commit = commit1.clone();
        log.commit().unwrap();
        log.append([43; 32]).unwrap();
        let commit2 = log.head.commit.clone();
        log.commit().unwrap();

        assert_eq!(
            [commit1, commit2].as_slice(),
            &log.commits_from(0)
                .map_ok(Commit::from)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        );
    }

    #[test]
    fn traverse_commits_errors_when_forked() {
        let mut log = mem_log::<[u8; 32]>(1024);

        // Two commits with the same offset but different contents.
        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        log.head.commit = Commit {
            min_tx_offset: 0,
            n: 1,
            records: [43; 32].to_vec(),
            epoch: 0,
        };
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(res, Err(error::Traversal::Forked { offset: 0 })),
            "expected fork error: {res:?}"
        )
    }

    #[test]
    fn traverse_commits_errors_when_offset_not_contiguous() {
        let mut log = mem_log::<[u8; 32]>(1024);

        // The second commit skips ahead to offset 18 instead of continuing at 1.
        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        log.head.commit.min_tx_offset = 18;
        log.append([42; 32]).unwrap();
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(
                res,
                Err(error::Traversal::OutOfOrder {
                    expected_offset: 1,
                    actual_offset: 18,
                    prev_error: None
                })
            ),
            "expected out-of-order error: {res:?}"
        )
    }

    #[test]
    fn traverse_transactions() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

        // `zip` stops at the shorter iterator, so check the count separately;
        // otherwise missing transactions would go unnoticed.
        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);
        for (i, tx) in (0..total_txs).zip(log.transactions_from(0, &ArrayDecoder)) {
            assert_eq!(i, tx.unwrap().offset);
        }
    }

    #[test]
    fn traverse_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

        for offset in 0..total_txs {
            let mut iter = log.transactions_from(offset, &ArrayDecoder);
            assert_eq!(offset, iter.next().expect("at least one tx expected").unwrap().offset);
            for tx in iter {
                assert!(tx.unwrap().offset >= offset);
            }
        }
        assert_eq!(0, log.transactions_from(total_txs, &ArrayDecoder).count());
    }

    #[test]
    fn traverse_empty() {
        let log = mem_log::<[u8; 32]>(32);

        assert_eq!(0, log.commits_from(0).count());
        assert_eq!(0, log.commits_from(42).count());
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
        assert_eq!(0, log.transactions_from(42, &ArrayDecoder).count());
    }

    #[test]
    fn reset_hard() {
        let mut log = mem_log::<[u8; 32]>(128);
        fill_log(&mut log, 50, (1..=10).cycle());

        log = log.reset().unwrap();
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
    }

    #[test]
    fn reset_to_offset() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;

        for offset in (0..total_txs).rev() {
            log = log.reset_to(offset).unwrap();
            assert_eq!(
                offset,
                log.transactions_from(0, &ArrayDecoder)
                    .map(Result::unwrap)
                    .last()
                    .unwrap()
                    .offset
            );
            // We're counting from zero, so offset + 1 is the # of txs.
            assert_eq!(
                offset + 1,
                log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64
            );
        }
    }

    #[test]
    fn reset_to_offset_many_txs_per_commit() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;

        // No op.
        log = log.reset_to(total_txs).unwrap();
        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);

        let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();

        // Both fall into the middle commit, which should be retained.
        log = log.reset_to(middle_commit.min_tx_offset + 1).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );
        log = log.reset_to(middle_commit.min_tx_offset).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );

        // Offset falls into 2nd commit.
        // 1st commit (1 tx) + 2nd commit (2 txs) = 3
        log = log.reset_to(1).unwrap();
        assert_eq!(3, log.transactions_from(0, &ArrayDecoder).count() as u64);

        // Offset falls into 1st commit.
        // 1st commit (1 tx) = 1
        log = log.reset_to(0).unwrap();
        assert_eq!(1, log.transactions_from(0, &ArrayDecoder).count() as u64);
    }

    #[test]
    fn reopen() {
        let mut log = mem_log::<[u8; 32]>(1024);
        let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );

        // Re-open on the same repo and keep appending; previously written
        // transactions must still be readable alongside the new ones.
        let mut log = Generic::<_, [u8; 32]>::open(
            log.repo.clone(),
            Options {
                max_segment_size: 1024,
                ..Options::default()
            },
        )
        .unwrap();
        total_txs += fill_log(&mut log, 100, (1..=10).cycle());

        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );
    }

    #[test]
    fn set_same_epoch_does_nothing() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
        assert_eq!(committed, None);
    }

    #[test]
    fn set_new_epoch_commits() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        log.append(<_>::default()).unwrap();
        let committed = log
            .set_epoch(42)
            .unwrap()
            .expect("should have committed the pending transaction");
        assert_eq!(log.epoch(), 42);
        assert_eq!(committed.tx_range.start, 0);
    }

    #[test]
    fn set_lower_epoch_returns_error() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        log.set_epoch(42).unwrap();
        assert_eq!(log.epoch(), 42);
        assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput)
    }
}