1use std::{
2 fs::File,
3 io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
4 num::{NonZeroU16, NonZeroU64},
5 ops::Range,
6};
7
8use log::{debug, warn};
9
10use crate::{
11 commit::{self, Commit, StoredCommit},
12 error,
13 index::{IndexError, IndexFileMut},
14 payload::Encode,
15 repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
16 Options,
17};
18
/// Magic bytes at the very start of every segment file.
pub const MAGIC: [u8; 6] = *b"(ds)^2";

/// Log format version written into the header of newly created segments.
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
/// Checksum algorithm written into the header of newly created segments.
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

/// Identifier of the CRC32C checksum algorithm.
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
/// Length in bytes of a CRC32C checksum.
pub const CHECKSUM_CRC32C_LEN: usize = 4;

/// Checksum lengths per algorithm; entry `0` corresponds to
/// [`CHECKSUM_ALGORITHM_CRC32C`].
pub const CHECKSUM_LEN: [usize; 1] = [CHECKSUM_CRC32C_LEN];
30
31#[derive(Clone, Copy, Debug, Eq, PartialEq)]
32pub struct Header {
33 pub log_format_version: u8,
34 pub checksum_algorithm: u8,
35}
36
37impl Header {
38 pub const LEN: usize = MAGIC.len() + 4;
39
40 pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
41 out.write_all(&MAGIC)?;
42 out.write_all(&[self.log_format_version, self.checksum_algorithm, 0, 0])?;
43
44 Ok(())
45 }
46
47 pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
48 let mut buf = [0; Self::LEN];
49 read.read_exact(&mut buf)?;
50
51 if !buf.starts_with(&MAGIC) {
52 return Err(io::Error::new(
53 io::ErrorKind::InvalidData,
54 "segment header does not start with magic",
55 ));
56 }
57
58 Ok(Self {
59 log_format_version: buf[MAGIC.len()],
60 checksum_algorithm: buf[MAGIC.len() + 1],
61 })
62 }
63
64 pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
65 if self.log_format_version > max_log_format_version {
66 return Err(format!("unsupported log format version: {}", self.log_format_version));
67 }
68 if self.checksum_algorithm != checksum_algorithm {
69 return Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm));
70 }
71
72 Ok(())
73 }
74}
75
76impl Default for Header {
77 fn default() -> Self {
78 Self {
79 log_format_version: DEFAULT_LOG_FORMAT_VERSION,
80 checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
81 }
82 }
83}
84
/// Outcome of a successful, non-empty [`Writer::commit`].
#[derive(Debug, PartialEq)]
pub struct Committed {
    /// Transaction offsets contained in the commit (end-exclusive).
    pub tx_range: Range<u64>,
    /// Checksum returned when the commit was written.
    pub checksum: u32,
}
94
95#[derive(Debug)]
96pub struct Writer<W: io::Write> {
97 pub(crate) commit: Commit,
98 pub(crate) inner: BufWriter<W>,
99
100 pub(crate) min_tx_offset: u64,
101 pub(crate) bytes_written: u64,
102
103 pub(crate) max_records_in_commit: NonZeroU16,
104
105 pub(crate) offset_index_head: Option<OffsetIndexWriter>,
106}
107
108impl<W: io::Write> Writer<W> {
109 pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
120 if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
121 Err(record)
122 } else {
123 self.commit.n += 1;
124 record.encode_record(&mut self.commit.records);
125 Ok(())
126 }
127 }
128
129 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
137 if self.commit.n == 0 {
138 return Ok(None);
139 }
140 let checksum = self.commit.write(&mut self.inner)?;
141 self.inner.flush()?;
142
143 let commit_len = self.commit.encoded_len() as u64;
144 self.offset_index_head.as_mut().map(|index| {
145 debug!(
146 "append_after commit min_tx_offset={} bytes_written={} commit_len={}",
147 self.commit.min_tx_offset, self.bytes_written, commit_len
148 );
149 index
150 .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
151 .map_err(|e| {
152 debug!("failed to append to offset index: {e:?}");
153 })
154 });
155
156 let tx_range_start = self.commit.min_tx_offset;
157
158 self.bytes_written += commit_len;
159 self.commit.min_tx_offset += self.commit.n as u64;
160 self.commit.n = 0;
161 self.commit.records.clear();
162
163 Ok(Some(Committed {
164 tx_range: tx_range_start..self.commit.min_tx_offset,
165 checksum,
166 }))
167 }
168
169 pub fn epoch(&self) -> u64 {
171 self.commit.epoch
172 }
173
174 pub fn set_epoch(&mut self, epoch: u64) {
182 self.commit.epoch = epoch;
183 }
184
185 pub fn min_tx_offset(&self) -> u64 {
187 self.min_tx_offset
188 }
189
190 pub fn next_tx_offset(&self) -> u64 {
192 self.commit.min_tx_offset
193 }
194
195 pub fn is_empty(&self) -> bool {
200 self.bytes_written <= Header::LEN as u64
201 }
202
203 pub fn len(&self) -> u64 {
205 self.bytes_written
206 }
207}
208
/// Minimal storage operations needed by segment and offset-index writers.
pub trait FileLike {
    /// Synchronizes written data with the underlying storage. For [`File`]
    /// this is [`File::sync_data`].
    fn fsync(&mut self) -> io::Result<()>;
    /// Truncates the storage. Byte-oriented implementations (such as [`File`])
    /// truncate to `size` bytes and ignore `tx_offset`. Offset-index
    /// implementations truncate by `tx_offset` instead.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}

impl FileLike for File {
    fn fsync(&mut self) -> io::Result<()> {
        self.sync_data()
    }

    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
        self.set_len(size)
    }
}

impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
    /// Flushes the buffer into `W`, then syncs `W`.
    ///
    /// The flush is required: syncing only the inner writer would leave any
    /// bytes still held in the buffer out of the sync.
    fn fsync(&mut self) -> io::Result<()> {
        self.flush()?;
        self.get_mut().fsync()
    }

    /// Truncates the inner writer.
    ///
    /// NOTE: bytes still held in the buffer are not discarded. They are written
    /// on the next flush.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        self.get_mut().ftruncate(tx_offset, size)
    }
}
233
234impl<W: io::Write + FileLike> FileLike for Writer<W> {
235 fn fsync(&mut self) -> io::Result<()> {
236 self.inner.fsync()?;
237 self.offset_index_head.as_mut().map(|index| index.fsync());
238 Ok(())
239 }
240
241 fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
242 self.inner.ftruncate(tx_offset, size)?;
243 self.offset_index_head
244 .as_mut()
245 .map(|index| index.ftruncate(tx_offset, size));
246 Ok(())
247 }
248}
249
250#[derive(Debug)]
251pub struct OffsetIndexWriter {
252 pub(crate) head: TxOffsetIndexMut,
253
254 require_segment_fsync: bool,
255 min_write_interval: NonZeroU64,
256
257 pub(crate) candidate_min_tx_offset: TxOffset,
258 pub(crate) candidate_byte_offset: u64,
259 pub(crate) bytes_since_last_index: u64,
260}
261
262impl OffsetIndexWriter {
263 pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
264 OffsetIndexWriter {
265 head,
266 require_segment_fsync: opts.offset_index_require_segment_fsync,
267 min_write_interval: opts.offset_index_interval_bytes,
268 candidate_min_tx_offset: TxOffset::default(),
269 candidate_byte_offset: 0,
270 bytes_since_last_index: 0,
271 }
272 }
273
274 fn reset(&mut self) {
275 self.candidate_byte_offset = 0;
276 self.candidate_min_tx_offset = TxOffset::default();
277 self.bytes_since_last_index = 0;
278 }
279
280 pub fn append_after_commit(
282 &mut self,
283 min_tx_offset: TxOffset,
284 byte_offset: u64,
285 commit_len: u64,
286 ) -> Result<(), IndexError> {
287 self.bytes_since_last_index += commit_len;
288
289 if self.candidate_min_tx_offset == 0 {
290 self.candidate_byte_offset = byte_offset;
291 self.candidate_min_tx_offset = min_tx_offset;
292 }
293
294 if !self.require_segment_fsync {
295 self.append_internal()?;
296 }
297
298 Ok(())
299 }
300
301 fn append_internal(&mut self) -> Result<(), IndexError> {
302 if self.candidate_min_tx_offset == 0 {
304 return Ok(());
305 }
306
307 if self.bytes_since_last_index < self.min_write_interval.get() {
308 return Ok(());
309 }
310
311 self.head
312 .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
313 self.head.async_flush()?;
314 self.reset();
315
316 Ok(())
317 }
318}
319
320impl FileLike for OffsetIndexWriter {
321 fn fsync(&mut self) -> io::Result<()> {
323 let _ = self.append_internal().map_err(|e| {
324 warn!("failed to append to offset index: {e:?}");
325 });
326 let _ = self
327 .head
328 .async_flush()
329 .map_err(|e| warn!("failed to flush offset index: {e:?}"));
330 Ok(())
331 }
332
333 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
334 self.reset();
335 self.head
336 .truncate(tx_offset)
337 .inspect_err(|e| {
338 warn!("failed to truncate offset index at {tx_offset}: {e:?}");
339 })
340 .ok();
341 Ok(())
342 }
343}
344
345impl FileLike for IndexFileMut<TxOffset> {
346 fn fsync(&mut self) -> io::Result<()> {
347 self.async_flush()
348 }
349
350 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
351 self.truncate(tx_offset)
352 .map_err(|e| io::Error::other(format!("failed to truncate offset index at {tx_offset}: {e:?}")))
353 }
354}
355
356#[derive(Debug)]
357pub struct Reader<R> {
358 pub header: Header,
359 pub min_tx_offset: u64,
360 inner: R,
361}
362
363impl<R: io::Read + io::Seek> Reader<R> {
364 pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
365 let header = Header::decode(&mut inner)?;
366 header
367 .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
368 .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
369
370 Ok(Self {
371 header,
372 min_tx_offset,
373 inner,
374 })
375 }
376}
377
378impl<R: io::BufRead + io::Seek> Reader<R> {
379 pub fn commits(self) -> Commits<R> {
380 Commits {
381 header: self.header,
382 reader: self.inner,
383 }
384 }
385
386 pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<u64, IndexError> {
387 seek_to_offset(&mut self.inner, index_file, start_tx_offset)
388 }
389
390 #[cfg(test)]
391 pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
392 where
393 D: crate::Decoder,
394 D::Error: From<io::Error>,
395 R: 'a,
396 {
397 use itertools::Itertools as _;
398
399 self.commits()
400 .with_log_format_version()
401 .map(|x| x.map_err(Into::into))
402 .map_ok(move |(version, commit)| {
403 let start = commit.min_tx_offset;
404 commit.into_transactions(version, start, de)
405 })
406 .flatten_ok()
407 .map(|x| x.and_then(|y| y))
408 }
409
410 #[cfg(test)]
411 pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
412 Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
413 }
414}
415
416pub fn seek_to_offset<R: io::Read + io::Seek>(
426 mut segment: &mut R,
427 index_file: &TxOffsetIndex,
428 start_tx_offset: u64,
429) -> Result<u64, IndexError> {
430 let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
431
432 if index_key == 0 {
434 return Err(IndexError::KeyNotFound);
435 }
436 debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
437 debug_assert!(index_key <= start_tx_offset);
439
440 let hdr = validate_commit_header(&mut segment, byte_offset)?;
442 if hdr.min_tx_offset == index_key {
443 segment.seek(SeekFrom::Start(byte_offset))
445 } else {
446 Err(io::Error::new(
447 io::ErrorKind::InvalidData,
448 "mismatched key in offset index file",
449 ))
450 }
451 .map_err(Into::into)
452}
453
454pub fn validate_commit_header<Reader: io::Read + io::Seek>(
457 mut reader: &mut Reader,
458 byte_offset: u64,
459) -> io::Result<commit::Header> {
460 let pos = reader.stream_position()?;
461 reader.seek(SeekFrom::Start(byte_offset))?;
462
463 let hdr = commit::Header::decode(&mut reader)
464 .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));
465
466 reader.seek(SeekFrom::Start(pos))?;
468
469 hdr
470}
471
/// A single decoded transaction together with its offset in the log.
#[derive(Debug, PartialEq)]
pub struct Transaction<T> {
    /// Transaction offset.
    pub offset: u64,
    /// Decoded transaction payload.
    pub txdata: T,
}
483
484pub struct Commits<R> {
485 pub header: Header,
486 reader: R,
487}
488
489impl<R: io::BufRead> Iterator for Commits<R> {
490 type Item = io::Result<StoredCommit>;
491
492 fn next(&mut self) -> Option<Self::Item> {
493 StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
494 }
495}
496
#[cfg(test)]
impl<R: io::BufRead> Commits<R> {
    /// Pairs every commit with the segment's log format version.
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        CommitsWithVersion { inner: self }
    }
}

