use crate::{
InjectedApi, InjectedClient, InjectedTransactionAcceptance, RpcConfig, RpcEvent, RpcServer,
RpcService,
};
use ethexe_common::{
SignedMessage, ValidatorsVec,
db::OnChainStorageRW,
ecdsa::{PrivateKey, PublicKey},
gear::MAX_BLOCK_GAS_LIMIT,
injected::{AddressedInjectedTransaction, Promise, Receipt, SignedCompactTxReceipt},
mock::Mock,
};
use ethexe_db::Database;
use futures::StreamExt;
use gear_core::message::{ReplyCode, SuccessReplyReason};
use jsonrpsee::{server::ServerHandle, ws_client::WsClientBuilder};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::task::{JoinHandle, JoinSet};
struct MockService {
rpc: RpcService,
handle: ServerHandle,
validator_key: PrivateKey,
}
impl MockService {
pub async fn new(listen_addr: SocketAddr) -> Self {
let db = Database::memory();
let validator_key = PrivateKey::random();
let validator_address = PublicKey::from(&validator_key).to_address();
db.set_validators(
0,
ValidatorsVec::try_from(vec![validator_address])
.expect("test validator set must be non-empty"),
);
let (handle, rpc) = start_new_server(listen_addr, db).await;
Self {
rpc,
handle,
validator_key,
}
}
pub fn injected_api(&self) -> InjectedApi {
self.rpc.injected_api.clone()
}
pub fn spawn(mut self) -> JoinHandle<()> {
tokio::spawn(async move {
let mut tx_batch_interval =
tokio::time::interval(std::time::Duration::from_millis(350));
let mut tx_batch = Vec::new();
loop {
tokio::select! {
_ = tx_batch_interval.tick() => {
for tx in tx_batch.drain(..) {
let (promise, receipt) = self.create_promise_for(tx);
self.rpc.receive_computed_promise(promise);
self.rpc.receive_tx_receipt(receipt);
}
},
_ = self.handle.clone().stopped() => {
unreachable!("RPC server should not be stopped during the test")
},
event = self.rpc.next() => {
let RpcEvent::InjectedTransaction {transaction, response_sender} = event.expect("RPC event will be valid");
response_sender.send(InjectedTransactionAcceptance::Accept).expect("Response sender will be valid");
tx_batch.push(transaction);
},
}
}
})
}
fn create_promise_for(
&self,
tx: AddressedInjectedTransaction,
) -> (Promise, SignedCompactTxReceipt) {
let promise = Promise::mock(tx.tx.data().to_hash());
let receipt = SignedMessage::create(
self.validator_key.clone(),
Receipt::Promise(promise.to_compact()),
)
.unwrap();
(promise, receipt.into())
}
}
async fn start_new_server(listen_addr: SocketAddr, db: Database) -> (ServerHandle, RpcService) {
let rpc_config = RpcConfig {
listen_addr,
cors: None,
gas_allowance: MAX_BLOCK_GAS_LIMIT,
chunk_size: 2,
with_dev_api: false,
};
RpcServer::new(rpc_config, db)
.run_server()
.await
.expect("RPC Server will start successfully")
}
async fn wait_for_closed_subscriptions(injected_api: InjectedApi) {
while injected_api.subscribers_count() > 0 {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
#[tokio::test]
#[ntest::timeout(60_000)]
async fn test_cleanup_promise_subscribers() {
let _ = tracing_subscriber::fmt::try_init();
let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8002);
let service = MockService::new(listen_addr).await;
let injected_api = service.injected_api();
let _handle = service.spawn();
let ws_client = WsClientBuilder::new()
.build(format!("ws://{}", listen_addr))
.await
.expect("WS client will be created");
{
let mut subscribers = JoinSet::new();
for _ in 0..20 {
let mut sub = ws_client
.send_transaction_and_watch(AddressedInjectedTransaction::mock(()))
.await
.expect("Subscription will be created");
subscribers.spawn(async move {
let receipt = sub
.next()
.await
.expect("Promise will be received")
.expect("No error in subscription result");
let promise = receipt.data().clone().unwrap_promise();
assert_eq!(
promise.reply.code,
ReplyCode::Success(SuccessReplyReason::Manual)
);
sub.unsubscribe().await.expect("Unsubscribe will succeed");
});
}
let _ = subscribers.join_all().await;
wait_for_closed_subscriptions(injected_api.clone()).await;
}
{
let mut subscribers = JoinSet::new();
for _ in 0..20 {
let mut subscription = ws_client
.send_transaction_and_watch(AddressedInjectedTransaction::mock(()))
.await
.expect("Subscription will be created");
subscribers.spawn(async move {
let receipt = subscription
.next()
.await
.expect("Promise will be received")
.expect("No error in subscription result");
let promise = receipt.data().clone().unwrap_promise();
assert_eq!(
promise.reply.code,
ReplyCode::Success(SuccessReplyReason::Manual)
);
});
}
let _ = subscribers.join_all().await;
wait_for_closed_subscriptions(injected_api.clone()).await;
}
{
let mut subscriptions = vec![];
for _ in 0..20 {
let subscription = ws_client
.send_transaction_and_watch(AddressedInjectedTransaction::mock(()))
.await
.expect("Subscription will be created");
subscriptions.push(subscription);
}
drop(subscriptions);
wait_for_closed_subscriptions(injected_api.clone()).await;
}
}
#[tokio::test]
#[ntest::timeout(120_000)]
async fn test_concurrent_multiple_clients() {
let _ = tracing_subscriber::fmt::try_init();
let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8010);
let service = MockService::new(listen_addr).await;
let injected_api = service.injected_api();
let _handle = service.spawn();
let mut tasks = JoinSet::new();
for _ in 0..10 {
tasks.spawn(async move {
let client = WsClientBuilder::new()
.build(format!("ws://{listen_addr}"))
.await
.expect("WS client will be created");
let mut subscriptions = vec![];
for _ in 0..50 {
let mut subscription = client
.send_transaction_and_watch(AddressedInjectedTransaction::mock(()))
.await
.expect("Subscription will be created");
let receipt = subscription
.next()
.await
.expect("Promise will be received")
.expect("No error in subscription result");
let promise = receipt.data().clone().unwrap_promise();
assert_eq!(
promise.reply.code,
ReplyCode::Success(SuccessReplyReason::Manual)
);
subscriptions.push(subscription);
}
});
}
let _ = tasks.join_all().await;
wait_for_closed_subscriptions(injected_api).await;
}