spacetimedb_commitlog/
commitlog.rs

1use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};
2
3use itertools::Itertools;
4use log::{debug, info, trace, warn};
5
6use crate::{
7    commit::StoredCommit,
8    error::{self, source_chain},
9    index::IndexError,
10    payload::Decoder,
11    repo::{self, Repo, TxOffsetIndex},
12    segment::{self, FileLike, Transaction, Writer},
13    Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
14};
15
16pub use crate::segment::Committed;
17
/// A commitlog generic over the storage backend as well as the type of records
/// its [`Commit`]s contain.
///
/// Segments are tracked as the writable `head` plus a `tail` of older
/// segments, each identified by its minimum transaction offset.
///
/// Dropping a value attempts a final commit of `head`, unless `panicked` is
/// set (see the field documentation below).
#[derive(Debug)]
pub struct Generic<R: Repo, T> {
    /// The storage backend.
    pub(crate) repo: R,
    /// The segment currently being written to.
    ///
    /// If we squint, all segments in a log are a non-empty linked list, the
    /// head of which is the segment open for writing.
    pub(crate) head: Writer<R::SegmentWriter>,
    /// The tail of the non-empty list of segments.
    ///
    /// We only retain the min transaction offset of each, from which the
    /// segments can be opened for reading when needed.
    ///
    /// This is a `Vec`, not a linked list, so the last element is the newest
    /// segment (after `head`).
    tail: Vec<u64>,
    /// Configuration options.
    opts: Options,
    /// Type of a single record in this log's [`Commit::records`].
    _record: PhantomData<T>,
    /// Tracks panics/errors to control what happens on drop.
    ///
    /// Set to `true` before any I/O operation, and back to `false` after it
    /// succeeded. This way, we won't try to perform I/O on drop when it is
    /// unlikely to succeed, or even has a chance to panic.
    panicked: bool,
}
48
49impl<R: Repo, T> Generic<R, T> {
50    pub fn open(repo: R, opts: Options) -> io::Result<Self> {
51        let mut tail = repo.existing_offsets()?;
52        if !tail.is_empty() {
53            debug!("segments: {tail:?}");
54        }
55        let head = if let Some(last) = tail.pop() {
56            debug!("resuming last segment: {last}");
57            repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
58                tail.push(meta.tx_range.start);
59                repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
60            })?
61        } else {
62            debug!("starting fresh log");
63            repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
64        };
65
66        Ok(Self {
67            repo,
68            head,
69            tail,
70            opts,
71            _record: PhantomData,
72            panicked: false,
73        })
74    }
75
    /// Get the current epoch.
    ///
    /// This is the epoch stored in the head segment's commit buffer.
    ///
    /// See also: [`Commit::epoch`].
    pub fn epoch(&self) -> u64 {
        self.head.commit.epoch
    }
