use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
time::Duration,
vec,
};
use async_trait::async_trait;
use futures::{
future::Either,
lock::{Mutex, MutexGuard},
Future,
};
use linera_base::{
crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
data_types::*,
identifiers::{AccountOwner, BlobId, ChainId, StreamId},
ownership::ChainOwnership,
};
use linera_chain::{
data_types::BlockProposal,
types::{
CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
LiteCertificate, Timeout, ValidatedBlock,
},
};
use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
use linera_storage_service::client::StorageServiceDatabase;
use linera_version::VersionInfo;
#[cfg(feature = "dynamodb")]
use linera_views::dynamo_db::DynamoDbDatabase;
#[cfg(feature = "scylladb")]
use linera_views::scylla_db::ScyllaDbDatabase;
use linera_views::{
memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
};
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;
#[cfg(feature = "rocksdb")]
use {
linera_views::rocks_db::RocksDbDatabase,
tokio::sync::{Semaphore, SemaphorePermit},
};
use crate::{
client::{chain_client, Client, ListeningMode},
data_types::*,
environment::{wallet::Chain, TestSigner, TestWallet},
node::{
CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
ValidatorNodeProvider,
},
notifier::ChannelNotifier,
worker::{Notification, ProcessableCertificate, WorkerState},
ChainWorkerConfig,
};
/// The kind of (mis)behavior a `LocalValidatorClient` simulates in tests.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum FaultType {
    /// Behaves correctly for every request.
    Honest,
    /// Fails every request with a `ClientIoError`.
    Offline,
    /// Fails every request with a `ClientIoError`, except chain info queries,
    /// which are answered normally.
    OfflineWithInfo,
    /// Reports every chain as inactive (`NodeError::InactiveChain`).
    NoChains,
    /// Processes certificates, but withholds the confirmation vote for validated blocks.
    DontSendConfirmVote,
    /// Refuses to process validated block certificates at all.
    DontProcessValidated,
    /// Processes block proposals, but withholds the validation vote.
    DontSendValidateVote,
}
/// The state behind a `LocalValidatorClient`: a worker plus a notifier for subscribers.
struct LocalValidator<S>
where
    S: Storage,
{
    /// The validator's worker, which processes proposals, certificates and queries.
    state: WorkerState<S>,
    /// Publishes chain notifications to subscribed test clients.
    notifier: Arc<ChannelNotifier<Notification>>,
}
/// An in-process validator used by tests, implementing `ValidatorNode` without any
/// networking. Cloning is cheap: all clones share the same underlying validator state.
#[derive(Clone)]
pub struct LocalValidatorClient<S>
where
    S: Storage,
{
    /// The validator's public key, also used as its name and address.
    public_key: ValidatorPublicKey,
    /// Shared, mutex-protected validator state.
    client: Arc<Mutex<LocalValidator<S>>>,
    /// The failure mode this validator simulates (`Honest` on construction).
    fault_type: FaultType,
}
impl<S> ValidatorNode for LocalValidatorClient<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    type NotificationStream = NotificationStream;

    /// Returns a synthetic address derived from the validator's public key.
    fn address(&self) -> String {
        format!("local:{}", self.public_key)
    }

    // Most requests below are forwarded to a spawned task that locks the shared
    // validator state; see `spawn_and_receive` and the corresponding `do_*` method
    // for the actual handling (including fault injection).

    async fn handle_block_proposal(
        &self,
        proposal: BlockProposal,
    ) -> Result<ChainInfoResponse, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_block_proposal(proposal, sender)
        })
        .await
    }

    async fn handle_lite_certificate(
        &self,
        certificate: LiteCertificate<'_>,
        _delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError> {
        // Convert to an owned certificate so it can be moved into the spawned task.
        let certificate = certificate.cloned();
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_lite_certificate(certificate, sender)
        })
        .await
    }

    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<ChainInfoResponse, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_certificate(certificate, sender)
        })
        .await
    }

    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<ChainInfoResponse, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_certificate(certificate, sender)
        })
        .await
    }

    async fn handle_confirmed_certificate(
        &self,
        certificate: GenericCertificate<ConfirmedBlock>,
        _delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_certificate(certificate, sender)
        })
        .await
    }

    async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<ChainInfoResponse, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_chain_info_query(query, sender)
        })
        .await
    }

    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
            .await
    }

    /// Test validators report a default version.
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
        Ok(Default::default())
    }

    /// Reads the `NetworkDescription` from the validator's storage; errors if missing.
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
        Ok(self
            .client
            .lock()
            .await
            .state
            .storage_client()
            .read_network_description()
            .await
            // `transpose` turns a missing description into `None`, which we map to an error.
            .transpose()
            .ok_or(NodeError::ViewError {
                error: "missing NetworkDescription".to_owned(),
            })??)
    }

    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
            .await
    }

    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
            .await
    }

    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_download_pending_blob(chain_id, blob_id, sender)
        })
        .await
    }

    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_handle_pending_blob(chain_id, blob, sender)
        })
        .await
    }

    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_download_certificate(hash, sender)
        })
        .await
    }

    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_download_certificates(hashes, sender)
        })
        .await
    }

    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_download_certificates_by_heights(chain_id, heights, sender)
        })
        .await
    }

    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_blob_last_used_by(blob_id, sender)
        })
        .await
    }

    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_missing_blob_ids(blob_ids, sender)
        })
        .await
    }

    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_blob_last_used_by_certificate(blob_id, sender)
        })
        .await
    }

    async fn previous_event_blocks(
        &self,
        chain_id: ChainId,
        stream_ids: Vec<StreamId>,
    ) -> Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, NodeError> {
        self.spawn_and_receive(move |validator, sender| {
            validator.do_previous_event_blocks(chain_id, stream_ids, sender)
        })
        .await
    }
}
impl<S> LocalValidatorClient<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    /// Creates an honest validator client around the given worker state.
    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
        let client = LocalValidator {
            state,
            notifier: Arc::new(ChannelNotifier::default()),
        };
        Self {
            public_key,
            client: Arc::new(Mutex::new(client)),
            fault_type: FaultType::Honest,
        }
    }

    /// Returns the validator's public key, which serves as its name.
    pub fn name(&self) -> ValidatorPublicKey {
        self.public_key
    }

    /// Changes the failure mode this client simulates. Only affects this clone,
    /// since `fault_type` is stored outside the shared state.
    fn set_fault_type(&mut self, fault_type: FaultType) {
        self.fault_type = fault_type;
    }

    /// Queries the chain info for `chain_id`, including the manager's pending values.
    pub async fn chain_info_with_manager_values(
        &mut self,
        chain_id: ChainId,
    ) -> Result<Box<ChainInfo>, NodeError> {
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let response = self.handle_chain_info_query(query).await?;
        Ok(response.info)
    }

    /// Runs `f` on a clone of this client in a fresh Tokio task and awaits the value
    /// the task sends back through the provided oneshot channel.
    ///
    /// # Panics
    /// Panics if the spawned task drops the sender without sending a result.
    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
    where
        T: Send + 'static,
        R: Future<Output = Result<(), T>> + Send,
        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
    {
        let validator = self.clone();
        let (sender, receiver) = oneshot::channel();
        tokio::spawn(async move {
            if f(validator, sender).await.is_err() {
                // The receiver was dropped; the response has nowhere to go.
                tracing::debug!("result could not be sent");
            }
        });
        receiver.await.unwrap()
    }

    /// Processes a block proposal, subject to the configured fault type, and sends the
    /// outcome through `sender`.
    async fn do_handle_block_proposal(
        self,
        proposal: BlockProposal,
        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
        let result = match self.fault_type {
            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
                error: "offline".to_string(),
            }),
            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
            FaultType::DontSendValidateVote
            | FaultType::Honest
            | FaultType::DontSendConfirmVote
            | FaultType::DontProcessValidated => {
                let result = self
                    .client
                    .lock()
                    .await
                    .state
                    .handle_block_proposal(proposal)
                    .await
                    .map_err(Into::into);
                if self.fault_type == FaultType::DontSendValidateVote {
                    // The proposal was processed above, but we withhold our vote.
                    Err(NodeError::ClientIoError {
                        error: "refusing to validate".to_string(),
                    })
                } else {
                    result
                }
            }
        };
        sender.send(result.map(|(info, _actions)| info))
    }

    /// Completes a lite certificate into a full (confirmed or validated) one and
    /// processes it, sending the outcome through `sender`.
    async fn do_handle_lite_certificate(
        self,
        certificate: LiteCertificate<'_>,
        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
        let client = self.client.clone();
        // Hold the lock across both the lookup and the processing step.
        let mut validator = client.lock().await;
        let result = async move {
            match validator.state.full_certificate(certificate).await? {
                Either::Left(confirmed) => {
                    self.do_handle_certificate_internal(confirmed, &mut validator)
                        .await
                }
                Either::Right(validated) => {
                    self.do_handle_certificate_internal(validated, &mut validator)
                        .await
                }
            }
        }
        .await;
        sender.send(result)
    }

    /// Processes a certificate using the already-locked validator state, subject to the
    /// configured fault type.
    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
        &self,
        certificate: GenericCertificate<T>,
        validator: &mut MutexGuard<'_, LocalValidator<S>>,
    ) -> Result<ChainInfoResponse, NodeError> {
        match self.fault_type {
            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
                Err(NodeError::ClientIoError {
                    error: "refusing to process validated block".to_string(),
                })
            }
            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
            FaultType::Honest
            | FaultType::DontSendConfirmVote
            | FaultType::DontProcessValidated
            | FaultType::DontSendValidateVote => {
                let result = validator
                    .state
                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
                    .await
                    .map_err(Into::into);
                if T::KIND == CertificateKind::Validated
                    && self.fault_type == FaultType::DontSendConfirmVote
                {
                    // The certificate was processed above, but we withhold the confirm vote.
                    Err(NodeError::ClientIoError {
                        error: "refusing to confirm".to_string(),
                    })
                } else {
                    result
                }
            }
            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
                error: "offline".to_string(),
            }),
        }
    }

    /// Locks the validator state, processes the certificate, and sends back the outcome.
    async fn do_handle_certificate<T: ProcessableCertificate>(
        self,
        certificate: GenericCertificate<T>,
        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
        let mut validator = self.client.lock().await;
        let result = self
            .do_handle_certificate_internal(certificate, &mut validator)
            .await;
        sender.send(result)
    }

    /// Answers a chain info query, subject to the configured fault type. Note that
    /// `OfflineWithInfo` validators still answer these queries.
    async fn do_handle_chain_info_query(
        self,
        query: ChainInfoQuery,
        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
        let validator = self.client.lock().await;
        let result = match self.fault_type {
            FaultType::Offline => Err(NodeError::ClientIoError {
                error: "offline".to_string(),
            }),
            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
            FaultType::Honest
            | FaultType::DontSendConfirmVote
            | FaultType::DontProcessValidated
            | FaultType::DontSendValidateVote
            | FaultType::OfflineWithInfo => validator
                .state
                .handle_chain_info_query(query)
                .await
                .map_err(Into::into),
        };
        sender.send(result.map(|(info, _actions)| info))
    }

    /// Subscribes to notifications for the given chains and sends back the stream.
    async fn do_subscribe(
        self,
        chains: Vec<ChainId>,
        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
    ) -> Result<(), Result<NotificationStream, NodeError>> {
        let validator = self.client.lock().await;
        let rx = validator.notifier.subscribe(chains);
        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
        sender.send(Ok(stream))
    }

    /// Tries to store a blob and sends back its ID on success.
    async fn do_upload_blob(
        self,
        content: BlobContent,
        sender: oneshot::Sender<Result<BlobId, NodeError>>,
    ) -> Result<(), Result<BlobId, NodeError>> {
        let validator = self.client.lock().await;
        let blob = Blob::new(content);
        let id = blob.id();
        let storage = validator.state.storage_client();
        let result = match storage.maybe_write_blobs(&[blob]).await {
            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
            // `maybe_write_blobs` reported no state for the blob; treat it as not found.
            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
            Err(error) => Err(error.into()),
        };
        sender.send(result)
    }

    /// Reads a blob from storage and sends back its content.
    async fn do_download_blob(
        self,
        blob_id: BlobId,
        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
    ) -> Result<(), Result<BlobContent, NodeError>> {
        let validator = self.client.lock().await;
        let blob = validator
            .state
            .storage_client()
            .read_blob(blob_id)
            .await
            .map_err(Into::into);
        let blob = match blob {
            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
            Err(error) => Err(error),
        };
        sender.send(blob.map(|blob| Arc::unwrap_or_clone(blob).into_content()))
    }

    /// Fetches a blob that is pending for the given chain and sends back its content.
    async fn do_download_pending_blob(
        self,
        chain_id: ChainId,
        blob_id: BlobId,
        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
    ) -> Result<(), Result<BlobContent, NodeError>> {
        let validator = self.client.lock().await;
        let result = validator
            .state
            .download_pending_blob(chain_id, blob_id)
            .await
            .map_err(Into::into);
        sender.send(result.map(|blob| blob.into_content()))
    }

    /// Hands a pending blob to the worker for the given chain.
    async fn do_handle_pending_blob(
        self,
        chain_id: ChainId,
        blob: BlobContent,
        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
        let validator = self.client.lock().await;
        let result = validator
            .state
            .handle_pending_blob(chain_id, Blob::new(blob))
            .await
            .map_err(Into::into);
        sender.send(result)
    }

    /// Reads a confirmed block certificate from storage by hash.
    ///
    /// # Panics
    /// Panics if the certificate is not present in storage.
    async fn do_download_certificate(
        self,
        hash: CryptoHash,
        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
        let validator = self.client.lock().await;
        let certificate = validator
            .state
            .storage_client()
            .read_certificate(hash)
            .await
            .map_err(Into::into);
        let certificate = match certificate {
            Err(error) => Err(error),
            Ok(entry) => match entry {
                Some(certificate) => Ok(Arc::unwrap_or_clone(certificate)),
                None => {
                    panic!("Missing certificate: {hash}");
                }
            },
        };
        sender.send(certificate)
    }

    /// Reads several confirmed block certificates from storage.
    ///
    /// # Panics
    /// Panics if any of the requested certificates is missing.
    async fn do_download_certificates(
        self,
        hashes: Vec<CryptoHash>,
        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
        let validator = self.client.lock().await;
        let certificates = validator
            .state
            .storage_client()
            .read_certificates(&hashes)
            .await
            .map_err(Into::into);
        let certificates = match certificates {
            Err(error) => Err(error),
            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
                ResultReadCertificates::InvalidHashes(hashes) => {
                    panic!("Missing certificates: {:?}", hashes)
                }
            },
        };
        sender.send(certificates)
    }

    /// Looks up the hashes of the certificates sent at the given heights, then downloads
    /// the corresponding certificates, forwarding any query error to `sender`.
    async fn do_download_certificates_by_heights(
        self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
        let (query_sender, query_receiver) = oneshot::channel();
        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
        // `do_handle_chain_info_query` consumes `self`, so keep a copy for the second step.
        let self_clone = self.clone();
        self.do_handle_chain_info_query(query, query_sender)
            .await
            .expect("Failed to handle chain info query");
        let chain_info_response = query_receiver.await.map_err(|_| {
            Err(NodeError::ClientIoError {
                error: "Failed to receive chain info response".to_string(),
            })
        })?;
        let hashes = match chain_info_response {
            Ok(response) => response.info.requested_sent_certificate_hashes,
            Err(e) => {
                return sender.send(Err(e));
            }
        };
        let (cert_sender, cert_receiver) = oneshot::channel();
        self_clone
            .do_download_certificates(hashes, cert_sender)
            .await?;
        let result = cert_receiver.await.map_err(|_| {
            Err(NodeError::ClientIoError {
                error: "Failed to receive certificates".to_string(),
            })
        })?;
        sender.send(result)
    }

    /// Sends back the hash recorded as the last certificate using the given blob.
    async fn do_blob_last_used_by(
        self,
        blob_id: BlobId,
        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
    ) -> Result<(), Result<CryptoHash, NodeError>> {
        let validator = self.client.lock().await;
        let blob_state = validator
            .state
            .storage_client()
            .read_blob_state(blob_id)
            .await
            .map_err(Into::into);
        let certificate_hash = match blob_state {
            Err(err) => Err(err),
            Ok(blob_state) => match blob_state {
                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
                Some(blob_state) => blob_state
                    .last_used_by
                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
            },
        };
        sender.send(certificate_hash)
    }

    /// Sends back the subset of the given blob IDs that are missing from storage.
    async fn do_missing_blob_ids(
        self,
        blob_ids: Vec<BlobId>,
        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
        let validator = self.client.lock().await;
        let missing_blob_ids = validator
            .state
            .storage_client()
            .missing_blobs(&blob_ids)
            .await
            .map_err(Into::into);
        sender.send(missing_blob_ids)
    }

    /// Resolves the blob's `last_used_by` hash and downloads that certificate.
    async fn do_blob_last_used_by_certificate(
        self,
        blob_id: BlobId,
        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
        match self.blob_last_used_by(blob_id).await {
            Ok(cert_hash) => {
                let cert = self.download_certificate(cert_hash).await;
                sender.send(cert)
            }
            Err(err) => sender.send(Err(err)),
        }
    }

    /// Sends back, per requested stream, the previous event block's height and hash.
    #[allow(clippy::type_complexity)]
    async fn do_previous_event_blocks(
        self,
        chain_id: ChainId,
        stream_ids: Vec<StreamId>,
        sender: oneshot::Sender<Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, NodeError>>,
    ) -> Result<(), Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, NodeError>> {
        let validator = self.client.lock().await;
        let result = validator
            .state
            .previous_event_blocks(chain_id, stream_ids)
            .await
            .map_err(Into::into);
        sender.send(result)
    }
}
/// Hands out the in-process validator clients registered for a test network.
#[derive(Clone)]
pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
where
    S: Storage;
