Skip to main content

pingora_proxy/subrequest/
mod.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pingora_cache::lock::{CacheKeyLockImpl, LockStatus, WritePermit};
16use pingora_cache::CacheKey;
17use pingora_core::protocols::http::subrequest::server::{
18    HttpSession as SessionSubrequest, SubrequestHandle,
19};
20use std::any::Any;
21
22struct LockCtx {
23    write_permit: WritePermit,
24    cache_lock: &'static CacheKeyLockImpl,
25    key: CacheKey,
26}
27
/// Optional user-defined subrequest context.
///
/// A type-erased, heap-allocated value; use the [`Any`] downcast methods to
/// recover the concrete type.
pub type UserCtx = Box<dyn Any + Send + Sync>;
30
/// Whether a subrequest has a request body to wait for.
///
/// Defaults to [`BodyMode::NoBody`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum BodyMode {
    /// No body to be sent for subrequest.
    #[default]
    NoBody,
    /// Waiting on body if needed.
    ExpectBody,
}
39
40#[derive(Default)]
41pub struct CtxBuilder {
42    lock: Option<LockCtx>,
43    body_mode: BodyMode,
44    user_ctx: Option<UserCtx>,
45}
46
47impl CtxBuilder {
48    pub fn new() -> Self {
49        Self {
50            lock: None,
51            body_mode: BodyMode::NoBody,
52            user_ctx: None,
53        }
54    }
55
56    pub fn cache_write_lock(
57        mut self,
58        cache_lock: &'static CacheKeyLockImpl,
59        key: CacheKey,
60        write_permit: WritePermit,
61    ) -> Self {
62        self.lock = Some(LockCtx {
63            cache_lock,
64            key,
65            write_permit,
66        });
67        self
68    }
69
70    pub fn user_ctx(mut self, user_ctx: UserCtx) -> Self {
71        self.user_ctx = Some(user_ctx);
72        self
73    }
74
75    pub fn body_mode(mut self, body_mode: BodyMode) -> Self {
76        self.body_mode = body_mode;
77        self
78    }
79
80    pub fn build(self) -> Ctx {
81        Ctx {
82            lock: self.lock,
83            body_mode: self.body_mode,
84            user_ctx: self.user_ctx,
85        }
86    }
87}
88
89/// Context struct to share state across the parent and sub-request.
90pub struct Ctx {
91    body_mode: BodyMode,
92    lock: Option<LockCtx>,
93    // User-defined custom context.
94    user_ctx: Option<UserCtx>,
95}
96
97impl Ctx {
98    /// Create a [`CtxBuilder`] in order to make a new subrequest `Ctx`.
99    pub fn builder() -> CtxBuilder {
100        CtxBuilder::new()
101    }
102
103    /// Get a reference to the extensions inside this subrequest.
104    pub fn user_ctx(&self) -> Option<&UserCtx> {
105        self.user_ctx.as_ref()
106    }
107
108    /// Get a mutable reference to the extensions inside this subrequest.
109    pub fn user_ctx_mut(&mut self) -> Option<&mut UserCtx> {
110        self.user_ctx.as_mut()
111    }
112
113    /// Release the write lock from the subrequest (to clean up a write permit
114    /// that will not be used in the cache key lock).
115    pub fn release_write_lock(&mut self) {
116        if let Some(lock) = self.lock.take() {
117            // If we are releasing the write lock in the subrequest,
118            // it means that the cache did not take it for whatever reason.
119            // TransientError will cause the election of a new writer
120            lock.cache_lock
121                .release(&lock.key, lock.write_permit, LockStatus::TransientError);
122        }
123    }
124
125    /// Take the write lock from the subrequest, for use in a cache key lock.
126    pub fn take_write_lock(&mut self) -> Option<WritePermit> {
127        // also clear out lock ctx
128        self.lock.take().map(|lock| lock.write_permit)
129    }
130
131    /// Get the `BodyMode` when this subrequest was created.
132    pub fn body_mode(&self) -> BodyMode {
133        self.body_mode
134    }
135}
136
137use crate::HttpSession;
138
139pub(crate) fn create_session(parsed_session: &HttpSession) -> (HttpSession, SubrequestHandle) {
140    let (session, handle) = SessionSubrequest::new_from_session(parsed_session);
141    (HttpSession::new_subrequest(session), handle)
142}
143
144#[tokio::test]
145async fn test_dummy_request() {
146    use tokio_test::io::Builder;
147
148    let input = b"GET / HTTP/1.1\r\n\r\n";
149    let mock_io = Builder::new().read(&input[..]).build();
150    let mut req = HttpSession::new_http1(Box::new(mock_io));
151    req.read_request().await.unwrap();
152    assert_eq!(input.as_slice(), req.to_h1_raw());
153
154    let (mut subreq, _handle) = create_session(&req);
155    subreq.read_request().await.unwrap();
156    assert_eq!(input.as_slice(), subreq.to_h1_raw());
157}