use core::ops::{Range, RangeBounds};
use crate::panic_messages as panic;
use crate::*;
/// The state of a single peer: its identity, the tree of edit runs, the
/// clocks used to timestamp local edits, the per-replica progress maps and
/// the backlog of remote edits that are waiting to be merged.
pub struct Replica {
/// Identifier of this replica. `new`, `fork` and `decode` all reject zero.
id: ReplicaId,
/// Tree of edit runs. It provides the replica's length and is the target
/// of both local edits and merged remote edits.
run_tree: RunTree,
/// Lamport clock, lent to the run tree on local insertions and merged with
/// the Lamport timestamp of every remote insertion that gets merged.
lamport_clock: LamportClock,
/// Run clock, lent to the run tree on local insertions. `fork` and
/// `decode` start from a fresh clock instead of copying this one.
run_clock: RunClock,
/// Per-replica insertion progress. This replica's entry grows by the
/// length of each local insertion; a remote entry grows when one of that
/// replica's insertions is merged.
version_map: VersionMap,
/// Per-replica deletion timestamps. This replica's entry is incremented on
/// each local deletion; a remote entry is overwritten when one of that
/// replica's deletions is merged.
deletion_map: DeletionMap,
/// Remote insertions and deletions that could not be merged on arrival.
backlog: Backlog,
}
impl Replica {
#[doc(hidden)]
pub fn assert_invariants(&self) {
self.run_tree.assert_invariants();
self.backlog.assert_invariants(&self.version_map, &self.deletion_map);
}
#[doc(hidden)]
pub fn average_gtree_inode_occupancy(&self) -> f32 {
self.run_tree.average_inode_occupancy()
}
#[inline]
pub fn backlogged_deletions(&mut self) -> BackloggedDeletions<'_> {
BackloggedDeletions::from_replica(self)
}
#[inline]
pub fn backlogged_insertions(&mut self) -> BackloggedInsertions<'_> {
BackloggedInsertions::from_replica(self)
}
#[inline]
pub(crate) fn backlog_mut(&mut self) -> &mut Backlog {
&mut self.backlog
}
/// Returns whether `deletion` can be merged right now.
///
/// Two conditions must hold:
/// - the deletion's timestamp is exactly one past the timestamp recorded in
///   the deletion map for the replica that made it;
/// - this replica's version map compares `>=` to the version map carried by
///   the deletion.
///
/// In debug builds this asserts that the deletion has not been merged yet.
#[inline]
pub(crate) fn can_merge_deletion(&self, deletion: &Deletion) -> bool {
    debug_assert!(!self.has_merged_deletion(deletion));

    let last_merged_ts = self.deletion_map.get(deletion.deleted_by());

    // Guard clause: the version maps are only compared when the timestamp
    // check passes.
    if last_merged_ts + 1 != deletion.deletion_ts() {
        return false;
    }

    self.version_map >= *deletion.version_map()
}
/// Returns whether `insertion` can be merged right now.
///
/// Two conditions must hold:
/// - the version map entry of the inserting replica equals the insertion's
///   start;
/// - the version map entry of the replica named by the insertion's anchor is
///   at least the anchor's character timestamp.
///
/// In debug builds this asserts that the insertion has not been merged yet.
#[inline]
pub(crate) fn can_merge_insertion(&self, insertion: &Insertion) -> bool {
    debug_assert!(!self.has_merged_insertion(insertion));

    let author_progress = self.version_map.get(insertion.inserted_by());

    // Guard clause: the anchor is only inspected when the start check passes.
    if author_progress != insertion.start() {
        return false;
    }

    let anchor_progress =
        self.version_map.get(insertion.anchor().replica_id());

    anchor_progress >= insertion.anchor().character_ts()
}
/// Returns a wrapper whose `Debug` output shows all of the replica's fields,
/// with the run tree rendered through its `debug_as_self` view.
#[doc(hidden)]
pub fn debug(&self) -> debug::DebugAsSelf<'_> {
    debug::DebugAsSelf::from(self)
}
/// Returns a wrapper whose `Debug` output shows all of the replica's fields,
/// with the run tree rendered through its `debug_as_btree` view.
#[doc(hidden)]
pub fn debug_as_btree(&self) -> debug::DebugAsBtree<'_> {
    debug::DebugAsBtree::from(self)
}
/// Rebuilds a replica with the given `id` from an [`EncodedReplica`].
///
/// The decoded version and deletion maps are forked in place for `id`
/// starting from zero, and the run clock starts fresh.
///
/// # Errors
///
/// - [`DecodeError::DifferentProtocol`] if the protocol version stored in
///   `encoded` differs from `PROTOCOL_VERSION`;
/// - [`DecodeError::ChecksumFailed`] if the stored checksum does not match
///   the checksum of the stored bytes;
/// - [`DecodeError::InvalidData`] if the bytes cannot be decoded into the
///   replica's fields.
///
/// # Panics
///
/// Calls `panic::replica_id_is_zero` if `id` is zero; this check runs before
/// any validation of `encoded`.
#[cfg(feature = "encode")]
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[track_caller]
#[inline]
pub fn decode(
    id: ReplicaId,
    encoded: &EncodedReplica,
) -> Result<Self, DecodeError> {
    if id == 0 {
        panic::replica_id_is_zero();
    }

    let encoded_on = encoded.protocol_version();

    if encoded_on != PROTOCOL_VERSION {
        return Err(DecodeError::DifferentProtocol {
            encoded_on,
            decoding_on: PROTOCOL_VERSION,
        });
    }

    let payload = encoded.bytes();

    if encoded.checksum() != &checksum(payload) {
        return Err(DecodeError::ChecksumFailed);
    }

    let (run_tree, lamport_clock, mut version_map, mut deletion_map, backlog) =
        encode::decode(payload).ok_or(DecodeError::InvalidData)?;

    // Register the new id in both maps before assembling the replica.
    version_map.fork_in_place(id, 0);
    deletion_map.fork_in_place(id, 0);

    Ok(Self {
        id,
        run_tree,
        run_clock: RunClock::new(),
        lamport_clock,
        version_map,
        deletion_map,
        backlog,
    })
}
/// Records a local deletion of `range` and returns the [`Deletion`]
/// describing it.
///
/// An empty range returns [`Deletion::no_op`] and leaves the replica
/// untouched. Otherwise the range is deleted from the run tree, this
/// replica's entry in the deletion map is incremented, and the returned
/// value carries a clone of the current version map together with the new
/// deletion timestamp.
///
/// # Panics
///
/// Calls `panic::offset_out_of_bounds` if the end of the range exceeds the
/// replica's length, and `panic::start_greater_than_end` if the start of the
/// range is past its end.
#[track_caller]
#[must_use]
#[inline]
pub fn deleted<R>(&mut self, range: R) -> Deletion
where
    R: RangeBounds<Length>,
{
    let replica_len = self.len();
    let (start, end) = range_bounds_to_start_end(range, 0, replica_len);

    if end > replica_len {
        panic::offset_out_of_bounds(end, replica_len);
    }

    if start > end {
        panic::start_greater_than_end(start, end);
    }

    if start == end {
        return Deletion::no_op();
    }

    let (start_anchor, start_ts, end_anchor, end_ts) =
        self.run_tree.delete((start..end).into());

    // The timestamp is bumped before it is read back for the returned value.
    *self.deletion_map.this_mut() += 1;

    let version_map = self.version_map.clone();
    let deletion_ts = self.deletion_map.this();

    Deletion::new(
        start_anchor,
        start_ts,
        end_anchor,
        end_ts,
        version_map,
        deletion_ts,
    )
}
#[doc(hidden)]
pub fn empty_leaves(&self) -> (usize, usize) {
self.run_tree.count_empty_leaves()
}
/// Compares two replicas by run tree and backlog only.
///
/// Ids, clocks, the version map and the deletion map are deliberately not
/// part of the comparison.
#[doc(hidden)]
pub fn eq_decoded(&self, other: &Self) -> bool {
    // The backlogs are only compared when the run trees match.
    if self.run_tree != other.run_tree {
        return false;
    }

    self.backlog == other.backlog
}
/// Encodes this replica into an [`EncodedReplica`].
///
/// The result bundles the current `PROTOCOL_VERSION`, the checksum of the
/// encoded bytes and the bytes themselves.
#[cfg(feature = "encode")]
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[inline]
pub fn encode(&self) -> EncodedReplica {
    let payload = encode::encode(self);
    // Arguments are evaluated left to right, so the checksum is computed
    // from a borrow before `payload` is moved into the result.
    EncodedReplica::new(PROTOCOL_VERSION, checksum(&payload), payload)
}
/// Creates a new replica with id `new_id` from the current state of this
/// one.
///
/// The run tree and the backlog are cloned, the Lamport clock is copied,
/// the version and deletion maps are forked for `new_id` starting from zero,
/// and the run clock starts fresh.
///
/// # Panics
///
/// Calls `panic::replica_id_is_zero` if `new_id` is zero.
#[track_caller]
#[inline]
pub fn fork(&self, new_id: ReplicaId) -> Self {
    if new_id == 0 {
        panic::replica_id_is_zero();
    }

    let run_tree = self.run_tree.clone();
    let version_map = self.version_map.fork(new_id, 0);
    let deletion_map = self.deletion_map.fork(new_id, 0);
    let backlog = self.backlog.clone();

    Self {
        id: new_id,
        run_tree,
        run_clock: RunClock::new(),
        lamport_clock: self.lamport_clock,
        version_map,
        deletion_map,
        backlog,
    }
}
/// Returns whether `deletion` has already been merged, i.e. whether the
/// deletion map entry of the replica that made it has reached the deletion's
/// timestamp.
#[inline]
fn has_merged_deletion(&self, deletion: &Deletion) -> bool {
    let merged_up_to = self.deletion_map.get(deletion.deleted_by());
    deletion.deletion_ts() <= merged_up_to
}
/// Returns whether `insertion` has already been merged, i.e. whether the
/// version map entry of the replica that made it is strictly past the
/// insertion's start.
#[inline]
fn has_merged_insertion(&self, insertion: &Insertion) -> bool {
    let merged_up_to = self.version_map.get(insertion.inserted_by());
    insertion.start() < merged_up_to
}
#[inline]
pub fn id(&self) -> ReplicaId {
self.id
}
/// Records a local insertion of `len` units at `at_offset` and returns the
/// [`Insertion`] describing it.
///
/// A zero `len` returns [`Insertion::no_op`] and leaves the replica
/// untouched. Otherwise this replica's entry in the version map grows by
/// `len`, a [`Text`] covering the newly claimed range is inserted into the
/// run tree, and the returned value carries the anchor reported by the
/// tree, the text, and the latest Lamport and run timestamps.
///
/// # Panics
///
/// Calls `panic::offset_out_of_bounds` if `at_offset` exceeds the replica's
/// length.
#[track_caller]
#[must_use]
#[inline]
pub fn inserted(&mut self, at_offset: Length, len: Length) -> Insertion {
    let replica_len = self.len();

    if at_offset > replica_len {
        panic::offset_out_of_bounds(at_offset, replica_len);
    }

    if len == 0 {
        return Insertion::no_op();
    }

    // Claim the next `len` units of this replica's own range.
    let text = {
        let start = self.version_map.this();
        *self.version_map.this_mut() += len;
        Text::new(self.id, start..self.version_map.this())
    };

    let (anchor, anchor_ts) = self.run_tree.insert(
        at_offset,
        text.clone(),
        &mut self.run_clock,
        &mut self.lamport_clock,
    );

    // Both clocks are read after the tree has had the chance to advance them.
    let lamport_ts = self.lamport_clock.highest();
    let run_ts = self.run_clock.last();

    Insertion::new(anchor, anchor_ts, text, lamport_ts, run_ts)
}
#[allow(clippy::len_without_is_empty)]
#[doc(hidden)]
pub fn len(&self) -> Length {
self.run_tree.len()
}
/// Integrates a deletion, typically one received from another replica.
///
/// - A no-op or already merged deletion is ignored.
/// - A deletion that can be merged is merged immediately, and the ranges
///   produced by the merge are returned.
/// - Any other deletion is cloned into the backlog.
///
/// An empty vector is returned in every case but the second.
#[must_use]
#[inline]
pub fn integrate_deletion(
    &mut self,
    deletion: &Deletion,
) -> Vec<Range<Length>> {
    if deletion.is_no_op() || self.has_merged_deletion(deletion) {
        return Vec::new();
    }

    if !self.can_merge_deletion(deletion) {
        self.backlog.insert_deletion(deletion.clone());
        return Vec::new();
    }

    self.merge_unchecked_deletion(deletion)
}
/// Integrates an insertion, typically one received from another replica.
///
/// - A no-op or already merged insertion is ignored.
/// - An insertion that can be merged is merged immediately, and the offset
///   produced by the merge is returned in `Some`.
/// - Any other insertion is cloned into the backlog.
///
/// `None` is returned in every case but the second.
#[must_use]
#[inline]
pub fn integrate_insertion(
    &mut self,
    insertion: &Insertion,
) -> Option<Length> {
    if insertion.is_no_op() || self.has_merged_insertion(insertion) {
        return None;
    }

    if self.can_merge_insertion(insertion) {
        return Some(self.merge_unchecked_insertion(insertion));
    }

    self.backlog.insert_insertion(insertion.clone());
    None
}
/// Merges `deletion` without checking at runtime that it is mergeable.
///
/// The deletion is applied to the run tree, then the deletion map entry of
/// the replica that made it is set to the deletion's timestamp. Returns the
/// ranges produced by the run tree.
///
/// Callers must ensure [`Self::can_merge_deletion`] holds; this is only
/// asserted in debug builds.
#[inline]
pub(crate) fn merge_unchecked_deletion(
    &mut self,
    deletion: &Deletion,
) -> Vec<Range<Length>> {
    debug_assert!(self.can_merge_deletion(deletion));

    let merged_ranges = self.run_tree.merge_deletion(deletion);

    let author_ts = self.deletion_map.get_mut(deletion.deleted_by());
    *author_ts = deletion.deletion_ts();

    merged_ranges
}
/// Merges `insertion` without checking at runtime that it is mergeable.
///
/// The insertion is applied to the run tree, the version map entry of the
/// replica that made it grows by the insertion's length, and the Lamport
/// clock is merged with the insertion's Lamport timestamp. Returns the
/// offset produced by the run tree.
///
/// Callers must ensure [`Self::can_merge_insertion`] holds; this is only
/// asserted in debug builds.
#[inline]
pub(crate) fn merge_unchecked_insertion(
    &mut self,
    insertion: &Insertion,
) -> Length {
    debug_assert!(self.can_merge_insertion(insertion));

    let merged_at = self.run_tree.merge_insertion(insertion);

    let author_progress = self.version_map.get_mut(insertion.inserted_by());
    *author_progress += insertion.len();

    self.lamport_clock.merge(insertion.lamport_ts());

    merged_at
}
/// Creates a replica with the given `id` whose initial contents have length
/// `len`.
///
/// The initial contents form a single edit run attributed to `id` and
/// stamped with the first tick of both clocks. The version map starts at
/// `len` for this replica, the deletion map at zero, and the backlog is
/// empty.
///
/// # Panics
///
/// Calls `panic::replica_id_is_zero` if `id` is zero.
#[track_caller]
#[inline]
pub fn new(id: ReplicaId, len: Length) -> Self {
    if id == 0 {
        panic::replica_id_is_zero();
    }

    let mut run_clock = RunClock::new();
    let mut lamport_clock = LamportClock::new();

    let first_run = EditRun::new(
        Text::new(id, 0..len),
        run_clock.next(),
        lamport_clock.next(),
    );

    Self {
        id,
        run_tree: RunTree::new(first_run),
        run_clock,
        lamport_clock,
        version_map: VersionMap::new(id, len),
        deletion_map: DeletionMap::new(id, 0),
        backlog: Backlog::new(),
    }
}
/// Returns the second component of the pair computed by the run tree's
/// `count_empty_leaves`.
///
/// NOTE(review): presumably the total number of leaves, i.e. of runs --
/// confirm against `RunTree::count_empty_leaves`.
#[doc(hidden)]
pub fn num_runs(&self) -> usize {
    let (_, num_runs) = self.run_tree.count_empty_leaves();
    num_runs
}
}
/// Compact `Debug` output: a `Replica` tuple whose only field is the replica
/// id written in lowercase hexadecimal.
impl core::fmt::Debug for Replica {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        // The `Debug` impl of `format_args!`'s result writes the formatted
        // text verbatim, so the id shows up as bare hex digits.
        f.debug_tuple("Replica")
            .field(&format_args!("{:x}", self.id))
            .finish()
    }
}
/// Integer type of the timestamps handed out by a [`LamportClock`].
pub type LamportTs = u64;