impl<S> NodeProvider<S>
where
    S: Storage + Clone,
{
    /// Returns a snapshot of every registered validator client.
    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
        let nodes = self.0.lock().unwrap();
        nodes.clone()
    }
}
impl<S> ValidatorNodeProvider for NodeProvider<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    type Node = LocalValidatorClient<S>;

    /// Not supported in tests: clients are pre-registered, never created from an address.
    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
        unimplemented!()
    }

    /// Resolves each `(public_key, address)` pair to the registered local client with
    /// that public key; fails if any key has no registered client.
    fn make_nodes_from_list<A>(
        &self,
        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
    where
        A: AsRef<str>,
    {
        let list = self.0.lock().unwrap();
        Ok(validators
            .into_iter()
            .map(|(public_key, address)| {
                list.iter()
                    .find(|client| client.public_key == public_key)
                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
                        address: address.as_ref().to_string(),
                    })
                    .map(|client| (public_key, client.clone()))
            })
            // Collect eagerly so the whole list either resolves or errors out.
            .collect::<Result<Vec<_>, _>>()?
            .into_iter())
    }
}
impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
where
S: Storage,
{
fn from_iter<T>(iter: T) -> Self
where
T: IntoIterator<Item = LocalValidatorClient<S>>,
{
Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
}
}
/// Builds a test network of local validators plus chain clients talking to them.
pub struct TestBuilder<B: StorageBuilder> {
    /// Produces one storage instance per validator and per chain client.
    storage_builder: B,
    /// The committee all validators initially belong to.
    pub initial_committee: Committee,
    /// Description of the admin (root 0) chain; set by the first `add_root_chain(0, ..)`.
    admin_description: Option<ChainDescription>,
    /// Description of the network; set together with `admin_description`.
    network_description: Option<NetworkDescription>,
    /// Records genesis chains so later-created storages can replay them.
    genesis_storage_builder: GenesisStorageBuilder,
    /// Resolves validator public keys to their local clients.
    node_provider: NodeProvider<B::Storage>,
    /// The storage backing each validator, keyed by the validator's public key.
    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
    /// Storages used by chain clients; new root chains are also created in each of them.
    chain_client_storages: Vec<B::Storage>,
    /// The owner of each root chain created so far.
    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
    /// The signer holding the generated account keys.
    pub signer: TestSigner,
}
/// A factory producing fresh, independent storage instances for tests.
#[async_trait]
pub trait StorageBuilder {
    /// The concrete storage type produced.
    type Storage: Storage + Clone + Send + Sync + 'static;

