1use std::{
2 fs::File,
3 io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
4 num::{NonZeroU16, NonZeroU64},
5 ops::Range,
6};
7
8use log::{debug, warn};
9
10use crate::{
11 commit::{self, Commit, StoredCommit},
12 error,
13 index::IndexError,
14 payload::Encode,
15 repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
16 Options,
17};
18
/// Magic bytes at the start of every segment file: the ASCII text `(ds)^2`.
pub const MAGIC: [u8; 6] = *b"(ds)^2";

/// Log format version stamped into the header of newly created segments.
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
/// Checksum algorithm stamped into the header of newly created segments.
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

/// Header identifier of the CRC32C checksum algorithm.
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;

/// Fixed-size header at the start of a segment.
///
/// On disk: [`MAGIC`], then the log format version byte, the checksum
/// algorithm byte, and two reserved bytes written as zero.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    pub log_format_version: u8,
    pub checksum_algorithm: u8,
}

impl Header {
    /// Encoded size of a header in bytes (magic + 4 trailing bytes).
    pub const LEN: usize = MAGIC.len() + 4;

    /// Writes the encoded header to `out`.
    ///
    /// The magic and the 4-byte trailer are written with two separate
    /// `write_all` calls; the trailer is only attempted if the magic succeeded.
    pub fn write<W: io::Write>(&self, mut out: W) -> io::Result<()> {
        let trailer = [self.log_format_version, self.checksum_algorithm, 0, 0];
        out.write_all(&MAGIC).and_then(|()| out.write_all(&trailer))
    }

    /// Reads exactly [`Header::LEN`] bytes from `read` and decodes them.
    ///
    /// # Errors
    ///
    /// Propagates read errors (e.g. `UnexpectedEof` on short input) and returns
    /// `InvalidData` when the bytes do not begin with [`MAGIC`]. The reserved
    /// bytes are not inspected.
    pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
        let mut bytes = [0u8; Self::LEN];
        read.read_exact(&mut bytes)?;

        let (magic, tail) = bytes.split_at(MAGIC.len());
        if magic != &MAGIC[..] {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "segment header does not start with magic",
            ));
        }

        Ok(Header {
            log_format_version: tail[0],
            checksum_algorithm: tail[1],
        })
    }

    /// Checks that this header can be read by a reader supporting log format
    /// versions up to `max_log_format_version` and using `checksum_algorithm`.
    ///
    /// The version check takes precedence when both fail.
    pub fn ensure_compatible(&self, max_log_format_version: u8, checksum_algorithm: u8) -> Result<(), String> {
        if self.log_format_version > max_log_format_version {
            Err(format!("unsupported log format version: {}", self.log_format_version))
        } else if self.checksum_algorithm != checksum_algorithm {
            Err(format!("unsupported checksum algorithm: {}", self.checksum_algorithm))
        } else {
            Ok(())
        }
    }
}

impl Default for Header {
    /// A header using [`DEFAULT_LOG_FORMAT_VERSION`] and [`DEFAULT_CHECKSUM_ALGORITHM`].
    fn default() -> Self {
        Header {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM,
        }
    }
}
79
/// Outcome of a successful [`Writer::commit`].
#[derive(Debug, PartialEq)]
pub struct Committed {
    /// Offsets of the transactions written by the commit; `end` is exclusive
    /// (it equals the writer's next transaction offset after the commit).
    pub tx_range: Range<u64>,
    /// Checksum returned when the commit was written.
    pub checksum: u32,
}
89
/// Appends commits to a single segment.
///
/// Records are buffered into `commit` by [`Writer::append`] and written to
/// `inner` by [`Writer::commit`].
#[derive(Debug)]
pub struct Writer<W: io::Write> {
    /// The commit currently being assembled; encoded records accumulate in its `records`.
    pub(crate) commit: Commit,
    /// Buffered sink that receives the encoded segment bytes.
    pub(crate) inner: BufWriter<W>,

    /// Value returned by [`Writer::min_tx_offset`]; presumably the offset of the
    /// first transaction in this segment — TODO confirm against the constructor.
    pub(crate) min_tx_offset: u64,
    /// Segment length in bytes, advanced by each commit's encoded length.
    /// `is_empty` treats values up to `Header::LEN` as empty, which suggests the
    /// count includes the segment header — verify against the constructor.
    pub(crate) bytes_written: u64,

    /// Maximum number of records a single commit may hold.
    pub(crate) max_records_in_commit: NonZeroU16,

