1use std::{
2 fs::File,
3 io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
4 num::{NonZeroU16, NonZeroU64},
5 ops::Range,
6};
7
8use log::{debug, warn};
9
10use crate::{
11 commit::{self, Commit, StoredCommit},
12 error,
13 index::{IndexError, IndexFileMut},
14 payload::Encode,
15 repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
16 Options,
17};
18
/// Magic bytes every segment file starts with.
pub const MAGIC: [u8; 6] = *b"(ds)^2";

/// Log format version written into newly created segment headers.
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
/// Checksum algorithm identifier written into newly created segment headers.
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

/// Identifier of the CRC32C checksum algorithm.
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
/// Length in bytes of a CRC32C checksum.
pub const CHECKSUM_CRC32C_LEN: usize = 4;

/// Checksum lengths; entry 0 corresponds to [`CHECKSUM_ALGORITHM_CRC32C`].
pub const CHECKSUM_LEN: [usize; 1] = [CHECKSUM_CRC32C_LEN];

/// Fixed-size header found at the start of every segment.
///
/// On disk it is [`MAGIC`] followed by four bytes: the log format version,
/// the checksum algorithm, and two zero bytes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    pub log_format_version: u8,
    pub checksum_algorithm: u8,
}

impl Header {
    /// Encoded length of a header in bytes.
    pub const LEN: usize = MAGIC.len() + 4;

    /// Writes the encoded header to `out`.
    ///
    /// # Errors
    ///
    /// Propagates any error from writing to `out`.
    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
        let fields = [self.log_format_version, self.checksum_algorithm, 0, 0];
        out.write_all(&MAGIC)?;
        out.write_all(&fields)
    }

    /// Reads exactly [`Header::LEN`] bytes from `read` and decodes them.
    ///
    /// # Errors
    ///
    /// - `UnexpectedEof` (from `read_exact`) if fewer than `LEN` bytes are available.
    /// - `InvalidData` if the bytes do not begin with [`MAGIC`].
    ///
    /// The two trailing padding bytes are not checked.
    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
        let mut raw = [0u8; Self::LEN];
        read.read_exact(&mut raw)?;

        let (magic, fields) = raw.split_at(MAGIC.len());
        if magic != MAGIC.as_slice() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "segment header does not start with magic",
            ));
        }

        Ok(Self {
            log_format_version: fields[0],
            checksum_algorithm: fields[1],
        })
    }

    /// Checks whether this header can be read by a reader that supports log
    /// format versions up to `max_log_format_version` and uses `checksum_algorithm`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message describing the first incompatibility:
    /// the version is checked before the checksum algorithm.
    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
        if self.log_format_version > max_log_format_version {
            Err(format!("unsupported log format version: {}", self.log_format_version))
        } else if self.checksum_algorithm != checksum_algorithm {
            Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm))
        } else {
            Ok(())
        }
    }
}

