pingora_proxy/subrequest/
pipe.rs1use crate::proxy_common::{DownstreamStateMachine, ResponseStateMachine};
29use crate::subrequest::*;
30use crate::{PreparedSubrequest, Session};
31use bytes::Bytes;
32use futures::FutureExt;
33use log::{debug, warn};
34use pingora_core::protocols::http::{subrequest::server::SubrequestHandle, HttpTask};
35use pingora_error::{Error, ErrorType::*, OrErr, Result};
36use tokio::sync::mpsc;
37
38pub enum InputBodyType {
39 Preset(InputBody),
41 SaveBody(usize),
43}
44
45#[derive(Clone)]
47pub struct PipeSubrequestState {
48 pub saved_body: Option<SavedBody>,
50}
51
52impl PipeSubrequestState {
53 fn new() -> PipeSubrequestState {
54 PipeSubrequestState { saved_body: None }
55 }
56}
57
58pub struct PipeSubrequestError {
59 pub state: PipeSubrequestState,
60 pub from_subreq: bool,
63 pub error: Box<Error>,
64}
65impl PipeSubrequestError {
66 pub fn new(
67 error: impl Into<Box<Error>>,
68 from_subreq: bool,
69 state: PipeSubrequestState,
70 ) -> Self {
71 PipeSubrequestError {
72 error: error.into(),
73 from_subreq,
74 state,
75 }
76 }
77}
78
79fn map_pipe_err<T, E: Into<Box<Error>>>(
80 result: Result<T, E>,
81 from_subreq: bool,
82 state: &PipeSubrequestState,
83) -> Result<T, PipeSubrequestError> {
84 result.map_err(|e| PipeSubrequestError::new(e, from_subreq, state.clone()))
85}
86
87#[derive(Debug, Clone)]
88pub struct SavedBody {
89 body: Vec<Bytes>,
90 complete: bool,
91 truncated: bool,
92 length: usize,
93 max_length: usize,
94}
95
96impl SavedBody {
97 pub fn new(max_length: usize) -> Self {
98 SavedBody {
99 body: vec![],
100 complete: false,
101 truncated: false,
102 length: 0,
103 max_length,
104 }
105 }
106
107 pub fn save_body_bytes(&mut self, body_bytes: Bytes) -> bool {
108 let len = body_bytes.len();
109 if self.length + len > self.max_length {
110 self.truncated = true;
111 return false;
112 }
113 self.length += len;
114 self.body.push(body_bytes);
115 true
116 }
117
118 pub fn is_body_complete(&self) -> bool {
119 self.complete && !self.truncated
120 }
121
122 pub fn set_body_complete(&mut self) {
123 self.complete = true;
124 }
125}
126
127#[derive(Debug, Clone)]
128pub enum InputBody {
129 NoBody,
130 Bytes(Vec<Bytes>),
131 }
133
134impl InputBody {
135 pub(crate) fn into_reader(self) -> InputBodyReader {
136 InputBodyReader(match self {
137 InputBody::NoBody => vec![].into_iter(),
138 InputBody::Bytes(v) => v.into_iter(),
139 })
140 }
141
142 pub fn is_body_empty(&self) -> bool {
143 match self {
144 InputBody::NoBody => true,
145 InputBody::Bytes(v) => v.is_empty(),
146 }
147 }
148}
149
150impl std::convert::From<SavedBody> for InputBody {
151 fn from(body: SavedBody) -> Self {
152 if body.body.is_empty() {
153 InputBody::NoBody
154 } else {
155 InputBody::Bytes(body.body)
156 }
157 }
158}
159
160pub async fn pipe_subrequest<F>(
161 session: &mut Session,
162 mut subrequest: PreparedSubrequest,
163 subrequest_handle: SubrequestHandle,
164 mut task_filter: F,
165 input_body: InputBodyType,
166) -> std::result::Result<PipeSubrequestState, PipeSubrequestError>
167where
168 F: FnMut(HttpTask) -> Result<Option<HttpTask>>,
169{
170 let (maybe_preset_body, saved_body) = match input_body {
171 InputBodyType::Preset(body) => (Some(body), None),
172 InputBodyType::SaveBody(limit) => (None, Some(SavedBody::new(limit))),
173 };
174 let use_preset_body = maybe_preset_body.is_some();
175
176 let mut response_state = ResponseStateMachine::new();
177 let (no_body_input, mut maybe_preset_reader) = if use_preset_body {
178 let preset_body = maybe_preset_body.expect("checked above");
179 (preset_body.is_body_empty(), Some(preset_body.into_reader()))
180 } else {
181 (session.as_mut().is_body_done(), None)
182 };
183 let mut downstream_state = DownstreamStateMachine::new(no_body_input);
184
185 let mut state = PipeSubrequestState::new();
186 state.saved_body = saved_body;
187
188 let _join_handle = tokio::spawn(async move {
192 if no_body_input {
193 subrequest
194 .session_mut()
195 .as_subrequest_mut()
196 .expect("PreparedSubrequest must be subrequest")
197 .clear_request_body_headers();
198 }
199 subrequest.run().await
200 });
201 let tx = subrequest_handle.tx;
202 let mut rx = subrequest_handle.rx;
203
204 let mut wants_body = false;
205 let mut wants_body_rx_err = false;
206 let mut wants_body_rx = subrequest_handle.subreq_wants_body;
207
208 let mut proxy_error_rx_err = false;
209 let mut proxy_error_rx = subrequest_handle.subreq_proxy_error;
210
211 while !downstream_state.is_done() || !response_state.is_done() {
214 let send_permit = tx
215 .try_reserve()
216 .or_err(InternalError, "try_reserve() body pipe for subrequest");
217
218 tokio::select! {
219 task = rx.recv(), if !response_state.upstream_done() => {
220 debug!("upstream event: {:?}", task);
221 if let Some(t) = task {
222 const TASK_BUFFER_SIZE: usize = 4;
224 let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
225 let task = map_pipe_err(task_filter(t), false, &state)?;
226 if let Some(filtered) = task {
227 tasks.push(filtered);
228 }
229 while let Some(maybe_task) = tokio::task::unconstrained(rx.recv()).now_or_never() {
231 if let Some(t) = maybe_task {
232 let task = map_pipe_err(task_filter(t), false, &state)?;
233 if let Some(filtered) = task {
234 tasks.push(filtered);
235 }
236 } else {
237 break
238 }
239 }
240 let response_done = map_pipe_err(session.write_response_tasks(tasks).await, false, &state)?;
243
244 response_state.maybe_set_upstream_done(response_done);
247 downstream_state.maybe_finished(!use_preset_body && session.is_body_done());
250 } else {
251 debug!("empty upstream event");
254 response_state.maybe_set_upstream_done(true);
255 }
256 },
257
258 res = &mut wants_body_rx, if !wants_body && !wants_body_rx_err => {
259 if res.is_err() {
262 wants_body_rx_err = true;
263 } else {
264 wants_body = true;
265 }
266 }
267
268 res = &mut proxy_error_rx, if !proxy_error_rx_err => {
269 if let Ok(e) = res {
270 return Err(PipeSubrequestError::new(e, true, state));
272 } else {
273 proxy_error_rx_err = true;
275 }
276 }
277
278 _ = tx.reserve(), if downstream_state.is_reading() && send_permit.is_err() => {
279 downstream_state.maybe_finished(tx.is_closed());
281 debug!("waiting for permit {send_permit:?}, upstream closed {}", tx.is_closed());
282 },
288
289 body = session.downstream_session.read_body_or_idle(downstream_state.is_done()),
290 if wants_body && !use_preset_body && downstream_state.can_poll() && send_permit.is_ok() => {
291 debug!("downstream event: main body for subrequest");
294 let body = map_pipe_err(body.map_err(|e| e.into_down()), false, &state)?;
295
296 if body.is_none() && session.is_upgrade_req() {
299 response_state.maybe_set_upstream_done(true);
300 }
301
302 let is_body_done = session.is_body_done();
303 let request_done = map_pipe_err(send_body_to_pipe(
304 session,
305 body,
306 is_body_done,
307 state.saved_body.as_mut(),
308 send_permit.expect("checked is_ok()"),
309 )
310 .await, false, &state)?;
311
312 downstream_state.maybe_finished(request_done);
313
314 },
315
316 body = async { maybe_preset_reader.as_mut().expect("preset body set").read_body() },
318 if wants_body && use_preset_body && !downstream_state.is_done() && downstream_state.can_poll() && send_permit.is_ok() => {
319 debug!("downstream event: preset body for subrequest");
320
321 let is_body_done = body.is_none();
325 let request_done = map_pipe_err(do_send_body_to_pipe(
327 body,
328 is_body_done,
329 None,
330 send_permit.expect("checked is_ok()"),
331 ), false, &state)?;
332 downstream_state.maybe_finished(request_done);
333
334 },
335
336 else => break,
337 }
338 }
339 Ok(state)
340}
341
342async fn send_body_to_pipe(
344 session: &mut Session,
345 mut data: Option<Bytes>,
346 end_of_body: bool,
347 saved_body: Option<&mut SavedBody>,
348 tx: mpsc::Permit<'_, HttpTask>,
349) -> Result<bool> {
350 let end_of_body = end_of_body || data.is_none();
354
355 session
356 .downstream_modules_ctx
357 .request_body_filter(&mut data, end_of_body)
358 .await?;
359
360 do_send_body_to_pipe(data, end_of_body, saved_body, tx)
361}
362
363fn do_send_body_to_pipe(
364 data: Option<Bytes>,
365 end_of_body: bool,
366 mut saved_body: Option<&mut SavedBody>,
367 tx: mpsc::Permit<'_, HttpTask>,
368) -> Result<bool> {
369 let upstream_end_of_body = end_of_body || data.is_none();
371
372 if !upstream_end_of_body && data.as_ref().is_some_and(|d| d.is_empty()) {
377 return Ok(false);
378 }
379
380 debug!(
381 "Read {} bytes body from downstream",
382 data.as_ref().map_or(-1, |d| d.len() as isize)
383 );
384
385 if let Some(capture) = saved_body.as_mut() {
386 if capture.is_body_complete() {
387 warn!("subrequest trying to save body after body is complete");
388 } else if let Some(d) = data.as_ref() {
389 capture.save_body_bytes(d.clone());
390 }
391 if end_of_body {
392 capture.set_body_complete();
393 }
394 }
395
396 tx.send(HttpTask::Body(data, upstream_end_of_body));
397
398 Ok(end_of_body)
399}