use battleware_types::api::Pending;
#[cfg(test)]
use battleware_types::execution::Transaction;
use battleware_types::{api::Summary, Seed};
#[cfg(test)]
use battleware_types::{Identity, NAMESPACE};
#[cfg(test)]
use commonware_consensus::{threshold_simplex::types::View, Viewable};
use commonware_cryptography::ed25519::Batch;
use commonware_cryptography::BatchVerifier;
#[cfg(test)]
use commonware_runtime::RwLock;
use commonware_runtime::Spawner;
use commonware_runtime::{Clock, Handle};
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};
use rand::{CryptoRng, Rng};
use std::future::Future;
#[cfg(test)]
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tracing::{error, info, warn};
/// Pause inserted before every new attempt to open the mempool stream, whether the
/// previous attempt failed to connect or the stream ended/errored.
const TX_STREAM_RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Capacity handed to `mpsc::channel` for the channel that carries batches from the
/// background listener task to a `ReconnectingStream`.
const TX_STREAM_BUFFER_SIZE: usize = 1_024;
/// Interface to an indexer service.
///
/// An implementation can submit [`Seed`]s and [`Summary`]s and open a stream of
/// [`Pending`] transaction batches. Every future returned by this trait must be `Send`.
pub trait Indexer: Clone + Send + Sync + 'static {
/// Error type shared by all operations of this indexer.
type Error: std::error::Error + Send + Sync + 'static;
/// Submits `seed` to the indexer.
fn submit_seed(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// Opens a stream of pending transaction batches.
///
/// The outer `Result` reports a failure to open the stream. Each stream item is itself
/// a `Result`, so errors can also surface while the stream is being consumed.
fn listen_mempool(
&self,
) -> impl Future<
Output = Result<impl Stream<Item = Result<Pending, Self::Error>> + Send, Self::Error>,
> + Send;
/// Submits `summary` to the indexer.
fn submit_summary(
&self,
summary: Summary,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
#[cfg(test)]
#[derive(Clone)]
/// Test-only, in-memory [`Indexer`] that records what is submitted to it.
///
/// All state sits behind `Arc`, so clones of a `Mock` share the same recorded data.
pub struct Mock {
/// Identity that submitted seeds and summaries are verified against.
pub identity: Identity,
/// Seeds received through `submit_seed`, keyed by the view each seed reports.
pub seeds: Arc<Mutex<HashMap<View, Seed>>>,
/// Summaries received through `submit_summary`, each paired with its `progress.height`.
#[allow(clippy::type_complexity)]
pub summaries: Arc<RwLock<Vec<(u64, Summary)>>>,
/// One sender per stream handed out by `listen_mempool`; `submit_tx` sends to each of them.
#[allow(clippy::type_complexity)]
pub tx_sender: Arc<Mutex<Vec<mpsc::UnboundedSender<Result<Pending, std::io::Error>>>>>,
}
#[cfg(test)]
impl Mock {
    /// Builds a mock with no recorded seeds, no recorded summaries, and no mempool
    /// subscribers.
    ///
    /// `identity` is what later submissions are verified against.
    pub fn new(identity: Identity) -> Self {
        // The std-backed containers start out empty through their `Default` impls.
        let seeds = Arc::default();
        let tx_sender = Arc::default();
        // `RwLock` is the runtime's lock type, so it is constructed explicitly.
        let summaries = Arc::new(RwLock::new(Vec::new()));
        Self {
            identity,
            seeds,
            summaries,
            tx_sender,
        }
    }

    /// Delivers `tx`, wrapped in a single-transaction [`Pending`] batch, to every
    /// stream previously returned by `listen_mempool`.
    ///
    /// Subscribers whose send fails are removed from the list.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber mutex is poisoned.
    pub fn submit_tx(&self, tx: Transaction) {
        let mut subscribers = self.tx_sender.lock().unwrap();
        subscribers.retain(|subscriber| {
            let batch = Pending {
                transactions: vec![tx.clone()],
            };
            // A failed send marks the subscriber for removal.
            subscriber.unbounded_send(Ok(batch)).is_ok()
        });
    }
}
#[cfg(test)]
impl Indexer for Mock {
    type Error = std::io::Error;

    /// Records `seed` under the view it reports.
    ///
    /// # Panics
    ///
    /// Panics if the seed does not verify against the mock's identity under
    /// `NAMESPACE`, or if the seed mutex is poisoned.
    async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
        assert!(seed.verify(NAMESPACE, &self.identity));
        let view = seed.view();
        self.seeds.lock().unwrap().insert(view, seed);
        Ok(())
    }

    /// Registers a new subscriber and returns the receiving end of its unbounded channel.
    ///
    /// Batches arrive on the returned stream whenever `submit_tx` is called.
    async fn listen_mempool(
        &self,
    ) -> Result<impl Stream<Item = Result<Pending, Self::Error>>, Self::Error> {
        let (subscriber, receiver) = mpsc::unbounded();
        self.tx_sender.lock().unwrap().push(subscriber);
        Ok(receiver)
    }

    /// Appends `summary` to the recorded list, tagged with its `progress.height`.
    ///
    /// # Panics
    ///
    /// Panics if `summary.verify` returns `None` for the mock's identity.
    async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
        assert!(summary.verify(&self.identity).is_some());
        let height = summary.progress.height;
        self.summaries.write().await.push((height, summary));
        Ok(())
    }
}
impl Indexer for battleware_client::Client {
type Error = battleware_client::Error;
async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
self.submit_seed(seed).await
}
async fn listen_mempool(
&self,
) -> Result<impl Stream<Item = Result<Pending, Self::Error>>, Self::Error> {
match self.connect_mempool().await {
Ok(stream) => Ok(stream
.map(|result| result.map_err(|_| battleware_client::Error::UnexpectedResponse))),
Err(_) => Err(battleware_client::Error::UnexpectedResponse),
}
}
async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
self.submit_summary(summary).await
}
}
/// Stream of [`Pending`] batches fed through a channel by a background task.
///
/// The task is spawned in `ReconnectingStream::new` and re-opens the indexer's mempool
/// stream after a connection failure, a stream error, or the stream ending.
pub struct ReconnectingStream<I>
where
I: Indexer,
{
/// Receiving half of the channel filled by the background task.
rx: mpsc::Receiver<Result<Pending, I::Error>>,
/// Handle of the spawned background task; stored but never read by this type.
_handle: Handle<()>,
}
impl<I> ReconnectingStream<I>
where
I: Indexer,
{
pub fn new<E>(context: E, indexer: I) -> Self
where
E: Spawner + Clock + Rng + CryptoRng,
{
let (mut tx, rx) = mpsc::channel(TX_STREAM_BUFFER_SIZE);
let handle = context.spawn({
move |mut context| async move {
loop {
match indexer.listen_mempool().await {
Ok(stream) => {
info!("connected to mempool stream");
let mut stream = Box::pin(stream);
while let Some(result) = stream.next().await {
match result {
Ok(pending) => {
let mut batcher = Batch::new();
for tx in &pending.transactions {
tx.verify_batch(&mut batcher);
}
if !batcher.verify(&mut context) {
warn!("received invalid transaction from indexer");
return;
}
if tx.send(Ok(pending)).await.is_err() {
warn!("receiver dropped");
return;
}
}
Err(e) => {
error!(?e, "mempool stream error");
break;
}
}
}
warn!("mempool stream ended");
}
Err(e) => {
error!(?e, "failed to connect mempool stream");
}
}
context.sleep(TX_STREAM_RECONNECT_DELAY).await;
}
}
});
Self {
rx,
_handle: handle,
}
}
}
impl<I> Stream for ReconnectingStream<I>
where
    I: Indexer,
{
    type Item = Result<Pending, I::Error>;

    /// Polls the internal channel receiver and returns whatever it yields.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let receiver = Pin::new(&mut this.rx);
        receiver.poll_next(cx)
    }
}
#[derive(Clone)]
/// [`Indexer`] wrapper whose `listen_mempool` returns a [`ReconnectingStream`].
///
/// Seed and summary submissions are forwarded to the wrapped indexer unchanged.
pub struct ReconnectingIndexer<I, E>
where
I: Indexer,
E: Rng + CryptoRng + Spawner + Clock + Clone,
{
/// Wrapped indexer that performs the actual requests.
inner: I,
/// Runtime context, cloned for every `ReconnectingStream` that is created.
context: E,
}
impl<I, E> ReconnectingIndexer<I, E>
where
I: Indexer,
E: Rng + CryptoRng + Spawner + Clock + Clone,
{
pub fn new(context: E, inner: I) -> Self {
Self { inner, context }
}
}
impl<I, E> Indexer for ReconnectingIndexer<I, E>
where
    I: Indexer,
    E: Rng + CryptoRng + Spawner + Clock + Clone + Send + Sync + 'static,
{
    type Error = I::Error;

    /// Passes `seed` straight through to the wrapped indexer.
    async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
        I::submit_seed(&self.inner, seed).await
    }

    /// Returns a [`ReconnectingStream`] built from clones of the stored context and
    /// the wrapped indexer.
    ///
    /// This call itself never fails: the connection is made by the stream's
    /// background task.
    async fn listen_mempool(
        &self,
    ) -> Result<impl Stream<Item = Result<Pending, Self::Error>> + Send, Self::Error> {
        let context = self.context.clone();
        let indexer = self.inner.clone();
        Ok(ReconnectingStream::new(context, indexer))
    }

    /// Passes `summary` straight through to the wrapped indexer.
    async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
        I::submit_summary(&self.inner, summary).await
    }
}