    /// Offset index updated after every commit, if one is configured.
    pub(crate) offset_index_head: Option<OffsetIndexWriter>,
}
102
103impl<W: io::Write> Writer<W> {
104 pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
115 if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
116 Err(record)
117 } else {
118 self.commit.n += 1;
119 record.encode_record(&mut self.commit.records);
120 Ok(())
121 }
122 }
123
124 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
132 if self.commit.n == 0 {
133 return Ok(None);
134 }
135 let checksum = self.commit.write(&mut self.inner)?;
136 self.inner.flush()?;
137
138 let commit_len = self.commit.encoded_len() as u64;
139 self.offset_index_head.as_mut().map(|index| {
140 index
141 .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len)
142 .map_err(|e| {
143 debug!("failed to append to offset index: {:?}", e);
144 })
145 });
146
147 let tx_range_start = self.commit.min_tx_offset;
148
149 self.bytes_written += commit_len;
150 self.commit.min_tx_offset += self.commit.n as u64;
151 self.commit.n = 0;
152 self.commit.records.clear();
153
154 Ok(Some(Committed {
155 tx_range: tx_range_start..self.commit.min_tx_offset,
156 checksum,
157 }))
158 }
159
160 pub fn epoch(&self) -> u64 {
162 self.commit.epoch
163 }
164
165 pub fn set_epoch(&mut self, epoch: u64) {
173 self.commit.epoch = epoch;
174 }
175
176 pub fn min_tx_offset(&self) -> u64 {
178 self.min_tx_offset
179 }
180
181 pub fn next_tx_offset(&self) -> u64 {
183 self.commit.min_tx_offset
184 }
185
186 pub fn is_empty(&self) -> bool {
191 self.bytes_written <= Header::LEN as u64
192 }
193
194 pub fn len(&self) -> u64 {
196 self.bytes_written
197 }
198}
199
/// Durability and truncation operations on segment storage.
pub trait FileLike {
    /// Makes previously written data durable; exact guarantees depend on the implementor.
    fn fsync(&mut self) -> io::Result<()>;
    /// Truncates the storage to a cut point given both as a transaction offset and as
    /// a byte size. Implementors may use either: `File` uses only `size`, while the
    /// offset index uses only `tx_offset`.
    fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>;
}
204
205impl FileLike for File {
206 fn fsync(&mut self) -> io::Result<()> {
207 self.sync_data()
208 }
209
210 fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> {
211 self.set_len(size)
212 }
213}
214
215impl<W: io::Write + FileLike> FileLike for BufWriter<W> {
216 fn fsync(&mut self) -> io::Result<()> {
217 self.get_mut().fsync()
218 }
219
220 fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
221 self.get_mut().ftruncate(tx_offset, size)
222 }
223}
224
225impl<W: io::Write + FileLike> FileLike for Writer<W> {
226 fn fsync(&mut self) -> io::Result<()> {
227 self.inner.fsync()?;
228 self.offset_index_head.as_mut().map(|index| index.fsync());
229 Ok(())
230 }
231
232 fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
233 self.inner.ftruncate(tx_offset, size)?;
234 self.offset_index_head
235 .as_mut()
236 .map(|index| index.ftruncate(tx_offset, size));
237 Ok(())
238 }
239}
240
/// Maintains a segment's transaction-offset index.
///
/// An entry is appended only once at least `min_write_interval` bytes of commits
/// have accumulated since the previous entry.
#[derive(Debug)]
pub struct OffsetIndexWriter {
    /// The mutable index that entries are appended to.
    pub(crate) head: TxOffsetIndexMut,

    /// When `true`, entries are appended only on `fsync`, not directly after each commit.
    require_segment_fsync: bool,
    /// Minimum number of commit bytes between two index entries.
    min_write_interval: NonZeroU64,

