1use std::{cell::RefCell, net::SocketAddr, rc::Rc};
10
11use mio::{Token, net::TcpStream};
12use rusty_ulid::Ulid;
13use sozu_command::{
14 config::MAX_LOOP_ITERATIONS,
15 logging::{EndpointRecord, LogContext, ansi_palette},
16};
17
18use crate::metrics::names;
19use crate::{
20 L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
21 backends::Backend,
22 pool::Checkout,
23 protocol::{SessionState, http::parser::Method},
24 socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
25 sozu_command::ready::Ready,
26 timer::TimeoutContainer,
27};
28
29#[cfg(all(target_os = "linux", feature = "splice"))]
30use crate::splice::{self, SplicePipe};
31
/// Builds the prefix put in front of every log line emitted by a `Pipe`.
///
/// Expands to a `String` made of the value returned by `log_context()`, the
/// literal `PIPE` tag and a `Session(...)` summary holding the session address,
/// both tokens, both readiness trackers and both connection statuses. Every
/// piece is wrapped in the five values returned by `ansi_palette()`. A missing
/// session address or backend token is rendered as `<none>`.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Both optional values are turned into owned strings up front so the
        // `format!` call below only deals with displayable arguments.
        let address = match $self.session_address {
            Some(addr) => addr.to_string(),
            None => "<none>".to_string(),
        };
        let backend = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{gray}{ctx}{reset}\t{open}PIPE{reset}\t{grey}Session{reset}({gray}address{reset}={white}{address}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}frontend_status{reset}={white}{frontend_status:?}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_status{reset}={white}{backend_status:?}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.log_context(),
            address = address,
            frontend = $self.frontend_token.0,
            frontend_readiness = $self.frontend_readiness,
            frontend_status = $self.frontend_status,
            backend = backend,
            backend_status = $self.backend_status,
            backend_readiness = $self.backend_readiness,
        )
    }};
}
59
/// Status of a session.
///
/// NOTE(review): the variants are not used in the visible part of this file;
/// presumably `DefaultAnswer` marks a session that is serving a default answer
/// instead of proxying normally — confirm against the users of this type.
///
/// `Debug`, `Clone` and `Copy` are derived (in addition to the comparison
/// traits) so the status can be logged, used in assertions and passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
65
/// Half-close state of one side (frontend or backend) of a pipe.
///
/// The transitions visible in this file are:
/// - a zero-byte read moves `Normal` to `WriteOpen` and `ReadOpen` to `Closed`;
/// - a zero-byte write moves `Normal` to `ReadOpen` and `WriteOpen` to `Closed`.
#[derive(Clone, Copy, Debug)]
enum ConnectionStatus {
    /// Nothing has been shut down on this side.
    Normal,
    /// Writing has stopped on this side; reading is still considered possible.
    ReadOpen,
    /// Reading has stopped on this side; writing is still considered possible.
    WriteOpen,
    /// Neither direction is usable any more.
    Closed,
}
73
/// Describes what a pipe was created from; stored in the `websocket_context`
/// field of `Pipe`.
///
/// NOTE(review): the name suggests the `Http` variant is used for upgraded
/// (WebSocket) HTTP sessions and `Tcp` for plain TCP proxying — confirm against
/// the code that builds this value.
pub enum WebSocketContext {
    /// Optional details of an HTTP exchange; every field may be absent.
    Http {
        /// HTTP method, when known.
        method: Option<Method>,
        /// Authority, when known.
        authority: Option<String>,
        /// Path, when known.
        path: Option<String>,
        /// Status code, when known.
        status: Option<u16>,
        /// Reason phrase, when known.
        reason: Option<String>,
    },
    /// No HTTP details are attached.
    Tcp,
}
85
/// Relays bytes in both directions between a frontend socket and an optional
/// backend socket.
///
/// Data read from the frontend is staged in `frontend_buffer` and written to
/// the backend; data read from the backend is staged in `backend_buffer` and
/// written to the frontend. With the `splice` feature on Linux, TCP sessions
/// may stage data in a `SplicePipe` instead of the two buffers.
pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
    /// Bytes read from the backend that still have to be written to the frontend.
    backend_buffer: Checkout,
    /// Backend identifier; replaced through `set_backend_id`.
    backend_id: Option<String>,
    /// Interest and pending events of the backend socket.
    pub backend_readiness: Readiness,
    /// Backend connection, if any.
    backend_socket: Option<TcpStream>,
    /// Half-close state of the backend side.
    backend_status: ConnectionStatus,
    /// Token of the backend socket; `None` until `set_back_token` is called.
    backend_token: Option<Token>,
    /// Shared handle to the backend this pipe talks to, if any.
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Cluster identifier; replaced through `set_cluster_id`.
    cluster_id: Option<String>,
    /// Backend timeout, reset whenever the pipe reads data.
    pub container_backend_timeout: Option<TimeoutContainer>,
    /// Frontend timeout, reset whenever the pipe reads data.
    pub container_frontend_timeout: Option<TimeoutContainer>,
    /// Bytes read from the frontend that still have to be written to the backend.
    frontend_buffer: Checkout,
    /// Interest and pending events of the frontend socket.
    pub frontend_readiness: Readiness,
    /// Half-close state of the frontend side.
    frontend_status: ConnectionStatus,
    /// Token of the frontend socket.
    frontend_token: Token,
    /// Frontend socket handler (plain or TLS, depending on `Front`).
    frontend: Front,
    /// Listener queried for its tags and address when writing access logs.
    listener: Rc<RefCell<L>>,
    /// Selects the label returned by `protocol_string` and gates the splice path.
    protocol: Protocol,
    /// Session identifier.
    /// NOTE(review): presumably consumed by `log_context()`, defined outside this excerpt.
    session_id: Ulid,
    /// Request identifier.
    /// NOTE(review): presumably consumed by `log_context()`, defined outside this excerpt.
    request_id: Ulid,
    /// Client address; when `None`, `get_session_address` falls back to the
    /// frontend socket's peer address.
    session_address: Option<SocketAddr>,
    /// Context given at construction time; not read in the visible methods.
    websocket_context: WebSocketContext,
    /// TLS version reported in access logs; set by `set_tls_metadata`.
    tls_version: Option<&'static str>,
    /// TLS cipher reported in access logs; set by `set_tls_metadata`.
    tls_cipher: Option<&'static str>,
    /// TLS SNI reported in access logs; set by `set_tls_metadata`.
    tls_sni: Option<String>,
    /// TLS ALPN value reported in access logs; set by `set_tls_metadata`.
    tls_alpn: Option<&'static str>,
    /// Pipe pair used by the splice code path; only requested for TCP sessions.
    #[cfg(all(target_os = "linux", feature = "splice"))]
    splice_pipe: Option<SplicePipe>,
}
127
128impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
129 #[allow(clippy::too_many_arguments)]
138 pub fn new(
139 backend_buffer: Checkout,
140 backend_id: Option<String>,
141 backend_socket: Option<TcpStream>,
142 backend: Option<Rc<RefCell<Backend>>>,
143 container_backend_timeout: Option<TimeoutContainer>,
144 container_frontend_timeout: Option<TimeoutContainer>,
145 cluster_id: Option<String>,
146 frontend_buffer: Checkout,
147 frontend_token: Token,
148 frontend: Front,
149 listener: Rc<RefCell<L>>,
150 protocol: Protocol,
151 session_id: Ulid,
152 request_id: Ulid,
153 session_address: Option<SocketAddr>,
154 websocket_context: WebSocketContext,
155 ) -> Pipe<Front, L> {
156 let frontend_status = ConnectionStatus::Normal;
157 let backend_status = if backend_socket.is_none() {
158 ConnectionStatus::Closed
159 } else {
160 ConnectionStatus::Normal
161 };
162
163 let session = Pipe {
164 backend_buffer,
165 backend_id,
166 backend_readiness: Readiness {
167 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
168 event: Ready::EMPTY,
169 },
170 backend_socket,
171 backend_status,
172 backend_token: None,
173 backend,
174 cluster_id,
175 container_backend_timeout,
176 container_frontend_timeout,
177 frontend_buffer,
178 frontend_readiness: Readiness {
179 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
180 event: Ready::EMPTY,
181 },
182 frontend_status,
183 frontend_token,
184 frontend,
185 listener,
186 protocol,
187 session_id,
188 request_id,
189 session_address,
190 websocket_context,
191 tls_version: None,
192 tls_cipher: None,
193 tls_sni: None,
194 tls_alpn: None,
195 #[cfg(all(target_os = "linux", feature = "splice"))]
196 splice_pipe: if protocol == Protocol::TCP {
197 SplicePipe::new()
198 } else {
199 None
200 },
201 };
202
203 trace!("{} created pipe", log_context!(session));
204 session
205 }
206
207 pub fn set_tls_metadata(
214 &mut self,
215 version: Option<&'static str>,
216 cipher: Option<&'static str>,
217 sni: Option<String>,
218 alpn: Option<&'static str>,
219 ) {
220 self.tls_version = version;
221 self.tls_cipher = cipher;
222 self.tls_sni = sni;
223 self.tls_alpn = alpn;
224 }
225
226 pub fn front_socket(&self) -> &TcpStream {
227 self.frontend.socket_ref()
228 }
229
230 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
231 self.frontend.socket_mut()
232 }
233
234 pub fn back_socket(&self) -> Option<&TcpStream> {
235 self.backend_socket.as_ref()
236 }
237
238 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
239 self.backend_socket.as_mut()
240 }
241
242 pub fn set_back_socket(&mut self, socket: TcpStream) {
243 self.backend_socket = Some(socket);
244 self.backend_status = ConnectionStatus::Normal;
245 }
246
247 pub fn back_token(&self) -> Vec<Token> {
248 self.backend_token.iter().cloned().collect()
249 }
250
251 fn reset_timeouts(&mut self) {
252 if let Some(t) = self.container_frontend_timeout.as_mut() {
253 if !t.reset() {
254 error!(
255 "{} Could not reset front timeout (pipe)",
256 log_context!(self)
257 );
258 }
259 }
260
261 if let Some(t) = self.container_backend_timeout.as_mut() {
262 if !t.reset() {
263 error!("{} Could not reset back timeout (pipe)", log_context!(self));
264 }
265 }
266 }
267
/// Replaces the stored cluster identifier; `None` clears it.
pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
    self.cluster_id = cluster_id;
}
271
/// Replaces the stored backend identifier; `None` clears it.
pub fn set_backend_id(&mut self, backend_id: Option<String>) {
    self.backend_id = backend_id;
}
275
/// Records `token` as the backend socket's token, replacing any previous one.
pub fn set_back_token(&mut self, token: Token) {
    self.backend_token = Some(token);
}
279
280 pub fn get_session_address(&self) -> Option<SocketAddr> {
281 self.session_address
282 .or_else(|| self.frontend.socket_ref().peer_addr().ok())
283 }
284
285 pub fn get_backend_address(&self) -> Option<SocketAddr> {
286 self.backend_socket
287 .as_ref()
288 .and_then(|backend| backend.peer_addr().ok())
289 }
290
291 fn protocol_string(&self) -> &'static str {
292 match self.protocol {
293 Protocol::TCP => "TCP",
294 Protocol::HTTP => "WS",
295 Protocol::HTTPS => match self.frontend.protocol() {
296 TransportProtocol::Ssl2 => "WSS-SSL2",
297 TransportProtocol::Ssl3 => "WSS-SSL3",
298 TransportProtocol::Tls1_0 => "WSS-TLS1.0",
299 TransportProtocol::Tls1_1 => "WSS-TLS1.1",
300 TransportProtocol::Tls1_2 => "WSS-TLS1.2",
301 TransportProtocol::Tls1_3 => "WSS-TLS1.3",
302 _ => unreachable!(),
303 },
304 _ => unreachable!(),
305 }
306 }
307
/// Writes one access-log entry describing this session.
///
/// * `metrics` - session metrics; `register_end_of_session` is called on them
///   before the entry is built, and they provide timings and byte counts.
/// * `error` - forwarded as the first argument of `log_access!`.
/// * `message` - optional message attached to the entry.
///
/// HTTP-specific fields (`user_agent`, `x_request_id`, `xff_chain`, `otel`)
/// are always `None` here. The TLS fields come from `set_tls_metadata`.
pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
    // The borrow is held for the whole call because `tags` below reads from it.
    let listener = self.listener.borrow();
    let context = self.log_context();
    let endpoint = self.log_endpoint();
    metrics.register_end_of_session(&context);
    log_access!(
        error,
        // Runs when the macro takes its failure path: count the unsent log.
        on_failure: { incr!(names::access_logs::UNSENT) },
        message,
        context,
        session_address: self.get_session_address(),
        backend_address: self.get_backend_address(),
        protocol: self.protocol_string(),
        endpoint,
        tags: listener.get_tags(&listener.get_addr().to_string()),
        client_rtt: socket_rtt(self.front_socket()),
        server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
        service_time: metrics.service_time(),
        response_time: metrics.backend_response_time(),
        request_time: metrics.request_time(),
        start_time_ns: metrics.start_wall_ns(),
        bytes_in: metrics.bin,
        bytes_out: metrics.bout,
        user_agent: None,
        x_request_id: None,
        tls_version: self.tls_version,
        tls_cipher: self.tls_cipher,
        tls_sni: self.tls_sni.as_deref(),
        tls_alpn: self.tls_alpn,
        xff_chain: None,
        otel: None,
    );
}
345
346 pub fn log_request_success(&self, metrics: &SessionMetrics) {
347 self.log_request(metrics, false, None);
348 }
349
350 pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
351 incr!(names::pipe::ERRORS);
352 error!(
353 "{} Could not process request properly got: {}",
354 log_context!(self),
355 message
356 );
357 self.print_state(self.protocol_string());
358 self.log_request(metrics, true, Some(message));
359 }
360
361 pub fn log_request_timeout(&self, metrics: &SessionMetrics, message: &str) {
368 debug!("{} pipe timeout: {}", log_context!(self), message);
369 self.log_request(metrics, true, Some(message));
370 }
371
372 #[cfg(all(target_os = "linux", feature = "splice"))]
377 fn splice_in_pending(&self) -> usize {
378 self.splice_pipe
379 .as_ref()
380 .map(|p| p.in_pipe_pending)
381 .unwrap_or(0)
382 }
383 #[cfg(not(all(target_os = "linux", feature = "splice")))]
384 fn splice_in_pending(&self) -> usize {
385 0
386 }
387
388 #[cfg(all(target_os = "linux", feature = "splice"))]
392 fn splice_out_pending(&self) -> usize {
393 self.splice_pipe
394 .as_ref()
395 .map(|p| p.out_pipe_pending)
396 .unwrap_or(0)
397 }
398 #[cfg(not(all(target_os = "linux", feature = "splice")))]
399 fn splice_out_pending(&self) -> usize {
400 0
401 }
402
/// Capacity recorded on the splice pipe, or `0` when the session has none.
#[cfg(all(target_os = "linux", feature = "splice"))]
fn splice_capacity(&self) -> usize {
    match self.splice_pipe.as_ref() {
        Some(pipe) => pipe.capacity,
        None => 0,
    }
}
410
411 fn reset_readiness_for_close(&mut self) {
421 self.frontend_readiness.reset();
422 self.backend_readiness.reset();
423 debug_assert!(
424 self.frontend_readiness.interest.is_empty() && self.frontend_readiness.event.is_empty(),
425 "frontend readiness must be fully cleared on close (write-only-shutdown discipline)"
426 );
427 debug_assert!(
428 self.backend_readiness.interest.is_empty() && self.backend_readiness.event.is_empty(),
429 "backend readiness must be fully cleared on close (write-only-shutdown discipline)"
430 );
431 }
432
433 pub fn check_connections(&self) -> bool {
436 debug_assert!(
444 self.frontend_buffer.available_data() <= self.frontend_buffer.capacity(),
445 "frontend buffered data exceeds its capacity"
446 );
447 debug_assert!(
448 self.backend_buffer.available_data() <= self.backend_buffer.capacity(),
449 "backend buffered data exceeds its capacity"
450 );
451
452 let request_is_inflight = self.frontend_buffer.available_data() > 0
453 || self.frontend_readiness.event.is_readable()
454 || self.splice_in_pending() > 0;
455 let response_is_inflight = self.backend_buffer.available_data() > 0
456 || self.backend_readiness.event.is_readable()
457 || self.splice_out_pending() > 0;
458 match (self.frontend_status, self.backend_status) {
459 (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
460 (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
461 (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
462 request_is_inflight || response_is_inflight
467 }
468 (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
469
470 (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
471 request_is_inflight || response_is_inflight
474 }
475 (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
476 (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
477 request_is_inflight || response_is_inflight
478 }
479 (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
480
481 (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
482 (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
483 (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
484 (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
485
486 (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
487 (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
488 (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
489 (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
490 }
491 }
492
/// Handles a hang-up on the frontend: writes a success access-log entry, marks
/// the frontend side `Closed` and asks for the session to be closed.
pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
    // The access log is written before the status changes.
    self.log_request_success(metrics);
    self.frontend_status = ConnectionStatus::Closed;
    SessionResult::Close
}
498
499 pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
500 self.backend_status = ConnectionStatus::Closed;
501 debug_assert!(
504 matches!(self.backend_status, ConnectionStatus::Closed),
505 "backend_hup must mark the backend Closed"
506 );
507 let pipe_has_data = self.splice_out_pending() > 0;
508 if self.backend_buffer.available_data() == 0 && !pipe_has_data {
509 debug_assert_eq!(
512 self.backend_buffer.available_data(),
513 0,
514 "no-data branch entered with response bytes still buffered"
515 );
516 if self.backend_readiness.event.is_readable() {
517 self.backend_readiness.interest.insert(Ready::READABLE);
518 debug!(
519 "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.",
520 log_context!(self)
521 );
522 SessionResult::Continue
523 } else {
524 self.log_request_success(metrics);
525 SessionResult::Close
526 }
527 } else {
528 debug!(
529 "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.",
530 log_context!(self)
531 );
532 self.frontend_readiness.interest.insert(Ready::WRITABLE);
533 if self.backend_readiness.event.is_readable() {
534 self.backend_readiness.interest.insert(Ready::READABLE);
535 }
536 SessionResult::Continue
537 }
538 }
539
540 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
542 #[cfg(all(target_os = "linux", feature = "splice"))]
543 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
544 return self.splice_readable(metrics);
545 }
546
547 self.reset_timeouts();
548
549 trace!("{} pipe readable", log_context!(self));
550 if self.frontend_buffer.available_space() == 0 {
551 self.frontend_readiness.interest.remove(Ready::READABLE);
552 self.backend_readiness.interest.insert(Ready::WRITABLE);
553 return SessionResult::Continue;
554 }
555
556 let space_before = self.frontend_buffer.available_space();
557 let data_before = self.frontend_buffer.available_data();
558 let bin_before = metrics.bin;
559 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
560 debug_assert!(
563 sz <= space_before,
564 "frontend socket_read reported more bytes ({sz}) than the buffer space offered ({space_before})"
565 );
566 debug!("{} Read {} bytes", log_context!(self), sz);
567
568 if sz > 0 {
569 self.frontend_buffer.fill(sz);
571 debug_assert_eq!(
574 self.frontend_buffer.available_data(),
575 data_before + sz,
576 "fill must grow readable data by exactly the bytes read"
577 );
578
579 count!(names::backend::BYTES_IN, sz as i64);
580 metrics.bin += sz;
581 debug_assert_eq!(
583 metrics.bin,
584 bin_before + sz,
585 "metrics.bin must advance by exactly the bytes read"
586 );
587
588 if self.frontend_buffer.available_space() == 0 {
589 self.frontend_readiness.interest.remove(Ready::READABLE);
590 }
591 self.backend_readiness.interest.insert(Ready::WRITABLE);
592 } else {
593 self.frontend_readiness.event.remove(Ready::READABLE);
594
595 if res == SocketResult::Continue {
596 self.frontend_status = match self.frontend_status {
597 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
598 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
599 s => s,
600 };
601 }
602 }
603
604 if !self.check_connections() {
605 self.reset_readiness_for_close();
606 self.log_request_success(metrics);
607 return SessionResult::Close;
608 }
609
610 match res {
611 SocketResult::Error => {
612 self.reset_readiness_for_close();
613 self.log_request_error(metrics, "front socket read error");
614 return SessionResult::Close;
615 }
616 SocketResult::Closed => {
617 self.reset_readiness_for_close();
618 self.log_request_success(metrics);
619 return SessionResult::Close;
620 }
621 SocketResult::WouldBlock => {
622 self.frontend_readiness.event.remove(Ready::READABLE);
623 }
624 SocketResult::Continue => {}
625 };
626
627 self.backend_readiness.interest.insert(Ready::WRITABLE);
628 SessionResult::Continue
629 }
630
631 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
633 #[cfg(all(target_os = "linux", feature = "splice"))]
634 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
635 return self.splice_writable(metrics);
636 }
637
638 trace!("{} Pipe writable", log_context!(self));
639 if self.backend_buffer.available_data() == 0 {
640 self.backend_readiness.interest.insert(Ready::READABLE);
641 self.frontend_readiness.interest.remove(Ready::WRITABLE);
642 return SessionResult::Continue;
643 }
644
645 let queued_total = self.backend_buffer.available_data();
646 let mut sz = 0usize;
647 let mut res = SocketResult::Continue;
648 while res == SocketResult::Continue {
649 if self.backend_buffer.available_data() == 0 {
651 count!(names::backend::BYTES_OUT, sz as i64);
652 metrics.bout += sz;
653 self.backend_readiness.interest.insert(Ready::READABLE);
654 self.frontend_readiness.interest.remove(Ready::WRITABLE);
655 return SessionResult::Continue;
656 }
657 let queued = self.backend_buffer.available_data();
658 let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
659 debug_assert!(
662 current_sz <= queued,
663 "frontend socket_write reported {current_sz} bytes but only {queued} were queued"
664 );
665 res = current_res;
666 let consumed = self.backend_buffer.consume(current_sz);
667 debug_assert_eq!(
670 consumed, current_sz,
671 "consume must drop exactly the bytes written to the frontend"
672 );
673 sz += current_sz;
674 debug_assert!(
676 sz <= queued_total,
677 "cumulative frontend write ({sz}) exceeded the queued backend data ({queued_total})"
678 );
679
680 if current_sz == 0 && res == SocketResult::Continue {
681 self.frontend_status = match self.frontend_status {
682 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
683 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
684 s => s,
685 };
686 }
687
688 if !self.check_connections() {
689 metrics.bout += sz;
690 count!(names::backend::BYTES_OUT, sz as i64);
691 self.reset_readiness_for_close();
692 self.log_request_success(metrics);
693 return SessionResult::Close;
694 }
695 }
696
697 if sz > 0 {
698 count!(names::backend::BYTES_OUT, sz as i64);
699 self.backend_readiness.interest.insert(Ready::READABLE);
700 metrics.bout += sz;
701 }
702
703 debug!(
704 "{} Wrote {} bytes of {}",
705 log_context!(self),
706 sz,
707 self.backend_buffer.available_data()
708 );
709
710 match res {
711 SocketResult::Error => {
712 self.reset_readiness_for_close();
713 self.log_request_error(metrics, "front socket write error");
714 return SessionResult::Close;
715 }
716 SocketResult::Closed => {
717 self.reset_readiness_for_close();
718 self.log_request_success(metrics);
719 return SessionResult::Close;
720 }
721 SocketResult::WouldBlock => {
722 self.frontend_readiness.event.remove(Ready::WRITABLE);
723 }
724 SocketResult::Continue => {}
725 }
726
727 SessionResult::Continue
728 }
729
730 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
732 #[cfg(all(target_os = "linux", feature = "splice"))]
733 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
734 return self.splice_backend_writable(metrics);
735 }
736
737 trace!("{} pipe back_writable", log_context!(self));
738
739 if self.frontend_buffer.available_data() == 0 {
740 self.frontend_readiness.interest.insert(Ready::READABLE);
741 self.backend_readiness.interest.remove(Ready::WRITABLE);
742 return SessionResult::Continue;
743 }
744
745 let output_size = self.frontend_buffer.available_data();
746
747 let mut sz = 0usize;
748 let mut socket_res = SocketResult::Continue;
749
750 if let Some(ref mut backend) = self.backend_socket {
751 while socket_res == SocketResult::Continue {
752 if self.frontend_buffer.available_data() == 0 {
754 self.frontend_readiness.interest.insert(Ready::READABLE);
755 self.backend_readiness.interest.remove(Ready::WRITABLE);
756 count!(names::backend::BACK_BYTES_OUT, sz as i64);
757 metrics.backend_bout += sz;
758 return SessionResult::Continue;
759 }
760
761 let queued = self.frontend_buffer.available_data();
762 let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
763 debug_assert!(
765 current_sz <= queued,
766 "backend socket_write reported {current_sz} bytes but only {queued} were queued"
767 );
768 socket_res = current_res;
769 let consumed = self.frontend_buffer.consume(current_sz);
770 debug_assert_eq!(
771 consumed, current_sz,
772 "consume must drop exactly the bytes written to the backend"
773 );
774 sz += current_sz;
775 debug_assert!(
777 sz <= output_size,
778 "cumulative backend write ({sz}) exceeded the queued frontend data ({output_size})"
779 );
780
781 if current_sz == 0 && current_res == SocketResult::Continue {
782 self.backend_status = match self.backend_status {
783 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
784 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
785 s => s,
786 };
787 }
788 }
789 }
790
791 let backend_bout_before = metrics.backend_bout;
792 count!(names::backend::BACK_BYTES_OUT, sz as i64);
793 metrics.backend_bout += sz;
794 debug_assert_eq!(
796 metrics.backend_bout,
797 backend_bout_before + sz,
798 "metrics.backend_bout must advance by exactly the bytes written"
799 );
800
801 if !self.check_connections() {
802 self.reset_readiness_for_close();
803 self.log_request_success(metrics);
804 return SessionResult::Close;
805 }
806
807 debug!(
808 "{} Wrote {} bytes of {}",
809 log_context!(self),
810 sz,
811 output_size
812 );
813
814 match socket_res {
815 SocketResult::Error => {
816 self.reset_readiness_for_close();
817 self.log_request_error(metrics, "back socket write error");
818 return SessionResult::Close;
819 }
820 SocketResult::Closed => {
821 self.reset_readiness_for_close();
822 self.log_request_success(metrics);
823 return SessionResult::Close;
824 }
825 SocketResult::WouldBlock => {
826 self.backend_readiness.event.remove(Ready::WRITABLE);
827 }
828 SocketResult::Continue => {}
829 }
830 SessionResult::Continue
831 }
832
833 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
835 #[cfg(all(target_os = "linux", feature = "splice"))]
836 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
837 return self.splice_backend_readable(metrics);
838 }
839
840 self.reset_timeouts();
841
842 trace!("{} Pipe backend_readable", log_context!(self));
843 if self.backend_buffer.available_space() == 0 {
844 self.backend_readiness.interest.remove(Ready::READABLE);
845 return SessionResult::Continue;
846 }
847
848 let space_before = self.backend_buffer.available_space();
849 let data_before = self.backend_buffer.available_data();
850 let backend_bin_before = metrics.backend_bin;
851 if let Some(ref mut backend) = self.backend_socket {
852 let (size, remaining) = backend.socket_read(self.backend_buffer.space());
853 debug_assert!(
855 size <= space_before,
856 "backend socket_read reported more bytes ({size}) than the buffer space offered ({space_before})"
857 );
858 self.backend_buffer.fill(size);
859 debug_assert_eq!(
862 self.backend_buffer.available_data(),
863 data_before + size,
864 "fill must grow readable data by exactly the bytes read"
865 );
866
867 debug!("{} Read {} bytes", log_context!(self), size);
868
869 if remaining != SocketResult::Continue || size == 0 {
870 self.backend_readiness.event.remove(Ready::READABLE);
871 }
872 if size > 0 {
873 self.frontend_readiness.interest.insert(Ready::WRITABLE);
874 count!(names::backend::BACK_BYTES_IN, size as i64);
875 metrics.backend_bin += size;
876 debug_assert_eq!(
878 metrics.backend_bin,
879 backend_bin_before + size,
880 "metrics.backend_bin must advance by exactly the bytes read"
881 );
882 }
883
884 if size == 0 && remaining == SocketResult::Closed {
885 self.backend_status = match self.backend_status {
886 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
887 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
888 s => s,
889 };
890
891 if !self.check_connections() {
892 self.reset_readiness_for_close();
893 self.log_request_success(metrics);
894 return SessionResult::Close;
895 }
896 }
897
898 match remaining {
899 SocketResult::Error => {
900 self.reset_readiness_for_close();
901 self.log_request_error(metrics, "back socket read error");
902 return SessionResult::Close;
903 }
904 SocketResult::Closed => {
905 if !self.check_connections() {
906 self.reset_readiness_for_close();
907 self.log_request_success(metrics);
908 return SessionResult::Close;
909 }
910 }
911 SocketResult::WouldBlock => {
912 self.backend_readiness.event.remove(Ready::READABLE);
913 }
914 SocketResult::Continue => {}
915 }
916 }
917
918 SessionResult::Continue
919 }
920
/// Splice-based counterpart of `readable`: moves bytes from the frontend
/// socket into the inbound splice pipe.
///
/// - Both timeouts are reset.
/// - If the bytes already pending in the pipe reach its recorded capacity,
///   frontend read interest is dropped, backend write interest is raised and
///   the call returns `Continue` without splicing.
/// - Spliced bytes are added to `in_pipe_pending` and `metrics.bin`, and make
///   the backend writable.
/// - A zero-byte result clears the frontend readable event and, when
///   `splice_in` reported `Continue`, moves the frontend status one step
///   towards closed.
///
/// Returns `Close` when `check_connections` says so or on `Error`/`Closed`.
///
/// # Panics
///
/// Panics if the session has no splice pipe and the capacity guard above did
/// not already return; `readable` only calls this when one exists.
#[cfg(all(target_os = "linux", feature = "splice"))]
fn splice_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
    self.reset_timeouts();

    trace!("{} pipe splice_readable", log_context!(self));
    let capacity = self.splice_capacity();
    let pending_before = self.splice_in_pending();
    if pending_before >= capacity {
        self.frontend_readiness.interest.remove(Ready::READABLE);
        self.backend_readiness.interest.insert(Ready::WRITABLE);
        return SessionResult::Continue;
    }

    let bin_before = metrics.bin;
    // Index 1 is the end of `in_pipe` handed to `splice_in` as its destination.
    let pipe_write_end = self.splice_pipe.as_ref().unwrap().in_pipe[1];
    let (sz, outcome) = splice::splice_in(self.frontend.socket_ref(), pipe_write_end, capacity);
    debug_assert!(
        sz <= capacity,
        "splice_in reported {sz} bytes but was capped at len {capacity}"
    );
    debug!("{} Spliced {} bytes from frontend", log_context!(self), sz);

    if sz == 0 {
        self.frontend_readiness.event.remove(Ready::READABLE);

        if outcome == SocketResult::Continue {
            self.frontend_status = match self.frontend_status {
                ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                unchanged => unchanged,
            };
        }
    } else {
        self.splice_pipe.as_mut().unwrap().in_pipe_pending += sz;
        debug_assert_eq!(
            self.splice_in_pending(),
            pending_before + sz,
            "in_pipe_pending must grow by exactly the spliced bytes"
        );
        count!(names::backend::BYTES_IN, sz as i64);
        metrics.bin += sz;
        debug_assert_eq!(
            metrics.bin,
            bin_before + sz,
            "metrics.bin must advance by exactly the spliced bytes"
        );
        self.backend_readiness.interest.insert(Ready::WRITABLE);
    }

    if !self.check_connections() {
        self.reset_readiness_for_close();
        self.log_request_success(metrics);
        return SessionResult::Close;
    }

    match outcome {
        SocketResult::Error => {
            self.reset_readiness_for_close();
            self.log_request_error(metrics, "splice front socket read error");
            return SessionResult::Close;
        }
        SocketResult::Closed => {
            self.reset_readiness_for_close();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }
        SocketResult::WouldBlock => {
            self.frontend_readiness.event.remove(Ready::READABLE);
        }
        SocketResult::Continue => {}
    }

    self.backend_readiness.interest.insert(Ready::WRITABLE);
    SessionResult::Continue
}
1014
/// Splice-based counterpart of `writable`: moves bytes from the outbound
/// splice pipe to the frontend socket.
///
/// The pipe is drained in a loop for as long as `splice_out` answers
/// `Continue`:
/// - once nothing is pending, backend read interest is raised, frontend write
///   interest is dropped and the call returns `Continue`;
/// - drained bytes are subtracted from `out_pipe_pending`;
/// - a zero-byte result answered with `Continue` moves the frontend status one
///   step towards closed;
/// - `check_connections` is consulted after every call and may close the
///   session.
///
/// Bytes drained are added to `metrics.bout` exactly once on every exit path.
/// Returns `Close` on `Error`/`Closed`, `Continue` otherwise.
///
/// # Panics
///
/// Panics if bytes are pending but the session has no splice pipe; pending
/// bytes are only ever read from that pipe, so this is not expected.
#[cfg(all(target_os = "linux", feature = "splice"))]
fn splice_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
    trace!("{} Pipe splice_writable", log_context!(self));
    if self.splice_out_pending() == 0 {
        self.backend_readiness.interest.insert(Ready::READABLE);
        self.frontend_readiness.interest.remove(Ready::WRITABLE);
        return SessionResult::Continue;
    }

    // Total drained during this call.
    let mut sz = 0usize;
    let mut outcome;
    loop {
        let pending = self.splice_out_pending();
        if pending == 0 {
            // The pipe is empty: go back to reading from the backend.
            count!(names::backend::BYTES_OUT, sz as i64);
            metrics.bout += sz;
            self.backend_readiness.interest.insert(Ready::READABLE);
            self.frontend_readiness.interest.remove(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        // Index 0 is the end of `out_pipe` handed to `splice_out` as its source.
        let pipe_read_end = self.splice_pipe.as_ref().unwrap().out_pipe[0];
        let (current_sz, current_outcome) =
            splice::splice_out(pipe_read_end, self.frontend.socket_ref(), pending);
        debug_assert!(
            current_sz <= pending,
            "splice_out drained {current_sz} bytes but only {pending} were pending (would underflow)"
        );
        outcome = current_outcome;

        if current_sz > 0 {
            self.splice_pipe.as_mut().unwrap().out_pipe_pending -= current_sz;
            debug_assert_eq!(
                self.splice_out_pending(),
                pending - current_sz,
                "out_pipe_pending must shrink by exactly the drained bytes"
            );
        }
        sz += current_sz;

        if current_sz == 0 && outcome == SocketResult::Continue {
            self.frontend_status = match self.frontend_status {
                ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
                ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
                unchanged => unchanged,
            };
        }

        if !self.check_connections() {
            metrics.bout += sz;
            count!(names::backend::BYTES_OUT, sz as i64);
            self.reset_readiness_for_close();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }

        if outcome != SocketResult::Continue {
            break;
        }
    }

    if sz > 0 {
        count!(names::backend::BYTES_OUT, sz as i64);
        self.backend_readiness.interest.insert(Ready::READABLE);
        metrics.bout += sz;
    }

    debug!(
        "{} Spliced {} bytes (out_pipe_pending={})",
        log_context!(self),
        sz,
        self.splice_out_pending()
    );

    match outcome {
        SocketResult::Error => {
            self.reset_readiness_for_close();
            self.log_request_error(metrics, "splice front socket write error");
            return SessionResult::Close;
        }
        SocketResult::Closed => {
            self.reset_readiness_for_close();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }
        SocketResult::WouldBlock => {
            self.frontend_readiness.event.remove(Ready::WRITABLE);
        }
        SocketResult::Continue => {}
    }

    SessionResult::Continue
}
1111
1112 #[cfg(all(target_os = "linux", feature = "splice"))]
1117 fn splice_backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1118 trace!("{} pipe splice_backend_writable", log_context!(self));
1119
1120 if self.splice_in_pending() == 0 {
1121 self.frontend_readiness.interest.insert(Ready::READABLE);
1122 self.backend_readiness.interest.remove(Ready::WRITABLE);
1123 return SessionResult::Continue;
1124 }
1125
1126 let output_size = self.splice_in_pending();
1127 let mut sz = 0usize;
1128 let mut socket_res = SocketResult::Continue;
1129
1130 while socket_res == SocketResult::Continue {
1131 let pending = self.splice_in_pending();
1132 if pending == 0 {
1134 self.frontend_readiness.interest.insert(Ready::READABLE);
1135 self.backend_readiness.interest.remove(Ready::WRITABLE);
1136 count!(names::backend::BACK_BYTES_OUT, sz as i64);
1137 metrics.backend_bout += sz;
1138 return SessionResult::Continue;
1139 }
1140
1141 let pipe_read_end = self.splice_pipe.as_ref().unwrap().in_pipe[0];
1142 let (current_sz, current_res) = match self.backend_socket.as_ref() {
1143 Some(b) => splice::splice_out(pipe_read_end, b, pending),
1144 None => break,
1145 };
1146 debug_assert!(
1148 current_sz <= pending,
1149 "splice_out drained {current_sz} bytes but only {pending} were pending (would underflow)"
1150 );
1151 socket_res = current_res;
1152 if current_sz > 0 {
1153 self.splice_pipe.as_mut().unwrap().in_pipe_pending -= current_sz;
1154 debug_assert_eq!(
1155 self.splice_in_pending(),
1156 pending - current_sz,
1157 "in_pipe_pending must shrink by exactly the drained bytes"
1158 );
1159 }
1160 sz += current_sz;
1161 debug_assert!(
1163 sz <= output_size,
1164 "cumulative splice drain ({sz}) exceeded the bytes pending at entry ({output_size})"
1165 );
1166
1167 if current_sz == 0 && current_res == SocketResult::Continue {
1168 self.backend_status = match self.backend_status {
1169 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
1170 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
1171 s => s,
1172 };
1173 }
1174 }
1175
1176 count!(names::backend::BACK_BYTES_OUT, sz as i64);
1177 metrics.backend_bout += sz;
1178
1179 if !self.check_connections() {
1180 self.reset_readiness_for_close();
1181 self.log_request_success(metrics);
1182 return SessionResult::Close;
1183 }
1184
1185 debug!(
1186 "{} Spliced {} bytes of {}",
1187 log_context!(self),
1188 sz,
1189 output_size
1190 );
1191
1192 match socket_res {
1193 SocketResult::Error => {
1194 self.reset_readiness_for_close();
1195 self.log_request_error(metrics, "splice back socket write error");
1196 return SessionResult::Close;
1197 }
1198 SocketResult::Closed => {
1199 self.reset_readiness_for_close();
1200 self.log_request_success(metrics);
1201 return SessionResult::Close;
1202 }
1203 SocketResult::WouldBlock => {
1204 self.backend_readiness.event.remove(Ready::WRITABLE);
1205 }
1206 SocketResult::Continue => {}
1207 }
1208 SessionResult::Continue
1209 }
1210
1211 #[cfg(all(target_os = "linux", feature = "splice"))]
1217 fn splice_backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1218 self.reset_timeouts();
1219
1220 trace!("{} Pipe splice_backend_readable", log_context!(self));
1221 let capacity = self.splice_capacity();
1222 if self.splice_out_pending() >= capacity {
1223 self.backend_readiness.interest.remove(Ready::READABLE);
1225 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1226 return SessionResult::Continue;
1227 }
1228
1229 let pending_before = self.splice_out_pending();
1230 let backend_bin_before = metrics.backend_bin;
1231 let pipe_write_end = self.splice_pipe.as_ref().unwrap().out_pipe[1];
1232 let (size, remaining) = match self.backend_socket.as_ref() {
1233 Some(b) => splice::splice_in(b, pipe_write_end, capacity),
1234 None => return SessionResult::Continue,
1235 };
1236 debug_assert!(
1243 size <= capacity,
1244 "splice_in reported {size} bytes but was capped at len {capacity}"
1245 );
1246
1247 debug!("{} Spliced {} bytes from backend", log_context!(self), size);
1248
1249 if remaining != SocketResult::Continue || size == 0 {
1250 self.backend_readiness.event.remove(Ready::READABLE);
1251 }
1252 if size > 0 {
1253 self.splice_pipe.as_mut().unwrap().out_pipe_pending += size;
1254 debug_assert_eq!(
1255 self.splice_out_pending(),
1256 pending_before + size,
1257 "out_pipe_pending must grow by exactly the spliced bytes"
1258 );
1259 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1260 count!(names::backend::BACK_BYTES_IN, size as i64);
1261 metrics.backend_bin += size;
1262 debug_assert_eq!(
1263 metrics.backend_bin,
1264 backend_bin_before + size,
1265 "metrics.backend_bin must advance by exactly the spliced bytes"
1266 );
1267 }
1268
1269 if size == 0 && remaining == SocketResult::Closed {
1270 self.backend_status = match self.backend_status {
1271 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
1272 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
1273 s => s,
1274 };
1275
1276 if !self.check_connections() {
1277 self.reset_readiness_for_close();
1278 self.log_request_success(metrics);
1279 return SessionResult::Close;
1280 }
1281 }
1282
1283 match remaining {
1284 SocketResult::Error => {
1285 self.reset_readiness_for_close();
1286 self.log_request_error(metrics, "splice back socket read error");
1287 return SessionResult::Close;
1288 }
1289 SocketResult::Closed => {
1290 if !self.check_connections() {
1291 self.reset_readiness_for_close();
1292 self.log_request_success(metrics);
1293 return SessionResult::Close;
1294 }
1295 }
1296 SocketResult::WouldBlock => {
1297 self.backend_readiness.event.remove(Ready::READABLE);
1298 }
1299 SocketResult::Continue => {}
1300 }
1301
1302 SessionResult::Continue
1303 }
1304
1305 pub fn log_context(&self) -> LogContext<'_> {
1306 LogContext {
1307 session_id: self.session_id,
1308 request_id: Some(self.request_id),
1309 cluster_id: self.cluster_id.as_deref(),
1310 backend_id: self.backend_id.as_deref(),
1311 }
1312 }
1313
1314 fn log_endpoint(&self) -> EndpointRecord<'_> {
1315 match &self.websocket_context {
1316 WebSocketContext::Http {
1317 method,
1318 authority,
1319 path,
1320 status,
1321 reason,
1322 } => EndpointRecord::Http {
1323 method: method.as_deref(),
1324 authority: authority.as_deref(),
1325 path: path.as_deref(),
1326 status: status.to_owned(),
1327 reason: reason.as_deref(),
1328 },
1329 WebSocketContext::Tcp => EndpointRecord::Tcp,
1330 }
1331 }
1332}
1333
1334impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
1335 fn ready(
1336 &mut self,
1337 _session: Rc<RefCell<dyn crate::ProxySession>>,
1338 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
1339 metrics: &mut SessionMetrics,
1340 ) -> SessionResult {
1341 let mut counter = 0;
1342
1343 if self.frontend_readiness.event.is_hup() {
1344 return SessionResult::Close;
1345 }
1346
1347 while counter < MAX_LOOP_ITERATIONS {
1348 let frontend_interest = self.frontend_readiness.filter_interest();
1349 let backend_interest = self.backend_readiness.filter_interest();
1350
1351 trace!(
1352 "{} Frontend interest({:?}), backend interest({:?})",
1353 log_context!(self),
1354 frontend_interest,
1355 backend_interest
1356 );
1357 if frontend_interest.is_empty() && backend_interest.is_empty() {
1358 break;
1359 }
1360
1361 if self.backend_readiness.event.is_hup()
1362 && self.frontend_readiness.interest.is_writable()
1363 && !self.frontend_readiness.event.is_writable()
1364 {
1365 break;
1366 }
1367
1368 if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
1369 return SessionResult::Close;
1370 }
1371
1372 if backend_interest.is_writable()
1373 && self.backend_writable(metrics) == SessionResult::Close
1374 {
1375 return SessionResult::Close;
1376 }
1377
1378 if backend_interest.is_readable()
1379 && self.backend_readable(metrics) == SessionResult::Close
1380 {
1381 return SessionResult::Close;
1382 }
1383
1384 if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
1385 return SessionResult::Close;
1386 }
1387
1388 if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
1389 return SessionResult::Close;
1390 }
1391
1392 if frontend_interest.is_error() {
1393 error!(
1394 "{} Frontend socket error, disconnecting",
1395 log_context!(self)
1396 );
1397
1398 self.frontend_readiness.interest = Ready::EMPTY;
1399 self.backend_readiness.interest = Ready::EMPTY;
1400
1401 return SessionResult::Close;
1402 }
1403
1404 if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
1405 self.frontend_readiness.interest = Ready::EMPTY;
1406 self.backend_readiness.interest = Ready::EMPTY;
1407
1408 error!("{} Backend socket error, disconnecting", log_context!(self));
1409 return SessionResult::Close;
1410 }
1411
1412 counter += 1;
1413 }
1414
1415 if counter >= MAX_LOOP_ITERATIONS {
1416 error!(
1417 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1418 log_context!(self),
1419 MAX_LOOP_ITERATIONS
1420 );
1421
1422 incr!(names::http::INFINITE_LOOP_ERROR);
1423 self.print_state(self.protocol_string());
1424
1425 return SessionResult::Close;
1426 }
1427
1428 SessionResult::Continue
1429 }
1430
1431 fn update_readiness(&mut self, token: Token, events: Ready) {
1432 if self.frontend_token == token {
1433 self.frontend_readiness.event |= events;
1434 } else if self.backend_token == Some(token) {
1435 self.backend_readiness.event |= events;
1436 }
1437 }
1438
1439 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1440 if self.frontend_token == token {
1442 self.log_request_timeout(metrics, "frontend socket timeout");
1443 if let Some(timeout) = self.container_frontend_timeout.as_mut() {
1444 timeout.triggered()
1445 }
1446 return StateResult::CloseSession;
1447 }
1448
1449 if self.backend_token == Some(token) {
1450 if let Some(timeout) = self.container_backend_timeout.as_mut() {
1452 timeout.triggered()
1453 }
1454
1455 self.log_request_timeout(metrics, "backend socket timeout");
1456 return StateResult::CloseSession;
1457 }
1458
1459 error!("{} Got timeout for an invalid token", log_context!(self));
1460 self.log_request_error(metrics, "invalid token timeout");
1461 StateResult::CloseSession
1462 }
1463
1464 fn cancel_timeouts(&mut self) {
1465 self.container_frontend_timeout.as_mut().map(|t| t.cancel());
1466 self.container_backend_timeout.as_mut().map(|t| t.cancel());
1467 }
1468
1469 fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1470 if let Some(backend) = self.backend.as_mut() {
1471 let mut backend = backend.borrow_mut();
1472 backend.active_requests = backend.active_requests.saturating_sub(1);
1473 }
1474 }
1475
1476 fn print_state(&self, context: &str) {
1477 error!(
1478 "\
1479{} {} Session(Pipe)
1480\tFrontend:
1481\t\ttoken: {:?}\treadiness: {:?}
1482\tBackend:
1483\t\ttoken: {:?}\treadiness: {:?}",
1484 log_context!(self),
1485 context,
1486 self.frontend_token,
1487 self.frontend_readiness,
1488 self.backend_token,
1489 self.backend_readiness
1490 );
1491 }
1492}