spacetimedb_commitlog/
commitlog.rs

1use std::{io, marker::PhantomData, mem, ops::Range, vec};
2
3use itertools::Itertools;
4use log::{debug, info, trace, warn};
5
6use crate::{
7    commit::StoredCommit,
8    error,
9    payload::Decoder,
10    repo::{self, Repo},
11    segment::{self, FileLike, Transaction, Writer},
12    Commit, Encode, Options,
13};
14
15pub use crate::segment::Committed;
16
/// A commitlog generic over the storage backend as well as the type of records
/// its [`Commit`]s contain.
///
/// The log is kept as one writable segment (`head`) plus the starting
/// transaction offsets of all older segments (`tail`).
#[derive(Debug)]
pub struct Generic<R: Repo, T> {
    /// The storage backend.
    pub(crate) repo: R,
    /// The segment currently being written to.
    ///
    /// If we squint, all segments in a log are a non-empty linked list, the
    /// head of which is the segment open for writing.
    pub(crate) head: Writer<R::Segment>,
    /// The tail of the non-empty list of segments.
    ///
    /// We only retain the min transaction offset of each, from which the
    /// segments can be opened for reading when needed.
    ///
    /// This is a `Vec`, not a linked list, so the last element is the newest
    /// segment (after `head`).
    tail: Vec<u64>,
    /// Configuration options.
    ///
    /// Among others, supplies the maximum segment size consulted when deciding
    /// whether to rotate, and the log format version passed to segment readers.
    opts: Options,
    /// Type of a single record in this log's [`Commit::records`].
    _record: PhantomData<T>,
    /// Tracks panics/errors to control what happens on drop.
    ///
    /// Set to `true` before any I/O operation, and back to `false` after it
    /// succeeded. This way, we won't try to perform I/O on drop when it is
    /// unlikely to succeed, or even has a chance to panic.
    ///
    /// The `Drop` impl only commits the buffered data while this is `false`.
    panicked: bool,
}
47
48impl<R: Repo, T> Generic<R, T> {
49    pub fn open(repo: R, opts: Options) -> io::Result<Self> {
50        let mut tail = repo.existing_offsets()?;
51        if !tail.is_empty() {
52            debug!("segments: {tail:?}");
53        }
54        let head = if let Some(last) = tail.pop() {
55            debug!("resuming last segment: {last}");
56            repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
57                tail.push(meta.tx_range.start);
58                repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
59            })?
60        } else {
61            debug!("starting fresh log");
62            repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
63        };
64
65        Ok(Self {
66            repo,
67            head,
68            tail,
69            opts,
70            _record: PhantomData,
71            panicked: false,
72        })
73    }
74
    /// Get the current epoch.
    ///
    /// This is the epoch stored in the head segment's buffered commit.
    ///
    /// See also: [`Commit::epoch`].
    pub fn epoch(&self) -> u64 {
        self.head.commit.epoch
    }
81
82    /// Update the current epoch.
83    ///
84    /// Calls [`Self::commit`] to flush all data of the previous epoch, and
85    /// returns the result.
86    ///
87    /// Does nothing if the given `epoch` is equal to the current epoch.
88    ///
89    /// # Errors
90    ///
91    /// If `epoch` is smaller than the current epoch, an error of kind
92    /// [`io::ErrorKind::InvalidInput`] is returned.
93    ///
94    /// Also see [`Self::commit`].
95    pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
96        use std::cmp::Ordering::*;
97
98        match epoch.cmp(&self.head.epoch()) {
99            Less => Err(io::Error::new(
100                io::ErrorKind::InvalidInput,
101                "new epoch is smaller than current epoch",
102            )),
103            Equal => Ok(None),
104            Greater => {
105                let res = self.commit()?;
106                self.head.set_epoch(epoch);
107                Ok(res)
108            }
109        }
110    }
111
    /// Write the currently buffered data to storage and rotate segments as
    /// necessary.
    ///
    /// On success, returns whatever the head segment writer's `commit`
    /// returned.
    ///
    /// Note that this does not imply that the data is durable, in particular
    /// when a filesystem storage backend is used. Call [`Self::sync`] to flush
    /// any OS buffers to stable storage.
    ///
    /// # Errors
    ///
    /// If an error occurs writing the data, the current [`Commit`] buffer is
    /// retained, but a new segment is created. Retrying in case of an `Err`
    /// return value thus will write the current data to that new segment.
    ///
    /// If this fails, however, the next attempt to create a new segment will
    /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error
    /// kind means that something is seriously wrong with the underlying
    /// storage, and the caller should stop writing to the log.
    ///
    /// # Panics
    ///
    /// When the segment is rotated, [`Self::sync`] is called, which panics if
    /// the `fsync` fails.
    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
        // Stays set if we return early (or panic) below, which disables the
        // commit-on-drop.
        self.panicked = true;
        let writer = &mut self.head;
        let sz = writer.commit.encoded_len();
        // If the segment is empty, but the commit exceeds the max size,
        // we got a huge commit which needs to be written even if that
        // results in a huge segment.
        let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
        let writer = if should_rotate {
            // Make the full segment durable before moving on to the next one.
            self.sync();
            self.start_new_segment()?
        } else {
            writer
        };

        let ret = writer.commit().or_else(|e| {
            warn!("Commit failed: {e}");
            // Nb.: Don't risk a panic by calling `self.sync()`.
            // We already gave up on the last commit, and will retry it next time.
            self.start_new_segment()?;
            Err(e)
        });
        // Reached for both outcomes of the write; only the early return on a
        // failed rotation above leaves the flag set.
        self.panicked = false;
        ret
    }
