spacetimedb_commitlog/
segment.rs

1use std::{
2    fs::File,
3    io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
4    num::{NonZeroU16, NonZeroU64},
5    ops::Range,
6};
7
8use log::{debug, warn};
9
10use crate::{
11    commit::{self, Commit, StoredCommit},
12    error,
13    index::{IndexError, IndexFileMut},
14    payload::Encode,
15    repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
16    Options,
17};
18
/// Byte sequence written at the very start of a segment header: ASCII `(ds)^2`.
pub const MAGIC: [u8; 6] = *b"(ds)^2";

/// Identifier of the CRC32C checksum algorithm as stored in the segment header.
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
/// Length in bytes of a CRC32C checksum.
pub const CHECKSUM_CRC32C_LEN: usize = 4;

/// Log format version used by the default [`Header`].
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
/// Checksum algorithm used by the default [`Header`].
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

/// Checksum length in bytes, indexed by [`Header::checksum_algorithm`].
// Supported algorithms must be numbered consecutively!
pub const CHECKSUM_LEN: [usize; 1] = [CHECKSUM_CRC32C_LEN];
30
/// The fixed-size header of a segment.
///
/// Only the two fields below are kept in memory; the magic bytes and the
/// reserved bytes of the encoded form are handled by the encode / decode
/// routines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Header {
    /// Version of the log format the segment was written with.
    pub log_format_version: u8,
    /// Numeric identifier of the checksum algorithm used by the segment.
    pub checksum_algorithm: u8,
}
36
37impl Header {
38    pub const LEN: usize = MAGIC.len() + /* log_format_version + checksum_algorithm + reserved + reserved */ 4;
39
40    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
41        out.write_all(&MAGIC)?;
42        out.write_all(&[self.log_format_version, self.checksum_algorithm, 0, 0])?;
43
44        Ok(())
45    }
46
47    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
48        let mut buf = [0; Self::LEN];
49        read.read_exact(&mut buf)?;
50
51        if !buf.starts_with(&MAGIC) {
52            return Err(io::Error::new(
53                io::ErrorKind::InvalidData,
54                "segment header does not start with magic",
55            ));
56        }
57
58        Ok(Self {
59            log_format_version: buf[MAGIC.len()],
60            checksum_algorithm: buf[MAGIC.len() + 1],
61        })
62    }
63
64    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
65        if self.log_format_version > max_log_format_version {
66            return Err(format!("unsupported log format version: {}", self.log_format_version));
67        }
68        if self.checksum_algorithm != checksum_algorithm {
69            return Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm));
70        }
71
72        Ok(())
73    }
74}
75
76impl Default for Header {
77    fn default() -> Self {
78        Self {
79            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
80            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
81        }
82    }
83}
84
/// Summary of a [`Commit`] that [`Writer::commit`] wrote successfully.
#[derive(PartialEq, Debug)]
pub struct Committed {
    /// Transaction offsets covered by the commit (half-open range).
    pub tx_range: Range<u64>,
    /// The crc32 checksum of the commit in its serialized form,
    /// i.e. as it was written to the commitlog.
    pub checksum: u32,
}
94
95#[derive(Debug)]
96pub struct Writer<W: io::Write> {
97    pub(crate) commit: Commit,
98    pub(crate) inner: BufWriter<W>,
99
100    pub(crate) min_tx_offset: u64,
101    pub(crate) bytes_written: u64,
102
103    pub(crate) max_records_in_commit: NonZeroU16,
104
105    pub(crate) offset_index_head: Option<OffsetIndexWriter>,
106}
107
108impl<W: io::Write> Writer<W> {
109    /// Append the record (aka transaction) `T` to the segment.
110    ///
111    /// If the number of currently buffered records would exceed `max_records_in_commit`
112    /// after the method returns, the argument is returned in an `Err` and not
113    /// appended to this writer's buffer.
114    ///
115    /// Otherwise, the `record` is encoded and and stored in the buffer.
116    ///
117    /// An `Err` result indicates that [`Self::commit`] should be called in
118    /// order to flush the buffered records to persistent storage.
119    pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
120        if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
121            Err(record)
122        } else {
123            self.commit.n += 1;
124            record.encode_record(&mut self.commit.records);
125            Ok(())
126        }
127    }
128
129    /// Write the current [`Commit`] to the underlying [`io::Write`].
130    ///
131    /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero).
132    /// In this case, `None` is returned.
133    ///
134    /// Otherwise `Some` [`Committed`] is returned, providing some metadata about
135    /// the commit.
136    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
137        if self.commit.n == 0 {
138            return Ok(None);
139        }
140        let checksum = self.commit.write(&mut self.inner)?;
141        self.inner.flush()?;
142
143        let commit_len = self.commit.encoded_len() as u64;
144        self.offset_index_head.as_mut().map(|index| {
145            debug!(
146                "append_after commit min_tx_offset={} bytes_written={} commit_len={}",
147                self.commit.min_tx_offset, self.bytes_written, commit_len
148            );
149            index
150                .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
151                .map_err(|e| {
152                    debug!("failed to append to offset index: {:?}", e);
153                })
154        });
155
156        let tx_range_start = self.commit.min_tx_offset;
157
158        self.bytes_written += commit_len;
159        self.commit.min_tx_offset += self.commit.n as u64;
160        self.commit.n = 0;
161        self.commit.records.clear();
162
163        Ok(Some(Committed {
164            tx_range: tx_range_start..self.commit.min_tx_offset,
165            checksum,
166        }))
167    }
168
169    /// Get the current epoch.
170    pub fn epoch(&self) -> u64 {
171        self.commit.epoch
172    }
173
174    /// Update the epoch.
175    ///
176    /// The caller must ensure that:
177    ///
178    /// - The new epoch is greater than the current epoch.
179    /// - [`Self::commit`] has been called as appropriate.
180    ///
181    pub fn set_epoch(&mut self, epoch: u64) {
182        self.commit.epoch = epoch;
183    }
184
185    /// The smallest transaction offset in this segment.
186    pub fn min_tx_offset(&self) -> u64 {
187        self.min_tx_offset
188    }
189
190    /// The next transaction offset to be written if [`Self::commit`] was called.
191    pub fn next_tx_offset(&self) -> u64 {
192        self.commit.min_tx_offset
193    }
194
195    /// `true` if the segment contains no commits.
196    ///
197    /// The segment will, however, contain a header. This thus violates the
198    /// convention that `is_empty == (len == 0)`.
199    pub fn is_empty(&self) -> bool {
200        self.bytes_written <= Header::LEN as u64
201    }
202
203    /// Number of bytes written to this segment, including the header.
204    pub fn len(&self) -> u64 {
205        self.bytes_written
206    }
207}
208
/// Storage operations needed by the segment writer beyond [`io::Write`].
pub trait FileLike {
    /// Synchronize previously written data with the storage medium.
    fn fsync(&mut self) -> io::Result<()>;

