use std::{fmt::Debug, ops::RangeInclusive, path::Path, sync::Arc};
use crate::core::{
LogId, PageCount, PageIdx, SegmentId, VolumeId,
checkpoints::CachedCheckpoints,
checksum::{Checksum, ChecksumBuilder},
commit::{Commit, SegmentIdx, SegmentRangeRef},
commit_hash::CommitHash,
logref::LogRef,
lsn::{LSN, LSNRangeExt, LSNSet},
page::Page,
pageset::PageSet,
};
use bytestring::ByteString;
use fjall::{Batch, Instant, KvSeparationOptions, PartitionCreateOptions};
use parking_lot::{Mutex, MutexGuard};
use tryiter::TryIteratorExt;
use crate::{
LogicalErr,
local::fjall_storage::{
keys::PageKey,
typed_partition::{TypedPartition, TypedPartitionSnapshot, fjall_batch_ext::FjallBatchExt},
},
snapshot::Snapshot,
volume::{PendingCommit, SyncPoint, Volume},
};
use culprit::{Result, ResultExt};
mod fjall_repr;
pub mod keys;
mod typed_partition;
mod values;
/// Unified error type for the fjall-backed storage layer.
///
/// Wraps failures from the underlying fjall keyspace, the LSM tree, key/value
/// decoding, and I/O, plus the crate's own [`LogicalErr`] (forwarded
/// transparently so callers see the logical error's own message).
#[derive(Debug, thiserror::Error)]
pub enum FjallStorageErr {
    /// Error surfaced by the fjall keyspace.
    #[error("Fjall error: {0}")]
    FjallErr(#[from] fjall::Error),
    /// Error surfaced by the underlying LSM tree.
    #[error("Fjall LSM Tree error: {0}")]
    LsmTreeErr(#[from] lsm_tree::Error),
    /// A stored key or value failed to decode into its typed form.
    #[error("Failed to decode key: {0}")]
    DecodeErr(#[from] fjall_repr::DecodeErr),
    /// Filesystem-level failure (e.g. creating the temporary directory).
    #[error("I/O Error: {0}")]
    IoErr(#[from] std::io::Error),
    /// A conditional batch commit observed a violated precondition.
    #[error("batch commit precondition failed")]
    BatchPreconditionErr,
    /// Domain-level error; message passes through unchanged.
    #[error(transparent)]
    LogicalErr(#[from] LogicalErr),
}
/// Durable local storage built on a single fjall keyspace, with one typed
/// partition per record family.
pub struct FjallStorage {
    keyspace: fjall::Keyspace,
    // tag name -> volume id
    tags: TypedPartition<ByteString, VolumeId>,
    // volume id -> volume metadata
    volumes: TypedPartition<VolumeId, Volume>,
    // log id -> cached checkpoint set for that log
    checkpoints: TypedPartition<LogId, CachedCheckpoints>,
    // (log id, lsn) -> commit record
    log: TypedPartition<LogRef, Commit>,
    // (segment id, page index) -> page contents
    pages: TypedPartition<PageKey, Page>,
    // Serializes read-modify-write sequences; taken by `read_write()`.
    lock: Arc<Mutex<()>>,
}
impl Debug for FjallStorage {
    /// Renders only the type name; the partition handles are intentionally
    /// omitted from the Debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Identical output to `debug_struct("FjallStorage").finish()` with no
        // fields recorded, in both `{:?}` and `{:#?}` modes.
        f.write_str("FjallStorage")
    }
}
impl FjallStorage {
    /// Opens (or creates) a storage instance rooted at `path`.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, FjallStorageErr> {
        Self::open_config(fjall::Config::new(path))
    }

    /// Opens a storage instance backed by a fresh temporary directory.
    ///
    /// `keep()` disarms tempfile's drop-time cleanup; the keyspace is opened
    /// with `temporary(true)` so fjall takes over lifecycle of the directory.
    pub fn open_temporary() -> Result<Self, FjallStorageErr> {
        let path = tempfile::tempdir()?.keep();
        Self::open_config(fjall::Config::new(path).temporary(true))
    }

    /// Opens the keyspace and every named partition.
    ///
    /// The `pages` partition opts into key-value separation; the others use
    /// default partition options.
    fn open_config(config: fjall::Config) -> Result<Self, FjallStorageErr> {
        let keyspace = config.open()?;
        let tags = TypedPartition::open(&keyspace, "tags", Default::default())?;
        let volumes = TypedPartition::open(&keyspace, "volumes", Default::default())?;
        let checkpoints = TypedPartition::open(&keyspace, "checkpoints", Default::default())?;
        let log = TypedPartition::open(&keyspace, "log", Default::default())?;
        let pages = TypedPartition::open(
            &keyspace,
            "pages",
            PartitionCreateOptions::default().with_kv_separation(KvSeparationOptions::default()),
        )?;
        Ok(Self {
            keyspace,
            tags,
            volumes,
            checkpoints,
            log,
            pages,
            lock: Default::default(),
        })
    }

    /// Acquires a consistent read view pinned at the current keyspace instant.
    pub(crate) fn read(&self) -> ReadGuard<'_> {
        ReadGuard::open(self)
    }

    /// Starts a new atomic write batch.
    pub(crate) fn batch(&self) -> WriteBatch<'_> {
        WriteBatch::open(self)
    }

    /// Acquires the exclusive mutex plus a read view, for
    /// read-modify-write sequences.
    pub(crate) fn read_write(&self) -> ReadWriteGuard<'_> {
        ReadWriteGuard::open(self)
    }