154
155    /// Force the currently active segment to be flushed to storage.
156    ///
157    /// Using a filesystem backend, this means to call `fsync(2)`.
158    ///
159    /// # Panics
160    ///
161    /// As an `fsync` failure leaves a file in a more of less undefined state,
162    /// this method panics in this case, thereby preventing any further writes
163    /// to the log and forcing the user to re-read the state from disk.
164    pub fn sync(&mut self) {
165        self.panicked = true;
166        if let Err(e) = self.head.fsync() {
167            panic!("Failed to fsync segment: {e}");
168        }
169        self.panicked = false;
170    }
171
    /// The last transaction offset written to disk, or `None` if nothing has
    /// been written yet.
    ///
    /// Computed as the head segment's `next_tx_offset` minus one.
    ///
    /// Note that this does not imply durability: [`Self::sync`] may not have
    /// been called at this offset.
    pub fn max_committed_offset(&self) -> Option<u64> {
        // Naming is hard: the segment's `next_tx_offset` indicates how many
        // txs are already in the log (it's the next commit's min-tx-offset).
        // If the value is zero, however, the initial commit hasn't been
        // committed yet.
        //
        // `checked_sub` maps that zero case to `None` instead of underflowing.
        self.head.next_tx_offset().checked_sub(1)
    }
184
185    // Helper to obtain a list of the segment offsets which include transaction
186    // offset `offset`.
187    //
188    // The returned `Vec` is sorted in **ascending** order, such that the first
189    // element is the segment which contains `offset`.
190    //
191    // The offset of `self.head` is always included, regardless of how many
192    // entries it actually contains.
193    fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
194        if offset >= self.head.min_tx_offset {
195            vec![self.head.min_tx_offset]
196        } else {
197            let mut offs = Vec::with_capacity(self.tail.len() + 1);
198            if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
199                offs.extend_from_slice(&self.tail[pos..]);
200                offs.push(self.head.min_tx_offset);
201            }
202
203            offs
204        }
205    }
206
207    pub fn commits_from(&self, offset: u64) -> Commits<R> {
208        let offsets = self.segment_offsets_from(offset);
209        let segments = Segments {
210            offs: offsets.into_iter(),
211            repo: self.repo.clone(),
212            max_log_format_version: self.opts.log_format_version,
213        };
214        Commits {
215            inner: None,
216            segments,
217            last_commit: CommitInfo::Initial { next_offset: offset },
218            last_error: None,
219        }
220    }
221
222    pub fn reset(mut self) -> io::Result<Self> {
223        info!("hard reset");
224
225        self.panicked = true;
226        self.tail.reserve(1);
227        self.tail.push(self.head.min_tx_offset);
228        for segment in self.tail.iter().rev() {
229            debug!("removing segment {segment}");
230            self.repo.remove_segment(*segment)?;
231        }
232        // Prevent finalizer from running by not updating self.panicked.
233
234        Self::open(self.repo.clone(), self.opts)
235    }
236
    /// Truncate the log after transaction offset `offset`, then re-open it on
    /// the same repo.
    ///
    /// Segments are visited newest first:
    ///
    /// - A segment starting after `offset` is removed.
    /// - Otherwise, the segment's commits are read until one starts after
    ///   `offset`. The segment is truncated after the commits read up to that
    ///   point, and the traversal stops.
    /// - If no commit was read that way, the segment is removed and the
    ///   traversal continues with the next older segment.
    ///
    /// # Errors
    ///
    /// Any I/O error from removing, reading or truncating a segment, or from
    /// re-opening the log, is returned.
    pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
        info!("reset to {offset}");

        self.panicked = true;
        // Treat the head like any other segment for the traversal below.
        self.tail.reserve(1);
        self.tail.push(self.head.min_tx_offset);
        for segment in self.tail.iter().rev() {
            let segment = *segment;
            if segment > offset {
                // Segment is outside the offset, so remove it wholesale.
                debug!("removing segment {segment}");
                self.repo.remove_segment(segment)?;
            } else {
                // Read commit-wise until we find the byte offset.
                let reader = repo::open_segment_reader(&self.repo, self.opts.log_format_version, segment)?;
                let commits = reader.commits();

                // Total encoded size of the commits to retain.
                let mut bytes_read = 0;
                for commit in commits {
                    let commit = commit?;
                    if commit.min_tx_offset > offset {
                        break;
                    }
                    bytes_read += Commit::from(commit).encoded_len() as u64;
                }

                if bytes_read == 0 {
                    // Segment is empty, just remove it.
                    self.repo.remove_segment(segment)?;
                } else {
                    // The retained commits are preceded by the segment header.
                    let byte_offset = segment::Header::LEN as u64 + bytes_read;
                    debug!("truncating segment {segment} to {offset} at {byte_offset}");
                    let mut file = self.repo.open_segment(segment)?;
                    // Note: The offset index truncates equal or greater,
                    // inclusive. We'd like to retain `offset` in the index, as
                    // the commit is also retained in the log.
                    file.ftruncate(offset + 1, byte_offset)?;
                    // Some filesystems require fsync after ftruncate.
                    file.fsync()?;
                    break;
                }
            }
        }
        // Prevent finalizer from running by not updating self.panicked.

        Self::open(self.repo.clone(), self.opts)
    }
