signet_tx_cache/client.rs
1use crate::error::Result;
2use crate::types::{
3 BundleResponse, CacheObject, CacheResponse, OrderKey, OrderList, OrderResponse,
4 TransactionList, TransactionResponse, TxKey,
5};
6use alloy::consensus::TxEnvelope;
7use futures_util::future::Either;
8use futures_util::stream::{self, Stream, StreamExt};
9use serde::{de::DeserializeOwned, Serialize};
10use signet_bundle::SignetEthBundle;
11use signet_constants::parmigiana;
12#[allow(deprecated)]
13use signet_constants::pecorino;
14use signet_types::SignedOrder;
15use tracing::{instrument, warn};
16
17/// The endpoints for the transaction cache.
18const TRANSACTIONS: &str = "transactions";
19const BUNDLES: &str = "bundles";
20const ORDERS: &str = "orders";
21
22/// Signet's Transaction Cache helper.
23/// Forwards GET and POST requests to a tx cache URL.
24#[derive(Debug, Clone)]
25pub struct TxCache {
26 /// The URL of the transaction cache.
27 url: reqwest::Url,
28 /// The reqwest client used to send requests.
29 client: reqwest::Client,
30}
31
32impl TxCache {
33 /// Create a new cache with the given URL and client.
34 pub const fn new_with_client(url: reqwest::Url, client: reqwest::Client) -> Self {
35 Self { url, client }
36 }
37
38 /// Instantiate a new cache with the given URL and a new reqwest client.
39 pub fn new(url: reqwest::Url) -> Self {
40 Self { url, client: reqwest::Client::new() }
41 }
42
43 /// Create a new cache given a string URL.
44 pub fn new_from_string(url: &str) -> Result<Self> {
45 let url = reqwest::Url::parse(url)?;
46 Ok(Self::new(url))
47 }
48
49 /// Connect to the transaction cache with the Parmigiana URL.
50 pub fn parmigiana() -> Self {
51 Self::new_from_string(parmigiana::TX_CACHE_URL).expect("parmigiana tx cache URL is invalid")
52 }
53
54 /// Create a new cache with the Parmigiana URL and a specific [`reqwest::Client`].
55 pub fn parmigiana_with_client(client: reqwest::Client) -> Self {
56 Self::new_with_client(
57 parmigiana::TX_CACHE_URL.parse().expect("parmigiana tx cache URL is invalid"),
58 client,
59 )
60 }
61
62 /// Connect to the transaction cache with the Pecorino URL.
63 #[deprecated(note = "Pecorino is being deprecated in favor of Parmigiana")]
64 #[allow(deprecated)]
65 pub fn pecorino() -> Self {
66 Self::new_from_string(pecorino::TX_CACHE_URL).expect("pecorino tx cache URL is invalid")
67 }
68
69 /// Connect to the transaction cache with the Pecorino URL and a specific [`reqwest::Client`].
70 #[deprecated(note = "Pecorino is being deprecated in favor of Parmigiana")]
71 #[allow(deprecated)]
72 pub fn pecorino_with_client(client: reqwest::Client) -> Self {
73 Self::new_with_client(
74 pecorino::TX_CACHE_URL.parse().expect("pecorino tx cache URL is invalid"),
75 client,
76 )
77 }
78
79 /// Get the client used to send requests
80 pub const fn client(&self) -> &reqwest::Client {
81 &self.client
82 }
83
84 /// Get the URL of the transaction cache.
85 pub const fn url(&self) -> &reqwest::Url {
86 &self.url
87 }
88
89 async fn forward_inner<T: Serialize + Send, R: DeserializeOwned>(
90 &self,
91 join: &'static str,
92 obj: T,
93 ) -> Result<R> {
94 self.forward_inner_raw(join, obj)
95 .await?
96 .error_for_status()?
97 .json::<R>()
98 .await
99 .inspect_err(|e| warn!(%e, "Failed to parse response from transaction cache"))
100 .map_err(Into::into)
101 }
102
103 async fn forward_inner_raw<T: Serialize + Send>(
104 &self,
105 join: &'static str,
106 obj: T,
107 ) -> Result<reqwest::Response> {
108 // Append the path to the URL.
109 let url = self
110 .url
111 .join(join)
112 .inspect_err(|e| warn!(%e, "Failed to join URL. Not forwarding transaction."))?;
113
114 // Send the object and check for success.
115 self.client.post(url).json(&obj).send().await?.error_for_status().map_err(Into::into)
116 }
117
118 async fn get_inner<T>(&self, join: &'static str, query: Option<T::Key>) -> Result<T>
119 where
120 T: DeserializeOwned + CacheObject,
121 {
122 let url = self
123 .url
124 .join(join)
125 .inspect_err(|e| warn!(%e, "Failed to join URL. Not querying transaction cache."))?;
126
127 self.client
128 .get(url)
129 .query(&query)
130 .send()
131 .await
132 .inspect_err(|e| warn!(%e, "Failed to get object from transaction cache."))?
133 .json::<T>()
134 .await
135 .map_err(Into::into)
136 }
137
138 async fn put_inner<T: Serialize + Send, R: DeserializeOwned>(
139 &self,
140 path: &str,
141 obj: T,
142 ) -> Result<R> {
143 let url = self
144 .url
145 .join(path)
146 .inspect_err(|e| warn!(%e, "Failed to join URL. Not updating resource."))?;
147
148 self.client
149 .put(url)
150 .json(&obj)
151 .send()
152 .await?
153 .error_for_status()?
154 .json::<R>()
155 .await
156 .inspect_err(|e| warn!(%e, "Failed to parse response from transaction cache"))
157 .map_err(Into::into)
158 }
159
160 /// Forward a raw transaction to the transaction cache.
161 ///
162 /// This method submits a signed transaction envelope to the cache for
163 /// inclusion in a future block. The transaction will be validated and
164 /// stored, returning a response containing its cache identifier.
165 ///
166 /// # Arguments
167 ///
168 /// * `tx` - A signed [`TxEnvelope`] containing the transaction to forward.
169 ///
170 /// # Returns
171 ///
172 /// A [`TransactionResponse`] containing the transaction's cache
173 /// identifier on success.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the request fails or the transaction cache rejects
178 /// the transaction.
179 #[instrument(skip_all)]
180 pub async fn forward_raw_transaction(&self, tx: TxEnvelope) -> Result<TransactionResponse> {
181 self.forward_inner(TRANSACTIONS, tx).await
182 }
183
184 /// Forward a bundle to the transaction cache.
185 ///
186 /// This method submits a signed bundle to the cache for inclusion in a
187 /// future block. Bundles allow multiple transactions to be submitted
188 /// atomically with ordering guarantees.
189 ///
190 /// # Arguments
191 ///
192 /// * `bundle` - A [`SignetEthBundle`] containing the bundle to forward.
193 ///
194 /// # Returns
195 ///
196 /// A [`BundleResponse`] containing the bundle's cache identifier
197 /// (UUID) on success.
198 ///
199 /// # Errors
200 ///
201 /// Returns an error if the request fails or the transaction cache rejects
202 /// the bundle.
203 #[instrument(skip_all)]
204 pub async fn forward_bundle(&self, bundle: SignetEthBundle) -> Result<BundleResponse> {
205 self.forward_inner(BUNDLES, bundle).await
206 }
207
208 /// Forward a signed order to the transaction cache.
209 ///
210 /// This method submits a signed order to the cache. Orders represent
211 /// user intents that can be filled by solvers or market makers.
212 ///
213 /// # Arguments
214 ///
215 /// * `order` - A [`SignedOrder`] containing the order to forward.
216 ///
217 /// # Errors
218 ///
219 /// Returns an error if the request fails or the transaction cache rejects
220 /// the order.
221 #[instrument(skip_all)]
222 pub async fn forward_order(&self, order: SignedOrder) -> Result<()> {
223 self.forward_inner_raw(ORDERS, order).await.map(drop)
224 }
225
226 /// Get transactions from the transaction cache.
227 ///
228 /// Retrieves transactions from the cache, optionally filtered by a query
229 /// key for pagination. When no query is provided, returns the first page
230 /// of transactions.
231 ///
232 /// # Arguments
233 ///
234 /// * `query` - An optional [`TxKey`] for pagination. Use `None` to get the
235 /// first page, or pass the key from a previous response to get subsequent
236 /// pages.
237 ///
238 /// # Returns
239 ///
240 /// A [`CacheResponse`] containing a [`TransactionList`] with
241 /// the transactions and pagination information. If more transactions are
242 /// available, the response will contain a key to fetch the next page.
243 ///
244 /// # Errors
245 ///
246 /// Returns an error if the request fails or the response cannot be parsed.
247 #[instrument(skip_all)]
248 pub async fn get_transactions(
249 &self,
250 query: Option<TxKey>,
251 ) -> Result<CacheResponse<TransactionList>> {
252 self.get_inner(TRANSACTIONS, query).await
253 }
254
255 /// Get signed orders from the transaction cache.
256 ///
257 /// Retrieves signed orders from the cache, optionally filtered by a query
258 /// key for pagination. When no query is provided, returns the first page
259 /// of orders.
260 ///
261 /// # Arguments
262 ///
263 /// * `query` - An optional [`OrderKey`] for pagination. Use `None` to get
264 /// the first page, or pass the key from a previous response to get
265 /// subsequent pages.
266 ///
267 /// # Returns
268 ///
269 /// A [`CacheResponse`] containing an [`OrderList`] with the
270 /// orders and pagination information. If more orders are available, the
271 /// response will contain a key to fetch the next page.
272 ///
273 /// # Errors
274 ///
275 /// Returns an error if the request fails or the response cannot be parsed.
276 #[instrument(skip_all)]
277 pub async fn get_orders(&self, query: Option<OrderKey>) -> Result<CacheResponse<OrderList>> {
278 self.get_inner(ORDERS, query).await
279 }
280
281 /// Update an existing bundle in the transaction cache.
282 ///
283 /// This method sends a PUT request to update a bundle that already exists
284 /// in the cache. The bundle is identified by its UUID and the entire bundle
285 /// content is replaced with the provided data.
286 ///
287 /// # Arguments
288 ///
289 /// * `bundle_id` - The UUID of the bundle to update.
290 /// * `bundle` - The updated [`SignetEthBundle`] to store.
291 ///
292 /// # Returns
293 ///
294 /// A [`BundleResponse`] containing the bundle's UUID on success.
295 ///
296 /// # Errors
297 ///
298 /// Returns [`TxCacheError::NotFound`] if the bundle does not exist.
299 /// Returns an error if the request fails or the response cannot be parsed.
300 ///
301 /// # Example
302 ///
303 /// ```ignore
304 /// use signet_tx_cache::TxCache;
305 /// use signet_bundle::SignetEthBundle;
306 ///
307 /// async fn example() -> Result<(), signet_tx_cache::TxCacheError> {
308 /// let cache = TxCache::parmigiana();
309 /// let bundle_id = "550e8400-e29b-41d4-a716-446655440000";
310 /// // Create bundle from your transaction data
311 /// let bundle: SignetEthBundle = todo!();
312 ///
313 /// let response = cache.update_bundle(bundle_id, bundle).await?;
314 /// println!("Updated bundle: {}", response.id);
315 /// Ok(())
316 /// }
317 /// ```
318 ///
319 /// [`TxCacheError::NotFound`]: crate::error::TxCacheError::NotFound
320 #[instrument(skip_all)]
321 pub async fn update_bundle(
322 &self,
323 bundle_id: &str,
324 bundle: SignetEthBundle,
325 ) -> Result<BundleResponse> {
326 let path = format!("{BUNDLES}/{bundle_id}");
327 self.put_inner(&path, bundle).await
328 }
329
330 /// Stream all transactions from the transaction cache, automatically
331 /// paginating through all available pages.
332 ///
333 /// Returns a [`Stream`] that yields each [`TxEnvelope`] individually,
334 /// fetching subsequent pages as needed. The stream ends when no more
335 /// pages are available or on the first error (which is yielded before
336 /// terminating).
337 pub fn stream_transactions(&self) -> impl Stream<Item = Result<TxEnvelope>> + Send + '_ {
338 stream::unfold(Some(None), move |cursor| async move {
339 let cursor = cursor?;
340
341 match self.get_transactions(cursor).await {
342 Ok(response) => {
343 let (inner, next_cursor) = response.into_parts();
344 let txns = stream::iter(inner.transactions).map(Ok);
345 Some((Either::Left(txns), next_cursor.map(Some)))
346 }
347 Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
348 }
349 })
350 .flatten()
351 }
352
353 /// Stream all signed orders from the transaction cache, automatically
354 /// paginating through all available pages.
355 ///
356 /// Returns a [`Stream`] that yields each [`SignedOrder`] individually,
357 /// fetching subsequent pages as needed. The stream ends when no more
358 /// pages are available or on the first error (which is yielded before
359 /// terminating).
360 pub fn stream_orders(&self) -> impl Stream<Item = Result<SignedOrder>> + Send + '_ {
361 stream::unfold(Some(None), move |cursor| async move {
362 let cursor = cursor?;
363
364 match self.get_orders(cursor).await {
365 Ok(response) => {
366 let (inner, next_cursor) = response.into_parts();
367 let orders = stream::iter(inner.orders).map(Ok);
368 Some((Either::Left(orders), next_cursor.map(Some)))
369 }
370 Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
371 }
372 })
373 .flatten()
374 }
375
376 /// Update an existing order in the transaction cache.
377 ///
378 /// This method sends a PUT request to update an order that already exists
379 /// in the cache. The order is identified by its ID and the entire order
380 /// content is replaced with the provided data.
381 ///
382 /// # Arguments
383 ///
384 /// * `order_id` - The ID of the order to update (hex-encoded B256).
385 /// * `order` - The updated [`SignedOrder`] to store.
386 ///
387 /// # Returns
388 ///
389 /// An [`OrderResponse`] containing the order's ID on success.
390 ///
391 /// # Errors
392 ///
393 /// Returns [`TxCacheError::NotFound`] if the order does not exist.
394 /// Returns an error if the request fails or the response cannot be parsed.
395 ///
396 /// # Example
397 ///
398 /// ```no_run
399 /// # async fn example() -> Result<(), signet_tx_cache::TxCacheError> {
400 /// use signet_tx_cache::TxCache;
401 /// use signet_types::SignedOrder;
402 ///
403 /// let cache = TxCache::parmigiana();
404 /// let order_id = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
405 /// # let order: SignedOrder = todo!();
406 ///
407 /// let response = cache.update_order(order_id, order).await?;
408 /// println!("Updated order: {:?}", response.id);
409 /// # Ok(())
410 /// # }
411 /// ```
412 ///
413 /// [`TxCacheError::NotFound`]: crate::error::TxCacheError::NotFound
414 #[instrument(skip_all)]
415 pub async fn update_order(&self, order_id: &str, order: SignedOrder) -> Result<OrderResponse> {
416 let path = format!("{ORDERS}/{order_id}");
417 self.put_inner(&path, order).await
418 }
419}
420
421#[cfg(feature = "sse")]
422use eventsource_stream::{Event, EventStreamError, Eventsource};
423#[cfg(feature = "sse")]
424use tracing::debug;
425
426#[cfg(feature = "sse")]
427impl TxCache {
428 const TRANSACTIONS_FEED: &str = "transactions/feed";
429 const ORDERS_FEED: &str = "orders/feed";
430
431 fn decode_sse_events<T, S>(events: S) -> impl Stream<Item = Result<T>> + Send
432 where
433 T: DeserializeOwned + Send + 'static,
434 S: Stream<Item = std::result::Result<Event, EventStreamError<reqwest::Error>>> + Send,
435 {
436 events
437 .map(|result| match result {
438 Ok(event) => serde_json::from_str::<T>(&event.data).map_err(Into::into),
439 Err(e) => Err(e.into()),
440 })
441 .scan(false, |errored, result| {
442 if *errored {
443 return std::future::ready(None);
444 }
445 *errored = result.is_err();
446 std::future::ready(Some(result))
447 })
448 }
449
450 /// Connect to an SSE feed endpoint, returning a stream that
451 /// deserializes each event's JSON data into `T`. The stream
452 /// terminates on the first error, which is yielded as the final
453 /// item.
454 ///
455 /// If `secret` is provided, it is sent as a bearer token in the
456 /// request.
457 #[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
458 #[instrument(skip_all)]
459 pub async fn subscribe_inner<T: DeserializeOwned + Send + 'static>(
460 &self,
461 feed: &'static str,
462 secret: Option<&str>,
463 ) -> Result<impl Stream<Item = Result<T>> + Send> {
464 let url = self
465 .url
466 .join(feed)
467 .inspect_err(|e| warn!(%e, "Failed to join URL for SSE subscription"))?;
468
469 let req = match secret {
470 Some(s) => self.client.get(url).bearer_auth(s),
471 None => self.client.get(url),
472 };
473 let es = req.send().await?.error_for_status()?.bytes_stream().eventsource();
474
475 debug!(feed, "SSE subscription established");
476
477 Ok(Self::decode_sse_events(es))
478 }
479
480 /// Subscribe to real-time transaction events via SSE.
481 ///
482 /// Connects to the `/transactions/feed` endpoint and returns a
483 /// [`Stream`] that yields each [`TxEnvelope`] as it arrives from
484 /// the server. Unlike [`stream_transactions`], which paginates
485 /// over existing data, this receives new transactions in
486 /// real-time.
487 ///
488 /// The stream terminates on the first error, which is yielded as
489 /// the final item.
490 ///
491 /// [`stream_transactions`]: TxCache::stream_transactions
492 #[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
493 #[instrument(skip_all)]
494 pub async fn subscribe_transactions(
495 &self,
496 ) -> Result<impl Stream<Item = Result<TxEnvelope>> + Send> {
497 self.subscribe_inner(Self::TRANSACTIONS_FEED, None).await
498 }
499
500 /// Subscribe to real-time order events via SSE.
501 ///
502 /// Connects to the `/orders/feed` endpoint and returns a
503 /// [`Stream`] that yields each [`SignedOrder`] as it arrives from
504 /// the server. Unlike [`stream_orders`], which paginates over
505 /// existing data, this receives new orders in real-time.
506 ///
507 /// The stream terminates on the first error, which is yielded as
508 /// the final item.
509 ///
510 /// [`stream_orders`]: TxCache::stream_orders
511 #[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
512 #[instrument(skip_all)]
513 pub async fn subscribe_orders(&self) -> Result<impl Stream<Item = Result<SignedOrder>> + Send> {
514 self.subscribe_inner(Self::ORDERS_FEED, None).await
515 }
516}
517
518#[cfg(all(test, feature = "sse"))]
519mod tests {
520 use super::TxCache;
521 use crate::error::TxCacheError;
522 use futures_util::{stream, StreamExt};
523
524 type SseError = eventsource_stream::EventStreamError<reqwest::Error>;
525
526 fn event(data: &str) -> eventsource_stream::Event {
527 eventsource_stream::Event { data: data.to_owned(), ..Default::default() }
528 }
529
530 fn utf8_sse_error() -> SseError {
531 eventsource_stream::EventStreamError::Utf8(
532 String::from_utf8(vec![0xff]).expect_err("invalid UTF-8 should error"),
533 )
534 }
535
536 #[tokio::test]
537 async fn decode_sse_events_deserializes_json_events() {
538 let events = stream::iter([Ok::<_, SseError>(event(r#"{"ok":true}"#))]);
539
540 let decoded: Vec<_> =
541 TxCache::decode_sse_events::<serde_json::Value, _>(events).collect().await;
542
543 assert_eq!(decoded.len(), 1);
544 assert_eq!(decoded[0].as_ref().unwrap()["ok"], true);
545 }
546
547 #[tokio::test]
548 async fn decode_sse_events_maps_invalid_json_to_deserialization_error() {
549 let events = stream::iter([Ok::<_, SseError>(event("not-json"))]);
550
551 let mut decoded = TxCache::decode_sse_events::<serde_json::Value, _>(events);
552
553 match decoded.next().await.expect("stream should yield an error") {
554 Err(TxCacheError::Deserialization(_)) => {}
555 other => panic!("expected deserialization error, got {other:?}"),
556 }
557 assert!(decoded.next().await.is_none(), "stream should terminate after the error");
558 }
559
560 #[tokio::test]
561 async fn decode_sse_events_maps_sse_errors() {
562 let events = stream::iter([Err::<eventsource_stream::Event, _>(utf8_sse_error())]);
563
564 let mut decoded = TxCache::decode_sse_events::<serde_json::Value, _>(events);
565
566 match decoded.next().await.expect("stream should yield an error") {
567 Err(TxCacheError::Sse(eventsource_stream::EventStreamError::Utf8(_))) => {}
568 other => panic!("expected SSE error, got {other:?}"),
569 }
570 assert!(decoded.next().await.is_none(), "stream should terminate after the error");
571 }
572
573 #[tokio::test]
574 async fn decode_sse_events_stops_after_first_error() {
575 let events = stream::iter([
576 Ok::<_, SseError>(event(r#"{"idx":1}"#)),
577 Err(utf8_sse_error()),
578 Ok(event(r#"{"idx":2}"#)),
579 ]);
580
581 let decoded: Vec<_> =
582 TxCache::decode_sse_events::<serde_json::Value, _>(events).collect().await;
583
584 assert_eq!(decoded.len(), 2);
585 assert_eq!(decoded[0].as_ref().unwrap()["idx"], 1);
586 match &decoded[1] {
587 Err(TxCacheError::Sse(eventsource_stream::EventStreamError::Utf8(_))) => {}
588 other => panic!("expected final SSE error, got {other:?}"),
589 }
590 }
591}