better_fetch/tower/
service.rs1use std::sync::{Arc, Mutex};
2use std::task::{Context, Poll};
3
4use async_trait::async_trait;
5use futures_util::future::BoxFuture;
6use tower::util::BoxCloneService;
7use tower::{Service, ServiceExt};
8
9use crate::backend::exec::{send_reqwest, send_reqwest_stream};
10use crate::backend::{HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse};
11use crate::{Error, Result};
12
13pub type BoxHttpService = BoxCloneService<HttpRequest, HttpResponse, Error>;
15
16pub type BoxStreamingHttpService = BoxCloneService<HttpRequest, HttpStreamingResponse, Error>;
18
19#[derive(Clone, Debug)]
21pub struct ReqwestHttpService {
22 client: reqwest::Client,
23}
24
25impl ReqwestHttpService {
26 pub fn new(client: reqwest::Client) -> Self {
28 Self { client }
29 }
30
31 pub fn default_client() -> Self {
33 Self::new(reqwest::Client::new())
34 }
35
36 pub fn client(&self) -> &reqwest::Client {
38 &self.client
39 }
40}
41
42impl Default for ReqwestHttpService {
43 fn default() -> Self {
44 Self::default_client()
45 }
46}
47
48impl Service<HttpRequest> for ReqwestHttpService {
49 type Response = HttpResponse;
50 type Error = Error;
51 type Future = BoxFuture<'static, std::result::Result<HttpResponse, Error>>;
52
53 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
54 Poll::Ready(Ok(()))
55 }
56
57 fn call(&mut self, request: HttpRequest) -> Self::Future {
58 let client = self.client.clone();
59 Box::pin(async move { send_reqwest(&client, request).await })
60 }
61}
62
63#[derive(Clone, Debug)]
65pub struct ReqwestStreamingHttpService {
66 client: reqwest::Client,
67}
68
69impl ReqwestStreamingHttpService {
70 pub fn new(client: reqwest::Client) -> Self {
72 Self { client }
73 }
74}
75
76impl Service<HttpRequest> for ReqwestStreamingHttpService {
77 type Response = HttpStreamingResponse;
78 type Error = Error;
79 type Future = BoxFuture<'static, std::result::Result<HttpStreamingResponse, Error>>;
80
81 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
82 Poll::Ready(Ok(()))
83 }
84
85 fn call(&mut self, request: HttpRequest) -> Self::Future {
86 let client = self.client.clone();
87 Box::pin(async move { send_reqwest_stream(&client, request).await })
88 }
89}
90
91pub struct ServiceBackend {
96 inner: Arc<Mutex<BoxHttpService>>,
97 streaming: Arc<Mutex<BoxStreamingHttpService>>,
98}
99
100impl ServiceBackend {
101 pub fn new<B, S>(buffered: B, streaming: S) -> Self
103 where
104 B: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
105 B::Future: Send + 'static,
106 S: Service<HttpRequest, Response = HttpStreamingResponse, Error = Error>
107 + Clone
108 + Send
109 + 'static,
110 S::Future: Send + 'static,
111 {
112 Self {
113 inner: Arc::new(Mutex::new(BoxHttpService::new(buffered))),
114 streaming: Arc::new(Mutex::new(BoxStreamingHttpService::new(streaming))),
115 }
116 }
117
118 pub fn from_boxes(buffered: BoxHttpService, streaming: BoxStreamingHttpService) -> Self {
120 Self {
121 inner: Arc::new(Mutex::new(buffered)),
122 streaming: Arc::new(Mutex::new(streaming)),
123 }
124 }
125
126 pub fn buffered_with_reqwest_streaming<S>(service: S, client: reqwest::Client) -> Self
128 where
129 S: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
130 S::Future: Send + 'static,
131 {
132 Self::new(service, ReqwestStreamingHttpService::new(client))
133 }
134
135 #[deprecated(note = "use `from_boxes` or `transport_stack` which wires both paths")]
137 pub fn new_buffered_only<S>(service: S, client: reqwest::Client) -> Self
138 where
139 S: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
140 S::Future: Send + 'static,
141 {
142 Self::buffered_with_reqwest_streaming(service, client)
143 }
144
145 #[deprecated(note = "use `from_boxes`")]
147 pub fn from_box(service: BoxHttpService, client: reqwest::Client) -> Self {
148 Self::buffered_with_reqwest_streaming(service, client)
149 }
150
151 pub fn clone_inner(&self) -> BoxHttpService {
153 self.inner
154 .lock()
155 .expect("ServiceBackend inner mutex poisoned")
156 .clone()
157 }
158}
159
160impl Clone for ServiceBackend {
161 fn clone(&self) -> Self {
162 Self {
163 inner: self.inner.clone(),
164 streaming: self.streaming.clone(),
165 }
166 }
167}
168
169#[async_trait]
170impl HttpBackend for ServiceBackend {
171 async fn execute(&self, request: HttpRequest) -> Result<HttpResponse> {
172 let mut service = self
173 .inner
174 .lock()
175 .expect("ServiceBackend inner mutex poisoned")
176 .clone();
177 service
178 .ready()
179 .await
180 .map_err(|e| Error::transport_message(format!("service not ready: {e}")))?;
181 service.call(request).await
182 }
183
184 async fn execute_stream(&self, request: HttpRequest) -> Result<HttpStreamingResponse> {
185 let mut service = self
186 .streaming
187 .lock()
188 .expect("ServiceBackend streaming mutex poisoned")
189 .clone();
190 service
191 .ready()
192 .await
193 .map_err(|e| Error::transport_message(format!("streaming service not ready: {e}")))?;
194 service.call(request).await
195 }
196}