use crate::{RpcEvent, errors, metrics::InjectedApiMetrics};
use super::{
InjectedServer, promise_manager::PromiseSubscriptionManager, relay::TransactionsRelayer,
spawner,
};
use ethexe_common::{
HashOf,
db::InjectedStorageRO,
injected::{
AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance,
SignedInjectedTransaction, SignedTxReceipt,
},
};
use ethexe_db::Database;
use jsonrpsee::{
core::{RpcResult, SubscriptionResult, async_trait},
server::PendingSubscriptionSink,
};
use std::ops::Deref;
use tokio::sync::mpsc;
use tracing::trace;
/// Upper bound on the number of transaction ids accepted by a single
/// `get_transactions` call; longer lists are rejected with an
/// invalid-params error.
const MAX_TRANSACTION_IDS: usize = 100;
/// Server-side state backing the injected-transaction RPC methods.
#[derive(Clone)]
pub struct InjectedApi {
/// Database handle queried for stored receipts and injected transactions.
db: Database,
/// Registers and cancels per-transaction promise subscribers.
manager: PromiseSubscriptionManager,
/// Relays incoming transactions and reports whether they were accepted.
relayer: TransactionsRelayer,
/// Metrics for this API; the active-subscription gauge is updated as
/// watch subscriptions are spawned and finished.
metrics: InjectedApiMetrics,
}
#[async_trait]
impl InjectedServer for InjectedApi {
async fn send_transaction(
&self,
transaction: AddressedInjectedTransaction,
) -> RpcResult<InjectedTransactionAcceptance> {
self.send_transaction(transaction).await
}
async fn send_transaction_and_watch(
&self,
pending: PendingSubscriptionSink,
transaction: AddressedInjectedTransaction,
) -> SubscriptionResult {
self.send_transaction_and_watch(pending, transaction).await
}
async fn get_transaction_receipt(
&self,
tx_hash: HashOf<InjectedTransaction>,
) -> RpcResult<Option<SignedTxReceipt>> {
self.get_transaction_receipt(tx_hash).await
}
async fn get_transactions(
&self,
transaction_ids: Vec<HashOf<InjectedTransaction>>,
) -> RpcResult<Vec<Option<SignedInjectedTransaction>>> {
self.get_transactions(transaction_ids).await
}
}
// Makes the subscription manager's methods callable directly on
// `InjectedApi` by dereferencing to the `manager` field.
impl Deref for InjectedApi {
    type Target = PromiseSubscriptionManager;

    fn deref(&self) -> &Self::Target {
        let Self { manager, .. } = self;
        manager
    }
}
impl InjectedApi {
    /// Creates the API state on top of `db`.
    ///
    /// The subscription manager and the relayer each receive their own clone
    /// of the database handle; the relayer additionally gets `rpc_sender`.
    /// Metrics start from their default value.
    pub fn new(db: Database, rpc_sender: mpsc::UnboundedSender<RpcEvent>) -> Self {
        let manager = PromiseSubscriptionManager::new(db.clone());
        let relayer = TransactionsRelayer::new(rpc_sender, db.clone());
        let metrics = InjectedApiMetrics::default();

        Self {
            db,
            manager,
            relayer,
            metrics,
        }
    }
}
// Inherent implementations of the RPC methods exposed through `InjectedServer`.
impl InjectedApi {
    /// Hands `transaction` to the relayer and returns its acceptance verdict.
    async fn send_transaction(
        &self,
        transaction: AddressedInjectedTransaction,
    ) -> RpcResult<InjectedTransactionAcceptance> {
        self.relayer.relay(transaction).await
    }

