use std::collections::{HashMap, HashSet, VecDeque};
use calimero_node_primitives::sync::handshake::SyncHandshake;
use calimero_node_primitives::sync::protocol::SyncProtocol;
use calimero_node_primitives::sync::state_machine::{build_handshake, LocalSyncState};
use crate::sync_sim::actions::SelectedProtocol;
use calimero_primitives::context::ContextId;
use calimero_primitives::crdt::CrdtType;
use calimero_primitives::identity::PublicKey;
use calimero_storage::address::Id;
use calimero_storage::entities::Metadata;
use crate::sync_sim::actions::{EntityMetadata, StorageOp};
use crate::sync_sim::runtime::SimTime;
use crate::sync_sim::storage::SimStorage;
use crate::sync_sim::types::{
DeltaId, DigestEntity, EntityId, MessageId, NodeId, StateDigest, TimerId, TimerKind,
};
/// Sync phase a simulated node is currently in.
///
/// `Idle` is the only variant without a peer; every other variant names the
/// node taking part in the exchange (see [`SyncState::peer`]).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SyncState {
    /// No sync is in progress.
    Idle,
    /// NOTE(review): presumably this node started a sync with `peer` — confirm against the driver.
    Initiating { peer: NodeId },
    /// NOTE(review): presumably this node is answering a sync started by `peer` — confirm.
    Responding { peer: NodeId },
    /// Snapshot exchange with `peer`; `page` is presumably the current page index — TODO confirm.
    SnapshotTransfer { peer: NodeId, page: u32 },
    /// Hash comparison with `peer`; `level` is presumably the tree level being compared — TODO confirm.
    HashComparison { peer: NodeId, level: u32 },
    /// Delta-based sync with `peer`.
    DeltaSync { peer: NodeId },
}
impl SyncState {
    /// Returns `true` only for [`SyncState::Idle`].
    pub fn is_idle(&self) -> bool {
        matches!(self, Self::Idle)
    }

    /// Returns `true` for every variant except [`SyncState::Idle`].
    pub fn is_active(&self) -> bool {
        !matches!(self, Self::Idle)
    }

    /// Returns the peer stored in the current variant, or `None` when idle.
    pub fn peer(&self) -> Option<&NodeId> {
        let peer = match self {
            Self::Idle => return None,
            Self::Initiating { peer }
            | Self::Responding { peer }
            | Self::SnapshotTransfer { peer, .. }
            | Self::HashComparison { peer, .. }
            | Self::DeltaSync { peer } => peer,
        };
        Some(peer)
    }
}
/// A timer registered on a node.
#[derive(Clone, Debug)]
pub struct TimerEntry {
    /// Identifier used to look the timer up, replace it, or cancel it.
    pub id: TimerId,
    /// Category of the timer (for example `TimerKind::Sync`).
    pub kind: TimerKind,
    /// Simulated time associated with the timer when it was set.
    pub fire_time: SimTime,
}
/// Upper bound on the number of message ids `SimNode::mark_processed` keeps for
/// duplicate detection; once exceeded, the oldest recorded ids are evicted.
const MAX_PROCESSED_MESSAGES: usize = 10_000;
/// Delta buffer capacity used by `SimNode::new` and `SimNode::new_in_context`.
pub const DEFAULT_SIM_BUFFER_CAPACITY: usize = 10_000;
/// Cap applied to the `depth` argument of `SimNode::insert_entity_hierarchical`.
/// The clamped value is used as a prefix length into a 32-byte key, so it must
/// never exceed 32.
const MAX_HIERARCHICAL_DEPTH: usize = 24;
/// In-memory node used by the sync simulation.
///
/// Holds the node's storage, the delta buffer used while a sync is active,
/// its timers, and the bookkeeping needed for message de-duplication.
#[derive(Debug)]
pub struct SimNode {
    /// Identity of this node.
    pub id: NodeId,
    /// Session counter; incremented by `restart` and embedded in message ids.
    pub session: u64,
    /// Sequence number of the next outgoing message; reset to 0 by `crash`.
    pub out_seq: u64,
    /// Backing storage for entities.
    storage: SimStorage,
    /// Metadata for every entity inserted through the `insert_entity*` methods.
    /// Intermediate nodes created by hierarchical insertion are not recorded here.
    entity_metadata: HashMap<EntityId, EntityMetadata>,
    /// Current DAG heads; a fresh node starts with `[DeltaId::ZERO]`.
    pub dag_heads: Vec<DeltaId>,
    /// Ids of buffered deltas, oldest first.
    pub delta_buffer: VecDeque<DeltaId>,
    /// Storage operations attached to buffered deltas, keyed by delta id.
    buffered_operations: HashMap<DeltaId, Vec<crate::sync_sim::actions::StorageOp>>,
    /// Capacity consulted by `buffer_delta`; 0 means every delta is dropped.
    buffer_capacity: usize,
    /// Number of deltas dropped outright or evicted from the buffer.
    buffer_drops: u64,
    /// Current sync phase.
    pub sync_state: SyncState,
    /// Timers currently registered on this node.
    pub timers: Vec<TimerEntry>,
    /// Raw value handed to the next `TimerId` created by `next_timer_id`.
    next_timer_id: u64,
    /// Message ids already processed (duplicate detection).
    processed_messages: HashSet<MessageId>,
    /// Insertion order of `processed_messages`, used to evict the oldest ids.
    processed_order: VecDeque<MessageId>,
    /// Highest session seen so far for each sender.
    sender_sessions: HashMap<String, u64>,
    /// Set to `true` by the entity insertion methods.
    pub has_state: bool,
    /// Set by `crash`, cleared by `restart`.
    pub is_crashed: bool,
    /// When set, `select_protocol_for_sync` returns this protocol unconditionally.
    forced_protocol: Option<SelectedProtocol>,
}
impl SimNode {
    /// A 32-byte array filled with `0xCA`.
    ///
    /// The constructors in this `impl` do not read it: [`SimNode::new`] and
    /// [`SimNode::with_buffer_capacity`] derive the context id from the node id.
    pub const DEFAULT_CONTEXT_ID: [u8; 32] = [0xCA; 32];

