1#[cfg(feature = "streaming")]
14pub mod streaming;
15
16use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18pub use streaming::{
19 StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20};
21
22#[cfg(feature = "websocket")]
23pub mod subscription;
24
25#[cfg(feature = "streaming")]
26use crate::StreamError;
27use crate::error::DecodeError;
28use crate::http_client::HttpClient;
29#[cfg(feature = "streaming")]
30use crate::http_client::HttpClientExt;
31use crate::types::value::Data;
32use crate::{AuthorizationToken, error::AuthError};
33use crate::{CowStr, error::XrpcResult};
34use crate::{IntoStatic, types::value::RawData};
35use bytes::Bytes;
36use http::{
37 HeaderName, HeaderValue, Request, StatusCode,
38 header::{AUTHORIZATION, CONTENT_TYPE},
39};
40use serde::{Deserialize, Serialize};
41use smol_str::SmolStr;
42use std::fmt::{self, Debug};
43use std::{error::Error, marker::PhantomData};
44#[cfg(feature = "websocket")]
45pub use subscription::{
46 BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
47 SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
48 SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
49};
50use url::Url;
51
/// Errors raised while turning a request or response value into bytes.
///
/// Used as the error type of [`XrpcRequest::encode_body`] and
/// [`XrpcResp::encode_output`].
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum EncodeError {
    /// Serializing a value as a URL query string failed.
    #[error("Failed to serialize query: {0}")]
    Query(
        #[from]
        #[source]
        serde_html_form::ser::Error,
    ),
    /// Serializing a value as JSON failed.
    #[error("Failed to serialize JSON: {0}")]
    Json(
        #[from]
        #[source]
        serde_json::Error,
    ),
    /// Any other encoding failure, described by a free-form message.
    #[error("Encoding error: {0}")]
    Other(String),
}
73
/// The two kinds of XRPC call and how each maps onto HTTP.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum XrpcMethod {
    /// Sent as an HTTP `GET`; carries no body encoding.
    Query,
    /// Sent as an HTTP `POST`; the payload is the encoding (MIME type) of the body.
    Procedure(&'static str),
}

