use std::{
io,
num::{NonZeroU16, NonZeroU64},
sync::RwLock,
};
use log::trace;
use repo::Repo;
use spacetimedb_paths::server::CommitLogDir;
pub mod commit;
pub mod commitlog;
mod index;
pub mod repo;
pub mod segment;
mod varchar;
mod varint;
pub use crate::{
commit::{Commit, StoredCommit},
payload::{Decoder, Encode},
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
varchar::Varchar,
};
pub mod error;
pub mod payload;
#[cfg(feature = "streaming")]
pub mod stream;
#[cfg(any(test, feature = "test"))]
pub mod tests;
/// Configuration for opening a [`Commitlog`] (see `Commitlog::open`).
///
/// With the `serde` feature enabled, fields are (de)serialized under
/// kebab-case names, and any field missing from the input falls back to the
/// matching `Options::default_*` function.
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "kebab-case")
)]
pub struct Options {
/// Log format version. Defaults to [`DEFAULT_LOG_FORMAT_VERSION`].
///
/// NOTE(review): presumably the on-disk format version used for segments —
/// confirm against the segment writer.
#[cfg_attr(feature = "serde", serde(default = "Options::default_log_format_version"))]
pub log_format_version: u8,
/// Maximum size of a single segment. Defaults to 1 GiB
/// ([`Options::DEFAULT_MAX_SEGMENT_SIZE`]).
///
/// Assumed to be in bytes, matching `offset_index_interval_bytes`, which it
/// is divided by in `Options::offset_index_len`.
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
pub max_segment_size: u64,
/// Maximum number of records in a single commit. Defaults to
/// [`Options::DEFAULT_MAX_RECORDS_IN_COMMIT`] (`u16::MAX`).
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
pub max_records_in_commit: NonZeroU16,
/// Interval, in bytes, at which offset-index entries are presumably
/// recorded. Defaults to 4096 ([`Options::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES`]).
///
/// Being non-zero, it is always a valid divisor in `Options::offset_index_len`.
#[cfg_attr(feature = "serde", serde(default = "Options::default_offset_index_interval_bytes"))]
pub offset_index_interval_bytes: NonZeroU64,
/// Defaults to `false`
/// ([`Options::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC`]).
///
/// NOTE(review): presumably, when `true`, offset-index entries are only
/// written once the segment data they point at has been fsynced — confirm.
#[cfg_attr(
feature = "serde",
serde(default = "Options::default_offset_index_require_segment_fsync")
)]
pub offset_index_require_segment_fsync: bool,
}
impl Default for Options {
fn default() -> Self {
Self::DEFAULT
}
}
impl Options {
    /// Default for [`Options::max_segment_size`]: 2^30, i.e. 1 GiB.
    pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1 << 30;

    /// Default for [`Options::max_records_in_commit`]: the largest value a
    /// `NonZeroU16` can hold (`u16::MAX`).
    pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::MAX;

    /// Default for [`Options::offset_index_interval_bytes`]: 4096.
    pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");

    /// Default for [`Options::offset_index_require_segment_fsync`]: `false`.
    pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;

    /// The default configuration, usable in `const` contexts.
    ///
    /// Every field takes the value of its matching `DEFAULT_*` constant, or
    /// [`DEFAULT_LOG_FORMAT_VERSION`] for the format version.
    pub const DEFAULT: Self = Self {
        log_format_version: Self::default_log_format_version(),
        max_segment_size: Self::DEFAULT_MAX_SEGMENT_SIZE,
        max_records_in_commit: Self::DEFAULT_MAX_RECORDS_IN_COMMIT,
        offset_index_interval_bytes: Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES,
        offset_index_require_segment_fsync: Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC,
    };

    // The `default_*` functions exist so that serde's `default = "..."`
    // attributes on the struct fields have something to call.

    /// Serde default for `log_format_version`.
    pub const fn default_log_format_version() -> u8 {
        DEFAULT_LOG_FORMAT_VERSION
    }

    /// Serde default for `max_segment_size`.
    pub const fn default_max_segment_size() -> u64 {
        Self::DEFAULT_MAX_SEGMENT_SIZE
    }

    /// Serde default for `max_records_in_commit`.
    pub const fn default_max_records_in_commit() -> NonZeroU16 {
        Self::DEFAULT_MAX_RECORDS_IN_COMMIT
    }