82
83    /// Update the current epoch.
84    ///
85    /// Calls [`Self::commit`] to flush all data of the previous epoch, and
86    /// returns the result.
87    ///
88    /// Does nothing if the given `epoch` is equal to the current epoch.
89    ///
90    /// # Errors
91    ///
92    /// If `epoch` is smaller than the current epoch, an error of kind
93    /// [`io::ErrorKind::InvalidInput`] is returned.
94    ///
95    /// Also see [`Self::commit`].
96    pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
97        use std::cmp::Ordering::*;
98
99        match epoch.cmp(&self.head.epoch()) {
100            Less => Err(io::Error::new(
101                io::ErrorKind::InvalidInput,
102                "new epoch is smaller than current epoch",
103            )),
104            Equal => Ok(None),
105            Greater => {
106                let res = self.commit()?;
107                self.head.set_epoch(epoch);
108                Ok(res)
109            }
110        }
111    }
112
113    /// Write the currently buffered data to storage and rotate segments as
114    /// necessary.
115    ///
116    /// Note that this does not imply that the data is durable, in particular
117    /// when a filesystem storage backend is used. Call [`Self::sync`] to flush
118    /// any OS buffers to stable storage.
119    ///
120    /// # Errors
121    ///
122    /// If an error occurs writing the data, the current [`Commit`] buffer is
123    /// retained, but a new segment is created. Retrying in case of an `Err`
124    /// return value thus will write the current data to that new segment.
125    ///
126    /// If this fails, however, the next attempt to create a new segment will
127    /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind
128    /// this means that something is seriously wrong underlying storage, and the
129    /// caller should stop writing to the log.
130    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
131        self.panicked = true;
132        let writer = &mut self.head;
133        let sz = writer.commit.encoded_len();
134        // If the segment is empty, but the commit exceeds the max size,
135        // we got a huge commit which needs to be written even if that
136        // results in a huge segment.
137        let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
138        let writer = if should_rotate {
139            self.sync();
140            self.start_new_segment()?
141        } else {
142            writer
143        };
144
145        let ret = writer.commit().or_else(|e| {
146            warn!("Commit failed: {e}");
147            // Nb.: Don't risk a panic by calling `self.sync()`.
148            // We already gave up on the last commit, and will retry it next time.
149            self.start_new_segment()?;
150            Err(e)
151        });
152        self.panicked = false;
153        ret
154    }
155
156    /// Force the currently active segment to be flushed to storage.
157    ///
158    /// Using a filesystem backend, this means to call `fsync(2)`.
159    ///
160    /// # Panics
161    ///
162    /// As an `fsync` failure leaves a file in a more of less undefined state,
163    /// this method panics in this case, thereby preventing any further writes
164    /// to the log and forcing the user to re-read the state from disk.
165    pub fn sync(&mut self) {
166        self.panicked = true;
167        if let Err(e) = self.head.fsync() {
168            panic!("Failed to fsync segment: {e}");
169        }
170        self.panicked = false;
171    }
172
173    /// The last transaction offset written to disk, or `None` if nothing has
174    /// been written yet.
175    ///
176    /// Note that this does not imply durability: [`Self::sync`] may not have
177    /// been called at this offset.
178    pub fn max_committed_offset(&self) -> Option<u64> {
179        // Naming is hard: the segment's `next_tx_offset` indicates how many
180        // txs are already in the log (it's the next commit's min-tx-offset).
181        // If the value is zero, however, the initial commit hasn't been
182        // committed yet.
183        self.head.next_tx_offset().checked_sub(1)
184    }
185
186    /// The first transaction offset written to disk, or `None` if nothing has
187    /// been written yet.
188    pub fn min_committed_offset(&self) -> Option<u64> {
189        self.tail
190            .first()
191            .copied()
192            .or_else(|| (!self.head.is_empty()).then(|| self.head.min_tx_offset()))
193    }
194
195    // Helper to obtain a list of the segment offsets which include transaction
196    // offset `offset`.
197    //
198    // The returned `Vec` is sorted in **ascending** order, such that the first
199    // element is the segment which contains `offset`.
200    //
201    // The offset of `self.head` is always included, regardless of how many
202    // entries it actually contains.
203    fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
204        if offset >= self.head.min_tx_offset {
205            vec![self.head.min_tx_offset]
206        } else {
207            let mut offs = Vec::with_capacity(self.tail.len() + 1);
208            if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
209                offs.extend_from_slice(&self.tail[pos..]);
210                offs.push(self.head.min_tx_offset);
211            }
212
213            offs
214        }
215    }
216
217    pub fn commits_from(&self, offset: u64) -> Commits<R> {
218        let offsets = self.segment_offsets_from(offset);
219        let segments = Segments {
220            offs: offsets.into_iter(),
221            repo: self.repo.clone(),
222            max_log_format_version: self.opts.log_format_version,
223        };
224        Commits {
225            inner: None,
226            segments,
227            last_commit: CommitInfo::Initial { next_offset: offset },
228            last_error: None,
229        }
230    }
231
232    pub fn reset(mut self) -> io::Result<Self> {
233        info!("hard reset");
234
235        self.panicked = true;
236        self.tail.reserve(1);
237        self.tail.push(self.head.min_tx_offset);
238        for segment in self.tail.iter().rev() {
239            debug!("removing segment {segment}");
240            self.repo.remove_segment(*segment)?;
241        }
242        // Prevent finalizer from running by not updating self.panicked.
243
244        Self::open(self.repo.clone(), self.opts)
245    }
246
247    pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
248        info!("reset to {offset}");
249
250        self.panicked = true;
251        self.tail.reserve(1);
252        self.tail.push(self.head.min_tx_offset);
253        reset_to_internal(&self.repo, &self.tail, offset)?;
254        // Prevent finalizer from running by not updating self.panicked.
255
256        Self::open(self.repo.clone(), self.opts)
257    }
258
259    /// Start a new segment, preserving the current head's `Commit`.
260    ///
261    /// The caller must ensure that the current head is synced to disk as
262    /// appropriate. It is not appropriate to sync after a write error, as that
263    /// is likely to return an error as well: the `Commit` will be written to
264    /// the new segment anyway.
265    fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::SegmentWriter>> {
266        debug!(
267            "starting new segment offset={} prev-offset={}",
268            self.head.next_tx_offset(),
269            self.head.min_tx_offset()
270        );
271        let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
272        let old = mem::replace(&mut self.head, new);
273        self.tail.push(old.min_tx_offset());
274        self.head.commit = old.commit;
275
276        Ok(&mut self.head)
277    }
278}
279
impl<R: Repo, T: Encode> Generic<R, T> {
    /// Append `record` to the head segment's commit buffer.
    ///
    /// Forwards to the head writer's `append`. The `Err` variant carries a
    /// value of the record type; presumably the rejected record is handed
    /// back to the caller (TODO confirm against `Writer::append`).
    pub fn append(&mut self, record: T) -> Result<(), T> {
        self.head.append(record)
    }