    /// `min_tx_offset` of the first commit since the last index entry; `0` means "no candidate".
    pub(crate) candidate_min_tx_offset: TxOffset,
    /// Byte offset within the segment at which that candidate commit starts.
    pub(crate) candidate_byte_offset: u64,
    /// Commit bytes accumulated since the last index entry was written.
    pub(crate) bytes_since_last_index: u64,
}
252
253impl OffsetIndexWriter {
254 pub fn new(head: TxOffsetIndexMut, opts: Options) -> Self {
255 OffsetIndexWriter {
256 head,
257 require_segment_fsync: opts.offset_index_require_segment_fsync,
258 min_write_interval: opts.offset_index_interval_bytes,
259 candidate_min_tx_offset: TxOffset::default(),
260 candidate_byte_offset: 0,
261 bytes_since_last_index: 0,
262 }
263 }
264
265 fn reset(&mut self) {
266 self.candidate_byte_offset = 0;
267 self.candidate_min_tx_offset = TxOffset::default();
268 self.bytes_since_last_index = 0;
269 }
270
271 pub fn append_after_commit(
273 &mut self,
274 min_tx_offset: TxOffset,
275 byte_offset: u64,
276 commit_len: u64,
277 ) -> Result<(), IndexError> {
278 self.bytes_since_last_index += commit_len;
279
280 if self.candidate_min_tx_offset == 0 {
281 self.candidate_byte_offset = byte_offset;
282 self.candidate_min_tx_offset = min_tx_offset;
283 }
284
285 if !self.require_segment_fsync {
286 self.append_internal()?;
287 }
288
289 Ok(())
290 }
291
292 fn append_internal(&mut self) -> Result<(), IndexError> {
293 if self.candidate_min_tx_offset == 0 {
295 return Ok(());
296 }
297
298 if self.bytes_since_last_index < self.min_write_interval.get() {
299 return Ok(());
300 }
301
302 self.head
303 .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
304 self.head.async_flush()?;
305 self.reset();
306
307 Ok(())
308 }
309}
310
311impl FileLike for OffsetIndexWriter {
312 fn fsync(&mut self) -> io::Result<()> {
314 let _ = self.append_internal().map_err(|e| {
315 warn!("failed to append to offset index: {e:?}");
316 });
317 let _ = self
318 .head
319 .async_flush()
320 .map_err(|e| warn!("failed to flush offset index: {e:?}"));
321 Ok(())
322 }
323
324 fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
325 self.reset();
326 let _ = self.head.truncate(tx_offset);
327 Ok(())
328 }
329}
330
/// Reader over a single segment, created by [`Reader::new`] after the segment
/// header has been decoded and checked for compatibility.
#[derive(Debug)]
pub struct Reader<R> {
    /// The decoded segment header.
    pub header: Header,
    /// Expected offset of the segment's first transaction, as supplied to
    /// [`Reader::new`]; used as the start of the computed metadata's `tx_range`.
    pub min_tx_offset: u64,
    /// Underlying reader, positioned after the header when the `Reader` is created.
    inner: R,
}
337
338impl<R: io::Read + io::Seek> Reader<R> {
339 pub fn new(max_log_format_version: u8, min_tx_offset: u64, mut inner: R) -> io::Result<Self> {
340 let header = Header::decode(&mut inner)?;
341 header
342 .ensure_compatible(max_log_format_version, Commit::CHECKSUM_ALGORITHM)
343 .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
344
345 Ok(Self {
346 header,
347 min_tx_offset,
348 inner,
349 })
350 }
351}
352
impl<R: io::Read + io::Seek> Reader<R> {
    /// Consumes the reader, returning an iterator over the commits that follow the header.
    ///
    /// The underlying reader is wrapped in an `io::BufReader`.
    pub fn commits(self) -> Commits<R> {
        Commits {
            header: self.header,
            reader: io::BufReader::new(self.inner),
        }
    }

    /// Positions the underlying reader at the commit that `index_file` returns for
    /// `start_tx_offset`, delegating to the free function `seek_to_offset`.
    ///
    /// If the lookup yields key `0`, the position is left unchanged.
    pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> {
        seek_to_offset(&mut self.inner, index_file, start_tx_offset)
    }