    // ---------------------------------------------------------------------
    // Construction
    // ---------------------------------------------------------------------

    /// Creates a node with [`DEFAULT_SIM_BUFFER_CAPACITY`] and a context id
    /// derived from the node id.
    pub fn new(id: impl Into<NodeId>) -> Self {
        Self::with_buffer_capacity(id, DEFAULT_SIM_BUFFER_CAPACITY)
    }

    /// Creates a node bound to `context_id` with [`DEFAULT_SIM_BUFFER_CAPACITY`].
    pub fn new_in_context(id: impl Into<NodeId>, context_id: ContextId) -> Self {
        Self::with_context_and_buffer(id, context_id, DEFAULT_SIM_BUFFER_CAPACITY)
    }

    /// Creates a node with the given delta buffer capacity and a context id
    /// derived from the node id.
    pub fn with_buffer_capacity(id: impl Into<NodeId>, buffer_capacity: usize) -> Self {
        let node_id: NodeId = id.into();
        let derived_context = Self::create_context_id(&node_id);
        Self::with_context_and_buffer(node_id, derived_context, buffer_capacity)
    }

    /// Creates a node with an explicit context id and delta buffer capacity.
    ///
    /// The executor key handed to storage is derived from the node id. The node
    /// starts idle, not crashed, with no entities and `[DeltaId::ZERO]` as its
    /// only DAG head.
    pub fn with_context_and_buffer(
        id: impl Into<NodeId>,
        context_id: ContextId,
        buffer_capacity: usize,
    ) -> Self {
        let node_id: NodeId = id.into();
        let storage = SimStorage::new(context_id, Self::create_executor_id(&node_id));

        Self {
            id: node_id,
            session: 0,
            out_seq: 0,
            storage,
            entity_metadata: HashMap::new(),
            dag_heads: vec![DeltaId::ZERO],
            delta_buffer: VecDeque::new(),
            buffered_operations: HashMap::new(),
            buffer_capacity,
            buffer_drops: 0,
            sync_state: SyncState::Idle,
            timers: Vec::new(),
            next_timer_id: 0,
            processed_messages: HashSet::new(),
            processed_order: VecDeque::new(),
            sender_sessions: HashMap::new(),
            has_state: false,
            is_crashed: false,
            forced_protocol: None,
        }
    }