    /// Serde default for `offset_index_interval_bytes`.
    pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
        Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
    }

    /// Serde default for `offset_index_require_segment_fsync`.
    pub const fn default_offset_index_require_segment_fsync() -> bool {
        Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC
    }

    /// Returns `max_segment_size / offset_index_interval_bytes`, rounded down.
    ///
    /// This is presumably the number of offset-index entries a full segment
    /// needs. It never panics, because the interval is non-zero.
    pub fn offset_index_len(&self) -> u64 {
        let interval = self.offset_index_interval_bytes.get();
        self.max_segment_size / interval
    }
}
/// A commitlog backed by a filesystem repository ([`repo::Fs`]).
///
/// The underlying [`commitlog::Generic`] is wrapped in an [`RwLock`], so all
/// methods take `&self` and acquire the read or write lock as needed.
/// `T` is the type of the transaction data passed to `append`.
pub struct Commitlog<T> {
/// The wrapped log. The lock lets `&self` methods mutate it safely.
inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}
impl<T> Commitlog<T> {
    // Every method below that touches `inner` `unwrap`s the lock result, and
    // therefore panics if the lock was poisoned by a panicking holder.

    /// Opens the commitlog stored in `root`, configured by `opts`.
    ///
    /// # Errors
    ///
    /// Returns an error if the filesystem repository for `root` cannot be
    /// created, or if opening the underlying log fails.
    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Returns the highest committed offset, or `None` if the inner log reports none.
    pub fn max_committed_offset(&self) -> Option<u64> {
        self.inner.read().unwrap().max_committed_offset()
    }

    /// Returns the lowest committed offset, or `None` if the inner log reports none.
    pub fn min_committed_offset(&self) -> Option<u64> {
        self.inner.read().unwrap().min_committed_offset()
    }

    /// Returns the current epoch of the inner log.
    pub fn epoch(&self) -> u64 {
        self.inner.read().unwrap().epoch()
    }