    /// Obtain an iterator over the transactions in this log, starting from
    /// transaction offset `offset`.
    ///
    /// The commits yielded by [`Self::commits_from`] are decoded into
    /// transactions using `decoder`. Traversal errors are converted into the
    /// decoder's error type.
    pub fn transactions_from<'a, D>(
        &self,
        offset: u64,
        decoder: &'a D,
    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        R: 'a,
        T: 'a,
    {
        transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
    }

    /// Traverse the log starting from transaction offset `offset`, feeding
    /// each record to `decoder`.
    ///
    /// Unlike [`Self::transactions_from`], no iterator is returned: the
    /// `decoder` is driven to completion, or until the first error.
    pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
    }
}
307
308impl<R: Repo, T> Drop for Generic<R, T> {
309    fn drop(&mut self) {
310        if !self.panicked {
311            if let Err(e) = self.head.commit() {
312                warn!("failed to commit on drop: {e}");
313            }
314        }
315    }
316}
317
318/// Extract the most recently written [`segment::Metadata`] from the commitlog
319/// in `repo`.
320///
321/// Returns `None` if the commitlog is empty.
322///
323/// Note that this function validates the most recent segment, which entails
324/// traversing it from the start.
325///
326/// The function can be used instead of the pattern:
327///
328/// ```ignore
329/// let log = Commitlog::open(..)?;
330/// let max_offset = log.max_committed_offset();
331/// ```
332///
333/// like so:
334///
335/// ```ignore
336/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
337/// ```
338///
339/// Unlike `open`, no segment will be created in an empty `repo`.
340pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
341    let Some(last) = repo.existing_offsets()?.pop() else {
342        return Ok(None);
343    };
344
345    let mut storage = repo.open_segment_reader(last)?;
346    let offset_index = repo.get_offset_index(last).ok();
347    segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
348}
349
350pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
351    let mut offsets = repo.existing_offsets()?;
352    if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
353        offsets = offsets.split_off(pos);
354    }
355    let segments = Segments {
356        offs: offsets.into_iter(),
357        repo,
358        max_log_format_version,
359    };
360    Ok(Commits {
361        inner: None,
362        segments,
363        last_commit: CommitInfo::Initial { next_offset: offset },
364        last_error: None,
365    })
366}
367
/// Obtain an iterator over the transactions stored in `repo`, starting from
/// transaction offset `offset`.
///
/// Commits are obtained via [`commits_from`] and decoded using `de`.
///
/// # Errors
///
/// Returns an error if [`commits_from`] fails. Errors encountered during
/// traversal are yielded by the iterator.
pub fn transactions_from<'a, R, D, T>(
    repo: R,
    max_log_format_version: u8,
    offset: u64,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    R: Repo + 'a,
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    commits_from(repo, max_log_format_version, offset)
        .map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
}
383
/// Traverse the commits stored in `repo` starting from transaction offset
/// `offset`, feeding each record to the decoder `de`.
///
/// # Errors
///
/// An I/O error from [`commits_from`] is converted into `D::Error`. Errors
/// raised while folding are returned as-is.
pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
where
    R: Repo,
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    let commits = commits_from(repo, max_log_format_version, offset)?;
    fold_transactions_internal(commits.with_log_format_version(), de, offset)
}
393
/// Turn an iterator of versioned commits into an iterator of transactions.
///
/// Each successfully read commit is expanded via `into_transactions`, which
/// receives the segment's log format version, the requested start `offset`
/// and the decoder `de`. Traversal errors are converted into `D::Error` and
/// yielded in place of the commit.
fn transactions_from_internal<'a, R, D, T>(
    commits: CommitsWithVersion<R>,
    offset: u64,
    de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
