#![allow(clippy::large_futures)]
#[path = "./wasm_worker_tests.rs"]
mod wasm;
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
iter,
sync::{Arc, Mutex},
time::Duration,
};
use assert_matches::assert_matches;
use linera_base::{
crypto::{
AccountPublicKey, AccountSecretKey, AccountSignature, CryptoHash, InMemorySigner,
ValidatorKeypair,
},
data_types::*,
identifiers::{Account, AccountOwner, ChainId, EventId, StreamId},
ownership::{ChainOwnership, TimeoutConfig},
};
use linera_chain::{
data_types::{
BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, ChainAndHeight,
IncomingBundle, LiteValue, LiteVote, MessageAction, MessageBundle, OperationResult,
PostedMessage, SignatureAggregator, Transaction, Vote,
},
manager::LockingBlock,
test::{make_child_block, make_first_block, BlockTestExt, MessageTestExt, VoteTestExt},
types::{
CertificateKind, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate,
GenericCertificate, Timeout, ValidatedBlock,
},
ChainError, ChainExecutionContext, ChainStateView,
};
use linera_execution::{
committee::Committee,
system::{
AdminOperation, OpenChainConfig, SystemMessage, SystemOperation,
EPOCH_STREAM_NAME as NEW_EPOCH_STREAM_NAME, REMOVED_EPOCH_STREAM_NAME,
},
test_utils::{
dummy_chain_description, ExpectedCall, RegisterMockApplication, SystemExecutionState,
},
ExecutionError, ExecutionRuntimeContext, Message, MessageKind, OutgoingMessage, Query,
QueryContext, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
};
use linera_storage::{DbStorage, Storage, TestClock};
use linera_views::{
context::Context, memory::MemoryDatabase, random::generate_test_namespace,
store::TestKeyValueDatabase as _, views::RootView,
};
use test_case::test_case;
use test_log::test;
#[cfg(feature = "dynamodb")]
use crate::test_utils::DynamoDbStorageBuilder;
#[cfg(feature = "rocksdb")]
use crate::test_utils::RocksDbStorageBuilder;
#[cfg(feature = "scylladb")]
use crate::test_utils::ScyllaDbStorageBuilder;
use crate::{
chain_worker::CrossChainUpdateHelper,
data_types::*,
test_utils::{MemoryStorageBuilder, StorageBuilder},
worker::{
Notification,
Reason::{self, NewBlock, NewIncomingBundle},
WorkerError, WorkerState,
},
ChainWorkerConfig,
};
/// Grace period, in microseconds, that `TestEnvironment` configures as the worker's
/// `block_time_grace_period`; the timestamp tests derive their block timestamps from it.
const TEST_GRACE_PERIOD_MICROS: u64 = 500_000;
/// Test fixture bundling a one-member committee, the worker under test and the descriptions
/// of the chains that were registered through the fixture.
struct TestEnvironment<S: Storage> {
/// Committee made of the single validator key and the admin account key.
committee: Committee,
/// Worker whose handlers are exercised by the tests.
worker: WorkerState<S>,
/// Secret key of the account that solely owns the admin chain.
admin_keypair: AccountSecretKey,
/// Description of the admin chain (root chain 0).
admin_description: ChainDescription,
/// Descriptions of the chains added through the `add_*_chain` helpers, keyed by chain ID.
other_chains: BTreeMap<ChainId, ChainDescription>,
}
impl<S> TestEnvironment<S>
where
S: Storage + Clone + Send + Sync + 'static,
{
/// Creates an environment whose admin chain starts with a balance of one million tokens.
///
/// `is_client` and `has_long_lived_services` are forwarded unchanged to
/// `new_with_amount`.
async fn new(storage: S, is_client: bool, has_long_lived_services: bool) -> Self {
    let initial_balance = Amount::from_tokens(1_000_000);
    Self::new_with_amount(storage, is_client, has_long_lived_services, initial_balance).await
}
/// Builds the fixture from scratch.
///
/// Generates a validator key pair and an admin account key, forms a one-member committee
/// from them, writes the admin chain description blob, the committee blob and the network
/// description to `storage`, and finally creates the worker on top of that storage.
///
/// * `is_client` sets both `allow_inactive_chains` and
///   `allow_messages_from_deprecated_epochs` in the worker configuration.
/// * `has_long_lived_services` sets `long_lived_services`.
/// * `amount` is the initial balance of the admin chain.
///
/// # Panics
///
/// Panics if the committee cannot be serialized or if any storage write fails.
async fn new_with_amount(
    storage: S,
    is_client: bool,
    has_long_lived_services: bool,
    amount: Amount,
) -> Self {
    let validator_keys = ValidatorKeypair::generate();
    let admin_keypair = AccountSecretKey::generate();
    let committee = Committee::make_simple(vec![(
        validator_keys.public_key,
        admin_keypair.public(),
    )]);

    // The admin chain is root chain 0 and is owned solely by the admin account key.
    let admin_description = ChainDescription::new(
        ChainOrigin::Root(0),
        InitialChainConfig {
            balance: amount,
            ownership: ChainOwnership::single(admin_keypair.public().into()),
            epoch: Epoch::ZERO,
            min_active_epoch: Epoch::ZERO,
            max_active_epoch: Epoch::ZERO,
            application_permissions: Default::default(),
        },
        Timestamp::from(0),
    );
    let committee_blob = Blob::new_committee(bcs::to_bytes(&committee).unwrap());

    let description_blob = Blob::new_chain_description(&admin_description);
    storage
        .write_blob(&description_blob)
        .await
        .expect("writing a blob should not fail");
    storage
        .write_blob(&committee_blob)
        .await
        .expect("writing a blob should succeed");

    let network_description = NetworkDescription {
        admin_chain_id: admin_description.id(),
        genesis_config_hash: CryptoHash::test_hash("genesis config"),
        genesis_timestamp: Timestamp::from(0),
        genesis_committee_blob_hash: committee_blob.id().hash,
        name: "test network".to_string(),
    };
    storage
        .write_network_description(&network_description)
        .await
        .expect("writing a network description should not fail");

    let worker_config = ChainWorkerConfig {
        nickname: "Single validator node".to_string(),
        allow_inactive_chains: is_client,
        allow_messages_from_deprecated_epochs: is_client,
        long_lived_services: has_long_lived_services,
        block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS),
        sender_chain_ttl: None,
        ..ChainWorkerConfig::default()
    }
    .with_key_pair(Some(validator_keys.secret_key));

    Self {
        committee,
        worker: WorkerState::new(storage, worker_config, None),
        admin_description,
        admin_keypair,
        other_chains: BTreeMap::new(),
    }
}
/// Returns the ID of the admin chain, taken from its stored description.
fn admin_chain_id(&self) -> ChainId {
    let admin = &self.admin_description;
    admin.id()
}
fn committee(&self) -> &Committee {
&self.committee
}
fn worker(&self) -> &WorkerState<S> {
&self.worker
}
fn with_priority_bundle_origins(mut self, origins: HashSet<ChainId>) -> Self {
self.worker = self.worker.with_priority_bundle_origins(origins);
self
}
/// Consumes the environment, sets the worker's cross-chain message chunk limit to `limit`
/// and hands the environment back.
fn with_cross_chain_message_chunk_limit(self, limit: usize) -> Self {
    let mut env = self;
    env.worker.set_cross_chain_message_chunk_limit(limit);
    env
}
/// Returns the public key matching the admin account's secret key.
fn admin_public_key(&self) -> AccountPublicKey {
    let secret = &self.admin_keypair;
    secret.public()
}
/// Registers root chain number `index`, owned solely by `owner`, with the given `balance`.
///
/// Returns the description of the new chain.
async fn add_root_chain(
    &mut self,
    index: u32,
    owner: AccountOwner,
    balance: Amount,
) -> ChainDescription {
    let ownership = ChainOwnership::single(owner);
    self.add_root_chain_with_ownership(index, balance, ownership)
        .await
}
async fn add_root_chain_with_ownership(
&mut self,
index: u32,
balance: Amount,
ownership: ChainOwnership,
) -> ChainDescription {
let origin = ChainOrigin::Root(index);
let config = InitialChainConfig {
epoch: self.admin_description.config().epoch,
ownership,
min_active_epoch: self.admin_description.config().min_active_epoch,
max_active_epoch: self.admin_description.config().max_active_epoch,
balance,
application_permissions: Default::default(),
};
let description = ChainDescription::new(origin, config, Timestamp::from(0));
self.other_chains
.insert(description.id(), description.clone());
self.worker
.storage
.create_chain(description.clone())
.await
.unwrap();
description
}
async fn add_child_chain(
&mut self,
parent_id: ChainId,
owner: AccountOwner,
balance: Amount,
) -> ChainDescription {
let origin = ChainOrigin::Child {
parent: parent_id,
block_height: BlockHeight(0),
chain_index: 0,
};
let config = InitialChainConfig {
epoch: self.admin_description.config().epoch,
ownership: ChainOwnership::single(owner),
min_active_epoch: self.admin_description.config().min_active_epoch,
max_active_epoch: self.admin_description.config().max_active_epoch,
balance,
application_permissions: Default::default(),
};
let description = ChainDescription::new(origin, config, Timestamp::from(0));
self.other_chains
.insert(description.id(), description.clone());
self.worker
.storage
.create_chain(description.clone())
.await
.unwrap();
description
}
/// Certifies `value` in round `Round::MultiLeader(0)`.
fn make_certificate<T>(&self, value: T) -> GenericCertificate<T>
where
    T: CertificateValue,
{
    let round = Round::MultiLeader(0);
    self.make_certificate_with_round(value, round)
}
/// Certifies `value` in `round` using the worker's own key pair as the only signer.
///
/// # Panics
///
/// Panics if the worker has no key pair, if the aggregator rejects the signature, or if
/// that single signature does not yield a certificate.
fn make_certificate_with_round<T>(&self, value: T, round: Round) -> GenericCertificate<T>
where
    T: CertificateValue,
{
    let worker = &self.worker;
    let key_pair = worker.chain_worker_config.key_pair().unwrap();
    let signature = LiteVote::new(LiteValue::new(&value), round, key_pair).signature;
    let mut aggregator = SignatureAggregator::new(value, round, &self.committee);
    aggregator
        .append(worker.public_key(), signature)
        .unwrap()
        .unwrap()
}
/// Builds a confirmed-block certificate for a chain-to-chain transfer in epoch zero.
///
/// The block is authenticated by the owner derived from `chain_owner_pubkey`, spends from
/// the chain account (`AccountOwner::CHAIN`), credits the chain account of `target_id`,
/// and assumes no per-owner balances.
#[expect(clippy::too_many_arguments)]
async fn make_simple_transfer_certificate(
    &self,
    chain_description: ChainDescription,
    chain_owner_pubkey: AccountPublicKey,
    target_id: ChainId,
    amount: Amount,
    incoming_bundles: Vec<IncomingBundle>,
    balance: Amount,
    previous_confirmed_blocks: Vec<&ConfirmedBlockCertificate>,
) -> ConfirmedBlockCertificate {
    let authenticated_signer = AccountOwner::from(chain_owner_pubkey);
    let recipient = Account::chain(target_id);
    let no_owner_balances = BTreeMap::new();
    self.make_transfer_certificate_for_epoch(
        chain_description,
        chain_owner_pubkey,
        authenticated_signer,
        AccountOwner::CHAIN,
        recipient,
        amount,
        incoming_bundles,
        Epoch::ZERO,
        balance,
        no_owner_balances,
        previous_confirmed_blocks,
    )
    .await
}
/// Builds a confirmed-block certificate for a transfer in epoch zero.
///
/// Every argument is passed through to `make_transfer_certificate_for_epoch`, with
/// `Epoch::ZERO` supplied as the epoch.
#[expect(clippy::too_many_arguments)]
async fn make_transfer_certificate(
    &self,
    chain_description: ChainDescription,
    chain_owner_pubkey: AccountPublicKey,
    authenticated_signer: AccountOwner,
    source: AccountOwner,
    recipient: Account,
    amount: Amount,
    incoming_bundles: Vec<IncomingBundle>,
    balance: Amount,
    balances: BTreeMap<AccountOwner, Amount>,
    previous_confirmed_blocks: Vec<&ConfirmedBlockCertificate>,
) -> ConfirmedBlockCertificate {
    let epoch = Epoch::ZERO;
    self.make_transfer_certificate_for_epoch(
        chain_description,
        chain_owner_pubkey,
        authenticated_signer,
        source,
        recipient,
        amount,
        incoming_bundles,
        epoch,
        balance,
        balances,
        previous_confirmed_blocks,
    )
    .await
}
/// Builds a confirmed-block certificate for a block that first receives
/// `incoming_bundles` and then transfers `amount` from `source` to `recipient`.
///
/// * The block extends the first entry of `previous_confirmed_blocks`, or is the first
///   block of the chain when that list is empty.
/// * `balance` and `balances` describe the system state used to compute the state hash
///   stored in the certificate.
/// * Each posted message of each incoming bundle contributes one entry to the outgoing
///   message list: a `Bouncing` copy when the bundle is rejected and the message is
///   `Tracked`, and an empty list otherwise.
/// * A `Credit` message is emitted only when the recipient lives on another chain.
#[expect(clippy::too_many_arguments)]
async fn make_transfer_certificate_for_epoch(
    &self,
    chain_description: ChainDescription,
    chain_owner_pubkey: AccountPublicKey,
    authenticated_signer: AccountOwner,
    source: AccountOwner,
    recipient: Account,
    amount: Amount,
    incoming_bundles: Vec<IncomingBundle>,
    epoch: Epoch,
    balance: Amount,
    balances: BTreeMap<AccountOwner, Amount>,
    previous_confirmed_blocks: Vec<&ConfirmedBlockCertificate>,
) -> ConfirmedBlockCertificate {
    let chain_id = chain_description.id();
    let system_state = SystemExecutionState {
        committees: iter::once((epoch, self.committee.clone())).collect(),
        ownership: ChainOwnership::single(chain_owner_pubkey.into()),
        balance,
        balances,
        admin_chain_id: Some(self.admin_chain_id()),
        ..SystemExecutionState::new(chain_description)
    };

    // Extend the most recent certificate if there is one, otherwise start the chain.
    let base_block = if let Some(parent) = previous_confirmed_blocks.first() {
        make_child_block(parent.value())
    } else {
        make_first_block(chain_id)
    };
    let mut block = base_block.with_transfer(source, recipient, amount);
    block.authenticated_signer = Some(authenticated_signer);
    block.epoch = epoch;

    // One entry per posted message of every incoming bundle.
    let mut messages = Vec::new();
    for incoming_bundle in &incoming_bundles {
        for posted_message in &incoming_bundle.bundle.messages {
            let bounces = matches!(incoming_bundle.action, MessageAction::Reject)
                && matches!(posted_message.kind, MessageKind::Tracked);
            let outgoing = if bounces {
                vec![OutgoingMessage {
                    authenticated_signer: posted_message.authenticated_signer,
                    destination: incoming_bundle.origin,
                    grant: Amount::ZERO,
                    refund_grant_to: None,
                    kind: MessageKind::Bouncing,
                    message: posted_message.message.clone(),
                }]
            } else {
                Vec::new()
            };
            messages.push(outgoing);
        }
    }

    // The incoming bundles come first, followed by the transactions already in the block.
    block.transactions = incoming_bundles
        .into_iter()
        .map(Transaction::ReceiveMessages)
        .chain(block.transactions)
        .collect();

    // The transfer itself only produces a message when it leaves this chain.
    let transfer_messages = if chain_id == recipient.chain_id {
        Vec::new()
    } else {
        let credit = SystemMessage::Credit {
            source,
            target: recipient.owner,
            amount,
        };
        vec![direct_outgoing_message(
            recipient.chain_id,
            MessageKind::Tracked,
            credit,
        )]
    };
    messages.push(transfer_messages);

    let tx_count = block.transactions.len();
    let oracle_responses = (0..tx_count).map(|_| Vec::new()).collect();
    let events = (0..tx_count).map(|_| Vec::new()).collect();
    let blobs = (0..tx_count).map(|_| Vec::new()).collect();
    let operation_results = vec![OperationResult(Vec::new()); block.operations().count()];
    let state_hash = system_state.into_hash().await;

    // For every destination of this block, look up the first listed previous certificate
    // that also sent a message there and record its hash and height.
    let previous_message_blocks = messages
        .iter()
        .flatten()
        .filter_map(|outgoing| {
            let destination = outgoing.destination;
            let earlier = previous_confirmed_blocks.iter().find(|certificate| {
                certificate
                    .inner()
                    .block()
                    .body
                    .messages
                    .iter()
                    .flatten()
                    .any(|sent| sent.destination == destination)
            })?;
            let height = earlier.inner().block().header.height;
            Some((destination, (earlier.hash(), height)))
        })
        .collect();

    let outcome = BlockExecutionOutcome {
        messages,
        previous_message_blocks,
        previous_event_blocks: BTreeMap::new(),
        events,
        blobs,
        state_hash,
        oracle_responses,
        operation_results,
    };
    self.make_certificate(ConfirmedBlock::new(outcome.with(block)))
}
/// Returns the expected initial system execution state of `chain_id`.
///
/// The state is derived from the chain's stored description, with the admin chain ID,
/// the description's timestamp and the epoch-zero committee filled in.
///
/// # Panics
///
/// Panics with "Unknown chain" if `chain_id` is neither the admin chain nor a chain
/// registered through this environment.
pub fn system_execution_state(&self, chain_id: &ChainId) -> SystemExecutionState {
    let admin_chain_id = self.admin_chain_id();
    let description = if *chain_id == admin_chain_id {
        self.admin_description.clone()
    } else {
        self.other_chains
            .get(chain_id)
            .expect("Unknown chain")
            .clone()
    };
    SystemExecutionState {
        admin_chain_id: Some(admin_chain_id),
        timestamp: description.timestamp(),
        committees: [(Epoch::ZERO, self.committee.clone())]
            .into_iter()
            .collect(),
        // `description` is not needed afterwards, so it is moved rather than cloned again.
        ..SystemExecutionState::new(description)
    }
}
}
/// Asserts that, for every inbox of `chain`, the front of `removed_bundles` is `None`.
///
/// # Panics
///
/// Panics if loading the inboxes or reading a queue front fails, or if any inbox has a
/// removed bundle at its front.
async fn assert_no_removed_bundles<C>(chain: &ChainStateView<C>)
where
    C: Context + Clone + Send + Sync + 'static,
    C::Extra: ExecutionRuntimeContext,
{
    let inboxes = chain.inboxes.try_load_all_entries().await.unwrap();
    for (_key, inbox) in inboxes {
        let front = inbox.removed_bundles.front().await.unwrap();
        assert_eq!(front, None);
    }
}
/// Wraps a system `message` into an `OutgoingMessage` addressed to `recipient`, with no
/// authenticated signer, a zero grant and no grant refund target.
fn direct_outgoing_message(
    recipient: ChainId,
    kind: MessageKind,
    message: SystemMessage,
) -> OutgoingMessage {
    let message = Message::System(message);
    OutgoingMessage {
        destination: recipient,
        authenticated_signer: None,
        grant: Amount::ZERO,
        refund_grant_to: None,
        kind,
        message,
    }
}
/// Builds the system `Credit` message that moves `amount` from a chain account to a
/// chain account.
fn system_credit_message(amount: Amount) -> Message {
    let credit = SystemMessage::Credit {
        source: AccountOwner::CHAIN,
        target: AccountOwner::CHAIN,
        amount,
    };
    Message::System(credit)
}
/// Builds a `Tracked` outgoing message to `recipient` crediting `amount` from the chain
/// account to the chain account.
fn direct_credit_message(recipient: ChainId, amount: Amount) -> OutgoingMessage {
    direct_outgoing_message(
        recipient,
        MessageKind::Tracked,
        SystemMessage::Credit {
            source: AccountOwner::CHAIN,
            target: AccountOwner::CHAIN,
            amount,
        },
    )
}
/// Generates `count` new keys in `signer` and returns their public keys, sorted by the
/// `AccountOwner` derived from each key.
fn generate_key_pairs(signer: &mut InMemorySigner, count: usize) -> Vec<AccountPublicKey> {
    let mut public_keys = Vec::with_capacity(count);
    for _ in 0..count {
        public_keys.push(signer.generate_new());
    }
    public_keys.sort_by_key(|public_key| AccountOwner::from(*public_key));
    public_keys
}
/// Builds the `UpdateRecipient` cross-chain request that delivers to `recipient` the
/// message bundles contained in `certificate`.
///
/// `previous_height` is the height that the certified block records in
/// `previous_message_blocks` for `recipient`, if there is such an entry.
fn update_recipient_direct(
    recipient: ChainId,
    certificate: &ConfirmedBlockCertificate,
) -> CrossChainRequest {
    let block = certificate.inner().block();
    let previous_height = block
        .body
        .previous_message_blocks
        .get(&recipient)
        .map(|&(_, height)| height);
    CrossChainRequest::UpdateRecipient {
        sender: block.header.chain_id,
        recipient,
        bundles: certificate.message_bundles_for(recipient).collect(),
        previous_height,
    }
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_bad_signature<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let sender_public_key = signer.generate_new();
let sender_owner = sender_public_key.into();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_1_desc = env
.add_root_chain(1, sender_owner, Amount::from_tokens(5))
.await;
let chain_2_desc = env
.add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
.await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
let block_proposal = make_first_block(chain_1)
.with_simple_transfer(chain_2, Amount::from_tokens(5))
.into_first_proposal(sender_owner, &signer)
.await
.unwrap();
let unknown_key_pair = AccountSecretKey::generate();
let original_public_key = match block_proposal.signature {
AccountSignature::Ed25519 { public_key, .. } => public_key,
_ => {
panic!(
"Expected an Ed25519 signature, found: {:?}",
block_proposal.signature
);
}
};
let mut bad_signature_block_proposal = block_proposal.clone();
let bad_signature = match unknown_key_pair.sign(&block_proposal.content) {
AccountSignature::Ed25519 { signature, .. } => AccountSignature::Ed25519 {
public_key: original_public_key,
signature,
},
_ => panic!("Expected an Ed25519 signature"),
};
bad_signature_block_proposal.signature = bad_signature;
assert_matches!(
env.worker()
.handle_block_proposal(bad_signature_block_proposal)
.await,
Err(WorkerError::CryptoError(error))
if matches!(error, linera_base::crypto::CryptoError::InvalidSignature {..})
);
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
assert!(chain.manager.confirmed_vote().is_none());
assert!(chain.manager.validated_vote().is_none());
Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_zero_amount<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let sender_owner = signer.generate_new().into();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_1_desc = env
.add_root_chain(1, sender_owner, Amount::from_tokens(5))
.await;
let chain_2_desc = env
.add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
.await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
let zero_amount_block_proposal = make_first_block(chain_1)
.with_simple_transfer(chain_2, Amount::ZERO)
.with_authenticated_signer(Some(sender_owner))
.into_first_proposal(sender_owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker()
.handle_block_proposal(zero_amount_block_proposal)
.await,
Err(
WorkerError::ChainError(error)
) if matches!(&*error, ChainError::ExecutionError(
execution_error, ChainExecutionContext::Operation(_)
) if matches!(**execution_error, ExecutionError::IncorrectTransferAmount))
);
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
assert!(chain.manager.confirmed_vote().is_none());
assert!(chain.manager.validated_vote().is_none());
Ok(())
}
// Checks the timestamp validation of block proposals in three steps:
// 1. a timestamp one second beyond the grace period is rejected with `InvalidTimestamp`;
// 2. a timestamp equal to the grace period is accepted once the clock reaches it;
// 3. a child block whose timestamp precedes its parent's is rejected with
//    `InvalidBlockTimestamp`.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_valid_timestamps<B>(
mut storage_builder: B,
) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let storage = storage_builder.build().await?;
let clock = storage_builder.clock();
let public_key = signer.generate_new();
let owner = public_key.into();
let balance = Amount::from_tokens(5);
let small_transfer = Amount::from_micros(1);
let mut env = TestEnvironment::new(storage, false, false).await;
let chain_1_desc = env.add_root_chain(1, owner, balance).await;
let chain_2_desc = env.add_root_chain(2, owner, balance).await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
// Step 1: the clock has not been set yet (presumably it still reads 0 — TODO confirm), so
// a timestamp of grace period + 1 s is expected to be rejected outright.
{
let block_proposal = make_first_block(chain_1)
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner))
.with_timestamp(Timestamp::from(TEST_GRACE_PERIOD_MICROS + 1_000_000))
.into_first_proposal(owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(WorkerError::InvalidTimestamp { .. })
);
}
// Step 2: propose block 0 with a timestamp exactly at the grace period.
let block_0_time = Timestamp::from(TEST_GRACE_PERIOD_MICROS);
let certificate = {
let block = make_first_block(chain_1)
.with_timestamp(block_0_time)
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner));
let block_proposal = block
.clone()
.into_first_proposal(owner, &signer)
.await
.unwrap();
// The future is created first and only awaited after the clock has been moved to the
// block's timestamp.
let future = env.worker().handle_block_proposal(block_proposal);
clock.set(block_0_time);
future.await?;
// Expected state after the transfer: balance reduced by the transferred amount and the
// chain timestamp equal to the block's timestamp.
let system_state = SystemExecutionState {
balance: balance - small_transfer,
timestamp: block_0_time,
..env.system_execution_state(&chain_1_desc.id())
};
let state_hash = system_state.into_hash().await;
// The single operation yields one credit message to chain 2 and nothing else.
let value = ConfirmedBlock::new(
BlockExecutionOutcome {
state_hash,
messages: vec![vec![direct_credit_message(chain_2, small_transfer)]],
oracle_responses: vec![vec![]],
events: vec![vec![]],
blobs: vec![vec![]],
operation_results: vec![OperationResult::default()],
..BlockExecutionOutcome::default()
}
.with(block),
);
env.make_certificate(value)
};
// Confirm block 0 so that the next proposal is checked against its timestamp.
env.worker()
.fully_handle_certificate_with_notifications(certificate.clone(), &())
.await?;
// Step 3: a child block dated one microsecond before its parent must be rejected.
{
let block_proposal = make_child_block(&certificate.into_value())
.with_timestamp(block_0_time.saturating_sub_micros(1))
.into_first_proposal(owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(WorkerError::ChainError(error))
if matches!(*error, ChainError::InvalidBlockTimestamp { .. })
);
}
Ok(())
}
// Exercises how the worker treats proposal timestamps relative to the test clock:
// a past timestamp and a current timestamp are accepted, a future timestamp within the
// grace period keeps the handler pending until the clock catches up, and a timestamp
// beyond the grace period is rejected with `InvalidTimestamp`.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_timestamp_delay<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
use std::task::Poll;
use futures::{future::poll_fn, Future as _};
use tokio::task::yield_now;
let mut signer = InMemorySigner::new(None);
let storage = storage_builder.build().await?;
let clock = storage_builder.clock();
let public_key = signer.generate_new();
let owner = public_key.into();
let balance = Amount::from_tokens(5);
let small_transfer = Amount::from_micros(1);
let mut env = TestEnvironment::new(storage, false, false).await;
let chain_1_desc = env.add_root_chain(1, owner, balance).await;
let chain_2_desc = env.add_root_chain(2, owner, balance).await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
// Case 1: the block is dated 0 while the clock reads 1000, i.e. the timestamp is in the
// past.
clock.set(Timestamp::from(0));
let past_timestamp = Timestamp::from(0);
clock.set(Timestamp::from(1000)); let proposed_block = make_first_block(chain_1)
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner))
.with_timestamp(past_timestamp);
let block_proposal = proposed_block
.clone()
.into_first_proposal(owner, &signer)
.await
.unwrap();
// Stage the execution to obtain the executed block that is certified below.
let (_, block, _, _) = env
.worker()
.stage_block_execution(
proposed_block,
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
let result = env.worker().handle_block_proposal(block_proposal).await;
assert!(result.is_ok(), "Past timestamp should be accepted");
let certificate = env.make_certificate(ConfirmedBlock::new(block));
env.worker()
.fully_handle_certificate_with_notifications(certificate.clone(), &())
.await?;
// Case 2: the block timestamp equals the clock (2000).
let current_timestamp = Timestamp::from(2000);
clock.set(current_timestamp);
let proposed_block = make_child_block(&certificate.clone().into_value())
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner))
.with_timestamp(current_timestamp);
let block_proposal = proposed_block
.clone()
.into_first_proposal(owner, &signer)
.await
.unwrap();
let (_, block, _, _) = env
.worker()
.stage_block_execution(
proposed_block,
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
let result = env.worker().handle_block_proposal(block_proposal).await;
assert!(result.is_ok(), "Current timestamp should be accepted");
let certificate = env.make_certificate(ConfirmedBlock::new(block));
env.worker()
.fully_handle_certificate_with_notifications(certificate.clone(), &())
.await?;
// Case 3: the block is dated half a grace period after the clock (3000).
let future_timestamp = Timestamp::from(3000 + TEST_GRACE_PERIOD_MICROS / 2);
clock.set(Timestamp::from(3000));
let proposed_block = make_child_block(&certificate.clone().into_value())
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner))
.with_timestamp(future_timestamp);
let block_proposal = proposed_block
.clone()
.into_first_proposal(owner, &signer)
.await
.unwrap();
let (_, block, _, _) = env
.worker()
.stage_block_execution(
proposed_block,
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
// The handler future is pinned and polled by hand so the test can observe that it does
// not complete while the clock is still behind the block timestamp.
let worker = env.worker().clone();
let mut future = Box::pin(worker.handle_block_proposal(block_proposal));
yield_now().await;
// A single poll: `Pending` maps to `true`, an already finished future to `false`.
let is_pending = poll_fn(|cx| match future.as_mut().poll(cx) {
Poll::Pending => Poll::Ready(true),
Poll::Ready(_) => Poll::Ready(false),
})
.await;
assert!(is_pending, "Future-timestamp proposal should be delayed");
// Advancing the clock to the block timestamp lets the pending handler finish.
clock.set(future_timestamp);
let result = future.as_mut().await;
assert!(
result.is_ok(),
"Future timestamp within grace period should succeed after delay"
);
let certificate = env.make_certificate(ConfirmedBlock::new(block));
env.worker()
.fully_handle_certificate_with_notifications(certificate.clone(), &())
.await?;
// Case 4: the block is dated grace period + 1 s after the clock (4000) and must be
// rejected.
let far_future_timestamp = Timestamp::from(4000 + TEST_GRACE_PERIOD_MICROS + 1_000_000);
clock.set(Timestamp::from(4000));
let proposed_block = make_child_block(&certificate.into_value())
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner))
.with_timestamp(far_future_timestamp);
let block_proposal = proposed_block
.into_first_proposal(owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(WorkerError::InvalidTimestamp { .. })
);
Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_unknown_sender<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let sender_public_key = signer.generate_new();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_1_desc = env
.add_root_chain(1, sender_public_key.into(), Amount::from_tokens(5))
.await;
let chain_2_desc = env
.add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
.await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
let unknown_key = AccountSecretKey::generate();
let unknown_owner = unknown_key.public().into();
let new_signer: InMemorySigner = InMemorySigner::from_iter(vec![(unknown_owner, unknown_key)]);
let unknown_sender_block_proposal = make_first_block(chain_1)
.with_simple_transfer(chain_2, Amount::from_tokens(5))
.into_first_proposal(unknown_owner, &new_signer)
.await
.unwrap();
assert_matches!(
env.worker()
.handle_block_proposal(unknown_sender_block_proposal)
.await,
Err(WorkerError::InvalidOwner)
);
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
assert!(chain.manager.confirmed_vote().is_none());
assert!(chain.manager.validated_vote().is_none());
Ok(())
}
// Walks a chain through two consecutive blocks and checks that proposals are only accepted
// at the expected height: block 1 is refused before block 0 is confirmed, and block 0 is
// refused again once the chain has moved on to height 1.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_with_chaining<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let sender_public_key = signer.generate_new();
let sender_owner = sender_public_key.into();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_desc = env
.add_root_chain(1, sender_owner, Amount::from_tokens(5))
.await;
let chain_1 = chain_desc.id();
let chain_2 = env.add_root_chain(2, sender_owner, Amount::ZERO).await.id();
// Block 0: transfer one token from chain 1 to chain 2.
let block_proposal0 = make_first_block(chain_1)
.with_simple_transfer(chain_2, Amount::ONE)
.with_authenticated_signer(Some(sender_owner))
.into_first_proposal(sender_owner, &signer)
.await
.unwrap();
// Confirmed certificate for the same block 0, built with a remaining balance of 4 tokens.
let certificate0 = env
.make_simple_transfer_certificate(
chain_desc.clone(),
sender_public_key,
chain_2,
Amount::ONE,
Vec::new(),
Amount::from_tokens(4),
vec![],
)
.await;
// Block 1: a child of block 0 transferring two more tokens.
let block_proposal1 = make_child_block(certificate0.value())
.with_simple_transfer(chain_2, Amount::from_tokens(2))
.into_first_proposal(sender_owner, &signer)
.await
.unwrap();
// Proposing block 1 while the chain is still at height 0 must fail.
assert_matches!(
env.worker().handle_block_proposal(block_proposal1.clone()).await,
Err(WorkerError::ChainError(error)) if matches!(
*error,
ChainError::UnexpectedBlockHeight {
expected_block_height: BlockHeight(0),
found_block_height: BlockHeight(1)
})
);
// The rejected proposal must not have produced any vote.
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
assert!(chain.manager.confirmed_vote().is_none());
assert!(chain.manager.validated_vote().is_none());
// The view is dropped before the next worker call.
drop(chain);
// Block 0 is accepted: the manager now holds a validated vote for it and no confirmed
// vote.
env.worker()
.handle_block_proposal(block_proposal0.clone())
.await?;
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
let block = chain.manager.validated_vote().unwrap().value().block();
assert!(block.matches_proposed_block(&block_proposal0.content.block));
assert!(chain.manager.confirmed_vote().is_none());
// Certify the validated value and feed it back to the worker.
let block_certificate0 =
env.make_certificate(chain.manager.validated_vote().unwrap().value().clone());
drop(chain);
env.worker()
.handle_validated_certificate(block_certificate0)
.await?;
// After the validated certificate, the manager holds a confirmed vote for block 0 and no
// validated vote.
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
let block = chain.manager.confirmed_vote().unwrap().value().block();
assert!(block.matches_proposed_block(&block_proposal0.content.block));
assert!(chain.manager.validated_vote().is_none());
drop(chain);
// Confirm block 0.
env.worker()
.handle_confirmed_certificate(certificate0, None)
.await?;
// The view is loaded and dropped without any check; only the load itself is exercised.
let chain = env.worker().chain_state_view(chain_1).await?;
drop(chain);
// Block 1 is now accepted and gets a validated vote.
env.worker()
.handle_block_proposal(block_proposal1.clone())
.await?;
let chain = env.worker().chain_state_view(chain_1).await?;
assert!(chain.is_active().await?);
let block = chain.manager.validated_vote().unwrap().value().block();
assert!(block.matches_proposed_block(&block_proposal1.content.block));
assert!(chain.manager.confirmed_vote().is_none());
drop(chain);
// Re-submitting block 0 must fail because the chain expects height 1.
assert_matches!(
env.worker().handle_block_proposal(block_proposal0).await,
Err(WorkerError::ChainError(error)) if matches!(
*error,
ChainError::UnexpectedBlockHeight {
expected_block_height: BlockHeight(1),
found_block_height: BlockHeight(0),
})
);
Ok(())
}
// Checks block proposals on a chain whose confirmed blocks arrive out of order: a proposal
// at height 3 is rejected while the block at height 1 is still missing, and is accepted
// once the gap has been filled.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_sparse_chain<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let mut signer = InMemorySigner::new(None);
    let owner_key = signer.generate_new();
    let owner = owner_key.into();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;

    // One funded sender chain and two empty recipient chains, all with the same owner.
    let sender_desc = env.add_root_chain(1, owner, Amount::from_tokens(5)).await;
    let sender_id = sender_desc.id();
    let second_desc = env.add_root_chain(2, owner, Amount::ZERO).await;
    let second_id = second_desc.id();
    let third_desc = env.add_root_chain(3, owner, Amount::ZERO).await;
    let third_id = third_desc.id();

    // Three chained transfer certificates on the sender chain, each moving one token.
    let cert_first = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            owner_key,
            second_id,
            Amount::ONE,
            Vec::new(),
            Amount::from_tokens(4),
            vec![],
        )
        .await;
    let cert_second = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            owner_key,
            third_id,
            Amount::ONE,
            Vec::new(),
            Amount::from_tokens(3),
            vec![&cert_first],
        )
        .await;
    let cert_third = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            owner_key,
            second_id,
            Amount::ONE,
            Vec::new(),
            Amount::from_tokens(2),
            vec![&cert_second, &cert_first],
        )
        .await;

    // A proposal built on top of the third certificate.
    let child_proposal = make_child_block(cert_third.value())
        .with_simple_transfer(second_id, Amount::from_tokens(1))
        .into_first_proposal(owner, &signer)
        .await
        .unwrap();

    // Deliver the first and the third certificate, leaving a gap in between.
    env.worker()
        .handle_confirmed_certificate(cert_first, None)
        .await?;
    env.worker()
        .handle_confirmed_certificate(cert_third.clone(), None)
        .await?;
    {
        // The tip only advanced past the first block.
        let view = env.worker().chain_state_view(sender_id).await?;
        assert!(view.is_active().await?);
        assert_eq!(view.tip_state.get().next_block_height, BlockHeight(1));
    }

    // With the gap still open, the proposal is rejected because of its height.
    let rejected = env
        .worker()
        .handle_block_proposal(child_proposal.clone())
        .await;
    assert_matches!(
        rejected,
        Err(WorkerError::ChainError(chain_error))
            if matches!(
                *chain_error,
                ChainError::UnexpectedBlockHeight {
                    expected_block_height: BlockHeight(1),
                    found_block_height: BlockHeight(3)
                }
            )
    );

    // Fill the gap with the second certificate.
    env.worker()
        .handle_confirmed_certificate(cert_second, None)
        .await?;
    {
        let view = env.worker().chain_state_view(sender_id).await?;
        assert!(view.is_active().await?);
        assert_eq!(view.tip_state.get().next_block_height, BlockHeight(2));
    }

    // Handling the third certificate again now advances the tip.
    env.worker()
        .handle_confirmed_certificate(cert_third, None)
        .await?;
    {
        let view = env.worker().chain_state_view(sender_id).await?;
        assert!(view.is_active().await?);
        assert_eq!(view.tip_state.get().next_block_height, BlockHeight(3));
    }

    // The very same proposal is accepted now.
    let accepted = env
        .worker()
        .handle_block_proposal(child_proposal.clone())
        .await;
    assert_matches!(accepted, Ok(_));
    Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_with_incoming_bundles<B>(
mut storage_builder: B,
) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let sender_public_key = signer.generate_new();
let sender_owner = sender_public_key.into();
let recipient_public_key = signer.generate_new();
let recipient_owner = recipient_public_key.into();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_1_desc = env
.add_root_chain(1, sender_owner, Amount::from_tokens(6))
.await;
let chain_2_desc = env.add_root_chain(2, recipient_owner, Amount::ZERO).await;
let chain_3_desc = env.add_root_chain(3, recipient_owner, Amount::ZERO).await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
let chain_3 = chain_3_desc.id();
let certificate0 = env.make_certificate(ConfirmedBlock::new(
BlockExecutionOutcome {
messages: vec![
vec![direct_credit_message(chain_2, Amount::ONE)],
vec![direct_credit_message(chain_2, Amount::from_tokens(2))],
],
previous_message_blocks: BTreeMap::new(),
previous_event_blocks: BTreeMap::new(),
events: vec![Vec::new(); 2],
blobs: vec![Vec::new(); 2],
state_hash: SystemExecutionState {
balance: Amount::from_tokens(3),
..env.system_execution_state(&chain_1_desc.id())
}
.into_hash()
.await,
oracle_responses: vec![Vec::new(); 2],
operation_results: vec![OperationResult::default(); 2],
}
.with(
make_first_block(chain_1)
.with_simple_transfer(chain_2, Amount::ONE)
.with_simple_transfer(chain_2, Amount::from_tokens(2))
.with_authenticated_signer(Some(sender_owner)),
),
));
let certificate1 = env.make_certificate(ConfirmedBlock::new(
BlockExecutionOutcome {
messages: vec![vec![direct_credit_message(chain_2, Amount::from_tokens(3))]],
previous_message_blocks: BTreeMap::from([(
chain_2,
(certificate0.hash(), BlockHeight(0)),
)]),
previous_event_blocks: BTreeMap::new(),
events: vec![Vec::new()],
blobs: vec![Vec::new()],
state_hash: SystemExecutionState {
balance: Amount::ZERO,
..env.system_execution_state(&chain_1_desc.id())
}
.into_hash()
.await,
oracle_responses: vec![Vec::new()],
operation_results: vec![OperationResult::default()],
}
.with(
make_child_block(&certificate0.clone().into_value())
.with_simple_transfer(chain_2, Amount::from_tokens(3))
.with_authenticated_signer(Some(sender_owner)),
),
));
assert_matches!(
env.worker()
.handle_confirmed_certificate(certificate1.clone(), None)
.await,
Ok(_)
);
let notifications = Arc::new(Mutex::new(Vec::new()));
env.worker()
.fully_handle_certificate_with_notifications(certificate0.clone(), ¬ifications)
.await?;
env.worker()
.fully_handle_certificate_with_notifications(certificate1.clone(), ¬ifications)
.await?;
assert_eq!(
*notifications.lock().unwrap(),
vec![
Notification {
chain_id: chain_1,
reason: NewBlock {
height: BlockHeight(0),
hash: certificate0.hash(),
event_streams: BTreeSet::new(),
}
},
Notification {
chain_id: chain_2,
reason: NewIncomingBundle {
origin: chain_1,
height: BlockHeight(0)
}
},
Notification {
chain_id: chain_1,
reason: NewBlock {
height: BlockHeight(1),
hash: certificate1.hash(),
event_streams: BTreeSet::new(),
}
},
Notification {
chain_id: chain_2,
reason: NewIncomingBundle {
origin: chain_1,
height: BlockHeight(1)
}
}
]
);
{
let block_proposal = make_first_block(chain_2)
.with_simple_transfer(chain_3, Amount::from_tokens(6))
.with_authenticated_signer(Some(recipient_owner))
.into_first_proposal(recipient_owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(
WorkerError::ChainError(error)
) if matches!(&*error, ChainError::ExecutionError(
execution_error, ChainExecutionContext::Operation(_)
) if matches!(**execution_error, ExecutionError::InsufficientBalance { .. }))
);
}
{
let block_proposal = make_first_block(chain_2)
.with_simple_transfer(chain_3, Amount::from_tokens(5))
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![
system_credit_message(Amount::ONE).to_posted(0, MessageKind::Tracked)
],
},
action: MessageAction::Accept,
})
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 1,
messages: vec![system_credit_message(Amount::from_tokens(2))
.to_posted(0, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate1.hash(),
height: BlockHeight::from(1),
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![
system_credit_message(Amount::from_tokens(2)) .to_posted(0, MessageKind::Tracked),
],
},
action: MessageAction::Accept,
})
.with_authenticated_signer(Some(recipient_owner))
.into_first_proposal(recipient_owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(WorkerError::ChainError(chain_error))
if matches!(*chain_error, ChainError::UnexpectedMessage { .. })
);
}
{
let block_proposal = make_first_block(chain_2)
.with_simple_transfer(chain_3, Amount::from_tokens(6))
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 1,
messages: vec![system_credit_message(Amount::from_tokens(2))
.to_posted(1, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.with_authenticated_signer(Some(recipient_owner))
.into_first_proposal(recipient_owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(WorkerError::ChainError(chain_error))
if matches!(*chain_error, ChainError::CannotSkipMessage { .. })
);
}
{
let block_proposal = make_first_block(chain_2)
.with_simple_transfer(chain_3, Amount::from_tokens(6))
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate1.hash(),
height: BlockHeight::from(1),
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![system_credit_message(Amount::from_tokens(3))
.to_posted(0, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![
system_credit_message(Amount::ONE).to_posted(0, MessageKind::Tracked)
],
},
action: MessageAction::Accept,
})
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 1,
messages: vec![system_credit_message(Amount::from_tokens(2))
.to_posted(1, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.with_authenticated_signer(Some(recipient_owner))
.into_first_proposal(recipient_owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(block_proposal).await,
Err(WorkerError::ChainError(chain_error))
if matches!(*chain_error, ChainError::CannotSkipMessage { .. })
);
}
{
let block_proposal = make_first_block(chain_2)
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![
system_credit_message(Amount::ONE).to_posted(0, MessageKind::Tracked)
],
},
action: MessageAction::Accept,
})
.with_simple_transfer(chain_3, Amount::ONE)
.with_authenticated_signer(Some(recipient_owner))
.into_first_proposal(recipient_owner, &signer)
.await
.unwrap();
env.worker()
.handle_block_proposal(block_proposal.clone())
.await?;
let certificate: ConfirmedBlockCertificate = env.make_certificate(ConfirmedBlock::new(
BlockExecutionOutcome {
messages: vec![
Vec::new(),
vec![direct_credit_message(chain_3, Amount::ONE)],
],
previous_message_blocks: BTreeMap::new(),
previous_event_blocks: BTreeMap::new(),
events: vec![Vec::new(); 2],
blobs: vec![Vec::new(); 2],
state_hash: env
.system_execution_state(&chain_2_desc.id())
.into_hash()
.await,
oracle_responses: vec![Vec::new(); 2],
operation_results: vec![OperationResult::default()],
}
.with(block_proposal.content.block),
));
env.worker()
.handle_confirmed_certificate(certificate.clone(), None)
.await?;
let block_proposal = make_child_block(&certificate.into_value())
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate0.hash(),
height: BlockHeight::from(0),
timestamp: Timestamp::from(0),
transaction_index: 1,
messages: vec![system_credit_message(Amount::from_tokens(2))
.to_posted(1, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate1.hash(),
height: BlockHeight::from(1),
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![system_credit_message(Amount::from_tokens(3))
.to_posted(0, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.with_simple_transfer(chain_3, Amount::from_tokens(3))
.into_first_proposal(recipient_owner, &signer)
.await
.unwrap();
env.worker()
.handle_block_proposal(block_proposal.clone())
.await?;
}
Ok(())
}
// A proposal that transfers more than the chain's balance must fail with an
// `InsufficientBalance` execution error and must not leave any vote in the chain manager.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_exceed_balance<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let mut signer = InMemorySigner::new(None);
    let owner = signer.generate_new().into();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;

    // The sender chain holds 5 tokens; the recipient chain starts empty.
    let sender_desc = env
        .add_root_chain(1, owner, Amount::from_tokens(5))
        .await;
    let recipient_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
        .await;
    let sender_id = sender_desc.id();
    let recipient_id = recipient_desc.id();

    // Try to send 1000 tokens.
    let overdraft_proposal = make_first_block(sender_id)
        .with_simple_transfer(recipient_id, Amount::from_tokens(1000))
        .with_authenticated_signer(Some(owner))
        .into_first_proposal(owner, &signer)
        .await
        .unwrap();
    let outcome = env.worker().handle_block_proposal(overdraft_proposal).await;
    assert_matches!(
        outcome,
        Err(WorkerError::ChainError(error))
            if matches!(
                &*error,
                ChainError::ExecutionError(
                    execution_error,
                    ChainExecutionContext::Operation(_)
                ) if matches!(**execution_error, ExecutionError::InsufficientBalance { .. })
            )
    );

    // The chain is still active and no vote was recorded.
    let view = env.worker().chain_state_view(sender_id).await?;
    assert!(view.is_active().await?);
    assert!(view.manager.confirmed_vote().is_none());
    assert!(view.manager.validated_vote().is_none());
    Ok(())
}
// Happy path for a block proposal: the worker first holds a vote on the validated block and,
// once the validated certificate has been handled, a vote on the confirmed block, which is
// also reported as `pending` in the chain info response.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let mut signer = InMemorySigner::new(None);
    let owner = signer.generate_new().into();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;
    let source_desc = env
        .add_root_chain(1, owner, Amount::from_tokens(5))
        .await;
    let target_desc = env
        .add_root_chain(2, owner, Amount::from_tokens(5))
        .await;
    let source_id = source_desc.id();
    let target_id = target_desc.id();

    // Propose a block that transfers the whole balance of the source chain.
    let proposal = make_first_block(source_id)
        .with_simple_transfer(target_id, Amount::from_tokens(5))
        .with_authenticated_signer(Some(owner))
        .into_first_proposal(owner, &signer)
        .await
        .unwrap();
    let (proposal_response, _actions) = env.worker().handle_block_proposal(proposal).await?;
    proposal_response.check(env.worker().public_key())?;

    // After the proposal there is a validated vote but no confirmed vote yet.
    let view = env.worker().chain_state_view(source_id).await?;
    assert!(view.is_active().await?);
    assert!(view.manager.confirmed_vote().is_none());
    let validated_certificate =
        env.make_certificate(view.manager.validated_vote().unwrap().value().clone());
    drop(view);

    // Handling the validated certificate replaces the validated vote by a confirmed vote.
    let (validation_response, _actions) = env
        .worker()
        .handle_validated_certificate(validated_certificate)
        .await?;
    validation_response.check(env.worker().public_key())?;
    let view = env.worker().chain_state_view(source_id).await?;
    assert!(view.is_active().await?);
    assert!(view.manager.validated_vote().is_none());
    let pending_vote = view.manager.confirmed_vote().unwrap().lite();
    assert_eq!(
        validation_response.info.manager.pending.unwrap(),
        pending_vote
    );
    Ok(())
}
// Submitting the same block proposal twice must succeed both times and yield chain info
// with the same hash.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_block_proposal_replay<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let mut signer = InMemorySigner::new(None);
    let owner = signer.generate_new().into();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;
    let sender_desc = env
        .add_root_chain(1, owner, Amount::from_tokens(5))
        .await;
    let recipient_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
        .await;
    let sender_id = sender_desc.id();
    let recipient_id = recipient_desc.id();

    let proposal = make_first_block(sender_id)
        .with_simple_transfer(recipient_id, Amount::from_tokens(5))
        .with_authenticated_signer(Some(owner))
        .into_first_proposal(owner, &signer)
        .await
        .unwrap();

    // First submission.
    let (first_response, _actions) = env
        .worker()
        .handle_block_proposal(proposal.clone())
        .await?;
    first_response.check(env.worker().public_key())?;

    // Second submission of the very same proposal.
    let (second_response, _actions) = env.worker().handle_block_proposal(proposal).await?;

    // The two responses are compared through the hash of their chain info.
    let first_digest = CryptoHash::new(&*first_response.info);
    let second_digest = CryptoHash::new(&*second_response.info);
    assert_eq!(first_digest, second_digest);
    Ok(())
}
// A certificate for a chain whose description was never added to the environment must be
// rejected with `BlobsNotFound`, naming the chain description blob.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_unknown_sender<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let mut signer = InMemorySigner::new(None);
    let sender_key = signer.generate_new();
    let recipient_key = signer.generate_new();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;

    // Only the recipient chain is registered; the sender description is a dummy.
    let recipient_desc = env
        .add_root_chain(2, recipient_key.into(), Amount::ZERO)
        .await;
    let recipient_id = recipient_desc.id();
    let unknown_desc = dummy_chain_description(1);

    let certificate = env
        .make_simple_transfer_certificate(
            unknown_desc.clone(),
            sender_key,
            recipient_id,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::from_tokens(5),
            vec![],
        )
        .await;

    let outcome = env
        .worker()
        .fully_handle_certificate_with_notifications(certificate, &())
        .await;
    assert_matches!(
        outcome,
        Err(WorkerError::BlobsNotFound(error))
            if error == vec![Blob::new_chain_description(&unknown_desc).id()]
    );
    Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_with_open_chain<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let sender_key_pair = AccountSecretKey::generate();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_2_desc = env
