1#[macro_use]
308extern crate sozu_command_lib as sozu_command;
309#[cfg(test)]
310#[macro_use]
311extern crate quickcheck;
312
313#[macro_use]
314pub mod util;
315#[macro_use]
316pub mod metrics;
317
318pub mod backends;
319pub mod features;
320pub mod http;
321pub mod load_balancing;
322pub mod pool;
323pub mod protocol;
324pub mod retry;
325pub mod router;
326pub mod socket;
327pub mod timer;
328pub mod tls;
329
330#[cfg(feature = "splice")]
332mod splice;
333
334pub mod server;
335pub mod tcp;
336
337pub mod https;
338
339use std::{
340 cell::RefCell,
341 collections::{BTreeMap, HashMap},
342 fmt::{self, Display, Formatter},
343 net::SocketAddr,
344 rc::Rc,
345 str,
346 time::{Duration, Instant},
347};
348
349use backends::BackendError;
350use hex::FromHexError;
351use mio::{Interest, Token, net::TcpStream};
352use protocol::http::{answers::TemplateError, parser::Method};
353use router::RouterError;
354use socket::ServerBindError;
355use sozu_command::{
356 AsStr, ObjectKind,
357 logging::{CachedTags, LogContext},
358 proto::command::{Cluster, ListenerType, RequestHttpFrontend, WorkerRequest, WorkerResponse},
359 ready::Ready,
360 state::ClusterId,
361};
362use tls::CertificateResolverError;
363
364use crate::{backends::BackendMap, router::Route};
365
/// Tag describing what a session (or reserved token slot) stands for.
///
/// Returned by [`ProxySession::protocol`]. Besides the proxied protocols, the
/// `Channel`, `Metrics` and `Timer` variants are used for the slots reserved
/// for internal components (see `testing::prebuild_server`).
// NOTE(review): the `*Listen` variants presumably tag listener sockets as
// opposed to client sessions — confirm against the server module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    HTTP,
    HTTPS,
    TCP,
    HTTPListen,
    HTTPSListen,
    TCPListen,
    Channel,
    Metrics,
    Timer,
}
379
/// Object-safe interface of a session, stored as
/// `Rc<RefCell<dyn ProxySession>>` in the session slab.
///
/// Several callbacks return [`SessionIsToBeClosed`], a `bool` alias.
// NOTE(review): the per-method descriptions below are derived from the
// signatures only; confirm details against the implementors.
pub trait ProxySession {
    /// Returns the [`Protocol`] tag of this session.
    fn protocol(&self) -> Protocol;
    /// Drives the session; receives a shared handle to a session
    /// (presumably this very session — confirm against the caller).
    /// The returned flag tells whether the session is to be closed.
    fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed;
    /// Records `events` for the socket identified by `token`.
    fn update_readiness(&mut self, token: Token, events: Ready);
    /// Closes the session.
    fn close(&mut self);
    /// Notifies the session of a timeout concerning token `t`; the returned
    /// flag tells whether the session is to be closed.
    fn timeout(&mut self, t: Token) -> SessionIsToBeClosed;
    /// Returns the instant of the last event seen by the session.
    fn last_event(&self) -> Instant;
    /// Prints the session state (for diagnostics).
    fn print_session(&self);
    /// Returns the token of the frontend socket.
    fn frontend_token(&self) -> Token;
    /// Tells the session a shutdown is in progress; the returned flag tells
    /// whether the session is to be closed.
    fn shutting_down(&mut self) -> SessionIsToBeClosed;
}
412
/// Compile-time branch on whether an optional identifier equals an expected
/// identifier.
///
/// `branch!{ if $value == Expected { then } else { otherwise } }` expands to
/// `then` when `$value` is literally the identifier `Expected`, and to
/// `otherwise` when it is another identifier or is absent.
///
/// The form without `else` only defines the matching arm, so a missing or
/// different identifier leaves the inner `expect!` without a matching rule.
#[macro_export]
macro_rules! branch {
    (if $($value:ident)? == $expected:ident { $($then:tt)* } else { $($else:tt)* }) => {
        // Local helper macro: the first arm matches the expected identifier
        // literally, the following arms catch any other identifier or nothing.
        macro_rules! expect {
            ($expected) => {$($then)*};
            ($a:ident) => {$($else)*};
            () => {$($else)*}
        }
        expect!($($value)?);
    };
    (if $($value:ident)? == $expected:ident { $($then:tt)* } ) => {
        // No fallback arms here: only the expected identifier is accepted.
        macro_rules! expect {
            ($expected) => {$($then)*};
        }
        expect!($($value)?);
    };
}
430
/// Expands to the tokens inside the leading braces when they are non-empty,
/// otherwise to the default tokens that follow the braces.
///
/// `fallback!({} default)` yields `default`;
/// `fallback!({value} default)` yields `value`.
#[macro_export]
macro_rules! fallback {
    // Empty braces: use the default tokens.
    ({} $($default:tt)*) => {
        $($default)*
    };
    // Non-empty braces: use their content and drop the default.
    ({$($value:tt)+} $($default:tt)*) => {
        $($value)+
    };
}
440
/// Generates a state-machine enum and its delegating methods from a list of
/// variants.
///
/// Given `enum Name [impl SessionState] { Variant(StateType, Aux...) [-> override], ... }`
/// the macro emits:
///
/// - `StateMarker`, a field-less enum with one variant per state;
/// - the `Name` enum with the listed variants plus `FailedUpgrade(StateMarker)`;
/// - inherent methods `marker`, `failed`, `take` and `front_socket`;
/// - when `impl SessionState` is written, an `impl SessionState for Name`
///   forwarding each trait method to the state held by the current variant.
///
/// A per-variant `-> override` expression replaces the forwarded call for
/// that variant in every generated method.
#[macro_export]
macro_rules! StateMachineBuilder {
    (
        // `$d` receives a literal `$` token (see the last arm) so that the
        // nested `_fn_impl` macro can declare its own repetitions.
        ($d:tt)
        $(#[$($state_macros:tt)*])*
        enum $state_name:ident $(impl $trait:ident)? {
            $($(#[$($variant_macros:tt)*])*
            $variant_name:ident($state:ty$(,$($aux:ty),+)?) $(-> $override:expr)?),+ $(,)?
        }
    ) => {
        // One field-less marker per state, used to remember which state was
        // active once the real state has been moved out.
        #[derive(Clone, Copy, Debug)]
        pub enum StateMarker {
            $($variant_name,)+
        }

        $(#[$($state_macros)*])*
        pub enum $state_name {
            $(
                $(#[$($variant_macros)*])*
                $variant_name($state$(,$($aux),+)?),
            )+
            // Placeholder left behind by `take`.
            FailedUpgrade(StateMarker),
        }

        // Generates one method that forwards to the inner state of every
        // variant. The optional `| marker => expr` suffix provides the body
        // for the `FailedUpgrade` case; without it that case is `unreachable!()`.
        macro_rules! _fn_impl {
            ($function:ident(&$d($mut:ident)?, self $d(,$arg_name:ident: $arg_type:ty)*) $d(-> $ret:ty)? $d(| $marker:tt => $fail:expr)?) => {
                fn $function(&$d($mut)? self $d(,$arg_name: $arg_type)*) $d(-> $ret)? {
                    match self {
                        $($state_name::$variant_name(_state, ..) => $crate::fallback!({$($override)?} _state.$function($d($arg_name),*)),)+
                        $state_name::FailedUpgrade($crate::fallback!({$d($marker)?} _)) => $crate::fallback!({$d($fail)?} unreachable!())
                    }
                }
            };
        }

        impl $state_name {
            // Returns the marker matching the current variant; a
            // `FailedUpgrade` yields the marker it carries.
            fn marker(&self) -> StateMarker {
                match self {
                    $($state_name::$variant_name(..) => StateMarker::$variant_name,)+
                    $state_name::FailedUpgrade(marker) => *marker,
                }
            }
            // True only when the state is the `FailedUpgrade` placeholder.
            fn failed(&self) -> bool {
                match self {
                    $state_name::FailedUpgrade(_) => true,
                    _ => false,
                }
            }
            // Moves the current state out, leaving `FailedUpgrade(marker)`
            // in its place.
            fn take(&mut self) -> $state_name {
                let mut owned_state = $state_name::FailedUpgrade(self.marker());
                std::mem::swap(&mut owned_state, self);
                owned_state
            }
            // No failure body is given, so calling this on `FailedUpgrade`
            // hits `unreachable!()`.
            _fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
        }

        // The trait impl is only emitted when `impl SessionState` was written.
        $crate::branch!{
            if $($trait)? == SessionState {
                impl SessionState for $state_name {
                    _fn_impl!{ready(&mut, self, session: Rc<RefCell<dyn ProxySession>>, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) -> SessionResult}
                    _fn_impl!{update_readiness(&mut, self, token: Token, events: Ready)}
                    _fn_impl!{timeout(&mut, self, token: Token, metrics: &mut SessionMetrics) -> StateResult}
                    _fn_impl!{cancel_timeouts(&mut, self)}
                    _fn_impl!{print_state(&, self, context: &str) | marker => error!("{} Session(FailedUpgrade({:?}))", context, marker)}
                    _fn_impl!{close(&mut, self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) | _ => {}}
                    _fn_impl!{shutting_down(&mut, self) -> SessionIsToBeClosed | _ => true}
                }
            } else {}
        }
    };
    // Public entry point: re-invoke the macro with a literal `$` as first
    // argument so the arm above can bind it to `$d`.
    ($($tt:tt)+) => {
        StateMachineBuilder!{($) $($tt)+}
    }
}
522
523pub trait ListenerHandler {
524 fn get_addr(&self) -> &SocketAddr;
525
526 fn get_tags(&self, key: &str) -> Option<&CachedTags>;
527
528 fn get_concatenated_tags(&self, key: &str) -> Option<&str> {
529 self.get_tags(key).map(|tags| tags.concatenated.as_str())
530 }
531
532 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>);
533}
534
/// Errors returned by [`L7ListenerHandler::frontend_from_request`].
#[derive(thiserror::Error, Debug)]
pub enum FrontendFromRequestError {
    /// The hostname could not be parsed; `error` carries the parser message.
    #[error("Could not parse hostname from '{host}': {error}")]
    HostParse { host: String, error: String },
    /// Unexpected characters remain after the hostname.
    #[error("invalid remaining chars after hostname. Host: {0}")]
    InvalidCharsAfterHost(String),
    /// The router reported an error while looking up a cluster.
    #[error("no cluster: {0}")]
    NoClusterFound(RouterError),
}
544
/// Listener operations specific to layer-7 (HTTP-like) listeners.
pub trait L7ListenerHandler {
    /// Returns the sticky name configured on the listener
    /// (presumably the sticky-session cookie name — confirm).
    fn get_sticky_name(&self) -> &str;

    /// Returns the connect timeout of the listener.
    // NOTE(review): the unit is not visible here — confirm (likely seconds).
    fn get_connect_timeout(&self) -> u32;

    /// Resolves the [`Route`] matching the given host, URI and method, or a
    /// [`FrontendFromRequestError`] when none can be determined.
    fn frontend_from_request(
        &self,
        host: &str,
        uri: &str,
        method: &Method,
    ) -> Result<Route, FrontendFromRequestError>;
}
558
/// Connection state of a backend socket.
///
/// `Connecting` carries an [`Instant`] (presumably the moment the connection
/// attempt started — confirm against the callers).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendConnectionStatus {
    NotConnected,
    Connecting(Instant),
    Connected,
}

impl BackendConnectionStatus {
    /// Returns `true` only for the `Connecting` variant.
    pub fn is_connecting(&self) -> bool {
        match self {
            Self::Connecting(_) => true,
            Self::NotConnected | Self::Connected => false,
        }
    }
}
571
/// Action taken regarding a backend connection.
// NOTE(review): presumably `New` = a fresh connection was opened, `Reuse` =
// the existing one was kept, `Replace` = the existing one was swapped for a
// new one — confirm against the code producing these values.
#[derive(Debug, PartialEq, Eq)]
pub enum BackendConnectAction {
    New,
    Reuse,
    Replace,
}
578
/// Errors that can occur while connecting a session to a backend.
#[derive(thiserror::Error, Debug)]
pub enum BackendConnectionError {
    /// An object of the given kind could not be found.
    #[error("Not found: {0:?}")]
    NotFound(ObjectKind),
    /// Too many connection attempts; the payload is shown as the cluster in
    /// the message.
    #[error("Too many connections on cluster {0:?}")]
    MaxConnectionRetries(Option<String>),
    /// The sessions slab is full.
    #[error("the sessions slab has reached maximum capacity")]
    MaxSessionsMemory,
    /// Error reported by the backend layer.
    #[error("error from the backend: {0}")]
    Backend(BackendError),
    /// The cluster for the request could not be determined.
    #[error("failed to retrieve the cluster: {0}")]
    RetrieveClusterError(RetrieveClusterError),
}
592
/// Reasons why the cluster targeted by a request could not be retrieved.
#[derive(thiserror::Error, Debug)]
pub enum RetrieveClusterError {
    /// The request carries no method.
    #[error("No method given")]
    NoMethod,
    /// The request carries no host.
    #[error("No host given")]
    NoHost,
    /// The request carries no path.
    #[error("No path given")]
    NoPath,
    /// The matched route is not authorized.
    #[error("unauthorized route")]
    UnauthorizedRoute,
    /// The frontend lookup failed; the inner error is displayed as is.
    #[error("{0}")]
    RetrieveFrontend(FrontendFromRequestError),
}
607
/// Failure modes of [`ProxyConfiguration::accept`] and
/// [`ProxyConfiguration::create_session`].
// NOTE(review): the variant meanings are only conveyed by their names here;
// confirm the exact conditions in the proxies that return them.
#[derive(Debug, PartialEq, Eq)]
pub enum AcceptError {
    IoError,
    TooManySessions,
    WouldBlock,
    RegisterError,
    WrongSocketAddress,
    BufferCapacityReached,
}
618
/// Errors raised by listener-level operations (certificates, templates,
/// activation, frontend management).
#[derive(thiserror::Error, Debug)]
pub enum ListenerError {
    /// The certificate resolver rejected a certificate request.
    #[error("failed to handle certificate request, got a resolver error, {0}")]
    Resolver(CertificateResolverError),
    /// A PEM document could not be parsed.
    #[error("failed to parse pem, {0}")]
    PemParse(String),
    /// An answer template could not be parsed; the `u16` is shown as the
    /// template identifier (presumably the HTTP status code — confirm).
    #[error("failed to parse template {0}: {1}")]
    TemplateParse(u16, TemplateError),
    /// The rustls context could not be built.
    #[error("failed to build rustls context, {0}")]
    BuildRustls(String),
    /// The listener bound to `address` could not be activated.
    #[error("could not activate listener with address {address:?}: {error}")]
    Activation { address: SocketAddr, error: String },
    /// Registering the listener socket failed with an I/O error.
    #[error("Could not register listener socket: {0}")]
    SocketRegistration(std::io::Error),
    /// The router refused to add a frontend.
    #[error("could not add frontend: {0}")]
    AddFrontend(RouterError),
    /// The router refused to remove a frontend.
    #[error("could not remove frontend: {0}")]
    RemoveFrontend(RouterError),
}
639
/// Errors raised by proxy-level operations (listener, cluster, frontend and
/// certificate management, socket binding and registration).
#[derive(thiserror::Error, Debug)]
pub enum ProxyError {
    /// Soft stop of the proxy failed.
    #[error("error while soft stopping {proxy_protocol} proxy: {error}")]
    SoftStop {
        proxy_protocol: String,
        error: String,
    },
    /// Hard stop of the proxy failed.
    #[error("error while hard stopping {proxy_protocol} proxy: {error}")]
    HardStop {
        proxy_protocol: String,
        error: String,
    },
    /// No listener is known for the given address.
    #[error("found no listener with address {0:?}")]
    NoListenerFound(SocketAddr),
    /// A listener already exists for the token.
    #[error("a listener is already present for this token")]
    ListenerAlreadyPresent,
    /// Adding a listener failed.
    #[error("could not add listener: {0}")]
    AddListener(ListenerError),
    /// Adding a cluster failed.
    #[error("could not add cluster: {0}")]
    AddCluster(ListenerError),
    /// Activating the listener bound to `address` failed.
    #[error("failed to activate listener with address {address:?}: {listener_error}")]
    ListenerActivation {
        address: SocketAddr,
        listener_error: ListenerError,
    },
    /// The frontend described by the request is invalid.
    #[error("can not add frontend {front:?}: {error}")]
    WrongInputFrontend {
        front: RequestHttpFrontend,
        error: String,
    },
    /// Adding a frontend failed at the listener level.
    #[error("could not add frontend: {0}")]
    AddFrontend(ListenerError),
    /// Removing a frontend failed at the listener level.
    #[error("could not remove frontend: {0}")]
    RemoveFrontend(ListenerError),
    /// The certificate resolver refused to add a certificate.
    #[error("could not add certificate: {0}")]
    AddCertificate(CertificateResolverError),
    /// The certificate resolver refused to remove a certificate.
    #[error("could not remove certificate: {0}")]
    RemoveCertificate(CertificateResolverError),
    /// The certificate resolver refused to replace a certificate.
    #[error("could not replace certificate: {0}")]
    ReplaceCertificate(CertificateResolverError),
    /// A certificate fingerprint could not be decoded from hexadecimal.
    #[error("wrong certificate fingerprint: {0}")]
    WrongCertificateFingerprint(FromHexError),
    /// The proxy does not handle this kind of request.
    #[error("this request is not supported by the proxy")]
    UnsupportedMessage,
    /// A lock could not be acquired.
    #[error("failed to acquire the lock, {0}")]
    Lock(String),
    /// Binding to the given socket address failed.
    #[error("could not bind to socket {0:?}: {1}")]
    BindToSocket(SocketAddr, ServerBindError),
    /// Registering the listener socket failed with an I/O error.
    #[error("error registering socket of listener: {0}")]
    RegisterListener(std::io::Error),
    /// The listener has not been activated.
    #[error("the listener is not activated")]
    UnactivatedListener,
}
694
695use self::server::ListenToken;
/// Interface a proxy exposes for receiving worker requests and accepting
/// client connections.
pub trait ProxyConfiguration {
    /// Handles a [`WorkerRequest`] and returns the [`WorkerResponse`] to it.
    fn notify(&mut self, message: WorkerRequest) -> WorkerResponse;
    /// Accepts a pending connection on the listener identified by `token`.
    fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError>;
    /// Creates a session for an accepted `socket`.
    ///
    /// `token` identifies the listener, `wait_time` is a duration associated
    /// with the connection (presumably the time it waited before being
    /// handled — confirm), and `proxy` is a shared handle to the proxy itself.
    fn create_session(
        &mut self,
        socket: TcpStream,
        token: ListenToken,
        wait_time: Duration,
        proxy: Rc<RefCell<Self>>,
    ) -> Result<(), AcceptError>;
}
708
/// Services a layer-7 proxy offers to its sessions: socket registration,
/// session bookkeeping and access to backends and clusters.
pub trait L7Proxy {
    /// Returns the listener type handled by this proxy.
    fn kind(&self) -> ListenerType;

    /// Registers `socket` under `token` with the given `interest`.
    fn register_socket(
        &self,
        socket: &mut TcpStream,
        token: Token,
        interest: Interest,
    ) -> Result<(), std::io::Error>;

    /// Deregisters `tcp_stream`.
    fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error>;

    /// Stores `session` and returns the token allotted to it.
    fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token;

    /// Removes the session stored under `token`.
    // NOTE(review): the meaning of the returned bool is not visible here
    // (presumably whether a session was actually removed) — confirm.
    fn remove_session(&self, token: Token) -> bool;

    /// Returns a shared handle to the backend map.
    fn backends(&self) -> Rc<RefCell<BackendMap>>;

    /// Returns the clusters known to the proxy, keyed by cluster id.
    fn clusters(&self) -> &HashMap<ClusterId, Cluster>;
}
731
/// Combination of read/write requirements on the front and back sockets.
///
/// Each variant name spells the requirement on the front socket
/// (`Read`, `Write`, `ReadWrite` or `None`) followed by the one on the back
/// socket.
#[derive(Debug, PartialEq, Eq)]
pub enum RequiredEvents {
    FrontReadBackNone,
    FrontWriteBackNone,
    FrontReadWriteBackNone,
    FrontNoneBackNone,
    FrontReadBackRead,
    FrontWriteBackRead,
    FrontReadWriteBackRead,
    FrontNoneBackRead,
    FrontReadBackWrite,
    FrontWriteBackWrite,
    FrontReadWriteBackWrite,
    FrontNoneBackWrite,
    FrontReadBackReadWrite,
    FrontWriteBackReadWrite,
    FrontReadWriteBackReadWrite,
    FrontNoneBackReadWrite,
}

impl RequiredEvents {
    /// Decomposes the variant into
    /// `(front readable, front writable, back readable, back writable)`.
    fn flags(&self) -> (bool, bool, bool, bool) {
        match self {
            RequiredEvents::FrontReadBackNone => (true, false, false, false),
            RequiredEvents::FrontWriteBackNone => (false, true, false, false),
            RequiredEvents::FrontReadWriteBackNone => (true, true, false, false),
            RequiredEvents::FrontNoneBackNone => (false, false, false, false),
            RequiredEvents::FrontReadBackRead => (true, false, true, false),
            RequiredEvents::FrontWriteBackRead => (false, true, true, false),
            RequiredEvents::FrontReadWriteBackRead => (true, true, true, false),
            RequiredEvents::FrontNoneBackRead => (false, false, true, false),
            RequiredEvents::FrontReadBackWrite => (true, false, false, true),
            RequiredEvents::FrontWriteBackWrite => (false, true, false, true),
            RequiredEvents::FrontReadWriteBackWrite => (true, true, false, true),
            RequiredEvents::FrontNoneBackWrite => (false, false, false, true),
            RequiredEvents::FrontReadBackReadWrite => (true, false, true, true),
            RequiredEvents::FrontWriteBackReadWrite => (false, true, true, true),
            RequiredEvents::FrontReadWriteBackReadWrite => (true, true, true, true),
            RequiredEvents::FrontNoneBackReadWrite => (false, false, true, true),
        }
    }

    /// `true` when the front socket is required to be readable.
    pub fn front_readable(&self) -> bool {
        self.flags().0
    }

    /// `true` when the front socket is required to be writable.
    pub fn front_writable(&self) -> bool {
        self.flags().1
    }

    /// `true` when the back socket is required to be readable.
    pub fn back_readable(&self) -> bool {
        self.flags().2
    }

    /// `true` when the back socket is required to be writable.
    pub fn back_writable(&self) -> bool {
        self.flags().3
    }
}
809
/// Outcome returned by a state, e.g. from the `timeout` method generated by
/// `StateMachineBuilder!`.
// NOTE(review): variant meanings below are read off their names — confirm
// against the state implementations.
#[derive(Debug, PartialEq, Eq)]
pub enum StateResult {
    /// Presumably: only the backend connection has to be closed.
    CloseBackend,
    /// Presumably: the whole session has to be closed.
    CloseSession,
    /// Presumably: a backend connection has to be established.
    ConnectBackend,
    /// Presumably: nothing special, keep going.
    Continue,
    /// Presumably: the session must switch to another state.
    Upgrade,
}
824
/// Outcome returned to the session by its state, e.g. from the `ready`
/// method generated by `StateMachineBuilder!`.
// NOTE(review): variant meanings below are read off their names — confirm
// against the session implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionResult {
    /// Presumably: the session has to be closed.
    Close,
    /// Presumably: the session keeps running in the same state.
    Continue,
    /// Presumably: the session must switch to another state.
    Upgrade,
}
835
/// Distinguishes a listener socket from a frontend client socket.
#[derive(Debug, PartialEq, Eq)]
pub enum SocketType {
    Listener,
    FrontClient,
}
841
/// Boolean returned by session callbacks; as the name says, `true` means the
/// session is to be closed.
type SessionIsToBeClosed = bool;
843
/// Pair of [`Ready`] sets tracked for a socket: what happened (`event`) and
/// what is being waited for (`interest`).
#[derive(Clone)]
pub struct Readiness {
    /// Events recorded for the socket.
    pub event: Ready,
    /// Events the owner is interested in; `filter_interest` intersects it
    /// with `event`.
    pub interest: Ready,
}
851
852impl Display for Readiness {
853 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
854 let i = &mut [b'-'; 4];
855 let r = &mut [b'-'; 4];
856 let mixed = &mut [b'-'; 4];
857
858 display_ready(i, self.interest);
859 display_ready(r, self.event);
860 display_ready(mixed, self.interest & self.event);
861
862 write!(
863 f,
864 "I({:?})&R({:?})=M({:?})",
865 String::from_utf8_lossy(i),
866 String::from_utf8_lossy(r),
867 String::from_utf8_lossy(mixed)
868 )
869 }
870}
871
872impl Default for Readiness {
873 fn default() -> Self {
874 Self::new()
875 }
876}
877
878impl Readiness {
879 pub const fn new() -> Readiness {
880 Readiness {
881 event: Ready::EMPTY,
882 interest: Ready::EMPTY,
883 }
884 }
885
886 pub fn reset(&mut self) {
887 self.event = Ready::EMPTY;
888 self.interest = Ready::EMPTY;
889 }
890
891 pub fn filter_interest(&self) -> Ready {
893 self.event & self.interest
894 }
895}
896
897pub fn display_ready(s: &mut [u8], readiness: Ready) {
898 if readiness.is_readable() {
899 s[0] = b'R';
900 }
901 if readiness.is_writable() {
902 s[1] = b'W';
903 }
904 if readiness.is_error() {
905 s[2] = b'E';
906 }
907 if readiness.is_hup() {
908 s[3] = b'H';
909 }
910}
911
912pub fn ready_to_string(readiness: Ready) -> String {
913 let s = &mut [b'-'; 4];
914 display_ready(s, readiness);
915 String::from_utf8(s.to_vec()).unwrap()
916}
917
918impl fmt::Debug for Readiness {
919 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
920 let i = &mut [b'-'; 4];
921 let r = &mut [b'-'; 4];
922 let mixed = &mut [b'-'; 4];
923
924 display_ready(i, self.interest);
925 display_ready(r, self.event);
926 display_ready(mixed, self.interest & self.event);
927
928 write!(
929 f,
930 "Readiness {{ interest: {}, readiness: {}, mixed: {} }}",
931 str::from_utf8(i).unwrap(),
932 str::from_utf8(r).unwrap(),
933 str::from_utf8(mixed).unwrap()
934 )
935 }
936}
937
/// Timing and traffic counters collected over the life of a session.
#[derive(Clone, Debug)]
pub struct SessionMetrics {
    /// Start of the current request; set on creation and by `service_start`
    /// when empty, cleared by `reset`. `request_time` measures from it.
    pub start: Option<Instant>,
    /// Service time accumulated by `service_stop`.
    pub service_time: Duration,
    /// Wait time accumulated by `service_start`.
    pub wait_time: Duration,
    /// Front-side input counter (presumably bytes read from the client — confirm).
    pub bin: usize,
    /// Front-side output counter (presumably bytes written to the client — confirm).
    pub bout: usize,

