1use std::{cell::RefCell, net::SocketAddr, rc::Rc};
10
11use mio::{Token, net::TcpStream};
12use rusty_ulid::Ulid;
13use sozu_command::{
14 config::MAX_LOOP_ITERATIONS,
15 logging::{EndpointRecord, LogContext, ansi_palette},
16};
17
18use crate::metrics::names;
19use crate::{
20 L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
21 backends::Backend,
22 pool::Checkout,
23 protocol::{SessionState, http::parser::Method},
24 socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
25 sozu_command::ready::Ready,
26 timer::TimeoutContainer,
27};
28
29#[cfg(all(target_os = "linux", feature = "splice"))]
30use crate::splice::{self, SplicePipe};
31
32macro_rules! log_context {
39 ($self:expr) => {{
40 let (open, reset, grey, gray, white) = ansi_palette();
41 format!(
42 "{gray}{ctx}{reset}\t{open}PIPE{reset}\t{grey}Session{reset}({gray}address{reset}={white}{address}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}frontend_status{reset}={white}{frontend_status:?}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_status{reset}={white}{backend_status:?}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
43 open = open,
44 reset = reset,
45 grey = grey,
46 gray = gray,
47 white = white,
48 ctx = $self.log_context(),
49 address = $self.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
50 frontend = $self.frontend_token.0,
51 frontend_readiness = $self.frontend_readiness,
52 frontend_status = $self.frontend_status,
53 backend = $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
54 backend_status = $self.backend_status,
55 backend_readiness = $self.backend_readiness,
56 )
57 }};
58}
59
60#[derive(PartialEq, Eq)]
61pub enum SessionStatus {
62 Normal,
63 DefaultAnswer,
64}
65
66#[derive(Copy, Clone, Debug)]
67enum ConnectionStatus {
68 Normal,
69 ReadOpen,
70 WriteOpen,
71 Closed,
72}
73
74pub enum WebSocketContext {
76 Http {
77 method: Option<Method>,
78 authority: Option<String>,
79 path: Option<String>,
80 status: Option<u16>,
81 reason: Option<String>,
82 },
83 Tcp,
84}
85
86pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
87 backend_buffer: Checkout,
88 backend_id: Option<String>,
89 pub backend_readiness: Readiness,
90 backend_socket: Option<TcpStream>,
91 backend_status: ConnectionStatus,
92 backend_token: Option<Token>,
93 pub backend: Option<Rc<RefCell<Backend>>>,
94 cluster_id: Option<String>,
95 pub container_backend_timeout: Option<TimeoutContainer>,
96 pub container_frontend_timeout: Option<TimeoutContainer>,
97 frontend_buffer: Checkout,
98 pub frontend_readiness: Readiness,
99 frontend_status: ConnectionStatus,
100 frontend_token: Token,
101 frontend: Front,
102 listener: Rc<RefCell<L>>,
103 protocol: Protocol,
104 session_id: Ulid,
107 request_id: Ulid,
108 session_address: Option<SocketAddr>,
109 websocket_context: WebSocketContext,
110 tls_version: Option<&'static str>,
115 tls_cipher: Option<&'static str>,
116 tls_sni: Option<String>,
119 tls_alpn: Option<&'static str>,
120 #[cfg(all(target_os = "linux", feature = "splice"))]
125 splice_pipe: Option<SplicePipe>,
126}
127
128impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
129 #[allow(clippy::too_many_arguments)]
138 pub fn new(
139 backend_buffer: Checkout,
140 backend_id: Option<String>,
141 backend_socket: Option<TcpStream>,
142 backend: Option<Rc<RefCell<Backend>>>,
143 container_backend_timeout: Option<TimeoutContainer>,
144 container_frontend_timeout: Option<TimeoutContainer>,
145 cluster_id: Option<String>,
146 frontend_buffer: Checkout,
147 frontend_token: Token,
148 frontend: Front,
149 listener: Rc<RefCell<L>>,
150 protocol: Protocol,
151 session_id: Ulid,
152 request_id: Ulid,
153 session_address: Option<SocketAddr>,
154 websocket_context: WebSocketContext,
155 ) -> Pipe<Front, L> {
156 let frontend_status = ConnectionStatus::Normal;
157 let backend_status = if backend_socket.is_none() {
158 ConnectionStatus::Closed
159 } else {
160 ConnectionStatus::Normal
161 };
162
163 let session = Pipe {
164 backend_buffer,
165 backend_id,
166 backend_readiness: Readiness {
167 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
168 event: Ready::EMPTY,
169 },
170 backend_socket,
171 backend_status,
172 backend_token: None,
173 backend,
174 cluster_id,
175 container_backend_timeout,
176 container_frontend_timeout,
177 frontend_buffer,
178 frontend_readiness: Readiness {
179 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
180 event: Ready::EMPTY,
181 },
182 frontend_status,
183 frontend_token,
184 frontend,
185 listener,
186 protocol,
187 session_id,
188 request_id,
189 session_address,
190 websocket_context,
191 tls_version: None,
192 tls_cipher: None,
193 tls_sni: None,
194 tls_alpn: None,
195 #[cfg(all(target_os = "linux", feature = "splice"))]
196 splice_pipe: if protocol == Protocol::TCP {
197 SplicePipe::new()
198 } else {
199 None
200 },
201 };
202
203 trace!("{} created pipe", log_context!(session));
204 session
205 }
206
207 pub fn set_tls_metadata(
214 &mut self,
215 version: Option<&'static str>,
216 cipher: Option<&'static str>,
217 sni: Option<String>,
218 alpn: Option<&'static str>,
219 ) {
220 self.tls_version = version;
221 self.tls_cipher = cipher;
222 self.tls_sni = sni;
223 self.tls_alpn = alpn;
224 }
225
226 pub fn front_socket(&self) -> &TcpStream {
227 self.frontend.socket_ref()
228 }
229
230 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
231 self.frontend.socket_mut()
232 }
233
234 pub fn back_socket(&self) -> Option<&TcpStream> {
235 self.backend_socket.as_ref()
236 }
237
238 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
239 self.backend_socket.as_mut()
240 }
241
242 pub fn set_back_socket(&mut self, socket: TcpStream) {
243 self.backend_socket = Some(socket);
244 self.backend_status = ConnectionStatus::Normal;
245 }
246
247 pub fn back_token(&self) -> Vec<Token> {
248 self.backend_token.iter().cloned().collect()
249 }
250
251 fn reset_timeouts(&mut self) {
252 if let Some(t) = self.container_frontend_timeout.as_mut() {
253 if !t.reset() {
254 error!(
255 "{} Could not reset front timeout (pipe)",
256 log_context!(self)
257 );
258 }
259 }
260
261 if let Some(t) = self.container_backend_timeout.as_mut() {
262 if !t.reset() {
263 error!("{} Could not reset back timeout (pipe)", log_context!(self));
264 }
265 }
266 }
267
268 pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
269 self.cluster_id = cluster_id;
270 }
271
272 pub fn set_backend_id(&mut self, backend_id: Option<String>) {
273 self.backend_id = backend_id;
274 }
275
276 pub fn set_back_token(&mut self, token: Token) {
277 self.backend_token = Some(token);
278 }
279
280 pub fn get_session_address(&self) -> Option<SocketAddr> {
281 self.session_address
282 .or_else(|| self.frontend.socket_ref().peer_addr().ok())
283 }
284
285 pub fn get_backend_address(&self) -> Option<SocketAddr> {
286 self.backend_socket
287 .as_ref()
288 .and_then(|backend| backend.peer_addr().ok())
289 }
290
291 fn protocol_string(&self) -> &'static str {
292 match self.protocol {
293 Protocol::TCP => "TCP",
294 Protocol::HTTP => "WS",
295 Protocol::HTTPS => match self.frontend.protocol() {
296 TransportProtocol::Ssl2 => "WSS-SSL2",
297 TransportProtocol::Ssl3 => "WSS-SSL3",
298 TransportProtocol::Tls1_0 => "WSS-TLS1.0",
299 TransportProtocol::Tls1_1 => "WSS-TLS1.1",
300 TransportProtocol::Tls1_2 => "WSS-TLS1.2",
301 TransportProtocol::Tls1_3 => "WSS-TLS1.3",
302 _ => unreachable!(),
303 },
304 _ => unreachable!(),
305 }
306 }
307
308 pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
309 let listener = self.listener.borrow();
310 let context = self.log_context();
311 let endpoint = self.log_endpoint();
312 metrics.register_end_of_session(&context);
313 log_access!(
314 error,
315 on_failure: { incr!(names::access_logs::UNSENT) },
316 message,
317 context,
318 session_address: self.get_session_address(),
319 backend_address: self.get_backend_address(),
320 protocol: self.protocol_string(),
321 endpoint,
322 tags: listener.get_tags(&listener.get_addr().to_string()),
323 client_rtt: socket_rtt(self.front_socket()),
324 server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
325 service_time: metrics.service_time(),
326 response_time: metrics.backend_response_time(),
327 request_time: metrics.request_time(),
328 bytes_in: metrics.bin,
329 bytes_out: metrics.bout,
330 user_agent: None,
331 x_request_id: None,
332 tls_version: self.tls_version,
337 tls_cipher: self.tls_cipher,
338 tls_sni: self.tls_sni.as_deref(),
339 tls_alpn: self.tls_alpn,
340 xff_chain: None,
341 otel: None,
342 );
343 }
344
345 pub fn log_request_success(&self, metrics: &SessionMetrics) {
346 self.log_request(metrics, false, None);
347 }
348
349 pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
350 incr!(names::pipe::ERRORS);
351 error!(
352 "{} Could not process request properly got: {}",
353 log_context!(self),
354 message
355 );
356 self.print_state(self.protocol_string());
357 self.log_request(metrics, true, Some(message));
358 }
359
360 pub fn log_request_timeout(&self, metrics: &SessionMetrics, message: &str) {
367 debug!("{} pipe timeout: {}", log_context!(self), message);
368 self.log_request(metrics, true, Some(message));
369 }
370
371 #[cfg(all(target_os = "linux", feature = "splice"))]
376 fn splice_in_pending(&self) -> usize {
377 self.splice_pipe
378 .as_ref()
379 .map(|p| p.in_pipe_pending)
380 .unwrap_or(0)
381 }
382 #[cfg(not(all(target_os = "linux", feature = "splice")))]
383 fn splice_in_pending(&self) -> usize {
384 0
385 }
386
387 #[cfg(all(target_os = "linux", feature = "splice"))]
391 fn splice_out_pending(&self) -> usize {
392 self.splice_pipe
393 .as_ref()
394 .map(|p| p.out_pipe_pending)
395 .unwrap_or(0)
396 }
397 #[cfg(not(all(target_os = "linux", feature = "splice")))]
398 fn splice_out_pending(&self) -> usize {
399 0
400 }
401
402 #[cfg(all(target_os = "linux", feature = "splice"))]
406 fn splice_capacity(&self) -> usize {
407 self.splice_pipe.as_ref().map(|p| p.capacity).unwrap_or(0)
408 }
409
410 pub fn check_connections(&self) -> bool {
413 let request_is_inflight = self.frontend_buffer.available_data() > 0
414 || self.frontend_readiness.event.is_readable()
415 || self.splice_in_pending() > 0;
416 let response_is_inflight = self.backend_buffer.available_data() > 0
417 || self.backend_readiness.event.is_readable()
418 || self.splice_out_pending() > 0;
419 match (self.frontend_status, self.backend_status) {
420 (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
421 (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
422 (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
423 request_is_inflight || response_is_inflight
428 }
429 (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
430
431 (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
432 request_is_inflight || response_is_inflight
435 }
436 (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
437 (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
438 request_is_inflight || response_is_inflight
439 }
440 (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
441
442 (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
443 (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
444 (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
445 (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
446
447 (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
448 (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
449 (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
450 (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
451 }
452 }
453
454 pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
455 self.log_request_success(metrics);
456 self.frontend_status = ConnectionStatus::Closed;
457 SessionResult::Close
458 }
459
460 pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
461 self.backend_status = ConnectionStatus::Closed;
462 let pipe_has_data = self.splice_out_pending() > 0;
463 if self.backend_buffer.available_data() == 0 && !pipe_has_data {
464 if self.backend_readiness.event.is_readable() {
465 self.backend_readiness.interest.insert(Ready::READABLE);
466 debug!(
467 "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.",
468 log_context!(self)
469 );
470 SessionResult::Continue
471 } else {
472 self.log_request_success(metrics);
473 SessionResult::Close
474 }
475 } else {
476 debug!(
477 "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.",
478 log_context!(self)
479 );
480 self.frontend_readiness.interest.insert(Ready::WRITABLE);
481 if self.backend_readiness.event.is_readable() {
482 self.backend_readiness.interest.insert(Ready::READABLE);
483 }
484 SessionResult::Continue
485 }
486 }
487
488 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
490 #[cfg(all(target_os = "linux", feature = "splice"))]
491 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
492 return self.splice_readable(metrics);
493 }
494
495 self.reset_timeouts();
496
497 trace!("{} pipe readable", log_context!(self));
498 if self.frontend_buffer.available_space() == 0 {
499 self.frontend_readiness.interest.remove(Ready::READABLE);
500 self.backend_readiness.interest.insert(Ready::WRITABLE);
501 return SessionResult::Continue;
502 }
503
504 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
505 debug!("{} Read {} bytes", log_context!(self), sz);
506
507 if sz > 0 {
508 self.frontend_buffer.fill(sz);
510
511 count!(names::backend::BYTES_IN, sz as i64);
512 metrics.bin += sz;
513
514 if self.frontend_buffer.available_space() == 0 {
515 self.frontend_readiness.interest.remove(Ready::READABLE);
516 }
517 self.backend_readiness.interest.insert(Ready::WRITABLE);
518 } else {
519 self.frontend_readiness.event.remove(Ready::READABLE);
520
521 if res == SocketResult::Continue {
522 self.frontend_status = match self.frontend_status {
523 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
524 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
525 s => s,
526 };
527 }
528 }
529
530 if !self.check_connections() {
531 self.frontend_readiness.reset();
532 self.backend_readiness.reset();
533 self.log_request_success(metrics);
534 return SessionResult::Close;
535 }
536
537 match res {
538 SocketResult::Error => {
539 self.frontend_readiness.reset();
540 self.backend_readiness.reset();
541 self.log_request_error(metrics, "front socket read error");
542 return SessionResult::Close;
543 }
544 SocketResult::Closed => {
545 self.frontend_readiness.reset();
546 self.backend_readiness.reset();
547 self.log_request_success(metrics);
548 return SessionResult::Close;
549 }
550 SocketResult::WouldBlock => {
551 self.frontend_readiness.event.remove(Ready::READABLE);
552 }
553 SocketResult::Continue => {}
554 };
555
556 self.backend_readiness.interest.insert(Ready::WRITABLE);
557 SessionResult::Continue
558 }
559
560 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
562 #[cfg(all(target_os = "linux", feature = "splice"))]
563 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
564 return self.splice_writable(metrics);
565 }
566
567 trace!("{} Pipe writable", log_context!(self));
568 if self.backend_buffer.available_data() == 0 {
569 self.backend_readiness.interest.insert(Ready::READABLE);
570 self.frontend_readiness.interest.remove(Ready::WRITABLE);
571 return SessionResult::Continue;
572 }
573
574 let mut sz = 0usize;
575 let mut res = SocketResult::Continue;
576 while res == SocketResult::Continue {
577 if self.backend_buffer.available_data() == 0 {
579 count!(names::backend::BYTES_OUT, sz as i64);
580 metrics.bout += sz;
581 self.backend_readiness.interest.insert(Ready::READABLE);
582 self.frontend_readiness.interest.remove(Ready::WRITABLE);
583 return SessionResult::Continue;
584 }
585 let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
586 res = current_res;
587 self.backend_buffer.consume(current_sz);
588 sz += current_sz;
589
590 if current_sz == 0 && res == SocketResult::Continue {
591 self.frontend_status = match self.frontend_status {
592 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
593 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
594 s => s,
595 };
596 }
597
598 if !self.check_connections() {
599 metrics.bout += sz;
600 count!(names::backend::BYTES_OUT, sz as i64);
601 self.frontend_readiness.reset();
602 self.backend_readiness.reset();
603 self.log_request_success(metrics);
604 return SessionResult::Close;
605 }
606 }
607
608 if sz > 0 {
609 count!(names::backend::BYTES_OUT, sz as i64);
610 self.backend_readiness.interest.insert(Ready::READABLE);
611 metrics.bout += sz;
612 }
613
614 debug!(
615 "{} Wrote {} bytes of {}",
616 log_context!(self),
617 sz,
618 self.backend_buffer.available_data()
619 );
620
621 match res {
622 SocketResult::Error => {
623 self.frontend_readiness.reset();
624 self.backend_readiness.reset();
625 self.log_request_error(metrics, "front socket write error");
626 return SessionResult::Close;
627 }
628 SocketResult::Closed => {
629 self.frontend_readiness.reset();
630 self.backend_readiness.reset();
631 self.log_request_success(metrics);
632 return SessionResult::Close;
633 }
634 SocketResult::WouldBlock => {
635 self.frontend_readiness.event.remove(Ready::WRITABLE);
636 }
637 SocketResult::Continue => {}
638 }
639
640 SessionResult::Continue
641 }
642
643 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
645 #[cfg(all(target_os = "linux", feature = "splice"))]
646 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
647 return self.splice_backend_writable(metrics);
648 }
649
650 trace!("{} pipe back_writable", log_context!(self));
651
652 if self.frontend_buffer.available_data() == 0 {
653 self.frontend_readiness.interest.insert(Ready::READABLE);
654 self.backend_readiness.interest.remove(Ready::WRITABLE);
655 return SessionResult::Continue;
656 }
657
658 let output_size = self.frontend_buffer.available_data();
659
660 let mut sz = 0usize;
661 let mut socket_res = SocketResult::Continue;
662
663 if let Some(ref mut backend) = self.backend_socket {
664 while socket_res == SocketResult::Continue {
665 if self.frontend_buffer.available_data() == 0 {
667 self.frontend_readiness.interest.insert(Ready::READABLE);
668 self.backend_readiness.interest.remove(Ready::WRITABLE);
669 count!(names::backend::BACK_BYTES_OUT, sz as i64);
670 metrics.backend_bout += sz;
671 return SessionResult::Continue;
672 }
673
674 let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
675 socket_res = current_res;
676 self.frontend_buffer.consume(current_sz);
677 sz += current_sz;
678
679 if current_sz == 0 && current_res == SocketResult::Continue {
680 self.backend_status = match self.backend_status {
681 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
682 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
683 s => s,
684 };
685 }
686 }
687 }
688
689 count!(names::backend::BACK_BYTES_OUT, sz as i64);
690 metrics.backend_bout += sz;
691
692 if !self.check_connections() {
693 self.frontend_readiness.reset();
694 self.backend_readiness.reset();
695 self.log_request_success(metrics);
696 return SessionResult::Close;
697 }
698
699 debug!(
700 "{} Wrote {} bytes of {}",
701 log_context!(self),
702 sz,
703 output_size
704 );
705
706 match socket_res {
707 SocketResult::Error => {
708 self.frontend_readiness.reset();
709 self.backend_readiness.reset();
710 self.log_request_error(metrics, "back socket write error");
711 return SessionResult::Close;
712 }
713 SocketResult::Closed => {
714 self.frontend_readiness.reset();
715 self.backend_readiness.reset();
716 self.log_request_success(metrics);
717 return SessionResult::Close;
718 }
719 SocketResult::WouldBlock => {
720 self.backend_readiness.event.remove(Ready::WRITABLE);
721 }
722 SocketResult::Continue => {}
723 }
724 SessionResult::Continue
725 }
726
727 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
729 #[cfg(all(target_os = "linux", feature = "splice"))]
730 if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
731 return self.splice_backend_readable(metrics);
732 }
733
734 self.reset_timeouts();
735
736 trace!("{} Pipe backend_readable", log_context!(self));
737 if self.backend_buffer.available_space() == 0 {
738 self.backend_readiness.interest.remove(Ready::READABLE);
739 return SessionResult::Continue;
740 }
741
742 if let Some(ref mut backend) = self.backend_socket {
743 let (size, remaining) = backend.socket_read(self.backend_buffer.space());
744 self.backend_buffer.fill(size);
745
746 debug!("{} Read {} bytes", log_context!(self), size);
747
748 if remaining != SocketResult::Continue || size == 0 {
749 self.backend_readiness.event.remove(Ready::READABLE);
750 }
751 if size > 0 {
752 self.frontend_readiness.interest.insert(Ready::WRITABLE);
753 count!(names::backend::BACK_BYTES_IN, size as i64);
754 metrics.backend_bin += size;
755 }
756
757 if size == 0 && remaining == SocketResult::Closed {
758 self.backend_status = match self.backend_status {
759 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
760 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
761 s => s,
762 };
763
764 if !self.check_connections() {
765 self.frontend_readiness.reset();
766 self.backend_readiness.reset();
767 self.log_request_success(metrics);
768 return SessionResult::Close;
769 }
770 }
771
772 match remaining {
773 SocketResult::Error => {
774 self.frontend_readiness.reset();
775 self.backend_readiness.reset();
776 self.log_request_error(metrics, "back socket read error");
777 return SessionResult::Close;
778 }
779 SocketResult::Closed => {
780 if !self.check_connections() {
781 self.frontend_readiness.reset();
782 self.backend_readiness.reset();
783 self.log_request_success(metrics);
784 return SessionResult::Close;
785 }
786 }
787 SocketResult::WouldBlock => {
788 self.backend_readiness.event.remove(Ready::READABLE);
789 }
790 SocketResult::Continue => {}
791 }
792 }
793
794 SessionResult::Continue
795 }
796
797 #[cfg(all(target_os = "linux", feature = "splice"))]
805 fn splice_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
806 self.reset_timeouts();
807
808 trace!("{} pipe splice_readable", log_context!(self));
809 let capacity = self.splice_capacity();
810 if self.splice_in_pending() >= capacity {
811 self.frontend_readiness.interest.remove(Ready::READABLE);
813 self.backend_readiness.interest.insert(Ready::WRITABLE);
814 return SessionResult::Continue;
815 }
816
817 let pipe_write_end = self.splice_pipe.as_ref().unwrap().in_pipe[1];
818 let (sz, res) = splice::splice_in(self.frontend.socket_ref(), pipe_write_end, capacity);
819 debug!("{} Spliced {} bytes from frontend", log_context!(self), sz);
820
821 if sz > 0 {
822 self.splice_pipe.as_mut().unwrap().in_pipe_pending += sz;
823 count!(names::backend::BYTES_IN, sz as i64);
824 metrics.bin += sz;
825 self.backend_readiness.interest.insert(Ready::WRITABLE);
826 } else {
827 self.frontend_readiness.event.remove(Ready::READABLE);
828
829 if res == SocketResult::Continue {
830 self.frontend_status = match self.frontend_status {
831 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
832 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
833 s => s,
834 };
835 }
836 }
837
838 if !self.check_connections() {
839 self.frontend_readiness.reset();
840 self.backend_readiness.reset();
841 self.log_request_success(metrics);
842 return SessionResult::Close;
843 }
844
845 match res {
846 SocketResult::Error => {
847 self.frontend_readiness.reset();
848 self.backend_readiness.reset();
849 self.log_request_error(metrics, "splice front socket read error");
850 return SessionResult::Close;
851 }
852 SocketResult::Closed => {
853 self.frontend_readiness.reset();
854 self.backend_readiness.reset();
855 self.log_request_success(metrics);
856 return SessionResult::Close;
857 }
858 SocketResult::WouldBlock => {
859 self.frontend_readiness.event.remove(Ready::READABLE);
860 }
861 SocketResult::Continue => {}
862 }
863
864 self.backend_readiness.interest.insert(Ready::WRITABLE);
865 SessionResult::Continue
866 }
867
868 #[cfg(all(target_os = "linux", feature = "splice"))]
873 fn splice_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
874 trace!("{} Pipe splice_writable", log_context!(self));
875 if self.splice_out_pending() == 0 {
876 self.backend_readiness.interest.insert(Ready::READABLE);
877 self.frontend_readiness.interest.remove(Ready::WRITABLE);
878 return SessionResult::Continue;
879 }
880
881 let mut sz = 0usize;
882 let mut res = SocketResult::Continue;
883 while res == SocketResult::Continue {
884 let pending = self.splice_out_pending();
885 if pending == 0 {
887 count!(names::backend::BYTES_OUT, sz as i64);
888 metrics.bout += sz;
889 self.backend_readiness.interest.insert(Ready::READABLE);
890 self.frontend_readiness.interest.remove(Ready::WRITABLE);
891 return SessionResult::Continue;
892 }
893
894 let pipe_read_end = self.splice_pipe.as_ref().unwrap().out_pipe[0];
895 let (current_sz, current_res) =
896 splice::splice_out(pipe_read_end, self.frontend.socket_ref(), pending);
897 res = current_res;
898 if current_sz > 0 {
899 self.splice_pipe.as_mut().unwrap().out_pipe_pending -= current_sz;
900 }
901 sz += current_sz;
902
903 if current_sz == 0 && res == SocketResult::Continue {
904 self.frontend_status = match self.frontend_status {
905 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
906 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
907 s => s,
908 };
909 }
910
911 if !self.check_connections() {
912 metrics.bout += sz;
913 count!(names::backend::BYTES_OUT, sz as i64);
914 self.frontend_readiness.reset();
915 self.backend_readiness.reset();
916 self.log_request_success(metrics);
917 return SessionResult::Close;
918 }
919 }
920
921 if sz > 0 {
922 count!(names::backend::BYTES_OUT, sz as i64);
923 self.backend_readiness.interest.insert(Ready::READABLE);
924 metrics.bout += sz;
925 }
926
927 debug!(
928 "{} Spliced {} bytes (out_pipe_pending={})",
929 log_context!(self),
930 sz,
931 self.splice_out_pending()
932 );
933
934 match res {
935 SocketResult::Error => {
936 self.frontend_readiness.reset();
937 self.backend_readiness.reset();
938 self.log_request_error(metrics, "splice front socket write error");
939 return SessionResult::Close;
940 }
941 SocketResult::Closed => {
942 self.frontend_readiness.reset();
943 self.backend_readiness.reset();
944 self.log_request_success(metrics);
945 return SessionResult::Close;
946 }
947 SocketResult::WouldBlock => {
948 self.frontend_readiness.event.remove(Ready::WRITABLE);
949 }
950 SocketResult::Continue => {}
951 }
952
953 SessionResult::Continue
954 }
955
956 #[cfg(all(target_os = "linux", feature = "splice"))]
961 fn splice_backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
962 trace!("{} pipe splice_backend_writable", log_context!(self));
963
964 if self.splice_in_pending() == 0 {
965 self.frontend_readiness.interest.insert(Ready::READABLE);
966 self.backend_readiness.interest.remove(Ready::WRITABLE);
967 return SessionResult::Continue;
968 }
969
970 let output_size = self.splice_in_pending();
971 let mut sz = 0usize;
972 let mut socket_res = SocketResult::Continue;
973
974 while socket_res == SocketResult::Continue {
975 let pending = self.splice_in_pending();
976 if pending == 0 {
978 self.frontend_readiness.interest.insert(Ready::READABLE);
979 self.backend_readiness.interest.remove(Ready::WRITABLE);
980 count!(names::backend::BACK_BYTES_OUT, sz as i64);
981 metrics.backend_bout += sz;
982 return SessionResult::Continue;
983 }
984
985 let pipe_read_end = self.splice_pipe.as_ref().unwrap().in_pipe[0];
986 let (current_sz, current_res) = match self.backend_socket.as_ref() {
987 Some(b) => splice::splice_out(pipe_read_end, b, pending),
988 None => break,
989 };
990 socket_res = current_res;
991 if current_sz > 0 {
992 self.splice_pipe.as_mut().unwrap().in_pipe_pending -= current_sz;
993 }
994 sz += current_sz;
995
996 if current_sz == 0 && current_res == SocketResult::Continue {
997 self.backend_status = match self.backend_status {
998 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
999 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
1000 s => s,
1001 };
1002 }
1003 }
1004
1005 count!(names::backend::BACK_BYTES_OUT, sz as i64);
1006 metrics.backend_bout += sz;
1007
1008 if !self.check_connections() {
1009 self.frontend_readiness.reset();
1010 self.backend_readiness.reset();
1011 self.log_request_success(metrics);
1012 return SessionResult::Close;
1013 }
1014
1015 debug!(
1016 "{} Spliced {} bytes of {}",
1017 log_context!(self),
1018 sz,
1019 output_size
1020 );
1021
1022 match socket_res {
1023 SocketResult::Error => {
1024 self.frontend_readiness.reset();
1025 self.backend_readiness.reset();
1026 self.log_request_error(metrics, "splice back socket write error");
1027 return SessionResult::Close;
1028 }
1029 SocketResult::Closed => {
1030 self.frontend_readiness.reset();
1031 self.backend_readiness.reset();
1032 self.log_request_success(metrics);
1033 return SessionResult::Close;
1034 }
1035 SocketResult::WouldBlock => {
1036 self.backend_readiness.event.remove(Ready::WRITABLE);
1037 }
1038 SocketResult::Continue => {}
1039 }
1040 SessionResult::Continue
1041 }
1042
1043 #[cfg(all(target_os = "linux", feature = "splice"))]
1049 fn splice_backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1050 self.reset_timeouts();
1051
1052 trace!("{} Pipe splice_backend_readable", log_context!(self));
1053 let capacity = self.splice_capacity();
1054 if self.splice_out_pending() >= capacity {
1055 self.backend_readiness.interest.remove(Ready::READABLE);
1057 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1058 return SessionResult::Continue;
1059 }
1060
1061 let pipe_write_end = self.splice_pipe.as_ref().unwrap().out_pipe[1];
1062 let (size, remaining) = match self.backend_socket.as_ref() {
1063 Some(b) => splice::splice_in(b, pipe_write_end, capacity),
1064 None => return SessionResult::Continue,
1065 };
1066
1067 debug!("{} Spliced {} bytes from backend", log_context!(self), size);
1068
1069 if remaining != SocketResult::Continue || size == 0 {
1070 self.backend_readiness.event.remove(Ready::READABLE);
1071 }
1072 if size > 0 {
1073 self.splice_pipe.as_mut().unwrap().out_pipe_pending += size;
1074 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1075 count!(names::backend::BACK_BYTES_IN, size as i64);
1076 metrics.backend_bin += size;
1077 }
1078
1079 if size == 0 && remaining == SocketResult::Closed {
1080 self.backend_status = match self.backend_status {
1081 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
1082 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
1083 s => s,
1084 };
1085
1086 if !self.check_connections() {
1087 self.frontend_readiness.reset();
1088 self.backend_readiness.reset();
1089 self.log_request_success(metrics);
1090 return SessionResult::Close;
1091 }
1092 }
1093
1094 match remaining {
1095 SocketResult::Error => {
1096 self.frontend_readiness.reset();
1097 self.backend_readiness.reset();
1098 self.log_request_error(metrics, "splice back socket read error");
1099 return SessionResult::Close;
1100 }
1101 SocketResult::Closed => {
1102 if !self.check_connections() {
1103 self.frontend_readiness.reset();
1104 self.backend_readiness.reset();
1105 self.log_request_success(metrics);
1106 return SessionResult::Close;
1107 }
1108 }
1109 SocketResult::WouldBlock => {
1110 self.backend_readiness.event.remove(Ready::READABLE);
1111 }
1112 SocketResult::Continue => {}
1113 }
1114
1115 SessionResult::Continue
1116 }
1117
1118 pub fn log_context(&self) -> LogContext<'_> {
1119 LogContext {
1120 session_id: self.session_id,
1121 request_id: Some(self.request_id),
1122 cluster_id: self.cluster_id.as_deref(),
1123 backend_id: self.backend_id.as_deref(),
1124 }
1125 }
1126
1127 fn log_endpoint(&self) -> EndpointRecord<'_> {
1128 match &self.websocket_context {
1129 WebSocketContext::Http {
1130 method,
1131 authority,
1132 path,
1133 status,
1134 reason,
1135 } => EndpointRecord::Http {
1136 method: method.as_deref(),
1137 authority: authority.as_deref(),
1138 path: path.as_deref(),
1139 status: status.to_owned(),
1140 reason: reason.as_deref(),
1141 },
1142 WebSocketContext::Tcp => EndpointRecord::Tcp,
1143 }
1144 }
1145}
1146
1147impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
1148 fn ready(
1149 &mut self,
1150 _session: Rc<RefCell<dyn crate::ProxySession>>,
1151 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
1152 metrics: &mut SessionMetrics,
1153 ) -> SessionResult {
1154 let mut counter = 0;
1155
1156 if self.frontend_readiness.event.is_hup() {
1157 return SessionResult::Close;
1158 }
1159
1160 while counter < MAX_LOOP_ITERATIONS {
1161 let frontend_interest = self.frontend_readiness.filter_interest();
1162 let backend_interest = self.backend_readiness.filter_interest();
1163
1164 trace!(
1165 "{} Frontend interest({:?}), backend interest({:?})",
1166 log_context!(self),
1167 frontend_interest,
1168 backend_interest
1169 );
1170 if frontend_interest.is_empty() && backend_interest.is_empty() {
1171 break;
1172 }
1173
1174 if self.backend_readiness.event.is_hup()
1175 && self.frontend_readiness.interest.is_writable()
1176 && !self.frontend_readiness.event.is_writable()
1177 {
1178 break;
1179 }
1180
1181 if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
1182 return SessionResult::Close;
1183 }
1184
1185 if backend_interest.is_writable()
1186 && self.backend_writable(metrics) == SessionResult::Close
1187 {
1188 return SessionResult::Close;
1189 }
1190
1191 if backend_interest.is_readable()
1192 && self.backend_readable(metrics) == SessionResult::Close
1193 {
1194 return SessionResult::Close;
1195 }
1196
1197 if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
1198 return SessionResult::Close;
1199 }
1200
1201 if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
1202 return SessionResult::Close;
1203 }
1204
1205 if frontend_interest.is_error() {
1206 error!(
1207 "{} Frontend socket error, disconnecting",
1208 log_context!(self)
1209 );
1210
1211 self.frontend_readiness.interest = Ready::EMPTY;
1212 self.backend_readiness.interest = Ready::EMPTY;
1213
1214 return SessionResult::Close;
1215 }
1216
1217 if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
1218 self.frontend_readiness.interest = Ready::EMPTY;
1219 self.backend_readiness.interest = Ready::EMPTY;
1220
1221 error!("{} Backend socket error, disconnecting", log_context!(self));
1222 return SessionResult::Close;
1223 }
1224
1225 counter += 1;
1226 }
1227
1228 if counter >= MAX_LOOP_ITERATIONS {
1229 error!(
1230 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1231 log_context!(self),
1232 MAX_LOOP_ITERATIONS
1233 );
1234
1235 incr!(names::http::INFINITE_LOOP_ERROR);
1236 self.print_state(self.protocol_string());
1237
1238 return SessionResult::Close;
1239 }
1240
1241 SessionResult::Continue
1242 }
1243
1244 fn update_readiness(&mut self, token: Token, events: Ready) {
1245 if self.frontend_token == token {
1246 self.frontend_readiness.event |= events;
1247 } else if self.backend_token == Some(token) {
1248 self.backend_readiness.event |= events;
1249 }
1250 }
1251
1252 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1253 if self.frontend_token == token {
1255 self.log_request_timeout(metrics, "frontend socket timeout");
1256 if let Some(timeout) = self.container_frontend_timeout.as_mut() {
1257 timeout.triggered()
1258 }
1259 return StateResult::CloseSession;
1260 }
1261
1262 if self.backend_token == Some(token) {
1263 if let Some(timeout) = self.container_backend_timeout.as_mut() {
1265 timeout.triggered()
1266 }
1267
1268 self.log_request_timeout(metrics, "backend socket timeout");
1269 return StateResult::CloseSession;
1270 }
1271
1272 error!("{} Got timeout for an invalid token", log_context!(self));
1273 self.log_request_error(metrics, "invalid token timeout");
1274 StateResult::CloseSession
1275 }
1276
1277 fn cancel_timeouts(&mut self) {
1278 self.container_frontend_timeout.as_mut().map(|t| t.cancel());
1279 self.container_backend_timeout.as_mut().map(|t| t.cancel());
1280 }
1281
1282 fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1283 if let Some(backend) = self.backend.as_mut() {
1284 let mut backend = backend.borrow_mut();
1285 backend.active_requests = backend.active_requests.saturating_sub(1);
1286 }
1287 }
1288
1289 fn print_state(&self, context: &str) {
1290 error!(
1291 "\
1292{} {} Session(Pipe)
1293\tFrontend:
1294\t\ttoken: {:?}\treadiness: {:?}
1295\tBackend:
1296\t\ttoken: {:?}\treadiness: {:?}",
1297 log_context!(self),
1298 context,
1299 self.frontend_token,
1300 self.frontend_readiness,
1301 self.backend_token,
1302 self.backend_readiness
1303 );
1304 }
1305}