284
285    /// Start a new segment, preserving the current head's `Commit`.
286    ///
287    /// The caller must ensure that the current head is synced to disk as
288    /// appropriate. It is not appropriate to sync after a write error, as that
289    /// is likely to return an error as well: the `Commit` will be written to
290    /// the new segment anyway.
291    fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::Segment>> {
292        debug!(
293            "starting new segment offset={} prev-offset={}",
294            self.head.next_tx_offset(),
295            self.head.min_tx_offset()
296        );
297        let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
298        let old = mem::replace(&mut self.head, new);
299        self.tail.push(old.min_tx_offset());
300        self.head.commit = old.commit;
301
302        Ok(&mut self.head)
303    }
304}
305
306impl<R: Repo, T: Encode> Generic<R, T> {
307    pub fn append(&mut self, record: T) -> Result<(), T> {
308        self.head.append(record)
309    }
310
311    pub fn transactions_from<'a, D>(
312        &self,
313        offset: u64,
314        decoder: &'a D,
315    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
316    where
317        D: Decoder<Record = T>,
318        D::Error: From<error::Traversal>,
319        R: 'a,
320        T: 'a,
321    {
322        transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
323    }
324
325    pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
326    where
327        D: Decoder,
328        D::Error: From<error::Traversal>,
329    {
330        fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
331    }
332}
333
334impl<R: Repo, T> Drop for Generic<R, T> {
335    fn drop(&mut self) {
336        if !self.panicked {
337            if let Err(e) = self.head.commit() {
338                warn!("failed to commit on drop: {e}");
339            }
340        }
341    }
342}
343
344pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
345    let mut offsets = repo.existing_offsets()?;
346    if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
347        offsets = offsets.split_off(pos);
348    }
349    let segments = Segments {
350        offs: offsets.into_iter(),
351        repo,
352        max_log_format_version,
353    };
354    Ok(Commits {
355        inner: None,
356        segments,
357        last_commit: CommitInfo::Initial { next_offset: offset },
358        last_error: None,
359    })
360}
361
362pub fn transactions_from<'a, R, D, T>(
363    repo: R,
364    max_log_format_version: u8,
365    offset: u64,
366    de: &'a D,
367) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
368where
369    R: Repo + 'a,
370    D: Decoder<Record = T>,
371    D::Error: From<error::Traversal>,
372    T: 'a,
373{
374    commits_from(repo, max_log_format_version, offset)
375        .map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
376}
377
378pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
379where
380    R: Repo,
381    D: Decoder,
382    D::Error: From<error::Traversal> + From<io::Error>,
383{
384    let commits = commits_from(repo, max_log_format_version, offset)?;
385    fold_transactions_internal(commits.with_log_format_version(), de, offset)
386}
387
/// Turn an iterator of versioned commits into an iterator of transactions.
///
/// Shared implementation of [`Generic::transactions_from`] and the free
/// function [`transactions_from`].
///
/// `offset` and `de` are passed on to `Commit::into_transactions` for every
/// commit. NOTE(review): presumably transactions before `offset` are skipped
/// there; confirm against `Commit::into_transactions`.
fn transactions_from_internal<'a, R, D, T>(
    commits: CommitsWithVersion<R>,
    offset: u64,
    de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