    /// Start of the service period in progress, if any; set by
    /// `service_start` and consumed by `service_stop`.
    pub service_start: Option<Instant>,
    /// Start of the current wait period; set on creation and by `wait_start`.
    pub wait_start: Instant,

    /// Identifier of the backend, used to label backend metrics.
    pub backend_id: Option<String>,
    /// Set by `backend_start`.
    pub backend_start: Option<Instant>,
    /// Set by `backend_connected`.
    pub backend_connected: Option<Instant>,
    /// Set by `backend_stop`.
    pub backend_stop: Option<Instant>,
    /// Backend-side input counter, reported with the backend metrics.
    pub backend_bin: usize,
    /// Backend-side output counter, reported with the backend metrics.
    pub backend_bout: usize,
}
962
963impl SessionMetrics {
964 pub fn new(wait_time: Option<Duration>) -> SessionMetrics {
965 SessionMetrics {
966 start: Some(Instant::now()),
967 service_time: Duration::from_secs(0),
968 wait_time: wait_time.unwrap_or_else(|| Duration::from_secs(0)),
969 bin: 0,
970 bout: 0,
971 service_start: None,
972 wait_start: Instant::now(),
973 backend_id: None,
974 backend_start: None,
975 backend_connected: None,
976 backend_stop: None,
977 backend_bin: 0,
978 backend_bout: 0,
979 }
980 }
981
982 pub fn reset(&mut self) {
983 self.start = None;
984 self.service_time = Duration::from_secs(0);
985 self.wait_time = Duration::from_secs(0);
986 self.bin = 0;
987 self.bout = 0;
988 self.service_start = None;
989 self.backend_start = None;
990 self.backend_connected = None;
991 self.backend_stop = None;
992 self.backend_bin = 0;
993 self.backend_bout = 0;
994 }
995
996 pub fn service_start(&mut self) {
997 let now = Instant::now();
998
999 if self.start.is_none() {
1000 self.start = Some(now);
1001 }
1002
1003 self.service_start = Some(now);
1004 self.wait_time += now - self.wait_start;
1005 }
1006
1007 pub fn service_stop(&mut self) {
1008 if let Some(start) = self.service_start.take() {
1009 let duration = Instant::now() - start;
1010 self.service_time += duration;
1011 }
1012 }
1013
1014 pub fn wait_start(&mut self) {
1015 self.wait_start = Instant::now();
1016 }
1017
1018 pub fn service_time(&self) -> Duration {
1019 match self.service_start {
1020 Some(start) => {
1021 let last_duration = Instant::now() - start;
1022 self.service_time + last_duration
1023 }
1024 None => self.service_time,
1025 }
1026 }
1027
1028 pub fn request_time(&self) -> Duration {
1030 match self.start {
1031 Some(start) => Instant::now() - start,
1032 None => Duration::from_secs(0),
1033 }
1034 }
1035
1036 pub fn backend_start(&mut self) {
1037 self.backend_start = Some(Instant::now());
1038 }
1039
1040 pub fn backend_connected(&mut self) {
1041 self.backend_connected = Some(Instant::now());
1042 }
1043
1044 pub fn backend_stop(&mut self) {
1045 self.backend_stop = Some(Instant::now());
1046 }
1047
1048 pub fn backend_response_time(&self) -> Option<Duration> {
1049 match (self.backend_connected, self.backend_stop) {
1050 (Some(start), Some(end)) => Some(end - start),
1051 (Some(start), None) => Some(Instant::now() - start),
1052 _ => None,
1053 }
1054 }
1055
1056 pub fn backend_connection_time(&self) -> Option<Duration> {
1057 match (self.backend_start, self.backend_connected) {
1058 (Some(start), Some(end)) => Some(end - start),
1059 _ => None,
1060 }
1061 }
1062
1063 pub fn register_end_of_session(&self, context: &LogContext) {
1064 let request_time = self.request_time();
1065 let service_time = self.service_time();
1066
1067 if let Some(cluster_id) = context.cluster_id {
1068 time!("request_time", cluster_id, request_time.as_millis());
1069 time!("service_time", cluster_id, service_time.as_millis());
1070 }
1071 time!("request_time", request_time.as_millis());
1072 time!("service_time", service_time.as_millis());
1073
1074 if let Some(backend_id) = self.backend_id.as_ref() {
1075 if let Some(backend_response_time) = self.backend_response_time() {
1076 record_backend_metrics!(
1077 context.cluster_id.as_str_or("-"),
1078 backend_id,
1079 backend_response_time.as_millis(),
1080 self.backend_connection_time(),
1081 self.backend_bin,
1082 self.backend_bout
1083 );
1084 }
1085 }
1086
1087 incr!("access_logs.count", context.cluster_id, context.backend_id);
1088 }
1089}
1090
/// Exponentially weighted moving average of a round-trip time that jumps
/// immediately to any higher observation and decays towards lower ones.
#[derive(Debug, PartialEq, Clone)]
pub struct PeakEWMA {
    /// Decay constant, compared against elapsed nanoseconds in `observe`.
    pub decay: f64,
    /// Current estimate (presumably in nanoseconds, like `decay` — confirm).
    pub rtt: f64,
    /// Instant of the last call to `observe`.
    pub last_event: Instant,
}

