1#[cfg(feature = "streaming")]
14pub mod streaming;
15
16use ipld_core::ipld::Ipld;
17#[cfg(feature = "streaming")]
18pub use streaming::{
19 StreamingResponse, XrpcProcedureSend, XrpcProcedureStream, XrpcResponseStream, XrpcStreamResp,
20};
21
22#[cfg(feature = "websocket")]
23pub mod subscription;
24
25#[cfg(feature = "streaming")]
26use crate::StreamError;
27use crate::http_client::HttpClient;
28#[cfg(feature = "streaming")]
29use crate::http_client::HttpClientExt;
30use crate::types::value::Data;
31use crate::{AuthorizationToken, error::AuthError};
32use crate::{CowStr, error::XrpcResult};
33use crate::{IntoStatic, error::DecodeError};
34use crate::{error::TransportError, types::value::RawData};
35use bytes::Bytes;
36use http::{
37 HeaderName, HeaderValue, Request, StatusCode,
38 header::{AUTHORIZATION, CONTENT_TYPE},
39};
40use serde::{Deserialize, Serialize};
41use smol_str::SmolStr;
42use std::fmt::{self, Debug};
43use std::{error::Error, marker::PhantomData};
44#[cfg(feature = "websocket")]
45pub use subscription::{
46 BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient,
47 SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp,
48 SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription,
49};
50use url::Url;
51
/// Errors produced while encoding an XRPC request or response payload.
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum EncodeError {
    /// Serializing a value into a query string via `serde_html_form` failed.
    #[error("Failed to serialize query: {0}")]
    Query(
        #[from]
        #[source]
        serde_html_form::ser::Error,
    ),
    /// Serializing a value into JSON via `serde_json` failed.
    #[error("Failed to serialize JSON: {0}")]
    Json(
        #[from]
        #[source]
        serde_json::Error,
    ),
    /// Any other encoding failure, carried as a free-form message.
    #[error("Encoding error: {0}")]
    Other(String),
}
73
/// The kind of XRPC call, which determines the HTTP verb used on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum XrpcMethod {
    /// A query; sent as an HTTP `GET`.
    Query,
    /// A procedure; sent as an HTTP `POST`. The payload is the encoding
    /// (content type) of the request body.
    Procedure(&'static str),
}

impl XrpcMethod {
    /// Returns the HTTP verb for this method: `"GET"` for queries, `"POST"` for procedures.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Procedure(_) => "POST",
            Self::Query => "GET",
        }
    }

    /// Returns the request body encoding for procedures, or `None` for queries
    /// (which carry no body encoding).
    pub const fn body_encoding(&self) -> Option<&'static str> {
        match *self {
            Self::Procedure(encoding) => Some(encoding),
            Self::Query => None,
        }
    }
}
100
101pub trait XrpcRequest: Serialize {
108 const NSID: &'static str;
110
111 const METHOD: XrpcMethod;
113
114 type Response: XrpcResp;
116
117 fn encode_body(&self) -> Result<Vec<u8>, EncodeError> {
121 Ok(serde_json::to_vec(self)?)
122 }
123
124 fn decode_body<'de>(body: &'de [u8]) -> Result<Box<Self>, DecodeError>
128 where
129 Self: Deserialize<'de>,
130 {
131 let body: Self = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
132
133 Ok(Box::new(body))
134 }
135}
136
137pub trait XrpcResp {
141 const NSID: &'static str;
143
144 const ENCODING: &'static str;
146
147 type Output<'de>: Serialize + Deserialize<'de> + IntoStatic;
149
150 type Err<'de>: Error + Deserialize<'de> + IntoStatic;
152
153 fn encode_output(output: &Self::Output<'_>) -> Result<Vec<u8>, EncodeError> {
155 Ok(serde_json::to_vec(output)?)
156 }
157
158 fn decode_output<'de>(body: &'de [u8]) -> Result<Self::Output<'de>, DecodeError>
162 where
163 Self::Output<'de>: Deserialize<'de>,
164 {
165 let body = serde_json::from_slice(body).map_err(|e| DecodeError::Json(e))?;
166
167 Ok(body)
168 }
169}
170
/// Describes an XRPC endpoint by pairing a path and method with its request and
/// response types.
pub trait XrpcEndpoint {
    /// Path of the endpoint.
    const PATH: &'static str;
    /// Whether the endpoint is a query or a procedure.
    const METHOD: XrpcMethod;
    /// Request type accepted by the endpoint; deserializable from borrowed input.
    type Request<'de>: XrpcRequest + Deserialize<'de> + IntoStatic;
    /// Response marker type produced by the endpoint.
    type Response: XrpcResp;
}
188
/// Untyped error payload: a thin wrapper around an arbitrary [`Data`] value,
/// which may borrow from the input it was deserialized from.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct GenericError<'a>(#[serde(borrow)] Data<'a>);

