1#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309#[cfg(test)]
310#[macro_use]
311extern crate quickcheck;
312
313#[macro_use]
314pub mod util;
315#[macro_use]
316pub mod metrics;
317
318pub mod backends;
319pub mod features;
320pub mod http;
321pub mod load_balancing;
322pub mod pool;
323pub mod protocol;
324pub mod retry;
325pub mod router;
326pub mod socket;
327pub mod timer;
328pub mod tls;
329
330#[cfg(feature = "splice")]
332mod splice;
333
334pub mod server;
335pub mod tcp;
336
337pub mod https;
338
339use std::{
340 cell::RefCell,
341 collections::{BTreeMap, HashMap},
342 fmt::{self, Display, Formatter},
343 net::SocketAddr,
344 rc::Rc,
345 str,
346 time::{Duration, Instant},
347};
348
349use backends::BackendError;
350use hex::FromHexError;
351use mio::{net::TcpStream, Interest, Token};
352use protocol::http::{answers::TemplateError, parser::Method};
353use router::RouterError;
354use socket::ServerBindError;
355use tls::CertificateResolverError;
356
357use sozu_command::{
358 logging::{CachedTags, LogContext},
359 proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
360 ready::Ready,
361 state::ClusterId,
362 AsStr, ObjectKind,
363};
364
365use crate::{backends::BackendMap, router::Route};
366
/// Tag describing what a [`ProxySession`] stands for; it is the value returned
/// by [`ProxySession::protocol`].
///
/// `testing::prebuild_server` uses `Channel`, `Timer` and `Metrics` for the
/// placeholder sessions that reserve the first session tokens.
/// NOTE(review): the `*Listen` variants presumably tag listener sockets rather
/// than client sessions — confirm against the `server` module.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Protocol {
    HTTP,
    HTTPS,
    TCP,
    HTTPListen,
    HTTPSListen,
    TCPListen,
    Channel,
    Metrics,
    Timer,
}
380
381pub trait ProxySession {
383 fn protocol(&self) -> Protocol;
388 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
392 fn update_readiness(&mut self, token: Token, events: Ready);
395 fn close(&mut self);
398 fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
401 fn last_event(&self) -> Instant;
403 fn print_session(&self);
405 fn frontend_token(&self) -> Token;
407 fn shutting_down(&mut self) -> SessionIsToBeClosed;
412}
413
/// Compile-time `if`/`else` on identifiers.
///
/// `branch!{ if A == B { then } else { otherwise } }` expands to `then` when
/// the optional identifier `A` is literally the identifier `B`, and to
/// `otherwise` when it is a different identifier or is missing altogether.
/// Without an `else` block the comparison must succeed: any other input finds
/// no matching rule in the helper macro and fails to compile.
///
/// The comparison is done by defining a local `expect!` macro whose first rule
/// only accepts the expected identifier.
#[macro_export]
macro_rules! branch {
    // Two-way form.
    (if $($actual:ident)? == $wanted:ident { $($on_match:tt)* } else { $($otherwise:tt)* }) => {
        macro_rules! expect {
            // The identifiers are the same.
            ($wanted) => { $($on_match)* };
            // Some other identifier was given.
            ($other:ident) => { $($otherwise)* };
            // No identifier was given.
            () => { $($otherwise)* }
        }
        expect!($($actual)?);
    };
    // One-way form: only the expected identifier is accepted.
    (if $($actual:ident)? == $wanted:ident { $($on_match:tt)* }) => {
        macro_rules! expect {
            ($wanted) => { $($on_match)* };
        }
        expect!($($actual)?);
    };
}
431
/// Expands to the tokens inside the leading braces when there are any, and to
/// the trailing default tokens when the braces are empty.
///
/// `fallback!({} d)` yields `d`; `fallback!({v} d)` yields `v`.
#[macro_export]
macro_rules! fallback {
    // A value was supplied: use it and drop the default.
    ({$($given:tt)+} $($default:tt)*) => {
        $($given)+
    };
    // Nothing was supplied: use the default.
    ({} $($default:tt)*) => {
        $($default)*
    };
}
441
/// Generates a session state machine from an enum-like description.
///
/// Given `enum Name impl SessionState { Variant(Inner, Aux...) -> override, ... }`
/// the macro emits:
///
/// - `StateMarker`: a `Clone + Copy + Debug` enum with one unit variant per state;
/// - `Name`: the state enum itself, with every declared variant plus an extra
///   `FailedUpgrade(StateMarker)` variant;
/// - private helpers on `Name`: `marker`, `failed`, `take` and a forwarding
///   `front_socket`;
/// - when `impl SessionState` is written, a `SessionState` implementation whose
///   methods forward to the first field of the active variant.
///
/// A variant may carry `-> expr`; that expression then replaces the forwarded
/// call in every generated method for this variant.
///
/// Types and macros named in the expansion (`Rc`, `RefCell`, `ProxySession`,
/// `L7Proxy`, `SessionMetrics`, `SessionResult`, `StateResult`, `Token`,
/// `Ready`, `SessionIsToBeClosed`, `error!`, `mio`) must be in scope at the
/// call site.
#[macro_export]
macro_rules! StateMachineBuilder {
    // Main rule. `$d` is bound to a literal `$` token (supplied by the second
    // rule) so that the nested `_fn_impl!` macro can declare its own
    // repetitions without the outer macro interpreting them.
    (
        ($d:tt)
        $(#[$($enum_attrs:tt)*])*
        enum $state_name:ident $(impl $trait:ident)? {
            $($(#[$($variant_attrs:tt)*])*
            $variant_name:ident($inner:ty$(,$($extra:ty),+)?) $(-> $override:expr)?),+ $(,)?
        }
    ) => {
        // Data-less mirror of the state enum: one unit variant per state.
        #[derive(Clone, Copy, Debug)]
        pub enum StateMarker {
            $($variant_name,)+
        }

        $(#[$($enum_attrs)*])*
        pub enum $state_name {
            $(
                $(#[$($variant_attrs)*])*
                $variant_name($inner$(,$($extra),+)?),
            )+
            // Left behind by `take()`; remembers which state was taken out.
            FailedUpgrade(StateMarker),
        }

        // Emits one method that dispatches on the active variant. An optional
        // `| pattern => expr` suffix gives the arm used for `FailedUpgrade`;
        // without it that arm is `_ => unreachable!()`.
        macro_rules! _fn_impl {
            (
                $method:ident(&$d($mutability:ident)?, self $d(,$arg:ident: $arg_ty:ty)*)
                $d(-> $ret:ty)?
                $d(| $marker:tt => $fail:expr)?
            ) => {
                fn $method(&$d($mutability)? self $d(,$arg: $arg_ty)*) $d(-> $ret)? {
                    match self {
                        $(
                            $state_name::$variant_name(_state, ..) =>
                                $crate::fallback!({$($override)?} _state.$method($d($arg),*)),
                        )+
                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) =>
                            $crate::fallback!({$d($fail)?} unreachable!())
                    }
                }
            };
        }

        impl $state_name {
            // Marker of the current state; for `FailedUpgrade` this is the
            // marker that was stored when the state was taken.
            fn marker(&self) -> StateMarker {
                match self {
                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
                    $state_name::FailedUpgrade(marker) => *marker,
                }
            }
            // True when the machine currently holds the `FailedUpgrade` placeholder.
            fn failed(&self) -> bool {
                matches!(self, $state_name::FailedUpgrade(_))
            }
            // Moves the current state out, leaving `FailedUpgrade(marker)` in its place.
            fn take(&mut self) -> $state_name {
                let placeholder = $state_name::FailedUpgrade(self.marker());
                std::mem::replace(self, placeholder)
            }
            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
        }

        // The trait implementation is only emitted for `impl SessionState`.
        $crate::branch!{
            if $($trait)? == SessionState {
                impl SessionState for $state_name {
                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
                    _fn_impl!{cancel_timeouts(&mut, self)}
                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
                }
            } else {}
        }
    };
    // Entry point: re-invoke the macro with a literal `$` token in front.
    ($($tt:tt)+) => {
        StateMachineBuilder!{($) $($tt)+}
    }
}
523
524pub trait ListenerHandler {
525 fn get_addr(&self) -> &SocketAddr;
526
527 fn get_tags(&self, key: &str) -> Option<&CachedTags>;
528
529 fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
530 self.get_tags(key).map(|tags| tags.concatenated.as_str())
531 }
532
533 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
534}
535
536#[derive(thiserror::Error, Debug)]
537pub enum FrontendFromRequestError {
538 #[error("Could not parse hostname from '{host}': {error}")]
539 HostParse { host: String, error: String },
540 #[error("invalid remaining chars after hostname. Host: {0}")]
541 InvalidCharsAfterHost(String),
542 #[error("no cluster: {0}")]
543 NoClusterFound(RouterError),
544}
545
546pub trait L7ListenerHandler {
547 fn get_sticky_name(&self) -> &str;
548
549 fn get_connect_timeout(&self) -> u32;
550
551 fn frontend_from_request(
553 &self,
554 host: &str,
555 uri: &str,
556 method: &Method,
557 ) -> Result<Route, FrontendFromRequestError>;
558}
559
/// Connection state of a backend.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackendConnectionStatus {
    NotConnected,
    /// Carries an `Instant` (presumably when the attempt started — confirm with callers).
    Connecting(Instant),
    Connected,
}

impl BackendConnectionStatus {
    /// Returns `true` only for the `Connecting` variant.
    pub fn is_connecting(&self) -> bool {
        match self {
            Self::Connecting(_) => true,
            Self::NotConnected | Self::Connected => false,
        }
    }
}
572
/// What was done with a backend connection.
/// NOTE(review): the variant names suggest "opened a new one", "kept the
/// existing one" and "swapped it for another" — confirm with the producers.
#[derive(Debug, Eq, PartialEq)]
pub enum BackendConnectAction {
    New,
    Reuse,
    Replace,
}
579
580#[derive(thiserror::Error, Debug)]
581pub enum BackendConnectionError {
582 #[error("Not found: {0:?}")]
583 NotFound(ObjectKind),
584 #[error("Too many connections on cluster {0:?}")]
585 MaxConnectionRetries(Option<String>),
586 #[error("the sessions slab has reached maximum capacity")]
587 MaxSessionsMemory,
588 #[error("error from the backend: {0}")]
589 Backend(BackendError),
590 #[error("failed to retrieve the cluster: {0}")]
591 RetrieveClusterError(RetrieveClusterError),
592}
593
594#[derive(thiserror::Error, Debug)]
596pub enum RetrieveClusterError {
597 #[error("No method given")]
598 NoMethod,
599 #[error("No host given")]
600 NoHost,
601 #[error("No path given")]
602 NoPath,
603 #[error("unauthorized route")]
604 UnauthorizedRoute,
605 #[error("{0}")]
606 RetrieveFrontend(FrontendFromRequestError),
607}
608
/// Failure modes of [`ProxyConfiguration::accept`] and
/// [`ProxyConfiguration::create_session`].
#[derive(Debug, Eq, PartialEq)]
pub enum AcceptError {
    IoError,
    TooManySessions,
    WouldBlock,
    RegisterError,
    WrongSocketAddress,
    BufferCapacityReached,
}
619
620#[derive(thiserror::Error, Debug)]
622pub enum ListenerError {
623 #[error("failed to handle certificate request, got a resolver error, {0}")]
624 Resolver(CertificateResolverError),
625 #[error("failed to parse pem, {0}")]
626 PemParse(String),
627 #[error("failed to parse template {0}: {1}")]
628 TemplateParse(u16, TemplateError),
629 #[error("failed to build rustls context, {0}")]
630 BuildRustls(String),
631 #[error("could not activate listener with address {address:?}: {error}")]
632 Activation { address: SocketAddr, error: String },
633 #[error("Could not register listener socket: {0}")]
634 SocketRegistration(std::io::Error),
635 #[error("could not add frontend: {0}")]
636 AddFrontend(RouterError),
637 #[error("could not remove frontend: {0}")]
638 RemoveFrontend(RouterError),
639}
640
641#[derive(thiserror::Error, Debug)]
643pub enum ProxyError {
644 #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
645 SoftStop {
646 proxy_protocol: String,
647 error: String,
648 },
649 #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
650 HardStop {
651 proxy_protocol: String,
652 error: String,
653 },
654 #[error("found no listener with address {0:?}")]
655 NoListenerFound(SocketAddr),
656 #[error("a listener is already present for this token")]
657 ListenerAlreadyPresent,
658 #[error("could not add listener: {0}")]
659 AddListener(ListenerError),
660 #[error("could not add cluster: {0}")]
661 AddCluster(ListenerError),
662 #[error("failed to activate listener with address {address:?}: {listener_error}")]
663 ListenerActivation {
664 address: SocketAddr,
665 listener_error: ListenerError,
666 },
667 #[error("can not add frontend {front:?}: {error}")]
668 WrongInputFrontend {
669 front: RequestHttpFrontend,
670 error: String,
671 },
672 #[error("could not add frontend: {0}")]
673 AddFrontend(ListenerError),
674 #[error("could not remove frontend: {0}")]
675 RemoveFrontend(ListenerError),
676 #[error("could not add certificate: {0}")]
677 AddCertificate(CertificateResolverError),
678 #[error("could not remove certificate: {0}")]
679 RemoveCertificate(CertificateResolverError),
680 #[error("could not replace certificate: {0}")]
681 ReplaceCertificate(CertificateResolverError),
682 #[error("wrong certificate fingerprint: {0}")]
683 WrongCertificateFingerprint(FromHexError),
684 #[error("this request is not supported by the proxy")]
685 UnsupportedMessage,
686 #[error("failed to acquire the lock, {0}")]
687 Lock(String),
688 #[error("could not bind to socket {0:?}: {1}")]
689 BindToSocket(SocketAddr, ServerBindError),
690 #[error("error registering socket of listener: {0}")]
691 RegisterListener(std::io::Error),
692 #[error("the listener is not activated")]
693 UnactivatedListener,
694}
695
696use self::server::ListenToken;
697pub trait ProxyConfiguration {
698 fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
699 fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
700 fn create_session(
701 &mut self,
702 socket: TcpStream,
703 token: ListenToken,
704 wait_time: Duration,
705 proxy: Rc<RefCell<Self>>,
706 ) -> Result<(), AcceptError>;
708}
709
710pub trait L7Proxy {
711 fn kind(&self) -> ListenerType;
712
713 fn register_socket(
714 &self,
715 socket: &mut TcpStream,
716 token: Token,
717 interest: Interest,
718 ) -> Result<(), std::io::Error>;
719
720 fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;
721
722 fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;
723
724 fn remove_session(&self, token: Token) -> bool;
727
728 fn backends(&self) -> Rc<RefCell<BackendMap>>;
729
730 fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
731}
732
/// Every combination of read/write interest on the front and back sockets.
///
/// Variant names follow the scheme `Front<interest>Back<interest>`, where an
/// interest is `None`, `Read`, `Write` or `ReadWrite`.
#[derive(Debug, Eq, PartialEq)]
pub enum RequiredEvents {
    FrontReadBackNone,
    FrontWriteBackNone,
    FrontReadWriteBackNone,
    FrontNoneBackNone,
    FrontReadBackRead,
    FrontWriteBackRead,
    FrontReadWriteBackRead,
    FrontNoneBackRead,
    FrontReadBackWrite,
    FrontWriteBackWrite,
    FrontReadWriteBackWrite,
    FrontNoneBackWrite,
    FrontReadBackReadWrite,
    FrontWriteBackReadWrite,
    FrontReadWriteBackReadWrite,
    FrontNoneBackReadWrite,
}

impl RequiredEvents {
    /// Decodes the variant into
    /// `(front readable, front writable, back readable, back writable)`.
    fn event_flags(&self) -> (bool, bool, bool, bool) {
        match self {
            Self::FrontReadBackNone => (true, false, false, false),
            Self::FrontWriteBackNone => (false, true, false, false),
            Self::FrontReadWriteBackNone => (true, true, false, false),
            Self::FrontNoneBackNone => (false, false, false, false),
            Self::FrontReadBackRead => (true, false, true, false),
            Self::FrontWriteBackRead => (false, true, true, false),
            Self::FrontReadWriteBackRead => (true, true, true, false),
            Self::FrontNoneBackRead => (false, false, true, false),
            Self::FrontReadBackWrite => (true, false, false, true),
            Self::FrontWriteBackWrite => (false, true, false, true),
            Self::FrontReadWriteBackWrite => (true, true, false, true),
            Self::FrontNoneBackWrite => (false, false, false, true),
            Self::FrontReadBackReadWrite => (true, false, true, true),
            Self::FrontWriteBackReadWrite => (false, true, true, true),
            Self::FrontReadWriteBackReadWrite => (true, true, true, true),
            Self::FrontNoneBackReadWrite => (false, false, true, true),
        }
    }

    /// `true` when the front socket must be readable.
    pub fn front_readable(&self) -> bool {
        self.event_flags().0
    }

    /// `true` when the front socket must be writable.
    pub fn front_writable(&self) -> bool {
        self.event_flags().1
    }

    /// `true` when the back socket must be readable.
    pub fn back_readable(&self) -> bool {
        self.event_flags().2
    }

    /// `true` when the back socket must be writable.
    pub fn back_writable(&self) -> bool {
        self.event_flags().3
    }
}
810
/// Outcome reported by a state; it is the return type of the `timeout` method
/// generated by `StateMachineBuilder!` for `SessionState`.
/// NOTE(review): each variant presumably names the action the caller has to
/// take next — confirm with the state implementations.
#[derive(Debug, Eq, PartialEq)]
pub enum StateResult {
    CloseBackend,
    CloseSession,
    ConnectBackend,
    Continue,
    Upgrade,
}
825
/// Outcome reported for a whole session; it is the return type of the `ready`
/// method generated by `StateMachineBuilder!` for `SessionState`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionResult {
    Close,
    Continue,
    Upgrade,
}
836
/// Role of a socket: a listening socket or the socket of a front client.
#[derive(Debug, Eq, PartialEq)]
pub enum SocketType {
    Listener,
    FrontClient,
}
842
843type SessionIsToBeClosed = bool;
844
845#[derive(Clone)]
846pub struct Readiness {
847 pub event: Ready,
849 pub interest: Ready,
851}
852
853impl Display for Readiness {
854 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
855 let i = &mut [b'-'; 4];
856 let r = &mut [b'-'; 4];
857 let mixed = &mut [b'-'; 4];
858
859 display_ready(i, self.interest);
860 display_ready(r, self.event);
861 display_ready(mixed, self.interest & self.event);
862
863 write!(
864 f,
865 "I({:?})&R({:?})=M({:?})",
866 String::from_utf8_lossy(i),
867 String::from_utf8_lossy(r),
868 String::from_utf8_lossy(mixed)
869 )
870 }
871}
872
873impl Default for Readiness {
874 fn default() -> Self {
875 Self::new()
876 }
877}
878
879impl Readiness {
880 pub const fn new() -> Readiness {
881 Readiness {
882 event: Ready::EMPTY,
883 interest: Ready::EMPTY,
884 }
885 }
886
887 pub fn reset(&mut self) {
888 self.event = Ready::EMPTY;
889 self.interest = Ready::EMPTY;
890 }
891
892 pub fn filter_interest(&self) -> Ready {
894 self.event & self.interest
895 }
896}
897
898pub fn display_ready(s: &mut [u8], readiness: Ready) {
899 if readiness.is_readable() {
900 s[0] = b'R';
901 }
902 if readiness.is_writable() {
903 s[1] = b'W';
904 }
905 if readiness.is_error() {
906 s[2] = b'E';
907 }
908 if readiness.is_hup() {
909 s[3] = b'H';
910 }
911}
912
913pub fn ready_to_string(readiness: Ready) -> String {
914 let s = &mut [b'-'; 4];
915 display_ready(s, readiness);
916 String::from_utf8(s.to_vec()).unwrap()
917}
918
919impl fmt::Debug for Readiness {
920 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
921 let i = &mut [b'-'; 4];
922 let r = &mut [b'-'; 4];
923 let mixed = &mut [b'-'; 4];
924
925 display_ready(i, self.interest);
926 display_ready(r, self.event);
927 display_ready(mixed, self.interest & self.event);
928
929 write!(
930 f,
931 "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
932 str::from_utf8(i).unwrap(),
933 str::from_utf8(r).unwrap(),
934 str::from_utf8(mixed).unwrap()
935 )
936 }
937}
938
/// Timing and byte counters collected over the life of a session.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// Start of the current request; `request_time` measures from here.
    pub start: Option<Instant>,
    /// Time accumulated between `service_start` and `service_stop` calls.
    pub service_time: Duration,
    /// Time accumulated between `wait_start` and `service_start` calls.
    pub wait_time: Duration,
    /// Presumably bytes received from the front side — confirm with the writers.
    pub bin: usize,
    /// Presumably bytes sent to the front side — confirm with the writers.
    pub bout: usize,

    /// Start of the service period currently running, if any.
    pub service_start: Option<Instant>,
    /// Start of the current waiting period.
    pub wait_start: Instant,

    /// Identifier of the backend, when there is one.
    pub backend_id: Option<String>,
    /// Set by `backend_start()`.
    pub backend_start: Option<Instant>,
    /// Set by `backend_connected()`.
    pub backend_connected: Option<Instant>,
    /// Set by `backend_stop()`.
    pub backend_stop: Option<Instant>,
    /// Presumably bytes received from the backend — confirm with the writers.
    pub backend_bin: usize,
    /// Presumably bytes sent to the backend — confirm with the writers.
    pub backend_bout: usize,
}
963
964impl SessionMetrics {
965 pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
966 SessionMetrics {
967 start: Some(Instant::now()),
968 service_time: Duration::from_secs(0),
969 wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
970 bin: 0,
971 bout: 0,
972 service_start: None,
973 wait_start: Instant::now(),
974 backend_id: None,
975 backend_start: None,
976 backend_connected: None,
977 backend_stop: None,
978 backend_bin: 0,
979 backend_bout: 0,
980 }
981 }
982
983 pub fn reset(&mut self) {
984 self.start = None;
985 self.service_time = Duration::from_secs(0);
986 self.wait_time = Duration::from_secs(0);
987 self.bin = 0;
988 self.bout = 0;
989 self.service_start = None;
990 self.backend_start = None;
991 self.backend_connected = None;
992 self.backend_stop = None;
993 self.backend_bin = 0;
994 self.backend_bout = 0;
995 }
996
997 pub fn service_start(&mut self) {
998 let now = Instant::now();
999
1000 if self.start.is_none() {
1001 self.start = Some(now);
1002 }
1003
1004 self.service_start = Some(now);
1005 self.wait_time += now - self.wait_start;
1006 }
1007
1008 pub fn service_stop(&mut self) {
1009 if let Some(start) = self.service_start.take() {
1010 let duration = Instant::now() - start;
1011 self.service_time += duration;
1012 }
1013 }
1014
1015 pub fn wait_start(&mut self) {
1016 self.wait_start = Instant::now();
1017 }
1018
1019 pub fn service_time(&self) -> Duration {
1020 match self.service_start {
1021 Some(start) => {
1022 let last_duration = Instant::now() - start;
1023 self.service_time + last_duration
1024 }
1025 None => self.service_time,
1026 }
1027 }
1028
1029 pub fn request_time(&self) -> Duration {
1031 match self.start {
1032 Some(start) => Instant::now() - start,
1033 None => Duration::from_secs(0),
1034 }
1035 }
1036
1037 pub fn backend_start(&mut self) {
1038 self.backend_start = Some(Instant::now());
1039 }
1040
1041 pub fn backend_connected(&mut self) {
1042 self.backend_connected = Some(Instant::now());
1043 }
1044
1045 pub fn backend_stop(&mut self) {
1046 self.backend_stop = Some(Instant::now());
1047 }
1048
1049 pub fn backend_response_time(&self) -> Option<Duration> {
1050 match (self.backend_connected, self.backend_stop) {
1051 (Some(start), Some(end)) => Some(end - start),
1052 (Some(start), None) => Some(Instant::now() - start),
1053 _ => None,
1054 }
1055 }
1056
1057 pub fn backend_connection_time(&self) -> Option<Duration> {
1058 match (self.backend_start, self.backend_connected) {
1059 (Some(start), Some(end)) => Some(end - start),
1060 _ => None,
1061 }
1062 }
1063
1064 pub fn register_end_of_session(&self, context: &LogContext) {
1065 let request_time = self.request_time();
1066 let service_time = self.service_time();
1067
1068 if let Some(cluster_id) = context.cluster_id {
1069 time!("request_time", cluster_id, request_time.as_millis());
1070 time!("service_time", cluster_id, service_time.as_millis());
1071 }
1072 time!("request_time", request_time.as_millis());
1073 time!("service_time", service_time.as_millis());
1074
1075 if let Some(backend_id) = self.backend_id.as_ref() {
1076 if let Some(backend_response_time) = self.backend_response_time() {
1077 record_backend_metrics!(
1078 context.cluster_id.as_str_or("-"),
1079 backend_id,
1080 backend_response_time.as_millis(),
1081 self.backend_connection_time(),
1082 self.backend_bin,
1083 self.backend_bout
1084 );
1085 }
1086 }
1087
1088 incr!("access_logs.count", context.cluster_id, context.backend_id);
1089 }
1090}
1091
/// Peak-sensitive exponentially weighted moving average of a round-trip time.
///
/// A sample above the current estimate replaces it immediately; a lower
/// sample only pulls the estimate down gradually, weighted by the time
/// elapsed since the previous observation.
#[derive(Debug, PartialEq, Clone)]
pub struct PeakEWMA {
    /// Decay window in nanoseconds: the elapsed time is divided by this value
    /// before being turned into a weight.
    pub decay: f64,
    /// Current estimate (presumably in nanoseconds, like `decay` — confirm
    /// with the code feeding `observe`).
    pub rtt: f64,
    /// Instant of the last observation.
    pub last_event: Instant,
}

