use std::{fmt::Display, ops::RangeInclusive};
use bilrost::Message;
use crate::core::{LogId, commit_hash::CommitHash, gid::VolumeId, lsn::LSN};
/// A recorded synchronization point for a volume.
///
/// `Volume` stores at most one of these and derives its `remote_commit()` and
/// `local_watermark()` accessors from it. The `#[bilrost(N)]` attributes give
/// each field its number in the bilrost encoding.
#[derive(Debug, Clone, Message, PartialEq, Eq)]
pub struct SyncPoint {
/// LSN on the remote side of the sync point. `Volume::remote_commit` returns
/// it and `Volume::remote_changes` uses it as the base of the change range.
#[bilrost(1)]
pub remote: LSN,
/// LSN on the local side of the sync point, if any. `Volume::local_changes`
/// uses it as the base of the change range; when it is `None` the range
/// starts at `LSN::FIRST`.
/// NOTE(review): presumably `None` means nothing local has been pushed yet —
/// confirm against the sync protocol.
#[bilrost(2)]
pub local_watermark: Option<LSN>,
}
/// Record of a commit that is tracked as pending on a `Volume`.
///
/// It can be converted into a `SyncPoint` via `From`, which keeps `commit` and
/// `local` and drops `commit_hash`.
/// NOTE(review): presumably this describes a remote commit that has been
/// started but not yet confirmed — verify against the commit workflow.
#[derive(Debug, Clone, Message, PartialEq, Eq)]
pub struct PendingCommit {
/// Local LSN associated with the commit; becomes `SyncPoint::local_watermark`
/// on conversion.
#[bilrost(1)]
pub local: LSN,
/// LSN of the commit itself; becomes `SyncPoint::remote` on conversion.
#[bilrost(2)]
pub commit: LSN,
/// Hash stored alongside the commit.
/// NOTE(review): presumably used to recognise the commit later — confirm at
/// the call sites that read it.
#[bilrost(3)]
pub commit_hash: CommitHash,
}
impl From<PendingCommit> for SyncPoint {
fn from(value: PendingCommit) -> Self {
Self {
remote: value.commit,
local_watermark: Some(value.local),
}
}
}
/// Metadata describing a volume: its identifier, a local and a remote log id,
/// and the volume's synchronization state.
///
/// The `#[bilrost(N)]` attributes give each field its number in the bilrost
/// encoding. `Default` yields a volume with default ids and no sync state.
#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
pub struct Volume {
/// Identifier of this volume.
#[bilrost(1)]
pub vid: VolumeId,
/// Log id for the local side of the volume.
/// NOTE(review): presumably the log that receives local writes — confirm.
#[bilrost(2)]
pub local: LogId,
/// Log id for the remote side of the volume.
/// NOTE(review): presumably the log this volume syncs with — confirm.
#[bilrost(3)]
pub remote: LogId,
/// Most recent sync point, if one has been recorded. While this is `None`,
/// `local_watermark()` and `remote_commit()` both return `None`.
#[bilrost(4)]
pub sync: Option<SyncPoint>,
/// Commit currently tracked as pending, if any.
#[bilrost(5)]
pub pending_commit: Option<PendingCommit>,
}
impl Volume {
pub fn new(
vid: VolumeId,
local: LogId,
remote: LogId,
sync: Option<SyncPoint>,
pending_commit: Option<PendingCommit>,
) -> Self {
Self { vid, local, remote, sync, pending_commit }
}
pub fn new_random() -> Self {
Self {
vid: VolumeId::random(),
local: LogId::random(),
remote: LogId::random(),
sync: None,
pending_commit: None,
}
}
pub fn with_sync(self, sync: Option<SyncPoint>) -> Self {
Self { sync, ..self }
}
pub fn sync(&self) -> Option<&SyncPoint> {
self.sync.as_ref()
}
pub fn with_pending_commit(self, pending_commit: Option<PendingCommit>) -> Self {
Self { pending_commit, ..self }
}
pub fn pending_commit(&self) -> Option<&PendingCommit> {
self.pending_commit.as_ref()
}
pub fn local_watermark(&self) -> Option<LSN> {
self.sync().and_then(|s| s.local_watermark)
}
pub fn remote_commit(&self) -> Option<LSN> {
self.sync().map(|s| s.remote)
}
pub fn local_changes(&self, head: Option<LSN>) -> Option<RangeInclusive<LSN>> {
AheadStatus { head, base: self.local_watermark() }.changes()
}
pub fn remote_changes(&self, head: Option<LSN>) -> Option<RangeInclusive<LSN>> {
AheadStatus {
head,
base: self.sync().map(|s| s.remote),
}
.changes()
}
pub fn status(&self, latest_local: Option<LSN>, latest_remote: Option<LSN>) -> VolumeStatus {
VolumeStatus {
vid: self.vid.clone(),
local: self.local.clone(),
local_status: AheadStatus {
head: latest_local,
base: self.local_watermark(),
},
remote: self.remote.clone(),
remote_status: AheadStatus {
head: latest_remote,
base: self.sync().map(|s| s.remote),
},
}
}
}
/// Pairs the latest known LSN of a log (`head`) with the LSN recorded at the
/// last sync (`base`), so the distance between them can be computed or
/// displayed.
///
/// A `base` without a `head` is treated as a bug: both `changes()` and the
/// `Display` impl panic on that combination.
#[derive(Debug, PartialEq, Eq)]
pub struct AheadStatus {
/// Latest known LSN, or `None` when there is none.
pub head: Option<LSN>,
/// LSN recorded at the last sync, or `None` when nothing has been synced.
pub base: Option<LSN>,
}
impl AheadStatus {
pub fn new(head: Option<LSN>, base: Option<LSN>) -> Self {
Self { head, base }
}
pub fn changes(&self) -> Option<RangeInclusive<LSN>> {
match (self.base, self.head) {
(None, None) => None,
(None, Some(head)) => Some(LSN::FIRST..=head),
(Some(base), Some(head)) => (base < head).then(|| base.next()..=head),
(Some(_), None) => unreachable!("BUG: snapshot behind sync point"),
}
}
}
/// Formats the status as `_` when both fields are empty, as `{head}` when
/// there is no base or head equals base, and as `{head}+{n}` when head is
/// `n` past base.
///
/// Panics when `base` is set without `head`, or when `head.since(base)` has no
/// value; both are treated as bugs.
impl Display for AheadStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the head first; without one there is nothing to print.
        let head = match self.head {
            Some(head) => head,
            None if self.base.is_none() => return write!(f, "_"),
            None => unreachable!("BUG: snapshot behind sync point"),
        };

        match self.base {
            None => write!(f, "{head}"),
            Some(base) => {
                let ahead = head.since(base).expect("BUG: monotonicity violation");
                if ahead != 0 {
                    write!(f, "{head}+{ahead}")
                } else {
                    write!(f, "{head}")
                }
            }
        }
    }
}
/// Snapshot of how far a volume's local and remote logs are ahead of its
/// sync point, as produced by `Volume::status`.
///
/// Its `Display` output contains only the two `AheadStatus` values; the ids
/// are carried along but not printed.
#[derive(Debug)]
pub struct VolumeStatus {
/// Identifier of the volume this status describes.
pub vid: VolumeId,
/// The volume's local log id.
pub local: LogId,
/// Latest local LSN versus the sync point's local watermark.
pub local_status: AheadStatus,
/// The volume's remote log id.
pub remote: LogId,
/// Latest remote LSN versus the sync point's remote LSN.
pub remote_status: AheadStatus,
}
impl Display for VolumeStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} r{}", self.local_status, self.remote_status)
}
}
impl VolumeStatus {
    /// Returns `true` when both the local and the remote status report
    /// changes, i.e. `changes()` is `Some` on each side.
    ///
    /// The remote side is only inspected when the local side has changes.
    pub fn has_diverged(&self) -> bool {
        if self.local_status.changes().is_none() {
            return false;
        }
        self.remote_status.changes().is_some()
    }
}