pingora_proxy/subrequest/
mod.rs1use pingora_cache::lock::{CacheKeyLockImpl, LockStatus, WritePermit};
16use pingora_cache::CacheKey;
17use pingora_core::protocols::http::subrequest::server::{
18 HttpSession as SessionSubrequest, SubrequestHandle,
19};
20use std::any::Any;
21
22struct LockCtx {
23 write_permit: WritePermit,
24 cache_lock: &'static CacheKeyLockImpl,
25 key: CacheKey,
26}
27
/// Type-erased, thread-safe value that callers may attach to a subrequest context.
///
/// Retrieve the concrete type with `downcast_ref` / `downcast_mut`.
pub type UserCtx = Box<dyn Any + Send + Sync>;
30
/// Body handling mode recorded in a subrequest context.
///
/// The default mode is [`BodyMode::NoBody`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum BodyMode {
    /// Default mode. NOTE(review): presumably the subrequest carries no body; confirm.
    #[default]
    NoBody,
    /// NOTE(review): presumably a body is expected for the subrequest; confirm.
    ExpectBody,
}
39
40#[derive(Default)]
41pub struct CtxBuilder {
42 lock: Option<LockCtx>,
43 body_mode: BodyMode,
44 user_ctx: Option<UserCtx>,
45}
46
47impl CtxBuilder {
48 pub fn new() -> Self {
49 Self {
50 lock: None,
51 body_mode: BodyMode::NoBody,
52 user_ctx: None,
53 }
54 }
55
56 pub fn cache_write_lock(
57 mut self,
58 cache_lock: &'static CacheKeyLockImpl,
59 key: CacheKey,
60 write_permit: WritePermit,
61 ) -> Self {
62 self.lock = Some(LockCtx {
63 cache_lock,
64 key,
65 write_permit,
66 });
67 self
68 }
69
70 pub fn user_ctx(mut self, user_ctx: UserCtx) -> Self {
71 self.user_ctx = Some(user_ctx);
72 self
73 }
74
75 pub fn body_mode(mut self, body_mode: BodyMode) -> Self {
76 self.body_mode = body_mode;
77 self
78 }
79
80 pub fn build(self) -> Ctx {
81 Ctx {
82 lock: self.lock,
83 body_mode: self.body_mode,
84 user_ctx: self.user_ctx,
85 }
86 }
87}
88
89pub struct Ctx {
91 body_mode: BodyMode,
92 lock: Option<LockCtx>,
93 user_ctx: Option<UserCtx>,
95}
96
97impl Ctx {
98 pub fn builder() -> CtxBuilder {
100 CtxBuilder::new()
101 }
102
103 pub fn user_ctx(&self) -> Option<&UserCtx> {
105 self.user_ctx.as_ref()
106 }
107
108 pub fn user_ctx_mut(&mut self) -> Option<&mut UserCtx> {
110 self.user_ctx.as_mut()
111 }
112
113 pub fn release_write_lock(&mut self) {
116 if let Some(lock) = self.lock.take() {
117 lock.cache_lock
121 .release(&lock.key, lock.write_permit, LockStatus::TransientError);
122 }
123 }
124
125 pub fn take_write_lock(&mut self) -> Option<WritePermit> {
127 self.lock.take().map(|lock| lock.write_permit)
129 }
130
131 pub fn body_mode(&self) -> BodyMode {
133 self.body_mode
134 }
135}
136
137use crate::HttpSession;
138
139pub(crate) fn create_session(parsed_session: &HttpSession) -> (HttpSession, SubrequestHandle) {
140 let (session, handle) = SessionSubrequest::new_from_session(parsed_session);
141 (HttpSession::new_subrequest(session), handle)
142}
143
144#[tokio::test]
145async fn test_dummy_request() {
146 use tokio_test::io::Builder;
147
148 let input = b"GET / HTTP/1.1\r\n\r\n";
149 let mock_io = Builder::new().read(&input[..]).build();
150 let mut req = HttpSession::new_http1(Box::new(mock_io));
151 req.read_request().await.unwrap();
152 assert_eq!(input.as_slice(), req.to_h1_raw());
153
154 let (mut subreq, _handle) = create_session(&req);
155 subreq.read_request().await.unwrap();
156 assert_eq!(input.as_slice(), subreq.to_h1_raw());
157}