/// A Lamport clock, stored as the next timestamp it will hand out.
#[derive(Copy, Clone)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
pub struct LamportClock(LamportTs);

impl core::fmt::Debug for LamportClock {
    /// Formats the clock as its bare inner counter.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.0.fmt(f)
    }
}

impl LamportClock {
    /// Returns the most recently issued timestamp, or zero if the clock has
    /// not issued any yet.
    #[inline]
    pub fn highest(&self) -> LamportTs {
        match self.0 {
            0 => 0,
            upcoming => upcoming - 1,
        }
    }

    /// Moves the clock past `remote_ts`, so that every timestamp issued from
    /// now on is greater than it. A clock that is already past `remote_ts`
    /// is left unchanged.
    #[inline]
    fn merge(&mut self, remote_ts: LamportTs) {
        if self.0 <= remote_ts {
            self.0 = remote_ts + 1;
        }
    }

    /// Creates a clock whose first issued timestamp is zero.
    #[inline]
    fn new() -> Self {
        Self(0)
    }

    /// Issues the next timestamp and advances the clock by one.
    #[inline]
    pub fn next(&mut self) -> LamportTs {
        let issued = self.0;
        self.0 = issued + 1;
        issued
    }
}
/// Integer type of the timestamps handed out by a [`RunClock`].
pub type RunTs = u64;

