Skip to main content

signet_tx_cache/
client.rs

1use crate::error::Result;
2use crate::types::{
3    BundleResponse, CacheObject, CacheResponse, OrderKey, OrderList, OrderResponse,
4    TransactionList, TransactionResponse, TxKey,
5};
6use alloy::consensus::TxEnvelope;
7use futures_util::future::Either;
8use futures_util::stream::{self, Stream, StreamExt};
9use serde::{de::DeserializeOwned, Serialize};
10use signet_bundle::SignetEthBundle;
11use signet_constants::parmigiana;
12#[allow(deprecated)]
13use signet_constants::pecorino;
14use signet_types::SignedOrder;
15use tracing::{instrument, warn};
16
/// Relative path of the transactions endpoint; joined onto the cache base URL
/// for both GET and POST requests.
const TRANSACTIONS: &str = "transactions";
/// Relative path of the bundles endpoint; joined onto the cache base URL for
/// POST requests and used as the prefix of bundle update (PUT) paths.
const BUNDLES: &str = "bundles";
/// Relative path of the orders endpoint; joined onto the cache base URL for
/// GET and POST requests and used as the prefix of order update (PUT) paths.
const ORDERS: &str = "orders";
21
/// Signet's Transaction Cache helper.
///
/// A thin HTTP client that sends GET, POST and PUT requests to a transaction
/// cache service. Every endpoint path is resolved against the stored base URL
/// with `Url::join`.
///
/// NOTE(review): because paths are resolved with `Url::join`, a base URL
/// without a trailing slash presumably has its last path segment replaced
/// rather than extended — confirm that configured URLs end in `/`.
#[derive(Debug, Clone)]
pub struct TxCache {
    /// Base URL of the transaction cache; endpoint paths are joined onto it.
    url: reqwest::Url,
    /// The reqwest client used to send every request.
    client: reqwest::Client,
}
31
32impl TxCache {
33    /// Create a new cache with the given URL and client.
34    pub const fn new_with_client(url: reqwest::Url, client: reqwest::Client) -> Self {
35        Self { url, client }
36    }
37
38    /// Instantiate a new cache with the given URL and a new reqwest client.
39    pub fn new(url: reqwest::Url) -> Self {
40        Self { url, client: reqwest::Client::new() }
41    }
42
43    /// Create a new cache given a string URL.
44    pub fn new_from_string(url: &str) -> Result<Self> {
45        let url = reqwest::Url::parse(url)?;
46        Ok(Self::new(url))
47    }
48
49    /// Connect to the transaction cache with the Parmigiana URL.
50    pub fn parmigiana() -> Self {
51        Self::new_from_string(parmigiana::TX_CACHE_URL).expect("parmigiana tx cache URL is invalid")
52    }
53
54    /// Create a new cache with the Parmigiana URL and a specific [`reqwest::Client`].
55    pub fn parmigiana_with_client(client: reqwest::Client) -> Self {
56        Self::new_with_client(
57            parmigiana::TX_CACHE_URL.parse().expect("parmigiana tx cache URL is invalid"),
58            client,
59        )
60    }
61
62    /// Connect to the transaction cache with the Pecorino URL.
63    #[deprecated(note = "Pecorino is being deprecated in favor of Parmigiana")]
64    #[allow(deprecated)]
65    pub fn pecorino() -> Self {
66        Self::new_from_string(pecorino::TX_CACHE_URL).expect("pecorino tx cache URL is invalid")
67    }
68
69    /// Connect to the transaction cache with the Pecorino URL and a specific [`reqwest::Client`].
70    #[deprecated(note = "Pecorino is being deprecated in favor of Parmigiana")]
71    #[allow(deprecated)]
72    pub fn pecorino_with_client(client: reqwest::Client) -> Self {
73        Self::new_with_client(
74            pecorino::TX_CACHE_URL.parse().expect("pecorino tx cache URL is invalid"),
75            client,
76        )
77    }
78
79    /// Get the client used to send requests
80    pub const fn client(&self) -> &reqwest::Client {
81        &self.client
82    }
83
84    /// Get the URL of the transaction cache.
85    pub const fn url(&self) -> &reqwest::Url {
86        &self.url
87    }
88
89    async fn forward_inner<T: Serialize + Send, R: DeserializeOwned>(
90        &self,
91        join: &'static str,
92        obj: T,
93    ) -> Result<R> {
94        self.forward_inner_raw(join, obj)
95            .await?
96            .error_for_status()?
97            .json::<R>()
98            .await
99            .inspect_err(|e| warn!(%e, "Failed to parse response from transaction cache"))
100            .map_err(Into::into)
101    }
102
103    async fn forward_inner_raw<T: Serialize + Send>(
104        &self,
105        join: &'static str,
106        obj: T,
107    ) -> Result<reqwest::Response> {
108        // Append the path to the URL.
109        let url = self
110            .url
111            .join(join)
112            .inspect_err(|e| warn!(%e, "Failed to join URL. Not forwarding transaction."))?;
113
114        // Send the object and check for success.
115        self.client.post(url).json(&obj).send().await?.error_for_status().map_err(Into::into)
116    }
117
118    async fn get_inner<T>(&self, join: &'static str, query: Option<T::Key>) -> Result<T>
119    where
120        T: DeserializeOwned + CacheObject,
121    {
122        let url = self
123            .url
124            .join(join)
125            .inspect_err(|e| warn!(%e, "Failed to join URL. Not querying transaction cache."))?;
126
127        self.client
128            .get(url)
129            .query(&query)
130            .send()
131            .await
132            .inspect_err(|e| warn!(%e, "Failed to get object from transaction cache."))?
133            .json::<T>()
134            .await
135            .map_err(Into::into)
136    }
137
138    async fn put_inner<T: Serialize + Send, R: DeserializeOwned>(
139        &self,
140        path: &str,
141        obj: T,
142    ) -> Result<R> {
143        let url = self
144            .url
145            .join(path)
146            .inspect_err(|e| warn!(%e, "Failed to join URL. Not updating resource."))?;
147
148        self.client
149            .put(url)
150            .json(&obj)
151            .send()
152            .await?
153            .error_for_status()?
154            .json::<R>()
155            .await
156            .inspect_err(|e| warn!(%e, "Failed to parse response from transaction cache"))
157            .map_err(Into::into)
158    }
159
160    /// Forward a raw transaction to the transaction cache.
161    ///
162    /// This method submits a signed transaction envelope to the cache for
163    /// inclusion in a future block. The transaction will be validated and
164    /// stored, returning a response containing its cache identifier.
165    ///
166    /// # Arguments
167    ///
168    /// * `tx` - A signed [`TxEnvelope`] containing the transaction to forward.
169    ///
170    /// # Returns
171    ///
172    /// A [`TransactionResponse`] containing the transaction's cache
173    /// identifier on success.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the request fails or the transaction cache rejects
178    /// the transaction.
179    #[instrument(skip_all)]
180    pub async fn forward_raw_transaction(&self, tx: TxEnvelope) -> Result<TransactionResponse> {
181        self.forward_inner(TRANSACTIONS, tx).await
182    }
183
184    /// Forward a bundle to the transaction cache.
185    ///
186    /// This method submits a signed bundle to the cache for inclusion in a
187    /// future block. Bundles allow multiple transactions to be submitted
188    /// atomically with ordering guarantees.
189    ///
190    /// # Arguments
191    ///
192    /// * `bundle` - A [`SignetEthBundle`] containing the bundle to forward.
193    ///
194    /// # Returns
195    ///
196    /// A [`BundleResponse`] containing the bundle's cache identifier
197    /// (UUID) on success.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the request fails or the transaction cache rejects
202    /// the bundle.
203    #[instrument(skip_all)]
204    pub async fn forward_bundle(&self, bundle: SignetEthBundle) -> Result<BundleResponse> {
205        self.forward_inner(BUNDLES, bundle).await
206    }
207
208    /// Forward a signed order to the transaction cache.
209    ///
210    /// This method submits a signed order to the cache. Orders represent
211    /// user intents that can be filled by solvers or market makers.
212    ///
213    /// # Arguments
214    ///
215    /// * `order` - A [`SignedOrder`] containing the order to forward.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if the request fails or the transaction cache rejects
220    /// the order.
221    #[instrument(skip_all)]
222    pub async fn forward_order(&self, order: SignedOrder) -> Result<()> {
223        self.forward_inner_raw(ORDERS, order).await.map(drop)
224    }
225
226    /// Get transactions from the transaction cache.
227    ///
228    /// Retrieves transactions from the cache, optionally filtered by a query
229    /// key for pagination. When no query is provided, returns the first page
230    /// of transactions.
231    ///
232    /// # Arguments
233    ///
234    /// * `query` - An optional [`TxKey`] for pagination. Use `None` to get the
235    ///   first page, or pass the key from a previous response to get subsequent
236    ///   pages.
237    ///
238    /// # Returns
239    ///
240    /// A [`CacheResponse`] containing a [`TransactionList`] with
241    /// the transactions and pagination information. If more transactions are
242    /// available, the response will contain a key to fetch the next page.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the request fails or the response cannot be parsed.
247    #[instrument(skip_all)]
248    pub async fn get_transactions(
249        &self,
250        query: Option<TxKey>,
251    ) -> Result<CacheResponse<TransactionList>> {
252        self.get_inner(TRANSACTIONS, query).await
253    }
254
255    /// Get signed orders from the transaction cache.
256    ///
257    /// Retrieves signed orders from the cache, optionally filtered by a query
258    /// key for pagination. When no query is provided, returns the first page
259    /// of orders.
260    ///
261    /// # Arguments
262    ///
263    /// * `query` - An optional [`OrderKey`] for pagination. Use `None` to get
264    ///   the first page, or pass the key from a previous response to get
265    ///   subsequent pages.
266    ///
267    /// # Returns
268    ///
269    /// A [`CacheResponse`] containing an [`OrderList`] with the
270    /// orders and pagination information. If more orders are available, the
271    /// response will contain a key to fetch the next page.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the request fails or the response cannot be parsed.
276    #[instrument(skip_all)]
277    pub async fn get_orders(&self, query: Option<OrderKey>) -> Result<CacheResponse<OrderList>> {
278        self.get_inner(ORDERS, query).await
279    }
280
281    /// Update an existing bundle in the transaction cache.
282    ///
283    /// This method sends a PUT request to update a bundle that already exists
284    /// in the cache. The bundle is identified by its UUID and the entire bundle
285    /// content is replaced with the provided data.
286    ///
287    /// # Arguments
288    ///
289    /// * `bundle_id` - The UUID of the bundle to update.
290    /// * `bundle` - The updated [`SignetEthBundle`] to store.
291    ///
292    /// # Returns
293    ///
294    /// A [`BundleResponse`] containing the bundle's UUID on success.
295    ///
296    /// # Errors
297    ///
298    /// Returns [`TxCacheError::NotFound`] if the bundle does not exist.
299    /// Returns an error if the request fails or the response cannot be parsed.
300    ///
301    /// # Example
302    ///
303    /// ```ignore
304    /// use signet_tx_cache::TxCache;
305    /// use signet_bundle::SignetEthBundle;
306    ///
307    /// async fn example() -> Result<(), signet_tx_cache::TxCacheError> {
308    ///     let cache = TxCache::parmigiana();
309    ///     let bundle_id = "550e8400-e29b-41d4-a716-446655440000";
310    ///     // Create bundle from your transaction data
311    ///     let bundle: SignetEthBundle = todo!();
312    ///
313    ///     let response = cache.update_bundle(bundle_id, bundle).await?;
314    ///     println!("Updated bundle: {}", response.id);
315    ///     Ok(())
316    /// }
317    /// ```
318    ///
319    /// [`TxCacheError::NotFound`]: crate::error::TxCacheError::NotFound
320    #[instrument(skip_all)]
321    pub async fn update_bundle(
322        &self,
323        bundle_id: &str,
324        bundle: SignetEthBundle,
325    ) -> Result<BundleResponse> {
326        let path = format!("{BUNDLES}/{bundle_id}");
327        self.put_inner(&path, bundle).await
328    }
329
330    /// Stream all transactions from the transaction cache, automatically
331    /// paginating through all available pages.
332    ///
333    /// Returns a [`Stream`] that yields each [`TxEnvelope`] individually,
334    /// fetching subsequent pages as needed. The stream ends when no more
335    /// pages are available or on the first error (which is yielded before
336    /// terminating).
337    pub fn stream_transactions(&self) -> impl Stream<Item = Result<TxEnvelope>> + Send + '_ {
338        stream::unfold(Some(None), move |cursor| async move {
339            let cursor = cursor?;
340
341            match self.get_transactions(cursor).await {
342                Ok(response) => {
343                    let (inner, next_cursor) = response.into_parts();
344                    let txns = stream::iter(inner.transactions).map(Ok);
345                    Some((Either::Left(txns), next_cursor.map(Some)))
346                }
347                Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
348            }
349        })
350        .flatten()
351    }
352
353    /// Stream all signed orders from the transaction cache, automatically
354    /// paginating through all available pages.
355    ///
356    /// Returns a [`Stream`] that yields each [`SignedOrder`] individually,
357    /// fetching subsequent pages as needed. The stream ends when no more
358    /// pages are available or on the first error (which is yielded before
359    /// terminating).
360    pub fn stream_orders(&self) -> impl Stream<Item = Result<SignedOrder>> + Send + '_ {
361        stream::unfold(Some(None), move |cursor| async move {
362            let cursor = cursor?;
363
364            match self.get_orders(cursor).await {
365                Ok(response) => {
366                    let (inner, next_cursor) = response.into_parts();
367                    let orders = stream::iter(inner.orders).map(Ok);
368                    Some((Either::Left(orders), next_cursor.map(Some)))
369                }
370                Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
371            }
372        })
373        .flatten()
374    }
375
376    /// Update an existing order in the transaction cache.
377    ///
378    /// This method sends a PUT request to update an order that already exists
379    /// in the cache. The order is identified by its ID and the entire order
380    /// content is replaced with the provided data.
381    ///
382    /// # Arguments
383    ///
384    /// * `order_id` - The ID of the order to update (hex-encoded B256).
385    /// * `order` - The updated [`SignedOrder`] to store.
386    ///
387    /// # Returns
388    ///
389    /// An [`OrderResponse`] containing the order's ID on success.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`TxCacheError::NotFound`] if the order does not exist.
394    /// Returns an error if the request fails or the response cannot be parsed.
395    ///
396    /// # Example
397    ///
398    /// ```no_run
399    /// # async fn example() -> Result<(), signet_tx_cache::TxCacheError> {
400    /// use signet_tx_cache::TxCache;
401    /// use signet_types::SignedOrder;
402    ///
403    /// let cache = TxCache::parmigiana();
404    /// let order_id = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
405    /// # let order: SignedOrder = todo!();
406    ///
407    /// let response = cache.update_order(order_id, order).await?;
408    /// println!("Updated order: {:?}", response.id);
409    /// # Ok(())
410    /// # }
411    /// ```
412    ///
413    /// [`TxCacheError::NotFound`]: crate::error::TxCacheError::NotFound
414    #[instrument(skip_all)]
415    pub async fn update_order(&self, order_id: &str, order: SignedOrder) -> Result<OrderResponse> {
416        let path = format!("{ORDERS}/{order_id}");
417        self.put_inner(&path, order).await
418    }
419}
420
421#[cfg(feature = "sse")]
422use eventsource_stream::{Event, EventStreamError, Eventsource};
423#[cfg(feature = "sse")]
424use tracing::debug;
425
426#[cfg(feature = "sse")]
427impl TxCache {
428    const TRANSACTIONS_FEED: &str = "transactions/feed";
429    const ORDERS_FEED: &str = "orders/feed";
430
431    fn decode_sse_events<T, S>(events: S) -> impl Stream<Item = Result<T>> + Send
432    where
433        T: DeserializeOwned + Send + 'static,
434        S: Stream<Item = std::result::Result<Event, EventStreamError<reqwest::Error>>> + Send,
435    {
436        events
437            .map(|result| match result {
438                Ok(event) => serde_json::from_str::<T>(&event.data).map_err(Into::into),
439                Err(e) => Err(e.into()),
440            })
441            .scan(false, |errored, result| {
442                if *errored {
443                    return std::future::ready(None);
444                }
445                *errored = result.is_err();
446                std::future::ready(Some(result))
447            })
448    }
449
450    /// Connect to an SSE feed endpoint, returning a stream that
451    /// deserializes each event's JSON data into `T`. The stream
452    /// terminates on the first error, which is yielded as the final
453    /// item.
454    ///
455    /// If `secret` is provided, it is sent as a bearer token in the
456    /// request.
457    #[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
458    #[instrument(skip_all)]
459    pub async fn subscribe_inner<T: DeserializeOwned + Send + 'static>(
460        &self,
461        feed: &'static str,
462        secret: Option<&str>,
463    ) -> Result<impl Stream<Item = Result<T>> + Send> {
464        let url = self
465            .url
466            .join(feed)
467            .inspect_err(|e| warn!(%e, "Failed to join URL for SSE subscription"))?;
468
469        let req = match secret {
470            Some(s) => self.client.get(url).bearer_auth(s),
471            None => self.client.get(url),
472        };
473        let es = req.send().await?.error_for_status()?.bytes_stream().eventsource();
474
475        debug!(feed, "SSE subscription established");
476
477        Ok(Self::decode_sse_events(es))
478    }
479
480    /// Subscribe to real-time transaction events via SSE.
481    ///
482    /// Connects to the `/transactions/feed` endpoint and returns a
483    /// [`Stream`] that yields each [`TxEnvelope`] as it arrives from
484    /// the server. Unlike [`stream_transactions`], which paginates
485    /// over existing data, this receives new transactions in
486    /// real-time.
487    ///
488    /// The stream terminates on the first error, which is yielded as
489    /// the final item.
490    ///
491    /// [`stream_transactions`]: TxCache::stream_transactions
492    #[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
493    #[instrument(skip_all)]
494    pub async fn subscribe_transactions(
495        &self,
496    ) -> Result<impl Stream<Item = Result<TxEnvelope>> + Send> {
497        self.subscribe_inner(Self::TRANSACTIONS_FEED, None).await
498    }
499
500    /// Subscribe to real-time order events via SSE.
501    ///
502    /// Connects to the `/orders/feed` endpoint and returns a
503    /// [`Stream`] that yields each [`SignedOrder`] as it arrives from
504    /// the server. Unlike [`stream_orders`], which paginates over
505    /// existing data, this receives new orders in real-time.
506    ///
507    /// The stream terminates on the first error, which is yielded as
508    /// the final item.
509    ///
510    /// [`stream_orders`]: TxCache::stream_orders
511    #[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
512    #[instrument(skip_all)]
513    pub async fn subscribe_orders(&self) -> Result<impl Stream<Item = Result<SignedOrder>> + Send> {
514        self.subscribe_inner(Self::ORDERS_FEED, None).await
515    }
516}
517
#[cfg(all(test, feature = "sse"))]
mod tests {
    //! Unit tests for `TxCache::decode_sse_events`, driven by in-memory
    //! event streams so that no network access is needed.