    /// Truncate the storage.
    ///
    /// Both a transaction offset and a size in bytes are supplied;
    /// an implementation uses whichever one it is addressed by.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}
213
214impl FileLike for File {
215    fn fsync(&mut self) -> io::Result<()> {
216        self.sync_data()
217    }
218
219    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
220        self.set_len(size)
221    }
222}
223
224impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
225    fn fsync(&mut self) -> io::Result<()> {
226        self.get_mut().fsync()
227    }
228
229    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
230        self.get_mut().ftruncate(tx_offset, size)
231    }
232}
233
234impl<W: io::Write + FileLike> FileLike for Writer<W> {
235    fn fsync(&mut self) -> io::Result<()> {
236        self.inner.fsync()?;
237        self.offset_index_head.as_mut().map(|index| index.fsync());
238        Ok(())
239    }
240
241    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
242        self.inner.ftruncate(tx_offset, size)?;
243        self.offset_index_head
244            .as_mut()
245            .map(|index| index.ftruncate(tx_offset, size));
246        Ok(())
247    }
248}
249
250#[derive(Debug)]
251pub struct OffsetIndexWriter {
252    pub(crate) head: TxOffsetIndexMut,
253
254    require_segment_fsync: bool,
255    min_write_interval: NonZeroU64,
256
257    pub(crate) candidate_min_tx_offset: TxOffset,
258    pub(crate) candidate_byte_offset: u64,
259    pub(crate) bytes_since_last_index: u64,
260}
261
262impl OffsetIndexWriter {
263    pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
264        OffsetIndexWriter {
265            head,
266            require_segment_fsync: opts.offset_index_require_segment_fsync,
267            min_write_interval: opts.offset_index_interval_bytes,
268            candidate_min_tx_offset: TxOffset::default(),
269            candidate_byte_offset: 0,
270            bytes_since_last_index: 0,
271        }
272    }
273
274    fn reset(&mut self) {
275        self.candidate_byte_offset = 0;
276        self.candidate_min_tx_offset = TxOffset::default();
277        self.bytes_since_last_index = 0;
278    }
279
280    /// Either append to index or save offsets to append at future fsync
281    pub fn append_after_commit(
282        &mut self,
283        min_tx_offset: TxOffset,
284        byte_offset: u64,
285        commit_len: u64,
286    ) -> Result<(), IndexError> {
287        self.bytes_since_last_index += commit_len;
288
289        if self.candidate_min_tx_offset == 0 {
290            self.candidate_byte_offset = byte_offset;
291            self.candidate_min_tx_offset = min_tx_offset;
292        }
293
294        if !self.require_segment_fsync {
295            self.append_internal()?;
296        }
297
298        Ok(())
299    }
300
301    fn append_internal(&mut self) -> Result<(), IndexError> {
302        // If the candidate offset is zero, there has not been a commit since the last offset entry
303        if self.candidate_min_tx_offset == 0 {
304            return Ok(());
305        }
306
307        if self.bytes_since_last_index < self.min_write_interval.get() {
308            return Ok(());
309        }
310
311        self.head
312            .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
313        self.head.async_flush()?;
314        self.reset();
315
316        Ok(())
317    }
318}
319
320impl FileLike for OffsetIndexWriter {
321    /// Must be called via SegmentWriter::fsync
322    fn fsync(&mut self) -> io::Result<()> {
323        let _ = self.append_internal().map_err(|e| {
324            warn!("failed to append to offset index: {e:?}");
325        });
326        let _ = self
327            .head
328            .async_flush()
329            .map_err(|e| warn!("failed to flush offset index: {e:?}"));
330        Ok(())
331    }
332
333    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
334        self.reset();
335        self.head
336            .truncate(tx_offset)
337            .inspect_err(|e| {
338                warn!("failed to truncate offset index at {tx_offset}: {e:?}");
339            })
340            .ok();
341        Ok(())
342    }
343}
344
345impl FileLike for IndexFileMut<TxOffset> {
346    fn fsync(&mut self) -> io::Result<()> {
347        self.async_flush()
348    }
349
350    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
351        self.truncate(tx_offset).map_err(|e| {
352            io::Error::new(
353                ErrorKind::Other,
354                format!("failed to truncate offset index at {tx_offset}: {e:?}"),
355            )
356        })
357    }
358}
359
360#[derive(Debug)]
361pub struct Reader<R> {
362    pub header: Header,
363    pub min_tx_offset: u64,
364    inner: R,
365}
366
367impl<R: io::Read + io::Seek> Reader<R> {
368    pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
369        let header = Header::decode(&mut inner)?;
370        header
371            .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
372            .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
373
374        Ok(Self {
375            header,
376            min_tx_offset,
377            inner,
378        })
379    }
380}
381
382impl<R: io::BufRead + io::Seek> Reader<R> {
383    pub fn commits(self) -> Commits<R> {
384        Commits {
385            header: self.header,
386            reader: self.inner,
387        }
388    }
389
390    pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<u64, IndexError> {
391        seek_to_offset(&mut self.inner, index_file, start_tx_offset)
392    }
393
394    #[cfg(test)]
395    pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
396    where
397        D: crate::Decoder,
398        D::Error: From<io::Error>,
399        R: 'a,
400    {
401        use itertools::Itertools as _;
402
403        self.commits()
404            .with_log_format_version()
405            .map(|x| x.map_err(Into::into))
406            .map_ok(move |(version, commit)| {
407                let start = commit.min_tx_offset;
408                commit.into_transactions(version, start, de)
409            })
410            .flatten_ok()
411            .map(|x| x.and_then(|y| y))
412    }
413
414    #[cfg(test)]
415    pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
416        Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
417    }
418}
419
420/// Advances the `segment` reader to the position corresponding to the `start_tx_offset`
421/// using the `index_file` for efficient seeking.
422///
423/// Input:
424/// - `segment` - segment reader
425/// - `min_tx_offset` - minimum transaction offset in the segment
426/// - `start_tx_offset` - transaction offset to advance to
427///
428/// Returns the byte position `segment` is at after seeking.
429pub fn seek_to_offset<R: io::Read + io::Seek>(
430    mut segment: &mut R,
431    index_file: &TxOffsetIndex,
432    start_tx_offset: u64,
433) -> Result<u64, IndexError> {
434    let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
435
436    // If the index_key is 0, it means the index file is empty, return error without seeking
437    if index_key == 0 {
438        return Err(IndexError::KeyNotFound);
439    }
440    debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
441    // returned `index_key` should never be greater than `start_tx_offset`
442    debug_assert!(index_key <= start_tx_offset);
443
444    // Check if the offset index is pointing to the right commit.
445    let hdr = validate_commit_header(&mut segment, byte_offset)?;
446    if hdr.min_tx_offset == index_key {
447        // Advance the segment Seek if expected commit is found.
448        segment.seek(SeekFrom::Start(byte_offset))
449    } else {
450        Err(io::Error::new(
451            io::ErrorKind::InvalidData,
452            "mismatched key in offset index file",
453        ))
454    }
455    .map_err(Into::into)
456}
457
458/// Try to extract the commit header from the asked position without advancing seek.
459/// `IndexFileMut` fsync asynchoronously, which makes it important for reader to verify its entry
460pub fn validate_commit_header<Reader: io::Read + io::Seek>(
461    mut reader: &mut Reader,
462    byte_offset: u64,
463) -> io::Result<commit::Header> {
464    let pos = reader.stream_position()?;
465    reader.seek(SeekFrom::Start(byte_offset))?;
466
467    let hdr = commit::Header::decode(&mut reader)
468        .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));
469
470    // Restore the original position
471    reader.seek(SeekFrom::Start(pos))?;
472
473    hdr
474}
475
/// A transaction payload together with its offset in the log.
///
/// Yielded by iterators which "flatten" commits into their individual
/// transaction records.
#[derive(PartialEq, Debug)]
pub struct Transaction<T> {
    /// Offset of this transaction, counted from the start of the log.
    pub offset: u64,
    /// The payload of the transaction.
    pub txdata: T,
}
487
488pub struct Commits<R> {
489    pub header: Header,
490    reader: R,
491}
492
493impl<R: io::BufRead> Iterator for Commits<R> {
494    type Item = io::Result<StoredCommit>;
495
496    fn next(&mut self) -> Option<Self::Item> {
497        StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
498    }
499}
500
#[cfg(test)]
impl<R: io::BufRead> Commits<R> {
    /// Adapt this iterator so that every commit is paired with the log
    /// format version of the segment header.
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        let inner = self;
        CommitsWithVersion { inner }
    }
}
507
/// Test-only wrapper around [`Commits`] which pairs each commit with the log
/// format version of the segment header.
#[cfg(test)]
struct CommitsWithVersion<R> {
    /// The wrapped commits iterator.
    inner: Commits<R>,
}
512
#[cfg(test)]
impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    /// Yield the next commit of the wrapped iterator, tagged with the log
    /// format version of the segment header. Errors pass through unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        let version = self.inner.header.log_format_version;
        Some(item.map(|commit| (version, commit)))
    }
}
528
529#[derive(Clone, Debug, Eq, PartialEq)]
530pub struct Metadata {
531    /// The segment header.
532    pub header: Header,
533    /// The range of transactions contained in the segment.
534    pub tx_range: Range<u64>,
535    /// The size of the segment.
536    pub size_in_bytes: u64,
537    /// The largest epoch found in the segment.
538    pub max_epoch: u64,
539    /// The latest commit found in the segment.
540    ///
541    /// The value is the `min_tx_offset` of the commit, i.e.
542    /// `max_commit_offset..tx_range.end` is the range of
543    /// transactions contained in it.
544    pub max_commit_offset: u64,
545}
546
547impl Metadata {
548    /// Reads and validates metadata from a segment.
549    /// It will look for last commit index offset and then traverse the segment
550    ///
551    /// Determines `max_tx_offset`, `size_in_bytes`, and `max_epoch` from the segment.
552    pub(crate) fn extract<R: io::Read + io::Seek>(
553        min_tx_offset: TxOffset,
554        mut reader: R,
555        offset_index: Option<&TxOffsetIndex>,
556    ) -> Result<Self, error::SegmentMetadata> {
557        let header = Header::decode(&mut reader)?;
558        Self::with_header(min_tx_offset, header, reader, offset_index)
559    }
560
561    fn with_header<R: io::Read + io::Seek>(
562        min_tx_offset: u64,
563        header: Header,
564        mut reader: R,
565        offset_index: Option<&TxOffsetIndex>,
566    ) -> Result<Self, error::SegmentMetadata> {
567        let mut sofar = offset_index
568            .and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
569            .unwrap_or_else(|| Self {
570                header,
571                tx_range: Range {
572                    start: min_tx_offset,
573                    end: min_tx_offset,
574                },
575                size_in_bytes: Header::LEN as u64,
576                max_epoch: u64::default(),
577                max_commit_offset: min_tx_offset,
578            });
579
580        reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
581
582        fn commit_meta<R: io::Read>(
583            reader: &mut R,
584            sofar: &Metadata,
585        ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
586            commit::Metadata::extract(reader).map_err(|e| {
587                if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) {
588                    error::SegmentMetadata::InvalidCommit {
589                        sofar: sofar.clone(),
590                        source: e,
591                    }
592                } else {
593                    e.into()
594                }
595            })
596        }
597        while let Some(commit) = commit_meta(&mut reader, &sofar)? {
598            debug!("commit::{commit:?}");
599            if commit.tx_range.start != sofar.tx_range.end {
600                return Err(io::Error::new(
601                    io::ErrorKind::InvalidData,
602                    format!(
603                        "out-of-order offset: expected={} actual={}",
604                        sofar.tx_range.end, commit.tx_range.start,
605                    ),
606                )
607                .into());
608            }
609            sofar.tx_range.end = commit.tx_range.end;
610            sofar.size_in_bytes += commit.size_in_bytes;
611            // TODO: Should it be an error to encounter an epoch going backwards?
612            sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
613            sofar.max_commit_offset = commit.tx_range.start;
614        }
615
616        Ok(sofar)
617    }
618
619    /// Finds the last valid commit in the segment using the offset index.
620    /// It traverses the index in reverse order, starting from the last key.
621    ///
622    /// Returns
623    /// * `Ok((Metadata)` - If a valid commit is found containing the commit, It adds a default
624    ///     header, which should be replaced with the actual header.
625    /// * `Err` - If no valid commit is found or if the index is empty
626    fn find_valid_indexed_commit<R: io::Read + io::Seek>(
627        min_tx_offset: u64,
628        header: Header,
629        reader: &mut R,
630        offset_index: &TxOffsetIndex,
631    ) -> io::Result<Metadata> {
632        let mut candidate_last_key = TxOffset::MAX;
633
634        while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
635            match Self::validate_commit_at_offset(reader, key, byte_offset) {
636                Ok(commit) => {
637                    return Ok(Metadata {
638                        header,
639                        tx_range: Range {
640                            start: min_tx_offset,
641                            end: commit.tx_range.end,
642                        },
643                        size_in_bytes: byte_offset + commit.size_in_bytes,
644                        max_epoch: commit.epoch,
645                        max_commit_offset: commit.tx_range.start,
646                    });
647                }
648
649                // `TxOffset` at `byte_offset` is not valid, so try with previous entry
650                Err(_) => {
651                    candidate_last_key = key.saturating_sub(1);
652                    if candidate_last_key == 0 {
653                        break;
654                    }
655                }
656            }
657        }
658
659        Err(io::Error::new(
660            ErrorKind::InvalidData,
661            format!("No valid commit found in index up to key: {}", candidate_last_key),
662        ))
663    }
664
665    /// Validates and decodes a commit at `byte_offset` in the segment.
666    ///
667    /// # Returns
668    /// * `Ok(commit::Metadata)` - If a valid commit is found with matching transaction offset
669    /// * `Err` - If commit can't be decoded or has mismatched transaction offset
670    fn validate_commit_at_offset<R: io::Read + io::Seek>(
671        reader: &mut R,
672        tx_offset: TxOffset,
673        byte_offset: u64,
674    ) -> io::Result<commit::Metadata> {
675        reader.seek(SeekFrom::Start(byte_offset))?;
676        let commit = commit::Metadata::extract(reader)?
677            .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;
678
679        if commit.tx_range.start != tx_offset {
680            return Err(io::Error::new(
681                ErrorKind::InvalidData,
682                format!(
683                    "mismatch key in index offset file: expected={} actual={}",
684                    tx_offset, commit.tx_range.start
685                ),
686            ));
687        }
688
689        Ok(commit)
690    }
691}
692
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use pretty_assertions::assert_matches;
    use proptest::prelude::*;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    /// A fresh writer over an in-memory buffer, without an offset index.
    fn vec_writer(max_records_in_commit: NonZeroU16) -> Writer<Vec<u8>> {
        Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit,

            offset_index_head: None,
        }
    }

    /// A header survives being encoded and decoded again.
    #[test]
    fn header_roundtrip() {
        let original = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut encoded = [0u8; Header::LEN];
        original.write(&mut &mut encoded[..]).unwrap();
        let decoded = Header::decode(&encoded[..]).unwrap();

        assert_eq!(original, decoded);
    }

    /// A single commit of three records can be read back unchanged.
    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for fill in 0..3u8 {
            writer.append([fill; 32]).unwrap();
        }
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let mut stored = reader.commits();
        let commit = stored.next().expect("expected one commit").expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(commit.min_tx_offset, 0);
        assert_eq!(commit.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    /// Metadata extraction sees all commits of a segment.
    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        // Commit 0..2
        writer.append([0; 32]).unwrap();
        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();
        // Commit 2..3
        writer.append([1; 32]).unwrap();
        writer.commit().unwrap();
        // Commit 3..5
        writer.append([2; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let actual = reader.metadata().unwrap();

        // header + 5 txs + 3 commits
        let expected_size = (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64;
        let expected = Metadata {
            header: Header::default(),
            tx_range: Range { start: 0, end: 5 },
            size_in_bytes: expected_size,
            max_epoch: Commit::DEFAULT_EPOCH,
            max_commit_offset: 3,
        };
        assert_eq!(actual, expected);
    }

    /// Commits are read back with the offsets and records they were written with.
    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for txs in &commits {
            for tx in txs {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();

        let mut expected = Vec::with_capacity(commits.len());
        let mut next_offset = 0;
        for txs in commits {
            let n = txs.len();
            expected.push(Commit {
                min_tx_offset: next_offset,
                n: n as u16,
                records: txs.concat(),
                epoch: 0,
            });
            next_offset += n as u64;
        }

        let actual = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(expected, actual);
    }

    /// Flattening commits yields every transaction with consecutive offsets.
    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for txs in &commits {
            for tx in txs {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let actual = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let expected = commits
            .into_iter()
            .flatten()
            .enumerate()
            .map(|(offset, txdata)| Transaction {
                offset: offset as u64,
                txdata,
            })
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    proptest! {
        /// Exactly `max_records_in_commit` records fit into one commit.
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let limit = max_records_in_commit.get();
            let mut writer = vec_writer(max_records_in_commit);

            for written in 0..limit {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    limit,
                    written
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                limit
            );
        }
    }

    /// The next offset only advances when a non-empty commit is written.
    #[test]
    fn next_tx_offset() {
        let mut writer = vec_writer(NonZeroU16::MAX);

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        // Committing without buffered records changes nothing.
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.append([1; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    /// Truncating the index writer drops the entries at and above the offset.
    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let opts = Options {
            // Ensure we're writing every index entry.
            offset_index_interval_bytes: 127.try_into().unwrap(),
            offset_index_require_segment_fsync: false,
            ..Default::default()
        };
        let mut writer = OffsetIndexWriter::new(TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(), opts);

        for key in 1..=10 {
            writer.append_after_commit(key, key * 128, 128).unwrap();
        }
        // Ensure all entries have been written.
        for key in 1..=10 {
            assert_eq!(writer.head.key_lookup(key).unwrap(), (key, key * 128));
        }

        // Truncating to any offset in the written range or larger
        // retains that offset - 1, or the max offset written.
        for truncate_to in (2..=10u64).rev() {
            let retained_key = truncate_to.saturating_sub(1).min(10);
            let retained = (retained_key, retained_key * 128);

            writer.ftruncate(truncate_to, rand::random()).unwrap();
            assert_matches!(
                writer.head.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?}"
            );
            // Make sure this also holds after reopen.
            let reopened = TxOffsetIndex::open_index_file(&index_path).unwrap();
            assert_matches!(
                reopened.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?} after reopen"
            );
        }

        // Truncating to 1 leaves no entries in the index
        writer.ftruncate(1, rand::random()).unwrap();
        assert_matches!(writer.head.key_lookup(1), Err(IndexError::KeyNotFound));
    }
}