impl XrpcMethod {
    /// Returns the HTTP verb for this method: `"GET"` for queries,
    /// `"POST"` for procedures.
    pub const fn as_str(&self) -> &'static str {
        if matches!(self, Self::Query) {
            "GET"
        } else {
            "POST"
        }
    }

    /// Returns the body encoding carried by a procedure, or `None` for a query.
    pub const fn body_encoding(&self) -> Option<&'static str> {
        match *self {
            Self::Procedure(encoding) => Some(encoding),
            Self::Query => None,
        }
    }
}
100
101pub trait XrpcRequest: Serialize {
108 const NSID: &'static str;
110
111 const METHOD: XrpcMethod;
113
114 type Response: XrpcResp;
116
117 fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
121 Ok(serde_json::to_vec(self)?)
122 }
123
124 fn decode_body<'de>(body: &'de [u8]) -> Result<Box<Self>, DecodeError>
128 where
129 Self: Deserialize<'de>,
130 {
131 let body: Self = serde_json::from_slice(body)?;
132
133 Ok(Box::new(body))
134 }
135}
136
137pub trait XrpcResp {
141 const NSID: &'static str;
143
144 const ENCODING: &'static str;
146
147 type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
149
150 type Err<'de>: Error + Deserialize<'de> + Serialize + IntoStatic;
152
153 fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
155 Ok(serde_json::to_vec(output)?)
156 }
157
158 fn decode_output<'de>(body: &'de [u8]) -> core::result::Result<Self::Output<'de>, DecodeError>
162 where
163 Self::Output<'de>: Deserialize<'de>,
164 {
165 let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
166 Ok(body)
167 }
168}
169
/// Ties an endpoint's path and method kind to its request and response types.
pub trait XrpcEndpoint {
    /// Path of the endpoint.
    // NOTE(review): presumably of the form `/xrpc/<nsid>` — confirm against implementors.
    const PATH: &'static str;
    /// Whether the endpoint is a query or a procedure.
    const METHOD: XrpcMethod;
    /// Request type accepted by the endpoint; may borrow from the input it is
    /// deserialized from.
    type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
    /// Marker type describing the endpoint's response.
    type Response: XrpcResp;
}
187
188#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
190pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);
191
192impl<'de> fmt::Display for GenericError<'de> {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 self.0.fmt(f)
195 }
196}
197
198impl Error for GenericError<'_> {}
199
200impl IntoStatic for GenericError<'_> {
201 type Output = GenericError<'static>;
202 fn into_static(self) -> Self::Output {
203 GenericError(self.0.into_static())
204 }
205}
206
/// Per-call options applied when an XRPC HTTP request is built.
#[derive(Debug, Default, Clone)]
pub struct CallOptions<'a> {
    /// Token rendered into the `Authorization` header (`Bearer …` or `DPoP …`).
    pub auth: Option<AuthorizationToken<'a>>,
    /// Value of the `atproto-proxy` header, when set.
    pub atproto_proxy: Option<CowStr<'a>>,
    /// Values joined with `", "` into the `atproto-accept-labelers` header;
    /// an empty list produces no header.
    pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
    /// Additional headers appended after the ones above.
    pub extra_headers: Vec<(HeaderName, HeaderValue)>,
}
219
220impl IntoStatic for CallOptions<'_> {
221 type Output = CallOptions<'static>;
222
223 fn into_static(self) -> Self::Output {
224 CallOptions {
225 auth: self.auth.map(|auth| auth.into_static()),
226 atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
227 atproto_accept_labelers: self
228 .atproto_accept_labelers
229 .map(|labelers| labelers.into_static()),
230 extra_headers: self.extra_headers,
231 }
232 }
233}
234
235pub trait XrpcExt: HttpClient {
251 fn xrpc<'a>(&'a self, base: Url) -> XrpcCall<'a, Self>
253 where
254 Self: Sized,
255 {
256 XrpcCall {
257 client: self,
258 base,
259 opts: CallOptions::default(),
260 }
261 }
262}
263
264impl<T: HttpClient> XrpcExt for T {}
265
/// The [`Response`] type produced by sending request `R`, i.e. a response
/// parameterised by `R`'s associated `Response` marker.
pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
268
/// A stateful XRPC client: an [`HttpClient`] that knows its own base URI and
/// default call options and can send typed requests.
///
/// On non-wasm targets the trait is passed through `trait_variant::make(Send)`,
/// and the sending methods additionally require `Self: Sync`.
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait XrpcClient: HttpClient {
    /// Returns the base URI requests are sent to.
    fn base_uri(&self) -> impl Future<Output = CowStr<'static>>;

    /// Updates the base URI. The default implementation ignores `url` and
    /// does nothing.
    fn set_base_uri(&self, url: Url) -> impl Future<Output = ()> {
        let _ = url;
        async {}
    }

    /// Returns the client's current call options. The default implementation
    /// returns [`CallOptions::default`].
    fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
        async { CallOptions::default() }
    }

    /// Replaces the client's call options. The default implementation ignores
    /// `opts` and does nothing.
    fn set_opts(&self, opts: CallOptions) -> impl Future<Output = ()> {
        let _ = opts;
        async {}
    }

    /// Sends `request` and returns the typed response wrapper (non-wasm variant).
    #[cfg(not(target_arch = "wasm32"))]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Sends `request` and returns the typed response wrapper (wasm variant,
    /// without the `Self: Sync` requirement).
    #[cfg(target_arch = "wasm32")]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Sends `request` with caller-supplied `opts` (non-wasm variant).
    #[cfg(not(target_arch = "wasm32"))]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Sends `request` with caller-supplied `opts` (wasm variant, without the
    /// `Self: Sync` requirement).
    #[cfg(target_arch = "wasm32")]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;
}
330
/// Streaming counterpart of [`XrpcClient`], available with the `streaming`
/// feature. Each method has a non-wasm and a wasm declaration; the non-wasm
/// ones additionally require `Self: Sync`.
#[cfg(feature = "streaming")]
pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
    /// Sends `request` and returns the response as a [`StreamingResponse`]
    /// (non-wasm variant; the returned future is `Send`).
    #[cfg(not(target_arch = "wasm32"))]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Sends `request` and returns the response as a [`StreamingResponse`]
    /// (wasm variant).
    #[cfg(target_arch = "wasm32")]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Sends a stream of request frames for procedure `S` and returns a stream
    /// of its response frames (non-wasm variant).
    #[cfg(not(target_arch = "wasm32"))]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
        Self: Sync;

    /// Sends a stream of request frames for procedure `S` and returns a stream
    /// of its response frames (wasm variant).
    #[cfg(target_arch = "wasm32")]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
}
390
/// Builder for a single stateless XRPC call: a borrowed HTTP client, the base
/// URL to send to, and the options to apply.
pub struct XrpcCall<'a, C: HttpClient> {
    /// Client used to send the request.
    pub(crate) client: &'a C,
    /// Base URL; `/xrpc/<nsid>` is appended to its path when sending.
    pub(crate) base: Url,
    /// Options (auth, proxy, labelers, extra headers) for this call.
    pub(crate) opts: CallOptions<'a>,
}
416
417impl<'a, C: HttpClient> XrpcCall<'a, C> {
418 pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
420 self.opts.auth = Some(token);
421 self
422 }
423 pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
425 self.opts.atproto_proxy = Some(proxy);
426 self
427 }
428 pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
430 self.opts.atproto_accept_labelers = Some(labelers);
431 self
432 }
433 pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
435 self.opts.extra_headers.push((name, value));
436 self
437 }
438 pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
440 self.opts = opts;
441 self
442 }
443
444 #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
453 pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
454 where
455 R: XrpcRequest,
456 <R as XrpcRequest>::Response: Send + Sync,
457 {
458 let http_request = build_http_request(&self.base, request, &self.opts)?;
459
460 let http_response = self
461 .client
462 .send_http(http_request)
463 .await
464 .map_err(|e| crate::error::ClientError::transport(e))?;
465
466 process_response(http_response)
467 }
468}
469
470#[inline]
474pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
475where
476 Resp: XrpcResp,
477{
478 let status = http_response.status();
479 #[allow(deprecated)]
482 if status.as_u16() == 401 {
483 if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
484 return Err(crate::error::ClientError::auth(
485 crate::error::AuthError::Other(hv.clone()),
486 ));
487 }
488 }
489 let buffer = Bytes::from(http_response.into_body());
490
491 if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
492 return Err(crate::error::HttpError {
493 status,
494 body: Some(buffer),
495 }
496 .into());
497 }
498
499 Ok(Response::new(buffer, status))
500}
501
/// Headers this module sets on XRPC requests; converts into an
/// `http::HeaderName`.
pub enum Header {
    /// `Content-Type`.
    ContentType,
    /// `Authorization`.
    Authorization,
    /// `atproto-proxy`.
    AtprotoProxy,
    /// `atproto-accept-labelers`.
    AtprotoAcceptLabelers,
}
515
516impl From<Header> for HeaderName {
517 fn from(value: Header) -> Self {
518 match value {
519 Header::ContentType => CONTENT_TYPE,
520 Header::Authorization => AUTHORIZATION,
521 Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
522 Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
523 }
524 }
525}
526
527pub fn build_http_request<'s, R>(
529 base: &Url,
530 req: &R,
531 opts: &CallOptions<'_>,
532) -> XrpcResult<Request<Vec<u8>>>
533where
534 R: XrpcRequest,
535{
536 use crate::error::ClientError;
537
538 let mut url = base.clone();
539 let mut path = url.path().trim_end_matches('/').to_owned();
540 path.push_str("/xrpc/");
541 path.push_str(<R as XrpcRequest>::NSID);
542 url.set_path(&path);
543 if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
546 let qs = serde_html_form::to_string(&req).map_err(|e| {
547 ClientError::invalid_request(format!("Failed to serialize query: {}", e))
548 })?;
549 if !qs.is_empty() {
550 url.set_query(Some(&qs));
551 } else {
552 url.set_query(None);
553 }
554 }
555
556 let method = match <R as XrpcRequest>::METHOD {
557 XrpcMethod::Query => http::Method::GET,
558 XrpcMethod::Procedure(_) => http::Method::POST,
559 };
560
561 let mut builder = Request::builder().method(method).uri(url.as_str());
562
563 let has_content_type = opts
564 .extra_headers
565 .iter()
566 .any(|(name, _)| name == CONTENT_TYPE);
567
568 if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
569 if !has_content_type {
571 builder = builder.header(Header::ContentType, encoding);
572 }
573 }
574 let output_encoding = <R::Response as XrpcResp>::ENCODING;
575 builder = builder.header(http::header::ACCEPT, output_encoding);
576
577 if let Some(token) = &opts.auth {
578 let hv = match token {
579 AuthorizationToken::Bearer(t) => {
580 HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
581 }
582 AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
583 }
584 .map_err(|e| ClientError::invalid_request(format!("Invalid authorization token: {}", e)))?;
585 builder = builder.header(Header::Authorization, hv);
586 }
587
588 if let Some(proxy) = &opts.atproto_proxy {
589 builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
590 }
591 if let Some(labelers) = &opts.atproto_accept_labelers {
592 if !labelers.is_empty() {
593 let joined = labelers
594 .iter()
595 .map(|s| s.as_ref())
596 .collect::<Vec<_>>()
597 .join(", ");
598 builder = builder.header(Header::AtprotoAcceptLabelers, joined);
599 }
600 }
601 for (name, value) in &opts.extra_headers {
602 builder = builder.header(name, value);
603 }
604
605 let body = if let XrpcMethod::Procedure(_) = R::METHOD {
606 req.encode_body()
607 .map_err(|e| ClientError::invalid_request(format!("Failed to encode body: {}", e)))?
608 } else {
609 vec![]
610 };
611
612 builder
613 .body(body)
614 .map_err(|e| ClientError::invalid_request(format!("Failed to build request: {}", e)))
615}
616
/// A buffered XRPC response: the raw body bytes and HTTP status, tagged with
/// the response marker type `Resp` that says how to decode them.
pub struct Response<Resp>
where
    Resp: XrpcResp, {
    // Carries the `Resp` type parameter without storing a value of it.
    _marker: PhantomData<fn() -> Resp>,
    // Raw response body.
    buffer: Bytes,
    // HTTP status the body arrived with.
    status: StatusCode,
}
629
630impl<R> Response<R>
631where
632 R: XrpcResp,
633{
634 pub fn new(buffer: Bytes, status: StatusCode) -> Self {
636 Self {
637 buffer,
638 status,
639 _marker: PhantomData,
640 }
641 }
642
643 pub fn status(&self) -> StatusCode {
645 self.status
646 }
647
648 pub fn buffer(&self) -> &Bytes {
650 &self.buffer
651 }
652
653 pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
655 if self.status.is_success() {
657 match R::decode_output(&self.buffer) {
658 Ok(output) => Ok(output),
659 Err(e) => Err(XrpcError::Decode(e)),
660 }
661 } else if self.status.as_u16() == 400 {
663 match serde_json::from_slice::<_>(&self.buffer) {
664 Ok(error) => Err(XrpcError::Xrpc(error)),
665 Err(_) => {
666 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
668 Ok(mut generic) => {
669 generic.nsid = R::NSID;
670 generic.method = ""; generic.http_status = self.status;
672 match generic.error.as_str() {
674 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
675 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
676 _ => Err(XrpcError::Generic(generic)),
677 }
678 }
679 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
680 }
681 }
682 }
683 } else {
685 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
686 Ok(mut generic) => {
687 generic.nsid = R::NSID;
688 generic.method = ""; generic.http_status = self.status;
690 match generic.error.as_str() {
691 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
692 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
693 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
694 }
695 }
696 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
697 }
698 }
699 }
700
701 pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
705 if self.status.is_success() {
707 match serde_json::from_slice::<_>(&self.buffer) {
708 Ok(output) => Ok(output),
709 Err(_) => {
710 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
711 if let Ok(data) = Data::from_cbor(&data) {
712 Ok(data.into_static())
713 } else {
714 Ok(Data::Bytes(self.buffer.clone()))
715 }
716 } else {
717 Ok(Data::Bytes(self.buffer.clone()))
718 }
719 }
720 }
721 } else if self.status.as_u16() == 400 {
723 match serde_json::from_slice::<_>(&self.buffer) {
724 Ok(error) => Err(XrpcError::Xrpc(error)),
725 Err(_) => {
726 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
728 Ok(mut generic) => {
729 generic.nsid = R::NSID;
730 generic.method = ""; generic.http_status = self.status;
732 match generic.error.as_str() {
734 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
735 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
736 _ => Err(XrpcError::Generic(generic)),
737 }
738 }
739 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
740 }
741 }
742 }
743 } else {
745 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
746 Ok(mut generic) => {
747 generic.nsid = R::NSID;
748 generic.method = ""; generic.http_status = self.status;
750 match generic.error.as_str() {
751 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
752 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
753 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
754 }
755 }
756 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
757 }
758 }
759 }
760
761 pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
765 if self.status.is_success() {
767 match serde_json::from_slice::<_>(&self.buffer) {
768 Ok(output) => Ok(output),
769 Err(_) => {
770 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
771 if let Ok(data) = RawData::from_cbor(&data) {
772 Ok(data.into_static())
773 } else {
774 Ok(RawData::Bytes(self.buffer.clone()))
775 }
776 } else {
777 Ok(RawData::Bytes(self.buffer.clone()))
778 }
779 }
780 }
781 } else if self.status.as_u16() == 400 {
783 match serde_json::from_slice::<_>(&self.buffer) {
784 Ok(error) => Err(XrpcError::Xrpc(error)),
785 Err(_) => {
786 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
788 Ok(mut generic) => {
789 generic.nsid = R::NSID;
790 generic.method = ""; generic.http_status = self.status;
792 match generic.error.as_str() {
794 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
795 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
796 _ => Err(XrpcError::Generic(generic)),
797 }
798 }
799 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
800 }
801 }
802 }
803 } else {
805 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
806 Ok(mut generic) => {
807 generic.nsid = R::NSID;
808 generic.method = ""; generic.http_status = self.status;
810 match generic.error.as_str() {
811 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
812 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
813 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
814 }
815 }
816 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
817 }
818 }
819 }
820
821 pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
833 Response {
834 buffer: self.buffer,
835 status: self.status,
836 _marker: PhantomData,
837 }
838 }
839}
840
/// Shorthand for the output type of response marker `Resp` at lifetime `'a`.
pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
/// Shorthand for the typed error of response marker `Resp` at lifetime `'a`.
pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
845
846impl<R> Response<R>
847where
848 R: XrpcResp,
849{
850 pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
852 where
853 for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
854 for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
855 {
856 fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
857 serde_json::from_slice(buffer)
858 }
859
860 if self.status.is_success() {
862 match R::decode_output(&self.buffer) {
863 Ok(output) => Ok(output.into_static()),
864 Err(e) => Err(XrpcError::Decode(e)),
865 }
866 } else if self.status.as_u16() == 400 {
868 let error = match parse_error::<R>(&self.buffer) {
869 Ok(error) => XrpcError::Xrpc(error),
870 Err(_) => {
871 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
873 Ok(mut generic) => {
874 generic.nsid = R::NSID;
875 generic.method = ""; generic.http_status = self.status;
877 match generic.error.as_ref() {
879 "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
880 "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
881 _ => XrpcError::Generic(generic),
882 }
883 }
884 Err(e) => XrpcError::Decode(DecodeError::Json(e)),
885 }
886 }
887 };
888 Err(error.into_static())
889 } else {
891 let error: XrpcError<<R as XrpcResp>::Err<'_>> =
892 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
893 Ok(mut generic) => {
894 let status = self.status;
895 generic.nsid = R::NSID;
896 generic.method = ""; generic.http_status = status;
898 match generic.error.as_ref() {
899 "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
900 "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
901 _ => XrpcError::Auth(AuthError::NotAuthenticated),
902 }
903 }
904 Err(e) => XrpcError::Decode(DecodeError::Json(e)),
905 };
906
907 Err(error.into_static())
908 }
909 }
910}
911
/// Generic XRPC error body (`error` code plus optional `message`), annotated
/// after deserialization with context about the call that produced it.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GenericXrpcError {
    /// Error code string from the body (e.g. `"InvalidRequest"`).
    pub error: SmolStr,
    /// Optional human-readable message from the body.
    pub message: Option<SmolStr>,
    /// NSID of the method; not (de)serialized, filled in by the response parsers.
    #[serde(skip)]
    pub nsid: &'static str,
    /// Method string; not (de)serialized. The response parsers in this file
    /// set it to the empty string.
    #[serde(skip)]
    pub method: &'static str,
    /// HTTP status of the response; not (de)serialized, filled in by the
    /// response parsers.
    #[serde(skip)]
    pub http_status: StatusCode,
}
931
932impl std::fmt::Display for GenericXrpcError {
933 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934 if let Some(msg) = &self.message {
935 write!(
936 f,
937 "{}: {} (nsid={}, method={}, status={})",
938 self.error, msg, self.nsid, self.method, self.http_status
939 )
940 } else {
941 write!(
942 f,
943 "{} (nsid={}, method={}, status={})",
944 self.error, self.nsid, self.method, self.http_status
945 )
946 }
947 }
948}
949
// `GenericXrpcError` has no lifetime parameter, so the conversion is the identity.
impl IntoStatic for GenericXrpcError {
    type Output = Self;

