Skip to main content

r402_http/server/
facilitator.rs

1//! A [`r402::facilitator::Facilitator`] implementation that interacts with a _remote_ x402 Facilitator over HTTP.
2//!
3//! This [`FacilitatorClient`] handles the `/verify`, `/settle`, and `/supported` endpoints of a remote facilitator,
4//! and implements the [`r402::facilitator::Facilitator`] trait for compatibility
5//! with x402-based middleware and logic.
6//!
7//! ## Features
8//!
9//! - Uses `reqwest` for async HTTP requests
10//! - Supports optional timeout and headers
11//! - Integrates with `tracing` if the `telemetry` feature is enabled
12//!
13//! ## Error Handling
14//!
15//! Custom error types capture detailed failure contexts, including:
16//! - URL construction
17//! - HTTP transport failures
18//! - JSON deserialization errors
19//! - Unexpected HTTP status responses
20//!
21
22use std::fmt::Display;
23use std::sync::Arc;
24use std::time::Duration;
25
26use http::{HeaderMap, StatusCode};
27use r402::facilitator::{BoxFuture, Facilitator, FacilitatorError};
28use r402::proto::{
29    SettleRequest, SettleResponse, SupportedResponse, VerifyRequest, VerifyResponse,
30};
31use reqwest::Client;
32use tokio::sync::RwLock;
33#[cfg(feature = "telemetry")]
34use tracing::{Instrument, Span, instrument};
35use url::Url;
36
37/// TTL cache for [`SupportedResponse`].
38#[derive(Clone, Debug)]
39struct SupportedCacheState {
40    /// The cached response
41    response: SupportedResponse,
42    /// When the cache expires
43    expires_at: std::time::Instant,
44}
45
46/// An encapsulated TTL cache for the `/supported` endpoint response.
47///
48/// Clones share the same cache state via `Arc`, so cached responses are
49/// visible across all clones (e.g. when the middleware clones the
50/// facilitator per-request).
51#[derive(Debug, Clone)]
52pub struct SupportedCache {
53    /// TTL for the cache
54    ttl: Duration,
55    /// Shared cache state (`Arc<RwLock>` so clones hit the same cache)
56    state: Arc<RwLock<Option<SupportedCacheState>>>,
57}
58
59impl SupportedCache {
60    /// Creates a new cache with the given TTL.
61    #[must_use]
62    pub fn new(ttl: Duration) -> Self {
63        Self {
64            ttl,
65            state: Arc::new(RwLock::new(None)),
66        }
67    }
68
69    /// Returns the cached response if valid, None otherwise.
70    #[allow(
71        clippy::significant_drop_tightening,
72        reason = "read guard scope matches data access"
73    )]
74    pub async fn get(&self) -> Option<SupportedResponse> {
75        let guard = self.state.read().await;
76        let cache = guard.as_ref()?;
77        if std::time::Instant::now() < cache.expires_at {
78            Some(cache.response.clone())
79        } else {
80            None
81        }
82    }
83
84    /// Stores a response in the cache with the configured TTL.
85    pub async fn set(&self, response: SupportedResponse) {
86        let mut guard = self.state.write().await;
87        *guard = Some(SupportedCacheState {
88            response,
89            expires_at: std::time::Instant::now() + self.ttl,
90        });
91    }
92
93    /// Clears the cache.
94    pub async fn clear(&self) {
95        let mut guard = self.state.write().await;
96        *guard = None;
97    }
98}
99
100/// A client for communicating with a remote x402 facilitator.
101///
102/// Handles `/verify`, `/settle`, and `/supported` endpoints via JSON HTTP.
103#[derive(Clone, Debug)]
104pub struct FacilitatorClient {
105    /// Base URL of the facilitator (e.g. `https://facilitator.example/`)
106    base_url: Url,
107    /// Full URL to `POST /verify` requests
108    verify_url: Url,
109    /// Full URL to `POST /settle` requests
110    settle_url: Url,
111    /// Full URL to `GET /supported` requests
112    supported_url: Url,
113    /// Shared Reqwest HTTP client
114    client: Client,
115    /// Optional custom headers sent with each request
116    headers: HeaderMap,
117    /// Optional request timeout
118    timeout: Option<Duration>,
119    /// Cache for the supported endpoint response
120    supported_cache: SupportedCache,
121}
122
123/// Errors that can occur while interacting with a remote facilitator.
124#[derive(Debug, thiserror::Error)]
125pub enum FacilitatorClientError {
126    /// URL parse error.
127    #[error("URL parse error: {context}: {source}")]
128    UrlParse {
129        /// Human-readable context.
130        context: &'static str,
131        /// The underlying parse error.
132        #[source]
133        source: url::ParseError,
134    },
135    /// HTTP transport error.
136    #[error("HTTP error: {context}: {source}")]
137    Http {
138        /// Human-readable context.
139        context: &'static str,
140        /// The underlying reqwest error.
141        #[source]
142        source: reqwest::Error,
143    },
144    /// JSON deserialization error.
145    #[error("Failed to deserialize JSON: {context}: {source}")]
146    JsonDeserialization {
147        /// Human-readable context.
148        context: &'static str,
149        /// The underlying reqwest error.
150        #[source]
151        source: reqwest::Error,
152    },
153    /// Unexpected HTTP status code.
154    #[error("Unexpected HTTP status {status}: {context}: {body}")]
155    HttpStatus {
156        /// Human-readable context.
157        context: &'static str,
158        /// The HTTP status code.
159        status: StatusCode,
160        /// The response body.
161        body: String,
162    },
163    /// Failed to read response body.
164    #[error("Failed to read response body as text: {context}: {source}")]
165    ResponseBodyRead {
166        /// Human-readable context.
167        context: &'static str,
168        /// The underlying reqwest error.
169        #[source]
170        source: reqwest::Error,
171    },
172}
173
174impl FacilitatorClient {
175    /// Default TTL for caching the supported endpoint response (10 minutes).
176    pub const DEFAULT_SUPPORTED_CACHE_TTL: Duration = Duration::from_mins(10);
177
178    /// Returns the base URL used by this client.
179    #[must_use]
180    pub const fn base_url(&self) -> &Url {
181        &self.base_url
182    }
183
184    /// Returns the computed `./verify` URL relative to [`FacilitatorClient::base_url`].
185    #[must_use]
186    pub const fn verify_url(&self) -> &Url {
187        &self.verify_url
188    }
189
190    /// Returns the computed `./settle` URL relative to [`FacilitatorClient::base_url`].
191    #[must_use]
192    pub const fn settle_url(&self) -> &Url {
193        &self.settle_url
194    }
195
196    /// Returns the computed `./supported` URL relative to [`FacilitatorClient::base_url`].
197    #[must_use]
198    pub const fn supported_url(&self) -> &Url {
199        &self.supported_url
200    }
201
202    /// Returns any custom headers configured on the client.
203    #[must_use]
204    pub const fn headers(&self) -> &HeaderMap {
205        &self.headers
206    }
207
208    /// Returns the configured timeout, if any.
209    #[must_use]
210    pub const fn timeout(&self) -> Option<&Duration> {
211        self.timeout.as_ref()
212    }
213
214    /// Returns a reference to the supported cache.
215    #[must_use]
216    pub const fn supported_cache(&self) -> &SupportedCache {
217        &self.supported_cache
218    }
219
220    /// Constructs a new [`FacilitatorClient`] from a base URL.
221    ///
222    /// This sets up `./verify`, `./settle`, and `./supported` endpoint URLs relative to the base.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`FacilitatorClientError`] if URL construction fails.
227    pub fn try_new(base_url: Url) -> Result<Self, FacilitatorClientError> {
228        let client = Client::new();
229        let verify_url =
230            base_url
231                .join("./verify")
232                .map_err(|e| FacilitatorClientError::UrlParse {
233                    context: "Failed to construct ./verify URL",
234                    source: e,
235                })?;
236        let settle_url =
237            base_url
238                .join("./settle")
239                .map_err(|e| FacilitatorClientError::UrlParse {
240                    context: "Failed to construct ./settle URL",
241                    source: e,
242                })?;
243        let supported_url =
244            base_url
245                .join("./supported")
246                .map_err(|e| FacilitatorClientError::UrlParse {
247                    context: "Failed to construct ./supported URL",
248                    source: e,
249                })?;
250        Ok(Self {
251            client,
252            base_url,
253            verify_url,
254            settle_url,
255            supported_url,
256            headers: HeaderMap::new(),
257            timeout: None,
258            supported_cache: SupportedCache::new(Self::DEFAULT_SUPPORTED_CACHE_TTL),
259        })
260    }
261
262    /// Attaches custom headers to all future requests.
263    #[must_use]
264    pub fn with_headers(mut self, headers: HeaderMap) -> Self {
265        self.headers = headers;
266        self
267    }
268
269    /// Sets a timeout for all future requests.
270    #[must_use]
271    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
272        self.timeout = Some(timeout);
273        self
274    }
275
276    /// Sets the TTL for caching the supported endpoint response.
277    ///
278    /// Default is 10 minutes. Use [`Self::without_supported_cache()`] to disable caching.
279    #[must_use]
280    pub fn with_supported_cache_ttl(mut self, ttl: Duration) -> Self {
281        self.supported_cache = SupportedCache::new(ttl);
282        self
283    }
284
285    /// Disables caching for the supported endpoint.
286    #[must_use]
287    pub fn without_supported_cache(self) -> Self {
288        self.with_supported_cache_ttl(Duration::ZERO)
289    }
290
291    /// Sends a `POST /verify` request to the facilitator.
292    ///
293    /// # Errors
294    ///
295    /// Returns [`FacilitatorClientError`] if the HTTP request fails.
296    pub async fn verify(
297        &self,
298        request: &VerifyRequest,
299    ) -> Result<VerifyResponse, FacilitatorClientError> {
300        self.post_json(&self.verify_url, "POST /verify", request)
301            .await
302    }
303
304    /// Sends a `POST /settle` request to the facilitator.
305    ///
306    /// # Errors
307    ///
308    /// Returns [`FacilitatorClientError`] if the HTTP request fails.
309    pub async fn settle(
310        &self,
311        request: &SettleRequest,
312    ) -> Result<SettleResponse, FacilitatorClientError> {
313        self.post_json(&self.settle_url, "POST /settle", request)
314            .await
315    }
316
317    /// Sends a `GET /supported` request to the facilitator.
318    /// This is the inner method that always makes an HTTP request.
319    #[cfg_attr(
320        feature = "telemetry",
321        instrument(name = "x402.facilitator_client.supported", skip_all, err)
322    )]
323    async fn supported_inner(&self) -> Result<SupportedResponse, FacilitatorClientError> {
324        self.get_json(&self.supported_url, "GET /supported").await
325    }
326
327    /// Sends a `GET /supported` request to the facilitator.
328    /// Results are cached with a configurable TTL (default: 10 minutes).
329    /// Use `supported_inner()` to bypass the cache.
330    ///
331    /// # Errors
332    ///
333    /// Returns [`FacilitatorClientError`] if the HTTP request fails.
334    pub async fn supported(&self) -> Result<SupportedResponse, FacilitatorClientError> {
335        // Try to get from cache
336        if let Some(response) = self.supported_cache.get().await {
337            return Ok(response);
338        }
339
340        // Cache miss - fetch and cache
341        #[cfg(feature = "telemetry")]
342        tracing::info!("x402.facilitator_client.supported_cache_miss");
343
344        let response = self.supported_inner().await?;
345        self.supported_cache.set(response.clone()).await;
346
347        Ok(response)
348    }
349
350    /// Generic POST helper that handles JSON serialization, error mapping,
351    /// timeout application, and telemetry integration.
352    ///
353    /// `context` is a human-readable identifier used in tracing and error messages (e.g. `"POST /verify"`).
354    #[allow(
355        clippy::needless_pass_by_value,
356        reason = "context is a static str, clone cost is zero"
357    )]
358    async fn post_json<T, R>(
359        &self,
360        url: &Url,
361        context: &'static str,
362        payload: &T,
363    ) -> Result<R, FacilitatorClientError>
364    where
365        T: serde::Serialize + Sync + ?Sized,
366        R: serde::de::DeserializeOwned,
367    {
368        let req = self.client.post(url.clone()).json(payload);
369        self.send_and_parse(req, context).await
370    }
371
372    /// Generic GET helper that handles error mapping, timeout application,
373    /// and telemetry integration.
374    ///
375    /// `context` is a human-readable identifier used in tracing and error messages (e.g. `"GET /supported"`).
376    async fn get_json<R>(
377        &self,
378        url: &Url,
379        context: &'static str,
380    ) -> Result<R, FacilitatorClientError>
381    where
382        R: serde::de::DeserializeOwned,
383    {
384        let req = self.client.get(url.clone());
385        self.send_and_parse(req, context).await
386    }
387
388    /// Applies headers, timeout, sends the request, and parses the JSON response.
389    async fn send_and_parse<R>(
390        &self,
391        mut req: reqwest::RequestBuilder,
392        context: &'static str,
393    ) -> Result<R, FacilitatorClientError>
394    where
395        R: serde::de::DeserializeOwned,
396    {
397        for (key, value) in &self.headers {
398            req = req.header(key, value);
399        }
400        if let Some(timeout) = self.timeout {
401            req = req.timeout(timeout);
402        }
403        let http_response = req
404            .send()
405            .await
406            .map_err(|e| FacilitatorClientError::Http { context, source: e })?;
407
408        let result = if http_response.status() == StatusCode::OK {
409            http_response
410                .json::<R>()
411                .await
412                .map_err(|e| FacilitatorClientError::JsonDeserialization { context, source: e })
413        } else {
414            let status = http_response.status();
415            let body = http_response
416                .text()
417                .await
418                .map_err(|e| FacilitatorClientError::ResponseBodyRead { context, source: e })?;
419            Err(FacilitatorClientError::HttpStatus {
420                context,
421                status,
422                body,
423            })
424        };
425
426        record_result_on_span(&result);
427
428        result
429    }
430}
431
432impl Facilitator for FacilitatorClient {
433    fn verify(
434        &self,
435        request: VerifyRequest,
436    ) -> BoxFuture<'_, Result<VerifyResponse, FacilitatorError>> {
437        Box::pin(async move {
438            #[cfg(feature = "telemetry")]
439            let result = with_span(
440                Self::verify(self, &request),
441                tracing::info_span!("x402.facilitator_client.verify", timeout = ?self.timeout),
442            )
443            .await;
444            #[cfg(not(feature = "telemetry"))]
445            let result = Self::verify(self, &request).await;
446            result.map_err(|e| FacilitatorError::Other(Box::new(e)))
447        })
448    }
449
450    fn settle(
451        &self,
452        request: SettleRequest,
453    ) -> BoxFuture<'_, Result<SettleResponse, FacilitatorError>> {
454        Box::pin(async move {
455            #[cfg(feature = "telemetry")]
456            let result = with_span(
457                Self::settle(self, &request),
458                tracing::info_span!("x402.facilitator_client.settle", timeout = ?self.timeout),
459            )
460            .await;
461            #[cfg(not(feature = "telemetry"))]
462            let result = Self::settle(self, &request).await;
463            result.map_err(|e| FacilitatorError::Other(Box::new(e)))
464        })
465    }
466
467    fn supported(&self) -> BoxFuture<'_, Result<SupportedResponse, FacilitatorError>> {
468        Box::pin(async move {
469            Self::supported(self)
470                .await
471                .map_err(|e| FacilitatorError::Other(Box::new(e)))
472        })
473    }
474}
475
476/// Converts a string URL into a `FacilitatorClient`, parsing the URL and calling `try_new`.
477impl TryFrom<&str> for FacilitatorClient {
478    type Error = FacilitatorClientError;
479
480    fn try_from(value: &str) -> Result<Self, Self::Error> {
481        // Normalize: strip trailing slashes and add a single trailing slash
482        let mut normalized = value.trim_end_matches('/').to_owned();
483        normalized.push('/');
484        let url = Url::parse(&normalized).map_err(|e| FacilitatorClientError::UrlParse {
485            context: "Failed to parse base url",
486            source: e,
487        })?;
488        Self::try_new(url)
489    }
490}
491
492/// Converts a String URL into a `FacilitatorClient`.
493impl TryFrom<String> for FacilitatorClient {
494    type Error = FacilitatorClientError;
495
496    fn try_from(value: String) -> Result<Self, Self::Error> {
497        Self::try_from(value.as_str())
498    }
499}
500
/// Writes the outcome of a request onto the current tracing span.
///
/// Success sets `otel.status_code` to `"OK"`. Failure sets it to `"ERROR"`,
/// records the error's display text as `error.message`, and emits an
/// `ERROR`-level event.
#[cfg(feature = "telemetry")]
fn record_result_on_span<R, E: Display>(result: &Result<R, E>) {
    let current = Span::current();
    let Err(failure) = result else {
        current.record("otel.status_code", "OK");
        return;
    };
    current.record("otel.status_code", "ERROR");
    current.record("error.message", tracing::field::display(failure));
    tracing::event!(tracing::Level::ERROR, error = %failure, "Request to facilitator failed");
}
516
/// Stand-in for the span-recording helper used when the `telemetry` feature
/// is disabled: accepts the request outcome and does nothing with it.
#[cfg(not(feature = "telemetry"))]
const fn record_result_on_span<R, E>(_result: &Result<R, E>)
where
    E: Display,
{
}
521
/// Wraps `fut` so that it runs inside `span`; the output type is unchanged.
#[cfg(feature = "telemetry")]
fn with_span<F: Future>(fut: F, span: Span) -> impl Future<Output = F::Output> {
    Instrument::instrument(fut, span)
}
527
#[cfg(test)]
#[allow(
    clippy::indexing_slicing,
    reason = "test assertions with known-length slices"
)]
mod tests {
    use std::collections::HashMap;