impl Default for PeakEWMA {
    /// Same as [`PeakEWMA::new`].
    fn default() -> Self {
        PeakEWMA::new()
    }
}

impl PeakEWMA {
    /// Creates an estimator with a decay of `1_000_000_000` and an initial
    /// estimate of `50_000_000`, timestamped now.
    pub fn new() -> Self {
        Self {
            decay: 1_000_000_000f64,
            rtt: 50_000_000f64,
            last_event: Instant::now(),
        }
    }

    /// Feeds a new observation into the estimate.
    ///
    /// A value above the current estimate replaces it outright. Otherwise
    /// the estimate becomes `old * w + rtt * (1 - w)` with
    /// `w = exp(-elapsed_nanos / decay)`, `elapsed_nanos` being the time
    /// since the previous observation.
    pub fn observe(&mut self, rtt: f64) {
        let now = Instant::now();
        let elapsed = now - self.last_event;

        self.rtt = if rtt > self.rtt {
            rtt
        } else {
            let elapsed_nanos = elapsed.as_nanos() as f64;
            let weight = (-1.0 * elapsed_nanos / self.decay).exp();
            self.rtt * weight + rtt * (1.0 - weight)
        };

        self.last_event = now;
    }

    /// Returns the estimate scaled by `active_requests + 1`.
    ///
    /// A zero observation is fed in first, so the estimate decays according
    /// to the time elapsed since the last observation.
    pub fn get(&mut self, active_requests: usize) -> f64 {
        self.observe(0.0);

        let load = (active_requests + 1) as f64;
        load * self.rtt
    }
}
1151
1152pub mod testing {
1153 pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc};
1154
1155 pub use anyhow::Context;
1156 pub use mio::{Poll, Registry, Token, net::UnixStream};
1157 pub use slab::Slab;
1158 pub use sozu_command::{
1159 proto::command::{
1160 HttpListenerConfig, HttpsListenerConfig, ServerConfig, TcpListenerConfig,
1161 },
1162 scm_socket::{Listeners, ScmSocket},
1163 };
1164
1165 pub use crate::{
1166 Protocol, ProxySession,
1167 backends::BackendMap,
1168 http::HttpProxy,
1169 https::HttpsProxy,
1170 pool::Pool,
1171 server::{ListenSession, ProxyChannel, Server, SessionManager},
1172 tcp::TcpProxy,
1173 };
1174
1175 pub struct ServerParts {
1177 pub event_loop: Poll,
1178 pub registry: Registry,
1179 pub sessions: Rc<RefCell<SessionManager>>,
1180 pub pool: Rc<RefCell<Pool>>,
1181 pub backends: Rc<RefCell<BackendMap>>,
1182 pub client_scm_socket: ScmSocket,
1183 pub server_scm_socket: ScmSocket,
1184 pub server_config: ServerConfig,
1185 }
1186
1187 pub fn prebuild_server(
1189 max_buffers: usize,
1190 buffer_size: usize,
1191 send_scm: bool,
1192 ) -> anyhow::Result<ServerParts> {
1193 let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?;
1194 let backends = Rc::new(RefCell::new(BackendMap::new()));
1195 let server_config = ServerConfig {
1196 max_connections: max_buffers as u64,
1197 ..Default::default()
1198 };
1199
1200 let pool = Rc::new(RefCell::new(Pool::with_capacity(
1201 1,
1202 max_buffers,
1203 buffer_size,
1204 )));
1205
1206 let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
1207 {
1208 let entry = sessions.vacant_entry();
1209 info!("taking token {:?} for channel", entry.key());
1210 entry.insert(Rc::new(RefCell::new(ListenSession {
1211 protocol: Protocol::Channel,
1212 })));
1213 }
1214 {
1215 let entry = sessions.vacant_entry();
1216 info!("taking token {:?} for timer", entry.key());
1217 entry.insert(Rc::new(RefCell::new(ListenSession {
1218 protocol: Protocol::Timer,
1219 })));
1220 }
1221 {
1222 let entry = sessions.vacant_entry();
1223 info!("taking token {:?} for metrics", entry.key());
1224 entry.insert(Rc::new(RefCell::new(ListenSession {
1225 protocol: Protocol::Metrics,
1226 })));
1227 }
1228 let sessions = SessionManager::new(sessions, max_buffers);
1229
1230 let registry = event_loop
1231 .registry()
1232 .try_clone()
1233 .with_context(|| "Failed at creating a registry")?;
1234
1235 let (scm_server, scm_client) =
1236 UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?;
1237 let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd())
1238 .with_context(|| "Failed at creating the scm client socket")?;
1239 let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd())
1240 .with_context(|| "Failed at creating the scm server socket")?;
1241 if send_scm {
1242 client_scm_socket
1243 .send_listeners(&Listeners::default())
1244 .with_context(|| "Failed at sending empty listeners")?;
1245 }
1246
1247 Ok(ServerParts {
1248 event_loop,
1249 registry,
1250 sessions,
1251 pool,
1252 backends,
1253 client_scm_socket,
1254 server_scm_socket,
1255 server_config,
1256 })
1257 }
1258}