where
    R: Repo + 'a,
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    commits
        // Lift traversal errors into the decoder's error type.
        .map(|x| x.map_err(D::Error::from))
        // Expand each commit into its transactions ...
        .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
        // ... and splice those into the outer iterator.
        .flatten_ok()
        // Collapse the resulting nested `Result` into a single one.
        .map(|x| x.and_then(|y| y))
}
411
412fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
413where
414    R: Repo,
415    D: Decoder,
416    D::Error: From<error::Traversal>,
417{
418    while let Some(commit) = commits.next() {
419        let (version, commit) = match commit {
420            Ok(version_and_commit) => version_and_commit,
421            Err(e) => {
422                // Ignore it if the very last commit in the log is broken.
423                // The next `append` will fix the log, but the `decoder`
424                // has no way to tell whether we're at the end or not.
425                // This is unlike the consumer of an iterator, which can
426                // perform below check itself.
427                if commits.next().is_none() {
428                    return Ok(());
429                }
430
431                return Err(e.into());
432            }
433        };
434        trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
435
436        let max_tx_offset = commit.min_tx_offset + commit.n as u64;
437        if max_tx_offset <= from {
438            continue;
439        }
440
441        let records = &mut commit.records.as_slice();
442        for n in 0..commit.n {
443            let tx_offset = commit.min_tx_offset + n as u64;
444            if tx_offset < from {
445                de.skip_record(version, tx_offset, records)?;
446            } else {
447                de.consume_record(version, tx_offset, records)?;
448            }
449        }
450    }
451
452    Ok(())
453}
454
/// Remove all data past the given transaction `offset`.
///
/// The function deletes log segments starting from the newest. As multiple
/// segments cannot be deleted atomically, the log may be left longer than
/// `offset` if the function does not return successfully.
///
/// If the function returns successfully, the most recent [`Commit`] in the
/// log will contain the transaction at `offset`.
///
/// The log must be re-opened if it is to be used after calling this function.
///
/// # Errors
///
/// Returns an error if the existing segments cannot be listed, or if
/// removing or truncating a segment fails.
pub fn reset_to(repo: &impl Repo, offset: u64) -> io::Result<()> {
    let segments = repo.existing_offsets()?;
    reset_to_internal(repo, &segments, offset)
}
469
/// Remove all data past transaction `offset` from the given `segments`.
///
/// `segments` holds the min transaction offset of each segment and is
/// walked from the last element to the first. Segments starting after
/// `offset` are removed. The first non-empty segment starting at or before
/// `offset` is truncated after the last commit whose `min_tx_offset` is not
/// greater than `offset`, which ends the walk.
///
/// # Errors
///
/// Returns the first I/O error encountered. Segments processed up to that
/// point stay removed.
fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Result<()> {
    for segment in segments.iter().copied().rev() {
        if segment > offset {
            // Segment is outside the offset, so remove it wholesale.
            debug!("removing segment {segment}");
            repo.remove_segment(segment)?;
        } else {
            // Read commit-wise until we find the byte offset.
            let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;

            // If the offset index can be used, the reader is advanced and we
            // start counting from its byte position. Otherwise, start right
            // after the segment header.
            let (index_file, mut byte_offset) = try_seek_using_offset_index(repo, &mut reader, offset)
                .map(|(index_file, byte_offset)| (Some(index_file), byte_offset))
                .unwrap_or((None, segment::Header::LEN as u64));

            let commits = reader.commits();

            // Add up the encoded length of every commit to retain.
            for commit in commits {
                let commit = commit?;
                if commit.min_tx_offset > offset {
                    break;
                }
                byte_offset += Commit::from(commit).encoded_len() as u64;
            }

            if byte_offset == segment::Header::LEN as u64 {
                // Segment is empty, just remove it.
                // No `break` here: the walk continues with the next older
                // segment.
                repo.remove_segment(segment)?;
            } else {
                debug!("truncating segment {segment} to {offset} at {byte_offset}");
                let mut file = repo.open_segment_writer(segment)?;

                // The index, if any, is truncated before the segment itself.
                if let Some(mut index_file) = index_file {
                    let index_file = index_file.as_mut();
                    // Note: The offset index truncates equal or greater,
                    // inclusive. We'd like to retain `offset` in the index, as
                    // the commit is also retained in the log.
                    index_file.ftruncate(offset + 1, byte_offset).map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!("Failed to truncate offset index: {e}"),
                        )
                    })?;
                    index_file.async_flush()?;
                }

                file.ftruncate(offset, byte_offset)?;
                // Some filesystems require fsync after ftruncate.
                file.fsync()?;
                // All older segments lie before `offset` and are retained.
                break;
            }
        }
    }

    Ok(())
}
525
526pub struct Segments<R> {
527    repo: R,
528    offs: vec::IntoIter<u64>,
529    max_log_format_version: u8,
530}
531
532impl<R: Repo> Iterator for Segments<R> {
533    type Item = io::Result<segment::Reader<R::SegmentReader>>;
534
535    fn next(&mut self) -> Option<Self::Item> {
536        let off = self.offs.next()?;
537        debug!("iter segment {off}");
538        Some(repo::open_segment_reader(&self.repo, self.max_log_format_version, off))
539    }
540}
541
542/// Helper for the [`Commits`] iterator.
543enum CommitInfo {
544    /// Constructed in [`Generic::commits_from`], specifying the offset the next
545    /// commit should have.
546    Initial { next_offset: u64 },
547    /// The last commit seen by the iterator.
548    ///
549    /// Stores the range of transaction offsets, where `tx_range.end` is the
550    /// offset the next commit is expected to have. Also retains the checksum
551    /// needed to detect duplicate commits.
552    LastSeen { tx_range: Range<u64>, checksum: u32 },
553}
554
555impl CommitInfo {
556    /// `true` if the last seen commit in self and the provided one have the
557    /// same `min_tx_offset`.
558    fn same_offset_as(&self, commit: &StoredCommit) -> bool {
559        let Self::LastSeen { tx_range, .. } = self else {
560            return false;
561        };
562        tx_range.start == commit.min_tx_offset
563    }
564
565    /// `true` if the last seen commit in self and the provided one have the
566    /// same `checksum`.
567    fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
568        let Some(checksum) = self.checksum() else { return false };
569        checksum == &commit.checksum
570    }
571
572    fn checksum(&self) -> Option<&u32> {
573        match self {
574            Self::Initial { .. } => None,
575            Self::LastSeen { checksum, .. } => Some(checksum),
576        }
577    }
578
579    fn expected_offset(&self) -> &u64 {
580        match self {
581            Self::Initial { next_offset } => next_offset,
582            Self::LastSeen { tx_range, .. } => &tx_range.end,
583        }
584    }
585
586    // If initial offset falls within a commit, adjust it to the commit boundary.
587    //
588    // Returns `true` if the initial offset is past `commit`.
589    // Returns `false` if `self` isn't `Self::Initial`,
590    // or the initial offset has been adjusted to the starting offset of `commit`.
591    //
592    // For iteration, `true` means to skip the commit, `false` to yield it.
593    fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
594        if let Self::Initial { next_offset } = self {
595            let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
596            if *next_offset > last_tx_offset {
597                return true;
598            } else {
599                *next_offset = commit.min_tx_offset;
600            }
601        }
602
603        false
604    }
605}
606
/// Iterator over the commits of a log, possibly spanning several segments.
///
/// Constructed via [`Generic::commits_from`] or the free function
/// [`commits_from`].
pub struct Commits<R: Repo> {
    /// Commit iterator of the segment currently being traversed.
    ///
    /// `None` until the first segment has been opened.
    inner: Option<segment::Commits<R::SegmentReader>>,
    /// The segments yet to be traversed.
    segments: Segments<R>,
    /// Information about the last commit seen, used to check that commits
    /// arrive in the expected order.
    last_commit: CommitInfo,
    /// The most recent error reading a commit from a segment, retained for
    /// delayed reporting.
    last_error: Option<error::Traversal>,
}
613
614impl<R: Repo> Commits<R> {
    /// The header of the segment currently being traversed, or `None` if no
    /// segment has been opened yet.
    fn current_segment_header(&self) -> Option<&segment::Header> {
        self.inner.as_ref().map(|segment| &segment.header)
    }
618
    /// Turn `self` into an iterator which pairs the log format version of the
    /// current segment with the [`Commit`].
    ///
    /// The version is taken from the header of the segment the commit was
    /// read from.
    pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
        CommitsWithVersion { inner: self }
    }
