1use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};
2
3use itertools::Itertools;
4use log::{debug, info, trace, warn};
5
6use crate::{
7 commit::StoredCommit,
8 error::{self, source_chain},
9 index::IndexError,
10 payload::Decoder,
11 repo::{self, Repo, TxOffsetIndex},
12 segment::{self, FileLike, Transaction, Writer},
13 Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
14};
15
16pub use crate::segment::Committed;
17
/// A commitlog, generic over the storage backend `R` and the record type `T`.
#[derive(Debug)]
pub struct Generic<R: Repo, T> {
    /// The storage backend, used to list, create, open and remove segments.
    pub(crate) repo: R,
    /// Writer for the segment currently open for appending.
    ///
    /// Records are appended to and committed through this writer.
    pub(crate) head: Writer<R::SegmentWriter>,
    /// Minimum transaction offsets of all segments other than `head`.
    ///
    /// An entry is pushed whenever a segment is rotated out, so the last
    /// element refers to the segment immediately preceding `head`.
    tail: Vec<u64>,
    /// Configuration options, e.g. the maximum segment size.
    opts: Options,
    /// Ties the record type `T` to this log without storing a value of it.
    _record: PhantomData<T>,
    /// Controls the commit-on-drop behaviour.
    ///
    /// Set to `true` before I/O operations and reset to `false` once they
    /// succeeded. `Drop` only attempts a final commit if this is `false`.
    panicked: bool,
}
48
49impl<R: Repo, T> Generic<R, T> {
50 pub fn open(repo: R, opts: Options) -> io::Result<Self> {
51 let mut tail = repo.existing_offsets()?;
52 if !tail.is_empty() {
53 debug!("segments: {tail:?}");
54 }
55 let head = if let Some(last) = tail.pop() {
56 debug!("resuming last segment: {last}");
57 repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
58 tail.push(meta.tx_range.start);
59 repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
60 })?
61 } else {
62 debug!("starting fresh log");
63 repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
64 };
65
66 Ok(Self {
67 repo,
68 head,
69 tail,
70 opts,
71 _record: PhantomData,
72 panicked: false,
73 })
74 }
75
76 pub fn epoch(&self) -> u64 {
80 self.head.commit.epoch
81 }
82
83 pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
97 use std::cmp::Ordering::*;
98
99 match epoch.cmp(&self.head.epoch()) {
100 Less => Err(io::Error::new(
101 io::ErrorKind::InvalidInput,
102 "new epoch is smaller than current epoch",
103 )),
104 Equal => Ok(None),
105 Greater => {
106 let res = self.commit()?;
107 self.head.set_epoch(epoch);
108 Ok(res)
109 }
110 }
111 }
112
113 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
131 self.panicked = true;
132 let writer = &mut self.head;
133 let sz = writer.commit.encoded_len();
134 let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
138 let writer = if should_rotate {
139 self.sync();
140 self.start_new_segment()?
141 } else {
142 writer
143 };
144
145 let ret = writer.commit().or_else(|e| {
146 warn!("Commit failed: {e}");
147 self.start_new_segment()?;
150 Err(e)
151 });
152 self.panicked = false;
153 ret
154 }
155
156 pub fn sync(&mut self) {
166 self.panicked = true;
167 if let Err(e) = self.head.fsync() {
168 panic!("Failed to fsync segment: {e}");
169 }
170 self.panicked = false;
171 }
172
173 pub fn max_committed_offset(&self) -> Option<u64> {
179 self.head.next_tx_offset().checked_sub(1)
184 }
185
186 pub fn min_committed_offset(&self) -> Option<u64> {
189 self.tail
190 .first()
191 .copied()
192 .or_else(|| (!self.head.is_empty()).then(|| self.head.min_tx_offset()))
193 }
194
195 fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
204 if offset >= self.head.min_tx_offset {
205 vec![self.head.min_tx_offset]
206 } else {
207 let mut offs = Vec::with_capacity(self.tail.len() + 1);
208 if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
209 offs.extend_from_slice(&self.tail[pos..]);
210 offs.push(self.head.min_tx_offset);
211 }
212
213 offs
214 }
215 }
216
217 pub fn commits_from(&self, offset: u64) -> Commits<R> {
218 let offsets = self.segment_offsets_from(offset);
219 let segments = Segments {
220 offs: offsets.into_iter(),
221 repo: self.repo.clone(),
222 max_log_format_version: self.opts.log_format_version,
223 };
224 Commits {
225 inner: None,
226 segments,
227 last_commit: CommitInfo::Initial { next_offset: offset },
228 last_error: None,
229 }
230 }
231
232 pub fn reset(mut self) -> io::Result<Self> {
233 info!("hard reset");
234
235 self.panicked = true;
236 self.tail.reserve(1);
237 self.tail.push(self.head.min_tx_offset);
238 for segment in self.tail.iter().rev() {
239 debug!("removing segment {segment}");
240 self.repo.remove_segment(*segment)?;
241 }
242 Self::open(self.repo.clone(), self.opts)
245 }
246
247 pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
248 info!("reset to {offset}");
249
250 self.panicked = true;
251 self.tail.reserve(1);
252 self.tail.push(self.head.min_tx_offset);
253 reset_to_internal(&self.repo, &self.tail, offset)?;
254 Self::open(self.repo.clone(), self.opts)
257 }
258
259 fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::SegmentWriter>> {
266 debug!(
267 "starting new segment offset={} prev-offset={}",
268 self.head.next_tx_offset(),
269 self.head.min_tx_offset()
270 );
271 let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
272 let old = mem::replace(&mut self.head, new);
273 self.tail.push(old.min_tx_offset());
274 self.head.commit = old.commit;
275
276 Ok(&mut self.head)
277 }
278}
279
280impl<R: Repo, T: Encode> Generic<R, T> {
281 pub fn append(&mut self, record: T) -> Result<(), T> {
282 self.head.append(record)
283 }
284
285 pub fn transactions_from<'a, D>(
286 &self,
287 offset: u64,
288 decoder: &'a D,
289 ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
290 where
291 D: Decoder<Record = T>,
292 D::Error: From<error::Traversal>,
293 R: 'a,
294 T: 'a,
295 {
296 transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
297 }
298
299 pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
300 where
301 D: Decoder,
302 D::Error: From<error::Traversal>,
303 {
304 fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
305 }
306}
307
308impl<R: Repo, T> Drop for Generic<R, T> {
309 fn drop(&mut self) {
310 if !self.panicked {
311 if let Err(e) = self.head.commit() {
312 warn!("failed to commit on drop: {e}");
313 }
314 }
315 }
316}
317
318pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
341 let Some(last) = repo.existing_offsets()?.pop() else {
342 return Ok(None);
343 };
344
345 let mut storage = repo.open_segment_reader(last)?;
346 let offset_index = repo.get_offset_index(last).ok();
347 segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
348}
349
350pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
351 let mut offsets = repo.existing_offsets()?;
352 if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
353 offsets = offsets.split_off(pos);
354 }
355 let segments = Segments {
356 offs: offsets.into_iter(),
357 repo,
358 max_log_format_version,
359 };
360 Ok(Commits {
361 inner: None,
362 segments,
363 last_commit: CommitInfo::Initial { next_offset: offset },
364 last_error: None,
365 })
366}
367
368pub fn transactions_from<'a, R, D, T>(
369 repo: R,
370 max_log_format_version: u8,
371 offset: u64,
372 de: &'a D,
373) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
374where
375 R: Repo + 'a,
376 D: Decoder<Record = T>,
377 D::Error: From<error::Traversal>,
378 T: 'a,
379{
380 commits_from(repo, max_log_format_version, offset)
381 .map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
382}
383
384pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
385where
386 R: Repo,
387 D: Decoder,
388 D::Error: From<error::Traversal> + From<io::Error>,
389{
390 let commits = commits_from(repo, max_log_format_version, offset)?;
391 fold_transactions_internal(commits.with_log_format_version(), de, offset)
392}
393
394fn transactions_from_internal<'a, R, D, T>(
395 commits: CommitsWithVersion<R>,
396 offset: u64,
397 de: &'a D,
398) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
399where
400 R: Repo + 'a,
401 D: Decoder<Record = T>,
402 D::Error: From<error::Traversal>,
403 T: 'a,
404{
405 commits
406 .map(|x| x.map_err(D::Error::from))
407 .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
408 .flatten_ok()
409 .map(|x| x.and_then(|y| y))
410}
411
412fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
413where
414 R: Repo,
415 D: Decoder,
416 D::Error: From<error::Traversal>,
417{
418 while let Some(commit) = commits.next() {
419 let (version, commit) = match commit {
420 Ok(version_and_commit) => version_and_commit,
421 Err(e) => {
422 if commits.next().is_none() {
428 return Ok(());
429 }
430
431 return Err(e.into());
432 }
433 };
434 trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
435
436 let max_tx_offset = commit.min_tx_offset + commit.n as u64;
437 if max_tx_offset <= from {
438 continue;
439 }
440
441 let records = &mut commit.records.as_slice();
442 for n in 0..commit.n {
443 let tx_offset = commit.min_tx_offset + n as u64;
444 if tx_offset < from {
445 de.skip_record(version, tx_offset, records)?;
446 } else {
447 de.consume_record(version, tx_offset, records)?;
448 }
449 }
450 }
451
452 Ok(())
453}
454
455pub fn reset_to(repo: &impl Repo, offset: u64) -> io::Result<()> {
466 let segments = repo.existing_offsets()?;
467 reset_to_internal(repo, &segments, offset)
468}
469
/// Truncate the log consisting of `segments` to transaction offset `offset`.
///
/// `segments` holds the minimum transaction offset of each segment and is
/// processed from its last element to its first (callers pass the newest
/// segment last). Segments starting after `offset` are removed. The first
/// segment starting at or before `offset` is truncated after the last commit
/// whose `min_tx_offset` is not greater than `offset`, i.e. the commit
/// containing `offset` is retained as a whole. Processing stops after a
/// segment was truncated.
///
/// # Errors
///
/// Returns the first I/O error encountered; segments processed up to that
/// point stay removed.
fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Result<()> {
    for segment in segments.iter().copied().rev() {
        if segment > offset {
            // The segment starts after `offset`: remove it entirely.
            debug!("removing segment {segment}");
            repo.remove_segment(segment)?;
        } else {
            // Determine the byte position at which to cut the segment by
            // walking its commits.
            let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;

            // If the offset index can be used, start counting at the position
            // it yields. Otherwise start right after the segment header.
            let (index_file, mut byte_offset) = try_seek_using_offset_index(repo, &mut reader, offset)
                .map(|(index_file, byte_offset)| (Some(index_file), byte_offset))
                .unwrap_or((None, segment::Header::LEN as u64));

            let commits = reader.commits();

            for commit in commits {
                let commit = commit?;
                // First commit starting after `offset`: everything from here
                // on is cut off.
                if commit.min_tx_offset > offset {
                    break;
                }
                byte_offset += Commit::from(commit).encoded_len() as u64;
            }

            if byte_offset == segment::Header::LEN as u64 {
                // No commit is retained: remove the segment and carry on with
                // the preceding one.
                repo.remove_segment(segment)?;
            } else {
                debug!("truncating segment {segment} to {offset} at {byte_offset}");
                let mut file = repo.open_segment_writer(segment)?;

                if let Some(mut index_file) = index_file {
                    let index_file = index_file.as_mut();
                    // NOTE(review): the index is truncated at `offset + 1`,
                    // presumably so that an entry for `offset` itself is
                    // retained -- confirm against the index's `ftruncate`.
                    index_file.ftruncate(offset + 1, byte_offset).map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!("Failed to truncate offset index: {e}"),
                        )
                    })?;
                    index_file.async_flush()?;
                }

                file.ftruncate(offset, byte_offset)?;
                // Sync the truncated segment before reporting success.
                file.fsync()?;
                // Older segments lie entirely before `offset`; leave them.
                break;
            }
        }
    }

    Ok(())
}
525
/// Iterator which opens the segments of a log for reading, one by one.
pub struct Segments<R> {
    /// The storage backend to open segments from.
    repo: R,
    /// Offsets of the segments yet to be opened, in traversal order.
    offs: vec::IntoIter<u64>,
    /// Log format version passed along when opening a segment reader.
    max_log_format_version: u8,
}
531
532impl<R: Repo> Iterator for Segments<R> {
533 type Item = io::Result<segment::Reader<R::SegmentReader>>;
534
535 fn next(&mut self) -> Option<Self::Item> {
536 let off = self.offs.next()?;
537 debug!("iter segment {off}");
538 Some(repo::open_segment_reader(&self.repo, self.max_log_format_version, off))
539 }
540}
541
/// Position tracking for [`Commits`], used to validate each commit read
/// against the previously yielded one.
enum CommitInfo {
    /// No commit has been yielded yet; `next_offset` is the transaction
    /// offset the traversal is looking for.
    Initial { next_offset: u64 },
    /// Transaction range and checksum of the most recently yielded commit.
    LastSeen { tx_range: Range<u64>, checksum: u32 },
}
554
555impl CommitInfo {
556 fn same_offset_as(&self, commit: &StoredCommit) -> bool {
559 let Self::LastSeen { tx_range, .. } = self else {
560 return false;
561 };
562 tx_range.start == commit.min_tx_offset
563 }
564
565 fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
568 let Some(checksum) = self.checksum() else { return false };
569 checksum == &commit.checksum
570 }
571
572 fn checksum(&self) -> Option<&u32> {
573 match self {
574 Self::Initial { .. } => None,
575 Self::LastSeen { checksum, .. } => Some(checksum),
576 }
577 }
578
579 fn expected_offset(&self) -> &u64 {
580 match self {
581 Self::Initial { next_offset } => next_offset,
582 Self::LastSeen { tx_range, .. } => &tx_range.end,
583 }
584 }
585
586 fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
594 if let Self::Initial { next_offset } = self {
595 let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
596 if *next_offset > last_tx_offset {
597 return true;
598 } else {
599 *next_offset = commit.min_tx_offset;
600 }
601 }
602
603 false
604 }
605}
606
/// Iterator over the commits of a log, traversing its segments in order.
pub struct Commits<R: Repo> {
    /// Commit iterator of the segment currently being read, if any.
    inner: Option<segment::Commits<R::SegmentReader>>,
    /// The segments which remain to be traversed.
    segments: Segments<R>,
    /// Information about the last yielded commit, or the initial offset.
    last_commit: CommitInfo,
    /// Error encountered while reading a segment, kept until it is either
    /// reported or superseded by a successfully read commit.
    last_error: Option<error::Traversal>,
}
613
614impl<R: Repo> Commits<R> {
615 fn current_segment_header(&self) -> Option<&segment::Header> {
616 self.inner.as_ref().map(|segment| &segment.header)
617 }
618
619 pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
622 CommitsWithVersion { inner: self }
623 }
624
625 fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
632 loop {
633 match self.inner.as_mut()?.next()? {
634 Ok(commit) => {
635 let prev_error = self.last_error.take();
638
639 if self.last_commit.adjust_initial_offset(&commit) {
641 trace!("adjust initial offset");
642 continue;
643 } else if self.last_commit.same_offset_as(&commit) {
645 if !self.last_commit.same_checksum_as(&commit) {
646 warn!(
647 "forked: commit={:?} last-error={:?} last-crc={:?}",
648 commit,
649 prev_error,
650 self.last_commit.checksum()
651 );
652 return Some(Err(error::Traversal::Forked {
653 offset: commit.min_tx_offset,
654 }));
655 } else {
656 trace!("ignore duplicate");
657 continue;
658 }
659 } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
661 warn!("out-of-order: commit={commit:?} last-error={prev_error:?}");
662 return Some(Err(error::Traversal::OutOfOrder {
663 expected_offset: *self.last_commit.expected_offset(),
664 actual_offset: commit.min_tx_offset,
665 prev_error: prev_error.map(Box::new),
666 }));
667 } else {
669 self.last_commit = CommitInfo::LastSeen {
670 tx_range: commit.tx_range(),
671 checksum: commit.checksum,
672 };
673
674 return Some(Ok(commit));
675 }
676 }
677
678 Err(e) => {
679 warn!("error reading next commit: {e}");
680 self.set_last_error(e);
690
691 return None;
692 }
693 }
694 }
695 }
696
697 fn set_last_error(&mut self, e: io::Error) {
699 let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
701 e.into_inner()
702 .unwrap()
703 .downcast::<error::ChecksumMismatch>()
704 .map(|source| error::Traversal::Checksum {
705 offset: *self.last_commit.expected_offset(),
706 source: *source,
707 })
708 .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
709 } else {
710 error::Traversal::from(e)
711 };
712 self.last_error = Some(last_error);
713 }
714
715 fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::SegmentReader>) {
718 if let CommitInfo::Initial { next_offset } = &self.last_commit {
719 try_seek_using_offset_index(&self.segments.repo, segment, *next_offset);
720 }
721 }
722}
723
724impl<R: Repo> Iterator for Commits<R> {
725 type Item = Result<StoredCommit, error::Traversal>;
726
727 fn next(&mut self) -> Option<Self::Item> {
728 if let Some(item) = self.next_commit() {
729 return Some(item);
730 }
731
732 match self.segments.next() {
733 None => self.last_error.take().map(Err),
735 Some(segment) => segment.map_or_else(
736 |e| Some(Err(e.into())),
737 |mut segment| {
738 self.try_seek_to_initial_offset(&mut segment);
739 self.inner = Some(segment.commits());
740 self.next()
741 },
742 ),
743 }
744 }
745}
746
/// Adapter around [`Commits`] which pairs each commit with the log format
/// version found in the header of the segment it was read from.
pub struct CommitsWithVersion<R: Repo> {
    /// The wrapped commit iterator.
    inner: Commits<R>,
}
750
751impl<R: Repo> Iterator for CommitsWithVersion<R> {
752 type Item = Result<(u8, Commit), error::Traversal>;
753
754 fn next(&mut self) -> Option<Self::Item> {
755 let next = self.inner.next()?;
756 match next {
757 Ok(commit) => {
758 let version = self
759 .inner
760 .current_segment_header()
761 .map(|hdr| hdr.log_format_version)
762 .expect("segment header none even though segment yielded a commit");
763 Some(Ok((version, commit.into())))
764 }
765 Err(e) => Some(Err(e)),
766 }
767 }
768}
769
770fn try_seek_using_offset_index<R: Repo>(
775 repo: &R,
776 reader: &mut segment::Reader<R::SegmentReader>,
777 offset: u64,
778) -> Option<(TxOffsetIndex, u64)> {
779 let segment_offset = reader.min_tx_offset;
780 let index = repo
781 .get_offset_index(segment_offset)
782 .inspect_err(|e| {
783 if e.kind() == io::ErrorKind::NotFound {
784 debug!("offset index does not exist segment={segment_offset}");
785 } else {
786 warn!(
787 "error opening offset index segment={segment_offset}: {e} {}",
788 source_chain(&e)
789 );
790 }
791 })
792 .ok()?;
793
794 reader
795 .seek_to_offset(&index, offset)
796 .inspect_err(|e| match e {
797 IndexError::KeyNotFound => {
799 debug!("offset not found segment={segment_offset} offset={offset}");
800 }
801 e => {
802 warn!(
803 "error reading index segment={segment_offset} offset={offset}: {e} {}",
804 source_chain(&e)
805 );
806 }
807 })
808 .ok()
809 .map(|pos| (index, pos))
810}
811
#[cfg(test)]
mod tests {
    use std::{cell::Cell, iter::repeat};

