mod contract_ops;
mod delegates;
mod executor_impl;
mod pool;
mod subscriptions;
use super::*;
use super::{
ContractExecutor, ContractRequest, ContractResponse, ExecutorError, InitCheckResult,
RequestError, Response, SLOW_INIT_THRESHOLD, STALE_INIT_THRESHOLD, StateStoreError, now_nanos,
};
pub(crate) use contract_ops::ReclaimOutcome;
pub use pool::RuntimePool;
// Cap of 10 related contracts per request.
// NOTE(review): not referenced in the visible part of this file — presumably
// consumed by the submodules declared above; confirm.
const MAX_RELATED_CONTRACTS_PER_REQUEST: usize = 10;
// Ten-second duration for fetching a related contract.
// NOTE(review): not applied anywhere in the visible part of this file —
// confirm where the timeout is enforced.
const RELATED_FETCH_TIMEOUT: Duration = Duration::from_secs(10);
// 1-in-32 probability for the idempotency probe.
// NOTE(review): the sampling site is not visible here — confirm usage.
const IDEMPOTENCY_PROBE_PROBABILITY: f64 = 1.0 / 32.0;
/// Reports whether `a` and `b` hold the same bytes with the same
/// multiplicities, regardless of order (i.e. one is a permutation of the
/// other).
///
/// Slices of different lengths are never equal as multisets.
fn byte_multiset_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() == b.len() {
        // One signed counter per possible byte value: `a` adds, `b` subtracts.
        let mut balance = [0i64; 256];
        a.iter().for_each(|&byte| balance[usize::from(byte)] += 1);
        b.iter().for_each(|&byte| balance[usize::from(byte)] -= 1);
        // Equal multisets leave every counter back at zero.
        !balance.iter().any(|&count| count != 0)
    } else {
        false
    }
}
use crate::node::OpManager;
use crate::wasm_runtime::{
BackendEngine, MAX_STATE_SIZE, ModuleCache, RuntimeConfig, SharedModuleCache, UserSecretContext,
};
use dashmap::DashMap;
use freenet_stdlib::prelude::{MessageOrigin, RelatedContract};
use std::collections::{HashMap, HashSet};
/// Value used for `RuntimeConfig::offload_compilation` when the executor is
/// built with shared module caches. Always `true`.
fn production_offload_compilation() -> bool {
    const OFFLOAD_ENABLED: bool = true;
    OFFLOAD_ENABLED
}
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
/// Shared concurrent map from a contract instance to the `(client, sender)`
/// pairs registered for it; each sender carries `HostResult` values.
type SharedNotifications =
Arc<DashMap<ContractInstanceId, Vec<(ClientId, tokio::sync::mpsc::Sender<HostResult>)>>>;
/// Shared concurrent map from a contract instance to a per-client, optional
/// `StateSummary`.
type SharedSummaries =
Arc<DashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>>;
/// Shared concurrent per-client counter.
// NOTE(review): presumably the number of subscriptions held by each client —
// confirm against the code that updates it.
type SharedClientCounts = Arc<DashMap<ClientId, usize>>;
/// Builds a boxed `RequestError::ContractError` wrapping a `Subscribe`
/// failure for `instance_id`, with `cause` as the human-readable reason.
///
/// Only the instance id is supplied, so the `ContractKey` stored in the error
/// is synthesized from it together with an all-zero code hash.
fn subscriber_limit_error(instance_id: ContractInstanceId, cause: &str) -> Box<RequestError> {
    let zero_code_hash = freenet_stdlib::prelude::CodeHash::new([0u8; 32]);
    let key = ContractKey::from_id_and_code(instance_id, zero_code_hash);
    let subscribe_failure = StdContractError::Subscribe {
        key,
        cause: cause.to_string().into(),
    };
    Box::new(RequestError::ContractError(subscribe_failure))
}
/// `ContractExecutor` implementation for the WASM-runtime-backed executor.
///
/// Every method is a thin forwarder to a `bridged_*` helper or to an inherent
/// method of `Executor`; no logic lives in this impl itself.
impl ContractExecutor for Executor<Runtime> {
    /// Resolves an instance id to its full `ContractKey`, if known.
    fn lookup_key(&self, instance_id: &ContractInstanceId) -> Option<ContractKey> {
        self.bridged_lookup_key(instance_id)
    }

    /// Returns a clone of the executor's optional `OpManager` handle.
    fn op_manager_handle(&self) -> Option<Arc<crate::node::OpManager>> {
        self.op_manager.clone()
    }

    /// Fetches the state of `key`, plus the contract code when
    /// `return_contract_code` is set.
    async fn fetch_contract(
        &mut self,
        key: ContractKey,
        return_contract_code: bool,
    ) -> Result<(Option<WrappedState>, Option<ContractContainer>), ExecutorError> {
        let fetched = self.bridged_fetch_contract(key, return_contract_code).await;
        fetched
    }

    /// Applies a full state or a delta to `key`.
    async fn upsert_contract_state(
        &mut self,
        key: ContractKey,
        update: Either<WrappedState, StateDelta<'static>>,
        related_contracts: RelatedContracts<'static>,
        code: Option<ContractContainer>,
    ) -> Result<UpsertResult, ExecutorError> {
        let upserted = self
            .bridged_upsert_contract_state(key, update, related_contracts, code)
            .await;
        upserted
    }

    /// Same as `upsert_contract_state`, but runs the inner upsert with its
    /// final flag set to `true` and maps the result through
    /// `bridged_upsert_outcome`, so a deferrable failure becomes
    /// `UpsertOutcome::DeferRelated` instead of an error.
    async fn upsert_contract_state_deferrable(
        &mut self,
        key: ContractKey,
        update: Either<WrappedState, StateDelta<'static>>,
        related_contracts: RelatedContracts<'static>,
        code: Option<ContractContainer>,
    ) -> Result<UpsertOutcome, ExecutorError> {
        let inner_result = self
            .bridged_upsert_contract_state_inner(key, update, related_contracts, code, true)
            .await;
        bridged_upsert_outcome(inner_result)
    }

