Skip to main content

jacquard_common/
xrpc.rs

1//! # Stateless XRPC utilities and request/response mapping
2//!
3//! Mapping overview:
4//! - Success (2xx): parse body into the endpoint's typed output.
5//! - 400: try typed error; on failure, fall back to a generic XRPC error (with
6//!   `nsid`, `method`, and `http_status`) and map common auth errors.
7//! - 401: if `WWW-Authenticate` is present, return
8//!   `ClientError::Auth(AuthError::Other(header))` so higher layers (OAuth/DPoP)
9//!   can inspect `error="invalid_token"` or `error="use_dpop_nonce"` and refresh/retry.
10//!   If the header is absent, parse the body and map auth errors to
11//!   `AuthError::TokenExpired`/`InvalidToken`.
12
13#[cfg(feature = "streaming")]
14pub mod streaming;
15
16/// Hand-written XRPC types for com.atproto endpoints (bootstrap types).
17pub mod atproto;
18
19use alloc::borrow::ToOwned;
20use alloc::boxed::Box;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23use ipld_core::ipld::Ipld;
24#[cfg(feature = "streaming")]
25pub use streaming::{
26    StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
27};
28
29#[cfg(feature = "websocket")]
30pub mod subscription;
31
32#[cfg(feature = "streaming")]
33use crate::StreamError;
34use crate::error::DecodeError;
35use crate::http_client::HttpClient;
36#[cfg(feature = "streaming")]
37use crate::http_client::HttpClientExt;
38use crate::types::value::Data;
39use crate::{AuthorizationToken, error::AuthError};
40use crate::{CowStr, error::XrpcResult};
41use crate::{IntoStatic, types::value::RawData};
42use bytes::Bytes;
43use core::error::Error;
44use core::fmt::{self, Debug};
45use core::marker::PhantomData;
46use http::{
47    HeaderName, HeaderValue, Request, StatusCode,
48    header::{AUTHORIZATION, CONTENT_TYPE},
49};
50use serde::{Deserialize, Serialize};
51use smol_str::SmolStr;
52
53use crate::deps::fluent_uri::Uri;
54#[cfg(feature = "websocket")]
55pub use subscription::{
56    BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
57    SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
58    SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
59};
60
61/// Normalize a base URI by removing trailing slashes.
62///
63/// This is useful for XRPC clients where the base URI might be provided with
64/// a trailing slash (e.g., "<https://bsky.social/>") but needs to be normalized
65/// for consistent path building. Since trimming a trailing slash from a valid URI
66/// always yields a valid URI, the result is guaranteed to be valid.
67pub fn normalize_base_uri(uri: Uri<String>) -> Uri<String> {
68    let s = uri.as_str();
69    if s.ends_with('/') && s.len() > 1 {
70        let trimmed = s.trim_end_matches('/');
71        // Invariant: trimming trailing slashes from a valid URI always yields a valid URI.
72        Uri::parse(trimmed.to_string())
73            .expect("trimming trailing slash from valid URI yields valid URI")
74    } else {
75        uri
76    }
77}
78
/// Error raised while encoding an XRPC request (query string or body).
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm. With the `std` feature enabled it also derives
/// `miette::Diagnostic`.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "std", derive(miette::Diagnostic))]
#[non_exhaustive]
pub enum EncodeError {
    /// Serializing the query parameters with `serde_html_form` failed.
    #[error("Failed to serialize query: {0}")]
    Query(
        #[from]
        #[source]
        serde_html_form::ser::Error,
    ),
    /// Serializing a JSON body with `serde_json` failed.
    #[error("Failed to serialize JSON: {0}")]
    Json(
        #[from]
        #[source]
        serde_json::Error,
    ),
    /// Any other encoding failure, described by a free-form message.
    #[error("Encoding error: {0}")]
    Other(String),
}
102
/// The two kinds of XRPC method and how each maps onto HTTP.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum XrpcMethod {
    /// Query, sent as HTTP GET.
    Query,
    /// Procedure, sent as HTTP POST; the payload is the request body encoding
    /// (a MIME type string).
    Procedure(&'static str),
}

