better_fetch/tower/
service.rs1use std::sync::{Arc, Mutex};
2use std::task::{Context, Poll};
3
4use async_trait::async_trait;
5use futures_util::future::BoxFuture;
6use tower::util::BoxCloneService;
7use tower::{Service, ServiceExt};
8
9use crate::backend::exec::send_reqwest;
10use crate::backend::{
11 HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse, ReqwestBackend,
12};
13use crate::{Error, Result};
14
/// Type-erased, cloneable tower service that turns an [`HttpRequest`] into an
/// [`HttpResponse`] and fails with the crate's [`Error`].
pub type BoxHttpService = BoxCloneService<HttpRequest, HttpResponse, Error>;
17
18#[derive(Clone, Debug)]
20pub struct ReqwestHttpService {
21 client: reqwest::Client,
22}
23
24impl ReqwestHttpService {
25 pub fn new(client: reqwest::Client) -> Self {
27 Self { client }
28 }
29
30 pub fn default_client() -> Self {
32 Self::new(reqwest::Client::new())
33 }
34
35 pub fn client(&self) -> &reqwest::Client {
37 &self.client
38 }
39}
40
41impl Default for ReqwestHttpService {
42 fn default() -> Self {
43 Self::default_client()
44 }
45}
46
47impl Service<HttpRequest> for ReqwestHttpService {
48 type Response = HttpResponse;
49 type Error = Error;
50 type Future = BoxFuture<'static, std::result::Result<HttpResponse, Error>>;
51
52 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
53 Poll::Ready(Ok(()))
54 }
55
56 fn call(&mut self, request: HttpRequest) -> Self::Future {
57 let client = self.client.clone();
58 Box::pin(async move { send_reqwest(&client, request).await })
59 }
60}
61
/// HTTP backend that runs `execute` through a boxed tower service and hands
/// `execute_stream` to a separate [`ReqwestBackend`].
pub struct ServiceBackend {
    /// Shared, mutex-guarded service; code in this file locks it only long
    /// enough to take a clone.
    // NOTE(review): the mutex presumably exists so the boxed service can be
    // shared across threads — confirm against the tower version in use.
    inner: Arc<Mutex<BoxHttpService>>,
    /// Backend that serves `execute_stream` calls.
    streaming: ReqwestBackend,
}
71
72impl ServiceBackend {
73 pub fn new<S>(service: S, streaming: ReqwestBackend) -> Self
75 where
76 S: Service<HttpRequest, Response = HttpResponse, Error = Error> + Clone + Send + 'static,
77 S::Future: Send + 'static,
78 {
79 Self {
80 inner: Arc::new(Mutex::new(BoxHttpService::new(service))),
81 streaming,
82 }
83 }
84
85 pub fn from_box(service: BoxHttpService, streaming: ReqwestBackend) -> Self {
87 Self {
88 inner: Arc::new(Mutex::new(service)),
89 streaming,
90 }
91 }
92
93 pub fn clone_inner(&self) -> BoxHttpService {
95 self.inner
96 .lock()
97 .expect("ServiceBackend inner mutex poisoned")
98 .clone()
99 }
100
101 pub fn streaming_backend(&self) -> &ReqwestBackend {
103 &self.streaming
104 }
105}
106
107impl Clone for ServiceBackend {
108 fn clone(&self) -> Self {
109 Self {
110 inner: self.inner.clone(),
111 streaming: self.streaming.clone(),
112 }
113 }
114}
115
116#[async_trait]
117impl HttpBackend for ServiceBackend {
118 async fn execute(&self, request: HttpRequest) -> Result<HttpResponse> {
119 let mut service = self
120 .inner
121 .lock()
122 .expect("ServiceBackend inner mutex poisoned")
123 .clone();
124 service
125 .ready()
126 .await
127 .map_err(|e| Error::transport_message(format!("service not ready: {e}")))?;
128 service.call(request).await
129 }
130
131 async fn execute_stream(&self, request: HttpRequest) -> Result<HttpStreamingResponse> {
132 self.streaming.execute_stream(request).await
133 }
134}