capybara_core/pipeline/http/
pipeline.rs1use std::borrow::Cow;
2use std::net::{IpAddr, SocketAddr};
3use std::sync::Arc;
4
5use anyhow::Result;
6use bitflags::bitflags;
7use hashbrown::hash_map::Entry;
8use hashbrown::HashMap;
9use smallvec::{smallvec, SmallVec};
10
11use capybara_util::cachestr::Cachestr;
12
13use crate::pipeline::misc;
14use crate::proto::UpstreamKey;
15use crate::protocol::http::{Headers, Method, RequestLine, Response, StatusLine};
16
17type Pipelines = SmallVec<[Arc<dyn HttpPipeline>; 8]>;
18
19#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
20pub struct HttpPipelineFlags(u32);
21
22bitflags! {
23 impl HttpPipelineFlags: u32 {
24 const READ_REQUEST_ALL = 1 << 0;
25 const WRITE_REQUEST_ALL = 1 << 1;
26 const READ_RESPONSE_ALL = 1 << 2;
27 const WRITE_RESPONSE_ALL = 1 << 3;
28 }
29}
30
31pub(crate) struct HttpContextBuilder {
32 client_addr: SocketAddr,
33 flags: HttpContextFlags,
34 pipelines: Pipelines,
35}
36
37impl HttpContextBuilder {
38 #[inline]
39 pub(crate) fn pipeline<P>(self, pipeline: P) -> Self
40 where
41 P: HttpPipeline,
42 {
43 self.pipeline_arc(Arc::new(pipeline))
44 }
45
46 #[inline]
47 pub(crate) fn pipeline_arc(mut self, pipeline: Arc<dyn HttpPipeline>) -> Self {
48 self.pipelines.push(pipeline);
49 self
50 }
51
52 pub(crate) fn flags(mut self, flags: HttpContextFlags) -> Self {
53 self.flags = flags;
54 self
55 }
56
57 pub(crate) fn build(self) -> HttpContext {
58 let Self {
59 client_addr,
60 pipelines,
61 flags,
62 } = self;
63 HttpContext {
64 id: misc::sequence(),
65 flags,
66 client_addr,
67 pipelines: (0, pipelines),
68 upstream: None,
69 request_ctx: Default::default(),
70 response_ctx: Default::default(),
71 immediate_response: None,
72 }
73 }
74}
75
76#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash)]
77pub(crate) struct HttpContextFlags(u32);
78
79bitflags! {
80 impl HttpContextFlags: u32 {
81 const HTTPS = 1 << 0;
82 const DOWNSTREAM_EXHAUSTED = 1 << 1;
83 }
84}
85
86pub(crate) enum AnyString {
87 Cache(Cachestr),
88 String(String),
89 Cow(Cow<'static, str>),
90 Arc(Arc<String>),
91}
92
93impl AsRef<str> for AnyString {
94 fn as_ref(&self) -> &str {
95 match self {
96 AnyString::Cache(c) => c.as_ref(),
97 AnyString::String(s) => s.as_ref(),
98 AnyString::Cow(c) => c.as_ref(),
99 AnyString::Arc(a) => a.as_ref(),
100 }
101 }
102}
103
104pub(crate) enum HeaderOperator {
105 Drop,
106 Add(AnyString),
107}
108
109#[derive(Default)]
110pub struct HeadersContext {
111 pub(crate) inner: HashMap<Cachestr, SmallVec<[HeaderOperator; 8]>>,
112}
113
114impl HeadersContext {
115 pub(crate) fn reset(&mut self) {
116 self.inner.clear();
117 }
118
119 pub fn len(&self) -> usize {
120 self.inner.len()
121 }
122
123 pub fn is_empty(&self) -> bool {
124 self.inner.is_empty()
125 }
126
127 pub fn exist(&self, key: &str) -> bool {
128 let key = Cachestr::from(key);
129 self.inner.contains_key(&key)
130 }
131
132 pub fn _exist(&self, key: Cachestr) -> bool {
133 self.inner.contains_key(&key)
134 }
135
136 #[inline]
137 pub(crate) fn _remove(&mut self, header: Cachestr) {
138 let v = smallvec![HeaderOperator::Drop];
139 self.inner.insert(header, v);
140 }
141
142 pub fn remove<A>(&mut self, header: A)
143 where
144 A: AsRef<str>,
145 {
146 let k = Cachestr::from(header.as_ref());
147 let v = smallvec![HeaderOperator::Drop];
148 self.inner.insert(k, v);
149 }
150
151 #[inline]
152 pub(crate) fn _replace(&mut self, header: Cachestr, value: AnyString) {
153 let v = smallvec![
154 HeaderOperator::Drop,
155 HeaderOperator::Add(AnyString::Cache(Cachestr::from(value.as_ref())))
156 ];
157
158 self.inner.insert(header, v);
159 }
160
161 pub fn replace<K, V>(&mut self, header: K, value: V)
162 where
163 K: AsRef<str>,
164 V: AsRef<str>,
165 {
166 let k = Cachestr::from(header.as_ref());
167 let v = smallvec![
168 HeaderOperator::Drop,
169 HeaderOperator::Add(AnyString::Cache(Cachestr::from(value.as_ref())))
170 ];
171
172 self.inner.insert(k, v);
173 }
174
175 #[inline]
176 pub(crate) fn _append(&mut self, header: Cachestr, value: AnyString) {
177 match self.inner.entry(header) {
178 Entry::Occupied(mut it) => {
179 it.get_mut().push(HeaderOperator::Add(value));
180 }
181 Entry::Vacant(it) => {
182 it.insert(smallvec![HeaderOperator::Add(value)]);
183 }
184 }
185 }
186
187 pub fn append<K, V>(&mut self, header: K, value: V)
188 where
189 K: AsRef<str>,
190 V: AsRef<str>,
191 {
192 let k = Cachestr::from(header.as_ref());
193 let v = AnyString::Cache(Cachestr::from(value.as_ref()));
194 self._append(k, v)
195 }
196}
197
198#[derive(Default)]
199pub struct RequestContext {
200 pub(crate) headers: HeadersContext,
201 pub(crate) method: Option<Method>,
202}
203
204impl RequestContext {
205 #[inline]
206 pub fn headers(&mut self) -> &mut HeadersContext {
207 &mut self.headers
208 }
209
210 pub fn method(&mut self, method: Method) {
211 self.method.replace(method);
212 }
213
214 pub(crate) fn reset(&mut self) {
215 self.headers.reset();
216 }
217}
218
219#[derive(Default)]
220pub struct ResponseContext {
221 pub(crate) headers: HeadersContext,
222}
223
224impl ResponseContext {
225 #[inline]
226 pub fn headers(&mut self) -> &mut HeadersContext {
227 &mut self.headers
228 }
229
230 pub(crate) fn reset(&mut self) {
231 self.headers.reset()
232 }
233}
234
235pub struct HttpContext {
236 pub(crate) id: u64,
237 pub(crate) flags: HttpContextFlags,
238 pub(crate) client_addr: SocketAddr,
239 pub(crate) upstream: Option<Arc<UpstreamKey>>,
240 pub(crate) pipelines: (usize, SmallVec<[Arc<dyn HttpPipeline>; 8]>),
241 pub(crate) request_ctx: RequestContext,
242 pub(crate) response_ctx: ResponseContext,
243 pub(crate) immediate_response: Option<Response>,
244}
245
246impl HttpContext {
247 pub(crate) fn builder(client_addr: SocketAddr) -> HttpContextBuilder {
248 HttpContextBuilder {
249 client_addr,
250 flags: Default::default(),
251 pipelines: smallvec![],
252 }
253 }
254
255 #[cfg(test)]
256 pub(crate) fn fake() -> HttpContext {
257 HttpContext::builder("127.0.0.1:12345".parse().unwrap()).build()
258 }
259
260 #[inline]
261 pub fn id(&self) -> u64 {
262 self.id
263 }
264
265 #[inline]
266 pub fn client_addr(&self) -> SocketAddr {
267 self.client_addr
268 }
269
270 #[inline]
271 pub fn request(&mut self) -> &mut RequestContext {
272 &mut self.request_ctx
273 }
274
275 #[inline]
276 pub fn response(&mut self) -> &mut ResponseContext {
277 &mut self.response_ctx
278 }
279
280 pub fn respond(&mut self, response: Response) {
281 self.immediate_response.replace(response);
282 }
283
284 #[inline]
285 pub(crate) fn flags(&self) -> HttpContextFlags {
286 self.flags
287 }
288
289 #[inline]
290 pub(crate) fn flags_mut(&mut self) -> &mut HttpContextFlags {
291 &mut self.flags
292 }
293
294 #[inline]
295 pub(crate) fn pipeline(&mut self) -> Option<Arc<dyn HttpPipeline>> {
296 if let Some(root) = self.pipelines.1.first() {
297 self.pipelines.0 = 1;
298 return Some(Clone::clone(root));
299 }
300 None
301 }
302
303 #[inline]
304 pub(crate) fn upstream(&self) -> Option<Arc<UpstreamKey>> {
305 self.upstream.clone()
306 }
307
308 #[inline]
309 pub fn set_upstream(&mut self, upstream: UpstreamKey) {
310 self.upstream.replace(upstream.into());
311 }
312
313 #[inline]
314 pub(crate) fn reset(&mut self) {
315 self.request_ctx.reset();
316 self.response_ctx.reset();
317 self.pipelines.0 = 0;
318 self.upstream.take();
319 self.immediate_response.take();
320 self.flags = HttpContextFlags::default();
321 }
322
323 #[inline]
325 #[allow(clippy::should_implement_trait)]
326 pub fn next(&mut self) -> Option<Arc<dyn HttpPipeline>> {
327 match self.pipelines.1.get(self.pipelines.0) {
328 None => None,
329 Some(next) => {
330 self.pipelines.0 += 1;
331 Some(Clone::clone(next))
332 }
333 }
334 }
335}
336
337impl Default for HttpContext {
338 fn default() -> Self {
339 HttpContext::builder(SocketAddr::new(IpAddr::from([127, 0, 0, 1]), 12345)).build()
340 }
341}
342
343#[async_trait::async_trait]
344pub trait HttpPipeline: Send + Sync + 'static {
345 fn flags(&self) -> HttpPipelineFlags {
346 Default::default()
347 }
348
349 async fn initialize(&self) -> Result<()> {
350 Ok(())
351 }
352
353 async fn handle_request_line(
354 &self,
355 ctx: &mut HttpContext,
356 request_line: &mut RequestLine,
357 ) -> Result<()> {
358 match ctx.next() {
359 None => Ok(()),
360 Some(next) => next.handle_request_line(ctx, request_line).await,
361 }
362 }
363
364 async fn handle_request_headers(
365 &self,
366 ctx: &mut HttpContext,
367 headers: &mut Headers,
368 ) -> Result<()> {
369 match ctx.next() {
370 None => Ok(()),
371 Some(next) => next.handle_request_headers(ctx, headers).await,
372 }
373 }
374
375 async fn handle_status_line(
376 &self,
377 ctx: &mut HttpContext,
378 status_line: &mut StatusLine,
379 ) -> Result<()> {
380 match ctx.next() {
381 None => Ok(()),
382 Some(next) => next.handle_status_line(ctx, status_line).await,
383 }
384 }
385
386 async fn handle_response_headers(
387 &self,
388 ctx: &mut HttpContext,
389 headers: &mut Headers,
390 ) -> Result<()> {
391 match ctx.next() {
392 None => Ok(()),
393 Some(next) => next.handle_response_headers(ctx, headers).await,
394 }
395 }
396}