use std::{collections::BTreeMap, ops::RangeInclusive};
use crate::core::{
CommitHashBuilder, LogId, PageCount, PageIdx, SegmentId, VolumeId,
commit::{Commit, SegmentIdx},
commit_hash::CommitHash,
logref::LogRef,
lsn::LSN,
};
use bytes::Bytes;
use culprit::ResultExt;
use smallvec::SmallVec;
use splinter_rs::{Optimizable, PartitionRead, Splinter};
use tryiter::TryIteratorExt;
use crate::{
GraftErr, LogicalErr,
local::fjall_storage::FjallStorage,
remote::{Remote, segment::SegmentBuilder},
rt::action::{Action, FetchLog},
snapshot::Snapshot,
volume::PendingCommit,
};
/// Action that pushes a volume's unsynced local commits to the remote.
///
/// Running this action plans a commit, uploads the backing segment,
/// records a pending commit locally, then publishes the commit record
/// to the remote (see the `Action` impl below).
#[derive(Debug)]
pub struct RemoteCommit {
    /// The volume whose local changes should be committed remotely.
    pub vid: VolumeId,
}
impl Action for RemoteCommit {
    /// Push the volume's pending local changes to the remote.
    ///
    /// Protocol (order matters for crash safety):
    /// 1. resolve any previously interrupted commit via `attempt_recovery`
    /// 2. plan the commit (no plan -> nothing to do)
    /// 3. build and upload the segment containing the changed pages
    /// 4. durably record a `PendingCommit` locally *before* publishing, so a
    ///    crash between upload and publish can be recovered later
    /// 5. publish the commit record to the remote and finalize locally
    async fn run(self, storage: &FjallStorage, remote: &Remote) -> culprit::Result<(), GraftErr> {
        // Step 1: a leftover pending commit must be resolved first.
        attempt_recovery(storage, &self.vid).or_into_culprit("attempting recovery")?;

        // Step 2: figure out which LSNs to push; bail out if nothing changed.
        let Some(plan) = plan_commit(storage, &self.vid)? else {
            return Ok(());
        };

        // Step 3: assemble the segment and upload it to the remote.
        let (commit_hash, segment_idx, segment_chunks) = build_segment(storage, &plan)?;
        remote
            .put_segment(segment_idx.sid(), segment_chunks)
            .await
            .or_into_ctx()?;

        // Step 4: record the in-flight commit locally before publishing, so
        // `attempt_recovery` can later determine whether the publish landed.
        storage
            .read_write()
            .remote_commit_prepare(
                &self.vid,
                PendingCommit {
                    local: *plan.lsns.end(),
                    commit: plan.commit_ref.lsn,
                    commit_hash: commit_hash.clone(),
                },
            )
            .or_into_ctx()?;

        // Step 5: publish the commit record to the remote.
        let commit = Commit::new(
            plan.commit_ref.log().clone(),
            plan.commit_ref.lsn(),
            plan.page_count,
        )
        .with_commit_hash(Some(commit_hash.clone()))
        .with_segment_idx(Some(segment_idx));
        let result = remote.put_commit(&commit).await;
        match result {
            Ok(()) => {
                // Publish succeeded: finalize the pending commit locally.
                storage
                    .read_write()
                    .remote_commit_success(&self.vid, commit)
                    .or_into_ctx()?;
                Ok(())
            }
            Err(err) if err.ctx().is_already_exists() => {
                // Someone already committed at this LSN (possibly our own
                // retried publish). Fetch the remote log up to that LSN so
                // recovery can compare hashes and decide the outcome.
                FetchLog {
                    log: commit.log,
                    max_lsn: Some(commit.lsn),
                }
                .run(storage, remote)
                .await?;
                attempt_recovery(storage, &self.vid)
                    .or_into_culprit("recovering from existing remote commit")
            }
            Err(err) => {
                // Any other publish failure is surfaced to the caller; the
                // pending commit stays recorded for a later recovery attempt.
                Err(err).or_into_ctx()
            }
        }
    }
}
/// The result of planning a remote commit: which local LSNs to push and
/// where the resulting commit will land in the remote log.
struct CommitPlan {
    /// The local log the changes are read from.
    local: LogId,
    /// The inclusive range of local LSNs covered by this commit.
    lsns: RangeInclusive<LSN>,
    /// The remote log position (log + LSN) the commit will be published at.
    commit_ref: LogRef,
    /// The volume's page count as of the latest local LSN.
    page_count: PageCount,
}
/// Plan a remote commit for `vid`.
///
/// Returns `Ok(None)` when there is nothing to push, `Ok(Some(plan))` when
/// local changes should be committed, and an error when the volume needs
/// recovery (a pending commit exists) or has diverged from the remote.
fn plan_commit(
    storage: &FjallStorage,
    vid: &VolumeId,
) -> culprit::Result<Option<CommitPlan>, GraftErr> {
    let reader = storage.read();
    let volume = reader.volume(vid).or_into_ctx()?;

    // An unresolved pending commit means a previous push never completed;
    // recovery must run before a new commit can be planned.
    if volume.pending_commit().is_some() {
        return Err(LogicalErr::VolumeNeedsRecovery(volume.vid).into());
    }

    // No local commits at all -> nothing to push.
    let local_head = match reader.latest_lsn(&volume.local).or_into_ctx()? {
        Some(lsn) => lsn,
        None => return Ok(None),
    };
    let remote_head = reader.latest_lsn(&volume.remote).or_into_ctx()?;

    let page_count = reader
        .page_count(&volume.local, local_head)
        .or_into_ctx()?
        .expect("BUG: no page count for commit");

    // A volume that has never synced pushes its entire local log as the
    // first remote commit.
    let sync = match volume.sync() {
        Some(sync) => sync,
        None => {
            assert_eq!(remote_head, None, "BUG: remote should be empty");
            return Ok(Some(CommitPlan {
                local: volume.local.clone(),
                lsns: LSN::FIRST..=local_head,
                commit_ref: LogRef::new(volume.remote, LSN::FIRST),
                page_count,
            }));
        }
    };

    // Remote changes we haven't incorporated mean we cannot fast-forward.
    if volume.remote_changes(remote_head).is_some() {
        let status = volume.status(Some(local_head), remote_head);
        tracing::debug!("volume {} has diverged; status=`{status}`", volume.local);
        return Err(LogicalErr::VolumeDiverged(volume.vid).into());
    }

    // No local changes since the last sync -> nothing to push.
    let local_lsns = match volume.local_changes(Some(local_head)) {
        Some(lsns) => lsns,
        None => return Ok(None),
    };

    Ok(Some(CommitPlan {
        local: volume.local.clone(),
        lsns: local_lsns,
        // The new commit lands immediately after the last synced remote LSN.
        commit_ref: LogRef::new(volume.remote.clone(), sync.remote.next()),
        page_count,
    }))
}
/// Build the segment and commit hash for the planned remote commit.
///
/// Walks the commits covered by `plan.lsns`, collecting one version of each
/// changed page into a single new segment. The `- &pageset` subtraction
/// keeps only the first occurrence of each page across the iteration, so
/// the result deduplicates pages across commits (assumes `reader.commits`
/// yields newest-first so the *latest* version wins — TODO confirm).
///
/// The collected pages are also written to local storage under the new
/// segment id in one batch. Returns the commit hash, the segment index
/// (page set + frames), and the encoded chunks ready for upload.
fn build_segment(
    storage: &FjallStorage,
    plan: &CommitPlan,
) -> culprit::Result<(CommitHash, SegmentIdx, SmallVec<[Bytes; 1]>), GraftErr> {
    let reader = storage.read();
    let segment_path = Snapshot::new(plan.local.clone(), plan.lsns.clone());
    let mut page_count = plan.page_count;
    // pageidx -> page contents, for every page included in the segment.
    let mut pages = BTreeMap::new();
    // Set of page indexes already captured from an earlier iteration.
    let mut pageset = Splinter::default();
    let mut commits = reader.commits(&segment_path);
    while let Some(commit) = commits.try_next().or_into_ctx()? {
        // Track the smallest page count seen; pages past a truncation point
        // must not be included in the segment.
        page_count = page_count.min(commit.page_count);
        if let Some(idx) = commit.segment_idx {
            let mut commit_pages = idx.pageset;
            // Drop page indexes beyond the (possibly truncated) page count.
            if commit_pages.last().map(|idx| idx.pages()) > Some(page_count) {
                commit_pages.truncate(page_count);
            }
            // Pages in this commit not already captured by a prior iteration.
            let outstanding = Splinter::from(commit_pages) - &pageset;
            for pageidx in outstanding.iter() {
                // SAFETY: `pageidx` came from a pageset built out of valid
                // `PageIdx` values (a commit's segment index), so it is
                // assumed to be in `PageIdx`'s valid range — the
                // debug_assert below cross-checks it against the plan.
                let pageidx = unsafe { PageIdx::new_unchecked(pageidx) };
                debug_assert!(plan.page_count.contains(pageidx));
                let page = reader.read_page(idx.sid.clone(), pageidx).or_into_ctx()?;
                pages.insert(pageidx, page.expect("BUG: missing page"));
            }
            pageset |= outstanding;
        }
    }
    pageset.optimize();
    let mut segment_builder = SegmentBuilder::new();
    // The commit hash covers the commit's remote position, page count, and
    // every page written below (in ascending pageidx order via BTreeMap).
    let mut commithash_builder = CommitHashBuilder::new(
        plan.commit_ref.log().clone(),
        plan.commit_ref.lsn(),
        plan.page_count,
    );
    let sid = SegmentId::random();
    let mut batch = storage.batch();
    for (pageidx, page) in pages {
        commithash_builder.write_page(pageidx, &page);
        segment_builder.write(pageidx, &page);
        // Also persist the page locally under the new segment id.
        batch.write_page(sid.clone(), pageidx, page);
    }
    let commit_hash = commithash_builder.build();
    let (frames, chunks) = segment_builder.finish();
    let idx = SegmentIdx::new(sid, pageset.into()).with_frames(frames);
    batch.commit().or_into_ctx()?;
    Ok((commit_hash, idx, chunks))
}
/// Attempt to resolve a pending (in-flight) remote commit for `vid`.
///
/// A `PendingCommit` is recorded locally *before* the commit is published
/// to the remote, so after a crash or a publish failure the remote log must
/// be consulted to learn the outcome:
/// - the commit exists with our hash   -> finalize it locally
/// - the commit exists with a different hash -> another writer claimed the
///   LSN; drop our pending commit and report divergence
/// - the commit is absent -> we can't tell yet (the remote log may not have
///   been fetched far enough); the volume still needs recovery
///
/// Returns `Ok(())` when there is no pending commit or it was finalized.
fn attempt_recovery(storage: &FjallStorage, vid: &VolumeId) -> culprit::Result<(), GraftErr> {
    let reader = storage.read();
    let volume = reader.volume(vid).or_into_ctx()?;
    let Some(pending) = volume.pending_commit else {
        // Nothing in flight; recovery is a no-op.
        return Ok(());
    };
    tracing::debug!(?pending, "got pending commit");
    // Reuse `reader` (instead of opening a second read handle) so the
    // volume record and the commit lookup come from one consistent snapshot.
    match reader
        .get_commit(&volume.remote, pending.commit)
        .or_into_ctx()?
    {
        Some(commit) if commit.commit_hash() == Some(&pending.commit_hash) => {
            // Our commit made it to the remote; finalize it locally.
            storage
                .read_write()
                .remote_commit_success(&volume.vid, commit)
                .or_into_ctx()?;
            Ok(())
        }
        Some(commit) => {
            // The LSN is occupied by someone else's commit: our push can
            // never succeed. Drop the pending commit and report divergence.
            storage
                .read_write()
                .drop_pending_commit(&volume.vid)
                .or_into_ctx()?;
            tracing::warn!(
                "remote commit rejected for volume {}, commit {}/{} already exists with different hash: {:?}",
                volume.vid,
                volume.remote,
                pending.commit,
                commit.commit_hash
            );
            Err(LogicalErr::VolumeDiverged(volume.vid).into())
        }
        None => {
            // Outcome unknown: the remote log hasn't been fetched up to the
            // pending LSN yet. Caller must fetch more and retry recovery.
            Err(LogicalErr::VolumeNeedsRecovery(volume.vid).into())
        }
    }
}