spacetimedb_commitlog/
segment.rs

1use std::{
2    fs::File,
3    io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
4    num::{NonZeroU16, NonZeroU64},
5    ops::Range,
6};
7
8use log::{debug, warn};
9
10use crate::{
11    commit::{self, Commit, StoredCommit},
12    error,
13    index::{IndexError, IndexFileMut},
14    payload::Encode,
15    repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
16    Options,
17};
18
/// Magic bytes every segment file starts with.
pub const MAGIC: [u8; 6] = [b'(', b'd', b's', b')', b'^', b'2'];

/// Log format version written into newly created segment headers.
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
/// Checksum algorithm written into newly created segment headers.
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

/// Identifier of the CRC32C algorithm in [`Header::checksum_algorithm`].
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
/// Length in bytes of a CRC32C checksum.
pub const CHECKSUM_CRC32C_LEN: usize = 4;

/// Lookup table for checksum length, indexed by [`Header::checksum_algorithm`].
// Supported algorithms must be numbered consecutively!
pub const CHECKSUM_LEN: [usize; 1] = [CHECKSUM_CRC32C_LEN];

/// The fixed-size header at the start of every segment.
///
/// Encoded as [`MAGIC`], followed by the log format version, the checksum
/// algorithm, and two reserved bytes which are written as zero.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    pub log_format_version: u8,
    pub checksum_algorithm: u8,
}

impl Header {
    /// Encoded size of the header in bytes.
    pub const LEN: usize = MAGIC.len() + /* log_format_version + checksum_algorithm + reserved + reserved */ 4;

    /// Serializes this header into `out`.
    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
        let Self {
            log_format_version,
            checksum_algorithm,
        } = *self;
        out.write_all(&MAGIC)?;
        out.write_all(&[log_format_version, checksum_algorithm, 0, 0])
    }

    /// Reads exactly [`Self::LEN`] bytes from `read` and decodes them.
    ///
    /// The reserved bytes are not inspected.
    ///
    /// # Errors
    ///
    /// Propagates read errors (e.g. `UnexpectedEof` when fewer than
    /// [`Self::LEN`] bytes are available) and returns `InvalidData` when the
    /// bytes do not begin with [`MAGIC`].
    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
        let mut raw = [0u8; Self::LEN];
        read.read_exact(&mut raw)?;

        let (magic, fields) = raw.split_at(MAGIC.len());
        if magic != MAGIC.as_slice() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "segment header does not start with magic",
            ));
        }

        Ok(Self {
            log_format_version: fields[0],
            checksum_algorithm: fields[1],
        })
    }

    /// Checks whether a segment carrying this header can be read.
    ///
    /// Fails if the log format version is newer than `max_log_format_version`,
    /// or if the checksum algorithm differs from `checksum_algorithm`. The
    /// version is checked first.
    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
        if self.log_format_version > max_log_format_version {
            Err(format!("unsupported log format version: {}", self.log_format_version))
        } else if self.checksum_algorithm != checksum_algorithm {
            Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm))
        } else {
            Ok(())
        }
    }
}