    /// Test helper: consumes the reader and decodes every transaction in the segment with `de`.
    ///
    /// Each commit is decoded using the header's log format version, with the commit's
    /// `min_tx_offset` as the offset of its first transaction. I/O errors and decoding
    /// errors are both yielded as `D::Error` items.
    #[cfg(test)]
    pub fn transactions<'a, D>(self, de: &'a D) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + 'a
    where
        D: crate::Decoder,
        D::Error: From<io::Error>,
        R: 'a,
    {
        use itertools::Itertools as _;

        self.commits()
            .with_log_format_version()
            // Lift I/O errors into the decoder's error type.
            .map(|x| x.map_err(Into::into))
            .map_ok(move |(version, commit)| {
                let start = commit.min_tx_offset;
                commit.into_transactions(version, start, de)
            })
            // Flatten each commit's transactions into a single stream.
            .flatten_ok()
            // `flatten_ok` yields nested results here; collapse them into one.
            .map(|x| x.and_then(|y| y))
    }

    /// Test helper: consumes the reader and computes the segment's [`Metadata`] from the
    /// commits that follow the already-decoded header.
    #[cfg(test)]
    pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
        Metadata::with_header(self.min_tx_offset, self.header, io::BufReader::new(self.inner))
    }
}
390
391pub fn seek_to_offset<R: io::Read + io::Seek>(
399 mut segment: &mut R,
400 index_file: &TxOffsetIndex,
401 start_tx_offset: u64,
402) -> Result<(), IndexError> {
403 let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
404
405 if index_key == 0 {
407 return Ok(());
408 }
409 debug!("index lookup for key={start_tx_offset}: found key={index_key} at byte-offset={byte_offset}");
410 debug_assert!(index_key <= start_tx_offset);
412
413 validate_commit_header(&mut segment, byte_offset).map(|hdr| {
415 if hdr.min_tx_offset == index_key {
416 segment
418 .seek(SeekFrom::Start(byte_offset))
419 .map(|_| ())
420 .map_err(Into::into)
421 } else {
422 Err(io::Error::new(io::ErrorKind::InvalidData, "mismatch key in index offset file").into())
423 }
424 })?
425}
426
427pub fn validate_commit_header<Reader: io::Read + io::Seek>(
430 mut reader: &mut Reader,
431 byte_offset: u64,
432) -> io::Result<commit::Header> {
433 let pos = reader.stream_position()?;
434 reader.seek(SeekFrom::Start(byte_offset))?;
435
436 let hdr = commit::Header::decode(&mut reader)
437 .and_then(|hdr| hdr.ok_or_else(|| io::Error::new(ErrorKind::UnexpectedEof, "unexpected EOF")));
438
439 reader.seek(SeekFrom::Start(pos))?;
441
442 hdr
443}
444
/// A decoded transaction together with its transaction offset.
#[derive(Debug, PartialEq)]
pub struct Transaction<T> {
    /// Offset of this transaction in the log.
    pub offset: u64,
    /// The decoded transaction payload.
    pub txdata: T,
}
456
/// Iterator over the commits of a segment, created by [`Reader::commits`].
pub struct Commits<R> {
    /// Header of the segment being read; its `log_format_version` is used to decode commits.
    pub header: Header,
    /// Buffered reader positioned at the next commit.
    reader: io::BufReader<R>,
}
461
462impl<R: io::Read> Iterator for Commits<R> {
463 type Item = io::Result<StoredCommit>;
464
465 fn next(&mut self) -> Option<Self::Item> {
466 StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
467 }
468}
469
#[cfg(test)]
impl<R: io::Read> Commits<R> {
    /// Pairs every decoded commit with the segment header's log format version.
    pub fn with_log_format_version(self) -> impl Iterator<Item = io::Result<(u8, StoredCommit)>> {
        let inner = self;
        CommitsWithVersion { inner }
    }
}

/// Iterator adapter returned by `Commits::with_log_format_version`.
#[cfg(test)]
struct CommitsWithVersion<R> {
    inner: Commits<R>,
}

#[cfg(test)]
impl<R: io::Read> Iterator for CommitsWithVersion<R> {
    type Item = io::Result<(u8, StoredCommit)>;