impl Default for Header {
    /// A header using [`DEFAULT_LOG_FORMAT_VERSION`] and [`DEFAULT_CHECKSUM_ALGORITHM`].
    fn default() -> Self {
        Header {
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
        }
    }
}
84
/// Outcome of a successful, non-empty [`Writer::commit`].
#[derive(PartialEq, Debug)]
pub struct Committed {
    /// Transaction offsets covered by the commit (start inclusive, end exclusive).
    pub tx_range: Range<u64>,
    /// Checksum returned when the commit was written.
    pub checksum: u32,
}
94
95#[derive(Debug)]
96pub struct Writer<W: io::Write> {
97 pub(crate) commit: Commit,
98 pub(crate) inner: BufWriter<W>,
99
100 pub(crate) min_tx_offset: u64,
101 pub(crate) bytes_written: u64,
102
103 pub(crate) max_records_in_commit: NonZeroU16,
104
105 pub(crate) offset_index_head: Option<OffsetIndexWriter>,
106}
107
108impl<W: io::Write> Writer<W> {
109 pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
120 if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
121 Err(record)
122 } else {
123 self.commit.n += 1;
124 record.encode_record(&mut self.commit.records);
125 Ok(())
126 }
127 }
128
129 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
137 if self.commit.n == 0 {
138 return Ok(None);
139 }
140 let checksum = self.commit.write(&mut self.inner)?;
141 self.inner.flush()?;
142
143 let commit_len = self.commit.encoded_len() as u64;
144 self.offset_index_head.as_mut().map(|index| {
145 debug!(
146 "append_after commit min_tx_offset={} bytes_written={} commit_len={}",
147 self.commit.min_tx_offset, self.bytes_written, commit_len
148 );
149 index
150 .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
151 .map_err(|e| {
152 debug!("failed to append to offset index: {:?}", e);
153 })
154 });
155
156 let tx_range_start = self.commit.min_tx_offset;
157
158 self.bytes_written += commit_len;
159 self.commit.min_tx_offset += self.commit.n as u64;
160 self.commit.n = 0;
161 self.commit.records.clear();
162
163 Ok(Some(Committed {
164 tx_range: tx_range_start..self.commit.min_tx_offset,
165 checksum,
166 }))
167 }
168
169 pub fn epoch(&self) -> u64 {
171 self.commit.epoch
172 }
173
174 pub fn set_epoch(&mut self, epoch: u64) {
182 self.commit.epoch = epoch;
183 }
184
185 pub fn min_tx_offset(&self) -> u64 {
187 self.min_tx_offset
188 }
189
190 pub fn next_tx_offset(&self) -> u64 {
192 self.commit.min_tx_offset
193 }
194
195 pub fn is_empty(&self) -> bool {
200 self.bytes_written <= Header::LEN as u64
201 }
202
203 pub fn len(&self) -> u64 {
205 self.bytes_written
206 }
207}
208
/// File operations the commitlog needs beyond plain writing.
pub trait FileLike {
    /// Asks the implementor to make previously written data durable.
    fn fsync(&mut self) -> io::Result<()>;
    /// Truncates the underlying storage.
    ///
    /// `size` is a length in bytes; `tx_offset` is a transaction offset for
    /// implementors that are keyed by transaction rather than by byte. Each
    /// implementor uses whichever argument applies to it.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}

impl FileLike for File {
    /// Delegates to [`File::sync_data`].
    fn fsync(&mut self) -> io::Result<()> {
        File::sync_data(self)
    }

    /// Sets the file length to `size` bytes via [`File::set_len`]; `tx_offset` is ignored.
    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
        File::set_len(self, size)
    }
}