624
    /// Advance the current-segment iterator to yield the next commit.
    ///
    /// Checks that the offset sequence is contiguous, and may skip commits
    /// until the requested offset.
    ///
    /// Duplicates of the last seen commit (same offset and checksum) are
    /// skipped silently. A commit with the same offset but a different
    /// checksum is reported as [`error::Traversal::Forked`], and one with an
    /// unexpected offset as [`error::Traversal::OutOfOrder`].
    ///
    /// Returns `None` if the segment iterator is exhausted or returns an error.
    /// In the latter case, the error is stored in `self.last_error`. `None`
    /// is also returned if no segment has been opened yet.
    fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
        loop {
            // Bails out with `None` if there is no current segment, or if it
            // is exhausted.
            match self.inner.as_mut()?.next()? {
                Ok(commit) => {
                    // Pop the last error. Either we'll return it below, or it's no longer
                    // interesting.
                    let prev_error = self.last_error.take();

                    // Skip entries before the initial commit.
                    if self.last_commit.adjust_initial_offset(&commit) {
                        trace!("adjust initial offset");
                        continue;
                    // Same offset: ignore if duplicate (same crc), else report a "fork".
                    } else if self.last_commit.same_offset_as(&commit) {
                        if !self.last_commit.same_checksum_as(&commit) {
                            warn!(
                                "forked: commit={:?} last-error={:?} last-crc={:?}",
                                commit,
                                prev_error,
                                self.last_commit.checksum()
                            );
                            return Some(Err(error::Traversal::Forked {
                                offset: commit.min_tx_offset,
                            }));
                        } else {
                            trace!("ignore duplicate");
                            continue;
                        }
                    // Not the expected offset: report out-of-order.
                    } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
                        warn!("out-of-order: commit={commit:?} last-error={prev_error:?}");
                        return Some(Err(error::Traversal::OutOfOrder {
                            expected_offset: *self.last_commit.expected_offset(),
                            actual_offset: commit.min_tx_offset,
                            prev_error: prev_error.map(Box::new),
                        }));
                    // Seems legit, record info.
                    } else {
                        self.last_commit = CommitInfo::LastSeen {
                            tx_range: commit.tx_range(),
                            checksum: commit.checksum,
                        };

                        return Some(Ok(commit));
                    }
                }

                Err(e) => {
                    warn!("error reading next commit: {e}");
                    // Stop traversing this segment here.
                    //
                    // If this is just a partial write at the end of the segment,
                    // we may be able to obtain a commit with right offset from
                    // the next segment.
                    //
                    // If we don't, the error here is likely more helpful, but
                    // would be clobbered by `OutOfOrder`. Therefore we store it
                    // here.
                    self.set_last_error(e);

                    return None;
                }
            }
        }
    }
