use std::convert::{TryFrom, TryInto};
use log::*;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_transaction_components::transaction_components::Transaction;
use crate::{
mempool::{rpc::MempoolService, service::MempoolHandle},
proto,
};
/// Log target passed to the `log` macros (`error!`, `debug!`) used in this file.
const LOG_TARGET: &str = "c::mempool::rpc";
/// RPC service wrapper around a [`MempoolHandle`].
///
/// The [`MempoolService`] implementation for this type forwards each RPC call
/// to the wrapped handle and converts the results into protobuf responses.
pub struct MempoolRpcService {
/// Handle used to issue requests to the mempool; cloned for every call.
mempool: MempoolHandle,
}
impl MempoolRpcService {
pub fn new(mempool: MempoolHandle) -> Self {
Self { mempool }
}
#[inline]
pub fn mempool(&self) -> MempoolHandle {
self.mempool.clone()
}
}
/// Converts an arbitrary error into an [`RpcStatus`] suitable for returning to the caller.
///
/// The error is written to the log at `error` level under [`LOG_TARGET`]. The
/// returned status comes from `RpcStatus::general_default()`; `err` itself is
/// only logged and is not passed into the returned status.
fn to_internal_error<T>(err: T) -> RpcStatus
where
    T: std::error::Error,
{
    error!(target: LOG_TARGET, "Internal error: {err}");
    RpcStatus::general_default()
}
#[tari_comms::async_trait]
impl MempoolService for MempoolRpcService {
    /// Requests the mempool statistics and returns them converted into a
    /// protobuf `StatsResponse`.
    ///
    /// # Errors
    /// A failed mempool request is logged and mapped by `to_internal_error`.
    async fn get_stats(&self, _: Request<()>) -> Result<Response<proto::mempool::StatsResponse>, RpcStatus> {
        let stats = match self.mempool().get_stats().await {
            Ok(stats) => stats,
            Err(err) => return Err(to_internal_error(err)),
        };
        let response: proto::mempool::StatsResponse = stats.into();
        Ok(Response::new(response))
    }

    /// Requests the mempool state and returns it converted into a protobuf
    /// `StateResponse`.
    ///
    /// # Errors
    /// * A failed mempool request is logged and mapped by `to_internal_error`.
    /// * A failed conversion is logged and returned as `RpcStatus::general`
    ///   carrying the conversion error string.
    async fn get_state(&self, _: Request<()>) -> Result<Response<proto::mempool::StateResponse>, RpcStatus> {
        let state = self.mempool().get_state().await.map_err(to_internal_error)?;
        let converted: Result<proto::mempool::StateResponse, String> = state.try_into();
        match converted {
            Ok(response) => Ok(Response::new(response)),
            Err(e) => {
                error!(target: LOG_TARGET, "Internal error: {e}");
                Err(RpcStatus::general(&e))
            },
        }
    }

    /// Looks up the mempool storage state of the transaction identified by the
    /// given excess signature.
    ///
    /// # Errors
    /// * `RpcStatus::bad_request` if the received signature cannot be converted.
    /// * A failed mempool request is logged and mapped by `to_internal_error`.
    async fn get_transaction_state_by_excess_sig(
        &self,
        request: Request<proto::types::Signature>,
    ) -> Result<Response<proto::mempool::TxStorage>, RpcStatus> {
        let message = request.into_message();
        // The conversion error is intentionally discarded; the caller only gets a fixed message.
        let excess_sig = message
            .try_into()
            .map_err(|_| RpcStatus::bad_request("Invalid signature received"))?;

        let lookup = self.mempool().get_tx_state_by_excess_sig(excess_sig).await;
        let tx_state = lookup.map_err(to_internal_error)?;
        Ok(Response::new(tx_state.into()))
    }

    /// Submits a transaction received over RPC to the mempool and returns the
    /// resulting storage state.
    ///
    /// # Errors
    /// * `RpcStatus::bad_request` if the message cannot be converted into a
    ///   [`Transaction`]; the failure is also logged at `debug` level together
    ///   with the sending peer's node id.
    /// * A failed mempool request is logged and mapped by `to_internal_error`.
    async fn submit_transaction(
        &self,
        request: Request<proto::types::Transaction>,
    ) -> Result<Response<proto::mempool::TxStorage>, RpcStatus> {
        let (context, message) = request.into_parts();
        let tx = Transaction::try_from(message).map_err(|err| {
            debug!(
                target: LOG_TARGET,
                "Received invalid message from peer `{}`: {}",
                context.peer_node_id(),
                err
            );
            RpcStatus::bad_request(&format!("Malformed transaction: {err}"))
        })?;

        let submitted = self.mempool().submit_transaction(tx).await;
        let tx_storage = submitted.map_err(to_internal_error)?;
        Ok(Response::new(tx_storage.into()))
    }
}