1use std::{
2 fs::File,
3 io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
4 num::{NonZeroU16, NonZeroU64},
5 ops::Range,
6};
7
8use log::{debug, warn};
9
10use crate::{
11 commit::{self, Commit, StoredCommit},
12 error,
13 index::{IndexError, IndexFileMut},
14 payload::Encode,
15 repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
16 Options,
17};
18
/// Magic bytes every segment starts with: the ASCII string `(ds)^2`.
pub const MAGIC: [u8; 6] = *b"(ds)^2";

/// Identifier of the CRC-32C checksum algorithm, as stored in [`Header::checksum_algorithm`].
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
/// Length in bytes of a CRC-32C checksum.
pub const CHECKSUM_CRC32C_LEN: usize = 4;

/// Checksum length per algorithm identifier:
/// `CHECKSUM_LEN[CHECKSUM_ALGORITHM_CRC32C as usize] == CHECKSUM_CRC32C_LEN`.
pub const CHECKSUM_LEN: [usize; 1] = [CHECKSUM_CRC32C_LEN];

/// Log format version used by [`Header::default`].
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
/// Checksum algorithm used by [`Header::default`].
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

/// Fixed-size segment header.
///
/// On disk it is [`MAGIC`] followed by four bytes: the log format version,
/// the checksum algorithm, and two reserved bytes (always written as zero).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    pub log_format_version: u8,
    pub checksum_algorithm: u8,
}

impl Header {
    /// Encoded length of a header in bytes.
    pub const LEN: usize = MAGIC.len() + 4;

    /// Encodes `self` into `out`.
    ///
    /// # Errors
    ///
    /// Any I/O error from `out`.
    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
        let trailer = [self.log_format_version, self.checksum_algorithm, 0, 0];
        out.write_all(&MAGIC)?;
        out.write_all(&trailer)
    }

    /// Reads exactly [`Header::LEN`] bytes from `read` and decodes them.
    ///
    /// The reserved trailing bytes are not inspected.
    ///
    /// # Errors
    ///
    /// - `UnexpectedEof` (from `read_exact`) if fewer than `LEN` bytes are available.
    /// - `InvalidData` if the bytes do not start with [`MAGIC`].
    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
        let mut raw = [0u8; Self::LEN];
        read.read_exact(&mut raw)?;

        let (magic, trailer) = raw.split_at(MAGIC.len());
        if magic != MAGIC.as_slice() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "segment header does not start with magic",
            ));
        }

        Ok(Self {
            log_format_version: trailer[0],
            checksum_algorithm: trailer[1],
        })
    }

    /// Checks that this header can be read by a reader supporting log format
    /// versions up to `max_log_format_version` and using `checksum_algorithm`.
    ///
    /// The version is checked first. If both checks fail, the version error
    /// is reported.
    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
        let version_ok = self.log_format_version <= max_log_format_version;
        let checksum_ok = self.checksum_algorithm == checksum_algorithm;
        match (version_ok, checksum_ok) {
            (false, _) => Err(format!("unsupported log format version: {}", self.log_format_version)),
            (true, false) => Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm)),
            (true, true) => Ok(()),
        }
    }
}

impl Default for Header {
    /// A header with [`DEFAULT_LOG_FORMAT_VERSION`] and [`DEFAULT_CHECKSUM_ALGORITHM`].
    fn default() -> Self {
        Header {
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
        }
    }
}
84
/// Outcome of a successful [`Writer::commit`].
#[derive(Debug, PartialEq)]
pub struct Committed {
    /// Transaction offsets contained in the commit (end exclusive): it starts
    /// at the commit's `min_tx_offset` and covers `n` transactions.
    pub tx_range: Range<u64>,
    /// Checksum returned by `Commit::write` for the written commit.
    pub checksum: u32,
}

/// Appends commits to a segment.
///
/// Records are buffered in `commit` by [`Writer::append`] and written out to
/// `inner` by [`Writer::commit`].
#[derive(Debug)]
pub struct Writer<W: io::Write> {
    /// The commit currently being built; `commit.min_tx_offset` is the offset
    /// the next committed transaction will get.
    pub(crate) commit: Commit,
    /// Buffered handle to the segment storage.
    pub(crate) inner: BufWriter<W>,

