use std::time::Duration;
use tokio::{task::JoinHandle, time::sleep};
use tower::{BoxError, Service, ServiceExt};
use tracing_futures::Instrument;
use crate::components::mempool;
/// The delay the queue checker task sleeps for before each mempool queue check.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(5);
/// A task that periodically asks the mempool service to check for newly
/// verified transactions.
pub struct QueueChecker<Mempool> {
/// The mempool service that the check requests are sent to.
mempool: Mempool,
}
impl<Mempool> QueueChecker<Mempool>
where
    Mempool:
        Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
    Mempool::Future: Send,
{
    /// Wraps `mempool` in a [`QueueChecker`] and spawns its [`run`](Self::run)
    /// loop as a tokio task.
    ///
    /// The task future is instrumented with the span that is current at the
    /// time `spawn` is called. The returned handle resolves to the result of
    /// `run`.
    pub fn spawn(mempool: Mempool) -> JoinHandle<Result<(), BoxError>> {
        let checker_task = Self { mempool }.run();

        tokio::spawn(checker_task.in_current_span())
    }

    /// Repeatedly waits for [`RATE_LIMIT_DELAY`] and then checks the mempool
    /// queue.
    ///
    /// # Errors
    ///
    /// The loop only ends when a queue check fails; that error is returned to
    /// the caller.
    pub async fn run(mut self) -> Result<(), BoxError> {
        info!("initializing mempool queue checker task");

        loop {
            // Wait first, so every check (including the first one) is rate-limited.
            sleep(RATE_LIMIT_DELAY).await;

            let check_result = self.check_queue().await;
            check_result?;
        }
    }

    /// Sends a single `CheckForVerifiedTransactions` request to the mempool
    /// service.
    ///
    /// # Errors
    ///
    /// Returns any error produced while waiting for the service to become
    /// ready, or by the request itself.
    ///
    /// # Panics
    ///
    /// Panics if the service answers with anything other than
    /// `CheckedForVerifiedTransactions`.
    async fn check_queue(&mut self) -> Result<(), BoxError> {
        debug!("checking for newly verified mempool transactions");

        // Wait for the service to be ready before issuing the request.
        let ready_mempool = self.mempool.ready().await?;
        let reply = ready_mempool
            .call(mempool::Request::CheckForVerifiedTransactions)
            .await?;

        if !matches!(reply, mempool::Response::CheckedForVerifiedTransactions) {
            unreachable!("mempool did not respond with checked queue to mempool queue checker")
        }

        Ok(())
    }
}