where
    R: Repo + 'a,
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    commits
        // Unify the error type: traversal errors become decoder errors.
        .map(|x| x.map_err(D::Error::from))
        // Each good commit becomes an iterator over its transactions...
        .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
        // ...which is flattened into the outer iterator, errors passing through.
        .flatten_ok()
        // Collapse the nested `Result` left over from flattening.
        .map(|x| x.and_then(|y| y))
}
405
406fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
407where
408    R: Repo,
409    D: Decoder,
410    D::Error: From<error::Traversal>,
411{
412    while let Some(commit) = commits.next() {
413        let (version, commit) = match commit {
414            Ok(version_and_commit) => version_and_commit,
415            Err(e) => {
416                // Ignore it if the very last commit in the log is broken.
417                // The next `append` will fix the log, but the `decoder`
418                // has no way to tell whether we're at the end or not.
419                // This is unlike the consumer of an iterator, which can
420                // perform below check itself.
421                if commits.next().is_none() {
422                    return Ok(());
423                }
424
425                return Err(e.into());
426            }
427        };
428        trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
429
430        let max_tx_offset = commit.min_tx_offset + commit.n as u64;
431        if max_tx_offset <= from {
432            continue;
433        }
434
435        let records = &mut commit.records.as_slice();
436        for n in 0..commit.n {
437            let tx_offset = commit.min_tx_offset + n as u64;
438            if tx_offset < from {
439                de.skip_record(version, tx_offset, records)?;
440            } else {
441                de.consume_record(version, tx_offset, records)?;
442            }
443        }
444    }
445
446    Ok(())
447}
448
/// Iterator which opens the segments of a log for reading, one at a time.
pub struct Segments<R> {
    /// The storage backend the segments are opened from.
    repo: R,
    /// Starting transaction offsets of the segments still to be opened, in
    /// iteration order.
    offs: vec::IntoIter<u64>,
    /// Log format version passed on when opening a segment for reading.
    max_log_format_version: u8,
}
454
455impl<R: Repo> Iterator for Segments<R> {
456    type Item = io::Result<segment::Reader<R::Segment>>;
457
458    fn next(&mut self) -> Option<Self::Item> {
459        let off = self.offs.next()?;
460        debug!("iter segment {off}");
461        Some(repo::open_segment_reader(&self.repo, self.max_log_format_version, off))
462    }
463}
464
/// Helper for the [`Commits`] iterator.
///
/// Tracks which transaction offset the next commit is expected to start at.
enum CommitInfo {
    /// Constructed in [`Generic::commits_from`] and the free function
    /// [`commits_from`], specifying the offset the next commit should have.
    ///
    /// The offset requested by the caller may fall inside a commit; it is
    /// adjusted to the commit boundary once the first commit is found.
    Initial { next_offset: u64 },
    /// The last commit seen by the iterator.
    ///
    /// Stores the range of transaction offsets, where `tx_range.end` is the
    /// offset the next commit is expected to have. Also retains the checksum
    /// needed to detect duplicate commits.
    LastSeen { tx_range: Range<u64>, checksum: u32 },
}
477
478impl CommitInfo {
479    /// `true` if the last seen commit in self and the provided one have the
480    /// same `min_tx_offset`.
481    fn same_offset_as(&self, commit: &StoredCommit) -> bool {
482        let Self::LastSeen { tx_range, .. } = self else {
483            return false;
484        };
485        tx_range.start == commit.min_tx_offset
486    }
487
488    /// `true` if the last seen commit in self and the provided one have the
489    /// same `checksum`.
490    fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
491        let Some(checksum) = self.checksum() else { return false };
492        checksum == &commit.checksum
493    }
494
495    fn checksum(&self) -> Option<&u32> {
496        match self {
497            Self::Initial { .. } => None,
498            Self::LastSeen { checksum, .. } => Some(checksum),
499        }
500    }
501
502    fn expected_offset(&self) -> &u64 {
503        match self {
504            Self::Initial { next_offset } => next_offset,
505            Self::LastSeen { tx_range, .. } => &tx_range.end,
506        }
507    }
508
509    // If initial offset falls within a commit, adjust it to the commit boundary.
510    //
511    // Returns `true` if the initial offset is past `commit`.
512    // Returns `false` if `self` isn't `Self::Initial`,
513    // or the initial offset has been adjusted to the starting offset of `commit`.
514    //
515    // For iteration, `true` means to skip the commit, `false` to yield it.
516    fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
517        if let Self::Initial { next_offset } = self {
518            let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
519            if *next_offset > last_tx_offset {
520                return true;
521            } else {
522                *next_offset = commit.min_tx_offset;
523            }
524        }
525
526        false
527    }
528}
529
/// Iterator over the commits of a log, spanning all of its segments.
///
/// Yields [`StoredCommit`]s, or an [`error::Traversal`] if the log cannot be
/// read or its commits are not in the expected sequence.
pub struct Commits<R: Repo> {
    /// Commit iterator of the segment currently being read, `None` until the
    /// first segment has been opened.
    inner: Option<segment::Commits<R::Segment>>,
    /// The segments which remain to be read after `inner`.
    segments: Segments<R>,
    /// What we know about the previously yielded commit, used to check that
    /// the next commit has the expected offset.
    last_commit: CommitInfo,
    /// A read error held back for delayed reporting.
    ///
    /// It is discarded if a later commit turns out to be in order, attached
    /// to an out-of-order error, or yielded when the log is exhausted.
    last_error: Option<error::Traversal>,
}
536
537impl<R: Repo> Commits<R> {
    /// The header of the segment currently being read, or `None` if no
    /// segment has been opened yet.
    fn current_segment_header(&self) -> Option<&segment::Header> {
        self.inner.as_ref().map(|segment| &segment.header)
    }
541
    /// Turn `self` into an iterator which pairs the log format version of the
    /// current segment with the [`Commit`].
    ///
    /// The version is taken from the header of the segment the commit was
    /// read from.
    pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
        CommitsWithVersion { inner: self }
    }