    /// Forwards the next commit (or error), tagging successes with the version.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        Some(item.map(|commit| (self.inner.header.log_format_version, commit)))
    }
}
497
/// Summary of a segment's contents, computed by [`Metadata::extract`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    /// The segment header.
    pub header: Header,
    /// Offsets of the transactions in the segment; `end` is exclusive.
    pub tx_range: Range<u64>,
    /// Header length plus the sizes of all commits read, in bytes.
    pub size_in_bytes: u64,
    /// Largest of `Commit::DEFAULT_EPOCH` and the epochs of all commits read.
    pub max_epoch: u64,
}
505
506impl Metadata {
507 pub(crate) fn extract<R: io::Read>(min_tx_offset: u64, mut reader: R) -> Result<Self, error::SegmentMetadata> {
513 let header = Header::decode(&mut reader)?;
514 Self::with_header(min_tx_offset, header, reader)
515 }
516
517 fn with_header<R: io::Read>(
518 min_tx_offset: u64,
519 header: Header,
520 mut reader: R,
521 ) -> Result<Self, error::SegmentMetadata> {
522 let mut sofar = Self {
523 header,
524 tx_range: Range {
525 start: min_tx_offset,
526 end: min_tx_offset,
527 },
528 size_in_bytes: Header::LEN as u64,
529 max_epoch: Commit::DEFAULT_EPOCH,
530 };
531
532 fn commit_meta<R: io::Read>(
533 reader: &mut R,
534 sofar: &Metadata,
535 ) -> Result<Option<commit::Metadata>, error::SegmentMetadata> {
536 commit::Metadata::extract(reader).map_err(|e| {
537 if e.kind() == io::ErrorKind::InvalidData {
538 error::SegmentMetadata::InvalidCommit {
539 sofar: sofar.clone(),
540 source: e,
541 }
542 } else {
543 e.into()
544 }
545 })
546 }
547 while let Some(commit) = commit_meta(&mut reader, &sofar)? {
548 debug!("commit::{commit:?}");
549 if commit.tx_range.start != sofar.tx_range.end {
550 return Err(io::Error::new(
551 io::ErrorKind::InvalidData,
552 format!(
553 "out-of-order offset: expected={} actual={}",
554 sofar.tx_range.end, commit.tx_range.start,
555 ),
556 )
557 .into());
558 }
559 sofar.tx_range.end = commit.tx_range.end;
560 sofar.size_in_bytes += commit.size_in_bytes;
561 sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
563 }
564
565 Ok(sofar)
566 }
567}
568
// Unit tests for segment header encoding, writing/reading commits, metadata
// extraction, the writer's record limits and the offset index writer.
#[cfg(test)]
mod tests {
    use std::num::NonZeroU16;

    use super::*;
    use crate::{payload::ArrayDecoder, repo, Options};
    use itertools::Itertools;
    use proptest::prelude::*;
    use rand::thread_rng;
    use spacetimedb_paths::server::CommitLogDir;
    use tempfile::tempdir;

    // A header written into a buffer of exactly `Header::LEN` bytes decodes back to itself.
    #[test]
    fn header_roundtrip() {
        let hdr = Header {
            log_format_version: 42,
            checksum_algorithm: 7,
        };

        let mut buf = [0u8; Header::LEN];
        hdr.write(&mut &mut buf[..]).unwrap();
        let h2 = Header::decode(&buf[..]).unwrap();

        assert_eq!(hdr, h2);
    }