    /// Transaction offset this writer was created with; presumably the first
    /// offset of the segment (TODO confirm against the constructor).
    pub(crate) min_tx_offset: u64,
    /// Bytes written to the segment so far; `is_empty` compares it against
    /// `Header::LEN`, so it is assumed to include the segment header.
    pub(crate) bytes_written: u64,

    /// Upper bound on the number of records `append` accepts per commit.
    pub(crate) max_records_in_commit: NonZeroU16,

    /// Offset index maintained alongside the segment, if any.
    pub(crate) offset_index_head: Option<OffsetIndexWriter>,
}
107
108impl<W: io::Write> Writer<W> {
109 pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
120 if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
121 Err(record)
122 } else {
123 self.commit.n += 1;
124 record.encode_record(&mut self.commit.records);
125 Ok(())
126 }
127 }
128
129 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
137 if self.commit.n == 0 {
138 return Ok(None);
139 }
140 let checksum = self.commit.write(&mut self.inner)?;
141 self.inner.flush()?;
142
143 let commit_len = self.commit.encoded_len() as u64;
144 self.offset_index_head.as_mut().map(|index| {
145 debug!(
146 "append_after commit min_tx_offset={} bytes_written={} commit_len={}",
147 self.commit.min_tx_offset, self.bytes_written, commit_len
148 );
149 index
150 .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
151 .map_err(|e| {
152 debug!("failed to append to offset index: {:?}", e);
153 })
154 });
155
156 let tx_range_start = self.commit.min_tx_offset;
157
158 self.bytes_written += commit_len;
159 self.commit.min_tx_offset += self.commit.n as u64;
160 self.commit.n = 0;
161 self.commit.records.clear();
162
163 Ok(Some(Committed {
164 tx_range: tx_range_start..self.commit.min_tx_offset,
165 checksum,
166 }))
167 }
168
169 pub fn epoch(&self) -> u64 {
171 self.commit.epoch
172 }
173
174 pub fn set_epoch(&mut self, epoch: u64) {
182 self.commit.epoch = epoch;
183 }
184
185 pub fn min_tx_offset(&self) -> u64 {
187 self.min_tx_offset
188 }
189
190 pub fn next_tx_offset(&self) -> u64 {
192 self.commit.min_tx_offset
193 }
194
195 pub fn is_empty(&self) -> bool {
200 self.bytes_written <= Header::LEN as u64
201 }
202
203 pub fn len(&self) -> u64 {
205 self.bytes_written
206 }
207}
208
/// File operations the commitlog needs beyond plain writing.
pub trait FileLike {
    /// Pushes data written so far towards durable storage
    /// (for [`File`] this is `sync_data`).
    fn fsync(&mut self) -> io::Result<()>;
    /// Truncates the storage.
    ///
    /// `size` is the new length in bytes; `tx_offset` is the corresponding
    /// transaction offset. Implementations use whichever they need: `File`
    /// only uses `size`.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}

impl FileLike for File {
    fn fsync(&mut self) -> io::Result<()> {
        self.sync_data()
    }

    fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
        self.set_len(size)
    }
}

impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
    /// Flushes the buffer, then syncs the underlying writer.
    ///
    /// Without the flush, bytes still held in the `BufWriter` would not be
    /// covered by the sync.
    fn fsync(&mut self) -> io::Result<()> {
        self.flush()?;
        self.get_mut().fsync()
    }

    /// Flushes the buffer, then truncates the underlying writer.
    ///
    /// Flushing first makes pending bytes land before the truncation is
    /// applied. Otherwise they would be written out afterwards and re-extend
    /// the truncated storage.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
        self.flush()?;
        self.get_mut().ftruncate(tx_offset, size)
    }
}
233
234impl<W: io::Write + FileLike> FileLike for Writer<W> {
235 fn fsync(&mut self) -> io::Result<()> {
236 self.inner.fsync()?;
237 self.offset_index_head.as_mut().map(|index| index.fsync());
238 Ok(())
239 }
240
241 fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
242 self.inner.ftruncate(tx_offset, size)?;
243 self.offset_index_head
244 .as_mut()
245 .map(|index| index.ftruncate(tx_offset, size));
246 Ok(())
247 }
248}
249
/// Appends entries to a segment's offset index.
///
/// Each entry maps a commit's starting transaction offset to the byte offset
/// of that commit within the segment.
#[derive(Debug)]
pub struct OffsetIndexWriter {
    /// The index file entries are appended to.
    pub(crate) head: TxOffsetIndexMut,