    /// Registers `notification_ch` as the update channel of client `cli_id`
    /// for `instance_id`, with an optional state summary.
    fn register_contract_notifier(
        &mut self,
        instance_id: ContractInstanceId,
        cli_id: ClientId,
        notification_ch: tokio::sync::mpsc::Sender<HostResult>,
        summary: Option<StateSummary<'_>>,
    ) -> Result<(), Box<RequestError>> {
        self.bridged_register_contract_notifier(instance_id, cli_id, notification_ch, summary)
    }

    /// Runs a delegate request; forwards to the synchronous
    /// `delegate_request`.
    async fn execute_delegate_request(
        &mut self,
        req: DelegateRequest<'_>,
        origin_contract: Option<&ContractInstanceId>,
        caller_delegate: Option<&DelegateKey>,
        user_context: Option<&UserSecretContext>,
    ) -> Response {
        self.delegate_request(req, origin_contract, caller_delegate, user_context)
    }

    /// Exports the secrets of `user_context`; forwards to the inherent
    /// `Executor::export_user_secrets`.
    async fn export_user_secrets(
        &mut self,
        user_context: &UserSecretContext,
        token: &[u8],
    ) -> Result<Vec<u8>, ExecutorError> {
        Executor::export_user_secrets(self, user_context, token)
    }

    /// Lists the current subscriptions.
    fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo> {
        // Path form, matching `export_user_secrets` above; like the method
        // call it replaces, it resolves to the inherent method first.
        Executor::get_subscription_info(self)
    }

