1use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};
2
3use itertools::Itertools;
4use log::{debug, info, trace, warn};
5
6use crate::{
7 commit::StoredCommit,
8 error,
9 payload::Decoder,
10 repo::{self, Repo},
11 segment::{self, FileLike, Transaction, Writer},
12 Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
13};
14
15pub use crate::segment::Committed;
16
/// A commit log made of segments stored in a [`Repo`].
///
/// `R` is the segment storage backend. `T` is the record type of the log; it
/// is only tracked at the type level (see `_record`).
#[derive(Debug)]
pub struct Generic<R: Repo, T> {
    /// Storage backend used to create, open and remove segments.
    pub(crate) repo: R,
    /// Writer for the segment currently being appended to.
    pub(crate) head: Writer<R::SegmentWriter>,
    /// Offsets of the segments preceding `head`.
    ///
    /// `open` fills this from `Repo::existing_offsets`, and
    /// `start_new_segment` pushes the old head's minimum tx offset.
    tail: Vec<u64>,
    /// Options the log was opened with (e.g. `max_segment_size`).
    opts: Options,
    /// Marker tying the log to its record type `T` without storing one.
    _record: PhantomData<T>,
    /// Set while `commit`, `sync` or a reset is in progress.
    ///
    /// `Drop` skips its final commit when this is still set.
    panicked: bool,
}
47
48impl<R: Repo, T> Generic<R, T> {
49 pub fn open(repo: R, opts: Options) -> io::Result<Self> {
50 let mut tail = repo.existing_offsets()?;
51 if !tail.is_empty() {
52 debug!("segments: {tail:?}");
53 }
54 let head = if let Some(last) = tail.pop() {
55 debug!("resuming last segment: {last}");
56 repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
57 tail.push(meta.tx_range.start);
58 repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
59 })?
60 } else {
61 debug!("starting fresh log");
62 repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
63 };
64
65 Ok(Self {
66 repo,
67 head,
68 tail,
69 opts,
70 _record: PhantomData,
71 panicked: false,
72 })
73 }
74
    /// Returns the epoch stored in the head writer's current commit.
    pub fn epoch(&self) -> u64 {
        self.head.commit.epoch
    }
