1use super::custom::server::Session as SessionCustom;
18use super::error_resp;
19use super::subrequest::server::HttpSession as SessionSubrequest;
20use super::v1::server::HttpSession as SessionV1;
21use super::v2::server::HttpSession as SessionV2;
22use super::HttpTask;
23use crate::custom_session;
24use crate::protocols::{Digest, SocketAddr, Stream};
25use bytes::Bytes;
26use http::HeaderValue;
27use http::{header::AsHeaderName, HeaderMap};
28use pingora_error::{Error, Result};
29use pingora_http::{RequestHeader, ResponseHeader};
30use std::time::Duration;
31
32pub enum Session {
34 H1(SessionV1),
35 H2(SessionV2),
36 Subrequest(SessionSubrequest),
37 Custom(Box<dyn SessionCustom>),
38}
39
40impl Session {
41 pub fn new_http1(stream: Stream) -> Self {
43 Self::H1(SessionV1::new(stream))
44 }
45
46 pub fn new_http2(session: SessionV2) -> Self {
48 Self::H2(session)
49 }
50
51 pub fn new_subrequest(session: SessionSubrequest) -> Self {
53 Self::Subrequest(session)
54 }
55
56 pub fn new_custom(session: Box<dyn SessionCustom>) -> Self {
58 Self::Custom(session)
59 }
60
61 pub fn is_http2(&self) -> bool {
63 matches!(self, Self::H2(_))
64 }
65
66 pub fn is_subrequest(&self) -> bool {
68 matches!(self, Self::Subrequest(_))
69 }
70
71 pub fn is_custom(&self) -> bool {
73 matches!(self, Self::Custom(_))
74 }
75
76 pub async fn read_request(&mut self) -> Result<bool> {
82 match self {
83 Self::H1(s) => {
84 let read = s.read_request().await?;
85 Ok(read.is_some())
86 }
87 Self::H2(_) => Ok(true),
89 Self::Subrequest(s) => {
90 let read = s.read_request().await?;
91 Ok(read.is_some())
92 }
93 Self::Custom(_) => Ok(true),
94 }
95 }
96
97 pub fn req_header(&self) -> &RequestHeader {
101 match self {
102 Self::H1(s) => s.req_header(),
103 Self::H2(s) => s.req_header(),
104 Self::Subrequest(s) => s.req_header(),
105 Self::Custom(s) => s.req_header(),
106 }
107 }
108
109 pub fn req_header_mut(&mut self) -> &mut RequestHeader {
113 match self {
114 Self::H1(s) => s.req_header_mut(),
115 Self::H2(s) => s.req_header_mut(),
116 Self::Subrequest(s) => s.req_header_mut(),
117 Self::Custom(s) => s.req_header_mut(),
118 }
119 }
120
121 pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
126 self.req_header().headers.get(key)
127 }
128
129 pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
132 self.get_header(key).map_or(b"", |v| v.as_bytes())
133 }
134
135 pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
137 match self {
138 Self::H1(s) => s.read_body_bytes().await,
139 Self::H2(s) => s.read_body_bytes().await,
140 Self::Subrequest(s) => s.read_body_bytes().await,
141 Self::Custom(s) => s.read_body_bytes().await,
142 }
143 }
144
145 pub async fn drain_request_body(&mut self) -> Result<()> {
150 match self {
151 Self::H1(s) => s.drain_request_body().await,
152 Self::H2(s) => s.drain_request_body().await,
153 Self::Subrequest(s) => s.drain_request_body().await,
154 Self::Custom(s) => s.drain_request_body().await,
155 }
156 }
157
158 pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
162 match self {
163 Self::H1(s) => {
164 s.write_response_header(resp).await?;
165 Ok(())
166 }
167 Self::H2(s) => s.write_response_header(resp, false),
168 Self::Subrequest(s) => {
169 s.write_response_header(resp).await?;
170 Ok(())
171 }
172 Self::Custom(s) => s.write_response_header(resp, false).await,
173 }
174 }
175
176 pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
178 match self {
179 Self::H1(s) => {
180 s.write_response_header_ref(resp).await?;
181 Ok(())
182 }
183 Self::H2(s) => s.write_response_header_ref(resp, false),
184 Self::Subrequest(s) => {
185 s.write_response_header_ref(resp).await?;
186 Ok(())
187 }
188 Self::Custom(s) => s.write_response_header_ref(resp, false).await,
189 }
190 }
191
192 pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
194 if data.is_empty() && !end {
195 return Ok(());
199 }
200 match self {
201 Self::H1(s) => {
202 if !data.is_empty() {
203 s.write_body(&data).await?;
204 }
205 if end {
206 s.finish_body().await?;
207 }
208 Ok(())
209 }
210 Self::H2(s) => s.write_body(data, end).await,
211 Self::Subrequest(s) => {
212 s.write_body(data).await?;
213 Ok(())
214 }
215 Self::Custom(s) => s.write_body(data, end).await,
216 }
217 }
218
219 pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
221 match self {
222 Self::H1(_) => Ok(()), Self::H2(s) => s.write_trailers(trailers),
224 Self::Subrequest(s) => s.write_trailers(Some(Box::new(trailers))).await,
225 Self::Custom(s) => s.write_trailers(trailers).await,
226 }
227 }
228
229 pub async fn finish(self) -> Result<Option<Stream>> {
234 match self {
235 Self::H1(mut s) => {
236 s.finish_body().await?;
238 s.reuse().await
239 }
240 Self::H2(mut s) => {
241 s.finish()?;
242 Ok(None)
243 }
244 Self::Subrequest(mut s) => {
245 s.finish().await?;
246 Ok(None)
247 }
248 Self::Custom(mut s) => {
249 s.finish().await?;
250 Ok(None)
251 }
252 }
253 }
254
255 pub fn on_proxy_failure(&mut self, e: Box<Error>) {
261 match self {
262 Self::H1(_) | Self::H2(_) | Self::Custom(_) => {
263 }
266 Self::Subrequest(ref mut s) => s.on_proxy_failure(e),
267 }
268 }
269
270 pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
271 match self {
272 Self::H1(s) => s.response_duplex_vec(tasks).await,
273 Self::H2(s) => s.response_duplex_vec(tasks).await,
274 Self::Subrequest(s) => s.response_duplex_vec(tasks).await,
275 Self::Custom(s) => s.response_duplex_vec(tasks).await,
276 }
277 }
278
279 pub fn set_keepalive(&mut self, duration: Option<u64>) {
282 match self {
283 Self::H1(s) => s.set_server_keepalive(duration),
284 Self::H2(_) => {}
285 Self::Subrequest(_) => {}
286 Self::Custom(_) => {}
287 }
288 }
289
290 pub fn get_keepalive(&self) -> Option<u64> {
293 match self {
294 Self::H1(s) => s.get_keepalive_timeout(),
295 Self::H2(_) => None,
296 Self::Subrequest(_) => None,
297 Self::Custom(_) => None,
298 }
299 }
300
301 pub fn set_keepalive_reuses_remaining(&mut self, reuses: Option<u32>) {
304 if let Self::H1(s) = self {
305 s.set_keepalive_reuses_remaining(reuses);
306 }
307 }
308
309 pub fn get_keepalive_reuses_remaining(&self) -> Option<u32> {
313 if let Self::H1(s) = self {
314 s.get_keepalive_reuses_remaining()
315 } else {
316 None
317 }
318 }
319
320 pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
325 match self {
326 Self::H1(s) => s.set_read_timeout(timeout),
327 Self::H2(_) => {}
328 Self::Subrequest(s) => s.set_read_timeout(timeout),
329 Self::Custom(c) => c.set_read_timeout(timeout),
330 }
331 }
332
333 pub fn get_read_timeout(&self) -> Option<Duration> {
335 match self {
336 Self::H1(s) => s.get_read_timeout(),
337 Self::H2(_) => None,
338 Self::Subrequest(s) => s.get_read_timeout(),
339 Self::Custom(s) => s.get_read_timeout(),
340 }
341 }
342
343 pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
347 match self {
348 Self::H1(s) => s.set_write_timeout(timeout),
349 Self::H2(s) => s.set_write_timeout(timeout),
350 Self::Subrequest(s) => s.set_write_timeout(timeout),
351 Self::Custom(c) => c.set_write_timeout(timeout),
352 }
353 }
354
355 pub fn get_write_timeout(&self) -> Option<Duration> {
357 match self {
358 Self::H1(s) => s.get_write_timeout(),
359 Self::H2(s) => s.get_write_timeout(),
360 Self::Subrequest(s) => s.get_write_timeout(),
361 Self::Custom(s) => s.get_write_timeout(),
362 }
363 }
364
365 pub fn set_total_drain_timeout(&mut self, timeout: Option<Duration>) {
372 match self {
373 Self::H1(s) => s.set_total_drain_timeout(timeout),
374 Self::H2(s) => s.set_total_drain_timeout(timeout),
375 Self::Subrequest(s) => s.set_total_drain_timeout(timeout),
376 Self::Custom(c) => c.set_total_drain_timeout(timeout),
377 }
378 }
379
380 pub fn get_total_drain_timeout(&self) -> Option<Duration> {
382 match self {
383 Self::H1(s) => s.get_total_drain_timeout(),
384 Self::H2(s) => s.get_total_drain_timeout(),
385 Self::Subrequest(s) => s.get_total_drain_timeout(),
386 Self::Custom(s) => s.get_total_drain_timeout(),
387 }
388 }
389
390 pub fn set_min_send_rate(&mut self, rate: Option<usize>) {
401 match self {
402 Self::H1(s) => s.set_min_send_rate(rate),
403 Self::H2(_) => {}
404 Self::Subrequest(_) => {}
405 Self::Custom(_) => {}
406 }
407 }
408
409 pub fn set_ignore_info_resp(&mut self, ignore: bool) {
418 match self {
419 Self::H1(s) => s.set_ignore_info_resp(ignore),
420 Self::H2(_) => {} Self::Subrequest(_) => {}
422 Self::Custom(_) => {} }
424 }
425
426 pub fn set_close_on_response_before_downstream_finish(&mut self, close: bool) {
431 match self {
432 Self::H1(s) => s.set_close_on_response_before_downstream_finish(close),
433 Self::H2(_) => {} Self::Subrequest(_) => {} Self::Custom(_) => {} }
437 }
438
439 pub fn request_summary(&self) -> String {
442 match self {
443 Self::H1(s) => s.request_summary(),
444 Self::H2(s) => s.request_summary(),
445 Self::Subrequest(s) => s.request_summary(),
446 Self::Custom(s) => s.request_summary(),
447 }
448 }
449
450 pub fn response_written(&self) -> Option<&ResponseHeader> {
453 match self {
454 Self::H1(s) => s.response_written(),
455 Self::H2(s) => s.response_written(),
456 Self::Subrequest(s) => s.response_written(),
457 Self::Custom(s) => s.response_written(),
458 }
459 }
460
461 pub async fn shutdown(&mut self) {
466 match self {
467 Self::H1(s) => s.shutdown().await,
468 Self::H2(s) => s.shutdown(),
469 Self::Subrequest(s) => s.shutdown(),
470 Self::Custom(s) => s.shutdown(0, "shutdown").await,
471 }
472 }
473
474 pub fn to_h1_raw(&self) -> Bytes {
475 match self {
476 Self::H1(s) => s.get_headers_raw_bytes(),
477 Self::H2(s) => s.pseudo_raw_h1_request_header(),
478 Self::Subrequest(s) => s.get_headers_raw_bytes(),
479 Self::Custom(c) => c.pseudo_raw_h1_request_header(),
480 }
481 }
482
483 pub fn is_body_done(&mut self) -> bool {
485 match self {
486 Self::H1(s) => s.is_body_done(),
487 Self::H2(s) => s.is_body_done(),
488 Self::Subrequest(s) => s.is_body_done(),
489 Self::Custom(s) => s.is_body_done(),
490 }
491 }
492
493 pub async fn finish_body(&mut self) -> Result<()> {
499 match self {
500 Self::H1(s) => s.finish_body().await.map(|_| ()),
501 Self::H2(s) => s.finish(),
502 Self::Subrequest(s) => s.finish().await.map(|_| ()),
503 Self::Custom(s) => s.finish().await,
504 }
505 }
506
507 pub fn generate_error(error: u16) -> ResponseHeader {
508 match error {
509 502 => error_resp::HTTP_502_RESPONSE.clone(),
511 400 => error_resp::HTTP_400_RESPONSE.clone(),
512 _ => error_resp::gen_error_response(error),
513 }
514 }
515
516 pub async fn respond_error(&mut self, error: u16) -> Result<()> {
518 self.respond_error_with_body(error, Bytes::default()).await
519 }
520
521 pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
523 let mut resp = Self::generate_error(error);
524 if !body.is_empty() {
525 resp.set_content_length(body.len())?
527 }
528 self.write_error_response(resp, body).await
529 }
530
531 pub async fn write_error_response(&mut self, resp: ResponseHeader, body: Bytes) -> Result<()> {
533 self.set_keepalive(None);
540
541 if let Some(resp_written) = self.response_written().as_ref() {
545 if !resp_written.status.is_informational() || resp_written.status == 101 {
546 return Ok(());
547 }
548 }
549
550 self.write_response_header(Box::new(resp)).await?;
551
552 if !body.is_empty() {
553 self.write_response_body(body, true).await?;
554 } else {
555 self.finish_body().await?;
556 }
557
558 custom_session!(self.finish_custom().await?);
559
560 Ok(())
561 }
562
563 pub fn is_body_empty(&mut self) -> bool {
565 match self {
566 Self::H1(s) => s.is_body_empty(),
567 Self::H2(s) => s.is_body_empty(),
568 Self::Subrequest(s) => s.is_body_empty(),
569 Self::Custom(s) => s.is_body_empty(),
570 }
571 }
572
573 pub fn retry_buffer_truncated(&self) -> bool {
574 match self {
575 Self::H1(s) => s.retry_buffer_truncated(),
576 Self::H2(s) => s.retry_buffer_truncated(),
577 Self::Subrequest(s) => s.retry_buffer_truncated(),
578 Self::Custom(s) => s.retry_buffer_truncated(),
579 }
580 }
581
582 pub fn enable_retry_buffering(&mut self) {
583 match self {
584 Self::H1(s) => s.enable_retry_buffering(),
585 Self::H2(s) => s.enable_retry_buffering(),
586 Self::Subrequest(s) => s.enable_retry_buffering(),
587 Self::Custom(s) => s.enable_retry_buffering(),
588 }
589 }
590
591 pub fn get_retry_buffer(&self) -> Option<Bytes> {
592 match self {
593 Self::H1(s) => s.get_retry_buffer(),
594 Self::H2(s) => s.get_retry_buffer(),
595 Self::Subrequest(s) => s.get_retry_buffer(),
596 Self::Custom(s) => s.get_retry_buffer(),
597 }
598 }
599
600 pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
603 match self {
604 Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
605 Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
606 Self::Subrequest(s) => s.read_body_or_idle(no_body_expected).await,
607 Self::Custom(s) => s.read_body_or_idle(no_body_expected).await,
608 }
609 }
610
611 pub fn as_http1(&self) -> Option<&SessionV1> {
612 match self {
613 Self::H1(s) => Some(s),
614 Self::H2(_) => None,
615 Self::Subrequest(_) => None,
616 Self::Custom(_) => None,
617 }
618 }
619
620 pub fn as_http2(&self) -> Option<&SessionV2> {
621 match self {
622 Self::H1(_) => None,
623 Self::H2(s) => Some(s),
624 Self::Subrequest(_) => None,
625 Self::Custom(_) => None,
626 }
627 }
628
629 pub fn as_subrequest(&self) -> Option<&SessionSubrequest> {
630 match self {
631 Self::H1(_) => None,
632 Self::H2(_) => None,
633 Self::Subrequest(s) => Some(s),
634 Self::Custom(_) => None,
635 }
636 }
637
638 pub fn as_subrequest_mut(&mut self) -> Option<&mut SessionSubrequest> {
639 match self {
640 Self::H1(_) => None,
641 Self::H2(_) => None,
642 Self::Subrequest(s) => Some(s),
643 Self::Custom(_) => None,
644 }
645 }
646
647 pub fn as_custom(&self) -> Option<&dyn SessionCustom> {
648 match self {
649 Self::H1(_) => None,
650 Self::H2(_) => None,
651 Self::Subrequest(_) => None,
652 Self::Custom(c) => Some(c.as_ref()),
653 }
654 }
655
656 pub fn as_custom_mut(&mut self) -> Option<&mut Box<dyn SessionCustom>> {
657 match self {
658 Self::H1(_) => None,
659 Self::H2(_) => None,
660 Self::Subrequest(_) => None,
661 Self::Custom(c) => Some(c),
662 }
663 }
664
665 pub async fn write_continue_response(&mut self) -> Result<()> {
667 match self {
668 Self::H1(s) => s.write_continue_response().await,
669 Self::H2(s) => s.write_response_header(
670 Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
671 false,
672 ),
673 Self::Subrequest(s) => s.write_continue_response().await,
674 Self::Custom(s) => {
676 s.write_response_header(
677 Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
678 false,
679 )
680 .await
681 }
682 }
683 }
684
685 pub fn is_upgrade_req(&self) -> bool {
687 match self {
688 Self::H1(s) => s.is_upgrade_req(),
689 Self::H2(_) => false,
690 Self::Subrequest(s) => s.is_upgrade_req(),
691 Self::Custom(s) => s.is_upgrade_req(),
692 }
693 }
694
695 pub fn was_upgraded(&self) -> bool {
697 match self {
698 Self::H1(s) => s.was_upgraded(),
699 Self::H2(_) => false,
700 Self::Subrequest(s) => s.was_upgraded(),
701 Self::Custom(s) => s.was_upgraded(),
702 }
703 }
704
705 pub fn body_bytes_sent(&self) -> usize {
707 match self {
708 Self::H1(s) => s.body_bytes_sent(),
709 Self::H2(s) => s.body_bytes_sent(),
710 Self::Subrequest(s) => s.body_bytes_sent(),
711 Self::Custom(s) => s.body_bytes_sent(),
712 }
713 }
714
715 pub fn body_bytes_read(&self) -> usize {
717 match self {
718 Self::H1(s) => s.body_bytes_read(),
719 Self::H2(s) => s.body_bytes_read(),
720 Self::Subrequest(s) => s.body_bytes_read(),
721 Self::Custom(s) => s.body_bytes_read(),
722 }
723 }
724
725 pub fn digest(&self) -> Option<&Digest> {
727 match self {
728 Self::H1(s) => Some(s.digest()),
729 Self::H2(s) => s.digest(),
730 Self::Subrequest(s) => s.digest(),
731 Self::Custom(s) => s.digest(),
732 }
733 }
734
735 pub fn digest_mut(&mut self) -> Option<&mut Digest> {
739 match self {
740 Self::H1(s) => Some(s.digest_mut()),
741 Self::H2(s) => s.digest_mut(),
742 Self::Subrequest(s) => s.digest_mut(),
743 Self::Custom(s) => s.digest_mut(),
744 }
745 }
746
747 pub fn client_addr(&self) -> Option<&SocketAddr> {
749 match self {
750 Self::H1(s) => s.client_addr(),
751 Self::H2(s) => s.client_addr(),
752 Self::Subrequest(s) => s.client_addr(),
753 Self::Custom(s) => s.client_addr(),
754 }
755 }
756
757 pub fn server_addr(&self) -> Option<&SocketAddr> {
759 match self {
760 Self::H1(s) => s.server_addr(),
761 Self::H2(s) => s.server_addr(),
762 Self::Subrequest(s) => s.server_addr(),
763 Self::Custom(s) => s.server_addr(),
764 }
765 }
766
767 pub fn stream(&self) -> Option<&Stream> {
770 match self {
771 Self::H1(s) => Some(s.stream()),
772 Self::H2(_) => None,
773 Self::Subrequest(_) => None,
774 Self::Custom(_) => None,
775 }
776 }
777}