use std::{io, marker::PhantomData, mem, ops::Range, vec};
use itertools::Itertools;
use log::{debug, info, trace, warn};
use crate::{
commit::StoredCommit,
error,
payload::Decoder,
repo::{self, Repo},
segment::{self, FileLike, Transaction, Writer},
Commit, Encode, Options,
};
pub use crate::segment::Committed;
/// A generic commit log over a storage backend `R`, holding records of type `T`.
#[derive(Debug)]
pub struct Generic<R: Repo, T> {
    /// Storage backend the log's segments live in.
    pub(crate) repo: R,
    /// Writer for the currently active (newest) segment.
    pub(crate) head: Writer<R::Segment>,
    /// Minimum transaction offsets of all closed (non-head) segments.
    /// Maintained in ascending order — `segment_offsets_from` relies on
    /// this via `rposition`.
    tail: Vec<u64>,
    /// Options the log was opened with (segment size limit, format version).
    opts: Options,
    /// Marker tying the log to its record type without storing one.
    _record: PhantomData<T>,
    /// Set to `true` while a fallible operation is in flight, so the
    /// commit-on-drop in the `Drop` impl is suppressed if we unwind or
    /// bail out mid-operation.
    panicked: bool,
}
impl<R: Repo, T> Generic<R, T> {
    /// Open the log stored in `repo`, resuming the most recent segment if
    /// one exists, or creating a fresh segment starting at offset 0.
    pub fn open(repo: R, opts: Options) -> io::Result<Self> {
        let mut tail = repo.existing_offsets()?;
        if !tail.is_empty() {
            debug!("segments: {tail:?}");
        }
        let head = if let Some(last) = tail.pop() {
            debug!("resuming last segment: {last}");
            // If the last segment cannot be resumed for writing, put it
            // back into `tail` and start a new segment immediately after
            // its last committed transaction (`meta.tx_range.end`).
            repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
                tail.push(meta.tx_range.start);
                repo::create_segment_writer(&repo, opts, meta.tx_range.end)
            })?
        } else {
            debug!("starting fresh log");
            repo::create_segment_writer(&repo, opts, 0)?
        };

        Ok(Self {
            repo,
            head,
            tail,
            opts,
            _record: PhantomData,
            panicked: false,
        })
    }

    /// Persist the currently buffered records as a single commit.
    ///
    /// Rotates to a new segment first if appending the pending commit would
    /// push the head segment past `max_segment_size`. On a failed commit, a
    /// new segment is started before the original error is returned to the
    /// caller.
    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
        // Guard: if anything below panics or errors out, don't attempt
        // another commit from `Drop`.
        self.panicked = true;
        let writer = &mut self.head;
        let sz = writer.commit.encoded_len();
        // Never rotate an empty segment, even if a single commit exceeds
        // the size limit (see the `huge_commit` test).
        let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
        let writer = if should_rotate {
            // Make the old head durable before sealing it.
            self.sync();
            self.start_new_segment()?
        } else {
            writer
        };
        let ret = writer.commit().or_else(|e| {
            warn!("Commit failed: {e}");
            // Abandon the (possibly damaged) segment and continue on a
            // fresh one; the original error is still surfaced.
            self.start_new_segment()?;
            Err(e)
        });
        self.panicked = false;
        ret
    }

    /// Flush the head segment to durable storage.
    ///
    /// # Panics
    ///
    /// Panics if `fsync` fails, as the on-disk state is then unknown.
    pub fn sync(&mut self) {
        self.panicked = true;
        if let Err(e) = self.head.fsync() {
            panic!("Failed to fsync segment: {e}");
        }
        self.panicked = false;
    }

    /// The largest transaction offset in the log, or `None` if the log is
    /// empty (derived from the head writer's next offset).
    pub fn max_committed_offset(&self) -> Option<u64> {
        self.head.next_tx_offset().checked_sub(1)
    }

    /// Starting offsets of all segments which may contain transactions at
    /// or after `offset`, in ascending order.
    ///
    /// NOTE(review): if `offset` lies before the start of the oldest tail
    /// segment, the result is empty (neither tail nor head offsets are
    /// included) — confirm this is intended; in practice the oldest
    /// segment starts at 0, so `rposition` always finds a match.
    fn segment_offsets_from(&self, offset: u64) -> Vec<u64> {
        if offset >= self.head.min_tx_offset {
            // Requested offset falls into (or beyond) the head segment.
            vec![self.head.min_tx_offset]
        } else {
            let mut offs = Vec::with_capacity(self.tail.len() + 1);
            // Last tail segment starting at or before `offset`, plus
            // everything after it, plus the head.
            if let Some(pos) = self.tail.iter().rposition(|off| off <= &offset) {
                offs.extend_from_slice(&self.tail[pos..]);
                offs.push(self.head.min_tx_offset);
            }
            offs
        }
    }

    /// Iterate over all commits containing transactions at or after
    /// `offset`.
    pub fn commits_from(&self, offset: u64) -> Commits<R> {
        let offsets = self.segment_offsets_from(offset);
        let segments = Segments {
            offs: offsets.into_iter(),
            repo: self.repo.clone(),
            max_log_format_version: self.opts.log_format_version,
        };
        Commits {
            inner: None,
            segments,
            last_commit: CommitInfo::Initial { next_offset: offset },
            last_error: None,
        }
    }

    /// Delete all data and reopen as an empty log.
    pub fn reset(mut self) -> io::Result<Self> {
        info!("hard reset");
        // `self` is consumed and its head deleted below; make sure the
        // eventual drop doesn't try to commit.
        self.panicked = true;

        // Treat the head like any other segment, then delete newest-first
        // (presumably so an interruption midway leaves no gap in the
        // remaining log — TODO confirm).
        self.tail.reserve(1);
        self.tail.push(self.head.min_tx_offset);
        for segment in self.tail.iter().rev() {
            debug!("removing segment {segment}");
            self.repo.remove_segment(*segment)?;
        }

        Self::open(self.repo.clone(), self.opts)
    }

    /// Truncate the log so that `offset` is retained, then reopen.
    ///
    /// The commit containing `offset` is kept in its entirety, so the
    /// resulting log may extend slightly past `offset` (see the
    /// `reset_to_offset_many_txs_per_commit` test).
    pub fn reset_to(mut self, offset: u64) -> io::Result<Self> {
        info!("reset to {offset}");
        self.panicked = true;

        // Include the head, then walk segments newest-first.
        self.tail.reserve(1);
        self.tail.push(self.head.min_tx_offset);
        for segment in self.tail.iter().rev() {
            let segment = *segment;
            if segment > offset {
                // Segment starts after `offset`: drop it wholesale.
                debug!("removing segment {segment}");
                self.repo.remove_segment(segment)?;
            } else {
                // `offset` falls into this segment: measure the encoded
                // length of all commits up to and including the one that
                // contains `offset`.
                let reader = repo::open_segment_reader(&self.repo, self.opts.log_format_version, segment)?;
                let commits = reader.commits();
                let mut bytes_read = 0;
                for commit in commits {
                    let commit = commit?;
                    if commit.min_tx_offset > offset {
                        break;
                    }
                    bytes_read += Commit::from(commit).encoded_len() as u64;
                }
                if bytes_read == 0 {
                    // Nothing worth keeping here; remove and keep scanning
                    // older segments.
                    self.repo.remove_segment(segment)?;
                } else {
                    // Cut the segment right after the last retained commit
                    // (header bytes included) and make it durable.
                    let byte_offset = segment::Header::LEN as u64 + bytes_read;
                    debug!("truncating segment {segment} to {offset} at {byte_offset}");
                    let mut file = self.repo.open_segment(segment)?;
                    file.ftruncate(offset, byte_offset)?;
                    file.fsync()?;
                    break;
                }
            }
        }

        Self::open(self.repo.clone(), self.opts)
    }

    /// Seal the current head segment into `tail` and install a fresh
    /// writer, starting at the head's next transaction offset, as the new
    /// head. Records buffered but not yet committed carry over.
    fn start_new_segment(&mut self) -> io::Result<&mut Writer<R::Segment>> {
        debug!(
            "starting new segment offset={} prev-offset={}",
            self.head.next_tx_offset(),
            self.head.min_tx_offset()
        );
        let new = repo::create_segment_writer(&self.repo, self.opts, self.head.next_tx_offset())?;
        let old = mem::replace(&mut self.head, new);
        self.tail.push(old.min_tx_offset());
        // Carry the pending (uncommitted) records over to the new head.
        self.head.commit = old.commit;
        Ok(&mut self.head)
    }
}
impl<R: Repo, T: Encode> Generic<R, T> {
    /// Buffer `record` for inclusion in the next commit.
    ///
    /// Does not touch storage; call [`Self::commit`] to persist. On failure
    /// the record is handed back unchanged in the `Err` variant
    /// (NOTE(review): the rejection condition lives in `Writer::append` —
    /// confirm there).
    pub fn append(&mut self, record: T) -> Result<(), T> {
        self.head.append(record)
    }

