pingora_proxy/subrequest/
mod.rs1use bytes::Bytes;
16use pingora_cache::lock::{CacheKeyLockImpl, LockStatus, WritePermit};
17use pingora_cache::CacheKey;
18use pingora_core::protocols::http::subrequest::server::{
19 HttpSession as SessionSubrequest, SubrequestHandle,
20};
21use std::any::Any;
22
23pub mod pipe;
24
25struct LockCtx {
26 write_permit: WritePermit,
27 cache_lock: &'static CacheKeyLockImpl,
28 key: CacheKey,
29}
30
31pub(crate) struct InputBodyReader(std::vec::IntoIter<Bytes>);
33
34impl InputBodyReader {
35 pub fn read_body(&mut self) -> Option<Bytes> {
36 self.0.next()
37 }
38}
39
40pub type UserCtx = Box<dyn Any + Sync + Send>;
42
43#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
44pub enum BodyMode {
45 #[default]
47 NoBody,
48 ExpectBody,
50}
51
52#[derive(Default)]
53pub struct CtxBuilder {
54 lock: Option<LockCtx>,
55 body_mode: BodyMode,
56 user_ctx: Option<UserCtx>,
57}
58
59impl CtxBuilder {
60 pub fn new() -> Self {
61 Self {
62 lock: None,
63 body_mode: BodyMode::NoBody,
64 user_ctx: None,
65 }
66 }
67
68 pub fn cache_write_lock(
69 mut self,
70 cache_lock: &'static CacheKeyLockImpl,
71 key: CacheKey,
72 write_permit: WritePermit,
73 ) -> Self {
74 self.lock = Some(LockCtx {
75 cache_lock,
76 key,
77 write_permit,
78 });
79 self
80 }
81
82 pub fn user_ctx(mut self, user_ctx: UserCtx) -> Self {
83 self.user_ctx = Some(user_ctx);
84 self
85 }
86
87 pub fn body_mode(mut self, body_mode: BodyMode) -> Self {
88 self.body_mode = body_mode;
89 self
90 }
91
92 pub fn build(self) -> Ctx {
93 Ctx {
94 lock: self.lock,
95 body_mode: self.body_mode,
96 user_ctx: self.user_ctx,
97 }
98 }
99}
100
101pub struct Ctx {
103 body_mode: BodyMode,
104 lock: Option<LockCtx>,
105 user_ctx: Option<UserCtx>,
107}
108
109impl Ctx {
110 pub fn builder() -> CtxBuilder {
112 CtxBuilder::new()
113 }
114
115 pub fn user_ctx(&self) -> Option<&UserCtx> {
117 self.user_ctx.as_ref()
118 }
119
120 pub fn user_ctx_mut(&mut self) -> Option<&mut UserCtx> {
122 self.user_ctx.as_mut()
123 }
124
125 pub fn release_write_lock(&mut self) {
128 if let Some(lock) = self.lock.take() {
129 lock.cache_lock
133 .release(&lock.key, lock.write_permit, LockStatus::TransientError);
134 }
135 }
136
137 pub fn take_write_lock(&mut self) -> Option<WritePermit> {
139 self.lock.take().map(|lock| lock.write_permit)
141 }
142
143 pub fn body_mode(&self) -> BodyMode {
145 self.body_mode
146 }
147}
148
149use crate::HttpSession;
150
151pub(crate) fn create_session(parsed_session: &HttpSession) -> (HttpSession, SubrequestHandle) {
152 let (session, handle) = SessionSubrequest::new_from_session(parsed_session);
153 (HttpSession::new_subrequest(session), handle)
154}
155
156#[tokio::test]
157async fn test_dummy_request() {
158 use tokio_test::io::Builder;
159
160 let input = b"GET / HTTP/1.1\r\n\r\n";
161 let mock_io = Builder::new().read(&input[..]).build();
162 let mut req = HttpSession::new_http1(Box::new(mock_io));
163 req.read_request().await.unwrap();
164 assert_eq!(input.as_slice(), req.to_h1_raw());
165
166 let (mut subreq, _handle) = create_session(&req);
167 subreq.read_request().await.unwrap();
168 assert_eq!(input.as_slice(), subreq.to_h1_raw());
169}