547
    /// Advance the current-segment iterator to yield the next commit.
    ///
    /// Checks that the offset sequence is contiguous, and may skip commits
    /// until the requested offset.
    ///
    /// Returns `None` if the segment iterator is exhausted or returns an error.
    /// `None` is also returned if no segment has been opened yet. In the error
    /// case, the error is stored in `self.last_error` for delayed reporting.
    ///
    /// Returns `Some(Err(..))` if a commit was read successfully, but is not
    /// the one expected next: either it has the same offset as the previous
    /// commit but a different checksum (`Forked`), or it does not start at the
    /// expected offset (`OutOfOrder`).
    fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
        loop {
            // The first `?` bails out if there is no current segment, the
            // second if the current segment has no more commits.
            match self.inner.as_mut()?.next()? {
                Ok(commit) => {
                    // Pop the last error. Either we'll return it below, or it's no longer
                    // interesting.
                    let prev_error = self.last_error.take();

                    // Skip entries before the initial commit.
                    if self.last_commit.adjust_initial_offset(&commit) {
                        trace!("adjust initial offset");
                        continue;
                    // Same offset: ignore if duplicate (same crc), else report a "fork".
                    } else if self.last_commit.same_offset_as(&commit) {
                        if !self.last_commit.same_checksum_as(&commit) {
                            warn!(
                                "forked: commit={:?} last-error={:?} last-crc={:?}",
                                commit,
                                prev_error,
                                self.last_commit.checksum()
                            );
                            return Some(Err(error::Traversal::Forked {
                                offset: commit.min_tx_offset,
                            }));
                        } else {
                            trace!("ignore duplicate");
                            continue;
                        }
                    // Not the expected offset: report out-of-order.
                    } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
                        warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
                        // The earlier read error, if any, likely explains the
                        // gap, so it is attached to the error.
                        return Some(Err(error::Traversal::OutOfOrder {
                            expected_offset: *self.last_commit.expected_offset(),
                            actual_offset: commit.min_tx_offset,
                            prev_error: prev_error.map(Box::new),
                        }));
                    // Seems legit, record info.
                    } else {
                        self.last_commit = CommitInfo::LastSeen {
                            tx_range: commit.tx_range(),
                            checksum: commit.checksum,
                        };

                        return Some(Ok(commit));
                    }
                }

                Err(e) => {
                    warn!("error reading next commit: {e}");
                    // Stop traversing this segment here.
                    //
                    // If this is just a partial write at the end of the segment,
                    // we may be able to obtain a commit with right offset from
                    // the next segment.
                    //
                    // If we don't, the error here is likely more helpful, but
                    // would be clobbered by `OutOfOrder`. Therefore we store it
                    // here.
                    self.set_last_error(e);

                    return None;
                }
            }
        }
    }
