1use std::{
11 io::{ErrorKind, Read, Write},
12 net::SocketAddr,
13};
14
15use mio::net::{TcpListener, TcpStream};
16use rustls::{ProtocolVersion, ServerConnection};
17use rusty_ulid::Ulid;
18use socket2::{Domain, Protocol, Socket, Type};
19use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::ansi_palette};
20
21use crate::metrics::names;
22
23#[derive(thiserror::Error, Debug)]
24pub enum ServerBindError {
25 #[error("could not set bind to socket: {0}")]
26 BindError(std::io::Error),
27 #[error("could not listen on socket: {0}")]
28 Listen(std::io::Error),
29 #[error("could not set socket to nonblocking: {0}")]
30 SetNonBlocking(std::io::Error),
31 #[error("could not set reuse address: {0}")]
32 SetReuseAddress(std::io::Error),
33 #[error("could not set reuse address: {0}")]
34 SetReusePort(std::io::Error),
35 #[error("Could not create socket: {0}")]
36 SocketCreationError(std::io::Error),
37 #[error("Invalid socket address '{address}': {error}")]
38 InvalidSocketAddress { address: String, error: String },
39}
40
41#[derive(Debug, PartialEq, Eq, Copy, Clone)]
42pub enum SocketResult {
43 Continue,
44 Closed,
45 WouldBlock,
46 Error,
47}
48
49#[derive(Debug, PartialEq, Eq, Copy, Clone)]
50pub enum TransportProtocol {
51 Tcp,
52 Ssl2,
53 Ssl3,
54 Tls1_0,
55 Tls1_1,
56 Tls1_2,
57 Tls1_3,
58}
59
60pub trait SocketHandler {
61 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult);
62 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult);
63 fn socket_write_vectored(&mut self, _buf: &[std::io::IoSlice]) -> (usize, SocketResult);
64 fn socket_wants_write(&self) -> bool {
65 false
66 }
67 fn socket_close(&mut self) {}
68 fn socket_ref(&self) -> &TcpStream;
69 fn socket_mut(&mut self) -> &mut TcpStream;
70 fn protocol(&self) -> TransportProtocol;
71 fn read_error(&self);
72 fn write_error(&self);
73 fn session_ulid(&self) -> Option<Ulid> {
79 None
80 }
81}
82
83macro_rules! log_socket_context {
98 ($self:expr) => {{
99 let (open, reset, grey, gray, white) = ansi_palette();
100 let ulid = match $self.session_ulid() {
101 Some(ulid) => ulid.to_string(),
102 None => "-".to_string(),
103 };
104 let snapshot = crate::socket::stats::socket_snapshot($self.socket_ref());
105 let rtt = snapshot.as_ref().map(|s| s.rtt);
106 let state = snapshot.as_ref().map(|s| s.state);
107 format!(
108 "[{ulid} - - -]\t{open}SOCKET{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}local{reset}={white}{local:?}{reset}, {gray}rtt{reset}={white}{rtt:?}{reset}, {gray}state{reset}={white}{state:?}{reset}, {gray}protocol{reset}={white}{protocol:?}{reset})\t >>>",
109 open = open,
110 reset = reset,
111 grey = grey,
112 gray = gray,
113 white = white,
114 ulid = ulid,
115 peer = $self.socket_ref().peer_addr().ok(),
116 local = $self.socket_ref().local_addr().ok(),
117 rtt = rtt,
118 state = state,
119 protocol = $self.protocol(),
120 )
121 }};
122}
123
124fn log_socket_module_prefix(
145 stream: &TcpStream,
146 session_ulid: Option<Ulid>,
147 configured_peer: Option<SocketAddr>,
148) -> String {
149 let (open, reset, grey, gray, white) = ansi_palette();
150 let ulid = match session_ulid {
151 Some(ulid) => ulid.to_string(),
152 None => "-".to_string(),
153 };
154 let snapshot = crate::socket::stats::socket_snapshot(stream);
155 let rtt = snapshot.as_ref().map(|s| s.rtt);
156 let state = snapshot.as_ref().map(|s| s.state);
157 format!(
158 "[{ulid} - - -]\t{open}SOCKET{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}local{reset}={white}{local:?}{reset}, {gray}rtt{reset}={white}{rtt:?}{reset}, {gray}state{reset}={white}{state:?}{reset}, {gray}protocol{reset}={white}Tcp{reset})\t >>>",
159 peer = configured_peer.or_else(|| stream.peer_addr().ok()),
160 local = stream.local_addr().ok(),
161 )
162}
163
164fn tcp_socket_read(
172 stream: &mut TcpStream,
173 buf: &mut [u8],
174 session_ulid: Option<Ulid>,
175 configured_peer: Option<SocketAddr>,
176) -> (usize, SocketResult) {
177 let mut size = 0usize;
178 let mut counter = 0;
179 loop {
180 counter += 1;
181 if counter > MAX_LOOP_ITERATIONS {
182 error!(
183 "{} MAX_LOOP_ITERATION reached in TcpStream::socket_read",
184 log_socket_module_prefix(stream, session_ulid, configured_peer)
185 );
186 incr!(names::socket::READ_INFINITE_LOOP_ERROR);
187 return (size, SocketResult::Error);
188 }
189 if size == buf.len() {
190 return (size, SocketResult::Continue);
191 }
192 match stream.read(&mut buf[size..]) {
193 Ok(0) => return (size, SocketResult::Closed),
194 Ok(sz) => size += sz,
195 Err(e) => match e.kind() {
196 ErrorKind::WouldBlock => return (size, SocketResult::WouldBlock),
197 ErrorKind::ConnectionReset
204 | ErrorKind::ConnectionAborted
205 | ErrorKind::BrokenPipe
206 | ErrorKind::ConnectionRefused => return (size, SocketResult::Closed),
207 ErrorKind::HostUnreachable
212 | ErrorKind::NetworkUnreachable
213 | ErrorKind::TimedOut
214 | ErrorKind::NotConnected => {
215 warn!(
216 "{} socket_read error={:?}",
217 log_socket_module_prefix(stream, session_ulid, configured_peer),
218 e
219 );
220 return (size, SocketResult::Error);
221 }
222 _ => {
226 error!(
227 "{} socket_read error={:?}",
228 log_socket_module_prefix(stream, session_ulid, configured_peer),
229 e
230 );
231 return (size, SocketResult::Error);
232 }
233 },
234 }
235 }
236}
237
238fn tcp_socket_write(
239 stream: &mut TcpStream,
240 buf: &[u8],
241 session_ulid: Option<Ulid>,
242 configured_peer: Option<SocketAddr>,
243) -> (usize, SocketResult) {
244 let mut size = 0usize;
245 let mut counter = 0;
246 loop {
247 counter += 1;
248 if counter > MAX_LOOP_ITERATIONS {
249 error!(
250 "{} MAX_LOOP_ITERATION reached in TcpStream::socket_write",
251 log_socket_module_prefix(stream, session_ulid, configured_peer)
252 );
253 incr!(names::socket::WRITE_INFINITE_LOOP_ERROR);
254 return (size, SocketResult::Error);
255 }
256 if size == buf.len() {
257 return (size, SocketResult::Continue);
258 }
259 match stream.write(&buf[size..]) {
260 Ok(0) => return (size, SocketResult::Continue),
261 Ok(sz) => size += sz,
262 Err(e) => match e.kind() {
263 ErrorKind::WouldBlock => return (size, SocketResult::WouldBlock),
264 ErrorKind::ConnectionReset
265 | ErrorKind::ConnectionAborted
266 | ErrorKind::BrokenPipe
267 | ErrorKind::ConnectionRefused => {
268 incr!(names::tcp::WRITE_ERROR);
269 return (size, SocketResult::Closed);
270 }
271 ErrorKind::HostUnreachable
276 | ErrorKind::NetworkUnreachable
277 | ErrorKind::TimedOut
278 | ErrorKind::NotConnected => {
279 warn!(
280 "{} socket_write error={:?}",
281 log_socket_module_prefix(stream, session_ulid, configured_peer),
282 e
283 );
284 incr!(names::tcp::WRITE_ERROR);
285 return (size, SocketResult::Error);
286 }
287 _ => {
288 error!(
290 "{} socket_write error={:?}",
291 log_socket_module_prefix(stream, session_ulid, configured_peer),
292 e
293 );
294 incr!(names::tcp::WRITE_ERROR);
295 return (size, SocketResult::Error);
296 }
297 },
298 }
299 }
300}
301
302fn tcp_socket_write_vectored(
303 stream: &mut TcpStream,
304 bufs: &[std::io::IoSlice],
305 session_ulid: Option<Ulid>,
306 configured_peer: Option<SocketAddr>,
307) -> (usize, SocketResult) {
308 match stream.write_vectored(bufs) {
309 Ok(sz) => (sz, SocketResult::Continue),
310 Err(e) => match e.kind() {
311 ErrorKind::WouldBlock => (0, SocketResult::WouldBlock),
312 ErrorKind::ConnectionReset
313 | ErrorKind::ConnectionAborted
314 | ErrorKind::BrokenPipe
315 | ErrorKind::ConnectionRefused => {
316 incr!(names::tcp::WRITE_ERROR);
317 (0, SocketResult::Closed)
318 }
319 ErrorKind::HostUnreachable
322 | ErrorKind::NetworkUnreachable
323 | ErrorKind::TimedOut
324 | ErrorKind::NotConnected => {
325 warn!(
326 "{} socket_write error={:?}",
327 log_socket_module_prefix(stream, session_ulid, configured_peer),
328 e
329 );
330 incr!(names::tcp::WRITE_ERROR);
331 (0, SocketResult::Error)
332 }
333 _ => {
334 error!(
336 "{} socket_write error={:?}",
337 log_socket_module_prefix(stream, session_ulid, configured_peer),
338 e
339 );
340 incr!(names::tcp::WRITE_ERROR);
341 (0, SocketResult::Error)
342 }
343 },
344 }
345}
346
347impl SocketHandler for TcpStream {
348 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
349 tcp_socket_read(self, buf, None, None)
350 }
351
352 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
353 tcp_socket_write(self, buf, None, None)
354 }
355
356 fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
357 tcp_socket_write_vectored(self, bufs, None, None)
358 }
359
360 fn socket_ref(&self) -> &TcpStream {
361 self
362 }
363
364 fn socket_mut(&mut self) -> &mut TcpStream {
365 self
366 }
367
368 fn protocol(&self) -> TransportProtocol {
369 TransportProtocol::Tcp
370 }
371
372 fn read_error(&self) {
373 incr!(names::tcp::READ_ERROR);
374 }
375
376 fn write_error(&self) {
377 incr!(names::tcp::WRITE_ERROR);
378 }
379}
380
381#[derive(Debug)]
390pub struct SessionTcpStream {
391 pub stream: TcpStream,
392 pub session_ulid: Ulid,
393 pub configured_peer: Option<SocketAddr>,
404}
405
406impl SessionTcpStream {
407 pub fn new(stream: TcpStream, session_ulid: Ulid, configured_peer: Option<SocketAddr>) -> Self {
408 Self {
409 stream,
410 session_ulid,
411 configured_peer,
412 }
413 }
414}
415
416impl SocketHandler for SessionTcpStream {
417 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
418 tcp_socket_read(
419 &mut self.stream,
420 buf,
421 Some(self.session_ulid),
422 self.configured_peer,
423 )
424 }
425
426 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
427 tcp_socket_write(
428 &mut self.stream,
429 buf,
430 Some(self.session_ulid),
431 self.configured_peer,
432 )
433 }
434
435 fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
436 tcp_socket_write_vectored(
437 &mut self.stream,
438 bufs,
439 Some(self.session_ulid),
440 self.configured_peer,
441 )
442 }
443
444 fn socket_ref(&self) -> &TcpStream {
445 &self.stream
446 }
447
448 fn socket_mut(&mut self) -> &mut TcpStream {
449 &mut self.stream
450 }
451
452 fn protocol(&self) -> TransportProtocol {
453 TransportProtocol::Tcp
454 }
455
456 fn read_error(&self) {
457 incr!(names::tcp::READ_ERROR);
458 }
459
460 fn write_error(&self) {
461 incr!(names::tcp::WRITE_ERROR);
462 }
463
464 fn session_ulid(&self) -> Option<Ulid> {
465 Some(self.session_ulid)
466 }
467}
468
469pub struct FrontRustls {
470 pub stream: TcpStream,
471 pub session: ServerConnection,
472 pub peer_disconnected: bool,
476 pub peer_reset: bool,
479 pub session_ulid: Ulid,
482}
483
484impl std::fmt::Debug for FrontRustls {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 f.debug_struct("FrontRustls")
487 .field("stream", &self.stream)
488 .finish_non_exhaustive()
489 }
490}
491
492impl SocketHandler for FrontRustls {
493 fn socket_read(&mut self, buf: &mut [u8]) -> (usize, SocketResult) {
494 let mut size = 0usize;
495 let mut can_read = true;
496 let mut is_error = false;
497 let mut is_closed = false;
498
499 let mut counter = 0;
500 loop {
501 counter += 1;
502 if counter > MAX_LOOP_ITERATIONS {
503 error!(
504 "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_read",
505 log_socket_context!(self)
506 );
507 incr!(names::rustls::READ_INFINITE_LOOP_ERROR);
508 is_error = true;
509 break;
510 }
511
512 if size == buf.len() {
513 break;
514 }
515
516 if !can_read | is_error | is_closed {
517 break;
518 }
519
520 match self.session.read_tls(&mut self.stream) {
521 Ok(0) => {
522 can_read = false;
526 is_closed = true;
527 self.peer_disconnected = true;
528 }
529 Ok(_sz) => {}
530 Err(e) => match e.kind() {
531 ErrorKind::WouldBlock => {
532 can_read = false;
533 }
534 ErrorKind::ConnectionReset
535 | ErrorKind::ConnectionAborted
536 | ErrorKind::BrokenPipe => {
537 is_closed = true;
543 self.peer_disconnected = true;
544 self.peer_reset = true;
545 }
546 ErrorKind::Other => {}
551 _ => {
552 error!(
553 "{} could not read TLS stream from socket: {:?}",
554 log_socket_context!(self),
555 e
556 );
557 is_error = true;
558 break;
559 }
560 },
561 }
562
563 if let Err(e) = self.session.process_new_packets() {
564 error!(
565 "{} could not process read TLS packets: {:?}",
566 log_socket_context!(self),
567 e
568 );
569 is_error = true;
570 break;
571 }
572
573 while !self.session.wants_read() {
574 match self.session.reader().read(&mut buf[size..]) {
575 Ok(0) => break,
576 Ok(sz) => {
577 size += sz;
578 }
579 Err(e) => match e.kind() {
580 ErrorKind::WouldBlock => {
581 break;
582 }
583 ErrorKind::ConnectionReset
584 | ErrorKind::ConnectionAborted
585 | ErrorKind::BrokenPipe => {
586 is_closed = true;
587 break;
588 }
589 _ => {
590 error!(
591 "{} could not read data from TLS stream: {:?}",
592 log_socket_context!(self),
593 e
594 );
595 is_error = true;
596 break;
597 }
598 },
599 }
600 }
601 }
602
603 if is_error {
604 (size, SocketResult::Error)
605 } else if is_closed {
606 (size, SocketResult::Closed)
607 } else if size == buf.len() {
608 (size, SocketResult::Continue)
613 } else if !can_read {
614 (size, SocketResult::WouldBlock)
615 } else {
616 (size, SocketResult::Continue)
617 }
618 }
619
620 fn socket_write(&mut self, buf: &[u8]) -> (usize, SocketResult) {
625 if self.peer_reset {
628 return (0, SocketResult::Closed);
629 }
630
631 let mut buffered_size = 0usize;
632 let mut can_write = true;
633 let mut is_error = false;
634 let mut is_closed = false;
635
636 let mut counter = 0;
637 loop {
638 counter += 1;
639 if counter > MAX_LOOP_ITERATIONS {
640 error!(
641 "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_write",
642 log_socket_context!(self)
643 );
644 incr!(names::rustls::WRITE_INFINITE_LOOP_ERROR);
645 is_error = true;
646 break;
647 }
648 if buffered_size == buf.len() {
649 break;
650 }
651
652 if !can_write | is_error | is_closed {
653 break;
654 }
655
656 match self.session.writer().write(&buf[buffered_size..]) {
657 Ok(0) => {} Ok(sz) => {
659 buffered_size += sz;
660 }
661 Err(e) => match e.kind() {
662 ErrorKind::WouldBlock => {
663 }
666 ErrorKind::ConnectionReset
667 | ErrorKind::ConnectionAborted
668 | ErrorKind::BrokenPipe => {
669 incr!(names::rustls::WRITE_ERROR);
671 is_closed = true;
672 self.peer_reset = true;
673 break;
674 }
675 _ => {
676 error!(
677 "{} could not write data to TLS stream: {:?}",
678 log_socket_context!(self),
679 e
680 );
681 incr!(names::rustls::WRITE_ERROR);
682 is_error = true;
683 break;
684 }
685 },
686 }
687
688 loop {
689 match self.session.write_tls(&mut self.stream) {
690 Ok(0) => {
691 break;
693 }
694 Ok(_sz) => {}
695 Err(e) => match e.kind() {
696 ErrorKind::WouldBlock => {
697 can_write = false;
698 break;
699 }
700 ErrorKind::ConnectionReset
701 | ErrorKind::ConnectionAborted
702 | ErrorKind::BrokenPipe => {
703 incr!(names::rustls::WRITE_ERROR);
704 is_closed = true;
705 self.peer_reset = true;
706 break;
707 }
708 _ => {
709 error!(
710 "{} could not write TLS stream to socket: {:?}",
711 log_socket_context!(self),
712 e
713 );
714 incr!(names::rustls::WRITE_ERROR);
715 is_error = true;
716 break;
717 }
718 },
719 }
720 }
721 }
722
723 if !is_error && !is_closed && can_write && self.session.wants_write() {
729 loop {
730 match self.session.write_tls(&mut self.stream) {
731 Ok(0) => break,
732 Ok(_) => {}
733 Err(e) => match e.kind() {
734 ErrorKind::WouldBlock => {
735 can_write = false;
736 break;
737 }
738 ErrorKind::ConnectionReset
739 | ErrorKind::ConnectionAborted
740 | ErrorKind::BrokenPipe => {
741 incr!(names::rustls::WRITE_ERROR);
742 is_closed = true;
743 self.peer_reset = true;
744 break;
745 }
746 _ => {
747 error!(
748 "{} could not flush TLS stream to socket: {:?}",
749 log_socket_context!(self),
750 e
751 );
752 incr!(names::rustls::WRITE_ERROR);
753 is_error = true;
754 break;
755 }
756 },
757 }
758 }
759 }
760
761 if is_error {
762 (buffered_size, SocketResult::Error)
763 } else if is_closed {
764 (buffered_size, SocketResult::Closed)
765 } else if !can_write {
766 (buffered_size, SocketResult::WouldBlock)
767 } else {
768 (buffered_size, SocketResult::Continue)
769 }
770 }
771
772 fn socket_write_vectored(&mut self, bufs: &[std::io::IoSlice]) -> (usize, SocketResult) {
789 if self.peer_reset {
790 return (0, SocketResult::Closed);
791 }
792
793 let total_len: usize = bufs.iter().map(|b| b.len()).sum();
794 let mut buffered_size = 0usize;
795 let mut can_write = true;
796 let mut is_error = false;
797 let mut is_closed = false;
798
799 let mut counter = 0;
800 loop {
801 counter += 1;
802 if counter > MAX_LOOP_ITERATIONS {
803 error!(
804 "{} MAX_LOOP_ITERATION reached in FrontRustls::socket_write_vectored",
805 log_socket_context!(self)
806 );
807 incr!(names::rustls::WRITE_INFINITE_LOOP_ERROR);
808 is_error = true;
809 break;
810 }
811 if buffered_size == total_len {
812 break;
813 }
814
815 if !can_write | is_error | is_closed {
816 break;
817 }
818
819 if buffered_size == 0 {
824 match self.session.writer().write_vectored(bufs) {
825 Ok(0) => {}
826 Ok(sz) => {
827 buffered_size += sz;
828 }
829 Err(e) => match e.kind() {
830 ErrorKind::WouldBlock => {}
831 ErrorKind::ConnectionReset
832 | ErrorKind::ConnectionAborted
833 | ErrorKind::BrokenPipe => {
834 incr!(names::rustls::WRITE_ERROR);
835 is_closed = true;
836 self.peer_reset = true;
837 break;
838 }
839 _ => {
840 error!(
841 "{} could not write data to TLS stream: {:?}",
842 log_socket_context!(self),
843 e
844 );
845 incr!(names::rustls::WRITE_ERROR);
846 is_error = true;
847 break;
848 }
849 },
850 }
851 }
852
853 if buffered_size > 0 && buffered_size < total_len {
858 loop {
859 match self.session.write_tls(&mut self.stream) {
860 Ok(0) => break,
861 Ok(_) => {}
862 Err(e) => match e.kind() {
863 ErrorKind::WouldBlock => {
864 can_write = false;
865 break;
866 }
867 ErrorKind::ConnectionReset
868 | ErrorKind::ConnectionAborted
869 | ErrorKind::BrokenPipe => {
870 incr!(names::rustls::WRITE_ERROR);
871 is_closed = true;
872 self.peer_reset = true;
873 break;
874 }
875 _ => {
876 error!(
877 "{} could not write TLS stream to socket: {:?}",
878 log_socket_context!(self),
879 e
880 );
881 incr!(names::rustls::WRITE_ERROR);
882 is_error = true;
883 break;
884 }
885 },
886 }
887 }
888 break;
889 }
890
891 loop {
892 match self.session.write_tls(&mut self.stream) {
893 Ok(0) => {
894 break;
895 }
896 Ok(_sz) => {}
897 Err(e) => match e.kind() {
898 ErrorKind::WouldBlock => {
899 can_write = false;
900 break;
901 }
902 ErrorKind::ConnectionReset
903 | ErrorKind::ConnectionAborted
904 | ErrorKind::BrokenPipe => {
905 incr!(names::rustls::WRITE_ERROR);
906 is_closed = true;
907 self.peer_reset = true;
908 break;
909 }
910 _ => {
911 error!(
912 "{} could not write TLS stream to socket: {:?}",
913 log_socket_context!(self),
914 e
915 );
916 incr!(names::rustls::WRITE_ERROR);
917 is_error = true;
918 break;
919 }
920 },
921 }
922 }
923 }
924
925 if !is_error && !is_closed && can_write && self.session.wants_write() {
926 loop {
927 match self.session.write_tls(&mut self.stream) {
928 Ok(0) => break,
929 Ok(_) => {}
930 Err(e) => match e.kind() {
931 ErrorKind::WouldBlock => {
932 can_write = false;
933 break;
934 }
935 ErrorKind::ConnectionReset
936 | ErrorKind::ConnectionAborted
937 | ErrorKind::BrokenPipe => {
938 incr!(names::rustls::WRITE_ERROR);
939 is_closed = true;
940 self.peer_reset = true;
941 break;
942 }
943 _ => {
944 error!(
945 "{} could not flush TLS stream to socket: {:?}",
946 log_socket_context!(self),
947 e
948 );
949 incr!(names::rustls::WRITE_ERROR);
950 is_error = true;
951 break;
952 }
953 },
954 }
955 }
956 }
957
958 if is_error {
959 (buffered_size, SocketResult::Error)
960 } else if is_closed {
961 (buffered_size, SocketResult::Closed)
962 } else if !can_write {
963 (buffered_size, SocketResult::WouldBlock)
964 } else {
965 (buffered_size, SocketResult::Continue)
966 }
967 }
968
969 fn socket_close(&mut self) {
970 self.session.send_close_notify();
971 }
972
973 fn socket_wants_write(&self) -> bool {
974 !self.peer_reset && self.session.wants_write()
977 }
978
979 fn socket_ref(&self) -> &TcpStream {
980 &self.stream
981 }
982
983 fn socket_mut(&mut self) -> &mut TcpStream {
984 &mut self.stream
985 }
986
987 fn protocol(&self) -> TransportProtocol {
988 self.session
989 .protocol_version()
990 .map(|version| match version {
991 ProtocolVersion::SSLv2 => TransportProtocol::Ssl2,
992 ProtocolVersion::SSLv3 => TransportProtocol::Ssl3,
993 ProtocolVersion::TLSv1_0 => TransportProtocol::Tls1_0,
994 ProtocolVersion::TLSv1_1 => TransportProtocol::Tls1_1,
995 ProtocolVersion::TLSv1_2 => TransportProtocol::Tls1_2,
996 ProtocolVersion::TLSv1_3 => TransportProtocol::Tls1_3,
997 _ => TransportProtocol::Tls1_3,
998 })
999 .unwrap_or(TransportProtocol::Tcp)
1000 }
1001
1002 fn read_error(&self) {
1003 incr!(names::rustls::READ_ERROR);
1004 }
1005
1006 fn write_error(&self) {
1007 incr!(names::rustls::WRITE_ERROR);
1008 }
1009
1010 fn session_ulid(&self) -> Option<Ulid> {
1011 Some(self.session_ulid)
1012 }
1013}
1014
1015pub fn server_bind(addr: SocketAddr) -> Result<TcpListener, ServerBindError> {
1016 let sock = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
1017 .map_err(ServerBindError::SocketCreationError)?;
1018
1019 if cfg!(unix) {
1021 sock.set_reuse_address(true)
1022 .map_err(ServerBindError::SetReuseAddress)?;
1023 }
1024
1025 sock.set_reuse_port(true)
1026 .map_err(ServerBindError::SetReusePort)?;
1027
1028 sock.bind(&addr.into())
1029 .map_err(ServerBindError::BindError)?;
1030
1031 sock.set_nonblocking(true)
1032 .map_err(ServerBindError::SetNonBlocking)?;
1033
1034 sock.listen(1024).map_err(ServerBindError::Listen)?;
1037
1038 Ok(TcpListener::from_std(sock.into()))
1039}
1040
1041pub mod stats {
1043 use std::{os::fd::AsRawFd, time::Duration};
1044
1045 use internal::{OPT_LEVEL, OPT_NAME, TcpInfo};
1046
1047 #[derive(Clone, Debug)]
1053 pub struct TcpSnapshot {
1054 pub rtt: Duration,
1055 pub state: &'static str,
1056 }
1057
1058 pub fn socket_rtt<A: AsRawFd>(socket: &A) -> Option<Duration> {
1062 socket_info(socket.as_raw_fd()).map(|info| Duration::from_micros(info.rtt() as u64))
1063 }
1064
1065 pub fn socket_snapshot<A: AsRawFd>(socket: &A) -> Option<TcpSnapshot> {
1072 socket_info(socket.as_raw_fd()).map(|info| TcpSnapshot {
1073 rtt: Duration::from_micros(info.rtt() as u64),
1074 state: info.state(),
1075 })
1076 }
1077
1078 #[cfg(unix)]
1079 pub fn socket_info(fd: libc::c_int) -> Option<TcpInfo> {
1080 let mut tcp_info: TcpInfo = unsafe { std::mem::zeroed() };
1084 let mut len = std::mem::size_of::<TcpInfo>() as libc::socklen_t;
1085 let status = unsafe {
1090 libc::getsockopt(
1091 fd,
1092 OPT_LEVEL,
1093 OPT_NAME,
1094 &mut tcp_info as *mut _ as *mut _,
1095 &mut len,
1096 )
1097 };
1098 if status != 0 { None } else { Some(tcp_info) }
1099 }
1100 #[cfg(not(unix))]
1101 pub fn socketinfo(fd: libc::c_int) -> Option<TcpInfo> {
1102 None
1103 }
1104
1105 #[cfg(unix)]
1106 #[cfg(not(any(target_os = "macos", target_os = "ios")))]
1107 mod internal {
1108 #[cfg(target_os = "linux")]
1109 pub const OPT_LEVEL: libc::c_int = libc::SOL_TCP;
1110
1111 #[cfg(any(
1112 target_os = "freebsd",
1113 target_os = "dragonfly",
1114 target_os = "openbsd",
1115 target_os = "netbsd"
1116 ))]
1117 pub const OPT_LEVEL: libc::c_int = libc::IPPROTO_TCP;
1118
1119 pub const OPT_NAME: libc::c_int = libc::TCP_INFO;
1120
1121 #[derive(Clone, Debug)]
1122 #[repr(C)]
1123 pub struct TcpInfo {
1124 tcpi_state: u8,
1126 tcpi_ca_state: u8,
1127 tcpi_retransmits: u8,
1128 tcpi_probes: u8,
1129 tcpi_backoff: u8,
1130 tcpi_options: u8,
1131 tcpi_snd_rcv_wscale: u8, tcpi_rto: u32,
1134 tcpi_ato: u32,
1135 tcpi_snd_mss: u32,
1136 tcpi_rcv_mss: u32,
1137
1138 tcpi_unacked: u32,
1139 tcpi_sacked: u32,
1140 tcpi_lost: u32,
1141 tcpi_retrans: u32,
1142 tcpi_fackets: u32,
1143
1144 tcpi_last_data_sent: u32,
1146 tcpi_last_ack_sent: u32, tcpi_last_data_recv: u32,
1148 tcpi_last_ack_recv: u32,
1149
1150 tcpi_pmtu: u32,
1152 tcpi_rcv_ssthresh: u32,
1153 tcpi_rtt: u32,
1154 tcpi_rttvar: u32,
1155 tcpi_snd_ssthresh: u32,
1156 tcpi_snd_cwnd: u32,
1157 tcpi_advmss: u32,
1158 tcpi_reordering: u32,
1159 }
1160 impl TcpInfo {
1161 pub fn rtt(&self) -> u32 {
1162 self.tcpi_rtt
1163 }
1164
1165 pub fn state(&self) -> &'static str {
1171 match self.tcpi_state {
1172 1 => "ESTABLISHED",
1173 2 => "SYN_SENT",
1174 3 => "SYN_RECV",
1175 4 => "FIN_WAIT1",
1176 5 => "FIN_WAIT2",
1177 6 => "TIME_WAIT",
1178 7 => "CLOSE",
1179 8 => "CLOSE_WAIT",
1180 9 => "LAST_ACK",
1181 10 => "LISTEN",
1182 11 => "CLOSING",
1183 12 => "NEW_SYN_RECV",
1184 _ => "UNKNOWN",
1185 }
1186 }
1187 }
1188 }
1189
1190 #[cfg(unix)]
1191 #[cfg(any(target_os = "macos", target_os = "ios"))]
1192 mod internal {
1193 pub const OPT_LEVEL: libc::c_int = libc::IPPROTO_TCP;
1194 pub const OPT_NAME: libc::c_int = 0x106;
1195
1196 #[derive(Clone, Debug)]
1197 #[repr(C)]
1198 pub struct TcpInfo {
1199 tcpi_state: u8,
1200 tcpi_snd_wscale: u8,
1201 tcpi_rcv_wscale: u8,
1202 __pad1: u8,
1203 tcpi_options: u32,
1204 tcpi_flags: u32,
1205 tcpi_rto: u32,
1206 tcpi_maxseg: u32,
1207 tcpi_snd_ssthresh: u32,
1208 tcpi_snd_cwnd: u32,
1209 tcpi_snd_wnd: u32,
1210 tcpi_snd_sbbytes: u32,
1211 tcpi_rcv_wnd: u32,
1212 tcpi_rttcur: u32,
1213 tcpi_srtt: u32,
1214 tcpi_rttvar: u32,
1215 tcpi_tfo: u32,
1216 tcpi_txpackets: u64,
1217 tcpi_txbytes: u64,
1218 tcpi_txretransmitbytes: u64,
1219 tcpi_rxpackets: u64,
1220 tcpi_rxbytes: u64,
1221 tcpi_rxoutoforderbytes: u64,
1222 tcpi_txretransmitpackets: u64,
1223 }
1224 impl TcpInfo {
1225 pub fn rtt(&self) -> u32 {
1226 self.tcpi_srtt * 1000
1228 }
1229
1230 pub fn state(&self) -> &'static str {
1235 match self.tcpi_state {
1236 0 => "CLOSED",
1237 1 => "LISTEN",
1238 2 => "SYN_SENT",
1239 3 => "SYN_RECEIVED",
1240 4 => "ESTABLISHED",
1241 5 => "CLOSE_WAIT",
1242 6 => "FIN_WAIT_1",
1243 7 => "CLOSING",
1244 8 => "LAST_ACK",
1245 9 => "FIN_WAIT_2",
1246 10 => "TIME_WAIT",
1247 _ => "UNKNOWN",
1248 }
1249 }
1250 }
1251 }
1252
1253 #[cfg(not(unix))]
1254 #[derive(Clone, Debug)]
1255 struct TcpInfo {}
1256
1257 #[test]
1258 #[serial_test::serial]
1259 fn test_rtt() {
1260 let sock = std::net::TcpStream::connect("google.com:80").unwrap();
1261 let fd = sock.as_raw_fd();
1262 let info = socket_info(fd);
1263 assert!(info.is_some());
1264 println!("{info:#?}");
1265 println!(
1266 "rtt: {}",
1267 sozu_command::logging::LogDuration(socket_rtt(&sock))
1268 );
1269 }
1270}