    use r402::proto::SupportedPaymentKind;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use super::*;

    /// Builds the fixed `/supported` payload served by the mock facilitator.
    fn create_test_supported_response() -> SupportedResponse {
        SupportedResponse {
            kinds: vec![SupportedPaymentKind {
                x402_version: 1,
                scheme: "eip155-exact".to_owned(),
                network: "1".to_owned(),
                extra: None,
            }],
            extensions: vec![],
            signers: HashMap::new(),
        }
    }

    /// Mounts a `GET /supported` mock that must be hit exactly
    /// `expected_requests` times; the mock server checks the count when it
    /// is dropped at the end of the test.
    async fn mount_supported_mock(server: &MockServer, expected_requests: u64) {
        let test_response = create_test_supported_response();
        Mock::given(method("GET"))
            .and(path("/supported"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&test_response))
            .expect(expected_requests)
            .mount(server)
            .await;
    }

    /// Creates a client pointed at the mock server with default settings.
    fn client_for(server: &MockServer) -> FacilitatorClient {
        FacilitatorClient::try_new(server.uri().parse::<Url>().unwrap()).unwrap()
    }

    #[tokio::test]
    async fn test_supported_cache_caches_response() {
        let mock_server = MockServer::start().await;
        // Two calls, but only the first may reach the network.
        mount_supported_mock(&mock_server, 1).await;

        let client = client_for(&mock_server);

        // First call should hit the network
        let result1 = client.supported().await.unwrap();
        assert_eq!(result1.kinds.len(), 1);

        // Second call should use cache (mock call count stays at 1)
        let result2 = client.supported().await.unwrap();
        assert_eq!(result2.kinds.len(), 1);

        // Both results should be equal
        assert_eq!(result1.kinds[0].scheme, result2.kinds[0].scheme);
    }

