use calimero_primitives::crdt::CrdtType;
use super::runtime::SimDuration;
use super::types::{DeltaId, EntityId, MessageId, NodeId, TimerId, TimerKind};
#[derive(Debug, Default)]
pub struct SyncActions {
pub messages: Vec<OutgoingMessage>,
pub storage_ops: Vec<StorageOp>,
pub timer_ops: Vec<TimerOp>,
}
impl SyncActions {
pub fn new() -> Self {
Self::default()
}
pub fn is_empty(&self) -> bool {
self.messages.is_empty() && self.storage_ops.is_empty() && self.timer_ops.is_empty()
}
pub fn send(&mut self, to: NodeId, msg: SyncMessage, msg_id: MessageId) {
self.messages.push(OutgoingMessage { to, msg, msg_id });
}
pub fn storage(&mut self, op: StorageOp) {
self.storage_ops.push(op);
}
pub fn set_timer(&mut self, id: TimerId, delay: SimDuration, kind: TimerKind) {
self.timer_ops.push(TimerOp::Set { id, delay, kind });
}
pub fn cancel_timer(&mut self, id: TimerId) {
self.timer_ops.push(TimerOp::Cancel { id });
}
pub fn merge(&mut self, other: SyncActions) {
self.messages.extend(other.messages);
self.storage_ops.extend(other.storage_ops);
self.timer_ops.extend(other.timer_ops);
}
}
#[derive(Debug, Clone)]
pub struct OutgoingMessage {
pub to: NodeId,
pub msg: SyncMessage,
pub msg_id: MessageId,
}
#[derive(Debug, Clone)]
pub enum StorageOp {
Insert {
id: EntityId,
data: Vec<u8>,
metadata: EntityMetadata,
},
Update {
id: EntityId,
data: Vec<u8>,
metadata: EntityMetadata,
},
Remove { id: EntityId },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityMetadata {
pub crdt_type: CrdtType,
pub hlc_timestamp: u64,
pub version: u32,
pub collection_id: [u8; 32],
}
impl Default for EntityMetadata {
fn default() -> Self {
Self {
crdt_type: CrdtType::lww_register("test"),
hlc_timestamp: 0,
version: 1,
collection_id: [0; 32],
}
}
}
impl EntityMetadata {
pub fn new(crdt_type: CrdtType, hlc_timestamp: u64) -> Self {
Self {
crdt_type,
hlc_timestamp,
version: 1,
collection_id: [0; 32],
}
}
}
#[derive(Debug, Clone)]
pub enum TimerOp {
Set {
id: TimerId,
delay: SimDuration,
kind: TimerKind,
},
Cancel { id: TimerId },
}
#[derive(Debug, Clone)]
pub enum SyncMessage {
Handshake(HandshakeRequest),
HandshakeResponse(HandshakeResponse),
SnapshotRequest { page: u32 },
SnapshotPage {
page: u32,
entities: Vec<EntityTransfer>,
has_more: bool,
total_pages: u32,
},
SnapshotComplete {
success: bool,
error: Option<String>,
},
HashCompareRequest { node_hash: [u8; 32], level: u32 },
HashCompareResponse {
node_hash: [u8; 32],
children: Vec<([u8; 32], bool)>, has_more: bool,
},
EntityRequest { ids: Vec<EntityId> },
EntityResponse { entities: Vec<EntityTransfer> },
DeltaHeads { heads: Vec<DeltaId> },
DeltaRequest { ids: Vec<DeltaId> },
DeltaResponse { deltas: Vec<DeltaTransfer> },
SyncComplete { success: bool },
SyncError { code: u32, message: String },
}
impl SyncMessage {
pub fn estimated_size(&self) -> usize {
use std::mem::size_of;
let base = size_of::<Self>();
let heap_size = match self {
SyncMessage::Handshake(req) => req.dag_heads.len() * size_of::<DeltaId>(),
SyncMessage::HandshakeResponse(resp) => resp.reason.len(),
SyncMessage::SnapshotRequest { .. } => 0,
SyncMessage::SnapshotPage { entities, .. } => entities
.iter()
.map(|e| e.data.len() + size_of::<EntityTransfer>())
.sum(),
SyncMessage::SnapshotComplete { error, .. } => {
error.as_ref().map(|s| s.len()).unwrap_or(0)
}
SyncMessage::HashCompareRequest { .. } => 0,
SyncMessage::HashCompareResponse { children, .. } => {
children.len() * size_of::<([u8; 32], bool)>()
}
SyncMessage::EntityRequest { ids } => ids.len() * size_of::<EntityId>(),
SyncMessage::EntityResponse { entities } => entities
.iter()
.map(|e| e.data.len() + size_of::<EntityTransfer>())
.sum(),
SyncMessage::DeltaHeads { heads } => heads.len() * size_of::<DeltaId>(),
SyncMessage::DeltaRequest { ids } => ids.len() * size_of::<DeltaId>(),
SyncMessage::DeltaResponse { deltas } => deltas
.iter()
.map(|d| {
d.parents.len() * size_of::<DeltaId>()
+ d.operations
.iter()
.map(|op| match op {
StorageOp::Insert { data, .. } | StorageOp::Update { data, .. } => {
data.len() + size_of::<StorageOp>()
}
StorageOp::Remove { .. } => size_of::<StorageOp>(),
})
.sum::<usize>()
+ size_of::<DeltaTransfer>()
})
.sum(),
SyncMessage::SyncComplete { .. } => 0,
SyncMessage::SyncError { message, .. } => message.len(),
};
base + heap_size
}
}
#[derive(Debug, Clone)]
pub struct HandshakeRequest {
pub version: u32,
pub root_hash: [u8; 32],
pub entity_count: u64,
pub max_depth: u32,
pub dag_heads: Vec<DeltaId>,
pub has_state: bool,
}
#[derive(Debug, Clone)]
pub struct HandshakeResponse {
pub protocol: SelectedProtocol,
pub root_hash: [u8; 32],
pub entity_count: u64,
pub reason: String,
}
#[derive(Debug, Clone, PartialEq)]
pub enum SelectedProtocol {
None,
Snapshot { compressed: bool },
HashComparison,
DeltaSync { missing_count: usize },
BloomFilter { filter_size: u64 },
SubtreePrefetch,
LevelWise { max_depth: u32 },
}
#[derive(Debug, Clone)]
pub struct EntityTransfer {
pub id: EntityId,
pub data: Vec<u8>,
pub metadata: EntityMetadata,
}
#[derive(Debug, Clone)]
pub struct DeltaTransfer {
pub id: DeltaId,
pub parents: Vec<DeltaId>,
pub operations: Vec<StorageOp>,
pub hlc_timestamp: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sync_actions_empty() {
let actions = SyncActions::new();
assert!(actions.is_empty());
}
#[test]
fn test_sync_actions_send() {
let mut actions = SyncActions::new();
actions.send(
NodeId::new("bob"),
SyncMessage::SyncComplete { success: true },
MessageId::new("alice", 1, 1),
);
assert!(!actions.is_empty());
assert_eq!(actions.messages.len(), 1);
assert_eq!(actions.messages[0].to.as_str(), "bob");
}
#[test]
fn test_sync_actions_storage() {
let mut actions = SyncActions::new();
actions.storage(StorageOp::Insert {
id: EntityId::from_u64(1),
data: vec![1, 2, 3],
metadata: EntityMetadata::default(),
});
assert!(!actions.is_empty());
assert_eq!(actions.storage_ops.len(), 1);
}
#[test]
fn test_sync_actions_timer() {
let mut actions = SyncActions::new();
actions.set_timer(
TimerId::new(1),
SimDuration::from_millis(100),
TimerKind::Sync,
);
actions.cancel_timer(TimerId::new(2));
assert!(!actions.is_empty());
assert_eq!(actions.timer_ops.len(), 2);
}
#[test]
fn test_sync_actions_merge() {
let mut a = SyncActions::new();
a.send(
NodeId::new("bob"),
SyncMessage::SyncComplete { success: true },
MessageId::new("alice", 1, 1),
);
let mut b = SyncActions::new();
b.storage(StorageOp::Remove {
id: EntityId::from_u64(1),
});
a.merge(b);
assert_eq!(a.messages.len(), 1);
assert_eq!(a.storage_ops.len(), 1);
}
}