use core::ops::{Range, RangeBounds};
use crate::panic_messages as panic;
use crate::*;
#[derive(Clone)]
pub struct Replica {
id: ReplicaId,
run_tree: RunTree,
lamport_clock: LamportClock,
run_clock: RunClock,
version_map: VersionMap,
deletion_map: DeletionMap,
backlog: Backlog,
}
impl Replica {
#[doc(hidden)]
pub fn assert_invariants(&self) {
self.run_tree.assert_invariants();
self.backlog.assert_invariants(&self.version_map, &self.deletion_map);
}
#[doc(hidden)]
pub fn average_gtree_inode_occupancy(&self) -> f32 {
self.run_tree.average_inode_occupancy()
}
#[inline]
pub fn backlogged_deletions(&mut self) -> BackloggedDeletions<'_> {
BackloggedDeletions::from_replica(self)
}
#[inline]
pub fn backlogged_insertions(&mut self) -> BackloggedInsertions<'_> {
BackloggedInsertions::from_replica(self)
}
#[inline]
pub(crate) fn backlog_mut(&mut self) -> &mut Backlog {
&mut self.backlog
}
#[inline]
pub(crate) fn can_merge_deletion(&self, deletion: &Deletion) -> bool {
debug_assert!(!self.has_merged_deletion(deletion));
(
self.deletion_map.get(deletion.deleted_by()) + 1
== deletion.deletion_ts()
) && (
self.version_map >= *deletion.version_map()
)
}
#[inline]
pub(crate) fn can_merge_insertion(&self, insertion: &Insertion) -> bool {
debug_assert!(!self.has_merged_insertion(insertion));
(
self.version_map.get(insertion.inserted_by()) == insertion.start()
) && (
self.has_anchor(insertion.anchor())
)
}
#[track_caller]
#[inline]
pub fn create_anchor(
&self,
at_offset: Length,
with_bias: AnchorBias,
) -> Anchor {
if at_offset > self.len() {
panic::offset_out_of_bounds(at_offset, self.len());
}
self.run_tree.create_anchor(at_offset, with_bias)
}
#[doc(hidden)]
pub fn debug(&self) -> debug::DebugAsSelf<'_> {
self.into()
}
#[doc(hidden)]
pub fn debug_as_btree(&self) -> debug::DebugAsBtree<'_> {
self.into()
}
#[cfg(feature = "encode")]
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[track_caller]
#[inline]
pub fn decode(
id: ReplicaId,
encoded: &EncodedReplica<'_>,
) -> Result<Self, DecodeError> {
if id == 0 {
panic::replica_id_is_zero();
}
let (
run_tree,
lamport_clock,
mut version_map,
mut deletion_map,
backlog,
) = encoded.to_replica()?;
version_map.fork_in_place(id, 0);
deletion_map.fork_in_place(id, 0);
let replica = Self {
id,
run_tree,
run_clock: RunClock::new(),
lamport_clock,
version_map,
deletion_map,
backlog,
};
Ok(replica)
}
#[track_caller]
#[must_use]
#[inline]
pub fn deleted<R>(&mut self, range: R) -> Deletion
where
R: RangeBounds<Length>,
{
let (start, end) = range_bounds_to_start_end(range, 0, self.len());
if end > self.len() {
panic::offset_out_of_bounds(end, self.len());
}
if start > end {
panic::start_greater_than_end(start, end);
}
if start == end {
return Deletion::no_op();
}
let deleted_range = (start..end).into();
let mut version_map = VersionMap::new(self.id(), 0);
let (start, end) =
self.run_tree.delete(deleted_range, &mut version_map);
for (id, ts) in version_map.iter_mut() {
*ts = self.version_map.get(id);
}
*self.deletion_map.this_mut() += 1;
Deletion::new(start, end, version_map, self.deletion_map.this())
}
#[doc(hidden)]
pub fn empty_leaves(&self) -> (usize, usize) {
self.run_tree.count_empty_leaves()
}
#[doc(hidden)]
pub fn eq_decoded(&self, other: &Self) -> bool {
self.run_tree == other.run_tree && self.backlog == other.backlog
}
#[cfg(feature = "encode")]
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[inline]
pub fn encode(&self) -> EncodedReplica<'static> {
EncodedReplica::from_replica(self)
}
#[track_caller]
#[inline]
pub fn fork(&self, new_id: ReplicaId) -> Self {
if new_id == 0 {
panic::replica_id_is_zero();
}
if new_id == self.id {
panic::replica_id_equal_to_forked();
}
Self {
id: new_id,
run_tree: self.run_tree.clone(),
run_clock: RunClock::new(),
lamport_clock: self.lamport_clock,
version_map: self.version_map.fork(new_id, 0),
deletion_map: self.deletion_map.fork(new_id, 0),
backlog: self.backlog.clone(),
}
}
#[inline]
fn has_anchor(&self, anchor: InnerAnchor) -> bool {
self.version_map.get(anchor.replica_id()) >= anchor.offset()
}
#[inline]
fn has_merged_deletion(&self, deletion: &Deletion) -> bool {
self.deletion_map.get(deletion.deleted_by()) >= deletion.deletion_ts()
}
#[inline]
fn has_merged_insertion(&self, insertion: &Insertion) -> bool {
self.version_map.get(insertion.inserted_by()) > insertion.start()
}
#[inline]
pub fn id(&self) -> ReplicaId {
self.id
}
#[track_caller]
#[must_use]
#[inline]
pub fn inserted(&mut self, at_offset: Length, len: Length) -> Insertion {
if at_offset > self.len() {
panic::offset_out_of_bounds(at_offset, self.len());
}
if len == 0 {
return Insertion::no_op();
}
let start = self.version_map.this();
*self.version_map.this_mut() += len;
let end = self.version_map.this();
let text = Text::new(self.id, start..end);
let anchor = self.run_tree.insert(
at_offset,
text.clone(),
&mut self.run_clock,
&mut self.lamport_clock,
);
Insertion::new(
anchor,
text,
self.lamport_clock.highest(),
self.run_clock.last(),
)
}
#[allow(clippy::len_without_is_empty)]
#[doc(hidden)]
pub fn len(&self) -> Length {
self.run_tree.len()
}
#[must_use]
#[inline]
pub fn integrate_deletion(
&mut self,
deletion: &Deletion,
) -> Vec<Range<Length>> {
if deletion.is_no_op() || self.has_merged_deletion(deletion) {
Vec::new()
} else if self.can_merge_deletion(deletion) {
self.merge_unchecked_deletion(deletion)
} else {
self.backlog.insert_deletion(deletion.clone());
Vec::new()
}
}
#[must_use]
#[inline]
pub fn integrate_insertion(
&mut self,
insertion: &Insertion,
) -> Option<Length> {
if insertion.is_no_op() || self.has_merged_insertion(insertion) {
None
} else if self.can_merge_insertion(insertion) {
Some(self.merge_unchecked_insertion(insertion))
} else {
self.backlog.insert_insertion(insertion.clone());
None
}
}
#[inline]
pub(crate) fn merge_unchecked_deletion(
&mut self,
deletion: &Deletion,
) -> Vec<Range<Length>> {
debug_assert!(self.can_merge_deletion(deletion));
let ranges = self.run_tree.merge_deletion(deletion);
*self.deletion_map.get_mut(deletion.deleted_by()) =
deletion.deletion_ts();
ranges
}
#[inline]
pub(crate) fn merge_unchecked_insertion(
&mut self,
insertion: &Insertion,
) -> Length {
debug_assert!(self.can_merge_insertion(insertion));
let offset = self.run_tree.merge_insertion(insertion);
*self.version_map.get_mut(insertion.inserted_by()) += insertion.len();
self.lamport_clock.merge(insertion.lamport_ts());
offset
}
#[track_caller]
#[inline]
pub fn new(id: ReplicaId, len: Length) -> Self {
if id == 0 {
panic::replica_id_is_zero();
}
let mut run_clock = RunClock::new();
let mut lamport_clock = LamportClock::new();
let initial_text = Text::new(id, 0..len);
let first_run = EditRun::new_visible(
initial_text,
run_clock.next(),
lamport_clock.next(),
);
let run_tree = RunTree::new(first_run);
Self {
id,
run_tree,
run_clock,
lamport_clock,
version_map: VersionMap::new(id, len),
deletion_map: DeletionMap::new(id, 0),
backlog: Backlog::new(),
}
}
#[doc(hidden)]
pub fn num_runs(&self) -> usize {
self.run_tree.count_empty_leaves().1
}
#[inline]
pub fn resolve_anchor(&self, anchor: Anchor) -> Option<Length> {
if self.has_anchor(anchor.inner()) {
Some(self.run_tree.resolve_anchor(anchor))
} else {
None
}
}
}
impl core::fmt::Debug for Replica {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
struct DebugHexU64(u64);
impl core::fmt::Debug for DebugHexU64 {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(f, "{:x}", self.0)
}
}
f.debug_tuple("Replica").field(&DebugHexU64(self.id)).finish()
}
}
pub type LamportTs = u64;
#[derive(Copy, Clone)]
pub struct LamportClock(LamportTs);
impl core::fmt::Debug for LamportClock {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
self.0.fmt(f)
}
}
impl LamportClock {
#[inline]
pub fn highest(&self) -> LamportTs {
self.0.saturating_sub(1)
}
#[inline]
fn merge(&mut self, remote_ts: LamportTs) {
if remote_ts >= self.0 {
self.0 = remote_ts + 1;
}
}
#[inline]
fn new() -> Self {
Self(0)
}
#[inline]
pub fn next(&mut self) -> LamportTs {
let next = self.0;
self.0 += 1;
next
}
}
pub type RunTs = u64;
#[derive(Copy, Clone)]
pub struct RunClock(RunTs);
impl core::fmt::Debug for RunClock {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
self.0.fmt(f)
}
}
impl RunClock {
#[inline]
fn last(&self) -> RunTs {
self.0.saturating_sub(1)
}
#[inline]
fn new() -> Self {
Self(0)
}
#[inline]
pub fn next(&mut self) -> RunTs {
let next = self.0;
self.0 += 1;
next
}
}
pub type DeletionTs = u64;
#[cfg(feature = "encode")]
mod encode {
use super::*;
use crate::backlog::encode::BacklogDecodeError;
use crate::encode::{Decode, Encode, IntDecodeError};
use crate::run_tree::encode::RunTreeDecodeError;
use crate::version_map::encode::BaseMapDecodeError;
impl Encode for LamportClock {
#[inline(always)]
fn encode(&self, buf: &mut Vec<u8>) {
self.0.encode(buf)
}
}
impl Decode for LamportClock {
type Value = Self;
type Error = IntDecodeError;
#[inline(always)]
fn decode(buf: &[u8]) -> Result<(Self, &[u8]), IntDecodeError> {
LamportTs::decode(buf).map(|(ts, buf)| (Self(ts), buf))
}
}
impl Encode for Replica {
#[inline(always)]
fn encode(&self, buf: &mut Vec<u8>) {
self.run_tree.encode(buf);
self.lamport_clock.encode(buf);
self.version_map.encode(buf);
self.deletion_map.encode(buf);
self.backlog.encode(buf);
}
}
pub(crate) enum ReplicaDecodeError {
Backlog(BacklogDecodeError),
DeletionMap(BaseMapDecodeError<DeletionTs>),
Int(IntDecodeError),
RunTree(RunTreeDecodeError),
VersionMap(BaseMapDecodeError<Length>),
}
impl From<BacklogDecodeError> for ReplicaDecodeError {
#[inline(always)]
fn from(err: BacklogDecodeError) -> Self {
Self::Backlog(err)
}
}
impl From<BaseMapDecodeError<DeletionTs>> for ReplicaDecodeError {
#[inline(always)]
fn from(err: BaseMapDecodeError<DeletionTs>) -> Self {
Self::DeletionMap(err)
}
}
impl From<IntDecodeError> for ReplicaDecodeError {
#[inline(always)]
fn from(err: IntDecodeError) -> Self {
Self::Int(err)
}
}
impl From<RunTreeDecodeError> for ReplicaDecodeError {
#[inline(always)]
fn from(err: RunTreeDecodeError) -> Self {
Self::RunTree(err)
}
}
impl From<BaseMapDecodeError<Length>> for ReplicaDecodeError {
#[inline(always)]
fn from(err: BaseMapDecodeError<Length>) -> Self {
Self::VersionMap(err)
}
}
impl core::fmt::Display for ReplicaDecodeError {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
let err: &dyn core::fmt::Display = match self {
Self::Backlog(err) => err,
Self::DeletionMap(err) => err,
Self::Int(err) => err,
Self::RunTree(err) => err,
Self::VersionMap(err) => err,
};
write!(f, "Replica: couldn't be decoded: {err}")
}
}
impl Decode for Replica {
type Value = (RunTree, LamportClock, VersionMap, DeletionMap, Backlog);
type Error = ReplicaDecodeError;
#[inline(always)]
fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error> {
let (run_tree, buf) = RunTree::decode(buf)?;
let (lamport_clock, buf) = LamportClock::decode(buf)?;
let (version_map, buf) = VersionMap::decode(buf)?;
let (deletion_map, buf) = DeletionMap::decode(buf)?;
let (backlog, buf) = Backlog::decode(buf)?;
let this =
(run_tree, lamport_clock, version_map, deletion_map, backlog);
Ok((this, buf))
}
}
}
mod debug {
use core::fmt::Debug;
use super::*;
pub struct DebugAsSelf<'a>(BaseDebug<'a, run_tree::DebugAsSelf<'a>>);
impl<'a> From<&'a Replica> for DebugAsSelf<'a> {
#[inline]
fn from(replica: &'a Replica) -> DebugAsSelf<'a> {
let base = BaseDebug {
replica,
debug_run_tree: replica.run_tree.debug_as_self(),
};
Self(base)
}
}
impl core::fmt::Debug for DebugAsSelf<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
self.0.fmt(f)
}
}
pub struct DebugAsBtree<'a>(BaseDebug<'a, run_tree::DebugAsBtree<'a>>);
impl<'a> From<&'a Replica> for DebugAsBtree<'a> {
#[inline]
fn from(replica: &'a Replica) -> DebugAsBtree<'a> {
let base = BaseDebug {
replica,
debug_run_tree: replica.run_tree.debug_as_btree(),
};
Self(base)
}
}
impl core::fmt::Debug for DebugAsBtree<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
self.0.fmt(f)
}
}
struct BaseDebug<'a, T: Debug> {
replica: &'a Replica,
debug_run_tree: T,
}
impl<T: Debug> Debug for BaseDebug<'_, T> {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
let replica = &self.replica;
f.debug_struct("Replica")
.field("id", &replica.id)
.field("run_tree", &self.debug_run_tree)
.field("run_indices", &replica.run_tree.run_indices())
.field("lamport_clock", &replica.lamport_clock)
.field("run_clock", &replica.run_clock)
.field("version_map", &replica.version_map)
.field("deletion_map", &replica.deletion_map)
.field("backlog", &replica.backlog)
.finish()
}
}
}