/// A monotonic counter, stored as the next timestamp it will hand out.
#[derive(Copy, Clone)]
pub struct RunClock(RunTs);

impl core::fmt::Debug for RunClock {
    /// Formats the clock as its bare inner counter.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.0.fmt(f)
    }
}

impl RunClock {
    /// Returns the most recently issued timestamp, or zero if the clock has
    /// not issued any yet.
    #[inline]
    fn last(&self) -> RunTs {
        // Clamping to one first keeps the subtraction from underflowing on a
        // fresh clock.
        self.0.max(1) - 1
    }

    /// Creates a clock whose first issued timestamp is zero.
    #[inline]
    fn new() -> Self {
        Self(0)
    }

    /// Issues the next timestamp and advances the clock by one.
    #[inline]
    pub fn next(&mut self) -> RunTs {
        let issued = self.0;
        self.0 = issued + 1;
        issued
    }
}
/// Integer type of deletion timestamps.
///
/// NOTE(review): presumably the type stored per replica in `DeletionMap` and
/// returned by `Deletion::deletion_ts` -- confirm at their definitions.
pub type DeletionTs = u64;
/// Byte-level encoding and decoding of the persistent fields of a
/// [`Replica`].
///
/// The format is the concatenation of five length-prefixed fields, in this
/// order: run tree, Lamport clock, version map, deletion map, backlog. Each
/// field is its bincode serialization preceded by its byte length written
/// as a little-endian `u64`.
#[cfg(feature = "encode")]
mod encode {
    use serde::{de, ser};

