1#[cfg(feature = "streaming")]
14pub mod streaming;
15
16use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18pub use streaming::{
19 StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20};
21
22#[cfg(feature = "websocket")]
23pub mod subscription;
24
25#[cfg(feature = "streaming")]
26use crate::StreamError;
27use crate::error::DecodeError;
28use crate::http_client::HttpClient;
29#[cfg(feature = "streaming")]
30use crate::http_client::HttpClientExt;
31use crate::types::value::Data;
32use crate::{AuthorizationToken, error::AuthError};
33use crate::{CowStr, error::XrpcResult};
34use crate::{IntoStatic, types::value::RawData};
35use bytes::Bytes;
36use http::{
37 HeaderName, HeaderValue, Request, StatusCode,
38 header::{AUTHORIZATION, CONTENT_TYPE},
39};
40use serde::{Deserialize, Serialize};
41use smol_str::SmolStr;
42use std::fmt::{self, Debug};
43use std::{error::Error, marker::PhantomData};
44#[cfg(feature = "websocket")]
45pub use subscription::{
46 BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
47 SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
48 SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
49};
50use url::Url;
51
52#[derive(Debug, thiserror::Error, miette::Diagnostic)]
54pub enum EncodeError {
55 #[error("Failed to serialize query: {0}")]
57 Query(
58 #[from]
59 #[source]
60 serde_html_form::ser::Error,
61 ),
62 #[error("Failed to serialize JSON: {0}")]
64 Json(
65 #[from]
66 #[source]
67 serde_json::Error,
68 ),
69 #[error("Encoding error: {0}")]
71 Other(String),
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
76pub enum XrpcMethod {
77 Query,
79 Procedure(&'static str),
81}
82
83impl XrpcMethod {
84 pub const fn as_str(&self) -> &'static str {
86 match self {
87 Self::Query => "GET",
88 Self::Procedure(_) => "POST",
89 }
90 }
91
92 pub const fn body_encoding(&self) -> Option<&'static str> {
94 match self {
95 Self::Query => None,
96 Self::Procedure(enc) => Some(enc),
97 }
98 }
99}
100
101pub trait XrpcRequest: Serialize {
108 const NSID: &'static str;
110
111 const METHOD: XrpcMethod;
113
114 type Response: XrpcResp;
116
117 fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
121 Ok(serde_json::to_vec(self)?)
122 }
123
124 fn decode_body<'de>(body: &'de [u8]) -> XrpcResult<Box<Self>>
128 where
129 Self: Deserialize<'de>,
130 {
131 let body: Self = serde_json::from_slice(body)
132 .map_err(|e| crate::error::ClientError::decode(format!("{:?}", e)))?;
133
134 Ok(Box::new(body))
135 }
136}
137
138pub trait XrpcResp {
142 const NSID: &'static str;
144
145 const ENCODING: &'static str;
147
148 type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
150
151 type Err<'de>: Error + Deserialize<'de> + Serialize + IntoStatic;
153
154 fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
156 Ok(serde_json::to_vec(output)?)
157 }
158
159 fn decode_output<'de>(body: &'de [u8]) -> core::result::Result<Self::Output<'de>, DecodeError>
163 where
164 Self::Output<'de>: Deserialize<'de>,
165 {
166 #[allow(deprecated)]
167 let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
168
169 Ok(body)
170 }
171}
172
173pub trait XrpcEndpoint {
181 const PATH: &'static str;
183 const METHOD: XrpcMethod;
185 type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
187 type Response: XrpcResp;
189}
190
191#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
193pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);
194
195impl<'de> fmt::Display for GenericError<'de> {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 self.0.fmt(f)
198 }
199}
200
201impl Error for GenericError<'_> {}
202
203impl IntoStatic for GenericError<'_> {
204 type Output = GenericError<'static>;
205 fn into_static(self) -> Self::Output {
206 GenericError(self.0.into_static())
207 }
208}
209
210#[derive(Debug, Default, Clone)]
212pub struct CallOptions<'a> {
213 pub auth: Option<AuthorizationToken<'a>>,
215 pub atproto_proxy: Option<CowStr<'a>>,
217 pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
219 pub extra_headers: Vec<(HeaderName, HeaderValue)>,
221}
222
223impl IntoStatic for CallOptions<'_> {
224 type Output = CallOptions<'static>;
225
226 fn into_static(self) -> Self::Output {
227 CallOptions {
228 auth: self.auth.map(|auth| auth.into_static()),
229 atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
230 atproto_accept_labelers: self
231 .atproto_accept_labelers
232 .map(|labelers| labelers.into_static()),
233 extra_headers: self.extra_headers,
234 }
235 }
236}
237
238pub trait XrpcExt: HttpClient {
254 fn xrpc<'a>(&'a self, base: Url) -> XrpcCall<'a, Self>
256 where
257 Self: Sized,
258 {
259 XrpcCall {
260 client: self,
261 base,
262 opts: CallOptions::default(),
263 }
264 }
265}
266
267impl<T: HttpClient> XrpcExt for T {}
268
269pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
271
272#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
274pub trait XrpcClient: HttpClient {
275 fn base_uri(&self) -> impl Future<Output = Url>;
277
278 fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
280 async { CallOptions::default() }
281 }
282
283 #[cfg(not(target_arch = "wasm32"))]
285 fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
286 where
287 R: XrpcRequest + Send + Sync,
288 <R as XrpcRequest>::Response: Send + Sync,
289 Self: Sync;
290
291 #[cfg(target_arch = "wasm32")]
293 fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
294 where
295 R: XrpcRequest + Send + Sync,
296 <R as XrpcRequest>::Response: Send + Sync;
297
298 #[cfg(not(target_arch = "wasm32"))]
300 fn send_with_opts<R>(
301 &self,
302 request: R,
303 opts: CallOptions<'_>,
304 ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
305 where
306 R: XrpcRequest + Send + Sync,
307 <R as XrpcRequest>::Response: Send + Sync,
308 Self: Sync;
309
310 #[cfg(target_arch = "wasm32")]
312 fn send_with_opts<R>(
313 &self,
314 request: R,
315 opts: CallOptions<'_>,
316 ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
317 where
318 R: XrpcRequest + Send + Sync,
319 <R as XrpcRequest>::Response: Send + Sync;
320}
321
322#[cfg(feature = "streaming")]
324pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
325 #[cfg(not(target_arch = "wasm32"))]
327 fn download<R>(
328 &self,
329 request: R,
330 ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
331 where
332 R: XrpcRequest + Send + Sync,
333 <R as XrpcRequest>::Response: Send + Sync,
334 Self: Sync;
335
336 #[cfg(target_arch = "wasm32")]
338 fn download<R>(
339 &self,
340 request: R,
341 ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
342 where
343 R: XrpcRequest + Send + Sync,
344 <R as XrpcRequest>::Response: Send + Sync;
345
346 #[cfg(not(target_arch = "wasm32"))]
348 fn stream<S>(
349 &self,
350 stream: XrpcProcedureSend<S::Frame<'static>>,
351 ) -> impl Future<
352 Output = Result<
353 XrpcResponseStream<
354 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
355 >,
356 StreamError,
357 >,
358 >
359 where
360 S: XrpcProcedureStream + 'static,
361 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
362 Self: Sync;
363
364 #[cfg(target_arch = "wasm32")]
366 fn stream<S>(
367 &self,
368 stream: XrpcProcedureSend<S::Frame<'static>>,
369 ) -> impl Future<
370 Output = Result<
371 XrpcResponseStream<
372 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
373 >,
374 StreamError,
375 >,
376 >
377 where
378 S: XrpcProcedureStream + 'static,
379 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
380}
381
382pub struct XrpcCall<'a, C: HttpClient> {
403 pub(crate) client: &'a C,
404 pub(crate) base: Url,
405 pub(crate) opts: CallOptions<'a>,
406}
407
408impl<'a, C: HttpClient> XrpcCall<'a, C> {
409 pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
411 self.opts.auth = Some(token);
412 self
413 }
414 pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
416 self.opts.atproto_proxy = Some(proxy);
417 self
418 }
419 pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
421 self.opts.atproto_accept_labelers = Some(labelers);
422 self
423 }
424 pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
426 self.opts.extra_headers.push((name, value));
427 self
428 }
429 pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
431 self.opts = opts;
432 self
433 }
434
435 #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
444 pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
445 where
446 R: XrpcRequest,
447 <R as XrpcRequest>::Response: Send + Sync,
448 {
449 let http_request = build_http_request(&self.base, request, &self.opts)?;
450
451 let http_response = self
452 .client
453 .send_http(http_request)
454 .await
455 .map_err(|e| crate::error::ClientError::transport(e))?;
456
457 process_response(http_response)
458 }
459}
460
461#[inline]
465pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
466where
467 Resp: XrpcResp,
468{
469 let status = http_response.status();
470 #[allow(deprecated)]
473 if status.as_u16() == 401 {
474 if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
475 return Err(crate::error::ClientError::auth(
476 crate::error::AuthError::Other(hv.clone()),
477 ));
478 }
479 }
480 let buffer = Bytes::from(http_response.into_body());
481
482 if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
483 return Err(crate::error::HttpError {
484 status,
485 body: Some(buffer),
486 }
487 .into());
488 }
489
490 Ok(Response::new(buffer, status))
491}
492
493pub enum Header {
495 ContentType,
497 Authorization,
499 AtprotoProxy,
503 AtprotoAcceptLabelers,
505}
506
507impl From<Header> for HeaderName {
508 fn from(value: Header) -> Self {
509 match value {
510 Header::ContentType => CONTENT_TYPE,
511 Header::Authorization => AUTHORIZATION,
512 Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
513 Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
514 }
515 }
516}
517
518pub fn build_http_request<'s, R>(
520 base: &Url,
521 req: &R,
522 opts: &CallOptions<'_>,
523) -> XrpcResult<Request<Vec<u8>>>
524where
525 R: XrpcRequest,
526{
527 use crate::error::ClientError;
528
529 let mut url = base.clone();
530 let mut path = url.path().trim_end_matches('/').to_owned();
531 path.push_str("/xrpc/");
532 path.push_str(<R as XrpcRequest>::NSID);
533 url.set_path(&path);
534
535 if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
536 let qs = serde_html_form::to_string(&req).map_err(|e| {
537 ClientError::invalid_request(format!("Failed to serialize query: {}", e))
538 })?;
539 if !qs.is_empty() {
540 url.set_query(Some(&qs));
541 } else {
542 url.set_query(None);
543 }
544 }
545
546 let method = match <R as XrpcRequest>::METHOD {
547 XrpcMethod::Query => http::Method::GET,
548 XrpcMethod::Procedure(_) => http::Method::POST,
549 };
550
551 let mut builder = Request::builder().method(method).uri(url.as_str());
552
553 if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
554 builder = builder.header(Header::ContentType, encoding);
555 }
556 let output_encoding = <R::Response as XrpcResp>::ENCODING;
557 builder = builder.header(http::header::ACCEPT, output_encoding);
558
559 if let Some(token) = &opts.auth {
560 let hv = match token {
561 AuthorizationToken::Bearer(t) => {
562 HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
563 }
564 AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
565 }
566 .map_err(|e| ClientError::invalid_request(format!("Invalid authorization token: {}", e)))?;
567 builder = builder.header(Header::Authorization, hv);
568 }
569
570 if let Some(proxy) = &opts.atproto_proxy {
571 builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
572 }
573 if let Some(labelers) = &opts.atproto_accept_labelers {
574 if !labelers.is_empty() {
575 let joined = labelers
576 .iter()
577 .map(|s| s.as_ref())
578 .collect::<Vec<_>>()
579 .join(", ");
580 builder = builder.header(Header::AtprotoAcceptLabelers, joined);
581 }
582 }
583 for (name, value) in &opts.extra_headers {
584 builder = builder.header(name, value);
585 }
586
587 let body = if let XrpcMethod::Procedure(_) = R::METHOD {
588 req.encode_body()
589 .map_err(|e| ClientError::invalid_request(format!("Failed to encode body: {}", e)))?
590 } else {
591 vec![]
592 };
593
594 builder
595 .body(body)
596 .map_err(|e| ClientError::invalid_request(format!("Failed to build request: {}", e)))
597}
598
599pub struct Response<Resp>
604where
605 Resp: XrpcResp, {
607 _marker: PhantomData<fn() -> Resp>,
608 buffer: Bytes,
609 status: StatusCode,
610}
611
612impl<R> Response<R>
613where
614 R: XrpcResp,
615{
616 pub fn new(buffer: Bytes, status: StatusCode) -> Self {
618 Self {
619 buffer,
620 status,
621 _marker: PhantomData,
622 }
623 }
624
625 pub fn status(&self) -> StatusCode {
627 self.status
628 }
629
630 pub fn buffer(&self) -> &Bytes {
632 &self.buffer
633 }
634
635 pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
637 if self.status.is_success() {
639 match R::decode_output(&self.buffer) {
640 Ok(output) => Ok(output),
641 Err(e) => Err(XrpcError::Decode(e)),
642 }
643 } else if self.status.as_u16() == 400 {
645 match serde_json::from_slice::<_>(&self.buffer) {
646 Ok(error) => Err(XrpcError::Xrpc(error)),
647 Err(_) => {
648 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
650 Ok(mut generic) => {
651 generic.nsid = R::NSID;
652 generic.method = ""; generic.http_status = self.status;
654 match generic.error.as_str() {
656 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
657 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
658 _ => Err(XrpcError::Generic(generic)),
659 }
660 }
661 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
662 }
663 }
664 }
665 } else {
667 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
668 Ok(mut generic) => {
669 generic.nsid = R::NSID;
670 generic.method = ""; generic.http_status = self.status;
672 match generic.error.as_str() {
673 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
674 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
675 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
676 }
677 }
678 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
679 }
680 }
681 }
682
683 pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
687 if self.status.is_success() {
689 match serde_json::from_slice::<_>(&self.buffer) {
690 Ok(output) => Ok(output),
691 Err(_) => {
692 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
693 if let Ok(data) = Data::from_cbor(&data) {
694 Ok(data.into_static())
695 } else {
696 Ok(Data::Bytes(self.buffer.clone()))
697 }
698 } else {
699 Ok(Data::Bytes(self.buffer.clone()))
700 }
701 }
702 }
703 } else if self.status.as_u16() == 400 {
705 match serde_json::from_slice::<_>(&self.buffer) {
706 Ok(error) => Err(XrpcError::Xrpc(error)),
707 Err(_) => {
708 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
710 Ok(mut generic) => {
711 generic.nsid = R::NSID;
712 generic.method = ""; generic.http_status = self.status;
714 match generic.error.as_str() {
716 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
717 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
718 _ => Err(XrpcError::Generic(generic)),
719 }
720 }
721 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
722 }
723 }
724 }
725 } else {
727 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
728 Ok(mut generic) => {
729 generic.nsid = R::NSID;
730 generic.method = ""; generic.http_status = self.status;
732 match generic.error.as_str() {
733 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
734 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
735 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
736 }
737 }
738 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
739 }
740 }
741 }
742
743 pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
747 if self.status.is_success() {
749 match serde_json::from_slice::<_>(&self.buffer) {
750 Ok(output) => Ok(output),
751 Err(_) => {
752 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
753 if let Ok(data) = RawData::from_cbor(&data) {
754 Ok(data.into_static())
755 } else {
756 Ok(RawData::Bytes(self.buffer.clone()))
757 }
758 } else {
759 Ok(RawData::Bytes(self.buffer.clone()))
760 }
761 }
762 }
763 } else if self.status.as_u16() == 400 {
765 match serde_json::from_slice::<_>(&self.buffer) {
766 Ok(error) => Err(XrpcError::Xrpc(error)),
767 Err(_) => {
768 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
770 Ok(mut generic) => {
771 generic.nsid = R::NSID;
772 generic.method = ""; generic.http_status = self.status;
774 match generic.error.as_str() {
776 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
777 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
778 _ => Err(XrpcError::Generic(generic)),
779 }
780 }
781 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
782 }
783 }
784 }
785 } else {
787 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
788 Ok(mut generic) => {
789 generic.nsid = R::NSID;
790 generic.method = ""; generic.http_status = self.status;
792 match generic.error.as_str() {
793 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
794 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
795 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
796 }
797 }
798 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
799 }
800 }
801 }
802
803 pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
815 Response {
816 buffer: self.buffer,
817 status: self.status,
818 _marker: PhantomData,
819 }
820 }
821}
822
823pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
825pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
827
828impl<R> Response<R>
829where
830 R: XrpcResp,
831{
832 pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
834 where
835 for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
836 for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
837 {
838 fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
839 serde_json::from_slice(buffer)
840 }
841
842 if self.status.is_success() {
844 match R::decode_output(&self.buffer) {
845 Ok(output) => Ok(output.into_static()),
846 Err(e) => Err(XrpcError::Decode(e)),
847 }
848 } else if self.status.as_u16() == 400 {
850 let error = match parse_error::<R>(&self.buffer) {
851 Ok(error) => XrpcError::Xrpc(error),
852 Err(_) => {
853 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
855 Ok(mut generic) => {
856 generic.nsid = R::NSID;
857 generic.method = ""; generic.http_status = self.status;
859 match generic.error.as_ref() {
861 "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
862 "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
863 _ => XrpcError::Generic(generic),
864 }
865 }
866 Err(e) => XrpcError::Decode(DecodeError::Json(e)),
867 }
868 }
869 };
870 Err(error.into_static())
871 } else {
873 let error: XrpcError<<R as XrpcResp>::Err<'_>> =
874 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
875 Ok(mut generic) => {
876 let status = self.status;
877 generic.nsid = R::NSID;
878 generic.method = ""; generic.http_status = status;
880 match generic.error.as_ref() {
881 "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
882 "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
883 _ => XrpcError::Auth(AuthError::NotAuthenticated),
884 }
885 }
886 Err(e) => XrpcError::Decode(DecodeError::Json(e)),
887 };
888
889 Err(error.into_static())
890 }
891 }
892}
893
894#[derive(Debug, Clone, Deserialize, Serialize)]
898pub struct GenericXrpcError {
899 pub error: SmolStr,
901 pub message: Option<SmolStr>,
903 #[serde(skip)]
905 pub nsid: &'static str,
906 #[serde(skip)]
908 pub method: &'static str,
909 #[serde(skip)]
911 pub http_status: StatusCode,
912}
913
914impl std::fmt::Display for GenericXrpcError {
915 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
916 if let Some(msg) = &self.message {
917 write!(
918 f,
919 "{}: {} (nsid={}, method={}, status={})",
920 self.error, msg, self.nsid, self.method, self.http_status
921 )
922 } else {
923 write!(
924 f,
925 "{} (nsid={}, method={}, status={})",
926 self.error, self.nsid, self.method, self.http_status
927 )
928 }
929 }
930}
931
932impl IntoStatic for GenericXrpcError {
933 type Output = Self;
934
935 fn into_static(self) -> Self::Output {
936 self
937 }
938}
939
940impl std::error::Error for GenericXrpcError {}
941
942#[derive(Debug, thiserror::Error, miette::Diagnostic)]
947pub enum XrpcError<E: std::error::Error + IntoStatic> {
948 #[error("XRPC error: {0}")]
950 #[diagnostic(code(jacquard_common::xrpc::typed))]
951 Xrpc(E),
952
953 #[error("Authentication error: {0}")]
955 #[diagnostic(code(jacquard_common::xrpc::auth))]
956 Auth(#[from] AuthError),
957
958 #[error("XRPC error: {0}")]
960 #[diagnostic(code(jacquard_common::xrpc::generic))]
961 Generic(GenericXrpcError),
962
963 #[error("Failed to decode response: {0}")]
965 #[diagnostic(code(jacquard_common::xrpc::decode))]
966 Decode(#[from] DecodeError),
967}
968
969impl<E> IntoStatic for XrpcError<E>
970where
971 E: std::error::Error + IntoStatic,
972 E::Output: std::error::Error + IntoStatic,
973 <E as IntoStatic>::Output: std::error::Error + IntoStatic,
974{
975 type Output = XrpcError<E::Output>;
976 fn into_static(self) -> Self::Output {
977 match self {
978 XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
979 XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
980 XrpcError::Generic(e) => XrpcError::Generic(e),
981 XrpcError::Decode(e) => XrpcError::Decode(e),
982 }
983 }
984}
985
986impl<E> Serialize for XrpcError<E>
987where
988 E: std::error::Error + IntoStatic + Serialize,
989{
990 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
991 where
992 S: serde::Serializer,
993 {
994 use serde::ser::SerializeStruct;
995
996 match self {
997 XrpcError::Xrpc(e) => e.serialize(serializer),
999 XrpcError::Generic(g) => g.serialize(serializer),
1001 XrpcError::Auth(auth) => {
1003 let mut state = serializer.serialize_struct("XrpcError", 2)?;
1004 let (error, message) = match auth {
1005 AuthError::TokenExpired => ("ExpiredToken", Some("Access token has expired")),
1006 AuthError::InvalidToken => {
1007 ("InvalidToken", Some("Access token is invalid or malformed"))
1008 }
1009 AuthError::RefreshFailed => {
1010 ("RefreshFailed", Some("Token refresh request failed"))
1011 }
1012 AuthError::NotAuthenticated => (
1013 "AuthenticationRequired",
1014 Some("Request requires authentication but none was provided"),
1015 ),
1016 AuthError::Other(hv) => {
1017 let msg = hv.to_str().unwrap_or("[non-utf8 header]");
1018 ("AuthenticationError", Some(msg))
1019 }
1020 };
1021 state.serialize_field("error", error)?;
1022 if let Some(msg) = message {
1023 state.serialize_field("message", msg)?;
1024 }
1025 state.end()
1026 }
1027 XrpcError::Decode(decode_err) => {
1028 let mut state = serializer.serialize_struct("XrpcError", 2)?;
1029 state.serialize_field("error", "ResponseDecodeError")?;
1030 let msg = format!("{:?}", decode_err);
1032 state.serialize_field("message", &msg)?;
1033 state.end()
1034 }
1035 }
1036 }
1037}
1038
1039#[cfg(feature = "streaming")]
1040impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
1041 pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
1045 where
1046 R: XrpcRequest,
1047 <R as XrpcRequest>::Response: Send + Sync,
1048 {
1049 let http_request =
1050 build_http_request(&self.base, request, &self.opts).map_err(StreamError::transport)?;
1051
1052 let http_response = self
1053 .client
1054 .send_http_streaming(http_request)
1055 .await
1056 .map_err(StreamError::transport)?;
1057 let (parts, body) = http_response.into_parts();
1058
1059 Ok(StreamingResponse::new(parts, body))
1060 }
1061
1062 pub async fn stream<S>(
1067 self,
1068 stream: XrpcProcedureSend<S::Frame<'static>>,
1069 ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
1070 where
1071 S: XrpcProcedureStream + 'static,
1072 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
1073 {
1074 use futures::TryStreamExt;
1075
1076 let mut url = self.base;
1077 let mut path = url.path().trim_end_matches('/').to_owned();
1078 path.push_str("/xrpc/");
1079 path.push_str(<S::Request as XrpcRequest>::NSID);
1080 url.set_path(&path);
1081
1082 let mut builder = http::Request::post(url.to_string());
1083
1084 if let Some(token) = &self.opts.auth {
1085 let hv = match token {
1086 AuthorizationToken::Bearer(t) => {
1087 HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
1088 }
1089 AuthorizationToken::Dpop(t) => {
1090 HeaderValue::from_str(&format!("DPoP {}", t.as_ref()))
1091 }
1092 }
1093 .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?;
1094 builder = builder.header(Header::Authorization, hv);
1095 }
1096
1097 if let Some(proxy) = &self.opts.atproto_proxy {
1098 builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
1099 }
1100 if let Some(labelers) = &self.opts.atproto_accept_labelers {
1101 if !labelers.is_empty() {
1102 let joined = labelers
1103 .iter()
1104 .map(|s| s.as_ref())
1105 .collect::<Vec<_>>()
1106 .join(", ");
1107 builder = builder.header(Header::AtprotoAcceptLabelers, joined);
1108 }
1109 }
1110 for (name, value) in &self.opts.extra_headers {
1111 builder = builder.header(name, value);
1112 }
1113
1114 let (parts, _) = builder
1115 .body(())
1116 .map_err(|e| StreamError::protocol(e.to_string()))?
1117 .into_parts();
1118
1119 let body_stream = Box::pin(stream.0.map_ok(|f| f.buffer));
1120
1121 let resp = self
1122 .client
1123 .send_http_bidirectional(parts, body_stream)
1124 .await
1125 .map_err(StreamError::transport)?;
1126
1127 let (parts, body) = resp.into_parts();
1128
1129 Ok(XrpcResponseStream::<
1130 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
1131 >::from_typed_parts(parts, body))
1132 }
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137 use super::*;
1138 use serde::{Deserialize, Serialize};
1139
1140 #[derive(Serialize, Deserialize)]
1141 #[allow(dead_code)]
1142 struct DummyReq;
1143
1144 #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1145 #[error("{0}")]
1146 struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);
1147
1148 impl IntoStatic for DummyErr<'_> {
1149 type Output = DummyErr<'static>;
1150 fn into_static(self) -> Self::Output {
1151 DummyErr(self.0.into_static())
1152 }
1153 }
1154
1155 struct DummyResp;
1156
1157 impl XrpcResp for DummyResp {
1158 const NSID: &'static str = "test.dummy";
1159 const ENCODING: &'static str = "application/json";
1160 type Output<'de> = ();
1161 type Err<'de> = DummyErr<'de>;
1162 }
1163
1164 impl XrpcRequest for DummyReq {
1165 const NSID: &'static str = "test.dummy";
1166 const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
1167 type Response = DummyResp;
1168 }
1169
1170 #[test]
1171 fn generic_error_carries_context() {
1172 let body = serde_json::json!({"error":"InvalidRequest","message":"missing"});
1173 let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
1174 let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST);
1175 match resp.parse().unwrap_err() {
1176 XrpcError::Generic(g) => {
1177 assert_eq!(g.error.as_str(), "InvalidRequest");
1178 assert_eq!(g.message.as_deref(), Some("missing"));
1179 assert_eq!(g.nsid, DummyResp::NSID);
1180 assert_eq!(g.method, ""); assert_eq!(g.http_status, StatusCode::BAD_REQUEST);
1182 }
1183 other => panic!("unexpected: {other:?}"),
1184 }
1185 }
1186
1187 #[test]
1188 fn auth_error_mapping() {
1189 for (code, expect) in [
1190 ("ExpiredToken", AuthError::TokenExpired),
1191 ("InvalidToken", AuthError::InvalidToken),
1192 ] {
1193 let body = serde_json::json!({"error": code});
1194 let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
1195 let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED);
1196 match resp.parse().unwrap_err() {
1197 XrpcError::Auth(e) => match (e, expect) {
1198 (AuthError::TokenExpired, AuthError::TokenExpired) => {}
1199 (AuthError::InvalidToken, AuthError::InvalidToken) => {}
1200 other => panic!("mismatch: {other:?}"),
1201 },
1202 other => panic!("unexpected: {other:?}"),
1203 }
1204 }
1205 }
1206
1207 #[test]
1208 fn no_double_slash_in_path() {
1209 #[derive(Serialize, Deserialize)]
1210 struct Req;
1211 #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
1212 #[error("{0}")]
1213 struct Err<'a>(#[serde(borrow)] CowStr<'a>);
1214 impl IntoStatic for Err<'_> {
1215 type Output = Err<'static>;
1216 fn into_static(self) -> Self::Output {
1217 Err(self.0.into_static())
1218 }
1219 }
1220 struct Resp;
1221 impl XrpcResp for Resp {
1222 const NSID: &'static str = "com.example.test";
1223 const ENCODING: &'static str = "application/json";
1224 type Output<'de> = ();
1225 type Err<'de> = Err<'de>;
1226 }
1227 impl XrpcRequest for Req {
1228 const NSID: &'static str = "com.example.test";
1229 const METHOD: XrpcMethod = XrpcMethod::Query;
1230 type Response = Resp;
1231 }
1232
1233 let opts = CallOptions::default();
1234 for base in [
1235 Url::parse("https://pds").unwrap(),
1236 Url::parse("https://pds/").unwrap(),
1237 Url::parse("https://pds/base/").unwrap(),
1238 ] {
1239 let req = build_http_request(&base, &Req, &opts).unwrap();
1240 let uri = req.uri().to_string();
1241 assert!(uri.contains("/xrpc/com.example.test"));
1242 assert!(!uri.contains("//xrpc"));
1243 }
1244 }
1245}