Skip to main content

signet_orders/impls/
tx_cache.rs

1use crate::{BundleSubmitter, OrderSource, OrderSubmitter};
2use futures_util::future::Either;
3use futures_util::stream::{self, Stream, StreamExt};
4use signet_bundle::SignetEthBundle;
5use signet_tx_cache::{types::TxCacheSendBundleResponse, TxCache, TxCacheError};
6use signet_types::SignedOrder;
7
8impl OrderSubmitter for TxCache {
9    type Error = TxCacheError;
10
11    async fn submit_order(&self, order: SignedOrder) -> Result<(), Self::Error> {
12        self.forward_order(order).await
13    }
14}
15
16impl OrderSource for TxCache {
17    type Error = TxCacheError;
18
19    fn get_orders(&self) -> impl Stream<Item = Result<SignedOrder, Self::Error>> + Send {
20        stream::unfold(Some(None), move |cursor| async move {
21            let cursor = cursor?;
22
23            match TxCache::get_orders(self, cursor).await {
24                Ok(response) => {
25                    let (inner, next_cursor) = response.into_parts();
26                    let orders = stream::iter(inner.orders).map(Ok);
27                    Some((Either::Left(orders), next_cursor.map(Some)))
28                }
29                Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
30            }
31        })
32        .flatten()
33    }
34}
35
36impl BundleSubmitter for TxCache {
37    type Response = TxCacheSendBundleResponse;
38    type Error = TxCacheError;
39
40    async fn submit_bundle(&self, bundle: SignetEthBundle) -> Result<Self::Response, Self::Error> {
41        self.forward_bundle(bundle).await
42    }
43}