    /// SHA-256 over `tag` followed by the bytes of the node id string.
    fn sha256_tagged(tag: &[u8], node_id: &NodeId) -> [u8; 32] {
        use sha2::{Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(tag);
        hasher.update(node_id.as_str().as_bytes());
        hasher.finalize().into()
    }

    /// Context id derived as SHA-256 of `"sim_context:"` + node id.
    fn create_context_id(node_id: &NodeId) -> ContextId {
        ContextId::from(Self::sha256_tagged(b"sim_context:", node_id))
    }

    /// Executor key derived as SHA-256 of `"sim_executor:"` + node id.
    fn create_executor_id(node_id: &NodeId) -> PublicKey {
        PublicKey::from(Self::sha256_tagged(b"sim_executor:", node_id))
    }

    // ---------------------------------------------------------------------
    // Identity and id generation
    // ---------------------------------------------------------------------

    /// Returns this node's id.
    pub fn id(&self) -> &NodeId {
        &self.id
    }

    /// Builds a message id from the node id, the current session and the
    /// current outgoing sequence number, then advances the sequence number.
    pub fn next_message_id(&mut self) -> MessageId {
        let seq = self.out_seq;
        self.out_seq += 1;
        MessageId::new(self.id.as_str(), self.session, seq)
    }

    /// Returns a fresh timer id and advances the internal counter.
    pub fn next_timer_id(&mut self) -> TimerId {
        let raw = self.next_timer_id;
        self.next_timer_id += 1;
        TimerId::new(raw)
    }

    // ---------------------------------------------------------------------
    // Message de-duplication
    // ---------------------------------------------------------------------

    /// Reports whether `msg_id` should be treated as a duplicate.
    ///
    /// A message is a duplicate when its session is older than the newest
    /// session recorded for its sender, or when its id was already processed.
    pub fn is_duplicate(&self, msg_id: &MessageId) -> bool {
        let from_stale_session = matches!(
            self.sender_sessions.get(&msg_id.sender),
            Some(&latest) if msg_id.session < latest
        );
        from_stale_session || self.processed_messages.contains(msg_id)
    }

    /// Records `msg_id` as processed.
    ///
    /// Raises the sender's newest known session if needed, and keeps at most
    /// [`MAX_PROCESSED_MESSAGES`] ids by evicting the oldest recorded ones.
    pub fn mark_processed(&mut self, msg_id: MessageId) {
        let latest = self
            .sender_sessions
            .entry(msg_id.sender.clone())
            .or_insert(0);
        *latest = (*latest).max(msg_id.session);

        // An id that is already recorded must not be queued for eviction twice.
        if !self.processed_messages.insert(msg_id.clone()) {
            return;
        }
        self.processed_order.push_back(msg_id);

        while self.processed_messages.len() > MAX_PROCESSED_MESSAGES {
            match self.processed_order.pop_front() {
                Some(oldest) => {
                    self.processed_messages.remove(&oldest);
                }
                None => break,
            }
        }
    }

    // ---------------------------------------------------------------------
    // Storage views
    // ---------------------------------------------------------------------

    /// Wraps the storage root hash in a [`StateDigest`].
    pub fn state_digest(&self) -> StateDigest {
        StateDigest(self.storage.root_hash())
    }

    /// Number of leaves reported by storage.
    pub fn entity_count(&self) -> usize {
        self.storage.leaf_count()
    }

    /// Root hash reported by storage.
    pub fn root_hash(&self) -> [u8; 32] {
        self.storage.root_hash()
    }

    /// Context id the storage was created with.
    pub fn context_id(&self) -> ContextId {
        self.storage.context_id()
    }

    /// Shared access to the underlying storage.
    pub fn storage(&self) -> &SimStorage {
        &self.storage
    }

    /// Exclusive access to the underlying storage.
    ///
    /// Changes made through this handle bypass `entity_metadata` and `has_state`.
    pub fn storage_mut(&mut self) -> &mut SimStorage {
        &mut self.storage
    }

    /// `true` once the `has_state` flag is set or any entity metadata is tracked.
    pub fn has_any_state(&self) -> bool {
        self.has_state || !self.entity_metadata.is_empty()
    }

    /// Returns `(tracked entities, storage entity count, storage max depth)`.
    ///
    /// The first value counts entries in `entity_metadata`; the other two come
    /// straight from storage and therefore include intermediate tree nodes.
    pub fn tree_stats(&self) -> (usize, usize, u32) {
        (
            self.entity_metadata.len(),
            self.storage.entity_count(),
            self.storage.max_depth(),
        )
    }

    /// Converts a simulation entity id into a storage id using the same bytes.
    fn entity_id_to_storage_id(entity_id: EntityId) -> Id {
        Id::new(entity_id.0)
    }

    /// Builds storage metadata from simulation metadata.
    ///
    /// The HLC timestamp is passed for both timestamp arguments of
    /// `Metadata::with_crdt_type`.
    fn entity_metadata_to_storage_metadata(metadata: &EntityMetadata) -> Metadata {
        let timestamp = metadata.hlc_timestamp;
        Metadata::with_crdt_type(timestamp, timestamp, metadata.crdt_type.clone())
    }

    /// Current DAG heads.
    pub fn dag_heads(&self) -> &[DeltaId] {
        &self.dag_heads
    }

    /// Number of delta ids currently buffered.
    pub fn buffer_size(&self) -> usize {
        self.delta_buffer.len()
    }

    /// Number of registered timers whose kind is `TimerKind::Sync`.
    pub fn sync_timer_count(&self) -> usize {
        let is_sync_timer = |timer: &&TimerEntry| timer.kind == TimerKind::Sync;
        self.timers.iter().filter(is_sync_timer).count()
    }

    /// Applies one storage operation to this node.
    ///
    /// `Insert` and `Update` are handled identically (insert-with-metadata);
    /// `Remove` deletes the entity from storage and drops its metadata.
    pub fn apply_storage_op(&mut self, op: StorageOp) {
        match op {
            StorageOp::Insert { id, data, metadata } | StorageOp::Update { id, data, metadata } => {
                self.insert_entity_with_metadata(id, data, metadata);
            }
            StorageOp::Remove { id } => {
                self.storage.remove_entity(Self::entity_id_to_storage_id(id));
                self.entity_metadata.remove(&id);
            }
        }
    }

    // ---------------------------------------------------------------------
    // Timers
    // ---------------------------------------------------------------------

    /// Registers a timer, replacing any existing timer with the same id.
    pub fn set_timer(&mut self, id: TimerId, fire_time: SimTime, kind: TimerKind) {
        self.cancel_timer(id);
        self.timers.push(TimerEntry {
            id,
            kind,
            fire_time,
        });
    }

    /// Removes every timer registered under `id`.
    pub fn cancel_timer(&mut self, id: TimerId) {
        self.timers.retain(|timer| timer.id != id);
    }

    /// Looks up the timer registered under `id`.
    pub fn get_timer(&self, id: TimerId) -> Option<&TimerEntry> {
        self.timers.iter().find(|timer| timer.id == id)
    }

    // ---------------------------------------------------------------------
    // Delta buffering
    // ---------------------------------------------------------------------

    /// Buffers `delta_id`, evicting the oldest entry when the buffer is full.
    ///
    /// Returns `false` when something was lost: either the delta itself
    /// (capacity is zero) or an older delta that had to be evicted to make
    /// room. Returns `true` when the delta was stored without loss or was
    /// already buffered. Every loss increments the drop counter, and an
    /// evicted delta's buffered operations are discarded with it.
    pub fn buffer_delta(&mut self, delta_id: DeltaId) -> bool {
        if self.buffer_capacity == 0 {
            self.buffer_drops += 1;
            return false;
        }
        if self.delta_buffer.contains(&delta_id) {
            return true;
        }

        let buffer_full = self.delta_buffer.len() >= self.buffer_capacity;
        if buffer_full {
            if let Some(evicted) = self.delta_buffer.pop_front() {
                self.buffered_operations.remove(&evicted);
                self.buffer_drops += 1;
            }
        }
        self.delta_buffer.push_back(delta_id);
        !buffer_full
    }

    /// Clears the buffered delta ids only.
    ///
    /// `buffered_operations` is left untouched; use [`SimNode::reset_buffer_state`]
    /// to clear both. Operations left behind here would trip the orphan check
    /// in [`SimNode::drain_buffered_operations`].
    pub fn clear_buffer(&mut self) {
        self.delta_buffer.clear();
    }

    /// Clears both the buffered delta ids and their buffered operations.
    pub fn reset_buffer_state(&mut self) {
        self.clear_buffer();
        self.buffered_operations.clear();
    }

    /// Number of deltas dropped or evicted so far.
    pub fn buffer_drops(&self) -> u64 {
        self.buffer_drops
    }

    /// Changes the delta buffer capacity.
    ///
    /// An already over-full buffer is not trimmed; `buffer_delta` evicts at
    /// most one entry per call.
    pub fn set_buffer_capacity(&mut self, capacity: usize) {
        self.buffer_capacity = capacity;
    }

    /// Stores the operations belonging to `delta_id`, replacing any previous
    /// entry for that id.
    pub fn buffer_operations(&mut self, delta_id: DeltaId, operations: Vec<StorageOp>) {
        self.buffered_operations.insert(delta_id, operations);
    }

    /// Removes and returns all buffered operations, in delta buffer order.
    ///
    /// Deltas without buffered operations are skipped. Operations whose delta
    /// id is not in the delta buffer trigger a debug assertion and are
    /// discarded. The delta buffer itself is not modified.
    pub fn drain_buffered_operations(&mut self) -> Vec<(DeltaId, Vec<StorageOp>)> {
        // Borrow the map separately so the closure does not capture `self`.
        let pending = &mut self.buffered_operations;
        let drained: Vec<(DeltaId, Vec<StorageOp>)> = self
            .delta_buffer
            .iter()
            .filter_map(|delta_id| pending.remove(delta_id).map(|ops| (*delta_id, ops)))
            .collect();

        let orphan_count = self.buffered_operations.len();
        debug_assert!(
            orphan_count == 0,
            "Found {} orphaned buffered operations (delta_id not in delta_buffer). \
             This indicates a bug in the buffering logic.",
            orphan_count
        );
        self.buffered_operations.clear();

        drained
    }

    /// Number of deltas that currently have buffered operations.
    pub fn buffered_operations_count(&self) -> usize {
        self.buffered_operations.len()
    }

    // ---------------------------------------------------------------------
    // Sync lifecycle
    // ---------------------------------------------------------------------

    /// Ends the active sync: replays buffered operations in buffer order,
    /// clears the delta buffer, and returns to `Idle`.
    ///
    /// Returns the number of operations applied. When the node is already
    /// idle nothing is touched and 0 is returned.
    pub fn finish_sync(&mut self) -> usize {
        if self.sync_state.is_idle() {
            return 0;
        }

        let mut ops_applied = 0usize;
        let replay = self
            .drain_buffered_operations()
            .into_iter()
            .flat_map(|(_delta_id, operations)| operations);
        for op in replay {
            self.apply_storage_op(op);
            ops_applied += 1;
        }

        self.clear_buffer();
        self.sync_state = SyncState::Idle;
        ops_applied
    }

    /// Simulates a crash by discarding volatile state.
    ///
    /// Timers, sync state, buffers, de-duplication records and the outgoing
    /// sequence number are reset. Storage, entity metadata, DAG heads, the
    /// session number and the timer id counter are left as they are.
    pub fn crash(&mut self) {
        self.timers.clear();
        self.sync_state = SyncState::Idle;
        self.reset_buffer_state();
        self.processed_messages.clear();
        self.processed_order.clear();
        self.sender_sessions.clear();
        self.out_seq = 0;
        self.is_crashed = true;
    }

    /// Brings a crashed node back: bumps the session and clears the crash flag.
    pub fn restart(&mut self) {
        self.session += 1;
        self.is_crashed = false;
    }

    // ---------------------------------------------------------------------
    // Entities
    // ---------------------------------------------------------------------

    /// Inserts an entity with metadata built from `crdt_type` and timestamp 0.
    pub fn insert_entity(&mut self, id: EntityId, data: Vec<u8>, crdt_type: CrdtType) {
        self.insert_entity_with_metadata(id, data, EntityMetadata::new(crdt_type, 0));
    }

    /// Inserts an entity into storage, records its metadata, and marks the
    /// node as having state.
    pub fn insert_entity_with_metadata(
        &mut self,
        id: EntityId,
        data: Vec<u8>,
        metadata: EntityMetadata,
    ) {
        self.storage.add_entity(
            Self::entity_id_to_storage_id(id),
            &data,
            Self::entity_metadata_to_storage_metadata(&metadata),
        );
        self.entity_metadata.insert(id, metadata);
        self.has_state = true;
    }

    /// Inserts an entity beneath a chain of intermediate nodes.
    ///
    /// `depth` is clamped to [`MAX_HIERARCHICAL_DEPTH`]. For each level
    /// `d` in `1..=depth` an intermediate node is keyed by the first `d` bytes
    /// of the entity id, zero-padded to 32 bytes. Missing intermediates are
    /// created with empty data and default metadata; existing ones are reused.
    /// A prefix that equals the entity's own id is skipped. Only the entity
    /// itself is recorded in `entity_metadata`.
    pub fn insert_entity_hierarchical(
        &mut self,
        id: EntityId,
        data: Vec<u8>,
        metadata: EntityMetadata,
        depth: u32,
    ) {
        let key = id.0;
        let levels = (depth as usize).min(MAX_HIERARCHICAL_DEPTH);

        if self.storage.is_empty() {
            self.storage.init_root();
        }

        let mut parent_id = self.storage.root_id();
        let leaf_id = Self::entity_id_to_storage_id(id);

        for prefix_len in 1..=levels {
            let mut prefix_key = [0u8; 32];
            prefix_key[..prefix_len].copy_from_slice(&key[..prefix_len]);
            let prefix_id = Id::new(prefix_key);

            // The entity must not become its own ancestor.
            if prefix_id != leaf_id {
                if !self.storage.has_entity(prefix_id) {
                    self.storage.add_entity_with_parent(
                        prefix_id,
                        parent_id,
                        &[],
                        Metadata::default(),
                    );
                }
                parent_id = prefix_id;
            }
        }

        let storage_metadata = Self::entity_metadata_to_storage_metadata(&metadata);
        self.storage
            .add_entity_with_parent(leaf_id, parent_id, &data, storage_metadata);
        self.entity_metadata.insert(id, metadata);
        self.has_state = true;
    }

    /// Returns the entity's data and metadata, or `None` when the id is not
    /// tracked in `entity_metadata` or storage has no data for it.
    pub fn get_entity(&self, id: &EntityId) -> Option<DigestEntity> {
        let metadata = self.entity_metadata.get(id)?;
        self.storage
            .get_entity_data(Self::entity_id_to_storage_id(*id))
            .map(|data| DigestEntity {
                id: *id,
                data,
                metadata: metadata.clone(),
            })
    }

    /// `true` when `id` is tracked in `entity_metadata`.
    pub fn has_entity(&self, id: &EntityId) -> bool {
        self.entity_metadata.contains_key(id)
    }

    /// `true` when storage holds a node under `id`, whether or not it is
    /// tracked in `entity_metadata`.
    pub fn has_storage_node(&self, id: &EntityId) -> bool {
        self.storage.has_entity(Self::entity_id_to_storage_id(*id))
    }

    /// Iterates over all tracked entities for which storage returns data.
    pub fn iter_entities(&self) -> impl Iterator<Item = DigestEntity> + '_ {
        self.entity_metadata
            .keys()
            .filter_map(move |entity_id| self.get_entity(entity_id))
    }

