Skip to main content

better_fetch/tower/
service.rs

1use std::sync::{Arc, Mutex};
2use std::task::{Context, Poll};
3
4use async_trait::async_trait;
5use futures_util::future::BoxFuture;
6use tower::util::BoxCloneService;
7use tower::{Service, ServiceExt};
8
9use crate::backend::exec::send_reqwest;
10use crate::backend::{
11    HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse, ReqwestBackend,
12};
13use crate::{Error, Result};
14
15/// Type-erased clone HTTP [`Service`](tower::Service).
16pub type BoxHttpService = BoxCloneService<HttpRequest, HttpResponse, Error>;
17
18/// Reqwest-backed [`Service`](tower::Service) for stacking Tower layers.
19#[derive(Clone, Debug)]
20pub struct ReqwestHttpService {
21    client: reqwest::Client,
22}
23
24impl ReqwestHttpService {
25    /// Creates a service that uses the given reqwest client.
26    pub fn new(client: reqwest::Client) -> Self {
27        Self { client }
28    }
29
30    /// Creates a service with a default reqwest client.
31    pub fn default_client() -> Self {
32        Self::new(reqwest::Client::new())
33    }
34
35    /// Returns the underlying reqwest client.
36    pub fn client(&self) -> &reqwest::Client {
37        &self.client
38    }
39}
40
41impl Default for ReqwestHttpService {
42    fn default() -> Self {
43        Self::default_client()
44    }
45}
46
47impl Service<HttpRequest> for ReqwestHttpService {
48    type Response = HttpResponse;
49    type Error = Error;
50    type Future = BoxFuture<'static, std::result::Result<HttpResponse, Error>>;
51
52    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
53        Poll::Ready(Ok(()))
54    }
55
56    fn call(&mut self, request: HttpRequest) -> Self::Future {
57        let client = self.client.clone();
58        Box::pin(async move { send_reqwest(&client, request).await })
59    }
60}
61
62/// Wraps a Tower [`Service`] as an [`HttpBackend`].
63///
64/// Buffered requests use the Tower stack. [`HttpBackend::execute_stream`] delegates to a
65/// separate [`ReqwestBackend`] (same reqwest client as [`ClientBuilder::transport_stack`](crate::ClientBuilder::transport_stack)).
66/// Tower request middleware does not apply to the streaming path.
67pub struct ServiceBackend {
68    inner: Arc<Mutex<BoxHttpService>>,
69    streaming: ReqwestBackend,
70}
71
72impl ServiceBackend {
73    /// Wraps a Tower service and a reqwest backend used for streaming responses.
74    pub fn new<S>(service: S, streaming: ReqwestBackend) -> Self
75    where
76        S: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
77        S::Future: Send + 'static,
78    {
79        Self {
80            inner: Arc::new(Mutex::new(BoxHttpService::new(service))),
81            streaming,
82        }
83    }
84
85    /// Wraps an already-boxed transport stack with a streaming backend.
86    pub fn from_box(service: BoxHttpService, streaming: ReqwestBackend) -> Self {
87        Self {
88            inner: Arc::new(Mutex::new(service)),
89            streaming,
90        }
91    }
92
93    /// Returns a clone of the inner transport stack (for advanced testing).
94    pub fn clone_inner(&self) -> BoxHttpService {
95        self.inner
96            .lock()
97            .expect("ServiceBackend inner mutex poisoned")
98            .clone()
99    }
100
101    /// Returns the reqwest backend used for [`HttpBackend::execute_stream`](crate::backend::HttpBackend::execute_stream).
102    pub fn streaming_backend(&self) -> &ReqwestBackend {
103        &self.streaming
104    }
105}
106
107impl Clone for ServiceBackend {
108    fn clone(&self) -> Self {
109        Self {
110            inner: self.inner.clone(),
111            streaming: self.streaming.clone(),
112        }
113    }
114}
115
116#[async_trait]
117impl HttpBackend for ServiceBackend {
118    async fn execute(&self, request: HttpRequest) -> Result<HttpResponse> {
119        let mut service = self
120            .inner
121            .lock()
122            .expect("ServiceBackend inner mutex poisoned")
123            .clone();
124        service
125            .ready()
126            .await
127            .map_err(|e| Error::transport_message(format!("service not ready: {e}")))?;
128        service.call(request).await
129    }
130
131    async fn execute_stream(&self, request: HttpRequest) -> Result<HttpStreamingResponse> {
132        self.streaming.execute_stream(request).await
133    }
134}