capybara_core/pipeline/http/
pipeline.rs

1use std::borrow::Cow;
2use std::net::{IpAddr, SocketAddr};
3use std::sync::Arc;
4
5use anyhow::Result;
6use bitflags::bitflags;
7use hashbrown::hash_map::Entry;
8use hashbrown::HashMap;
9use smallvec::{smallvec, SmallVec};
10
11use capybara_util::cachestr::Cachestr;
12
13use crate::pipeline::misc;
14use crate::proto::UpstreamKey;
15use crate::protocol::http::{Headers, Method, RequestLine, Response, StatusLine};
16
/// Ordered collection of shared pipeline handlers.
///
/// Backed by a `SmallVec` with an inline capacity of 8 entries.
type Pipelines = SmallVec<[Arc<dyn HttpPipeline>; 8]>;
18
/// Bit set a pipeline reports through [`HttpPipeline::flags`].
///
/// The default value has no bits set.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HttpPipelineFlags(u32);

bitflags! {
    impl HttpPipelineFlags: u32 {
        // NOTE(review): the names suggest each flag opts a pipeline into
        // full access to the request/response in the given direction; the
        // consumer of these bits is outside this file — confirm before
        // relying on that reading.
        const READ_REQUEST_ALL = 1 << 0;
        const WRITE_REQUEST_ALL = 1 << 1;
        const READ_RESPONSE_ALL = 1 << 2;
        const WRITE_RESPONSE_ALL = 1 << 3;
    }
}
30
/// Builder for [`HttpContext`], created through [`HttpContext::builder`].
pub(crate) struct HttpContextBuilder {
    /// Address of the client; copied into the built context.
    client_addr: SocketAddr,
    /// Initial context flags; copied into the built context.
    flags: HttpContextFlags,
    /// Pipelines in the order they were registered.
    pipelines: Pipelines,
}
36
37impl HttpContextBuilder {
38    #[inline]
39    pub(crate) fn pipeline<P>(self, pipeline: P) -> Self
40    where
41        P: HttpPipeline,
42    {
43        self.pipeline_arc(Arc::new(pipeline))
44    }
45
46    #[inline]
47    pub(crate) fn pipeline_arc(mut self, pipeline: Arc<dyn HttpPipeline>) -> Self {
48        self.pipelines.push(pipeline);
49        self
50    }
51
52    pub(crate) fn flags(mut self, flags: HttpContextFlags) -> Self {
53        self.flags = flags;
54        self
55    }
56
57    pub(crate) fn build(self) -> HttpContext {
58        let Self {
59            client_addr,
60            pipelines,
61            flags,
62        } = self;
63        HttpContext {
64            id: misc::sequence(),
65            flags,
66            client_addr,
67            pipelines: (0, pipelines),
68            upstream: None,
69            request_ctx: Default::default(),
70            response_ctx: Default::default(),
71            immediate_response: None,
72        }
73    }
74}
75
/// Bit set stored on an [`HttpContext`].
///
/// The default value has no bits set.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash)]
pub(crate) struct HttpContextFlags(u32);