    /// Iterates over the ids of all tracked entities.
    pub fn entity_ids(&self) -> impl Iterator<Item = EntityId> + '_ {
        self.entity_metadata.keys().copied()
    }

    // ---------------------------------------------------------------------
    // Handshake and protocol selection
    // ---------------------------------------------------------------------

    /// Builds a handshake for this node via the shared `build_handshake` function.
    pub fn build_handshake(&mut self) -> SyncHandshake {
        build_handshake(self)
    }

    /// Forces `select_protocol_for_sync` to return `protocol`.
    pub fn force_protocol(&mut self, protocol: SelectedProtocol) {
        self.forced_protocol = Some(protocol);
    }

    /// Removes a previously forced protocol.
    pub fn clear_forced_protocol(&mut self) {
        self.forced_protocol = None;
    }

    /// Returns the forced protocol, if any.
    pub fn forced_protocol(&self) -> Option<&SelectedProtocol> {
        self.forced_protocol.as_ref()
    }

    /// Chooses the protocol to use against `remote`, with a reason string.
    ///
    /// A forced protocol wins unconditionally (reason `"forced for testing"`).
    /// Otherwise the local handshake is built and the shared `select_protocol`
    /// function decides.
    pub fn select_protocol_for_sync(
        &mut self,
        remote: &SyncHandshake,
    ) -> (SelectedProtocol, String) {
        if let Some(forced) = self.forced_protocol.as_ref() {
            return (forced.clone(), "forced for testing".to_string());
        }

        let local = self.build_handshake();
        let selection = calimero_node_primitives::sync::protocol::select_protocol(&local, remote);
        (
            Self::sync_protocol_to_selected(&selection.protocol),
            selection.reason.to_string(),
        )
    }

    /// Maps a [`SyncProtocol`] onto the simulation's [`SelectedProtocol`],
    /// keeping only the fields the simulation type carries.
    pub fn sync_protocol_to_selected(protocol: &SyncProtocol) -> SelectedProtocol {
        match protocol {
            SyncProtocol::None => SelectedProtocol::None,
            SyncProtocol::Snapshot { compressed, .. } => SelectedProtocol::Snapshot {
                compressed: *compressed,
            },
            SyncProtocol::HashComparison { .. } => SelectedProtocol::HashComparison,
            SyncProtocol::DeltaSync { missing_delta_ids } => SelectedProtocol::DeltaSync {
                missing_count: missing_delta_ids.len(),
            },
            SyncProtocol::BloomFilter { filter_size, .. } => SelectedProtocol::BloomFilter {
                filter_size: *filter_size,
            },
            SyncProtocol::SubtreePrefetch { .. } => SelectedProtocol::SubtreePrefetch,
            SyncProtocol::LevelWise { max_depth } => SelectedProtocol::LevelWise {
                max_depth: *max_depth,
            },
        }
    }
}
/// Exposes the simulated node's state to the shared handshake/selection code.
impl LocalSyncState for SimNode {
    /// Root hash reported by storage.
    fn root_hash(&self) -> [u8; 32] {
        self.storage.root_hash()
    }

