jacquard_common/
xrpc.rs

1//! # Stateless XRPC utilities and request/response mapping
2//!
3//! Mapping overview:
4//! - Success (2xx): parse body into the endpoint's typed output.
5//! - 400: try typed error; on failure, fall back to a generic XRPC error (with
6//!   `nsid`, `method`, and `http_status`) and map common auth errors.
7//! - 401: if `WWW-Authenticate` is present, return
8//!   `ClientError::Auth(AuthError::Other(header))` so higher layers (OAuth/DPoP)
9//!   can inspect `error="invalid_token"` or `error="use_dpop_nonce"` and refresh/retry.
10//!   If the header is absent, parse the body and map auth errors to
11//!   `AuthError::TokenExpired`/`InvalidToken`.
12
13#[cfg(feature = "streaming")]
14pub mod streaming;
15
16use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18pub use streaming::{
19    StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20};
21
22#[cfg(feature = "websocket")]
23pub mod subscription;
24
25#[cfg(feature = "streaming")]
26use crate::StreamError;
27use crate::http_client::HttpClient;
28#[cfg(feature = "streaming")]
29use crate::http_client::HttpClientExt;
30use crate::types::value::Data;
31use crate::{AuthorizationToken, error::AuthError};
32use crate::{CowStr, error::XrpcResult};
33use crate::{IntoStatic, error::DecodeError};
34use crate::{error::TransportError, types::value::RawData};
35use bytes::Bytes;
36use http::{
37    HeaderName, HeaderValue, Request, StatusCode,
38    header::{AUTHORIZATION, CONTENT_TYPE},
39};
40use serde::{Deserialize, Serialize};
41use smol_str::SmolStr;
42use std::fmt::{self, Debug};
43use std::{error::Error, marker::PhantomData};
44#[cfg(feature = "websocket")]
45pub use subscription::{
46    BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
47    SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
48    SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
49};
50use url::Url;
51
/// Error type for encoding XRPC requests
///
/// `Query` and `Json` are `#[from]` conversions, so applying `?` to the
/// corresponding serializer error converts into this type automatically.
/// `Other` carries a free-form message for encodings without a dedicated variant.
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum EncodeError {
    /// Failed to serialize query parameters
    #[error("Failed to serialize query: {0}")]
    Query(
        #[from]
        #[source]
        serde_html_form::ser::Error,
    ),
    /// Failed to serialize JSON body
    #[error("Failed to serialize JSON: {0}")]
    Json(
        #[from]
        #[source]
        serde_json::Error,
    ),
    /// Other encoding error, described by a plain message
    #[error("Encoding error: {0}")]
    Other(String),
}
73
/// Kind of XRPC method, which also determines the HTTP verb used on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum XrpcMethod {
    /// Query (HTTP GET)
    Query,
    /// Procedure (HTTP POST); the payload is the request body encoding
    /// reported by [`XrpcMethod::body_encoding`].
    Procedure(&'static str),
}