696
697    /// Store `e` has the last error for delayed reporting.
698    fn set_last_error(&mut self, e: io::Error) {
699        // Recover a checksum mismatch.
700        let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
701            e.into_inner()
702                .unwrap()
703                .downcast::<error::ChecksumMismatch>()
704                .map(|source| error::Traversal::Checksum {
705                    offset: *self.last_commit.expected_offset(),
706                    source: *source,
707                })
708                .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
709        } else {
710            error::Traversal::from(e)
711        };
712        self.last_error = Some(last_error);
713    }
714
    /// If we're still looking for the initial commit, try to use the offset
    /// index to advance the segment reader.
    ///
    /// This is best-effort: the result of the seek is discarded, so the
    /// reader simply stays where it is if the index cannot be used.
    fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::SegmentReader>) {
        if let CommitInfo::Initial { next_offset } = &self.last_commit {
            try_seek_using_offset_index(&self.segments.repo, segment, *next_offset);
        }
    }
722}
723
724impl<R: Repo> Iterator for Commits<R> {
725    type Item = Result<StoredCommit, error::Traversal>;
726
727    fn next(&mut self) -> Option<Self::Item> {
728        if let Some(item) = self.next_commit() {
729            return Some(item);
730        }
731
732        match self.segments.next() {
733            // When there is no more data, the last commit being bad is an error
734            None => self.last_error.take().map(Err),
735            Some(segment) => segment.map_or_else(
736                |e| Some(Err(e.into())),
737                |mut segment| {
738                    self.try_seek_to_initial_offset(&mut segment);
739                    self.inner = Some(segment.commits());
740                    self.next()
741                },
742            ),
743        }
744    }
745}
746
747pub struct CommitsWithVersion<R: Repo> {
748    inner: Commits<R>,
749}
750
751impl<R: Repo> Iterator for CommitsWithVersion<R> {
752    type Item = Result<(u8, Commit), error::Traversal>;
753
754    fn next(&mut self) -> Option<Self::Item> {
755        let next = self.inner.next()?;
756        match next {
757            Ok(commit) => {
758                let version = self
759                    .inner
760                    .current_segment_header()
761                    .map(|hdr| hdr.log_format_version)
762                    .expect("segment header none even though segment yielded a commit");
763                Some(Ok((version, commit.into())))
764            }
765            Err(e) => Some(Err(e)),
766        }
767    }
768}
769
/// Try to advance `reader` to `offset` using the offset index.
///
/// If successful, returns the offset index and the byte position of `reader`.
/// `None` if the position of `reader` is unchanged.
///
/// Failures are logged, not returned: at debug level if the index does not
/// exist or does not contain the offset, as a warning otherwise.
fn try_seek_using_offset_index<R: Repo>(
    repo: &R,
    reader: &mut segment::Reader<R::SegmentReader>,
    offset: u64,
) -> Option<(TxOffsetIndex, u64)> {
    let segment_offset = reader.min_tx_offset;
    // Open the index belonging to the reader's segment, giving up if that
    // fails for whatever reason.
    let index = repo
        .get_offset_index(segment_offset)
        .inspect_err(|e| {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("offset index does not exist segment={segment_offset}");
            } else {
                warn!(
                    "error opening offset index segment={segment_offset}: {e} {}",
                    source_chain(&e)
                );
            }
        })
        .ok()?;

    // Let the reader seek using the index. On success, pair the resulting
    // position with the index, so the caller can keep using the latter.
    reader
        .seek_to_offset(&index, offset)
        .inspect_err(|e| match e {
            // Can happen if the segment is empty or small, so don't spam the logs.
            IndexError::KeyNotFound => {
                debug!("offset not found segment={segment_offset} offset={offset}");
            }
            e => {
                warn!(
                    "error reading index segment={segment_offset} offset={offset}: {e} {}",
                    source_chain(&e)
                );
            }
        })
        .ok()
        .map(|pos| (index, pos))
}
811
#[cfg(test)]
mod tests {
    use std::{cell::Cell, iter::repeat};

    use pretty_assertions::assert_matches;

