Skip to main content

better_fetch/tower/
service.rs

1use std::sync::{Arc, Mutex};
2use std::task::{Context, Poll};
3
4use async_trait::async_trait;
5use futures_util::future::BoxFuture;
6use tower::util::BoxCloneService;
7use tower::{Service, ServiceExt};
8
9use crate::backend::exec::{send_reqwest, send_reqwest_stream};
10use crate::backend::{HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse};
11use crate::{Error, Result};
12
13/// Type-erased clone HTTP [`Service`](tower::Service) for buffered responses.
14pub type BoxHttpService = BoxCloneService<HttpRequest, HttpResponse, Error>;
15
16/// Type-erased clone HTTP [`Service`](tower::Service) for streaming responses.
17pub type BoxStreamingHttpService = BoxCloneService<HttpRequest, HttpStreamingResponse, Error>;
18
19/// Reqwest-backed [`Service`](tower::Service) for stacking Tower layers.
20#[derive(Clone, Debug)]
21pub struct ReqwestHttpService {
22    client: reqwest::Client,
23}
24
25impl ReqwestHttpService {
26    /// Creates a service that uses the given reqwest client.
27    pub fn new(client: reqwest::Client) -> Self {
28        Self { client }
29    }
30
31    /// Creates a service with a default reqwest client.
32    pub fn default_client() -> Self {
33        Self::new(reqwest::Client::new())
34    }
35
36    /// Returns the underlying reqwest client.
37    pub fn client(&self) -> &reqwest::Client {
38        &self.client
39    }
40}
41
42impl Default for ReqwestHttpService {
43    fn default() -> Self {
44        Self::default_client()
45    }
46}
47
48impl Service<HttpRequest> for ReqwestHttpService {
49    type Response = HttpResponse;
50    type Error = Error;
51    type Future = BoxFuture<'static, std::result::Result<HttpResponse, Error>>;
52
53    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
54        Poll::Ready(Ok(()))
55    }
56
57    fn call(&mut self, request: HttpRequest) -> Self::Future {
58        let client = self.client.clone();
59        Box::pin(async move { send_reqwest(&client, request).await })
60    }
61}
62
63/// Reqwest-backed streaming [`Service`](tower::Service).
64#[derive(Clone, Debug)]
65pub struct ReqwestStreamingHttpService {
66    client: reqwest::Client,
67}
68
69impl ReqwestStreamingHttpService {
70    /// Creates a streaming service using the given reqwest client.
71    pub fn new(client: reqwest::Client) -> Self {
72        Self { client }
73    }
74}
75
76impl Service<HttpRequest> for ReqwestStreamingHttpService {
77    type Response = HttpStreamingResponse;
78    type Error = Error;
79    type Future = BoxFuture<'static, std::result::Result<HttpStreamingResponse, Error>>;
80
81    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
82        Poll::Ready(Ok(()))
83    }
84
85    fn call(&mut self, request: HttpRequest) -> Self::Future {
86        let client = self.client.clone();
87        Box::pin(async move { send_reqwest_stream(&client, request).await })
88    }
89}
90
91/// Wraps Tower [`Service`] stacks as an [`HttpBackend`].
92///
93/// Both buffered and streaming paths run through Tower middleware when configured via
94/// [`ClientBuilder::transport_stack`](crate::ClientBuilder::transport_stack).
95pub struct ServiceBackend {
96    inner: Arc<Mutex<BoxHttpService>>,
97    streaming: Arc<Mutex<BoxStreamingHttpService>>,
98}
99
100impl ServiceBackend {
101    /// Wraps buffered and streaming Tower services.
102    pub fn new<B, S>(buffered: B, streaming: S) -> Self
103    where
104        B: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
105        B::Future: Send + 'static,
106        S: Service<HttpRequest, Response = HttpStreamingResponse, Error = Error>
107            + Clone
108            + Send
109            + 'static,
110        S::Future: Send + 'static,
111    {
112        Self {
113            inner: Arc::new(Mutex::new(BoxHttpService::new(buffered))),
114            streaming: Arc::new(Mutex::new(BoxStreamingHttpService::new(streaming))),
115        }
116    }
117
118    /// Wraps already-boxed buffered and streaming transport stacks.
119    pub fn from_boxes(buffered: BoxHttpService, streaming: BoxStreamingHttpService) -> Self {
120        Self {
121            inner: Arc::new(Mutex::new(buffered)),
122            streaming: Arc::new(Mutex::new(streaming)),
123        }
124    }
125
126    /// Buffered Tower stack + plain reqwest streaming (legacy).
127    pub fn buffered_with_reqwest_streaming<S>(service: S, client: reqwest::Client) -> Self
128    where
129        S: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
130        S::Future: Send + 'static,
131    {
132        Self::new(service, ReqwestStreamingHttpService::new(client))
133    }
134
135    /// Wraps a Tower service and a reqwest backend used for streaming responses.
136    #[deprecated(note = "use `from_boxes` or `transport_stack` which wires both paths")]
137    pub fn new_buffered_only<S>(service: S, client: reqwest::Client) -> Self
138    where
139        S: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
140        S::Future: Send + 'static,
141    {
142        Self::buffered_with_reqwest_streaming(service, client)
143    }
144
145    /// Wraps an already-boxed transport stack with reqwest-only streaming.
146    #[deprecated(note = "use `from_boxes`")]
147    pub fn from_box(service: BoxHttpService, client: reqwest::Client) -> Self {
148        Self::buffered_with_reqwest_streaming(service, client)
149    }
150
151    /// Returns a clone of the inner transport stack (for advanced testing).
152    pub fn clone_inner(&self) -> BoxHttpService {
153        self.inner
154            .lock()
155            .expect("ServiceBackend inner mutex poisoned")
156            .clone()
157    }
158}
159
160impl Clone for ServiceBackend {
161    fn clone(&self) -> Self {
162        Self {
163            inner: self.inner.clone(),
164            streaming: self.streaming.clone(),
165        }
166    }
167}
168
169#[async_trait]
170impl HttpBackend for ServiceBackend {
171    async fn execute(&self, request: HttpRequest) -> Result<HttpResponse> {
172        let mut service = self
173            .inner
174            .lock()
175            .expect("ServiceBackend inner mutex poisoned")
176            .clone();
177        service
178            .ready()
179            .await
180            .map_err(|e| Error::transport_message(format!("service not ready: {e}")))?;
181        service.call(request).await
182    }
183
184    async fn execute_stream(&self, request: HttpRequest) -> Result<HttpStreamingResponse> {
185        let mut service = self
186            .streaming
187            .lock()
188            .expect("ServiceBackend streaming mutex poisoned")
189            .clone();
190        service
191            .ready()
192            .await
193            .map_err(|e| Error::transport_message(format!("streaming service not ready: {e}")))?;
194        service.call(request).await
195    }
196}