    /// Sets the epoch of the inner log to `epoch`.
    ///
    /// Returns the maximum committed offset observed afterwards, read under
    /// the same write lock.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner `set_epoch`.
    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        inner.set_epoch(epoch)?;
        Ok(inner.max_committed_offset())
    }

    /// Calls `sync` on the inner log and returns the maximum committed offset.
    ///
    /// Unlike [`Self::flush`], this does not call `commit` first. Presumably it
    /// forces already-written data to durable storage; confirm in `commitlog::Generic`.
    pub fn sync(&self) -> Option<u64> {
        let mut inner = self.inner.write().unwrap();
        trace!("sync commitlog");
        inner.sync();
        inner.max_committed_offset()
    }

    /// Calls `commit` on the inner log and returns the maximum committed offset.
    ///
    /// Presumably this writes out records buffered by `append`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `commit`.
    pub fn flush(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush commitlog");
        inner.commit()?;
        Ok(inner.max_committed_offset())
    }

    /// Combines [`Self::flush`] and [`Self::sync`] under a single write lock.
    ///
    /// Calls `commit` and then `sync` on the inner log, and returns the maximum
    /// committed offset.
    ///
    /// # Errors
    ///
    /// Propagates any error from `commit`. In that case `sync` is not called.
    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush and sync commitlog");
        inner.commit()?;
        inner.sync();
        Ok(inner.max_committed_offset())
    }

    /// Iterates over all stored commits, starting at offset 0.
    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.commits_from(0)
    }

    /// Iterates over stored commits, starting at `offset`.
    ///
    /// The read lock is held only while the iterator is created, not while it
    /// is consumed: the lock guard is a temporary dropped when this returns.
    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.inner.read().unwrap().commits_from(offset)
    }

    /// Returns the segment offsets reported by the repository's `existing_offsets`.
    pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
        self.inner.read().unwrap().repo.existing_offsets()
    }

    /// Compresses the segment starting at each offset in `offsets`, in order.
    ///
    /// # Panics
    ///
    /// Panics if `offsets` contains the minimum transaction offset of the
    /// current head segment. The head segment must not be compressed.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the repository. Offsets after the
    /// failing one are not attempted.
    pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
        // `inner` is only read here, which clippy flags. The write lock is
        // deliberate: it excludes every other user of `inner`, including
        // appends and flushes, for as long as compression runs.
        #[allow(clippy::readonly_write_lock)]
        let inner = self.inner.write().unwrap();
        let head_offset = inner.head.min_tx_offset();
        assert!(
            !offsets.contains(&head_offset),
            "cannot compress the head segment (min tx offset {head_offset})"
        );
        offsets
            .iter()
            .try_for_each(|&offset| inner.repo.compress_segment(offset))
    }

    /// Consumes this handle, calls `reset` on the inner log, and wraps the result.
    ///
    /// NOTE(review): presumably this discards all stored data; confirm in
    /// `commitlog::Generic::reset`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner `reset`.
    pub fn reset(self) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset()?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Consumes this handle, calls `reset_to(offset)` on the inner log, and
    /// wraps the result.
    ///
    /// NOTE(review): presumably this truncates the log after `offset`; confirm
    /// in `commitlog::Generic::reset_to`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner `reset_to`.
    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Returns the repository's reported size on disk (presumably in bytes).
    pub fn size_on_disk(&self) -> io::Result<u64> {
        let inner = self.inner.read().unwrap();
        inner.repo.size_on_disk()
    }
}
impl<T: Encode> Commitlog<T> {
pub fn append(&self, txdata: T) -> Result<(), T> {
let mut inner = self.inner.write().unwrap();
inner.append(txdata)
}
pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
let mut inner = self.inner.write().unwrap();
if let Err(txdata) = inner.append(txdata) {
if let Err(source) = inner.commit() {
return Err(error::Append { txdata, source });
}
let res = inner.append(txdata);
debug_assert!(res.is_ok(), "failed to append while holding write lock");
}
Ok(())
}
pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
self.transactions_from(0, de)
}
pub fn transactions_from<'a, D>(
&self,
offset: u64,
de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
self.inner.read().unwrap().transactions_from(offset, de)
}
pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal>,
{
self.fold_transactions_from(0, de)
}
pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal>,
{
self.inner.read().unwrap().fold_transactions_from(offset, de)
}
}
/// Returns the committed [`segment::Metadata`] of the commitlog stored in `root`.
///
/// The result comes from `commitlog::committed_meta` over a filesystem
/// repository, without constructing a [`Commitlog`]. NOTE(review): `Ok(None)`
/// presumably means the log holds no commits; confirm.
///
/// # Errors
///
/// Fails if the repository for `root` cannot be created (the `io::Error` is
/// converted into [`error::SegmentMetadata`]) or if reading the metadata fails.
pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
    let fs_repo = repo::Fs::new(root)?;
    commitlog::committed_meta(fs_repo)
}
pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
commits_from(root, 0)
}
/// Iterates over the commits of the commitlog in `root`, starting at `offset`.
///
/// The log is read assuming [`DEFAULT_LOG_FORMAT_VERSION`].
///
/// # Errors
///
/// Fails if the filesystem repository for `root` cannot be created, or if
/// `commitlog::commits_from` fails to set up the traversal.
pub fn commits_from(
    root: CommitLogDir,
    offset: u64,
) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
    let fs_repo = repo::Fs::new(root)?;
    commitlog::commits_from(fs_repo, DEFAULT_LOG_FORMAT_VERSION, offset)
}
pub fn transactions<'a, D, T>(
root: CommitLogDir,
de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
transactions_from(root, 0, de)
}
/// Iterates over the transactions of the commitlog in `root`, starting at
/// `offset` and decoding records with `de`.
///
/// The log is read assuming [`DEFAULT_LOG_FORMAT_VERSION`].
///
/// # Errors
///
/// Fails if the filesystem repository for `root` cannot be created, or if
/// `commitlog::transactions_from` fails to set up the traversal.
pub fn transactions_from<'a, D, T>(
    root: CommitLogDir,
    offset: u64,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    let fs_repo = repo::Fs::new(root)?;
    commitlog::transactions_from(fs_repo, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}
pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
fold_transactions_from(root, 0, de)
}
/// Folds the transactions of the commitlog in `root`, starting at `offset`,
/// into the decoder `de`.
///
/// The log is read assuming [`DEFAULT_LOG_FORMAT_VERSION`].
///
/// # Errors
///
/// A failure to create the filesystem repository for `root` is converted
/// into `D::Error` through its `From<io::Error>` impl. Any error from the
/// fold itself is propagated.
pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
where
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    let fs_repo = repo::Fs::new(root)?;
    commitlog::fold_transactions_from(fs_repo, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}