impl<'de> fmt::Display for GenericError<'de> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): this file imports the `Debug` trait but not `Display`, so this
        // call presumably resolves to `Debug::fmt` on the inner `Data` — confirm that
        // the debug representation is the intended user-facing output.
        self.0.fmt(f)
    }
}

// Marker impl so `GenericError` can be used wherever `std::error::Error` is required.
impl Error for GenericError<'_> {}

impl IntoStatic for GenericError<'_> {
    type Output = GenericError<'static>;
    /// Converts the wrapped data into its `'static` form.
    fn into_static(self) -> Self::Output {
        GenericError(self.0.into_static())
    }
}
207
/// Per-call options applied when building an XRPC HTTP request.
#[derive(Debug, Default, Clone)]
pub struct CallOptions<'a> {
    /// Token sent in the `Authorization` header (as `Bearer …` or `DPoP …`), if any.
    pub auth: Option<AuthorizationToken<'a>>,
    /// Value of the `atproto-proxy` header, if any.
    pub atproto_proxy: Option<CowStr<'a>>,
    /// Labelers sent in the `atproto-accept-labelers` header, joined with `", "`.
    /// An empty list results in no header.
    pub atproto_accept_labelers: Option<Vec<CowStr<'a>>>,
    /// Additional headers appended to the request after all of the above.
    pub extra_headers: Vec<(HeaderName, HeaderValue)>,
}
220
221impl IntoStatic for CallOptions<'_> {
222 type Output = CallOptions<'static>;
223
224 fn into_static(self) -> Self::Output {
225 CallOptions {
226 auth: self.auth.map(|auth| auth.into_static()),
227 atproto_proxy: self.atproto_proxy.map(|proxy| proxy.into_static()),
228 atproto_accept_labelers: self
229 .atproto_accept_labelers
230 .map(|labelers| labelers.into_static()),
231 extra_headers: self.extra_headers,
232 }
233 }
234}
235
236pub trait XrpcExt: HttpClient {
252 fn xrpc<'a>(&'a self, base: Url) -> XrpcCall<'a, Self>
254 where
255 Self: Sized,
256 {
257 XrpcCall {
258 client: self,
259 base,
260 opts: CallOptions::default(),
261 }
262 }
263}
264
265impl<T: HttpClient> XrpcExt for T {}
266
/// Shorthand for the [`Response`] wrapper belonging to request type `R`.
pub type XrpcResponse<R> = Response<<R as XrpcRequest>::Response>;
269
/// An HTTP client that also knows its own base URI and default call options,
/// and can send XRPC requests.
///
/// On non-`wasm32` targets the trait is passed through `trait_variant::make(Send)`,
/// and the sending methods additionally require `Self: Sync`.
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait XrpcClient: HttpClient {
    /// Resolves the base URI for this client.
    fn base_uri(&self) -> impl Future<Output = Url>;

    /// Resolves the call options for this client.
    ///
    /// The provided implementation yields `CallOptions::default()`.
    fn opts(&self) -> impl Future<Output = CallOptions<'_>> {
        async { CallOptions::default() }
    }

    /// Sends `request`, resolving to the not-yet-parsed [`XrpcResponse`] or an error.
    /// (Non-`wasm32` variant; requires `Self: Sync`.)
    #[cfg(not(target_arch = "wasm32"))]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Sends `request`, resolving to the not-yet-parsed [`XrpcResponse`] or an error.
    /// (`wasm32` variant; no `Self: Sync` bound.)
    #[cfg(target_arch = "wasm32")]
    fn send<R>(&self, request: R) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Like `send`, but with caller-supplied `opts`.
    /// (Non-`wasm32` variant; requires `Self: Sync`.)
    #[cfg(not(target_arch = "wasm32"))]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Like `send`, but with caller-supplied `opts`.
    /// (`wasm32` variant; no `Self: Sync` bound.)
    #[cfg(target_arch = "wasm32")]
    fn send_with_opts<R>(
        &self,
        request: R,
        opts: CallOptions<'_>,
    ) -> impl Future<Output = XrpcResult<XrpcResponse<R>>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;
}
319
/// Streaming counterpart of [`XrpcClient`]; only compiled with the `streaming` feature.
///
/// Each method is declared twice, selected by target: the non-`wasm32` variants
/// add a `Self: Sync` bound (and, for `download`, a `Send` bound on the future).
#[cfg(feature = "streaming")]
pub trait XrpcStreamingClient: XrpcClient + HttpClientExt {
    /// Sends `request` and resolves to a [`StreamingResponse`] instead of a buffered one.
    /// (Non-`wasm32` variant: the returned future is `Send`.)
    #[cfg(not(target_arch = "wasm32"))]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>> + Send
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync,
        Self: Sync;