    #[tokio::test]
    async fn test_supported_cache_with_custom_ttl() {
        let mock_server = MockServer::start().await;
        // The entry expires between the calls, so both must reach the network.
        mount_supported_mock(&mock_server, 2).await;

        // Create client with 1ms TTL (essentially no caching)
        let client = client_for(&mock_server).with_supported_cache_ttl(Duration::from_millis(1));

        // First call
        let result1 = client.supported().await.unwrap();
        assert_eq!(result1.kinds.len(), 1);

        // Wait for cache to expire
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Second call should hit the network again due to expired cache
        let result2 = client.supported().await.unwrap();
        assert_eq!(result2.kinds.len(), 1);
    }

    #[tokio::test]
    async fn test_supported_cache_disabled() {
        let mock_server = MockServer::start().await;
        // With caching disabled every call must reach the network.
        mount_supported_mock(&mock_server, 2).await;

        // Create client with caching disabled
        let client = client_for(&mock_server).without_supported_cache();

        // Each call should hit the network
        let result1 = client.supported().await.unwrap();
        let result2 = client.supported().await.unwrap();

        assert_eq!(result1.kinds.len(), 1);
        assert_eq!(result2.kinds.len(), 1);
    }

    #[tokio::test]
    async fn test_supported_cache_shared_across_clones() {
        let mock_server = MockServer::start().await;
        // Expect exactly 1 request: the clone must be served from the shared cache.
        mount_supported_mock(&mock_server, 1).await;

        let client = client_for(&mock_server);

        // Clone the client — clones share the same cache
        let client2 = client.clone();

        // Populate cache on first client
        let result1 = client.supported().await.unwrap();
        assert_eq!(result1.kinds.len(), 1);

        // Clone should hit the shared cache (no extra HTTP request)
        let result2 = client2.supported().await.unwrap();
        assert_eq!(result2.kinds.len(), 1);
        assert_eq!(result1.kinds[0].scheme, result2.kinds[0].scheme);
    }

    #[tokio::test]
    async fn test_supported_inner_bypasses_cache() {
        let mock_server = MockServer::start().await;
        // One request populates the cache, one more comes from the bypassing call.
        mount_supported_mock(&mock_server, 2).await;

        let client = client_for(&mock_server);

        // Populate cache
        let _ = client.supported().await.unwrap();

        // supported_inner() should always make HTTP request, bypassing cache
        let result = client.supported_inner().await.unwrap();
        assert_eq!(result.kinds.len(), 1);
    }
}
680        // supported_inner() should always make HTTP request, bypassing cache
681        let result = client.supported_inner().await.unwrap();
682        assert_eq!(result.kinds.len(), 1);
683    }
684}