619
620    /// Store `e` has the last error for delayed reporting.
621    fn set_last_error(&mut self, e: io::Error) {
622        // Recover a checksum mismatch.
623        let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
624            e.into_inner()
625                .unwrap()
626                .downcast::<error::ChecksumMismatch>()
627                .map(|source| error::Traversal::Checksum {
628                    offset: *self.last_commit.expected_offset(),
629                    source: *source,
630                })
631                .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
632        } else {
633            error::Traversal::from(e)
634        };
635        self.last_error = Some(last_error);
636    }
637
638    /// If we're still looking for the initial commit, try to use the offset
639    /// index to advance the segment reader.
640    fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::Segment>) {
641        if let CommitInfo::Initial { next_offset } = &self.last_commit {
642            let _ = self
643                .segments
644                .repo
645                .get_offset_index(segment.min_tx_offset)
646                .map_err(Into::into)
647                .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
648                .inspect_err(|e| {
649                    warn!(
650                        "commitlog offset index is not used at segment {}: {}",
651                        segment.min_tx_offset, e
652                    );
653                });
654        }
655    }
656}
657
658impl<R: Repo> Iterator for Commits<R> {
659    type Item = Result<StoredCommit, error::Traversal>;
660
661    fn next(&mut self) -> Option<Self::Item> {
662        if let Some(item) = self.next_commit() {
663            return Some(item);
664        }
665
666        match self.segments.next() {
667            // When there is no more data, the last commit being bad is an error
668            None => self.last_error.take().map(Err),
669            Some(segment) => segment.map_or_else(
670                |e| Some(Err(e.into())),
671                |mut segment| {
672                    self.try_seek_to_initial_offset(&mut segment);
673                    self.inner = Some(segment.commits());
674                    self.next()
675                },
676            ),
677        }
678    }
679}
680
/// Iterator adapter which pairs each commit with the log format version of
/// the segment it was read from.
///
/// Created via [`Commits::with_log_format_version`].
pub struct CommitsWithVersion<R: Repo> {
    /// The underlying commit iterator.
    inner: Commits<R>,
}
684
685impl<R: Repo> Iterator for CommitsWithVersion<R> {
686    type Item = Result<(u8, Commit), error::Traversal>;
687
688    fn next(&mut self) -> Option<Self::Item> {
689        let next = self.inner.next()?;
690        match next {
691            Ok(commit) => {
692                let version = self
693                    .inner
694                    .current_segment_header()
695                    .map(|hdr| hdr.log_format_version)
696                    .expect("segment header none even though segment yielded a commit");
697                Some(Ok((version, commit.into())))
698            }
699            Err(e) => Some(Err(e)),
700        }
701    }
702}
703
704#[cfg(test)]
705mod tests {
706    use std::{cell::Cell, iter::repeat};
707
708    use pretty_assertions::assert_matches;
709
710    use super::*;
711    use crate::{
712        payload::{ArrayDecodeError, ArrayDecoder},
713        tests::helpers::{fill_log, mem_log},
714    };
715
716    #[test]
717    fn rotate_segments_simple() {
718        let mut log = mem_log::<[u8; 32]>(128);
719        for _ in 0..3 {
720            log.append([0; 32]).unwrap();
721            log.commit().unwrap();
722        }
723
724        let offsets = log.repo.existing_offsets().unwrap();
725        assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
726        assert_eq!(offsets[offsets.len() - 1], 2);
727    }
728
    /// A commit larger than the maximum segment size is written to an empty
    /// segment anyway; rotation happens on the next commit.
    #[test]
    fn huge_commit() {
        let mut log = mem_log::<[u8; 32]>(32);

        // Two 32-byte records in a single commit.
        log.append([0; 32]).unwrap();
        log.append([1; 32]).unwrap();
        log.commit().unwrap();
        assert!(log.head.len() > log.opts.max_segment_size);

        // The segment is over-full, so this commit goes to a new segment.
        log.append([2; 32]).unwrap();
        log.commit().unwrap();

        assert_eq!(&log.tail, &[0]);
        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
    }
744
745    #[test]
746    fn traverse_commits() {
747        let mut log = mem_log::<[u8; 32]>(32);
748        fill_log(&mut log, 10, repeat(1));
749
750        for (i, commit) in (0..10).zip(log.commits_from(0)) {
751            assert_eq!(i, commit.unwrap().min_tx_offset);
752        }
753    }
754
755    #[test]
756    fn traverse_commits_with_offset() {
757        let mut log = mem_log::<[u8; 32]>(32);
758        fill_log(&mut log, 10, repeat(1));
759
760        for offset in 0..10 {
761            for commit in log.commits_from(offset) {
762                let commit = commit.unwrap();
763                assert!(commit.min_tx_offset >= offset);
764            }
765        }
766        assert_eq!(0, log.commits_from(10).count());
767    }
768
    /// Folding from a given offset consumes exactly the records at or after
    /// that offset, in order.
    #[test]
    fn fold_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        /// A [`Decoder`] which counts the number of records decoded,
        /// and asserts that the `tx_offset` is as expected.
        struct CountDecoder {
            /// Number of records consumed so far.
            count: Cell<u64>,
            /// The offset the next consumed record must have.
            next_tx_offset: Cell<u64>,
        }

        // Implemented for the reference, so that the counters can still be
        // inspected after the decoder was passed to the fold by value.
        impl Decoder for &CountDecoder {
            type Record = [u8; 32];
            type Error = ArrayDecodeError;

            fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                _version: u8,
                _tx_offset: u64,
                _reader: &mut R,
            ) -> Result<Self::Record, Self::Error> {
                unreachable!("Folding never calls `decode_record`")
            }

            fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                self.count.set(self.count.get() + 1);
                let expected_tx_offset = self.next_tx_offset.get();
                assert_eq!(expected_tx_offset, tx_offset);
                self.next_tx_offset.set(expected_tx_offset + 1);
                Ok(())
            }

            fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                // Skipped records are read like consumed ones, but not counted.
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                Ok(())
            }
        }

        for offset in 0..10 {
            let decoder = CountDecoder {
                count: Cell::new(0),
                next_tx_offset: Cell::new(offset),
            };

            log.fold_transactions_from(offset, &decoder).unwrap();

            // Everything from `offset` up to the end of the log was consumed.
            assert_eq!(decoder.count.get(), 10 - offset);
            assert_eq!(decoder.next_tx_offset.get(), 10);
        }
    }