    /// Sends `request` and resolves to a [`StreamingResponse`] instead of a buffered one.
    /// (`wasm32` variant.)
    #[cfg(target_arch = "wasm32")]
    fn download<R>(
        &self,
        request: R,
    ) -> impl Future<Output = Result<StreamingResponse, StreamError>>
    where
        R: XrpcRequest + Send + Sync,
        <R as XrpcRequest>::Response: Send + Sync;

    /// Sends a stream of request frames for procedure `S` and resolves to a stream of
    /// typed response frames. (Non-`wasm32` variant; requires `Self: Sync`.)
    #[cfg(not(target_arch = "wasm32"))]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
        Self: Sync;

    /// Sends a stream of request frames for procedure `S` and resolves to a stream of
    /// typed response frames. (`wasm32` variant.)
    #[cfg(target_arch = "wasm32")]
    fn stream<S>(
        &self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> impl Future<
        Output = Result<
            XrpcResponseStream<
                <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
            >,
            StreamError,
        >,
    >
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
}
379
/// Builder for a single stateless XRPC call: a borrowed HTTP client, a base URL,
/// and the options to apply. Created via [`XrpcExt::xrpc`].
pub struct XrpcCall<'a, C: HttpClient> {
    /// HTTP client used to send the request.
    pub(crate) client: &'a C,
    /// Base URL; `/xrpc/<NSID>` is appended to its path.
    pub(crate) base: Url,
    /// Options (auth, proxy, labelers, extra headers) applied to the request.
    pub(crate) opts: CallOptions<'a>,
}
405
406impl<'a, C: HttpClient> XrpcCall<'a, C> {
407 pub fn auth(mut self, token: AuthorizationToken<'a>) -> Self {
409 self.opts.auth = Some(token);
410 self
411 }
412 pub fn proxy(mut self, proxy: CowStr<'a>) -> Self {
414 self.opts.atproto_proxy = Some(proxy);
415 self
416 }
417 pub fn accept_labelers(mut self, labelers: Vec<CowStr<'a>>) -> Self {
419 self.opts.atproto_accept_labelers = Some(labelers);
420 self
421 }
422 pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
424 self.opts.extra_headers.push((name, value));
425 self
426 }
427 pub fn with_options(mut self, opts: CallOptions<'a>) -> Self {
429 self.opts = opts;
430 self
431 }
432
433 #[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip(self, request), fields(nsid = R::NSID)))]
442 pub async fn send<R>(self, request: &R) -> XrpcResult<Response<<R as XrpcRequest>::Response>>
443 where
444 R: XrpcRequest,
445 <R as XrpcRequest>::Response: Send + Sync,
446 {
447 let http_request = build_http_request(&self.base, request, &self.opts)
448 .map_err(crate::error::TransportError::from)?;
449
450 let http_response = self
451 .client
452 .send_http(http_request)
453 .await
454 .map_err(|e| crate::error::TransportError::Other(Box::new(e)))?;
455
456 process_response(http_response)
457 }
458}
459
460#[inline]
464pub fn process_response<Resp>(http_response: http::Response<Vec<u8>>) -> XrpcResult<Response<Resp>>
465where
466 Resp: XrpcResp,
467{
468 let status = http_response.status();
469 if status.as_u16() == 401 {
472 if let Some(hv) = http_response.headers().get(http::header::WWW_AUTHENTICATE) {
473 return Err(crate::error::ClientError::Auth(
474 crate::error::AuthError::Other(hv.clone()),
475 ));
476 }
477 }
478 let buffer = Bytes::from(http_response.into_body());
479
480 if !status.is_success() && !matches!(status.as_u16(), 400 | 401) {
481 return Err(crate::error::HttpError {
482 status,
483 body: Some(buffer),
484 }
485 .into());
486 }
487
488 Ok(Response::new(buffer, status))
489}
490
491pub enum Header {
493 ContentType,
495 Authorization,
497 AtprotoProxy,
501 AtprotoAcceptLabelers,
503}
504
505impl From<Header> for HeaderName {
506 fn from(value: Header) -> Self {
507 match value {
508 Header::ContentType => CONTENT_TYPE,
509 Header::Authorization => AUTHORIZATION,
510 Header::AtprotoProxy => HeaderName::from_static("atproto-proxy"),
511 Header::AtprotoAcceptLabelers => HeaderName::from_static("atproto-accept-labelers"),
512 }
513 }
514}
515
516pub fn build_http_request<'s, R>(
518 base: &Url,
519 req: &R,
520 opts: &CallOptions<'_>,
521) -> core::result::Result<Request<Vec<u8>>, crate::error::TransportError>
522where
523 R: XrpcRequest,
524{
525 let mut url = base.clone();
526 let mut path = url.path().trim_end_matches('/').to_owned();
527 path.push_str("/xrpc/");
528 path.push_str(<R as XrpcRequest>::NSID);
529 url.set_path(&path);
530
531 if let XrpcMethod::Query = <R as XrpcRequest>::METHOD {
532 let qs = serde_html_form::to_string(&req)
533 .map_err(|e| crate::error::TransportError::InvalidRequest(e.to_string()))?;
534 if !qs.is_empty() {
535 url.set_query(Some(&qs));
536 } else {
537 url.set_query(None);
538 }
539 }
540
541 let method = match <R as XrpcRequest>::METHOD {
542 XrpcMethod::Query => http::Method::GET,
543 XrpcMethod::Procedure(_) => http::Method::POST,
544 };
545
546 let mut builder = Request::builder().method(method).uri(url.as_str());
547
548 if let XrpcMethod::Procedure(encoding) = <R as XrpcRequest>::METHOD {
549 builder = builder.header(Header::ContentType, encoding);
550 }
551 let output_encoding = <R::Response as XrpcResp>::ENCODING;
552 builder = builder.header(http::header::ACCEPT, output_encoding);
553
554 if let Some(token) = &opts.auth {
555 let hv = match token {
556 AuthorizationToken::Bearer(t) => {
557 HeaderValue::from_str(&format!("Bearer {}", t.as_ref()))
558 }
559 AuthorizationToken::Dpop(t) => HeaderValue::from_str(&format!("DPoP {}", t.as_ref())),
560 }
561 .map_err(|e| {
562 TransportError::InvalidRequest(format!("Invalid authorization token: {}", e))
563 })?;
564 builder = builder.header(Header::Authorization, hv);
565 }
566
567 if let Some(proxy) = &opts.atproto_proxy {
568 builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
569 }
570 if let Some(labelers) = &opts.atproto_accept_labelers {
571 if !labelers.is_empty() {
572 let joined = labelers
573 .iter()
574 .map(|s| s.as_ref())
575 .collect::<Vec<_>>()
576 .join(", ");
577 builder = builder.header(Header::AtprotoAcceptLabelers, joined);
578 }
579 }
580 for (name, value) in &opts.extra_headers {
581 builder = builder.header(name, value);
582 }
583
584 let body = if let XrpcMethod::Procedure(_) = R::METHOD {
585 req.encode_body()
586 .map_err(|e| TransportError::InvalidRequest(e.to_string()))?
587 } else {
588 vec![]
589 };
590
591 builder
592 .body(body)
593 .map_err(|e| TransportError::InvalidRequest(e.to_string()))
594}
595
/// A buffered XRPC response: the raw body bytes and HTTP status, tagged with the
/// [`XrpcResp`] type that knows how to decode them. Decoding is deferred to the
/// `parse*` / `into_output` methods.
pub struct Response<Resp>
where
    Resp: XrpcResp, {
    // Ties the response to `Resp` without storing a value of that type.
    _marker: PhantomData<fn() -> Resp>,
    // Raw response body.
    buffer: Bytes,
    // HTTP status the body was received with.
    status: StatusCode,
}
608
609impl<R> Response<R>
610where
611 R: XrpcResp,
612{
613 pub fn new(buffer: Bytes, status: StatusCode) -> Self {
615 Self {
616 buffer,
617 status,
618 _marker: PhantomData,
619 }
620 }
621
622 pub fn status(&self) -> StatusCode {
624 self.status
625 }
626
627 pub fn buffer(&self) -> &Bytes {
629 &self.buffer
630 }
631
632 pub fn parse<'s>(&'s self) -> Result<RespOutput<'s, R>, XrpcError<RespErr<'s, R>>> {
634 if self.status.is_success() {
636 match R::decode_output(&self.buffer) {
637 Ok(output) => Ok(output),
638 Err(e) => Err(XrpcError::Decode(e)),
639 }
640 } else if self.status.as_u16() == 400 {
642 match serde_json::from_slice::<_>(&self.buffer) {
643 Ok(error) => Err(XrpcError::Xrpc(error)),
644 Err(_) => {
645 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
647 Ok(mut generic) => {
648 generic.nsid = R::NSID;
649 generic.method = ""; generic.http_status = self.status;
651 match generic.error.as_str() {
653 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
654 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
655 _ => Err(XrpcError::Generic(generic)),
656 }
657 }
658 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
659 }
660 }
661 }
662 } else {
664 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
665 Ok(mut generic) => {
666 generic.nsid = R::NSID;
667 generic.method = ""; generic.http_status = self.status;
669 match generic.error.as_str() {
670 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
671 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
672 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
673 }
674 }
675 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
676 }
677 }
678 }
679
680 pub fn parse_data<'s>(&'s self) -> Result<Data<'s>, XrpcError<RespErr<'s, R>>> {
684 if self.status.is_success() {
686 match serde_json::from_slice::<_>(&self.buffer) {
687 Ok(output) => Ok(output),
688 Err(_) => {
689 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
690 if let Ok(data) = Data::from_cbor(&data) {
691 Ok(data.into_static())
692 } else {
693 Ok(Data::Bytes(self.buffer.clone()))
694 }
695 } else {
696 Ok(Data::Bytes(self.buffer.clone()))
697 }
698 }
699 }
700 } else if self.status.as_u16() == 400 {
702 match serde_json::from_slice::<_>(&self.buffer) {
703 Ok(error) => Err(XrpcError::Xrpc(error)),
704 Err(_) => {
705 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
707 Ok(mut generic) => {
708 generic.nsid = R::NSID;
709 generic.method = ""; generic.http_status = self.status;
711 match generic.error.as_str() {
713 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
714 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
715 _ => Err(XrpcError::Generic(generic)),
716 }
717 }
718 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
719 }
720 }
721 }
722 } else {
724 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
725 Ok(mut generic) => {
726 generic.nsid = R::NSID;
727 generic.method = ""; generic.http_status = self.status;
729 match generic.error.as_str() {
730 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
731 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
732 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
733 }
734 }
735 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
736 }
737 }
738 }
739
740 pub fn parse_raw<'s>(&'s self) -> Result<RawData<'s>, XrpcError<RespErr<'s, R>>> {
744 if self.status.is_success() {
746 match serde_json::from_slice::<_>(&self.buffer) {
747 Ok(output) => Ok(output),
748 Err(_) => {
749 if let Ok(data) = serde_ipld_dagcbor::from_slice::<Ipld>(&self.buffer) {
750 if let Ok(data) = RawData::from_cbor(&data) {
751 Ok(data.into_static())
752 } else {
753 Ok(RawData::Bytes(self.buffer.clone()))
754 }
755 } else {
756 Ok(RawData::Bytes(self.buffer.clone()))
757 }
758 }
759 }
760 } else if self.status.as_u16() == 400 {
762 match serde_json::from_slice::<_>(&self.buffer) {
763 Ok(error) => Err(XrpcError::Xrpc(error)),
764 Err(_) => {
765 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
767 Ok(mut generic) => {
768 generic.nsid = R::NSID;
769 generic.method = ""; generic.http_status = self.status;
771 match generic.error.as_str() {
773 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
774 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
775 _ => Err(XrpcError::Generic(generic)),
776 }
777 }
778 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
779 }
780 }
781 }
782 } else {
784 match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
785 Ok(mut generic) => {
786 generic.nsid = R::NSID;
787 generic.method = ""; generic.http_status = self.status;
789 match generic.error.as_str() {
790 "ExpiredToken" => Err(XrpcError::Auth(AuthError::TokenExpired)),
791 "InvalidToken" => Err(XrpcError::Auth(AuthError::InvalidToken)),
792 _ => Err(XrpcError::Auth(AuthError::NotAuthenticated)),
793 }
794 }
795 Err(e) => Err(XrpcError::Decode(DecodeError::Json(e))),
796 }
797 }
798 }
799
800 pub fn transmute<NEW: XrpcResp>(self) -> Response<NEW> {
812 Response {
813 buffer: self.buffer,
814 status: self.status,
815 _marker: PhantomData,
816 }
817 }
818}
819
/// Shorthand for the output type of response marker `Resp`, borrowed for `'a`.
pub type RespOutput<'a, Resp> = <Resp as XrpcResp>::Output<'a>;
/// Shorthand for the typed error of response marker `Resp`, borrowed for `'a`.
pub type RespErr<'a, Resp> = <Resp as XrpcResp>::Err<'a>;
824
impl<R> Response<R>
where
    R: XrpcResp,
{
    /// Consumes the response and decodes it into an owned (`'static`) output or error.
    ///
    /// Status handling:
    /// * success: the body is decoded via `R::decode_output`; failure yields `XrpcError::Decode`.
    /// * `400`: the body is tried as `R`'s typed error, then as a [`GenericXrpcError`]
    ///   (`"ExpiredToken"` / `"InvalidToken"` map to auth errors, anything else to
    ///   `XrpcError::Generic`).
    /// * any other status: the body is read as a [`GenericXrpcError`]; `"ExpiredToken"` /
    ///   `"InvalidToken"` map to the matching auth error and every other code to
    ///   `AuthError::NotAuthenticated`.
    ///
    /// A body that cannot be read as a generic error yields `XrpcError::Decode`.
    pub fn into_output(self) -> Result<RespOutput<'static, R>, XrpcError<RespErr<'static, R>>>
    where
        for<'a> RespOutput<'a, R>: IntoStatic<Output = RespOutput<'static, R>>,
        for<'a> RespErr<'a, R>: IntoStatic<Output = RespErr<'static, R>>,
    {
        // Local helper that pins the lifetime of the typed error to the buffer slice.
        // NOTE(review): presumably needed to help inference with the higher-ranked
        // bounds above — confirm before inlining.
        fn parse_error<'b, R: XrpcResp>(buffer: &'b [u8]) -> Result<R::Err<'b>, serde_json::Error> {
            serde_json::from_slice(buffer)
        }

        if self.status.is_success() {
            match R::decode_output(&self.buffer) {
                Ok(output) => Ok(output.into_static()),
                Err(e) => Err(XrpcError::Decode(e)),
            }
        } else if self.status.as_u16() == 400 {
            // 400: prefer the endpoint's typed error, fall back to the generic shape.
            let error = match parse_error::<R>(&self.buffer) {
                Ok(error) => XrpcError::Xrpc(error),
                Err(_) => {
                    match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
                        Ok(mut generic) => {
                            generic.nsid = R::NSID;
                            // The HTTP method is not known here, so it is left empty.
                            generic.method = "";
                            generic.http_status = self.status;
                            match generic.error.as_ref() {
                                "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
                                "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
                                _ => XrpcError::Generic(generic),
                            }
                        }
                        Err(e) => XrpcError::Decode(DecodeError::Json(e)),
                    }
                }
            };
            // Detach the error from the buffer before `self` is dropped.
            Err(error.into_static())
        } else {
            // Any other non-success status is treated as an authentication failure.
            let error: XrpcError<<R as XrpcResp>::Err<'_>> =
                match serde_json::from_slice::<GenericXrpcError>(&self.buffer) {
                    Ok(mut generic) => {
                        let status = self.status;
                        generic.nsid = R::NSID;
                        // The HTTP method is not known here, so it is left empty.
                        generic.method = "";
                        generic.http_status = status;
                        match generic.error.as_ref() {
                            "ExpiredToken" => XrpcError::Auth(AuthError::TokenExpired),
                            "InvalidToken" => XrpcError::Auth(AuthError::InvalidToken),
                            _ => XrpcError::Auth(AuthError::NotAuthenticated),
                        }
                    }
                    Err(e) => XrpcError::Decode(DecodeError::Json(e)),
                };

            Err(error.into_static())
        }
    }
}
890
/// Generic XRPC error body (`error` code plus optional `message`), annotated after
/// deserialization with the call context it was received in.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GenericXrpcError {
    /// Error code string from the response body (e.g. `"InvalidToken"`).
    pub error: SmolStr,
    /// Optional human-readable message from the response body.
    pub message: Option<SmolStr>,
    /// NSID of the endpoint; not part of the wire format, filled in by the parser.
    #[serde(skip)]
    pub nsid: &'static str,
    /// HTTP method of the call; not part of the wire format. The response parsers
    /// in this file set it to an empty string.
    #[serde(skip)]
    pub method: &'static str,
    /// HTTP status of the response; not part of the wire format, filled in by the parser.
    #[serde(skip)]
    pub http_status: StatusCode,
}
910
911impl std::fmt::Display for GenericXrpcError {
912 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
913 if let Some(msg) = &self.message {
914 write!(
915 f,
916 "{}: {} (nsid={}, method={}, status={})",
917 self.error, msg, self.nsid, self.method, self.http_status
918 )
919 } else {
920 write!(
921 f,
922 "{} (nsid={}, method={}, status={})",
923 self.error, self.nsid, self.method, self.http_status
924 )
925 }
926 }
927}
928
impl IntoStatic for GenericXrpcError {
    type Output = Self;