impl Default for Header {
    /// A header using [`DEFAULT_LOG_FORMAT_VERSION`] and [`DEFAULT_CHECKSUM_ALGORITHM`].
    fn default() -> Self {
        Self {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
        }
    }
}
84
/// Summary of a [`Commit`] that [`Writer::commit`] has successfully written.
#[derive(Debug, PartialEq)]
pub struct Committed {
    /// Transaction offsets covered by the commit (half-open range).
    pub tx_range: Range<u64>,
    /// The crc32 checksum of the commit's serialized form, exactly as it was
    /// written to the commitlog.
    pub checksum: u32,
}
94
95#[derive(Debug)]
96pub struct Writer<W: io::Write> {
97    pub(crate) commit: Commit,
98    pub(crate) inner: BufWriter<W>,
99
100    pub(crate) min_tx_offset: u64,
101    pub(crate) bytes_written: u64,
102
103    pub(crate) max_records_in_commit: NonZeroU16,
104
105    pub(crate) offset_index_head: Option<OffsetIndexWriter>,
106}
107
108impl<W: io::Write> Writer<W> {
109    /// Append the record (aka transaction) `T` to the segment.
110    ///
111    /// If the number of currently buffered records would exceed `max_records_in_commit`
112    /// after the method returns, the argument is returned in an `Err` and not
113    /// appended to this writer's buffer.
114    ///
115    /// Otherwise, the `record` is encoded and and stored in the buffer.
116    ///
117    /// An `Err` result indicates that [`Self::commit`] should be called in
118    /// order to flush the buffered records to persistent storage.
119    pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
120        if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
121            Err(record)
122        } else {
123            self.commit.n += 1;
124            record.encode_record(&mut self.commit.records);
125            Ok(())
126        }
127    }
128
129    /// Write the current [`Commit`] to the underlying [`io::Write`].
130    ///
131    /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero).
132    /// In this case, `None` is returned.
133    ///
134    /// Otherwise `Some` [`Committed`] is returned, providing some metadata about
135    /// the commit.
136    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
137        if self.commit.n == 0 {
138            return Ok(None);
139        }
140        let checksum = self.commit.write(&mut self.inner)?;
141        self.inner.flush()?;
142
143        let commit_len = self.commit.encoded_len() as u64;
144        self.offset_index_head.as_mut().map(|index| {
145            debug!(
146                "append_after commit min_tx_offset={} bytes_written={} commit_len={}",
147                self.commit.min_tx_offset, self.bytes_written, commit_len
148            );
149            index
150                .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
151                .map_err(|e| {
152                    debug!("failed to append to offset index: {e:?}");
153                })
154        });
155
156        let tx_range_start = self.commit.min_tx_offset;
157
158        self.bytes_written += commit_len;
159        self.commit.min_tx_offset += self.commit.n as u64;
160        self.commit.n = 0;
161        self.commit.records.clear();
162
163        Ok(Some(Committed {
164            tx_range: tx_range_start..self.commit.min_tx_offset,
165            checksum,
166        }))
167    }
168
169    /// Get the current epoch.
170    pub fn epoch(&self) -> u64 {
171        self.commit.epoch
172    }
173
174    /// Update the epoch.
175    ///
176    /// The caller must ensure that:
177    ///
178    /// - The new epoch is greater than the current epoch.
179    /// - [`Self::commit`] has been called as appropriate.
180    ///
181    pub fn set_epoch(&mut self, epoch: u64) {
182        self.commit.epoch = epoch;
183    }
184
185    /// The smallest transaction offset in this segment.
186    pub fn min_tx_offset(&self) -> u64 {
187        self.min_tx_offset
188    }
189
190    /// The next transaction offset to be written if [`Self::commit`] was called.
191    pub fn next_tx_offset(&self) -> u64 {
192        self.commit.min_tx_offset
193    }
194
195    /// `true` if the segment contains no commits.
196    ///
197    /// The segment will, however, contain a header. This thus violates the
198    /// convention that `is_empty == (len == 0)`.
199    pub fn is_empty(&self) -> bool {
200        self.bytes_written <= Header::LEN as u64
201    }
202
203    /// Number of bytes written to this segment, including the header.
204    pub fn len(&self) -> u64 {
205        self.bytes_written
206    }
207}
208
/// File-like operations the commitlog needs in addition to [`io::Write`].
pub trait FileLike {
    /// Flush written data to durable storage.
    fn fsync(&mut self) -> io::Result<()>;
    /// Truncate, given both the transaction offset (`tx_offset`) and the byte
    /// length (`size`) to truncate to; implementations use whichever applies.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}

impl FileLike for File {
    /// Delegates to [`File::sync_data`].
    fn fsync(&mut self) -> io::Result<()> {
        File::sync_data(self)
    }

    /// Sets the file length to `size`; the transaction offset is not used.
    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
        File::set_len(self, size)
    }
}

impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
    /// Forwards to the wrapped writer. Bytes still held in the `BufWriter`'s
    /// own buffer are not flushed first.
    fn fsync(&mut self) -> io::Result<()> {
        let wrapped: &mut W = self.get_mut();
        wrapped.fsync()
    }

    /// Forwards to the wrapped writer.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        let wrapped: &mut W = self.get_mut();
        wrapped.ftruncate(tx_offset, size)
    }
}
233
234impl<W: io::Write + FileLike> FileLike for Writer<W> {
235    fn fsync(&mut self) -> io::Result<()> {
236        self.inner.fsync()?;
237        self.offset_index_head.as_mut().map(|index| index.fsync());
238        Ok(())
239    }
240
241    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
242        self.inner.ftruncate(tx_offset, size)?;
243        self.offset_index_head
244            .as_mut()
245            .map(|index| index.ftruncate(tx_offset, size));
246        Ok(())
247    }
248}
249
250#[derive(Debug)]
251pub struct OffsetIndexWriter {
252    pub(crate) head: TxOffsetIndexMut,
253
254    require_segment_fsync: bool,
255    min_write_interval: NonZeroU64,
256
257    pub(crate) candidate_min_tx_offset: TxOffset,
258    pub(crate) candidate_byte_offset: u64,
259    pub(crate) bytes_since_last_index: u64,
260}
261
262impl OffsetIndexWriter {
263    pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
264        OffsetIndexWriter {
265            head,
266            require_segment_fsync: opts.offset_index_require_segment_fsync,
267            min_write_interval: opts.offset_index_interval_bytes,
268            candidate_min_tx_offset: TxOffset::default(),
269            candidate_byte_offset: 0,
270            bytes_since_last_index: 0,
271        }
272    }
273
274    fn reset(&mut self) {
275        self.candidate_byte_offset = 0;
276        self.candidate_min_tx_offset = TxOffset::default();
277        self.bytes_since_last_index = 0;
278    }
279
280    /// Either append to index or save offsets to append at future fsync
281    pub fn append_after_commit(
282        &mut self,
283        min_tx_offset: TxOffset,
284        byte_offset: u64,
285        commit_len: u64,
286    ) -> Result<(), IndexError> {
287        self.bytes_since_last_index += commit_len;
288
289        if self.candidate_min_tx_offset == 0 {
290            self.candidate_byte_offset = byte_offset;
291            self.candidate_min_tx_offset = min_tx_offset;
292        }
293
294        if !self.require_segment_fsync {
295            self.append_internal()?;
296        }
297
298        Ok(())
299    }
300
301    fn append_internal(&mut self) -> Result<(), IndexError> {
302        // If the candidate offset is zero, there has not been a commit since the last offset entry
303        if self.candidate_min_tx_offset == 0 {
304            return Ok(());
305        }
306
307        if self.bytes_since_last_index < self.min_write_interval.get() {
308            return Ok(());
309        }
310
311        self.head
312            .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
313        self.head.async_flush()?;
314        self.reset();
315
316        Ok(())
317    }
318}
319
320impl FileLike for OffsetIndexWriter {
321    /// Must be called via SegmentWriter::fsync
322    fn fsync(&mut self) -> io::Result<()> {
323        let _ = self.append_internal().map_err(|e| {
324            warn!("failed to append to offset index: {e:?}");
325        });
326        let _ = self
327            .head
328            .async_flush()
329            .map_err(|e| warn!("failed to flush offset index: {e:?}"));
330        Ok(())
331    }
332
333    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
334        self.reset();
335        self.head
336            .truncate(tx_offset)
337            .inspect_err(|e| {
338                warn!("failed to truncate offset index at {tx_offset}: {e:?}");
339            })
340            .ok();
341        Ok(())
342    }
343}
344
345impl FileLike for IndexFileMut<TxOffset> {
346    fn fsync(&mut self) -> io::Result<()> {
347        self.async_flush()
348    }
349
350    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
351        self.truncate(tx_offset)
352            .map_err(|e| io::Error::other(format!("failed to truncate offset index at {tx_offset}: {e:?}")))
353    }
354}
355
356#[derive(Debug)]
357pub struct Reader<R> {
358    pub header: Header,
359    pub min_tx_offset: u64,
360    inner: R,
361}
362
363impl<R: io::Read + io::Seek> Reader<R> {
364    pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
365        let header = Header::decode(&mut inner)?;
366        header
367            .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
368            .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
369
370        Ok(Self {
371            header,
372            min_tx_offset,
373            inner,
374        })
375    }
376}
377
378impl<R: io::BufRead + io::Seek> Reader<R> {
379    pub fn commits(self) -> Commits<R> {
380        Commits {
381            header: self.header,
382            reader: self.inner,
383        }
384    }
385
386    pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<u64, IndexError> {
387        seek_to_offset(&mut self.inner, index_file, start_tx_offset)
388    }
389
390    #[cfg(test)]
391    pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
392    where
393        D: crate::Decoder,
394        D::Error: From<io::Error>,
395        R: 'a,
396    {
397        use itertools::Itertools as _;
398
399        self.commits()
400            .with_log_format_version()
401            .map(|x| x.map_err(Into::into))
402            .map_ok(move |(version, commit)| {
403                let start = commit.min_tx_offset;
404                commit.into_transactions(version, start, de)
405            })
406            .flatten_ok()
407            .map(|x| x.and_then(|y| y))
408    }
409
410    #[cfg(test)]
411    pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
412        Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
413    }
414}
415
416/// Advances the `segment` reader to the position corresponding to the `start_tx_offset`
417/// using the `index_file` for efficient seeking.
418///
419/// Input:
420/// - `segment` - segment reader
421/// - `min_tx_offset` - minimum transaction offset in the segment
422/// - `start_tx_offset` - transaction offset to advance to
423///
424/// Returns the byte position `segment` is at after seeking.
425pub fn seek_to_offset<R: io::Read + io::Seek>(
426    mut segment: &mut R,
427    index_file: &TxOffsetIndex,
428    start_tx_offset: u64,
429) -> Result<u64, IndexError> {
430    let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
431
432    // If the index_key is 0, it means the index file is empty, return error without seeking
433    if index_key == 0 {
434        return Err(IndexError::KeyNotFound);
435    }
436    debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
437    // returned `index_key` should never be greater than `start_tx_offset`
438    debug_assert!(index_key <= start_tx_offset);
439
440    // Check if the offset index is pointing to the right commit.
441    let hdr = validate_commit_header(&mut segment, byte_offset)?;
442    if hdr.min_tx_offset == index_key {
443        // Advance the segment Seek if expected commit is found.
444        segment.seek(SeekFrom::Start(byte_offset))
445    } else {
446        Err(io::Error::new(
447            io::ErrorKind::InvalidData,
448            "mismatched key in offset index file",
449        ))
450    }
451    .map_err(Into::into)
452}
453
454/// Try to extract the commit header from the asked position without advancing seek.
455/// `IndexFileMut` fsync asynchoronously, which makes it important for reader to verify its entry
456pub fn validate_commit_header<Reader: io::Read + io::Seek>(
457    mut reader: &mut Reader,
458    byte_offset: u64,
459) -> io::Result<commit::Header> {
460    let pos = reader.stream_position()?;
461    reader.seek(SeekFrom::Start(byte_offset))?;
462
463    let hdr = commit::Header::decode(&mut reader)
464        .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));
465
466    // Restore the original position
467    reader.seek(SeekFrom::Start(pos))?;
468
469    hdr
470}
471
/// A single transaction: its offset in the log together with its payload.
///
/// Produced by iterators that flatten commits into their individual records.
#[derive(Debug, PartialEq)]
pub struct Transaction<T> {
    /// Offset of this transaction, counted from the start of the log.
    pub offset: u64,
    /// The decoded transaction payload.
    pub txdata: T,
}
483
484pub struct Commits<R> {
485    pub header: Header,
486    reader: R,
487}
488
489impl<R: io::BufRead> Iterator for Commits<R> {
490    type Item = io::Result<StoredCommit>;
491
492    fn next(&mut self) -> Option<Self::Item> {
493        StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
494    }
495}
496
#[cfg(test)]
impl<R: io::BufRead> Commits<R> {
    /// Adapts this iterator so that each commit is paired with the segment's
    /// log format version.
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        CommitsWithVersion { inner: self }
    }
}