    use super::*;

    /// Size in bytes of the length prefix written before every field.
    ///
    /// The prefix is always a `u64`, never a `usize`, so the format does not
    /// depend on the pointer width of the machine that produced it.
    const LEN_PREFIX_SIZE: usize = core::mem::size_of::<u64>();

    /// The fields recovered by [`decode`], in encoding order.
    type EncodedFields =
        (RunTree, LamportClock, VersionMap, DeletionMap, Backlog);

    /// Serializes the persistent fields of `replica` into a byte buffer.
    ///
    /// The replica's id and run clock are not part of the output.
    #[inline]
    pub(super) fn encode(replica: &Replica) -> Vec<u8> {
        let mut encoded = Vec::new();
        encode_field(&mut encoded, &replica.run_tree);
        encode_field(&mut encoded, &replica.lamport_clock);
        encode_field(&mut encoded, &replica.version_map);
        encode_field(&mut encoded, &replica.deletion_map);
        encode_field(&mut encoded, &replica.backlog);
        encoded
    }

    /// Recovers the fields written by [`encode`].
    ///
    /// Returns `None` if any field is truncated or fails to deserialize, or
    /// if bytes are left over after the last field.
    #[inline]
    pub(super) fn decode(bytes: &[u8]) -> Option<EncodedFields> {
        let (run_tree, bytes) = decode_field(bytes)?;
        let (lamport_clock, bytes) = decode_field(bytes)?;
        let (version_map, bytes) = decode_field(bytes)?;
        let (deletion_map, bytes) = decode_field(bytes)?;
        let (backlog, bytes) = decode_field(bytes)?;

        // Trailing bytes mean the input is not something `encode` produced.
        bytes.is_empty().then_some((
            run_tree,
            lamport_clock,
            version_map,
            deletion_map,
            backlog,
        ))
    }