833
    /// A commit which is written twice in a row is yielded only once.
    #[test]
    fn traverse_commits_ignores_duplicates() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        let commit1 = log.head.commit.clone();
        log.commit().unwrap();
        // Restore the buffer to what was just written, and write it again.
        log.head.commit = commit1.clone();
        log.commit().unwrap();
        log.append([43; 32]).unwrap();
        let commit2 = log.head.commit.clone();
        log.commit().unwrap();

        assert_eq!(
            [commit1, commit2].as_slice(),
            &log.commits_from(0)
                .map_ok(Commit::from)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        );
    }
855
856    #[test]
857    fn traverse_commits_errors_when_forked() {
858        let mut log = mem_log::<[u8; 32]>(1024);
859
860        log.append([42; 32]).unwrap();
861        log.commit().unwrap();
862        log.head.commit = Commit {
863            min_tx_offset: 0,
864            n: 1,
865            records: [43; 32].to_vec(),
866            epoch: 0,
867        };
868        log.commit().unwrap();
869
870        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
871        assert!(
872            matches!(res, Err(error::Traversal::Forked { offset: 0 })),
873            "expected fork error: {res:?}"
874        )
875    }
876
877    #[test]
878    fn traverse_commits_errors_when_offset_not_contiguous() {
879        let mut log = mem_log::<[u8; 32]>(1024);
880
881        log.append([42; 32]).unwrap();
882        log.commit().unwrap();
883        log.head.commit.min_tx_offset = 18;
884        log.append([42; 32]).unwrap();
885        log.commit().unwrap();
886
887        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
888        assert!(
889            matches!(
890                res,
891                Err(error::Traversal::OutOfOrder {
892                    expected_offset: 1,
893                    actual_offset: 18,
894                    prev_error: None
895                })
896            ),
897            "expected fork error: {res:?}"
898        )
899    }
900
901    #[test]
902    fn traverse_transactions() {
903        let mut log = mem_log::<[u8; 32]>(32);
904        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;
905
906        for (i, tx) in (0..total_txs).zip(log.transactions_from(0, &ArrayDecoder)) {
907            assert_eq!(i, tx.unwrap().offset);
908        }
909    }
910
911    #[test]
912    fn traverse_transactions_with_offset() {
913        let mut log = mem_log::<[u8; 32]>(32);
914        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;
915
916        for offset in 0..total_txs {
917            let mut iter = log.transactions_from(offset, &ArrayDecoder);
918            assert_eq!(offset, iter.next().expect("at least one tx expected").unwrap().offset);
919            for tx in iter {
920                assert!(tx.unwrap().offset >= offset);
921            }
922        }
923        assert_eq!(0, log.transactions_from(total_txs, &ArrayDecoder).count());
924    }
925
926    #[test]
927    fn traverse_empty() {
928        let log = mem_log::<[u8; 32]>(32);
929
930        assert_eq!(0, log.commits_from(0).count());
931        assert_eq!(0, log.commits_from(42).count());
932        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
933        assert_eq!(0, log.transactions_from(42, &ArrayDecoder).count());
934    }
935
936    #[test]
937    fn reset_hard() {
938        let mut log = mem_log::<[u8; 32]>(128);
939        fill_log(&mut log, 50, (1..=10).cycle());
940
941        log = log.reset().unwrap();
942        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
943    }
944
945    #[test]
946    fn reset_to_offset() {
947        let mut log = mem_log::<[u8; 32]>(128);
948        let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;
949
950        for offset in (0..total_txs).rev() {
951            log = log.reset_to(offset).unwrap();
952            assert_eq!(
953                offset,
954                log.transactions_from(0, &ArrayDecoder)
955                    .map(Result::unwrap)
956                    .last()
957                    .unwrap()
958                    .offset
959            );
960            // We're counting from zero, so offset + 1 is the # of txs.
961            assert_eq!(
962                offset + 1,
963                log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64
964            );
965        }
966    }
967
968    #[test]
969    fn reset_to_offset_many_txs_per_commit() {
970        let mut log = mem_log::<[u8; 32]>(128);
971        let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;
972
973        // No op.
974        log = log.reset_to(total_txs).unwrap();
975        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);
976
977        let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();
978
979        // Both fall into the middle commit, which should be retained.
980        log = log.reset_to(middle_commit.min_tx_offset + 1).unwrap();
981        assert_eq!(
982            middle_commit.tx_range().end,
983            log.transactions_from(0, &ArrayDecoder).count() as u64
984        );
985        log = log.reset_to(middle_commit.min_tx_offset).unwrap();
986        assert_eq!(
987            middle_commit.tx_range().end,
988            log.transactions_from(0, &ArrayDecoder).count() as u64
989        );
990
991        // Offset falls into 2nd commit.
992        // 1st commit (1 tx) + 2nd commit (2 txs) = 3
993        log = log.reset_to(1).unwrap();
994        assert_eq!(3, log.transactions_from(0, &ArrayDecoder).count() as u64);
995
996        // Offset falls into 1st commit.
997        // 1st commit (1 tx) = 1
998        log = log.reset_to(0).unwrap();
999        assert_eq!(1, log.transactions_from(0, &ArrayDecoder).count() as u64);
1000    }
1001
1002    #[test]
1003    fn reopen() {
1004        let mut log = mem_log::<[u8; 32]>(1024);
1005        let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
1006        assert_eq!(
1007            total_txs,
1008            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
1009        );
1010
1011        let mut log = Generic::<_, [u8; 32]>::open(
1012            log.repo.clone(),
1013            Options {
1014                max_segment_size: 1024,
1015                ..Options::default()
1016            },
1017        )
1018        .unwrap();
1019        total_txs += fill_log(&mut log, 100, (1..=10).cycle());
1020
1021        assert_eq!(
1022            total_txs,
1023            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
1024        );
1025    }
1026
1027    #[test]
1028    fn set_same_epoch_does_nothing() {
1029        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
1030        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
1031        let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
1032        assert_eq!(committed, None);
1033    }
1034
1035    #[test]
1036    fn set_new_epoch_commits() {
1037        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
1038        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
1039        log.append(<_>::default()).unwrap();
1040        let committed = log
1041            .set_epoch(42)
1042            .unwrap()
1043            .expect("should have committed the pending transaction");
1044        assert_eq!(log.epoch(), 42);
1045        assert_eq!(committed.tx_range.start, 0);
1046    }
1047
1048    #[test]
1049    fn set_lower_epoch_returns_error() {
1050        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
1051        log.set_epoch(42).unwrap();
1052        assert_eq!(log.epoch(), 42);
1053        assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput)
1054    }
1055}