81
82 pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
96 use std::cmp::Ordering::*;
97
98 match epoch.cmp(&self.head.epoch()) {
99 Less => Err(io::Error::new(
100 io::ErrorKind::InvalidInput,
101 "new epoch is smaller than current epoch",
102 )),
103 Equal => Ok(None),
104 Greater => {
105 let res = self.commit()?;
106 self.head.set_epoch(epoch);
107 Ok(res)
108 }
109 }
110 }
111
112 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
130 self.panicked = true;
131 let writer = &mut self.head;
132 let sz = writer.commit.encoded_len();
133 let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
137 let writer = if should_rotate {
138 self.sync();
139 self.start_new_segment()?
140 } else {
141 writer
142 };
143
144 let ret = writer.commit().or_else(|e| {
145 warn!("Commit failed: {e}");
146 self.start_new_segment()?;
149 Err(e)
150 });
151 self.panicked = false;
152 ret
153 }
154
155 pub fn sync(&mut self) {
165 self.panicked = true;
166 if let Err(e) = self.head.fsync() {
167 panic!("Failed to fsync segment: {e}");
168 }
169 self.panicked = false;
170 }
171
172 pub fn max_committed_offset(&self) -> Option<u64> {
178 self.head.next_tx_offset().checked_sub(1)
183 }
184
185 fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
194 if offset >= self.head.min_tx_offset {
195 vec![self.head.min_tx_offset]
196 } else {
197 let mut offs = Vec::with_capacity(self.tail.len() + 1);
198 if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
199 offs.extend_from_slice(&self.tail[pos..]);
200 offs.push(self.head.min_tx_offset);
201 }
202
203 offs
204 }
205 }
206
207 pub fn commits_from(&self, offset: u64) -> Commits<R> {
208 let offsets = self.segment_offsets_from(offset);
209 let segments = Segments {
210 offs: offsets.into_iter(),
211 repo: self.repo.clone(),
212 max_log_format_version: self.opts.log_format_version,
213 };
214 Commits {
215 inner: None,
216 segments,
217 last_commit: CommitInfo::Initial { next_offset: offset },
218 last_error: None,
219 }
220 }
221
222 pub fn reset(mut self) -> io::Result<Self> {
223 info!("hard reset");
224
225 self.panicked = true;
226 self.tail.reserve(1);
227 self.tail.push(self.head.min_tx_offset);
228 for segment in self.tail.iter().rev() {
229 debug!("removing segment {segment}");
230 self.repo.remove_segment(*segment)?;
231 }
232 Self::open(self.repo.clone(), self.opts)
235 }
236
237 pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
238 info!("reset to {offset}");
239
240 self.panicked = true;
241 self.tail.reserve(1);
242 self.tail.push(self.head.min_tx_offset);
243 reset_to_internal(&self.repo, &self.tail, offset)?;
244 Self::open(self.repo.clone(), self.opts)
247 }
248
249 fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::SegmentWriter>> {
256 debug!(
257 "starting new segment offset={} prev-offset={}",
258 self.head.next_tx_offset(),
259 self.head.min_tx_offset()
260 );
261 let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
262 let old = mem::replace(&mut self.head, new);
263 self.tail.push(old.min_tx_offset());
264 self.head.commit = old.commit;
265
266 Ok(&mut self.head)
267 }
268}
269
270impl<R: Repo, T: Encode> Generic<R, T> {
    /// Append `record` via the head segment writer.
    ///
    /// The result of the writer's `append` is passed through unchanged; on
    /// failure the error value is of the record type `T`.
    pub fn append(&mut self, record: T) -> Result<(), T> {
        self.head.append(record)
    }
274
275 pub fn transactions_from<'a, D>(
276 &self,
277 offset: u64,
278 decoder: &'a D,
279 ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
280 where
281 D: Decoder<Record = T>,
282 D::Error: From<error::Traversal>,
283 R: 'a,
284 T: 'a,
285 {
286 transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
287 }
288
289 pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
290 where
291 D: Decoder,
292 D::Error: From<error::Traversal>,
293 {
294 fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
295 }
296}
297
298impl<R: Repo, T> Drop for Generic<R, T> {
299 fn drop(&mut self) {
300 if !self.panicked {
301 if let Err(e) = self.head.commit() {
302 warn!("failed to commit on drop: {e}");
303 }
304 }
305 }
306}
307
308pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
331 let Some(last) = repo.existing_offsets()?.pop() else {
332 return Ok(None);
333 };
334
335 let mut storage = repo.open_segment_reader(last)?;
336 let offset_index = repo.get_offset_index(last).ok();
337 segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
338}
339
340pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
341 let mut offsets = repo.existing_offsets()?;
342 if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
343 offsets = offsets.split_off(pos);
344 }
345 let segments = Segments {
346 offs: offsets.into_iter(),
347 repo,
348 max_log_format_version,
349 };
350 Ok(Commits {
351 inner: None,
352 segments,
353 last_commit: CommitInfo::Initial { next_offset: offset },
354 last_error: None,
355 })
356}
357
358pub fn transactions_from<'a, R, D, T>(
359 repo: R,
360 max_log_format_version: u8,
361 offset: u64,
362 de: &'a D,
363) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
364where
365 R: Repo + 'a,
366 D: Decoder<Record = T>,
367 D::Error: From<error::Traversal>,
368 T: 'a,
369{
370 commits_from(repo, max_log_format_version, offset)
371 .map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
372}
373
374pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
375where
376 R: Repo,
377 D: Decoder,
378 D::Error: From<error::Traversal> + From<io::Error>,
379{
380 let commits = commits_from(repo, max_log_format_version, offset)?;
381 fold_transactions_internal(commits.with_log_format_version(), de, offset)
382}
383
384fn transactions_from_internal<'a, R, D, T>(
385 commits: CommitsWithVersion<R>,
386 offset: u64,
387 de: &'a D,
388) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
389where
390 R: Repo + 'a,
391 D: Decoder<Record = T>,
392 D::Error: From<error::Traversal>,
393 T: 'a,
394{
395 commits
396 .map(|x| x.map_err(D::Error::from))
397 .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
398 .flatten_ok()
399 .map(|x| x.and_then(|y| y))
400}
401
402fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
403where
404 R: Repo,
405 D: Decoder,
406 D::Error: From<error::Traversal>,
407{
408 while let Some(commit) = commits.next() {
409 let (version, commit) = match commit {
410 Ok(version_and_commit) => version_and_commit,
411 Err(e) => {
412 if commits.next().is_none() {
418 return Ok(());
419 }
420
421 return Err(e.into());
422 }
423 };
424 trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
425
426 let max_tx_offset = commit.min_tx_offset + commit.n as u64;
427 if max_tx_offset <= from {
428 continue;
429 }
430
431 let records = &mut commit.records.as_slice();
432 for n in 0..commit.n {
433 let tx_offset = commit.min_tx_offset + n as u64;
434 if tx_offset < from {
435 de.skip_record(version, tx_offset, records)?;
436 } else {
437 de.consume_record(version, tx_offset, records)?;
438 }
439 }
440 }
441
442 Ok(())
443}
444
/// Truncate the log stored in `repo` relative to tx `offset`.
///
/// Lists the existing segments and delegates to `reset_to_internal`.
///
/// # Errors
///
/// Returns any I/O error from listing the segments or from truncating.
pub fn reset_to(repo: &impl Repo, offset: u64) -> io::Result<()> {
    let segments = repo.existing_offsets()?;
    reset_to_internal(repo, &segments, offset)
}
459
/// Remove or truncate `segments` so that no commit starting after tx
/// `offset` remains.
///
/// `segments` are segment start offsets; they are processed from last to
/// first:
///
/// * A segment starting after `offset` is removed.
/// * Otherwise its commits are read to find the byte position just past the
///   last commit whose `min_tx_offset` is `<= offset`. Commits are kept
///   whole, so a commit containing `offset` is retained entirely.
///   * If that position equals the segment header length (nothing to keep),
///     the segment is removed and processing continues with the previous one.
///   * Otherwise the segment (and its offset index, if usable) is truncated
///     and processing stops.
///
/// # Errors
///
/// Returns any I/O error from opening, reading, removing or truncating
/// segments, or from truncating / flushing the offset index. Failure to
/// *use* the offset index for seeking is only logged.
fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Result<()> {
    for segment in segments.iter().copied().rev() {
        if segment > offset {
            // The whole segment lies beyond `offset`: remove it.
            debug!("removing segment {segment}");
            repo.remove_segment(segment)?;
        } else {
            // Read commit-wise to find the byte offset to truncate at.
            let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;

            // Try to shortcut via the offset index: look up `offset`, seek
            // the reader to the returned key, and start counting from the
            // returned byte offset. If any step fails, fall back to counting
            // from just past the segment header, without an index.
            let (index_file, mut byte_offset) = repo
                .get_offset_index(segment)
                .and_then(|index_file| {
                    let (key, byte_offset) = index_file.key_lookup(offset).map_err(|e| {
                        io::Error::new(io::ErrorKind::NotFound, format!("Offset index cannot be used: {e:?}"))
                    })?;

                    reader.seek_to_offset(&index_file, key).map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!("Offset index is not used at offset {key}: {e}"),
                        )
                    })?;

                    Ok((Some(index_file), byte_offset))
                })
                .inspect_err(|e| {
                    warn!("commitlog offset index is not used: {e:?}");
                })
                .unwrap_or((None, segment::Header::LEN as u64));

            let commits = reader.commits();

            // Advance past every commit that starts at or before `offset`.
            for commit in commits {
                let commit = commit?;
                if commit.min_tx_offset > offset {
                    break;
                }
                byte_offset += Commit::from(commit).encoded_len() as u64;
            }

            if byte_offset == segment::Header::LEN as u64 {
                // Nothing to retain in this segment: remove it and continue
                // with the previous segment.
                repo.remove_segment(segment)?;
            } else {
                debug!("truncating segment {segment} to {offset} at {byte_offset}");
                let mut file = repo.open_segment_writer(segment)?;

                if let Some(mut index_file) = index_file {
                    let index_file = index_file.as_mut();
                    // The index is truncated at `offset + 1` while the
                    // segment is truncated at `offset`.
                    // NOTE(review): presumably index truncation is inclusive
                    // of the given key, so `+ 1` keeps the entry for `offset`
                    // — confirm against the index implementation.
                    index_file.ftruncate(offset + 1, byte_offset).map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!("Failed to truncate offset index: {e}"),
                        )
                    })?;
                    index_file.async_flush()?;
                }

                file.ftruncate(offset, byte_offset)?;
                // Sync the truncation before declaring success.
                file.fsync()?;
                // All earlier segments are retained as they are.
                break;
            }
        }
    }

    Ok(())
}
532
/// Iterator over the segments of a log, yielding a reader for each.
pub struct Segments<R> {
    /// Repo the segments are opened from.
    repo: R,
    /// Start offsets of the segments still to be visited, in order.
    offs: vec::IntoIter<u64>,
    /// Maximum log format version passed to `repo::open_segment_reader`.
    max_log_format_version: u8,
}
538
539impl<R: Repo> Iterator for Segments<R> {
540 type Item = io::Result<segment::Reader<R::SegmentReader>>;
541
542 fn next(&mut self) -> Option<Self::Item> {
543 let off = self.offs.next()?;
544 debug!("iter segment {off}");
545 Some(repo::open_segment_reader(&self.repo, self.max_log_format_version, off))
546 }
547}
548
/// Position of a [`Commits`] traversal within the log.
enum CommitInfo {
    /// No commit has been yielded yet.
    ///
    /// `next_offset` starts out as the tx offset the traversal was asked to
    /// begin at.
    Initial { next_offset: u64 },
    /// The most recently yielded commit, identified by its tx offset range
    /// and checksum.
    LastSeen { tx_range: Range<u64>, checksum: u32 },
}
561
562impl CommitInfo {
563 fn same_offset_as(&self, commit: &StoredCommit) -> bool {
566 let Self::LastSeen { tx_range, .. } = self else {
567 return false;
568 };
569 tx_range.start == commit.min_tx_offset
570 }
571
572 fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
575 let Some(checksum) = self.checksum() else { return false };
576 checksum == &commit.checksum
577 }
578
579 fn checksum(&self) -> Option<&u32> {
580 match self {
581 Self::Initial { .. } => None,
582 Self::LastSeen { checksum, .. } => Some(checksum),
583 }
584 }
585
586 fn expected_offset(&self) -> &u64 {
587 match self {
588 Self::Initial { next_offset } => next_offset,
589 Self::LastSeen { tx_range, .. } => &tx_range.end,
590 }
591 }
592
593 fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
601 if let Self::Initial { next_offset } = self {
602 let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
603 if *next_offset > last_tx_offset {
604 return true;
605 } else {
606 *next_offset = commit.min_tx_offset;
607 }
608 }
609
610 false
611 }
612}
613
/// Iterator over the commits of a log, across segment boundaries.
pub struct Commits<R: Repo> {
    /// Commit iterator of the segment currently being read, if any.
    inner: Option<segment::Commits<R::SegmentReader>>,
    /// Remaining segments to read once `inner` is exhausted.
    segments: Segments<R>,
    /// Traversal position: the requested start offset, or the last commit
    /// yielded.
    last_commit: CommitInfo,
    /// Error hit while reading the current segment, kept until either the
    /// next commit is read or the segments run out.
    last_error: Option<error::Traversal>,
}
620
621impl<R: Repo> Commits<R> {
622 fn current_segment_header(&self) -> Option<&segment::Header> {
623 self.inner.as_ref().map(|segment| &segment.header)
624 }
625
    /// Wrap this iterator in a [`CommitsWithVersion`], which yields each
    /// commit together with the log format version of its segment.
    pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
        CommitsWithVersion { inner: self }
    }