    /// Entity count, widened to `u64`.
    fn entity_count(&self) -> u64 {
        // The `usize` annotation pins this call to the inherent
        // `SimNode::entity_count`, not this trait method.
        let leaves: usize = self.entity_count();
        leaves as u64
    }

    /// Storage depth minus one, floored at zero.
    fn max_depth(&self) -> u32 {
        self.storage.max_depth().saturating_sub(1)
    }

    /// Raw 32-byte values of the current DAG heads.
    fn dag_heads(&self) -> Vec<[u8; 32]> {
        let mut heads = Vec::with_capacity(self.dag_heads.len());
        heads.extend(self.dag_heads.iter().map(|head| head.0));
        heads
    }

    /// Value of the `has_state` flag (entity metadata is not consulted here).
    fn has_state(&self) -> bool {
        self.has_state
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Delta id whose 32 bytes all equal `byte`.
    fn delta(byte: u8) -> DeltaId {
        DeltaId::from_bytes([byte; 32])
    }

    /// A new node starts idle, empty, and in session 0.
    #[test]
    fn test_node_creation() {
        let node = SimNode::new("alice");

        assert_eq!(node.id().as_str(), "alice");
        assert_eq!(node.session, 0);
        assert!(node.sync_state.is_idle());
        assert_eq!(node.entity_count(), 0);
        assert!(!node.has_any_state());
    }

    /// Message ids carry sender and session, and count up from sequence 0.
    #[test]
    fn test_message_id_generation() {
        let mut node = SimNode::new("alice");

        let first = node.next_message_id();
        let second = node.next_message_id();

        assert_eq!(first.sender, "alice");
        assert_eq!(first.session, 0);
        assert_eq!(first.seq, 0);
        assert_eq!(second.seq, 1);
    }

    /// A message becomes a duplicate only after it was marked as processed.
    #[test]
    fn test_duplicate_detection() {
        let mut node = SimNode::new("alice");
        let msg_id = MessageId::new("bob", 0, 1);

        assert!(!node.is_duplicate(&msg_id));
        node.mark_processed(msg_id.clone());
        assert!(node.is_duplicate(&msg_id));
    }

    /// Messages from a sender's older session are rejected; other senders are unaffected.
    #[test]
    fn test_duplicate_detection_sender_session() {
        let mut node = SimNode::new("alice");

        // Seeing session 1 from bob establishes it as bob's newest session.
        let bob_session1 = MessageId::new("bob", 1, 0);
        assert!(!node.is_duplicate(&bob_session1));
        node.mark_processed(bob_session1);

        // Anything from bob's session 0 is now stale.
        let bob_stale = MessageId::new("bob", 0, 5);
        assert!(node.is_duplicate(&bob_stale));

        // A new message within the current session is fine.
        let bob_session1_next = MessageId::new("bob", 1, 1);
        assert!(!node.is_duplicate(&bob_session1_next));

        // Session tracking is per sender.
        let charlie_first = MessageId::new("charlie", 0, 0);
        assert!(!node.is_duplicate(&charlie_first));
    }

    /// Inserted entities are counted, tracked, and readable.
    #[test]
    fn test_storage_operations() {
        let mut node = SimNode::new("alice");
        let id = EntityId::from_u64(1);

        node.insert_entity(id, vec![1, 2, 3], CrdtType::lww_register("test"));

        assert!(node.has_any_state());
        assert_eq!(node.entity_count(), 1);
        assert!(node.has_entity(&id));

        let entity = node.get_entity(&id).unwrap();
        assert_eq!(entity.data, vec![1, 2, 3]);
    }

    /// A crash wipes volatile state but keeps stored entities; restart bumps the session.
    #[test]
    fn test_crash_restart() {
        let mut node = SimNode::new("alice");
        node.insert_entity(
            EntityId::from_u64(1),
            vec![1],
            CrdtType::lww_register("test"),
        );
        node.sync_state = SyncState::Initiating {
            peer: NodeId::new("bob"),
        };
        node.out_seq = 10;
        node.set_timer(TimerId::new(1), SimTime::from_millis(100), TimerKind::Sync);

        node.crash();

        assert_eq!(node.entity_count(), 1);
        assert!(node.sync_state.is_idle());
        assert!(node.timers.is_empty());
        assert_eq!(node.out_seq, 0);

        node.restart();

        assert_eq!(node.session, 1);
    }

    /// Timers can be set, looked up, counted by kind, and cancelled.
    #[test]
    fn test_timers() {
        let mut node = SimNode::new("alice");
        let sync_timer = node.next_timer_id();
        let housekeeping_timer = node.next_timer_id();

        node.set_timer(sync_timer, SimTime::from_millis(100), TimerKind::Sync);
        node.set_timer(
            housekeeping_timer,
            SimTime::from_millis(200),
            TimerKind::Housekeeping,
        );

        assert_eq!(node.sync_timer_count(), 1);
        assert!(node.get_timer(sync_timer).is_some());

        node.cancel_timer(sync_timer);

        assert!(node.get_timer(sync_timer).is_none());
        assert_eq!(node.sync_timer_count(), 0);
    }

    /// Buffering the same delta twice stores it once; clearing empties the buffer.
    #[test]
    fn test_delta_buffer() {
        let mut node = SimNode::new("alice");
        assert_eq!(node.buffer_size(), 0);

        for &byte in &[1u8, 2, 1] {
            node.buffer_delta(delta(byte));
        }
        assert_eq!(node.buffer_size(), 2);

        node.clear_buffer();
        assert_eq!(node.buffer_size(), 0);
    }

    /// The digest is zero for an empty node and changes once an entity exists.
    #[test]
    fn test_state_digest() {
        let mut node = SimNode::new("alice");

        let empty_digest = node.state_digest();
        assert_eq!(empty_digest, StateDigest::ZERO);

        node.insert_entity(
            EntityId::from_u64(1),
            vec![1],
            CrdtType::lww_register("test"),
        );

        let populated_digest = node.state_digest();
        assert_ne!(populated_digest, StateDigest::ZERO);
        assert_ne!(populated_digest, empty_digest);
    }

    /// With capacity 0 every delta is dropped and counted.
    #[test]
    fn test_zero_capacity_drops_all() {
        let mut node = SimNode::with_buffer_capacity("zero_cap", 0);
        assert_eq!(node.buffer_size(), 0);
        assert_eq!(node.buffer_drops(), 0);

        let result = node.buffer_delta(delta(1));
        assert!(!result, "Should return false when dropped");
        assert_eq!(node.buffer_size(), 0, "Buffer should remain empty");
        assert_eq!(node.buffer_drops(), 1, "Should increment drops");

        for &(byte, expected_drops) in &[(2u8, 2u64), (3, 3)] {
            let result = node.buffer_delta(delta(byte));
            assert!(!result);
            assert_eq!(node.buffer_size(), 0);
            assert_eq!(node.buffer_drops(), expected_drops);
        }
    }

    /// A full buffer evicts its oldest entries first.
    #[test]
    fn test_buffer_fifo_eviction_order() {
        let mut node = SimNode::with_buffer_capacity("evict_test", 3);

        for &byte in &[1u8, 2, 3] {
            node.buffer_delta(delta(byte));
        }
        assert_eq!(node.buffer_size(), 3);
        assert_eq!(node.buffer_drops(), 0);

        let result = node.buffer_delta(delta(4));
        assert!(!result, "Should return false when eviction occurred");
        assert_eq!(node.buffer_size(), 3);
        assert_eq!(node.buffer_drops(), 1);

        node.buffer_delta(delta(5));
        assert_eq!(node.buffer_size(), 3);
        assert_eq!(node.buffer_drops(), 2);

        // The three newest deltas survive...
        for &byte in &[3u8, 4, 5] {
            assert!(node.delta_buffer.contains(&delta(byte)));
        }
        // ...and the two oldest are gone.
        for &byte in &[1u8, 2] {
            assert!(!node.delta_buffer.contains(&delta(byte)));
        }
    }

    /// Intermediate tree nodes exist in storage but are not reported as entities.
    #[test]
    fn test_hierarchical_insertion_excludes_intermediate_nodes() {
        let mut node = SimNode::new("alice");

        let key1 = EntityId::from_bytes([
            1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,
            0, 0, 0,
        ]);
        let key2 = EntityId::from_bytes([
            1, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0,
            0, 0, 0,
        ]);
        let key3 = EntityId::from_bytes([
            2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0,
            0, 0, 0,
        ]);
        let metadata = EntityMetadata::new(CrdtType::lww_register("test"), 0);

        node.insert_entity_hierarchical(key1, vec![1], metadata.clone(), 3);
        node.insert_entity_hierarchical(key2, vec![2], metadata.clone(), 3);
        node.insert_entity_hierarchical(key3, vec![3], metadata.clone(), 3);

        assert_eq!(node.entity_count(), 3, "Should count only real entities");

        let (real, total, depth) = node.tree_stats();
        assert_eq!(real, 3, "Should have 3 real entities");
        assert!(
            total > 3,
            "Tree should have intermediate nodes: got {}",
            total
        );
        assert!(depth > 1, "Tree should have depth > 1: got {}", depth);

        let entities: Vec<_> = node.iter_entities().collect();
        assert_eq!(
            entities.len(),
            3,
            "iter_entities should return only real entities"
        );

        assert!(node.has_entity(&key1));
        assert!(node.has_entity(&key2));
        assert!(node.has_entity(&key3));

        // One-byte prefix shared by key1 and key2.
        let intermediate = EntityId::from_bytes([
            1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0,
        ]);
        assert!(
            !node.has_entity(&intermediate),
            "Intermediate nodes should not be 'real' entities"
        );
        assert!(
            node.has_storage_node(&intermediate),
            "Intermediate nodes should exist in storage"
        );
    }

    /// Entities sharing a key prefix share the intermediate nodes for that prefix.
    #[test]
    fn test_hierarchical_insertion_reuses_intermediate_nodes() {
        let mut node = SimNode::new("alice");

        let key1 = EntityId::from_bytes([
            1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,
            0, 0, 0,
        ]);
        let key2 = EntityId::from_bytes([
            1, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0,
            0, 0, 0,
        ]);
        let metadata = EntityMetadata::new(CrdtType::lww_register("test"), 0);

        node.insert_entity_hierarchical(key1, vec![1], metadata.clone(), 3);
        let (_, total_after_first, _) = node.tree_stats();

        node.insert_entity_hierarchical(key2, vec![2], metadata.clone(), 3);
        let (real, total_after_second, _) = node.tree_stats();

        assert_eq!(real, 2);
        assert!(
            total_after_second < total_after_first + 4,
            "Should reuse intermediate nodes: first={}, second={}",
            total_after_first,
            total_after_second
        );
    }
}