    /// Iterate over all transactions with offset >= `offset`, decoding
    /// each record with `decoder`.
    pub fn transactions_from<'a, D>(
        &self,
        offset: u64,
        decoder: &'a D,
    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        R: 'a,
        T: 'a,
    {
        transactions_from_internal(self.commits_from(offset).with_log_format_version(), offset, decoder)
    }

    /// Feed every transaction with offset >= `offset` into `decoder`
    /// without materializing the transactions.
    pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
    }
}
impl<R: Repo, T> Drop for Generic<R, T> {
    /// Best-effort flush of any buffered records when the log goes away.
    fn drop(&mut self) {
        // If a fallible operation was interrupted mid-flight, the log may
        // be in an inconsistent state — skip the final commit entirely.
        if self.panicked {
            return;
        }
        match self.head.commit() {
            Ok(_) => (),
            Err(e) => warn!("failed to commit on drop: {e}"),
        }
    }
}
/// Iterate over all commits in `repo` containing transactions at or after
/// `offset`, reading segments up to `max_log_format_version`.
pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
    let all = repo.existing_offsets()?;
    // Start from the last segment beginning at or before `offset`; when no
    // such segment exists, traverse everything.
    let offsets = match all.iter().rposition(|&off| off <= offset) {
        Some(pos) => all[pos..].to_vec(),
        None => all,
    };
    Ok(Commits {
        inner: None,
        segments: Segments {
            offs: offsets.into_iter(),
            repo,
            max_log_format_version,
        },
        last_commit: CommitInfo::Initial { next_offset: offset },
        last_error: None,
    })
}
/// Iterate over all transactions in `repo` with offset >= `offset`,
/// decoding each record with `de`.
pub fn transactions_from<'a, R, D, T>(
    repo: R,
    max_log_format_version: u8,
    offset: u64,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    R: Repo + 'a,
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    // Resolve the commit stream first, then lazily expand it into
    // individual transactions.
    let commits = commits_from(repo, max_log_format_version, offset)?;
    Ok(transactions_from_internal(commits.with_log_format_version(), offset, de))
}
/// Feed every transaction in `repo` with offset >= `offset` into `de`
/// without materializing the transactions.
pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
where
    R: Repo,
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    fold_transactions_internal(
        commits_from(repo, max_log_format_version, offset)?.with_log_format_version(),
        de,
        offset,
    )
}
/// Expand a stream of commits into the stream of transactions they
/// contain, starting at transaction offset `offset`.
fn transactions_from_internal<'a, R, D, T>(
    commits: CommitsWithVersion<R>,
    offset: u64,
    de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
