1use std::{io, marker::PhantomData, mem, ops::Range, vec};
2
3use itertools::Itertools;
4use log::{debug, info, trace, warn};
5
6use crate::{
7 commit::StoredCommit,
8 error,
9 payload::Decoder,
10 repo::{self, Repo},
11 segment::{self, FileLike, Transaction, Writer},
12 Commit, Encode, Options,
13};
14
15pub use crate::segment::Committed;
16
/// A commit log, generic over the storage backend `R` and the record type `T`.
///
/// Records are appended to the `head` segment writer; segments that have been
/// rotated out are remembered in `tail` by their minimum transaction offset.
17#[derive(Debug)]
20pub struct Generic<R: Repo, T> {
 /// Storage backend used to list, create, open and remove segments.
21 pub(crate) repo: R,
 /// Writer for the segment that currently receives appends and commits.
23 pub(crate) head: Writer<R::Segment>,
 /// Minimum transaction offsets of the segments preceding `head`. The offset
 /// of the previous head is pushed here whenever a new segment is started.
28 tail: Vec<u64>,
 /// Configuration handed to segment writers; `max_segment_size` is consulted
 /// on commit to decide whether to rotate the head segment.
36 opts: Options,
 /// Marker tying the log to its record type `T` without storing a value.
38 _record: PhantomData<T>,
 /// Set to `true` before the I/O performed by `commit`, `sync`, `reset` and
 /// `reset_to`, and back to `false` once `commit` / `sync` ran through.
 /// While `true`, the `Drop` impl skips its final commit.
40 panicked: bool,
46}
47
48impl<R: Repo, T> Generic<R, T> {
49 pub fn open(repo: R, opts: Options) -> io::Result<Self> {
50 let mut tail = repo.existing_offsets()?;
51 if !tail.is_empty() {
52 debug!("segments: {tail:?}");
53 }
54 let head = if let Some(last) = tail.pop() {
55 debug!("resuming last segment: {last}");
56 repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
57 tail.push(meta.tx_range.start);
58 repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
59 })?
60 } else {
61 debug!("starting fresh log");
62 repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
63 };
64
65 Ok(Self {
66 repo,
67 head,
68 tail,
69 opts,
70 _record: PhantomData,
71 panicked: false,
72 })
73 }
74
75 pub fn epoch(&self) -> u64 {
79 self.head.commit.epoch
80 }
81
82 pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
96 use std::cmp::Ordering::*;
97
98 match epoch.cmp(&self.head.epoch()) {
99 Less => Err(io::Error::new(
100 io::ErrorKind::InvalidInput,
101 "new epoch is smaller than current epoch",
102 )),
103 Equal => Ok(None),
104 Greater => {
105 let res = self.commit()?;
106 self.head.set_epoch(epoch);
107 Ok(res)
108 }
109 }
110 }
111
112 pub fn commit(&mut self) -> io::Result<Option<Committed>> {
130 self.panicked = true;
131 let writer = &mut self.head;
132 let sz = writer.commit.encoded_len();
133 let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
137 let writer = if should_rotate {
138 self.sync();
139 self.start_new_segment()?
140 } else {
141 writer
142 };
143
144 let ret = writer.commit().or_else(|e| {
145 warn!("Commit failed: {e}");
146 self.start_new_segment()?;
149 Err(e)
150 });
151 self.panicked = false;
152 ret
153 }
154
155 pub fn sync(&mut self) {
165 self.panicked = true;
166 if let Err(e) = self.head.fsync() {
167 panic!("Failed to fsync segment: {e}");
168 }
169 self.panicked = false;
170 }
171
172 pub fn max_committed_offset(&self) -> Option<u64> {
178 self.head.next_tx_offset().checked_sub(1)
183 }
184
185 fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
194 if offset >= self.head.min_tx_offset {
195 vec![self.head.min_tx_offset]
196 } else {
197 let mut offs = Vec::with_capacity(self.tail.len() + 1);
198 if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
199 offs.extend_from_slice(&self.tail[pos..]);
200 offs.push(self.head.min_tx_offset);
201 }
202
203 offs
204 }
205 }
206
207 pub fn commits_from(&self, offset: u64) -> Commits<R> {
208 let offsets = self.segment_offsets_from(offset);
209 let segments = Segments {
210 offs: offsets.into_iter(),
211 repo: self.repo.clone(),
212 max_log_format_version: self.opts.log_format_version,
213 };
214 Commits {
215 inner: None,
216 segments,
217 last_commit: CommitInfo::Initial { next_offset: offset },
218 last_error: None,
219 }
220 }
221
222 pub fn reset(mut self) -> io::Result<Self> {
223 info!("hard reset");
224
225 self.panicked = true;
226 self.tail.reserve(1);
227 self.tail.push(self.head.min_tx_offset);
228 for segment in self.tail.iter().rev() {
229 debug!("removing segment {segment}");
230 self.repo.remove_segment(*segment)?;
231 }
232 Self::open(self.repo.clone(), self.opts)
235 }
236
237 pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
238 info!("reset to {offset}");
239
240 self.panicked = true;
241 self.tail.reserve(1);
242 self.tail.push(self.head.min_tx_offset);
243 for segment in self.tail.iter().rev() {
244 let segment = *segment;
245 if segment > offset {
246 debug!("removing segment {segment}");
248 self.repo.remove_segment(segment)?;
249 } else {
250 let reader = repo::open_segment_reader(&self.repo, self.opts.log_format_version, segment)?;
252 let commits = reader.commits();
253
254 let mut bytes_read = 0;
255 for commit in commits {
256 let commit = commit?;
257 if commit.min_tx_offset > offset {
258 break;
259 }
260 bytes_read += Commit::from(commit).encoded_len() as u64;
261 }
262
263 if bytes_read == 0 {
264 self.repo.remove_segment(segment)?;
266 } else {
267 let byte_offset = segment::Header::LEN as u64 + bytes_read;
268 debug!("truncating segment {segment} to {offset} at {byte_offset}");
269 let mut file = self.repo.open_segment(segment)?;
270 file.ftruncate(offset + 1, byte_offset)?;
274 file.fsync()?;
276 break;
277 }
278 }
279 }
280 Self::open(self.repo.clone(), self.opts)
283 }
284
285 fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::Segment>> {
292 debug!(
293 "starting new segment offset={} prev-offset={}",
294 self.head.next_tx_offset(),
295 self.head.min_tx_offset()
296 );
297 let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
298 let old = mem::replace(&mut self.head, new);
299 self.tail.push(old.min_tx_offset());
300 self.head.commit = old.commit;
301
302 Ok(&mut self.head)
303 }
304}
305
306impl<R: Repo, T: Encode> Generic<R, T> {
307 pub fn append(&mut self, record: T) -> Result<(), T> {
308 self.head.append(record)
309 }
310
311 pub fn transactions_from<'a, D>(
312 &self,
313 offset: u64,
314 decoder: &'a D,
315 ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
316 where
317 D: Decoder<Record = T>,
318 D::Error: From<error::Traversal>,
319 R: 'a,
320 T: 'a,
321 {
322 transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
323 }
324
325 pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
326 where
327 D: Decoder,
328 D::Error: From<error::Traversal>,
329 {
330 fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
331 }
332}
333
334impl<R: Repo, T> Drop for Generic<R, T> {
335 fn drop(&mut self) {
336 if !self.panicked {
337 if let Err(e) = self.head.commit() {
338 warn!("failed to commit on drop: {e}");
339 }
340 }
341 }
342}
343
344pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
345 let mut offsets = repo.existing_offsets()?;
346 if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
347 offsets = offsets.split_off(pos);
348 }
349 let segments = Segments {
350 offs: offsets.into_iter(),
351 repo,
352 max_log_format_version,
353 };
354 Ok(Commits {
355 inner: None,
356 segments,
357 last_commit: CommitInfo::Initial { next_offset: offset },
358 last_error: None,
359 })
360}
361
362pub fn transactions_from<'a, R, D, T>(
363 repo: R,
364 max_log_format_version: u8,
365 offset: u64,
366 de: &'a D,
367) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
368where
369 R: Repo + 'a,
370 D: Decoder<Record = T>,
371 D::Error: From<error::Traversal>,
372 T: 'a,
373{
374 commits_from(repo, max_log_format_version, offset)
375 .map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
376}
377
378pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
379where
380 R: Repo,
381 D: Decoder,
382 D::Error: From<error::Traversal> + From<io::Error>,
383{
384 let commits = commits_from(repo, max_log_format_version, offset)?;
385 fold_transactions_internal(commits.with_log_format_version(), de, offset)
386}
387
388fn transactions_from_internal<'a, R, D, T>(
389 commits: CommitsWithVersion<R>,
390 offset: u64,
391 de: &'a D,
392) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
393where
394 R: Repo + 'a,
395 D: Decoder<Record = T>,
396 D::Error: From<error::Traversal>,
397 T: 'a,
398{
399 commits
400 .map(|x| x.map_err(D::Error::from))
401 .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
402 .flatten_ok()
403 .map(|x| x.and_then(|y| y))
404}
405
406fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
407where
408 R: Repo,
409 D: Decoder,
410 D::Error: From<error::Traversal>,
411{
412 while let Some(commit) = commits.next() {
413 let (version, commit) = match commit {
414 Ok(version_and_commit) => version_and_commit,
415 Err(e) => {
416 if commits.next().is_none() {
422 return Ok(());
423 }
424
425 return Err(e.into());
426 }
427 };
428 trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
429
430 let max_tx_offset = commit.min_tx_offset + commit.n as u64;
431 if max_tx_offset <= from {
432 continue;
433 }
434
435 let records = &mut commit.records.as_slice();
436 for n in 0..commit.n {
437 let tx_offset = commit.min_tx_offset + n as u64;
438 if tx_offset < from {
439 de.skip_record(version, tx_offset, records)?;
440 } else {
441 de.consume_record(version, tx_offset, records)?;
442 }
443 }
444 }
445
446 Ok(())
447}
448
/// Iterator that opens a segment reader for each of a list of segment offsets.
449pub struct Segments<R> {
 /// Storage backend the segments are opened from.
450 repo: R,
 /// Offsets of the segments that are still to be opened, in iteration order.
451 offs: vec::IntoIter<u64>,
 /// Log format version limit passed to `repo::open_segment_reader` for each segment.
452 max_log_format_version: u8,
453}
454
455impl<R: Repo> Iterator for Segments<R> {
456 type Item = io::Result<segment::Reader<R::Segment>>;
457
458 fn next(&mut self) -> Option<Self::Item> {
459 let off = self.offs.next()?;
460 debug!("iter segment {off}");
461 Some(repo::open_segment_reader(&self.repo, self.max_log_format_version, off))
462 }
463}
464
/// What a `Commits` traversal knows about its position in the log.
465enum CommitInfo {
 /// No commit has been yielded yet; `next_offset` is the transaction offset
 /// the traversal was asked to start from (adjusted to the start of the
 /// first matching commit by `adjust_initial_offset`).
467 Initial { next_offset: u64 },
 /// The transaction range and checksum of the commit yielded last.
470 LastSeen { tx_range: Range<u64>, checksum: u32 },
476}
477
478impl CommitInfo {
479 fn same_offset_as(&self, commit: &StoredCommit) -> bool {
482 let Self::LastSeen { tx_range, .. } = self else {
483 return false;
484 };
485 tx_range.start == commit.min_tx_offset
486 }
487
488 fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
491 let Some(checksum) = self.checksum() else { return false };
492 checksum == &commit.checksum
493 }
494
495 fn checksum(&self) -> Option<&u32> {
496 match self {
497 Self::Initial { .. } => None,
498 Self::LastSeen { checksum, .. } => Some(checksum),
499 }
500 }
501
502 fn expected_offset(&self) -> &u64 {
503 match self {
504 Self::Initial { next_offset } => next_offset,
505 Self::LastSeen { tx_range, .. } => &tx_range.end,
506 }
507 }
508
509 fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
517 if let Self::Initial { next_offset } = self {
518 let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
519 if *next_offset > last_tx_offset {
520 return true;
521 } else {
522 *next_offset = commit.min_tx_offset;
523 }
524 }
525
526 false
527 }
528}
529
/// Iterator over the commits of a log, spanning multiple segments.
530pub struct Commits<R: Repo> {
 /// Commit iterator of the segment currently being read; `None` until the
 /// first segment has been opened.
531 inner: Option<segment::Commits<R::Segment>>,
 /// The segments that are still to be read.
532 segments: Segments<R>,
 /// Position tracking used to detect duplicate, forked and out-of-order commits.
533 last_commit: CommitInfo,
 /// Error from reading the previous commit, kept back until it is known
 /// whether more commits follow.
534 last_error: Option<error::Traversal>,
535}
536
537impl<R: Repo> Commits<R> {
538 fn current_segment_header(&self) -> Option<&segment::Header> {
539 self.inner.as_ref().map(|segment| &segment.header)
540 }
541
542 pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
545 CommitsWithVersion { inner: self }
546 }
547
548 fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
555 loop {
556 match self.inner.as_mut()?.next()? {
557 Ok(commit) => {
558 let prev_error = self.last_error.take();
561
562 if self.last_commit.adjust_initial_offset(&commit) {
564 trace!("adjust initial offset");
565 continue;
566 } else if self.last_commit.same_offset_as(&commit) {
568 if !self.last_commit.same_checksum_as(&commit) {
569 warn!(
570 "forked: commit={:?} last-error={:?} last-crc={:?}",
571 commit,
572 prev_error,
573 self.last_commit.checksum()
574 );
575 return Some(Err(error::Traversal::Forked {
576 offset: commit.min_tx_offset,
577 }));
578 } else {
579 trace!("ignore duplicate");
580 continue;
581 }
582 } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
584 warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
585 return Some(Err(error::Traversal::OutOfOrder {
586 expected_offset: *self.last_commit.expected_offset(),
587 actual_offset: commit.min_tx_offset,
588 prev_error: prev_error.map(Box::new),
589 }));
590 } else {
592 self.last_commit = CommitInfo::LastSeen {
593 tx_range: commit.tx_range(),
594 checksum: commit.checksum,
595 };
596
597 return Some(Ok(commit));
598 }
599 }
600
601 Err(e) => {
602 warn!("error reading next commit: {e}");
603 self.set_last_error(e);
613
614 return None;
615 }
616 }
617 }
618 }
619
620 fn set_last_error(&mut self, e: io::Error) {
622 let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
624 e.into_inner()
625 .unwrap()
626 .downcast::<error::ChecksumMismatch>()
627 .map(|source| error::Traversal::Checksum {
628 offset: *self.last_commit.expected_offset(),
629 source: *source,
630 })
631 .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
632 } else {
633 error::Traversal::from(e)
634 };
635 self.last_error = Some(last_error);
636 }
637
638 fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::Segment>) {
641 if let CommitInfo::Initial { next_offset } = &self.last_commit {
642 let _ = self
643 .segments
644 .repo
645 .get_offset_index(segment.min_tx_offset)
646 .map_err(Into::into)
647 .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
648 .inspect_err(|e| {
649 warn!(
650 "commitlog offset index is not used at segment {}: {}",
651 segment.min_tx_offset, e
652 );
653 });
654 }
655 }
656}
657
658impl<R: Repo> Iterator for Commits<R> {
659 type Item = Result<StoredCommit, error::Traversal>;
660
661 fn next(&mut self) -> Option<Self::Item> {
662 if let Some(item) = self.next_commit() {
663 return Some(item);
664 }
665
666 match self.segments.next() {
667 None => self.last_error.take().map(Err),
669 Some(segment) => segment.map_or_else(
670 |e| Some(Err(e.into())),
671 |mut segment| {
672 self.try_seek_to_initial_offset(&mut segment);
673 self.inner = Some(segment.commits());
674 self.next()
675 },
676 ),
677 }
678 }
679}
680
/// Adapter around `Commits` that pairs each commit with the log format
/// version found in the header of the segment it was read from.
681pub struct CommitsWithVersion<R: Repo> {
 /// The wrapped commit iterator.
682 inner: Commits<R>,
683}
684
685impl<R: Repo> Iterator for CommitsWithVersion<R> {
686 type Item = Result<(u8, Commit), error::Traversal>;
687
688 fn next(&mut self) -> Option<Self::Item> {
689 let next = self.inner.next()?;
690 match next {
691 Ok(commit) => {
692 let version = self
693 .inner
694 .current_segment_header()
695 .map(|hdr| hdr.log_format_version)
696 .expect("segment header none even though segment yielded a commit");
697 Some(Ok((version, commit.into())))
698 }
699 Err(e) => Some(Err(e)),
700 }
701 }
702}
703
704#[cfg(test)]
705mod tests {
706 use std::{cell::Cell, iter::repeat};
707
708 use pretty_assertions::assert_matches;
709
710 use super::*;
711 use crate::{
712 payload::{ArrayDecodeError, ArrayDecoder},
713 tests::helpers::{fill_log, mem_log},
714 };
715
/// Three single-record commits into a log with a max segment size of 128:
/// all segments but the last are tracked in `tail`, and the last segment
/// starts at offset 2.
#[test]
fn rotate_segments_simple() {
    let mut log = mem_log::<[u8; 32]>(128);
    for _ in 0..3 {
        log.append([0; 32]).unwrap();
        log.commit().unwrap();
    }

    let offsets = log.repo.existing_offsets().unwrap();
    let (newest, older) = offsets.split_last().unwrap();
    assert_eq!(older, &log.tail);
    assert_eq!(*newest, 2);
}
728
/// A commit larger than the max segment size is still written to an empty
/// segment; the following commit then goes to a new segment.
#[test]
fn huge_commit() {
    let mut log = mem_log::<[u8; 32]>(32);

    // Two records in one commit exceed the 32 byte segment limit.
    for record in [[0; 32], [1; 32]] {
        log.append(record).unwrap();
    }
    log.commit().unwrap();
    assert!(log.head.len() > log.opts.max_segment_size);

    log.append([2; 32]).unwrap();
    log.commit().unwrap();

    let on_disk = log.repo.existing_offsets().unwrap();
    assert_eq!(&log.tail, &[0]);
    assert_eq!(&on_disk, &[0, 2]);
}
744
/// Commits are traversed in offset order, starting from offset 0.
#[test]
fn traverse_commits() {
    let mut log = mem_log::<[u8; 32]>(32);
    fill_log(&mut log, 10, repeat(1));

    let commits = log.commits_from(0);
    for (expected_offset, commit) in (0..10).zip(commits) {
        let commit = commit.unwrap();
        assert_eq!(expected_offset, commit.min_tx_offset);
    }
}
754
/// Traversal from an offset never yields commits before that offset, and
/// yields nothing when starting past the end of the log.
#[test]
fn traverse_commits_with_offset() {
    let mut log = mem_log::<[u8; 32]>(32);
    fill_log(&mut log, 10, repeat(1));

    for offset in 0..10 {
        for commit in log.commits_from(offset).map(Result::unwrap) {
            assert!(commit.min_tx_offset >= offset);
        }
    }

    let past_the_end = log.commits_from(10).count();
    assert_eq!(0, past_the_end);
}
768
/// Folding from an offset consumes exactly the transactions from that offset
/// onwards, in order.
#[test]
fn fold_transactions_with_offset() {
    /// Decoder that counts consumed records and checks that they arrive with
    /// consecutive transaction offsets.
    struct CountDecoder {
        count: Cell<u64>,
        next_tx_offset: Cell<u64>,
    }

    impl Decoder for &CountDecoder {
        type Record = [u8; 32];
        type Error = ArrayDecodeError;

        fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
            &self,
            _version: u8,
            _tx_offset: u64,
            _reader: &mut R,
        ) -> Result<Self::Record, Self::Error> {
            unreachable!("Folding never calls `decode_record`")
        }

        fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
            &self,
            version: u8,
            tx_offset: u64,
            reader: &mut R,
        ) -> Result<(), Self::Error> {
            let inner = ArrayDecoder::<32>;
            inner.consume_record(version, tx_offset, reader)?;

            self.count.set(self.count.get() + 1);
            let expected = self.next_tx_offset.get();
            assert_eq!(expected, tx_offset);
            self.next_tx_offset.set(expected + 1);
            Ok(())
        }

        fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
            &self,
            version: u8,
            tx_offset: u64,
            reader: &mut R,
        ) -> Result<(), Self::Error> {
            // Skipped records are read like consumed ones, but not counted.
            let inner = ArrayDecoder::<32>;
            inner.consume_record(version, tx_offset, reader)?;
            Ok(())
        }
    }

    let mut log = mem_log::<[u8; 32]>(32);
    fill_log(&mut log, 10, repeat(1));

    for offset in 0..10 {
        let counter = CountDecoder {
            count: Cell::new(0),
            next_tx_offset: Cell::new(offset),
        };

        log.fold_transactions_from(offset, &counter).unwrap();

        assert_eq!(counter.count.get(), 10 - offset);
        assert_eq!(counter.next_tx_offset.get(), 10);
    }
}
833
/// A commit written twice in a row (same offset, same contents) is yielded
/// only once by the traversal.
#[test]
fn traverse_commits_ignores_duplicates() {
    let mut log = mem_log::<[u8; 32]>(1024);

    log.append([42; 32]).unwrap();
    let commit1 = log.head.commit.clone();
    log.commit().unwrap();
    // Write the very same commit a second time.
    log.head.commit = commit1.clone();
    log.commit().unwrap();

    log.append([43; 32]).unwrap();
    let commit2 = log.head.commit.clone();
    log.commit().unwrap();

    let traversed: Vec<Commit> = log
        .commits_from(0)
        .map(|res| res.map(Commit::from))
        .collect::<Result<_, _>>()
        .unwrap();
    assert_eq!(vec![commit1, commit2], traversed);
}
855
/// Two commits at the same offset but with different contents make the
/// traversal fail with `Traversal::Forked`.
#[test]
fn traverse_commits_errors_when_forked() {
    let mut log = mem_log::<[u8; 32]>(1024);

    log.append([42; 32]).unwrap();
    log.commit().unwrap();
    // Same offset as the first commit, but a different record.
    let conflicting = Commit {
        min_tx_offset: 0,
        n: 1,
        records: vec![43; 32],
        epoch: 0,
    };
    log.head.commit = conflicting;
    log.commit().unwrap();

    let res: Result<Vec<_>, _> = log.commits_from(0).collect();
    let is_forked = matches!(res, Err(error::Traversal::Forked { offset: 0 }));
    assert!(is_forked, "expected fork error: {res:?}")
}
876
/// A commit that does not start where the previous one ended makes the
/// traversal fail with `Traversal::OutOfOrder`.
#[test]
fn traverse_commits_errors_when_offset_not_contiguous() {
    let mut log = mem_log::<[u8; 32]>(1024);

    log.append([42; 32]).unwrap();
    log.commit().unwrap();
    // Leave a gap: the next commit claims to start at offset 18 instead of 1.
    log.head.commit.min_tx_offset = 18;
    log.append([42; 32]).unwrap();
    log.commit().unwrap();

    let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
    assert!(
        matches!(
            res,
            Err(error::Traversal::OutOfOrder {
                expected_offset: 1,
                actual_offset: 18,
                prev_error: None
            })
        ),
        // The failure message used to say "fork error", copied from the fork test.
        "expected out-of-order error: {res:?}"
    )
}
900
/// Transactions are traversed in offset order across commits of varying size.
#[test]
fn traverse_transactions() {
    let mut log = mem_log::<[u8; 32]>(32);
    let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

    let txs = log.transactions_from(0, &ArrayDecoder);
    for (expected_offset, tx) in (0..total_txs).zip(txs) {
        let tx = tx.unwrap();
        assert_eq!(expected_offset, tx.offset);
    }
}
910
/// Traversal from an offset starts exactly at that transaction, never yields
/// earlier ones, and yields nothing when starting past the end of the log.
#[test]
fn traverse_transactions_with_offset() {
    let mut log = mem_log::<[u8; 32]>(32);
    let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;

    for offset in 0..total_txs {
        let mut txs = log.transactions_from(offset, &ArrayDecoder);

        let first = txs.next().expect("at least one tx expected").unwrap();
        assert_eq!(offset, first.offset);

        for tx in txs {
            assert!(tx.unwrap().offset >= offset);
        }
    }

    let past_the_end = log.transactions_from(total_txs, &ArrayDecoder).count();
    assert_eq!(0, past_the_end);
}
925
/// Traversing an empty log yields nothing, whatever the starting offset.
#[test]
fn traverse_empty() {
    let log = mem_log::<[u8; 32]>(32);

    for offset in [0, 42] {
        assert_eq!(0, log.commits_from(offset).count());
    }
    for offset in [0, 42] {
        assert_eq!(0, log.transactions_from(offset, &ArrayDecoder).count());
    }
}
935
/// After a hard reset, the log contains no transactions.
#[test]
fn reset_hard() {
    let mut log = mem_log::<[u8; 32]>(128);
    fill_log(&mut log, 50, (1..=10).cycle());

    log = log.reset().unwrap();

    let remaining = log.transactions_from(0, &ArrayDecoder).count();
    assert_eq!(0, remaining);
}
944
/// With one transaction per commit, resetting to an offset leaves that
/// offset as the last transaction, and `offset + 1` transactions in total.
#[test]
fn reset_to_offset() {
    let mut log = mem_log::<[u8; 32]>(128);
    let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;

    for offset in (0..total_txs).rev() {
        log = log.reset_to(offset).unwrap();

        let last_tx = log
            .transactions_from(0, &ArrayDecoder)
            .map(Result::unwrap)
            .last()
            .unwrap();
        assert_eq!(offset, last_tx.offset);

        let remaining = log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64;
        assert_eq!(offset + 1, remaining);
    }
}
967
/// With several transactions per commit, resetting to an offset retains
/// whole commits: everything up to the end of the commit containing it.
#[test]
fn reset_to_offset_many_txs_per_commit() {
    let mut log = mem_log::<[u8; 32]>(128);
    let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;

    // Resetting to the end of the log retains everything.
    log = log.reset_to(total_txs).unwrap();
    let remaining = log.transactions_from(0, &ArrayDecoder).count() as u64;
    assert_eq!(total_txs, remaining);

    let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();
    let middle_start = middle_commit.min_tx_offset;
    let middle_end = middle_commit.tx_range().end;

    // An offset inside the middle commit retains that commit in full.
    log = log.reset_to(middle_start + 1).unwrap();
    let remaining = log.transactions_from(0, &ArrayDecoder).count() as u64;
    assert_eq!(middle_end, remaining);

    // So does the first offset of the middle commit.
    log = log.reset_to(middle_start).unwrap();
    let remaining = log.transactions_from(0, &ArrayDecoder).count() as u64;
    assert_eq!(middle_end, remaining);

    log = log.reset_to(1).unwrap();
    let remaining = log.transactions_from(0, &ArrayDecoder).count() as u64;
    assert_eq!(3, remaining);

    log = log.reset_to(0).unwrap();
    let remaining = log.transactions_from(0, &ArrayDecoder).count() as u64;
    assert_eq!(1, remaining);
}
1001
/// A log opened on the repo of an existing log can be appended to, and
/// traversal sees both the old and the new transactions.
#[test]
fn reopen() {
    let mut log = mem_log::<[u8; 32]>(1024);
    let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
    let traversed = log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count();
    assert_eq!(total_txs, traversed);

    // Shadow rather than replace: the first log stays alive until the end of the test.
    let opts = Options {
        max_segment_size: 1024,
        ..Options::default()
    };
    let mut log = Generic::<_, [u8; 32]>::open(log.repo.clone(), opts).unwrap();
    total_txs += fill_log(&mut log, 100, (1..=10).cycle());

    let traversed = log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count();
    assert_eq!(total_txs, traversed);
}
1026
/// Setting the epoch to its current value commits nothing.
#[test]
fn set_same_epoch_does_nothing() {
    let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
    assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);

    let outcome = log.set_epoch(Commit::DEFAULT_EPOCH);
    assert_eq!(outcome.unwrap(), None);
}
1034
/// Setting a greater epoch commits the pending transaction and then
/// switches the epoch.
#[test]
fn set_new_epoch_commits() {
    let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
    assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);

    log.append(<_>::default()).unwrap();
    let outcome = log.set_epoch(42).unwrap();
    let committed = outcome.expect("should have committed the pending transaction");

    assert_eq!(log.epoch(), 42);
    assert_eq!(committed.tx_range.start, 0);
}
1047
/// Setting an epoch smaller than the current one is rejected with an
/// `InvalidInput` error.
#[test]
fn set_lower_epoch_returns_error() {
    let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();

    log.set_epoch(42).unwrap();
    assert_eq!(log.epoch(), 42);

    let outcome = log.set_epoch(7);
    assert_matches!(outcome, Err(e) if e.kind() == io::ErrorKind::InvalidInput)
}
1055}