    /// Summarizes the stored state of `key`.
    async fn summarize_contract_state(
        &mut self,
        key: ContractKey,
    ) -> Result<StateSummary<'static>, ExecutorError> {
        let summary = self.bridged_summarize_contract_state(key).await;
        summary
    }

    /// Computes the delta relative to `their_summary` for `key`.
    async fn get_contract_state_delta(
        &mut self,
        key: ContractKey,
        their_summary: StateSummary<'static>,
    ) -> Result<StateDelta<'static>, ExecutorError> {
        let delta = self
            .bridged_get_contract_state_delta(key, their_summary)
            .await;
        delta
    }

    /// Reclaims the storage held for `key`, discarding the reclaim outcome.
    /// `_expected_generation` is accepted but not consulted here.
    async fn remove_contract(
        &mut self,
        key: &ContractKey,
        _expected_generation: u64,
    ) -> Result<(), ExecutorError> {
        self.reclaim_contract_storage(key).await?;
        Ok(())
    }
}
impl Executor<Runtime> {
/// Builds an executor from `config` without an `OpManager`; shorthand for
/// `from_config(config, None)`.
pub async fn from_config_local(config: Arc<Config>) -> anyhow::Result<Self> {
    let executor = Self::from_config(config, None).await?;
    Ok(executor)
}
/// Builds an executor in `OperationMode::Local` from `config`.
///
/// Opens the stores via `get_stores`, builds the runtime on top of them and
/// attaches the state store's database to it. When `op_manager` is provided,
/// a state-write callback is installed that forwards every write to
/// `ring.commit_state_write`. The initializer handed to `Executor::new`
/// registers cleanup-on-exit for the configured paths; a failure there is
/// logged and does not fail construction.
///
/// # Errors
///
/// Returns an error when the stores cannot be opened, when the runtime
/// cannot be built (previously a panic), or when `Executor::new` fails.
pub(crate) async fn from_config(
    config: Arc<Config>,
    op_manager: Option<Arc<OpManager>>,
) -> anyhow::Result<Self> {
    let (contract_store, delegate_store, secret_store, state_store) =
        Self::get_stores(&config).await?;
    // Surface a runtime build failure to the caller instead of panicking;
    // the error is rendered with `Debug`, the only bound `unwrap` relied on.
    let mut rt = Runtime::build(contract_store, delegate_store, secret_store, false)
        .map_err(|err| anyhow::anyhow!("failed to build WASM runtime: {err:?}"))?;
    rt.set_state_store_db(state_store.storage());
    if let Some(op_manager_ref) = &op_manager {
        let op_manager_clone = op_manager_ref.clone();
        rt.set_state_write_callback(Arc::new(move |key: &ContractKey, state_size: usize| {
            op_manager_clone.ring.commit_state_write(key, state_size);
        }));
    }
    Executor::new(
        state_store,
        move || {
            // Best effort: log and carry on when cleanup cannot be registered.
            if let Err(error) = crate::util::set_cleanup_on_exit(config.paths()) {
                tracing::error!("Failed to set cleanup on exit: {error}");
            }
            Ok(())
        },
        OperationMode::Local,
        rt,
        op_manager,
    )
    .await
}
/// Builds an executor in `OperationMode::Local` that shares its state store,
/// module caches, delegate contexts and (optionally) backend engine with
/// other executors.
///
/// The runtime stores are opened on the database of `shared_state_store`.
/// The `RuntimeConfig` takes `offload_compilation` from
/// `production_offload_compilation()` and the module cache byte budget from
/// `config`; a backend engine is created from that configuration only when
/// `shared_backend` is `None`. When `op_manager` is provided, a state-write
/// callback is installed that forwards every write to
/// `ring.commit_state_write`.
///
/// # Errors
///
/// Returns an error when the runtime stores cannot be opened, when the
/// backend engine or the runtime cannot be built (both previously panicked),
/// or when `Executor::new` fails.
// The constructor threads each shared component through explicitly, hence
// the long parameter list.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn from_config_with_shared_modules(
    config: Arc<Config>,
    shared_state_store: StateStore<Storage>,
    op_manager: Option<Arc<OpManager>>,
    contract_modules: SharedModuleCache<ContractKey>,
    delegate_modules: SharedModuleCache<DelegateKey>,
    delegate_contexts: crate::wasm_runtime::DelegateContextCache,
    shared_backend: Option<BackendEngine>,
) -> anyhow::Result<Self> {
    let db = shared_state_store.storage();
    let (contract_store, delegate_store, secret_store) =
        Self::get_runtime_stores(&config, db.clone())?;
    let runtime_config = RuntimeConfig {
        offload_compilation: production_offload_compilation(),
        module_cache_budget_bytes: config.module_cache_budget_bytes,
        ..RuntimeConfig::default()
    };
    // Reuse the shared engine when one was handed in; otherwise build one
    // lazily and report a failure as an error rather than a panic.
    let backend = match shared_backend {
        Some(engine) => engine,
        None => crate::wasm_runtime::engine::Engine::create_backend_engine(&runtime_config)
            .map_err(|err| anyhow::anyhow!("Failed to create WASM backend engine: {err:?}"))?,
    };
    let mut rt = Runtime::build_with_shared_module_caches(
        contract_store,
        delegate_store,
        secret_store,
        false,
        contract_modules,
        delegate_modules,
        delegate_contexts,
        backend,
        &runtime_config,
    )
    .map_err(|err| {
        anyhow::anyhow!("failed to build WASM runtime with shared module caches: {err:?}")
    })?;
    rt.set_state_store_db(db);
    if let Some(op_manager_ref) = &op_manager {
        let op_manager_clone = op_manager_ref.clone();
        rt.set_state_write_callback(Arc::new(move |key: &ContractKey, state_size: usize| {
            op_manager_clone.ring.commit_state_write(key, state_size);
        }));
    }
    Executor::new(
        shared_state_store,
        || Ok(()),
        OperationMode::Local,
        rt,
        op_manager,
    )
    .await
}
/// Issues a non-subscribing `Put` for `contract`/`state` on behalf of
/// `cli_id`, without an update channel.
///
/// Nothing is returned: a failure is only logged, with a different message
/// depending on which side of the error's `inner` value is populated.
pub async fn preload(
    &mut self,
    cli_id: ClientId,
    contract: ContractContainer,
    state: WrappedState,
    related_contracts: RelatedContracts<'static>,
) {
    let put_request = ContractRequest::Put {
        contract,
        state,
        related_contracts,
        subscribe: false,
        blocking_subscribe: false,
    };
    let Err(failure) = self.contract_requests(put_request, cli_id, None).await else {
        return;
    };
    match failure.inner {
        Either::Left(err) => tracing::error!("req error: {err}"),
        Either::Right(err) => tracing::error!("other error: {err}"),
    }
}
/// Routes a client request to the matching handler.
///
/// Contract operations go to `contract_requests` and delegate operations to
/// `delegate_request` (with no origin, caller or user context). A
/// `Disconnect` logs its cause, if any, and yields
/// `RequestError::Disconnect`. Every other request is logged as unsupported
/// and rejected with a "not supported" error.
pub async fn handle_request(
    &mut self,
    id: ClientId,
    req: ClientRequest<'_>,
    updates: Option<mpsc::Sender<Result<HostResponse, WsClientError>>>,
) -> Response {
    match req {
        ClientRequest::ContractOp(contract_op) => {
            self.contract_requests(contract_op, id, updates).await
        }
        ClientRequest::DelegateOp(delegate_op) => {
            self.delegate_request(delegate_op, None, None, None)
        }
        ClientRequest::Disconnect { cause } => {
            match cause {
                Some(cause) => tracing::info!("disconnecting cause: {cause}"),
                None => {}
            }
            Err(RequestError::Disconnect.into())
        }
        // The known-unsupported variants are spelled out; the trailing `_`
        // additionally catches any variant not listed here.
        other @ (ClientRequest::Authenticate { .. }
        | ClientRequest::NodeQueries(_)
        | ClientRequest::Close
        | _) => {
            tracing::warn!(
                client = %id,
                request = ?other,
                "unsupported client request"
            );
            let unsupported = anyhow::anyhow!("not supported");
            Err(ExecutorError::other(unsupported))
        }
    }
}
/// Executes a client `ContractRequest` on behalf of `cli_id` and returns the
/// resulting `Response`.
///
/// `Put`, `Update`, `Get` and `Subscribe` are handled; any other request is
/// logged and rejected with a "not supported" error. `updates` is only
/// consulted by `Subscribe`, where it is required.
///
/// Errors raised through `?` or `return` inside an arm leave the function
/// immediately; only an arm result that reaches `result` goes through the
/// "Contract request failed" error log at the bottom.
pub async fn contract_requests(
&mut self,
req: ContractRequest<'_>,
cli_id: ClientId,
updates: Option<mpsc::Sender<Result<HostResponse, WsClientError>>>,
) -> Response {
tracing::debug!(
client = %cli_id,
"received contract request"
);
let result = match req {
ContractRequest::Put {
contract,
state,
related_contracts,
..
} => {
tracing::debug!(
client = %cli_id,
contract = %contract.key(),
state_size = state.as_ref().len(),
"putting contract"
);
let key = contract.key();
// Reject debug-mode WASM up front, before the put itself runs.
if crate::contract::contains_debug_sections(contract.data()) {
let sections = crate::contract::debug_sections(contract.data()).join(", ");
return Err(ExecutorError::request(StdContractError::Put {
key,
cause: format!(
"contract appears to be compiled in debug mode \
(contains {sections} section(s)). Debug WASM is \
typically 10-100x larger than release builds and \
may exceed message-size limits. Recompile the \
contract with `--release` before publishing."
)
.into(),
}));
}
self.perform_contract_put(contract, state, related_contracts)
.await
}
ContractRequest::Update { key, data } => self.perform_contract_update(key, data).await,
ContractRequest::Get {
key: instance_id,
return_contract_code,
..
} => {
// The request only carries the instance id; resolve the full key first.
let full_key = self.lookup_key(&instance_id).ok_or_else(|| {
tracing::debug!(
contract = %instance_id,
phase = "key_lookup_failed",
"Contract not found during get request"
);
ExecutorError::request(StdContractError::MissingContract { key: instance_id })
})?;
match self
.perform_contract_get(return_contract_code, full_key)
.await
{
Ok((state, contract)) => Ok(ContractResponse::GetResponse {
key: full_key,
// A successful get without a state is reported as a `Get` error.
state: state.ok_or_else(|| {
tracing::debug!(
contract = %full_key,
phase = "get_failed",
"Contract state not found during get request"
);
ExecutorError::request(StdContractError::Get {
key: full_key,
cause: "contract state not found".into(),
})
})?,
contract,
}
.into()),
Err(err) => Err(err),
}
}
ContractRequest::Subscribe {
key: instance_id,
summary,
} => {
tracing::debug!(
client = %cli_id,
contract = %instance_id,
has_summary = summary.is_some(),
"subscribing to contract"
);
// Subscribing needs a channel to deliver updates on.
let updates = updates.ok_or_else(|| {
ExecutorError::other(anyhow::anyhow!("missing update channel"))
})?;
// The notifier is registered by instance id, before the full key is
// resolved. NOTE(review): if the lookup below fails, the registration
// made here is not undone in this function — confirm that is intended.
self.register_contract_notifier(instance_id, cli_id, updates, summary)?;
let full_key = self.lookup_key(&instance_id).ok_or_else(|| {
tracing::debug!(
contract = %instance_id,
phase = "key_lookup_failed",
"Contract not found during subscribe request"
);
ExecutorError::request(StdContractError::MissingContract { key: instance_id })
})?;
// A get is performed before subscribing; its result is discarded and
// only a failure matters here.
let _res = self.perform_contract_get(false, full_key).await?;
self.subscribe(full_key).await?;
Ok(ContractResponse::SubscribeResponse {
key: full_key,
subscribed: true,
}
.into())
}
other => {
tracing::warn!(
client = %cli_id,
request = ?other,
"unsupported contract request"
);
Err(ExecutorError::other(anyhow::anyhow!("not supported")))
}
};
// Log failures that were produced as the arm's value (see the note on
// early returns in the function documentation).
if let Err(ref e) = result {
tracing::error!(
client = %cli_id,
error = %e,
phase = "request_failed",
"Contract request failed"
);
}
result
}
}
/// Fetches the state of related contract `id` through a sub-operation GET.
///
/// In test builds a thread-local stub, when installed, answers instead of
/// the network. Without an `op_manager` the fetch fails with
/// `MissingRelated`; the same error is returned when the GET reports the
/// contract as not found or hits an infrastructure failure. A dropped GET
/// task is reported as a generic executor error.
async fn fetch_related_via_network(
    op_manager: Option<&Arc<crate::node::OpManager>>,
    id: &ContractInstanceId,
) -> Result<WrappedState, ExecutorError> {
    use crate::operations::get::op_ctx_task::SubOpGetOutcome;

    let related_id = *id;
    #[cfg(test)]
    {
        let installed_stub = TEST_NETWORK_FETCH_OVERRIDE.with(|cell| cell.borrow().clone());
        if let Some(stub) = installed_stub {
            return stub(related_id);
        }
    }
    let missing_related =
        || ExecutorError::request(StdContractError::MissingRelated { key: related_id });
    let op_manager = match op_manager {
        Some(manager) => manager,
        None => return Err(missing_related()),
    };
    // `_tx` is bound to a name (not `_`) so it stays alive until this
    // function returns.
    let (_tx, rx) =
        crate::operations::get::op_ctx_task::start_sub_op_get(op_manager, related_id, false);
    let outcome = match rx.await {
        Ok(outcome) => outcome,
        Err(_) => {
            return Err(ExecutorError::other(anyhow::anyhow!(
                "sub-op GET task dropped"
            )));
        }
    };
    match outcome {
        SubOpGetOutcome::Found(get_result) => {
            let bytes = get_result.state.as_ref().to_vec();
            Ok(WrappedState::from(bytes))
        }
        SubOpGetOutcome::NotFound(_) | SubOpGetOutcome::Infra(_) => Err(missing_related()),
    }
}
/// Converts the result of an upsert into an `UpsertOutcome`.
///
/// A success becomes `UpsertOutcome::Completed`. A failure is passed through
/// `into_defer_related_fetch`: when that yields a value, the outcome is
/// `UpsertOutcome::DeferRelated`; otherwise the error it hands back is
/// returned.
pub(super) fn bridged_upsert_outcome(
    result: Result<UpsertResult, ExecutorError>,
) -> Result<UpsertOutcome, ExecutorError> {
    let failure = match result {
        Ok(completed) => return Ok(UpsertOutcome::Completed(completed)),
        Err(failure) => failure,
    };
    failure
        .into_defer_related_fetch()
        .map(UpsertOutcome::DeferRelated)
}
/// Test-only replacement for the network fetch of a related contract: a
/// reference-counted closure from an instance id to the fetch result.
#[cfg(test)]
pub(crate) type NetworkFetchStub =
std::rc::Rc<dyn Fn(ContractInstanceId) -> Result<WrappedState, ExecutorError>>;
// Test-only, per-thread slot holding the optional `NetworkFetchStub`; starts
// out empty and is read by `fetch_related_via_network`.
#[cfg(test)]
thread_local! {
static TEST_NETWORK_FETCH_OVERRIDE: std::cell::RefCell<Option<NetworkFetchStub>> =
const { std::cell::RefCell::new(None) };
}
/// Installs `stub` as the current thread's network-fetch override, or clears
/// the override when `stub` is `None`.
#[cfg(test)]
pub(crate) fn set_test_network_fetch_override(stub: Option<NetworkFetchStub>) {
    TEST_NETWORK_FETCH_OVERRIDE.with(|slot| {
        let mut current = slot.borrow_mut();
        *current = stub;
    });
}
#[cfg(test)]
mod executor_pin_tests {
// Source-level pin on `local_state_or_from_network` in
// runtime/subscriptions.rs: its text must mention `start_sub_op_get` and must
// not contain the two forbidden needles checked below.
#[test]
fn local_state_or_from_network_uses_sub_op_driver() {
let src = include_str!("runtime/subscriptions.rs");
// Isolate the text after the function's opening signature, cut at the first
// occurrence of the newline-plus-`}` separator.
let body = src
.split("async fn local_state_or_from_network(")
.nth(1)
.expect("local_state_or_from_network must exist")
.split(
"
}",
)
.next()
.expect("closing brace");
assert!(
body.contains("start_sub_op_get"),
"local_state_or_from_network must call start_sub_op_get"
);
// NOTE(review): needles are assembled from fragments, presumably so the
// forbidden text never appears verbatim in source — confirm.
let get_contract_needle = ["Get", "Contract", " {"].concat();
assert!(
!body.contains(&get_contract_needle),
"local_state_or_from_network must NOT construct GetContract"
);
let op_request_needle = ["self.", "op_request"].concat();
assert!(
!body.contains(&op_request_needle),
"local_state_or_from_network must NOT call self.op_request"
);
}
// Source-level pin on `subscribe` in runtime/subscriptions.rs: its text must
// mention `run_executor_subscribe` and must not contain the two forbidden
// needles checked below.
#[test]
fn executor_subscribe_uses_run_executor_subscribe() {
let src = include_str!("runtime/subscriptions.rs");
// Isolate the text after the function's signature, cut at the first
// occurrence of the newline-plus-`}` separator.
let body = src
.split("async fn subscribe(&mut self, key: ContractKey)")
.nth(1)
.expect("executor::subscribe must exist")
.split(
"
}",
)
.next()
.expect("closing brace");
assert!(
body.contains("run_executor_subscribe"),
"executor::subscribe must call run_executor_subscribe"
);
// NOTE(review): needles are assembled from fragments, presumably so the
// forbidden text never appears verbatim in source — confirm.
let sub_contract_needle = ["Subscribe", "Contract", " {"].concat();
assert!(
!body.contains(&sub_contract_needle),
"executor::subscribe must NOT construct SubscribeContract"
);
let op_request_needle = ["self.", "op_request"].concat();
assert!(
!body.contains(&op_request_needle),
"executor::subscribe must NOT call self.op_request"
);
}
// Source-level pin on `perform_contract_update` in runtime/contract_ops.rs:
// its text must not contain any of the three forbidden needles checked below.
#[test]
fn perform_contract_update_does_not_use_network_op_request() {
let src = include_str!("runtime/contract_ops.rs");
// Isolate the text after the function's opening signature, cut at the first
// occurrence of the newline-plus-`}` separator.
let body = src
.split("async fn perform_contract_update(")
.nth(1)
.expect("perform_contract_update must exist")
.split(
"
}",
)
.next()
.expect("closing brace");
// NOTE(review): needles are assembled from fragments, presumably so the
// forbidden text never appears verbatim in source — confirm.
let update_contract_needle = ["Update", "Contract", " {"].concat();
assert!(
!body.contains(&update_contract_needle),
"perform_contract_update must NOT construct UpdateContract"
);
let op_request_needle = ["self.", "op_request"].concat();
assert!(
!body.contains(&op_request_needle),
"perform_contract_update must NOT call self.op_request; \
network-mode UPDATEs flow through start_client_update"
);
let request_update_needle = ["request_", "update("].concat();
assert!(
!body.contains(&request_update_needle),
"perform_contract_update must NOT call request_update"
);
}
// Source-level pin: the related-contract fetch sites must use `join_all` and
// must not loop over `&unique_ids` calling `fetch_related_via_network`.
#[test]
fn related_contract_fetch_sites_use_join_all() {
const CONTRACT_OPS_SRC: &str = include_str!("runtime/contract_ops.rs");
const EXECUTOR_IMPL_SRC: &str = include_str!("runtime/executor_impl.rs");
// Each entry: (display name, source text to scan, signature needle).
let sites: &[(&str, &str, &str)] = &[
(
"fetch_related_for_validation",
EXECUTOR_IMPL_SRC,
"async fn fetch_related_for_validation(",
),
(
"fetch_related_for_validation_network",
CONTRACT_OPS_SRC,
"async fn fetch_related_for_validation_network(",
),
];
for (name, file_src, needle) in sites {
// Text after the signature, cut at the first closing-brace separator.
let body = file_src
.split(needle)
.nth(1)
.unwrap_or_else(|| panic!("{name} must exist in its source file"))
.split("\n }\n")
.next()
.unwrap_or_else(|| panic!("{name} closing brace not found"));
assert!(
body.contains("join_all"),
"{name} must call futures::future::join_all — serial fetch \
regressed in freenet/freenet-core#4077; do not revert"
);
// For every `for id in &unique_ids` loop header, inspect at most the
// next 1,500 bytes for a direct network fetch call.
let serial_network_fetch = body.split("for id in &unique_ids").skip(1).any(|seg| {
let mut end = seg.len().min(1_500);
// Back off to a char boundary so the slice below cannot panic.
while end > 0 && !seg.is_char_boundary(end) {
end -= 1;
}
let window = &seg[..end];
window.contains("fetch_related_via_network")
});
assert!(
!serial_network_fetch,
"{name} must not iterate serially over &unique_ids to call \
fetch_related_via_network — regressed to pre-#4077 behavior"
);
}
// The inline UPDATE-side site is located through a marker text in
// executor_impl.rs; `join_all` must appear within 2,000 bytes after it.
let inline_marker = "NON-deferrable mode: parallel fetch";
let after_marker = EXECUTOR_IMPL_SRC.split(inline_marker).nth(1).expect(
"inline UPDATE-side parallel-fetch marker missing from \
executor_impl.rs — #4077 regressed",
);
let mut window_end = after_marker.len().min(2_000);
// Same char-boundary back-off as above.
while window_end > 0 && !after_marker.is_char_boundary(window_end) {
window_end -= 1;
}
assert!(
after_marker[..window_end].contains("join_all"),
"UPDATE-side inline related fetch in bridged_upsert_contract_state \
must call futures::future::join_all (#4077)"
);
}
// Source-level pin: the tracing macro closest before the
// "Contract state updated" message in runtime/executor_impl.rs must be
// `tracing::debug!`.
#[test]
fn contract_state_updated_logs_at_debug_pin_test() {
let src = include_str!("runtime/executor_impl.rs");
let needle = "\"Contract state updated\"";
let idx = src
.find(needle)
.expect("Contract state updated log message must still exist in source");
let preceding = &src[..idx];
// Last `tracing::` occurrence before the message.
let macro_idx = preceding
.rfind("tracing::")
.expect("a tracing macro must precede the Contract-state-updated log site");
// Guard against a false match: only whitespace may precede `tracing::` on
// its own line.
let line_start = preceding[..macro_idx].rfind('\n').map_or(0, |n| n + 1);
let line_prefix = &preceding[line_start..macro_idx];
assert!(
line_prefix.chars().all(char::is_whitespace),
"rfind matched `tracing::` inside a string literal or comment, \
not a macro invocation. Prefix on its line: {line_prefix:?}"
);
// The macro name is the text between `tracing::` and the following `!`.
let after_macro = &preceding[macro_idx + "tracing::".len()..];
let macro_name = after_macro.split('!').next().unwrap_or("");
// Up to 200 trailing bytes of context for the failure message.
// NOTE(review): this slice starts at a byte offset and would panic if that
// offset is not a char boundary — confirm the scanned source makes that moot.
let tail = &preceding[preceding.len().saturating_sub(200)..];
assert_eq!(
macro_name, "debug",
"Contract-state-updated log site must be at DEBUG \
(closest preceding macro is `tracing::{macro_name}!`). \
Re-promotion to INFO/WARN restores the #4251 / #4272 log-volume regression.\n\
Preceding source (last 200 bytes):\n{tail}"
);
}
/// Pins the production gate: `production_offload_compilation` must report
/// `true`.
#[test]
fn production_offload_is_opt_in() {
    let offload_enabled = super::production_offload_compilation();
    assert!(
        offload_enabled,
        "production pool must opt in to offload; the runtime-flavor check in \
         compile_offloaded keeps it safe/deterministic everywhere else"
    );
}
// Source-level pin on this file (runtime.rs): the text of the shared-modules
// constructor must contain the three wiring snippets asserted below.
#[test]
fn from_config_with_shared_modules_wires_offload_and_budget() {
let src = include_str!("runtime.rs");
// Text from the constructor's signature up to the start of `preload`.
let body = src
.split("pub(crate) async fn from_config_with_shared_modules(")
.nth(1)
.expect("from_config_with_shared_modules must exist")
.split("\n pub async fn preload(")
.next()
.expect("end of from_config_with_shared_modules");
assert!(
body.contains("offload_compilation: production_offload_compilation()"),
"must set offload_compilation from the production gate"
);
assert!(
body.contains("module_cache_budget_bytes: config.module_cache_budget_bytes"),
"must thread the operator-configured byte budget into the runtime config"
);
assert!(
body.contains("Engine::create_backend_engine(&runtime_config)"),
"backend engine must be built from the threaded runtime_config"
);
}
// Source-level pin on `RuntimePool::new` in runtime/pool.rs: the start of its
// text must reference the byte budget and the two labelled caches, and must
// not mention `DEFAULT_MODULE_CACHE_CAPACITY`.
#[test]
fn runtime_pool_sizes_caches_by_byte_budget() {
let src = include_str!("runtime/pool.rs");
// Take the text after the constructor's signature, split it at every
// newline-plus-space separator, and concatenate the first 60 pieces (the
// separators themselves are dropped).
let body = src
.split("pub async fn new(")
.nth(1)
.expect("RuntimePool::new must exist")
.split("\n ")
.take(60)
.collect::<String>();
assert!(
body.contains("config.module_cache_budget_bytes"),
"RuntimePool::new must size caches from config.module_cache_budget_bytes"
);
assert!(
body.contains("ModuleCache::with_label(contract_cache_budget, \"contract\")"),
"RuntimePool::new must build the contract cache from the contract byte budget"
);
assert!(
body.contains("ModuleCache::with_label(delegate_cache_budget, \"delegate\")"),
"RuntimePool::new must build the delegate cache from its own (smaller) budget"
);
assert!(
body.contains("DELEGATE_MODULE_CACHE_BUDGET_DIVISOR"),
"the delegate cache must be a fraction of the contract budget so the \
COMBINED default ceiling stays safe on a small box (issue #4441 fix-up)"
);
assert!(
!body.contains("DEFAULT_MODULE_CACHE_CAPACITY"),
"RuntimePool::new must no longer reference the removed count cap"
);
}
// Source-level pin on this file (runtime.rs): within the contract request
// handler, the debug-section guard must appear textually before the call that
// performs the put.
#[test]
fn contract_requests_put_rejects_debug_wasm_before_perform_put() {
let src = include_str!("runtime.rs");
// Everything after the first occurrence of the handler's signature.
let body = src
.split("pub async fn contract_requests(")
.nth(1)
.expect("contract_requests must exist");
// Byte offsets of the first occurrence of each marker inside that text.
let guard_pos = body
.find("contains_debug_sections")
.expect("contract_requests Put arm must call contains_debug_sections");
let put_pos = body
.find("self.perform_contract_put(")
.expect("contract_requests must call perform_contract_put");
assert!(
guard_pos < put_pos,
"the debug-WASM guard (contains_debug_sections) must run BEFORE \
perform_contract_put, so a debug build is rejected before any \
local storage/validation work"
);
}
}
#[cfg(test)]
mod remove_contract_tests {
use std::sync::Arc;
use freenet_stdlib::prelude::{
ContractCode, ContractContainer, ContractKey, ContractWasmAPIVersion, Parameters,
WrappedContract, WrappedState,
};
use super::ReclaimOutcome;
use crate::contract::executor::Executor;
use crate::contract::storages::Storage;
use crate::wasm_runtime::{
ContractStore, DelegateStore, Runtime, SecretsStore, StateStore, StateStoreError,
};
/// Builds a local-mode executor whose stores all live under a fresh
/// temporary directory.
///
/// Returns the executor, the directory of the contract store (named
/// `contracts-{seed}`), and the temp-dir handle, which the caller receives
/// alongside the executor.
async fn build_disk_executor(
    seed: &str,
) -> (Executor<Runtime>, std::path::PathBuf, tempfile::TempDir) {
    let temp_dir = crate::util::tests::get_temp_dir();
    let db = Storage::new(temp_dir.path())
        .await
        .expect("create storage db");

    // One sub-directory per store, all backed by the same database handle.
    let contracts_dir = temp_dir.path().join(format!("contracts-{seed}"));
    let delegates_dir = temp_dir.path().join("delegate");
    let secrets_dir = temp_dir.path().join("secrets");

    let contract_store = ContractStore::new(contracts_dir.clone(), 10_000, db.clone())
        .expect("create contract store");
    let delegate_store =
        DelegateStore::new(delegates_dir, 10_000, db.clone()).expect("create delegate store");
    let secrets_store = SecretsStore::new(secrets_dir, Default::default(), db.clone())
        .expect("create secrets store");
    let state_store = StateStore::new(db, 10_000_000).expect("create state store");

    let runtime = Runtime::build(contract_store, delegate_store, secrets_store, false)
        .expect("build runtime");
    let mode = crate::contract::executor::OperationMode::Local;
    let executor = Executor::new(state_store, || Ok(()), mode, runtime, None)
        .await
        .expect("create executor");
    (executor, contracts_dir, temp_dir)
}
/// Builds a synthetic V1 WASM contract container and its key.
///
/// The code is 64 copies of `code_seed` and the parameters are 8 copies of
/// `param_seed`; the key is derived from those parameters and that code.
fn make_contract(code_seed: u8, param_seed: u8) -> (ContractContainer, ContractKey) {
    let code = ContractCode::from(vec![code_seed; 64]);
    let params = Parameters::from(vec![param_seed; 8]);
    // Repaired mis-encoded `&params` (the `&para` prefix had been turned into
    // a pilcrow character, which does not compile).
    let key = ContractKey::from_params_and_code(&params, &code);
    let wrapped = WrappedContract::new(Arc::new(code), params);
    let container = ContractContainer::Wasm(ContractWasmAPIVersion::V1(wrapped));
    (container, key)
}
/// Path of the WASM blob for `key` inside `contracts_dir`: the encoded code
/// hash as the file name, with a `wasm` extension.
fn wasm_path(contracts_dir: &std::path::Path, key: &ContractKey) -> std::path::PathBuf {
    let file_stem = key.code_hash().encode();
    let mut blob = contracts_dir.join(file_stem);
    blob.set_extension("wasm");
    blob
}
// Stores code and state for one contract, reclaims it, and checks that the
// outcome is `Full`, the state store reports `MissingContract`, and the WASM
// blob is gone from disk.
#[tokio::test(flavor = "multi_thread")]
async fn remove_contract_reclaims_state_and_wasm_from_disk() {
let (mut executor, contracts_dir, _temp) = build_disk_executor("reclaim").await;
let (container, key) = make_contract(0x11, 0x22);
let params = container.params();
let state = WrappedState::new(b"hosted state payload".to_vec());
executor
.runtime
.contract_store
.store_contract(container)
.expect("store contract code");
executor
.state_store
.store(key, state.clone(), params)
.await
.expect("store contract state");
// Precondition: both halves are present before the reclaim.
let fetched = executor
.state_store
.get(&key)
.await
.expect("state retrievable before eviction");
assert_eq!(fetched, state, "stored state must round-trip");
let blob = wasm_path(&contracts_dir, &key);
assert!(
blob.exists(),
"WASM blob must exist on disk before eviction: {blob:?}"
);
let outcome = executor
.reclaim_contract_storage(&key)
.await
.expect("reclaim must succeed");
assert_eq!(
outcome,
ReclaimOutcome::Full,
"fresh-evict path with both halves present must be Full"
);
// Postcondition: neither the state nor the blob survives.
match executor.state_store.get(&key).await {
Err(StateStoreError::MissingContract(missing)) => assert_eq!(missing, key),
other => panic!("expected MissingContract after eviction, got {other:?}"),
}
assert!(
!blob.exists(),
"WASM blob must be deleted from disk after eviction: {blob:?}"
);
}
// Reclaims the same contract twice: both calls must succeed and report
// `Full`, and the WASM blob must stay absent afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn remove_contract_is_idempotent_on_double_eviction() {
let (mut executor, contracts_dir, _temp) = build_disk_executor("idempotent").await;
let (container, key) = make_contract(0x33, 0x44);
let params = container.params();
let state = WrappedState::new(b"payload".to_vec());
executor
.runtime
.contract_store
.store_contract(container)
.expect("store contract code");
executor
.state_store
.store(key, state, params)
.await
.expect("store contract state");
let first = executor
.reclaim_contract_storage(&key)
.await
.expect("first reclaim must succeed");
assert_eq!(
first,
ReclaimOutcome::Full,
"first reclaim with both halves present must be Full"
);
// Second reclaim runs against already-empty storage.
let second = executor
.reclaim_contract_storage(&key)
.await
.expect("second reclaim must be a no-op, not an error");
assert_eq!(
second,
ReclaimOutcome::Full,
"double-evict must report Full (both backends treat missing as ok)"
);
assert!(
!wasm_path(&contracts_dir, &key).exists(),
"WASM blob must remain absent after double eviction"
);
}
// Reclaims a contract that was never stored: the call must succeed and
// report `Full`.
#[tokio::test(flavor = "multi_thread")]
async fn remove_contract_unknown_contract_is_noop() {
let (mut executor, _contracts_dir, _temp) = build_disk_executor("unknown").await;
// Only the key is used; the container is never stored.
let (_container, key) = make_contract(0x55, 0x66);
let outcome = executor
.reclaim_contract_storage(&key)
.await
.expect("reclaiming an unknown contract must be Ok");
assert_eq!(
outcome,
ReclaimOutcome::Full,
"unknown-contract path is treated as already-clean, hence Full"
);
}
// Stores only the state (no contract code), then reclaims: the call must
// succeed and report `Full`.
#[tokio::test(flavor = "multi_thread")]
async fn reclaim_outcome_state_present_code_absent_is_full() {
let (mut executor, _contracts_dir, _temp) = build_disk_executor("partial-state-only").await;
let (container, key) = make_contract(0x77, 0x88);
let params = container.params();
let state = WrappedState::new(b"state without code".to_vec());
// The container itself is deliberately not written to the contract store.
executor
.state_store
.store(key, state, params)
.await
.expect("store contract state");
let outcome = executor
.reclaim_contract_storage(&key)
.await
.expect("reclaim must succeed even when code half is already absent");
assert_eq!(
outcome,
ReclaimOutcome::Full,
"state-only present + code-already-gone counts as Full because \
both halves are absent at end"
);
}
}
#[cfg(test)]
mod state_write_attribution_pin_tests {
// Concatenated source text of runtime.rs (this file), executor_impl.rs and
// contract_ops.rs, embedded at compile time. Because this file is part of the
// scan, the line layout of the tests below affects their own counts.
const RUNTIME_SRC: &str = concat!(
include_str!("runtime.rs"),
include_str!("runtime/executor_impl.rs"),
include_str!("runtime/contract_ops.rs")
);
// Source text of ring.rs, embedded at compile time.
const RING_SRC: &str = include_str!("../../ring.rs");
// Source text of wasm_runtime/native_api.rs, embedded at compile time.
const NATIVE_API_SRC: &str = include_str!("../../wasm_runtime/native_api.rs");
/// Counts the lines of `src` that contain `needle` outside string literals.
///
/// Lines whose first non-whitespace characters are `//` are skipped
/// entirely; on every other line the contents of string literals are removed
/// before searching. A line counts at most once, however many times the
/// needle occurs on it.
fn count_call_sites(src: &str, needle: &str) -> usize {
    let mut hits = 0;
    for line in src.lines() {
        let is_comment_line = line.trim_start().starts_with("//");
        if is_comment_line {
            continue;
        }
        if strip_string_literals(line).contains(needle) {
            hits += 1;
        }
    }
    hits
}
/// Returns `line` with the contents of every double-quoted span removed; the
/// quote characters themselves are kept.
///
/// A quote preceded by an unescaped backslash does not close a span. The
/// scan is per line and purely character-based: a span left open at the end
/// of the line simply swallows the rest of it.
fn strip_string_literals(line: &str) -> String {
    let mut kept = String::with_capacity(line.len());
    let mut inside = false;
    let mut escaped = false;
    for ch in line.chars() {
        match (inside, ch) {
            // Unescaped quote while inside: the span ends here.
            (true, '"') if !escaped => {
                inside = false;
                kept.push('"');
            }
            // Anything else inside a span is dropped.
            (true, _) => {}
            (false, '"') => {
                inside = true;
                kept.push('"');
            }
            (false, other) => kept.push(other),
        }
        // A backslash escapes the next character unless it is itself escaped.
        escaped = ch == '\\' && !escaped;
    }
    kept
}
// Source-level pin: ring.rs must contain exactly one line calling the
// generation bump, and the runtime sources must contain none.
#[test]
fn bump_state_generation_has_exactly_one_caller_outside_hosting_manager() {
let count = count_call_sites(RING_SRC, ".bump_state_generation(");
assert_eq!(
count, 1,
"expected exactly 1 .bump_state_generation( call in ring.rs \
(inside commit_state_write); found {count}. New direct \
callers should go through Ring::commit_state_write instead, \
or this assertion needs updating with a comment explaining \
why the new direct caller is correct."
);
// The runtime sources include this very file; the needle above only ever
// appears inside string literals here, which the counter strips.
let runtime_calls = count_call_sites(RUNTIME_SRC, ".bump_state_generation(");
assert_eq!(
runtime_calls, 0,
"runtime.rs must not call .bump_state_generation directly; \
use Ring::commit_state_write instead (which bundles the \
bump + refresh + report side effects). See \
`.claude/rules/bug-prevention-patterns.md` row \
'Manually-mirrored telemetry counters'."
);
}
// Source-level pin: the runtime sources (runtime.rs, executor_impl.rs and
// contract_ops.rs together) must contain exactly `EXPECTED` lines that call
// the state-write commit.
#[test]
fn every_runtime_state_write_chokepoint_goes_through_commit_state_write() {
const EXPECTED: usize = 6;
let count = count_call_sites(RUNTIME_SRC, ".commit_state_write(");
assert_eq!(
count, EXPECTED,
"expected exactly {EXPECTED} .commit_state_write( call sites \
in runtime.rs; found {count}. If you added a new state-write \
chokepoint, wire it through `Ring::commit_state_write` and \
bump this expectation. If you removed one, ensure the \
chokepoint is genuinely gone (not just relocated) before \
lowering this expectation."
);
}
// Source-level pin on native_api.rs: exactly two lines invoke the callback
// with the contract key, and exactly two lines capture the state size.
#[test]
fn v2_delegate_state_write_paths_invoke_callback_with_state_size() {
let calls = count_call_sites(NATIVE_API_SRC, "cb(&contract_key,");
assert_eq!(
calls, 2,
"expected exactly 2 callback invocations passing state_size \
in native_api.rs (one for PUT, one for UPDATE); found {calls}"
);
let captures = count_call_sites(NATIVE_API_SRC, "let state_size = state.len();");
assert_eq!(
captures, 2,
"expected exactly 2 `let state_size = state.len();` captures \
in native_api.rs (one before each state-store move); found \
{captures}. The order matters — capturing AFTER the move \
into store_state_sync would not compile, but a refactor \
that moves state into an intermediate first could regress \
this silently."
);
}
}
#[cfg(test)]
mod idempotency_probe_convergence_tests {
use super::byte_multiset_eq;
/// Two byte strings that are permutations of each other, and two identical
/// ones, must compare equal as multisets.
#[test]
fn reordered_bytes_are_benign_flutter() {
    let original: &[u8] = b"{\"a\":1,\"b\":2}";
    let reordered: &[u8] = b"{\"b\":2,\"a\":1}";
    assert!(
        byte_multiset_eq(original, reordered),
        "a key-order permutation has the same byte multiset and must not be flagged"
    );

    let unchanged: &[u8] = b"same";
    assert!(byte_multiset_eq(unchanged, unchanged));
}
/// Equal-length inputs that differ in content must not compare equal as
/// multisets, both for a short text sample and for two 464-byte buffers that
/// differ only in their first byte.
#[test]
fn fixed_size_content_change_is_flagged() {
    let before: &[u8] = b"counter=464;payload";
    let after: &[u8] = b"counter=465;payload";
    assert!(
        !byte_multiset_eq(before, after),
        "a fixed-size content change must be detected as non-idempotent"
    );

    // 464 bytes of `x` with only the leading byte varied.
    let buffer_starting_with = |first: u8| {
        let mut state = vec![b'x'; 464];
        state[0] = first;
        state
    };
    let s1 = buffer_starting_with(0);
    let s2 = buffer_starting_with(1);
    assert!(
        !byte_multiset_eq(&s1, &s2),
        "fixed-size (464-byte) counter churn must be flagged (the #4251 shape)"
    );
}
/// Inputs of different length must never compare equal as multisets.
#[test]
fn growing_state_is_flagged() {
    let before: &[u8] = b"abc";
    let grown: &[u8] = b"abcd";
    let same_multiset = byte_multiset_eq(before, grown);
    assert!(
        !same_multiset,
        "a state that grows on re-application is non-convergent"
    );
}
}