    /// If `true`, entries are only written from `fsync` rather than right
    /// after each commit.
    require_segment_fsync: bool,
    /// Minimum number of commit bytes that must accumulate before an entry is written.
    min_write_interval: NonZeroU64,

    /// Starting tx offset of the commit to index next; `0` means "no candidate yet".
    pub(crate) candidate_min_tx_offset: TxOffset,
    /// Byte offset of the candidate commit within the segment.
    pub(crate) candidate_byte_offset: u64,
    /// Commit bytes accumulated since the last entry was written (or since reset).
    pub(crate) bytes_since_last_index: u64,
}
261
262impl OffsetIndexWriter {
263 pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
264 OffsetIndexWriter {
265 head,
266 require_segment_fsync: opts.offset_index_require_segment_fsync,
267 min_write_interval: opts.offset_index_interval_bytes,
268 candidate_min_tx_offset: TxOffset::default(),
269 candidate_byte_offset: 0,
270 bytes_since_last_index: 0,
271 }
272 }
273
274 fn reset(&mut self) {
275 self.candidate_byte_offset = 0;
276 self.candidate_min_tx_offset = TxOffset::default();
277 self.bytes_since_last_index = 0;
278 }
279
280 pub fn append_after_commit(
282 &mut self,
283 min_tx_offset: TxOffset,
284 byte_offset: u64,
285 commit_len: u64,
286 ) -> Result<(), IndexError> {
287 self.bytes_since_last_index += commit_len;
288
289 if self.candidate_min_tx_offset == 0 {
290 self.candidate_byte_offset = byte_offset;
291 self.candidate_min_tx_offset = min_tx_offset;
292 }
293
294 if !self.require_segment_fsync {
295 self.append_internal()?;
296 }
297
298 Ok(())
299 }
300
301 fn append_internal(&mut self) -> Result<(), IndexError> {
302 if self.candidate_min_tx_offset == 0 {
304 return Ok(());
305 }
306
307 if self.bytes_since_last_index < self.min_write_interval.get() {
308 return Ok(());
309 }
310
311 self.head
312 .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
313 self.head.async_flush()?;
314 self.reset();
315
316 Ok(())
317 }
318}
319
320impl FileLike for OffsetIndexWriter {
321 fn fsync(&mut self) -> io::Result<()> {
323 let _ = self.append_internal().map_err(|e| {
324 warn!("failed to append to offset index: {e:?}");
325 });
326 let _ = self
327 .head
328 .async_flush()
329 .map_err(|e| warn!("failed to flush offset index: {e:?}"));
330 Ok(())
331 }
332
333 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
334 self.reset();
335 self.head
336 .truncate(tx_offset)
337 .inspect_err(|e| {
338 warn!("failed to truncate offset index at {tx_offset}: {e:?}");
339 })
340 .ok();
341 Ok(())
342 }
343}
344
345impl FileLike for IndexFileMut<TxOffset> {
346 fn fsync(&mut self) -> io::Result<()> {
347 self.async_flush()
348 }
349
350 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
351 self.truncate(tx_offset).map_err(|e| {
352 io::Error::new(
353 ErrorKind::Other,
354 format!("failed to truncate offset index at {tx_offset}: {e:?}"),
355 )
356 })
357 }
358}
359
/// Reader over a single segment.
///
/// [`Reader::new`] decodes the segment header and checks it for compatibility.
#[derive(Debug)]
pub struct Reader<R> {
    /// The decoded segment header.
    pub header: Header,
    /// Transaction offset of the segment, as passed to [`Reader::new`].
    pub min_tx_offset: u64,
    /// The underlying byte source, positioned after the header once constructed.
    inner: R,
}
366
367impl<R: io::Read + io::Seek> Reader<R> {
368 pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
369 let header = Header::decode(&mut inner)?;
370 header
371 .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
372 .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
373
374 Ok(Self {
375 header,
376 min_tx_offset,
377 inner,
378 })
379 }
380}
381
382impl<R: io::BufRead + io::Seek> Reader<R> {
383 pub fn commits(self) -> Commits<R> {
384 Commits {
385 header: self.header,
386 reader: self.inner,
387 }
388 }
389
390 pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> {
391 seek_to_offset(&mut self.inner, index_file, start_tx_offset)
392 }
393
394 #[cfg(test)]
395 pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
396 where
397 D: crate::Decoder,
398 D::Error: From<io::Error>,
399 R: 'a,
400 {
401 use itertools::Itertools as _;
402
403 self.commits()
404 .with_log_format_version()
405 .map(|x| x.map_err(Into::into))
406 .map_ok(move |(version, commit)| {
407 let start = commit.min_tx_offset;
408 commit.into_transactions(version, start, de)
409 })
410 .flatten_ok()
411 .map(|x| x.and_then(|y| y))
412 }
413
414 #[cfg(test)]
415 pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
416 Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
417 }
418}
419
420pub fn seek_to_offset<R: io::Read + io::Seek>(
428 mut segment: &mut R,
429 index_file: &TxOffsetIndex,
430 start_tx_offset: u64,
431) -> Result<(), IndexError> {
432 let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
433
434 if index_key == 0 {
436 return Ok(());
437 }
438 debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
439 debug_assert!(index_key <= start_tx_offset);
441
442 validate_commit_header(&mut segment, byte_offset).map(|hdr| {
444 if hdr.min_tx_offset == index_key {
445 segment
447 .seek(SeekFrom::Start(byte_offset))
448 .map(|_| ())
449 .map_err(Into::into)
450 } else {
451 Err(io::Error::new(io::ErrorKind::InvalidData, "mismatch key in index offset file").into())
452 }
453 })?
454}
455
456pub fn validate_commit_header<Reader: io::Read + io::Seek>(
459 mut reader: &mut Reader,
460 byte_offset: u64,
461) -> io::Result<commit::Header> {
462 let pos = reader.stream_position()?;
463 reader.seek(SeekFrom::Start(byte_offset))?;
464
465 let hdr = commit::Header::decode(&mut reader)
466 .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));
467
468 reader.seek(SeekFrom::Start(pos))?;
470
471 hdr
472}
473
/// A decoded transaction together with its transaction offset.
#[derive(Debug, PartialEq)]
pub struct Transaction<T> {
    /// Transaction offset of this transaction.
    pub offset: u64,
    /// The decoded transaction payload.
    pub txdata: T,
}