    /// Builds a new storage instance.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;

    /// Returns the test clock this builder passes to the storages it creates.
    fn clock(&self) -> &TestClock;
}
/// Accumulates the genesis chains so they can be replayed into newly built storages.
#[derive(Default)]
struct GenesisStorageBuilder {
    /// One entry per genesis (root) chain, in creation order.
    accounts: Vec<GenesisAccount>,
}
/// A genesis chain together with its owner's public key.
struct GenesisAccount {
    /// The chain's description.
    description: ChainDescription,
    /// The public key of the chain's owner.
    public_key: AccountPublicKey,
}
impl GenesisStorageBuilder {
    /// Registers a genesis chain (description plus owner key) for later replay.
    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
        let account = GenesisAccount {
            description,
            public_key,
        };
        self.accounts.push(account);
    }

    /// Creates every registered genesis chain in `storage` and returns the storage.
    async fn build<S>(&self, storage: S) -> S
    where
        S: Storage + Clone + Send + Sync + 'static,
    {
        for GenesisAccount { description, .. } in &self.accounts {
            storage.create_chain(description.clone()).await.unwrap();
        }
        storage
    }
}
/// A chain client wired to the in-process test environment (local validators, test
/// signer and test wallet).
pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
    /// Reads up to `limit` confirmed blocks starting at `from`, following each block's
    /// `previous_block_hash` link backwards until the start of the chain.
    pub async fn read_confirmed_blocks_downward(
        &self,
        from: CryptoHash,
        limit: u32,
    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
        let mut blocks = Vec::new();
        let mut cursor = Some(from);
        while blocks.len() < limit as usize {
            let Some(current_hash) = cursor else {
                // No predecessor: we reached the first block of the chain.
                break;
            };
            let block = self.read_confirmed_block(current_hash).await?;
            cursor = block.block().header.previous_block_hash;
            blocks.push(block);
        }
        Ok(blocks)
    }
}
impl<B> TestBuilder<B>
where
    B: StorageBuilder,
{
    /// Creates a network of `count` local validators; the first `with_faulty_validators`
    /// of them start with the `NoChains` fault type.
    pub async fn new(
        mut storage_builder: B,
        count: usize,
        with_faulty_validators: usize,
        mut signer: TestSigner,
    ) -> Result<Self, anyhow::Error> {
        // Generate one validating keypair plus one account key per validator.
        let mut validators = Vec::new();
        for _ in 0..count {
            let validator_keypair = ValidatorKeypair::generate();
            let account_public_key = signer.generate_new();
            validators.push((validator_keypair, account_public_key));
        }
        let for_committee = validators
            .iter()
            .map(|(validating, account)| (validating.public_key, *account))
            .collect::<Vec<_>>();
        let initial_committee = Committee::make_simple(for_committee);
        let mut validator_clients = Vec::new();
        let mut validator_storages = HashMap::new();
        let mut faulty_validators = HashSet::new();
        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
            let validator_public_key = validator_keypair.public_key;
            // Each validator gets its own storage instance.
            let storage = storage_builder.build().await?;
            let config = ChainWorkerConfig {
                nickname: format!("Node {}", i),
                ..ChainWorkerConfig::default()
            }
            .with_key_pair(Some(validator_keypair.secret_key));
            let state = WorkerState::new(storage.clone(), config, None);
            let mut validator = LocalValidatorClient::new(validator_public_key, state);
            if i < with_faulty_validators {
                faulty_validators.insert(validator_public_key);
                validator.set_fault_type(FaultType::NoChains);
            }
            validator_clients.push(validator);
            validator_storages.insert(validator_public_key, storage);
        }
        tracing::info!(
            "Test will use the following faulty validators: {:?}",
            faulty_validators
        );
        Ok(Self {
            storage_builder,
            initial_committee,
            admin_description: None,
            network_description: None,
            genesis_storage_builder: GenesisStorageBuilder::default(),
            node_provider: NodeProvider::from_iter(validator_clients),
            validator_storages,
            chain_client_storages: Vec::new(),
            chain_owners: BTreeMap::new(),
            signer,
        })
    }

    /// Replaces the committee's resource control policy, keeping its validators.
    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
        let validators = self.initial_committee.validators().clone();
        self.initial_committee = Committee::new(validators, policy);
        self
    }

    /// Sets the cross-chain message chunk limit on every validator's worker.
    pub fn with_cross_chain_message_chunk_limit(self, limit: usize) -> Self {
        let validator_clients = self.node_provider.0.lock().unwrap();
        for validator in validator_clients.iter() {
            // At setup time nothing else should hold the validator mutex.
            let mut inner = validator.client.try_lock().expect("no contention at setup");
            inner.state.set_cross_chain_message_chunk_limit(limit);
        }
        // Release the provider lock before moving `self` out.
        drop(validator_clients);
        self
    }

    /// Sets the fault type of the validators at the given indexes.
    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
        let mut faulty_validators = vec![];
        let mut validator_clients = self.node_provider.0.lock().unwrap();
        for index in indexes.as_ref() {
            let validator = &mut validator_clients[*index];
            validator.set_fault_type(fault_type);
            faulty_validators.push(validator.public_key);
        }
        tracing::info!(
            "Making the following validators {:?}: {:?}",
            fault_type,
            faulty_validators
        );
    }

    /// Creates root chain number `index` with the given balance and returns a client
    /// for it. The admin chain (root 0) is created first if it doesn't exist yet.
    pub async fn add_root_chain(
        &mut self,
        index: u32,
        balance: Amount,
    ) -> anyhow::Result<ChainClient<B::Storage>> {
        // The admin chain must exist before any other root chain.
        if self.admin_description.is_none() && index != 0 {
            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
        }
        let origin = ChainOrigin::Root(index);
        let public_key = self.signer.generate_new();
        let open_chain_config = InitialChainConfig {
            ownership: ChainOwnership::single(public_key.into()),
            epoch: Epoch(0),
            min_active_epoch: Epoch(0),
            max_active_epoch: Epoch(0),
            balance,
            application_permissions: ApplicationPermissions::default(),
        };
        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
        if index == 0 {
            // Creating the admin chain also fixes the network description.
            self.admin_description = Some(description.clone());
            self.network_description = Some(NetworkDescription {
                admin_chain_id: description.id(),
                genesis_config_hash: CryptoHash::test_hash("genesis config"),
                genesis_timestamp: Timestamp::from(0),
                genesis_committee_blob_hash: committee_blob.id().hash,
                name: "test network".to_string(),
            });
        }
        // Remember the chain so storages built later replay it (see `make_storage`).
        self.genesis_storage_builder
            .add(description.clone(), public_key);
        let network_description = self.network_description.as_ref().unwrap();
        // Register the new chain with every validator's storage.
        for validator in self.node_provider.all_nodes() {
            let storage = self
                .validator_storages
                .get_mut(&validator.public_key)
                .unwrap();
            storage
                .write_network_description(network_description)
                .await
                .expect("writing the NetworkDescription should succeed");
            storage
                .write_blob(&committee_blob)
                .await
                .expect("writing a blob should succeed");
            storage.create_chain(description.clone()).await.unwrap();
        }
        // And with every existing chain client's storage.
        for storage in &mut self.chain_client_storages {
            storage.create_chain(description.clone()).await.unwrap();
        }
        let chain_id = description.id();
        self.chain_owners.insert(chain_id, public_key.into());
        self.make_client(chain_id, None, BlockHeight::ZERO).await
    }

    /// Returns the owner key and initial balance of each genesis chain, in root order.
    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
        let mut result = Vec::new();
        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
            // Genesis chains are expected to be the root chains, in creation order.
            assert_eq!(
                genesis_account.description.origin(),
                ChainOrigin::Root(i as u32)
            );
            result.push((
                genesis_account.public_key,
                genesis_account.description.config().balance,
            ));
        }
        result
    }

    /// Returns the admin chain's ID.
    ///
    /// # Panics
    /// Panics if no admin chain has been created yet.
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_description
            .as_ref()
            .expect("admin chain not initialized")
            .id()
    }

    /// Returns the admin chain's description, if the admin chain exists.
    pub fn admin_description(&self) -> Option<&ChainDescription> {
        self.admin_description.as_ref()
    }

    /// Returns a provider handing out this network's validator clients.
    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
        self.node_provider.clone()
    }

    /// Returns the validator client at the given index.
    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
        self.node_provider.0.lock().unwrap()[index].clone()
    }

    /// Builds a fresh storage pre-populated with the network description, the committee
    /// blob, and every genesis chain created so far.
    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
        let storage = self.storage_builder.build().await?;
        let network_description = self.network_description.as_ref().unwrap();
        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
        storage
            .write_network_description(network_description)
            .await
            .expect("writing the NetworkDescription should succeed");
        storage
            .write_blob(&committee_blob)
            .await
            .expect("writing a blob should succeed");
        Ok(self.genesis_storage_builder.build(storage).await)
    }

    /// Creates a chain client for `chain_id` with its own storage and wallet, using the
    /// given client options.
    pub async fn make_client_with_options(
        &mut self,
        chain_id: ChainId,
        block_hash: Option<CryptoHash>,
        block_height: BlockHeight,
        options: chain_client::Options,
    ) -> anyhow::Result<ChainClient<B::Storage>> {
        let storage = self.make_storage().await?;
        // Keep the storage so future root chains get created in it too.
        self.chain_client_storages.push(storage.clone());
        let wallet = TestWallet::default();
        let owner = self.chain_owners.get(&chain_id).copied();
        wallet.insert(
            chain_id,
            Chain {
                owner,
                block_hash,
                next_block_height: block_height,
                ..Chain::default()
            },
        );
        let client = Arc::new(Client::new(
            crate::environment::Impl {
                network: self.make_node_provider(),
                storage,
                signer: self.signer.clone(),
                wallet,
            },
            self.admin_chain_id(),
            false,
            [(chain_id, ListeningMode::FullChain)],
            format!("Client node for {:.8}", chain_id),
            Some(Duration::from_secs(30)),
            Some(Duration::from_secs(1)),
            HashSet::new(),
            HashSet::new(),
            options,
            crate::client::RequestsSchedulerConfig::default(),
            crate::worker::DEFAULT_BLOCK_CACHE_SIZE,
            crate::worker::DEFAULT_EXECUTION_STATE_CACHE_SIZE,
        ));
        Ok(client.create_chain_client(chain_id, block_hash, block_height, None, owner, None))
    }

    /// Creates a chain client for `chain_id` with default test options.
    pub async fn make_client(
        &mut self,
        chain_id: ChainId,
        block_hash: Option<CryptoHash>,
        block_height: BlockHeight,
    ) -> anyhow::Result<ChainClient<B::Storage>> {
        self.make_client_with_options(
            chain_id,
            block_hash,
            block_height,
            chain_client::Options::test_default(),
        )
        .await
    }

    /// Asserts that at least `target_count` validators hold a certificate for the block
    /// at `block_height` on `chain_id`, and returns one such certificate if any.
    pub async fn check_that_validators_have_certificate(
        &self,
        chain_id: ChainId,
        block_height: BlockHeight,
        target_count: usize,
    ) -> Option<ConfirmedBlockCertificate> {
        let query = ChainInfoQuery::new(chain_id)
            .with_sent_certificate_hashes_by_heights(vec![block_height]);
        let mut count = 0;
        let mut certificate = None;
        for validator in self.node_provider.all_nodes() {
            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
                if response.check(validator.public_key).is_ok() {
                    let ChainInfo {
                        mut requested_sent_certificate_hashes,
                        ..
                    } = *response.info;
                    // We asked for a single height, so at most one hash is expected.
                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
                            if cert.inner().block().header.chain_id == chain_id
                                && cert.inner().block().header.height == block_height
                            {
                                cert.check(&self.initial_committee).unwrap();
                                count += 1;
                                certificate = Some(cert);
                            }
                        }
                    }
                }
            }
        }
        assert!(count >= target_count);
        certificate
    }

    /// Asserts that at least `target_count` validators are at `block_height` in `round`
    /// for the given chain.
    pub async fn check_that_validators_are_in_round(
        &self,
        chain_id: ChainId,
        block_height: BlockHeight,
        round: Round,
        target_count: usize,
    ) {
        let query = ChainInfoQuery::new(chain_id);
        let mut count = 0;
        for validator in self.node_provider.all_nodes() {
            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
                if response.info.manager.current_round == round
                    && response.info.next_block_height == block_height
                    && response.check(validator.public_key).is_ok()
                {
                    count += 1;
                }
            }
        }
        assert!(count >= target_count);
    }

    /// Asserts that every validator's outboxes for the given chain are empty.
    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
        for validator in self.node_provider.all_nodes() {
            let guard = validator.client.lock().await;
            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
        }
    }
}
/// Bounds the number of concurrently alive RocksDB storage builders; each
/// `RocksDbStorageBuilder` holds one permit for its whole lifetime.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
/// Builds `MemoryDatabase`-backed storages for tests.
#[derive(Default)]
pub struct MemoryStorageBuilder {
    /// Base namespace; generated lazily on the first `build`.
    namespace: String,
    /// Number of storages built so far; makes each storage's namespace unique.
    instance_counter: usize,
    /// Optional Wasm runtime passed to each storage built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock cloned into each storage built.
    clock: TestClock,
}
#[async_trait]
impl StorageBuilder for MemoryStorageBuilder {
    type Storage = DbStorage<MemoryDatabase, TestClock>;

