use tari_common_types::types::CompressedSignature;
use tari_service_framework::{Service, reply_channel::SenderService};
use tari_transaction_components::transaction_components::Transaction;
use crate::mempool::{
StateResponse,
StatsResponse,
TxStorageResponse,
service::{MempoolRequest, MempoolResponse, MempoolServiceError},
};
/// Sender side of a reply channel that takes a `MempoolRequest` and answers with
/// `Result<MempoolResponse, MempoolServiceError>`.
pub type LocalMempoolRequester = SenderService<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>;
/// Handle for issuing mempool requests through a `LocalMempoolRequester`.
///
/// The handle derives `Clone`, so several copies can send requests through clones of the same
/// requester.
#[derive(Clone)]
pub struct LocalMempoolService {
/// Channel endpoint that every request made through this handle is sent on.
request_sender: LocalMempoolRequester,
}
impl LocalMempoolService {
    /// Builds a handle that sends all of its requests through `request_sender`.
    pub fn new(request_sender: LocalMempoolRequester) -> Self {
        Self { request_sender }
    }

    /// Sends `MempoolRequest::GetStats` and returns the `StatsResponse` carried by the reply.
    ///
    /// # Errors
    /// Fails if the call on the request channel fails or if the reply itself is an error.
    /// Returns `MempoolServiceError::UnexpectedApiResponse` when the reply is any variant other
    /// than `MempoolResponse::Stats`.
    pub async fn get_mempool_stats(&mut self) -> Result<StatsResponse, MempoolServiceError> {
        let request = MempoolRequest::GetStats;
        // First `?` covers the call on the channel, second `?` covers the handler's own result.
        let reply = self.request_sender.call(request).await??;
        let MempoolResponse::Stats(stats) = reply else {
            return Err(MempoolServiceError::UnexpectedApiResponse);
        };
        Ok(stats)
    }

    /// Sends `MempoolRequest::GetState` and returns the `StateResponse` carried by the reply.
    ///
    /// # Errors
    /// Fails if the call on the request channel fails or if the reply itself is an error.
    /// Returns `MempoolServiceError::UnexpectedApiResponse` when the reply is any variant other
    /// than `MempoolResponse::State`.
    pub async fn get_mempool_state(&mut self) -> Result<StateResponse, MempoolServiceError> {
        let request = MempoolRequest::GetState;
        let reply = self.request_sender.call(request).await??;
        let MempoolResponse::State(state) = reply else {
            return Err(MempoolServiceError::UnexpectedApiResponse);
        };
        Ok(state)
    }

    /// Wraps `transaction` in `MempoolRequest::SubmitTransaction`, sends it, and returns the
    /// `TxStorageResponse` carried by the reply.
    ///
    /// # Errors
    /// Fails if the call on the request channel fails or if the reply itself is an error.
    /// Returns `MempoolServiceError::UnexpectedApiResponse` when the reply is any variant other
    /// than `MempoolResponse::TxStorage`.
    pub async fn submit_transaction(
        &mut self,
        transaction: Transaction,
    ) -> Result<TxStorageResponse, MempoolServiceError> {
        let request = MempoolRequest::SubmitTransaction(transaction);
        let reply = self.request_sender.call(request).await??;
        let MempoolResponse::TxStorage(storage) = reply else {
            return Err(MempoolServiceError::UnexpectedApiResponse);
        };
        Ok(storage)
    }

    /// Wraps `sig` in `MempoolRequest::GetTxStateByExcessSig`, sends it, and returns the
    /// `TxStorageResponse` carried by the reply.
    ///
    /// # Errors
    /// Fails if the call on the request channel fails or if the reply itself is an error.
    /// Returns `MempoolServiceError::UnexpectedApiResponse` when the reply is any variant other
    /// than `MempoolResponse::TxStorage`.
    pub async fn get_transaction_state_by_excess_sig(
        &mut self,
        sig: CompressedSignature,
    ) -> Result<TxStorageResponse, MempoolServiceError> {
        let request = MempoolRequest::GetTxStateByExcessSig(sig);
        let reply = self.request_sender.call(request).await??;
        let MempoolResponse::TxStorage(storage) = reply else {
            return Err(MempoolServiceError::UnexpectedApiResponse);
        };
        Ok(storage)
    }
}
#[cfg(test)]
mod test {
    use futures::StreamExt;
    use tari_service_framework::reply_channel::{Receiver, unbounded};
    use tokio::task;
    use crate::mempool::{
        MempoolServiceError,
        StatsResponse,
        service::{MempoolRequest, MempoolResponse, local_service::LocalMempoolService},
    };

    /// Receiving side of the reply channel, consumed by the mock handler below.
    pub type LocalMempoolRequestStream = Receiver<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>;

    /// Fixed statistics the mock handler replies with and the tests compare against.
    fn request_stats() -> StatsResponse {
        StatsResponse {
            unconfirmed_txs: 3,
            reorg_txs: 4,
            unconfirmed_weight: 1000,
        }
    }

    /// Answers every `GetStats` request with `request_stats()` and any other request with
    /// `UnexpectedApiResponse`, until the request stream ends.
    async fn mock_handler(mut rx: LocalMempoolRequestStream) {
        while let Some(incoming) = rx.next().await {
            let (request, reply_tx) = incoming.split();
            let reply = if matches!(request, MempoolRequest::GetStats) {
                Ok(MempoolResponse::Stats(request_stats()))
            } else {
                Err(MempoolServiceError::UnexpectedApiResponse)
            };
            reply_tx.send(reply).unwrap();
        }
    }

    /// A single handle gets the mocked statistics back.
    #[tokio::test]
    async fn mempool_stats() {
        let (tx, rx) = unbounded();
        let mut service = LocalMempoolService::new(tx);
        task::spawn(mock_handler(rx));

        let received = service
            .get_mempool_stats()
            .await
            .expect("get_mempool_stats should have succeeded");
        assert_eq!(received, request_stats());
    }

    /// A handle and its clone both get the mocked statistics back from the same handler.
    #[tokio::test]
    async fn mempool_stats_from_multiple() {
        let (tx, rx) = unbounded();
        let mut first = LocalMempoolService::new(tx);
        let mut second = first.clone();
        task::spawn(mock_handler(rx));

        // Borrow rather than move so both handles stay alive for the whole test.
        for handle in [&mut first, &mut second] {
            let received = handle
                .get_mempool_stats()
                .await
                .expect("get_mempool_stats should have succeeded");
            assert_eq!(received, request_stats());
        }
    }
}