#![allow(clippy::large_futures)]
#![cfg(any(feature = "wasmer", feature = "wasmtime"))]
use std::collections::{BTreeMap, BTreeSet};
use assert_matches::assert_matches;
use async_graphql::Request;
use counter::CounterAbi;
use crowd_funding::{CrowdFundingAbi, InstantiationArgument, Operation as CrowdFundingOperation};
use fungible::{FungibleOperation, InitialState, Parameters};
use hex_game::{HexAbi, Operation as HexOperation, Timeouts};
use linera_base::{
crypto::{CryptoHash, InMemorySigner},
data_types::{
Amount, BlanketMessagePolicy, BlobContent, BlockHeight, Bytecode, ChainDescription, Epoch,
Event, MessagePolicy, OracleResponse, Round, TimeDelta, Timestamp,
},
identifiers::{
AccountOwner, ApplicationId, BlobId, BlobType, DataBlobHash, ModuleId, StreamId, StreamName,
},
ownership::{ChainOwnership, TimeoutConfig},
vm::VmRuntime,
};
use linera_chain::{data_types::MessageAction, ChainError, ChainExecutionContext};
use linera_execution::{
wasm_test, ExecutionError, Message, MessageKind, Operation, QueryOutcome,
ResourceControlPolicy, SystemMessage, SystemOperation, WasmRuntime,
};
use linera_storage::Storage as _;
use serde_json::json;
use test_case::test_case;
#[cfg(feature = "dynamodb")]
use crate::client::client_tests::DynamoDbStorageBuilder;
#[cfg(feature = "rocksdb")]
use crate::client::client_tests::RocksDbStorageBuilder;
#[cfg(feature = "scylladb")]
use crate::client::client_tests::ScyllaDbStorageBuilder;
#[cfg(feature = "storage-service")]
use crate::client::client_tests::ServiceStorageBuilder;
use crate::{
client::{
chain_client,
client_tests::{MemoryStorageBuilder, StorageBuilder, TestBuilder},
ChainClient, ClientOutcome,
},
local_node::LocalNodeError,
test_utils::{ClientOutcomeResultExt as _, FaultType},
worker::{Notification, Reason, WorkerError},
Environment,
};
/// Test-only extension trait adding a shortcut for publishing the bytecode of one of the
/// example applications.
trait ChainClientExt {
/// Publishes the contract and service bytecode of the example application called `name`
/// and returns the ID of the resulting module.
async fn publish_wasm_example(&self, name: &str) -> anyhow::Result<ModuleId>;
}
impl<Env: Environment> ChainClientExt for ChainClient<Env> {
    /// Loads the contract and service bytecode of the example `name` from disk, publishes
    /// them as a Wasm module from this chain, and returns the new module's ID.
    ///
    /// Errors while locating or loading the bytecode files are returned; a failed or
    /// uncommitted publication panics via `unwrap_ok_committed`.
    async fn publish_wasm_example(&self, name: &str) -> anyhow::Result<ModuleId> {
        let (contract_path, service_path) = wasm_test::get_example_bytecode_paths(name)?;
        let contract = Bytecode::load_from_file(contract_path)?;
        let service = Bytecode::load_from_file(service_path)?;
        let outcome = self
            .publish_module(contract, service, VmRuntime::Wasm)
            .await;
        let (module_id, _certificate) = outcome.unwrap_ok_committed();
        Ok(module_id)
    }
}
/// Runs the application-creation scenario with a `MemoryStorageBuilder`, once per enabled
/// Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_create_application(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_create_application(storage_builder).await
}
/// Runs the application-creation scenario with a `ServiceStorageBuilder`.
///
/// Ignored by default and only compiled with the `storage-service` feature.
#[ignore]
#[cfg(feature = "storage-service")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_service_create_application(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ServiceStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_create_application(storage_builder).await
}
/// Runs the application-creation scenario with a `RocksDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `rocksdb` feature.
#[ignore]
#[cfg(feature = "rocksdb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_rocks_db_create_application(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    // Unlike the other builders, this constructor is itself asynchronous.
    let storage_builder = RocksDbStorageBuilder::with_wasm_runtime(wasm_runtime).await;
    run_test_create_application(storage_builder).await
}
/// Runs the application-creation scenario with a `DynamoDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `dynamodb` feature.
#[ignore]
#[cfg(feature = "dynamodb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_dynamo_db_create_application(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = DynamoDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_create_application(storage_builder).await
}
/// Runs the application-creation scenario with a `ScyllaDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `scylladb` feature.
#[ignore]
#[cfg(feature = "scylladb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_scylla_db_create_application(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ScyllaDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_create_application(storage_builder).await
}
async fn run_test_create_application<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let keys = InMemorySigner::new(None);
let vm_runtime = VmRuntime::Wasm;
let (contract_path, service_path) =
linera_execution::wasm_test::get_example_bytecode_paths("counter")?;
let contract_bytecode = Bytecode::load_from_file(contract_path)?;
let service_bytecode = Bytecode::load_from_file(service_path)?;
let contract_compressed_len = contract_bytecode.compress().compressed_bytes.len();
let service_compressed_len = service_bytecode.compress().compressed_bytes.len();
let mut policy = ResourceControlPolicy::all_categories();
policy.maximum_bytecode_size = contract_bytecode
.bytes
.len()
.max(service_bytecode.bytes.len()) as u64;
policy.maximum_blob_size = contract_compressed_len.max(service_compressed_len) as u64;
let mut builder = TestBuilder::new(storage_builder, 4, 1, keys)
.await?
.with_policy(policy.clone());
let publisher = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
let creator = builder.add_root_chain(1, Amount::ONE).await?;
let (module_id, _cert) = publisher
.publish_module(contract_bytecode, service_bytecode, vm_runtime)
.await
.unwrap_ok_committed();
let module_id = module_id.with_abi::<counter::CounterAbi, (), u64>();
creator.synchronize_from_validators().await.unwrap();
creator.process_inbox().await.unwrap();
let balance_after_messaging = creator.local_balance().await?;
assert_eq!(balance_after_messaging, Amount::ONE);
let initial_value = 10_u64;
let (application_id, _) = creator
.create_application(module_id, &(), &initial_value, vec![])
.await
.unwrap_ok_committed();
let increment = 5_u64;
creator
.execute_operation(Operation::user(application_id, &increment)?)
.await
.unwrap();
let query = Request::new("{ value }");
let outcome = creator
.query_user_application(application_id, &query)
.await
.unwrap();
let expected = QueryOutcome {
response: async_graphql::Response::new(
async_graphql::Value::from_json(json!({"value": 15})).unwrap(),
),
operations: vec![],
};
assert_eq!(outcome, expected);
let balance_after_init = creator.local_balance().await?;
assert!(balance_after_init < balance_after_messaging);
let large_bytecode = Bytecode::new(vec![0; policy.maximum_bytecode_size as usize + 1]);
let small_bytecode = Bytecode::new(vec![]);
let result = publisher
.publish_module(large_bytecode.clone(), small_bytecode.clone(), vm_runtime)
.await;
assert_matches!(
result,
Err(chain_client::Error::LocalNodeError(
LocalNodeError::WorkerError(WorkerError::ChainError(chain_error))
)) if matches!(&*chain_error, ChainError::ExecutionError(
error, ChainExecutionContext::Block
) if matches!(**error, ExecutionError::BytecodeTooLarge))
);
let result = publisher
.publish_module(small_bytecode, large_bytecode, vm_runtime)
.await;
assert_matches!(
result,
Err(chain_client::Error::LocalNodeError(
LocalNodeError::WorkerError(WorkerError::ChainError(chain_error))
)) if matches!(&*chain_error, ChainError::ExecutionError(
error, ChainExecutionContext::Block
) if matches!(**error, ExecutionError::BytecodeTooLarge))
);
Ok(())
}
/// Runs the application-with-dependency scenario with a `MemoryStorageBuilder`, once per
/// enabled Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_run_application_with_dependency(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_run_application_with_dependency(storage_builder).await
}
/// Runs the application-with-dependency scenario with a `ServiceStorageBuilder`.
///
/// Ignored by default and only compiled with the `storage-service` feature.
#[ignore]
#[cfg(feature = "storage-service")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_service_run_application_with_dependency(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = ServiceStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_run_application_with_dependency(storage_builder).await
}
/// Runs the application-with-dependency scenario with a `RocksDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `rocksdb` feature.
#[ignore]
#[cfg(feature = "rocksdb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_rocks_db_run_application_with_dependency(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    // Unlike the other builders, this constructor is itself asynchronous.
    let storage_builder = RocksDbStorageBuilder::with_wasm_runtime(wasm_runtime).await;
    run_test_run_application_with_dependency(storage_builder).await
}
/// Runs the application-with-dependency scenario with a `DynamoDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `dynamodb` feature.
#[ignore]
#[cfg(feature = "dynamodb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_dynamo_db_run_application_with_dependency(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = DynamoDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_run_application_with_dependency(storage_builder).await
}
/// Runs the application-with-dependency scenario with a `ScyllaDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `scylladb` feature.
#[ignore]
#[cfg(feature = "scylladb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_scylla_db_run_application_with_dependency(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = ScyllaDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_run_application_with_dependency(storage_builder).await
}
/// Exercises an application (`meta-counter`) that depends on another one (`counter`).
///
/// The scenario publishes both examples, creates them on the `creator` chain, sends a
/// successful cross-chain increment to `receiver`, and then sends two failing messages
/// (one untracked, one tracked) to check how rejected messages and bounces are recorded
/// in the resulting blocks.
async fn run_test_run_application_with_dependency<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let keys = InMemorySigner::new(None);
let mut builder = TestBuilder::new(storage_builder, 4, 1, keys)
.await?
.with_policy(ResourceControlPolicy::all_categories());
let publisher = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
let creator = builder.add_root_chain(1, Amount::ONE).await?;
let receiver = builder.add_root_chain(2, Amount::ONE).await?;
let receiver_id = receiver.chain_id();
// Switch the receiver chain to `ChainOwnership::multiple` with its own identity as the
// only listed owner.
let receiver_key = receiver.identity().await.unwrap();
receiver
.change_ownership(ChainOwnership::multiple(
[(receiver_key, 100)],
100,
TimeoutConfig::default(),
))
.await
.unwrap();
// Same ownership change for the creator chain.
let creator_key = creator.identity().await.unwrap();
creator
.change_ownership(ChainOwnership::multiple(
[(creator_key, 100)],
100,
TimeoutConfig::default(),
))
.await
.unwrap();
// Publish both example modules from the publisher chain.
let module_id1 = publisher.publish_wasm_example("counter").await?;
let module_id1 = module_id1.with_abi::<counter::CounterAbi, (), u64>();
let module_id2 = publisher.publish_wasm_example("meta-counter").await?;
let module_id2 =
module_id2.with_abi::<meta_counter::MetaCounterAbi, ApplicationId<CounterAbi>, ()>();
creator.synchronize_from_validators().await.unwrap();
// Create the counter first, then the meta-counter that lists it as a required application.
let initial_value = 10_u64;
let (application_id1, _) = creator
.create_application(module_id1, &(), &initial_value, vec![])
.await
.unwrap_ok_committed();
let (application_id2, certificate) = creator
.create_application(
module_id2,
&application_id1,
&(),
vec![application_id1.forget_abi()],
)
.await
.unwrap_ok_committed();
// Creating the meta-counter is expected to emit exactly one "instantiated" event on
// its "announcements" stream.
assert_eq!(
certificate.block().body.events,
vec![vec![Event {
stream_id: StreamId {
application_id: application_id2.forget_abi().into(),
stream_name: StreamName(b"announcements".to_vec()),
},
index: 0,
value: bcs::to_bytes(&"instantiated".to_string()).unwrap(),
}]]
);
// Send a successful increment of 5 to the receiver chain, with a fuel grant attached.
let mut operation = meta_counter::Operation::increment(receiver_id, 5, true);
operation.fuel_grant = 1000000;
let cert = creator
.execute_operation(Operation::user(application_id2, &operation)?)
.await
.unwrap_ok_committed();
let block = cert.block();
let responses = &block.body.oracle_responses;
// The block must carry exactly two groups of oracle responses; only the second is inspected.
let [_, responses] = &responses[..] else {
panic!("Unexpected oracle responses: {:?}", responses);
};
// That group must hold a single service response.
let [OracleResponse::Service(json)] = &responses[..] else {
assert_eq!(&responses[..], &[]);
panic!("Unexpected oracle responses: {:?}", responses);
};
// The recorded service response is expected to report the counter's initial value.
let response_json = serde_json::from_slice::<serde_json::Value>(json).unwrap();
assert_eq!(response_json["data"], json!({"value": 10}));
receiver.synchronize_from_validators().await.unwrap();
receiver.process_inbox().await.unwrap();
// After processing its inbox, the receiver's view of the application reports the increment.
let query = Request::new("{ value }");
let outcome = receiver
.query_user_application(application_id2, &query)
.await
.unwrap();
let expected = QueryOutcome {
response: async_graphql::Response::new(
async_graphql::Value::from_json(json!({"value": 5})).unwrap(),
),
operations: vec![],
};
assert_eq!(outcome, expected);
// Now send a message that fails on the receiver (untracked).
let operation = meta_counter::Operation::fail(receiver_id);
creator
.execute_operation(Operation::user(application_id2, &operation)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await.unwrap();
let mut certs = receiver.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let cert = certs.pop().unwrap();
// The failing bundle is rejected, is of kind `Simple`, and the block emits no messages
// for it.
let incoming_bundles = cert.block().body.incoming_bundles().collect::<Vec<_>>();
assert_eq!(incoming_bundles.len(), 1);
assert_eq!(incoming_bundles[0].action, MessageAction::Reject);
assert_eq!(
incoming_bundles[0].bundle.messages[0].kind,
MessageKind::Simple
);
let messages = cert.block().messages();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].len(), 0);
// Repeat the failing message, this time marked as tracked.
let mut operation = meta_counter::Operation::fail(receiver_id);
operation.is_tracked = true;
creator
.execute_operation(Operation::user(application_id2, &operation)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await.unwrap();
let mut certs = receiver.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let cert = certs.pop().unwrap();
// The tracked bundle is rejected as well.
let incoming_bundles = cert.block().body.incoming_bundles().collect::<Vec<_>>();
assert_eq!(incoming_bundles.len(), 1);
assert_eq!(incoming_bundles[0].action, MessageAction::Reject);
assert_eq!(
incoming_bundles[0].bundle.messages[0].kind,
MessageKind::Tracked
);
let messages = cert.block().messages();
assert_eq!(messages.len(), 1);
// Back on the creator chain, a single block accepts two bundles: a tracked system
// `Credit` message followed by a bouncing user message.
creator.synchronize_from_validators().await.unwrap();
let mut certs = creator.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let cert = certs.pop().unwrap();
let incoming_bundles = cert.block().body.incoming_bundles().collect::<Vec<_>>();
assert_eq!(incoming_bundles.len(), 2);
assert_eq!(incoming_bundles[0].action, MessageAction::Accept);
assert_eq!(
incoming_bundles[0].bundle.messages[0].kind,
MessageKind::Tracked
);
assert_matches!(
incoming_bundles[0].bundle.messages[0].message,
Message::System(SystemMessage::Credit { .. })
);
assert_eq!(incoming_bundles[1].action, MessageAction::Accept);
assert_eq!(
incoming_bundles[1].bundle.messages[0].kind,
MessageKind::Bouncing
);
assert_matches!(
incoming_bundles[1].bundle.messages[0].message,
Message::User { .. }
);
Ok(())
}
/// Runs the cross-chain message scenario with a `MemoryStorageBuilder`, once per enabled
/// Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_memory_cross_chain_message(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_cross_chain_message(storage_builder).await
}
/// Runs the cross-chain message scenario with a `ServiceStorageBuilder`.
///
/// Ignored by default and only compiled with the `storage-service` feature.
#[ignore]
#[cfg(feature = "storage-service")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_service_cross_chain_message(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ServiceStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_cross_chain_message(storage_builder).await
}
/// Runs the cross-chain message scenario with a `RocksDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `rocksdb` feature.
#[ignore]
#[cfg(feature = "rocksdb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_rocks_db_cross_chain_message(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    // Unlike the other builders, this constructor is itself asynchronous.
    let storage_builder = RocksDbStorageBuilder::with_wasm_runtime(wasm_runtime).await;
    run_test_cross_chain_message(storage_builder).await
}
/// Runs the cross-chain message scenario with a `DynamoDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `dynamodb` feature.
#[ignore]
#[cfg(feature = "dynamodb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_dynamo_db_cross_chain_message(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = DynamoDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_cross_chain_message(storage_builder).await
}
/// Runs the cross-chain message scenario with a `ScyllaDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `scylladb` feature.
#[ignore]
#[cfg(feature = "scylladb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_scylla_db_cross_chain_message(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ScyllaDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_cross_chain_message(storage_builder).await
}
/// Exercises cross-chain user messages with the `fungible` example application.
///
/// `sender` creates the token application with an initial balance, transfers 100 and
/// then 200 tokens to `receiver`, and `receiver` finally forwards tokens to `receiver2`:
/// an attempt to send 301 (more than it received) must fail, while sending exactly 300
/// must succeed.
async fn run_test_cross_chain_message<B>(storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let keys = InMemorySigner::new(None);
    let mut builder = TestBuilder::new(storage_builder, 4, 1, keys)
        .await?
        .with_policy(ResourceControlPolicy::all_categories());
    let _admin = builder.add_root_chain(0, Amount::ONE).await?;
    let sender = builder.add_root_chain(1, Amount::from_tokens(3)).await?;
    let receiver = builder.add_root_chain(2, Amount::ONE).await?;
    let receiver2 = builder.add_root_chain(3, Amount::ONE).await?;

    let module_id = sender.publish_wasm_example("fungible").await?;
    let module_id = module_id.with_abi::<fungible::FungibleTokenAbi, Parameters, InitialState>();

    let sender_owner = sender.preferred_owner().unwrap();
    let receiver_owner = receiver.preferred_owner().unwrap();
    let receiver2_owner = receiver2.preferred_owner().unwrap();

    // The sender's owner starts with all the tokens.
    let accounts = BTreeMap::from_iter([(sender_owner, Amount::from_tokens(1_000_000))]);
    let state = InitialState { accounts };
    let params = Parameters::new("FUN");
    let (application_id, _cert) = sender
        .create_application(module_id, &params, &state, vec![])
        .await
        .unwrap_ok_committed();

    // First transfer: 100 tokens from sender to receiver.
    let transfer = FungibleOperation::Transfer {
        owner: sender_owner,
        amount: 100.into(),
        target_account: fungible::Account {
            chain_id: receiver.chain_id(),
            owner: receiver_owner,
        },
    };
    let cert = sender
        .execute_operation(Operation::user(application_id, &transfer)?)
        .await
        .unwrap_ok_committed();

    receiver.synchronize_from_validators().await.unwrap();
    {
        // In the receiver's storage the sender chain's tip is still at height 0, while the
        // block carrying the transfer is recorded among the preprocessed blocks.
        let chain = receiver
            .storage_client()
            .load_chain(sender.chain_id())
            .await?;
        assert_eq!(chain.tip_state.get().next_block_height.0, 0);
        assert_eq!(
            chain
                .preprocessed_blocks
                .get(&cert.inner().height())
                .await?,
            Some(cert.hash())
        );
    }

    // Processing the inbox yields one block containing a user message.
    let certs = receiver.process_inbox().await.unwrap().0;
    assert_eq!(certs.len(), 1);
    let bundles = certs[0].block().body.incoming_bundles();
    assert!(bundles
        .flat_map(|msg| &msg.bundle.messages)
        .any(|msg| matches!(msg.message, Message::User { .. })));

    // Second transfer: 200 more tokens from sender to receiver.
    let transfer = FungibleOperation::Transfer {
        owner: sender_owner,
        amount: 200.into(),
        target_account: fungible::Account {
            chain_id: receiver.chain_id(),
            owner: receiver_owner,
        },
    };
    sender
        .execute_operation(Operation::user(application_id, &transfer)?)
        .await
        .unwrap_ok_committed();

    receiver.synchronize_from_validators().await.unwrap();
    let certs = receiver.process_inbox().await.unwrap().0;
    assert_eq!(certs.len(), 1);
    let bundles = certs[0].block().body.incoming_bundles();
    assert!(bundles
        .flat_map(|msg| &msg.bundle.messages)
        .any(|msg| matches!(msg.message, Message::User { .. })));

    // The receiver now holds 300 tokens, so forwarding 301 must fail.
    let transfer = FungibleOperation::Transfer {
        owner: receiver_owner,
        amount: 301.into(),
        target_account: fungible::Account {
            chain_id: receiver2.chain_id(),
            owner: receiver2_owner,
        },
    };
    assert!(receiver
        .execute_operation(Operation::user(application_id, &transfer)?)
        .await
        .is_err());
    // Drop the failed proposal before trying again.
    receiver.clear_pending_proposal().await;

    // Forwarding exactly 300 tokens succeeds.
    let transfer = FungibleOperation::Transfer {
        owner: receiver_owner,
        amount: 300.into(),
        target_account: fungible::Account {
            chain_id: receiver2.chain_id(),
            owner: receiver2_owner,
        },
    };
    receiver
        .execute_operation(Operation::user(application_id, &transfer)?)
        .await
        .unwrap_ok_committed();

    receiver2.synchronize_from_validators().await.unwrap();
    Ok(())
}
/// Runs the event-stream scenario with a `MemoryStorageBuilder`, once per enabled Wasm
/// runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_memory_event_streams(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_event_streams(storage_builder).await
}
/// Runs the event-stream scenario with a `ServiceStorageBuilder`.
///
/// Ignored by default and only compiled with the `storage-service` feature.
#[ignore]
#[cfg(feature = "storage-service")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_service_event_streams(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ServiceStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_event_streams(storage_builder).await
}
/// Runs the event-stream scenario with a `RocksDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `rocksdb` feature.
#[ignore]
#[cfg(feature = "rocksdb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_rocks_db_event_streams(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    // Unlike the other builders, this constructor is itself asynchronous.
    let storage_builder = RocksDbStorageBuilder::with_wasm_runtime(wasm_runtime).await;
    run_test_event_streams(storage_builder).await
}
/// Runs the event-stream scenario with a `DynamoDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `dynamodb` feature.
#[ignore]
#[cfg(feature = "dynamodb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_dynamo_db_event_streams(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = DynamoDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_event_streams(storage_builder).await
}
/// Runs the event-stream scenario with a `ScyllaDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `scylladb` feature.
#[ignore]
#[cfg(feature = "scylladb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_scylla_db_event_streams(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ScyllaDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_event_streams(storage_builder).await
}
async fn run_test_event_streams<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let keys = InMemorySigner::new(None);
let mut builder = TestBuilder::new(storage_builder, 4, 0, keys)
.await?
.with_policy(ResourceControlPolicy::all_categories());
builder.set_fault_type([3], FaultType::Offline);
let admin_client = builder.add_root_chain(0, Amount::ONE).await?;
let sender = builder.add_root_chain(1, Amount::ONE).await?;
let sender2 = builder.add_root_chain(2, Amount::ONE).await?;
let (sender, sender2) = if sender.chain_id() < sender2.chain_id() {
(sender, sender2)
} else {
(sender2, sender)
};
let mut receiver = builder.add_root_chain(2, Amount::ONE).await?;
let module_id = receiver.publish_wasm_example("social").await?;
let module_id = module_id.with_abi::<social::SocialAbi, (), ()>();
let (application_id, _cert) = receiver
.create_application(module_id, &(), &(), vec![])
.await
.unwrap_ok_committed();
let request_subscribe = social::Operation::Subscribe {
chain_id: sender.chain_id(),
};
let request_subscribe2 = social::Operation::Subscribe {
chain_id: sender2.chain_id(),
};
receiver
.execute_operations(
vec![
Operation::user(application_id, &request_subscribe)?,
Operation::user(application_id, &request_subscribe2)?,
],
vec![],
)
.await
.unwrap_ok_committed();
let text = "Please like and comment!".to_string();
let post = social::Operation::Post {
text: text.clone(),
image_url: None,
};
sender
.execute_operation(Operation::user(application_id, &post)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await.unwrap();
builder.set_fault_type([3], FaultType::Honest);
builder.set_fault_type([2], FaultType::Offline);
let certs = receiver.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let operations = certs[0].block().body.operations().collect::<Vec<_>>();
let [Operation::System(operation)] = &*operations else {
panic!("Expected one operation, got {:?}", operations);
};
let stream_id = StreamId {
application_id: application_id.forget_abi().into(),
stream_name: b"posts".into(),
};
assert_eq!(
**operation,
SystemOperation::UpdateStreams(vec![(sender.chain_id(), stream_id, 1)])
);
let query = async_graphql::Request::new("{ receivedPosts { keys { author, index } } }");
let outcome = receiver
.query_user_application(application_id, &query)
.await?;
let expected = QueryOutcome {
response: async_graphql::Response::new(
async_graphql::Value::from_json(json!({
"receivedPosts": {
"keys": [
{ "author": sender.chain_id(), "index": 0 }
]
}
}))
.unwrap(),
),
operations: vec![],
};
assert_eq!(outcome, expected);
let text = "Follow sender2!".to_string();
let post = social::Operation::Post {
text: text.clone(),
image_url: None,
};
sender
.execute_operation(Operation::user(application_id, &post)?)
.await
.unwrap_ok_committed();
let text = "Thanks for the shoutout!".to_string();
let post = social::Operation::Post {
text: text.clone(),
image_url: None,
};
sender2
.execute_operation(Operation::user(application_id, &post)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await.unwrap();
receiver.options_mut().message_policy = MessagePolicy::new(
BlanketMessagePolicy::Accept,
Some([sender.chain_id()].into_iter().collect()),
None,
None,
None,
);
let certs = receiver.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let operations = certs[0].block().body.operations().collect::<Vec<_>>();
let [Operation::System(operation)] = &*operations else {
panic!("Expected one operation, got {:?}", operations);
};
let stream_id = StreamId {
application_id: application_id.forget_abi().into(),
stream_name: b"posts".into(),
};
assert_eq!(
**operation,
SystemOperation::UpdateStreams(vec![(sender.chain_id(), stream_id, 2)])
);
receiver.options_mut().message_policy =
MessagePolicy::new(BlanketMessagePolicy::Accept, None, None, None, None);
let certs = receiver.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let operations = certs[0].block().body.operations().collect::<Vec<_>>();
let [Operation::System(operation)] = &*operations else {
panic!("Expected one operation, got {:?}", operations);
};
let stream_id = StreamId {
application_id: application_id.forget_abi().into(),
stream_name: b"posts".into(),
};
assert_eq!(
**operation,
SystemOperation::UpdateStreams(vec![(sender2.chain_id(), stream_id, 1)])
);
let text = "Have you followed already?".to_string();
let post = social::Operation::Post {
text: text.clone(),
image_url: None,
};
sender
.execute_operation(Operation::user(application_id, &post)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await.unwrap();
receiver.options_mut().message_policy = MessagePolicy::new(
BlanketMessagePolicy::Accept,
None,
None,
None,
Some(Default::default()),
);
let certs = receiver.process_inbox().await.unwrap().0;
assert!(certs.is_empty());
receiver.options_mut().message_policy = MessagePolicy::new(
BlanketMessagePolicy::Accept,
None,
None,
None,
Some([application_id.forget_abi().into()].into_iter().collect()),
);
let certs = receiver.process_inbox().await.unwrap().0;
assert_eq!(certs.len(), 1);
let operations = certs[0].block().body.operations().collect::<Vec<_>>();
let [Operation::System(operation)] = &*operations else {
panic!("Expected one operation, got {:?}", operations);
};
let stream_id = StreamId {
application_id: application_id.forget_abi().into(),
stream_name: b"posts".into(),
};
assert_eq!(
**operation,
SystemOperation::UpdateStreams(vec![(sender.chain_id(), stream_id, 3)])
);
let info = receiver
.synchronize_chain_state(receiver.chain_id())
.await?;
assert_eq!(info.epoch, Epoch(0));
admin_client
.stage_new_committee(builder.initial_committee.clone())
.await
.unwrap();
receiver.synchronize_from_validators().await.unwrap();
receiver.process_inbox().await.unwrap();
let info = receiver
.synchronize_chain_state(receiver.chain_id())
.await?;
assert_eq!(info.epoch, Epoch(1));
let request_unsubscribe = social::Operation::Unsubscribe {
chain_id: sender.chain_id(),
};
receiver
.execute_operation(Operation::user(application_id, &request_unsubscribe)?)
.await
.unwrap_ok_committed();
sender.synchronize_from_validators().await.unwrap();
let _certs = sender.process_inbox().await.unwrap();
let post = social::Operation::Post {
text: "Nobody will read this!".to_string(),
image_url: None,
};
sender
.execute_operation(Operation::user(application_id, &post)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await.unwrap();
let certs = receiver.process_inbox().await.unwrap().0;
assert!(certs.is_empty());
let query = async_graphql::Request::new("{ receivedPosts { keys { author, index } } }");
let outcome = receiver
.query_user_application(application_id, &query)
.await
.unwrap();
let expected = QueryOutcome {
response: async_graphql::Response::new(
async_graphql::Value::from_json(json!({
"receivedPosts": {
"keys": [ { "author": sender.chain_id(), "index": 2 },
{ "author": sender.chain_id(), "index": 1 },
{ "author": sender.chain_id(), "index": 0 },
{ "author": sender2.chain_id(), "index": 0 } ]
}
}))
.unwrap(),
),
operations: vec![],
};
assert_eq!(outcome, expected);
Ok(())
}
/// Runs the event-stream limit scenario with a `MemoryStorageBuilder`, once per enabled
/// Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_memory_event_streams_limit(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_event_streams_limit(storage_builder).await
}
/// Checks that `max_new_events_per_block` splits stream updates across several blocks.
///
/// `sender` publishes eight posts in one block; with the receiver's limit set to three,
/// processing the inbox must yield three blocks whose `UpdateStreams` operations carry the
/// values 3, 6 and 8.
async fn run_test_event_streams_limit<B>(storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let signer = InMemorySigner::new(None);
    let mut builder = TestBuilder::new(storage_builder, 4, 0, signer)
        .await?
        .with_policy(ResourceControlPolicy::no_fees());
    let sender = builder.add_root_chain(0, Amount::ONE).await?;
    let mut receiver = builder.add_root_chain(1, Amount::ONE).await?;

    let module_id = receiver
        .publish_wasm_example("social")
        .await?
        .with_abi::<social::SocialAbi, (), ()>();
    let (application_id, _cert) = receiver
        .create_application(module_id, &(), &(), vec![])
        .await
        .unwrap_ok_committed();

    // Subscribe the receiver to the sender's posts.
    let subscribe = social::Operation::Subscribe {
        chain_id: sender.chain_id(),
    };
    receiver
        .execute_operation(Operation::user(application_id, &subscribe)?)
        .await
        .unwrap_ok_committed();

    // Eight posts, all in a single block on the sender chain.
    let mut posts = Vec::new();
    for i in 0..8 {
        let post = social::Operation::Post {
            text: format!("Post {i}"),
            image_url: None,
        };
        posts.push(Operation::user(application_id, &post).unwrap());
    }
    sender
        .execute_operations(posts, vec![])
        .await
        .unwrap_ok_committed();

    receiver.synchronize_from_validators().await.unwrap();
    receiver.options_mut().max_new_events_per_block = 3;
    let certs = receiver.process_inbox().await.unwrap().0;
    assert_eq!(certs.len(), 3);

    let stream_id = StreamId {
        application_id: application_id.forget_abi().into(),
        stream_name: b"posts".into(),
    };
    // Each block must contain exactly one system operation, advancing the stream by at
    // most three events.
    for (certificate, expected_value) in certs.iter().zip([3, 6, 8]) {
        let operations = certificate.block().body.operations().collect::<Vec<_>>();
        let [Operation::System(system_operation)] = &*operations else {
            panic!("Expected one operation, got {:?}", operations);
        };
        let expected_operation = SystemOperation::UpdateStreams(vec![(
            sender.chain_id(),
            stream_id.clone(),
            expected_value,
        )]);
        assert_eq!(**system_operation, expected_operation);
    }
    Ok(())
}
/// Runs `run_test_sparse_event_chain` on in-memory storage with the given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_memory_sparse_event_chain(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_sparse_event_chain(storage_builder).await
}
/// Checks which sender certificates subscribers store when only some blocks carry events.
///
/// The sender produces three certificates: a post (`cert0`), a burn (`cert1`) and a second
/// post (`cert2`). After the first receiver handles a `NewEvents` notification for `cert2`,
/// and after a second, late subscriber synchronizes from the validators, each of them is
/// expected to hold `cert0` and `cert2` but not `cert1`. Finally the second subscriber's
/// inbox processing must yield at least one `UpdateStreams` system operation.
async fn run_test_sparse_event_chain<B>(storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let signer = InMemorySigner::new(None);
    let mut builder = TestBuilder::new(storage_builder, 4, 0, signer)
        .await?
        .with_policy(ResourceControlPolicy::no_fees());
    let sender = builder.add_root_chain(1, Amount::ONE).await?;
    let receiver = builder.add_root_chain(2, Amount::ONE).await?;

    // Publish and instantiate the `social` application from the receiver chain.
    let module_id = receiver
        .publish_wasm_example("social")
        .await?
        .with_abi::<social::SocialAbi, (), ()>();
    let (application_id, _cert) = receiver
        .create_application(module_id, &(), &(), vec![])
        .await
        .unwrap_ok_committed();

    // Builds the user operation for a text-only post.
    let post_operation = |text: &str| {
        Operation::user(
            application_id,
            &social::Operation::Post {
                text: text.to_string(),
                image_url: None,
            },
        )
    };

    let subscribe = social::Operation::Subscribe {
        chain_id: sender.chain_id(),
    };
    receiver
        .execute_operation(Operation::user(application_id, &subscribe)?)
        .await
        .unwrap_ok_committed();
    sender.synchronize_from_validators().await.unwrap();
    sender.process_inbox().await?;

    let stream_id = StreamId {
        application_id: application_id.forget_abi().into(),
        stream_name: b"posts".into(),
    };

    // Post, burn, post: three consecutive certificates on the sender chain.
    let cert0 = sender
        .execute_operation(post_operation("First post")?)
        .await
        .unwrap_ok_committed();
    let cert1 = sender
        .burn(AccountOwner::CHAIN, Amount::from_millis(1))
        .await
        .unwrap_ok_committed();
    let cert2 = sender
        .execute_operation(post_operation("Second post")?)
        .await
        .unwrap_ok_committed();

    // Tell the receiver about the events in the latest certificate only.
    let new_events = Reason::NewEvents {
        height: cert2.block().header.height,
        hash: cert2.hash(),
        event_streams: BTreeSet::from([stream_id]),
    };
    let notification = Notification {
        chain_id: sender.chain_id(),
        reason: new_events,
    };
    let validator = builder
        .initial_committee
        .validator_addresses()
        .next()
        .unwrap();
    receiver
        .process_notification_from(notification, validator)
        .await;

    let receiver_has_cert0 = receiver
        .storage_client()
        .contains_certificate(cert0.hash())
        .await?;
    assert!(receiver_has_cert0);
    let receiver_has_cert1 = receiver
        .storage_client()
        .contains_certificate(cert1.hash())
        .await?;
    assert!(!receiver_has_cert1);
    let receiver_has_cert2 = receiver
        .storage_client()
        .contains_certificate(cert2.hash())
        .await?;
    assert!(receiver_has_cert2);

    // A second subscriber that joins after the posts were already made.
    let receiver2 = builder.add_root_chain(3, Amount::ONE).await?;
    let subscribe2 = social::Operation::Subscribe {
        chain_id: sender.chain_id(),
    };
    receiver2
        .execute_operation(Operation::user(application_id, &subscribe2)?)
        .await
        .unwrap_ok_committed();
    let known_before_sync = receiver2
        .storage_client()
        .contains_certificate(cert0.hash())
        .await?;
    assert!(!known_before_sync);

    receiver2.synchronize_from_validators().await.unwrap();
    let receiver2_has_cert0 = receiver2
        .storage_client()
        .contains_certificate(cert0.hash())
        .await?;
    assert!(receiver2_has_cert0);
    let receiver2_has_cert1 = receiver2
        .storage_client()
        .contains_certificate(cert1.hash())
        .await?;
    assert!(!receiver2_has_cert1);
    let receiver2_has_cert2 = receiver2
        .storage_client()
        .contains_certificate(cert2.hash())
        .await?;
    assert!(receiver2_has_cert2);

    let certs = receiver2.process_inbox().await?.0;
    assert!(!certs.is_empty(), "receiver2 should have events to process");
    let has_update_streams = certs.iter().any(|cert| {
        cert.block().body.operations().any(|operation| {
            matches!(
                operation,
                Operation::System(system_operation)
                    if matches!(**system_operation, SystemOperation::UpdateStreams(_))
            )
        })
    });
    assert!(has_update_streams, "should have UpdateStreams operations");
    Ok(())
}
/// Runs `run_test_message_policy_accept_apps` on in-memory storage with the given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_memory_message_policy_accept_apps(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_message_policy_accept_apps(storage_builder).await
}
/// Runs `run_test_message_policy_accept_apps` with `ServiceStorageBuilder`.
///
/// Ignored by default and only compiled with the `storage-service` feature.
#[ignore]
#[cfg(feature = "storage-service")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_service_message_policy_accept_apps(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ServiceStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_message_policy_accept_apps(storage_builder).await
}
/// Runs `run_test_message_policy_accept_apps` with `RocksDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `rocksdb` feature.
#[ignore]
#[cfg(feature = "rocksdb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_rocks_db_message_policy_accept_apps(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    // Creating this builder is itself asynchronous.
    let storage_builder = RocksDbStorageBuilder::with_wasm_runtime(wasm_runtime).await;
    run_test_message_policy_accept_apps(storage_builder).await
}
/// Runs `run_test_message_policy_accept_apps` with `DynamoDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `dynamodb` feature.
#[ignore]
#[cfg(feature = "dynamodb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_dynamo_db_message_policy_accept_apps(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = DynamoDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_message_policy_accept_apps(storage_builder).await
}
/// Runs `run_test_message_policy_accept_apps` with `ScyllaDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `scylladb` feature.
#[ignore]
#[cfg(feature = "scylladb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime; "wasmtime"))]
#[test_log::test(tokio::test)]
async fn test_scylla_db_message_policy_accept_apps(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = ScyllaDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_message_policy_accept_apps(storage_builder).await
}
/// Checks that the client's `MessagePolicy` application filters decide whether an incoming
/// pledge bundle is accepted when the inbox is processed.
///
/// A fungible token application is created on the pledger chain and a crowd-funding
/// application on the campaign chain. The pledger then pledges three times; before each
/// `process_inbox` call on the campaign chain a different `MessagePolicy` is installed and
/// the number of resulting certificates is asserted (1 when the bundle is accepted, 0 when
/// it is rejected).
///
/// NOTE(review): the meaning of the third and fourth `MessagePolicy::new` arguments is
/// inferred from the assertion messages in this test (third: the bundle must contain a
/// message from one of the listed applications; fourth: every application message in the
/// bundle must come from a listed application) — confirm against `MessagePolicy`.
async fn run_test_message_policy_accept_apps<B>(storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let keys = InMemorySigner::new(None);
    let mut builder = TestBuilder::new(storage_builder, 4, 1, keys)
        .await?
        .with_policy(ResourceControlPolicy::all_categories());
    let pledger_chain = builder.add_root_chain(1, Amount::from_tokens(10)).await?;
    let mut campaign_chain = builder.add_root_chain(2, Amount::from_tokens(10)).await?;
    let pledger_owner = pledger_chain.preferred_owner().unwrap();
    let campaign_owner = campaign_chain.preferred_owner().unwrap();

    // Fungible token application, with 1000 tokens credited to the pledger.
    let fungible_module = pledger_chain.publish_wasm_example("fungible").await?;
    let fungible_module =
        fungible_module.with_abi::<fungible::FungibleTokenAbi, Parameters, InitialState>();
    let accounts = BTreeMap::from_iter([(pledger_owner, Amount::from_tokens(1_000))]);
    let state = InitialState { accounts };
    let params = Parameters::new("FUN");
    let (fungible_id, _cert) = pledger_chain
        .create_application(fungible_module, &params, &state, vec![])
        .await
        .unwrap_ok_committed();

    // Crowd-funding campaign on the campaign chain, parameterized by the fungible application.
    let crowd_funding_module = pledger_chain.publish_wasm_example("crowd-funding").await?;
    let crowd_funding_module =
        crowd_funding_module.with_abi::<CrowdFundingAbi, ApplicationId, InstantiationArgument>();
    let deadline = Timestamp::from(u64::MAX);
    let target = Amount::from_tokens(10);
    let instantiation_arg = InstantiationArgument {
        owner: campaign_owner,
        deadline,
        target,
    };
    let (crowd_funding_id, _cert) = campaign_chain
        .create_application(
            crowd_funding_module,
            &fungible_id.forget_abi(),
            &instantiation_arg,
            vec![],
        )
        .await
        .unwrap_ok_committed();

    // The same pledge is submitted three times below.
    let pledge_amount = Amount::from_tokens(5);
    let pledge = CrowdFundingOperation::Pledge {
        owner: pledger_owner,
        amount: pledge_amount,
    };

    // First pledge: the third policy argument lists the fungible application.
    pledger_chain
        .execute_operation(Operation::user(crowd_funding_id, &pledge)?)
        .await
        .unwrap_ok_committed();
    campaign_chain.synchronize_from_validators().await?;
    campaign_chain.options_mut().message_policy = MessagePolicy::new(
        BlanketMessagePolicy::Accept,
        None,
        Some([fungible_id.forget_abi().into()].into_iter().collect()),
        None,
        None,
    );
    let certs = campaign_chain.process_inbox().await?.0;
    assert_eq!(certs.len(), 1, "Should accept bundle with fungible message");

    // Second pledge: the third policy argument lists the crowd-funding application.
    pledger_chain
        .execute_operation(Operation::user(crowd_funding_id, &pledge)?)
        .await
        .unwrap_ok_committed();
    campaign_chain.synchronize_from_validators().await?;
    campaign_chain.options_mut().message_policy = MessagePolicy::new(
        BlanketMessagePolicy::Accept,
        None,
        Some([crowd_funding_id.forget_abi().into()].into_iter().collect()),
        None,
        None,
    );
    let certs = campaign_chain.process_inbox().await?.0;
    assert_eq!(
        certs.len(),
        1,
        "Should accept bundle with crowd-funding message"
    );

    // Third pledge: the third policy argument lists an application that sent nothing.
    pledger_chain
        .execute_operation(Operation::user(crowd_funding_id, &pledge)?)
        .await
        .unwrap_ok_committed();
    campaign_chain.synchronize_from_validators().await?;
    let fake_app_id = ApplicationId::new(CryptoHash::test_hash("fake app"));
    campaign_chain.options_mut().message_policy = MessagePolicy::new(
        BlanketMessagePolicy::Accept,
        None,
        Some([fake_app_id.into()].into_iter().collect()),
        None,
        None,
    );
    let certs = campaign_chain.process_inbox().await?.0;
    assert_eq!(
        certs.len(),
        0,
        "Should reject bundle without message from fake app"
    );

    // No new pledge from here on: the inbox is processed again under different policies.
    // Only the fungible application is listed in the fourth argument.
    campaign_chain.options_mut().message_policy = MessagePolicy::new(
        BlanketMessagePolicy::Accept,
        None,
        None,
        Some([fungible_id.forget_abi().into()].into_iter().collect()),
        None,
    );
    let certs = campaign_chain.process_inbox().await?.0;
    assert_eq!(
        certs.len(),
        0,
        "Should reject bundle with message from non-allowed crowd-funding app"
    );

    // Both applications are listed in the fourth argument.
    campaign_chain.options_mut().message_policy = MessagePolicy::new(
        BlanketMessagePolicy::Accept,
        None,
        None,
        Some(
            [
                fungible_id.forget_abi().into(),
                crowd_funding_id.forget_abi().into(),
            ]
            .into_iter()
            .collect(),
        ),
        None,
    );
    let certs = campaign_chain.process_inbox().await?.0;
    assert_eq!(
        certs.len(),
        1,
        "Should accept bundle when all app messages are allowed"
    );
    Ok(())
}
/// Checks the blob publishing fees charged for the `counter` module and that a block with
/// ten increments fails under a policy with `maximum_wasm_fuel_per_block` set to 30 000.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_fuel_limit(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    let policy = ResourceControlPolicy {
        maximum_wasm_fuel_per_block: 30_000,
        blob_read: Amount::from_tokens(10),
        blob_published: Amount::from_attos(100),
        blob_byte_read: Amount::from_tokens(10),
        blob_byte_published: Amount::from_attos(1),
        ..ResourceControlPolicy::default()
    };
    let signer = InMemorySigner::new(None);
    let mut builder = TestBuilder::new(storage_builder, 4, 1, signer)
        .await?
        .with_policy(policy.clone());
    let publisher = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
    let initial_balance = publisher.local_balance().await?;

    let module_id = publisher
        .publish_wasm_example("counter")
        .await?
        .with_abi::<counter::CounterAbi, (), u64>();

    // Read back the two bytecode blobs to learn how many bytes were published.
    let bytecode_blob_ids = [
        BlobId::new(module_id.contract_blob_hash, BlobType::ContractBytecode),
        BlobId::new(module_id.service_blob_hash, BlobType::ServiceBytecode),
    ];
    let mut blobs = publisher
        .storage_client()
        .read_blobs(&bytecode_blob_ids)
        .await?
        .into_iter()
        .flatten();
    let first_blob_len = blobs.next().unwrap().bytes().len() as u128;
    let second_blob_len = blobs.next().unwrap().bytes().len() as u128;

    // Two per-blob fees plus a per-byte fee over both blobs.
    let expected_balance = initial_balance
        - policy.blob_published * 2
        - policy.blob_byte_published * (first_blob_len + second_blob_len);
    assert_eq!(publisher.local_balance().await?, expected_balance);

    let initial_value = 10_u64;
    let (application_id, _) = publisher
        .create_application(module_id, &(), &initial_value, vec![])
        .await
        .unwrap_ok_committed();

    // A single increment in a block succeeds.
    let increment = 5_u64;
    publisher
        .execute_operation(Operation::user(application_id, &increment)?)
        .await
        .unwrap_ok_committed();

    // Ten increments in one call must fail (presumably on the fuel limit set above).
    let ten_increments = vec![Operation::user(application_id, &increment)?; 10];
    let outcome = publisher.execute_operations(ten_increments, vec![]).await;
    assert!(outcome.is_err());
    Ok(())
}
/// Runs `run_test_skipping_proposal` on in-memory storage with the given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_skipping_proposal(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_skipping_proposal(storage_builder).await
}
async fn run_test_skipping_proposal<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut keys = InMemorySigner::new(None);
let owner_a = keys.generate_new().into();
let owner_b = keys.generate_new().into();
let clock = storage_builder.clock().clone();
let mut builder = TestBuilder::new(storage_builder, 4, 0, keys).await?;
let creator = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
let module_id = creator.publish_wasm_example("hex-game").await?;
let module_id = module_id.with_abi::<HexAbi, (), Timeouts>();
let timeouts = Timeouts::default();
let (app_id, _) = creator
.create_application(module_id, &(), &timeouts, vec![])
.await
.unwrap_ok_committed();
let start_op = HexOperation::Start {
players: [owner_a, owner_b],
board_size: 11,
fee_budget: Amount::ONE,
timeouts: None, };
let cert = creator
.execute_operation(Operation::user(app_id, &start_op)?)
.await
.unwrap_ok_committed();
let blobs = cert.inner().block().created_blobs();
let chain_blob = blobs
.values()
.find(|blob| blob.content().blob_type() == BlobType::ChainDescription)
.unwrap();
let chain_id = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?.into();
builder.chain_owners.insert(chain_id, owner_a);
let mut client_a = builder
.make_client(chain_id, None, BlockHeight::ZERO)
.await?;
client_a.set_preferred_owner(owner_a);
builder.chain_owners.insert(chain_id, owner_b);
let mut client_b = builder
.make_client(chain_id, None, BlockHeight::ZERO)
.await?;
client_b.set_preferred_owner(owner_b);
client_a.synchronize_from_validators().await?;
let move_op = HexOperation::MakeMove { x: 5, y: 5 };
client_a
.execute_operation(Operation::user(app_id, &move_op)?)
.await?;
client_b.synchronize_from_validators().await?;
builder.set_fault_type([0, 1, 2, 3], FaultType::DontProcessValidated);
let move_op = HexOperation::MakeMove { x: 4, y: 4 };
let result = client_b
.execute_operation(Operation::user(app_id, &move_op)?)
.await;
assert_matches!(result, Err(chain_client::Error::CommunicationError(_)));
clock.add(timeouts.start_time * 2);
builder.set_fault_type([0, 1, 2, 3], FaultType::Honest);
let claim_victory_operation = HexOperation::ClaimVictory;
client_a.synchronize_from_validators().await?;
client_a
.execute_operation(Operation::user(app_id, &claim_victory_operation)?)
.await?;
Ok(())
}
/// Runs `run_test_publish_read_data_blob` on in-memory storage with the given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_publish_read_data_blob(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_publish_read_data_blob(storage_builder).await
}
/// Runs `run_test_publish_read_data_blob` with `ServiceStorageBuilder`.
///
/// Ignored by default and only compiled with the `storage-service` feature.
#[ignore]
#[cfg(feature = "storage-service")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_service_publish_read_data_blob(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ServiceStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_publish_read_data_blob(storage_builder).await
}
/// Runs `run_test_publish_read_data_blob` with `RocksDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `rocksdb` feature.
#[ignore]
#[cfg(feature = "rocksdb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_rocks_db_publish_read_data_blob(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    // Creating this builder is itself asynchronous.
    let storage_builder = RocksDbStorageBuilder::with_wasm_runtime(wasm_runtime).await;
    run_test_publish_read_data_blob(storage_builder).await
}
/// Runs `run_test_publish_read_data_blob` with `DynamoDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `dynamodb` feature.
#[ignore]
#[cfg(feature = "dynamodb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_dynamo_db_publish_read_data_blob(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = DynamoDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_publish_read_data_blob(storage_builder).await
}
/// Runs `run_test_publish_read_data_blob` with `ScyllaDbStorageBuilder`.
///
/// Ignored by default and only compiled with the `scylladb` feature.
#[ignore]
#[cfg(feature = "scylladb")]
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_scylla_db_publish_read_data_blob(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = ScyllaDbStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_publish_read_data_blob(storage_builder).await
}
/// Exercises the `publish-read-data-blob` example application in three ways:
///
/// 1. creating a data blob in one block and reading it in a later block;
/// 2. creating and reading a blob within a single operation;
/// 3. creating and reading a blob with two operations submitted together.
///
/// After each step the test asserts that the inspected certificate recorded no oracle
/// responses for the operations it checks.
async fn run_test_publish_read_data_blob<B>(storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    use publish_read_data_blob::{Operation as BlobOperation, PublishReadDataBlobAbi};

    let signer = InMemorySigner::new(None);
    let mut builder = TestBuilder::new(storage_builder, 4, 1, signer)
        .await?
        .with_policy(ResourceControlPolicy::all_categories());
    let client = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
    let module_id = client
        .publish_wasm_example("publish-read-data-blob")
        .await?
        .with_abi::<PublishReadDataBlobAbi, (), ()>();
    let (application_id, _) = client
        .create_application(module_id, &(), &(), vec![])
        .await
        .unwrap_ok_committed();

    // Method 1: create the blob, then read it back in a separate block.
    let first_data = b"This is test data for method 1.".to_vec();
    let create_first = BlobOperation::CreateDataBlob(first_data.clone());
    let certificate = client
        .execute_operation(Operation::user(application_id, &create_first)?)
        .await
        .unwrap_ok_committed();
    let first_hash = DataBlobHash(CryptoHash::new(&BlobContent::new_data(first_data.clone())));
    let read_first = BlobOperation::ReadDataBlob(first_hash, first_data);
    client
        .execute_operation(Operation::user(application_id, &read_first)?)
        .await
        .unwrap_ok_committed();
    // Note: `certificate` is the one returned for the creating block above.
    assert_eq!(certificate.block().body.oracle_responses[0].len(), 0);

    // Method 2: create and read within one operation.
    let second_data = b"This is test data for method 2.".to_vec();
    let create_and_read = BlobOperation::CreateAndReadDataBlob(second_data);
    let certificate = client
        .execute_operation(Operation::user(application_id, &create_and_read)?)
        .await
        .unwrap_ok_committed();
    assert_eq!(certificate.block().body.oracle_responses[0].len(), 0);

    // Method 3: create and read with two operations submitted together.
    let third_data = b"This is test data for method 3.".to_vec();
    let create_third = BlobOperation::CreateDataBlob(third_data.clone());
    let third_hash = DataBlobHash(CryptoHash::new(&BlobContent::new_data(third_data.clone())));
    let read_third = BlobOperation::ReadDataBlob(third_hash, third_data);
    let both_operations = vec![
        Operation::user(application_id, &create_third)?,
        Operation::user(application_id, &read_third)?,
    ];
    let certificate = client
        .execute_operations(both_operations, vec![])
        .await
        .unwrap_ok_committed();
    let oracle_responses = &certificate.block().body.oracle_responses;
    assert_eq!(oracle_responses[0].len(), 0);
    assert_eq!(oracle_responses[1].len(), 0);
    Ok(())
}
/// Runs `run_test_time_expiry_rounds` on in-memory storage with the given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_time_expiry_rounds(wasm_runtime: WasmRuntime) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_time_expiry_rounds(storage_builder).await
}
async fn run_test_time_expiry_rounds<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let keys = InMemorySigner::new(None);
let clock = storage_builder.clock().clone();
let mut builder = TestBuilder::new(storage_builder, 4, 0, keys)
.await?
.with_policy(ResourceControlPolicy::all_categories());
let creator = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
let module_id = creator.publish_wasm_example("time-expiry").await?;
let module_id = module_id.with_abi::<time_expiry::TimeExpiryAbi, (), ()>();
let (app_id, _) = creator
.create_application(module_id, &(), &(), vec![])
.await
.unwrap_ok_committed();
builder.set_fault_type([2, 3], FaultType::Offline);
let op1 = time_expiry::TimeExpiryOperation::ExpireAfter(TimeDelta::from_secs(5));
let result = creator
.execute_operation(Operation::user(app_id, &op1)?)
.await;
assert_matches!(result, Err(chain_client::Error::CommunicationError(_)));
let chain_info = creator.chain_info_with_manager_values().await?;
assert_eq!(
chain_info.manager.requested_proposed.unwrap().content.round,
Round::MultiLeader(0)
);
creator.clear_pending_proposal().await;
let op2 = time_expiry::TimeExpiryOperation::ExpireAfter(TimeDelta::from_secs(6));
let result = creator
.execute_operation(Operation::user(app_id, &op2)?)
.await;
assert_matches!(result, Err(chain_client::Error::CommunicationError(_)));
let chain_info = creator.chain_info_with_manager_values().await?;
assert_eq!(
chain_info.manager.requested_proposed.unwrap().content.round,
Round::MultiLeader(1)
);
creator.clear_pending_proposal().await;
let op3 = time_expiry::TimeExpiryOperation::ExpireAfter(TimeDelta::from_secs(7));
let result = creator
.execute_operation(Operation::user(app_id, &op3)?)
.await;
assert_matches!(result, Err(chain_client::Error::CommunicationError(_)));
let chain_info = creator.chain_info_with_manager_values().await?;
assert_eq!(
chain_info.manager.requested_proposed.unwrap().content.round,
Round::SingleLeader(0)
);
builder.set_fault_type([0, 1, 2, 3], FaultType::Honest);
clock.add(TimeDelta::from_secs(10));
creator.clear_pending_proposal().await;
let op4 = time_expiry::TimeExpiryOperation::ExpireAfter(TimeDelta::from_secs(600));
let result = creator
.execute_operation(Operation::user(app_id, &op4)?)
.await?;
assert_matches!(result, ClientOutcome::WaitForTimeout(_));
let certificate = loop {
clock.add(TimeDelta::from_secs(20));
match creator.process_pending_block().await? {
ClientOutcome::Committed(Some(cert)) => break cert,
ClientOutcome::WaitForTimeout(_)
if clock.current_time()
< Timestamp::from(0).saturating_add(TimeDelta::from_secs(200)) =>
{
continue
}
outcome => panic!("Failed to commit the block: {outcome:?}"),
}
};
let operations: Vec<_> = certificate.block().body.operations().collect();
assert_eq!(operations.len(), 1);
let operation = &operations[0];
if let Operation::User {
application_id,
bytes,
} = operation
{
assert_eq!(application_id, &app_id.forget_abi());
let decoded_op: time_expiry::TimeExpiryOperation = bcs::from_bytes(bytes)?;
assert_matches!(
decoded_op,
time_expiry::TimeExpiryOperation::ExpireAfter(delta) if delta == TimeDelta::from_secs(600)
);
} else {
panic!("Expected a user operation");
}
Ok(())
}
/// Runs `run_test_auto_retry_produces_consistent_outcome` on in-memory storage with the
/// given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_auto_retry_produces_consistent_outcome(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_auto_retry_produces_consistent_outcome(storage_builder).await
}
async fn run_test_auto_retry_produces_consistent_outcome<B>(
storage_builder: B,
) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let keys = InMemorySigner::new(None);
let mut builder = TestBuilder::new(storage_builder, 4, 1, keys)
.await?
.with_policy(ResourceControlPolicy::all_categories());
let publisher = builder.add_root_chain(0, Amount::from_tokens(3)).await?;
let creator = builder.add_root_chain(1, Amount::ONE).await?;
let receiver = builder.add_root_chain(2, Amount::ONE).await?;
let receiver_id = receiver.chain_id();
let module_id1 = publisher.publish_wasm_example("counter").await?;
let module_id1 = module_id1.with_abi::<counter::CounterAbi, (), u64>();
let module_id2 = publisher.publish_wasm_example("meta-counter").await?;
let module_id2 =
module_id2.with_abi::<meta_counter::MetaCounterAbi, ApplicationId<CounterAbi>, ()>();
creator.synchronize_from_validators().await?;
let initial_value = 10_u64;
let (application_id1, _) = creator
.create_application(module_id1, &(), &initial_value, vec![])
.await
.unwrap_ok_committed();
let (application_id2, _) = creator
.create_application(
module_id2,
&application_id1,
&(),
vec![application_id1.forget_abi()],
)
.await
.unwrap_ok_committed();
let mut operation = meta_counter::Operation::increment(receiver_id, 5, true);
operation.fuel_grant = 1_000_000;
creator
.execute_operation(Operation::user(application_id2, &operation)?)
.await
.unwrap_ok_committed();
let operation = meta_counter::Operation::fail(receiver_id);
creator
.execute_operation(Operation::user(application_id2, &operation)?)
.await
.unwrap_ok_committed();
receiver.synchronize_from_validators().await?;
let certs = receiver.process_inbox().await?.0;
assert_eq!(certs.len(), 1, "Should have one certificate");
let cert = &certs[0];
let incoming_bundles: Vec<_> = cert.block().body.incoming_bundles().collect();
assert!(!incoming_bundles.is_empty(), "Should have incoming bundles");
let rejected_count = incoming_bundles
.iter()
.filter(|b| b.action == MessageAction::Reject)
.count();
assert!(rejected_count > 0, "At least one bundle should be rejected");
let query = async_graphql::Request::new("{ value }");
let outcome = receiver
.query_user_application(application_id2, &query)
.await?;
let expected = QueryOutcome {
response: async_graphql::Response::new(async_graphql::Value::from_json(
json!({"value": 5}),
)?),
operations: vec![],
};
assert_eq!(outcome, expected);
Ok(())
}
/// Runs `run_test_read_event_downloads_publisher_chain` on in-memory storage with the
/// given Wasm runtime.
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_memory_read_event_downloads_publisher_chain(
    wasm_runtime: WasmRuntime,
) -> anyhow::Result<()> {
    let storage_builder = MemoryStorageBuilder::with_wasm_runtime(wasm_runtime);
    run_test_read_event_downloads_publisher_chain(storage_builder).await
}
async fn run_test_read_event_downloads_publisher_chain<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let keys = InMemorySigner::new(None);
let mut builder = TestBuilder::new(storage_builder, 4, 0, keys)
.await?
.with_policy(ResourceControlPolicy::all_categories());
let sender = builder.add_root_chain(0, Amount::ONE).await?;
let receiver = builder.add_root_chain(1, Amount::ONE).await?;
let module_id = receiver.publish_wasm_example("social").await?;
let module_id = module_id.with_abi::<social::SocialAbi, (), ()>();
let (application_id, _cert) = receiver
.create_application(module_id, &(), &(), vec![])
.await
.unwrap_ok_committed();
let request_subscribe = social::Operation::Subscribe {
chain_id: sender.chain_id(),
};
receiver
.execute_operation(Operation::user(application_id, &request_subscribe)?)
.await
.unwrap_ok_committed();
let post = social::Operation::Post {
text: "Hello from sender!".to_string(),
image_url: None,
};
sender
.execute_operation(Operation::user(application_id, &post)?)
.await
.unwrap_ok_committed();
let stream_id = StreamId {
application_id: application_id.forget_abi().into(),
stream_name: b"posts".into(),
};
receiver
.execute_operations(
vec![SystemOperation::UpdateStreams(vec![(sender.chain_id(), stream_id, 1)]).into()],
vec![],
)
.await
.unwrap_ok_committed();
let query = Request::new("{ receivedPosts { keys { author, index } } }");
let outcome = receiver
.query_user_application(application_id, &query)
.await?;
let expected = QueryOutcome {
response: async_graphql::Response::new(
async_graphql::Value::from_json(json!({
"receivedPosts": {
"keys": [
{ "author": sender.chain_id(), "index": 0 }
]
}
}))
.unwrap(),
),
operations: vec![],
};
assert_eq!(outcome, expected);
Ok(())
}