use super::*;
use crate::message::NodeEvent;
use crate::node::OpManager;
use crate::wasm_runtime::{InMemoryContractStore, MockStateStorage, StateStorage};
use dashmap::DashSet;
use std::sync::Arc;
/// Process-wide set of contract instance ids registered for CRDT emulation mode.
///
/// The set starts empty and is created on first access. It is filled by
/// `register_crdt_contract`, queried by `is_crdt_contract` and emptied by
/// `clear_crdt_contracts`.
static CRDT_CONTRACTS: std::sync::LazyLock<DashSet<ContractInstanceId>> =
std::sync::LazyLock::new(DashSet::new);
pub fn register_crdt_contract(contract_id: ContractInstanceId) {
CRDT_CONTRACTS.insert(contract_id);
tracing::debug!(contract = %contract_id, "Registered contract for CRDT emulation mode");
}
pub fn is_crdt_contract(contract_id: &ContractInstanceId) -> bool {
CRDT_CONTRACTS.contains(contract_id)
}
pub fn clear_crdt_contracts() {
CRDT_CONTRACTS.clear();
}
/// Byte-level encoding helpers used by the CRDT emulation mode.
///
/// Layouts (every integer is a little-endian `u64`):
/// - state:   `version (8 bytes) || data`
/// - summary: `version (8 bytes) || blake3(data) (32 bytes)`
/// - delta:   `from_version (8 bytes) || to_version (8 bytes) || new_data`
mod crdt_encoding {
    use super::*;

    /// Smallest valid encoded state: the 8-byte version prefix with empty data.
    pub const MIN_STATE_SIZE: usize = 8;

    /// Encodes `version` followed by `data` into a single state buffer.
    pub fn encode_state(version: u64, data: &[u8]) -> Vec<u8> {
        [version.to_le_bytes().as_slice(), data].concat()
    }

    /// Splits an encoded state into `(version, data)`.
    ///
    /// Returns `None` when `state` is shorter than [`MIN_STATE_SIZE`].
    pub fn decode_state(state: &[u8]) -> Option<(u64, &[u8])> {
        let (version_bytes, payload) = state.split_first_chunk::<MIN_STATE_SIZE>()?;
        Some((u64::from_le_bytes(*version_bytes), payload))
    }

    /// Reads only the version prefix of an encoded state.
    ///
    /// Returns `None` when `state` is shorter than [`MIN_STATE_SIZE`].
    pub fn get_version(state: &[u8]) -> Option<u64> {
        decode_state(state).map(|(version, _payload)| version)
    }

    /// Builds a summary for an encoded state: the version followed by the blake3 hash of the data.
    ///
    /// Returns `None` when `state` cannot be decoded.
    pub fn create_summary(state: &[u8]) -> Option<Vec<u8>> {
        let (version, payload) = decode_state(state)?;
        let digest = blake3::hash(payload);
        Some([version.to_le_bytes().as_slice(), digest.as_bytes().as_slice()].concat())
    }

    /// Extracts the version prefix from a summary.
    ///
    /// Returns `None` when `summary` is shorter than 8 bytes.
    pub fn summary_version(summary: &[u8]) -> Option<u64> {
        let (version_bytes, _digest) = summary.split_first_chunk::<8>()?;
        Some(u64::from_le_bytes(*version_bytes))
    }

    /// Encodes a delta as `from_version || to_version || new_data`.
    pub fn create_delta(from_version: u64, to_version: u64, new_data: &[u8]) -> Vec<u8> {
        [
            from_version.to_le_bytes().as_slice(),
            to_version.to_le_bytes().as_slice(),
            new_data,
        ]
        .concat()
    }

