Skip to main content

pingora_proxy/subrequest/
mod.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use pingora_cache::lock::{CacheKeyLockImpl, LockStatus, WritePermit};
17use pingora_cache::CacheKey;
18use pingora_core::protocols::http::subrequest::server::{
19    HttpSession as SessionSubrequest, SubrequestHandle,
20};
21use std::any::Any;
22
23pub mod pipe;
24
// Cache write-lock state carried by a subrequest `Ctx`.
//
// Keeps the permit together with the lock and key it belongs to, so that
// `Ctx::release_write_lock` can call `cache_lock.release(&key, write_permit, ..)`
// when the permit ends up unused.
struct LockCtx {
    // The permit itself; handed out by `Ctx::take_write_lock` or passed back
    // to `cache_lock.release` by `Ctx::release_write_lock`.
    write_permit: WritePermit,
    // The cache key lock that `release` is called on.
    cache_lock: &'static CacheKeyLockImpl,
    // Cache key passed to `release` alongside the permit.
    key: CacheKey,
}
30
31// Thin wrapper to allow iterating over InputBody Vec.
32pub(crate) struct InputBodyReader(std::vec::IntoIter<Bytes>);
33
34impl InputBodyReader {
35    pub fn read_body(&mut self) -> Option<Bytes> {
36        self.0.next()
37    }
38}
39
/// Optional user-defined subrequest context.
///
/// The value is type-erased behind [`Any`]; it is "optional" in the sense that
/// [`Ctx`] and [`CtxBuilder`] store it as an `Option<UserCtx>`.
pub type UserCtx = Box<dyn Any + Sync + Send>;
42
/// Describes whether a body is to be sent for a subrequest.
///
/// Defaults to [`BodyMode::NoBody`].
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum BodyMode {
    /// No body to be sent for subrequest.
    #[default]
    NoBody,
    /// Waiting on body if needed.
    ExpectBody,
}
51
52#[derive(Default)]
53pub struct CtxBuilder {
54    lock: Option<LockCtx>,
55    body_mode: BodyMode,
56    user_ctx: Option<UserCtx>,
57}
58
59impl CtxBuilder {
60    pub fn new() -> Self {
61        Self {
62            lock: None,
63            body_mode: BodyMode::NoBody,
64            user_ctx: None,
65        }
66    }
67
68    pub fn cache_write_lock(
69        mut self,
70        cache_lock: &'static CacheKeyLockImpl,
71        key: CacheKey,
72        write_permit: WritePermit,
73    ) -> Self {
74        self.lock = Some(LockCtx {
75            cache_lock,
76            key,
77            write_permit,
78        });
79        self
80    }
81
82    pub fn user_ctx(mut self, user_ctx: UserCtx) -> Self {
83        self.user_ctx = Some(user_ctx);
84        self
85    }
86
87    pub fn body_mode(mut self, body_mode: BodyMode) -> Self {
88        self.body_mode = body_mode;
89        self
90    }
91
92    pub fn build(self) -> Ctx {
93        Ctx {
94            lock: self.lock,
95            body_mode: self.body_mode,
96            user_ctx: self.user_ctx,
97        }
98    }
99}
100
101/// Context struct to share state across the parent and sub-request.
102pub struct Ctx {
103    body_mode: BodyMode,
104    lock: Option<LockCtx>,
105    // User-defined custom context.
106    user_ctx: Option<UserCtx>,
107}
108
109impl Ctx {
110    /// Create a [`CtxBuilder`] in order to make a new subrequest `Ctx`.
111    pub fn builder() -> CtxBuilder {
112        CtxBuilder::new()
113    }
114
115    /// Get a reference to the extensions inside this subrequest.
116    pub fn user_ctx(&self) -> Option<&UserCtx> {
117        self.user_ctx.as_ref()
118    }
119
120    /// Get a mutable reference to the extensions inside this subrequest.
121    pub fn user_ctx_mut(&mut self) -> Option<&mut UserCtx> {
122        self.user_ctx.as_mut()
123    }
124
125    /// Release the write lock from the subrequest (to clean up a write permit
126    /// that will not be used in the cache key lock).
127    pub fn release_write_lock(&mut self) {
128        if let Some(lock) = self.lock.take() {
129            // If we are releasing the write lock in the subrequest,
130            // it means that the cache did not take it for whatever reason.
131            // TransientError will cause the election of a new writer
132            lock.cache_lock
133                .release(&lock.key, lock.write_permit, LockStatus::TransientError);
134        }
135    }
136
137    /// Take the write lock from the subrequest, for use in a cache key lock.
138    pub fn take_write_lock(&mut self) -> Option<WritePermit> {
139        // also clear out lock ctx
140        self.lock.take().map(|lock| lock.write_permit)
141    }
142
143    /// Get the `BodyMode` when this subrequest was created.
144    pub fn body_mode(&self) -> BodyMode {
145        self.body_mode
146    }
147}
148
149use crate::HttpSession;
150
151pub(crate) fn create_session(parsed_session: &HttpSession) -> (HttpSession, SubrequestHandle) {
152    let (session, handle) = SessionSubrequest::new_from_session(parsed_session);
153    (HttpSession::new_subrequest(session), handle)
154}
155
156#[tokio::test]
157async fn test_dummy_request() {
158    use tokio_test::io::Builder;
159
160    let input = b"GET / HTTP/1.1\r\n\r\n";
161    let mock_io = Builder::new().read(&input[..]).build();
162    let mut req = HttpSession::new_http1(Box::new(mock_io));
163    req.read_request().await.unwrap();
164    assert_eq!(input.as_slice(), req.to_h1_raw());
165
166    let (mut subreq, _handle) = create_session(&req);
167    subreq.read_request().await.unwrap();
168    assert_eq!(input.as_slice(), subreq.to_h1_raw());
169}