1use hitbox::concurrency::ConcurrencyManager;
8use hitbox::config::CacheConfig;
9use hitbox_core::{DisabledOffload, Offload};
10use std::sync::Arc;
11
12use hitbox::{backend::CacheBackend, fsm::CacheFuture};
13use hitbox_http::{BufferedBody, CacheableHttpRequest, CacheableHttpResponse};
14use http::header::HeaderName;
15use http::{Request, Response};
16use hyper::body::Body as HttpBody;
17use tower::Service;
18
19use crate::future::CacheServiceFuture;
20use crate::upstream::TowerUpstream;
21
22pub struct CacheService<S, B, C, CM, O = DisabledOffload> {
45 upstream: S,
46 backend: Arc<B>,
47 configuration: C,
48 offload: O,
49 concurrency_manager: CM,
50 cache_status_header: HeaderName,
51}
52
53impl<S, B, C, CM, O> CacheService<S, B, C, CM, O> {
54 pub fn new(
61 upstream: S,
62 backend: Arc<B>,
63 configuration: C,
64 offload: O,
65 concurrency_manager: CM,
66 cache_status_header: HeaderName,
67 ) -> Self {
68 CacheService {
69 upstream,
70 backend,
71 configuration,
72 offload,
73 concurrency_manager,
74 cache_status_header,
75 }
76 }
77}
78
79impl<S, B, C, CM, O> Clone for CacheService<S, B, C, CM, O>
80where
81 S: Clone,
82 B: Clone,
83 C: Clone,
84 CM: Clone,
85 O: Clone,
86{
87 fn clone(&self) -> Self {
88 Self {
89 upstream: self.upstream.clone(),
90 backend: self.backend.clone(),
91 configuration: self.configuration.clone(),
92 offload: self.offload.clone(),
93 concurrency_manager: self.concurrency_manager.clone(),
94 cache_status_header: self.cache_status_header.clone(),
95 }
96 }
97}
98
99impl<S, B, C, CM, O, ReqBody, ResBody> Service<Request<ReqBody>> for CacheService<S, B, C, CM, O>
100where
101 S: Service<Request<BufferedBody<ReqBody>>, Response = Response<ResBody>>
102 + Clone
103 + Send
104 + 'static,
105 B: CacheBackend + Clone + Send + Sync + 'static,
106 S::Future: Send,
107 C: CacheConfig<CacheableHttpRequest<ReqBody>, CacheableHttpResponse<ResBody>>,
108 CM: ConcurrencyManager<Result<CacheableHttpResponse<ResBody>, S::Error>> + Clone + 'static,
109 O: Offload<'static> + Clone,
110 ReqBody: HttpBody + Send + 'static,
111 ReqBody::Error: Send,
112 ResBody: HttpBody + Send + 'static,
113 ResBody::Error: Send,
114 ResBody::Data: Send,
115 S::Error: Send,
116{
117 type Response = Response<BufferedBody<ResBody>>;
118 type Error = S::Error;
119 type Future = CacheServiceFuture<
120 CacheFuture<
121 'static,
122 B,
123 CacheableHttpRequest<ReqBody>,
124 Result<CacheableHttpResponse<ResBody>, S::Error>,
125 TowerUpstream<S, ReqBody, ResBody>,
126 C::RequestPredicate,
127 C::ResponsePredicate,
128 C::Extractor,
129 CM,
130 O,
131 >,
132 ResBody,
133 S::Error,
134 >;
135
136 fn poll_ready(
137 &mut self,
138 cx: &mut std::task::Context<'_>,
139 ) -> std::task::Poll<Result<(), Self::Error>> {
140 self.upstream.poll_ready(cx)
141 }
142
143 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
144 let configuration = &self.configuration;
145
146 let (parts, body) = req.into_parts();
148 let buffered_request = Request::from_parts(parts, BufferedBody::Passthrough(body));
149 let cacheable_req = CacheableHttpRequest::from_request(buffered_request);
150
151 let upstream = TowerUpstream::new(self.upstream.clone());
153
154 let cache_future = CacheFuture::new(
156 self.backend.clone(),
157 cacheable_req,
158 upstream,
159 configuration.request_predicates(),
160 configuration.response_predicates(),
161 configuration.extractors(),
162 Arc::new(configuration.policy().clone()),
163 self.offload.clone(),
164 self.concurrency_manager.clone(),
165 );
166
167 CacheServiceFuture::new(cache_future, self.cache_status_header.clone())
169 }
170}