spacetimedb_commitlog/
segment.rs

use std::{
    fs::File,
    io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
    num::{NonZeroU16, NonZeroU64},
    ops::Range,
};

use log::{debug, warn};

use crate::{
    commit::{self, Commit, StoredCommit},
    error,
    index::{IndexError, IndexFileMut},
    payload::Encode,
    repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
    Options,
};

pub const MAGIC: [u8; 6] = [b'(', b'd', b's', b')', b'^', b'2'];

pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
pub const CHECKSUM_CRC32C_LEN: usize = 4;

/// Lookup table for checksum length, index is [`Header::checksum_algorithm`].
// Supported algorithms must be numbered consecutively!
pub const CHECKSUM_LEN: [usize; 1] = [CHECKSUM_CRC32C_LEN];

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    pub log_format_version: u8,
    pub checksum_algorithm: u8,
}

impl Header {
    pub const LEN: usize = MAGIC.len() + /* log_format_version + checksum_algorithm + reserved + reserved */ 4;

    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
        out.write_all(&MAGIC)?;
        out.write_all(&[self.log_format_version, self.checksum_algorithm, 0, 0])?;

        Ok(())
    }

    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
        let mut buf = [0; Self::LEN];
        read.read_exact(&mut buf)?;

        if !buf.starts_with(&MAGIC) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "segment header does not start with magic",
            ));
        }

        Ok(Self {
            log_format_version: buf[MAGIC.len()],
            checksum_algorithm: buf[MAGIC.len() + 1],
        })
    }

    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
        if self.log_format_version > max_log_format_version {
            return Err(format!("unsupported log format version: {}", self.log_format_version));
        }
        if self.checksum_algorithm != checksum_algorithm {
            return Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm));
        }

        Ok(())
    }
}

impl Default for Header {
    fn default() -> Self {
        Self {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
        }
    }
}

/// Metadata about a [`Commit`] which was successfully written via [`Writer::commit`].
#[derive(Debug, PartialEq)]
pub struct Committed {
    /// The range of transaction offsets included in the commit.
    pub tx_range: Range<u64>,
    /// The crc32 checksum of the commit's serialized form,
    /// as written to the commitlog.
    pub checksum: u32,
}

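/// Write-half of a segment: buffers appended records into a [`Commit`] and
/// writes complete commits to the underlying [`io::Write`].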
#[derive(Debug)]
pub struct Writer<W: io::Write> {
    pub(crate) commit: Commit,
    pub(crate) inner: BufWriter<W>,

    pub(crate) min_tx_offset: u64,
    pub(crate) bytes_written: u64,

    pub(crate) max_records_in_commit: NonZeroU16,

    pub(crate) offset_index_head: Option<OffsetIndexWriter>,
}

impl<W: io::Write> Writer<W> {
    /// Append the record (aka transaction) `T` to the segment.
    ///
    /// If the number of currently buffered records would exceed `max_records_in_commit`
    /// after the method returns, the argument is returned in an `Err` and not
    /// appended to this writer's buffer.
    ///
    /// Otherwise, the `record` is encoded and stored in the buffer.
    ///
    /// An `Err` result indicates that [`Self::commit`] should be called in
    /// order to flush the buffered records to persistent storage.
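    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled), mirroring the usage in the tests at the
    /// bottom of this file; the writer is assumed to come from
    /// `repo::create_segment_writer`:
    ///
    /// ```ignore
    /// let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0)?;
    /// writer.append([0u8; 32]).expect("commit buffer not full");
    /// // `commit` flushes the buffered records and reports the written tx range.
    /// let committed = writer.commit()?;
    /// ```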
    pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
        if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
            Err(record)
        } else {
            self.commit.n += 1;
            record.encode_record(&mut self.commit.records);
            Ok(())
        }
    }

    /// Write the current [`Commit`] to the underlying [`io::Write`].
    ///
    /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero).
    /// In this case, `None` is returned.
    ///
    /// Otherwise `Some` [`Committed`] is returned, providing some metadata about
    /// the commit.
    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
        if self.commit.n == 0 {
            return Ok(None);
        }
        let checksum = self.commit.write(&mut self.inner)?;
        self.inner.flush()?;

        let commit_len = self.commit.encoded_len() as u64;
        self.offset_index_head.as_mut().map(|index| {
            debug!(
                "append_after commit min_tx_offset={} bytes_written={} commit_len={}",
                self.commit.min_tx_offset, self.bytes_written, commit_len
            );
            index
                .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
                .map_err(|e| {
                    debug!("failed to append to offset index: {:?}", e);
                })
        });

        let tx_range_start = self.commit.min_tx_offset;

        self.bytes_written += commit_len;
        self.commit.min_tx_offset += self.commit.n as u64;
        self.commit.n = 0;
        self.commit.records.clear();

        Ok(Some(Committed {
            tx_range: tx_range_start..self.commit.min_tx_offset,
            checksum,
        }))
    }

    /// Get the current epoch.
    pub fn epoch(&self) -> u64 {
        self.commit.epoch
    }

    /// Update the epoch.
    ///
    /// The caller must ensure that:
    ///
    /// - The new epoch is greater than the current epoch.
    /// - [`Self::commit`] has been called as appropriate.
    ///
    pub fn set_epoch(&mut self, epoch: u64) {
        self.commit.epoch = epoch;
    }

    /// The smallest transaction offset in this segment.
    pub fn min_tx_offset(&self) -> u64 {
        self.min_tx_offset
    }

    /// The next transaction offset to be written if [`Self::commit`] was called.
    pub fn next_tx_offset(&self) -> u64 {
        self.commit.min_tx_offset
    }

    /// `true` if the segment contains no commits.
    ///
    /// The segment will, however, contain a header. This thus violates the
    /// convention that `is_empty == (len == 0)`.
    pub fn is_empty(&self) -> bool {
        self.bytes_written <= Header::LEN as u64
    }

    /// Number of bytes written to this segment, including the header.
    pub fn len(&self) -> u64 {
        self.bytes_written
    }
}

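/// Minimal file-like interface used by the commitlog: a durable flush (`fsync`)
/// and truncation (`ftruncate`), where `tx_offset` is consumed by offset index
/// implementations and `size` (in bytes) by plain files.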
pub trait FileLike {
    fn fsync(&mut self) -> io::Result<()>;
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}

impl FileLike for File {
    fn fsync(&mut self) -> io::Result<()> {
        self.sync_data()
    }

    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
        self.set_len(size)
    }
}

impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
    fn fsync(&mut self) -> io::Result<()> {
        self.get_mut().fsync()
    }

    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        self.get_mut().ftruncate(tx_offset, size)
    }
}

impl<W: io::Write + FileLike> FileLike for Writer<W> {
    fn fsync(&mut self) -> io::Result<()> {
        self.inner.fsync()?;
        self.offset_index_head.as_mut().map(|index| index.fsync());
        Ok(())
    }

    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        self.inner.ftruncate(tx_offset, size)?;
        self.offset_index_head
            .as_mut()
            .map(|index| index.ftruncate(tx_offset, size));
        Ok(())
    }
}

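/// Writes the `(tx offset -> byte offset)` index accompanying a segment,
/// recording roughly one entry per `offset_index_interval_bytes` of commit data
/// and optionally deferring index writes until the segment has been fsynced.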
#[derive(Debug)]
pub struct OffsetIndexWriter {
    pub(crate) head: TxOffsetIndexMut,

    require_segment_fsync: bool,
    min_write_interval: NonZeroU64,

    pub(crate) candidate_min_tx_offset: TxOffset,
    pub(crate) candidate_byte_offset: u64,
    pub(crate) bytes_since_last_index: u64,
}

impl OffsetIndexWriter {
    pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
        OffsetIndexWriter {
            head,
            require_segment_fsync: opts.offset_index_require_segment_fsync,
            min_write_interval: opts.offset_index_interval_bytes,
            candidate_min_tx_offset: TxOffset::default(),
            candidate_byte_offset: 0,
            bytes_since_last_index: 0,
        }
    }

    fn reset(&mut self) {
        self.candidate_byte_offset = 0;
        self.candidate_min_tx_offset = TxOffset::default();
        self.bytes_since_last_index = 0;
    }

    /// Either append to the index, or save the offsets to be appended at a future fsync.
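    ///
    /// Illustrative example (parameters assumed, not taken from the source): with
    /// `offset_index_interval_bytes = 4096` and commits of roughly 1 KiB, an index
    /// entry is written about every fourth commit, pointing at the first commit
    /// written since the previous entry.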
    pub fn append_after_commit(
        &mut self,
        min_tx_offset: TxOffset,
        byte_offset: u64,
        commit_len: u64,
    ) -> Result<(), IndexError> {
        self.bytes_since_last_index += commit_len;

        if self.candidate_min_tx_offset == 0 {
            self.candidate_byte_offset = byte_offset;
            self.candidate_min_tx_offset = min_tx_offset;
        }

        if !self.require_segment_fsync {
            self.append_internal()?;
        }

        Ok(())
    }

    fn append_internal(&mut self) -> Result<(), IndexError> {
        // If the candidate offset is zero, there has not been a commit since the last offset entry
        if self.candidate_min_tx_offset == 0 {
            return Ok(());
        }

        if self.bytes_since_last_index < self.min_write_interval.get() {
            return Ok(());
        }

        self.head
            .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
        self.head.async_flush()?;
        self.reset();

        Ok(())
    }
}

impl FileLike for OffsetIndexWriter {
    /// Must be called via SegmentWriter::fsync
    fn fsync(&mut self) -> io::Result<()> {
        let _ = self.append_internal().map_err(|e| {
            warn!("failed to append to offset index: {e:?}");
        });
        let _ = self
            .head
            .async_flush()
            .map_err(|e| warn!("failed to flush offset index: {e:?}"));
        Ok(())
    }

    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
        self.reset();
        self.head
            .truncate(tx_offset)
            .inspect_err(|e| {
                warn!("failed to truncate offset index at {tx_offset}: {e:?}");
            })
            .ok();
        Ok(())
    }
}

impl FileLike for IndexFileMut<TxOffset> {
    fn fsync(&mut self) -> io::Result<()> {
        self.async_flush()
    }

    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
        self.truncate(tx_offset).map_err(|e| {
            io::Error::new(
                ErrorKind::Other,
                format!("failed to truncate offset index at {tx_offset}: {e:?}"),
            )
        })
    }
}

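/// Read-half of a segment: decodes and validates the segment [`Header`], then
/// yields the commits stored after it.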
#[derive(Debug)]
pub struct Reader<R> {
    pub header: Header,
    pub min_tx_offset: u64,
    inner: R,
}

impl<R: io::Read + io::Seek> Reader<R> {
    pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
        let header = Header::decode(&mut inner)?;
        header
            .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
            .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;

        Ok(Self {
            header,
            min_tx_offset,
            inner,
        })
    }
}

impl<R: io::BufRead + io::Seek> Reader<R> {
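    /// Consume the reader, iterating over the [`StoredCommit`]s in this segment.
    ///
    /// A minimal sketch (not compiled), mirroring the `write_read_roundtrip` test
    /// below; the reader is assumed to come from `repo::open_segment_reader`:
    ///
    /// ```ignore
    /// let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0)?;
    /// for commit in reader.commits() {
    ///     let commit = commit?;
    ///     println!("commit at tx offset {}", commit.min_tx_offset);
    /// }
    /// ```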
    pub fn commits(self) -> Commits<R> {
        Commits {
            header: self.header,
            reader: self.inner,
        }
    }

    pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> {
        seek_to_offset(&mut self.inner, index_file, start_tx_offset)
    }

    #[cfg(test)]
    pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
    where
        D: crate::Decoder,
        D::Error: From<io::Error>,
        R: 'a,
    {
        use itertools::Itertools as _;

        self.commits()
            .with_log_format_version()
            .map(|x| x.map_err(Into::into))
            .map_ok(move |(version, commit)| {
                let start = commit.min_tx_offset;
                commit.into_transactions(version, start, de)
            })
            .flatten_ok()
            .map(|x| x.and_then(|y| y))
    }

    #[cfg(test)]
    pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
        Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
    }
}

/// Advances the `segment` reader to the position corresponding to the `start_tx_offset`
/// using the `index_file` for efficient seeking.
///
/// Input:
/// - `segment` - segment reader
/// - `index_file` - offset index for the segment
/// - `start_tx_offset` - transaction offset to advance to
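///
/// If the index lookup yields key `0` (i.e. the index is empty), the reader is
/// left at its current position and `Ok(())` is returned.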
pub fn seek_to_offset<R: io::Read + io::Seek>(
    mut segment: &mut R,
    index_file: &TxOffsetIndex,
    start_tx_offset: u64,
) -> Result<(), IndexError> {
    let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;

    // If the index_key is 0, it means the index file is empty, no need to seek
    if index_key == 0 {
        return Ok(());
    }
    debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
    // returned `index_key` should never be greater than `start_tx_offset`
    debug_assert!(index_key <= start_tx_offset);

    // Check if the offset index is pointing to the right commit.
    validate_commit_header(&mut segment, byte_offset).map(|hdr| {
        if hdr.min_tx_offset == index_key {
            // Advance the segment Seek if expected commit is found.
            segment
                .seek(SeekFrom::Start(byte_offset))
                .map(|_| ())
                .map_err(Into::into)
        } else {
            Err(io::Error::new(io::ErrorKind::InvalidData, "mismatch key in index offset file").into())
        }
    })?
}

/// Try to extract the commit header at the given position without advancing the reader's seek position.
/// `IndexFileMut` flushes asynchronously, which makes it important for the reader to verify its entries.
pub fn validate_commit_header<Reader: io::Read + io::Seek>(
    mut reader: &mut Reader,
    byte_offset: u64,
) -> io::Result<commit::Header> {
    let pos = reader.stream_position()?;
    reader.seek(SeekFrom::Start(byte_offset))?;

    let hdr = commit::Header::decode(&mut reader)
        .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));

    // Restore the original position
    reader.seek(SeekFrom::Start(pos))?;

    hdr
}

/// Pair of transaction offset and payload.
///
/// Created by iterators which "flatten" commits into individual transaction
/// records.
#[derive(Debug, PartialEq)]
pub struct Transaction<T> {
    /// The offset of this transaction relative to the start of the log.
    pub offset: u64,
    /// The transaction payload.
    pub txdata: T,
}

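/// Iterator over the [`StoredCommit`]s of a segment, created by [`Reader::commits`].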
pub struct Commits<R> {
    pub header: Header,
    reader: R,
}

impl<R: io::BufRead> Iterator for Commits<R> {
    type Item = io::Result<StoredCommit>;

    fn next(&mut self) -> Option<Self::Item> {
        StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
    }
}

#[cfg(test)]
impl<R: io::BufRead> Commits<R> {
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        CommitsWithVersion { inner: self }
    }
}

#[cfg(test)]
struct CommitsWithVersion<R> {
    inner: Commits<R>,
}

#[cfg(test)]
impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    fn next(&mut self) -> Option<Self::Item> {
        let next = self.inner.next()?;
        match next {
            Ok(commit) => {
                let version = self.inner.header.log_format_version;
                Some(Ok((version, commit)))
            }
            Err(e) => Some(Err(e)),
        }
    }
}

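/// Summary information about a segment: its header, the range of transaction
/// offsets it contains, its size in bytes, and the largest epoch observed.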
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    pub header: Header,
    pub tx_range: Range<u64>,
    pub size_in_bytes: u64,
    pub max_epoch: u64,
}

impl Metadata {
    /// Reads and validates metadata from a segment.
    /// It looks up the last indexed commit offset (if an offset index is given) and
    /// then traverses the rest of the segment.
    ///
    /// Determines `max_tx_offset`, `size_in_bytes`, and `max_epoch` from the segment.
    pub(crate) fn extract<R: io::Read + io::Seek>(
        min_tx_offset: TxOffset,
        mut reader: R,
        offset_index: Option<&TxOffsetIndex>,
    ) -> Result<Self, error::SegmentMetadata> {
        let header = Header::decode(&mut reader)?;
        Self::with_header(min_tx_offset, header, reader, offset_index)
    }

    fn with_header<R: io::Read + io::Seek>(
        min_tx_offset: u64,
        header: Header,
        mut reader: R,
        offset_index: Option<&TxOffsetIndex>,
    ) -> Result<Self, error::SegmentMetadata> {
        let mut sofar = offset_index
            .and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
            .unwrap_or_else(|| Self {
                header,
                tx_range: Range {
                    start: min_tx_offset,
                    end: min_tx_offset,
                },
                size_in_bytes: Header::LEN as u64,
                max_epoch: u64::default(),
            });

        reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;

        fn commit_meta<R: io::Read>(
            reader: &mut R,
            sofar: &Metadata,
        ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
            commit::Metadata::extract(reader).map_err(|e| {
                if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) {
                    error::SegmentMetadata::InvalidCommit {
                        sofar: sofar.clone(),
                        source: e,
                    }
                } else {
                    e.into()
                }
            })
        }
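        // Scan forward from the last indexed commit (or from just past the header),
        // accumulating the transaction range, byte size, and maximum epoch until
        // the end of the segment.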
        while let Some(commit) = commit_meta(&mut reader, &sofar)? {
            debug!("commit::{commit:?}");
            if commit.tx_range.start != sofar.tx_range.end {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "out-of-order offset: expected={} actual={}",
                        sofar.tx_range.end, commit.tx_range.start,
                    ),
                )
                .into());
            }
            sofar.tx_range.end = commit.tx_range.end;
            sofar.size_in_bytes += commit.size_in_bytes;
            // TODO: Should it be an error to encounter an epoch going backwards?
            sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
        }

        Ok(sofar)
    }

    /// Finds the last valid commit in the segment using the offset index.
    /// It traverses the index in reverse order, starting from the last key.
    ///
    /// # Returns
    /// * `Ok(Metadata)` - If a valid commit is found; the metadata covers the range
    ///   from `min_tx_offset` up to and including that commit, using the given `header`.
    /// * `Err` - If no valid commit is found or if the index is empty
    fn find_valid_indexed_commit<R: io::Read + io::Seek>(
        min_tx_offset: u64,
        header: Header,
        reader: &mut R,
        offset_index: &TxOffsetIndex,
    ) -> io::Result<Metadata> {
        let mut candidate_last_key = TxOffset::MAX;

        while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
            match Self::validate_commit_at_offset(reader, key, byte_offset) {
                Ok(commit) => {
                    return Ok(Metadata {
                        header,
                        tx_range: Range {
                            start: min_tx_offset,
                            end: commit.tx_range.end,
                        },
                        size_in_bytes: byte_offset + commit.size_in_bytes,
                        max_epoch: commit.epoch,
                    });
                }

                // `TxOffset` at `byte_offset` is not valid, so try with previous entry
                Err(_) => {
                    candidate_last_key = key.saturating_sub(1);
                    if candidate_last_key == 0 {
                        break;
                    }
                }
            }
        }

        Err(io::Error::new(
            ErrorKind::InvalidData,
            format!("No valid commit found in index up to key: {}", candidate_last_key),
        ))
    }

    /// Validates and decodes a commit at `byte_offset` in the segment.
    ///
    /// # Returns
    /// * `Ok(commit::Metadata)` - If a valid commit is found with matching transaction offset
    /// * `Err` - If commit can't be decoded or has mismatched transaction offset
    fn validate_commit_at_offset<R: io::Read + io::Seek>(
        reader: &mut R,
        tx_offset: TxOffset,
        byte_offset: u64,
    ) -> io::Result<commit::Metadata> {
        reader.seek(SeekFrom::Start(byte_offset))?;
        let commit = commit::Metadata::extract(reader)?
            .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;

        if commit.tx_range.start != tx_offset {
            return Err(io::Error::new(
                ErrorKind::InvalidData,
                format!(
                    "mismatch key in index offset file: expected={} actual={}",
                    tx_offset, commit.tx_range.start
                ),
            ));
        }

        Ok(commit)
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use proptest::prelude::*;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    #[test]
    fn header_roundtrip() {
        let hdr = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut buf = [0u8; Header::LEN];
        hdr.write(&mut &mut buf[..]).unwrap();
        let h2 = Header::decode(&buf[..]).unwrap();

        assert_eq!(hdr, h2);
    }

    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([1; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let commit = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(commit.min_tx_offset, 0);
        assert_eq!(commit.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([1; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([2; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let Metadata {
            header: _,
            tx_range,
            size_in_bytes,
            max_epoch: _,
        } = reader.metadata().unwrap();

        assert_eq!(tx_range.start, 0);
        assert_eq!(tx_range.end, 5);
        assert_eq!(
            size_in_bytes,
            (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64
        );
    }

    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let mut commits1 = Vec::with_capacity(commits.len());
        let mut min_tx_offset = 0;
        for txs in commits {
            commits1.push(Commit {
                min_tx_offset,
                n: txs.len() as u16,
                records: txs.concat(),
                epoch: 0,
            });
            min_tx_offset += txs.len() as u64;
        }
        let commits2 = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(commits1, commits2);
    }

    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let txs = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            txs,
            commits
                .into_iter()
                .flatten()
                .enumerate()
                .map(|(offset, txdata)| Transaction {
                    offset: offset as u64,
                    txdata
                })
                .collect::<Vec<_>>()
        );
    }

    proptest! {
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = Writer {
                commit: Commit::default(),
                inner: BufWriter::new(Vec::new()),

                min_tx_offset: 0,
                bytes_written: 0,

                max_records_in_commit,

                offset_index_head: None,
            };

            for i in 0..max_records_in_commit.get() {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    max_records_in_commit.get(),
                    i
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                max_records_in_commit.get()
            );
        }
    }

    #[test]
    fn next_tx_offset() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.append([1; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let mut writer = OffsetIndexWriter::new(
            TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(),
            Options {
                // Ensure we're writing every index entry.
                offset_index_interval_bytes: 127.try_into().unwrap(),
                offset_index_require_segment_fsync: false,
                ..Default::default()
            },
        );

        for i in 1..=10 {
            writer.append_after_commit(i, i * 128, 128).unwrap();
        }
        // Ensure all entries have been written.
        for i in 1..=10 {
            assert_eq!(writer.head.key_lookup(i).unwrap(), (i, i * 128));
        }

        // Truncating to any offset in the written range or larger
        // retains that offset - 1, or the max offset written.
        let truncate_to: TxOffset = rand::random_range(1..=32);
        let retained_key = truncate_to.saturating_sub(1).min(10);
        let retained_val = retained_key * 128;
        let retained = (retained_key, retained_val);

        writer.ftruncate(truncate_to, rand::random()).unwrap();
        assert_eq!(writer.head.key_lookup(truncate_to).unwrap(), retained);
        // Make sure this also holds after reopen.
        drop(writer);
        let index = TxOffsetIndex::open_index_file(&index_path).unwrap();
        assert_eq!(index.key_lookup(truncate_to).unwrap(), retained);
    }
}