Skip to main content

r402_http/server/
facilitator.rs

1//! A [`r402::facilitator::Facilitator`] implementation that interacts with a _remote_ x402 Facilitator over HTTP.
2//!
3//! This [`FacilitatorClient`] handles the `/verify`, `/settle`, and `/supported` endpoints of a remote facilitator,
4//! and implements the [`r402::facilitator::Facilitator`] trait for compatibility
5//! with x402-based middleware and logic.
6//!
7//! ## Features
8//!
9//! - Uses `reqwest` for async HTTP requests
10//! - Supports optional timeout and headers
11//! - Integrates with `tracing` if the `telemetry` feature is enabled
12//!
13//! ## Error Handling
14//!
15//! Custom error types capture detailed failure contexts, including
16//! - URL construction
17//! - HTTP transport failures
18//! - JSON deserialization errors
19//! - Unexpected HTTP status responses
20//!
21
22use std::fmt::Display;
23use std::sync::Arc;
24use std::time::Duration;
25
26use http::{HeaderMap, StatusCode};
27use r402::facilitator::{BoxFuture, Facilitator, FacilitatorError};
28use r402::proto::{
29    SettleRequest, SettleResponse, SupportedResponse, VerifyRequest, VerifyResponse,
30};
31use reqwest::Client;
32use tokio::sync::RwLock;
33#[cfg(feature = "telemetry")]
34use tracing::{Instrument, Span, instrument};
35use url::Url;
36
37/// TTL cache for [`SupportedResponse`].
38#[derive(Clone, Debug)]
39struct SupportedCacheState {
40    /// The cached response
41    response: SupportedResponse,
42    /// When the cache expires
43    expires_at: std::time::Instant,
44}
45
46/// An encapsulated TTL cache for the `/supported` endpoint response.
47///
48/// Clones share the same cache state via `Arc`, so cached responses are
49/// visible across all clones (e.g. when the middleware clones the
50/// facilitator per-request).
51#[derive(Debug, Clone)]
52pub struct SupportedCache {
53    /// TTL for the cache
54    ttl: Duration,
55    /// Shared cache state (`Arc<RwLock>` so clones hit the same cache)
56    state: Arc<RwLock<Option<SupportedCacheState>>>,
57}
58
59impl SupportedCache {
60    /// Creates a new cache with the given TTL.
61    #[must_use]
62    pub fn new(ttl: Duration) -> Self {
63        Self {
64            ttl,
65            state: Arc::new(RwLock::new(None)),
66        }
67    }
68
69    /// Returns the cached response if valid, None otherwise.
70    pub async fn get(&self) -> Option<SupportedResponse> {
71        let guard = self.state.read().await;
72        let cache = guard.as_ref()?;
73        if std::time::Instant::now() < cache.expires_at {
74            Some(cache.response.clone())
75        } else {
76            None
77        }
78    }
79
80    /// Stores a response in the cache with the configured TTL.
81    pub async fn set(&self, response: SupportedResponse) {
82        let mut guard = self.state.write().await;
83        *guard = Some(SupportedCacheState {
84            response,
85            expires_at: std::time::Instant::now() + self.ttl,
86        });
87    }
88
89    /// Clears the cache.
90    pub async fn clear(&self) {
91        let mut guard = self.state.write().await;
92        *guard = None;
93    }
94}
95
96/// A client for communicating with a remote x402 facilitator.
97///
98/// Handles `/verify`, `/settle`, and `/supported` endpoints via JSON HTTP.
99#[derive(Clone, Debug)]
100pub struct FacilitatorClient {
101    /// Base URL of the facilitator (e.g. `https://facilitator.example/`)
102    base_url: Url,
103    /// Full URL to `POST /verify` requests
104    verify_url: Url,
105    /// Full URL to `POST /settle` requests
106    settle_url: Url,
107    /// Full URL to `GET /supported` requests
108    supported_url: Url,
109    /// Shared Reqwest HTTP client
110    client: Client,
111    /// Optional custom headers sent with each request
112    headers: HeaderMap,
113    /// Optional request timeout
114    timeout: Option<Duration>,
115    /// Cache for the supported endpoint response
116    supported_cache: SupportedCache,
117}
118
119impl Facilitator for FacilitatorClient {
120    fn verify(
121        &self,
122        request: VerifyRequest,
123    ) -> BoxFuture<'_, Result<VerifyResponse, FacilitatorError>> {
124        Box::pin(async move {
125            #[cfg(feature = "telemetry")]
126            let result = with_span(
127                Self::verify(self, &request),
128                tracing::info_span!("x402.facilitator_client.verify", timeout = ?self.timeout),
129            )
130            .await;
131            #[cfg(not(feature = "telemetry"))]
132            let result = Self::verify(self, &request).await;
133            result.map_err(|e| FacilitatorError::Other(Box::new(e)))
134        })
135    }
136
137    fn settle(
138        &self,
139        request: SettleRequest,
140    ) -> BoxFuture<'_, Result<SettleResponse, FacilitatorError>> {
141        Box::pin(async move {
142            #[cfg(feature = "telemetry")]
143            let result = with_span(
144                Self::settle(self, &request),
145                tracing::info_span!("x402.facilitator_client.settle", timeout = ?self.timeout),
146            )
147            .await;
148            #[cfg(not(feature = "telemetry"))]
149            let result = Self::settle(self, &request).await;
150            result.map_err(|e| FacilitatorError::Other(Box::new(e)))
151        })
152    }
153
154    fn supported(&self) -> BoxFuture<'_, Result<SupportedResponse, FacilitatorError>> {
155        Box::pin(async move {
156            Self::supported(self)
157                .await
158                .map_err(|e| FacilitatorError::Other(Box::new(e)))
159        })
160    }
161}
162
163/// Errors that can occur while interacting with a remote facilitator.
164#[derive(Debug, thiserror::Error)]
165pub enum FacilitatorClientError {
166    /// URL parse error.
167    #[error("URL parse error: {context}: {source}")]
168    UrlParse {
169        /// Human-readable context.
170        context: &'static str,
171        /// The underlying parse error.
172        #[source]
173        source: url::ParseError,
174    },
175    /// HTTP transport error.
176    #[error("HTTP error: {context}: {source}")]
177    Http {
178        /// Human-readable context.
179        context: &'static str,
180        /// The underlying reqwest error.
181        #[source]
182        source: reqwest::Error,
183    },
184    /// JSON deserialization error.
185    #[error("Failed to deserialize JSON: {context}: {source}")]
186    JsonDeserialization {
187        /// Human-readable context.
188        context: &'static str,
189        /// The underlying reqwest error.
190        #[source]
191        source: reqwest::Error,
192    },
193    /// Unexpected HTTP status code.
194    #[error("Unexpected HTTP status {status}: {context}: {body}")]
195    HttpStatus {
196        /// Human-readable context.
197        context: &'static str,
198        /// The HTTP status code.
199        status: StatusCode,
200        /// The response body.
201        body: String,
202    },
203    /// Failed to read response body.
204    #[error("Failed to read response body as text: {context}: {source}")]
205    ResponseBodyRead {
206        /// Human-readable context.
207        context: &'static str,
208        /// The underlying reqwest error.
209        #[source]
210        source: reqwest::Error,
211    },
212}
213
214impl FacilitatorClient {
215    /// Default TTL for caching the supported endpoint response (10 minutes).
216    pub const DEFAULT_SUPPORTED_CACHE_TTL: Duration = Duration::from_mins(10);
217
218    /// Returns the base URL used by this client.
219    #[must_use] 
220    pub const fn base_url(&self) -> &Url {
221        &self.base_url
222    }
223
224    /// Returns the computed `./verify` URL relative to [`FacilitatorClient::base_url`].
225    #[must_use] 
226    pub const fn verify_url(&self) -> &Url {
227        &self.verify_url
228    }
229
230    /// Returns the computed `./settle` URL relative to [`FacilitatorClient::base_url`].
231    #[must_use] 
232    pub const fn settle_url(&self) -> &Url {
233        &self.settle_url
234    }
235
236    /// Returns the computed `./supported` URL relative to [`FacilitatorClient::base_url`].
237    #[must_use] 
238    pub const fn supported_url(&self) -> &Url {
239        &self.supported_url
240    }
241
242    /// Returns any custom headers configured on the client.
243    #[must_use] 
244    pub const fn headers(&self) -> &HeaderMap {
245        &self.headers
246    }
247
248    /// Returns the configured timeout, if any.
249    #[must_use] 
250    pub const fn timeout(&self) -> &Option<Duration> {
251        &self.timeout
252    }
253
254    /// Returns a reference to the supported cache.
255    #[must_use] 
256    pub const fn supported_cache(&self) -> &SupportedCache {
257        &self.supported_cache
258    }
259
260    /// Constructs a new [`FacilitatorClient`] from a base URL.
261    ///
262    /// This sets up `./verify`, `./settle`, and `./supported` endpoint URLs relative to the base.
263    ///
264    /// # Errors
265    ///
266    /// Returns [`FacilitatorClientError`] if URL construction fails.
267    pub fn try_new(base_url: Url) -> Result<Self, FacilitatorClientError> {
268        let client = Client::new();
269        let verify_url =
270            base_url
271                .join("./verify")
272                .map_err(|e| FacilitatorClientError::UrlParse {
273                    context: "Failed to construct ./verify URL",
274                    source: e,
275                })?;
276        let settle_url =
277            base_url
278                .join("./settle")
279                .map_err(|e| FacilitatorClientError::UrlParse {
280                    context: "Failed to construct ./settle URL",
281                    source: e,
282                })?;
283        let supported_url =
284            base_url
285                .join("./supported")
286                .map_err(|e| FacilitatorClientError::UrlParse {
287                    context: "Failed to construct ./supported URL",
288                    source: e,
289                })?;
290        Ok(Self {
291            client,
292            base_url,
293            verify_url,
294            settle_url,
295            supported_url,
296            headers: HeaderMap::new(),
297            timeout: None,
298            supported_cache: SupportedCache::new(Self::DEFAULT_SUPPORTED_CACHE_TTL),
299        })
300    }
301
302    /// Attaches custom headers to all future requests.
303    #[must_use]
304    pub fn with_headers(mut self, headers: HeaderMap) -> Self {
305        self.headers = headers;
306        self
307    }
308
309    /// Sets a timeout for all future requests.
310    #[must_use]
311    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
312        self.timeout = Some(timeout);
313        self
314    }
315
316    /// Sets the TTL for caching the supported endpoint response.
317    ///
318    /// Default is 10 minutes. Use [`Self::without_supported_cache()`] to disable caching.
319    #[must_use]
320    pub fn with_supported_cache_ttl(mut self, ttl: Duration) -> Self {
321        self.supported_cache = SupportedCache::new(ttl);
322        self
323    }
324
325    /// Disables caching for the supported endpoint.
326    #[must_use]
327    pub fn without_supported_cache(self) -> Self {
328        self.with_supported_cache_ttl(Duration::ZERO)
329    }
330
331    /// Sends a `POST /verify` request to the facilitator.
332    ///
333    /// # Errors
334    ///
335    /// Returns [`FacilitatorClientError`] if the HTTP request fails.
336    pub async fn verify(
337        &self,
338        request: &VerifyRequest,
339    ) -> Result<VerifyResponse, FacilitatorClientError> {
340        self.post_json(&self.verify_url, "POST /verify", request)
341            .await
342    }
343
344    /// Sends a `POST /settle` request to the facilitator.
345    ///
346    /// # Errors
347    ///
348    /// Returns [`FacilitatorClientError`] if the HTTP request fails.
349    pub async fn settle(
350        &self,
351        request: &SettleRequest,
352    ) -> Result<SettleResponse, FacilitatorClientError> {
353        self.post_json(&self.settle_url, "POST /settle", request)
354            .await
355    }
356
357    /// Sends a `GET /supported` request to the facilitator.
358    /// This is the inner method that always makes an HTTP request.
359    #[cfg_attr(
360        feature = "telemetry",
361        instrument(name = "x402.facilitator_client.supported", skip_all, err)
362    )]
363    async fn supported_inner(&self) -> Result<SupportedResponse, FacilitatorClientError> {
364        self.get_json(&self.supported_url, "GET /supported").await
365    }
366
367    /// Sends a `GET /supported` request to the facilitator.
368    /// Results are cached with a configurable TTL (default: 10 minutes).
369    /// Use `supported_inner()` to bypass the cache.
370    ///
371    /// # Errors
372    ///
373    /// Returns [`FacilitatorClientError`] if the HTTP request fails.
374    pub async fn supported(&self) -> Result<SupportedResponse, FacilitatorClientError> {
375        // Try to get from cache
376        if let Some(response) = self.supported_cache.get().await {
377            return Ok(response);
378        }
379
380        // Cache miss - fetch and cache
381        #[cfg(feature = "telemetry")]
382        tracing::info!("x402.facilitator_client.supported_cache_miss");
383
384        let response = self.supported_inner().await?;
385        self.supported_cache.set(response.clone()).await;
386
387        Ok(response)
388    }
389
390    /// Generic POST helper that handles JSON serialization, error mapping,
391    /// timeout application, and telemetry integration.
392    ///
393    /// `context` is a human-readable identifier used in tracing and error messages (e.g. `"POST /verify"`).
394    #[allow(clippy::needless_pass_by_value)]
395    async fn post_json<T, R>(
396        &self,
397        url: &Url,
398        context: &'static str,
399        payload: &T,
400    ) -> Result<R, FacilitatorClientError>
401    where
402        T: serde::Serialize + Sync + ?Sized,
403        R: serde::de::DeserializeOwned,
404    {
405        let req = self.client.post(url.clone()).json(payload);
406        self.send_and_parse(req, context).await
407    }
408
409    /// Generic GET helper that handles error mapping, timeout application,
410    /// and telemetry integration.
411    ///
412    /// `context` is a human-readable identifier used in tracing and error messages (e.g. `"GET /supported"`).
413    async fn get_json<R>(
414        &self,
415        url: &Url,
416        context: &'static str,
417    ) -> Result<R, FacilitatorClientError>
418    where
419        R: serde::de::DeserializeOwned,
420    {
421        let req = self.client.get(url.clone());
422        self.send_and_parse(req, context).await
423    }
424
425    /// Applies headers, timeout, sends the request, and parses the JSON response.
426    async fn send_and_parse<R>(
427        &self,
428        mut req: reqwest::RequestBuilder,
429        context: &'static str,
430    ) -> Result<R, FacilitatorClientError>
431    where
432        R: serde::de::DeserializeOwned,
433    {
434        for (key, value) in &self.headers {
435            req = req.header(key, value);
436        }
437        if let Some(timeout) = self.timeout {
438            req = req.timeout(timeout);
439        }
440        let http_response = req
441            .send()
442            .await
443            .map_err(|e| FacilitatorClientError::Http { context, source: e })?;
444
445        let result = if http_response.status() == StatusCode::OK {
446            http_response
447                .json::<R>()
448                .await
449                .map_err(|e| FacilitatorClientError::JsonDeserialization { context, source: e })
450        } else {
451            let status = http_response.status();
452            let body = http_response
453                .text()
454                .await
455                .map_err(|e| FacilitatorClientError::ResponseBodyRead { context, source: e })?;
456            Err(FacilitatorClientError::HttpStatus {
457                context,
458                status,
459                body,
460            })
461        };
462
463        record_result_on_span(&result);
464
465        result
466    }
467}
468
469/// Converts a string URL into a `FacilitatorClient`, parsing the URL and calling `try_new`.
470impl TryFrom<&str> for FacilitatorClient {
471    type Error = FacilitatorClientError;
472
473    fn try_from(value: &str) -> Result<Self, Self::Error> {
474        // Normalize: strip trailing slashes and add a single trailing slash
475        let mut normalized = value.trim_end_matches('/').to_string();
476        normalized.push('/');
477        let url = Url::parse(&normalized).map_err(|e| FacilitatorClientError::UrlParse {
478            context: "Failed to parse base url",
479            source: e,
480        })?;
481        Self::try_new(url)
482    }
483}
484
485/// Converts a String URL into a `FacilitatorClient`.
486impl TryFrom<String> for FacilitatorClient {
487    type Error = FacilitatorClientError;
488
489    fn try_from(value: String) -> Result<Self, Self::Error> {
490        Self::try_from(value.as_str())
491    }
492}
493
494/// Records the outcome of a request on a tracing span, including status and errors.
495#[cfg(feature = "telemetry")]
496fn record_result_on_span<R, E: Display>(result: &Result<R, E>) {
497    let span = Span::current();
498    match result {
499        Ok(_) => {
500            span.record("otel.status_code", "OK");
501        }
502        Err(err) => {
503            span.record("otel.status_code", "ERROR");
504            span.record("error.message", tracing::field::display(err));
505            tracing::event!(tracing::Level::ERROR, error = %err, "Request to facilitator failed");
506        }
507    }
508}
509
510/// Records the outcome of a request on a tracing span, including status and errors.
511/// Noop if telemetry feature is off.
512#[cfg(not(feature = "telemetry"))]
513const fn record_result_on_span<R, E: Display>(_result: &Result<R, E>) {}
514
515/// Instruments a future with a given tracing span.
516#[cfg(feature = "telemetry")]
517fn with_span<F: Future>(fut: F, span: Span) -> impl Future<Output = F::Output> {
518    fut.instrument(span)
519}
520
521#[cfg(test)]
522mod tests {
523    use std::collections::HashMap;
524
525    use r402::proto::SupportedPaymentKind;
526    use wiremock::matchers::{method, path};
527    use wiremock::{Mock, MockServer, ResponseTemplate};
528
529    use super::*;
530
531    fn create_test_supported_response() -> SupportedResponse {
532        SupportedResponse {
533            kinds: vec![SupportedPaymentKind {
534                x402_version: 1,
535                scheme: "eip155-exact".to_string(),
536                network: "1".to_string(),
537                extra: None,
538            }],
539            extensions: vec![],
540            signers: HashMap::new(),
541        }
542    }
543
544    #[tokio::test]
545    async fn test_supported_cache_caches_response() {
546        let mock_server = MockServer::start().await;
547        let test_response = create_test_supported_response();
548
549        // Mock the supported endpoint
550        Mock::given(method("GET"))
551            .and(path("/supported"))
552            .respond_with(ResponseTemplate::new(200).set_body_json(&test_response))
553            .mount(&mock_server)
554            .await;
555
556        let client = FacilitatorClient::try_new(mock_server.uri().parse::<Url>().unwrap()).unwrap();
557
558        // First call should hit the network
559        let result1 = client.supported().await.unwrap();
560        assert_eq!(result1.kinds.len(), 1);
561
562        // Second call should use cache (same mock call count)
563        let result2 = client.supported().await.unwrap();
564        assert_eq!(result2.kinds.len(), 1);
565
566        // Both results should be equal
567        assert_eq!(result1.kinds[0].scheme, result2.kinds[0].scheme);
568    }
569
570    #[tokio::test]
571    async fn test_supported_cache_with_custom_ttl() {
572        let mock_server = MockServer::start().await;
573        let test_response = create_test_supported_response();
574
575        // Mock the supported endpoint
576        Mock::given(method("GET"))
577            .and(path("/supported"))
578            .respond_with(ResponseTemplate::new(200).set_body_json(&test_response))
579            .mount(&mock_server)
580            .await;
581
582        // Create client with 1ms TTL (essentially no caching)
583        let client = FacilitatorClient::try_new(mock_server.uri().parse::<Url>().unwrap())
584            .unwrap()
585            .with_supported_cache_ttl(Duration::from_millis(1));
586
587        // First call
588        let result1 = client.supported().await.unwrap();
589        assert_eq!(result1.kinds.len(), 1);
590
591        // Wait for cache to expire
592        tokio::time::sleep(Duration::from_millis(10)).await;
593
594        // Second call should hit the network again due to expired cache
595        let result2 = client.supported().await.unwrap();
596        assert_eq!(result2.kinds.len(), 1);
597    }
598
599    #[tokio::test]
600    async fn test_supported_cache_disabled() {
601        let mock_server = MockServer::start().await;
602        let test_response = create_test_supported_response();
603
604        // Mock the supported endpoint
605        Mock::given(method("GET"))
606            .and(path("/supported"))
607            .respond_with(ResponseTemplate::new(200).set_body_json(&test_response))
608            .mount(&mock_server)
609            .await;
610
611        // Create client with caching disabled
612        let client = FacilitatorClient::try_new(mock_server.uri().parse::<Url>().unwrap())
613            .unwrap()
614            .without_supported_cache();
615
616        // Each call should hit the network
617        let result1 = client.supported().await.unwrap();
618        let result2 = client.supported().await.unwrap();
619
620        assert_eq!(result1.kinds.len(), 1);
621        assert_eq!(result2.kinds.len(), 1);
622    }
623
624    #[tokio::test]
625    async fn test_supported_cache_shared_across_clones() {
626        let mock_server = MockServer::start().await;
627        let test_response = create_test_supported_response();
628
629        // Mock the supported endpoint — expect exactly 1 request
630        Mock::given(method("GET"))
631            .and(path("/supported"))
632            .respond_with(ResponseTemplate::new(200).set_body_json(&test_response))
633            .expect(1)
634            .mount(&mock_server)
635            .await;
636
637        let client = FacilitatorClient::try_new(mock_server.uri().parse::<Url>().unwrap()).unwrap();
638
639        // Clone the client — clones share the same cache
640        let client2 = client.clone();
641
642        // Populate cache on first client
643        let result1 = client.supported().await.unwrap();
644        assert_eq!(result1.kinds.len(), 1);
645
646        // Clone should hit the shared cache (no extra HTTP request)
647        let result2 = client2.supported().await.unwrap();
648        assert_eq!(result2.kinds.len(), 1);
649        assert_eq!(result1.kinds[0].scheme, result2.kinds[0].scheme);
650    }
651
652    #[tokio::test]
653    async fn test_supported_inner_bypasses_cache() {
654        let mock_server = MockServer::start().await;
655        let test_response = create_test_supported_response();
656
657        // Mock the supported endpoint
658        Mock::given(method("GET"))
659            .and(path("/supported"))
660            .respond_with(ResponseTemplate::new(200).set_body_json(&test_response))
661            .mount(&mock_server)
662            .await;
663
664        let client = FacilitatorClient::try_new(mock_server.uri().parse::<Url>().unwrap()).unwrap();
665
666        // Populate cache
667        let _ = client.supported().await.unwrap();
668
669        // supported_inner() should always make HTTP request, bypassing cache
670        let result = client.supported_inner().await.unwrap();
671        assert_eq!(result.kinds.len(), 1);
672    }
673}