631
632 fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
639 loop {
640 match self.inner.as_mut()?.next()? {
641 Ok(commit) => {
642 let prev_error = self.last_error.take();
645
646 if self.last_commit.adjust_initial_offset(&commit) {
648 trace!("adjust initial offset");
649 continue;
650 } else if self.last_commit.same_offset_as(&commit) {
652 if !self.last_commit.same_checksum_as(&commit) {
653 warn!(
654 "forked: commit={:?} last-error={:?} last-crc={:?}",
655 commit,
656 prev_error,
657 self.last_commit.checksum()
658 );
659 return Some(Err(error::Traversal::Forked {
660 offset: commit.min_tx_offset,
661 }));
662 } else {
663 trace!("ignore duplicate");
664 continue;
665 }
666 } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
668 warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
669 return Some(Err(error::Traversal::OutOfOrder {
670 expected_offset: *self.last_commit.expected_offset(),
671 actual_offset: commit.min_tx_offset,
672 prev_error: prev_error.map(Box::new),
673 }));
674 } else {
676 self.last_commit = CommitInfo::LastSeen {
677 tx_range: commit.tx_range(),
678 checksum: commit.checksum,
679 };
680
681 return Some(Ok(commit));
682 }
683 }
684
685 Err(e) => {
686 warn!("error reading next commit: {e}");
687 self.set_last_error(e);
697
698 return None;
699 }
700 }
701 }
702 }
703
704 fn set_last_error(&mut self, e: io::Error) {
706 let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
708 e.into_inner()
709 .unwrap()
710 .downcast::<error::ChecksumMismatch>()
711 .map(|source| error::Traversal::Checksum {
712 offset: *self.last_commit.expected_offset(),
713 source: *source,
714 })
715 .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
716 } else {
717 error::Traversal::from(e)
718 };
719 self.last_error = Some(last_error);
720 }
721
722 fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::SegmentReader>) {
725 if let CommitInfo::Initial { next_offset } = &self.last_commit {
726 let _ = self
727 .segments
728 .repo
729 .get_offset_index(segment.min_tx_offset)
730 .map_err(Into::into)
731 .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
732 .inspect_err(|e| {
733 warn!(
734 "commitlog offset index is not used at segment {}: {}",
735 segment.min_tx_offset, e
736 );
737 });
738 }
739 }
740}
741
742impl<R: Repo> Iterator for Commits<R> {
743 type Item = Result<StoredCommit, error::Traversal>;
744
745 fn next(&mut self) -> Option<Self::Item> {
746 if let Some(item) = self.next_commit() {
747 return Some(item);
748 }
749
750 match self.segments.next() {
751 None => self.last_error.take().map(Err),
753 Some(segment) => segment.map_or_else(
754 |e| Some(Err(e.into())),
755 |mut segment| {
756 self.try_seek_to_initial_offset(&mut segment);
757 self.inner = Some(segment.commits());
758 self.next()
759 },
760 ),
761 }
762 }
763}
764
/// Adapter around [`Commits`] that pairs each commit with the log format
/// version found in the header of the segment it was read from.
pub struct CommitsWithVersion<R: Repo> {
    /// The wrapped commit iterator.
    inner: Commits<R>,
}
768
769impl<R: Repo> Iterator for CommitsWithVersion<R> {
770 type Item = Result<(u8, Commit), error::Traversal>;
771
772 fn next(&mut self) -> Option<Self::Item> {
773 let next = self.inner.next()?;
774 match next {
775 Ok(commit) => {
776 let version = self
777 .inner
778 .current_segment_header()
779 .map(|hdr| hdr.log_format_version)
780 .expect("segment header none even though segment yielded a commit");
781 Some(Ok((version, commit.into())))
782 }
783 Err(e) => Some(Err(e)),
784 }
785 }
786}
787
788#[cfg(test)]
789mod tests {
790 use std::{cell::Cell, iter::repeat};
791
792 use pretty_assertions::assert_matches;
793
794 use super::*;
795 use crate::{
796 payload::{ArrayDecodeError, ArrayDecoder},
797 tests::helpers::{fill_log, mem_log},
798 };
799
800 #[test]
801 fn rotate_segments_simple() {
802 let mut log = mem_log::<[u8; 32]>(128);
803 for _ in 0..3 {
804 log.append([0; 32]).unwrap();
805 log.commit().unwrap();
806 }
807
808 let offsets = log.repo.existing_offsets().unwrap();
809 assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
810 assert_eq!(offsets[offsets.len() - 1], 2);
811 }
812
    // A commit exceeding the maximum segment size is still written to an
    // empty head segment; rotation only happens on the following commit.
    #[test]
    fn huge_commit() {
        let mut log = mem_log::<[u8; 32]>(32);

        log.append([0; 32]).unwrap();
        log.append([1; 32]).unwrap();
        log.commit().unwrap();
        // The oversized commit ended up in the head segment.
        assert!(log.head.len() > log.opts.max_segment_size);

        log.append([2; 32]).unwrap();
        log.commit().unwrap();

        // The second commit went to a new segment starting at tx offset 2.
        assert_eq!(&log.tail, &[0]);
        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
    }
828
829 #[test]
830 fn traverse_commits() {
831 let mut log = mem_log::<[u8; 32]>(32);
832 fill_log(&mut log, 10, repeat(1));
833
834 for (i, commit) in (0..10).zip(log.commits_from(0)) {
835 assert_eq!(i, commit.unwrap().min_tx_offset);
836 }
837 }
838
839 #[test]
840 fn traverse_commits_with_offset() {
841 let mut log = mem_log::<[u8; 32]>(32);
842 fill_log(&mut log, 10, repeat(1));
843
844 for offset in 0..10 {
845 for commit in log.commits_from(offset) {
846 let commit = commit.unwrap();
847 assert!(commit.min_tx_offset >= offset);
848 }
849 }
850 assert_eq!(0, log.commits_from(10).count());
851 }
852
    // Folding from `offset` must consume exactly the transactions
    // `offset..10`, in order, and must never call `decode_record`.
    #[test]
    fn fold_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        // Decoder that counts consumed records and checks they arrive at
        // consecutive tx offsets. `Cell`s are used because the `Decoder`
        // methods take `&self`.
        struct CountDecoder {
            count: Cell<u64>,
            next_tx_offset: Cell<u64>,
        }

        impl Decoder for &CountDecoder {
            type Record = [u8; 32];
            type Error = ArrayDecodeError;

            fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                _version: u8,
                _tx_offset: u64,
                _reader: &mut R,
            ) -> Result<Self::Record, Self::Error> {
                unreachable!("Folding never calls `decode_record`")
            }

            fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                self.count.set(self.count.get() + 1);
                let expected_tx_offset = self.next_tx_offset.get();
                assert_eq!(expected_tx_offset, tx_offset);
                self.next_tx_offset.set(expected_tx_offset + 1);
                Ok(())
            }

            // Skipping delegates to `ArrayDecoder::consume_record` but does
            // not touch the counters.
            fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                Ok(())
            }
        }

        for offset in 0..10 {
            let decoder = CountDecoder {
                count: Cell::new(0),
                next_tx_offset: Cell::new(offset),
            };

            log.fold_transactions_from(offset, &decoder).unwrap();

            assert_eq!(decoder.count.get(), 10 - offset);
            assert_eq!(decoder.next_tx_offset.get(), 10);
        }
    }
