spacetimedb_commitlog/segment.rs

use std::{
    fs::File,
    io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
    num::{NonZeroU16, NonZeroU64},
    ops::Range,
};

use log::{debug, warn};

use crate::{
    commit::{self, Commit, StoredCommit},
    error,
    index::IndexError,
    payload::Encode,
    repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
    Options,
};

pub const MAGIC: [u8; 6] = [b'(', b'd', b's', b')', b'^', b'2'];

pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    pub log_format_version: u8,
    pub checksum_algorithm: u8,
}

impl Header {
    pub const LEN: usize = MAGIC.len() + /* log_format_version + checksum_algorithm + reserved + reserved */ 4;

    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
        out.write_all(&MAGIC)?;
        out.write_all(&[self.log_format_version, self.checksum_algorithm, 0, 0])?;

        Ok(())
    }

    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
        let mut buf = [0; Self::LEN];
        read.read_exact(&mut buf)?;

        if !buf.starts_with(&MAGIC) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "segment header does not start with magic",
            ));
        }

        Ok(Self {
            log_format_version: buf[MAGIC.len()],
            checksum_algorithm: buf[MAGIC.len() + 1],
        })
    }

    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
        if self.log_format_version > max_log_format_version {
            return Err(format!("unsupported log format version: {}", self.log_format_version));
        }
        if self.checksum_algorithm != checksum_algorithm {
            return Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm));
        }

        Ok(())
    }
}

impl Default for Header {
    fn default() -> Self {
        Self {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
        }
    }
}
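
// Compile-time sanity check, added here for illustration (not in the original
// source): the on-disk header is the 6-byte magic followed by the format
// version, the checksum algorithm and two reserved bytes, 10 bytes in total.
const _: () = assert!(Header::LEN == 10);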

/// Metadata about a [`Commit`] which was successfully written via [`Writer::commit`].
#[derive(Debug, PartialEq)]
pub struct Committed {
    /// The range of transaction offsets included in the commit.
    pub tx_range: Range<u64>,
    /// The crc32 checksum of the commit's serialized form,
    /// as written to the commitlog.
    pub checksum: u32,
}

#[derive(Debug)]
pub struct Writer<W: io::Write> {
    pub(crate) commit: Commit,
    pub(crate) inner: BufWriter<W>,

    pub(crate) min_tx_offset: u64,
    pub(crate) bytes_written: u64,

    pub(crate) max_records_in_commit: NonZeroU16,

    pub(crate) offset_index_head: Option<OffsetIndexWriter>,
}

impl<W: io::Write> Writer<W> {
    /// Append the record (aka transaction) `T` to the segment.
    ///
    /// If the number of currently buffered records would exceed `max_records_in_commit`
    /// after the method returns, the argument is returned in an `Err` and not
    /// appended to this writer's buffer.
    ///
    /// Otherwise, the `record` is encoded and stored in the buffer.
    ///
    /// An `Err` result indicates that [`Self::commit`] should be called in
    /// order to flush the buffered records to persistent storage.
    pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
        if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
            Err(record)
        } else {
            self.commit.n += 1;
            record.encode_record(&mut self.commit.records);
            Ok(())
        }
    }

    /// Write the current [`Commit`] to the underlying [`io::Write`].
    ///
    /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero).
    /// In this case, `None` is returned.
    ///
    /// Otherwise `Some` [`Committed`] is returned, providing some metadata about
    /// the commit.
    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
        if self.commit.n == 0 {
            return Ok(None);
        }
        let checksum = self.commit.write(&mut self.inner)?;
        self.inner.flush()?;

        let commit_len = self.commit.encoded_len() as u64;
        self.offset_index_head.as_mut().map(|index| {
            index
                .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
                .map_err(|e| {
                    debug!("failed to append to offset index: {:?}", e);
                })
        });

        let tx_range_start = self.commit.min_tx_offset;

        self.bytes_written += commit_len;
        self.commit.min_tx_offset += self.commit.n as u64;
        self.commit.n = 0;
        self.commit.records.clear();

        Ok(Some(Committed {
            tx_range: tx_range_start..self.commit.min_tx_offset,
            checksum,
        }))
    }

    /// Get the current epoch.
    pub fn epoch(&self) -> u64 {
        self.commit.epoch
    }

    /// Update the epoch.
    ///
    /// The caller must ensure that:
    ///
    /// - The new epoch is greater than the current epoch.
    /// - [`Self::commit`] has been called as appropriate.
    ///
    pub fn set_epoch(&mut self, epoch: u64) {
        self.commit.epoch = epoch;
    }

    /// The smallest transaction offset in this segment.
    pub fn min_tx_offset(&self) -> u64 {
        self.min_tx_offset
    }

    /// The transaction offset at which the next [`Self::commit`] will start.
    pub fn next_tx_offset(&self) -> u64 {
        self.commit.min_tx_offset
    }

    /// `true` if the segment contains no commits.
    ///
    /// The segment will, however, contain a header, so this violates the
    /// convention that `is_empty == (len == 0)`.
    pub fn is_empty(&self) -> bool {
        self.bytes_written <= Header::LEN as u64
    }

    /// Number of bytes written to this segment, including the header.
    pub fn len(&self) -> u64 {
        self.bytes_written
    }
}

pub trait FileLike {
    fn fsync(&mut self) -> io::Result<()>;
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}

impl FileLike for File {
    fn fsync(&mut self) -> io::Result<()> {
        self.sync_data()
    }

    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
        self.set_len(size)
    }
}

impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
    fn fsync(&mut self) -> io::Result<()> {
        self.get_mut().fsync()
    }

    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        self.get_mut().ftruncate(tx_offset, size)
    }
}

impl<W: io::Write + FileLike> FileLike for Writer<W> {
    fn fsync(&mut self) -> io::Result<()> {
        self.inner.fsync()?;
        self.offset_index_head.as_mut().map(|index| index.fsync());
        Ok(())
    }

    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        self.inner.ftruncate(tx_offset, size)?;
        self.offset_index_head
            .as_mut()
            .map(|index| index.ftruncate(tx_offset, size));
        Ok(())
    }
}

#[derive(Debug)]
pub struct OffsetIndexWriter {
    pub(crate) head: TxOffsetIndexMut,

    require_segment_fsync: bool,
    min_write_interval: NonZeroU64,

    pub(crate) candidate_min_tx_offset: TxOffset,
    pub(crate) candidate_byte_offset: u64,
    pub(crate) bytes_since_last_index: u64,
}

impl OffsetIndexWriter {
    pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
        OffsetIndexWriter {
            head,
            require_segment_fsync: opts.offset_index_require_segment_fsync,
            min_write_interval: opts.offset_index_interval_bytes,
            candidate_min_tx_offset: TxOffset::default(),
            candidate_byte_offset: 0,
            bytes_since_last_index: 0,
        }
    }

    fn reset(&mut self) {
        self.candidate_byte_offset = 0;
        self.candidate_min_tx_offset = TxOffset::default();
        self.bytes_since_last_index = 0;
    }

    /// Either append to the index, or save the offsets to be appended on a future fsync.
    pub fn append_after_commit(
        &mut self,
        min_tx_offset: TxOffset,
        byte_offset: u64,
        commit_len: u64,
    ) -> Result<(), IndexError> {
        self.bytes_since_last_index += commit_len;

        if self.candidate_min_tx_offset == 0 {
            self.candidate_byte_offset = byte_offset;
            self.candidate_min_tx_offset = min_tx_offset;
        }

        if !self.require_segment_fsync {
            self.append_internal()?;
        }

        Ok(())
    }

    fn append_internal(&mut self) -> Result<(), IndexError> {
        // If the candidate offset is zero, there has not been a commit since the last offset entry
        if self.candidate_min_tx_offset == 0 {
            return Ok(());
        }

        if self.bytes_since_last_index < self.min_write_interval.get() {
            return Ok(());
        }

        self.head
            .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
        self.head.async_flush()?;
        self.reset();

        Ok(())
    }
}

impl FileLike for OffsetIndexWriter {
    /// Must be called via SegmentWriter::fsync
    fn fsync(&mut self) -> io::Result<()> {
        let _ = self.append_internal().map_err(|e| {
            warn!("failed to append to offset index: {e:?}");
        });
        let _ = self
            .head
            .async_flush()
            .map_err(|e| warn!("failed to flush offset index: {e:?}"));
        Ok(())
    }

    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
        self.reset();
        let _ = self.head.truncate(tx_offset);
        Ok(())
    }
}

#[derive(Debug)]
pub struct Reader<R> {
    pub header: Header,
    pub min_tx_offset: u64,
    inner: R,
}

impl<R: io::Read + io::Seek> Reader<R> {
    pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
        let header = Header::decode(&mut inner)?;
        header
            .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
            .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;

        Ok(Self {
            header,
            min_tx_offset,
            inner,
        })
    }
}

impl<R: io::Read + io::Seek> Reader<R> {
    pub fn commits(self) -> Commits<R> {
        Commits {
            header: self.header,
            reader: io::BufReader::new(self.inner),
        }
    }

    pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> {
        seek_to_offset(&mut self.inner, index_file, start_tx_offset)
    }

    #[cfg(test)]
    pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
    where
        D: crate::Decoder,
        D::Error: From<io::Error>,
        R: 'a,
    {
        use itertools::Itertools as _;

        self.commits()
            .with_log_format_version()
            .map(|x| x.map_err(Into::into))
            .map_ok(move |(version, commit)| {
                let start = commit.min_tx_offset;
                commit.into_transactions(version, start, de)
            })
            .flatten_ok()
            .map(|x| x.and_then(|y| y))
    }

    #[cfg(test)]
    pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
        Metadata::with_header(self.min_tx_offset, self.header, io::BufReader::new(self.inner))
    }
}

/// Advances the `segment` reader to the position corresponding to the `start_tx_offset`
/// using the `index_file` for efficient seeking.
///
/// Input:
/// - `segment` - segment reader
/// - `index_file` - offset index for the segment
/// - `start_tx_offset` - transaction offset to advance to
pub fn seek_to_offset<R: io::Read + io::Seek>(
    mut segment: &mut R,
    index_file: &TxOffsetIndex,
    start_tx_offset: u64,
) -> Result<(), IndexError> {
    let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;

    // If `index_key` is 0, the index file is empty, so there is no need to seek.
    if index_key == 0 {
        return Ok(());
    }
    debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
    // The returned `index_key` should never be greater than `start_tx_offset`.
    debug_assert!(index_key <= start_tx_offset);

    // Check if the offset index is pointing to the right commit.
    validate_commit_header(&mut segment, byte_offset).map(|hdr| {
        if hdr.min_tx_offset == index_key {
            // Advance the segment reader if the expected commit was found.
            segment
                .seek(SeekFrom::Start(byte_offset))
                .map(|_| ())
                .map_err(Into::into)
        } else {
            Err(io::Error::new(io::ErrorKind::InvalidData, "mismatched key in offset index file").into())
        }
    })?
}

/// Try to extract the commit header at the given position without advancing the reader's position.
/// `IndexFileMut` fsyncs asynchronously, which makes it important for the reader to verify its entries.
pub fn validate_commit_header<Reader: io::Read + io::Seek>(
    mut reader: &mut Reader,
    byte_offset: u64,
) -> io::Result<commit::Header> {
    let pos = reader.stream_position()?;
    reader.seek(SeekFrom::Start(byte_offset))?;

    let hdr = commit::Header::decode(&mut reader)
        .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));

    // Restore the original position
    reader.seek(SeekFrom::Start(pos))?;

    hdr
}

/// Pair of transaction offset and payload.
///
/// Created by iterators which "flatten" commits into individual transaction
/// records.
#[derive(Debug, PartialEq)]
pub struct Transaction<T> {
    /// The offset of this transaction relative to the start of the log.
    pub offset: u64,
    /// The transaction payload.
    pub txdata: T,
}

pub struct Commits<R> {
    pub header: Header,
    reader: io::BufReader<R>,
}

impl<R: io::Read> Iterator for Commits<R> {
    type Item = io::Result<StoredCommit>;

    fn next(&mut self) -> Option<Self::Item> {
        StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
    }
}

#[cfg(test)]
impl<R: io::Read> Commits<R> {
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        CommitsWithVersion { inner: self }
    }
}

#[cfg(test)]
struct CommitsWithVersion<R> {
    inner: Commits<R>,
}

#[cfg(test)]
impl<R: io::Read> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    fn next(&mut self) -> Option<Self::Item> {
        let next = self.inner.next()?;
        match next {
            Ok(commit) => {
                let version = self.inner.header.log_format_version;
                Some(Ok((version, commit)))
            }
            Err(e) => Some(Err(e)),
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    pub header: Header,
    pub tx_range: Range<u64>,
    pub size_in_bytes: u64,
    pub max_epoch: u64,
}

impl Metadata {
    /// Read and validate metadata from a segment.
    ///
    /// This traverses the entire segment, consuming the `reader`.
    /// Doing so is necessary to determine `max_tx_offset`, `size_in_bytes` and
    /// `max_epoch`.
    pub(crate) fn extract<R: io::Read>(min_tx_offset: u64, mut reader: R) -> Result<Self, error::SegmentMetadata> {
        let header = Header::decode(&mut reader)?;
        Self::with_header(min_tx_offset, header, reader)
    }

    fn with_header<R: io::Read>(
        min_tx_offset: u64,
        header: Header,
        mut reader: R,
    ) -> Result<Self, error::SegmentMetadata> {
        let mut sofar = Self {
            header,
            tx_range: Range {
                start: min_tx_offset,
                end: min_tx_offset,
            },
            size_in_bytes: Header::LEN as u64,
            max_epoch: Commit::DEFAULT_EPOCH,
        };

        fn commit_meta<R: io::Read>(
            reader: &mut R,
            sofar: &Metadata,
        ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
            commit::Metadata::extract(reader).map_err(|e| {
                if e.kind() == io::ErrorKind::InvalidData {
                    error::SegmentMetadata::InvalidCommit {
                        sofar: sofar.clone(),
                        source: e,
                    }
                } else {
                    e.into()
                }
            })
        }
        while let Some(commit) = commit_meta(&mut reader, &sofar)? {
            debug!("commit::{commit:?}");
            if commit.tx_range.start != sofar.tx_range.end {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "out-of-order offset: expected={} actual={}",
                        sofar.tx_range.end, commit.tx_range.start,
                    ),
                )
                .into());
            }
            sofar.tx_range.end = commit.tx_range.end;
            sofar.size_in_bytes += commit.size_in_bytes;
            // TODO: Should it be an error to encounter an epoch going backwards?
            sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
        }

        Ok(sofar)
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use proptest::prelude::*;
    use rand::thread_rng;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    #[test]
    fn header_roundtrip() {
        let hdr = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut buf = [0u8; Header::LEN];
        hdr.write(&mut &mut buf[..]).unwrap();
        let h2 = Header::decode(&buf[..]).unwrap();

        assert_eq!(hdr, h2);
    }
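
    // Sketch added for illustration (not part of the original test suite):
    // `Header::ensure_compatible` rejects a header whose log format version is
    // newer than what the reader supports, or whose checksum algorithm differs.
    #[test]
    fn header_compatibility_sketch() {
        let hdr = Header {
            log_format_version: 1,
            checksum_algorithm: CHECKSUM_ALGORITHM_CRC32C,
        };

        assert!(hdr.ensure_compatible(1, CHECKSUM_ALGORITHM_CRC32C).is_ok());
        // The reader only supports format version 0, but the segment was written with 1.
        assert!(hdr.ensure_compatible(0, CHECKSUM_ALGORITHM_CRC32C).is_err());
        // Checksum algorithm mismatch.
        assert!(hdr.ensure_compatible(1, CHECKSUM_ALGORITHM_CRC32C + 1).is_err());
    }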

    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([1; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let commit = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(commit.min_tx_offset, 0);
        assert_eq!(commit.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([1; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([2; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let Metadata {
            header: _,
            tx_range,
            size_in_bytes,
            max_epoch: _,
        } = reader.metadata().unwrap();

        assert_eq!(tx_range.start, 0);
        assert_eq!(tx_range.end, 5);
        assert_eq!(
            size_in_bytes,
            (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64
        );
    }
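
    // Sketch added for illustration (not part of the original test suite): a
    // segment that only contains its header reports `is_empty`, even though
    // `len` already accounts for the header bytes.
    #[test]
    fn is_empty_accounts_for_header_sketch() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            // As if only the segment header had been written so far.
            bytes_written: Header::LEN as u64,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert!(writer.is_empty());
        assert_eq!(writer.len(), Header::LEN as u64);

        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();

        assert!(!writer.is_empty());
        assert!(writer.len() > Header::LEN as u64);
    }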

    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let mut commits1 = Vec::with_capacity(commits.len());
        let mut min_tx_offset = 0;
        for txs in commits {
            commits1.push(Commit {
                min_tx_offset,
                n: txs.len() as u16,
                records: txs.concat(),
                epoch: 0,
            });
            min_tx_offset += txs.len() as u64;
        }
        let commits2 = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(commits1, commits2);
    }

    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let txs = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            txs,
            commits
                .into_iter()
                .flatten()
                .enumerate()
                .map(|(offset, txdata)| Transaction {
                    offset: offset as u64,
                    txdata
                })
                .collect::<Vec<_>>()
        );
    }
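
    // Sketch added for illustration, assuming `Commit::write` and the
    // `commit::Header` fields are accessible from this module as they are used
    // above: `validate_commit_header` peeks at the commit header at a given
    // byte offset and restores the reader's position afterwards.
    #[test]
    fn validate_commit_header_restores_position_sketch() {
        let mut buf = Vec::new();
        Header::default().write(&mut buf).unwrap();

        let mut commit = Commit::default();
        commit.n = 1;
        commit.records = vec![0; 32];
        commit.write(&mut buf).unwrap();

        let mut cursor = io::Cursor::new(buf);
        let hdr = validate_commit_header(&mut cursor, Header::LEN as u64).unwrap();
        assert_eq!(hdr.min_tx_offset, 0);
        // The reader position is left where it was before the peek.
        assert_eq!(cursor.position(), 0);
    }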

    proptest! {
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = Writer {
                commit: Commit::default(),
                inner: BufWriter::new(Vec::new()),

                min_tx_offset: 0,
                bytes_written: 0,

                max_records_in_commit,

                offset_index_head: None,
            };

            for i in 0..max_records_in_commit.get() {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    max_records_in_commit.get(),
                    i
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                max_records_in_commit.get()
            );
        }
    }

    #[test]
    fn next_tx_offset() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.append([1; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }
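
    // Sketch added for illustration (not part of the original test suite):
    // `Writer::commit` reports the range of transaction offsets it wrote, and
    // an empty commit is a no-op that yields `None`.
    #[test]
    fn commit_reports_tx_range_sketch() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert_eq!(writer.commit().unwrap(), None, "empty commit should be a no-op");

        writer.append([0; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        let committed = writer.commit().unwrap().expect("non-empty commit");
        assert_eq!(committed.tx_range, 0..2);
    }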

    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let mut writer = OffsetIndexWriter::new(
            TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(),
            Options {
                // Ensure we're writing every index entry.
                offset_index_interval_bytes: 127.try_into().unwrap(),
                offset_index_require_segment_fsync: false,
                ..Default::default()
            },
        );

        for i in 1..=10 {
            writer.append_after_commit(i, i * 128, 128).unwrap();
        }
        // Ensure all entries have been written.
        for i in 1..=10 {
            assert_eq!(writer.head.key_lookup(i).unwrap(), (i, i * 128));
        }

        let mut rng = thread_rng();

        // Truncating to any offset in the written range or larger
        // retains that offset - 1, or the max offset written.
        let truncate_to: TxOffset = rng.gen_range(1..=32);
        let retained_key = truncate_to.saturating_sub(1).min(10);
        let retained_val = retained_key * 128;
        let retained = (retained_key, retained_val);

        writer.ftruncate(truncate_to, rng.gen()).unwrap();
        assert_eq!(writer.head.key_lookup(truncate_to).unwrap(), retained);
        // Make sure this also holds after reopen.
        drop(writer);
        let index = TxOffsetIndex::open_index_file(&index_path).unwrap();
        assert_eq!(index.key_lookup(truncate_to).unwrap(), retained);
    }
817}