bitflags! {
    impl HttpContextFlags: u32 {
        // NOTE(review): presumably marks a context served over TLS — the code
        // that sets and reads this flag is outside this file; confirm.
        const HTTPS = 1 << 0;
        // NOTE(review): presumably set once the downstream side has no more
        // data to offer — confirm against the code that sets it.
        const DOWNSTREAM_EXHAUSTED = 1 << 1;
    }
}
85
/// A string value held in one of several storage forms.
///
/// Every variant can be viewed as `&str` through the `AsRef<str>`
/// implementation on this type.
pub(crate) enum AnyString {
    /// A `Cachestr` value.
    Cache(Cachestr),
    /// An owned `String`.
    String(String),
    /// Either a borrowed `'static` string or an owned one.
    Cow(Cow<'static, str>),
    /// A reference-counted, shared `String`.
    Arc(Arc<String>),
}
92
93impl AsRef<str> for AnyString {
94    fn as_ref(&self) -> &str {
95        match self {
96            AnyString::Cache(c) => c.as_ref(),
97            AnyString::String(s) => s.as_ref(),
98            AnyString::Cow(c) => c.as_ref(),
99            AnyString::Arc(a) => a.as_ref(),
100        }
101    }
102}
103
/// A single pending edit recorded for a header name in [`HeadersContext`].
///
/// How the edits are applied to real headers is decided by code outside this
/// file.
pub(crate) enum HeaderOperator {
    /// Recorded by `remove`, and as the first step of `replace`.
    Drop,
    /// Recorded by `append`, and as the second step of `replace`; carries the
    /// header value.
    Add(AnyString),
}
108
/// Pending header edits, grouped by header name.
#[derive(Default)]
pub struct HeadersContext {
    /// Maps a header name to the ordered list of edits recorded for it.
    pub(crate) inner: HashMap<Cachestr, SmallVec<[HeaderOperator; 8]>>,
}
113
114impl HeadersContext {
115    pub(crate) fn reset(&mut self) {
116        self.inner.clear();
117    }
118
119    pub fn len(&self) -> usize {
120        self.inner.len()
121    }
122
123    pub fn is_empty(&self) -> bool {
124        self.inner.is_empty()
125    }
126
127    pub fn exist(&self, key: &str) -> bool {
128        let key = Cachestr::from(key);
129        self.inner.contains_key(&key)
130    }
131
132    pub fn _exist(&self, key: Cachestr) -> bool {
133        self.inner.contains_key(&key)
134    }
135
136    #[inline]
137    pub(crate) fn _remove(&mut self, header: Cachestr) {
138        let v = smallvec![HeaderOperator::Drop];
139        self.inner.insert(header, v);
140    }
141
142    pub fn remove<A>(&mut self, header: A)
143    where
144        A: AsRef<str>,
145    {
146        let k = Cachestr::from(header.as_ref());
147        let v = smallvec![HeaderOperator::Drop];
148        self.inner.insert(k, v);
149    }
150
151    #[inline]
152    pub(crate) fn _replace(&mut self, header: Cachestr, value: AnyString) {
153        let v = smallvec![
154            HeaderOperator::Drop,
155            HeaderOperator::Add(AnyString::Cache(Cachestr::from(value.as_ref())))
156        ];
157
158        self.inner.insert(header, v);
159    }
160
161    pub fn replace<K, V>(&mut self, header: K, value: V)
162    where
163        K: AsRef<str>,
164        V: AsRef<str>,
165    {
166        let k = Cachestr::from(header.as_ref());
167        let v = smallvec![
168            HeaderOperator::Drop,
169            HeaderOperator::Add(AnyString::Cache(Cachestr::from(value.as_ref())))
170        ];
171
172        self.inner.insert(k, v);
173    }
174
175    #[inline]
176    pub(crate) fn _append(&mut self, header: Cachestr, value: AnyString) {
177        match self.inner.entry(header) {
178            Entry::Occupied(mut it) => {
179                it.get_mut().push(HeaderOperator::Add(value));
180            }
181            Entry::Vacant(it) => {
182                it.insert(smallvec![HeaderOperator::Add(value)]);
183            }
184        }
185    }
186
187    pub fn append<K, V>(&mut self, header: K, value: V)
188    where
189        K: AsRef<str>,
190        V: AsRef<str>,
191    {
192        let k = Cachestr::from(header.as_ref());
193        let v = AnyString::Cache(Cachestr::from(value.as_ref()));
194        self._append(k, v)
195    }
196}
197
/// Per-request state that pipelines can modify.
#[derive(Default)]
pub struct RequestContext {
    /// Header edits recorded for the request.
    pub(crate) headers: HeadersContext,
    /// Method set through [`RequestContext::method`], if any.
    pub(crate) method: Option<Method>,
}
203
204impl RequestContext {
205    #[inline]
206    pub fn headers(&mut self) -> &mut HeadersContext {
207        &mut self.headers
208    }
209
210    pub fn method(&mut self, method: Method) {
211        self.method.replace(method);
212    }
213
214    pub(crate) fn reset(&mut self) {
215        self.headers.reset();
216    }
217}
218
/// Per-response state that pipelines can modify.
#[derive(Default)]
pub struct ResponseContext {
    /// Header edits recorded for the response.
    pub(crate) headers: HeadersContext,
}
223
224impl ResponseContext {
225    #[inline]
226    pub fn headers(&mut self) -> &mut HeadersContext {
227        &mut self.headers
228    }
229
230    pub(crate) fn reset(&mut self) {
231        self.headers.reset()
232    }
233}
234
/// State shared with every pipeline while HTTP traffic is being handled.
pub struct HttpContext {
    /// Identifier assigned from `misc::sequence()` when the context is built.
    pub(crate) id: u64,
    /// Context-level flags.
    pub(crate) flags: HttpContextFlags,
    /// Address of the client.
    pub(crate) client_addr: SocketAddr,
    /// Upstream chosen through `set_upstream`, if any.
    pub(crate) upstream: Option<Arc<UpstreamKey>>,
    /// `(cursor, pipelines)`: the cursor is the index of the pipeline that
    /// `next()` will hand out.
    pub(crate) pipelines: (usize, SmallVec<[Arc<dyn HttpPipeline>; 8]>),
    /// Edits pipelines have recorded for the request.
    pub(crate) request_ctx: RequestContext,
    /// Edits pipelines have recorded for the response.
    pub(crate) response_ctx: ResponseContext,
    /// Response stored through `respond`, if any.
    pub(crate) immediate_response: Option<Response>,
}
245
246impl HttpContext {
247    pub(crate) fn builder(client_addr: SocketAddr) -> HttpContextBuilder {
248        HttpContextBuilder {
249            client_addr,
250            flags: Default::default(),
251            pipelines: smallvec![],
252        }
253    }
254
255    #[cfg(test)]
256    pub(crate) fn fake() -> HttpContext {
257        HttpContext::builder("127.0.0.1:12345".parse().unwrap()).build()
258    }
259
260    #[inline]
261    pub fn id(&self) -> u64 {
262        self.id
263    }
264
265    #[inline]
266    pub fn client_addr(&self) -> SocketAddr {
267        self.client_addr
268    }
269
270    #[inline]
271    pub fn request(&mut self) -> &mut RequestContext {
272        &mut self.request_ctx
273    }
274
275    #[inline]
276    pub fn response(&mut self) -> &mut ResponseContext {
277        &mut self.response_ctx
278    }
279
280    pub fn respond(&mut self, response: Response) {
281        self.immediate_response.replace(response);
282    }
283
284    #[inline]
285    pub(crate) fn flags(&self) -> HttpContextFlags {
286        self.flags
287    }
288
289    #[inline]
290    pub(crate) fn flags_mut(&mut self) -> &mut HttpContextFlags {
291        &mut self.flags
292    }
293
294    #[inline]
295    pub(crate) fn pipeline(&mut self) -> Option<Arc<dyn HttpPipeline>> {
296        if let Some(root) = self.pipelines.1.first() {
297            self.pipelines.0 = 1;
298            return Some(Clone::clone(root));
299        }
300        None
301    }
302
303    #[inline]
304    pub(crate) fn upstream(&self) -> Option<Arc<UpstreamKey>> {
305        self.upstream.clone()
306    }
307
308    #[inline]
309    pub fn set_upstream(&mut self, upstream: UpstreamKey) {
310        self.upstream.replace(upstream.into());
311    }
312
313    #[inline]
314    pub(crate) fn reset(&mut self) {
315        self.request_ctx.reset();
316        self.response_ctx.reset();
317        self.pipelines.0 = 0;
318        self.upstream.take();
319        self.immediate_response.take();
320        self.flags = HttpContextFlags::default();
321    }
322
323    /// Returns the next http pipeline.
324    #[inline]
325    #[allow(clippy::should_implement_trait)]
326    pub fn next(&mut self) -> Option<Arc<dyn HttpPipeline>> {
327        match self.pipelines.1.get(self.pipelines.0) {
328            None => None,
329            Some(next) => {
330                self.pipelines.0 += 1;
331                Some(Clone::clone(next))
332            }
333        }
334    }
335}
336
337impl Default for HttpContext {
338    fn default() -> Self {
339        HttpContext::builder(SocketAddr::new(IpAddr::from([127, 0, 0, 1]), 12345)).build()
340    }
341}
342
343#[async_trait::async_trait]
344pub trait HttpPipeline: Send + Sync + 'static {
345    fn flags(&self) -> HttpPipelineFlags {
346        Default::default()
347    }
348
349    async fn initialize(&self) -> Result<()> {
350        Ok(())
351    }
352
353    async fn handle_request_line(
354        &self,
355        ctx: &mut HttpContext,
356        request_line: &mut RequestLine,
357    ) -> Result<()> {
358        match ctx.next() {
359            None => Ok(()),
360            Some(next) => next.handle_request_line(ctx, request_line).await,
361        }
362    }
363
364    async fn handle_request_headers(
365        &self,
366        ctx: &mut HttpContext,
367        headers: &mut Headers,
368    ) -> Result<()> {
369        match ctx.next() {
370            None => Ok(()),
371            Some(next) => next.handle_request_headers(ctx, headers).await,
372        }
373    }
374
375    async fn handle_status_line(
376        &self,
377        ctx: &mut HttpContext,
378        status_line: &mut StatusLine,
379    ) -> Result<()> {
380        match ctx.next() {
381            None => Ok(()),
382            Some(next) => next.handle_status_line(ctx, status_line).await,
383        }
384    }
385
386    async fn handle_response_headers(
387        &self,
388        ctx: &mut HttpContext,
389        headers: &mut Headers,
390    ) -> Result<()> {
391        match ctx.next() {
392            None => Ok(()),
393            Some(next) => next.handle_response_headers(ctx, headers).await,
394        }
395    }
396}