    use super::TxCache;
    use crate::error::TxCacheError;
    use eventsource_stream::{Event, EventStreamError};
    use futures_util::{stream, StreamExt};
    use serde_json::Value;

    type SseError = EventStreamError<reqwest::Error>;
    type SseItem = Result<Event, SseError>;

    /// Build an SSE event whose only populated field is `data`.
    fn event(data: &str) -> Event {
        Event { data: data.to_owned(), ..Default::default() }
    }

    /// Build an SSE transport error from an invalid UTF-8 byte sequence.
    fn utf8_sse_error() -> SseError {
        let invalid = String::from_utf8(vec![0xff]).expect_err("invalid UTF-8 should error");
        EventStreamError::Utf8(invalid)
    }

    #[tokio::test]
    async fn decode_sse_events_deserializes_json_events() {
        let input: Vec<SseItem> = vec![Ok(event(r#"{"ok":true}"#))];

        let decoded: Vec<_> =
            TxCache::decode_sse_events::<Value, _>(stream::iter(input)).collect().await;

        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].as_ref().unwrap()["ok"], true);
    }

    #[tokio::test]
    async fn decode_sse_events_maps_invalid_json_to_deserialization_error() {
        let input: Vec<SseItem> = vec![Ok(event("not-json"))];

        let mut decoded = TxCache::decode_sse_events::<Value, _>(stream::iter(input));

        let first = decoded.next().await.expect("stream should yield an error");
        assert!(
            matches!(first, Err(TxCacheError::Deserialization(_))),
            "expected deserialization error, got {first:?}"
        );
        assert!(decoded.next().await.is_none(), "stream should terminate after the error");
    }

    #[tokio::test]
    async fn decode_sse_events_maps_sse_errors() {
        let input: Vec<SseItem> = vec![Err(utf8_sse_error())];

        let mut decoded = TxCache::decode_sse_events::<Value, _>(stream::iter(input));

        let first = decoded.next().await.expect("stream should yield an error");
        assert!(
            matches!(first, Err(TxCacheError::Sse(EventStreamError::Utf8(_)))),
            "expected SSE error, got {first:?}"
        );
        assert!(decoded.next().await.is_none(), "stream should terminate after the error");
    }

    #[tokio::test]
    async fn decode_sse_events_stops_after_first_error() {
        // The event after the error must never be observed.
        let input: Vec<SseItem> = vec![
            Ok(event(r#"{"idx":1}"#)),
            Err(utf8_sse_error()),
            Ok(event(r#"{"idx":2}"#)),
        ];

        let decoded: Vec<_> =
            TxCache::decode_sse_events::<Value, _>(stream::iter(input)).collect().await;

        assert_eq!(decoded.len(), 2);
        assert_eq!(decoded[0].as_ref().unwrap()["idx"], 1);

        let last = &decoded[1];
        assert!(
            matches!(last, Err(TxCacheError::Sse(EventStreamError::Utf8(_)))),
            "expected final SSE error, got {last:?}"
        );
    }
}