where
    R: Repo + 'a,
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    commits
        // Lift traversal errors into the decoder's error type.
        .map(|x| x.map_err(D::Error::from))
        // Each commit expands into an iterator over its transactions;
        // `offset` lets it skip transactions before the requested start
        // within the first commit.
        .map_ok(move |(version, commit)| commit.into_transactions(version, offset, de))
        // Flatten `Ok(inner-iterator)` items into the stream ...
        .flatten_ok()
        // ... and collapse the resulting nested `Result<Result<_, _>, _>`.
        .map(|x| x.and_then(|y| y))
}
/// Drive `de` over every transaction at or after offset `from` in the
/// given commit stream, skipping earlier records in place.
fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
where
    R: Repo,
    D: Decoder,
    D::Error: From<error::Traversal>,
{
    while let Some(commit) = commits.next() {
        let (version, commit) = match commit {
            Ok(version_and_commit) => version_and_commit,
            Err(e) => {
                // Tolerate a broken commit only if it is the very last one
                // in the log (e.g. a torn write of the newest commit); a
                // broken commit followed by more data is a hard error.
                if commits.next().is_none() {
                    return Ok(());
                }
                return Err(e.into());
            }
        };
        trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);
        // One past the last transaction offset in this commit.
        let max_tx_offset = commit.min_tx_offset + commit.n as u64;
        if max_tx_offset <= from {
            // Entire commit lies before the requested start; skip it.
            continue;
        }
        // Cursor over the commit's serialized records.
        let records = &mut commit.records.as_slice();
        for n in 0..commit.n {
            let tx_offset = commit.min_tx_offset + n as u64;
            if tx_offset < from {
                // Still before `from`: advance the cursor without
                // surfacing the record.
                de.skip_record(version, tx_offset, records)?;
            } else {
                de.consume_record(version, tx_offset, records)?;
            }
        }
    }
    Ok(())
}
/// Iterator over a fixed set of segments of a log, yielding a reader for
/// each.
pub struct Segments<R> {
    /// Storage backend to open segment readers from.
    repo: R,
    /// Starting (minimum) transaction offsets of the segments to visit.
    offs: vec::IntoIter<u64>,
    /// Highest log format version the readers will accept.
    max_log_format_version: u8,
}
impl<R: Repo> Iterator for Segments<R> {
    type Item = io::Result<segment::Reader<R::Segment>>;