    /// Returns `self` unchanged.
    fn into_static(self) -> Self::Output {
        self
    }
}

impl std::error::Error for GenericXrpcError {}
959
/// Error produced when parsing an XRPC response, generic over the method's
/// typed error `E`.
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum XrpcError<E: std::error::Error + IntoStatic> {
    /// The body deserialized into the method's typed error.
    #[error("XRPC error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::typed))]
    Xrpc(E),

    /// The response indicated an authentication problem.
    #[error("Authentication error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::auth))]
    Auth(#[from] AuthError),

    /// The body was a generic error object that did not match the typed error.
    #[error("XRPC error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::generic))]
    Generic(GenericXrpcError),

    /// The body could not be decoded.
    #[error("Failed to decode response: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::decode))]
    Decode(#[from] DecodeError),
}
986
987impl<E> IntoStatic for XrpcError<E>
988where
989 E: std::error::Error + IntoStatic,
990 E::Output: std::error::Error + IntoStatic,
991 <E as IntoStatic>::Output: std::error::Error + IntoStatic,
992{
993 type Output = XrpcError<E::Output>;
994 fn into_static(self) -> Self::Output {
995 match self {
996 XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
997 XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
998 XrpcError::Generic(e) => XrpcError::Generic(e),
999 XrpcError::Decode(e) => XrpcError::Decode(e),
1000 }
1001 }
1002}
1003
1004impl<E> Serialize for XrpcError<E>
1005where
1006 E: std::error::Error + IntoStatic + Serialize,
1007{
1008 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1009 where
1010 S: serde::Serializer,
1011 {
1012 use serde::ser::SerializeStruct;
1013
1014 match self {
1015 XrpcError::Xrpc(e) => e.serialize(serializer),
1017 XrpcError::Generic(g) => g.serialize(serializer),
1019 XrpcError::Auth(auth) => {
1021 let mut state = serializer.serialize_struct("XrpcError", 2)?;
1022 let (error, message) = match auth {
1023 AuthError::TokenExpired => ("ExpiredToken", Some("Access token has expired")),
1024 AuthError::InvalidToken => {
1025 ("InvalidToken", Some("Access token is invalid or malformed"))
1026 }
1027 AuthError::RefreshFailed => {
1028 ("RefreshFailed", Some("Token refresh request failed"))
1029 }
1030 AuthError::NotAuthenticated => (
1031 "AuthenticationRequired",
1032 Some("Request requires authentication but none was provided"),
1033 ),
1034 AuthError::Other(hv) => {
1035 let msg = hv.to_str().unwrap_or("[non-utf8 header]");
1036 ("AuthenticationError", Some(msg))
1037 }
1038 };
1039 state.serialize_field("error", error)?;
1040 if let Some(msg) = message {
1041 state.serialize_field("message", msg)?;
1042 }
1043 state.end()
1044 }
1045 XrpcError::Decode(decode_err) => {
1046 let mut state = serializer.serialize_struct("XrpcError", 2)?;
1047 state.serialize_field("error", "ResponseDecodeError")?;
1048 let msg = format!("{:?}", decode_err);
1050 state.serialize_field("message", &msg)?;
1051 state.end()
1052 }
1053 }
1054 }
1055}
1056
#[cfg(feature = "streaming")]
impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
    /// Sends `request` and returns the response as a [`StreamingResponse`]
    /// instead of buffering the body.
    ///
    /// # Errors
    /// Request-building and transport failures are both reported as
    /// transport [`StreamError`]s.
    pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
    where
        R: XrpcRequest,
        <R as XrpcRequest>::Response: Send + Sync,
    {
        let outgoing =
            build_http_request(&self.base, request, &self.opts).map_err(StreamError::transport)?;

        let incoming = self
            .client
            .send_http_streaming(outgoing)
            .await
            .map_err(StreamError::transport)?;

        let (head, body) = incoming.into_parts();
        Ok(StreamingResponse::new(head, body))
    }

    /// Sends a stream of request frames as a `POST` to the procedure's
    /// `/xrpc/<NSID>` path and returns the typed stream of response frames.
    ///
    /// The auth, proxy, labeler and extra headers from the call options are
    /// applied in that order.
    ///
    /// # Errors
    /// An invalid authorization value or request head yields a protocol
    /// [`StreamError`]; a send failure yields a transport one.
    pub async fn stream<S>(
        self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
    {
        use futures::TryStreamExt;

        let mut url = self.base;
        let endpoint_path = format!(
            "{}/xrpc/{}",
            url.path().trim_end_matches('/'),
            <S::Request as XrpcRequest>::NSID
        );
        url.set_path(&endpoint_path);

        let mut builder = http::Request::post(url.to_string());

        if let Some(token) = &self.opts.auth {
            let rendered = match token {
                AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()),
                AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()),
            };
            let hv = HeaderValue::from_str(&rendered).map_err(|e| {
                StreamError::protocol(format!("Invalid authorization token: {}", e))
            })?;
            builder = builder.header(Header::Authorization, hv);
        }

        if let Some(proxy) = &self.opts.atproto_proxy {
            builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
        }

        if let Some(labelers) = &self.opts.atproto_accept_labelers {
            if !labelers.is_empty() {
                let joined = labelers
                    .iter()
                    .map(|s| s.as_ref())
                    .collect::<Vec<_>>()
                    .join(", ");
                builder = builder.header(Header::AtprotoAcceptLabelers, joined);
            }
        }

        for (name, value) in &self.opts.extra_headers {
            builder = builder.header(name, value);
        }

        // Only the request head is needed; the body is supplied as a stream below.
        let head_only = builder
            .body(())
            .map_err(|e| StreamError::protocol(e.to_string()))?;
        let (request_head, _) = head_only.into_parts();

        let outgoing_frames = Box::pin(stream.0.map_ok(|f| f.buffer));

        let reply = self
            .client
            .send_http_bidirectional(request_head, outgoing_frames)
            .await
            .map_err(StreamError::transport)?;

        let (reply_head, reply_body) = reply.into_parts();

        Ok(XrpcResponseStream::<
            <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
        >::from_typed_parts(reply_head, reply_body))
    }
}
1153
// Unit tests for response error mapping and request URL construction.
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    // Minimal procedure request used by the response-parsing tests.
    #[derive(Serialize, Deserialize)]
    #[allow(dead_code)]
    struct DummyReq;

    // Typed error that only deserializes from a JSON string, so JSON error
    // objects fall through to the generic error path.
    #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
    #[error("{0}")]
    struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);

    impl IntoStatic for DummyErr<'_> {
        type Output = DummyErr<'static>;
        fn into_static(self) -> Self::Output {
            DummyErr(self.0.into_static())
        }
    }

    struct DummyResp;

    impl XrpcResp for DummyResp {
        const NSID: &'static str = "test.dummy";
        const ENCODING: &'static str = "application/json";
        type Output<'de> = ();
        type Err<'de> = DummyErr<'de>;
    }

    impl XrpcRequest for DummyReq {
        const NSID: &'static str = "test.dummy";
        const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
        type Response = DummyResp;
    }

    // A 400 body that is not the typed error becomes `Generic`, annotated
    // with the NSID, an empty method string and the HTTP status.
    #[test]
    fn generic_error_carries_context() {
        let body = serde_json::json!({"error":"InvalidRequest","message":"missing"});
        let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
        let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST);
        match resp.parse().unwrap_err() {
            XrpcError::Generic(g) => {
                assert_eq!(g.error.as_str(), "InvalidRequest");
                assert_eq!(g.message.as_deref(), Some("missing"));
                assert_eq!(g.nsid, DummyResp::NSID);
                assert_eq!(g.method, ""); assert_eq!(g.http_status, StatusCode::BAD_REQUEST);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    // On a 401, the `ExpiredToken` / `InvalidToken` codes map to the matching
    // `AuthError` variants.
    #[test]
    fn auth_error_mapping() {
        for (code, expect) in [
            ("ExpiredToken", AuthError::TokenExpired),
            ("InvalidToken", AuthError::InvalidToken),
        ] {
            let body = serde_json::json!({"error": code});
            let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
            let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED);
            match resp.parse().unwrap_err() {
                XrpcError::Auth(e) => match (e, expect) {
                    (AuthError::TokenExpired, AuthError::TokenExpired) => {}
                    (AuthError::InvalidToken, AuthError::InvalidToken) => {}
                    other => panic!("mismatch: {other:?}"),
                },
                other => panic!("unexpected: {other:?}"),
            }
        }
    }

    // Base URLs with or without a trailing slash must not produce `//xrpc`.
    #[test]
    fn no_double_slash_in_path() {
        #[derive(Serialize, Deserialize)]
        struct Req;
        // Note: this local `Err` shadows the prelude's `Err` inside this function.
        #[derive(Deserialize, Serialize, Debug, thiserror::Error)]
        #[error("{0}")]
        struct Err<'a>(#[serde(borrow)] CowStr<'a>);
        impl IntoStatic for Err<'_> {
            type Output = Err<'static>;
            fn into_static(self) -> Self::Output {
                Err(self.0.into_static())
            }
        }
        struct Resp;
        impl XrpcResp for Resp {
            const NSID: &'static str = "com.example.test";
            const ENCODING: &'static str = "application/json";
            type Output<'de> = ();
            type Err<'de> = Err<'de>;
        }
        impl XrpcRequest for Req {
            const NSID: &'static str = "com.example.test";
            const METHOD: XrpcMethod = XrpcMethod::Query;
            type Response = Resp;
        }

        let opts = CallOptions::default();
        for base in [
            Url::parse("https://pds").unwrap(),
            Url::parse("https://pds/").unwrap(),
            Url::parse("https://pds/base/").unwrap(),
        ] {
            let req = build_http_request(&base, &Req, &opts).unwrap();
            let uri = req.uri().to_string();
            assert!(uri.contains("/xrpc/com.example.test"));
            assert!(!uri.contains("//xrpc"));
        }
    }
}