    // One commit of three records written to an in-memory repo is read back with
    // the default header, starting offset 0 and the concatenated record bytes.
    #[test]
    fn write_read_roundtrip() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([1; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let header = reader.header;
        let commit = reader
            .commits()
            .next()
            .expect("expected one commit")
            .expect("unexpected IO");

        assert_eq!(
            header,
            Header {
                log_format_version: DEFAULT_LOG_FORMAT_VERSION,
                checksum_algorithm: DEFAULT_CHECKSUM_ALGORITHM
            }
        );
        assert_eq!(commit.min_tx_offset, 0);
        assert_eq!(commit.records, [[0; 32], [1; 32], [2; 32]].concat());
    }

    // Three commits holding 5 records in total: the metadata covers offsets 0..5 and
    // the size is header + payload + per-commit framing.
    #[test]
    fn metadata() {
        let repo = repo::Memory::default();

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        writer.append([0; 32]).unwrap();
        writer.append([0; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([1; 32]).unwrap();
        writer.commit().unwrap();
        writer.append([2; 32]).unwrap();
        writer.append([2; 32]).unwrap();
        writer.commit().unwrap();

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let Metadata {
            header: _,
            tx_range,
            size_in_bytes,
            max_epoch: _,
        } = reader.metadata().unwrap();

        assert_eq!(tx_range.start, 0);
        assert_eq!(tx_range.end, 5);
        assert_eq!(
            size_in_bytes,
            (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64
        );
    }

    // Commits read back via `Reader::commits` equal the commits that were written,
    // with contiguous `min_tx_offset`s and epoch 0.
    #[test]
    fn commits() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        // Build the expected commits by hand.
        let mut commits1 = Vec::with_capacity(commits.len());
        let mut min_tx_offset = 0;
        for txs in commits {
            commits1.push(Commit {
                min_tx_offset,
                n: txs.len() as u16,
                records: txs.concat(),
                epoch: 0,
            });
            min_tx_offset += txs.len() as u64;
        }
        let commits2 = reader
            .commits()
            .map_ok(Into::into)
            .collect::<Result<Vec<Commit>, _>>()
            .unwrap();
        assert_eq!(commits1, commits2);
    }

    // Transactions decoded across several commits carry consecutive offsets starting at 0.
    #[test]
    fn transactions() {
        let repo = repo::Memory::default();
        let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];

        let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
        for commit in &commits {
            for tx in commit {
                writer.append(*tx).unwrap();
            }
            writer.commit().unwrap();
        }

        let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
        let txs = reader
            .transactions(&ArrayDecoder)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            txs,
            commits
                .into_iter()
                .flatten()
                .enumerate()
                .map(|(offset, txdata)| Transaction {
                    offset: offset as u64,
                    txdata
                })
                .collect::<Vec<_>>()
        );
    }

    proptest! {
        // A writer accepts exactly `max_records_in_commit` appends into a single
        // commit and rejects the next one.
        #[test]
        fn max_records_in_commit(max_records_in_commit in any::<NonZeroU16>()) {
            let mut writer = Writer {
                commit: Commit::default(),
                inner: BufWriter::new(Vec::new()),

                min_tx_offset: 0,
                bytes_written: 0,

                max_records_in_commit,

                offset_index_head: None,
            };

            for i in 0..max_records_in_commit.get() {
                assert!(
                    writer.append([0; 16]).is_ok(),
                    "less than {} records written: {}",
                    max_records_in_commit.get(),
                    i
                );
            }
            assert!(
                writer.append([0; 16]).is_err(),
                "more than {} records written",
                max_records_in_commit.get()
            );
        }
    }

    // `next_tx_offset` only advances on a non-empty commit; appended but uncommitted
    // records and empty commits leave it unchanged.
    #[test]
    fn next_tx_offset() {
        let mut writer = Writer {
            commit: Commit::default(),
            inner: BufWriter::new(Vec::new()),

            min_tx_offset: 0,
            bytes_written: 0,

            max_records_in_commit: NonZeroU16::MAX,
            offset_index_head: None,
        };

        assert_eq!(0, writer.next_tx_offset());
        writer.append([0; 16]).unwrap();
        assert_eq!(0, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.commit().unwrap();
        assert_eq!(1, writer.next_tx_offset());
        writer.append([1; 16]).unwrap();
        writer.append([1; 16]).unwrap();
        writer.commit().unwrap();
        assert_eq!(3, writer.next_tx_offset());
    }

    // Truncating the offset index at a random offset leaves the expected entry as the
    // lookup result, both in memory and after reopening the index file.
    #[test]
    fn offset_index_writer_truncates_to_offset() {
        use spacetimedb_paths::FromPathUnchecked as _;

        let tmp = tempdir().unwrap();
        let commitlog_dir = CommitLogDir::from_path_unchecked(tmp.path());
        let index_path = commitlog_dir.index(0);
        let mut writer = OffsetIndexWriter::new(
            TxOffsetIndexMut::create_index_file(&index_path, 100).unwrap(),
            Options {
                // Each simulated commit below is 128 bytes, so every commit crosses
                // this interval and produces an index entry.
                offset_index_interval_bytes: 127.try_into().unwrap(),
                offset_index_require_segment_fsync: false,
                ..Default::default()
            },
        );

        for i in 1..=10 {
            writer.append_after_commit(i, i * 128, 128).unwrap();
        }
        // Every key 1..=10 is indexed at byte offset key * 128.
        for i in 1..=10 {
            assert_eq!(writer.head.key_lookup(i).unwrap(), (i, i * 128));
        }

        let mut rng = thread_rng();

        let truncate_to: TxOffset = rng.gen_range(1..=32);
        // Expected survivor: the largest key below `truncate_to`, capped at the last
        // key written (10). This assumes `truncate` drops keys >= `truncate_to`.
        let retained_key = truncate_to.saturating_sub(1).min(10);
        let retained_val = retained_key * 128;
        let retained = (retained_key, retained_val);

        // The size argument is ignored by the offset index writer, so any value works.
        writer.ftruncate(truncate_to, rng.gen()).unwrap();
        assert_eq!(writer.head.key_lookup(truncate_to).unwrap(), retained);
        drop(writer);
        // Reopen from disk to check that the truncation was persisted.
        let index = TxOffsetIndex::open_index_file(&index_path).unwrap();
        assert_eq!(index.key_lookup(truncate_to).unwrap(), retained);
    }
}
817}