battleware_node/
indexer.rs

1use battleware_types::api::Pending;
2#[cfg(test)]
3use battleware_types::execution::Transaction;
4use battleware_types::{api::Summary, Seed};
5#[cfg(test)]
6use battleware_types::{Identity, NAMESPACE};
7#[cfg(test)]
8use commonware_consensus::{threshold_simplex::types::View, Viewable};
9use commonware_cryptography::ed25519::Batch;
10use commonware_cryptography::BatchVerifier;
11#[cfg(test)]
12use commonware_runtime::RwLock;
13use commonware_runtime::Spawner;
14use commonware_runtime::{Clock, Handle};
15use futures::channel::mpsc;
16use futures::{SinkExt, Stream, StreamExt};
17use rand::{CryptoRng, Rng};
18use std::future::Future;
19#[cfg(test)]
20use std::{
21    collections::HashMap,
22    sync::{Arc, Mutex},
23};
24use std::{
25    pin::Pin,
26    task::{Context, Poll},
27    time::Duration,
28};
29use tracing::{error, info, warn};
30
/// Pause inserted before every attempt to re-establish the mempool stream
/// (after a failed connection, a stream error, or the stream ending): 10 s.
const TX_STREAM_RECONNECT_DELAY: Duration = Duration::from_millis(10_000);
33
/// Capacity passed to the bounded channel that carries mempool batches from
/// the background connection task to the stream consumer (1,024 entries).
const TX_STREAM_BUFFER_SIZE: usize = 1 << 10;
36
37/// Trait for interacting with an indexer.
38pub trait Indexer: Clone + Send + Sync + 'static {
39    type Error: std::error::Error + Send + Sync + 'static;
40
41    /// Upload a seed to the indexer.
42    fn submit_seed(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
43
44    /// Get a stream of transactions from the indexer.
45    fn listen_mempool(
46        &self,
47    ) -> impl Future<
48        Output = Result<impl Stream<Item = Result<Pending, Self::Error>> + Send, Self::Error>,
49    > + Send;
50
51    /// Upload result
52    fn submit_summary(
53        &self,
54        summary: Summary,
55    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
56}
57
/// In-memory [`Indexer`] used by tests.
///
/// All state lives behind `Arc`s, so clones of a `Mock` observe and mutate
/// the same seeds, summaries, and mempool listeners.
#[cfg(test)]
#[derive(Clone)]
pub struct Mock {
    /// Identity that submitted seeds and summaries are verified against.
    pub identity: Identity,
    /// Seeds accepted so far, keyed by the view they belong to.
    pub seeds: Arc<Mutex<HashMap<View, Seed>>>,
    /// Summaries accepted so far, in submission order, each paired with its
    /// `progress.height`.
    #[allow(clippy::type_complexity)]
    pub summaries: Arc<RwLock<Vec<(u64, Summary)>>>,
    /// One sender per `listen_mempool` call; `submit_tx` broadcasts to all of
    /// them and forgets the ones whose send fails.
    #[allow(clippy::type_complexity)]
    pub tx_sender: Arc<Mutex<Vec<mpsc::UnboundedSender<Result<Pending, std::io::Error>>>>>,
}
69
#[cfg(test)]
impl Mock {
    /// Builds a mock with no stored seeds, no summaries, and no mempool
    /// listeners, verifying future submissions against `identity`.
    pub fn new(identity: Identity) -> Self {
        let seeds = Arc::new(Mutex::new(HashMap::new()));
        let summaries = Arc::new(RwLock::new(Vec::new()));
        let tx_sender = Arc::new(Mutex::new(Vec::new()));
        Self {
            identity,
            seeds,
            summaries,
            tx_sender,
        }
    }

    /// Broadcasts `tx`, wrapped in a single-transaction [`Pending`], to every
    /// registered mempool listener.
    ///
    /// Listeners for which the send fails are removed from the list.
    ///
    /// # Panics
    ///
    /// Panics if the listener mutex is poisoned.
    pub fn submit_tx(&self, tx: Transaction) {
        self.tx_sender.lock().unwrap().retain(|listener| {
            let pending = Pending {
                transactions: vec![tx.clone()],
            };
            listener.unbounded_send(Ok(pending)).is_ok()
        });
    }
}
92
#[cfg(test)]
impl Indexer for Mock {
    type Error = std::io::Error;

    /// Records `seed` under its view.
    ///
    /// # Panics
    ///
    /// Panics if the seed does not verify against the mock's identity, or if
    /// the seed mutex is poisoned.
    async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
        // Tests should never hand the indexer an invalid seed
        assert!(seed.verify(NAMESPACE, &self.identity));

        // Later submissions for the same view replace earlier ones
        self.seeds.lock().unwrap().insert(seed.view(), seed);
        Ok(())
    }

    /// Registers a new listener and returns its receiving end.
    ///
    /// Each call creates an independent unbounded channel; transactions passed
    /// to `Mock::submit_tx` afterwards are delivered to it.
    async fn listen_mempool(
        &self,
    ) -> Result<impl Stream<Item = Result<Pending, Self::Error>>, Self::Error> {
        let (sender, receiver) = mpsc::unbounded();
        self.tx_sender.lock().unwrap().push(sender);
        Ok(receiver)
    }

    /// Appends `summary`, tagged with its `progress.height`, to the stored
    /// summaries.
    ///
    /// # Panics
    ///
    /// Panics if the summary does not verify against the mock's identity.
    async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
        // Tests should never hand the indexer an invalid summary
        assert!(summary.verify(&self.identity).is_some());

        // Keep the height alongside the summary for easy lookup in tests
        let height = summary.progress.height;
        self.summaries.write().await.push((height, summary));

        Ok(())
    }
}
126
127impl Indexer for battleware_client::Client {
128    type Error = battleware_client::Error;
129
130    async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
131        self.submit_seed(seed).await
132    }
133
134    async fn listen_mempool(
135        &self,
136    ) -> Result<impl Stream<Item = Result<Pending, Self::Error>>, Self::Error> {
137        match self.connect_mempool().await {
138            Ok(stream) => Ok(stream
139                .map(|result| result.map_err(|_| battleware_client::Error::UnexpectedResponse))),
140            Err(_) => Err(battleware_client::Error::UnexpectedResponse),
141        }
142    }
143
144    async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
145        self.submit_summary(summary).await
146    }
147}
148
149/// A stream that wraps the indexer's listen_mempool with automatic reconnection
150pub struct ReconnectingStream<I>
151where
152    I: Indexer,
153{
154    rx: mpsc::Receiver<Result<Pending, I::Error>>,
155    _handle: Handle<()>,
156}
157
158impl<I> ReconnectingStream<I>
159where
160    I: Indexer,
161{
162    pub fn new<E>(context: E, indexer: I) -> Self
163    where
164        E: Spawner + Clock + Rng + CryptoRng,
165    {
166        // Spawn background task that manages connections
167        let (mut tx, rx) = mpsc::channel(TX_STREAM_BUFFER_SIZE);
168        let handle = context.spawn({
169            move |mut context| async move {
170                loop {
171                    // Try to connect
172                    match indexer.listen_mempool().await {
173                        Ok(stream) => {
174                            info!("connected to mempool stream");
175                            let mut stream = Box::pin(stream);
176
177                            // Forward transactions until stream fails
178                            while let Some(result) = stream.next().await {
179                                match result {
180                                    Ok(pending) => {
181                                        // Batch verify transactions
182                                        let mut batcher = Batch::new();
183                                        for tx in &pending.transactions {
184                                            tx.verify_batch(&mut batcher);
185                                        }
186                                        if !batcher.verify(&mut context) {
187                                            warn!("received invalid transaction from indexer");
188                                            return;
189                                        }
190
191                                        // Pass to receiver
192                                        if tx.send(Ok(pending)).await.is_err() {
193                                            warn!("receiver dropped");
194                                            return;
195                                        }
196                                    }
197                                    Err(e) => {
198                                        error!(?e, "mempool stream error");
199                                        break;
200                                    }
201                                }
202                            }
203
204                            warn!("mempool stream ended");
205                        }
206                        Err(e) => {
207                            error!(?e, "failed to connect mempool stream");
208                        }
209                    }
210
211                    // Wait before reconnecting
212                    context.sleep(TX_STREAM_RECONNECT_DELAY).await;
213                }
214            }
215        });
216
217        Self {
218            rx,
219            _handle: handle,
220        }
221    }
222}
223
224impl<I> Stream for ReconnectingStream<I>
225where
226    I: Indexer,
227{
228    type Item = Result<Pending, I::Error>;
229
230    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231        Pin::new(&mut self.rx).poll_next(cx)
232    }
233}
234
235/// A wrapper indexer that provides automatic reconnection for mempool stream
236#[derive(Clone)]
237pub struct ReconnectingIndexer<I, E>
238where
239    I: Indexer,
240    E: Rng + CryptoRng + Spawner + Clock + Clone,
241{
242    inner: I,
243    context: E,
244}
245
246impl<I, E> ReconnectingIndexer<I, E>
247where
248    I: Indexer,
249    E: Rng + CryptoRng + Spawner + Clock + Clone,
250{
251    pub fn new(context: E, inner: I) -> Self {
252        Self { inner, context }
253    }
254}
255
256impl<I, E> Indexer for ReconnectingIndexer<I, E>
257where
258    I: Indexer,
259    E: Rng + CryptoRng + Spawner + Clock + Clone + Send + Sync + 'static,
260{
261    type Error = I::Error;
262
263    async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
264        self.inner.submit_seed(seed).await
265    }
266
267    async fn listen_mempool(
268        &self,
269    ) -> Result<impl Stream<Item = Result<Pending, Self::Error>> + Send, Self::Error> {
270        Ok(ReconnectingStream::new(
271            self.context.clone(),
272            self.inner.clone(),
273        ))
274    }
275
276    async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
277        self.inner.submit_summary(summary).await
278    }
279}