    use pretty_assertions::assert_matches;

    use super::*;
    use crate::{
        payload::{ArrayDecodeError, ArrayDecoder},
        tests::helpers::{fill_log, mem_log},
    };

    /// Committing more than fits into one segment rotates segments, and the
    /// in-memory tail agrees with what the repo lists.
    #[test]
    fn rotate_segments_simple() {
        let mut log = mem_log::<[u8; 32]>(128);
        for _ in 0..3 {
            log.append([0; 32]).unwrap();
            log.commit().unwrap();
        }

        let offsets = log.repo.existing_offsets().unwrap();
        assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
        assert_eq!(offsets[offsets.len() - 1], 2);
    }

    /// A commit exceeding the max segment size is written to an empty
    /// segment anyway; the next commit then goes to a new segment.
    #[test]
    fn huge_commit() {
        let mut log = mem_log::<[u8; 32]>(32);

        log.append([0; 32]).unwrap();
        log.append([1; 32]).unwrap();
        log.commit().unwrap();
        assert!(log.head.len() > log.opts.max_segment_size);

        log.append([2; 32]).unwrap();
        log.commit().unwrap();

        assert_eq!(&log.tail, &[0]);
        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
    }

    #[test]
    fn traverse_commits() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        for (i, commit) in (0..10).zip(log.commits_from(0)) {
            assert_eq!(i, commit.unwrap().min_tx_offset);
        }
    }

    #[test]
    fn traverse_commits_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        for offset in 0..10 {
            for commit in log.commits_from(offset) {
                let commit = commit.unwrap();
                assert!(commit.min_tx_offset >= offset);
            }
        }
        assert_eq!(0, log.commits_from(10).count());
    }

    #[test]
    fn fold_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        /// Decoder which counts the consumed records and checks that they
        /// arrive with consecutive transaction offsets.
        struct CountDecoder {
            count: Cell<u64>,
            next_tx_offset: Cell<u64>,
        }

        impl Decoder for &CountDecoder {
            type Record = [u8; 32];
            type Error = ArrayDecodeError;

            fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                _version: u8,
                _tx_offset: u64,
                _reader: &mut R,
            ) -> Result<Self::Record, Self::Error> {
                unreachable!("Folding never calls `decode_record`")
            }

            fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                self.count.set(self.count.get() + 1);
                let expected_tx_offset = self.next_tx_offset.get();
                assert_eq!(expected_tx_offset, tx_offset);
                self.next_tx_offset.set(expected_tx_offset + 1);
                Ok(())
            }

            fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                // Advance the reader without counting the record.
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                Ok(())
            }
        }

        for offset in 0..10 {
            let decoder = CountDecoder {
                count: Cell::new(0),
                next_tx_offset: Cell::new(offset),
            };

            log.fold_transactions_from(offset, &decoder).unwrap();

            assert_eq!(decoder.count.get(), 10 - offset);
            assert_eq!(decoder.next_tx_offset.get(), 10);
        }
    }

    #[test]
    fn traverse_commits_ignores_duplicates() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        let commit1 = log.head.commit.clone();
        log.commit().unwrap();
        // Write the very same commit a second time.
        log.head.commit = commit1.clone();
        log.commit().unwrap();
        log.append([43; 32]).unwrap();
        let commit2 = log.head.commit.clone();
        log.commit().unwrap();

        assert_eq!(
            [commit1, commit2].as_slice(),
            &log.commits_from(0)
                .map_ok(Commit::from)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        );
    }

    #[test]
    fn traverse_commits_errors_when_forked() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Same offset as the first commit, but different contents.
        log.head.commit = Commit {
            min_tx_offset: 0,
            n: 1,
            records: [43; 32].to_vec(),
            epoch: 0,
        };
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(res, Err(error::Traversal::Forked { offset: 0 })),
            "expected fork error: {res:?}"
        )
    }

    #[test]
    fn traverse_commits_errors_when_offset_not_contiguous() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Introduce a gap in the transaction offsets.
        log.head.commit.min_tx_offset = 18;
        log.append([42; 32]).unwrap();
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(
                res,
                Err(error::Traversal::OutOfOrder {
                    expected_offset: 1,
                    actual_offset: 18,
                    prev_error: None
                })
            ),
            "expected out-of-order error: {res:?}"
        )
    }

    #[test]
    fn traverse_transactions() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

        for (i, tx) in (0..total_txs).zip(log.transactions_from(0, &ArrayDecoder)) {
            assert_eq!(i, tx.unwrap().offset);
        }
    }

    #[test]
    fn traverse_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

        for offset in 0..total_txs {
            let mut iter = log.transactions_from(offset, &ArrayDecoder);
            assert_eq!(offset, iter.next().expect("at least one tx expected").unwrap().offset);
            for tx in iter {
                assert!(tx.unwrap().offset >= offset);
            }
        }
        assert_eq!(0, log.transactions_from(total_txs, &ArrayDecoder).count());
    }

    #[test]
    fn traverse_empty() {
        let log = mem_log::<[u8; 32]>(32);

        assert_eq!(0, log.commits_from(0).count());
        assert_eq!(0, log.commits_from(42).count());
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
        assert_eq!(0, log.transactions_from(42, &ArrayDecoder).count());
    }

    #[test]
    fn reset_hard() {
        let mut log = mem_log::<[u8; 32]>(128);
        fill_log(&mut log, 50, (1..=10).cycle());

        log = log.reset().unwrap();
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
    }

    #[test]
    fn reset_to_offset() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;

        for offset in (0..total_txs).rev() {
            log = log.reset_to(offset).unwrap();
            // `offset` is the last transaction left in the log ...
            assert_eq!(
                offset,
                log.transactions_from(0, &ArrayDecoder)
                    .map(Result::unwrap)
                    .last()
                    .unwrap()
                    .offset
            );
            // ... and all transactions before it are still there.
            assert_eq!(
                offset + 1,
                log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64
            );
        }
    }

    #[test]
    fn reset_to_offset_many_txs_per_commit() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;

        // Resetting to the end of the log retains everything.
        log = log.reset_to(total_txs).unwrap();
        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);

        let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();

        // Both offsets are expected to retain the middle commit as a whole.
        log = log.reset_to(middle_commit.min_tx_offset + 1).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );
        log = log.reset_to(middle_commit.min_tx_offset).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );

        // Offset 1 falls into the 2nd commit:
        // 1st commit (1 tx) + 2nd commit (2 txs) = 3 txs.
        log = log.reset_to(1).unwrap();
        assert_eq!(3, log.transactions_from(0, &ArrayDecoder).count() as u64);

        // Offset 0 falls into the 1st commit, which has 1 tx.
        log = log.reset_to(0).unwrap();
        assert_eq!(1, log.transactions_from(0, &ArrayDecoder).count() as u64);
    }

    #[test]
    fn reopen() {
        let mut log = mem_log::<[u8; 32]>(1024);
        let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );

        let mut log = Generic::<_, [u8; 32]>::open(
            log.repo.clone(),
            Options {
                max_segment_size: 1024,
                ..Options::default()
            },
        )
        .unwrap();
        total_txs += fill_log(&mut log, 100, (1..=10).cycle());

        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );
    }

    #[test]
    fn set_same_epoch_does_nothing() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
        assert_eq!(committed, None);
    }

    #[test]
    fn set_new_epoch_commits() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        log.append(<_>::default()).unwrap();
        let committed = log
            .set_epoch(42)
            .unwrap()
            .expect("should have committed the pending transaction");
        assert_eq!(log.epoch(), 42);
        assert_eq!(committed.tx_range.start, 0);
    }

    #[test]
    fn set_lower_epoch_returns_error() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        log.set_epoch(42).unwrap();
        assert_eq!(log.epoch(), 42);
        assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput)
    }
}