1use super::custom::server::Session as SessionCustom;
18use super::error_resp;
19use super::subrequest::server::HttpSession as SessionSubrequest;
20use super::v1::server::HttpSession as SessionV1;
21use super::v2::server::HttpSession as SessionV2;
22use super::HttpTask;
23use crate::custom_session;
24use crate::protocols::{Digest, SocketAddr, Stream};
25use bytes::Bytes;
26use http::HeaderValue;
27use http::{header::AsHeaderName, HeaderMap};
28use pingora_error::Result;
29use pingora_http::{RequestHeader, ResponseHeader};
30use std::time::Duration;
31
32pub enum Session {
34 H1(SessionV1),
35 H2(SessionV2),
36 Subrequest(SessionSubrequest),
37 Custom(Box<dyn SessionCustom>),
38}
39
40impl Session {
41 pub fn new_http1(stream: Stream) -> Self {
43 Self::H1(SessionV1::new(stream))
44 }
45
46 pub fn new_http2(session: SessionV2) -> Self {
48 Self::H2(session)
49 }
50
51 pub fn new_subrequest(session: SessionSubrequest) -> Self {
53 Self::Subrequest(session)
54 }
55
56 pub fn new_custom(session: Box<dyn SessionCustom>) -> Self {
58 Self::Custom(session)
59 }
60
61 pub fn is_http2(&self) -> bool {
63 matches!(self, Self::H2(_))
64 }
65
66 pub fn is_subrequest(&self) -> bool {
68 matches!(self, Self::Subrequest(_))
69 }
70
71 pub fn is_custom(&self) -> bool {
73 matches!(self, Self::Custom(_))
74 }
75
76 pub async fn read_request(&mut self) -> Result<bool> {
82 match self {
83 Self::H1(s) => {
84 let read = s.read_request().await?;
85 Ok(read.is_some())
86 }
87 Self::H2(_) => Ok(true),
89 Self::Subrequest(s) => {
90 let read = s.read_request().await?;
91 Ok(read.is_some())
92 }
93 Self::Custom(_) => Ok(true),
94 }
95 }
96
97 pub fn req_header(&self) -> &RequestHeader {
101 match self {
102 Self::H1(s) => s.req_header(),
103 Self::H2(s) => s.req_header(),
104 Self::Subrequest(s) => s.req_header(),
105 Self::Custom(s) => s.req_header(),
106 }
107 }
108
109 pub fn req_header_mut(&mut self) -> &mut RequestHeader {
113 match self {
114 Self::H1(s) => s.req_header_mut(),
115 Self::H2(s) => s.req_header_mut(),
116 Self::Subrequest(s) => s.req_header_mut(),
117 Self::Custom(s) => s.req_header_mut(),
118 }
119 }
120
121 pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
126 self.req_header().headers.get(key)
127 }
128
129 pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
132 self.get_header(key).map_or(b"", |v| v.as_bytes())
133 }
134
135 pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
137 match self {
138 Self::H1(s) => s.read_body_bytes().await,
139 Self::H2(s) => s.read_body_bytes().await,
140 Self::Subrequest(s) => s.read_body_bytes().await,
141 Self::Custom(s) => s.read_body_bytes().await,
142 }
143 }
144
145 pub async fn drain_request_body(&mut self) -> Result<()> {
150 match self {
151 Self::H1(s) => s.drain_request_body().await,
152 Self::H2(s) => s.drain_request_body().await,
153 Self::Subrequest(s) => s.drain_request_body().await,
154 Self::Custom(s) => s.drain_request_body().await,
155 }
156 }
157
158 pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
162 match self {
163 Self::H1(s) => {
164 s.write_response_header(resp).await?;
165 Ok(())
166 }
167 Self::H2(s) => s.write_response_header(resp, false),
168 Self::Subrequest(s) => {
169 s.write_response_header(resp).await?;
170 Ok(())
171 }
172 Self::Custom(s) => s.write_response_header(resp, false).await,
173 }
174 }
175
176 pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
178 match self {
179 Self::H1(s) => {
180 s.write_response_header_ref(resp).await?;
181 Ok(())
182 }
183 Self::H2(s) => s.write_response_header_ref(resp, false),
184 Self::Subrequest(s) => {
185 s.write_response_header_ref(resp).await?;
186 Ok(())
187 }
188 Self::Custom(s) => s.write_response_header_ref(resp, false).await,
189 }
190 }
191
192 pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
194 if data.is_empty() && !end {
195 return Ok(());
199 }
200 match self {
201 Self::H1(s) => {
202 if !data.is_empty() {
203 s.write_body(&data).await?;
204 }
205 if end {
206 s.finish_body().await?;
207 }
208 Ok(())
209 }
210 Self::H2(s) => s.write_body(data, end).await,
211 Self::Subrequest(s) => {
212 s.write_body(data).await?;
213 Ok(())
214 }
215 Self::Custom(s) => s.write_body(data, end).await,
216 }
217 }
218
219 pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
221 match self {
222 Self::H1(_) => Ok(()), Self::H2(s) => s.write_trailers(trailers),
224 Self::Subrequest(s) => s.write_trailers(Some(Box::new(trailers))).await,
225 Self::Custom(s) => s.write_trailers(trailers).await,
226 }
227 }
228
229 pub async fn finish(self) -> Result<Option<Stream>> {
234 match self {
235 Self::H1(mut s) => {
236 s.finish_body().await?;
238 s.reuse().await
239 }
240 Self::H2(mut s) => {
241 s.finish()?;
242 Ok(None)
243 }
244 Self::Subrequest(mut s) => {
245 s.finish().await?;
246 Ok(None)
247 }
248 Self::Custom(mut s) => {
249 s.finish().await?;
250 Ok(None)
251 }
252 }
253 }
254
255 pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
256 match self {
257 Self::H1(s) => s.response_duplex_vec(tasks).await,
258 Self::H2(s) => s.response_duplex_vec(tasks).await,
259 Self::Subrequest(s) => s.response_duplex_vec(tasks).await,
260 Self::Custom(s) => s.response_duplex_vec(tasks).await,
261 }
262 }
263
264 pub fn set_keepalive(&mut self, duration: Option<u64>) {
267 match self {
268 Self::H1(s) => s.set_server_keepalive(duration),
269 Self::H2(_) => {}
270 Self::Subrequest(_) => {}
271 Self::Custom(_) => {}
272 }
273 }
274
275 pub fn get_keepalive(&self) -> Option<u64> {
278 match self {
279 Self::H1(s) => s.get_keepalive_timeout(),
280 Self::H2(_) => None,
281 Self::Subrequest(_) => None,
282 Self::Custom(_) => None,
283 }
284 }
285
286 pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
291 match self {
292 Self::H1(s) => s.set_read_timeout(timeout),
293 Self::H2(_) => {}
294 Self::Subrequest(s) => s.set_read_timeout(timeout),
295 Self::Custom(c) => c.set_read_timeout(timeout),
296 }
297 }
298
299 pub fn get_read_timeout(&self) -> Option<Duration> {
301 match self {
302 Self::H1(s) => s.get_read_timeout(),
303 Self::H2(_) => None,
304 Self::Subrequest(s) => s.get_read_timeout(),
305 Self::Custom(s) => s.get_read_timeout(),
306 }
307 }
308
309 pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
313 match self {
314 Self::H1(s) => s.set_write_timeout(timeout),
315 Self::H2(s) => s.set_write_timeout(timeout),
316 Self::Subrequest(s) => s.set_write_timeout(timeout),
317 Self::Custom(c) => c.set_write_timeout(timeout),
318 }
319 }
320
321 pub fn get_write_timeout(&self) -> Option<Duration> {
323 match self {
324 Self::H1(s) => s.get_write_timeout(),
325 Self::H2(s) => s.get_write_timeout(),
326 Self::Subrequest(s) => s.get_write_timeout(),
327 Self::Custom(s) => s.get_write_timeout(),
328 }
329 }
330
331 pub fn set_total_drain_timeout(&mut self, timeout: Option<Duration>) {
338 match self {
339 Self::H1(s) => s.set_total_drain_timeout(timeout),
340 Self::H2(s) => s.set_total_drain_timeout(timeout),
341 Self::Subrequest(s) => s.set_total_drain_timeout(timeout),
342 Self::Custom(c) => c.set_total_drain_timeout(timeout),
343 }
344 }
345
346 pub fn get_total_drain_timeout(&self) -> Option<Duration> {
348 match self {
349 Self::H1(s) => s.get_total_drain_timeout(),
350 Self::H2(s) => s.get_total_drain_timeout(),
351 Self::Subrequest(s) => s.get_total_drain_timeout(),
352 Self::Custom(s) => s.get_total_drain_timeout(),
353 }
354 }
355
356 pub fn set_min_send_rate(&mut self, rate: Option<usize>) {
367 match self {
368 Self::H1(s) => s.set_min_send_rate(rate),
369 Self::H2(_) => {}
370 Self::Subrequest(_) => {}
371 Self::Custom(_) => {}
372 }
373 }
374
375 pub fn set_ignore_info_resp(&mut self, ignore: bool) {
384 match self {
385 Self::H1(s) => s.set_ignore_info_resp(ignore),
386 Self::H2(_) => {} Self::Subrequest(_) => {}
388 Self::Custom(_) => {} }
390 }
391
392 pub fn set_close_on_response_before_downstream_finish(&mut self, close: bool) {
397 match self {
398 Self::H1(s) => s.set_close_on_response_before_downstream_finish(close),
399 Self::H2(_) => {} Self::Subrequest(_) => {} Self::Custom(_) => {} }
403 }
404
405 pub fn request_summary(&self) -> String {
408 match self {
409 Self::H1(s) => s.request_summary(),
410 Self::H2(s) => s.request_summary(),
411 Self::Subrequest(s) => s.request_summary(),
412 Self::Custom(s) => s.request_summary(),
413 }
414 }
415
416 pub fn response_written(&self) -> Option<&ResponseHeader> {
419 match self {
420 Self::H1(s) => s.response_written(),
421 Self::H2(s) => s.response_written(),
422 Self::Subrequest(s) => s.response_written(),
423 Self::Custom(s) => s.response_written(),
424 }
425 }
426
427 pub async fn shutdown(&mut self) {
432 match self {
433 Self::H1(s) => s.shutdown().await,
434 Self::H2(s) => s.shutdown(),
435 Self::Subrequest(s) => s.shutdown(),
436 Self::Custom(s) => s.shutdown(1, "shutdown").await,
437 }
438 }
439
440 pub fn to_h1_raw(&self) -> Bytes {
441 match self {
442 Self::H1(s) => s.get_headers_raw_bytes(),
443 Self::H2(s) => s.pseudo_raw_h1_request_header(),
444 Self::Subrequest(s) => s.get_headers_raw_bytes(),
445 Self::Custom(c) => c.pseudo_raw_h1_request_header(),
446 }
447 }
448
449 pub fn is_body_done(&mut self) -> bool {
451 match self {
452 Self::H1(s) => s.is_body_done(),
453 Self::H2(s) => s.is_body_done(),
454 Self::Subrequest(s) => s.is_body_done(),
455 Self::Custom(s) => s.is_body_done(),
456 }
457 }
458
459 pub async fn finish_body(&mut self) -> Result<()> {
465 match self {
466 Self::H1(s) => s.finish_body().await.map(|_| ()),
467 Self::H2(s) => s.finish(),
468 Self::Subrequest(s) => s.finish().await.map(|_| ()),
469 Self::Custom(s) => s.finish().await,
470 }
471 }
472
473 pub fn generate_error(error: u16) -> ResponseHeader {
474 match error {
475 502 => error_resp::HTTP_502_RESPONSE.clone(),
477 400 => error_resp::HTTP_400_RESPONSE.clone(),
478 _ => error_resp::gen_error_response(error),
479 }
480 }
481
482 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
484 self.respond_error_with_body(error, Bytes::default()).await
485 }
486
487 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
489 let mut resp = Self::generate_error(error);
490 if !body.is_empty() {
491 resp.set_content_length(body.len())?
493 }
494 self.write_error_response(resp, body).await
495 }
496
497 pub async fn write_error_response(&mut self, resp: ResponseHeader, body: Bytes) -> Result<()> {
499 self.set_keepalive(None);
506
507 if let Some(resp_written) = self.response_written().as_ref() {
511 if !resp_written.status.is_informational() || resp_written.status == 101 {
512 return Ok(());
513 }
514 }
515
516 self.write_response_header(Box::new(resp)).await?;
517
518 if !body.is_empty() {
519 self.write_response_body(body, true).await?;
520 } else {
521 self.finish_body().await?;
522 }
523
524 custom_session!(self.finish_custom().await?);
525
526 Ok(())
527 }
528
529 pub fn is_body_empty(&mut self) -> bool {
531 match self {
532 Self::H1(s) => s.is_body_empty(),
533 Self::H2(s) => s.is_body_empty(),
534 Self::Subrequest(s) => s.is_body_empty(),
535 Self::Custom(s) => s.is_body_empty(),
536 }
537 }
538
539 pub fn retry_buffer_truncated(&self) -> bool {
540 match self {
541 Self::H1(s) => s.retry_buffer_truncated(),
542 Self::H2(s) => s.retry_buffer_truncated(),
543 Self::Subrequest(s) => s.retry_buffer_truncated(),
544 Self::Custom(s) => s.retry_buffer_truncated(),
545 }
546 }
547
548 pub fn enable_retry_buffering(&mut self) {
549 match self {
550 Self::H1(s) => s.enable_retry_buffering(),
551 Self::H2(s) => s.enable_retry_buffering(),
552 Self::Subrequest(s) => s.enable_retry_buffering(),
553 Self::Custom(s) => s.enable_retry_buffering(),
554 }
555 }
556
557 pub fn get_retry_buffer(&self) -> Option<Bytes> {
558 match self {
559 Self::H1(s) => s.get_retry_buffer(),
560 Self::H2(s) => s.get_retry_buffer(),
561 Self::Subrequest(s) => s.get_retry_buffer(),
562 Self::Custom(s) => s.get_retry_buffer(),
563 }
564 }
565
566 pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
569 match self {
570 Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
571 Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
572 Self::Subrequest(s) => s.read_body_or_idle(no_body_expected).await,
573 Self::Custom(s) => s.read_body_or_idle(no_body_expected).await,
574 }
575 }
576
577 pub fn as_http1(&self) -> Option<&SessionV1> {
578 match self {
579 Self::H1(s) => Some(s),
580 Self::H2(_) => None,
581 Self::Subrequest(_) => None,
582 Self::Custom(_) => None,
583 }
584 }
585
586 pub fn as_http2(&self) -> Option<&SessionV2> {
587 match self {
588 Self::H1(_) => None,
589 Self::H2(s) => Some(s),
590 Self::Subrequest(_) => None,
591 Self::Custom(_) => None,
592 }
593 }
594
595 pub fn as_subrequest(&self) -> Option<&SessionSubrequest> {
596 match self {
597 Self::H1(_) => None,
598 Self::H2(_) => None,
599 Self::Subrequest(s) => Some(s),
600 Self::Custom(_) => None,
601 }
602 }
603
604 pub fn as_subrequest_mut(&mut self) -> Option<&mut SessionSubrequest> {
605 match self {
606 Self::H1(_) => None,
607 Self::H2(_) => None,
608 Self::Subrequest(s) => Some(s),
609 Self::Custom(_) => None,
610 }
611 }
612
613 pub fn as_custom(&self) -> Option<&dyn SessionCustom> {
614 match self {
615 Self::H1(_) => None,
616 Self::H2(_) => None,
617 Self::Subrequest(_) => None,
618 Self::Custom(c) => Some(c.as_ref()),
619 }
620 }
621
622 pub fn as_custom_mut(&mut self) -> Option<&mut Box<dyn SessionCustom>> {
623 match self {
624 Self::H1(_) => None,
625 Self::H2(_) => None,
626 Self::Subrequest(_) => None,
627 Self::Custom(c) => Some(c),
628 }
629 }
630
631 pub async fn write_continue_response(&mut self) -> Result<()> {
633 match self {
634 Self::H1(s) => s.write_continue_response().await,
635 Self::H2(s) => s.write_response_header(
636 Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
637 false,
638 ),
639 Self::Subrequest(s) => s.write_continue_response().await,
640 Self::Custom(s) => {
642 s.write_response_header(
643 Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
644 false,
645 )
646 .await
647 }
648 }
649 }
650
651 pub fn is_upgrade_req(&self) -> bool {
653 match self {
654 Self::H1(s) => s.is_upgrade_req(),
655 Self::H2(_) => false,
656 Self::Subrequest(s) => s.is_upgrade_req(),
657 Self::Custom(_) => false,
658 }
659 }
660
661 pub fn body_bytes_sent(&self) -> usize {
663 match self {
664 Self::H1(s) => s.body_bytes_sent(),
665 Self::H2(s) => s.body_bytes_sent(),
666 Self::Subrequest(s) => s.body_bytes_sent(),
667 Self::Custom(s) => s.body_bytes_sent(),
668 }
669 }
670
671 pub fn body_bytes_read(&self) -> usize {
673 match self {
674 Self::H1(s) => s.body_bytes_read(),
675 Self::H2(s) => s.body_bytes_read(),
676 Self::Subrequest(s) => s.body_bytes_read(),
677 Self::Custom(s) => s.body_bytes_read(),
678 }
679 }
680
681 pub fn digest(&self) -> Option<&Digest> {
683 match self {
684 Self::H1(s) => Some(s.digest()),
685 Self::H2(s) => s.digest(),
686 Self::Subrequest(s) => s.digest(),
687 Self::Custom(s) => s.digest(),
688 }
689 }
690
691 pub fn digest_mut(&mut self) -> Option<&mut Digest> {
695 match self {
696 Self::H1(s) => Some(s.digest_mut()),
697 Self::H2(s) => s.digest_mut(),
698 Self::Subrequest(s) => s.digest_mut(),
699 Self::Custom(s) => s.digest_mut(),
700 }
701 }
702
703 pub fn client_addr(&self) -> Option<&SocketAddr> {
705 match self {
706 Self::H1(s) => s.client_addr(),
707 Self::H2(s) => s.client_addr(),
708 Self::Subrequest(s) => s.client_addr(),
709 Self::Custom(s) => s.client_addr(),
710 }
711 }
712
713 pub fn server_addr(&self) -> Option<&SocketAddr> {
715 match self {
716 Self::H1(s) => s.server_addr(),
717 Self::H2(s) => s.server_addr(),
718 Self::Subrequest(s) => s.server_addr(),
719 Self::Custom(s) => s.server_addr(),
720 }
721 }
722
723 pub fn stream(&self) -> Option<&Stream> {
726 match self {
727 Self::H1(s) => Some(s.stream()),
728 Self::H2(_) => None,
729 Self::Subrequest(_) => None,
730 Self::Custom(_) => None,
731 }
732 }
733}