    /// Open a reader for the next segment offset, if any remain.
    fn next(&mut self) -> Option<Self::Item> {
        self.offs.next().map(|off| {
            debug!("iter segment {off}");
            repo::open_segment_reader(&self.repo, self.max_log_format_version, off)
        })
    }
}
/// Traversal state: either no commit has been yielded yet, or we remember
/// the range and checksum of the last one, to detect duplicates, forks
/// and gaps.
enum CommitInfo {
    /// Nothing yielded yet; `next_offset` is the requested start offset.
    Initial { next_offset: u64 },
    /// Transaction range and checksum of the most recently yielded commit.
    LastSeen { tx_range: Range<u64>, checksum: u32 },
}
impl CommitInfo {
    /// `true` iff a commit was already seen and it started at the same
    /// transaction offset as `commit`.
    fn same_offset_as(&self, commit: &StoredCommit) -> bool {
        match self {
            Self::LastSeen { tx_range, .. } => tx_range.start == commit.min_tx_offset,
            Self::Initial { .. } => false,
        }
    }

    /// `true` iff a commit was already seen and its checksum matches that
    /// of `commit`.
    fn same_checksum_as(&self, commit: &StoredCommit) -> bool {
        self.checksum().map_or(false, |checksum| checksum == &commit.checksum)
    }

    /// Checksum of the last commit seen, if any.
    fn checksum(&self) -> Option<&u32> {
        if let Self::LastSeen { checksum, .. } = self {
            Some(checksum)
        } else {
            None
        }
    }

    /// Offset the next commit is expected to start at: the requested start
    /// offset initially, the end of the last seen range thereafter.
    fn expected_offset(&self) -> &u64 {
        match self {
            Self::Initial { next_offset } => next_offset,
            Self::LastSeen { tx_range, .. } => &tx_range.end,
        }
    }