    /// Stores a single page under `(sid, pageidx)`.
    pub fn write_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
        page: Page,
    ) -> Result<(), FjallStorageErr> {
        self.pages
            .insert(PageKey::new(sid, pageidx), page)
            .or_into_ctx()
    }

    /// Removes a single page under `(sid, pageidx)`.
    pub fn remove_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<(), FjallStorageErr> {
        self.pages.remove(PageKey::new(sid, pageidx)).or_into_ctx()
    }

    /// Removes every stored page for `sid` within the given page-index range.
    ///
    /// NOTE(review): the key range is built high-to-low
    /// (`pages.end()..=pages.start()`), matching the `high..=low` ranges used
    /// for `LogRef` keys elsewhere in this file — presumably the key encoding
    /// sorts in descending order; confirm against `PageKey`'s encoding.
    pub fn remove_page_range(
        &self,
        sid: &SegmentId,
        pages: RangeInclusive<PageIdx>,
    ) -> Result<(), FjallStorageErr> {
        let keyrange =
            PageKey::new(sid.clone(), *pages.end())..=PageKey::new(sid.clone(), *pages.start());
        // Collect matching keys from a snapshot, then delete them in one batch.
        let mut batch = self.keyspace.batch();
        let mut iter = self.pages.snapshot().range(keyrange);
        while let Some((key, _)) = iter.try_next()? {
            batch.remove_typed(&self.pages, key);
        }
        batch.commit()?;
        Ok(())
    }

    /// Deletes a tag mapping (no-op semantics for a missing tag are up to
    /// `TypedPartition::remove`).
    pub fn tag_delete(&self, tag: &str) -> Result<(), FjallStorageErr> {
        self.tags.remove(tag.into())
    }

    /// Deletes a volume record by id.
    pub fn volume_delete(&self, vid: &VolumeId) -> Result<(), FjallStorageErr> {
        self.volumes.remove(vid.clone())
    }

    /// Replaces the cached checkpoint set for `log`.
    pub fn write_checkpoints(
        &self,
        log: LogId,
        checkpoints: CachedCheckpoints,
    ) -> Result<(), FjallStorageErr> {
        self.checkpoints.insert(log, checkpoints)
    }

    /// Forks a new volume whose local log contains every commit visible in
    /// `snapshot`, renumbered onto a fresh contiguous LSN range.
    ///
    /// `commits` yields entries from the high end of each snapshot range
    /// first, so the counter starts one past the last slot and is decremented
    /// before each write; the final commit written lands on `LSN::FIRST`.
    pub fn volume_from_snapshot(&self, snapshot: &Snapshot) -> Result<Volume, FjallStorageErr> {
        let volume = Volume::new_random();
        let commits = self
            .read()
            .commits(snapshot)
            .collect::<Result<Vec<_>, _>>()?;
        let mut lsn = LSN::FIRST.checked_add(commits.len() as u64).unwrap();
        let mut batch = self.batch();
        for commit in commits {
            lsn = lsn.checked_prev().unwrap();
            batch.write_commit(commit.with_log_id(volume.local.clone()).with_lsn(lsn));
        }
        batch.write_volume(volume.clone());
        batch.commit()?;
        Ok(volume)
    }
}
/// A consistent read view over all partitions, pinned at a single keyspace
/// instant. The instant is registered with fjall's snapshot tracker on open
/// and released on drop.
pub struct ReadGuard<'a> {
    storage: &'a FjallStorage,
    // The keyspace instant every partition snapshot is taken at.
    seqno: Instant,
}
impl Drop for ReadGuard<'_> {
    fn drop(&mut self) {
        // Unregister this guard's instant; pairs with the `open` call in
        // `ReadGuard::open`.
        self.storage.keyspace.snapshot_tracker.close(self.seqno);
    }
}
impl<'a> ReadGuard<'a> {
    /// Captures the current keyspace instant and registers it with the
    /// snapshot tracker (released again in `Drop`).
    fn open(storage: &'a FjallStorage) -> ReadGuard<'a> {
        let seqno = storage.keyspace.instant();
        storage.keyspace.snapshot_tracker.open(seqno);
        Self { storage, seqno }
    }

    // The `_xxx` accessors below each return a partition snapshot pinned at
    // this guard's instant, so every read through the guard is consistent.

    #[inline]
    fn _tags(&self) -> TypedPartitionSnapshot<ByteString, VolumeId> {
        self.storage.tags.snapshot_at(self.seqno)
    }
    #[inline]
    fn _volumes(&self) -> TypedPartitionSnapshot<VolumeId, Volume> {
        self.storage.volumes.snapshot_at(self.seqno)
    }
    #[inline]
    fn _checkpoints(&self) -> TypedPartitionSnapshot<LogId, CachedCheckpoints> {
        self.storage.checkpoints.snapshot_at(self.seqno)
    }
    #[inline]
    fn _log(&self) -> TypedPartitionSnapshot<LogRef, Commit> {
        self.storage.log.snapshot_at(self.seqno)
    }
    #[inline]
    fn _pages(&self) -> TypedPartitionSnapshot<PageKey, Page> {
        self.storage.pages.snapshot_at(self.seqno)
    }

    /// Iterates every `(tag, volume id)` mapping.
    pub fn iter_tags(
        &self,
    ) -> impl Iterator<Item = Result<(ByteString, VolumeId), FjallStorageErr>> + use<> {
        self._tags().range(..)
    }

    /// Returns true if `tag` is present.
    pub fn tag_exists(&self, tag: &str) -> Result<bool, FjallStorageErr> {
        self._tags().contains(tag)
    }

    /// Looks up the volume id a tag points at, if any.
    pub fn get_tag(&self, tag: &str) -> Result<Option<VolumeId>, FjallStorageErr> {
        self._tags().get(tag)
    }

    /// Returns the latest LSN recorded for `log`, or `None` if the log is
    /// empty. Uses `first` on the log partition — NOTE(review): this implies
    /// log keys sort newest-first (descending LSN); confirm against the
    /// `LogRef` key encoding.
    pub fn latest_lsn(&self, log: &LogId) -> Result<Option<LSN>, FjallStorageErr> {
        Ok(self._log().first(log)?.map(|(logref, _)| logref.lsn))
    }

    /// Iterates every volume record.
    pub fn iter_volumes(&self) -> impl Iterator<Item = Result<Volume, FjallStorageErr>> + use<> {
        self._volumes().values()
    }

    /// Returns true if a volume record exists for `vid`.
    pub fn volume_exists(&self, vid: &VolumeId) -> Result<bool, FjallStorageErr> {
        self._volumes().contains(vid)
    }

    /// Loads the volume record for `vid`, or `LogicalErr::VolumeNotFound`.
    pub fn volume(&self, vid: &VolumeId) -> Result<Volume, FjallStorageErr> {
        self._volumes()
            .get(vid)?
            .ok_or_else(|| LogicalErr::VolumeNotFound(vid.clone()).into())
    }

    /// Checks whether `snapshot` still describes the latest state of `vid`.
    ///
    /// Three cases by the snapshot's head:
    /// - head on the local log: it must equal the latest local LSN;
    /// - head on the remote log: it must equal the sync point's remote LSN and
    ///   the sync point's local watermark must match the latest local LSN;
    /// - no head: the volume must have no local commits and no sync point.
    /// Any other head log means the snapshot is stale.
    pub fn is_latest_snapshot(
        &self,
        vid: &VolumeId,
        snapshot: &Snapshot,
    ) -> Result<bool, FjallStorageErr> {
        let volume = self.volume(vid)?;
        let latest_local = self.latest_lsn(&volume.local)?;
        Ok(match snapshot.head() {
            Some((log, lsn)) if log == &volume.local => Some(lsn) == latest_local,
            Some((log, lsn)) if log == &volume.remote => {
                if let Some(sync) = volume.sync {
                    lsn == sync.remote && sync.local_watermark == latest_local
                } else {
                    false
                }
            }
            Some(_) => false,
            None => latest_local.is_none() && volume.sync().is_none(),
        })
    }

    /// Builds the current snapshot for `vid`: the local log's unsynced range
    /// (above the sync watermark when one exists, otherwise from `LSN::FIRST`)
    /// followed by the remote log up to the sync point.
    pub fn snapshot(&self, vid: &VolumeId) -> Result<Snapshot, FjallStorageErr> {
        let volume = self.volume(vid)?;
        let mut snapshot = Snapshot::EMPTY;
        if let Some(latest) = self.latest_lsn(&volume.local)? {
            if let Some(watermark) = volume.sync().and_then(|s| s.local_watermark) {
                // Only include local commits that haven't been synced yet.
                if watermark < latest {
                    snapshot.append(volume.local, watermark..=latest);
                }
            } else {
                snapshot.append(volume.local, LSN::FIRST..=latest);
            }
        }
        if let Some(remote) = volume.sync.map(|s| s.remote) {
            snapshot.append(volume.remote, LSN::FIRST..=remote);
        }
        Ok(snapshot)
    }

    /// Loads a single commit by `(log, lsn)`.
    pub fn get_commit(&self, log: &LogId, lsn: LSN) -> Result<Option<Commit>, FjallStorageErr> {
        self._log().get_owned(LogRef::new(log.clone(), lsn))
    }

    /// Iterates every commit visible in `snapshot`, one log range after
    /// another. Each range is scanned `high..=low`, matching the log's key
    /// ordering (see `latest_lsn`).
    pub fn commits(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<Commit, FjallStorageErr>> {
        let log = self._log();
        snapshot.iter().flat_map(move |entry| {
            let low = entry.start_ref();
            let high = entry.end_ref();
            let range = high..=low;
            log.range(range).map_ok(|(_, commit)| Ok(commit))
        })
    }

    /// Walks commits newest-first and yields, per segment, the set of pages
    /// that segment contributes to the snapshot's visible state.
    ///
    /// `pages` tracks page indexes not yet claimed by a newer commit (starts
    /// FULL); `page_count` tracks the smallest truncation seen so far, so
    /// pages beyond a later truncation are never attributed to older commits.
    pub fn iter_visible_pages(
        &self,
        snapshot: &Snapshot,
    ) -> impl Iterator<Item = Result<(SegmentIdx, PageSet), FjallStorageErr>> {
        let mut pages = PageSet::FULL;
        let mut page_count = PageCount::MAX;
        self.commits(snapshot).try_filter_map(move |commit| {
            // All pages already claimed by newer commits; nothing left to do.
            if pages.is_empty() {
                return Ok(None);
            }
            if commit.page_count < page_count {
                page_count = commit.page_count;
                pages.truncate(page_count);
            }
            if let Some(idx) = commit.segment_idx {
                let mut commit_pages = idx.pageset.clone();
                // Clip the commit's pageset to the effective page count.
                if commit_pages.last().map(|idx| idx.pages()) > Some(page_count) {
                    commit_pages.truncate(page_count);
                }
                // `cut` removes the claimed pages from `pages` and returns them.
                let outstanding = pages.cut(&commit_pages);
                if !outstanding.is_empty() {
                    return Ok(Some((idx, outstanding)));
                }
            }
            Ok(None)
        })
    }

    /// Returns the set of LSNs actually present in `log` within `lsns`.
    pub fn lsns(&self, log: &LogId, lsns: &RangeInclusive<LSN>) -> Result<LSNSet, FjallStorageErr> {
        let low = LogRef::new(log.clone(), *lsns.start());
        let high = LogRef::new(log.clone(), *lsns.end());
        let range = high..=low;
        self._log()
            .range_keys(range)
            .map_ok(|key| Ok(key.lsn()))
            .collect()
    }

    /// Finds the newest visible commit whose segment contains `pageidx`.
    ///
    /// Scans newest-first; stops early once a commit's page count excludes
    /// `pageidx`, since a truncation hides any older contents of that page.
    pub fn search_page(
        &self,
        snapshot: &Snapshot,
        pageidx: PageIdx,
    ) -> Result<Option<Commit>, FjallStorageErr> {
        let mut commits = self.commits(snapshot);
        while let Some(commit) = commits.try_next()? {
            if !commit.page_count().contains(pageidx) {
                break;
            }
            let Some(idx) = commit.segment_idx() else {
                continue;
            };
            if !idx.contains(pageidx) {
                continue;
            }
            return Ok(Some(commit));
        }
        Ok(None)
    }

    /// Returns true if the page `(sid, pageidx)` is stored locally.
    pub fn has_page(&self, sid: SegmentId, pageidx: PageIdx) -> Result<bool, FjallStorageErr> {
        self._pages().contains(&PageKey::new(sid, pageidx))
    }

    /// Loads the page `(sid, pageidx)` if stored locally.
    pub fn read_page(
        &self,
        sid: SegmentId,
        pageidx: PageIdx,
    ) -> Result<Option<Page>, FjallStorageErr> {
        self._pages()
            .get_owned(PageKey::new(sid, pageidx))
            .or_into_ctx()
    }

    /// Returns the page count recorded by commit `(log, lsn)`, if it exists.
    pub fn page_count(&self, log: &LogId, lsn: LSN) -> Result<Option<PageCount>, FjallStorageErr> {
        Ok(self.get_commit(log, lsn)?.map(|c| c.page_count()))
    }

    /// Loads the cached checkpoints for `log`, if any.
    pub fn checkpoints(&self, log: &LogId) -> Result<Option<CachedCheckpoints>, FjallStorageErr> {
        self._checkpoints().get(log)
    }

    /// Computes a checksum over every locally stored page visible in
    /// `snapshot`. Pages that are visible but not stored locally are skipped.
    pub fn checksum(&self, snapshot: &Snapshot) -> Result<Checksum, FjallStorageErr> {
        let pages = self._pages();
        let mut builder = ChecksumBuilder::new();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            for pageidx in pageset.iter() {
                let key = PageKey::new(idx.sid.clone(), pageidx);
                if let Some(page) = pages.get(&key)? {
                    builder.write(&page);
                }
            }
        }
        Ok(builder.build())
    }

    /// Reports segment frames that are visible in `snapshot` but not stored
    /// locally. A frame is considered missing when its first page is absent
    /// from the `pages` partition — frames appear to be stored all-or-nothing,
    /// so the first page stands in for the whole frame (NOTE(review): confirm
    /// against the frame write path).
    pub fn find_missing_frames(
        &self,
        snapshot: &Snapshot,
    ) -> Result<Vec<SegmentRangeRef>, FjallStorageErr> {
        let mut missing_frames = vec![];
        let pages = self._pages();
        let mut iter = self.iter_visible_pages(snapshot);
        while let Some((idx, pageset)) = iter.try_next()? {
            // Only consider frames overlapping pages we actually need.
            let frames = idx.iter_frames(|pages| pageset.contains_any(pages));
            for frame in frames {
                if let Some(first_page) = frame.pageset.first()
                    && !pages.contains(&PageKey::new(frame.sid.clone(), first_page))?
                {
                    missing_frames.push(frame);
                }
            }
        }
        Ok(missing_frames)
    }
}
/// An atomic multi-partition write batch; nothing is persisted until
/// `commit()` is called.
pub struct WriteBatch<'a> {
    storage: &'a FjallStorage,
    batch: Batch,
}
impl<'a> WriteBatch<'a> {
fn open(storage: &'a FjallStorage) -> Self {
Self { storage, batch: storage.keyspace.batch() }
}
pub fn write_tag(&mut self, tag: &str, vid: VolumeId) {
self.batch.insert_typed(&self.storage.tags, tag.into(), vid);
}
pub fn write_commit(&mut self, commit: Commit) {
self.batch
.insert_typed(&self.storage.log, commit.logref(), commit);
}
pub fn write_volume(&mut self, volume: Volume) {
self.batch
.insert_typed(&self.storage.volumes, volume.vid.clone(), volume);
}
pub fn write_page(&mut self, sid: SegmentId, pageidx: PageIdx, page: Page) {
self.batch
.insert_typed(&self.storage.pages, PageKey::new(sid, pageidx), page);
}
pub fn commit(self) -> Result<(), FjallStorageErr> {
self.batch.commit().or_into_ctx()
}
}
/// Exclusive read-modify-write access: holds the storage-wide mutex for the
/// guard's lifetime, plus a consistent [`ReadGuard`] taken after the lock.
pub struct ReadWriteGuard<'a> {
    // Held only for exclusivity; the unit value carries no data.
    _permit: MutexGuard<'a, ()>,
    read: ReadGuard<'a>,
}
impl<'a> ReadWriteGuard<'a> {
    /// Takes the mutex first, then captures the read view, so the view
    /// reflects all previously completed writers.
    fn open(storage: &'a FjallStorage) -> Self {
        let _permit = storage.lock.lock();
        let read = storage.read();
        Self { _permit, read }
    }