.add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
.await;
let balance = Amount::from_tokens(42);
let small_transfer = Amount::from_tokens(1);
let description = env
.add_child_chain(chain_2_desc.id(), sender_key_pair.public().into(), balance)
.await;
let chain_id = description.id();
let mut state = env.system_execution_state(&description.id());
state.balance = balance;
let block = make_first_block(chain_id)
.with_simple_transfer(chain_id, small_transfer)
.with_authenticated_signer(Some(sender_key_pair.public().into()));
let value = ConfirmedBlock::new(
BlockExecutionOutcome {
messages: vec![vec![]],
previous_message_blocks: BTreeMap::new(),
previous_event_blocks: BTreeMap::new(),
events: vec![vec![]],
blobs: vec![vec![]],
state_hash: state.into_hash().await,
oracle_responses: vec![vec![]],
operation_results: vec![OperationResult::default()],
}
.with(block),
);
let certificate = env.make_certificate(value);
let info = env
.worker()
.fully_handle_certificate_with_notifications(certificate, &())
.await?
.info;
assert_eq!(info.next_block_height, BlockHeight::from(1));
Ok(())
}
// A transfer certificate built with a public key other than the one the chain was created
// with is expected to be rejected with `ChainError::CorruptedChainState`.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_wrong_owner<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    // `chain_keys` owns the chain; `other_keys` is an unrelated key pair.
    let other_keys = AccountSecretKey::generate();
    let chain_keys = AccountSecretKey::generate();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;
    let chain_desc = env
        .add_root_chain(2, chain_keys.public().into(), Amount::from_tokens(5))
        .await;
    let chain_id = chain_desc.id();

    let certificate = env
        .make_transfer_certificate_for_epoch(
            chain_desc.clone(),
            other_keys.public(),
            chain_keys.public().into(),
            AccountOwner::CHAIN,
            Account::chain(chain_id),
            Amount::from_tokens(5),
            Vec::new(),
            Epoch::ZERO,
            Amount::ZERO,
            BTreeMap::new(),
            vec![],
        )
        .await;

    let outcome = env
        .worker()
        .fully_handle_certificate_with_notifications(certificate, &())
        .await;
    assert_matches!(
        outcome,
        Err(WorkerError::ChainError(ref e)) if matches!(**e, ChainError::CorruptedChainState(_))
    );
    Ok(())
}
// Handling the same confirmed certificate twice must not fail: both calls are expected to
// return `Ok`.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_bad_block_height<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_keys = AccountSecretKey::generate();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;
    let sender_desc = env
        .add_root_chain(1, sender_keys.public().into(), Amount::from_tokens(5))
        .await;
    let recipient_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
        .await;
    let recipient_id = recipient_desc.id();

    // Transfer the sender's whole balance to the recipient chain.
    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            sender_keys.public(),
            recipient_id,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;

    // First delivery.
    let replayed = certificate.clone();
    env.worker()
        .fully_handle_certificate_with_notifications(replayed, &())
        .await?;
    // Second delivery of the same certificate.
    env.worker()
        .fully_handle_certificate_with_notifications(certificate, &())
        .await?;
    Ok(())
}
// A confirmed block may consume an incoming bundle that the worker has not received yet.
// The bundle then shows up in the inbox's `removed_bundles` queue, while `added_bundles`
// stays empty and the next height to receive from that origin stays at zero.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_with_anticipated_incoming_bundle<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_keys = AccountSecretKey::generate();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;
    let sender_desc = env
        .add_root_chain(1, sender_keys.public().into(), Amount::from_tokens(5))
        .await;
    let recipient_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ZERO)
        .await;
    let origin_desc = env
        .add_root_chain(3, AccountPublicKey::test_key(3).into(), Amount::ZERO)
        .await;
    let sender_id = sender_desc.id();
    let recipient_id = recipient_desc.id();
    let origin_id = origin_desc.id();

    // The block receives a credit of 995 tokens from chain 3 and sends 1000 tokens to
    // chain 2; the expected balance afterwards is zero.
    let anticipated_bundle = IncomingBundle {
        origin: origin_id,
        bundle: MessageBundle {
            certificate_hash: CryptoHash::test_hash("certificate"),
            height: BlockHeight::ZERO,
            timestamp: Timestamp::from(0),
            transaction_index: 0,
            messages: vec![system_credit_message(Amount::from_tokens(995))
                .to_posted(0, MessageKind::Tracked)],
        },
        action: MessageAction::Accept,
    };
    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            sender_keys.public(),
            recipient_id,
            Amount::from_tokens(1000),
            vec![anticipated_bundle],
            Amount::ZERO,
            vec![],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate.clone(), &())
        .await?;

    // State of the sender chain after the block.
    let sender_view = env.worker().chain_state_view(sender_id).await?;
    assert!(sender_view.is_active().await?);
    assert_eq!(
        Amount::ZERO,
        *sender_view.execution_state.system.balance.get()
    );
    assert_eq!(
        BlockHeight::from(1),
        sender_view.tip_state.get().next_block_height
    );

    // The inbox for chain 3 only records the bundle as removed.
    let inbox = sender_view
        .inboxes
        .try_load_entry(&origin_id)
        .await?
        .expect("Missing inbox for `chain_3` in `chain_1`");
    assert_eq!(BlockHeight::ZERO, inbox.next_block_height_to_receive()?);
    assert_eq!(inbox.added_bundles.count(), 0);
    let removed_front = inbox.removed_bundles.front().await?.unwrap();
    assert_matches!(
        removed_front,
        MessageBundle {
            certificate_hash,
            height,
            timestamp,
            transaction_index: 0,
            messages,
        } if certificate_hash == CryptoHash::test_hash("certificate")
            && height == BlockHeight::ZERO
            && timestamp == Timestamp::from(0)
            && matches!(messages[..], [PostedMessage {
                authenticated_signer: None,
                grant: Amount::ZERO,
                refund_grant_to: None,
                kind: MessageKind::Tracked,
                index: 0,
                message: Message::System(SystemMessage::Credit { amount, .. }),
            }] if amount == Amount::from_tokens(995)),
        "Unexpected bundle",
    );
    assert_eq!(sender_view.confirmed_log.count(), 1);
    assert_eq!(
        Some(certificate.hash()),
        sender_view.tip_state.get().block_hash
    );

    // The recipient chain is active as well.
    let recipient_view = env.worker().chain_state_view(recipient_id).await?;
    assert!(recipient_view.is_active().await?);
    Ok(())
}
// Transfers one token to a chain whose balance is already `Amount::MAX`. The certificate is
// still handled successfully: the sender chain advances, and the recipient's balance is
// still `Amount::MAX` afterwards.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_receiver_balance_overflow<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_keys = AccountSecretKey::generate();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;
    let sender_desc = env
        .add_root_chain(1, sender_keys.public().into(), Amount::ONE)
        .await;
    let recipient_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::MAX)
        .await;
    let sender_id = sender_desc.id();
    let recipient_id = recipient_desc.id();

    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            sender_keys.public(),
            recipient_id,
            Amount::ONE,
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate.clone(), &())
        .await?;

    // The sender chain executed the block and is now empty.
    let sender_view = env.worker().chain_state_view(sender_id).await?;
    assert!(sender_view.is_active().await?);
    assert_eq!(
        Amount::ZERO,
        *sender_view.execution_state.system.balance.get()
    );
    assert_eq!(
        BlockHeight::from(1),
        sender_view.tip_state.get().next_block_height
    );
    assert_eq!(sender_view.confirmed_log.count(), 1);
    assert_eq!(
        Some(certificate.hash()),
        sender_view.tip_state.get().block_hash
    );

    // The recipient chain's balance is unchanged.
    let recipient_view = env.worker().chain_state_view(recipient_id).await?;
    assert!(recipient_view.is_active().await?);
    assert_eq!(
        Amount::MAX,
        *recipient_view.execution_state.system.balance.get()
    );
    Ok(())
}
// A transfer from a chain to itself keeps the balance unchanged and does not create an
// inbox for the chain on itself.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_same_chain_same_owner_no_messages<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let keys = AccountSecretKey::generate();
    let owner = keys.public().into();
    let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
    let chain_desc = env.add_root_chain(1, owner, Amount::ONE).await;
    let chain_id = chain_desc.id();

    // The chain sends one token to itself.
    let certificate = env
        .make_simple_transfer_certificate(
            chain_desc.clone(),
            keys.public(),
            chain_id,
            Amount::ONE,
            Vec::new(),
            Amount::ONE,
            vec![],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate.clone(), &())
        .await?;

    let view = env.worker().chain_state_view(chain_id).await?;
    assert!(view.is_active().await?);
    assert_eq!(Amount::ONE, *view.execution_state.system.balance.get());
    // No inbox entry exists for the chain's own id.
    let own_inbox = view.inboxes.try_load_entry(&chain_id).await?;
    assert!(own_inbox.is_none());
    assert_eq!(
        BlockHeight::from(1),
        view.tip_state.get().next_block_height
    );
    assert_eq!(view.confirmed_log.count(), 1);
    assert_eq!(Some(certificate.hash()), view.tip_state.get().block_hash);
    Ok(())
}
// A transfer to a different chain empties the sender's balance and places the credit
// bundle in the recipient's inbox for the sender chain.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_different_chain_with_messages<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let keys = AccountSecretKey::generate();
    let owner = keys.public().into();
    let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
    let sender_desc = env.add_root_chain(1, owner, Amount::ONE).await;
    let sender_id = sender_desc.id();
    let recipient_desc = env.add_root_chain(2, owner, Amount::ZERO).await;
    let recipient_id = recipient_desc.id();

    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc.clone(),
            keys.public(),
            recipient_id,
            Amount::ONE,
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate.clone(), &())
        .await?;

    // Sender side: the balance is gone.
    let sender_view = env.worker().chain_state_view(sender_id).await?;
    assert!(sender_view.is_active().await?);
    assert_eq!(
        Amount::ZERO,
        *sender_view.execution_state.system.balance.get()
    );

    // Recipient side: the inbox for the sender holds the credit bundle.
    let recipient_view = env.worker().chain_state_view(recipient_id).await?;
    let inbox = recipient_view
        .inboxes
        .try_load_entry(&sender_id)
        .await?
        .expect("Missing inbox for chain_1 in chain_2");
    assert_eq!(BlockHeight::from(1), inbox.next_block_height_to_receive()?);
    let added_front = inbox.added_bundles.front().await?.unwrap();
    assert_matches!(
        added_front,
        MessageBundle {
            certificate_hash,
            height,
            timestamp,
            transaction_index: 0,
            messages,
        } if certificate_hash == certificate.hash()
            && height == BlockHeight::ZERO
            && timestamp == Timestamp::from(0)
            && matches!(messages[..], [PostedMessage {
                authenticated_signer: None,
                grant: Amount::ZERO,
                refund_grant_to: None,
                kind: MessageKind::Tracked,
                index: 0,
                message: Message::System(SystemMessage::Credit { amount, .. })
            }] if amount == Amount::ONE),
        "Unexpected bundle",
    );

    // Sender side again: the tip points at the new block.
    assert_eq!(
        BlockHeight::from(1),
        sender_view.tip_state.get().next_block_height
    );
    assert_eq!(sender_view.confirmed_log.count(), 1);
    assert_eq!(
        Some(certificate.hash()),
        sender_view.tip_state.get().block_hash
    );
    Ok(())
}
// Delivers a cross-chain update directly to the recipient chain and checks its effects.
//
// The sender chain uses a dummy description that is never added to the environment; only
// the recipient chain (chain 2, holding one token) exists. After the request, the recipient
// has not executed any block (balance, tip and confirmed log are unchanged), but its inbox
// for the sender contains the credit bundle and the received log has one entry.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_cross_chain_request<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_key_pair = AccountSecretKey::generate();
    let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
    let chain_2_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ONE)
        .await;
    let chain_2 = chain_2_desc.id();
    let chain_1_desc = dummy_chain_description(1);
    let chain_1 = chain_1_desc.id();
    let certificate = env
        .make_simple_transfer_certificate(
            chain_1_desc,
            sender_key_pair.public(),
            chain_2,
            Amount::from_tokens(10),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;

    // Borrow the certificate directly; cloning it just to take a reference is unnecessary.
    env.worker()
        .handle_cross_chain_request(update_recipient_direct(chain_2, &certificate))
        .await?;

    // The recipient chain has not executed anything yet.
    let chain = env.worker().chain_state_view(chain_2).await?;
    assert!(chain.is_active().await?);
    assert_eq!(Amount::ONE, *chain.execution_state.system.balance.get());
    assert_eq!(BlockHeight::ZERO, chain.tip_state.get().next_block_height);

    // The inbox for the sender chain holds the bundle with the 10-token credit.
    let inbox = chain
        .inboxes
        .try_load_entry(&chain_1)
        .await?
        .expect("Missing inbox for `chain_1` in `chain_2`");
    assert_eq!(BlockHeight::from(1), inbox.next_block_height_to_receive()?);
    assert_matches!(
        inbox
            .added_bundles
            .front()
            .await?
            .unwrap(),
        MessageBundle {
            certificate_hash,
            height,
            timestamp,
            transaction_index: 0,
            messages,
        } if certificate_hash == certificate.hash()
            && height == BlockHeight::ZERO
            && timestamp == Timestamp::from(0)
            && matches!(messages[..], [PostedMessage {
                authenticated_signer: None,
                grant: Amount::ZERO,
                refund_grant_to: None,
                kind: MessageKind::Tracked,
                index: 0,
                message: Message::System(SystemMessage::Credit { amount, .. })
            }] if amount == Amount::from_tokens(10)),
        "Unexpected bundle",
    );
    assert_eq!(chain.confirmed_log.count(), 0);
    assert_eq!(None, chain.tip_state.get().block_hash);
    assert_eq!(chain.received_log.count(), 1);
    Ok(())
}
// A cross-chain update addressed to a chain that was never added to the environment
// produces no follow-up cross-chain requests and leaves that chain without any inbox.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_cross_chain_request_no_recipient_chain<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_keys = AccountSecretKey::generate();
    let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
    let sender_desc = env
        .add_root_chain(1, sender_keys.public().into(), Amount::from_tokens(10))
        .await;
    // The recipient only exists as an id derived from a dummy description.
    let missing_recipient = dummy_chain_description(2).id();

    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc,
            sender_keys.public(),
            missing_recipient,
            Amount::from_tokens(10),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;

    let actions = env
        .worker()
        .handle_cross_chain_request(update_recipient_direct(missing_recipient, &certificate))
        .await?;
    assert!(actions.cross_chain_requests.is_empty());

    // No inbox was created on the recipient side.
    let recipient_view = env.worker().chain_state_view(missing_recipient).await?;
    assert!(recipient_view.inboxes.indices().await?.is_empty());
    Ok(())
}
// Same scenario as the "no recipient chain" test, but the environment is created with its
// first flag set to `true` (presumably the client-side mode named in the test -- confirm
// against `TestEnvironment::new`). Here the update is expected to be confirmed, a
// `NewIncomingBundle` notification is expected, and an inbox must exist afterwards.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_cross_chain_request_no_recipient_chain_on_client<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let storage = storage_builder.build().await?;
    let sender_key_pair = AccountSecretKey::generate();
    let sender_public_key = sender_key_pair.public();
    let mut env = TestEnvironment::new(storage, true, false).await;

    // Only the sender chain is registered; the recipient ID comes from a dummy description.
    let sender_desc = env
        .add_root_chain(1, sender_public_key.into(), Amount::from_tokens(10))
        .await;
    let sender_id = sender_desc.id();
    let recipient_id = dummy_chain_description(2).id();

    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc,
            sender_public_key,
            recipient_id,
            Amount::from_tokens(10),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;

    let request = update_recipient_direct(recipient_id, &certificate);
    let actions = env.worker().handle_cross_chain_request(request).await?;

    // Exactly one follow-up request: the confirmation sent back for the update.
    assert_matches!(
        actions.cross_chain_requests.as_slice(),
        &[CrossChainRequest::ConfirmUpdatedRecipient { .. }]
    );

    // Exactly one notification, announcing the bundle from the sender's block at height 0.
    let expected_notifications = vec![Notification {
        chain_id: recipient_id,
        reason: Reason::NewIncomingBundle {
            origin: sender_id,
            height: BlockHeight::ZERO,
        },
    }];
    assert_eq!(actions.notifications, expected_notifications);

    // This time the bundle must have been stored in an inbox.
    let recipient_chain = env.worker().chain_state_view(recipient_id).await?;
    let inbox_origins = recipient_chain.inboxes.indices().await?;
    assert!(!inbox_origins.is_empty());
    Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_to_active_recipient<B>(
mut storage_builder: B,
) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let sender_key_pair = AccountSecretKey::generate();
let recipient_key_pair = AccountSecretKey::generate();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_1_desc = env
.add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(5))
.await;
let chain_2_desc = env
.add_root_chain(2, recipient_key_pair.public().into(), Amount::ZERO)
.await;
let chain_3_desc = env
.add_root_chain(3, recipient_key_pair.public().into(), Amount::ZERO)
.await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
let chain_3 = chain_3_desc.id();
assert_eq!(
env.worker()
.query_application(chain_1, Query::System(SystemQuery), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::System(SystemResponse {
chain_id: chain_1,
balance: Amount::from_tokens(5),
}),
operations: vec![],
}
);
assert_eq!(
env.worker()
.query_application(chain_2, Query::System(SystemQuery), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::System(SystemResponse {
chain_id: chain_2,
balance: Amount::ZERO,
}),
operations: vec![],
}
);
let certificate = env
.make_simple_transfer_certificate(
chain_1_desc.clone(),
sender_key_pair.public(),
chain_2,
Amount::from_tokens(5),
Vec::new(),
Amount::ZERO,
vec![],
)
.await;
let info = env
.worker()
.fully_handle_certificate_with_notifications(certificate.clone(), &())
.await?
.info;
assert_eq!(chain_1, info.chain_id);
assert_eq!(Amount::ZERO, info.chain_balance);
assert_eq!(BlockHeight::from(1), info.next_block_height);
assert_eq!(Some(certificate.hash()), info.block_hash);
assert!(info.manager.pending.is_none());
assert_eq!(
env.worker()
.query_application(chain_1, Query::System(SystemQuery), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::System(SystemResponse {
chain_id: chain_1,
balance: Amount::ZERO,
}),
operations: vec![],
}
);
let certificate = env
.make_simple_transfer_certificate(
chain_2_desc.clone(),
recipient_key_pair.public(),
chain_3,
Amount::ONE,
vec![IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: certificate.hash(),
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![system_credit_message(Amount::from_tokens(5))
.to_posted(0, MessageKind::Tracked)],
},
action: MessageAction::Accept,
}],
Amount::from_tokens(4),
vec![],
)
.await;
env.worker()
.fully_handle_certificate_with_notifications(certificate.clone(), &())
.await?;
assert_eq!(
env.worker()
.query_application(chain_2, Query::System(SystemQuery), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::System(SystemResponse {
chain_id: chain_2,
balance: Amount::from_tokens(4),
}),
operations: vec![],
}
);
{
let recipient_chain = env.worker().chain_state_view(chain_2).await?;
assert!(recipient_chain.is_active().await?);
assert_eq!(
*recipient_chain.execution_state.system.balance.get(),
Amount::from_tokens(4)
);
let ownership = &recipient_chain.manager.ownership.get();
assert!(
ownership
.owners
.contains_key(&recipient_key_pair.public().into())
&& ownership.super_owners.is_empty()
&& ownership.owners.len() == 1
);
assert_eq!(recipient_chain.confirmed_log.count(), 1);
assert_eq!(
recipient_chain.tip_state.get().block_hash,
Some(certificate.hash())
);
assert_eq!(recipient_chain.received_log.count(), 1);
}
let query = ChainInfoQuery::new(chain_2).with_received_log_excluding_first_n(0);
let (response, _actions) = env.worker().handle_chain_info_query(query).await?;
assert_eq!(response.info.requested_received_log.len(), 1);
assert_eq!(
response.info.requested_received_log[0],
ChainAndHeight {
chain_id: chain_1,
height: BlockHeight::ZERO
}
);
Ok(())
}
// Transfers the sender's whole balance to `chain_2`, a chain that is never added to the
// environment, and checks that the sender's certificate is still handled: the returned chain
// info shows the drained balance, the advanced height and the new tip hash.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_to_inactive_recipient<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_key_pair = AccountSecretKey::generate();
    let sender_public_key = sender_key_pair.public();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, false, false).await;

    let sender_desc = env
        .add_root_chain(1, sender_public_key.into(), Amount::from_tokens(5))
        .await;
    let sender_id = sender_desc.id();
    // The recipient only exists as an ID derived from a dummy description.
    let recipient_id = dummy_chain_description(2).id();

    let certificate = env
        .make_simple_transfer_certificate(
            sender_desc,
            sender_public_key,
            recipient_id,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;
    let certificate_hash = certificate.hash();

    let response = env
        .worker()
        .fully_handle_certificate_with_notifications(certificate, &())
        .await?;
    let info = response.info;

    assert_eq!(sender_id, info.chain_id);
    assert_eq!(Amount::ZERO, info.chain_balance);
    assert_eq!(BlockHeight::from(1), info.next_block_height);
    assert_eq!(Some(certificate_hash), info.block_hash);
    assert!(info.manager.pending.is_none());
    Ok(())
}
// Exercises a rejected transfer end to end:
//   1. chain 1 moves 5 tokens from the chain balance to the sender's account,
//   2. the sender makes two transfers (3 and 2 tokens) to the recipient's account on chain 2,
//   3. chain 2 rejects the first credit and accepts the second,
//   4. chain 1 accepts the bounced credit and sends 3 tokens to chain 2's chain account.
// After steps 3 and 4 the affected chain must be active and have no removed bundles.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_handle_certificate_with_rejected_transfer<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_key_pair = AccountSecretKey::generate();
    let sender_pubkey = sender_key_pair.public();
    let sender = AccountOwner::from(sender_pubkey);
    let recipient_key_pair = AccountSecretKey::generate();
    let recipient_pubkey = recipient_key_pair.public();
    let recipient = AccountOwner::from(recipient_pubkey);

    let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
    let chain_1_desc = env
        .add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(6))
        .await;
    let chain_2_desc = env
        .add_root_chain(2, recipient_key_pair.public().into(), Amount::ZERO)
        .await;
    let chain_1 = chain_1_desc.id();
    let chain_2 = chain_2_desc.id();
    let sender_account = Account {
        chain_id: chain_1,
        owner: sender,
    };
    let recipient_account = Account {
        chain_id: chain_2,
        owner: recipient,
    };

    // Step 1: fund the sender's account from the chain balance (6 -> 1 on the chain, 5 owned).
    let certificate0 = env
        .make_transfer_certificate(
            chain_1_desc.clone(),
            sender_pubkey,
            sender,
            AccountOwner::CHAIN,
            sender_account,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::ONE,
            BTreeMap::from_iter([(sender, Amount::from_tokens(5))]),
            vec![],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate0.clone(), &())
        .await?;

    // Step 2a: first transfer to the recipient (3 tokens); the sender keeps 2.
    let certificate1 = env
        .make_transfer_certificate(
            chain_1_desc.clone(),
            sender_pubkey,
            sender,
            sender,
            recipient_account,
            Amount::from_tokens(3),
            Vec::new(),
            Amount::ONE,
            BTreeMap::from_iter([(sender, Amount::from_tokens(2))]),
            vec![&certificate0],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate1.clone(), &())
        .await?;

    // Step 2b: second transfer to the recipient (2 tokens); the sender's account is empty.
    let certificate2 = env
        .make_transfer_certificate(
            chain_1_desc.clone(),
            sender_pubkey,
            sender,
            sender,
            recipient_account,
            Amount::from_tokens(2),
            Vec::new(),
            Amount::ONE,
            BTreeMap::new(),
            vec![&certificate1, &certificate0],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate2.clone(), &())
        .await?;

    // Step 3: chain 2 rejects the 3-token credit, accepts the 2-token credit, and moves one
    // token from the recipient's account to the chain balance.
    let certificate = env
        .make_transfer_certificate(
            chain_2_desc.clone(),
            recipient_pubkey,
            recipient,
            recipient,
            Account::chain(chain_2_desc.id()),
            Amount::ONE,
            vec![
                IncomingBundle {
                    origin: chain_1,
                    bundle: MessageBundle {
                        certificate_hash: certificate1.hash(),
                        height: BlockHeight::from(1),
                        timestamp: Timestamp::from(0),
                        transaction_index: 0,
                        messages: vec![Message::System(SystemMessage::Credit {
                            source: sender,
                            target: recipient,
                            amount: Amount::from_tokens(3),
                        })
                        .to_posted(0, MessageKind::Tracked)],
                    },
                    action: MessageAction::Reject,
                },
                IncomingBundle {
                    origin: chain_1,
                    bundle: MessageBundle {
                        certificate_hash: certificate2.hash(),
                        height: BlockHeight::from(2),
                        timestamp: Timestamp::from(0),
                        transaction_index: 0,
                        messages: vec![Message::System(SystemMessage::Credit {
                            source: sender,
                            target: recipient,
                            amount: Amount::from_tokens(2),
                        })
                        .to_posted(0, MessageKind::Tracked)],
                    },
                    action: MessageAction::Accept,
                },
            ],
            Amount::ONE,
            BTreeMap::from_iter([(recipient, Amount::from_tokens(1))]),
            vec![],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate.clone(), &())
        .await?;
    {
        let chain = env.worker().chain_state_view(chain_2).await?;
        assert!(chain.is_active().await?);
        assert_no_removed_bundles(&chain).await;
    }

    // Step 4: chain 1 accepts the bounced 3-token credit and sends 3 tokens to chain 2.
    let certificate3 = env
        .make_transfer_certificate(
            chain_1_desc.clone(),
            sender_pubkey,
            sender,
            sender,
            Account::chain(chain_2_desc.id()),
            Amount::from_tokens(3),
            vec![IncomingBundle {
                origin: chain_2,
                bundle: MessageBundle {
                    certificate_hash: certificate.hash(),
                    height: BlockHeight::from(0),
                    timestamp: Timestamp::from(0),
                    transaction_index: 0,
                    messages: vec![Message::System(SystemMessage::Credit {
                        source: sender,
                        target: recipient,
                        amount: Amount::from_tokens(3),
                    })
                    .to_posted(0, MessageKind::Bouncing)],
                },
                action: MessageAction::Accept,
            }],
            Amount::ONE,
            BTreeMap::new(),
            vec![&certificate2, &certificate1, &certificate0],
        )
        .await;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate3.clone(), &())
        .await?;
    {
        // Go through the `worker()` accessor like every other call in this file, rather than
        // reaching into the field directly.
        let chain = env.worker().chain_state_view(chain_1).await?;
        assert!(chain.is_active().await?);
        assert_no_removed_bundles(&chain).await;
    }
    Ok(())
}
// Opens a child ("user") chain from the admin chain, then creates a committee for epoch 1 on
// the admin chain while crediting the user chain, and finally lets the user chain accept the
// credit and process the new epoch. The user chain's height, admin chain ID, inbox contents
// and number of known committees are checked before and after its first block.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn run_test_chain_creation_with_committee_creation<B>(
    mut storage_builder: B,
) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let storage = storage_builder.build().await?;
    let mut env =
        TestEnvironment::new_with_amount(storage.clone(), false, false, Amount::from_tokens(2))
            .await;
    let committee = env.committee().clone();
    let admin_chain_id = env.admin_chain_id();
    let user_description = env
        .add_child_chain(admin_chain_id, env.admin_public_key().into(), Amount::ZERO)
        .await;
    let user_id = user_description.id();

    // Admin block 0: open the user chain. The chain description is published as a blob.
    let certificate0 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![vec![]],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![Vec::new()],
            blobs: vec![vec![Blob::new_chain_description(&user_description)]],
            state_hash: env
                .system_execution_state(&admin_chain_id)
                .into_hash()
                .await,
            oracle_responses: vec![Vec::new()],
            operation_results: vec![OperationResult::default()],
        }
        .with(
            make_first_block(admin_chain_id)
                .with_operation(SystemOperation::OpenChain(OpenChainConfig {
                    ownership: ChainOwnership::single(env.admin_public_key().into()),
                    balance: Amount::ZERO,
                    application_permissions: Default::default(),
                }))
                .with_authenticated_signer(Some(env.admin_public_key().into())),
        ),
    ));
    env.worker()
        .fully_handle_certificate_with_notifications(certificate0.clone(), &())
        .await?;
    {
        let admin_chain = env.worker().chain_state_view(admin_chain_id).await?;
        assert!(admin_chain.is_active().await?);
        assert_no_removed_bundles(&admin_chain).await;
        assert_eq!(
            BlockHeight::from(1),
            admin_chain.tip_state.get().next_block_height
        );
        assert!(admin_chain.outboxes.indices().await?.is_empty());
        assert_eq!(
            *admin_chain.execution_state.system.admin_chain_id.get(),
            Some(admin_chain_id)
        );
    }

    // Admin block 1: create the committee for epoch 1 and transfer 2 tokens to the user chain.
    let committees2 = BTreeMap::from_iter([
        (Epoch::ZERO, committee.clone()),
        (Epoch::from(1), committee.clone()),
    ]);
    let event_id = EventId {
        chain_id: admin_chain_id,
        stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
        index: 1,
    };
    // The committee blob is written to storage before the block that references it is handled.
    let committee_blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
    storage.write_blob(&committee_blob).await?;
    let blob_hash = committee_blob.id().hash;
    let certificate1 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![
                vec![],
                vec![direct_credit_message(user_id, Amount::from_tokens(2))],
            ],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![
                vec![Event {
                    stream_id: event_id.stream_id.clone(),
                    index: event_id.index,
                    value: bcs::to_bytes(&blob_hash).unwrap(),
                }],
                Vec::new(),
            ],
            blobs: vec![Vec::new(); 2],
            state_hash: SystemExecutionState {
                committees: committees2.clone(),
                used_blobs: BTreeSet::from([committee_blob.id()]),
                epoch: Epoch::from(1),
                balance: Amount::ZERO,
                ..env.system_execution_state(&admin_chain_id)
            }
            .into_hash()
            .await,
            oracle_responses: vec![vec![OracleResponse::Blob(committee_blob.id())], vec![]],
            operation_results: vec![OperationResult::default(); 2],
        }
        .with(
            make_child_block(&certificate0.clone().into_value())
                .with_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
                    epoch: Epoch::from(1),
                    blob_hash,
                }))
                .with_simple_transfer(user_id, Amount::from_tokens(2)),
        ),
    ));
    env.worker()
        .fully_handle_certificate_with_notifications(certificate1.clone(), &())
        .await?;
    {
        // The user chain has not produced a block yet: it only has the credit in its inbox and
        // still knows a single committee.
        let user_chain = env.worker().chain_state_view(user_id).await?;
        assert!(user_chain.is_active().await?);
        assert_eq!(
            BlockHeight::ZERO,
            user_chain.tip_state.get().next_block_height
        );
        assert_eq!(
            *user_chain.execution_state.system.admin_chain_id.get(),
            Some(admin_chain_id)
        );
        assert_no_removed_bundles(&user_chain).await;
        // `matches!` alone only yields a `bool`; it has to be asserted to check anything.
        assert!(
            matches!(
                &user_chain
                    .inboxes
                    .try_load_entry(&admin_chain_id)
                    .await?
                    .expect("Missing inbox for admin chain in user chain")
                    .added_bundles
                    .read_front(10)
                    .await?[..],
                [bundle1]
                if matches!(bundle1.messages[..], [PostedMessage {
                    message: Message::System(SystemMessage::Credit { .. }), ..
                }])
            ),
            "Expected exactly one bundle holding a single credit message"
        );
        assert_eq!(
            user_chain
                .execution_state
                .system
                .committees
                .get()
                .await?
                .len(),
            1
        );
    }

    // User block 0: accept the credit and process the new epoch.
    let certificate3 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![Vec::new(); 2],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![Vec::new(); 2],
            blobs: vec![Vec::new(); 2],
            state_hash: SystemExecutionState {
                committees: committees2.clone(),
                balance: Amount::from_tokens(2),
                used_blobs: BTreeSet::from([committee_blob.id()]),
                epoch: Epoch::from(1),
                ..env.system_execution_state(&user_description.id())
            }
            .into_hash()
            .await,
            oracle_responses: vec![
                vec![],
                vec![
                    OracleResponse::Event(
                        EventId {
                            chain_id: admin_chain_id,
                            stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
                            index: 1,
                        },
                        bcs::to_bytes(&blob_hash).unwrap(),
                    ),
                    OracleResponse::Blob(committee_blob.id()),
                ],
            ],
            operation_results: vec![OperationResult::default()],
        }
        .with(
            make_first_block(user_id)
                .with_incoming_bundle(IncomingBundle {
                    origin: admin_chain_id,
                    bundle: MessageBundle {
                        certificate_hash: certificate1.hash(),
                        height: BlockHeight::from(1),
                        timestamp: Timestamp::from(0),
                        transaction_index: 1,
                        messages: vec![system_credit_message(Amount::from_tokens(2))
                            .to_posted(0, MessageKind::Tracked)],
                    },
                    action: MessageAction::Accept,
                })
                .with_operation(SystemOperation::ProcessNewEpoch(Epoch::from(1))),
        ),
    ));
    env.worker()
        .fully_handle_certificate_with_notifications(certificate3, &())
        .await?;
    {
        // The user chain has advanced by one block and now knows both committees.
        let user_chain = env.worker().chain_state_view(user_id).await?;
        assert!(user_chain.is_active().await?);
        assert_eq!(
            BlockHeight::from(1),
            user_chain.tip_state.get().next_block_height
        );
        assert_eq!(
            *user_chain.execution_state.system.admin_chain_id.get(),
            Some(admin_chain_id)
        );
        assert_eq!(
            user_chain
                .execution_state
                .system
                .committees
                .get()
                .await?
                .len(),
            2
        );
        assert_no_removed_bundles(&user_chain).await;
    }
    Ok(())
}
// Handles a committee-creation block on the admin chain *before* a user-chain block that
// transfers one token to the admin chain. The user chain must end up one block higher with 2
// tokens and still in epoch 0, and the admin chain must hold the credit in its inbox.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_transfers_and_committee_creation<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let owner1 = AccountSecretKey::generate().public().into();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage.clone(), false, false).await;
    let chain_1_desc = env.add_root_chain(1, owner1, Amount::from_tokens(3)).await;
    let committee = env.committee().clone();
    let admin_chain_id = env.admin_chain_id();
    let user_id = chain_1_desc.id();

    // User block 0: transfer one token to the admin chain (3 -> 2 tokens).
    let certificate0 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![vec![direct_credit_message(admin_chain_id, Amount::ONE)]],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![Vec::new()],
            blobs: vec![Vec::new()],
            state_hash: SystemExecutionState {
                balance: Amount::from_tokens(2),
                ..env.system_execution_state(&chain_1_desc.id())
            }
            .into_hash()
            .await,
            oracle_responses: vec![Vec::new()],
            operation_results: vec![OperationResult::default()],
        }
        .with(
            make_first_block(user_id)
                .with_simple_transfer(admin_chain_id, Amount::ONE)
                .with_authenticated_signer(Some(owner1)),
        ),
    ));

    // Admin block 0: create the committee for epoch 1.
    let committees2 = BTreeMap::from_iter([
        (Epoch::ZERO, committee.clone()),
        (Epoch::from(1), committee.clone()),
    ]);
    let committee_blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
    let blob_hash = committee_blob.id().hash;
    // The committee blob is written to storage before the block that references it is handled.
    storage.write_blob(&committee_blob).await?;
    let certificate1 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![vec![]],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![vec![Event {
                stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
                index: 1,
                value: bcs::to_bytes(&committee_blob.id().hash).unwrap(),
            }]],
            blobs: vec![Vec::new()],
            state_hash: SystemExecutionState {
                committees: committees2.clone(),
                used_blobs: BTreeSet::from([committee_blob.id()]),
                epoch: Epoch::from(1),
                ..env.system_execution_state(&admin_chain_id)
            }
            .into_hash()
            .await,
            oracle_responses: vec![vec![OracleResponse::Blob(committee_blob.id())]],
            operation_results: vec![OperationResult::default()],
        }
        .with(make_first_block(admin_chain_id).with_operation(
            SystemOperation::Admin(AdminOperation::CreateCommittee {
                epoch: Epoch::from(1),
                blob_hash,
            }),
        )),
    ));

    // The admin block is deliberately handled first, then the user's transfer block.
    env.worker()
        .fully_handle_certificate_with_notifications(certificate1, &())
        .await?;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate0, &())
        .await?;

    let user_chain = env.worker().chain_state_view(user_id).await?;
    assert!(user_chain.is_active().await?);
    assert_eq!(
        BlockHeight::from(1),
        user_chain.tip_state.get().next_block_height
    );
    assert_eq!(
        *user_chain.execution_state.system.balance.get(),
        Amount::from_tokens(2)
    );
    assert_eq!(*user_chain.execution_state.system.epoch.get(), Epoch::ZERO);

    let admin_chain = env.worker().chain_state_view(admin_chain_id).await?;
    assert!(admin_chain.is_active().await?);
    assert_eq!(admin_chain.inboxes.indices().await?.len(), 1);
    // `matches!` alone only yields a `bool`; it has to be asserted to check anything.
    assert!(
        matches!(
            &admin_chain
                .inboxes
                .try_load_entry(&user_id)
                .await?
                .expect("Missing inbox for user chain in admin chain")
                .added_bundles
                .read_front(10)
                .await?[..],
            [bundle] if matches!(bundle.messages[..], [PostedMessage {
                message: Message::System(SystemMessage::Credit { .. }),
                ..
            }])
        ),
        "Expected exactly one bundle holding a single credit message"
    );
    Ok(())
}
// Handles an admin block that creates the committee for epoch 1 and removes the one for epoch
// 0, followed by a user block (built for epoch 0) that transfers a token to the admin chain.
// The admin chain must then be able to accept that credit in a later block, and to create yet
// another committee (epoch 2) afterwards.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_transfers_and_committee_removal<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let storage = storage_builder.build().await?;
    let mut env =
        TestEnvironment::new_with_amount(storage.clone(), false, false, Amount::ZERO).await;
    let owner1 = AccountSecretKey::generate().public().into();
    let chain_1_desc = env.add_root_chain(1, owner1, Amount::from_tokens(3)).await;
    let committee = env.committee().clone();
    let admin_chain_id = env.admin_chain_id();
    let user_id = chain_1_desc.id();

    // User block 0: transfer one token to the admin chain (3 -> 2 tokens).
    let certificate0 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![vec![direct_credit_message(admin_chain_id, Amount::ONE)]],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![Vec::new()],
            blobs: vec![Vec::new()],
            state_hash: SystemExecutionState {
                balance: Amount::from_tokens(2),
                ..env.system_execution_state(&chain_1_desc.id())
            }
            .into_hash()
            .await,
            oracle_responses: vec![Vec::new()],
            operation_results: vec![OperationResult::default()],
        }
        .with(
            make_first_block(user_id)
                .with_simple_transfer(admin_chain_id, Amount::ONE)
                .with_authenticated_signer(Some(owner1)),
        ),
    ));

    // Admin block 0: create the committee for epoch 1, then remove the one for epoch 0. Only
    // epoch 1 remains in the expected state.
    let committees1 = BTreeMap::from_iter([(Epoch::from(1), committee.clone())]);
    let committee_blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
    let blob_hash = committee_blob.id().hash;
    // The committee blob is written to storage before the block that references it is handled.
    storage.write_blob(&committee_blob).await?;
    let certificate1 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![vec![]; 2],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![
                vec![Event {
                    stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
                    index: 1,
                    value: bcs::to_bytes(&committee_blob.id().hash).unwrap(),
                }],
                vec![Event {
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: 0,
                    value: Vec::new(),
                }],
            ],
            blobs: vec![Vec::new(); 2],
            state_hash: SystemExecutionState {
                committees: committees1.clone(),
                used_blobs: BTreeSet::from([committee_blob.id()]),
                epoch: Epoch::from(1),
                ..env.system_execution_state(&admin_chain_id)
            }
            .into_hash()
            .await,
            oracle_responses: vec![vec![OracleResponse::Blob(committee_blob.id())], vec![]],
            operation_results: vec![OperationResult::default(); 2],
        }
        .with(
            make_first_block(admin_chain_id)
                .with_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
                    epoch: Epoch::from(1),
                    blob_hash,
                }))
                .with_operation(SystemOperation::Admin(AdminOperation::RemoveCommittee {
                    epoch: Epoch::ZERO,
                })),
        ),
    ));

    // The admin block is deliberately handled first, then the user's transfer block.
    env.worker()
        .fully_handle_certificate_with_notifications(certificate1.clone(), &())
        .await?;
    env.worker()
        .fully_handle_certificate_with_notifications(certificate0.clone(), &())
        .await?;
    {
        let user_chain = env.worker().chain_state_view(user_id).await?;
        assert!(user_chain.is_active().await?);
        assert_eq!(
            BlockHeight::from(1),
            user_chain.tip_state.get().next_block_height
        );
        assert_eq!(
            *user_chain.execution_state.system.balance.get(),
            Amount::from_tokens(2)
        );
        assert_eq!(*user_chain.execution_state.system.epoch.get(), Epoch::ZERO);
        // The admin chain must have an inbox for the user chain, and only for it.
        let admin_chain = env.worker().chain_state_view(admin_chain_id).await?;
        assert!(admin_chain.is_active().await?);
        assert_eq!(admin_chain.inboxes.indices().await?, vec![user_id]);
    }

    // Admin block 1 (epoch 1): accept the credit sent by the user chain.
    let certificate2 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![Vec::new()],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::new(),
            events: vec![Vec::new()],
            blobs: vec![Vec::new()],
            state_hash: SystemExecutionState {
                committees: committees1.clone(),
                balance: Amount::ONE,
                used_blobs: BTreeSet::from([committee_blob.id()]),
                epoch: Epoch::from(1),
                ..env.system_execution_state(&admin_chain_id)
            }
            .into_hash()
            .await,
            oracle_responses: vec![Vec::new()],
            operation_results: vec![],
        }
        .with(
            make_child_block(&certificate1.clone().into_value())
                .with_epoch(1)
                .with_incoming_bundle(IncomingBundle {
                    origin: user_id,
                    bundle: MessageBundle {
                        certificate_hash: certificate0.hash(),
                        height: BlockHeight::ZERO,
                        timestamp: Timestamp::from(0),
                        transaction_index: 0,
                        messages: vec![
                            system_credit_message(Amount::ONE).to_posted(0, MessageKind::Tracked)
                        ],
                    },
                    action: MessageAction::Accept,
                }),
        ),
    ));
    env.worker()
        .fully_handle_certificate_with_notifications(certificate2.clone(), &())
        .await?;
    {
        let admin_chain = env.worker().chain_state_view(admin_chain_id).await?;
        assert!(admin_chain.is_active().await?);
        assert_no_removed_bundles(&admin_chain).await;
    }

    // Admin block 2 (still built in epoch 1): create the committee for epoch 2. The previous
    // event on the new-epoch stream is the one emitted by `certificate1` at height 0.
    let committees3 = BTreeMap::from_iter([
        (Epoch::from(1), committee.clone()),
        (Epoch::from(2), committee.clone()),
    ]);
    let certificate3 = env.make_certificate(ConfirmedBlock::new(
        BlockExecutionOutcome {
            messages: vec![Vec::new()],
            previous_message_blocks: BTreeMap::new(),
            previous_event_blocks: BTreeMap::from([(
                StreamId::system(NEW_EPOCH_STREAM_NAME),
                (certificate1.hash(), BlockHeight(0)),
            )]),
            events: vec![vec![Event {
                stream_id: StreamId::system(NEW_EPOCH_STREAM_NAME),
                index: 2,
                value: bcs::to_bytes(&committee_blob.id().hash).unwrap(),
            }]],
            blobs: vec![Vec::new()],
            state_hash: SystemExecutionState {
                committees: committees3.clone(),
                balance: Amount::ONE,
                used_blobs: BTreeSet::from([committee_blob.id()]),
                epoch: Epoch::from(2),
                ..env.system_execution_state(&admin_chain_id)
            }
            .into_hash()
            .await,
            oracle_responses: vec![Vec::new()],
            operation_results: vec![OperationResult::default()],
        }
        .with(
            make_child_block(&certificate2.into_value())
                .with_epoch(1)
                .with_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
                    epoch: Epoch::from(2),
                    blob_hash,
                })),
        ),
    ));
    env.worker()
        .fully_handle_certificate_with_notifications(certificate3, &())
        .await?;
    Ok(())
}
// Exercises `CrossChainUpdateHelper::select_message_bundles` on the message bundles of four
// consecutive transfer certificates from the admin chain (`id0`) to a dummy chain (`id1`).
#[test(tokio::test)]
async fn test_cross_chain_helper() -> anyhow::Result<()> {
let store_config = MemoryDatabase::new_test_config().await?;
let namespace = generate_test_namespace();
let store = DbStorage::<MemoryDatabase, _>::new_for_testing(
store_config,
&namespace,
None,
TestClock::new(),
)
.await?;
let env = TestEnvironment::new(store, true, false).await;
let chain_0 = env.admin_description.clone();
let chain_1 = dummy_chain_description(1);
let key_pair0 = AccountSecretKey::generate();
let id0 = chain_0.id();
let id1 = chain_1.id();
// Four certificates on `chain_0`, each listing all earlier ones in its last argument.
// They are built for epochs 0, 0, 1 and 0 respectively.
let certificate0 = env
.make_transfer_certificate_for_epoch(
chain_0.clone(),
key_pair0.public(),
key_pair0.public().into(),
AccountOwner::CHAIN,
Account::chain(id1),
Amount::ONE,
Vec::new(),
Epoch::ZERO,
Amount::ONE,
BTreeMap::new(),
vec![],
)
.await;
let certificate1 = env
.make_transfer_certificate_for_epoch(
chain_0.clone(),
key_pair0.public(),
key_pair0.public().into(),
AccountOwner::CHAIN,
Account::chain(id1),
Amount::ONE,
Vec::new(),
Epoch::ZERO,
Amount::ONE,
BTreeMap::new(),
vec![&certificate0],
)
.await;
let certificate2 = env
.make_transfer_certificate_for_epoch(
chain_0.clone(),
key_pair0.public(),
key_pair0.public().into(),
AccountOwner::CHAIN,
Account::chain(id1),
Amount::ONE,
Vec::new(),
Epoch::from(1),
Amount::ONE,
BTreeMap::new(),
vec![&certificate1, &certificate0],
)
.await;
let certificate3 = env
.make_transfer_certificate_for_epoch(
chain_0.clone(),
key_pair0.public(),
key_pair0.public().into(),
AccountOwner::CHAIN,
Account::chain(id1),
Amount::ONE,
Vec::new(),
Epoch::ZERO,
Amount::ONE,
BTreeMap::new(),
vec![&certificate2, &certificate1, &certificate0],
)
.await;
// Bundles addressed to `id1`, per certificate and as growing concatenations.
let bundles0 = certificate0.message_bundles_for(id1).collect::<Vec<_>>();
let bundles1 = certificate1.message_bundles_for(id1).collect::<Vec<_>>();
let bundles2 = certificate2.message_bundles_for(id1).collect::<Vec<_>>();
let bundles3 = certificate3.message_bundles_for(id1).collect::<Vec<_>>();
let bundles01 = Vec::from_iter(bundles0.iter().cloned().chain(bundles1.iter().cloned()));
let bundles012 = Vec::from_iter(bundles01.iter().cloned().chain(bundles2.iter().cloned()));
let bundles0123 = Vec::from_iter(bundles012.iter().cloned().chain(bundles3.iter().cloned()));
// Drops the epoch of every `(Epoch, MessageBundle)` pair and clones the bundle.
fn without_epochs<'a>(
bundles: impl IntoIterator<Item = &'a (Epoch, MessageBundle)>,
) -> Vec<MessageBundle> {
bundles
.into_iter()
.map(|(_, bundle)| bundle.clone())
.collect()
}
let storage = env.worker().storage_client();
// First helper: messages from deprecated epochs are allowed; current epoch is 1.
let helper = CrossChainUpdateHelper {
allow_messages_from_deprecated_epochs: true,
current_epoch: Epoch::from(1),
};
// Starting from height 0, both epoch-0 bundles are returned.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::ZERO,
None,
bundles01.clone(),
storage
)
.await?,
without_epochs(&bundles01)
);
// Starting from height 1, only the bundles of `certificate1` are returned.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::from(1),
None,
bundles01.clone(),
storage
)
.await?,
without_epochs(&bundles1)
);
// Starting from height 2, nothing is left to return.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::from(2),
None,
bundles01.clone(),
storage
)
.await?,
vec![]
);
// Bundles supplied in the wrong order (`bundles1` before `bundles0`) are rejected.
assert_matches!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::ZERO,
None,
Vec::from_iter(bundles1.iter().cloned().chain(bundles0.iter().cloned())),
storage
)
.await,
Err(WorkerError::InvalidCrossChainRequest)
);
// Second helper: messages from deprecated epochs are not allowed; current epoch is 1.
let helper = CrossChainUpdateHelper {
allow_messages_from_deprecated_epochs: false,
current_epoch: Epoch::from(1),
};
// The epoch-0 bundles are still expected back in full.
// NOTE(review): presumably because nothing in this test's storage marks epoch 0 as
// removed -- confirm against `select_message_bundles`.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::ZERO,
None,
bundles01.clone(),
storage
)
.await?,
without_epochs(&bundles01)
);
// All four bundles are expected back, including the trailing epoch-0 one.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::ZERO,
None,
bundles0123.clone(),
storage
)
.await?,
without_epochs(&bundles0123)
);
// Starting from height 1, the bundles of `certificate0` are dropped from the result.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::from(1),
None,
bundles012.clone(),
storage
)
.await?,
without_epochs(bundles1.iter().chain(&bundles2))
);
// Same calls with the fourth argument set to `Some(BlockHeight::from(1))`.
// NOTE(review): the fourth argument looks like an anticipated block height -- confirm
// against the signature of `select_message_bundles`.
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::from(1),
Some(BlockHeight::from(1)),
bundles01.clone(),
storage
)
.await?,
without_epochs(&bundles1)
);
assert_eq!(
helper
.select_message_bundles(
&id0,
id1,
BlockHeight::ZERO,
Some(BlockHeight::from(1)),
bundles01.clone(),
storage
)
.await?,
without_epochs(&bundles01)
);
Ok(())
}
// Walks a chain with two regular owners and no multi-leader rounds through a
// sequence of single-leader rounds, checking at each step which block
// proposals, timeout certificates and validated-block certificates the worker
// accepts, and what the chain manager reports afterwards (leader, current
// round, pending vote, locking block).
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_timeouts<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let storage = storage_builder.build().await?;
let mut signer = InMemorySigner::new(None);
let clock = storage_builder.clock();
let key_pairs = generate_key_pairs(&mut signer, 2);
let owner0 = AccountOwner::from(key_pairs[0]);
let owner1 = AccountOwner::from(key_pairs[1]);
let mut env = TestEnvironment::new(storage, false, false).await;
let chain_1_desc = env.add_root_chain(1, owner0, Amount::from_tokens(2)).await;
let small_transfer = Amount::from_micros(1);
let chain_1 = chain_1_desc.id();
// Block 0 switches the chain to two regular owners with equal weights, no
// super owners and zero multi-leader rounds.
let proposed_block0 = make_first_block(chain_1)
.with_operation(SystemOperation::ChangeOwnership {
super_owners: Vec::new(),
owners: vec![(owner0, 100), (owner1, 100)],
multi_leader_rounds: 0,
open_multi_leader_rounds: false,
timeout_config: TimeoutConfig::default(),
})
.with_authenticated_signer(Some(owner0));
let (_, block0, _, _) = env
.worker()
.stage_block_execution(
proposed_block0,
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
let value0 = ConfirmedBlock::new(block0);
let certificate0 = env.make_certificate(value0.clone());
let response = env
.worker()
.fully_handle_certificate_with_notifications(certificate0, &())
.await?;
// Once block 0 is confirmed, the manager reports owner1 as the leader.
assert_eq!(response.info.manager.leader, Some(owner1));
// A proposal signed by owner0 in round SingleLeader(0) is rejected with
// `InvalidOwner` (presumably because owner1 is that round's leader).
let proposal = make_child_block(&value0)
.with_simple_transfer(chain_1, small_transfer)
.into_proposal_with_round(owner0, &signer, Round::SingleLeader(0))
.await
.unwrap();
let result = env.worker().handle_block_proposal(proposal).await;
assert_matches!(result, Err(WorkerError::InvalidOwner));
// The same block proposed for SingleLeader(1) is rejected with `WrongRound`,
// which carries SingleLeader(0).
let proposal = make_child_block(&value0)
.with_simple_transfer(chain_1, small_transfer)
.into_proposal_with_round(owner0, &signer, Round::SingleLeader(1))
.await
.unwrap();
let result = env.worker().handle_block_proposal(proposal).await;
assert_matches!(result, Err(WorkerError::ChainError(ref error))
if matches!(**error, ChainError::WrongRound(Round::SingleLeader(0)))
);
// Asking for a timeout vote before the clock has been advanced fails with
// `NotTimedOutYet`.
let query = ChainInfoQuery::new(chain_1).with_timeout(BlockHeight(1), Round::SingleLeader(0));
let result = env.worker().handle_chain_info_query(query.clone()).await;
assert_matches!(result, Err(WorkerError::ChainError(ref error))
if matches!(**error, ChainError::NotTimedOutYet(_))
);
// Move the test clock to the reported round timeout; the same query now
// succeeds and the response contains a timeout vote.
clock.set(response.info.manager.round_timeout.unwrap());
let (response, _) = env.worker().handle_chain_info_query(query).await?;
let vote = response.info.manager.timeout_vote.clone().unwrap();
let value_timeout = Timeout::new(chain_1, BlockHeight::from(1), Epoch::from(0));
// Turn the worker's own timeout vote into a certificate and feed it back.
let certificate_timeout = vote
.with_value(value_timeout.clone())
.unwrap()
.into_certificate(env.worker().public_key());
let (response, _) = env
.worker()
.handle_timeout_certificate(certificate_timeout)
.await?;
// After the timeout certificate the leader is owner0.
assert_eq!(response.info.manager.leader, Some(owner0));
let proposed_block1 = make_child_block(&value0).with_simple_transfer(chain_1, small_transfer);
let (_, block1, _, _) = env
.worker()
.stage_block_execution(
proposed_block1.clone(),
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
// owner1 proposing in SingleLeader(1) is rejected with `InvalidOwner`.
let proposal1_wrong_owner = proposed_block1
.clone()
.with_authenticated_signer(Some(owner1))
.into_proposal_with_round(owner1, &signer, Round::SingleLeader(1))
.await
.unwrap();
let result = env
.worker()
.handle_block_proposal(proposal1_wrong_owner)
.await;
assert_matches!(result, Err(WorkerError::InvalidOwner));
// owner0's proposal for the same round is accepted.
let proposal1 = proposed_block1
.clone()
.into_proposal_with_round(owner0, &signer, Round::SingleLeader(1))
.await
.unwrap();
let (response, _) = env.worker().handle_block_proposal(proposal1).await?;
let value1 = ValidatedBlock::new(block1.clone());
// Build a validated-block certificate from the worker's pending vote and
// hand it back; the pending vote must then be for the confirmed block.
let vote = response.info.manager.pending.clone().unwrap();
let certificate1 = vote
.with_value(value1.clone())
.unwrap()
.into_certificate(env.worker().public_key());
let (response, _) = env
.worker()
.handle_validated_certificate(certificate1.clone())
.await?;
let vote = response.info.manager.pending.as_ref().unwrap();
let value = ConfirmedBlock::new(block1.clone());
assert_eq!(vote.value, LiteValue::new(&value));
// A timeout certificate for round 4 moves the manager to round 5, whose
// leader is owner1.
let certificate_timeout =
env.make_certificate_with_round(value_timeout.clone(), Round::SingleLeader(4));
let (response, _) = env
.worker()
.handle_timeout_certificate(certificate_timeout)
.await?;
assert_eq!(response.info.manager.leader, Some(owner1));
assert_eq!(response.info.manager.current_round, Round::SingleLeader(5));
// Block 2 is a different child of block 0 (it transfers a whole token).
let amount = Amount::from_tokens(1);
let proposed_block2 = make_child_block(&value0.clone()).with_simple_transfer(chain_1, amount);
let (_, block2, _, _) = env
.worker()
.stage_block_execution(
proposed_block2.clone(),
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
let value2 = ValidatedBlock::new(block2.clone());
// A validated certificate for block 2 in round 2 is handled without error.
let certificate = env.make_certificate_with_round(value2.clone(), Round::SingleLeader(2));
env.worker()
.handle_validated_certificate(certificate)
.await?;
// The manager values still report block 1 as the requested confirmed block.
let query_values = ChainInfoQuery::new(chain_1).with_manager_values();
let (response, _) = env
.worker()
.handle_chain_info_query(query_values.clone())
.await?;
let manager = response.info.manager;
assert_eq!(
manager.requested_confirmed.unwrap().block(),
certificate1.block()
);
// A plain proposal of block 2 in round 5 is rejected with
// `HasIncompatibleConfirmedVote`.
let proposal = proposed_block2
.clone()
.with_authenticated_signer(Some(owner1))
.into_proposal_with_round(owner1, &signer, Round::SingleLeader(5))
.await
.unwrap();
let result = env.worker().handle_block_proposal(proposal.clone()).await;
assert_matches!(result, Err(WorkerError::ChainError(error))
if matches!(*error, ChainError::HasIncompatibleConfirmedVote(_, _))
);
// Re-proposing block 2 in round 5 together with a validated certificate
// from round 4 is accepted.
let certificate2 = env.make_certificate_with_round(value2.clone(), Round::SingleLeader(4));
let proposal = BlockProposal::new_retry_regular(
owner1,
Round::SingleLeader(5),
certificate2.clone(),
&signer,
)
.await
.unwrap();
let lite_value2 = LiteValue::new(&value2);
let (_, _) = env.worker().handle_block_proposal(proposal).await?;
let (response, _) = env
.worker()
.handle_chain_info_query(query_values.clone())
.await?;
// The locking block is now certificate2 and the pending vote is for
// validated block 2 in round 5.
assert_eq!(
response.info.manager.requested_locking,
Some(Box::new(LockingBlock::Regular(certificate2)))
);
let vote = response.info.manager.pending.as_ref().unwrap();
assert_eq!(vote.value, lite_value2);
assert_eq!(vote.round, Round::SingleLeader(5));
// Timing out round 5 moves to round 6 with owner0 as leader.
let certificate_timeout =
env.make_certificate_with_round(value_timeout.clone(), Round::SingleLeader(5));
let (response, _) = env
.worker()
.handle_timeout_certificate(certificate_timeout)
.await?;
assert_eq!(response.info.manager.leader, Some(owner0));
assert_eq!(response.info.manager.current_round, Round::SingleLeader(6));
// owner0 proposing block 1 again in round 6 is rejected with
// `HasIncompatibleConfirmedVote`.
let proposal = proposed_block1
.into_proposal_with_round(owner0, &signer, Round::SingleLeader(6))
.await
.unwrap();
let result = env.worker().handle_block_proposal(proposal.clone()).await;
assert_matches!(result, Err(WorkerError::ChainError(error))
if matches!(*error, ChainError::HasIncompatibleConfirmedVote(_, _))
);
// Timing out round 7 moves the manager to round 8.
let certificate_timeout =
env.make_certificate_with_round(value_timeout, Round::SingleLeader(7));
let (response, _) = env
.worker()
.handle_timeout_certificate(certificate_timeout)
.await?;
assert_eq!(response.info.manager.current_round, Round::SingleLeader(8));
// A validated certificate for block 1 from round 7 becomes the locking
// block, and the pending vote is asserted not to be of kind `Confirmed`.
let certificate = env.make_certificate_with_round(value1, Round::SingleLeader(7));
let worker = env.worker().clone();
worker
.handle_validated_certificate(certificate.clone())
.await?;
let (response, _) = worker.handle_chain_info_query(query_values).await?;
assert_eq!(
response.info.manager.requested_locking,
Some(Box::new(LockingBlock::Regular(certificate)))
);
assert_ne!(
response.info.manager.pending.unwrap().kind(),
CertificateKind::Confirmed
);
Ok(())
}
// Checks how the chain manager moves from the fast round into the
// multi-leader rounds on a chain with one super owner and two regular owners:
// who may propose in which round, when a timeout vote becomes available, and
// which round is current after a timeout certificate or a later-round proposal.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_round_types<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let storage = storage_builder.build().await?;
    let mut signer = InMemorySigner::new(None);
    let clock = storage_builder.clock();
    let key_pairs = generate_key_pairs(&mut signer, 2);
    let super_owner = AccountOwner::from(key_pairs[0]);
    let regular_owner = AccountOwner::from(key_pairs[1]);
    let mut env = TestEnvironment::new(storage, false, false).await;
    let chain_description = env
        .add_root_chain(1, super_owner, Amount::from_tokens(2))
        .await;
    let tiny_amount = Amount::from_micros(1);
    let chain_id = chain_description.id();

    // Block 0: one super owner, two regular owners, two multi-leader rounds
    // and a five-second fast round.
    let ownership_change = SystemOperation::ChangeOwnership {
        super_owners: vec![super_owner],
        owners: vec![(super_owner, 100), (regular_owner, 100)],
        multi_leader_rounds: 2,
        open_multi_leader_rounds: false,
        timeout_config: TimeoutConfig {
            fast_round_duration: Some(TimeDelta::from_secs(5)),
            ..TimeoutConfig::default()
        },
    };
    let first_proposed_block = make_first_block(chain_id).with_operation(ownership_change);
    let (_, executed_block0, _, _) = env
        .worker()
        .stage_block_execution(
            first_proposed_block,
            None,
            vec![],
            BundleExecutionPolicy::committed(),
        )
        .await?;
    let confirmed0 = ConfirmedBlock::new(executed_block0);
    let certificate0 = env.make_certificate(confirmed0.clone());
    let response = env
        .worker()
        .fully_handle_certificate_with_notifications(certificate0, &())
        .await?;
    assert_eq!(response.info.manager.current_round, Round::Fast);
    assert_eq!(response.info.manager.leader, None);

    // The regular owner's proposal in the fast round is rejected.
    let fast_proposal = make_child_block(&confirmed0)
        .with_simple_transfer(chain_id, tiny_amount)
        .into_proposal_with_round(regular_owner, &signer, Round::Fast)
        .await
        .unwrap();
    assert_matches!(
        env.worker().handle_block_proposal(fast_proposal).await,
        Err(WorkerError::InvalidOwner)
    );

    // A proposal for MultiLeader(0) is rejected while the fast round is current.
    let early_proposal = make_child_block(&confirmed0)
        .into_proposal_with_round(regular_owner, &signer, Round::MultiLeader(0))
        .await
        .unwrap();
    assert_matches!(
        env.worker().handle_block_proposal(early_proposal).await,
        Err(WorkerError::ChainError(ref error))
            if matches!(**error, ChainError::WrongRound(Round::Fast))
    );

    // No timeout vote is available until the clock reaches the round timeout.
    let timeout_query = ChainInfoQuery::new(chain_id).with_timeout(BlockHeight(1), Round::Fast);
    assert_matches!(
        env.worker()
            .handle_chain_info_query(timeout_query.clone())
            .await,
        Err(WorkerError::ChainError(ref error))
            if matches!(**error, ChainError::NotTimedOutYet(_))
    );
    clock.set(response.info.manager.round_timeout.unwrap());
    let (response, _) = env.worker().handle_chain_info_query(timeout_query).await?;
    let timeout_vote = response.info.manager.timeout_vote.clone().unwrap();
    let timeout = Timeout::new(chain_id, BlockHeight::from(1), Epoch::from(0));
    let timeout_certificate = timeout_vote
        .with_value(timeout)
        .unwrap()
        .into_certificate(env.worker().public_key());

    // The timeout certificate ends the fast round.
    let (response, _) = env
        .worker()
        .handle_timeout_certificate(timeout_certificate)
        .await?;
    assert_eq!(response.info.manager.current_round, Round::MultiLeader(0));
    assert_eq!(response.info.manager.leader, None);

    // A proposal for MultiLeader(1) is accepted and yields a `NewRound`
    // notification; the manager then reports MultiLeader(1) as current.
    let later_round_proposal = make_child_block(&confirmed0)
        .with_simple_transfer(chain_id, tiny_amount)
        .with_authenticated_signer(Some(regular_owner))
        .into_proposal_with_round(regular_owner, &signer, Round::MultiLeader(1))
        .await
        .unwrap();
    let (_, actions) = env
        .worker()
        .handle_block_proposal(later_round_proposal)
        .await?;
    assert_matches!(actions.notifications[0].reason, Reason::NewRound { .. });
    let manager_query = ChainInfoQuery::new(chain_id).with_manager_values();
    let (response, _) = env.worker().handle_chain_info_query(manager_query).await?;
    assert_eq!(response.info.manager.current_round, Round::MultiLeader(1));
    Ok(())
}
// Checks that a block proposed in the fast round stays binding in later
// multi-leader rounds: it can be re-proposed with `new_retry_fast`, a
// different plain proposal is rejected, and a different block is only accepted
// when re-proposed together with a validated-block certificate.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_fast_proposal_is_locked<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let storage = storage_builder.build().await?;
let clock = storage_builder.clock();
let mut signer = InMemorySigner::new(None);
let key_pairs = generate_key_pairs(&mut signer, 2);
let owner0 = AccountOwner::from(key_pairs[0]);
let owner1 = AccountOwner::from(key_pairs[1]);
let mut env = TestEnvironment::new(storage, false, false).await;
let chain_1_desc = env.add_root_chain(1, owner0, Amount::from_tokens(2)).await;
let chain_id = chain_1_desc.id();
// Block 0 moves one token from the chain balance to owner0's account on the
// same chain, then sets owner0 as super owner, both keys as regular owners,
// three multi-leader rounds and a 5 ms fast round.
let proposed_block0 = make_first_block(chain_id)
.with_transfer(
AccountOwner::CHAIN,
Account::new(chain_id, owner0),
Amount::from_tokens(1),
)
.with_authenticated_signer(Some(owner0))
.with_operation(SystemOperation::ChangeOwnership {
super_owners: vec![owner0],
owners: vec![(owner0, 100), (owner1, 100)],
multi_leader_rounds: 3,
open_multi_leader_rounds: false,
timeout_config: TimeoutConfig {
fast_round_duration: Some(TimeDelta::from_millis(5)),
..TimeoutConfig::default()
},
});
let (_, block0, _, _) = env
.worker()
.stage_block_execution(
proposed_block0,
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
let value0 = ConfirmedBlock::new(block0);
let certificate0 = env.make_certificate(value0.clone());
let response = env
.worker()
.fully_handle_certificate_with_notifications(certificate0, &())
.await?;
// After block 0 the chain is in the fast round with no designated leader.
assert_eq!(response.info.manager.current_round, Round::Fast);
assert_eq!(response.info.manager.leader, None);
// owner0 proposes block 1 in the fast round.
let proposed_block1 = make_child_block(&value0)
.with_transfer(
AccountOwner::CHAIN,
Account::new(chain_id, owner0),
Amount::from_micros(1),
)
.with_authenticated_signer(Some(owner0));
let proposal1 = proposed_block1
.clone()
.into_proposal_with_round(owner0, &signer, Round::Fast)
.await
.unwrap();
let (_, block1, _, _) = env
.worker()
.stage_block_execution(
proposed_block1.clone(),
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
let value1 = ConfirmedBlock::new(block1);
let (response, _) = env
.worker()
.handle_block_proposal(proposal1.clone())
.await?;
// The worker's pending vote is in the fast round and its value hash matches
// the confirmed block built from block 1.
let vote = response.info.manager.pending.as_ref().unwrap();
assert_eq!(vote.round, Round::Fast);
assert_eq!(vote.value.value_hash, value1.hash());
// Advance the clock to the round timeout and deliver a timeout certificate
// for the fast round; the manager moves to MultiLeader(0).
clock.set(response.info.manager.round_timeout.unwrap());
let value_timeout = Timeout::new(chain_id, BlockHeight::from(1), Epoch::from(0));
let certificate_timeout = env.make_certificate_with_round(value_timeout.clone(), Round::Fast);
let (response, _) = env
.worker()
.handle_timeout_certificate(certificate_timeout)
.await?;
assert_eq!(response.info.manager.current_round, Round::MultiLeader(0));
assert_eq!(response.info.manager.leader, None);
// owner1 re-proposes the fast-round proposal in MultiLeader(0); the worker
// votes in that round for the same value hash.
let proposal1b =
BlockProposal::new_retry_fast(owner1, Round::MultiLeader(0), proposal1.clone(), &signer)
.await
.unwrap();
let (response, _) = env.worker().handle_block_proposal(proposal1b).await?;
let vote = response.info.manager.pending.as_ref().unwrap();
assert_eq!(vote.round, Round::MultiLeader(0));
assert_eq!(vote.value.value_hash, value1.hash());
// A plain proposal of a different block in MultiLeader(1) is rejected with
// `HasIncompatibleConfirmedVote`, whose second field is `Round::Fast`.
let proposed_block2 = make_child_block(&value0)
.with_simple_transfer(chain_id, Amount::ONE)
.with_authenticated_signer(Some(owner1));
let proposal2 = proposed_block2
.clone()
.into_proposal_with_round(owner1, &signer, Round::MultiLeader(1))
.await
.unwrap();
let result = env.worker().handle_block_proposal(proposal2).await;
assert_matches!(result, Err(WorkerError::ChainError(err))
if matches!(*err, ChainError::HasIncompatibleConfirmedVote(_, Round::Fast))
);
// Re-proposing the fast-round proposal in MultiLeader(2) is accepted.
let proposal3 =
BlockProposal::new_retry_fast(owner0, Round::MultiLeader(2), proposal1.clone(), &signer)
.await
.unwrap();
env.worker().handle_block_proposal(proposal3).await?;
let (_, block2, _, _) = env
.worker()
.stage_block_execution(
proposed_block2.clone(),
None,
vec![],
BundleExecutionPolicy::committed(),
)
.await?;
// Block 2 re-proposed in MultiLeader(3) together with a validated-block
// certificate (made for round MultiLeader(0)) is accepted.
let value2 = ValidatedBlock::new(block2.clone());
let certificate2 = env.make_certificate_with_round(value2.clone(), Round::MultiLeader(0));
let proposal = BlockProposal::new_retry_regular(
owner1,
Round::MultiLeader(3),
certificate2.clone(),
&signer,
)
.await
.unwrap();
let lite_value2 = LiteValue::new(&value2);
let (_, _) = env.worker().handle_block_proposal(proposal).await?;
let query_values = ChainInfoQuery::new(chain_id).with_manager_values();
let (response, _) = env.worker().handle_chain_info_query(query_values).await?;
// The locking block is certificate2 and the pending vote is for validated
// block 2 in round MultiLeader(3).
assert_eq!(
response.info.manager.requested_locking,
Some(Box::new(LockingBlock::Regular(certificate2)))
);
let vote = response.info.manager.pending.as_ref().unwrap();
assert_eq!(vote.value, lite_value2);
assert_eq!(vote.round, Round::MultiLeader(3));
Ok(())
}
// Sends `NUM_QUERIES` user queries to a mock application at increasing clock
// times, expecting an empty response each time, then drops the environment and
// checks that the mock has no unmet expectations and no active instances.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_long_lived_service<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    const NUM_QUERIES: usize = 5;

    let storage = storage_builder.build().await?;
    let clock = storage_builder.clock();
    let mut env = TestEnvironment::new(storage, false, true).await;
    let description = env
        .add_root_chain(
            1,
            AccountSecretKey::generate().public().into(),
            Amount::from_tokens(2),
        )
        .await;
    let chain_id = description.id();

    // Register the mock application directly in the chain's execution state.
    let (application_id, application) = {
        let mut chain = env.worker().storage.load_chain(chain_id).await?;
        let (id, mock, _) = chain.execution_state.register_mock_application(0).await?;
        chain.save().await?;
        (id, mock)
    };

    let query_times = (0..NUM_QUERIES as u64).map(Timestamp::from);

    // Queue one expected `handle_query` call per query context.
    query_times
        .clone()
        .map(|local_time| QueryContext {
            chain_id,
            next_block_height: BlockHeight(0),
            local_time,
        })
        .for_each(|_context| {
            application.expect_call(ExpectedCall::handle_query(move |_runtime, query| {
                assert!(query.is_empty());
                Ok(vec![])
            }));
        });

    let query = Query::User {
        application_id,
        bytes: vec![],
    };
    let expected_outcome = QueryOutcome {
        response: QueryResponse::User(vec![]),
        operations: vec![],
    };
    for query_time in query_times {
        clock.set(query_time);
        let outcome = env
            .worker()
            .query_application(chain_id, query.clone(), None)
            .await?
            .0;
        assert_eq!(outcome, expected_outcome);
    }

    // Drop the environment and wait briefly before inspecting the mock.
    drop(env);
    linera_base::time::timer::sleep(Duration::from_millis(10)).await;
    application.assert_no_more_expected_calls();
    application.assert_no_active_instances();
    Ok(())
}
// Queries a mock application on chain 1 in three phases: before a block is
// proposed, after the proposal but before it is confirmed, and after the
// confirmed block has been handled. Every query must return an empty response,
// and after the environment is dropped the mock must have no unmet
// expectations and no active instances.
// NOTE(review): the name suggests the service is restarted once the new block
// is confirmed; the code visible here only checks the number of expected calls
// and the absence of active instances — confirm against the mock's semantics.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_new_block_causes_service_restart<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
// Number of queries issued in each phase, and the timestamp of the block.
const NUM_QUERIES: usize = 2;
const BLOCK_TIMESTAMP: u64 = 10;
let storage = storage_builder.build().await?;
let clock = storage_builder.clock();
let mut signer = InMemorySigner::new(None);
let public_key = signer.generate_new();
let owner = public_key.into();
let balance = Amount::from_tokens(1);
let small_transfer = Amount::from_micros(1);
let mut env = TestEnvironment::new(storage.clone(), false, true).await;
let chain_1_desc = env.add_root_chain(1, owner, balance).await;
let chain_2_desc = env.add_root_chain(2, owner, balance).await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
// Register the mock application directly in chain 1's execution state.
let (application_id, application);
{
let mut chain = storage.load_chain(chain_1).await?;
(application_id, application, _) =
chain.execution_state.register_mock_application(0).await?;
chain.save().await?;
}
// Query timestamps: 0..NUM_QUERIES before the proposal, the next
// NUM_QUERIES values before confirmation, and BLOCK_TIMESTAMP + 1 onwards
// after the new block.
let queries_before_proposal = (0..NUM_QUERIES as u64).map(Timestamp::from);
let queries_before_confirmation =
(0..NUM_QUERIES as u64).map(|delta| Timestamp::from(NUM_QUERIES as u64 + delta));
let queries_before_new_block = queries_before_proposal
.clone()
.chain(queries_before_confirmation.clone());
let queries_after_new_block =
(1..=NUM_QUERIES as u64).map(|delta| Timestamp::from(BLOCK_TIMESTAMP + delta));
let query = Query::User {
application_id,
bytes: vec![],
};
// The query contexts below are only iterated to count how many expected
// calls to register; their contents are not inspected in this function.
let query_contexts_before_new_block =
queries_before_new_block
.clone()
.map(|local_time| QueryContext {
chain_id: chain_1,
next_block_height: BlockHeight(0),
local_time,
});
let query_contexts_after_new_block =
queries_after_new_block
.clone()
.map(|local_time| QueryContext {
chain_id: chain_1,
next_block_height: BlockHeight(1),
local_time,
});
// Expect one `handle_query` call for every query issued before the new
// block (both pre-proposal and pre-confirmation phases).
for _ in query_contexts_before_new_block.clone() {
application.expect_call(ExpectedCall::handle_query(move |_runtime, query| {
assert!(query.is_empty());
Ok(vec![])
}));
}
// Phase 1: queries before any block is proposed.
for local_time in queries_before_proposal {
clock.set(local_time);
assert_eq!(
env.worker()
.query_application(chain_1, query.clone(), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::User(vec![]),
operations: vec![],
}
);
}
// Propose (but do not yet confirm) a block that transfers a small amount
// from chain 1 to chain 2.
clock.set(Timestamp::from(BLOCK_TIMESTAMP));
let block = make_first_block(chain_1)
.with_timestamp(Timestamp::from(BLOCK_TIMESTAMP))
.with_simple_transfer(chain_2, small_transfer)
.with_authenticated_signer(Some(owner));
let block_proposal = block
.clone()
.into_first_proposal(owner, &signer)
.await
.unwrap();
env.worker().handle_block_proposal(block_proposal).await?;
// Phase 2: queries after the proposal, before confirmation.
for local_time in queries_before_confirmation {
clock.set(local_time);
assert_eq!(
env.worker()
.query_application(chain_1, query.clone(), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::User(vec![]),
operations: vec![],
}
);
}
// Build the expected post-block execution state by hand (updated timestamp
// and balance, plus the mock application) to obtain the state hash for the
// confirmed block.
let mut state = SystemExecutionState {
timestamp: Timestamp::from(BLOCK_TIMESTAMP),
balance: balance - small_transfer,
..env.system_execution_state(&chain_1_desc.id())
}
.into_view()
.await;
state.register_mock_application(0).await?;
let value = ConfirmedBlock::new(
BlockExecutionOutcome {
messages: vec![vec![direct_credit_message(chain_2, small_transfer)]],
previous_message_blocks: BTreeMap::new(),
previous_event_blocks: BTreeMap::new(),
events: vec![vec![]],
blobs: vec![vec![]],
state_hash: state.crypto_hash_mut().await?,
oracle_responses: vec![vec![]],
operation_results: vec![OperationResult::default()],
}
.with(block),
);
let certificate = env.make_certificate(value);
env.worker()
.handle_confirmed_certificate(certificate, None)
.await?;
// Expect one more `handle_query` call per query issued after the new block.
for _ in query_contexts_after_new_block.clone() {
application.expect_call(ExpectedCall::handle_query(move |_runtime, query| {
assert!(query.is_empty());
Ok(vec![])
}));
}
// Phase 3: queries after the block has been confirmed.
for local_time in queries_after_new_block {
clock.set(local_time);
assert_eq!(
env.worker()
.query_application(chain_1, query.clone(), None)
.await?
.0,
QueryOutcome {
response: QueryResponse::User(vec![]),
operations: vec![],
}
);
}
// Drop the environment and wait briefly before inspecting the mock.
drop(env);
linera_base::time::timer::sleep(Duration::from_millis(10)).await;
application.assert_no_more_expected_calls();
application.assert_no_active_instances();
Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_stage_block_with_message_earlier_than_cursor<B>(
mut storage_builder: B,
) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let mut signer = InMemorySigner::new(None);
let receiver_public_key = signer.generate_new();
let owner = receiver_public_key.into();
let mut env = TestEnvironment::new(storage_builder.build().await?, false, false).await;
let chain_1_desc = env.add_root_chain(1, owner, Amount::from_tokens(10)).await;
let chain_2_desc = env.add_root_chain(2, owner, Amount::ZERO).await;
let chain_1 = chain_1_desc.id();
let chain_2 = chain_2_desc.id();
let sender_hash = CryptoHash::test_hash("sender block");
let block_proposal = make_first_block(chain_2)
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: sender_hash,
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 1,
messages: vec![system_credit_message(Amount::from_tokens(2))
.to_posted(0, MessageKind::Tracked)],
},
action: MessageAction::Accept,
})
.into_first_proposal(owner, &signer)
.await
.unwrap();
let certificate_chain_2 = env.make_certificate(ConfirmedBlock::new(
BlockExecutionOutcome {
messages: vec![Vec::new()],
previous_message_blocks: BTreeMap::new(),
previous_event_blocks: BTreeMap::new(),
events: vec![Vec::new()],
blobs: vec![Vec::new()],
state_hash: SystemExecutionState {
balance: Amount::from_tokens(2),
..env.system_execution_state(&chain_2_desc.id())
}
.into_hash()
.await,
oracle_responses: vec![Vec::new()],
operation_results: vec![],
}
.with(block_proposal.content.block),
));
env.worker()
.handle_confirmed_certificate(certificate_chain_2.clone(), None)
.await?;
let bad_proposed_block = make_child_block(&certificate_chain_2.into_value())
.with_incoming_bundle(IncomingBundle {
origin: chain_1,
bundle: MessageBundle {
certificate_hash: sender_hash,
height: BlockHeight::ZERO,
timestamp: Timestamp::from(0),
transaction_index: 0,
messages: vec![
system_credit_message(Amount::ONE).to_posted(0, MessageKind::Tracked)
],
},
action: MessageAction::Accept,
});
assert_matches!(
env.worker()
.stage_block_execution(bad_proposed_block.clone(), None, vec![], BundleExecutionPolicy::committed())
.await,
Err(WorkerError::ChainError(chain_error))
if matches!(*chain_error, ChainError::IncorrectMessageOrder { .. })
);
let bad_proposal = bad_proposed_block
.into_first_proposal(owner, &signer)
.await
.unwrap();
assert_matches!(
env.worker().handle_block_proposal(bad_proposal).await,
Err(WorkerError::ChainError(chain_error))
if matches!(*chain_error, ChainError::IncorrectMessageOrder { .. })
);
Ok(())
}
// Delivers one transfer from a non-priority chain and then one from a chain
// configured as a priority bundle origin, and checks that the pending message
// bundles reported for the recipient list the priority chain's bundle first.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_pending_bundles_priority_ordering<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let regular_sender_key = AccountSecretKey::generate();
    let priority_sender_key = AccountSecretKey::generate();
    let storage = storage_builder.build().await?;

    let regular_desc = dummy_chain_description(1);
    let regular_chain = regular_desc.id();
    let priority_desc = dummy_chain_description(3);
    let priority_chain = priority_desc.id();

    // Only the second sender chain is registered as a priority origin.
    let priority_origins = HashSet::from([priority_chain]);
    let mut env = TestEnvironment::new(storage, false, false)
        .await
        .with_priority_bundle_origins(priority_origins);
    let recipient_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ONE)
        .await;
    let recipient = recipient_desc.id();

    // The non-priority transfer reaches the recipient first.
    let regular_certificate = env
        .make_simple_transfer_certificate(
            regular_desc,
            regular_sender_key.public(),
            recipient,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;
    let regular_update = update_recipient_direct(recipient, &regular_certificate);
    env.worker().handle_cross_chain_request(regular_update).await?;

    // The priority transfer is delivered afterwards.
    let priority_certificate = env
        .make_simple_transfer_certificate(
            priority_desc,
            priority_sender_key.public(),
            recipient,
            Amount::from_tokens(3),
            Vec::new(),
            Amount::ZERO,
            vec![],
        )
        .await;
    let priority_update = update_recipient_direct(recipient, &priority_certificate);
    env.worker()
        .handle_cross_chain_request(priority_update)
        .await?;

    // Despite arriving second, the priority bundle is listed first.
    let pending_query = ChainInfoQuery::new(recipient).with_pending_message_bundles();
    let (response, _actions) = env.worker().handle_chain_info_query(pending_query).await?;
    let pending = &response.info.requested_pending_message_bundles;
    assert_eq!(pending.len(), 2);
    assert_eq!(
        pending[0].origin, priority_chain,
        "Priority chain bundle should be first"
    );
    assert_eq!(
        pending[1].origin, regular_chain,
        "Non-priority chain bundle should be second"
    );
    Ok(())
}
// Exercises recovery from a premature delivery confirmation when
// `allow_revert_confirm` is enabled.
//
// Chain 1 confirms two transfers to chain 2, but only the first is delivered.
// A `ConfirmUpdatedRecipient` claiming height 1 is then sent to chain 1, and
// its outbox for chain 2 is asserted to be absent or empty. After a third
// transfer is delivered to chain 2, the worker must emit a `RevertConfirm`
// request; once all follow-up cross-chain requests have been processed, chain
// 2's inbox from chain 1 must show three added bundles and no removed bundles.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_revert_confirm_recovery<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let sender_key_pair = AccountSecretKey::generate();
    let mut env = TestEnvironment::new(storage_builder.build().await?, true, false).await;
    env.worker.chain_worker_config.allow_revert_confirm = true;
    let chain_1_desc = env
        .add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(100))
        .await;
    let chain_1 = chain_1_desc.id();
    let chain_2_desc = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ONE)
        .await;
    let chain_2 = chain_2_desc.id();

    // Two consecutive transfers from chain 1 to chain 2 (5 tokens, then 3).
    let cert_0 = env
        .make_simple_transfer_certificate(
            chain_1_desc.clone(),
            sender_key_pair.public(),
            chain_2,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::from_tokens(95),
            vec![],
        )
        .await;
    let cert_1 = env
        .make_simple_transfer_certificate(
            chain_1_desc.clone(),
            sender_key_pair.public(),
            chain_2,
            Amount::from_tokens(3),
            Vec::new(),
            Amount::from_tokens(92),
            vec![&cert_0],
        )
        .await;
    env.worker()
        .process_confirmed_block(cert_0.clone(), None)
        .await?;
    env.worker()
        .process_confirmed_block(cert_1.clone(), None)
        .await?;

    // Deliver only the first certificate to chain 2 and process the
    // resulting cross-chain requests.
    let actions = env
        .worker()
        .handle_cross_chain_request(update_recipient_direct(chain_2, &cert_0))
        .await?;
    for request in actions.cross_chain_requests {
        env.worker().handle_cross_chain_request(request).await?;
    }

    // Send a confirmation for height 1, although only `cert_0` was delivered
    // to chain 2 above.
    env.worker()
        .handle_cross_chain_request(CrossChainRequest::ConfirmUpdatedRecipient {
            sender: chain_1,
            recipient: chain_2,
            latest_height: BlockHeight::from(1),
        })
        .await?;
    {
        let chain = env.worker().chain_state_view(chain_1).await?;
        // The outbox entry may be missing entirely; if it exists, it must be
        // empty. Matching on the option avoids the check-then-unwrap pattern
        // and makes a failure report the actual queue length.
        if let Some(outbox) = chain.outboxes.try_load_entry(&chain_2).await? {
            assert_eq!(
                outbox.queue.count(),
                0,
                "Outbox should be drained by the spurious confirm"
            );
        }
    }

    // A third transfer, delivered directly to chain 2.
    let cert_2 = env
        .make_simple_transfer_certificate(
            chain_1_desc.clone(),
            sender_key_pair.public(),
            chain_2,
            Amount::from_tokens(2),
            Vec::new(),
            Amount::from_tokens(90),
            vec![&cert_1],
        )
        .await;
    env.worker()
        .process_confirmed_block(cert_2.clone(), None)
        .await?;
    let actions = env
        .worker()
        .handle_cross_chain_request(update_recipient_direct(chain_2, &cert_2))
        .await?;
    assert!(
        actions
            .cross_chain_requests
            .iter()
            .any(|r| matches!(r, CrossChainRequest::RevertConfirm { .. })),
        "Expected RevertConfirm but got: {:?}",
        actions.cross_chain_requests,
    );

    // Process cross-chain requests until none remain, with an iteration
    // bound so that a request cycle fails the test instead of hanging it.
    let mut requests = std::collections::VecDeque::from(actions.cross_chain_requests);
    let mut iterations = 0;
    while let Some(request) = requests.pop_front() {
        iterations += 1;
        assert!(iterations < 20, "Too many iterations in cross-chain loop");
        let actions = env.worker().handle_cross_chain_request(request).await?;
        requests.extend(actions.cross_chain_requests);
    }

    // Chain 2's inbox from chain 1 must now contain all three bundles.
    {
        let chain = env.worker().chain_state_view(chain_2).await?;
        let inbox = chain
            .inboxes
            .try_load_entry(&chain_1)
            .await?
            .expect("chain_2 should have an inbox for chain_1");
        assert_eq!(
            inbox.removed_bundles.count(),
            0,
            "removed_bundles should be empty"
        );
        assert_eq!(
            inbox.added_bundles.count(),
            3,
            "all three heights should have been delivered"
        );
    }
    Ok(())
}
// Exercises the `reset_on_corrupted_chain_state` / `recovery_whitelist` worker settings.
//
// Outline:
// 1. Process one transfer block on the sender chain.
// 2. Tamper with the stored chain state: plant a `confirmed_vote` and remove the
//    `previous_message_blocks` entry for the transfer target.
// 3. Check that a worker without the reset setting, and a worker whose whitelist does not
//    contain the sender chain, both keep failing with `ChainError::CorruptedChainState`.
// 4. Check that a worker with the reset setting and the sender chain whitelisted fails once,
//    keeps the planted vote, and then processes the next block successfully.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "dynamodb", test_case(DynamoDbStorageBuilder::default(); "dynamo_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_reset_on_corrupted_chain_state<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let sender_key_pair = AccountSecretKey::generate();
// The storage handle is kept separately from `env` because the test later edits chain
// state through it directly and builds additional workers on top of it.
let storage = storage_builder.build().await?;
let mut env = TestEnvironment::new(storage.clone(), true, false).await;
// Sender chain: owned by `sender_key_pair`, starting with 100 tokens.
let chain_desc = env
.add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(100))
.await;
let chain_id = chain_desc.id();
// Target chain: only used as the recipient of the transfers.
let target_chain = env
.add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ONE)
.await;
let target_id = target_chain.id();
// First block on the sender chain: transfer 5 tokens to the target (100 -> 95).
let cert_0 = env
.make_simple_transfer_certificate(
chain_desc.clone(),
sender_key_pair.public(),
target_id,
Amount::from_tokens(5),
Vec::new(),
Amount::from_tokens(95),
vec![],
)
.await;
env.worker()
.fully_handle_certificate_with_notifications(cert_0.clone(), &())
.await?;
// Sanity check: the sender chain advanced to height 1.
{
let chain = env.worker().chain_state_view(chain_id).await?;
assert_eq!(
chain.tip_state.get().next_block_height,
BlockHeight::from(1)
);
}
// Write a `confirmed_vote` for the first block straight into the stored chain state,
// signed with the worker's own key pair. It is compared against the chain state again
// after the recovery attempt further down.
let planted_vote = {
let key_pair = env.worker().chain_worker_config.key_pair().unwrap().copy();
let mut chain = storage.load_chain(chain_id).await?;
let vote = Vote::new(cert_0.value().clone(), Round::Fast, &key_pair);
chain.manager.confirmed_vote.set(Some(vote.clone()));
chain.save().await?;
vote
};
// Tamper with the stored state: drop the `previous_message_blocks` entry for the target.
// The assertions below expect this to surface as `ChainError::CorruptedChainState` when
// the next block is handled.
{
let mut chain = storage.load_chain(chain_id).await?;
chain.previous_message_blocks.remove(&target_id)?;
chain.save().await?;
}
// Second block on the sender chain: transfer 3 more tokens (95 -> 92), built on `cert_0`.
let cert_1 = env
.make_simple_transfer_certificate(
chain_desc.clone(),
sender_key_pair.public(),
target_id,
Amount::from_tokens(3),
Vec::new(),
Amount::from_tokens(92),
vec![&cert_0],
)
.await;
// Case 1: a worker that does not set `reset_on_corrupted_chain_state` (the field is left
// at its `ChainWorkerConfig::default()` value) must report the corruption.
{
let worker_no_recovery = WorkerState::new(
storage.clone(),
ChainWorkerConfig {
nickname: "No-recovery worker".to_string(),
allow_inactive_chains: true,
allow_messages_from_deprecated_epochs: true,
block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS),
..ChainWorkerConfig::default()
}
.with_key_pair(Some(
env.worker().chain_worker_config.key_pair().unwrap().copy(),
)),
None,
);
assert_matches!(
worker_no_recovery
.fully_handle_certificate_with_notifications(cert_1.clone(), &())
.await,
Err(WorkerError::ChainError(ref e)) if matches!(**e, ChainError::CorruptedChainState(_))
);
}
// Case 2: the reset setting is present, but the whitelist only contains the target chain,
// not the corrupted sender chain.
{
let worker_not_whitelisted = WorkerState::new(
storage.clone(),
ChainWorkerConfig {
nickname: "Whitelist-excludes worker".to_string(),
allow_inactive_chains: true,
allow_messages_from_deprecated_epochs: true,
reset_on_corrupted_chain_state: Some(Duration::from_secs(0)),
recovery_whitelist: Some(HashSet::from([target_id])),
block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS),
..ChainWorkerConfig::default()
}
.with_key_pair(Some(
env.worker().chain_worker_config.key_pair().unwrap().copy(),
)),
None,
);
assert_matches!(
worker_not_whitelisted
.fully_handle_certificate_with_notifications(cert_1.clone(), &())
.await,
Err(WorkerError::ChainError(ref e)) if matches!(**e, ChainError::CorruptedChainState(_))
);
// The same call is asserted a second time. NOTE(review): presumably this confirms that
// the first failure did not trigger a reset that would let a retry succeed (compare
// with the whitelisted worker below, where the retry is expected to pass).
assert_matches!(
worker_not_whitelisted
.fully_handle_certificate_with_notifications(cert_1.clone(), &())
.await,
Err(WorkerError::ChainError(ref e)) if matches!(**e, ChainError::CorruptedChainState(_))
);
}
// Case 3: the reset setting is present and the sender chain is whitelisted.
let worker_with_recovery = WorkerState::new(
storage.clone(),
ChainWorkerConfig {
nickname: "Recovery worker".to_string(),
allow_inactive_chains: true,
allow_messages_from_deprecated_epochs: true,
reset_on_corrupted_chain_state: Some(Duration::from_secs(0)),
recovery_whitelist: Some(HashSet::from([chain_id])),
block_time_grace_period: Duration::from_micros(TEST_GRACE_PERIOD_MICROS),
..ChainWorkerConfig::default()
}
.with_key_pair(Some(
env.worker().chain_worker_config.key_pair().unwrap().copy(),
)),
None,
);
// The first attempt is still expected to fail with the corruption error. Going by the
// assertion messages below, the chain is expected to be reset and re-executed as part of
// handling that failure.
assert_matches!(
worker_with_recovery
.fully_handle_certificate_with_notifications(cert_1.clone(), &())
.await,
Err(WorkerError::ChainError(ref e)) if matches!(**e, ChainError::CorruptedChainState(_))
);
// The planted vote must still be present; votes are compared by signature.
{
let chain = worker_with_recovery.chain_state_view(chain_id).await?;
let restored_vote = chain.manager.confirmed_vote.get().as_ref();
assert_eq!(
restored_vote.map(|v| v.signature),
Some(planted_vote.signature),
"confirmed_vote should be restored after reset-and-reexecute"
);
}
// Retrying the same certificate on the same worker must now succeed.
worker_with_recovery
.fully_handle_certificate_with_notifications(cert_1.clone(), &())
.await?;
// Final state: both blocks applied, balance matches `cert_1`, and the removed
// `previous_message_blocks` entry for the target is back, with value height 1.
{
let chain = worker_with_recovery.chain_state_view(chain_id).await?;
assert_eq!(
chain.tip_state.get().next_block_height,
BlockHeight::from(2),
"Chain should have processed both blocks after recovery"
);
assert_eq!(
*chain.execution_state.system.balance.get(),
Amount::from_tokens(92),
"Balance should be correct after re-execution"
);
let prev = chain.previous_message_blocks.get(&target_id).await?;
assert_eq!(
prev,
Some(BlockHeight::from(1)),
"previous_message_blocks should be corrected after re-execution"
);
}
Ok(())
}
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[test_log::test(tokio::test)]
async fn test_cross_chain_message_chunking<B>(mut storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
let sender_key_pair = AccountSecretKey::generate();
let mut env = TestEnvironment::new(storage_builder.build().await?, true, false)
.await
.with_cross_chain_message_chunk_limit(1);
let chain_1_desc = env
.add_root_chain(1, sender_key_pair.public().into(), Amount::from_tokens(100))
.await;
let chain_2_desc = env
.add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ONE)
.await;
let chain_2 = chain_2_desc.id();
let cert_0 = env
.make_simple_transfer_certificate(
chain_1_desc.clone(),
sender_key_pair.public(),
chain_2,
Amount::from_tokens(5),
Vec::new(),
Amount::from_tokens(95),
vec![],
)
.await;
let cert_1 = env
.make_simple_transfer_certificate(
chain_1_desc.clone(),
sender_key_pair.public(),
chain_2,
Amount::from_tokens(3),
Vec::new(),
Amount::from_tokens(92),
vec![&cert_0],
)
.await;
let cert_2 = env
.make_simple_transfer_certificate(
chain_1_desc.clone(),
sender_key_pair.public(),
chain_2,
Amount::from_tokens(2),
Vec::new(),
Amount::from_tokens(90),
vec![&cert_1],
)
.await;
env.worker()
.process_confirmed_block(cert_0.clone(), None)
.await?;
env.worker()
.process_confirmed_block(cert_1.clone(), None)
.await?;
let (_, actions, _) = env
.worker()
.process_confirmed_block(cert_2.clone(), None)
.await?;
let initial_updates: Vec<_> = actions
.cross_chain_requests
.iter()
.filter(|r| matches!(r, CrossChainRequest::UpdateRecipient { .. }))
.collect();
assert_eq!(
initial_updates.len(),
1,
"Only one UpdateRecipient chunk should be returned initially"
);
let mut requests = std::collections::VecDeque::from(actions.cross_chain_requests);
let mut chunks_delivered = 0;
let mut iterations = 0;
while let Some(request) = requests.pop_front() {
iterations += 1;
assert!(iterations < 30, "Too many iterations in cross-chain loop");
let actions = env.worker().handle_cross_chain_request(request).await?;
chunks_delivered += actions
.cross_chain_requests
.iter()
.filter(|r| matches!(r, CrossChainRequest::UpdateRecipient { .. }))
.count();
requests.extend(actions.cross_chain_requests);
}
assert_eq!(
chunks_delivered, 2,
"Expected 2 additional chunks from confirmations, got {}",
chunks_delivered
);
let chain = env.worker().chain_state_view(chain_2).await?;
let inbox = chain
.inboxes
.try_load_entry(&chain_1_desc.id())
.await?
.expect("chain_2 should have an inbox for chain_1");
assert_eq!(
inbox.next_block_height_to_receive()?,
BlockHeight::from(3),
"All three heights should have been received"
);
Ok(())
}
// Checks that the recipient notices a missing height when updates arrive out of order.
//
// With the chunk limit set to 1 and `allow_revert_confirm` enabled, chain 2 is first given
// the update for the first block of chain 1 and then, skipping the second block, the update
// for the third. The second delivery must produce a `RevertConfirm` request, and the inbox
// must still expect height 1 next.
#[test_case(MemoryStorageBuilder::default(); "memory")]
#[test_log::test(tokio::test)]
async fn test_chunked_cross_chain_gap_detection<B>(mut storage_builder: B) -> anyhow::Result<()>
where
    B: StorageBuilder,
{
    let owner_key = AccountSecretKey::generate();
    let storage = storage_builder.build().await?;
    let mut env = TestEnvironment::new(storage, true, false)
        .await
        .with_cross_chain_message_chunk_limit(1);
    env.worker.chain_worker_config.allow_revert_confirm = true;

    let source_desc = env
        .add_root_chain(1, owner_key.public().into(), Amount::from_tokens(100))
        .await;
    let source_id = source_desc.id();
    let recipient_id = env
        .add_root_chain(2, AccountPublicKey::test_key(2).into(), Amount::ONE)
        .await
        .id();

    // Three consecutive transfers from the source chain: 100 -> 95 -> 92 -> 90 tokens.
    let transfer_0 = env
        .make_simple_transfer_certificate(
            source_desc.clone(),
            owner_key.public(),
            recipient_id,
            Amount::from_tokens(5),
            Vec::new(),
            Amount::from_tokens(95),
            Vec::new(),
        )
        .await;
    let transfer_1 = env
        .make_simple_transfer_certificate(
            source_desc.clone(),
            owner_key.public(),
            recipient_id,
            Amount::from_tokens(3),
            Vec::new(),
            Amount::from_tokens(92),
            vec![&transfer_0],
        )
        .await;
    let transfer_2 = env
        .make_simple_transfer_certificate(
            source_desc.clone(),
            owner_key.public(),
            recipient_id,
            Amount::from_tokens(2),
            Vec::new(),
            Amount::from_tokens(90),
            vec![&transfer_1],
        )
        .await;

    // Confirm all three blocks on the sender side; the returned actions are ignored because
    // the deliveries below are issued by hand.
    for certificate in [&transfer_0, &transfer_1, &transfer_2] {
        env.worker()
            .process_confirmed_block(certificate.clone(), None)
            .await?;
    }

    // Hand the recipient the update built from the first block and run the requests that
    // come back, one level deep.
    let first_delivery = env
        .worker()
        .handle_cross_chain_request(update_recipient_direct(recipient_id, &transfer_0))
        .await?;
    for follow_up in first_delivery.cross_chain_requests {
        env.worker().handle_cross_chain_request(follow_up).await?;
    }

    // Now skip the second block and deliver the update built from the third one.
    let gap_delivery = env
        .worker()
        .handle_cross_chain_request(update_recipient_direct(recipient_id, &transfer_2))
        .await?;
    let revert_requested = gap_delivery
        .cross_chain_requests
        .iter()
        .any(|request| matches!(request, CrossChainRequest::RevertConfirm { .. }));
    assert!(
        revert_requested,
        "Expected RevertConfirm due to gap, but got: {:?}",
        gap_delivery.cross_chain_requests,
    );

    // The out-of-order update must not have advanced the inbox.
    let recipient_chain = env.worker().chain_state_view(recipient_id).await?;
    let inbox = recipient_chain
        .inboxes
        .try_load_entry(&source_id)
        .await?
        .expect("chain_2 should have an inbox for chain_1");
    assert_eq!(
        inbox.next_block_height_to_receive()?,
        BlockHeight::from(1),
        "Only height 0 should have been received before gap detection"
    );
    Ok(())
}