/// Iterator over the commits of a segment, created by [`Reader::commits`].
pub struct Commits<R> {
    /// Header of the segment; its log format version is used to decode each commit.
    pub header: Header,
    /// Source the commits are decoded from.
    reader: R,
}
490
491impl<R: io::BufRead> Iterator for Commits<R> {
492 type Item = io::Result<StoredCommit>;
493
494 fn next(&mut self) -> Option<Self::Item> {
495 StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
496 }
497}
498
#[cfg(test)]
impl<R: io::BufRead> Commits<R> {
    /// Pairs each commit with the segment's log format version.
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        CommitsWithVersion { inner: self }
    }
}

/// Adapter returned by [`Commits::with_log_format_version`].
#[cfg(test)]
struct CommitsWithVersion<R> {
    inner: Commits<R>,
}

#[cfg(test)]
impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    fn next(&mut self) -> Option<Self::Item> {
        let decoded = self.inner.next()?;
        let version = self.inner.header.log_format_version;
        Some(decoded.map(|commit| (version, commit)))
    }
}
526
/// Summary of a segment, as computed by [`Metadata::extract`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    /// The decoded segment header.
    pub header: Header,
    /// Transaction offsets covered by the valid commits (end exclusive).
    pub tx_range: Range<u64>,
    /// Size in bytes of the valid prefix of the segment, including the header.
    pub size_in_bytes: u64,
    /// Largest epoch among the commits inspected.
    ///
    /// When the offset index is used, commits before the indexed one are not
    /// inspected.
    pub max_epoch: u64,
}
534
535impl Metadata {
536 pub(crate) fn extract<R: io::Read + io::Seek>(
541 min_tx_offset: TxOffset,
542 mut reader: R,
543 offset_index: Option<&TxOffsetIndex>,
544 ) -> Result<Self, error::SegmentMetadata> {
545 let header = Header::decode(&mut reader)?;
546 Self::with_header(min_tx_offset, header, reader, offset_index)
547 }
548
549 fn with_header<R: io::Read + io::Seek>(
550 min_tx_offset: u64,
551 header: Header,
552 mut reader: R,
553 offset_index: Option<&TxOffsetIndex>,
554 ) -> Result<Self, error::SegmentMetadata> {
555 let mut sofar = offset_index
556 .and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
557 .unwrap_or_else(|| Self {
558 header,
559 tx_range: Range {
560 start: min_tx_offset,
561 end: min_tx_offset,
562 },
563 size_in_bytes: Header::LEN as u64,
564 max_epoch: u64::default(),
565 });
566
567 reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
568
569 fn commit_meta<R: io::Read>(
570 reader: &mut R,
571 sofar: &Metadata,
572 ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
573 commit::Metadata::extract(reader).map_err(|e| {
574 if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) {
575 error::SegmentMetadata::InvalidCommit {
576 sofar: sofar.clone(),
577 source: e,
578 }
579 } else {
580 e.into()
581 }
582 })
583 }
584 while let Some(commit) = commit_meta(&mut reader, &sofar)? {
585 debug!("commit::{commit:?}");
586 if commit.tx_range.start != sofar.tx_range.end {
587 return Err(io::Error::new(
588 io::ErrorKind::InvalidData,
589 format!(
590 "out-of-order offset: expected={} actual={}",
591 sofar.tx_range.end, commit.tx_range.start,
592 ),
593 )
594 .into());
595 }
596 sofar.tx_range.end = commit.tx_range.end;
597 sofar.size_in_bytes += commit.size_in_bytes;
598 sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
600 }
601
602 Ok(sofar)
603 }
604
605 fn find_valid_indexed_commit<R: io::Read + io::Seek>(
613 min_tx_offset: u64,
614 header: Header,
615 reader: &mut R,
616 offset_index: &TxOffsetIndex,
617 ) -> io::Result<Metadata> {
618 let mut candidate_last_key = TxOffset::MAX;
619
620 while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
621 match Self::validate_commit_at_offset(reader, key, byte_offset) {
622 Ok(commit) => {
623 return Ok(Metadata {
624 header,
625 tx_range: Range {
626 start: min_tx_offset,
627 end: commit.tx_range.end,
628 },
629 size_in_bytes: byte_offset + commit.size_in_bytes,
630 max_epoch: commit.epoch,
631 });
632 }
633
634 Err(_) => {
636 candidate_last_key = key.saturating_sub(1);
637 if candidate_last_key == 0 {
638 break;
639 }
640 }
641 }
642 }
643
644 Err(io::Error::new(
645 ErrorKind::InvalidData,
646 format!("No valid commit found in index up to key: {}", candidate_last_key),
647 ))
648 }
649
650 fn validate_commit_at_offset<R: io::Read + io::Seek>(
656 reader: &mut R,
657 tx_offset: TxOffset,
658 byte_offset: u64,
659 ) -> io::Result<commit::Metadata> {
660 reader.seek(SeekFrom::Start(byte_offset))?;
661 let commit = commit::Metadata::extract(reader)?
662 .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;
663
664 if commit.tx_range.start != tx_offset {
665 return Err(io::Error::new(
666 ErrorKind::InvalidData,
667 format!(
668 "mismatch key in index offset file: expected={} actual={}",
669 tx_offset, commit.tx_range.start
670 ),
671 ));
672 }
673
674 Ok(commit)
675 }
676}
677
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use proptest::prelude::*;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    // A header encoded with `Header::write` decodes back to an equal value.
    #[test]
    fn header_roundtrip() {
        let hdr = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut buf = [0u8; Header::LEN];
        hdr.write(&mut &mut buf[..]).unwrap();
        let h2 = Header::decode(&buf[..]).unwrap();

        assert_eq!(hdr, h2);
    }

    // A single commit written to an in-memory repo is read back with the
    // default header and the concatenated records.
    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([1; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let commit = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(commit.min_tx_offset, 0);
        assert_eq!(commit.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    // Three commits holding five 32-byte records in total: the metadata must
    // cover offsets 0..5. The size must be the header, plus the record bytes,
    // plus per-commit framing.
    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([1; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([2; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let Metadata {
            header: _,
            tx_range,
            size_in_bytes,
            max_epoch: _,
        } = reader.metadata().unwrap();

        assert_eq!(tx_range.start, 0);
        assert_eq!(tx_range.end, 5);
        assert_eq!(
            size_in_bytes,
            (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64
        );
    }

    // Commits read back via `Reader::commits` equal the commits that were
    // written, with contiguous `min_tx_offset`s.
    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let mut commits1 = Vec::with_capacity(commits.len());
        let mut min_tx_offset = 0;
        for txs in commits {
            commits1.push(Commit {
                min_tx_offset,
                n: txs.len() as u16,
                records: txs.concat(),
                epoch: 0,
            });
            min_tx_offset += txs.len() as u64;
        }
        let commits2 = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(commits1, commits2);
    }

    // Decoding transactions across commit boundaries yields every record, in
    // order, numbered by consecutive transaction offsets.
    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let txs = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            txs,
            commits
                .into_iter()
                .flatten()
                .enumerate()
                .map(|(offset, txdata)| Transaction {
                    offset: offset as u64,
                    txdata
                })
                .collect::<Vec<_>>()
        );
    }

    proptest! {
        // `append` accepts exactly `max_records_in_commit` records into one
        // commit and rejects the next one.
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = Writer {
                commit: Commit::default(),
                inner: BufWriter::new(Vec::new()),

                min_tx_offset: 0,
                bytes_written: 0,

                max_records_in_commit,

                offset_index_head: None,
            };

            for i in 0..max_records_in_commit.get() {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    max_records_in_commit.get(),
                    i
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                max_records_in_commit.get()
            );
        }
    }

    // `next_tx_offset` only advances when a non-empty commit is written;
    // committing with no appended records is a no-op.
    #[test]
    fn next_tx_offset() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.append([1; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    // Truncating the offset index at a random offset keeps only entries below
    // it, both in memory and after reopening the index file.
    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let mut writer = OffsetIndexWriter::new(
            TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(),
            Options {
                // Smaller than each simulated 128-byte commit, so every
                // commit gets its own index entry.
                offset_index_interval_bytes: 127.try_into().unwrap(),
                offset_index_require_segment_fsync: false,
                ..Default::default()
            },
        );

        for i in 1..=10 {
            writer.append_after_commit(i, i * 128, 128).unwrap();
        }
        for i in 1..=10 {
            assert_eq!(writer.head.key_lookup(i).unwrap(), (i, i * 128));
        }

        let truncate_to: TxOffset = rand::random_range(1..=32);
        // Expected surviving entry: the key just below `truncate_to`, capped
        // at the last key written (10). For `truncate_to == 1` this is
        // `(0, 0)`, presumably the index's "no entry" value (TODO confirm).
        let retained_key = truncate_to.saturating_sub(1).min(10);
        let retained_val = retained_key * 128;
        let retained = (retained_key, retained_val);

        // The segment size argument is ignored by `OffsetIndexWriter`.
        writer.ftruncate(truncate_to, rand::random()).unwrap();
        assert_eq!(writer.head.key_lookup(truncate_to).unwrap(), retained);
        drop(writer);
        let index = TxOffsetIndex::open_index_file(&index_path).unwrap();
        assert_eq!(index.key_lookup(truncate_to).unwrap(), retained);
    }
}
923}