    /// Builds a fresh in-memory storage in a unique, per-instance namespace.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = MemoryDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            // Lazily pick a base namespace shared by all instances of this builder.
            self.namespace = generate_test_namespace();
        }
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        Ok(
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?,
        )
    }

    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
impl MemoryStorageBuilder {
    /// Creates a builder configured with the given Wasm runtime (or none).
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let mut builder = MemoryStorageBuilder::default();
        builder.wasm_runtime = wasm_runtime.into();
        builder
    }
}
/// Builds RocksDB-backed storages for tests; construction of builders is throttled by
/// `ROCKS_DB_SEMAPHORE`.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Base namespace; generated lazily on the first `build`.
    namespace: String,
    /// Number of storages built so far; makes each storage's namespace unique.
    instance_counter: usize,
    /// Optional Wasm runtime passed to each storage built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock cloned into each storage built.
    clock: TestClock,
    /// Semaphore permit held for the builder's lifetime.
    _permit: SemaphorePermit<'static>,
}
#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder, waiting for a slot in the global RocksDB semaphore.
    /// The acquired permit is held until the builder is dropped.
    pub async fn new() -> Self {
        RocksDbStorageBuilder {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock: TestClock::default(),
            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
        }
    }

    /// Creates a builder configured with the given Wasm runtime (or none).
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        RocksDbStorageBuilder {
            wasm_runtime: wasm_runtime.into(),
            // `new` also acquires the semaphore permit carried in `_permit`.
            ..RocksDbStorageBuilder::new().await
        }
    }
}
#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Builds a fresh RocksDB-backed storage in a unique, per-instance namespace.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = RocksDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            // Lazily pick a base namespace shared by all instances of this builder.
            self.namespace = generate_test_namespace();
        }
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        Ok(
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?,
        )
    }

    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