/// Forwards to the wrapped writer.
///
/// Note: neither method flushes the `BufWriter`'s own buffer first; any bytes
/// still buffered are not covered by `fsync` and are not affected by `ftruncate`.
impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
    fn fsync(&mut self) -> io::Result<()> {
        let wrapped = self.get_mut();
        FileLike::fsync(wrapped)
    }

    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        let wrapped = self.get_mut();
        FileLike::ftruncate(wrapped, tx_offset, size)
    }
}
233
234impl<W: io::Write + FileLike> FileLike for Writer<W> {
235 fn fsync(&mut self) -> io::Result<()> {
236 self.inner.fsync()?;
237 self.offset_index_head.as_mut().map(|index| index.fsync());
238 Ok(())
239 }
240
241 fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
242 self.inner.ftruncate(tx_offset, size)?;
243 self.offset_index_head
244 .as_mut()
245 .map(|index| index.ftruncate(tx_offset, size));
246 Ok(())
247 }
248}
249
250#[derive(Debug)]
251pub struct OffsetIndexWriter {
252 pub(crate) head: TxOffsetIndexMut,
253
254 require_segment_fsync: bool,
255 min_write_interval: NonZeroU64,
256
257 pub(crate) candidate_min_tx_offset: TxOffset,
258 pub(crate) candidate_byte_offset: u64,
259 pub(crate) bytes_since_last_index: u64,
260}
261
262impl OffsetIndexWriter {
263 pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
264 OffsetIndexWriter {
265 head,
266 require_segment_fsync: opts.offset_index_require_segment_fsync,
267 min_write_interval: opts.offset_index_interval_bytes,
268 candidate_min_tx_offset: TxOffset::default(),
269 candidate_byte_offset: 0,
270 bytes_since_last_index: 0,
271 }
272 }
273
274 fn reset(&mut self) {
275 self.candidate_byte_offset = 0;
276 self.candidate_min_tx_offset = TxOffset::default();
277 self.bytes_since_last_index = 0;
278 }
279
280 pub fn append_after_commit(
282 &mut self,
283 min_tx_offset: TxOffset,
284 byte_offset: u64,
285 commit_len: u64,
286 ) -> Result<(), IndexError> {
287 self.bytes_since_last_index += commit_len;
288
289 if self.candidate_min_tx_offset == 0 {
290 self.candidate_byte_offset = byte_offset;
291 self.candidate_min_tx_offset = min_tx_offset;
292 }
293
294 if !self.require_segment_fsync {
295 self.append_internal()?;
296 }
297
298 Ok(())
299 }
300
301 fn append_internal(&mut self) -> Result<(), IndexError> {
302 if self.candidate_min_tx_offset == 0 {
304 return Ok(());
305 }
306
307 if self.bytes_since_last_index < self.min_write_interval.get() {
308 return Ok(());
309 }
310
311 self.head
312 .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
313 self.head.async_flush()?;
314 self.reset();
315
316 Ok(())
317 }
318}
319
320impl FileLike for OffsetIndexWriter {
321 fn fsync(&mut self) -> io::Result<()> {
323 let _ = self.append_internal().map_err(|e| {
324 warn!("failed to append to offset index: {e:?}");
325 });
326 let _ = self
327 .head
328 .async_flush()
329 .map_err(|e| warn!("failed to flush offset index: {e:?}"));
330 Ok(())
331 }
332
333 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
334 self.reset();
335 self.head
336 .truncate(tx_offset)
337 .inspect_err(|e| {
338 warn!("failed to truncate offset index at {tx_offset}: {e:?}");
339 })
340 .ok();
341 Ok(())
342 }
343}
344
345impl FileLike for IndexFileMut<TxOffset> {
346 fn fsync(&mut self) -> io::Result<()> {
347 self.async_flush()
348 }
349
350 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
351 self.truncate(tx_offset).map_err(|e| {
352 io::Error::new(
353 ErrorKind::Other,
354 format!("failed to truncate offset index at {tx_offset}: {e:?}"),
355 )
356 })
357 }
358}
359
360#[derive(Debug)]
361pub struct Reader<R> {
362 pub header: Header,
363 pub min_tx_offset: u64,
364 inner: R,
365}
366
367impl<R: io::Read + io::Seek> Reader<R> {
368 pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
369 let header = Header::decode(&mut inner)?;
370 header
371 .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
372 .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
373
374 Ok(Self {
375 header,
376 min_tx_offset,
377 inner,
378 })
379 }
380}
381
382impl<R: io::BufRead + io::Seek> Reader<R> {
383 pub fn commits(self) -> Commits<R> {
384 Commits {
385 header: self.header,
386 reader: self.inner,
387 }
388 }
389
390 pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<u64, IndexError> {
391 seek_to_offset(&mut self.inner, index_file, start_tx_offset)
392 }
393
394 #[cfg(test)]
395 pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
396 where
397 D: crate::Decoder,
398 D::Error: From<io::Error>,
399 R: 'a,
400 {
401 use itertools::Itertools as _;
402
403 self.commits()
404 .with_log_format_version()
405 .map(|x| x.map_err(Into::into))
406 .map_ok(move |(version, commit)| {
407 let start = commit.min_tx_offset;
408 commit.into_transactions(version, start, de)
409 })
410 .flatten_ok()
411 .map(|x| x.and_then(|y| y))
412 }
413
414 #[cfg(test)]
415 pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
416 Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
417 }
418}
419
420pub fn seek_to_offset<R: io::Read + io::Seek>(
430 mut segment: &mut R,
431 index_file: &TxOffsetIndex,
432 start_tx_offset: u64,
433) -> Result<u64, IndexError> {
434 let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
435
436 if index_key == 0 {
438 return Err(IndexError::KeyNotFound);
439 }
440 debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
441 debug_assert!(index_key <= start_tx_offset);
443
444 let hdr = validate_commit_header(&mut segment, byte_offset)?;
446 if hdr.min_tx_offset == index_key {
447 segment.seek(SeekFrom::Start(byte_offset))
449 } else {
450 Err(io::Error::new(
451 io::ErrorKind::InvalidData,
452 "mismatched key in offset index file",
453 ))
454 }
455 .map_err(Into::into)
456}
457
458pub fn validate_commit_header<Reader: io::Read + io::Seek>(
461 mut reader: &mut Reader,
462 byte_offset: u64,
463) -> io::Result<commit::Header> {
464 let pos = reader.stream_position()?;
465 reader.seek(SeekFrom::Start(byte_offset))?;
466
467 let hdr = commit::Header::decode(&mut reader)
468 .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));
469
470 reader.seek(SeekFrom::Start(pos))?;
472
473 hdr
474}
475
/// One decoded transaction together with its transaction offset.
#[derive(PartialEq, Debug)]
pub struct Transaction<T> {
    /// Transaction offset of this record.
    pub offset: u64,
    /// The decoded transaction payload.
    pub txdata: T,
}
487
488pub struct Commits<R> {
489 pub header: Header,
490 reader: R,
491}
492
493impl<R: io::BufRead> Iterator for Commits<R> {
494 type Item = io::Result<StoredCommit>;
495
496 fn next(&mut self) -> Option<Self::Item> {
497 StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
498 }
499}
500
501#[cfg(test)]
502impl<R: io::BufRead> Commits<R> {
503 pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
504 CommitsWithVersion { inner: self }
505 }
506}
507
508#[cfg(test)]
509struct CommitsWithVersion<R> {
510 inner: Commits<R>,
511}
512
513#[cfg(test)]
514impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
515 type Item = io::Result<(u8, StoredCommit)>;
516
517 fn next(&mut self) -> Option<Self::Item> {
518 let next = self.inner.next()?;
519 match next {
520 Ok(commit) => {
521 let version = self.inner.header.log_format_version;
522 Some(Ok((version, commit)))
523 }
524 Err(e) => Some(Err(e)),
525 }
526 }
527}
528
529#[derive(Clone, Debug, Eq, PartialEq)]
530pub struct Metadata {
531 pub header: Header,
533 pub tx_range: Range<u64>,
535 pub size_in_bytes: u64,
537 pub max_epoch: u64,
539 pub max_commit_offset: u64,
545}
546
547impl Metadata {
548 pub(crate) fn extract<R: io::Read + io::Seek>(
553 min_tx_offset: TxOffset,
554 mut reader: R,
555 offset_index: Option<&TxOffsetIndex>,
556 ) -> Result<Self, error::SegmentMetadata> {
557 let header = Header::decode(&mut reader)?;
558 Self::with_header(min_tx_offset, header, reader, offset_index)
559 }
560
561 fn with_header<R: io::Read + io::Seek>(
562 min_tx_offset: u64,
563 header: Header,
564 mut reader: R,
565 offset_index: Option<&TxOffsetIndex>,
566 ) -> Result<Self, error::SegmentMetadata> {
567 let mut sofar = offset_index
568 .and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
569 .unwrap_or_else(|| Self {
570 header,
571 tx_range: Range {
572 start: min_tx_offset,
573 end: min_tx_offset,
574 },
575 size_in_bytes: Header::LEN as u64,
576 max_epoch: u64::default(),
577 max_commit_offset: min_tx_offset,
578 });
579
580 reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
581
582 fn commit_meta<R: io::Read>(
583 reader: &mut R,
584 sofar: &Metadata,
585 ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
586 commit::Metadata::extract(reader).map_err(|e| {
587 if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) {
588 error::SegmentMetadata::InvalidCommit {
589 sofar: sofar.clone(),
590 source: e,
591 }
592 } else {
593 e.into()
594 }
595 })
596 }
597 while let Some(commit) = commit_meta(&mut reader, &sofar)? {
598 debug!("commit::{commit:?}");
599 if commit.tx_range.start != sofar.tx_range.end {
600 return Err(io::Error::new(
601 io::ErrorKind::InvalidData,
602 format!(
603 "out-of-order offset: expected={} actual={}",
604 sofar.tx_range.end, commit.tx_range.start,
605 ),
606 )
607 .into());
608 }
609 sofar.tx_range.end = commit.tx_range.end;
610 sofar.size_in_bytes += commit.size_in_bytes;
611 sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
613 sofar.max_commit_offset = commit.tx_range.start;
614 }
615
616 Ok(sofar)
617 }
618
619 fn find_valid_indexed_commit<R: io::Read + io::Seek>(
627 min_tx_offset: u64,
628 header: Header,
629 reader: &mut R,
630 offset_index: &TxOffsetIndex,
631 ) -> io::Result<Metadata> {
632 let mut candidate_last_key = TxOffset::MAX;
633
634 while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
635 match Self::validate_commit_at_offset(reader, key, byte_offset) {
636 Ok(commit) => {
637 return Ok(Metadata {
638 header,
639 tx_range: Range {
640 start: min_tx_offset,
641 end: commit.tx_range.end,
642 },
643 size_in_bytes: byte_offset + commit.size_in_bytes,
644 max_epoch: commit.epoch,
645 max_commit_offset: commit.tx_range.start,
646 });
647 }
648
649 Err(_) => {
651 candidate_last_key = key.saturating_sub(1);
652 if candidate_last_key == 0 {
653 break;
654 }
655 }
656 }
657 }
658
659 Err(io::Error::new(
660 ErrorKind::InvalidData,
661 format!("No valid commit found in index up to key: {}", candidate_last_key),
662 ))
663 }
664
665 fn validate_commit_at_offset<R: io::Read + io::Seek>(
671 reader: &mut R,
672 tx_offset: TxOffset,
673 byte_offset: u64,
674 ) -> io::Result<commit::Metadata> {
675 reader.seek(SeekFrom::Start(byte_offset))?;
676 let commit = commit::Metadata::extract(reader)?
677 .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;
678
679 if commit.tx_range.start != tx_offset {
680 return Err(io::Error::new(
681 ErrorKind::InvalidData,
682 format!(
683 "mismatch key in index offset file: expected={} actual={}",
684 tx_offset, commit.tx_range.start
685 ),
686 ));
687 }
688
689 Ok(commit)
690 }
691}
692
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use pretty_assertions::assert_matches;
    use proptest::prelude::*;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    /// Builds an in-memory `Writer` without an offset index.
    fn vec_writer(max_records_in_commit: NonZeroU16) -> Writer<Vec<u8>> {
        Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit,

            offset_index_head: None,
        }
    }

    /// Appends every batch in `batches` to `writer`, committing after each batch.
    fn append_commits<W: io::Write>(writer: &mut Writer<W>, batches: &[Vec<[u8; 32]>]) {
        for batch in batches {
            for tx in batch {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }
    }

    #[test]
    fn header_roundtrip() {
        let original = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut buf = [0u8; Header::LEN];
        original.write(&mut &mut buf[..]).unwrap();
        let decoded = Header::decode(&buf[..]).unwrap();

        assert_eq!(original, decoded);
    }

    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        append_commits(&mut writer, &[vec![[0; 32], [1; 32], [2; 32]]]);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let first = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(first.min_tx_offset, 0);
        assert_eq!(first.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        // Three commits holding 2, 1 and 2 records: offsets 0..2, 2..3, 3..5.
        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        append_commits(&mut writer, &[vec![[0; 32]; 2], vec![[1; 32]], vec![[2; 32]; 2]]);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let metadata = reader.metadata().unwrap();

        assert_eq!(
            metadata,
            Metadata {
                header: Header::default(),
                tx_range: Range { start: 0, end: 5 },
                size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
                max_epoch: Commit::DEFAULT_EPOCH,
                max_commit_offset: 3
            }
        );
    }

    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let batches = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        append_commits(&mut writer, &batches);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();

        let mut next_offset = 0;
        let expected: Vec<Commit> = batches
            .into_iter()
            .map(|txs| {
                let commit = Commit {
                    min_tx_offset: next_offset,
                    n: txs.len() as u16,
                    records: txs.concat(),
                    epoch: 0,
                };
                next_offset += txs.len() as u64;
                commit
            })
            .collect();

        let actual = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(expected, actual);
    }

    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let batches = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        append_commits(&mut writer, &batches);

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let actual = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // Transactions are numbered consecutively across commits.
        let expected = batches
            .into_iter()
            .flatten()
            .zip(0u64..)
            .map(|(txdata, offset)| Transaction { offset, txdata })
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    proptest! {
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = vec_writer(max_records_in_commit);
            let limit = max_records_in_commit.get();

            for written in 0..limit {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    limit,
                    written
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                limit
            );
        }
    }

    #[test]
    fn next_tx_offset() {
        let mut writer = vec_writer(NonZeroU16::MAX);

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        // Appending alone does not advance the offset; committing does.
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        // Committing with nothing appended is a no-op.
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        for _ in 0..2 {
            writer.append([1; 16]).unwrap();
        }
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let opts = Options {
            // Each 128-byte commit exceeds this interval, so every commit is indexed.
            offset_index_interval_bytes: 127.try_into().unwrap(),
            offset_index_require_segment_fsync: false,
            ..Default::default()
        };
        let mut writer = OffsetIndexWriter::new(TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(), opts);

        for offset in 1..=10 {
            writer.append_after_commit(offset, offset * 128, 128).unwrap();
        }
        for offset in 1..=10 {
            assert_eq!(writer.head.key_lookup(offset).unwrap(), (offset, offset * 128));
        }

        for truncate_to in (2..=10u64).rev() {
            // Truncating at `truncate_to` removes that key, so a lookup falls
            // back to the previous entry.
            let retained_key = truncate_to.saturating_sub(1).min(10);
            let retained = (retained_key, retained_key * 128);

            writer.ftruncate(truncate_to, rand::random()).unwrap();
            assert_matches!(
                writer.head.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?}"
            );

            let reopened = TxOffsetIndex::open_index_file(&index_path).unwrap();
            assert_matches!(
                reopened.key_lookup(truncate_to),
                Ok(x) if x == retained,
                "truncate to {truncate_to} should retain {retained:?} after reopen"
            );
        }

        writer.ftruncate(1, rand::random()).unwrap();
        assert_matches!(writer.head.key_lookup(1), Err(IndexError::KeyNotFound));
    }
}
954}