jacquard_common/
xrpc.rs

1//! # Stateless XRPC utilities and request/response mapping
2//!
3//! Mapping overview:
4//! - Success (2xx): parse body into the endpoint's typed output.
5//! - 400: try typed error; on failure, fall back to a generic XRPC error (with
6//!   `nsid`, `method`, and `http_status`) and map common auth errors.
7//! - 401: if `WWW-Authenticate` is present, return
8//!   `ClientError::Auth(AuthError::Other(header))` so higher layers (OAuth/DPoP)
9//!   can inspect `error="invalid_token"` or `error="use_dpop_nonce"` and refresh/retry.
10//!   If the header is absent, parse the body and map auth errors to
11//!   `AuthError::TokenExpired`/`InvalidToken`.
12
13#[cfg(feature = "streaming")]
14pub mod streaming;
15
16use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18pub use streaming::{
19    StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20};
21
22#[cfg(feature = "websocket")]
23pub mod subscription;
24
25#[cfg(feature = "streaming")]
26use crate::StreamError;
27use crate::error::DecodeError;
28use crate::http_client::HttpClient;
29#[cfg(feature = "streaming")]
30use crate::http_client::HttpClientExt;
31use crate::types::value::Data;
32use crate::{AuthorizationToken, error::AuthError};
33use crate::{CowStr, error::XrpcResult};
34use crate::{IntoStatic, types::value::RawData};
35use bytes::Bytes;
36use http::{
37    HeaderName, HeaderValue, Request, StatusCode,
38    header::{AUTHORIZATION, CONTENT_TYPE},
39};
40use serde::{Deserialize, Serialize};
41use smol_str::SmolStr;
42use std::fmt::{self, Debug};
43use std::{error::Error, marker::PhantomData};
44#[cfg(feature = "websocket")]
45pub use subscription::{
46    BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
47    SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
48    SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
49};
50use url::Url;
51
52/// Error type for encoding XRPC requests
53#[derive(Debug, thiserror::Error, miette::Diagnostic)]
54pub enum EncodeError {
55    /// Failed to serialize query parameters
56    #[error("Failed to serialize query: {0}")]
57    Query(
58        #[from]
59        #[source]
60        serde_html_form::ser::Error,
61    ),
62    /// Failed to serialize JSON body
63    #[error("Failed to serialize JSON: {0}")]
64    Json(
65        #[from]
66        #[source]
67        serde_json::Error,
68    ),
69    /// Other encoding error
70    #[error("Encoding error: {0}")]
71    Other(String),
72}
73
74/// XRPC method type
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
76pub enum XrpcMethod {
77    /// Query (HTTP GET)
78    Query,
79    /// Procedure (HTTP POST)
80    Procedure(&'static str),
81}
82
83impl XrpcMethod {
84    /// Get the HTTP method string
85    pub const fn as_str(&self) -> &'static str {
86        match self {
87            Self::Query => "GET",
88            Self::Procedure(_) => "POST",
89        }
90    }
91
92    /// Get the body encoding type for this method (procedures only)
93    pub const fn body_encoding(&self) -> Option<&'static str> {
94        match self {
95            Self::Query => None,
96            Self::Procedure(enc) => Some(enc),
97        }
98    }
99}
100
101/// Trait for XRPC request types (queries and procedures)
102///
103/// This trait provides metadata about XRPC endpoints including the NSID,
104/// HTTP method, encoding, and associated output type.
105///
106/// The trait is implemented on the request parameters/input type itself.
107pub trait XrpcRequest: Serialize {
108    /// The NSID for this XRPC method
109    const NSID: &'static str;
110
111    /// XRPC method (query/GET or procedure/POST)
112    const METHOD: XrpcMethod;
113
114    /// Response type returned from the XRPC call (marker struct)
115    type Response: XrpcResp;
116
117    /// Encode the request body for procedures.
118    ///
119    /// Default implementation serializes to JSON. Override for non-JSON encodings.
120    fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
121        Ok(serde_json::to_vec(self)?)
122    }
123
124    /// Decode the request body for procedures.
125    ///
126    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
127    fn decode_body<'de>(body: &'de [u8]) -> XrpcResult<Box<Self>>
128    where
129        Self: Deserialize<'de>,
130    {
131        let body: Self = serde_json::from_slice(body)
132            .map_err(|e| crate::error::ClientError::decode(format!("{:?}", e)))?;
133
134        Ok(Box::new(body))
135    }
136}
137
138/// Trait for XRPC Response types
139///
140/// It mirrors the NSID and carries the encoding types as well as Output (success) and Err types
141pub trait XrpcResp {
142    /// The NSID for this XRPC method
143    const NSID: &'static str;
144
145    /// Output encoding (MIME type)
146    const ENCODING: &'static str;
147
148    /// Response output type
149    type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
150
151    /// Error type for this request
152    type Err<'de>: Error + Deserialize<'de> + Serialize + IntoStatic;
153
154    /// Output body encoding function, similar to the request-side type
155    fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
156        Ok(serde_json::to_vec(output)?)
157    }
158
159    /// Decode the response output body.
160    ///
161    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
162    fn decode_output<'de>(body: &'de [u8]) -> core::result::Result<Self::Output<'de>, DecodeError>
163    where
164        Self::Output<'de>: Deserialize<'de>,
165    {
166        #[allow(deprecated)]
167        let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
168
169        Ok(body)
170    }
171}
172
173/// XRPC server endpoint trait
174///
175/// Defines the fully-qualified path and method, as well as request and response types
176/// This exists primarily to work around lifetime issues for crates like Axum
177/// by moving the lifetime from the trait itself into an associated type.
178///
179/// It is implemented by the code generation on a marker struct, like the client-side [XrpcResp] trait.
180pub trait XrpcEndpoint {
181    /// Fully-qualified path ('/xrpc/\[nsid\]') where this endpoint should live on the server
182    const PATH: &'static str;
183    /// XRPC method (query/GET or procedure/POST)
184    const METHOD: XrpcMethod;
185    /// XRPC Request data type
186    type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
187    /// XRPC Response data type
188    type Response: XrpcResp;
189}
190
191/// Error type for XRPC endpoints that don't define any errors
192#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
193pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);
194
195impl<'de> fmt::Display for GenericError<'de> {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        self.0.fmt(f)
198    }
199}
200
201impl Error for GenericError<'_> {}
202
203impl IntoStatic for GenericError<'_> {
204    type Output = GenericError<'static>;
205    fn into_static(self) -> Self::Output {
206        GenericError(self.0.into_static())
207    }
208}
209
210/// Per-request options for XRPC calls.
211#[derive(Debug, Default, Clone)]
212pub struct CallOptions<'a> {
213    /// Optional Authorization to apply (`Bearer` or `DPoP`).
214    pub auth: Option<AuthorizationToken<'a>>,
215    /// `atproto-proxy` header value.
216    pub atproto_proxy: Option<CowStr<'a>>,
217    /// `atproto-accept-labelers` header values.
218    pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
219    /// Extra headers to attach to this request.
220    pub extra_headers: Vec<(HeaderName, HeaderValue)>,
221}
222
223impl IntoStatic for CallOptions<'_> {
224    type Output = CallOptions<'static>;
225
226    fn into_static(self) -> Self::Output {
227        CallOptions {
228            auth: self.auth.map(|auth| auth.into_static()),
229            atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
230            atproto_accept_labelers: self
231                .atproto_accept_labelers
232                .map(|labelers| labelers.into_static()),
233            extra_headers: self.extra_headers,
234        }
235    }
236}
237
238/// Extension for stateless XRPC calls on any `HttpClient`.
239///
240/// Example
241/// ```no_run
242/// # #[tokio::main]
243/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
244/// use jacquard_common::xrpc::XrpcExt;
245/// use jacquard_common::http_client::HttpClient;
246///
247/// let http = reqwest::Client::new();
248/// let base = url::Url::parse("https://public.api.bsky.app")?;
249/// // let resp = http.xrpc(base).send(&request).await?;
250/// # Ok(())
251/// # }
252/// ```
253pub trait XrpcExt: HttpClient {
254    /// Start building an XRPC call for the given base URL.
255    fn xrpc<'a>(&'a self, base: Url) -> XrpcCall<'a, Self>
256    where
257        Self: Sized,
258    {
259        XrpcCall {
260            client: self,
261            base,
262            opts: CallOptions::default(),
263        }
264    }
265}
266
267impl<T: HttpClient> XrpcExt for T {}
268
269/// Nicer alias for Xrpc response type
270pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
271
272/// Stateful XRPC call trait
273#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
274pub trait XrpcClient: HttpClient {
275    /// Get the base URI for the client.
276    fn base_uri(&self) -> impl Future<Output = Url>;
277
278    /// Get the call options for the client.
279    fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
280        async { CallOptions::default() }
281    }
282
283    /// Send an XRPC request and parse the response
284    #[cfg(not(target_arch = "wasm32"))]
285    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
286    where
287        R: XrpcRequest + Send + Sync,
288        <R as XrpcRequest>::Response: Send + Sync,
289        Self: Sync;
290
291    /// Send an XRPC request and parse the response
292    #[cfg(target_arch = "wasm32")]
293    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
294    where
295        R: XrpcRequest + Send + Sync,
296        <R as XrpcRequest>::Response: Send + Sync;
297
298    /// Send an XRPC request and parse the response
299    #[cfg(not(target_arch = "wasm32"))]
300    fn send_with_opts<R>(
301        &self,
302        request: R,
303        opts: CallOptions<'_>,
304    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
305    where
306        R: XrpcRequest + Send + Sync,
307        <R as XrpcRequest>::Response: Send + Sync,
308        Self: Sync;
309
310    /// Send an XRPC request with custom options and parse the response
311    #[cfg(target_arch = "wasm32")]
312    fn send_with_opts<R>(
313        &self,
314        request: R,
315        opts: CallOptions<'_>,
316    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
317    where
318        R: XrpcRequest + Send + Sync,
319        <R as XrpcRequest>::Response: Send + Sync;
320}
321
322/// Stateful XRPC streaming client trait
323#[cfg(feature = "streaming")]
324pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
325    /// Send an XRPC request and stream the response
326    #[cfg(not(target_arch = "wasm32"))]
327    fn download<R>(
328        &self,
329        request: R,
330    ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
331    where
332        R: XrpcRequest + Send + Sync,
333        <R as XrpcRequest>::Response: Send + Sync,
334        Self: Sync;
335
336    /// Send an XRPC request and stream the response
337    #[cfg(target_arch = "wasm32")]
338    fn download<R>(
339        &self,
340        request: R,
341    ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
342    where
343        R: XrpcRequest + Send + Sync,
344        <R as XrpcRequest>::Response: Send + Sync;
345
346    /// Stream an XRPC procedure call and its response
347    #[cfg(not(target_arch = "wasm32"))]
348    fn stream<S>(
349        &self,
350        stream: XrpcProcedureSend<S::Frame<'static>>,
351    ) -> impl Future<
352        Output = Result<
353            XrpcResponseStream<
354                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
355            >,
356            StreamError,
357        >,
358    >
359    where
360        S: XrpcProcedureStream + 'static,
361        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
362        Self: Sync;
363
364    /// Stream an XRPC procedure call and its response
365    #[cfg(target_arch = "wasm32")]
366    fn stream<S>(
367        &self,
368        stream: XrpcProcedureSend<S::Frame<'static>>,
369    ) -> impl Future<
370        Output = Result<
371            XrpcResponseStream<
372                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
373            >,
374            StreamError,
375        >,
376    >
377    where
378        S: XrpcProcedureStream + 'static,
379        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
380}
381
382/// Stateless XRPC call builder.
383///
384/// Example (per-request overrides)
385/// ```no_run
386/// # #[tokio::main]
387/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
388/// use jacquard_common::xrpc::XrpcExt;
389/// use jacquard_common::{AuthorizationToken, CowStr};
390///
391/// let http = reqwest::Client::new();
392/// let base = url::Url::parse("https://public.api.bsky.app")?;
393/// let call = http
394///     .xrpc(base)
395///     .auth(AuthorizationToken::Bearer(CowStr::from("ACCESS_JWT")))
396///     .accept_labelers(vec![CowStr::from("did:plc:labelerid")])
397///     .header(http::header::USER_AGENT, http::HeaderValue::from_static("jacquard-example"));
398/// // let resp = call.send(&request).await?;
399/// # Ok(())
400/// # }
401/// ```
402pub struct XrpcCall<'a, C: HttpClient> {
403    pub(crate) client: &'a C,
404    pub(crate) base: Url,
405    pub(crate) opts: CallOptions<'a>,
406}
407
408impl<'a, C: HttpClient> XrpcCall<'a, C> {
409    /// Apply Authorization to this call.
410    pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
411        self.opts.auth = Some(token);
412        self
413    }
414    /// Set `atproto-proxy` header for this call.
415    pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
416        self.opts.atproto_proxy = Some(proxy);
417        self
418    }
419    /// Set `atproto-accept-labelers` header(s) for this call.
420    pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
421        self.opts.atproto_accept_labelers = Some(labelers);
422        self
423    }
424    /// Add an extra header.
425    pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
426        self.opts.extra_headers.push((name, value));
427        self
428    }
429    /// Replace the builder's options entirely.
430    pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
431        self.opts = opts;
432        self
433    }
434
435    /// Send the given typed XRPC request and return a response wrapper.
436    ///
437    /// Note on 401 handling:
438    /// - When the server returns 401 with a `WWW-Authenticate` header, this surfaces as
439    ///   `ClientError::Auth(AuthError::Other(header))` so higher layers (e.g., OAuth/DPoP) can
440    ///   inspect the header for `error="invalid_token"` or `error="use_dpop_nonce"` and react
441    ///   (refresh/retry). If the header is absent, the 401 body flows through to `Response` and
442    ///   can be parsed/mapped to `AuthError` as appropriate.
443    #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
444    pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
445    where
446        R: XrpcRequest,
447        <R as XrpcRequest>::Response: Send + Sync,
448    {
449        let http_request = build_http_request(&self.base, request, &self.opts)?;
450
451        let http_response = self
452            .client
453            .send_http(http_request)
454            .await
455            .map_err(|e| crate::error::ClientError::transport(e))?;
456
457        process_response(http_response)
458    }
459}
460
461/// Process the HTTP response from the server into a proper xrpc response statelessly.
462///
463/// Exposed to make things more easily pluggable
464#[inline]
465pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
466where
467    Resp: XrpcResp,
468{
469    let status = http_response.status();
470    // If the server returned 401 with a WWW-Authenticate header, expose it so higher layers
471    // (e.g., DPoP handling) can detect `error="invalid_token"` and trigger refresh.
472    #[allow(deprecated)]
473    if status.as_u16() == 401 {
474        if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
475            return Err(crate::error::ClientError::auth(
476                crate::error::AuthError::Other(hv.clone()),
477            ));
478        }
479    }
480    let buffer = Bytes::from(http_response.into_body());
481
482    if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
483        return Err(crate::error::HttpError {
484            status,
485            body: Some(buffer),
486        }
487        .into());
488    }
489
490    Ok(Response::new(buffer, status))
491}
492
493/// HTTP headers commonly used in XRPC requests
494pub enum Header {
495    /// Content-Type header
496    ContentType,
497    /// Authorization header
498    Authorization,
499    /// `atproto-proxy` header - specifies which service (app server or other atproto service) the user's PDS should forward requests to as appropriate.
500    ///
501    /// See: <https://atproto.com/specs/xrpc#service-proxying>
502    AtprotoProxy,
503    /// `atproto-accept-labelers` header used by clients to request labels from specific labelers to be included and applied in the response. See [label](https://atproto.com/specs/label) specification for details.
504    AtprotoAcceptLabelers,
505}
506
507impl From<Header> for HeaderName {
508    fn from(value: Header) -> Self {
509        match value {
510            Header::ContentType => CONTENT_TYPE,
511            Header::Authorization => AUTHORIZATION,
512            Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
513            Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
514        }
515    }
516}
517
518/// Build an HTTP request for an XRPC call given base URL and options
519pub fn build_http_request<'s, R>(
520    base: &Url,
521    req: &R,
522    opts: &CallOptions<'_>,
523) -> XrpcResult<Request<Vec<u8>>>
524where
525    R: XrpcRequest,
526{
527    use crate::error::ClientError;
528
529    let mut url = base.clone();
530    let mut path = url.path().trim_end_matches('/').to_owned();
531    path.push_str("/xrpc/");
532    path.push_str(<R as XrpcRequest>::NSID);
533    url.set_path(&path);
534
535    if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
536        let qs = serde_html_form::to_string(&req).map_err(|e| {
537            ClientError::invalid_request(format!("Failed to serialize query: {}", e))
538        })?;
539        if !qs.is_empty() {
540            url.set_query(Some(&qs));
541        } else {
542            url.set_query(None);
543        }
544    }
545
546    let method = match <R as XrpcRequest>::METHOD {
547        XrpcMethod::Query => http::Method::GET,
548        XrpcMethod::Procedure(_) => http::Method::POST,
549    };
550
551    let mut builder = Request::builder().method(method).uri(url.as_str());
552
553    if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
554        builder = builder.header(Header::ContentType, encoding);
555    }
556    let output_encoding = <R::Response as XrpcResp>::ENCODING;
557    builder = builder.header(http::header::ACCEPT, output_encoding);
558
559    if let Some(token) = &opts.auth {
560        let hv = match token {
561            AuthorizationToken::Bearer(t) => {
562                HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
563            }
564            AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
565        }
566        .map_err(|e| ClientError::invalid_request(format!("Invalid authorization token: {}", e)))?;
567        builder = builder.header(Header::Authorization, hv);
568    }
569
570    if let Some(proxy) = &opts.atproto_proxy {
571        builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
572    }
573    if let Some(labelers) = &opts.atproto_accept_labelers {
574        if !labelers.is_empty() {
575            let joined = labelers
576                .iter()
577                .map(|s| s.as_ref())
578                .collect::<Vec<_>>()
579                .join(", ");
580            builder = builder.header(Header::AtprotoAcceptLabelers, joined);
581        }
582    }
583    for (name, value) in &opts.extra_headers {
584        builder = builder.header(name, value);
585    }
586
587    let body = if let XrpcMethod::Procedure(_) = R::METHOD {
588        req.encode_body()
589            .map_err(|e| ClientError::invalid_request(format!("Failed to encode body: {}", e)))?
590    } else {
591        vec![]
592    };
593
594    builder
595        .body(body)
596        .map_err(|e| ClientError::invalid_request(format!("Failed to build request: {}", e)))
597}
598
599/// XRPC response wrapper that owns the response buffer
600///
601/// Allows borrowing from the buffer when parsing to avoid unnecessary allocations.
602/// Generic over the response marker type (e.g., `GetAuthorFeedResponse`), not the request.
603pub struct Response<Resp>
604where
605    Resp: XrpcResp, // HRTB: Resp works with any lifetime
606{
607    _marker: PhantomData<fn() -> Resp>,
608    buffer: Bytes,
609    status: StatusCode,
610}
611
612impl<R> Response<R>
613where
614    R: XrpcResp,
615{
616    /// Create a new response from a buffer and status code
617    pub fn new(buffer: Bytes, status: StatusCode) -> Self {
618        Self {
619            buffer,
620            status,
621            _marker: PhantomData,
622        }
623    }
624
625    /// Get the HTTP status code
626    pub fn status(&self) -> StatusCode {
627        self.status
628    }
629
630    /// Get the raw buffer
631    pub fn buffer(&self) -> &Bytes {
632        &self.buffer
633    }
634
635    /// Parse the response, borrowing from the internal buffer
636    pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
637        // 200: parse as output
638        if self.status.is_success() {
639            match R::decode_output(&self.buffer) {
640                Ok(output) => Ok(output),
641                Err(e) => Err(XrpcError::Decode(e)),
642            }
643        // 400: try typed XRPC error, fallback to generic error
644        } else if self.status.as_u16() == 400 {
645            match serde_json::from_slice::<_>(&self.buffer) {
646                Ok(error) => Err(XrpcError::Xrpc(error)),
647                Err(_) => {
648                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
649                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
650                        Ok(mut generic) => {
651                            generic.nsid = R::NSID;
652                            generic.method = ""; // method info only available on request
653                            generic.http_status = self.status;
654                            // Map auth-related errors to AuthError
655                            match generic.error.as_str() {
656                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
657                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
658                                _ => Err(XrpcError::Generic(generic)),
659                            }
660                        }
661                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
662                    }
663                }
664            }
665        // 401: always auth error
666        } else {
667            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
668                Ok(mut generic) => {
669                    generic.nsid = R::NSID;
670                    generic.method = ""; // method info only available on request
671                    generic.http_status = self.status;
672                    match generic.error.as_str() {
673                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
674                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
675                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
676                    }
677                }
678                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
679            }
680        }
681    }
682
683    /// Parse this as validated, loosely typed atproto data.
684    ///
685    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
686    pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
687        // 200: parse as output
688        if self.status.is_success() {
689            match serde_json::from_slice::<_>(&self.buffer) {
690                Ok(output) => Ok(output),
691                Err(_) => {
692                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
693                        if let Ok(data) = Data::from_cbor(&data) {
694                            Ok(data.into_static())
695                        } else {
696                            Ok(Data::Bytes(self.buffer.clone()))
697                        }
698                    } else {
699                        Ok(Data::Bytes(self.buffer.clone()))
700                    }
701                }
702            }
703        // 400: try typed XRPC error, fallback to generic error
704        } else if self.status.as_u16() == 400 {
705            match serde_json::from_slice::<_>(&self.buffer) {
706                Ok(error) => Err(XrpcError::Xrpc(error)),
707                Err(_) => {
708                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
709                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
710                        Ok(mut generic) => {
711                            generic.nsid = R::NSID;
712                            generic.method = ""; // method info only available on request
713                            generic.http_status = self.status;
714                            // Map auth-related errors to AuthError
715                            match generic.error.as_str() {
716                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
717                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
718                                _ => Err(XrpcError::Generic(generic)),
719                            }
720                        }
721                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
722                    }
723                }
724            }
725        // 401: always auth error
726        } else {
727            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
728                Ok(mut generic) => {
729                    generic.nsid = R::NSID;
730                    generic.method = ""; // method info only available on request
731                    generic.http_status = self.status;
732                    match generic.error.as_str() {
733                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
734                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
735                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
736                    }
737                }
738                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
739            }
740        }
741    }
742
743    /// Parse this as raw atproto data with minimal validation.
744    ///
745    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
746    pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
747        // 200: parse as output
748        if self.status.is_success() {
749            match serde_json::from_slice::<_>(&self.buffer) {
750                Ok(output) => Ok(output),
751                Err(_) => {
752                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
753                        if let Ok(data) = RawData::from_cbor(&data) {
754                            Ok(data.into_static())
755                        } else {
756                            Ok(RawData::Bytes(self.buffer.clone()))
757                        }
758                    } else {
759                        Ok(RawData::Bytes(self.buffer.clone()))
760                    }
761                }
762            }
763        // 400: try typed XRPC error, fallback to generic error
764        } else if self.status.as_u16() == 400 {
765            match serde_json::from_slice::<_>(&self.buffer) {
766                Ok(error) => Err(XrpcError::Xrpc(error)),
767                Err(_) => {
768                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
769                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
770                        Ok(mut generic) => {
771                            generic.nsid = R::NSID;
772                            generic.method = ""; // method info only available on request
773                            generic.http_status = self.status;
774                            // Map auth-related errors to AuthError
775                            match generic.error.as_str() {
776                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
777                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
778                                _ => Err(XrpcError::Generic(generic)),
779                            }
780                        }
781                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
782                    }
783                }
784            }
785        // 401: always auth error
786        } else {
787            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
788                Ok(mut generic) => {
789                    generic.nsid = R::NSID;
790                    generic.method = ""; // method info only available on request
791                    generic.http_status = self.status;
792                    match generic.error.as_str() {
793                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
794                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
795                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
796                    }
797                }
798                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
799            }
800        }
801    }
802
803    /// Reinterpret this response as a different response type.
804    ///
805    /// This transmutes the response by keeping the same buffer and status code,
806    /// but changing the type-level marker. Useful for converting generic XRPC responses
807    /// into collection-specific typed responses.
808    ///
809    /// # Safety
810    ///
811    /// This is safe in the sense that no memory unsafety occurs, but logical correctness
812    /// depends on ensuring the buffer actually contains data that can deserialize to `NEW`.
813    /// Incorrect conversion will cause deserialization errors at runtime.
814    pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
815        Response {
816            buffer: self.buffer,
817            status: self.status,
818            _marker: PhantomData,
819        }
820    }
821}
822
823/// doc
824pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
825/// doc
826pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
827
828impl<R> Response<R>
829where
830    R: XrpcResp,
831{
832    /// Parse the response into an owned output
833    pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
834    where
835        for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
836        for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
837    {
838        fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
839            serde_json::from_slice(buffer)
840        }
841
842        // 200: parse as output
843        if self.status.is_success() {
844            match R::decode_output(&self.buffer) {
845                Ok(output) => Ok(output.into_static()),
846                Err(e) => Err(XrpcError::Decode(e)),
847            }
848        // 400: try typed XRPC error, fallback to generic error
849        } else if self.status.as_u16() == 400 {
850            let error = match parse_error::<R>(&self.buffer) {
851                Ok(error) => XrpcError::Xrpc(error),
852                Err(_) => {
853                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
854                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
855                        Ok(mut generic) => {
856                            generic.nsid = R::NSID;
857                            generic.method = ""; // method info only available on request
858                            generic.http_status = self.status;
859                            // Map auth-related errors to AuthError
860                            match generic.error.as_ref() {
861                                "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
862                                "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
863                                _ => XrpcError::Generic(generic),
864                            }
865                        }
866                        Err(e) => XrpcError::Decode(DecodeError::Json(e)),
867                    }
868                }
869            };
870            Err(error.into_static())
871        // 401: always auth error
872        } else {
873            let error: XrpcError<<R as XrpcResp>::Err<'_>> =
874                match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
875                    Ok(mut generic) => {
876                        let status = self.status;
877                        generic.nsid = R::NSID;
878                        generic.method = ""; // method info only available on request
879                        generic.http_status = status;
880                        match generic.error.as_ref() {
881                            "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
882                            "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
883                            _ => XrpcError::Auth(AuthError::NotAuthenticated),
884                        }
885                    }
886                    Err(e) => XrpcError::Decode(DecodeError::Json(e)),
887                };
888
889            Err(error.into_static())
890        }
891    }
892}
893
894/// Generic XRPC error format for untyped errors like InvalidRequest
895///
896/// Used when the error doesn't match the endpoint's specific error enum
897#[derive(Debug, Clone, Deserialize, Serialize)]
898pub struct GenericXrpcError {
899    /// Error code (e.g., "InvalidRequest")
900    pub error: SmolStr,
901    /// Optional error message with details
902    pub message: Option<SmolStr>,
903    /// XRPC method NSID that produced this error (context only; not serialized)
904    #[serde(skip)]
905    pub nsid: &'static str,
906    /// HTTP method used (GET/POST) (context only; not serialized)
907    #[serde(skip)]
908    pub method: &'static str,
909    /// HTTP status code (context only; not serialized)
910    #[serde(skip)]
911    pub http_status: StatusCode,
912}
913
914impl std::fmt::Display for GenericXrpcError {
915    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
916        if let Some(msg) = &self.message {
917            write!(
918                f,
919                "{}: {} (nsid={}, method={}, status={})",
920                self.error, msg, self.nsid, self.method, self.http_status
921            )
922        } else {
923            write!(
924                f,
925                "{} (nsid={}, method={}, status={})",
926                self.error, self.nsid, self.method, self.http_status
927            )
928        }
929    }
930}
931
932impl IntoStatic for GenericXrpcError {
933    type Output = Self;
934
935    fn into_static(self) -> Self::Output {
936        self
937    }
938}
939
940impl std::error::Error for GenericXrpcError {}
941
942/// XRPC-specific errors returned from endpoints
943///
944/// Represents errors returned in the response body
945/// Type parameter `E` is the endpoint's specific error enum type.
946#[derive(Debug, thiserror::Error, miette::Diagnostic)]
947pub enum XrpcError<E: std::error::Error + IntoStatic> {
948    /// Typed XRPC error from the endpoint's specific error enum
949    #[error("XRPC error: {0}")]
950    #[diagnostic(code(jacquard_common::xrpc::typed))]
951    Xrpc(E),
952
953    /// Authentication error (ExpiredToken, InvalidToken, etc.)
954    #[error("Authentication error: {0}")]
955    #[diagnostic(code(jacquard_common::xrpc::auth))]
956    Auth(#[from] AuthError),
957
958    /// Generic XRPC error not in the endpoint's error enum (e.g., InvalidRequest)
959    #[error("XRPC error: {0}")]
960    #[diagnostic(code(jacquard_common::xrpc::generic))]
961    Generic(GenericXrpcError),
962
963    /// Failed to decode the response body
964    #[error("Failed to decode response: {0}")]
965    #[diagnostic(code(jacquard_common::xrpc::decode))]
966    Decode(#[from] DecodeError),
967}
968
969impl<E> IntoStatic for XrpcError<E>
970where
971    E: std::error::Error + IntoStatic,
972    E::Output: std::error::Error + IntoStatic,
973    <E as IntoStatic>::Output: std::error::Error + IntoStatic,
974{
975    type Output = XrpcError<E::Output>;
976    fn into_static(self) -> Self::Output {
977        match self {
978            XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
979            XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
980            XrpcError::Generic(e) => XrpcError::Generic(e),
981            XrpcError::Decode(e) => XrpcError::Decode(e),
982        }
983    }
984}
985
986impl<E> Serialize for XrpcError<E>
987where
988    E: std::error::Error + IntoStatic + Serialize,
989{
990    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
991    where
992        S: serde::Serializer,
993    {
994        use serde::ser::SerializeStruct;
995
996        match self {
997            // Typed errors already serialize to correct atproto format
998            XrpcError::Xrpc(e) => e.serialize(serializer),
999            // Generic errors already have correct format
1000            XrpcError::Generic(g) => g.serialize(serializer),
1001            // Auth and Decode need manual mapping to {"error": "...", "message": ...}
1002            XrpcError::Auth(auth) => {
1003                let mut state = serializer.serialize_struct("XrpcError", 2)?;
1004                let (error, message) = match auth {
1005                    AuthError::TokenExpired => ("ExpiredToken", Some("Access token has expired")),
1006                    AuthError::InvalidToken => {
1007                        ("InvalidToken", Some("Access token is invalid or malformed"))
1008                    }
1009                    AuthError::RefreshFailed => {
1010                        ("RefreshFailed", Some("Token refresh request failed"))
1011                    }
1012                    AuthError::NotAuthenticated => (
1013                        "AuthenticationRequired",
1014                        Some("Request requires authentication but none was provided"),
1015                    ),
1016                    AuthError::Other(hv) => {
1017                        let msg = hv.to_str().unwrap_or("[non-utf8 header]");
1018                        ("AuthenticationError", Some(msg))
1019                    }
1020                };
1021                state.serialize_field("error", error)?;
1022                if let Some(msg) = message {
1023                    state.serialize_field("message", msg)?;
1024                }
1025                state.end()
1026            }
1027            XrpcError::Decode(decode_err) => {
1028                let mut state = serializer.serialize_struct("XrpcError", 2)?;
1029                state.serialize_field("error", "ResponseDecodeError")?;
1030                // Convert DecodeError to string for message field
1031                let msg = format!("{:?}", decode_err);
1032                state.serialize_field("message", &msg)?;
1033                state.end()
1034            }
1035        }
1036    }
1037}
1038
1039#[cfg(feature = "streaming")]
1040impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
1041    /// Send an XRPC call and stream the binary response.
1042    ///
1043    /// Useful for downloading blobs and entire repository archives
1044    pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
1045    where
1046        R: XrpcRequest,
1047        <R as XrpcRequest>::Response: Send + Sync,
1048    {
1049        let http_request =
1050            build_http_request(&self.base, request, &self.opts).map_err(StreamError::transport)?;
1051
1052        let http_response = self
1053            .client
1054            .send_http_streaming(http_request)
1055            .await
1056            .map_err(StreamError::transport)?;
1057        let (parts, body) = http_response.into_parts();
1058
1059        Ok(StreamingResponse::new(parts, body))
1060    }
1061
1062    /// Stream an XRPC procedure call and its response
1063    ///
1064    /// Useful for streaming upload of large payloads, or for "pipe-through" operations
1065    /// where you are processing a large payload.
1066    pub async fn stream<S>(
1067        self,
1068        stream: XrpcProcedureSend<S::Frame<'static>>,
1069    ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
1070    where
1071        S: XrpcProcedureStream + 'static,
1072        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
1073    {
1074        use futures::TryStreamExt;
1075
1076        let mut url = self.base;
1077        let mut path = url.path().trim_end_matches('/').to_owned();
1078        path.push_str("/xrpc/");
1079        path.push_str(<S::Request as XrpcRequest>::NSID);
1080        url.set_path(&path);
1081
1082        let mut builder = http::Request::post(url.to_string());
1083
1084        if let Some(token) = &self.opts.auth {
1085            let hv = match token {
1086                AuthorizationToken::Bearer(t) => {
1087                    HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
1088                }
1089                AuthorizationToken::Dpop(t) => {
1090                    HeaderValue::from_str(&format!("DPoP {}", t.as_ref()))
1091                }
1092            }
1093            .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?;
1094            builder = builder.header(Header::Authorization, hv);
1095        }
1096
1097        if let Some(proxy) = &self.opts.atproto_proxy {
1098            builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
1099        }
1100        if let Some(labelers) = &self.opts.atproto_accept_labelers {
1101            if !labelers.is_empty() {
1102                let joined = labelers
1103                    .iter()
1104                    .map(|s| s.as_ref())
1105                    .collect::<Vec<_>>()
1106                    .join(", ");
1107                builder = builder.header(Header::AtprotoAcceptLabelers, joined);
1108            }
1109        }
1110        for (name, value) in &self.opts.extra_headers {
1111            builder = builder.header(name, value);
1112        }
1113
1114        let (parts, _) = builder
1115            .body(())
1116            .map_err(|e| StreamError::protocol(e.to_string()))?
1117            .into_parts();
1118
1119        let body_stream = Box::pin(stream.0.map_ok(|f| f.buffer));
1120
1121        let resp = self
1122            .client
1123            .send_http_bidirectional(parts, body_stream)
1124            .await
1125            .map_err(StreamError::transport)?;
1126
1127        let (parts, body) = resp.into_parts();
1128
1129        Ok(XrpcResponseStream::<
1130            <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
1131        >::from_typed_parts(parts, body))
1132    }
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137    use super::*;
1138    use serde::{Deserialize, Serialize};
1139
1140    #[derive(Serialize, Deserialize)]
1141    #[allow(dead_code)]
1142    struct DummyReq;
1143
1144    #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1145    #[error("{0}")]
1146    struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);
1147
1148    impl IntoStatic for DummyErr<'_> {
1149        type Output = DummyErr<'static>;
1150        fn into_static(self) -> Self::Output {
1151            DummyErr(self.0.into_static())
1152        }
1153    }
1154
1155    struct DummyResp;
1156
1157    impl XrpcResp for DummyResp {
1158        const NSID: &'static str = "test.dummy";
1159        const ENCODING: &'static str = "application/json";
1160        type Output<'de> = ();
1161        type Err<'de> = DummyErr<'de>;
1162    }
1163
1164    impl XrpcRequest for DummyReq {
1165        const NSID: &'static str = "test.dummy";
1166        const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
1167        type Response = DummyResp;
1168    }
1169
1170    #[test]
1171    fn generic_error_carries_context() {
1172        let body = serde_json::json!({"error":"InvalidRequest","message":"missing"});
1173        let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
1174        let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST);
1175        match resp.parse().unwrap_err() {
1176            XrpcError::Generic(g) => {
1177                assert_eq!(g.error.as_str(), "InvalidRequest");
1178                assert_eq!(g.message.as_deref(), Some("missing"));
1179                assert_eq!(g.nsid, DummyResp::NSID);
1180                assert_eq!(g.method, ""); // method info only on request
1181                assert_eq!(g.http_status, StatusCode::BAD_REQUEST);
1182            }
1183            other => panic!("unexpected: {other:?}"),
1184        }
1185    }
1186
1187    #[test]
1188    fn auth_error_mapping() {
1189        for (code, expect) in [
1190            ("ExpiredToken", AuthError::TokenExpired),
1191            ("InvalidToken", AuthError::InvalidToken),
1192        ] {
1193            let body = serde_json::json!({"error": code});
1194            let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
1195            let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED);
1196            match resp.parse().unwrap_err() {
1197                XrpcError::Auth(e) => match (e, expect) {
1198                    (AuthError::TokenExpired, AuthError::TokenExpired) => {}
1199                    (AuthError::InvalidToken, AuthError::InvalidToken) => {}
1200                    other => panic!("mismatch: {other:?}"),
1201                },
1202                other => panic!("unexpected: {other:?}"),
1203            }
1204        }
1205    }
1206
1207    #[test]
1208    fn no_double_slash_in_path() {
1209        #[derive(Serialize, Deserialize)]
1210        struct Req;
1211        #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1212        #[error("{0}")]
1213        struct Err<'a>(#[serde(borrow)] CowStr<'a>);
1214        impl IntoStatic for Err<'_> {
1215            type Output = Err<'static>;
1216            fn into_static(self) -> Self::Output {
1217                Err(self.0.into_static())
1218            }
1219        }
1220        struct Resp;
1221        impl XrpcResp for Resp {
1222            const NSID: &'static str = "com.example.test";
1223            const ENCODING: &'static str = "application/json";
1224            type Output<'de> = ();
1225            type Err<'de> = Err<'de>;
1226        }
1227        impl XrpcRequest for Req {
1228            const NSID: &'static str = "com.example.test";
1229            const METHOD: XrpcMethod = XrpcMethod::Query;
1230            type Response = Resp;
1231        }
1232
1233        let opts = CallOptions::default();
1234        for base in [
1235            Url::parse("https://pds").unwrap(),
1236            Url::parse("https://pds/").unwrap(),
1237            Url::parse("https://pds/base/").unwrap(),
1238        ] {
1239            let req = build_http_request(&base, &Req, &opts).unwrap();
1240            let uri = req.uri().to_string();
1241            assert!(uri.contains("/xrpc/com.example.test"));
1242            assert!(!uri.contains("//xrpc"));
1243        }
1244    }
1245}