    /// No-op: every field is already owned or `'static`.
    fn into_static(self) -> Self::Output {
        self
    }
}

// Marker impl so `GenericXrpcError` can be used as a standard error type.
impl std::error::Error for GenericXrpcError {}
938
/// Error returned when decoding an XRPC response, generic over the endpoint's
/// typed error `E`.
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum XrpcError<E: std::error::Error + IntoStatic> {
    /// The endpoint's own typed error, decoded from the response body.
    #[error("XRPC error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::typed))]
    Xrpc(E),

    /// An authentication failure (expired/invalid token, or not authenticated).
    #[error("Authentication error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::auth))]
    Auth(#[from] AuthError),

    /// An error body that did not match the typed error but fit the generic shape.
    #[error("XRPC error: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::generic))]
    Generic(GenericXrpcError),

    /// The response body could not be decoded at all.
    #[error("Failed to decode response: {0}")]
    #[diagnostic(code(jacquard_common::xrpc::decode))]
    Decode(#[from] DecodeError),
}
965
966impl<E> IntoStatic for XrpcError<E>
967where
968 E: std::error::Error + IntoStatic,
969 E::Output: std::error::Error + IntoStatic,
970 <E as IntoStatic>::Output: std::error::Error + IntoStatic,
971{
972 type Output = XrpcError<E::Output>;
973 fn into_static(self) -> Self::Output {
974 match self {
975 XrpcError::Xrpc(e) => XrpcError::Xrpc(e.into_static()),
976 XrpcError::Auth(e) => XrpcError::Auth(e.into_static()),
977 XrpcError::Generic(e) => XrpcError::Generic(e),
978 XrpcError::Decode(e) => XrpcError::Decode(e),
979 }
980 }
981}
982
#[cfg(feature = "streaming")]
impl<'a, C: HttpClient + HttpClientExt> XrpcCall<'a, C> {
    /// Sends `request` and returns the response as a [`StreamingResponse`] instead of
    /// buffering the body.
    ///
    /// # Errors
    /// Request-building and client failures are both wrapped via `StreamError::transport`.
    pub async fn download<R>(self, request: &R) -> Result<StreamingResponse, StreamError>
    where
        R: XrpcRequest,
        <R as XrpcRequest>::Response: Send + Sync,
    {
        let Self { client, base, opts } = self;

        let outgoing =
            build_http_request(&base, request, &opts).map_err(StreamError::transport)?;

        let (parts, body) = client
            .send_http_streaming(outgoing)
            .await
            .map_err(StreamError::transport)?
            .into_parts();

        Ok(StreamingResponse::new(parts, body))
    }

    /// Sends a stream of request frames as a `POST` to `/xrpc/<NSID>` of procedure `S`
    /// and returns the typed response stream.
    ///
    /// Authorization, `atproto-proxy`, `atproto-accept-labelers` and extra headers are
    /// applied from the call options.
    ///
    /// # Errors
    /// An invalid authorization token or an unbuildable request yields
    /// `StreamError::protocol`; client failures are wrapped via `StreamError::transport`.
    pub async fn stream<S>(
        self,
        stream: XrpcProcedureSend<S::Frame<'static>>,
    ) -> Result<XrpcResponseStream<<S::Response as XrpcStreamResp>::Frame<'static>>, StreamError>
    where
        S: XrpcProcedureStream + 'static,
        <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp,
    {
        use futures::TryStreamExt;
        use n0_future::StreamExt;

        let mut url = self.base;
        let endpoint = format!(
            "{}/xrpc/{}",
            url.path().trim_end_matches('/'),
            <S::Request as XrpcRequest>::NSID
        );
        url.set_path(&endpoint);

        let mut builder = http::Request::post(url.to_string());

        if let Some(token) = &self.opts.auth {
            let credential = match token {
                AuthorizationToken::Bearer(t) => format!("Bearer {}", t.as_ref()),
                AuthorizationToken::Dpop(t) => format!("DPoP {}", t.as_ref()),
            };
            let hv = HeaderValue::from_str(&credential)
                .map_err(|e| StreamError::protocol(format!("Invalid authorization token: {}", e)))?;
            builder = builder.header(Header::Authorization, hv);
        }

        if let Some(proxy) = &self.opts.atproto_proxy {
            builder = builder.header(Header::AtprotoProxy, proxy.as_ref());
        }

        match &self.opts.atproto_accept_labelers {
            Some(labelers) if !labelers.is_empty() => {
                let joined = labelers
                    .iter()
                    .map(|s| s.as_ref())
                    .collect::<Vec<_>>()
                    .join(", ");
                builder = builder.header(Header::AtprotoAcceptLabelers, joined);
            }
            _ => {}
        }

        for (name, value) in &self.opts.extra_headers {
            builder = builder.header(name, value);
        }

        // Only the request head is needed; the body is supplied as a stream below.
        let (head, _) = builder
            .body(())
            .map_err(|e| StreamError::protocol(e.to_string()))?
            .into_parts();

        let outgoing_frames = stream.0.map_ok(|f| f.buffer).boxed();

        let (parts, body) = self
            .client
            .send_http_bidirectional(head, outgoing_frames)
            .await
            .map_err(StreamError::transport)?
            .into_parts();

        Ok(XrpcResponseStream::<
            <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>,
        >::from_typed_parts(parts, body))
    }
}
1079
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    // Minimal request fixture with no fields.
    #[derive(Serialize, Deserialize)]
    #[allow(dead_code)]
    struct DummyReq;

    // Typed error fixture: a bare string, so JSON objects never decode into it and
    // the parser falls through to the generic error path.
    #[derive(Deserialize, Debug, thiserror::Error)]
    #[error("{0}")]
    struct DummyErr<'a>(#[serde(borrow)] CowStr<'a>);

    impl IntoStatic for DummyErr<'_> {
        type Output = DummyErr<'static>;
        fn into_static(self) -> Self::Output {
            DummyErr(self.0.into_static())
        }
    }

    // Response marker fixture.
    struct DummyResp;

    impl XrpcResp for DummyResp {
        const NSID: &'static str = "test.dummy";
        const ENCODING: &'static str = "application/json";
        type Output<'de> = ();
        type Err<'de> = DummyErr<'de>;
    }

    impl XrpcRequest for DummyReq {
        const NSID: &'static str = "test.dummy";
        const METHOD: XrpcMethod = XrpcMethod::Procedure("application/json");
        type Response = DummyResp;
    }

    // A 400 whose body is a generic error object must surface as `XrpcError::Generic`
    // with the NSID and status filled in.
    #[test]
    fn generic_error_carries_context() {
        let body = serde_json::json!({"error":"InvalidRequest","message":"missing"});
        let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
        let resp: Response<DummyResp> = Response::new(buf, StatusCode::BAD_REQUEST);
        match resp.parse().unwrap_err() {
            XrpcError::Generic(g) => {
                assert_eq!(g.error.as_str(), "InvalidRequest");
                assert_eq!(g.message.as_deref(), Some("missing"));
                assert_eq!(g.nsid, DummyResp::NSID);
                // The method is not known on the response side, so it stays empty.
                assert_eq!(g.method, "");
                assert_eq!(g.http_status, StatusCode::BAD_REQUEST);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    // On a 401, the well-known token error codes map to the matching `AuthError`.
    #[test]
    fn auth_error_mapping() {
        for (code, expect) in [
            ("ExpiredToken", AuthError::TokenExpired),
            ("InvalidToken", AuthError::InvalidToken),
        ] {
            let body = serde_json::json!({"error": code});
            let buf = Bytes::from(serde_json::to_vec(&body).unwrap());
            let resp: Response<DummyResp> = Response::new(buf, StatusCode::UNAUTHORIZED);
            match resp.parse().unwrap_err() {
                XrpcError::Auth(e) => match (e, expect) {
                    (AuthError::TokenExpired, AuthError::TokenExpired) => {}
                    (AuthError::InvalidToken, AuthError::InvalidToken) => {}
                    other => panic!("mismatch: {other:?}"),
                },
                other => panic!("unexpected: {other:?}"),
            }
        }
    }

    // The built URL must contain exactly one slash before `xrpc`, whether or not the
    // base URL ends with a slash or has a path prefix.
    #[test]
    fn no_double_slash_in_path() {
        #[derive(Serialize, Deserialize)]
        struct Req;
        #[derive(Deserialize, Debug, thiserror::Error)]
        #[error("{0}")]
        struct Err<'a>(#[serde(borrow)] CowStr<'a>);
        impl IntoStatic for Err<'_> {
            type Output = Err<'static>;
            fn into_static(self) -> Self::Output {
                Err(self.0.into_static())
            }
        }
        struct Resp;
        impl XrpcResp for Resp {
            const NSID: &'static str = "com.example.test";
            const ENCODING: &'static str = "application/json";
            type Output<'de> = ();
            type Err<'de> = Err<'de>;
        }
        impl XrpcRequest for Req {
            const NSID: &'static str = "com.example.test";
            const METHOD: XrpcMethod = XrpcMethod::Query;
            type Response = Resp;
        }

        let opts = CallOptions::default();
        for base in [
            Url::parse("https://pds").unwrap(),
            Url::parse("https://pds/").unwrap(),
            Url::parse("https://pds/base/").unwrap(),
        ] {
            let req = build_http_request(&base, &Req, &opts).unwrap();
            let uri = req.uri().to_string();
            assert!(uri.contains("/xrpc/com.example.test"));
            assert!(!uri.contains("//xrpc"));
        }
    }
}