917
    // Writing the same commit twice must not show up in traversal: only the
    // two distinct commits are yielded.
    #[test]
    fn traverse_commits_ignores_duplicates() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        let commit1 = log.head.commit.clone();
        log.commit().unwrap();
        // Put the already written commit back and write it a second time.
        log.head.commit = commit1.clone();
        log.commit().unwrap();
        log.append([43; 32]).unwrap();
        let commit2 = log.head.commit.clone();
        log.commit().unwrap();

        assert_eq!(
            [commit1, commit2].as_slice(),
            &log.commits_from(0)
                .map_ok(Commit::from)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        );
    }
939
    // Two commits at the same tx offset with different contents must be
    // reported as a fork at that offset.
    #[test]
    fn traverse_commits_errors_when_forked() {
        let mut log = mem_log::<[u8; 32]>(1024);

        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Forge a second commit at offset 0 with a different record.
        log.head.commit = Commit {
            min_tx_offset: 0,
            n: 1,
            records: [43; 32].to_vec(),
            epoch: 0,
        };
        log.commit().unwrap();

        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(res, Err(error::Traversal::Forked { offset: 0 })),
            "expected fork error: {res:?}"
        )
    }
960
961 #[test]
962 fn traverse_commits_errors_when_offset_not_contiguous() {
963 let mut log = mem_log::<[u8; 32]>(1024);
964
965 log.append([42; 32]).unwrap();
966 log.commit().unwrap();
967 log.head.commit.min_tx_offset = 18;
968 log.append([42; 32]).unwrap();
969 log.commit().unwrap();
970
971 let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
972 assert!(
973 matches!(
974 res,
975 Err(error::Traversal::OutOfOrder {
976 expected_offset: 1,
977 actual_offset: 18,
978 prev_error: None
979 })
980 ),
981 "expected fork error: {res:?}"
982 )
983 }
984
985 #[test]
986 fn traverse_transactions() {
987 let mut log = mem_log::<[u8; 32]>(32);
988 let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;
989
990 for (i, tx) in (0..total_txs).zip(log.transactions_from(0, &ArrayDecoder)) {
991 assert_eq!(i, tx.unwrap().offset);
992 }
993 }
994
995 #[test]
996 fn traverse_transactions_with_offset() {
997 let mut log = mem_log::<[u8; 32]>(32);
998 let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;
999
1000 for offset in 0..total_txs {
1001 let mut iter = log.transactions_from(offset, &ArrayDecoder);
1002 assert_eq!(offset, iter.next().expect("at least one tx expected").unwrap().offset);
1003 for tx in iter {
1004 assert!(tx.unwrap().offset >= offset);
1005 }
1006 }
1007 assert_eq!(0, log.transactions_from(total_txs, &ArrayDecoder).count());
1008 }
1009
1010 #[test]
1011 fn traverse_empty() {
1012 let log = mem_log::<[u8; 32]>(32);
1013
1014 assert_eq!(0, log.commits_from(0).count());
1015 assert_eq!(0, log.commits_from(42).count());
1016 assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
1017 assert_eq!(0, log.transactions_from(42, &ArrayDecoder).count());
1018 }
1019
1020 #[test]
1021 fn reset_hard() {
1022 let mut log = mem_log::<[u8; 32]>(128);
1023 fill_log(&mut log, 50, (1..=10).cycle());
1024
1025 log = log.reset().unwrap();
1026 assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
1027 }
1028
1029 #[test]
1030 fn reset_to_offset() {
1031 let mut log = mem_log::<[u8; 32]>(128);
1032 let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;
1033
1034 for offset in (0..total_txs).rev() {
1035 log = log.reset_to(offset).unwrap();
1036 assert_eq!(
1037 offset,
1038 log.transactions_from(0, &ArrayDecoder)
1039 .map(Result::unwrap)
1040 .last()
1041 .unwrap()
1042 .offset
1043 );
1044 assert_eq!(
1046 offset + 1,
1047 log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64
1048 );
1049 }
1050 }
1051
    // With several transactions per commit, resetting keeps whole commits:
    // the commit containing the requested offset is retained entirely.
    #[test]
    fn reset_to_offset_many_txs_per_commit() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;

        // Resetting to an offset past the end removes nothing.
        log = log.reset_to(total_txs).unwrap();
        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);

        let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();

        // Resetting to an offset inside a commit keeps that whole commit.
        log = log.reset_to(middle_commit.min_tx_offset + 1).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );
        // Resetting to the commit's first offset keeps it as well.
        log = log.reset_to(middle_commit.min_tx_offset).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );

        // Offset 1 lies in the second commit; three transactions remain.
        log = log.reset_to(1).unwrap();
        assert_eq!(3, log.transactions_from(0, &ArrayDecoder).count() as u64);

        // Offset 0 is the first commit; one transaction remains.
        log = log.reset_to(0).unwrap();
        assert_eq!(1, log.transactions_from(0, &ArrayDecoder).count() as u64);
    }