/// A [`StorageBuilder`] that produces storage-service-backed storages, for tests.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Base namespace; lazily initialized with a random test namespace on first `build`.
    namespace: String,
    /// Number of storages built so far; suffixes the namespace so each build is unique.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage created by this builder.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock shared by (cloned into) every storage created by this builder.
    clock: TestClock,
}
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a service storage builder without a Wasm runtime.
    pub fn new() -> Self {
        Self::with_wasm_runtime(None)
    }

    /// Creates a service storage builder that will instantiate the given Wasm runtime.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let mut builder = Self::default();
        builder.wasm_runtime = wasm_runtime.into();
        builder
    }
}
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Builds a fresh storage-service storage, in a namespace unique to this build call.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let config = StorageServiceDatabase::new_test_config().await?;
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
/// A [`StorageBuilder`] that produces DynamoDB storages, for tests.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Base namespace; lazily initialized with a random test namespace on first `build`.
    namespace: String,
    /// Number of storages built so far; suffixes the namespace so each build is unique.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage created by this builder.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock shared by (cloned into) every storage created by this builder.
    clock: TestClock,
}
#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a DynamoDB storage builder that will instantiate the given Wasm runtime.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let mut builder = DynamoDbStorageBuilder::default();
        builder.wasm_runtime = wasm_runtime.into();
        builder
    }
}
#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Builds a fresh DynamoDB storage, in a namespace unique to this build call.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let config = DynamoDbDatabase::new_test_config().await?;
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
/// A [`StorageBuilder`] that produces ScyllaDB storages, for tests.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Base namespace; lazily initialized with a random test namespace on first `build`.
    namespace: String,
    /// Number of storages built so far; suffixes the namespace so each build is unique.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage created by this builder.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock shared by (cloned into) every storage created by this builder.
    clock: TestClock,
}
#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a ScyllaDB storage builder that will instantiate the given Wasm runtime.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let mut builder = ScyllaDbStorageBuilder::default();
        builder.wasm_runtime = wasm_runtime.into();
        builder
    }
}
#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Builds a fresh ScyllaDB storage, in a namespace unique to this build call.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let config = ScyllaDbDatabase::new_test_config().await?;
        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
/// Test helpers for unwrapping `Result<ClientOutcome<T>, E>` values, panicking on
/// outcomes the test does not expect.
pub trait ClientOutcomeResultExt<T, E> {
    /// Returns the committed value; panics on an error, a timeout, or a conflict.
    fn unwrap_ok_committed(self) -> T;
    /// Returns the committed value or the conflicting certificate; panics on an error
    /// or a timeout.
    fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>>;
}
impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
    /// Returns the committed value, panicking on any other outcome.
    fn unwrap_ok_committed(self) -> T {
        match self.unwrap_ok_or_conflict() {
            Ok(value) => value,
            Err(certificate) => {
                panic!("unexpected conflict: {}", certificate.hash())
            }
        }
    }

    /// Returns the committed value or the conflicting certificate, panicking on an
    /// error or a timeout.
    fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>> {
        match self.unwrap() {
            ClientOutcome::WaitForTimeout(timeout) => {
                panic!("unexpected timeout: {timeout}")
            }
            ClientOutcome::Conflict(certificate) => Err(certificate),
            ClientOutcome::Committed(value) => Ok(value),
        }
    }
}