1use crate::error::{HeaderMap, IntoHeaderPair, LingerError};
2use bytes::Bytes;
3use futures_core::Stream;
4use futures_util::StreamExt;
5use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9
/// Boxed, pinned, `Send` stream of body chunks.
///
/// Each item is either a [`Bytes`] chunk or the [`LingerError`] that interrupted the body.
pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, LingerError>> + Send>>;
13
/// Payload that can be attached to an [`HttpRequest`].
pub enum HttpRequestBody {
    /// Body that is fully buffered in memory.
    Bytes(Bytes),
    /// Body that is produced incrementally as a [`BodyStream`].
    Stream(BodyStream),
}
24
25impl HttpRequestBody {
26 pub fn is_stream(&self) -> bool {
29 matches!(self, Self::Stream(_))
30 }
31
32 pub fn as_bytes(&self) -> Option<&[u8]> {
35 match self {
36 Self::Bytes(bytes) => Some(bytes),
37 Self::Stream(_) => None,
38 }
39 }
40
41 pub fn into_stream(self) -> BodyStream {
44 match self {
45 Self::Bytes(bytes) => {
46 Box::pin(futures_util::stream::once(async move { Ok(bytes) })) as BodyStream
47 }
48 Self::Stream(stream) => stream,
49 }
50 }
51}
52
53impl fmt::Debug for HttpRequestBody {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 Self::Bytes(bytes) => write!(f, "<{} bytes>", bytes.len()),
57 Self::Stream(_) => f.write_str("<stream>"),
58 }
59 }
60}
61
/// HTTP method of an [`HttpRequest`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a wildcard arm
/// when matching on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum HttpMethod {
    /// The `GET` method.
    Get,
    /// The `POST` method.
    Post,
    /// The `DELETE` method.
    Delete,
}
77
/// An HTTP request: method, target, headers and optional body.
pub struct HttpRequest {
    /// HTTP method to use.
    method: HttpMethod,
    /// Full request URL, built in `new` from the base URL and `path`.
    url: String,
    /// Path portion exactly as supplied to `new`.
    path: String,
    /// Request headers.
    headers: HeaderMap,
    /// Optional payload; `None` until one of the body setters is called.
    body: Option<HttpRequestBody>,
}
87
88impl HttpRequest {
89 pub fn new(method: HttpMethod, base_url: impl AsRef<str>, path: impl Into<String>) -> Self {
92 let path = path.into();
93 let url = format!("{}{}", base_url.as_ref().trim_end_matches('/'), path);
94 Self {
95 method,
96 url,
97 path,
98 headers: HeaderMap::new(),
99 body: None,
100 }
101 }
102
103 pub fn method(&self) -> HttpMethod {
106 self.method
107 }
108
109 pub fn url(&self) -> &str {
112 &self.url
113 }
114
115 pub fn path(&self) -> &str {
118 &self.path
119 }
120
121 pub fn insert_header(&mut self, name: impl Into<String>, value: impl Into<String>) {
124 self.headers.insert(name, value);
125 }
126
127 pub fn header(&self, name: &str) -> Option<&str> {
130 self.headers.get(name)
131 }
132
133 pub fn headers(&self) -> &HeaderMap {
136 &self.headers
137 }
138
139 pub fn set_body(&mut self, body: impl Into<Bytes>) {
142 self.body = Some(HttpRequestBody::Bytes(body.into()));
143 }
144
145 pub fn set_body_stream<S>(&mut self, body: S)
148 where
149 S: Stream<Item = Result<Bytes, LingerError>> + Send + 'static,
150 {
151 self.body = Some(HttpRequestBody::Stream(Box::pin(body)));
152 }
153
154 pub fn body(&self) -> Option<&[u8]> {
157 self.body.as_ref().and_then(HttpRequestBody::as_bytes)
158 }
159
160 pub fn body_is_stream(&self) -> bool {
163 self.body.as_ref().is_some_and(HttpRequestBody::is_stream)
164 }
165
166 pub fn into_body(self) -> Option<HttpRequestBody> {
169 self.body
170 }
171}
172
173impl fmt::Debug for HttpRequest {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.debug_struct("HttpRequest")
176 .field("method", &self.method)
177 .field("url", &self.url)
178 .field("path", &self.path)
179 .field("headers", &self.headers)
180 .field("body", &self.body.as_ref())
181 .finish()
182 }
183}
184
/// Private storage for the body of an [`HttpResponse`].
enum HttpBody {
    /// Body that is fully buffered in memory.
    Bytes(Bytes),
    /// Body that is produced incrementally as a [`BodyStream`].
    Stream(BodyStream),
}
189
/// An HTTP response: status code, headers and a buffered or streaming body.
pub struct HttpResponse {
    /// Numeric HTTP status code.
    status: u16,
    /// Response headers.
    headers: HeaderMap,
    /// Response payload.
    body: HttpBody,
}
197
198impl HttpResponse {
199 pub fn from_bytes<I, P>(status: u16, headers: I, body: impl Into<Bytes>) -> Self
202 where
203 I: IntoIterator<Item = P>,
204 P: IntoHeaderPair,
205 {
206 Self {
207 status,
208 headers: HeaderMap::from_pairs(headers),
209 body: HttpBody::Bytes(body.into()),
210 }
211 }
212
213 pub fn from_stream<I, P, S>(status: u16, headers: I, body: S) -> Self
216 where
217 I: IntoIterator<Item = P>,
218 P: IntoHeaderPair,
219 S: Stream<Item = Result<Bytes, LingerError>> + Send + 'static,
220 {
221 Self {
222 status,
223 headers: HeaderMap::from_pairs(headers),
224 body: HttpBody::Stream(Box::pin(body)),
225 }
226 }
227
228 pub fn status(&self) -> u16 {
231 self.status
232 }
233
234 pub fn headers(&self) -> &HeaderMap {
237 &self.headers
238 }
239
240 pub fn into_parts(self) -> (u16, HeaderMap, BodyStream) {
243 let body = match self.body {
244 HttpBody::Bytes(bytes) => {
245 Box::pin(futures_util::stream::once(async move { Ok(bytes) })) as BodyStream
246 }
247 HttpBody::Stream(stream) => stream,
248 };
249 (self.status, self.headers, body)
250 }
251
252 pub fn into_body_stream(self) -> BodyStream {
255 self.into_parts().2
256 }
257
258 pub async fn into_bytes(self) -> Result<(u16, HeaderMap, Bytes), LingerError> {
261 let (status, headers, mut stream) = self.into_parts();
262 let mut body = Vec::new();
263 while let Some(chunk) = stream.next().await {
264 body.extend_from_slice(&chunk?);
265 }
266 Ok((status, headers, Bytes::from(body)))
267 }
268}
269
/// Backend capable of executing an [`HttpRequest`].
///
/// Implementors must be `Send + Sync`.
pub trait Transport: Send + Sync {
    /// Sends `request` and returns a boxed `Send` future that resolves to the
    /// [`HttpResponse`] or to a [`LingerError`].
    ///
    /// The returned future may borrow from `self`.
    fn send(
        &self,
        request: HttpRequest,
    ) -> Pin<Box<dyn Future<Output = Result<HttpResponse, LingerError>> + Send + '_>>;
}
280
/// Cloneable handle to a type-erased [`Transport`].
///
/// The transport is held in an `Arc`, so clones refer to the same underlying instance.
#[derive(Clone)]
pub struct SharedTransport {
    /// The shared, type-erased transport.
    inner: Arc<dyn Transport>,
}
287
288impl SharedTransport {
289 pub fn new<T>(transport: T) -> Self
292 where
293 T: Transport + 'static,
294 {
295 Self {
296 inner: Arc::new(transport),
297 }
298 }
299
300 pub fn send(
303 &self,
304 request: HttpRequest,
305 ) -> Pin<Box<dyn Future<Output = Result<HttpResponse, LingerError>> + Send + '_>> {
306 self.inner.send(request)
307 }
308}
309
310impl fmt::Debug for SharedTransport {
311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312 f.debug_struct("SharedTransport").finish_non_exhaustive()
313 }
314}
315
/// [`Transport`] implementation backed by a `reqwest::Client`.
///
/// Only compiled when the `reqwest-transport` feature is enabled.
#[cfg(feature = "reqwest-transport")]
#[derive(Clone, Debug)]
pub struct ReqwestTransport {
    /// Client used to issue requests.
    client: reqwest::Client,
}
323
#[cfg(feature = "reqwest-transport")]
impl Default for ReqwestTransport {
    /// Creates a transport around a freshly constructed `reqwest::Client`.
    fn default() -> Self {
        Self::new(reqwest::Client::new())
    }
}
332
#[cfg(feature = "reqwest-transport")]
impl ReqwestTransport {
    /// Creates a transport that sends requests through the supplied `client`.
    pub fn new(client: reqwest::Client) -> Self {
        Self { client }
    }
}
341
#[cfg(feature = "reqwest-transport")]
impl Transport for ReqwestTransport {
    /// Sends `request` through the wrapped `reqwest::Client` and returns a response
    /// whose body is streamed.
    ///
    /// Response header values that are not valid visible ASCII (`HeaderValue::to_str`
    /// fails) are skipped rather than reported as an error.
    ///
    /// # Errors
    ///
    /// A failure to send the request is returned as `LingerError::transport` carrying
    /// the error's display text. Failures while reading the body surface later, as
    /// `Err` items on the response body stream.
    fn send(
        &self,
        request: HttpRequest,
    ) -> Pin<Box<dyn Future<Output = Result<HttpResponse, LingerError>> + Send + '_>> {
        Box::pin(async move {
            let method = match request.method {
                HttpMethod::Get => reqwest::Method::GET,
                HttpMethod::Post => reqwest::Method::POST,
                HttpMethod::Delete => reqwest::Method::DELETE,
            };
            let mut builder = self.client.request(method, request.url);
            for (name, value) in request.headers.iter() {
                builder = builder.header(name, value);
            }
            builder = match request.body {
                Some(HttpRequestBody::Bytes(bytes)) => builder.body(bytes),
                Some(HttpRequestBody::Stream(stream)) => {
                    builder.body(reqwest::Body::wrap_stream(stream))
                }
                None => builder,
            };
            let response = builder
                .send()
                .await
                .map_err(|error| LingerError::transport(error.to_string()))?;
            let status = response.status().as_u16();
            // The headers must be copied out before `bytes_stream` consumes the response.
            let headers =
                HeaderMap::from_pairs(response.headers().iter().filter_map(|(name, value)| {
                    value.to_str().ok().map(|value| (name.as_str(), value))
                }));
            let stream = response
                .bytes_stream()
                .map(|chunk| chunk.map_err(|error| LingerError::transport(error.to_string())));
            // Build the response directly so the header map is not iterated and rebuilt
            // a second time on its way through `HttpResponse::from_stream`.
            Ok(HttpResponse {
                status,
                headers,
                body: HttpBody::Stream(Box::pin(stream)),
            })
        })
    }
}