impl Default for PeakEWMA {
    /// Same as [`PeakEWMA::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl PeakEWMA {
    /// Creates an estimator with a decay of `1e9` (one second in nanoseconds)
    /// and an initial estimate of `5e7`.
    pub fn new() -> Self {
        PeakEWMA {
            decay: 1_000_000_000f64,
            rtt: 50_000_000f64,
            last_event: Instant::now(),
        }
    }

    /// Feeds a new sample into the estimate.
    ///
    /// A sample greater than the current estimate becomes the estimate.
    /// Otherwise the estimate moves towards the sample; the old value keeps a
    /// weight of `exp(-elapsed_ns / decay)`.
    pub fn observe(&mut self, rtt: f64) {
        let now = Instant::now();
        let dur = now - self.last_event;

        if rtt > self.rtt {
            // Peaks are tracked without smoothing.
            self.rtt = rtt;
        } else {
            let weight = (-1.0 * dur.as_nanos() as f64 / self.decay).exp();
            self.rtt = self.rtt * weight + rtt * (1.0 - weight);
        }

        self.last_event = now;
    }

    /// Returns the load estimate `(active_requests + 1) * rtt`.
    ///
    /// A zero sample is observed first so that the estimate decays when no
    /// real observation arrived recently.
    pub fn get(&mut self, active_requests: usize) -> f64 {
        self.observe(0.0);

        // The increment is done in floating point: `active_requests + 1`
        // would overflow `usize` for `usize::MAX`.
        (active_requests as f64 + 1.0) * self.rtt
    }
}
1152
1153pub mod testing {
1154 pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1155
1156 pub use anyhow::Context;
1157 pub use mio::{net::UnixStream, Poll, Registry, Token};
1158 pub use slab::Slab;
1159 pub use sozu_command::{
1160 proto::command::{
1161 HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1162 },
1163 scm_socket::{Listeners, ScmSocket},
1164 };
1165
1166 pub use crate::{
1167 backends::BackendMap,
1168 http::HttpProxy,
1169 https::HttpsProxy,
1170 pool::Pool,
1171 server::Server,
1172 server::{ListenSession, ProxyChannel, SessionManager},
1173 tcp::TcpProxy,
1174 Protocol, ProxySession,
1175 };
1176
1177 pub struct ServerParts {
1179 pub event_loop: Poll,
1180 pub registry: Registry,
1181 pub sessions: Rc<RefCell<SessionManager>>,
1182 pub pool: Rc<RefCell<Pool>>,
1183 pub backends: Rc<RefCell<BackendMap>>,
1184 pub client_scm_socket: ScmSocket,
1185 pub server_scm_socket: ScmSocket,
1186 pub server_config: ServerConfig,
1187 }
1188
1189 pub fn prebuild_server(
1191 max_buffers: usize,
1192 buffer_size: usize,
1193 send_scm: bool,
1194 ) -> anyhow::Result<ServerParts> {
1195 let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1196 let backends = Rc::new(RefCell::new(BackendMap::new()));
1197 let server_config = ServerConfig {
1198 max_connections: max_buffers as u64,
1199 ..Default::default()
1200 };
1201
1202 let pool = Rc::new(RefCell::new(Pool::with_capacity(
1203 1,
1204 max_buffers,
1205 buffer_size,
1206 )));
1207
1208 let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1209 {
1210 let entry = sessions.vacant_entry();
1211 info!("taking token {:?} for channel", entry.key());
1212 entry.insert(Rc::new(RefCell::new(ListenSession {
1213 protocol: Protocol::Channel,
1214 })));
1215 }
1216 {
1217 let entry = sessions.vacant_entry();
1218 info!("taking token {:?} for timer", entry.key());
1219 entry.insert(Rc::new(RefCell::new(ListenSession {
1220 protocol: Protocol::Timer,
1221 })));
1222 }
1223 {
1224 let entry = sessions.vacant_entry();
1225 info!("taking token {:?} for metrics", entry.key());
1226 entry.insert(Rc::new(RefCell::new(ListenSession {
1227 protocol: Protocol::Metrics,
1228 })));
1229 }
1230 let sessions = SessionManager::new(sessions, max_buffers);
1231
1232 let registry = event_loop
1233 .registry()
1234 .try_clone()
1235 .with_context(|| "Failed at creating a registry")?;
1236
1237 let (scm_server, scm_client) =
1238 UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1239 let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1240 .with_context(|| "Failed at creating the scm client socket")?;
1241 let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1242 .with_context(|| "Failed at creating the scm server socket")?;
1243 if send_scm {
1244 client_scm_socket
1245 .send_listeners(&Listeners::default())
1246 .with_context(|| "Failed at sending empty listeners")?;
1247 }
1248
1249 Ok(ServerParts {
1250 event_loop,
1251 registry,
1252 sessions,
1253 pool,
1254 backends,
1255 client_scm_socket,
1256 server_scm_socket,
1257 server_config,
1258 })
1259 }
1260}