/// Iterator returned by [`Commits::with_log_format_version`].
#[cfg(test)]
struct CommitsWithVersion<R> {
    inner: Commits<R>,
}

#[cfg(test)]
impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        let version = self.inner.header.log_format_version;
        Some(item.map(|commit| (version, commit)))
    }
}
524
525#[derive(Clone, Debug, Eq, PartialEq)]
526pub struct Metadata {
527    /// The segment header.
528    pub header: Header,
529    /// The range of transactions contained in the segment.
530    pub tx_range: Range<u64>,
531    /// The size of the segment.
532    pub size_in_bytes: u64,
533    /// The largest epoch found in the segment.
534    pub max_epoch: u64,
535    /// The latest commit found in the segment.
536    ///
537    /// The value is the `min_tx_offset` of the commit, i.e.
538    /// `max_commit_offset..tx_range.end` is the range of
539    /// transactions contained in it.
540    pub max_commit_offset: u64,
541}
542
543impl Metadata {
544    /// Reads and validates metadata from a segment.
545    /// It will look for last commit index offset and then traverse the segment
546    ///
547    /// Determines `max_tx_offset`, `size_in_bytes`, and `max_epoch` from the segment.
548    pub(crate) fn extract<R: io::Read + io::Seek>(
549        min_tx_offset: TxOffset,
550        mut reader: R,
551        offset_index: Option<&TxOffsetIndex>,
552    ) -> Result<Self, error::SegmentMetadata> {
553        let header = Header::decode(&mut reader)?;
554        Self::with_header(min_tx_offset, header, reader, offset_index)
555    }
556
557    fn with_header<R: io::Read + io::Seek>(
558        min_tx_offset: u64,
559        header: Header,
560        mut reader: R,
561        offset_index: Option<&TxOffsetIndex>,
562    ) -> Result<Self, error::SegmentMetadata> {
563        let mut sofar = offset_index
564            .and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
565            .unwrap_or_else(|| Self {
566                header,
567                tx_range: Range {
568                    start: min_tx_offset,
569                    end: min_tx_offset,
570                },
571                size_in_bytes: Header::LEN as u64,
572                max_epoch: u64::default(),
573                max_commit_offset: min_tx_offset,
574            });
575
576        reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
577
578        fn commit_meta<R: io::Read>(
579            reader: &mut R,
580            sofar: &Metadata,
581        ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
582            commit::Metadata::extract(reader).map_err(|e| {
583                if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) {
584                    error::SegmentMetadata::InvalidCommit {
585                        sofar: sofar.clone(),
586                        source: e,
587                    }
588                } else {
589                    e.into()
590                }
591            })
592        }
593        while let Some(commit) = commit_meta(&mut reader, &sofar)? {
594            debug!("commit::{commit:?}");
595            if commit.tx_range.start != sofar.tx_range.end {
596                return Err(io::Error::new(
597                    io::ErrorKind::InvalidData,
598                    format!(
599                        "out-of-order offset: expected={} actual={}",
600                        sofar.tx_range.end, commit.tx_range.start,
601                    ),
602                )
603                .into());
604            }
605            sofar.tx_range.end = commit.tx_range.end;
606            sofar.size_in_bytes += commit.size_in_bytes;
607            // TODO: Should it be an error to encounter an epoch going backwards?
608            sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
609            sofar.max_commit_offset = commit.tx_range.start;
610        }
611
612        Ok(sofar)
613    }
614
615    /// Finds the last valid commit in the segment using the offset index.
616    /// It traverses the index in reverse order, starting from the last key.
617    ///
618    /// Returns
619    /// * `Ok((Metadata)` - If a valid commit is found containing the commit, It adds a default
620    ///   header, which should be replaced with the actual header.
621    /// * `Err` - If no valid commit is found or if the index is empty
622    fn find_valid_indexed_commit<R: io::Read + io::Seek>(
623        min_tx_offset: u64,
624        header: Header,
625        reader: &mut R,
626        offset_index: &TxOffsetIndex,
627    ) -> io::Result<Metadata> {
628        let mut candidate_last_key = TxOffset::MAX;
629
630        while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
631            match Self::validate_commit_at_offset(reader, key, byte_offset) {
632                Ok(commit) => {
633                    return Ok(Metadata {
634                        header,
635                        tx_range: Range {
636                            start: min_tx_offset,
637                            end: commit.tx_range.end,
638                        },
639                        size_in_bytes: byte_offset + commit.size_in_bytes,
640                        max_epoch: commit.epoch,
641                        max_commit_offset: commit.tx_range.start,
642                    });
643                }
644
645                // `TxOffset` at `byte_offset` is not valid, so try with previous entry
646                Err(_) => {
647                    candidate_last_key = key.saturating_sub(1);
648                    if candidate_last_key == 0 {
649                        break;
650                    }
651                }
652            }
653        }
654
655        Err(io::Error::new(
656            ErrorKind::InvalidData,
657            format!("No valid commit found in index up to key: {candidate_last_key}"),
658        ))
659    }
660
661    /// Validates and decodes a commit at `byte_offset` in the segment.
662    ///
663    /// # Returns
664    /// * `Ok(commit::Metadata)` - If a valid commit is found with matching transaction offset
665    /// * `Err` - If commit can't be decoded or has mismatched transaction offset
666    fn validate_commit_at_offset<R: io::Read + io::Seek>(
667        reader: &mut R,
668        tx_offset: TxOffset,
669        byte_offset: u64,
670    ) -> io::Result<commit::Metadata> {
671        reader.seek(SeekFrom::Start(byte_offset))?;
672        let commit = commit::Metadata::extract(reader)?
673            .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;
674
675        if commit.tx_range.start != tx_offset {
676            return Err(io::Error::new(
677                ErrorKind::InvalidData,
678                format!(
679                    "mismatch key in index offset file: expected={} actual={}",
680                    tx_offset, commit.tx_range.start
681                ),
682            ));
683        }
684
685        Ok(commit)
686    }
687}
688
// Unit tests for segment headers, writing and reading commits, metadata
// extraction, and the offset index writer.
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use pretty_assertions::assert_matches;
    use proptest::prelude::*;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    // Encoding a header and decoding it again yields the same header.
    #[test]
    fn header_roundtrip() {
        let hdr = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut buf = [0u8; Header::LEN];
        hdr.write(&mut &mut buf[..]).unwrap();
        let h2 = Header::decode(&buf[..]).unwrap();

        assert_eq!(hdr, h2);
    }

    // A single commit written to an in-memory repo can be read back, together
    // with the default segment header.
    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([1; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let commit = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(commit.min_tx_offset, 0);
        assert_eq!(commit.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    // Metadata of a segment with three commits (no offset index) accounts for
    // all transactions, bytes and the last commit's offset.
    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        // Commit 0..2
        writer.append([0; 32]).unwrap();
        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();
        // Commit 2..3
        writer.append([1; 32]).unwrap();
        writer.commit().unwrap();
        // Commit 3..5
        writer.append([2; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let metadata = reader.metadata().unwrap();

        assert_eq!(
            metadata,
            Metadata {
                header: Header::default(),
                tx_range: Range { start: 0, end: 5 },
                // header + 5 txs + 3 commits
                size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
                max_epoch: Commit::DEFAULT_EPOCH,
                max_commit_offset: 3
            }
        );
    }

    // Commits read back equal the commits written, with consecutive
    // `min_tx_offset`s.
    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let mut commits1 = Vec::with_capacity(commits.len());
        let mut min_tx_offset = 0;
        for txs in commits {
            commits1.push(Commit {
                min_tx_offset,
                n: txs.len() as u16,
                records: txs.concat(),
                epoch: 0,
            });
            min_tx_offset += txs.len() as u64;
        }
        let commits2 = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(commits1, commits2);
    }

    // Flattening commits yields every transaction, numbered consecutively
    // across commit boundaries.
    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let txs = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            txs,
            commits
                .into_iter()
                .flatten()
                .enumerate()
                .map(|(offset, txdata)| Transaction {
                    offset: offset as u64,
                    txdata
                })
                .collect::<Vec<_>>()
        );
    }

    proptest! {
        // `append` accepts exactly `max_records_in_commit` records before
        // rejecting the next one.
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = Writer {
                commit: Commit::default(),
                inner: BufWriter::new(Vec::new()),

                min_tx_offset: 0,
                bytes_written: 0,

                max_records_in_commit,

                offset_index_head: None,
            };

            for i in 0..max_records_in_commit.get() {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    max_records_in_commit.get(),
                    i
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                max_records_in_commit.get()
            );
        }
    }

    // `next_tx_offset` only advances when a non-empty commit is written.
    #[test]
    fn next_tx_offset() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.append([1; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    // Truncating the offset index writer at `n` removes the entries for keys
    // `>= n`, both in memory and in the file on disk (checked by reopening).
    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let mut writer = OffsetIndexWriter::new(
            TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(),
            Options {
                // Ensure we're writing every index entry.
                offset_index_interval_bytes: 127.try_into().unwrap(),
                offset_index_require_segment_fsync: false,
                ..Default::default()
            },
        );

        for i in 1..=10 {
            writer.append_after_commit(i, i * 128, 128).unwrap();
        }
        // Ensure all entries have been written.
        for i in 1..=10 {
            assert_eq!(writer.head.key_lookup(i).unwrap(), (i, i * 128));
        }

        // Truncating to any offset in the written range or larger
        // retains that offset - 1, or the max offset written.
        for truncate_to in (2..=10u64).rev() {
            let retained_key = truncate_to.saturating_sub(1).min(10);
            let retained_val = retained_key * 128;
            let retained = (retained_key, retained_val);

            writer.ftruncate(truncate_to, rand::random()).unwrap();
            assert_matches!(
                writer.head.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?}"
            );
            // Make sure this also holds after reopen.
            let index = TxOffsetIndex::open_index_file(&index_path).unwrap();
            assert_matches!(
                index.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?} after reopen"
            );
        }

        // Truncating to 1 leaves no entries in the index
        writer.ftruncate(1, rand::random()).unwrap();
        assert_matches!(writer.head.key_lookup(1), Err(IndexError::KeyNotFound));
    }
}