    /// Before the first commit is yielded: returns `true` when `commit`
    /// lies entirely before the requested start offset (i.e. should be
    /// skipped); otherwise lowers the expected offset to the commit's
    /// start and returns `false`. No-op after the first commit.
    fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
        match self {
            Self::Initial { next_offset } => {
                let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
                if *next_offset > last_tx_offset {
                    true
                } else {
                    *next_offset = commit.min_tx_offset;
                    false
                }
            }
            Self::LastSeen { .. } => false,
        }
    }
}
/// Iterator over the commits of a log, spanning multiple segments.
pub struct Commits<R: Repo> {
    /// Commit iterator of the currently open segment, if any.
    inner: Option<segment::Commits<R::Segment>>,
    /// Remaining segments to traverse once `inner` is exhausted.
    segments: Segments<R>,
    /// State of the last commit yielded, used to detect duplicates, forks
    /// and out-of-order commits.
    last_commit: CommitInfo,
    /// Error from reading the current segment, deferred until we know
    /// whether a subsequent segment supersedes it.
    last_error: Option<error::Traversal>,
}
impl<R: Repo> Commits<R> {
    /// Header of the segment currently being read, if any.
    fn current_segment_header(&self) -> Option<&segment::Header> {
        self.inner.as_ref().map(|segment| &segment.header)
    }

    /// Wrap this iterator so each commit is paired with the log format
    /// version of the segment it was read from.
    pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
        CommitsWithVersion { inner: self }
    }

    /// Read the next commit from the currently open segment, validating it
    /// against the last commit seen.
    ///
    /// Returns `None` when the current segment is exhausted, errored, or
    /// no segment is open; `<Self as Iterator>::next` then advances to the
    /// next segment.
    fn next_commit(&mut self) -> Option<Result<StoredCommit, error::Traversal>> {
        loop {
            match self.inner.as_mut()?.next()? {
                Ok(commit) => {
                    // A successfully read commit clears any pending error;
                    // keep it only for the diagnostics below.
                    let prev_error = self.last_error.take();
                    if self.last_commit.adjust_initial_offset(&commit) {
                        // Commit lies entirely before the requested start
                        // offset: skip it.
                        trace!("adjust initial offset");
                        continue;
                    } else if self.last_commit.same_offset_as(&commit) {
                        // Same starting offset as the last commit: a benign
                        // duplicate if the checksums agree, a fork if not.
                        if !self.last_commit.same_checksum_as(&commit) {
                            warn!(
                                "forked: commit={:?} last-error={:?} last-crc={:?}",
                                commit,
                                prev_error,
                                self.last_commit.checksum()
                            );
                            return Some(Err(error::Traversal::Forked {
                                offset: commit.min_tx_offset,
                            }));
                        } else {
                            trace!("ignore duplicate");
                            continue;
                        }
                    } else if self.last_commit.expected_offset() != &commit.min_tx_offset {
                        // Gap or overlap in the offset sequence.
                        warn!("out-of-order: commit={:?} last-error={:?}", commit, prev_error);
                        return Some(Err(error::Traversal::OutOfOrder {
                            expected_offset: *self.last_commit.expected_offset(),
                            actual_offset: commit.min_tx_offset,
                            prev_error: prev_error.map(Box::new),
                        }));
                    } else {
                        // In-order commit: remember it and yield.
                        self.last_commit = CommitInfo::LastSeen {
                            tx_range: commit.tx_range(),
                            checksum: commit.checksum,
                        };
                        return Some(Ok(commit));
                    }
                }
                Err(e) => {
                    // Stash the error and stop reading this segment; the
                    // outer iterator decides whether a later segment
                    // resolves it or it must be reported.
                    warn!("error reading next commit: {e}");
                    self.set_last_error(e);
                    return None;
                }
            }
        }
    }

    /// Convert `e` into a traversal error and remember it as the most
    /// recent one, mapping a wrapped `ChecksumMismatch` to its dedicated
    /// variant (annotated with the offset we expected next).
    fn set_last_error(&mut self, e: io::Error) {
        let last_error = if e.kind() == io::ErrorKind::InvalidData && e.get_ref().is_some() {
            e.into_inner()
                .unwrap()
                .downcast::<error::ChecksumMismatch>()
                .map(|source| error::Traversal::Checksum {
                    offset: *self.last_commit.expected_offset(),
                    source: *source,
                })
                // Not a checksum mismatch after all: re-wrap the boxed
                // error as a plain traversal error.
                .unwrap_or_else(|e| io::Error::new(io::ErrorKind::InvalidData, e).into())
        } else {
            error::Traversal::from(e)
        };
        self.last_error = Some(last_error);
    }

    /// Best-effort seek of `segment` to the requested start offset using
    /// the repo's offset index. Only applies before the first commit has
    /// been yielded; failures are logged and ignored, in which case
    /// traversal starts at the beginning of the segment.
    fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::Segment>) {
        if let CommitInfo::Initial { next_offset } = &self.last_commit {
            let _ = self
                .segments
                .repo
                .get_offset_index(segment.min_tx_offset)
                .map_err(Into::into)
                .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
                .inspect_err(|e| {
                    warn!(
                        "commitlog offset index is not used at segment {}: {}",
                        segment.min_tx_offset, e
                    );
                });
        }
    }
}
impl<R: Repo> Iterator for Commits<R> {
    type Item = Result<StoredCommit, error::Traversal>;

