signet_orders/impls/
tx_cache.rs1use crate::{BundleSubmitter, OrderSource, OrderSubmitter};
2use futures_util::future::Either;
3use futures_util::stream::{self, Stream, StreamExt};
4use signet_bundle::SignetEthBundle;
5use signet_tx_cache::{types::TxCacheSendBundleResponse, TxCache, TxCacheError};
6use signet_types::SignedOrder;
7
8impl OrderSubmitter for TxCache {
9 type Error = TxCacheError;
10
11 async fn submit_order(&self, order: SignedOrder) -> Result<(), Self::Error> {
12 self.forward_order(order).await
13 }
14}
15
16impl OrderSource for TxCache {
17 type Error = TxCacheError;
18
19 fn get_orders(&self) -> impl Stream<Item = Result<SignedOrder, Self::Error>> + Send {
20 stream::unfold(Some(None), move |cursor| async move {
21 let cursor = cursor?;
22
23 match TxCache::get_orders(self, cursor).await {
24 Ok(response) => {
25 let (inner, next_cursor) = response.into_parts();
26 let orders = stream::iter(inner.orders).map(Ok);
27 Some((Either::Left(orders), next_cursor.map(Some)))
28 }
29 Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
30 }
31 })
32 .flatten()
33 }
34}
35
36impl BundleSubmitter for TxCache {
37 type Response = TxCacheSendBundleResponse;
38 type Error = TxCacheError;
39
40 async fn submit_bundle(&self, bundle: SignetEthBundle) -> Result<Self::Response, Self::Error> {
41 self.forward_bundle(bundle).await
42 }
43}