impl XrpcMethod {
    /// HTTP verb for this method: `"GET"` for queries, `"POST"` for procedures.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Procedure(_) => "POST",
            Self::Query => "GET",
        }
    }

    /// Request body encoding carried by a procedure; `None` for queries.
    pub const fn body_encoding(&self) -> Option<&'static str> {
        match *self {
            Self::Procedure(encoding) => Some(encoding),
            Self::Query => None,
        }
    }
}
129
130/// Trait for XRPC request types (queries and procedures)
131///
132/// This trait provides metadata about XRPC endpoints including the NSID,
133/// HTTP method, encoding, and associated output type.
134///
135/// The trait is implemented on the request parameters/input type itself.
136pub trait XrpcRequest: Serialize {
137    /// The NSID for this XRPC method
138    const NSID: &'static str;
139
140    /// XRPC method (query/GET or procedure/POST)
141    const METHOD: XrpcMethod;
142
143    /// Response type returned from the XRPC call (marker struct)
144    type Response: XrpcResp;
145
146    /// Encode the request body for procedures.
147    ///
148    /// Default implementation serializes to JSON. Override for non-JSON encodings.
149    fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
150        Ok(serde_json::to_vec(self)?)
151    }
152
153    /// Decode the request body for procedures.
154    ///
155    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
156    fn decode_body<'de>(body: &'de [u8]) -> Result<Box<Self>, DecodeError>
157    where
158        Self: Deserialize<'de>,
159    {
160        let body: Self = serde_json::from_slice(body)?;
161
162        Ok(Box::new(body))
163    }
164}
165
166/// Trait for XRPC Response types
167///
168/// It mirrors the NSID and carries the encoding types as well as Output (success) and Err types
169pub trait XrpcResp {
170    /// The NSID for this XRPC method
171    const NSID: &'static str;
172
173    /// Output encoding (MIME type)
174    const ENCODING: &'static str;
175
176    /// Response output type
177    type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
178
179    /// Error type for this request
180    type Err<'de>: Error + Deserialize<'de> + Serialize + IntoStatic;
181
182    /// Output body encoding function, similar to the request-side type
183    fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
184        Ok(serde_json::to_vec(output)?)
185    }
186
187    /// Decode the response output body.
188    ///
189    /// Default implementation deserializes from JSON. Override for non-JSON encodings.
190    fn decode_output<'de>(body: &'de [u8]) -> core::result::Result<Self::Output<'de>, DecodeError>
191    where
192        Self::Output<'de>: Deserialize<'de>,
193    {
194        let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
195        Ok(body)
196    }
197}
198
/// Server-side description of an XRPC endpoint.
///
/// Bundles the fully-qualified route path, the HTTP mapping, and the request
/// and response types. The request lifetime lives on the associated type
/// `Request<'de>` rather than on the trait itself, which works around lifetime
/// issues in frameworks such as Axum.
///
/// It is implemented by the code generation on a marker struct, like the
/// client-side [XrpcResp] trait.
pub trait XrpcEndpoint {
    /// Fully-qualified path ('/xrpc/\[nsid\]') where this endpoint should live on the server
    const PATH: &'static str;
    /// XRPC method (query/GET or procedure/POST)
    const METHOD: XrpcMethod;
    /// XRPC request data type; must be deserializable for any `'de` and
    /// convertible to an owned (`'static`) form.
    type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
    /// XRPC response marker type
    type Response: XrpcResp;
}
216
217/// Error type for XRPC endpoints that don't define any errors
218#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
219pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);
220
221impl<'de> fmt::Display for GenericError<'de> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        self.0.fmt(f)
224    }
225}
226
227impl Error for GenericError<'_> {}
228
229impl IntoStatic for GenericError<'_> {
230    type Output = GenericError<'static>;
231    fn into_static(self) -> Self::Output {
232        GenericError(self.0.into_static())
233    }
234}
235
236/// Per-request options for XRPC calls.
237#[derive(Debug, Default, Clone)]
238pub struct CallOptions<'a> {
239    /// Optional Authorization to apply (`Bearer` or `DPoP`).
240    pub auth: Option<AuthorizationToken<'a>>,
241    /// `atproto-proxy` header value.
242    pub atproto_proxy: Option<CowStr<'a>>,
243    /// `atproto-accept-labelers` header values.
244    pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
245    /// Extra headers to attach to this request.
246    pub extra_headers: Vec<(HeaderName, HeaderValue)>,
247}
248
249impl IntoStatic for CallOptions<'_> {
250    type Output = CallOptions<'static>;
251
252    fn into_static(self) -> Self::Output {
253        CallOptions {
254            auth: self.auth.map(|auth| auth.into_static()),
255            atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
256            atproto_accept_labelers: self
257                .atproto_accept_labelers
258                .map(|labelers| labelers.into_static()),
259            extra_headers: self.extra_headers,
260        }
261    }
262}
263
264/// Extension for stateless XRPC calls on any `HttpClient`.
265///
266/// Example
267/// ```no_run
268/// # #[tokio::main]
269/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270/// use jacquard_common::xrpc::XrpcExt;
271/// use jacquard_common::http_client::HttpClient;
272/// use jacquard_common::deps::fluent_uri::Uri;
273///
274/// let http = reqwest::Client::new();
275/// let base = Uri::parse("https://public.api.bsky.app").unwrap().to_owned();
276/// // let resp = http.xrpc(base).send(&request).await?;
277/// # Ok(())
278/// # }
279/// ```
280pub trait XrpcExt: HttpClient {
281    /// Start building an XRPC call for the given base URI.
282    fn xrpc<'a>(&'a self, base: Uri<String>) -> XrpcCall<'a, Self>
283    where
284        Self: Sized,
285    {
286        XrpcCall {
287            client: self,
288            base,
289            opts: CallOptions::default(),
290        }
291    }
292}
293
294impl<T: HttpClient> XrpcExt for T {}
295
/// Shorthand for the [`Response`] wrapper belonging to request type `R`,
/// i.e. `Response` parameterized by `R`'s associated response marker.
pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
298
/// Stateful XRPC client: an `HttpClient` that also knows its base URI and
/// default call options.
///
/// On non-wasm32 targets the trait is processed by `trait_variant::make(Send)`.
/// `send` and `send_with_opts` are each declared twice, once per target
/// family; the non-wasm32 declarations additionally require `Self: Sync`.
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait XrpcClient: HttpClient {
    /// Get the base URI for the client.
    fn base_uri(&self) -> impl Future<Output = Uri<String>>;

    /// Set the base URI for the client.
    ///
    /// The implementation should strip any trailing slash from the URI path before storing.
    /// The default implementation discards `uri` and does nothing.
    fn set_base_uri(&self, uri: Uri<String>) -> impl Future<Output = ()> {
        let _ = uri;
        async {}
    }

    /// Get the call options for the client.
    ///
    /// The default implementation returns `CallOptions::default()`.
    fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
        async { CallOptions::default() }
    }

    /// Set the call options for the client.
    ///
    /// The default implementation discards `opts` and does nothing.
    fn set_opts(&self, opts: CallOptions) -> impl Future<Output = ()> {
        let _ = opts;
        async {}
    }

    /// Send an XRPC request and parse the response
    #[cfg(not(target_arch = "wasm32"))]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request and parse the response
    #[cfg(target_arch = "wasm32")]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Send an XRPC request with custom options and parse the response
    #[cfg(not(target_arch = "wasm32"))]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request with custom options and parse the response
    #[cfg(target_arch = "wasm32")]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;
}
362
/// Stateful XRPC streaming client trait
///
/// Only compiled with the `streaming` feature. Each method is declared twice,
/// once per target family: the non-wasm32 declarations require `Self: Sync`,
/// and the non-wasm32 `download` future is additionally `Send`.
#[cfg(feature = "streaming")]
pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
    /// Send an XRPC request and stream the response
    #[cfg(not(target_arch = "wasm32"))]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Send an XRPC request and stream the response
    #[cfg(target_arch = "wasm32")]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Stream an XRPC procedure call and its response
    ///
    /// Takes the outgoing frames as an `XrpcProcedureSend` and resolves to an
    /// `XrpcResponseStream` over the response frame type of `S`.
    #[cfg(not(target_arch = "wasm32"))]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
        Self: Sync;

    /// Stream an XRPC procedure call and its response
    ///
    /// Takes the outgoing frames as an `XrpcProcedureSend` and resolves to an
    /// `XrpcResponseStream` over the response frame type of `S`.
    #[cfg(target_arch = "wasm32")]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
}
422
423/// Stateless XRPC call builder.
424///
425/// Example (per-request overrides)
426/// ```no_run
427/// # #[tokio::main]
428/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
429/// use jacquard_common::xrpc::XrpcExt;
430/// use jacquard_common::{AuthorizationToken, CowStr};
431/// use jacquard_common::deps::fluent_uri::Uri;
432///
433/// let http = reqwest::Client::new();
434/// let base = Uri::parse("https://public.api.bsky.app").unwrap().to_owned();
435/// let call = http
436///     .xrpc(base)
437///     .auth(AuthorizationToken::Bearer(CowStr::from("ACCESS_JWT")))
438///     .accept_labelers(vec![CowStr::from("did:plc:labelerid")])
439///     .header(http::header::USER_AGENT, http::HeaderValue::from_static("jacquard-example"));
440/// // let resp = call.send(&request).await?;
441/// # Ok(())
442/// # }
443/// ```
444pub struct XrpcCall<'a, C: HttpClient> {
445    pub(crate) client: &'a C,
446    pub(crate) base: Uri<String>,
447    pub(crate) opts: CallOptions<'a>,
448}
449
450impl<'a, C: HttpClient> XrpcCall<'a, C> {
451    /// Apply Authorization to this call.
452    pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
453        self.opts.auth = Some(token);
454        self
455    }
456    /// Set `atproto-proxy` header for this call.
457    pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
458        self.opts.atproto_proxy = Some(proxy);
459        self
460    }
461    /// Set `atproto-accept-labelers` header(s) for this call.
462    pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
463        self.opts.atproto_accept_labelers = Some(labelers);
464        self
465    }
466    /// Add an extra header.
467    pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
468        self.opts.extra_headers.push((name, value));
469        self
470    }
471    /// Replace the builder's options entirely.
472    pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
473        self.opts = opts;
474        self
475    }
476
477    /// Send the given typed XRPC request and return a response wrapper.
478    ///
479    /// Note on 401 handling:
480    /// - When the server returns 401 with a `WWW-Authenticate` header, this surfaces as
481    ///   `ClientError::Auth(AuthError::Other(header))` so higher layers (e.g., OAuth/DPoP) can
482    ///   inspect the header for `error="invalid_token"` or `error="use_dpop_nonce"` and react
483    ///   (refresh/retry). If the header is absent, the 401 body flows through to `Response` and
484    ///   can be parsed/mapped to `AuthError` as appropriate.
485    #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
486    pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
487    where
488        R: XrpcRequest,
489        <R as XrpcRequest>::Response: Send + Sync,
490    {
491        let http_request = build_http_request(&self.base, request, &self.opts)?;
492
493        let http_response = self
494            .client
495            .send_http(http_request)
496            .await
497            .map_err(|e| crate::error::ClientError::transport(e).for_nsid(R::NSID))?;
498
499        process_response(http_response)
500    }
501}
502
503/// Process the HTTP response from the server into a proper xrpc response statelessly.
504///
505/// Exposed to make things more easily pluggable
506#[inline]
507pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
508where
509    Resp: XrpcResp,
510{
511    let status = http_response.status();
512
513    // If the server returned 401 with a WWW-Authenticate header, expose it so higher layers
514    // (e.g., DPoP handling) can detect `error="invalid_token"` and trigger refresh.
515    #[allow(deprecated)]
516    if status.as_u16() == 401 {
517        if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
518            return Err(
519                crate::error::ClientError::auth(crate::error::AuthError::Other(hv.clone()))
520                    .for_nsid(Resp::NSID),
521            );
522        }
523    }
524    let buffer = Bytes::from(http_response.into_body());
525
526    if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
527        return Err(crate::error::ClientError::from(crate::error::HttpError {
528            status,
529            body: Some(buffer),
530        })
531        .for_nsid(Resp::NSID));
532    }
533
534    Ok(Response::new(buffer, status))
535}
536
537/// HTTP headers commonly used in XRPC requests
538pub enum Header {
539    /// Content-Type header
540    ContentType,
541    /// Authorization header
542    Authorization,
543    /// `atproto-proxy` header - specifies which service (app server or other atproto service) the user's PDS should forward requests to as appropriate.
544    ///
545    /// See: <https://atproto.com/specs/xrpc#service-proxying>
546    AtprotoProxy,
547    /// `atproto-accept-labelers` header used by clients to request labels from specific labelers to be included and applied in the response. See [label](https://atproto.com/specs/label) specification for details.
548    AtprotoAcceptLabelers,
549}
550
551impl From<Header> for HeaderName {
552    fn from(value: Header) -> Self {
553        match value {
554            Header::ContentType => CONTENT_TYPE,
555            Header::Authorization => AUTHORIZATION,
556            Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
557            Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
558        }
559    }
560}
561
562/// Construct an XRPC endpoint URI from a base URI, NSID, and optional query string.
563///
564/// This helper:
565/// 1. Extracts scheme and authority from the base URI
566/// 2. Gets the base path (already guaranteed no trailing slash from `set_base_uri`)
567/// 3. Builds new path: `{base_path}/xrpc/{nsid}`
568/// 4. Optionally sets query from serialized parameters
569/// 5. Returns the constructed URI
570fn xrpc_endpoint_uri(
571    base: &Uri<String>,
572    nsid: &str,
573    query: Option<&str>,
574) -> XrpcResult<Uri<String>> {
575    use crate::error::ClientError;
576
577    let base_path = base.path().as_str().trim_end_matches('/');
578
579    // Calculate approximate capacity: scheme + "://" + authority + base_path + "/xrpc/" + nsid + optional query
580    let capacity = base.scheme().as_str().len()
581        + 3 // "://"
582        + base.authority().map(|a| a.as_str().len()).unwrap_or(0)
583        + base_path.len()
584        + 6 // "/xrpc/"
585        + nsid.len()
586        + query.map(|q| q.len() + 1).unwrap_or(0); // query + "?"
587
588    // Build new path string: {base_path}/xrpc/{nsid}
589    let mut uri_str = String::with_capacity(capacity);
590    uri_str.push_str(base.scheme().as_str());
591    uri_str.push_str("://");
592
593    if let Some(authority) = base.authority() {
594        uri_str.push_str(authority.as_str());
595    }
596
597    uri_str.push_str(base_path);
598    uri_str.push_str("/xrpc/");
599    uri_str.push_str(nsid);
600
601    if let Some(q) = query {
602        uri_str.push('?');
603        uri_str.push_str(q);
604    }
605
606    Uri::parse(uri_str)
607        .map(|u| u.to_owned())
608        .map_err(|_| ClientError::invalid_request("Failed to construct XRPC endpoint URI"))
609}
610
611/// Build an HTTP request for an XRPC call given base URI and options
612pub fn build_http_request<'s, R>(
613    base: &Uri<String>,
614    req: &R,
615    opts: &CallOptions<'_>,
616) -> XrpcResult<Request<Vec<u8>>>
617where
618    R: XrpcRequest,
619{
620    use crate::error::ClientError;
621
622    // Determine query string for Query methods
623    let query_string = if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
624        let qs = serde_html_form::to_string(&req).map_err(|e| {
625            ClientError::invalid_request(format!("Failed to serialize query: {}", e))
626        })?;
627        if !qs.is_empty() { Some(qs) } else { None }
628    } else {
629        None
630    };
631
632    // Construct the XRPC endpoint URI using the helper
633    let uri = xrpc_endpoint_uri(base, <R as XrpcRequest>::NSID, query_string.as_deref())?;
634
635    let method = match <R as XrpcRequest>::METHOD {
636        XrpcMethod::Query => http::Method::GET,
637        XrpcMethod::Procedure(_) => http::Method::POST,
638    };
639
640    let mut builder = Request::builder().method(method).uri(uri.as_str());
641
642    let has_content_type = opts
643        .extra_headers
644        .iter()
645        .any(|(name, _)| name == CONTENT_TYPE);
646
647    if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
648        // Only set default Content-Type if not provided in extra_headers
649        if !has_content_type {
650            builder = builder.header(Header::ContentType, encoding);
651        }
652    }
653    let output_encoding = <R::Response as XrpcResp>::ENCODING;
654    builder = builder.header(http::header::ACCEPT, output_encoding);
655
656    if let Some(token) = &opts.auth {
657        let hv = match token {
658            AuthorizationToken::Bearer(t) => {
659                HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
660            }
661            AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
662        }
663        .map_err(|e| ClientError::invalid_request(format!("Invalid authorization token: {}", e)))?;
664        builder = builder.header(Header::Authorization, hv);
665    }
666
667    if let Some(proxy) = &opts.atproto_proxy {
668        builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
669    }
670    if let Some(labelers) = &opts.atproto_accept_labelers {
671        if !labelers.is_empty() {
672            let joined = labelers
673                .iter()
674                .map(|s| s.as_ref())
675                .collect::<Vec<_>>()
676                .join(", ");
677            builder = builder.header(Header::AtprotoAcceptLabelers, joined);
678        }
679    }
680    for (name, value) in &opts.extra_headers {
681        builder = builder.header(name, value);
682    }
683
684    let body = if let XrpcMethod::Procedure(_) = R::METHOD {
685        req.encode_body()
686            .map_err(|e| ClientError::invalid_request(format!("Failed to encode body: {}", e)))?
687    } else {
688        vec![]
689    };
690
691    builder
692        .body(body)
693        .map_err(|e| ClientError::invalid_request(format!("Failed to build request: {}", e)))
694}
695
/// XRPC response wrapper that owns the response buffer
///
/// Allows borrowing from the buffer when parsing to avoid unnecessary allocations.
/// Generic over the response marker type (e.g., `GetAuthorFeedResponse`), not the request.
pub struct Response<Resp>
where
    Resp: XrpcResp, // marker type describing the endpoint's output and error types
{
    // Ties the wrapper to `Resp` without storing a value of that type.
    _marker: PhantomData<fn() -> Resp>,
    // Raw response body; the parse methods borrow from it.
    buffer: Bytes,
    // HTTP status the body arrived with; selects the parse path.
    status: StatusCode,
}
708
709impl<R> Response<R>
710where
711    R: XrpcResp,
712{
    /// Create a new response from a buffer and status code.
    ///
    /// No validation is performed here; the status is consulted later by the
    /// parse methods.
    pub fn new(buffer: Bytes, status: StatusCode) -> Self {
        Self {
            buffer,
            status,
            _marker: PhantomData,
        }
    }

    /// Get the HTTP status code this response was created with.
    pub fn status(&self) -> StatusCode {
        self.status
    }

    /// Get the raw, unparsed response body.
    pub fn buffer(&self) -> &Bytes {
        &self.buffer
    }
731
732    /// Parse the response, borrowing from the internal buffer
733    pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
734        // 200: parse as output
735        if self.status.is_success() {
736            match R::decode_output(&self.buffer) {
737                Ok(output) => Ok(output),
738                Err(e) => Err(XrpcError::Decode(e)),
739            }
740        // 400: try typed XRPC error, fallback to generic error
741        } else if self.status.as_u16() == 400 {
742            match serde_json::from_slice::<R::Err<'_>>(&self.buffer) {
743                Ok(error) => {
744                    use alloc::string::ToString;
745                    if error.to_string().contains("InvalidToken") {
746                        Err(XrpcError::Auth(AuthError::InvalidToken))
747                    } else if error.to_string().contains("ExpiredToken") {
748                        Err(XrpcError::Auth(AuthError::TokenExpired))
749                    } else {
750                        Err(XrpcError::Xrpc(error))
751                    }
752                }
753                Err(_) => {
754                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
755                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
756                        Ok(mut generic) => {
757                            generic.nsid = R::NSID;
758                            generic.method = ""; // method info only available on request
759                            generic.http_status = self.status;
760                            // Map auth-related errors to AuthError
761                            match generic.error.as_str() {
762                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
763                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
764                                _ => Err(XrpcError::Generic(generic)),
765                            }
766                        }
767                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
768                    }
769                }
770            }
771        // 401: always auth error
772        } else {
773            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
774                Ok(mut generic) => {
775                    generic.nsid = R::NSID;
776                    generic.method = ""; // method info only available on request
777                    generic.http_status = self.status;
778                    match generic.error.as_str() {
779                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
780                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
781                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
782                    }
783                }
784                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
785            }
786        }
787    }
788
789    /// Parse this as validated, loosely typed atproto data.
790    ///
791    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
792    pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
793        // 200: parse as output
794        if self.status.is_success() {
795            match serde_json::from_slice::<_>(&self.buffer) {
796                Ok(output) => Ok(output),
797                Err(_) => {
798                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
799                        if let Ok(data) = Data::from_cbor(&data) {
800                            Ok(data.into_static())
801                        } else {
802                            Ok(Data::Bytes(self.buffer.clone()))
803                        }
804                    } else {
805                        Ok(Data::Bytes(self.buffer.clone()))
806                    }
807                }
808            }
809        // 400: try typed XRPC error, fallback to generic error
810        } else if self.status.as_u16() == 400 {
811            match serde_json::from_slice::<R::Err<'_>>(&self.buffer) {
812                Ok(error) => {
813                    use alloc::string::ToString;
814                    if error.to_string().contains("InvalidToken") {
815                        Err(XrpcError::Auth(AuthError::InvalidToken))
816                    } else if error.to_string().contains("ExpiredToken") {
817                        Err(XrpcError::Auth(AuthError::TokenExpired))
818                    } else {
819                        Err(XrpcError::Xrpc(error))
820                    }
821                }
822                Err(_) => {
823                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
824                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
825                        Ok(mut generic) => {
826                            generic.nsid = R::NSID;
827                            generic.method = ""; // method info only available on request
828                            generic.http_status = self.status;
829                            // Map auth-related errors to AuthError
830                            match generic.error.as_str() {
831                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
832                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
833                                _ => Err(XrpcError::Generic(generic)),
834                            }
835                        }
836                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
837                    }
838                }
839            }
840        // 401: always auth error
841        } else {
842            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
843                Ok(mut generic) => {
844                    generic.nsid = R::NSID;
845                    generic.method = ""; // method info only available on request
846                    generic.http_status = self.status;
847                    match generic.error.as_str() {
848                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
849                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
850                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
851                    }
852                }
853                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
854            }
855        }
856    }
857
858    /// Parse this as raw atproto data with minimal validation.
859    ///
860    /// NOTE: If the response is an error, it will still parse as the matching error type for the request.
861    pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
862        // 200: parse as output
863        if self.status.is_success() {
864            match serde_json::from_slice::<_>(&self.buffer) {
865                Ok(output) => Ok(output),
866                Err(_) => {
867                    if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
868                        if let Ok(data) = RawData::from_cbor(&data) {
869                            Ok(data.into_static())
870                        } else {
871                            Ok(RawData::Bytes(self.buffer.clone()))
872                        }
873                    } else {
874                        Ok(RawData::Bytes(self.buffer.clone()))
875                    }
876                }
877            }
878        // 400: try typed XRPC error, fallback to generic error
879        } else if self.status.as_u16() == 400 {
880            match serde_json::from_slice::<R::Err<'_>>(&self.buffer) {
881                Ok(error) => {
882                    use alloc::string::ToString;
883                    if error.to_string().contains("InvalidToken") {
884                        Err(XrpcError::Auth(AuthError::InvalidToken))
885                    } else if error.to_string().contains("ExpiredToken") {
886                        Err(XrpcError::Auth(AuthError::TokenExpired))
887                    } else {
888                        Err(XrpcError::Xrpc(error))
889                    }
890                }
891                Err(_) => {
892                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
893                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
894                        Ok(mut generic) => {
895                            generic.nsid = R::NSID;
896                            generic.method = ""; // method info only available on request
897                            generic.http_status = self.status;
898                            // Map auth-related errors to AuthError
899                            match generic.error.as_str() {
900                                "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
901                                "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
902                                _ => Err(XrpcError::Generic(generic)),
903                            }
904                        }
905                        Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
906                    }
907                }
908            }
909        // 401: always auth error
910        } else {
911            match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
912                Ok(mut generic) => {
913                    generic.nsid = R::NSID;
914                    generic.method = ""; // method info only available on request
915                    generic.http_status = self.status;
916                    match generic.error.as_str() {
917                        "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
918                        "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
919                        _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
920                    }
921                }
922                Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
923            }
924        }
925    }
926
927    /// Reinterpret this response as a different response type.
928    ///
929    /// This transmutes the response by keeping the same buffer and status code,
930    /// but changing the type-level marker. Useful for converting generic XRPC responses
931    /// into collection-specific typed responses.
932    ///
933    /// # Safety
934    ///
935    /// This is safe in the sense that no memory unsafety occurs, but logical correctness
936    /// depends on ensuring the buffer actually contains data that can deserialize to `NEW`.
937    /// Incorrect conversion will cause deserialization errors at runtime.
938    pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
939        Response {
940            buffer: self.buffer,
941            status: self.status,
942            _marker: PhantomData,
943        }
944    }
945}
946
947/// The output type of response marker `Resp` (its [`XrpcResp::Output`]) for lifetime `'a`.
948pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
949/// The error type of response marker `Resp` (its [`XrpcResp::Err`]) for lifetime `'a`.
950pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
951
952impl<R> Response<R>
953where
954    R: XrpcResp,
955{
956    /// Parse the response into an owned output
957    pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
958    where
959        for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
960        for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
961    {
962        fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
963            serde_json::from_slice(buffer)
964        }
965
966        // 200: parse as output
967        if self.status.is_success() {
968            match R::decode_output(&self.buffer) {
969                Ok(output) => Ok(output.into_static()),
970                Err(e) => Err(XrpcError::Decode(e)),
971            }
972        // 400: try typed XRPC error, fallback to generic error
973        } else if self.status.as_u16() == 400 {
974            let error = match parse_error::<R>(&self.buffer) {
975                Ok(error) => {
976                    use alloc::string::ToString;
977                    if error.to_string().contains("InvalidToken") {
978                        XrpcError::Auth(AuthError::InvalidToken)
979                    } else if error.to_string().contains("ExpiredToken") {
980                        XrpcError::Auth(AuthError::TokenExpired)
981                    } else {
982                        XrpcError::Xrpc(error)
983                    }
984                }
985                Err(_) => {
986                    // Fallback to generic error (InvalidRequest, ExpiredToken, etc.)
987                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
988                        Ok(mut generic) => {
989                            generic.nsid = R::NSID;
990                            generic.method = ""; // method info only available on request
991                            generic.http_status = self.status;
992                            // Map auth-related errors to AuthError
993                            match generic.error.as_ref() {
994                                "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
995                                "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
996                                _ => XrpcError::Generic(generic),
997                            }
998                        }
999                        Err(e) => XrpcError::Decode(DecodeError::Json(e)),
1000                    }
1001                }
1002            };
1003            Err(error.into_static())
1004        // 401: always auth error
1005        } else {
1006            let error: XrpcError<<R as XrpcResp>::Err<'_>> =
1007                match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
1008                    Ok(mut generic) => {
1009                        let status = self.status;
1010                        generic.nsid = R::NSID;
1011                        generic.method = ""; // method info only available on request
1012                        generic.http_status = status;
1013                        match generic.error.as_ref() {
1014                            "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
1015                            "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
1016                            _ => XrpcError::Auth(AuthError::NotAuthenticated),
1017                        }
1018                    }
1019                    Err(e) => XrpcError::Decode(DecodeError::Json(e)),
1020                };
1021
1022            Err(error.into_static())
1023        }
1024    }
1025}
1026
1027/// Generic XRPC error format for untyped errors like InvalidRequest
1028///
1029/// Used when the error doesn't match the endpoint's specific error enum
///
/// Only `error` and `message` take part in (de)serialization. The remaining
/// fields are marked `#[serde(skip)]` and are assigned by the response parsers
/// after the body has been decoded.
1030#[derive(Debug, Clone, Deserialize, Serialize)]
1031pub struct GenericXrpcError {
1032    /// Error code (e.g., "InvalidRequest")
1033    pub error: SmolStr,
1034    /// Optional error message with details
1035    pub message: Option<SmolStr>,
1036    /// XRPC method NSID that produced this error (context only; not serialized)
1037    #[serde(skip)]
1038    pub nsid: &'static str,
1039    /// HTTP method used (GET/POST) (context only; not serialized)
    ///
    /// The response parsers visible in this file set it to `""`, because the
    /// method is only known on the request side.
1040    #[serde(skip)]
1041    pub method: &'static str,
1042    /// HTTP status code (context only; not serialized)
1043    #[serde(skip)]
1044    pub http_status: StatusCode,
1045}
1046
1047impl core::fmt::Display for GenericXrpcError {
1048    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1049        if let Some(msg) = &self.message {
1050            write!(
1051                f,
1052                "{}: {} (nsid={}, method={}, status={})",
1053                self.error, msg, self.nsid, self.method, self.http_status
1054            )
1055        } else {
1056            write!(
1057                f,
1058                "{} (nsid={}, method={}, status={})",
1059                self.error, self.nsid, self.method, self.http_status
1060            )
1061        }
1062    }
1063}
1064
1065impl IntoStatic for GenericXrpcError {
    // The struct has no lifetime parameter, so the static form is the type itself.
1066    type Output = Self;
1067
    /// Returns `self` unchanged: every field is either owned or `&'static`.
1068    fn into_static(self) -> Self::Output {
1069        self
1070    }
1071}
1072
// Marker impl so the generic error can be used wherever `core::error::Error` is required.
1073impl core::error::Error for GenericXrpcError {}
1074
1075/// XRPC-specific errors returned from endpoints
1076///
1077/// Represents errors returned in the response body, plus failures to decode that body.
1078/// Type parameter `E` is the endpoint's specific error enum type.
1079#[derive(Debug, thiserror::Error)]
1080#[cfg_attr(feature = "std", derive(miette::Diagnostic))]
1081#[non_exhaustive]
1082pub enum XrpcError<E: core::error::Error + IntoStatic> {
1083    /// Typed XRPC error from the endpoint's specific error enum
1084    #[error("XRPC error: {0}")]
1085    #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::typed)))]
1086    Xrpc(E),
1087
1088    /// Authentication error (ExpiredToken, InvalidToken, etc.)
    ///
    /// Convertible from [`AuthError`] via `From`.
1089    #[error("Authentication error: {0}")]
1090    #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::auth)))]
1091    Auth(#[from] AuthError),
1092
1093    /// Generic XRPC error not in the endpoint's error enum (e.g., InvalidRequest)
1094    #[error("XRPC error: {0}")]
1095    #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::generic)))]
1096    Generic(GenericXrpcError),
1097
1098    /// Failed to decode the response body
    ///
    /// Convertible from [`DecodeError`] via `From`.
1099    #[error("Failed to decode response: {0}")]
1100    #[cfg_attr(feature = "std", diagnostic(code(jacquard_common::xrpc::decode)))]
1101    Decode(#[from] DecodeError),
1102}
1103
1104impl<E> IntoStatic for XrpcError<E>
1105where
1106    E: core::error::Error + IntoStatic,
1107    E::Output: core::error::Error + IntoStatic,
1108    <E as IntoStatic>::Output: core::error::Error + IntoStatic,
1109{
1110    type Output = XrpcError<E::Output>;
1111    fn into_static(self) -> Self::Output {
1112        match self {
1113            XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
1114            XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
1115            XrpcError::Generic(e) => XrpcError::Generic(e),
1116            XrpcError::Decode(e) => XrpcError::Decode(e),
1117        }
1118    }
1119}
1120
1121impl<E> Serialize for XrpcError<E>
1122where
1123    E: core::error::Error + IntoStatic + Serialize,
1124{
1125    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1126    where
1127        S: serde::Serializer,
1128    {
1129        use serde::ser::SerializeStruct;
1130
1131        match self {
1132            // Typed errors already serialize to correct atproto format
1133            XrpcError::Xrpc(e) => e.serialize(serializer),
1134            // Generic errors already have correct format
1135            XrpcError::Generic(g) => g.serialize(serializer),
1136            // Auth and Decode need manual mapping to {"error": "...", "message": ...}
1137            XrpcError::Auth(auth) => {
1138                let mut state = serializer.serialize_struct("XrpcError", 2)?;
1139                let (error, message) = match auth {
1140                    AuthError::TokenExpired => ("ExpiredToken", Some("Access token has expired")),
1141                    AuthError::InvalidToken => {
1142                        ("InvalidToken", Some("Access token is invalid or malformed"))
1143                    }
1144                    AuthError::RefreshFailed => {
1145                        ("RefreshFailed", Some("Token refresh request failed"))
1146                    }
1147                    AuthError::NotAuthenticated => (
1148                        "AuthenticationRequired",
1149                        Some("Request requires authentication but none was provided"),
1150                    ),
1151                    AuthError::DpopProofFailed => {
1152                        ("DpopProofFailed", Some("DPoP proof construction failed"))
1153                    }
1154                    AuthError::DpopNonceFailed => {
1155                        ("DpopNonceFailed", Some("DPoP nonce negotiation failed"))
1156                    }
1157                    AuthError::Other(hv) => {
1158                        let msg = hv.to_str().unwrap_or("[non-utf8 header]");
1159                        ("AuthenticationError", Some(msg))
1160                    }
1161                };
1162                state.serialize_field("error", error)?;
1163                if let Some(msg) = message {
1164                    state.serialize_field("message", msg)?;
1165                }
1166                state.end()
1167            }
1168            XrpcError::Decode(decode_err) => {
1169                let mut state = serializer.serialize_struct("XrpcError", 2)?;
1170                state.serialize_field("error", "ResponseDecodeError")?;
1171                // Convert DecodeError to string for message field
1172                let msg = format!("{:?}", decode_err);
1173                state.serialize_field("message", &msg)?;
1174                state.end()
1175            }
1176        }
1177    }
1178}
1179
#[cfg(feature = "streaming")]
impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
    /// Send an XRPC call and stream the binary response.
    ///
    /// Useful for downloading blobs and entire repository archives.
    ///
    /// # Errors
    ///
    /// Failures while building the HTTP request or while sending it are both
    /// wrapped with `StreamError::transport`.
    pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
    where
        R: XrpcRequest,
        <R as XrpcRequest>::Response: Send + Sync,
    {
        let outgoing = build_http_request(&self.base, request, &self.opts)
            .map_err(StreamError::transport)?;

        let incoming = self
            .client
            .send_http_streaming(outgoing)
            .await
            .map_err(StreamError::transport)?;

        let (parts, body) = incoming.into_parts();
        Ok(StreamingResponse::new(parts, body))
    }

    /// Stream an XRPC procedure call and its response.
    ///
    /// Useful for streaming upload of large payloads, or for "pipe-through"
    /// operations where you are processing a large payload.
    ///
    /// The request is a POST to the endpoint for `S::Request`'s NSID, carrying the
    /// authorization, proxy, accept-labelers and extra headers from the call
    /// options; the frames of `stream` form the request body.
    ///
    /// # Errors
    ///
    /// URI construction, an invalid authorization header value and request
    /// building are reported with `StreamError::protocol`; failures while sending
    /// are wrapped with `StreamError::transport`.
    pub async fn stream<S>(
        self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
    {
        use futures::TryStreamExt;

        let endpoint =
            match xrpc_endpoint_uri(&self.base, <S::Request as XrpcRequest>::NSID, None) {
                Ok(uri) => uri,
                Err(e) => {
                    return Err(StreamError::protocol(format!(
                        "Failed to construct endpoint URI: {}",
                        e
                    )));
                }
            };

        let mut builder = http::Request::post(endpoint.as_str());

        // Authorization: scheme prefix depends on the token kind.
        if let Some(token) = &self.opts.auth {
            let credential = match token {
                AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()),
                AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()),
            };
            let hv = HeaderValue::from_str(&credential).map_err(|e| {
                StreamError::protocol(format!("Invalid authorization token: {}", e))
            })?;
            builder = builder.header(Header::Authorization, hv);
        }

        if let Some(proxy) = &self.opts.atproto_proxy {
            builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
        }

        // An empty labeler list sends no header at all.
        match &self.opts.atproto_accept_labelers {
            Some(labelers) if !labelers.is_empty() => {
                let joined = labelers
                    .iter()
                    .map(|s| s.as_ref())
                    .collect::<Vec<_>>()
                    .join(", ");
                builder = builder.header(Header::AtprotoAcceptLabelers, joined);
            }
            _ => {}
        }

        builder = (&self.opts.extra_headers)
            .into_iter()
            .fold(builder, |b, (name, value)| b.header(name, value));

        // Only the request head is needed; the body is supplied as a stream.
        let (request_head, _) = builder
            .body(())
            .map_err(|e| StreamError::protocol(e.to_string()))?
            .into_parts();

        let frames = Box::pin(stream.0.map_ok(|f| f.buffer));

        let response = self
            .client
            .send_http_bidirectional(request_head, frames)
            .await
            .map_err(StreamError::transport)?;

        let (parts, body) = response.into_parts();

        Ok(XrpcResponseStream::<
            <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
        >::from_typed_parts(parts, body))
    }
}
1274
1275#[cfg(test)]
1276mod tests {
1277    use super::*;
1278    use serde::{Deserialize, Serialize};
1279
1280    #[derive(Serialize, Deserialize)]
    // Request fixture: a unit struct whose `XrpcRequest` impl points at `DummyResp`.
1281    #[allow(dead_code)]
1282    struct DummyReq;
1283
    // Typed-error fixture: a newtype around a string, displayed as that string.
1284    #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1285    #[error("{0}")]
1286    struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);
1287
1288    impl IntoStatic for DummyErr<'_> {
1289        type Output = DummyErr<'static>;
1290        fn into_static(self) -> Self::Output {
1291            DummyErr(self.0.into_static())
1292        }
1293    }
1294
    // Response marker used as the type parameter of `Response` in the tests.