    /// Splits an encoded delta into `(from_version, to_version, new_data)`.
    ///
    /// Returns `None` when `delta` is shorter than the 16-byte header.
    pub fn decode_delta(delta: &[u8]) -> Option<(u64, u64, &[u8])> {
        let (from_bytes, remainder) = delta.split_first_chunk::<8>()?;
        let (to_bytes, new_data) = remainder.split_first_chunk::<8>()?;
        Some((
            u64::from_le_bytes(*from_bytes),
            u64::from_le_bytes(*to_bytes),
            new_data,
        ))
    }
}
/// Runtime stand-in used by the mock executor; its only state is a contract store.
pub(crate) struct MockRuntime {
/// Contract code store that the executor uses for `store_contract`, `fetch_contract`
/// and `code_hash_from_id` lookups.
pub contract_store: InMemoryContractStore,
}
impl Executor<MockRuntime, Storage> {
    /// Builds a mock executor whose state store is backed by [`Storage`] on disk.
    ///
    /// The data directory comes from `Self::test_data_dir(identifier)` and is created when
    /// missing. The executor runs in `OperationMode::Local` with a fresh
    /// `InMemoryContractStore`.
    ///
    /// # Errors
    ///
    /// Returns an error when the data directory cannot be created, when the storage or the
    /// state store cannot be initialised, or when `Executor::new` fails. Directory and
    /// state-store failures used to panic; they are now reported through the returned
    /// `anyhow::Result`.
    pub async fn new_mock(
        identifier: &str,
        op_sender: Option<OpRequestSender>,
        op_manager: Option<Arc<OpManager>>,
    ) -> anyhow::Result<Self> {
        let data_dir = Self::test_data_dir(identifier);
        let contract_store = InMemoryContractStore::new();

        tracing::debug!("creating state store at path: {data_dir:?}");
        std::fs::create_dir_all(&data_dir).map_err(|err| {
            anyhow::anyhow!("failed to create state store directory {data_dir:?}: {err}")
        })?;

        // `u32::from` is lossless here, unlike an `as` cast.
        let max_cache_size = u32::from(u16::MAX);
        let storage = Storage::new(&data_dir).await?;
        // Only `Debug` is assumed on the error type, as the previous `unwrap()` already required.
        let state_store = StateStore::new(storage, max_cache_size)
            .map_err(|err| anyhow::anyhow!("failed to create state store: {err:?}"))?;
        tracing::debug!("state store created");

        let executor = Executor::new(
            state_store,
            || Ok(()),
            OperationMode::Local,
            MockRuntime { contract_store },
            op_sender,
            op_manager,
        )
        .await?;
        Ok(executor)
    }
}
impl Executor<MockRuntime, MockStateStorage> {
    /// Builds a mock executor on top of the caller-supplied `shared_storage`.
    ///
    /// The state store is created with `StateStore::new_uncached`, the contract store is a
    /// fresh `InMemoryContractStore`, and the executor runs in `OperationMode::Local`.
    /// `_identifier` is accepted but not used.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Executor::new`.
    pub async fn new_mock_in_memory(
        _identifier: &str,
        shared_storage: MockStateStorage,
        op_sender: Option<OpRequestSender>,
        op_manager: Option<Arc<OpManager>>,
    ) -> anyhow::Result<Self> {
        let runtime = MockRuntime {
            contract_store: InMemoryContractStore::new(),
        };
        let uncached_store = StateStore::new_uncached(shared_storage);
        tracing::debug!("created fully in-memory uncached executor for deterministic simulation");

        let mock_executor = Executor::new(
            uncached_store,
            || Ok(()),
            OperationMode::Local,
            runtime,
            op_sender,
            op_manager,
        )
        .await?;
        Ok(mock_executor)
    }
}
impl<S> Executor<MockRuntime, S>
where
    S: StateStorage + Send + Sync + 'static,
    <S as StateStorage>::Error: Into<anyhow::Error>,
{
    /// Client requests are not served by the mock runtime; calling this always panics.
    pub async fn handle_request(
        &mut self,
        _id: ClientId,
        _req: ClientRequest<'_>,
        _updates: Option<mpsc::Sender<Result<HostResponse, WsClientError>>>,
    ) -> Response {
        unreachable!("MockRuntime does not handle client requests directly")
    }

    /// Emits a `NodeEvent::BroadcastStateChange` for `key` through the op manager.
    ///
    /// Does nothing when no op manager is configured. A failed notification is logged as a
    /// warning and otherwise ignored.
    async fn broadcast_state_change(&self, key: ContractKey, state: &WrappedState) {
        let Some(op_manager) = self.op_manager.as_ref() else {
            return;
        };
        tracing::debug!(
            contract = %key,
            state_size = state.size(),
            "MockRuntime: Emitting BroadcastStateChange"
        );
        let event = NodeEvent::BroadcastStateChange {
            key,
            new_state: state.clone(),
        };
        if let Err(err) = op_manager.notify_node_event(event).await {
            tracing::warn!(
                contract = %key,
                error = %err,
                "MockRuntime: Failed to broadcast state change"
            );
        }
    }

    /// Applies a full incoming state to a CRDT-emulated contract.
    ///
    /// The higher version wins; on equal versions the state whose data has the larger
    /// blake3 hash wins. Returns `Updated` when the incoming state is stored, `NoChange`
    /// when both states are byte-identical, and `CurrentWon` otherwise. `Updated` and
    /// `CurrentWon` outcomes are broadcast.
    ///
    /// # Errors
    ///
    /// Fails when either state is not a valid CRDT encoding or the store update fails.
    async fn apply_crdt_full_state(
        &mut self,
        key: &ContractKey,
        current_state: &WrappedState,
        incoming_state: &WrappedState,
    ) -> Result<UpsertResult, ExecutorError> {
        let Some((current_version, current_data)) =
            crdt_encoding::decode_state(current_state.as_ref())
        else {
            return Err(ExecutorError::other(anyhow::anyhow!(
                "Invalid CRDT state format (current)"
            )));
        };
        let Some((incoming_version, incoming_data)) =
            crdt_encoding::decode_state(incoming_state.as_ref())
        else {
            return Err(ExecutorError::other(anyhow::anyhow!(
                "Invalid CRDT state format (incoming)"
            )));
        };

        tracing::debug!(
            contract = %key,
            current_version,
            incoming_version,
            "CRDT mode: applying full state with LWW semantics"
        );

        let incoming_wins = match incoming_version.cmp(&current_version) {
            std::cmp::Ordering::Greater => true,
            // Equal versions: fall back to comparing the hashes of the payloads.
            std::cmp::Ordering::Equal => {
                let incoming_hash = blake3::hash(incoming_data);
                let current_hash = blake3::hash(current_data);
                incoming_hash.as_bytes() > current_hash.as_bytes()
            }
            std::cmp::Ordering::Less => false,
        };

        let outcome = if incoming_wins {
            self.state_store
                .update(key, incoming_state.clone())
                .await
                .map_err(ExecutorError::other)?;
            UpsertResult::Updated(incoming_state.clone())
        } else if incoming_version == current_version
            && incoming_state.as_ref() == current_state.as_ref()
        {
            UpsertResult::NoChange
        } else {
            UpsertResult::CurrentWon(current_state.clone())
        };

        match &outcome {
            UpsertResult::Updated(state) | UpsertResult::CurrentWon(state) => {
                self.broadcast_state_change(*key, state).await;
            }
            UpsertResult::NoChange => {}
        }
        Ok(outcome)
    }

    /// Applies an encoded delta to a CRDT-emulated contract.
    ///
    /// The delta's target version is compared with the stored version using the same rule
    /// as full-state merges. An accepted delta replaces the stored state with
    /// `to_version || new_data` and is broadcast; a rejected delta yields `NoChange`.
    /// `_has_contract_code` is not used.
    ///
    /// # Errors
    ///
    /// Fails when the delta or the current state is malformed, or the store update fails.
    async fn apply_crdt_delta(
        &mut self,
        key: &ContractKey,
        current_state: &WrappedState,
        delta: &StateDelta<'_>,
        _has_contract_code: bool,
    ) -> Result<UpsertResult, ExecutorError> {
        let Some((_from_version, to_version, new_data)) =
            crdt_encoding::decode_delta(delta.as_ref())
        else {
            return Err(ExecutorError::other(anyhow::anyhow!(
                "Invalid CRDT delta format"
            )));
        };
        let Some((current_version, current_data)) =
            crdt_encoding::decode_state(current_state.as_ref())
        else {
            return Err(ExecutorError::other(anyhow::anyhow!(
                "Invalid CRDT state format"
            )));
        };

        tracing::debug!(
            contract = %key,
            current_version = current_version,
            delta_to_version = to_version,
            "CRDT mode: applying delta with LWW semantics"
        );

        let accepted = match to_version.cmp(&current_version) {
            std::cmp::Ordering::Greater => {
                tracing::debug!(
                    contract = %key,
                    "CRDT: incoming version {} > current version {} - accepting",
                    to_version, current_version
                );
                true
            }
            std::cmp::Ordering::Equal => {
                let incoming_hash = blake3::hash(new_data);
                let current_hash = blake3::hash(current_data);
                let incoming_wins = incoming_hash.as_bytes() > current_hash.as_bytes();
                tracing::debug!(
                    contract = %key,
                    "CRDT: equal versions, hash tiebreaker - {}",
                    if incoming_wins { "accepting incoming" } else { "keeping current" }
                );
                incoming_wins
            }
            std::cmp::Ordering::Less => {
                tracing::debug!(
                    contract = %key,
                    "CRDT: incoming version {} < current version {} - rejecting",
                    to_version, current_version
                );
                false
            }
        };

        // Rejected deltas leave the store untouched and are not broadcast.
        if !accepted {
            return Ok(UpsertResult::NoChange);
        }

        let new_state = WrappedState::new(crdt_encoding::encode_state(to_version, new_data));
        self.state_store
            .update(key, new_state.clone())
            .await
            .map_err(ExecutorError::other)?;
        tracing::debug!(
            contract = %key,
            old_version = current_version,
            new_version = to_version,
            "CRDT mode: delta applied successfully"
        );
        self.broadcast_state_change(*key, &new_state).await;
        Ok(UpsertResult::Updated(new_state))
    }
}
impl<S> ContractExecutor for Executor<MockRuntime, S>
where
S: StateStorage + Send + Sync + 'static,
<S as StateStorage>::Error: Into<anyhow::Error>,
{
fn lookup_key(&self, instance_id: &ContractInstanceId) -> Option<ContractKey> {
let code_hash = self.runtime.contract_store.code_hash_from_id(instance_id)?;
Some(ContractKey::from_id_and_code(*instance_id, code_hash))
}
async fn fetch_contract(
&mut self,
key: ContractKey,
return_contract_code: bool,
) -> Result<(Option<WrappedState>, Option<ContractContainer>), ExecutorError> {
let Some(parameters) = self
.state_store
.get_params(&key)
.await
.map_err(ExecutorError::other)?
else {
return Err(ExecutorError::other(anyhow::anyhow!(
"missing state and/or parameters for contract {key}"
)));
};
let contract = if return_contract_code {
self.runtime
.contract_store
.fetch_contract(&key, ¶meters)
} else {
None
};
let Ok(state) = self.state_store.get(&key).await else {
return Err(ExecutorError::other(anyhow::anyhow!(
"missing state for contract {key}"
)));
};
Ok((Some(state), contract))
}
async fn upsert_contract_state(
&mut self,
key: ContractKey,
state: Either<WrappedState, StateDelta<'static>>,
_related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> Result<UpsertResult, ExecutorError> {
let result = match (state, code) {
(Either::Left(incoming_state), Some(contract)) => {
self.runtime
.contract_store
.store_contract(contract.clone())
.map_err(ExecutorError::other)?;
match self.state_store.get(&key).await {
Ok(current_state) => {
if is_crdt_contract(key.id()) {
return self
.apply_crdt_full_state(&key, ¤t_state, &incoming_state)
.await;
}
let incoming_hash = blake3::hash(incoming_state.as_ref());
let current_hash = blake3::hash(current_state.as_ref());
if incoming_hash.as_bytes() > current_hash.as_bytes() {
self.state_store
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
Ok(UpsertResult::Updated(incoming_state))
} else if incoming_hash.as_bytes() == current_hash.as_bytes() {
Ok(UpsertResult::NoChange)
} else {
Ok(UpsertResult::CurrentWon(current_state))
}
}
Err(_) => {
self.state_store
.store(key, incoming_state.clone(), contract.params().into_owned())
.await
.map_err(ExecutorError::other)?;
Ok(UpsertResult::Updated(incoming_state))
}
}
}
(Either::Left(incoming_state), None) => {
match self.state_store.get(&key).await {
Ok(current_state) => {
if is_crdt_contract(key.id()) {
return self
.apply_crdt_full_state(&key, ¤t_state, &incoming_state)
.await;
}
let incoming_hash = blake3::hash(incoming_state.as_ref());
let current_hash = blake3::hash(current_state.as_ref());
if incoming_hash.as_bytes() > current_hash.as_bytes() {
self.state_store
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
Ok(UpsertResult::Updated(incoming_state))
} else if incoming_hash.as_bytes() == current_hash.as_bytes() {
Ok(UpsertResult::NoChange)
} else {
Ok(UpsertResult::CurrentWon(current_state))
}
}
Err(_) => {
tracing::warn!(
contract = %key,
"Update received for non-existent contract state"
);
Err(ExecutorError::request(StdContractError::MissingContract {
key: key.into(),
}))
}
}
}
(Either::Right(delta), Some(contract)) => {
self.runtime
.contract_store
.store_contract(contract.clone())
.map_err(ExecutorError::other)?;
match self.state_store.get(&key).await {
Ok(current_state) => {
if is_crdt_contract(key.id()) {
return self
.apply_crdt_delta(&key, ¤t_state, &delta, true)
.await;
}
let mut new_state_bytes = current_state.as_ref().to_vec();
new_state_bytes.extend_from_slice(delta.as_ref());
let new_state = WrappedState::new(new_state_bytes);
let new_hash = blake3::hash(new_state.as_ref());
let current_hash = blake3::hash(current_state.as_ref());
if new_hash.as_bytes() > current_hash.as_bytes() {
self.state_store
.update(&key, new_state.clone())
.await
.map_err(ExecutorError::other)?;
Ok(UpsertResult::Updated(new_state))
} else {
Ok(UpsertResult::NoChange)
}
}
Err(_) => {
let initial_state = WrappedState::new(delta.as_ref().to_vec());
self.state_store
.store(key, initial_state.clone(), contract.params().into_owned())
.await
.map_err(ExecutorError::other)?;
Ok(UpsertResult::Updated(initial_state))
}
}
}
(Either::Right(delta), None) => {
match self.state_store.get(&key).await {
Ok(current_state) => {
if is_crdt_contract(key.id()) {
return self
.apply_crdt_delta(&key, ¤t_state, &delta, false)
.await;
}
let mut new_state_bytes = current_state.as_ref().to_vec();
new_state_bytes.extend_from_slice(delta.as_ref());
let new_state = WrappedState::new(new_state_bytes);
let new_hash = blake3::hash(new_state.as_ref());
let current_hash = blake3::hash(current_state.as_ref());
if new_hash.as_bytes() > current_hash.as_bytes() {
self.state_store
.update(&key, new_state.clone())
.await
.map_err(ExecutorError::other)?;
Ok(UpsertResult::Updated(new_state))
} else {
Ok(UpsertResult::NoChange)
}
}
Err(_) => {
tracing::warn!(
contract = %key,
"Delta received for non-existent contract state"
);
Err(ExecutorError::request(StdContractError::MissingContract {
key: key.into(),
}))
}
}
}
};
if let Ok(ref upsert_result) = result {
match upsert_result {
UpsertResult::Updated(state) | UpsertResult::CurrentWon(state) => {
self.broadcast_state_change(key, state).await;
}
UpsertResult::NoChange => {}
}
}
result
}
fn register_contract_notifier(
&mut self,
_key: ContractInstanceId,
_cli_id: ClientId,
_notification_ch: tokio::sync::mpsc::Sender<HostResult>,
_summary: Option<StateSummary<'_>>,
) -> Result<(), Box<RequestError>> {
Ok(())
}
async fn execute_delegate_request(
&mut self,
_req: DelegateRequest<'_>,
_origin_contract: Option<&ContractInstanceId>,
_caller_delegate: Option<&DelegateKey>,
) -> Response {
Err(ExecutorError::other(anyhow::anyhow!(
"not supported in mock runtime"
)))
}
fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo> {
vec![] }
fn notify_subscription_error(&self, _key: ContractInstanceId, _reason: String) {
}
async fn summarize_contract_state(
&mut self,
key: ContractKey,
) -> Result<StateSummary<'static>, ExecutorError> {
let state = self
.state_store
.get(&key)
.await
.map_err(ExecutorError::other)?;
if is_crdt_contract(key.id()) {
if let Some(summary) = crdt_encoding::create_summary(state.as_ref()) {
tracing::trace!(
contract = %key,
version = crdt_encoding::get_version(state.as_ref()),
"CRDT mode: created versioned summary"
);
return Ok(StateSummary::from(summary));
}
tracing::warn!(
contract = %key,
"CRDT mode: invalid state format, falling back to hash summary"
);
}
let hash = blake3::hash(state.as_ref());
Ok(StateSummary::from(hash.as_bytes().to_vec()))
}
async fn get_contract_state_delta(
&mut self,
key: ContractKey,
their_summary: StateSummary<'static>,
) -> Result<StateDelta<'static>, ExecutorError> {
if is_crdt_contract(key.id()) {
let state = self
.state_store
.get(&key)
.await
.map_err(ExecutorError::other)?;
let their_version =
crdt_encoding::summary_version(their_summary.as_ref()).ok_or_else(|| {
ExecutorError::other(anyhow::anyhow!("Invalid CRDT summary format"))
})?;
let (our_version, our_data) =
crdt_encoding::decode_state(state.as_ref()).ok_or_else(|| {
ExecutorError::other(anyhow::anyhow!("Invalid CRDT state format"))
})?;
tracing::trace!(
contract = %key,
their_version = their_version,
our_version = our_version,
"CRDT mode: computing delta"
);
let delta = crdt_encoding::create_delta(their_version, our_version, our_data);
return Ok(StateDelta::from(delta));
}
Err(ExecutorError::other(anyhow::anyhow!(
"MockRuntime cannot compute deltas - use full state sync"
)))
}
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
#[tokio::test(flavor = "multi_thread")]
/// Checks that `Executor::new` invokes the supplied closure exactly once.
async fn local_node_handle() -> Result<(), Box<dyn std::error::Error>> {
    const MAX_MEM_CACHE: u32 = 10_000_000;

    let scratch = tempfile::tempdir()?;
    let store_dir = scratch.path().join("state_store");
    std::fs::create_dir_all(&store_dir)?;

    let storage = Storage::new(&store_dir).await?;
    let state_store = StateStore::new(storage, MAX_MEM_CACHE).unwrap();
    let runtime = MockRuntime {
        contract_store: InMemoryContractStore::new(),
    };

    let mut counter = 0;
    // The executor is built as a temporary so the closure's borrow of `counter`
    // ends before the assertion below.
    Executor::new(
        state_store,
        || {
            counter += 1;
            Ok(())
        },
        OperationMode::Local,
        runtime,
        None,
        None,
    )
    .await
    .expect("local node with handle");

    assert_eq!(counter, 1);
    Ok(())
}
/// Creates an in-memory mock executor on a fresh `MockStateStorage`, with no op sender
/// and no op manager.
async fn create_test_executor() -> Executor<MockRuntime, MockStateStorage> {
    let storage = MockStateStorage::new();
    let built = Executor::new_mock_in_memory("test", storage, None, None).await;
    built.expect("create test executor")
}
/// Wraps `code_bytes` in a V1 wasm contract container with empty parameters.
pub(crate) fn create_test_contract(code_bytes: &[u8]) -> ContractContainer {
    use freenet_stdlib::prelude::*;

    let wasm_code = Arc::new(ContractCode::from(code_bytes.to_vec()));
    let empty_params = Parameters::from(vec![]);
    let wrapped = WrappedContract::new(wasm_code, empty_params);
    ContractContainer::Wasm(ContractWasmAPIVersion::V1(wrapped))
}
#[tokio::test(flavor = "current_thread")]
/// An incoming full state with a larger hash replaces the stored one.
async fn crdt_merge_larger_hash_wins() {
    let mut executor = create_test_executor().await;

    // Order the two candidates by blake3 hash so the test does not depend on hash values.
    let mut smaller_state = WrappedState::new(vec![1, 2, 3]);
    let mut larger_state = WrappedState::new(vec![4, 5, 6]);
    let first_hash = blake3::hash(smaller_state.as_ref());
    let second_hash = blake3::hash(larger_state.as_ref());
    if first_hash.as_bytes() >= second_hash.as_bytes() {
        std::mem::swap(&mut smaller_state, &mut larger_state);
    }

    let contract = create_test_contract(b"test_contract_code_1");
    let key = contract.key();

    let initial = executor
        .upsert_contract_state(
            key,
            Either::Left(smaller_state),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .expect("initial store should succeed");
    assert!(matches!(initial, UpsertResult::Updated(_)));

    let merged = executor
        .upsert_contract_state(
            key,
            Either::Left(larger_state.clone()),
            RelatedContracts::default(),
            None,
        )
        .await
        .expect("update with larger hash should succeed");
    assert!(
        matches!(merged, UpsertResult::Updated(_)),
        "Larger hash should win and update"
    );

    let (stored, _) = executor
        .fetch_contract(key, false)
        .await
        .expect("fetch should succeed");
    assert_eq!(
        stored.unwrap().as_ref(),
        larger_state.as_ref(),
        "Stored state should be the larger-hash state"
    );
}
#[tokio::test(flavor = "current_thread")]
/// An incoming full state with a smaller hash is rejected with `CurrentWon`.
async fn crdt_merge_smaller_hash_rejected() {
    let mut executor = create_test_executor().await;

    // Order the two candidates by blake3 hash so the test does not depend on hash values.
    let mut smaller_state = WrappedState::new(vec![1, 2, 3]);
    let mut larger_state = WrappedState::new(vec![4, 5, 6]);
    let first_hash = blake3::hash(smaller_state.as_ref());
    let second_hash = blake3::hash(larger_state.as_ref());
    if first_hash.as_bytes() >= second_hash.as_bytes() {
        std::mem::swap(&mut smaller_state, &mut larger_state);
    }

    let contract = create_test_contract(b"test_contract_code_2");
    let key = contract.key();

    let initial = executor
        .upsert_contract_state(
            key,
            Either::Left(larger_state.clone()),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .expect("initial store should succeed");
    assert!(matches!(initial, UpsertResult::Updated(_)));

    let rejected = executor
        .upsert_contract_state(
            key,
            Either::Left(smaller_state),
            RelatedContracts::default(),
            None,
        )
        .await
        .expect("update attempt should not error");
    assert!(
        matches!(rejected, UpsertResult::CurrentWon(_)),
        "Smaller hash should be rejected (CurrentWon indicating local state won)"
    );

    let (stored, _) = executor
        .fetch_contract(key, false)
        .await
        .expect("fetch should succeed");
    assert_eq!(
        stored.unwrap().as_ref(),
        larger_state.as_ref(),
        "Stored state should still be the larger-hash state"
    );
}
#[tokio::test(flavor = "current_thread")]
/// Re-submitting the state that is already stored yields `NoChange`.
async fn crdt_merge_equal_hash_no_change() {
    let mut executor = create_test_executor().await;
    let contract = create_test_contract(b"test_contract_code_3");
    let key = contract.key();
    let state = WrappedState::new(vec![1, 2, 3, 4, 5]);

    let first = executor
        .upsert_contract_state(
            key,
            Either::Left(state.clone()),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .expect("initial store should succeed");
    assert!(matches!(first, UpsertResult::Updated(_)));

    let repeat = executor
        .upsert_contract_state(key, Either::Left(state), RelatedContracts::default(), None)
        .await
        .expect("update attempt should not error");
    assert!(
        matches!(repeat, UpsertResult::NoChange),
        "Same state should result in NoChange"
    );
}
#[tokio::test(flavor = "current_thread")]
async fn crdt_merge_peers_converge() {
let state_1 = WrappedState::new(vec![1, 1, 1]);
let state_2 = WrappedState::new(vec![2, 2, 2]);
let state_3 = WrappedState::new(vec![3, 3, 3]);
let states = [
(state_1.clone(), blake3::hash(state_1.as_ref())),
(state_2.clone(), blake3::hash(state_2.as_ref())),
(state_3.clone(), blake3::hash(state_3.as_ref())),
];
let winner = states
.iter()
.max_by_key(|(_, hash)| hash.as_bytes())
.map(|(state, _)| state.clone())
.unwrap();
let mut peer_a = create_test_executor().await;
let contract = create_test_contract(b"convergence_test_contract");
let key = contract.key();
peer_a
.upsert_contract_state(
key,
Either::Left(state_1.clone()),
RelatedContracts::default(),
Some(contract.clone()),
)
.await
.unwrap();
peer_a
.upsert_contract_state(
key,
Either::Left(state_2.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_a
.upsert_contract_state(
key,
Either::Left(state_3.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
let mut peer_b = create_test_executor().await;
peer_b
.upsert_contract_state(
key,
Either::Left(state_3.clone()),
RelatedContracts::default(),
Some(contract.clone()),
)
.await
.unwrap();
peer_b
.upsert_contract_state(
key,
Either::Left(state_1.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_b
.upsert_contract_state(
key,
Either::Left(state_2.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
let mut peer_c = create_test_executor().await;
peer_c
.upsert_contract_state(
key,
Either::Left(state_2.clone()),
RelatedContracts::default(),
Some(contract.clone()),
)
.await
.unwrap();
peer_c
.upsert_contract_state(
key,
Either::Left(state_3.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_c
.upsert_contract_state(
key,
Either::Left(state_1.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
let (state_a, _) = peer_a.fetch_contract(key, false).await.unwrap();
let (state_b, _) = peer_b.fetch_contract(key, false).await.unwrap();
let (state_c, _) = peer_c.fetch_contract(key, false).await.unwrap();
let state_a = state_a.expect("peer A should have state");
let state_b = state_b.expect("peer B should have state");
let state_c = state_c.expect("peer C should have state");
assert_eq!(
state_a.as_ref(),
winner.as_ref(),
"Peer A should converge to winner"
);
assert_eq!(
state_b.as_ref(),
winner.as_ref(),
"Peer B should converge to winner"
);
assert_eq!(
state_c.as_ref(),
winner.as_ref(),
"Peer C should converge to winner"
);
assert_eq!(
state_a.as_ref(),
state_b.as_ref(),
"Peer A and B should have same state"
);
assert_eq!(
state_b.as_ref(),
state_c.as_ref(),
"Peer B and C should have same state"
);
}
#[tokio::test(flavor = "current_thread")]
/// A smaller-hash state arriving after the larger-hash state is rejected with `CurrentWon`.
async fn crdt_merge_delayed_broadcast_rejected() {
    let mut executor = create_test_executor().await;

    // `s1` is whichever candidate has the smaller hash, `s3` the larger.
    let first = WrappedState::new(vec![1, 0, 0, 0]);
    let second = WrappedState::new(vec![3, 0, 0, 0]);
    let first_hash = blake3::hash(first.as_ref());
    let second_hash = blake3::hash(second.as_ref());
    let (s1, s3) = if first_hash.as_bytes() > second_hash.as_bytes() {
        (second, first)
    } else {
        (first, second)
    };

    let contract = create_test_contract(b"delayed_broadcast_test");
    let key = contract.key();

    executor
        .upsert_contract_state(
            key,
            Either::Left(s3.clone()),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .unwrap();

    let late = executor
        .upsert_contract_state(key, Either::Left(s1), RelatedContracts::default(), None)
        .await
        .unwrap();
    assert!(
        matches!(late, UpsertResult::CurrentWon(_)),
        "Delayed old broadcast should be rejected (CurrentWon indicating local state won)"
    );

    let (stored, _) = executor.fetch_contract(key, false).await.unwrap();
    assert_eq!(
        stored.unwrap().as_ref(),
        s3.as_ref(),
        "Newer state S3 should still be stored"
    );
}
/// Builds a CRDT-encoded `WrappedState` (`version` prefix followed by `data`).
fn create_crdt_state(version: u64, data: &[u8]) -> WrappedState {
    let encoded = crdt_encoding::encode_state(version, data);
    WrappedState::new(encoded)
}
/// Builds a CRDT-encoded `StateDelta` (`from_version`, `to_version`, then `data`).
fn create_crdt_delta(from_version: u64, to_version: u64, data: &[u8]) -> StateDelta<'static> {
    let encoded = crdt_encoding::create_delta(from_version, to_version, data);
    StateDelta::from(encoded)
}
#[tokio::test(flavor = "current_thread")]
/// A delta targeting a higher version than the stored state is accepted.
async fn crdt_emulation_higher_version_wins() {
    let contract = create_test_contract(b"crdt_emulation_test_1");
    let key = contract.key();
    register_crdt_contract(*key.id());

    let mut executor = create_test_executor().await;
    executor
        .upsert_contract_state(
            key,
            Either::Left(create_crdt_state(1, b"initial data")),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .unwrap();

    let upgrade = create_crdt_delta(1, 2, b"updated data v2");
    let outcome = executor
        .upsert_contract_state(key, Either::Right(upgrade), RelatedContracts::default(), None)
        .await
        .unwrap();
    assert!(
        matches!(outcome, UpsertResult::Updated(_)),
        "Higher version delta should be accepted"
    );

    let (stored, _) = executor.fetch_contract(key, false).await.unwrap();
    let stored = stored.unwrap();
    let (version, data) = crdt_encoding::decode_state(stored.as_ref()).unwrap();
    assert_eq!(version, 2);
    assert_eq!(data, b"updated data v2");
}
#[tokio::test(flavor = "current_thread")]
/// A delta targeting a lower version than the stored state is rejected with `NoChange`.
async fn crdt_emulation_lower_version_rejected() {
    let contract = create_test_contract(b"crdt_emulation_test_2");
    let key = contract.key();
    register_crdt_contract(*key.id());

    let mut executor = create_test_executor().await;
    executor
        .upsert_contract_state(
            key,
            Either::Left(create_crdt_state(5, b"version 5 data")),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .unwrap();

    let stale = create_crdt_delta(2, 3, b"old data v3");
    let outcome = executor
        .upsert_contract_state(key, Either::Right(stale), RelatedContracts::default(), None)
        .await
        .unwrap();
    assert!(
        matches!(outcome, UpsertResult::NoChange),
        "Lower version delta should be rejected with NoChange"
    );

    let (stored, _) = executor.fetch_contract(key, false).await.unwrap();
    let stored = stored.unwrap();
    let (version, data) = crdt_encoding::decode_state(stored.as_ref()).unwrap();
    assert_eq!(version, 5);
    assert_eq!(data, b"version 5 data");
}
#[tokio::test(flavor = "current_thread")]
/// With equal versions, a delta whose data has the larger hash replaces the stored data.
async fn crdt_emulation_equal_version_hash_tiebreaker() {
    let contract = create_test_contract(b"crdt_emulation_test_3");
    let key = contract.key();
    register_crdt_contract(*key.id());

    // Order the two payloads by blake3 hash so the test does not depend on hash values.
    let candidates: [&[u8]; 2] = [b"aaaa", b"bbbb"];
    let first_hash = blake3::hash(candidates[0]);
    let second_hash = blake3::hash(candidates[1]);
    let (smaller_data, larger_data) = if first_hash.as_bytes() < second_hash.as_bytes() {
        (candidates[0], candidates[1])
    } else {
        (candidates[1], candidates[0])
    };

    let mut executor = create_test_executor().await;
    executor
        .upsert_contract_state(
            key,
            Either::Left(create_crdt_state(5, smaller_data)),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .unwrap();

    let tie_delta = create_crdt_delta(5, 5, larger_data);
    let outcome = executor
        .upsert_contract_state(key, Either::Right(tie_delta), RelatedContracts::default(), None)
        .await
        .unwrap();
    assert!(
        matches!(outcome, UpsertResult::Updated(_)),
        "Equal version with larger hash should win"
    );

    let (stored, _) = executor.fetch_contract(key, false).await.unwrap();
    let stored = stored.unwrap();
    let (version, data) = crdt_encoding::decode_state(stored.as_ref()).unwrap();
    assert_eq!(version, 5);
    assert_eq!(data, larger_data);
}
#[tokio::test(flavor = "current_thread")]
/// With equal versions, a delta whose data has the smaller hash is rejected with `NoChange`.
async fn crdt_emulation_equal_version_smaller_hash_rejected() {
    let contract = create_test_contract(b"crdt_emulation_test_4");
    let key = contract.key();
    register_crdt_contract(*key.id());

    // Order the two payloads by blake3 hash so the test does not depend on hash values.
    let candidates: [&[u8]; 2] = [b"aaaa", b"bbbb"];
    let first_hash = blake3::hash(candidates[0]);
    let second_hash = blake3::hash(candidates[1]);
    let (smaller_data, larger_data) = if first_hash.as_bytes() < second_hash.as_bytes() {
        (candidates[0], candidates[1])
    } else {
        (candidates[1], candidates[0])
    };

    let mut executor = create_test_executor().await;
    executor
        .upsert_contract_state(
            key,
            Either::Left(create_crdt_state(5, larger_data)),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .unwrap();

    let tie_delta = create_crdt_delta(5, 5, smaller_data);
    let outcome = executor
        .upsert_contract_state(key, Either::Right(tie_delta), RelatedContracts::default(), None)
        .await
        .unwrap();
    assert!(
        matches!(outcome, UpsertResult::NoChange),
        "Equal version with smaller hash should be rejected"
    );

    let (stored, _) = executor.fetch_contract(key, false).await.unwrap();
    let stored = stored.unwrap();
    let (version, data) = crdt_encoding::decode_state(stored.as_ref()).unwrap();
    assert_eq!(version, 5);
    assert_eq!(data, larger_data);
}
#[tokio::test(flavor = "current_thread")]
async fn crdt_emulation_convergence_any_order() {
let contract = create_test_contract(b"crdt_convergence_test");
let key = contract.key();
let contract_id = *key.id();
register_crdt_contract(contract_id);
let delta_1_to_2 = create_crdt_delta(1, 2, b"data at version 2");
let delta_2_to_3 = create_crdt_delta(2, 3, b"data at version 3");
let delta_1_to_2_late = create_crdt_delta(1, 2, b"late update to v2");
let initial = create_crdt_state(1, b"initial");
let mut peer_a = create_test_executor().await;
peer_a
.upsert_contract_state(
key,
Either::Left(initial.clone()),
RelatedContracts::default(),
Some(contract.clone()),
)
.await
.unwrap();
peer_a
.upsert_contract_state(
key,
Either::Right(delta_1_to_2.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_a
.upsert_contract_state(
key,
Either::Right(delta_2_to_3.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_a
.upsert_contract_state(
key,
Either::Right(delta_1_to_2_late.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
let mut peer_b = create_test_executor().await;
peer_b
.upsert_contract_state(
key,
Either::Left(initial.clone()),
RelatedContracts::default(),
Some(contract.clone()),
)
.await
.unwrap();
peer_b
.upsert_contract_state(
key,
Either::Right(delta_2_to_3.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_b
.upsert_contract_state(
key,
Either::Right(delta_1_to_2.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_b
.upsert_contract_state(
key,
Either::Right(delta_1_to_2_late.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
let mut peer_c = create_test_executor().await;
peer_c
.upsert_contract_state(
key,
Either::Left(initial.clone()),
RelatedContracts::default(),
Some(contract.clone()),
)
.await
.unwrap();
peer_c
.upsert_contract_state(
key,
Either::Right(delta_1_to_2_late.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_c
.upsert_contract_state(
key,
Either::Right(delta_2_to_3.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
peer_c
.upsert_contract_state(
key,
Either::Right(delta_1_to_2.clone()),
RelatedContracts::default(),
None,
)
.await
.unwrap();
let (state_a, _) = peer_a.fetch_contract(key, false).await.unwrap();
let (state_b, _) = peer_b.fetch_contract(key, false).await.unwrap();
let (state_c, _) = peer_c.fetch_contract(key, false).await.unwrap();
let state_a = state_a.unwrap();
let state_b = state_b.unwrap();
let state_c = state_c.unwrap();
let (ver_a, data_a) = crdt_encoding::decode_state(state_a.as_ref()).unwrap();
let (ver_b, data_b) = crdt_encoding::decode_state(state_b.as_ref()).unwrap();
let (ver_c, data_c) = crdt_encoding::decode_state(state_c.as_ref()).unwrap();
assert_eq!(ver_a, 3, "Peer A should be at version 3");
assert_eq!(ver_b, 3, "Peer B should be at version 3");
assert_eq!(ver_c, 3, "Peer C should be at version 3");
assert_eq!(data_a, data_b, "Peer A and B should have same data");
assert_eq!(data_b, data_c, "Peer B and C should have same data");
assert_eq!(data_a, b"data at version 3", "All should have v3 data");
}
#[tokio::test(flavor = "current_thread")]
/// Applying the same delta twice updates once and then reports `NoChange`.
async fn crdt_emulation_idempotent() {
    let contract = create_test_contract(b"crdt_idempotent_test");
    let key = contract.key();
    register_crdt_contract(*key.id());

    let mut executor = create_test_executor().await;
    executor
        .upsert_contract_state(
            key,
            Either::Left(create_crdt_state(1, b"initial")),
            RelatedContracts::default(),
            Some(contract),
        )
        .await
        .unwrap();

    let delta = create_crdt_delta(1, 2, b"version 2 data");

    let first_apply = executor
        .upsert_contract_state(
            key,
            Either::Right(delta.clone()),
            RelatedContracts::default(),
            None,
        )
        .await
        .unwrap();
    assert!(matches!(first_apply, UpsertResult::Updated(_)));

    let second_apply = executor
        .upsert_contract_state(key, Either::Right(delta), RelatedContracts::default(), None)
        .await
        .unwrap();
    assert!(
        matches!(second_apply, UpsertResult::NoChange),
        "Applying same delta twice should be idempotent (NoChange)"
    );

    let (stored, _) = executor.fetch_contract(key, false).await.unwrap();
    let stored = stored.unwrap();
    let (version, data) = crdt_encoding::decode_state(stored.as_ref()).unwrap();
    assert_eq!(version, 2);
    assert_eq!(data, b"version 2 data");
}
/// Exercises the basic set operations on an executor's recovery guard.
///
/// The key must be absent initially, present after `insert`, and absent again
/// after `remove`. Only the guard's bookkeeping is checked here; no recovery
/// is actually triggered.
#[tokio::test(flavor = "current_thread")]
async fn recovery_guard_prevents_infinite_loops() {
    let executor = create_test_executor().await;
    let contract = create_test_contract(b"guard_test_code");
    let key = contract.key();

    // Each call takes the lock afresh, mirroring independent accesses.
    let is_guarded = || executor.recovery_guard.lock().unwrap().contains(&key);

    assert!(!is_guarded());

    executor.recovery_guard.lock().unwrap().insert(key);
    assert!(is_guarded());

    executor.recovery_guard.lock().unwrap().remove(&key);
    assert!(!is_guarded());
}
/// Two executors handed the same recovery guard must observe each other's
/// entries: a key inserted through one executor is visible through the other.
#[tokio::test(flavor = "current_thread")]
async fn recovery_guard_shared_across_pool() {
    let mut executor_a = create_test_executor().await;
    let mut executor_b = create_test_executor().await;
    let contract = create_test_contract(b"pool_guard_test");
    let key = contract.key();

    // One guard instance, installed into both executors.
    let pool_guard: super::super::CorruptedStateRecoveryGuard =
        Arc::new(std::sync::Mutex::new(HashSet::new()));
    executor_a.set_recovery_guard(Arc::clone(&pool_guard));
    executor_b.set_recovery_guard(Arc::clone(&pool_guard));

    // Mark the key through executor A ...
    executor_a.recovery_guard.lock().unwrap().insert(key);

    // ... and read it back through executor B.
    let seen_by_b = executor_b.recovery_guard.lock().unwrap().contains(&key);
    assert!(
        seen_by_b,
        "Recovery guard should be visible across pool executors"
    );
}
/// Property tests for two deterministic merge rules modelled locally in this
/// module: a "largest BLAKE3 hash wins" merge over raw byte states
/// ([`hash_merge`]) and a versioned last-writer-wins merge with a hash
/// tie-break ([`lww_merge`]).
///
/// NOTE(review): these are stand-alone models; whether they mirror the
/// production merge path exactly should be confirmed against the executor.
mod proptest_state_merge {
    use super::*;
    use proptest::prelude::*;

    /// Strategy yielding exactly `count` byte vectors, each 1 to 63 bytes long.
    fn arb_states(count: usize) -> impl Strategy<Value = Vec<Vec<u8>>> {
        proptest::collection::vec(proptest::collection::vec(any::<u8>(), 1..64), count..=count)
    }

    /// Merges two states by keeping the one with the strictly larger BLAKE3
    /// hash (byte-wise comparison). On equal hashes `current` is kept.
    fn hash_merge(current: &[u8], incoming: &[u8]) -> Vec<u8> {
        let current_hash = blake3::hash(current);
        let incoming_hash = blake3::hash(incoming);
        if incoming_hash.as_bytes() > current_hash.as_bytes() {
            incoming.to_vec()
        } else {
            current.to_vec()
        }
    }

    /// Last-writer-wins merge over `(version, data)` pairs.
    ///
    /// The higher version wins. On equal versions the entry whose data has
    /// the strictly larger BLAKE3 hash wins; if the hashes are equal too,
    /// `current` is kept.
    fn lww_merge(current: (u64, Vec<u8>), incoming: (u64, Vec<u8>)) -> (u64, Vec<u8>) {
        if incoming.0 > current.0 {
            incoming
        } else if incoming.0 == current.0 {
            let incoming_hash = blake3::hash(&incoming.1);
            let current_hash = blake3::hash(&current.1);
            if incoming_hash.as_bytes() > current_hash.as_bytes() {
                incoming
            } else {
                current
            }
        } else {
            current
        }
    }

    /// Returns a permutation of `0..len` derived deterministically from `seed`
    /// using a Fisher-Yates shuffle driven by a 64-bit LCG.
    ///
    /// The swap index is taken from the HIGH bits of the LCG state. With a
    /// power-of-two modulus the low bits have very short periods (bit 0 simply
    /// alternates), which correlates successive swap indices and makes many
    /// permutations unreachable. The shift also keeps the value within 31
    /// bits, so the cast to `usize` cannot truncate on 32-bit targets.
    fn seeded_permutation(len: usize, seed: u64) -> Vec<usize> {
        let mut indices: Vec<usize> = (0..len).collect();
        let mut rng_val = seed;
        for i in (1..len).rev() {
            rng_val = rng_val.wrapping_mul(6364136223846793005).wrapping_add(1);
            let j = ((rng_val >> 33) as usize) % (i + 1);
            indices.swap(i, j);
        }
        indices
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(256))]

        // Folding `hash_merge` over four states must give the same result in
        // natural order and in a seed-derived permuted order.
        #[test]
        fn state_merge_order_independent(
            states in arb_states(4),
            perm_seed in any::<u64>(),
        ) {
            // Nothing to distinguish when every state is identical.
            if states.windows(2).all(|pair| pair[0] == pair[1]) {
                return Ok(());
            }

            let result_natural = states[1..]
                .iter()
                .fold(states[0].clone(), |acc, next| hash_merge(&acc, next));

            let order = seeded_permutation(states.len(), perm_seed);
            let result_permuted = order[1..]
                .iter()
                .fold(states[order[0]].clone(), |acc, &idx| hash_merge(&acc, &states[idx]));

            prop_assert_eq!(
                result_natural, result_permuted,
                "States must converge regardless of application order"
            );
        }

        // Folding `hash_merge` over all states must end on the state whose
        // hash is the maximum of the set.
        #[test]
        fn merge_selects_largest_hash(
            states in arb_states(5),
        ) {
            // The 32-byte digest array is `Ord`, so it can be used as the key
            // directly without allocating.
            let winner = states
                .iter()
                .max_by_key(|s| *blake3::hash(s).as_bytes())
                .expect("arb_states(5) always yields five states")
                .clone();

            let result = states[1..]
                .iter()
                .fold(states[0].clone(), |acc, next| hash_merge(&acc, next));

            prop_assert_eq!(
                result, winner,
                "Merge must converge to state with largest hash"
            );
        }

        // Merging a state with itself returns that same state.
        #[test]
        fn merge_idempotent(
            state in proptest::collection::vec(any::<u8>(), 1..64),
        ) {
            let result = hash_merge(&state, &state);
            prop_assert_eq!(
                result, state,
                "Merging a state with itself must be idempotent"
            );
        }

        // Swapping the arguments of `hash_merge` does not change the result.
        #[test]
        fn merge_commutative(
            a in proptest::collection::vec(any::<u8>(), 1..64),
            b in proptest::collection::vec(any::<u8>(), 1..64),
        ) {
            let ab = hash_merge(&a, &b);
            let ba = hash_merge(&b, &a);
            prop_assert_eq!(ab, ba, "Merge must be commutative");
        }

        // Folding `lww_merge` over versioned entries must give the same result
        // in natural order and in a seed-derived permuted order.
        #[test]
        fn crdt_lww_merge_order_independent(
            versions in proptest::collection::vec(1u64..20, 3..6),
            data_bytes in proptest::collection::vec(
                proptest::collection::vec(any::<u8>(), 1..32),
                3..6,
            ),
            perm_seed in any::<u64>(),
        ) {
            // `zip` stops at the shorter input, pairing each version with one
            // payload; both inputs hold at least three items, so `entries` is
            // never empty.
            let entries: Vec<(u64, Vec<u8>)> = versions
                .iter()
                .copied()
                .zip(data_bytes.iter().cloned())
                .collect();

            let result_natural = entries[1..]
                .iter()
                .cloned()
                .fold(entries[0].clone(), lww_merge);

            let order = seeded_permutation(entries.len(), perm_seed);
            let result_permuted = order[1..]
                .iter()
                .fold(entries[order[0]].clone(), |acc, &idx| {
                    lww_merge(acc, entries[idx].clone())
                });

            prop_assert_eq!(
                result_natural, result_permuted,
                "CRDT LWW merge must converge regardless of order"
            );
        }
    }
}
}