    fn next(&mut self) -> Option<Self::Item> {
        // Drain the currently open segment first.
        if let Some(item) = self.next_commit() {
            return Some(item);
        }
        // Current segment exhausted or errored: advance to the next one.
        match self.segments.next() {
            // No segments left: surface a still-pending error (e.g.
            // trailing corruption with no subsequent segment), then end.
            None => self.last_error.take().map(Err),
            Some(segment) => segment.map_or_else(
                |e| Some(Err(e.into())),
                |mut segment| {
                    self.try_seek_to_initial_offset(&mut segment);
                    self.inner = Some(segment.commits());
                    // Recurse to read from the freshly opened segment.
                    self.next()
                },
            ),
        }
    }
}
/// [`Commits`] which also yields the log format version of the segment
/// each commit was read from.
pub struct CommitsWithVersion<R: Repo> {
    inner: Commits<R>,
}
impl<R: Repo> Iterator for CommitsWithVersion<R> {
    type Item = Result<(u8, Commit), error::Traversal>;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        Some(item.map(|commit| {
            // A commit can only have come from an open segment, so the
            // current header must be present.
            let version = self
                .inner
                .current_segment_header()
                .map(|hdr| hdr.log_format_version)
                .expect("segment header none even though segment yielded a commit");
            (version, commit.into())
        }))
    }
}
#[cfg(test)]
mod tests {
    use std::{cell::Cell, iter::repeat};

    use super::*;
    use crate::{
        payload::{ArrayDecodeError, ArrayDecoder},
        tests::helpers::{fill_log, mem_log},
    };

    // Committing repeatedly rotates full segments into the tail; the repo
    // then holds the tail segments plus the head (starting at offset 2) as
    // its last entry.
    #[test]
    fn rotate_segments_simple() {
        let mut log = mem_log::<[u8; 32]>(128);
        for _ in 0..3 {
            log.append([0; 32]).unwrap();
            log.commit().unwrap();
        }
        let offsets = log.repo.existing_offsets().unwrap();
        assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
        assert_eq!(offsets[offsets.len() - 1], 2);
    }

    // A single commit may exceed `max_segment_size`; rotation only happens
    // on the commit after the oversized one.
    #[test]
    fn huge_commit() {
        let mut log = mem_log::<[u8; 32]>(32);
        log.append([0; 32]).unwrap();
        log.append([1; 32]).unwrap();
        log.commit().unwrap();
        assert!(log.head.len() > log.opts.max_segment_size);
        log.append([2; 32]).unwrap();
        log.commit().unwrap();
        assert_eq!(&log.tail, &[0]);
        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
    }