1295    struct DummyResp;
1296
1297    impl XrpcResp for DummyResp {
1298        const NSID: &'static str = "test.dummy";
1299        const ENCODING: &'static str = "application/json";
        // No output payload; the tests only exercise the error paths.
1300        type Output<'de> = ();
1301        type Err<'de> = DummyErr<'de>;
1302    }
1303
1304    impl XrpcRequest for DummyReq {
1305        const NSID: &'static str = "test.dummy";
1306        const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
1307        type Response = DummyResp;
1308    }
1309
1310    #[test]
1311    fn generic_error_carries_context() {
1312        let body = serde_json::json!({"error":"InvalidRequest","message":"missing"});
1313        let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
1314        let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST);
1315        match resp.parse().unwrap_err() {
1316            XrpcError::Generic(g) => {
1317                assert_eq!(g.error.as_str(), "InvalidRequest");
1318                assert_eq!(g.message.as_deref(), Some("missing"));
1319                assert_eq!(g.nsid, DummyResp::NSID);
1320                assert_eq!(g.method, ""); // method info only on request
1321                assert_eq!(g.http_status, StatusCode::BAD_REQUEST);
1322            }
1323            other => panic!("unexpected: {other:?}"),
1324        }
1325    }
1326
1327    #[test]
1328    fn auth_error_mapping() {
1329        for (code, expect) in [
1330            ("ExpiredToken", AuthError::TokenExpired),
1331            ("InvalidToken", AuthError::InvalidToken),
1332        ] {
1333            let body = serde_json::json!({"error": code});
1334            let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
1335            let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED);
1336            match resp.parse().unwrap_err() {
1337                XrpcError::Auth(e) => match (e, expect) {
1338                    (AuthError::TokenExpired, AuthError::TokenExpired) => {}
1339                    (AuthError::InvalidToken, AuthError::InvalidToken) => {}
1340                    other => panic!("mismatch: {other:?}"),
1341                },
1342                other => panic!("unexpected: {other:?}"),
1343            }
1344        }
1345    }
1346
1347    #[test]
1348    fn xrpc_uri_construction_basic() {
1349        use crate::alloc::string::ToString;
1350        #[derive(Serialize, Deserialize)]
1351        struct Req;
1352        #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1353        #[error("{0}")]
1354        struct Err<'a>(#[serde(borrow)] CowStr<'a>);
1355        impl IntoStatic for Err<'_> {
1356            type Output = Err<'static>;
1357            fn into_static(self) -> Self::Output {
1358                Err(self.0.into_static())
1359            }
1360        }
1361        struct Resp;
1362        impl XrpcResp for Resp {
1363            const NSID: &'static str = "com.example.test";
1364            const ENCODING: &'static str = "application/json";
1365            type Output<'de> = ();
1366            type Err<'de> = Err<'de>;
1367        }
1368        impl XrpcRequest for Req {
1369            const NSID: &'static str = "com.example.test";
1370            const METHOD: XrpcMethod = XrpcMethod::Query;
1371            type Response = Resp;
1372        }
1373
1374        let opts = CallOptions::default();
1375
1376        // AC1.1: Base URI without trailing slash + NSID produces correct `/xrpc/{nsid}` path
1377        let base1 = Uri::parse("https://pds.example.com")
1378            .expect("URI should be valid")
1379            .to_owned();
1380        let req1 = build_http_request(&base1, &Req, &opts).unwrap();
1381        let uri1 = req1.uri().to_string();
1382        assert!(
1383            uri1.contains("/xrpc/com.example.test"),
1384            "AC1.1: URI {} should contain '/xrpc/com.example.test'",
1385            uri1
1386        );
1387        assert_eq!(
1388            uri1, "https://pds.example.com/xrpc/com.example.test",
1389            "AC1.1: URI should be exact match"
1390        );
1391
1392        // AC1.2: Base URI with sub-path preserves it: `/base/xrpc/{nsid}`
1393        let base2 = Uri::parse("https://pds.example.com/base")
1394            .expect("URI should be valid")
1395            .to_owned();
1396        let req2 = build_http_request(&base2, &Req, &opts).unwrap();
1397        let uri2 = req2.uri().to_string();
1398        assert!(
1399            uri2.contains("/base/xrpc/com.example.test"),
1400            "AC1.2: URI {} should contain '/base/xrpc/com.example.test'",
1401            uri2
1402        );
1403        assert_eq!(
1404            uri2, "https://pds.example.com/base/xrpc/com.example.test",
1405            "AC1.2: URI should preserve sub-path"
1406        );
1407
1408        // AC1.5: Base URI with trailing slash is normalized (slash stripped) before construction
1409        let base_with_slash = Uri::parse("https://pds.example.com/")
1410            .expect("URI should be valid")
1411            .to_owned();
1412        let req_slash = build_http_request(&base_with_slash, &Req, &opts).unwrap();
1413        let uri_slash = req_slash.uri().to_string();
1414        assert!(
1415            !uri_slash.contains("//xrpc"),
1416            "AC1.5: URI {} should not contain '//xrpc'",
1417            uri_slash
1418        );
1419        assert_eq!(
1420            uri_slash, "https://pds.example.com/xrpc/com.example.test",
1421            "AC1.5: URI should handle trailing slash"
1422        );
1423    }
1424
1425    #[test]
1426    fn xrpc_uri_query_parameters() {
1427        use crate::alloc::string::ToString;
1428        use serde::Serialize;
1429
1430        #[derive(Serialize)]
1431        struct QueryReq {
1432            #[serde(skip_serializing_if = "Option::is_none")]
1433            param1: Option<String>,
1434            #[serde(skip_serializing_if = "Option::is_none")]
1435            param2: Option<String>,
1436        }
1437
1438        #[derive(Serialize, Deserialize, Debug, thiserror::Error)]
1439        #[error("test error")]
1440        struct Err;
1441        impl IntoStatic for Err {
1442            type Output = Err;
1443            fn into_static(self) -> Self::Output {
1444                self
1445            }
1446        }
1447
1448        struct Resp;
1449        impl XrpcResp for Resp {
1450            const NSID: &'static str = "com.example.test";
1451            const ENCODING: &'static str = "application/json";
1452            type Output<'de> = ();
1453            type Err<'de> = Err;
1454        }
1455        impl XrpcRequest for QueryReq {
1456            const NSID: &'static str = "com.example.test";
1457            const METHOD: XrpcMethod = XrpcMethod::Query;
1458            type Response = Resp;
1459        }
1460
1461        let opts = CallOptions::default();
1462        let base = Uri::parse("https://pds.example.com")
1463            .expect("URI should be valid")
1464            .to_owned();
1465
1466        // AC1.3: Query parameters from serde serialisation are set correctly
1467        let req_with_params = QueryReq {
1468            param1: Some("value1".to_string()),
1469            param2: Some("value2".to_string()),
1470        };
1471        let http_req = build_http_request(&base, &req_with_params, &opts).unwrap();
1472        let uri_str = http_req.uri().to_string();
1473        assert!(
1474            uri_str.contains("?"),
1475            "AC1.3: URI should contain query string"
1476        );
1477        assert!(
1478            uri_str.contains("param1=value1"),
1479            "AC1.3: URI should contain param1"
1480        );
1481        assert!(
1482            uri_str.contains("param2=value2"),
1483            "AC1.3: URI should contain param2"
1484        );
1485
1486        // AC1.4: Empty/default query parameters result in no `?` in the constructed URI
1487        let req_empty_params = QueryReq {
1488            param1: None,
1489            param2: None,
1490        };
1491        let http_req_empty = build_http_request(&base, &req_empty_params, &opts).unwrap();
1492        let uri_str_empty = http_req_empty.uri().to_string();
1493        assert!(
1494            !uri_str_empty.contains("?"),
1495            "AC1.4: URI {} should not contain '?' with empty params",
1496            uri_str_empty
1497        );
1498        assert_eq!(
1499            uri_str_empty, "https://pds.example.com/xrpc/com.example.test",
1500            "AC1.4: URI should have no query string"
1501        );
1502    }
1503
1504    #[test]
1505    fn xrpc_uri_special_characters_in_query() {
1506        use crate::alloc::string::ToString;
1507        use serde::Serialize;
1508
1509        #[derive(Serialize)]
1510        struct QueryReq {
1511            #[serde(skip_serializing_if = "Option::is_none")]
1512            search: Option<String>,
1513            #[serde(skip_serializing_if = "Option::is_none")]
1514            filter: Option<String>,
1515            #[serde(skip_serializing_if = "Option::is_none")]
1516            unicode_param: Option<String>,
1517        }
1518
1519        #[derive(Serialize, Deserialize, Debug, thiserror::Error)]
1520        #[error("test error")]
1521        struct Err;
1522        impl IntoStatic for Err {
1523            type Output = Err;
1524            fn into_static(self) -> Self::Output {
1525                self
1526            }
1527        }
1528
1529        struct Resp;
1530        impl XrpcResp for Resp {
1531            const NSID: &'static str = "com.example.test";
1532            const ENCODING: &'static str = "application/json";
1533            type Output<'de> = ();
1534            type Err<'de> = Err;
1535        }
1536        impl XrpcRequest for QueryReq {
1537            const NSID: &'static str = "com.example.test";
1538            const METHOD: XrpcMethod = XrpcMethod::Query;
1539            type Response = Resp;
1540        }
1541
1542        let opts = CallOptions::default();
1543        let base = Uri::parse("https://pds.example.com")
1544            .expect("URI should be valid")
1545            .to_owned();
1546
1547        // AC1.3: Test with spaces (serde_html_form uses + for spaces per application/x-www-form-urlencoded)
1548        let req_spaces = QueryReq {
1549            search: Some("hello world".to_string()),
1550            filter: None,
1551            unicode_param: None,
1552        };
1553        let http_req_spaces = build_http_request(&base, &req_spaces, &opts).unwrap();
1554        let uri_spaces = http_req_spaces.uri().to_string();
1555        assert!(
1556            uri_spaces.contains("search=hello"),
1557            "AC1.3: URI should contain search param"
1558        );
1559        // serde_html_form encodes spaces as +
1560        assert!(
1561            uri_spaces.contains("hello+world") || uri_spaces.contains("hello%20world"),
1562            "AC1.3: URI {} should encode space in 'hello world'",
1563            uri_spaces
1564        );
1565
1566        // AC1.3: Test with special characters: &, =, +
1567        let req_special = QueryReq {
1568            search: Some("a=b&c+d".to_string()),
1569            filter: None,
1570            unicode_param: None,
1571        };
1572        let http_req_special = build_http_request(&base, &req_special, &opts).unwrap();
1573        let uri_special = http_req_special.uri().to_string();
1574        assert!(
1575            uri_special.contains("?"),
1576            "AC1.3: URI should contain query string for special chars"
1577        );
1578        // Verify the URI can be parsed successfully (fluent-uri handles encoded values)
1579        let parsed = Uri::parse(uri_special.clone());
1580        assert!(
1581            parsed.is_ok(),
1582            "AC1.3: URI {} should be parseable by fluent-uri",
1583            uri_special
1584        );
1585
1586        // AC1.3: Test with unicode characters
1587        let req_unicode = QueryReq {
1588            search: None,
1589            filter: None,
1590            unicode_param: Some("你好世界".to_string()),
1591        };
1592        let http_req_unicode = build_http_request(&base, &req_unicode, &opts).unwrap();
1593        let uri_unicode = http_req_unicode.uri().to_string();
1594        assert!(
1595            uri_unicode.contains("?"),
1596            "AC1.3: URI should contain query string for unicode"
1597        );
1598        // Verify the URI can be parsed successfully
1599        let parsed_unicode = Uri::parse(uri_unicode.clone());
1600        assert!(
1601            parsed_unicode.is_ok(),
1602            "AC1.3: URI {} should be parseable for unicode params",
1603            uri_unicode
1604        );
1605    }
1606
1607    #[test]
1608    fn no_double_slash_in_path() {
1609        use crate::alloc::string::ToString;
1610        #[derive(Serialize, Deserialize)]
1611        struct Req;
1612        #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1613        #[error("{0}")]
1614        struct Err<'a>(#[serde(borrow)] CowStr<'a>);
1615        impl IntoStatic for Err<'_> {
1616            type Output = Err<'static>;
1617            fn into_static(self) -> Self::Output {
1618                Err(self.0.into_static())
1619            }
1620        }
1621        struct Resp;
1622        impl XrpcResp for Resp {
1623            const NSID: &'static str = "com.example.test";
1624            const ENCODING: &'static str = "application/json";
1625            type Output<'de> = ();
1626            type Err<'de> = Err<'de>;
1627        }
1628        impl XrpcRequest for Req {
1629            const NSID: &'static str = "com.example.test";
1630            const METHOD: XrpcMethod = XrpcMethod::Query;
1631            type Response = Resp;
1632        }
1633
1634        let opts = CallOptions::default();
1635
1636        // Ensure no double slashes in path
1637        let base1 = Uri::parse("https://pds")
1638            .expect("URI should be valid")
1639            .to_owned();
1640        let req1 = build_http_request(&base1, &Req, &opts).unwrap();
1641        let uri1 = req1.uri().to_string();
1642        assert!(
1643            !uri1.contains("//xrpc"),
1644            "URI {} should not contain '//xrpc'",
1645            uri1
1646        );
1647
1648        let base2 = Uri::parse("https://pds/base")
1649            .expect("URI should be valid")
1650            .to_owned();
1651        let req2 = build_http_request(&base2, &Req, &opts).unwrap();
1652        let uri2 = req2.uri().to_string();
1653        assert!(
1654            !uri2.contains("//xrpc"),
1655            "URI {} should not contain '//xrpc'",
1656            uri2
1657        );
1658    }
1659}