1use crate::{
10 header::{TRAILER, X_STREAM_ERROR_KEY},
11 read::{JsonLineDecoder, StreamReader},
12 ApiError, ApiRequest,
13};
14use async_trait::async_trait;
15use bytes::Bytes;
16use common_multipart_rfc7578::client::multipart;
17use futures::{future, FutureExt, Stream, TryStreamExt};
18use http::{
19 header::{HeaderName, HeaderValue},
20 StatusCode,
21};
22use serde::Deserialize;
23use std::fmt::Display;
24use tokio_util::codec::{Decoder, FramedRead};
25
26cfg_if::cfg_if! {
27 if #[cfg(feature = "with-send-sync")] {
28 pub type BoxStream<T, E> = Box<dyn Stream<Item = Result<T, E>> + Send + Unpin>;
29 } else {
30 pub type BoxStream<T, E> = Box<dyn Stream<Item = Result<T, E>> + Unpin>;
31 }
32}
33
34#[cfg_attr(feature = "with-send-sync", async_trait)]
35#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
36pub trait Backend {
37 cfg_if::cfg_if! {
38 if #[cfg(feature = "with-send-sync")] {
39 type HttpRequest: Send;
40 } else {
41 type HttpRequest;
42 }
43 }
44
45 type HttpResponse;
46
47 cfg_if::cfg_if! {
48 if #[cfg(feature = "with-send-sync")] {
49 type Error: Display + From<ApiError> + From<crate::Error> + Send + 'static;
50 } else {
51 type Error: Display + From<ApiError> + From<crate::Error> + 'static;
52 }
53 }
54
55 fn build_base_request<Req>(
58 &self,
59 req: Req,
60 form: Option<multipart::Form<'static>>,
61 ) -> Result<Self::HttpRequest, Self::Error>
62 where
63 Req: ApiRequest;
64
65 fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue>;
68
69 async fn request_raw<Req>(
72 &self,
73 req: Req,
74 form: Option<multipart::Form<'static>>,
75 ) -> Result<(StatusCode, Bytes), Self::Error>
76 where
77 Req: ApiRequest;
78
79 fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error>;
80
81 fn request_stream<Res, F>(
85 &self,
86 req: Self::HttpRequest,
87 process: F,
88 ) -> BoxStream<Res, Self::Error>
89 where
90 F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>;
91
92 #[inline]
95 fn process_error_from_body(body: Bytes) -> Self::Error {
96 match serde_json::from_slice::<ApiError>(&body) {
97 Ok(e) => e.into(),
98 Err(_) => {
99 let err = match String::from_utf8(body.to_vec()) {
100 Ok(s) => crate::Error::UnrecognizedApiError(s),
101 Err(e) => crate::Error::from(e),
102 };
103
104 err.into()
105 }
106 }
107 }
108
109 fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Self::Error>
113 where
114 for<'de> Res: 'static + Deserialize<'de> + Send,
115 {
116 match status {
117 StatusCode::OK => serde_json::from_slice(&body)
118 .map_err(crate::Error::from)
119 .map_err(Self::Error::from),
120 _ => Err(Self::process_error_from_body(body)),
121 }
122 }
123
124 fn process_stream_response<D, Res>(
128 res: Self::HttpResponse,
129 decoder: D,
130 ) -> FramedRead<StreamReader<BoxStream<Bytes, Self::Error>>, D>
131 where
132 D: Decoder<Item = Res, Error = crate::Error>,
133 {
134 FramedRead::new(
135 StreamReader::new(Self::response_to_byte_stream(res)),
136 decoder,
137 )
138 }
139
140 async fn request<Req, Res>(
144 &self,
145 req: Req,
146 form: Option<multipart::Form<'static>>,
147 ) -> Result<Res, Self::Error>
148 where
149 Req: ApiRequest,
150 for<'de> Res: 'static + Deserialize<'de> + Send,
151 {
152 let (status, chunk) = self.request_raw(req, form).await?;
153
154 Self::process_json_response(status, chunk)
155 }
156
157 async fn request_empty<Req>(
161 &self,
162 req: Req,
163 form: Option<multipart::Form<'static>>,
164 ) -> Result<(), Self::Error>
165 where
166 Req: ApiRequest,
167 {
168 let (status, chunk) = self.request_raw(req, form).await?;
169
170 match status {
171 StatusCode::OK => Ok(()),
172 _ => Err(Self::process_error_from_body(chunk)),
173 }
174 }
175
176 async fn request_string<Req>(
180 &self,
181 req: Req,
182 form: Option<multipart::Form<'static>>,
183 ) -> Result<String, Self::Error>
184 where
185 Req: ApiRequest,
186 {
187 let (status, chunk) = self.request_raw(req, form).await?;
188
189 match status {
190 StatusCode::OK => String::from_utf8(chunk.to_vec())
191 .map_err(crate::Error::from)
192 .map_err(Self::Error::from),
193 _ => Err(Self::process_error_from_body(chunk)),
194 }
195 }
196
197 fn request_stream_bytes(&self, req: Self::HttpRequest) -> BoxStream<Bytes, Self::Error> {
201 self.request_stream(req, |res| Self::response_to_byte_stream(res))
202 }
203
204 fn request_stream_json<Res>(&self, req: Self::HttpRequest) -> BoxStream<Res, Self::Error>
208 where
209 for<'de> Res: 'static + Deserialize<'de> + Send,
210 {
211 self.request_stream(req, |res| {
212 let parse_stream_error = if let Some(trailer) = Self::get_header(&res, TRAILER) {
213 if trailer == X_STREAM_ERROR_KEY {
218 true
219 } else {
220 let err = crate::Error::UnrecognizedTrailerHeader(
221 String::from_utf8_lossy(trailer.as_ref()).into(),
222 );
223
224 return Box::new(future::err(err).into_stream().err_into());
228 }
229 } else {
230 false
231 };
232
233 Box::new(
234 Self::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
235 .err_into(),
236 )
237 })
238 }
239
240 fn with_credentials<U, P>(self, username: U, password: P) -> Self
242 where
243 U: Into<String>,
244 P: Into<String>;
245}