    // Commits come back in contiguous offset order from the start.
    #[test]
    fn traverse_commits() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));
        for (i, commit) in (0..10).zip(log.commits_from(0)) {
            assert_eq!(i, commit.unwrap().min_tx_offset);
        }
    }

    // Traversal from any offset yields only commits at or after it, and
    // traversal past the end yields nothing.
    #[test]
    fn traverse_commits_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));
        for offset in 0..10 {
            for commit in log.commits_from(offset) {
                let commit = commit.unwrap();
                assert!(commit.min_tx_offset >= offset);
            }
        }
        assert_eq!(0, log.commits_from(10).count());
    }

    // Folding honors the start offset: records before it are skipped,
    // records after it are consumed exactly once and in order.
    #[test]
    fn fold_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        fill_log(&mut log, 10, repeat(1));

        // Decoder which counts consumed records and asserts they arrive in
        // contiguous offset order starting at `next_tx_offset`.
        struct CountDecoder {
            count: Cell<u64>,
            next_tx_offset: Cell<u64>,
        }
        impl Decoder for &CountDecoder {
            type Record = [u8; 32];
            type Error = ArrayDecodeError;

            fn decode_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                _version: u8,
                _tx_offset: u64,
                _reader: &mut R,
            ) -> Result<Self::Record, Self::Error> {
                unreachable!("Folding never calls `decode_record`")
            }

            fn consume_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                // Delegate to the real decoder to advance the reader.
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                self.count.set(self.count.get() + 1);
                let expected_tx_offset = self.next_tx_offset.get();
                assert_eq!(expected_tx_offset, tx_offset);
                self.next_tx_offset.set(expected_tx_offset + 1);
                Ok(())
            }

            fn skip_record<'a, R: spacetimedb_sats::buffer::BufReader<'a>>(
                &self,
                version: u8,
                tx_offset: u64,
                reader: &mut R,
            ) -> Result<(), Self::Error> {
                // Skipped records still advance the reader, but don't count.
                let decoder = ArrayDecoder::<32>;
                decoder.consume_record(version, tx_offset, reader)?;
                Ok(())
            }
        }

        for offset in 0..10 {
            let decoder = CountDecoder {
                count: Cell::new(0),
                next_tx_offset: Cell::new(offset),
            };
            log.fold_transactions_from(offset, &decoder).unwrap();
            assert_eq!(decoder.count.get(), 10 - offset);
            assert_eq!(decoder.next_tx_offset.get(), 10);
        }
    }

    // A commit re-written with the same offset and checksum is a benign
    // duplicate and is silently skipped on traversal.
    #[test]
    fn traverse_commits_ignores_duplicates() {
        let mut log = mem_log::<[u8; 32]>(1024);
        log.append([42; 32]).unwrap();
        let commit1 = log.head.commit.clone();
        log.commit().unwrap();
        // Write the exact same commit a second time.
        log.head.commit = commit1.clone();
        log.commit().unwrap();
        log.append([43; 32]).unwrap();
        let commit2 = log.head.commit.clone();
        log.commit().unwrap();
        assert_eq!(
            [commit1, commit2].as_slice(),
            &log.commits_from(0)
                .map_ok(Commit::from)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        );
    }

    // Same offset but different payload (hence checksum) is a fork and
    // must surface as `Traversal::Forked`.
    #[test]
    fn traverse_commits_errors_when_forked() {
        let mut log = mem_log::<[u8; 32]>(1024);
        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Overwrite offset 0 with conflicting data.
        log.head.commit = Commit {
            min_tx_offset: 0,
            n: 1,
            records: [43; 32].to_vec(),
        };
        log.commit().unwrap();
        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(res, Err(error::Traversal::Forked { offset: 0 })),
            "expected fork error: {res:?}"
        )
    }

    // A gap in the offset sequence must surface as `Traversal::OutOfOrder`
    // carrying both the expected and the actual offset.
    #[test]
    fn traverse_commits_errors_when_offset_not_contiguous() {
        let mut log = mem_log::<[u8; 32]>(1024);
        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        // Fabricate a commit far beyond the expected offset 1.
        log.head.commit.min_tx_offset = 18;
        log.append([42; 32]).unwrap();
        log.commit().unwrap();
        let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
        assert!(
            matches!(
                res,
                Err(error::Traversal::OutOfOrder {
                    expected_offset: 1,
                    actual_offset: 18,
                    prev_error: None
                })
            ),
            // Fixed message: this test asserts an out-of-order error, not
            // a fork (the old text was copy-pasted from the fork test).
            "expected out-of-order error: {res:?}"
        )
    }

    // Transactions come back in contiguous offset order even when commits
    // hold varying numbers of transactions.
    #[test]
    fn traverse_transactions() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;
        for (i, tx) in (0..total_txs).zip(log.transactions_from(0, &ArrayDecoder)) {
            assert_eq!(i, tx.unwrap().offset);
        }
    }

    // Transaction traversal from an offset starts exactly at that offset,
    // even mid-commit, and traversal past the end yields nothing.
    #[test]
    fn traverse_transactions_with_offset() {
        let mut log = mem_log::<[u8; 32]>(32);
        let total_txs = fill_log(&mut log, 10, (1..=3).cycle()) as u64;
        for offset in 0..total_txs {
            let mut iter = log.transactions_from(offset, &ArrayDecoder);
            assert_eq!(offset, iter.next().expect("at least one tx expected").unwrap().offset);
            for tx in iter {
                assert!(tx.unwrap().offset >= offset);
            }
        }
        assert_eq!(0, log.transactions_from(total_txs, &ArrayDecoder).count());
    }

    // Traversing an empty log yields nothing, regardless of start offset.
    #[test]
    fn traverse_empty() {
        let log = mem_log::<[u8; 32]>(32);
        assert_eq!(0, log.commits_from(0).count());
        assert_eq!(0, log.commits_from(42).count());
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
        assert_eq!(0, log.transactions_from(42, &ArrayDecoder).count());
    }

    // `reset` wipes the log completely.
    #[test]
    fn reset_hard() {
        let mut log = mem_log::<[u8; 32]>(128);
        fill_log(&mut log, 50, (1..=10).cycle());
        log = log.reset().unwrap();
        assert_eq!(0, log.transactions_from(0, &ArrayDecoder).count());
    }

    // With one tx per commit, `reset_to(offset)` retains exactly
    // `offset + 1` transactions, the last one being `offset`.
    #[test]
    fn reset_to_offset() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;
        for offset in (0..total_txs).rev() {
            log = log.reset_to(offset).unwrap();
            assert_eq!(
                offset,
                log.transactions_from(0, &ArrayDecoder)
                    .map(Result::unwrap)
                    .last()
                    .unwrap()
                    .offset
            );
            assert_eq!(
                offset + 1,
                log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() as u64
            );
        }
    }

    // With multiple txs per commit, `reset_to` keeps the whole commit
    // containing the requested offset, so slightly more may remain.
    #[test]
    fn reset_to_offset_many_txs_per_commit() {
        let mut log = mem_log::<[u8; 32]>(128);
        let total_txs = fill_log(&mut log, 50, (1..=10).cycle()) as u64;
        // Resetting to (or past) the end is a no-op.
        log = log.reset_to(total_txs).unwrap();
        assert_eq!(total_txs, log.transactions_from(0, &ArrayDecoder).count() as u64);
        let middle_commit = log.commits_from(0).nth(25).unwrap().unwrap();
        // Any offset within a commit retains the commit's full tx range.
        log = log.reset_to(middle_commit.min_tx_offset + 1).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );
        log = log.reset_to(middle_commit.min_tx_offset).unwrap();
        assert_eq!(
            middle_commit.tx_range().end,
            log.transactions_from(0, &ArrayDecoder).count() as u64
        );
        // `fill_log` with (1..=10).cycle() puts txs 0..=2 in the first two
        // commits (sizes 1 and 2).
        log = log.reset_to(1).unwrap();
        assert_eq!(3, log.transactions_from(0, &ArrayDecoder).count() as u64);
        log = log.reset_to(0).unwrap();
        assert_eq!(1, log.transactions_from(0, &ArrayDecoder).count() as u64);
    }

    // Reopening an existing log resumes at the right offset and keeps all
    // previously written transactions readable.
    #[test]
    fn reopen() {
        let mut log = mem_log::<[u8; 32]>(1024);
        let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );
        let mut log = Generic::<_, [u8; 32]>::open(
            log.repo.clone(),
            Options {
                max_segment_size: 1024,
                ..Options::default()
            },
        )
        .unwrap();
        total_txs += fill_log(&mut log, 100, (1..=10).cycle());
        assert_eq!(
            total_txs,
            log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
        );
    }
}