1085
1086 #[test]
1087 fn reopen() {
1088 let mut log = mem_log::<[u8; 32]>(1024);
1089 let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
1090 assert_eq!(
1091 total_txs,
1092 log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
1093 );
1094
1095 let mut log = Generic::<_, [u8; 32]>::open(
1096 log.repo.clone(),
1097 Options {
1098 max_segment_size: 1024,
1099 ..Options::default()
1100 },
1101 )
1102 .unwrap();
1103 total_txs += fill_log(&mut log, 100, (1..=10).cycle());
1104
1105 assert_eq!(
1106 total_txs,
1107 log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
1108 );
1109 }
1110
1111 #[test]
1112 fn set_same_epoch_does_nothing() {
1113 let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
1114 assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
1115 let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
1116 assert_eq!(committed, None);
1117 }
1118
    // Setting a greater epoch first commits the pending transaction and
    // then switches to the new epoch.
    #[test]
    fn set_new_epoch_commits() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
        log.append(<_>::default()).unwrap();
        let committed = log
            .set_epoch(42)
            .unwrap()
            .expect("should have committed the pending transaction");
        assert_eq!(log.epoch(), 42);
        // The pending transaction was committed at tx offset 0.
        assert_eq!(committed.tx_range.start, 0);
    }
1131
    // Setting an epoch smaller than the current one is rejected with
    // `InvalidInput`.
    #[test]
    fn set_lower_epoch_returns_error() {
        let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
        log.set_epoch(42).unwrap();
        assert_eq!(log.epoch(), 42);
        assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput)
    }
1139}