    use super::*;
    use crate::{
        payload::{ArrayDecodeError, ArrayDecoder},
        tests::helpers::{fill_log, mem_log},
    };

    /// After three single-record commits on a `mem_log(128)`, the segment
    /// offsets known to the repo are the log's `tail` followed by one more
    /// segment starting at tx offset 2.
    #[test]
    fn rotate_segments_simple() {
        let mut log = mem_log::<[u8; 32]>(128);
        for _ in 0..3 {
            log.append([0; 32]).unwrap();
            log.commit().unwrap();
        }

        let offsets = log.repo.existing_offsets().unwrap();
        assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
        assert_eq!(offsets[offsets.len() - 1], 2);
    }

    /// A single commit may grow the head segment beyond `max_segment_size`.
    /// The following commit then ends up in a new segment starting at the
    /// next tx offset.
    #[test]
    fn huge_commit() {
        let mut log = mem_log::<[u8; 32]>(32);

        log.append([0; 32]).unwrap();
        log.append([1; 32]).unwrap();
        log.commit().unwrap();
        assert!(log.head.len() > log.opts.max_segment_size);

        log.append([2; 32]).unwrap();
        log.commit().unwrap();

        assert_eq!(&log.tail, &[0]);
        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
    }

    /// Commits traversed from offset zero carry consecutive `min_tx_offset`s.
    ///
    /// Note that `zip` stops at the shorter iterator, so only the commits
    /// actually yielded by `commits_from` are checked.
    #[test]
    fn traverse_commits() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        for (i, commit) in (0..10).zip(log.commits_from(0)) {
            assert_eq!(i, commit.unwrap().min_tx_offset);
        }
    }

    /// Traversing from any offset only yields commits at or after that offset,
    /// and traversing from one past the last offset yields nothing.
    #[test]
    fn traverse_commits_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        for offset in 0..10 {
            for commit in log.commits_from(offset) {
                let commit = commit.unwrap();
                assert!(commit.min_tx_offset >= offset);
            }
        }
        assert_eq!(0, log.commits_from(10).count());
    }

    /// Folding from `offset` consumes exactly the records from `offset` up to
    /// the end of the log, in order, without ever decoding a record.
    #[test]
    fn fold_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        /// A [`Decoder`] which counts the number of records decoded,
        /// and asserts that the `tx_offset` is as expected.
        struct CountDecoder {
            count: Cell<u64>,
            next_tx_offset: Cell<u64>,
        }

        impl Decoder for &CountDecoder {
            type Record = [u8; 32];
            type Error = ArrayDecodeError;

            fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                _version: u8,
                _tx_offset: u64,
                _reader: &mut R,
            ) -> Result<Self::Record, Self::Error> {
                unreachable!("Folding never calls `decode_record`")
            }

            fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                // Only consumed records are counted and offset-checked.
                self.count.set(self.count.get() + 1);
                let expected_tx_offset = self.next_tx_offset.get();
                assert_eq!(expected_tx_offset, tx_offset);
                self.next_tx_offset.set(expected_tx_offset + 1);
                Ok(())
            }

            fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                // Skipped records advance the reader but leave the counters
                // untouched.
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                Ok(())
            }
        }

        for offset in 0..10 {
            let decoder = CountDecoder {
                count: Cell::new(0),
                next_tx_offset: Cell::new(offset),
            };

            log.fold_transactions_from(offset, &decoder).unwrap();

            assert_eq!(decoder.count.get(), 10 - offset);
            assert_eq!(decoder.next_tx_offset.get(), 10);
        }
    }

    /// Writing the very same commit twice must not surface the duplicate
    /// during traversal.
    #[test]
    fn traverse_commits_ignores_duplicates() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        let commit1 = log.head.commit.clone();
        log.commit().unwrap();
        // Restore the pending commit to the already written one and write it
        // again.
        log.head.commit = commit1.clone();
        log.commit().unwrap();
        log.append([43; 32]).unwrap();
        let commit2 = log.head.commit.clone();
        log.commit().unwrap();

        assert_eq!(
            [commit1, commit2].as_slice(),
            &log.commits_from(0)
                .map_ok(Commit::from)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        );
    }

    /// Two commits with the same `min_tx_offset` but different records are
    /// reported as a fork at that offset.
    #[test]
    fn traverse_commits_errors_when_forked() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Same offset as the first commit, but a different payload.
        log.head.commit = Commit {
            min_tx_offset: 0,
            n: 1,
            records: [43; 32].to_vec(),
            epoch: 0,
        };
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(res, Err(error::Traversal::Forked { offset: 0 })),
            "expected fork error: {res:?}"
        );
    }

    /// A gap in the sequence of commit offsets is reported as an out-of-order
    /// error carrying both the expected and the actual offset.
    #[test]
    fn traverse_commits_errors_when_offset_not_contiguous() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Force the next commit to start at offset 18 instead of 1.
        log.head.commit.min_tx_offset = 18;
        log.append([42; 32]).unwrap();
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(
                res,
                Err(error::Traversal::OutOfOrder {
                    expected_offset: 1,
                    actual_offset: 18,
                    prev_error: None
                })
            ),
            "expected out-of-order error: {res:?}"
        );
    }

    /// Transactions traversed from offset zero carry consecutive offsets.
    ///
    /// Note that `zip` stops at the shorter iterator, so only the
    /// transactions actually yielded by `transactions_from` are checked.
    #[test]
    fn traverse_transactions() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

        for (i, tx) in (0..total_txs).zip(log.transactions_from(0, &ArrayDecoder)) {
            assert_eq!(i, tx.unwrap().offset);
        }
    }

    /// Traversing transactions from `offset` starts exactly at `offset`, never
    /// yields an earlier transaction, and yields nothing past the end.
    #[test]
    fn traverse_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

        for offset in 0..total_txs {
            let mut iter = log.transactions_from(offset, &ArrayDecoder);
            assert_eq!(offset, iter.next().expect("at least one tx expected").unwrap().offset);
            for tx in iter {
                assert!(tx.unwrap().offset >= offset);
            }
        }
        assert_eq!(0, log.transactions_from(total_txs, &ArrayDecoder).count());
    }

    /// Traversing a log nothing was written to yields no commits and no
    /// transactions, regardless of the starting offset.
    #[test]
    fn traverse_empty() {
        let log = mem_log::<[u8; 32]>(32);

        assert_eq!(0, log.commits_from(0).count());
        assert_eq!(0, log.commits_from(42).count());
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
        assert_eq!(0, log.transactions_from(42, &ArrayDecoder).count());
    }

    /// `reset` leaves a log without any transactions.
    #[test]
    fn reset_hard() {
        let mut log = mem_log::<[u8; 32]>(128);
        fill_log(&mut log, 50, (1..=10).cycle());

        log = log.reset().unwrap();
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
    }

    /// With one transaction per commit, `reset_to(offset)` makes `offset` the
    /// last transaction in the log.
    #[test]
    fn reset_to_offset() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;

        for offset in (0..total_txs).rev() {
            log = log.reset_to(offset).unwrap();
            assert_eq!(
                offset,
                log.transactions_from(0, &ArrayDecoder)
                    .map(Result::unwrap)
                    .last()
                    .unwrap()
                    .offset
            );
            // We're counting from zero, so offset + 1 is the # of txs.
            assert_eq!(
                offset + 1,
                log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64
            );
        }
    }

    /// With several transactions per commit, `reset_to` retains the whole
    /// commit the given offset falls into.
    #[test]
    fn reset_to_offset_many_txs_per_commit() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;

        // No op.
        log = log.reset_to(total_txs).unwrap();
        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);

        let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();

        // Both fall into the middle commit, which should be retained.
        log = log.reset_to(middle_commit.min_tx_offset + 1).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );
        log = log.reset_to(middle_commit.min_tx_offset).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );

        // Offset falls into 2nd commit.
        // 1st commit (1 tx) + 2nd commit (2 txs) = 3
        log = log.reset_to(1).unwrap();
        assert_eq!(3, log.transactions_from(0, &ArrayDecoder).count() as u64);

        // Offset falls into 1st commit.
        // 1st commit (1 tx) = 1
        log = log.reset_to(0).unwrap();
        assert_eq!(1, log.transactions_from(0, &ArrayDecoder).count() as u64);
    }

    /// A log re-opened on the same repo sees all previously written
    /// transactions and can be appended to.
    #[test]
    fn reopen() {
        let mut log = mem_log::<[u8; 32]>(1024);
        let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );

        let mut log = Generic::<_, [u8; 32]>::open(
            log.repo.clone(),
            Options {
                max_segment_size: 1024,
                ..Options::default()
            },
        )
        .unwrap();
        total_txs += fill_log(&mut log, 100, (1..=10).cycle());

        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );
    }

    /// Setting the epoch a fresh log already has commits nothing.
    #[test]
    fn set_same_epoch_does_nothing() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
        assert_eq!(committed, None);
    }

    /// Setting a new epoch commits the pending transaction and updates the
    /// log's epoch.
    #[test]
    fn set_new_epoch_commits() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        log.append(<_>::default()).unwrap();
        let committed = log
            .set_epoch(42)
            .unwrap()
            .expect("should have committed the pending transaction");
        assert_eq!(log.epoch(), 42);
        assert_eq!(committed.tx_range.start, 0);
    }

    /// Setting an epoch lower than the current one fails with
    /// [`io::ErrorKind::InvalidInput`].
    #[test]
    fn set_lower_epoch_returns_error() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        log.set_epoch(42).unwrap();
        assert_eq!(log.epoch(), 42);
        assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput);
    }
}