    /// Appends `field` to `buf` as a little-endian `u64` byte length
    /// followed by the field's serialized bytes.
    #[inline]
    fn encode_field<T>(buf: &mut Vec<u8>, field: &T)
    where
        T: ser::Serialize,
    {
        let field_bytes = serialize(field);
        // Widening cast: `usize` is at most 64 bits wide on supported
        // targets, so no length can be truncated here.
        let len = field_bytes.len() as u64;
        buf.extend_from_slice(&len.to_le_bytes());
        buf.extend_from_slice(&field_bytes);
    }

    /// Reads one length-prefixed field from the front of `buf`.
    ///
    /// Returns the decoded field together with the remaining bytes, or
    /// `None` if the prefix or the payload is truncated, if the length does
    /// not fit in a `usize` on this target, or if the payload fails to
    /// deserialize.
    #[inline]
    fn decode_field<'a, T>(buf: &'a [u8]) -> Option<(T, &'a [u8])>
    where
        T: de::Deserialize<'a>,
    {
        if buf.len() < LEN_PREFIX_SIZE {
            return None;
        }

        let (len_bytes, rest) = buf.split_at(LEN_PREFIX_SIZE);
        let len_bytes: [u8; LEN_PREFIX_SIZE] = len_bytes.try_into().ok()?;

        // Checked conversion: a length above `usize::MAX` cannot describe a
        // slice on this target, so it is rejected as invalid input.
        let len: usize = u64::from_le_bytes(len_bytes).try_into().ok()?;

        if rest.len() < len {
            return None;
        }

        let (encoded_field, rest) = rest.split_at(len);

        deserialize::<T>(encoded_field).map(|field| (field, rest))
    }

    /// Serializes `value` with bincode.
    ///
    /// # Panics
    ///
    /// Panics with `"failed to serialize"` if bincode reports an error.
    #[inline]
    fn serialize<T>(value: &T) -> Vec<u8>
    where
        T: ser::Serialize,
    {
        bincode::serialize(value).expect("failed to serialize")
    }

    /// Deserializes a `T` from `bytes` with bincode, mapping any error to
    /// `None`.
    #[inline]
    fn deserialize<'a, T>(bytes: &'a [u8]) -> Option<T>
    where
        T: de::Deserialize<'a>,
    {
        bincode::deserialize(bytes).ok()
    }
}
/// `Debug` wrappers that print every field of a [`Replica`], differing only
/// in the view of the run tree they embed.
mod debug {
    use core::fmt::Debug;