impl XrpcMethod {
    /// HTTP verb for this method: `"GET"` for queries, `"POST"` for procedures.
    pub const fn as_str(&self) -> &'static str {
        if matches!(self, Self::Query) {
            "GET"
        } else {
            "POST"
        }
    }

    /// Request body encoding carried by a procedure; `None` for queries,
    /// which have no body encoding.
    pub const fn body_encoding(&self) -> Option<&'static str> {
        // `XrpcMethod` is `Copy`, so matching on the dereferenced value binds
        // the encoding directly as `&'static str`.
        match *self {
            Self::Procedure(encoding) => Some(encoding),
            Self::Query => None,
        }
    }
}
100
101/// Trait for XRPC request types (queries and procedures)
102///
103/// This trait provides metadata about XRPC endpoints including the NSID,
104/// HTTP method, encoding, and associated output type.
105///
106/// The trait is implemented on the request parameters/input type itself.
107pub trait XrpcRequest: Serialize {
108    /// The NSID for this XRPC method
109    const NSID: &'static str;
110
111    /// XRPC method (query/GET or procedure/POST)
112    const METHOD: XrpcMethod;
113
114    /// Response type returned from the XRPC call (marker struct)
115    type Response: XrpcResp;
116
117    /// Encode the request body for procedures.
118    ///
119    /// Default implementation serializes to JSON. Override for non-JSON encodings.
120    fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
121        Ok(serde_json::to_vec(self)?)
122    }
123
124    /// Decode the request body for procedures.
125    ///
126    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
127    fn decode_body<'de>(body: &'de [u8]) -> Result<Box<Self>, DecodeError>
128    where
129        Self: Deserialize<'de>,
130    {
131        let body: Self = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
132
133        Ok(Box::new(body))
134    }
135}
136
137/// Trait for XRPC Response types
138///
139/// It mirrors the NSID and carries the encoding types as well as Output (success) and Err types
140pub trait XrpcResp {
141    /// The NSID for this XRPC method
142    const NSID: &'static str;
143
144    /// Output encoding (MIME type)
145    const ENCODING: &'static str;
146
147    /// Response output type
148    type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
149
150    /// Error type for this request
151    type Err<'de>: Error + Deserialize<'de> + IntoStatic;
152
153    /// Output body encoding function, similar to the request-side type
154    fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
155        Ok(serde_json::to_vec(output)?)
156    }
157
158    /// Decode the response output body.
159    ///
160    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
161    fn decode_output<'de>(body: &'de [u8]) -> Result<Self::Output<'de>, DecodeError>
162    where
163        Self::Output<'de>: Deserialize<'de>,
164    {
165        let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
166
167        Ok(body)
168    }
169}
170
/// XRPC server endpoint trait
///
/// Defines the fully-qualified path and method, as well as the request and response types.
/// This exists primarily to work around lifetime issues for crates like Axum
/// by moving the lifetime from the trait itself into an associated type
/// (see `Request<'de>` below).
///
/// It is implemented by the code generation on a marker struct, like the client-side [XrpcResp] trait.
pub trait XrpcEndpoint {
    /// Fully-qualified path ('/xrpc/\[nsid\]') where this endpoint should live on the server
    const PATH: &'static str;
    /// XRPC method (query/GET or procedure/POST)
    const METHOD: XrpcMethod;
    /// XRPC Request data type; the lifetime parameter is the deserialization borrow
    type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
    /// XRPC Response data type
    type Response: XrpcResp;
}
188
/// Error type for XRPC endpoints that don't define any errors
///
/// Wraps the error payload as loosely-typed [`Data`]; `#[serde(borrow)]` lets
/// the deserialized value borrow from the input buffer.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);
192
/// Formats the error by delegating to the wrapped `Data` value.
impl<'de> fmt::Display for GenericError<'de> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): this file imports only `Debug` from `std::fmt` (not
        // `Display`), so this method call presumably resolves to `Debug::fmt`
        // on the inner `Data` — confirm that is the intended output.
        self.0.fmt(f)
    }
}
198
// Marker impl: `Error` only needs the `Debug` derive and the `Display` impl
// already provided for `GenericError`; no source error is exposed.
impl Error for GenericError<'_> {}
200
201impl IntoStatic for GenericError<'_> {
202    type Output = GenericError<'static>;
203    fn into_static(self) -> Self::Output {
204        GenericError(self.0.into_static())
205    }
206}
207
/// Per-request options for XRPC calls.
///
/// `Default` yields no auth, no proxy, no labelers and no extra headers.
/// `build_http_request` turns each populated field into a request header.
#[derive(Debug, Default, Clone)]
pub struct CallOptions<'a> {
    /// Optional Authorization to apply (`Bearer` or `DPoP`).
    pub auth: Option<AuthorizationToken<'a>>,
    /// `atproto-proxy` header value.
    pub atproto_proxy: Option<CowStr<'a>>,
    /// `atproto-accept-labelers` header values (sent as one comma-separated header).
    pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
    /// Extra headers to attach to this request.
    pub extra_headers: Vec<(HeaderName, HeaderValue)>,
}
220
221impl IntoStatic for CallOptions<'_> {
222    type Output = CallOptions<'static>;
223
224    fn into_static(self) -> Self::Output {
225        CallOptions {
226            auth: self.auth.map(|auth| auth.into_static()),
227            atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
228            atproto_accept_labelers: self
229                .atproto_accept_labelers
230                .map(|labelers| labelers.into_static()),
231            extra_headers: self.extra_headers,
232        }
233    }
234}
235
236/// Extension for stateless XRPC calls on any `HttpClient`.
237///
238/// Example
239/// ```no_run
240/// # #[tokio::main]
241/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
242/// use jacquard_common::xrpc::XrpcExt;
243/// use jacquard_common::http_client::HttpClient;
244///
245/// let http = reqwest::Client::new();
246/// let base = url::Url::parse("https://public.api.bsky.app")?;
247/// // let resp = http.xrpc(base).send(&request).await?;
248/// # Ok(())
249/// # }
250/// ```
251pub trait XrpcExt: HttpClient {
252    /// Start building an XRPC call for the given base URL.
253    fn xrpc<'a>(&'a self, base: Url) -> XrpcCall<'a, Self>
254    where
255        Self: Sized,
256    {
257        XrpcCall {
258            client: self,
259            base,
260            opts: CallOptions::default(),
261        }
262    }
263}
264
265impl<T: HttpClient> XrpcExt for T {}
266
/// Nicer alias for the XRPC response type: the [`Response`] wrapper
/// parameterized by the response marker of request type `R`.
pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
269
/// Stateful XRPC call trait
///
/// Unlike the stateless [`XrpcExt`] builder, implementors supply the base URI
/// and default call options themselves. On non-wasm targets the trait is
/// expanded through `trait_variant::make(Send)` and the send methods
/// additionally require `Self: Sync`; on `wasm32` those extras are omitted.
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait XrpcClient: HttpClient {
    /// Get the base URI for the client.
    fn base_uri(&self) -> impl Future<Output = Url>;

    /// Get the call options for the client.
    ///
    /// The default implementation returns `CallOptions::default()`.
    fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
        async { CallOptions::default() }
    }

    /// Send an XRPC request and parse the response
    #[cfg(not(target_arch = "wasm32"))]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request and parse the response (wasm32 variant, no `Self: Sync` bound)
    #[cfg(target_arch = "wasm32")]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Send an XRPC request with custom options and parse the response
    #[cfg(not(target_arch = "wasm32"))]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request with custom options and parse the response
    /// (wasm32 variant, no `Self: Sync` bound)
    #[cfg(target_arch = "wasm32")]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;
}
319
/// Stateful XRPC streaming client trait
///
/// Only compiled with the `streaming` feature. Each method has a non-wasm
/// variant (requiring `Self: Sync`) and a `wasm32` variant without that bound.
#[cfg(feature = "streaming")]
pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
    /// Send an XRPC request and stream the response
    ///
    /// The returned future is `Send` on non-wasm targets.
    #[cfg(not(target_arch = "wasm32"))]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request and stream the response (wasm32 variant)
    #[cfg(target_arch = "wasm32")]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Stream an XRPC procedure call and its response
    ///
    /// Takes the outgoing frames of procedure stream `S` and resolves to a
    /// stream of `S`'s response frames, or a `StreamError`.
    #[cfg(not(target_arch = "wasm32"))]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
        Self: Sync;

    /// Stream an XRPC procedure call and its response (wasm32 variant)
    #[cfg(target_arch = "wasm32")]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
}
379
/// Stateless XRPC call builder.
///
/// Created by [`XrpcExt::xrpc`]; the builder methods adjust the per-call
/// options and `send` consumes the builder to perform the request.
///
/// Example (per-request overrides)
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use jacquard_common::xrpc::XrpcExt;
/// use jacquard_common::{AuthorizationToken, CowStr};
///
/// let http = reqwest::Client::new();
/// let base = url::Url::parse("https://public.api.bsky.app")?;
/// let call = http
///     .xrpc(base)
///     .auth(AuthorizationToken::Bearer(CowStr::from("ACCESS_JWT")))
///     .accept_labelers(vec![CowStr::from("did:plc:labelerid")])
///     .header(http::header::USER_AGENT, http::HeaderValue::from_static("jacquard-example"));
/// // let resp = call.send(&request).await?;
/// # Ok(())
/// # }
/// ```
pub struct XrpcCall<'a, C: HttpClient> {
    /// HTTP client used to perform the request.
    pub(crate) client: &'a C,
    /// Base URL; `/xrpc/<nsid>` is appended when the request is built.
    pub(crate) base: Url,
    /// Options (auth, proxy, labelers, extra headers) applied to this call.
    pub(crate) opts: CallOptions<'a>,
}
405
406impl<'a, C: HttpClient> XrpcCall<'a, C> {
407    /// Apply Authorization to this call.
408    pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
409        self.opts.auth = Some(token);
410        self
411    }
412    /// Set `atproto-proxy` header for this call.
413    pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
414        self.opts.atproto_proxy = Some(proxy);
415        self
416    }
417    /// Set `atproto-accept-labelers` header(s) for this call.
418    pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
419        self.opts.atproto_accept_labelers = Some(labelers);
420        self
421    }
422    /// Add an extra header.
423    pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
424        self.opts.extra_headers.push((name, value));
425        self
426    }
427    /// Replace the builder's options entirely.
428    pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
429        self.opts = opts;
430        self
431    }
432
433    /// Send the given typed XRPC request and return a response wrapper.
434    ///
435    /// Note on 401 handling:
436    /// - When the server returns 401 with a `WWW-Authenticate` header, this surfaces as
437    ///   `ClientError::Auth(AuthError::Other(header))` so higher layers (e.g., OAuth/DPoP) can
438    ///   inspect the header for `error="invalid_token"` or `error="use_dpop_nonce"` and react
439    ///   (refresh/retry). If the header is absent, the 401 body flows through to `Response` and
440    ///   can be parsed/mapped to `AuthError` as appropriate.
441    #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
442    pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
443    where
444        R: XrpcRequest,
445        <R as XrpcRequest>::Response: Send + Sync,
446    {
447        let http_request = build_http_request(&self.base, request, &self.opts)
448            .map_err(crate::error::TransportError::from)?;
449
450        let http_response = self
451            .client
452            .send_http(http_request)
453            .await
454            .map_err(|e| crate::error::TransportError::Other(Box::new(e)))?;
455
456        process_response(http_response)
457    }
458}
459
460/// Process the HTTP response from the server into a proper xrpc response statelessly.
461///
462/// Exposed to make things more easily pluggable
463#[inline]
464pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
465where
466    Resp: XrpcResp,
467{
468    let status = http_response.status();
469    // If the server returned 401 with a WWW-Authenticate header, expose it so higher layers
470    // (e.g., DPoP handling) can detect `error="invalid_token"` and trigger refresh.
471    if status.as_u16() == 401 {
472        if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
473            return Err(crate::error::ClientError::Auth(
474                crate::error::AuthError::Other(hv.clone()),
475            ));
476        }
477    }
478    let buffer = Bytes::from(http_response.into_body());
479
480    if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
481        return Err(crate::error::HttpError {
482            status,
483            body: Some(buffer),
484        }
485        .into());
486    }
487
488    Ok(Response::new(buffer, status))
489}
490
/// HTTP headers commonly used in XRPC requests
///
/// Converts into an `http::HeaderName` via `From`, so variants can be passed
/// directly wherever a header name is accepted.
pub enum Header {
    /// Content-Type header
    ContentType,
    /// Authorization header
    Authorization,
    /// `atproto-proxy` header - specifies which service (app server or other atproto service) the user's PDS should forward requests to as appropriate.
    ///
    /// See: <https://atproto.com/specs/xrpc#service-proxying>
    AtprotoProxy,
    /// `atproto-accept-labelers` header used by clients to request labels from specific labelers to be included and applied in the response. See [label](https://atproto.com/specs/label) specification for details.
    AtprotoAcceptLabelers,
}
504
505impl From<Header> for HeaderName {
506    fn from(value: Header) -> Self {
507        match value {
508            Header::ContentType => CONTENT_TYPE,
509            Header::Authorization => AUTHORIZATION,
510            Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
511            Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
512        }
513    }
514}
515
516/// Build an HTTP request for an XRPC call given base URL and options
517pub fn build_http_request<'s, R>(
518    base: &Url,
519    req: &R,
520    opts: &CallOptions<'_>,
521) -> core::result::Result<Request<Vec<u8>>, crate::error::TransportError>
522where
523    R: XrpcRequest,
524{
525    let mut url = base.clone();
526    let mut path = url.path().trim_end_matches('/').to_owned();
527    path.push_str("/xrpc/");
528    path.push_str(<R as XrpcRequest>::NSID);
529    url.set_path(&path);
530
531    if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
532        let qs = serde_html_form::to_string(&req)
533            .map_err(|e| crate::error::TransportError::InvalidRequest(e.to_string()))?;
534        if !qs.is_empty() {
535            url.set_query(Some(&qs));
536        } else {
537            url.set_query(None);
538        }
539    }
540
541    let method = match <R as XrpcRequest>::METHOD {
542        XrpcMethod::Query => http::Method::GET,
543        XrpcMethod::Procedure(_) => http::Method::POST,
544    };
545
546    let mut builder = Request::builder().method(method).uri(url.as_str());
547
548    if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
549        builder = builder.header(Header::ContentType, encoding);
550    }
551    let output_encoding = <R::Response as XrpcResp>::ENCODING;
552    builder = builder.header(http::header::ACCEPT, output_encoding);
553
554    if let Some(token) = &opts.auth {
555        let hv = match token {
556            AuthorizationToken::Bearer(t) => {
557                HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
558            }
559            AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
560        }
561        .map_err(|e| {
562            TransportError::InvalidRequest(format!("Invalid authorization token: {}", e))
563        })?;
564        builder = builder.header(Header::Authorization, hv);
565    }
566
567    if let Some(proxy) = &opts.atproto_proxy {
568        builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
569    }
570    if let Some(labelers) = &opts.atproto_accept_labelers {
571        if !labelers.is_empty() {
572            let joined = labelers
573                .iter()
574                .map(|s| s.as_ref())
575                .collect::<Vec<_>>()
576                .join(", ");
577            builder = builder.header(Header::AtprotoAcceptLabelers, joined);
578        }
579    }
580    for (name, value) in &opts.extra_headers {
581        builder = builder.header(name, value);
582    }
583
584    let body = if let XrpcMethod::Procedure(_) = R::METHOD {
585        req.encode_body()
586            .map_err(|e| TransportError::InvalidRequest(e.to_string()))?
587    } else {
588        vec![]
589    };
590
591    builder
592        .body(body)
593        .map_err(|e| TransportError::InvalidRequest(e.to_string()))
594}
595
/// XRPC response wrapper that owns the response buffer
///
/// Allows borrowing from the buffer when parsing to avoid unnecessary allocations.
/// Generic over the response marker type (e.g., `GetAuthorFeedResponse`), not the request.
pub struct Response<Resp>
where
    Resp: XrpcResp, // marker only; its `Output<'de>`/`Err<'de>` types pick up the borrow lifetime at parse time
{
    /// Type-level tag for the response marker; no `Resp` value is stored.
    _marker: PhantomData<fn() -> Resp>,
    /// Raw response body.
    buffer: Bytes,
    /// HTTP status the body was received with.
    status: StatusCode,
}
608
609impl<R> Response<R>
610where
611    R: XrpcResp,
612{
613    /// Create a new response from a buffer and status code
614    pub fn new(buffer: Bytes, status: StatusCode) -> Self {
615        Self {
616            buffer,
617            status,
618            _marker: PhantomData,
619        }
620    }
621
622    /// Get the HTTP status code
623    pub fn status(&self) -> StatusCode {
624        self.status
625    }
626
627    /// Get the raw buffer
628    pub fn buffer(&self) -> &Bytes {
629        &self.buffer
630    }
631
632    /// Parse the response, borrowing from the internal buffer
633    pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
634        // 200: parse as output
635        if self.status.is_success() {
636            match R::decode_output(&self.buffer) {
637                Ok(output) => Ok(output),
638                Err(e) => Err(XrpcError::Decode(e)),
639            }
640        // 400: try typed XRPC error, fallback to generic error
641        } else if self.status.as_u16() == 400 {
642            match serde_json::from_slice::<_>(&self.buffer) {
643                Ok(error) => Err(XrpcError::Xrpc(error)),
644                Err(_) => {
645                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
646                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
647                        Ok(mut generic) => {
648                            generic.nsid = R::NSID;
649                            generic.method = ""; // method info only available on request
650                            generic.http_status = self.status;
651                            // Map auth-related errors to AuthError
652                            match generic.error.as_str() {
653                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
654                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
655                                _ => Err(XrpcError::Generic(generic)),
656                            }
657                        }
658                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
659                    }
660                }
661            }
662        // 401: always auth error
663        } else {
664            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
665                Ok(mut generic) => {
666                    generic.nsid = R::NSID;
667                    generic.method = ""; // method info only available on request
668                    generic.http_status = self.status;
669                    match generic.error.as_str() {
670                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
671                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
672                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
673                    }
674                }
675                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
676            }
677        }
678    }
679
680    /// Parse this as validated, loosely typed atproto data.
681    ///
682    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
683    pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
684        // 200: parse as output
685        if self.status.is_success() {
686            match serde_json::from_slice::<_>(&self.buffer) {
687                Ok(output) => Ok(output),
688                Err(_) => {
689                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
690                        if let Ok(data) = Data::from_cbor(&data) {
691                            Ok(data.into_static())
692                        } else {
693                            Ok(Data::Bytes(self.buffer.clone()))
694                        }
695                    } else {
696                        Ok(Data::Bytes(self.buffer.clone()))
697                    }
698                }
699            }
700        // 400: try typed XRPC error, fallback to generic error
701        } else if self.status.as_u16() == 400 {
702            match serde_json::from_slice::<_>(&self.buffer) {
703                Ok(error) => Err(XrpcError::Xrpc(error)),
704                Err(_) => {
705                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
706                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
707                        Ok(mut generic) => {
708                            generic.nsid = R::NSID;
709                            generic.method = ""; // method info only available on request
710                            generic.http_status = self.status;
711                            // Map auth-related errors to AuthError
712                            match generic.error.as_str() {
713                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
714                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
715                                _ => Err(XrpcError::Generic(generic)),
716                            }
717                        }
718                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
719                    }
720                }
721            }
722        // 401: always auth error
723        } else {
724            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
725                Ok(mut generic) => {
726                    generic.nsid = R::NSID;
727                    generic.method = ""; // method info only available on request
728                    generic.http_status = self.status;
729                    match generic.error.as_str() {
730                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
731                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
732                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
733                    }
734                }
735                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
736            }
737        }
738    }
739
740    /// Parse this as raw atproto data with minimal validation.
741    ///
742    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
743    pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
744        // 200: parse as output
745        if self.status.is_success() {
746            match serde_json::from_slice::<_>(&self.buffer) {
747                Ok(output) => Ok(output),
748                Err(_) => {
749                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
750                        if let Ok(data) = RawData::from_cbor(&data) {
751                            Ok(data.into_static())
752                        } else {
753                            Ok(RawData::Bytes(self.buffer.clone()))
754                        }
755                    } else {
756                        Ok(RawData::Bytes(self.buffer.clone()))
757                    }
758                }
759            }
760        // 400: try typed XRPC error, fallback to generic error
761        } else if self.status.as_u16() == 400 {
762            match serde_json::from_slice::<_>(&self.buffer) {
763                Ok(error) => Err(XrpcError::Xrpc(error)),
764                Err(_) => {
765                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
766                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
767                        Ok(mut generic) => {
768                            generic.nsid = R::NSID;
769                            generic.method = ""; // method info only available on request
770                            generic.http_status = self.status;
771                            // Map auth-related errors to AuthError
772                            match generic.error.as_str() {
773                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
774                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
775                                _ => Err(XrpcError::Generic(generic)),
776                            }
777                        }
778                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
779                    }
780                }
781            }
782        // 401: always auth error
783        } else {
784            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
785                Ok(mut generic) => {
786                    generic.nsid = R::NSID;
787                    generic.method = ""; // method info only available on request
788                    generic.http_status = self.status;
789                    match generic.error.as_str() {
790                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
791                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
792                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
793                    }
794                }
795                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
796            }
797        }
798    }
799
800    /// Reinterpret this response as a different response type.
801    ///
802    /// This transmutes the response by keeping the same buffer and status code,
803    /// but changing the type-level marker. Useful for converting generic XRPC responses
804    /// into collection-specific typed responses.
805    ///
806    /// # Safety
807    ///
808    /// This is safe in the sense that no memory unsafety occurs, but logical correctness
809    /// depends on ensuring the buffer actually contains data that can deserialize to `NEW`.
810    /// Incorrect conversion will cause deserialization errors at runtime.
811    pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
812        Response {
813            buffer: self.buffer,
814            status: self.status,
815            _marker: PhantomData,
816        }
817    }
818}
819
/// Shorthand for the success output type of the response marker `Resp`,
/// i.e. `<Resp as XrpcResp>::Output<'a>`, with `'a` as its lifetime argument.
pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
/// Shorthand for the typed error of the response marker `Resp`,
/// i.e. `<Resp as XrpcResp>::Err<'a>`, with `'a` as its lifetime argument.
pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
824
825impl<R> Response<R>
826where
827    R: XrpcResp,
828{
829    /// Parse the response into an owned output
830    pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
831    where
832        for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
833        for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
834    {
835        fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
836            serde_json::from_slice(buffer)
837        }
838
839        // 200: parse as output
840        if self.status.is_success() {
841            match R::decode_output(&self.buffer) {
842                Ok(output) => Ok(output.into_static()),
843                Err(e) => Err(XrpcError::Decode(e)),
844            }
845        // 400: try typed XRPC error, fallback to generic error
846        } else if self.status.as_u16() == 400 {
847            let error = match parse_error::<R>(&self.buffer) {
848                Ok(error) => XrpcError::Xrpc(error),
849                Err(_) => {
850                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
851                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
852                        Ok(mut generic) => {
853                            generic.nsid = R::NSID;
854                            generic.method = ""; // method info only available on request
855                            generic.http_status = self.status;
856                            // Map auth-related errors to AuthError
857                            match generic.error.as_ref() {
858                                "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
859                                "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
860                                _ => XrpcError::Generic(generic),
861                            }
862                        }
863                        Err(e) => XrpcError::Decode(DecodeError::Json(e)),
864                    }
865                }
866            };
867            Err(error.into_static())
868        // 401: always auth error
869        } else {
870            let error: XrpcError<<R as XrpcResp>::Err<'_>> =
871                match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
872                    Ok(mut generic) => {
873                        let status = self.status;
874                        generic.nsid = R::NSID;
875                        generic.method = ""; // method info only available on request
876                        generic.http_status = status;
877                        match generic.error.as_ref() {
878                            "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
879                            "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
880                            _ => XrpcError::Auth(AuthError::NotAuthenticated),
881                        }
882                    }
883                    Err(e) => XrpcError::Decode(DecodeError::Json(e)),
884                };
885
886            Err(error.into_static())
887        }
888    }
889}
890
/// Generic XRPC error format for untyped errors like InvalidRequest
///
/// Used when the error doesn't match the endpoint's specific error enum.
///
/// Only `error` and `message` take part in (de)serialization. The remaining
/// fields are marked `#[serde(skip)]`; they carry context that the
/// response-parsing code assigns after the body has been deserialized.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GenericXrpcError {
    /// Error code (e.g., "InvalidRequest")
    pub error: SmolStr,
    /// Optional error message with details
    pub message: Option<SmolStr>,
    /// XRPC method NSID that produced this error (context only; not serialized)
    #[serde(skip)]
    pub nsid: &'static str,
    /// HTTP method used (GET/POST) (context only; not serialized)
    ///
    /// Response parsing sets this to `""`, because method information is only
    /// available on the request side.
    #[serde(skip)]
    pub method: &'static str,
    /// HTTP status code (context only; not serialized)
    #[serde(skip)]
    pub http_status: StatusCode,
}
910
911impl std::fmt::Display for GenericXrpcError {
912    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
913        if let Some(msg) = &self.message {
914            write!(
915                f,
916                "{}: {} (nsid={}, method={}, status={})",
917                self.error, msg, self.nsid, self.method, self.http_status
918            )
919        } else {
920            write!(
921                f,
922                "{} (nsid={}, method={}, status={})",
923                self.error, self.nsid, self.method, self.http_status
924            )
925        }
926    }
927}
928
// `GenericXrpcError` has no lifetime parameter (its string-slice fields are
// `&'static str`), so the owned form is the value itself.
impl IntoStatic for GenericXrpcError {
    type Output = Self;

