jacquard_common/
xrpc.rs

//! # Stateless XRPC utilities and request/response mapping
//!
//! Mapping overview:
//! - Success (2xx): parse body into the endpoint's typed output.
//! - 400: try typed error; on failure, fall back to a generic XRPC error (with
//!   `nsid`, `method`, and `http_status`) and map common auth errors.
//! - 401: if `WWW-Authenticate` is present, return
//!   `ClientError::Auth(AuthError::Other(header))` so higher layers (OAuth/DPoP)
//!   can inspect `error="invalid_token"` or `error="use_dpop_nonce"` and refresh/retry.
//!   If the header is absent, parse the body and map auth errors to
//!   `AuthError::TokenExpired`/`InvalidToken`; any other error name becomes
//!   `AuthError::NotAuthenticated`.
//! - Any other non-2xx status: returned as an `HttpError` carrying the status and
//!   the raw response body.
12
13#[cfg(feature = "streaming")]
14pub mod streaming;
15
16use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18pub use streaming::{
19    StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20};
21
22#[cfg(feature = "websocket")]
23pub mod subscription;
24
25#[cfg(feature = "streaming")]
26use crate::StreamError;
27use crate::error::DecodeError;
28use crate::http_client::HttpClient;
29#[cfg(feature = "streaming")]
30use crate::http_client::HttpClientExt;
31use crate::types::value::Data;
32use crate::{AuthorizationToken, error::AuthError};
33use crate::{CowStr, error::XrpcResult};
34use crate::{IntoStatic, types::value::RawData};
35use bytes::Bytes;
36use http::{
37    HeaderName, HeaderValue, Request, StatusCode,
38    header::{AUTHORIZATION, CONTENT_TYPE},
39};
40use serde::{Deserialize, Serialize};
41use smol_str::SmolStr;
42use std::fmt::{self, Debug};
43use std::{error::Error, marker::PhantomData};
44#[cfg(feature = "websocket")]
45pub use subscription::{
46    BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
47    SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
48    SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
49};
50use url::Url;
51
52/// Error type for encoding XRPC requests
53#[derive(Debug, thiserror::Error, miette::Diagnostic)]
54pub enum EncodeError {
55    /// Failed to serialize query parameters
56    #[error("Failed to serialize query: {0}")]
57    Query(
58        #[from]
59        #[source]
60        serde_html_form::ser::Error,
61    ),
62    /// Failed to serialize JSON body
63    #[error("Failed to serialize JSON: {0}")]
64    Json(
65        #[from]
66        #[source]
67        serde_json::Error,
68    ),
69    /// Other encoding error
70    #[error("Encoding error: {0}")]
71    Other(String),
72}
73
/// XRPC method type
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum XrpcMethod {
    /// Query (HTTP GET)
    Query,
    /// Procedure (HTTP POST); the payload is the encoding (MIME type) of the
    /// request body.
    Procedure(&'static str),
}

impl XrpcMethod {
    /// Get the HTTP method string.
    ///
    /// Returns `"GET"` for queries and `"POST"` for procedures.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::Query => "GET",
            Self::Procedure(_) => "POST",
        }
    }

    /// Get the body encoding type for this method (procedures only).
    ///
    /// Returns `None` for queries, which carry no request body encoding.
    #[must_use]
    pub const fn body_encoding(&self) -> Option<&'static str> {
        // `XrpcMethod` is `Copy`, so matching on `*self` binds the encoding by
        // value and hands back the original `'static` string directly.
        match *self {
            Self::Query => None,
            Self::Procedure(encoding) => Some(encoding),
        }
    }
}
100
101/// Trait for XRPC request types (queries and procedures)
102///
103/// This trait provides metadata about XRPC endpoints including the NSID,
104/// HTTP method, encoding, and associated output type.
105///
106/// The trait is implemented on the request parameters/input type itself.
107pub trait XrpcRequest: Serialize {
108    /// The NSID for this XRPC method
109    const NSID: &'static str;
110
111    /// XRPC method (query/GET or procedure/POST)
112    const METHOD: XrpcMethod;
113
114    /// Response type returned from the XRPC call (marker struct)
115    type Response: XrpcResp;
116
117    /// Encode the request body for procedures.
118    ///
119    /// Default implementation serializes to JSON. Override for non-JSON encodings.
120    fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
121        Ok(serde_json::to_vec(self)?)
122    }
123
124    /// Decode the request body for procedures.
125    ///
126    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
127    fn decode_body<'de>(body: &'de [u8]) -> Result<Box<Self>, DecodeError>
128    where
129        Self: Deserialize<'de>,
130    {
131        let body: Self = serde_json::from_slice(body)?;
132
133        Ok(Box::new(body))
134    }
135}
136
137/// Trait for XRPC Response types
138///
139/// It mirrors the NSID and carries the encoding types as well as Output (success) and Err types
140pub trait XrpcResp {
141    /// The NSID for this XRPC method
142    const NSID: &'static str;
143
144    /// Output encoding (MIME type)
145    const ENCODING: &'static str;
146
147    /// Response output type
148    type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
149
150    /// Error type for this request
151    type Err<'de>: Error + Deserialize<'de> + Serialize + IntoStatic;
152
153    /// Output body encoding function, similar to the request-side type
154    fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
155        Ok(serde_json::to_vec(output)?)
156    }
157
158    /// Decode the response output body.
159    ///
160    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
161    fn decode_output<'de>(body: &'de [u8]) -> core::result::Result<Self::Output<'de>, DecodeError>
162    where
163        Self::Output<'de>: Deserialize<'de>,
164    {
165        let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
166        Ok(body)
167    }
168}
169
170/// XRPC server endpoint trait
171///
172/// Defines the fully-qualified path and method, as well as request and response types
173/// This exists primarily to work around lifetime issues for crates like Axum
174/// by moving the lifetime from the trait itself into an associated type.
175///
176/// It is implemented by the code generation on a marker struct, like the client-side [XrpcResp] trait.
177pub trait XrpcEndpoint {
178    /// Fully-qualified path ('/xrpc/\[nsid\]') where this endpoint should live on the server
179    const PATH: &'static str;
180    /// XRPC method (query/GET or procedure/POST)
181    const METHOD: XrpcMethod;
182    /// XRPC Request data type
183    type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
184    /// XRPC Response data type
185    type Response: XrpcResp;
186}
187
188/// Error type for XRPC endpoints that don't define any errors
189#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
190pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);
191
192impl<'de> fmt::Display for GenericError<'de> {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        self.0.fmt(f)
195    }
196}
197
198impl Error for GenericError<'_> {}
199
200impl IntoStatic for GenericError<'_> {
201    type Output = GenericError<'static>;
202    fn into_static(self) -> Self::Output {
203        GenericError(self.0.into_static())
204    }
205}
206
207/// Per-request options for XRPC calls.
208#[derive(Debug, Default, Clone)]
209pub struct CallOptions<'a> {
210    /// Optional Authorization to apply (`Bearer` or `DPoP`).
211    pub auth: Option<AuthorizationToken<'a>>,
212    /// `atproto-proxy` header value.
213    pub atproto_proxy: Option<CowStr<'a>>,
214    /// `atproto-accept-labelers` header values.
215    pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
216    /// Extra headers to attach to this request.
217    pub extra_headers: Vec<(HeaderName, HeaderValue)>,
218}
219
220impl IntoStatic for CallOptions<'_> {
221    type Output = CallOptions<'static>;
222
223    fn into_static(self) -> Self::Output {
224        CallOptions {
225            auth: self.auth.map(|auth| auth.into_static()),
226            atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
227            atproto_accept_labelers: self
228                .atproto_accept_labelers
229                .map(|labelers| labelers.into_static()),
230            extra_headers: self.extra_headers,
231        }
232    }
233}
234
235/// Extension for stateless XRPC calls on any `HttpClient`.
236///
237/// Example
238/// ```no_run
239/// # #[tokio::main]
240/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
241/// use jacquard_common::xrpc::XrpcExt;
242/// use jacquard_common::http_client::HttpClient;
243///
244/// let http = reqwest::Client::new();
245/// let base = url::Url::parse("https://public.api.bsky.app")?;
246/// // let resp = http.xrpc(base).send(&request).await?;
247/// # Ok(())
248/// # }
249/// ```
250pub trait XrpcExt: HttpClient {
251    /// Start building an XRPC call for the given base URL.
252    fn xrpc<'a>(&'a self, base: Url) -> XrpcCall<'a, Self>
253    where
254        Self: Sized,
255    {
256        XrpcCall {
257            client: self,
258            base,
259            opts: CallOptions::default(),
260        }
261    }
262}
263
264impl<T: HttpClient> XrpcExt for T {}
265
266/// Nicer alias for Xrpc response type
267pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
268
269/// Stateful XRPC call trait
270#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
271pub trait XrpcClient: HttpClient {
272    /// Get the base URI for the client.
273    fn base_uri(&self) -> impl Future<Output = CowStr<'static>>;
274
275    /// Set the base URI for the client.
276    fn set_base_uri(&self, url: Url) -> impl Future<Output = ()> {
277        let _ = url;
278        async {}
279    }
280
281    /// Get the call options for the client.
282    fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
283        async { CallOptions::default() }
284    }
285
286    /// Set the call options for the client.
287    fn set_opts(&self, opts: CallOptions) -> impl Future<Output = ()> {
288        let _ = opts;
289        async {}
290    }
291
292    /// Send an XRPC request and parse the response
293    #[cfg(not(target_arch = "wasm32"))]
294    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
295    where
296        R: XrpcRequest + Send + Sync,
297        <R as XrpcRequest>::Response: Send + Sync,
298        Self: Sync;
299
300    /// Send an XRPC request and parse the response
301    #[cfg(target_arch = "wasm32")]
302    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
303    where
304        R: XrpcRequest + Send + Sync,
305        <R as XrpcRequest>::Response: Send + Sync;
306
307    /// Send an XRPC request and parse the response
308    #[cfg(not(target_arch = "wasm32"))]
309    fn send_with_opts<R>(
310        &self,
311        request: R,
312        opts: CallOptions<'_>,
313    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
314    where
315        R: XrpcRequest + Send + Sync,
316        <R as XrpcRequest>::Response: Send + Sync,
317        Self: Sync;
318
319    /// Send an XRPC request with custom options and parse the response
320    #[cfg(target_arch = "wasm32")]
321    fn send_with_opts<R>(
322        &self,
323        request: R,
324        opts: CallOptions<'_>,
325    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
326    where
327        R: XrpcRequest + Send + Sync,
328        <R as XrpcRequest>::Response: Send + Sync;
329}
330
/// Stateful XRPC streaming client trait
///
/// Only available with the `streaming` feature. Each method comes in a native
/// and a `wasm32` flavour; the native ones additionally require `Self: Sync`.
#[cfg(feature = "streaming")]
pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
    /// Send an XRPC request and stream the response
    ///
    /// Native variant: the returned future is explicitly `Send`.
    #[cfg(not(target_arch = "wasm32"))]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request and stream the response
    ///
    /// `wasm32` variant: no `Send` requirement on the returned future.
    #[cfg(target_arch = "wasm32")]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Stream an XRPC procedure call and its response
    ///
    /// Takes the outgoing frames for procedure `S` and resolves to a stream of
    /// the response frames.
    // NOTE(review): unlike the native `download`, this future is not declared
    // `+ Send` - confirm whether that asymmetry is intentional.
    #[cfg(not(target_arch = "wasm32"))]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
        Self: Sync;

    /// Stream an XRPC procedure call and its response
    ///
    /// `wasm32` variant: identical to the native one minus the `Self: Sync` bound.
    #[cfg(target_arch = "wasm32")]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
}
390
391/// Stateless XRPC call builder.
392///
393/// Example (per-request overrides)
394/// ```no_run
395/// # #[tokio::main]
396/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
397/// use jacquard_common::xrpc::XrpcExt;
398/// use jacquard_common::{AuthorizationToken, CowStr};
399///
400/// let http = reqwest::Client::new();
401/// let base = url::Url::parse("https://public.api.bsky.app")?;
402/// let call = http
403///     .xrpc(base)
404///     .auth(AuthorizationToken::Bearer(CowStr::from("ACCESS_JWT")))
405///     .accept_labelers(vec![CowStr::from("did:plc:labelerid")])
406///     .header(http::header::USER_AGENT, http::HeaderValue::from_static("jacquard-example"));
407/// // let resp = call.send(&request).await?;
408/// # Ok(())
409/// # }
410/// ```
411pub struct XrpcCall<'a, C: HttpClient> {
412    pub(crate) client: &'a C,
413    pub(crate) base: Url,
414    pub(crate) opts: CallOptions<'a>,
415}
416
417impl<'a, C: HttpClient> XrpcCall<'a, C> {
418    /// Apply Authorization to this call.
419    pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
420        self.opts.auth = Some(token);
421        self
422    }
423    /// Set `atproto-proxy` header for this call.
424    pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
425        self.opts.atproto_proxy = Some(proxy);
426        self
427    }
428    /// Set `atproto-accept-labelers` header(s) for this call.
429    pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
430        self.opts.atproto_accept_labelers = Some(labelers);
431        self
432    }
433    /// Add an extra header.
434    pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
435        self.opts.extra_headers.push((name, value));
436        self
437    }
438    /// Replace the builder's options entirely.
439    pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
440        self.opts = opts;
441        self
442    }
443
444    /// Send the given typed XRPC request and return a response wrapper.
445    ///
446    /// Note on 401 handling:
447    /// - When the server returns 401 with a `WWW-Authenticate` header, this surfaces as
448    ///   `ClientError::Auth(AuthError::Other(header))` so higher layers (e.g., OAuth/DPoP) can
449    ///   inspect the header for `error="invalid_token"` or `error="use_dpop_nonce"` and react
450    ///   (refresh/retry). If the header is absent, the 401 body flows through to `Response` and
451    ///   can be parsed/mapped to `AuthError` as appropriate.
452    #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
453    pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
454    where
455        R: XrpcRequest,
456        <R as XrpcRequest>::Response: Send + Sync,
457    {
458        let http_request = build_http_request(&self.base, request, &self.opts)?;
459
460        let http_response = self
461            .client
462            .send_http(http_request)
463            .await
464            .map_err(|e| crate::error::ClientError::transport(e))?;
465
466        process_response(http_response)
467    }
468}
469
470/// Process the HTTP response from the server into a proper xrpc response statelessly.
471///
472/// Exposed to make things more easily pluggable
473#[inline]
474pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
475where
476    Resp: XrpcResp,
477{
478    let status = http_response.status();
479    // If the server returned 401 with a WWW-Authenticate header, expose it so higher layers
480    // (e.g., DPoP handling) can detect `error="invalid_token"` and trigger refresh.
481    #[allow(deprecated)]
482    if status.as_u16() == 401 {
483        if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
484            return Err(crate::error::ClientError::auth(
485                crate::error::AuthError::Other(hv.clone()),
486            ));
487        }
488    }
489    let buffer = Bytes::from(http_response.into_body());
490
491    if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
492        return Err(crate::error::HttpError {
493            status,
494            body: Some(buffer),
495        }
496        .into());
497    }
498
499    Ok(Response::new(buffer, status))
500}
501
502/// HTTP headers commonly used in XRPC requests
503pub enum Header {
504    /// Content-Type header
505    ContentType,
506    /// Authorization header
507    Authorization,
508    /// `atproto-proxy` header - specifies which service (app server or other atproto service) the user's PDS should forward requests to as appropriate.
509    ///
510    /// See: <https://atproto.com/specs/xrpc#service-proxying>
511    AtprotoProxy,
512    /// `atproto-accept-labelers` header used by clients to request labels from specific labelers to be included and applied in the response. See [label](https://atproto.com/specs/label) specification for details.
513    AtprotoAcceptLabelers,
514}
515
516impl From<Header> for HeaderName {
517    fn from(value: Header) -> Self {
518        match value {
519            Header::ContentType => CONTENT_TYPE,
520            Header::Authorization => AUTHORIZATION,
521            Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
522            Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
523        }
524    }
525}
526
527/// Build an HTTP request for an XRPC call given base URL and options
528pub fn build_http_request<'s, R>(
529    base: &Url,
530    req: &R,
531    opts: &CallOptions<'_>,
532) -> XrpcResult<Request<Vec<u8>>>
533where
534    R: XrpcRequest,
535{
536    use crate::error::ClientError;
537
538    let mut url = base.clone();
539    let mut path = url.path().trim_end_matches('/').to_owned();
540    path.push_str("/xrpc/");
541    path.push_str(<R as XrpcRequest>::NSID);
542    url.set_path(&path);
543    // Check if extra_headers already contains Content-Type
544
545    if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
546        let qs = serde_html_form::to_string(&req).map_err(|e| {
547            ClientError::invalid_request(format!("Failed to serialize query: {}", e))
548        })?;
549        if !qs.is_empty() {
550            url.set_query(Some(&qs));
551        } else {
552            url.set_query(None);
553        }
554    }
555
556    let method = match <R as XrpcRequest>::METHOD {
557        XrpcMethod::Query => http::Method::GET,
558        XrpcMethod::Procedure(_) => http::Method::POST,
559    };
560
561    let mut builder = Request::builder().method(method).uri(url.as_str());
562
563    let has_content_type = opts
564        .extra_headers
565        .iter()
566        .any(|(name, _)| name == CONTENT_TYPE);
567
568    if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
569        // Only set default Content-Type if not provided in extra_headers
570        if !has_content_type {
571            builder = builder.header(Header::ContentType, encoding);
572        }
573    }
574    let output_encoding = <R::Response as XrpcResp>::ENCODING;
575    builder = builder.header(http::header::ACCEPT, output_encoding);
576
577    if let Some(token) = &opts.auth {
578        let hv = match token {
579            AuthorizationToken::Bearer(t) => {
580                HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
581            }
582            AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
583        }
584        .map_err(|e| ClientError::invalid_request(format!("Invalid authorization token: {}", e)))?;
585        builder = builder.header(Header::Authorization, hv);
586    }
587
588    if let Some(proxy) = &opts.atproto_proxy {
589        builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
590    }
591    if let Some(labelers) = &opts.atproto_accept_labelers {
592        if !labelers.is_empty() {
593            let joined = labelers
594                .iter()
595                .map(|s| s.as_ref())
596                .collect::<Vec<_>>()
597                .join(", ");
598            builder = builder.header(Header::AtprotoAcceptLabelers, joined);
599        }
600    }
601    for (name, value) in &opts.extra_headers {
602        builder = builder.header(name, value);
603    }
604
605    let body = if let XrpcMethod::Procedure(_) = R::METHOD {
606        req.encode_body()
607            .map_err(|e| ClientError::invalid_request(format!("Failed to encode body: {}", e)))?
608    } else {
609        vec![]
610    };
611
612    builder
613        .body(body)
614        .map_err(|e| ClientError::invalid_request(format!("Failed to build request: {}", e)))
615}
616
617/// XRPC response wrapper that owns the response buffer
618///
619/// Allows borrowing from the buffer when parsing to avoid unnecessary allocations.
620/// Generic over the response marker type (e.g., `GetAuthorFeedResponse`), not the request.
621pub struct Response<Resp>
622where
623    Resp: XrpcResp, // HRTB: Resp works with any lifetime
624{
625    _marker: PhantomData<fn() -> Resp>,
626    buffer: Bytes,
627    status: StatusCode,
628}
629
630impl<R> Response<R>
631where
632    R: XrpcResp,
633{
634    /// Create a new response from a buffer and status code
635    pub fn new(buffer: Bytes, status: StatusCode) -> Self {
636        Self {
637            buffer,
638            status,
639            _marker: PhantomData,
640        }
641    }
642
643    /// Get the HTTP status code
644    pub fn status(&self) -> StatusCode {
645        self.status
646    }
647
648    /// Get the raw buffer
649    pub fn buffer(&self) -> &Bytes {
650        &self.buffer
651    }
652
653    /// Parse the response, borrowing from the internal buffer
654    pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
655        // 200: parse as output
656        if self.status.is_success() {
657            match R::decode_output(&self.buffer) {
658                Ok(output) => Ok(output),
659                Err(e) => Err(XrpcError::Decode(e)),
660            }
661        // 400: try typed XRPC error, fallback to generic error
662        } else if self.status.as_u16() == 400 {
663            match serde_json::from_slice::<_>(&self.buffer) {
664                Ok(error) => Err(XrpcError::Xrpc(error)),
665                Err(_) => {
666                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
667                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
668                        Ok(mut generic) => {
669                            generic.nsid = R::NSID;
670                            generic.method = ""; // method info only available on request
671                            generic.http_status = self.status;
672                            // Map auth-related errors to AuthError
673                            match generic.error.as_str() {
674                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
675                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
676                                _ => Err(XrpcError::Generic(generic)),
677                            }
678                        }
679                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
680                    }
681                }
682            }
683        // 401: always auth error
684        } else {
685            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
686                Ok(mut generic) => {
687                    generic.nsid = R::NSID;
688                    generic.method = ""; // method info only available on request
689                    generic.http_status = self.status;
690                    match generic.error.as_str() {
691                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
692                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
693                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
694                    }
695                }
696                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
697            }
698        }
699    }
700
701    /// Parse this as validated, loosely typed atproto data.
702    ///
703    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
704    pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
705        // 200: parse as output
706        if self.status.is_success() {
707            match serde_json::from_slice::<_>(&self.buffer) {
708                Ok(output) => Ok(output),
709                Err(_) => {
710                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
711                        if let Ok(data) = Data::from_cbor(&data) {
712                            Ok(data.into_static())
713                        } else {
714                            Ok(Data::Bytes(self.buffer.clone()))
715                        }
716                    } else {
717                        Ok(Data::Bytes(self.buffer.clone()))
718                    }
719                }
720            }
721        // 400: try typed XRPC error, fallback to generic error
722        } else if self.status.as_u16() == 400 {
723            match serde_json::from_slice::<_>(&self.buffer) {
724                Ok(error) => Err(XrpcError::Xrpc(error)),
725                Err(_) => {
726                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
727                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
728                        Ok(mut generic) => {
729                            generic.nsid = R::NSID;
730                            generic.method = ""; // method info only available on request
731                            generic.http_status = self.status;
732                            // Map auth-related errors to AuthError
733                            match generic.error.as_str() {
734                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
735                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
736                                _ => Err(XrpcError::Generic(generic)),
737                            }
738                        }
739                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
740                    }
741                }
742            }
743        // 401: always auth error
744        } else {
745            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
746                Ok(mut generic) => {
747                    generic.nsid = R::NSID;
748                    generic.method = ""; // method info only available on request
749                    generic.http_status = self.status;
750                    match generic.error.as_str() {
751                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
752                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
753                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
754                    }
755                }
756                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
757            }
758        }
759    }
760
761    /// Parse this as raw atproto data with minimal validation.
762    ///
763    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
764    pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
765        // 200: parse as output
766        if self.status.is_success() {
767            match serde_json::from_slice::<_>(&self.buffer) {
768                Ok(output) => Ok(output),
769                Err(_) => {
770                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
771                        if let Ok(data) = RawData::from_cbor(&data) {
772                            Ok(data.into_static())
773                        } else {
774                            Ok(RawData::Bytes(self.buffer.clone()))
775                        }
776                    } else {
777                        Ok(RawData::Bytes(self.buffer.clone()))
778                    }
779                }
780            }
781        // 400: try typed XRPC error, fallback to generic error
782        } else if self.status.as_u16() == 400 {
783            match serde_json::from_slice::<_>(&self.buffer) {
784                Ok(error) => Err(XrpcError::Xrpc(error)),
785                Err(_) => {
786                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
787                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
788                        Ok(mut generic) => {
789                            generic.nsid = R::NSID;
790                            generic.method = ""; // method info only available on request
791                            generic.http_status = self.status;
792                            // Map auth-related errors to AuthError
793                            match generic.error.as_str() {
794                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
795                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
796                                _ => Err(XrpcError::Generic(generic)),
797                            }
798                        }
799                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
800                    }
801                }
802            }
803        // 401: always auth error
804        } else {
805            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
806                Ok(mut generic) => {
807                    generic.nsid = R::NSID;
808                    generic.method = ""; // method info only available on request
809                    generic.http_status = self.status;
810                    match generic.error.as_str() {
811                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
812                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
813                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
814                    }
815                }
816                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
817            }
818        }
819    }
820
821    /// Reinterpret this response as a different response type.
822    ///
823    /// This transmutes the response by keeping the same buffer and status code,
824    /// but changing the type-level marker. Useful for converting generic XRPC responses
825    /// into collection-specific typed responses.
826    ///
827    /// # Safety
828    ///
829    /// This is safe in the sense that no memory unsafety occurs, but logical correctness
830    /// depends on ensuring the buffer actually contains data that can deserialize to `NEW`.
831    /// Incorrect conversion will cause deserialization errors at runtime.
832    pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
833        Response {
834            buffer: self.buffer,
835            status: self.status,
836            _marker: PhantomData,
837        }
838    }
839}
840
841/// doc
842pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
843/// doc
844pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
845
846impl<R> Response<R>
847where
848    R: XrpcResp,
849{
850    /// Parse the response into an owned output
851    pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
852    where
853        for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
854        for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
855    {
856        fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
857            serde_json::from_slice(buffer)
858        }
859
860        // 200: parse as output
861        if self.status.is_success() {
862            match R::decode_output(&self.buffer) {
863                Ok(output) => Ok(output.into_static()),
864                Err(e) => Err(XrpcError::Decode(e)),
865            }
866        // 400: try typed XRPC error, fallback to generic error
867        } else if self.status.as_u16() == 400 {
868            let error = match parse_error::<R>(&self.buffer) {
869                Ok(error) => XrpcError::Xrpc(error),
870                Err(_) => {
871                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
872                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
873                        Ok(mut generic) => {
874                            generic.nsid = R::NSID;
875                            generic.method = ""; // method info only available on request
876                            generic.http_status = self.status;
877                            // Map auth-related errors to AuthError
878                            match generic.error.as_ref() {
879                                "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
880                                "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
881                                _ => XrpcError::Generic(generic),
882                            }
883                        }
884                        Err(e) => XrpcError::Decode(DecodeError::Json(e)),
885                    }
886                }
887            };
888            Err(error.into_static())
889        // 401: always auth error
890        } else {
891            let error: XrpcError<<R as XrpcResp>::Err<'_>> =
892                match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
893                    Ok(mut generic) => {
894                        let status = self.status;
895                        generic.nsid = R::NSID;
896                        generic.method = ""; // method info only available on request
897                        generic.http_status = status;
898                        match generic.error.as_ref() {
899                            "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
900                            "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
901                            _ => XrpcError::Auth(AuthError::NotAuthenticated),
902                        }
903                    }
904                    Err(e) => XrpcError::Decode(DecodeError::Json(e)),
905                };
906
907            Err(error.into_static())
908        }
909    }
910}
911
912/// Generic XRPC error format for untyped errors like InvalidRequest
913///
914/// Used when the error doesn't match the endpoint's specific error enum
915#[derive(Debug, Clone, Deserialize, Serialize)]
916pub struct GenericXrpcError {
917    /// Error code (e.g., "InvalidRequest")
918    pub error: SmolStr,
919    /// Optional error message with details
920    pub message: Option<SmolStr>,
921    /// XRPC method NSID that produced this error (context only; not serialized)
922    #[serde(skip)]
923    pub nsid: &'static str,
924    /// HTTP method used (GET/POST) (context only; not serialized)
925    #[serde(skip)]
926    pub method: &'static str,
927    /// HTTP status code (context only; not serialized)
928    #[serde(skip)]
929    pub http_status: StatusCode,
930}
931
932impl std::fmt::Display for GenericXrpcError {
933    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934        if let Some(msg) = &self.message {
935            write!(
936                f,
937                "{}: {} (nsid={}, method={}, status={})",
938                self.error, msg, self.nsid, self.method, self.http_status
939            )
940        } else {
941            write!(
942                f,
943                "{} (nsid={}, method={}, status={})",
944                self.error, self.nsid, self.method, self.http_status
945            )
946        }
947    }
948}
949
950impl IntoStatic for GenericXrpcError {
951    type Output = Self;
952
953    fn into_static(self) -> Self::Output {
954        self
955    }
956}
957
958impl std::error::Error for GenericXrpcError {}
959
960/// XRPC-specific errors returned from endpoints
961///
962/// Represents errors returned in the response body
963/// Type parameter `E` is the endpoint's specific error enum type.
964#[derive(Debug, thiserror::Error, miette::Diagnostic)]
965pub enum XrpcError<E: std::error::Error + IntoStatic> {
966    /// Typed XRPC error from the endpoint's specific error enum
967    #[error("XRPC error: {0}")]
968    #[diagnostic(code(jacquard_common::xrpc::typed))]
969    Xrpc(E),
970
971    /// Authentication error (ExpiredToken, InvalidToken, etc.)
972    #[error("Authentication error: {0}")]
973    #[diagnostic(code(jacquard_common::xrpc::auth))]
974    Auth(#[from] AuthError),
975
976    /// Generic XRPC error not in the endpoint's error enum (e.g., InvalidRequest)
977    #[error("XRPC error: {0}")]
978    #[diagnostic(code(jacquard_common::xrpc::generic))]
979    Generic(GenericXrpcError),
980
981    /// Failed to decode the response body
982    #[error("Failed to decode response: {0}")]
983    #[diagnostic(code(jacquard_common::xrpc::decode))]
984    Decode(#[from] DecodeError),
985}
986
987impl<E> IntoStatic for XrpcError<E>
988where
989    E: std::error::Error + IntoStatic,
990    E::Output: std::error::Error + IntoStatic,
991    <E as IntoStatic>::Output: std::error::Error + IntoStatic,
992{
993    type Output = XrpcError<E::Output>;
994    fn into_static(self) -> Self::Output {
995        match self {
996            XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
997            XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
998            XrpcError::Generic(e) => XrpcError::Generic(e),
999            XrpcError::Decode(e) => XrpcError::Decode(e),
1000        }
1001    }
1002}
1003
1004impl<E> Serialize for XrpcError<E>
1005where
1006    E: std::error::Error + IntoStatic + Serialize,
1007{
1008    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1009    where
1010        S: serde::Serializer,
1011    {
1012        use serde::ser::SerializeStruct;
1013
1014        match self {
1015            // Typed errors already serialize to correct atproto format
1016            XrpcError::Xrpc(e) => e.serialize(serializer),
1017            // Generic errors already have correct format
1018            XrpcError::Generic(g) => g.serialize(serializer),
1019            // Auth and Decode need manual mapping to {"error": "...", "message": ...}
1020            XrpcError::Auth(auth) => {
1021                let mut state = serializer.serialize_struct("XrpcError", 2)?;
1022                let (error, message) = match auth {
1023                    AuthError::TokenExpired => ("ExpiredToken", Some("Access token has expired")),
1024                    AuthError::InvalidToken => {
1025                        ("InvalidToken", Some("Access token is invalid or malformed"))
1026                    }
1027                    AuthError::RefreshFailed => {
1028                        ("RefreshFailed", Some("Token refresh request failed"))
1029                    }
1030                    AuthError::NotAuthenticated => (
1031                        "AuthenticationRequired",
1032                        Some("Request requires authentication but none was provided"),
1033                    ),
1034                    AuthError::Other(hv) => {
1035                        let msg = hv.to_str().unwrap_or("[non-utf8 header]");
1036                        ("AuthenticationError", Some(msg))
1037                    }
1038                };
1039                state.serialize_field("error", error)?;
1040                if let Some(msg) = message {
1041                    state.serialize_field("message", msg)?;
1042                }
1043                state.end()
1044            }
1045            XrpcError::Decode(decode_err) => {
1046                let mut state = serializer.serialize_struct("XrpcError", 2)?;
1047                state.serialize_field("error", "ResponseDecodeError")?;
1048                // Convert DecodeError to string for message field
1049                let msg = format!("{:?}", decode_err);
1050                state.serialize_field("message", &msg)?;
1051                state.end()
1052            }
1053        }
1054    }
1055}
1056
#[cfg(feature = "streaming")]
impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
    /// Send an XRPC call and stream the binary response.
    ///
    /// The HTTP request is built with `build_http_request` from this call's base URL and
    /// options, sent through `send_http_streaming`, and the response parts and body are
    /// wrapped in a [`StreamingResponse`].
    ///
    /// Useful for downloading blobs and entire repository archives.
    ///
    /// # Errors
    ///
    /// Failures while building or sending the request are wrapped with
    /// `StreamError::transport`.
    pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
    where
        R: XrpcRequest,
        <R as XrpcRequest>::Response: Send + Sync,
    {
        let outgoing = build_http_request(&self.base, request, &self.opts)
            .map_err(StreamError::transport)?;

        let (head, body) = self
            .client
            .send_http_streaming(outgoing)
            .await
            .map_err(StreamError::transport)?
            .into_parts();

        Ok(StreamingResponse::new(head, body))
    }

    /// Stream an XRPC procedure call and its response.
    ///
    /// Issues a POST to `<base path>/xrpc/<NSID>` carrying the configured authorization,
    /// proxy, accept-labelers and extra headers, with the frames of `stream` as the
    /// request body, and returns the response as a typed frame stream.
    ///
    /// Useful for streaming upload of large payloads, or for "pipe-through" operations
    /// where you are processing a large payload.
    ///
    /// # Errors
    ///
    /// An authorization token that is not a valid header value, or a request that cannot
    /// be assembled, is reported via `StreamError::protocol`; send failures are wrapped
    /// with `StreamError::transport`.
    pub async fn stream<S>(
        self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
    {
        use futures::TryStreamExt;

        // Endpoint URL: base path without trailing slashes, then "/xrpc/<NSID>".
        let mut endpoint = self.base;
        let xrpc_path = format!(
            "{}/xrpc/{}",
            endpoint.path().trim_end_matches('/'),
            <S::Request as XrpcRequest>::NSID
        );
        endpoint.set_path(&xrpc_path);

        let mut builder = http::Request::post(endpoint.to_string());

        if let Some(token) = &self.opts.auth {
            // Render "<scheme> <token>" first, then validate it as a header value.
            let rendered = match token {
                AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()),
                AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()),
            };
            let hv = HeaderValue::from_str(&rendered).map_err(|e| {
                StreamError::protocol(format!("Invalid authorization token: {}", e))
            })?;
            builder = builder.header(Header::Authorization, hv);
        }

        if let Some(proxy) = &self.opts.atproto_proxy {
            builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
        }

        // The labelers header is only sent when there is at least one labeler.
        match &self.opts.atproto_accept_labelers {
            Some(labelers) if !labelers.is_empty() => {
                let joined = labelers
                    .iter()
                    .map(|s| s.as_ref())
                    .collect::<Vec<_>>()
                    .join(", ");
                builder = builder.header(Header::AtprotoAcceptLabelers, joined);
            }
            _ => {}
        }

        for (name, value) in &self.opts.extra_headers {
            builder = builder.header(name, value);
        }

        // Only the request head is needed here; the body is supplied as a stream.
        let (head, _) = builder
            .body(())
            .map_err(|e| StreamError::protocol(e.to_string()))?
            .into_parts();

        let outgoing_frames = Box::pin(stream.0.map_ok(|frame| frame.buffer));

        let (resp_head, resp_body) = self
            .client
            .send_http_bidirectional(head, outgoing_frames)
            .await
            .map_err(StreamError::transport)?
            .into_parts();

        Ok(XrpcResponseStream::<
            <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
        >::from_typed_parts(resp_head, resp_body))
    }
}
1153
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    // Request marker for the `test.dummy` procedure.
    #[derive(Serialize, Deserialize)]
    #[allow(dead_code)]
    struct DummyReq;

    // Minimal typed error: a single string that may borrow from the response body.
    #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
    #[error("{0}")]
    struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);

    impl IntoStatic for DummyErr<'_> {
        type Output = DummyErr<'static>;
        fn into_static(self) -> Self::Output {
            DummyErr(self.0.into_static())
        }
    }

    struct DummyResp;

    impl XrpcResp for DummyResp {
        const NSID: &'static str = "test.dummy";
        const ENCODING: &'static str = "application/json";
        type Output<'de> = ();
        type Err<'de> = DummyErr<'de>;
    }

    impl XrpcRequest for DummyReq {
        const NSID: &'static str = "test.dummy";
        const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
        type Response = DummyResp;
    }

    // Build a `DummyResp` response whose body is `body` encoded as JSON.
    fn dummy_response(body: serde_json::Value, status: StatusCode) -> Response<DummyResp> {
        let encoded = Bytes::from(serde_json::to_vec(&body).unwrap());
        Response::new(encoded, status)
    }

    #[test]
    fn generic_error_carries_context() {
        let resp = dummy_response(
            serde_json::json!({"error":"InvalidRequest","message":"missing"}),
            StatusCode::BAD_REQUEST,
        );

        let generic = match resp.parse().unwrap_err() {
            XrpcError::Generic(generic) => generic,
            other => panic!("unexpected: {other:?}"),
        };

        assert_eq!(generic.error.as_str(), "InvalidRequest");
        assert_eq!(generic.message.as_deref(), Some("missing"));
        assert_eq!(generic.nsid, DummyResp::NSID);
        assert_eq!(generic.method, ""); // method info only on request
        assert_eq!(generic.http_status, StatusCode::BAD_REQUEST);
    }

    #[test]
    fn auth_error_mapping() {
        let cases = [
            ("ExpiredToken", AuthError::TokenExpired),
            ("InvalidToken", AuthError::InvalidToken),
        ];

        for (code, expect) in cases {
            let resp = dummy_response(
                serde_json::json!({"error": code}),
                StatusCode::UNAUTHORIZED,
            );

            let got = match resp.parse().unwrap_err() {
                XrpcError::Auth(auth) => auth,
                other => panic!("unexpected: {other:?}"),
            };

            // Compared by pattern so the test does not need `AuthError: PartialEq`.
            match (got, expect) {
                (AuthError::TokenExpired, AuthError::TokenExpired)
                | (AuthError::InvalidToken, AuthError::InvalidToken) => {}
                other => panic!("mismatch: {other:?}"),
            }
        }
    }

    #[test]
    fn no_double_slash_in_path() {
        #[derive(Serialize, Deserialize)]
        struct PathReq;
        #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
        #[error("{0}")]
        struct PathErr<'a>(#[serde(borrow)] CowStr<'a>);
        impl IntoStatic for PathErr<'_> {
            type Output = PathErr<'static>;
            fn into_static(self) -> Self::Output {
                PathErr(self.0.into_static())
            }
        }
        struct PathResp;
        impl XrpcResp for PathResp {
            const NSID: &'static str = "com.example.test";
            const ENCODING: &'static str = "application/json";
            type Output<'de> = ();
            type Err<'de> = PathErr<'de>;
        }
        impl XrpcRequest for PathReq {
            const NSID: &'static str = "com.example.test";
            const METHOD: XrpcMethod = XrpcMethod::Query;
            type Response = PathResp;
        }

        let opts = CallOptions::default();
        // Bases with no path, a bare slash, and a trailing-slash sub-path.
        for raw in ["https://pds", "https://pds/", "https://pds/base/"] {
            let base = Url::parse(raw).unwrap();
            let uri = build_http_request(&base, &PathReq, &opts)
                .unwrap()
                .uri()
                .to_string();
            assert!(uri.contains("/xrpc/com.example.test"));
            assert!(!uri.contains("//xrpc"));
        }
    }
}