    /// Relays `transaction` and, if it is accepted, starts a subscription
    /// task for it.
    ///
    /// A subscriber is registered for the transaction hash before relaying.
    /// Every failure after that point (relay error, rejection, or a failed
    /// `pending.accept()`) cancels the registration again before returning
    /// the error.
    ///
    /// # Errors
    ///
    /// * a bad-request error when the subscriber cannot be registered;
    /// * the relayer's error when relaying fails;
    /// * the rejection reason when the transaction is rejected;
    /// * the accept error when the pending subscription cannot be accepted.
    async fn send_transaction_and_watch(
        &self,
        pending: PendingSubscriptionSink,
        transaction: AddressedInjectedTransaction,
    ) -> SubscriptionResult {
        let tx_hash = transaction.tx.data().to_hash();

        // Nothing has been registered yet on this path, so there is nothing to cancel.
        let pending_subscriber = self
            .manager
            .try_register_subscriber(tx_hash)
            .map_err(|err| errors::bad_request(err))?;

        let acceptance = match self.relayer.relay(transaction).await {
            Ok(acceptance) => acceptance,
            Err(err) => {
                self.manager.cancel_registration(tx_hash);
                return Err(err.into());
            }
        };

        let sink = match acceptance {
            InjectedTransactionAcceptance::Reject { reason } => {
                self.manager.cancel_registration(tx_hash);
                return Err(reason.into());
            }
            InjectedTransactionAcceptance::Accept => match pending.accept().await {
                Ok(sink) => sink,
                Err(err) => {
                    self.manager.cancel_registration(tx_hash);
                    return Err(err.into());
                }
            },
        };

        // The gauge is raised here and lowered by the callback passed to the spawner.
        self.metrics.injected_tx_active_subscriptions.increment(1);
        let manager = self.manager.clone();
        let metrics = self.metrics.clone();
        spawner::spawn_pending_subscriber(sink, pending_subscriber, move |hash| {
            manager.cancel_registration(hash);
            metrics.injected_tx_active_subscriptions.decrement(1);
        });

        Ok(())
    }

    /// Looks up the stored receipt for `tx_hash`.
    ///
    /// Returns `Ok(None)` (and emits a trace event) when the database has no
    /// receipt for that hash; this method never returns an error itself.
    async fn get_transaction_receipt(
        &self,
        tx_hash: HashOf<InjectedTransaction>,
    ) -> RpcResult<Option<SignedTxReceipt>> {
        let receipt = self.db.receipt(tx_hash);
        if receipt.is_none() {
            trace!(?tx_hash, "receipt not found for injected transaction");
        }
        Ok(receipt)
    }