/// Adapter returned by [`Commits::with_log_format_version`].
#[cfg(test)]
struct CommitsWithVersion<R> {
    inner: Commits<R>,
}

#[cfg(test)]
impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        let version = self.inner.header.log_format_version;
        Some(item.map(|commit| (version, commit)))
    }
}
524
525#[derive(Clone, Debug, Eq, PartialEq)]
526pub struct Metadata {
527 pub header: Header,
529 pub tx_range: Range<u64>,
531 pub size_in_bytes: u64,
533 pub max_epoch: u64,
535 pub max_commit_offset: u64,
541}
542
543impl Metadata {
544 pub(crate) fn extract<R: io::Read + io::Seek>(
549 min_tx_offset: TxOffset,
550 mut reader: R,
551 offset_index: Option<&TxOffsetIndex>,
552 ) -> Result<Self, error::SegmentMetadata> {
553 let header = Header::decode(&mut reader)?;
554 Self::with_header(min_tx_offset, header, reader, offset_index)
555 }
556
557 fn with_header<R: io::Read + io::Seek>(
558 min_tx_offset: u64,
559 header: Header,
560 mut reader: R,
561 offset_index: Option<&TxOffsetIndex>,
562 ) -> Result<Self, error::SegmentMetadata> {
563 let mut sofar = offset_index
564 .and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
565 .unwrap_or_else(|| Self {
566 header,
567 tx_range: Range {
568 start: min_tx_offset,
569 end: min_tx_offset,
570 },
571 size_in_bytes: Header::LEN as u64,
572 max_epoch: u64::default(),
573 max_commit_offset: min_tx_offset,
574 });
575
576 reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
577
578 fn commit_meta<R: io::Read>(
579 reader: &mut R,
580 sofar: &Metadata,
581 ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
582 commit::Metadata::extract(reader).map_err(|e| {
583 if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) {
584 error::SegmentMetadata::InvalidCommit {
585 sofar: sofar.clone(),
586 source: e,
587 }
588 } else {
589 e.into()
590 }
591 })
592 }
593 while let Some(commit) = commit_meta(&mut reader, &sofar)? {
594 debug!("commit::{commit:?}");
595 if commit.tx_range.start != sofar.tx_range.end {
596 return Err(io::Error::new(
597 io::ErrorKind::InvalidData,
598 format!(
599 "out-of-order offset: expected={} actual={}",
600 sofar.tx_range.end, commit.tx_range.start,
601 ),
602 )
603 .into());
604 }
605 sofar.tx_range.end = commit.tx_range.end;
606 sofar.size_in_bytes += commit.size_in_bytes;
607 sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
609 sofar.max_commit_offset = commit.tx_range.start;
610 }
611
612 Ok(sofar)
613 }
614
615 fn find_valid_indexed_commit<R: io::Read + io::Seek>(
623 min_tx_offset: u64,
624 header: Header,
625 reader: &mut R,
626 offset_index: &TxOffsetIndex,
627 ) -> io::Result<Metadata> {
628 let mut candidate_last_key = TxOffset::MAX;
629
630 while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
631 match Self::validate_commit_at_offset(reader, key, byte_offset) {
632 Ok(commit) => {
633 return Ok(Metadata {
634 header,
635 tx_range: Range {
636 start: min_tx_offset,
637 end: commit.tx_range.end,
638 },
639 size_in_bytes: byte_offset + commit.size_in_bytes,
640 max_epoch: commit.epoch,
641 max_commit_offset: commit.tx_range.start,
642 });
643 }
644
645 Err(_) => {
647 candidate_last_key = key.saturating_sub(1);
648 if candidate_last_key == 0 {
649 break;
650 }
651 }
652 }
653 }
654
655 Err(io::Error::new(
656 ErrorKind::InvalidData,
657 format!("No valid commit found in index up to key: {candidate_last_key}"),
658 ))
659 }
660
661 fn validate_commit_at_offset<R: io::Read + io::Seek>(
667 reader: &mut R,
668 tx_offset: TxOffset,
669 byte_offset: u64,
670 ) -> io::Result<commit::Metadata> {
671 reader.seek(SeekFrom::Start(byte_offset))?;
672 let commit = commit::Metadata::extract(reader)?
673 .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;
674
675 if commit.tx_range.start != tx_offset {
676 return Err(io::Error::new(
677 ErrorKind::InvalidData,
678 format!(
679 "mismatch key in index offset file: expected={} actual={}",
680 tx_offset, commit.tx_range.start
681 ),
682 ));
683 }
684
685 Ok(commit)
686 }
687}
688
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use pretty_assertions::assert_matches;
    use proptest::prelude::*;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    /// Writes each inner vector of `batches` as one commit into a fresh segment
    /// at offset 0 of `repo`.
    ///
    /// The writer is returned so that callers keep it alive for the rest of
    /// the test.
    fn write_commits(repo: &repo::Memory, batches: &[Vec<[u8; 32]>]) -> impl Sized {
        let mut writer = repo::create_segment_writer(repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for batch in batches {
            for record in batch {
                writer.append(*record).unwrap();
            }
            writer.commit().unwrap();
        }
        writer
    }

    /// An in-memory writer at offset 0 with no offset index.
    fn vec_writer(max_records_in_commit: NonZeroU16) -> Writer<Vec<u8>> {
        Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),
            min_tx_offset: 0,
            bytes_written: 0,
            max_records_in_commit,
            offset_index_head: None,
        }
    }

    #[test]
    fn header_roundtrip() {
        let original = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut encoded = [0u8; Header::LEN];
        original.write(&mut encoded[..]).unwrap();
        let decoded = Header::decode(&encoded[..]).unwrap();

        assert_eq!(original, decoded);
    }

    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();
        let _writer = write_commits(&repo, &[vec![[0; 32], [1; 32], [2; 32]]]);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let first = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        let expected_header = Header {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
        };
        assert_eq!(header, expected_header);
        assert_eq!(first.min_tx_offset, 0);
        assert_eq!(first.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    #[test]
    fn metadata() {
        let repo = repo::Memory::default();
        let _writer = write_commits(
            &repo,
            &[vec![[0; 32], [0; 32]], vec![[1; 32]], vec![[2; 32], [2; 32]]],
        );

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let actual = reader.metadata().unwrap();

        // 5 records of 32 bytes in 3 commits.
        let expected_size = Header::LEN + 5 * 32 + 3 * Commit::FRAMING_LEN;
        let expected = Metadata {
            header: Header::default(),
            tx_range: 0..5,
            size_in_bytes: expected_size as u64,
            max_epoch: Commit::DEFAULT_EPOCH,
            max_commit_offset: 3,
        };
        assert_eq!(actual, expected);
    }

    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let batches = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];
        let _writer = write_commits(&repo, &batches);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let expected: Vec<Commit> = batches
            .iter()
            .scan(0u64, |next_offset, txs| {
                let min_tx_offset = *next_offset;
                *next_offset += txs.len() as u64;
                Some(Commit {
                    min_tx_offset,
                    n: txs.len() as u16,
                    records: txs.concat(),
                    epoch: 0,
                })
            })
            .collect();
        let actual = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(expected, actual);
    }

    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let batches = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];
        let _writer = write_commits(&repo, &batches);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let decoded = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        let expected = batches
            .into_iter()
            .flatten()
            .zip(0u64..)
            .map(|(txdata, offset)| Transaction { offset, txdata })
            .collect::<Vec<_>>();
        assert_eq!(decoded, expected);
    }

    proptest! {
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = vec_writer(max_records_in_commit);
            let limit = max_records_in_commit.get();

            for written in 0..limit {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    limit,
                    written
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                limit
            );
        }
    }

    #[test]
    fn next_tx_offset() {
        let mut writer = vec_writer(NonZeroU16::MAX);

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        // Appending alone does not advance the offset; committing does.
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        // An empty commit leaves the offset unchanged.
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        for _ in 0..2 {
            writer.append([1; 16]).unwrap();
        }
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        const COMMIT_LEN: u64 = 128;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let mut writer = OffsetIndexWriter::new(
            TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(),
            Options {
                // Smaller than COMMIT_LEN, so every commit gets its own entry.
                offset_index_interval_bytes: 127.try_into().unwrap(),
                offset_index_require_segment_fsync: false,
                ..Default::default()
            },
        );

        for key in 1..=10 {
            writer.append_after_commit(key, key * COMMIT_LEN, COMMIT_LEN).unwrap();
        }
        for key in 1..=10 {
            assert_eq!(writer.head.key_lookup(key).unwrap(), (key, key * COMMIT_LEN));
        }

        for truncate_to in (2..=10u64).rev() {
            let retained_key = truncate_to - 1;
            let retained = (retained_key, retained_key * COMMIT_LEN);

            writer.ftruncate(truncate_to, rand::random()).unwrap();
            assert_matches!(
                writer.head.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?}"
            );

            let reopened = TxOffsetIndex::open_index_file(&index_path).unwrap();
            assert_matches!(
                reopened.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?} after reopen"
            );
        }

        writer.ftruncate(1, rand::random()).unwrap();
        assert_matches!(writer.head.key_lookup(1), Err(IndexError::KeyNotFound));
    }
}