    /// Shorthand for the underlying storage handle.
    fn storage(&self) -> &'a FjallStorage {
        self.read.storage
    }

    /// Points `tag` at `vid`, returning the previous mapping if any.
    /// Atomic with respect to other writers because the mutex is held.
    pub fn tag_replace(
        &self,
        tag: &str,
        vid: VolumeId,
    ) -> Result<Option<VolumeId>, FjallStorageErr> {
        let out = self.read.get_tag(tag)?;
        self.storage().tags.insert(tag.into(), vid)?;
        Ok(out)
    }

    /// Opens an existing volume or creates a new one.
    ///
    /// If `vid` already exists, the stored record is returned — but only
    /// after verifying any caller-supplied `remote` log matches the stored
    /// one. Otherwise a new volume is created; if the chosen remote log
    /// already has commits, the volume starts with a sync point at the
    /// remote's latest LSN (with no local watermark yet).
    pub fn volume_open(
        self,
        vid: Option<VolumeId>,
        local: Option<LogId>,
        remote: Option<LogId>,
    ) -> Result<Volume, FjallStorageErr> {
        let vid = vid.unwrap_or_else(VolumeId::random);
        if let Some(volume) = self.read._volumes().get(&vid)? {
            if let Some(remote) = remote
                && volume.remote != remote
            {
                return Err(LogicalErr::VolumeRemoteMismatch {
                    vid: volume.vid,
                    expected: remote,
                    actual: volume.remote,
                }
                .into());
            }
            return Ok(volume);
        }
        let local = local.unwrap_or_else(LogId::random);
        let remote = remote.unwrap_or_else(LogId::random);
        let sync = self
            .read
            .latest_lsn(&remote)?
            .map(|latest_remote| SyncPoint {
                remote: latest_remote,
                local_watermark: None,
            });
        let volume = Volume::new(vid.clone(), local, remote, sync, None);
        self.storage().volumes.insert(vid, volume.clone())?;
        tracing::debug!(
            vid = ?volume.vid,
            local_log = ?volume.local,
            remote_log = ?volume.remote,
            "open volume"
        );
        Ok(volume)
    }

    /// Appends a local commit to `vid`'s local log.
    ///
    /// Optimistic concurrency: fails with `VolumeConcurrentWrite` unless the
    /// caller's `snapshot` is still the latest. Returns the post-commit
    /// snapshot.
    pub fn commit(
        self,
        vid: &VolumeId,
        snapshot: Snapshot,
        page_count: PageCount,
        segment: SegmentIdx,
    ) -> Result<Snapshot, FjallStorageErr> {
        if !self.read.is_latest_snapshot(vid, &snapshot)? {
            return Err(LogicalErr::VolumeConcurrentWrite(vid.clone()).into());
        }
        let volume = self.read.volume(vid)?;
        let commit_lsn = self
            .read
            .latest_lsn(&volume.local)?
            .map_or(LSN::FIRST, |lsn| lsn.next());
        tracing::debug!(vid=?volume.vid, log=?volume.local, %commit_lsn, "local commit");
        let commit = Commit::new(volume.local.clone(), commit_lsn, page_count)
            .with_segment_idx(Some(segment));
        self.read.storage.log.insert(commit.logref(), commit)?;
        // `self.read`'s view was pinned before the insert above; open a fresh
        // guard so the returned snapshot includes the new commit.
        ReadGuard::open(self.storage()).snapshot(&volume.vid)
    }

    /// Records the intent to push a commit to the remote (phase one of the
    /// remote-commit protocol).
    ///
    /// # Panics
    /// Asserts protocol invariants: no pending commit may already exist, the
    /// local watermark must advance, the remote LSN must be the immediate
    /// successor of the latest remote LSN, and the commit hash must be set.
    pub fn remote_commit_prepare(
        self,
        vid: &VolumeId,
        pending_commit: PendingCommit,
    ) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(vid)?;
        assert!(
            volume.pending_commit().is_none(),
            "BUG: pending commit is present"
        );
        if let Some(local_watermark) = volume.local_watermark() {
            assert!(
                local_watermark < pending_commit.local,
                "BUG: local_watermark monotonicity violation"
            );
        }
        let latest_remote = self.read.latest_lsn(&volume.remote)?;
        assert_eq!(
            latest_remote,
            pending_commit.commit.checked_prev(),
            "BUG: remote lsn monotonicity violation"
        );
        assert!(pending_commit.commit_hash != CommitHash::ZERO);
        let volume = volume.with_pending_commit(Some(pending_commit));
        self.storage().volumes.insert(volume.vid.clone(), volume)?;
        Ok(())
    }

    /// Finalizes a successful remote commit (phase two): writes the remote
    /// commit and moves the volume's sync point forward, atomically.
    ///
    /// # Panics
    /// Asserts the remote commit matches the pending commit's LSN and hash,
    /// and that the commit is not already present in the log.
    pub fn remote_commit_success(
        self,
        vid: &VolumeId,
        remote_commit: Commit,
    ) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(vid)?;
        let pending_commit = volume.pending_commit.unwrap();
        assert_eq!(remote_commit.lsn(), pending_commit.commit);
        assert_eq!(
            remote_commit.commit_hash(),
            Some(&pending_commit.commit_hash)
        );
        assert!(
            !self.read._log().contains(&remote_commit.logref())?,
            "BUG: remote commit already exists"
        );
        // Pending commit becomes the new sync point; commit + volume are
        // written in one atomic batch.
        let volume = Volume {
            sync: Some(pending_commit.into()),
            pending_commit: None,
            ..volume
        };
        let mut batch = self.storage().batch();
        batch.write_commit(remote_commit);
        batch.write_volume(volume);
        batch.commit()
    }

    /// Abandons an in-flight remote commit by clearing the pending record.
    pub fn drop_pending_commit(self, vid: &VolumeId) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(vid)?;
        self.storage()
            .volumes
            .insert(volume.vid.clone(), volume.with_pending_commit(None))
    }

    /// Fast-forwards the volume's sync point to the latest remote LSN.
    ///
    /// No-op when there are no remote changes. Fails with `VolumeDiverged`
    /// when the volume also has unsynced local changes (the local watermark
    /// is preserved otherwise).
    ///
    /// # Panics
    /// Asserts the new remote LSN is newer than the current sync point.
    pub fn sync_remote_to_local(self, vid: VolumeId) -> Result<(), FjallStorageErr> {
        let volume = self.read.volume(&vid)?;
        let latest_remote = self.read.latest_lsn(&volume.remote).or_into_ctx()?;
        let Some(remote_changes) = volume.remote_changes(latest_remote) else {
            return Ok(());
        };
        let latest_local = self.read.latest_lsn(&volume.local).or_into_ctx()?;
        if volume.local_changes(latest_local).is_some() {
            let status = volume.status(latest_local, latest_remote);
            tracing::debug!("volume {} has diverged; status=`{status}`", volume.vid);
            return Err(LogicalErr::VolumeDiverged(volume.vid).into());
        }
        tracing::debug!(
            vid = ?volume.vid,
            sync = ?volume.sync(),
            lsns = %remote_changes.to_string(),
            local = ?volume.local,
            remote = ?volume.remote,
            "fast-forwarding volume"
        );
        let remote_lsn = *remote_changes.end();
        let new_sync = match volume.sync() {
            Some(sync) => {
                assert!(
                    remote_lsn > sync.remote,
                    "BUG: attempt to sync volume to older version of the remote"
                );
                SyncPoint {
                    remote: remote_lsn,
                    local_watermark: sync.local_watermark,
                }
            }
            None => SyncPoint {
                remote: remote_lsn,
                local_watermark: None,
            },
        };
        self.storage()
            .volumes
            .insert(volume.vid.clone(), volume.with_sync(Some(new_sync)))
    }
}