    use super::*;

    /// Prints a replica with its run tree rendered via `debug_as_self`.
    pub struct DebugAsSelf<'a>(BaseDebug<'a, run_tree::DebugAsSelf<'a>>);

    impl<'a> From<&'a Replica> for DebugAsSelf<'a> {
        #[inline]
        fn from(replica: &'a Replica) -> DebugAsSelf<'a> {
            let debug_run_tree = replica.run_tree.debug_as_self();
            Self(BaseDebug { replica, debug_run_tree })
        }
    }

    impl Debug for DebugAsSelf<'_> {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self(base) = self;
            Debug::fmt(base, f)
        }
    }

    /// Prints a replica with its run tree rendered via `debug_as_btree`.
    pub struct DebugAsBtree<'a>(BaseDebug<'a, run_tree::DebugAsBtree<'a>>);

    impl<'a> From<&'a Replica> for DebugAsBtree<'a> {
        #[inline]
        fn from(replica: &'a Replica) -> DebugAsBtree<'a> {
            let debug_run_tree = replica.run_tree.debug_as_btree();
            Self(BaseDebug { replica, debug_run_tree })
        }
    }

    impl Debug for DebugAsBtree<'_> {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self(base) = self;
            Debug::fmt(base, f)
        }
    }

    /// Shared implementation behind both wrappers: the replica being
    /// printed plus the already built debug view of its run tree.
    struct BaseDebug<'a, T: Debug> {
        replica: &'a Replica,
        debug_run_tree: T,
    }

    impl<'a, T: Debug> Debug for BaseDebug<'a, T> {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self { replica, debug_run_tree } = self;

            // Fields are emitted in a fixed order; `run_tree` uses the
            // supplied view rather than the tree's own `Debug` impl.
            let mut out = f.debug_struct("Replica");
            out.field("id", &replica.id);
            out.field("run_tree", debug_run_tree);
            out.field("run_indices", &replica.run_tree.run_indices());
            out.field("lamport_clock", &replica.lamport_clock);
            out.field("run_clock", &replica.run_clock);
            out.field("version_map", &replica.version_map);
            out.field("deletion_map", &replica.deletion_map);
            out.field("backlog", &replica.backlog);
            out.finish()
        }
    }
}