Skip to main content

pingora_proxy/subrequest/
pipe.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Subrequest piping.
//!
//! Like subrequests themselves, subrequest piping is an alpha-stage feature: its APIs are
//! highly unstable and subject to change at any point.
//!
//! Unlike `proxy_*`, it is not a "true" proxy mode; the functions here help establish a pipe
//! between the main downstream session and the subrequest (which in most cases will itself be
//! used as a downstream session).
//!
//! Furthermore, only downstream modules are invoked on the main downstream session; the
//! `ProxyHttp` trait filters are not run on the `HttpTask`s from the main session (the only
//! one that would be relevant is the request body filter).
27
28use crate::proxy_common::{DownstreamStateMachine, ResponseStateMachine};
29use crate::subrequest::*;
30use crate::{PreparedSubrequest, Session};
31use bytes::Bytes;
32use futures::FutureExt;
33use log::{debug, warn};
34use pingora_core::protocols::http::{subrequest::server::SubrequestHandle, HttpTask};
35use pingora_error::{Error, ErrorType::*, OrErr, Result};
36use tokio::sync::mpsc;
37
38pub enum InputBodyType {
39    /// Preset body
40    Preset(InputBody),
41    /// Body should be saved (up to limit)
42    SaveBody(usize),
43}
44
45/// Context struct as a result of subrequest piping.
46#[derive(Clone)]
47pub struct PipeSubrequestState {
48    /// The saved (captured) body from the main session.
49    pub saved_body: Option<SavedBody>,
50}
51
52impl PipeSubrequestState {
53    fn new() -> PipeSubrequestState {
54        PipeSubrequestState { saved_body: None }
55    }
56}
57
58pub struct PipeSubrequestError {
59    pub state: PipeSubrequestState,
60    /// Whether error originated (and was propagated from) subrequest itself
61    /// (vs. an error that occurred while sending task)
62    pub from_subreq: bool,
63    pub error: Box<Error>,
64}
65impl PipeSubrequestError {
66    pub fn new(
67        error: impl Into<Box<Error>>,
68        from_subreq: bool,
69        state: PipeSubrequestState,
70    ) -> Self {
71        PipeSubrequestError {
72            error: error.into(),
73            from_subreq,
74            state,
75        }
76    }
77}
78
79fn map_pipe_err<T, E: Into<Box<Error>>>(
80    result: Result<T, E>,
81    from_subreq: bool,
82    state: &PipeSubrequestState,
83) -> Result<T, PipeSubrequestError> {
84    result.map_err(|e| PipeSubrequestError::new(e, from_subreq, state.clone()))
85}
86
87#[derive(Debug, Clone)]
88pub struct SavedBody {
89    body: Vec<Bytes>,
90    complete: bool,
91    truncated: bool,
92    length: usize,
93    max_length: usize,
94}
95
96impl SavedBody {
97    pub fn new(max_length: usize) -> Self {
98        SavedBody {
99            body: vec![],
100            complete: false,
101            truncated: false,
102            length: 0,
103            max_length,
104        }
105    }
106
107    pub fn save_body_bytes(&mut self, body_bytes: Bytes) -> bool {
108        let len = body_bytes.len();
109        if self.length + len > self.max_length {
110            self.truncated = true;
111            return false;
112        }
113        self.length += len;
114        self.body.push(body_bytes);
115        true
116    }
117
118    pub fn is_body_complete(&self) -> bool {
119        self.complete && !self.truncated
120    }
121
122    pub fn set_body_complete(&mut self) {
123        self.complete = true;
124    }
125}
126
127#[derive(Debug, Clone)]
128pub enum InputBody {
129    NoBody,
130    Bytes(Vec<Bytes>),
131    // TODO: stream
132}
133
134impl InputBody {
135    pub(crate) fn into_reader(self) -> InputBodyReader {
136        InputBodyReader(match self {
137            InputBody::NoBody => vec![].into_iter(),
138            InputBody::Bytes(v) => v.into_iter(),
139        })
140    }
141
142    pub fn is_body_empty(&self) -> bool {
143        match self {
144            InputBody::NoBody => true,
145            InputBody::Bytes(v) => v.is_empty(),
146        }
147    }
148}
149
150impl std::convert::From<SavedBody> for InputBody {
151    fn from(body: SavedBody) -> Self {
152        if body.body.is_empty() {
153            InputBody::NoBody
154        } else {
155            InputBody::Bytes(body.body)
156        }
157    }
158}
159
160pub async fn pipe_subrequest<F>(
161    session: &mut Session,
162    mut subrequest: PreparedSubrequest,
163    subrequest_handle: SubrequestHandle,
164    mut task_filter: F,
165    input_body: InputBodyType,
166) -> std::result::Result<PipeSubrequestState, PipeSubrequestError>
167where
168    F: FnMut(HttpTask) -> Result<Option<HttpTask>>,
169{
170    let (maybe_preset_body, saved_body) = match input_body {
171        InputBodyType::Preset(body) => (Some(body), None),
172        InputBodyType::SaveBody(limit) => (None, Some(SavedBody::new(limit))),
173    };
174    let use_preset_body = maybe_preset_body.is_some();
175
176    let mut response_state = ResponseStateMachine::new();
177    let (no_body_input, mut maybe_preset_reader) = if use_preset_body {
178        let preset_body = maybe_preset_body.expect("checked above");
179        (preset_body.is_body_empty(), Some(preset_body.into_reader()))
180    } else {
181        (session.as_mut().is_body_done(), None)
182    };
183    let mut downstream_state = DownstreamStateMachine::new(no_body_input);
184
185    let mut state = PipeSubrequestState::new();
186    state.saved_body = saved_body;
187
188    // Have the subrequest remove all body-related headers if no body will be sent
189    // TODO: we could also await the join handle, but subrequest may be running logging phase
190    // also the full run() may also await cache fill if downstream fails
191    let _join_handle = tokio::spawn(async move {
192        if no_body_input {
193            subrequest
194                .session_mut()
195                .as_subrequest_mut()
196                .expect("PreparedSubrequest must be subrequest")
197                .clear_request_body_headers();
198        }
199        subrequest.run().await
200    });
201    let tx = subrequest_handle.tx;
202    let mut rx = subrequest_handle.rx;
203
204    let mut wants_body = false;
205    let mut wants_body_rx_err = false;
206    let mut wants_body_rx = subrequest_handle.subreq_wants_body;
207
208    let mut proxy_error_rx_err = false;
209    let mut proxy_error_rx = subrequest_handle.subreq_proxy_error;
210
211    // Note: "upstream" here refers to subrequest session tasks,
212    // downstream refers to main session
213    while !downstream_state.is_done() || !response_state.is_done() {
214        let send_permit = tx
215            .try_reserve()
216            .or_err(InternalError, "try_reserve() body pipe for subrequest");
217
218        tokio::select! {
219            task = rx.recv(), if !response_state.upstream_done() => {
220                debug!("upstream event: {:?}", task);
221                if let Some(t) = task {
222                    // pull as many tasks as we can
223                    const TASK_BUFFER_SIZE: usize = 4;
224                    let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
225                    let task = map_pipe_err(task_filter(t), false, &state)?;
226                    if let Some(filtered) = task {
227                        tasks.push(filtered);
228                    }
229                    // tokio::task::unconstrained because now_or_never may yield None when the future is ready
230                    while let Some(maybe_task) = tokio::task::unconstrained(rx.recv()).now_or_never() {
231                        if let Some(t) = maybe_task {
232                            let task = map_pipe_err(task_filter(t), false, &state)?;
233                            if let Some(filtered) = task {
234                                tasks.push(filtered);
235                            }
236                        } else {
237                            break
238                        }
239                    }
240                    // FIXME: if one of these tasks is Failed(e), the session will return that
241                    // error; in this case, the error is actually from the subreq
242                    let response_done = map_pipe_err(session.write_response_tasks(tasks).await, false, &state)?;
243
244                    // NOTE: technically it is the downstream whose response state has finished here
245                    // we consider the subrequest's work done however
246                    response_state.maybe_set_upstream_done(response_done);
247                    // unsuccessful upgrade response may force the request done
248                    // (can only happen with a real session, TODO to allow with preset body)
249                    downstream_state.maybe_finished(!use_preset_body && session.is_body_done());
250                } else {
251                    // quite possible that the subrequest may be finished, though the main session
252                    // is not - we still must exit in this case
253                    debug!("empty upstream event");
254                    response_state.maybe_set_upstream_done(true);
255                }
256            },
257
258            res = &mut wants_body_rx, if !wants_body && !wants_body_rx_err => {
259                // subrequest may need time before it needs body, or it may not actually require it
260                // TODO: tx send permit may not be necessary if no oneshot exists
261                if res.is_err() {
262                    wants_body_rx_err = true;
263                } else {
264                    wants_body = true;
265                }
266            }
267
268            res = &mut proxy_error_rx, if !proxy_error_rx_err => {
269                if let Ok(e) = res {
270                    // propagate proxy error to caller
271                    return Err(PipeSubrequestError::new(e, true, state));
272                } else {
273                    // subrequest dropped, let select loop finish
274                    proxy_error_rx_err = true;
275                }
276            }
277
278            _ = tx.reserve(), if downstream_state.is_reading() && send_permit.is_err() => {
279                // If tx is closed, the upstream has already finished its job.
280                downstream_state.maybe_finished(tx.is_closed());
281                debug!("waiting for permit {send_permit:?}, upstream closed {}", tx.is_closed());
282                /* No permit, wait on more capacity to avoid starving.
283                 * Otherwise this select only blocks on rx, which might send no data
284                 * before the entire body is uploaded.
285                 * once more capacity arrives we just loop back
286                 */
287            },
288
289            body = session.downstream_session.read_body_or_idle(downstream_state.is_done()),
290                if wants_body && !use_preset_body && downstream_state.can_poll() && send_permit.is_ok() => {
291                // this is the first subrequest
292                // send the body
293                debug!("downstream event: main body for subrequest");
294                let body = map_pipe_err(body.map_err(|e| e.into_down()), false, &state)?;
295
296                // If the request is websocket, `None` body means the request is closed.
297                // Set the response to be done as well so that the request completes normally.
298                if body.is_none() && session.is_upgrade_req() {
299                    response_state.maybe_set_upstream_done(true);
300                }
301
302                let is_body_done = session.is_body_done();
303                let request_done = map_pipe_err(send_body_to_pipe(
304                    session,
305                    body,
306                    is_body_done,
307                    state.saved_body.as_mut(),
308                    send_permit.expect("checked is_ok()"),
309                )
310                .await, false, &state)?;
311
312                downstream_state.maybe_finished(request_done);
313
314            },
315
316            // lazily evaluated async block allows us to expect() inside the select! branch
317            body = async { maybe_preset_reader.as_mut().expect("preset body set").read_body() },
318                if wants_body && use_preset_body && !downstream_state.is_done() && downstream_state.can_poll() && send_permit.is_ok() => {
319                debug!("downstream event: preset body for subrequest");
320
321                // TODO: WebSocket handling to set upstream done?
322
323                // preset None body indicates we are done
324                let is_body_done = body.is_none();
325                // Don't run downstream modules on preset input body
326                let request_done = map_pipe_err(do_send_body_to_pipe(
327                    body,
328                    is_body_done,
329                    None,
330                    send_permit.expect("checked is_ok()"),
331                ), false, &state)?;
332                downstream_state.maybe_finished(request_done);
333
334            },
335
336            else => break,
337        }
338    }
339    Ok(state)
340}
341
342// Mostly the same as proxy_common, but does not run proxy request_body_filter
343async fn send_body_to_pipe(
344    session: &mut Session,
345    mut data: Option<Bytes>,
346    end_of_body: bool,
347    saved_body: Option<&mut SavedBody>,
348    tx: mpsc::Permit<'_, HttpTask>,
349) -> Result<bool> {
350    // None: end of body
351    // this var is to signal if downstream finish sending the body, which shouldn't be
352    // affected by the request_body_filter
353    let end_of_body = end_of_body || data.is_none();
354
355    session
356        .downstream_modules_ctx
357        .request_body_filter(&mut data, end_of_body)
358        .await?;
359
360    do_send_body_to_pipe(data, end_of_body, saved_body, tx)
361}
362
363fn do_send_body_to_pipe(
364    data: Option<Bytes>,
365    end_of_body: bool,
366    mut saved_body: Option<&mut SavedBody>,
367    tx: mpsc::Permit<'_, HttpTask>,
368) -> Result<bool> {
369    // the flag to signal to upstream
370    let upstream_end_of_body = end_of_body || data.is_none();
371
372    /* It is normal to get 0 bytes because of multi-chunk or request_body_filter decides not to
373     * output anything yet.
374     * Don't write 0 bytes to the network since it will be
375     * treated as the terminating chunk */
376    if !upstream_end_of_body && data.as_ref().is_some_and(|d| d.is_empty()) {
377        return Ok(false);
378    }
379
380    debug!(
381        "Read {} bytes body from downstream",
382        data.as_ref().map_or(-1, |d| d.len() as isize)
383    );
384
385    if let Some(capture) = saved_body.as_mut() {
386        if capture.is_body_complete() {
387            warn!("subrequest trying to save body after body is complete");
388        } else if let Some(d) = data.as_ref() {
389            capture.save_body_bytes(d.clone());
390        }
391        if end_of_body {
392            capture.set_body_complete();
393        }
394    }
395
396    tx.send(HttpTask::Body(data, upstream_end_of_body));
397
398    Ok(end_of_body)
399}