    /// Fetches the injected transactions for `transaction_ids`.
    ///
    /// The result has one entry per requested id, in request order; ids that
    /// are not present in the database map to `None`.
    ///
    /// # Errors
    ///
    /// Returns an invalid-params error when more than
    /// `MAX_TRANSACTION_IDS` ids are requested.
    async fn get_transactions(
        &self,
        transaction_ids: Vec<HashOf<InjectedTransaction>>,
    ) -> RpcResult<Vec<Option<SignedInjectedTransaction>>> {
        trace!(?transaction_ids, "Called injected_getTransactions");

        if transaction_ids.len() > MAX_TRANSACTION_IDS {
            let message =
                format!("Too many transaction ids requested. Maximum is {MAX_TRANSACTION_IDS}.",);
            return Err(errors::invalid_params(message));
        }

        let mut transactions = Vec::with_capacity(transaction_ids.len());
        for tx_id in transaction_ids {
            transactions.push(self.db.injected_transaction(tx_id));
        }
        Ok(transactions)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ethexe_common::{
        Address, PrivateKey, SignedMessage, ValidatorsVec,
        db::{GlobalsStorageRO, InjectedStorageRW, OnChainStorageRW, SetGlobals},
        injected::{Promise, Receipt},
        mock::Mock,
    };

    /// Builds a mock injected transaction signed with a freshly generated key.
    fn make_signed_tx() -> SignedInjectedTransaction {
        let key = PrivateKey::random();
        SignedInjectedTransaction::create(key, InjectedTransaction::mock(()))
            .expect("creating signed injected transaction succeeds")
    }

    /// Creates an `InjectedApi` over `db`; the receiving half of the RPC event
    /// channel is dropped when this helper returns.
    fn make_injected_api(db: Database) -> InjectedApi {
        let (rpc_sender, _rpc_receiver) = mpsc::unbounded_channel();
        InjectedApi::new(db, rpc_sender)
    }

    /// Stores `validators` as the validator set of era 0.
    fn set_current_validators(db: &Database, validators: Vec<Address>) {
        set_validators(db, 0, validators);
    }

    /// Stores `validators` as the validator set of `era`; panics on an empty list.
    fn set_validators(db: &Database, era: u64, validators: Vec<Address>) {
        let validators =
            ValidatorsVec::try_from(validators).expect("validators must be non-empty");
        db.set_validators(era, validators);
    }

    /// Writes `era` into the timestamp of the latest synced block header in
    /// the database globals.
    fn set_current_era(db: &Database, era: u64) {
        let mut updated = db.globals().clone();
        updated.latest_synced_eb.header.timestamp = era;
        db.set_globals(updated);
    }

    #[tokio::test]
    async fn test_get_transactions_found() {
        let db = Database::memory();
        let api = make_injected_api(db.clone());

        let stored = make_signed_tx();
        let stored_hash = stored.data().to_hash();
        db.set_injected_transaction(stored.clone());

        let fetched = api.get_transactions(vec![stored_hash]).await.unwrap();
        assert_eq!(fetched, vec![Some(stored)]);
    }

    #[tokio::test]
    async fn test_get_transactions_not_found() {
        let db = Database::memory();
        let api = make_injected_api(db.clone());

        // The transaction is never written to the database.
        let unknown_hash = make_signed_tx().data().to_hash();

        let fetched = api.get_transactions(vec![unknown_hash]).await.unwrap();
        assert_eq!(fetched, vec![None]);
    }

    #[tokio::test]
    async fn test_get_transactions_mixed() {
        let db = Database::memory();
        let api = make_injected_api(db.clone());

        let stored = make_signed_tx();
        let missing = make_signed_tx();
        let stored_hash = stored.data().to_hash();
        let missing_hash = missing.data().to_hash();
        // Only the first transaction is persisted.
        db.set_injected_transaction(stored.clone());

        let fetched = api
            .get_transactions(vec![stored_hash, missing_hash])
            .await
            .unwrap();
        assert_eq!(fetched, vec![Some(stored), None]);
    }

    #[tokio::test]
    async fn test_get_transactions_empty() {
        let db = Database::memory();
        let api = make_injected_api(db.clone());

        let fetched = api.get_transactions(Vec::new()).await.unwrap();
        assert!(fetched.is_empty());
    }

    #[tokio::test]
    async fn test_get_transactions_exceeds_limit() {
        let db = Database::memory();
        let api = make_injected_api(db.clone());

        // One id more than the limit allows.
        let too_many: Vec<_> = std::iter::repeat_with(|| make_signed_tx().data().to_hash())
            .take(MAX_TRANSACTION_IDS + 1)
            .collect();

        let outcome = api.get_transactions(too_many).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_get_transaction_receipt_returns_stored_receipt() {
        let db = Database::memory();
        let api = make_injected_api(db);

        let tx_hash = make_signed_tx().data().to_hash();
        let promise = Promise::mock(tx_hash);
        let signed_compact =
            SignedMessage::create(PrivateKey::random(), Receipt::Promise(promise.to_compact()))
                .expect("creating signed receipt succeeds");

        // The receipt signer is the only validator of era 0.
        let signer = signed_compact.address();
        set_current_validators(&api.db, vec![signer]);

        api.on_computed_promise(promise.clone());
        api.on_tx_receipt(signed_compact.into());

        let stored = api
            .get_transaction_receipt(tx_hash)
            .await
            .expect("RPC result succeeds")
            .expect("receipt is stored");
        assert_eq!(stored.data(), &Receipt::Promise(promise));
    }

    #[tokio::test]
    async fn test_get_transaction_receipt_keeps_previous_era_cached_receipt() {
        let db = Database::memory();
        let api = make_injected_api(db.clone());

        let tx_hash = make_signed_tx().data().to_hash();
        let promise = Promise::mock(tx_hash);
        let cached: SignedTxReceipt =
            SignedMessage::create(PrivateKey::random(), Receipt::Promise(promise.clone()))
                .expect("creating signed receipt succeeds")
                .into();

        // The signer is a validator in era 0 only; era 1 has a different set.
        set_validators(&db, 0, vec![cached.address()]);
        set_validators(&db, 1, vec![Address::from(1)]);
        set_current_era(&db, 1);
        db.set_receipt(&cached);

        let fetched = api
            .get_transaction_receipt(tx_hash)
            .await
            .expect("RPC result succeeds");
        assert_eq!(fetched, Some(cached));
    }

    #[tokio::test]
    async fn test_get_transaction_receipt_ignores_unsigned_promise_body() {
        let db = Database::memory();
        let api = make_injected_api(db);

        let tx_hash = make_signed_tx().data().to_hash();
        // Only the computed promise arrives; no signed receipt is delivered.
        api.on_computed_promise(Promise::mock(tx_hash));

        let fetched = api
            .get_transaction_receipt(tx_hash)
            .await
            .expect("RPC result succeeds");
        assert_eq!(fetched, None);
    }
}