1use battleware_types::api::Pending;
2#[cfg(test)]
3use battleware_types::execution::Transaction;
4use battleware_types::{api::Summary, Seed};
5#[cfg(test)]
6use battleware_types::{Identity, NAMESPACE};
7#[cfg(test)]
8use commonware_consensus::{threshold_simplex::types::View, Viewable};
9use commonware_cryptography::ed25519::Batch;
10use commonware_cryptography::BatchVerifier;
11#[cfg(test)]
12use commonware_runtime::RwLock;
13use commonware_runtime::Spawner;
14use commonware_runtime::{Clock, Handle};
15use futures::channel::mpsc;
16use futures::{SinkExt, Stream, StreamExt};
17use rand::{CryptoRng, Rng};
18use std::future::Future;
19#[cfg(test)]
20use std::{
21 collections::HashMap,
22 sync::{Arc, Mutex},
23};
24use std::{
25 pin::Pin,
26 task::{Context, Poll},
27 time::Duration,
28};
29use tracing::{error, info, warn};
30
31const TX_STREAM_RECONNECT_DELAY: Duration = Duration::from_secs(10);
33
34const TX_STREAM_BUFFER_SIZE: usize = 1_024;
36
37pub trait Indexer: Clone + Send + Sync + 'static {
39 type Error: std::error::Error + Send + Sync + 'static;
40
41 fn submit_seed(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
43
44 fn listen_mempool(
46 &self,
47 ) -> impl Future<
48 Output = Result<impl Stream<Item = Result<Pending, Self::Error>> + Send, Self::Error>,
49 > + Send;
50
51 fn submit_summary(
53 &self,
54 summary: Summary,
55 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
56}
57
58#[cfg(test)]
60#[derive(Clone)]
61pub struct Mock {
62 pub identity: Identity,
63 pub seeds: Arc<Mutex<HashMap<View, Seed>>>,
64 #[allow(clippy::type_complexity)]
65 pub summaries: Arc<RwLock<Vec<(u64, Summary)>>>,
66 #[allow(clippy::type_complexity)]
67 pub tx_sender: Arc<Mutex<Vec<mpsc::UnboundedSender<Result<Pending, std::io::Error>>>>>,
68}
69
70#[cfg(test)]
71impl Mock {
72 pub fn new(identity: Identity) -> Self {
73 Self {
74 identity,
75 seeds: Arc::new(Mutex::new(HashMap::new())),
76 summaries: Arc::new(RwLock::new(Vec::new())),
77 tx_sender: Arc::new(Mutex::new(Vec::new())),
78 }
79 }
80
81 pub fn submit_tx(&self, tx: Transaction) {
82 let mut senders = self.tx_sender.lock().unwrap();
83 senders.retain(|sender| {
84 sender
85 .unbounded_send(Ok(Pending {
86 transactions: vec![tx.clone()],
87 }))
88 .is_ok()
89 });
90 }
91}
92
93#[cfg(test)]
94impl Indexer for Mock {
95 type Error = std::io::Error;
96
97 async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
98 assert!(seed.verify(NAMESPACE, &self.identity));
100
101 let mut seeds = self.seeds.lock().unwrap();
103 seeds.insert(seed.view(), seed);
104 Ok(())
105 }
106
107 async fn listen_mempool(
108 &self,
109 ) -> Result<impl Stream<Item = Result<Pending, Self::Error>>, Self::Error> {
110 let (tx, rx) = mpsc::unbounded();
111 self.tx_sender.lock().unwrap().push(tx);
112 Ok(rx)
113 }
114
115 async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
116 assert!(summary.verify(&self.identity).is_some());
118
119 let mut summaries = self.summaries.write().await;
121 summaries.push((summary.progress.height, summary));
122
123 Ok(())
124 }
125}
126
127impl Indexer for battleware_client::Client {
128 type Error = battleware_client::Error;
129
130 async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
131 self.submit_seed(seed).await
132 }
133
134 async fn listen_mempool(
135 &self,
136 ) -> Result<impl Stream<Item = Result<Pending, Self::Error>>, Self::Error> {
137 match self.connect_mempool().await {
138 Ok(stream) => Ok(stream
139 .map(|result| result.map_err(|_| battleware_client::Error::UnexpectedResponse))),
140 Err(_) => Err(battleware_client::Error::UnexpectedResponse),
141 }
142 }
143
144 async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
145 self.submit_summary(summary).await
146 }
147}
148
149pub struct ReconnectingStream<I>
151where
152 I: Indexer,
153{
154 rx: mpsc::Receiver<Result<Pending, I::Error>>,
155 _handle: Handle<()>,
156}
157
158impl<I> ReconnectingStream<I>
159where
160 I: Indexer,
161{
162 pub fn new<E>(context: E, indexer: I) -> Self
163 where
164 E: Spawner + Clock + Rng + CryptoRng,
165 {
166 let (mut tx, rx) = mpsc::channel(TX_STREAM_BUFFER_SIZE);
168 let handle = context.spawn({
169 move |mut context| async move {
170 loop {
171 match indexer.listen_mempool().await {
173 Ok(stream) => {
174 info!("connected to mempool stream");
175 let mut stream = Box::pin(stream);
176
177 while let Some(result) = stream.next().await {
179 match result {
180 Ok(pending) => {
181 let mut batcher = Batch::new();
183 for tx in &pending.transactions {
184 tx.verify_batch(&mut batcher);
185 }
186 if !batcher.verify(&mut context) {
187 warn!("received invalid transaction from indexer");
188 return;
189 }
190
191 if tx.send(Ok(pending)).await.is_err() {
193 warn!("receiver dropped");
194 return;
195 }
196 }
197 Err(e) => {
198 error!(?e, "mempool stream error");
199 break;
200 }
201 }
202 }
203
204 warn!("mempool stream ended");
205 }
206 Err(e) => {
207 error!(?e, "failed to connect mempool stream");
208 }
209 }
210
211 context.sleep(TX_STREAM_RECONNECT_DELAY).await;
213 }
214 }
215 });
216
217 Self {
218 rx,
219 _handle: handle,
220 }
221 }
222}
223
224impl<I> Stream for ReconnectingStream<I>
225where
226 I: Indexer,
227{
228 type Item = Result<Pending, I::Error>;
229
230 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231 Pin::new(&mut self.rx).poll_next(cx)
232 }
233}
234
235#[derive(Clone)]
237pub struct ReconnectingIndexer<I, E>
238where
239 I: Indexer,
240 E: Rng + CryptoRng + Spawner + Clock + Clone,
241{
242 inner: I,
243 context: E,
244}
245
246impl<I, E> ReconnectingIndexer<I, E>
247where
248 I: Indexer,
249 E: Rng + CryptoRng + Spawner + Clock + Clone,
250{
251 pub fn new(context: E, inner: I) -> Self {
252 Self { inner, context }
253 }
254}
255
256impl<I, E> Indexer for ReconnectingIndexer<I, E>
257where
258 I: Indexer,
259 E: Rng + CryptoRng + Spawner + Clock + Clone + Send + Sync + 'static,
260{
261 type Error = I::Error;
262
263 async fn submit_seed(&self, seed: Seed) -> Result<(), Self::Error> {
264 self.inner.submit_seed(seed).await
265 }
266
267 async fn listen_mempool(
268 &self,
269 ) -> Result<impl Stream<Item = Result<Pending, Self::Error>> + Send, Self::Error> {
270 Ok(ReconnectingStream::new(
271 self.context.clone(),
272 self.inner.clone(),
273 ))
274 }
275
276 async fn submit_summary(&self, summary: Summary) -> Result<(), Self::Error> {
277 self.inner.submit_summary(summary).await
278 }
279}