    /// Returns `self` unchanged.
    fn into_static(self) -> Self::Output {
        self
    }
}
936
// Marker impl: relies on the derived `Debug` and the manual `Display`;
// all `Error` methods keep their default implementations.
impl std::error::Error for GenericXrpcError {}
938
/// XRPC-specific errors returned from endpoints
///
/// Covers errors carried in the response body (typed, auth-related, or
/// generic) as well as failures to decode that body.
/// Type parameter `E` is the endpoint's specific error enum type.
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum XrpcError<E: std::error::Error + IntoStatic> {
    /// Typed XRPC error from the endpoint's specific error enum
    #[error("XRPC error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::typed))]
    Xrpc(E),

    /// Authentication error (ExpiredToken, InvalidToken, etc.)
    #[error("Authentication error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::auth))]
    Auth(#[from] AuthError),

    /// Generic XRPC error not in the endpoint's error enum (e.g., InvalidRequest)
    ///
    /// Shares its display prefix with [`XrpcError::Xrpc`]; the diagnostic code
    /// is what tells the two apart.
    #[error("XRPC error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::generic))]
    Generic(GenericXrpcError),

    /// Failed to decode the response body
    #[error("Failed to decode response: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::decode))]
    Decode(#[from] DecodeError),
}
965
966impl<E> IntoStatic for XrpcError<E>
967where
968    E: std::error::Error + IntoStatic,
969    E::Output: std::error::Error + IntoStatic,
970    <E as IntoStatic>::Output: std::error::Error + IntoStatic,
971{
972    type Output = XrpcError<E::Output>;
973    fn into_static(self) -> Self::Output {
974        match self {
975            XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
976            XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
977            XrpcError::Generic(e) => XrpcError::Generic(e),
978            XrpcError::Decode(e) => XrpcError::Decode(e),
979        }
980    }
981}
982
// Streaming variants of the XRPC call; compiled only with the `streaming` feature.
#[cfg(feature = "streaming")]
impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
    /// Send an XRPC call and stream the binary response.
    ///
    /// Useful for downloading blobs and entire repository archives
    ///
    /// The HTTP request is produced by `build_http_request` from this call's
    /// base URL and options, sent with `send_http_streaming`, and the response
    /// head and body are handed to `StreamingResponse::new` as-is; this
    /// function itself does not inspect the status code.
    ///
    /// # Errors
    ///
    /// Failures while building or sending the request are wrapped with
    /// `StreamError::transport`.
    pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
    where
        R: XrpcRequest,
        <R as XrpcRequest>::Response: Send + Sync,
    {
        let http_request =
            build_http_request(&self.base, request, &self.opts).map_err(StreamError::transport)?;

        let http_response = self
            .client
            .send_http_streaming(http_request)
            .await
            .map_err(StreamError::transport)?;
        let (parts, body) = http_response.into_parts();

        Ok(StreamingResponse::new(parts, body))
    }

    /// Stream an XRPC procedure call and its response
    ///
    /// Useful for streaming upload of large payloads, or for "pipe-through" operations
    /// where you are processing a large payload.
    ///
    /// Issues a POST to `<base>/xrpc/<NSID of S::Request>`, applying the
    /// authorization token, proxy, accept-labelers and extra headers from the
    /// call options, and uses the frames of `stream` as the request body.
    ///
    /// # Errors
    ///
    /// - `StreamError::protocol` if the authorization token is not a valid
    ///   header value or the request head cannot be built.
    /// - `StreamError::transport` if sending the request fails.
    pub async fn stream<S>(
        self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
    {
        use futures::TryStreamExt;
        use n0_future::StreamExt;

        // Append `/xrpc/<nsid>` to the base path; trailing slashes are trimmed
        // first so the result never contains `//xrpc`.
        let mut url = self.base;
        let mut path = url.path().trim_end_matches('/').to_owned();
        path.push_str("/xrpc/");
        path.push_str(<S::Request as XrpcRequest>::NSID);
        url.set_path(&path);

        let mut builder = http::Request::post(url.to_string());

        // Authorization: the scheme prefix depends on the token kind.
        if let Some(token) = &self.opts.auth {
            let hv = match token {
                AuthorizationToken::Bearer(t) => {
                    HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
                }
                AuthorizationToken::Dpop(t) => {
                    HeaderValue::from_str(&format!("DPoP {}", t.as_ref()))
                }
            }
            .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?;
            builder = builder.header(Header::Authorization, hv);
        }

        if let Some(proxy) = &self.opts.atproto_proxy {
            builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
        }
        if let Some(labelers) = &self.opts.atproto_accept_labelers {
            // An empty list produces no header at all.
            if !labelers.is_empty() {
                let joined = labelers
                    .iter()
                    .map(|s| s.as_ref())
                    .collect::<Vec<_>>()
                    .join(", ");
                builder = builder.header(Header::AtprotoAcceptLabelers, joined);
            }
        }
        // Extra headers are added after the built-in ones above.
        // NOTE(review): no Content-Type header is set in this function unless it
        // arrives through `extra_headers` - confirm that is intended.
        for (name, value) in &self.opts.extra_headers {
            builder = builder.header(name, value);
        }

        // Only the request head is needed; the unit body is a placeholder that is
        // discarded, since the real body is streamed separately below.
        let (parts, _) = builder
            .body(())
            .map_err(|e| StreamError::protocol(e.to_string()))?
            .into_parts();

        // Each successfully produced frame contributes its `buffer` field to the body.
        let body_stream = stream.0.map_ok(|f| f.buffer).boxed();

        let resp = self
            .client
            .send_http_bidirectional(parts, body_stream)
            .await
            .map_err(StreamError::transport)?;

        let (parts, body) = resp.into_parts();

        Ok(XrpcResponseStream::<
            <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
        >::from_typed_parts(parts, body))
    }
}
1079
// Unit tests for response error mapping and request URL construction.
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    // Minimal request type paired with `DummyResp` below.
    #[derive(Serialize, Deserialize)]
    #[allow(dead_code)]
    struct DummyReq;

    // Typed endpoint error used by `DummyResp`: a newtype over a borrowed-or-owned string.
    #[derive(Deserialize, Debug, thiserror::Error)]
    #[error("{0}")]
    struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);

    impl IntoStatic for DummyErr<'_> {
        type Output = DummyErr<'static>;
        fn into_static(self) -> Self::Output {
            DummyErr(self.0.into_static())
        }
    }

    // Response marker with unit output and `DummyErr` as its typed error.
    struct DummyResp;

    impl XrpcResp for DummyResp {
        const NSID: &'static str = "test.dummy";
        const ENCODING: &'static str = "application/json";
        type Output<'de> = ();
        type Err<'de> = DummyErr<'de>;
    }

    impl XrpcRequest for DummyReq {
        const NSID: &'static str = "test.dummy";
        const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
        type Response = DummyResp;
    }

    /// A 400 response with a generic error object must surface as
    /// `XrpcError::Generic`, carrying the NSID and HTTP status as context.
    #[test]
    fn generic_error_carries_context() {
        // `DummyErr` is a newtype over `CowStr`, so this object body is expected to
        // fail typed decoding and exercise the generic fallback.
        let body = serde_json::json!({"error":"InvalidRequest","message":"missing"});
        let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
        let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST);
        match resp.parse().unwrap_err() {
            XrpcError::Generic(g) => {
                assert_eq!(g.error.as_str(), "InvalidRequest");
                assert_eq!(g.message.as_deref(), Some("missing"));
                assert_eq!(g.nsid, DummyResp::NSID);
                assert_eq!(g.method, ""); // method info only on request
                assert_eq!(g.http_status, StatusCode::BAD_REQUEST);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// On a 401 response the `ExpiredToken` / `InvalidToken` error codes must map
    /// to the matching `AuthError` variants.
    #[test]
    fn auth_error_mapping() {
        for (code, expect) in [
            ("ExpiredToken", AuthError::TokenExpired),
            ("InvalidToken", AuthError::InvalidToken),
        ] {
            let body = serde_json::json!({"error": code});
            let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
            let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED);
            match resp.parse().unwrap_err() {
                // Variants are compared by pattern rather than with `==`.
                XrpcError::Auth(e) => match (e, expect) {
                    (AuthError::TokenExpired, AuthError::TokenExpired) => {}
                    (AuthError::InvalidToken, AuthError::InvalidToken) => {}
                    other => panic!("mismatch: {other:?}"),
                },
                other => panic!("unexpected: {other:?}"),
            }
        }
    }

    /// The request URI must contain `/xrpc/<nsid>` exactly once-slashed, whether
    /// or not the base URL ends in `/` or has a path prefix.
    #[test]
    fn no_double_slash_in_path() {
        // Test-local request/response types for a query endpoint.
        #[derive(Serialize, Deserialize)]
        struct Req;
        // Note: this local `Err` struct shadows the prelude's `Result::Err`
        // inside this function.
        #[derive(Deserialize, Debug, thiserror::Error)]
        #[error("{0}")]
        struct Err<'a>(#[serde(borrow)] CowStr<'a>);
        impl IntoStatic for Err<'_> {
            type Output = Err<'static>;
            fn into_static(self) -> Self::Output {
                Err(self.0.into_static())
            }
        }
        struct Resp;
        impl XrpcResp for Resp {
            const NSID: &'static str = "com.example.test";
            const ENCODING: &'static str = "application/json";
            type Output<'de> = ();
            type Err<'de> = Err<'de>;
        }
        impl XrpcRequest for Req {
            const NSID: &'static str = "com.example.test";
            const METHOD: XrpcMethod = XrpcMethod::Query;
            type Response = Resp;
        }

        let opts = CallOptions::default();
        // Bases without a trailing slash, with one, and with a path prefix.
        for base in [
            Url::parse("https://pds").unwrap(),
            Url::parse("https://pds/").unwrap(),
            Url::parse("https://pds/base/").unwrap(),
        ] {
            let req = build_http_request(&base, &Req, &opts).unwrap();
            let uri = req.uri().to_string();
            assert!(uri.contains("/xrpc/com.example.test"));
            assert!(!uri.contains("//xrpc"));
        }
    }
}