1mod local_drain;
10pub mod names;
11mod network_drain;
12mod writer;
13
14use std::{
15 cell::RefCell,
16 collections::{BTreeMap, HashMap},
17 io::{self, Write},
18 net::SocketAddr,
19 str,
20 time::{Duration, Instant},
21};
22
23use mio::net::UdpSocket;
24use sozu_command::config::MetricDetailLevel;
25use sozu_command::proto::command::{
26 FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
27};
28
29use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
30
31fn filter_labels_for_detail<'a>(
45 detail: MetricDetailLevel,
46 cluster_id: Option<&'a str>,
47 backend_id: Option<&'a str>,
48) -> (Option<&'a str>, Option<&'a str>) {
49 match detail {
50 MetricDetailLevel::Process | MetricDetailLevel::Frontend => (None, None),
51 MetricDetailLevel::Cluster => (cluster_id, None),
52 MetricDetailLevel::Backend => (cluster_id, backend_id),
53 }
54}
55
/// Maps an HTTP status code to its dedicated static metric name.
///
/// Only the status codes listed in the table below have a dedicated metric;
/// every other code yields `None`.
pub(crate) fn http_status_code_metric_name(status: u16) -> Option<&'static str> {
    // (status code, metric name) pairs for every individually tracked code.
    const TRACKED: &[(u16, &'static str)] = &[
        (200, "http.status.200"),
        (201, "http.status.201"),
        (204, "http.status.204"),
        (301, "http.status.301"),
        (302, "http.status.302"),
        (304, "http.status.304"),
        (400, "http.status.400"),
        (401, "http.status.401"),
        (403, "http.status.403"),
        (404, "http.status.404"),
        (408, "http.status.408"),
        (413, "http.status.413"),
        (429, "http.status.429"),
        (500, "http.status.500"),
        (502, "http.status.502"),
        (503, "http.status.503"),
        (504, "http.status.504"),
        (507, "http.status.507"),
    ];

    TRACKED
        .iter()
        .find(|(code, _)| *code == status)
        .map(|&(_, name)| name)
}
91
thread_local! {
    /// Thread-local metrics aggregator, lazily created with the `"sozu"` prefix.
    ///
    /// The metric macros in this module (`count!`, `gauge!`, `time!`, ...) reach it
    /// through `METRICS.with(..)` and take a mutable borrow of the `RefCell`.
    pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
}
95
/// Errors reported by the metrics subsystem.
#[derive(thiserror::Error, Debug)]
pub enum MetricError {
    /// The textual UDP address could not be parsed into a socket address.
    #[error("Could not parse udp address {address}: {error}")]
    WrongUdpAddress { address: String, error: String },
    /// Binding the UDP socket to `address` failed.
    #[error("Could not bind to udp address {address}: {error}")]
    UdpBind { address: String, error: String },
    /// No metrics are stored for the object with the given id.
    #[error("No metrics found for object with id {0}")]
    NoMetrics(String),
    /// A histogram could not be created for the given time metric.
    #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
    HistogramCreation {
        time_metric: MetricValue,
        error: String,
    },
    /// The given time metric could not be recorded.
    #[error("could not record time metric {time_metric:?}: {error}")]
    TimeMetricRecordingError {
        time_metric: MetricValue,
        error: String,
    },
}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub enum MetricValue {
118 Gauge(usize),
119 GaugeAdd(i64),
120 Count(i64),
121 Time(usize),
122}
123
124impl MetricValue {
125 fn is_time(&self) -> bool {
126 matches!(self, &MetricValue::Time(_))
127 }
128
129 fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
130 match (self, m) {
131 (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
132 let changed = *v1 != v2;
133 *v1 = v2;
134 changed
135 }
136 (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
137 let changed = v2 != 0;
146 let res = *v1 as i64 + v2;
147 *v1 = if res >= 0 {
148 res as usize
149 } else {
150 error!(
151 "metric {} underflow: previous value: {}, adding: {}",
152 key, v1, v2
153 );
154 0
155 };
156
157 changed
158 }
159 (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
160 let changed = v2 != 0;
161 *v1 += v2;
162 changed
163 }
164 (s, m) => {
165 error!(
173 "tried to update metric {} of value {:?} with an incompatible metric: {:?}",
174 key, s, m
175 );
176 false
177 }
178 }
179 }
180}
181
182#[derive(Debug, Clone)]
183pub struct StoredMetricValue {
184 last_sent: Instant,
185 updated: bool,
186 data: MetricValue,
187}
188
189impl StoredMetricValue {
190 pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
191 let data = if let MetricValue::GaugeAdd(v) = data {
197 if v >= 0 {
198 MetricValue::Gauge(v as usize)
199 } else {
200 error!(
201 "stored metric created with negative GaugeAdd({}), clamping to 0",
202 v
203 );
204 MetricValue::Gauge(0)
205 }
206 } else {
207 data
208 };
209 StoredMetricValue {
210 last_sent,
211 updated: true,
212 data,
213 }
214 }
215
216 pub fn update(&mut self, key: &'static str, m: MetricValue) {
217 let updated = self.data.update(key, m);
218 if !self.updated {
219 self.updated = updated;
220 }
221 }
222}
223
224pub fn setup<O: Into<String>>(
225 metrics_host: &SocketAddr,
226 origin: O,
227 use_tagged_metrics: bool,
228 prefix: Option<String>,
229 detail: MetricDetailLevel,
230) -> Result<(), MetricError> {
231 let metrics_socket = udp_bind()?;
232
233 debug!(
234 "setting up metrics: local address = {:#?}",
235 metrics_socket.local_addr()
236 );
237
238 METRICS.with(|metrics| {
239 if let Some(p) = prefix {
240 (*metrics.borrow_mut()).set_up_prefix(p);
241 }
242 (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
243 (*metrics.borrow_mut()).set_up_origin(origin.into());
244 (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
245 (*metrics.borrow_mut()).set_up_detail(detail);
246 });
247 Ok(())
248}
249
/// A sink that can receive metric samples.
pub trait Subscriber {
    /// Receives one sample.
    ///
    /// * `label`: static metric name.
    /// * `cluster_id` / `backend_id`: optional labels attached to the sample.
    /// * `metric`: the sample value.
    fn receive_metric(
        &mut self,
        label: &'static str,
        cluster_id: Option<&str>,
        backend_id: Option<&str>,
        metric: MetricValue,
    );
}
259
/// Minimum elapsed time after which `Aggregator::lease_tick_due` reports a tick as due.
const LEASE_TICK_INTERVAL: Duration = Duration::from_secs(5);

/// Longest TTL accepted by `Aggregator::lease_apply`; anything above is
/// refused with `LeaseApplyOutcome::TtlOutOfRange`.
pub const LEASE_TTL_MAX: Duration = Duration::from_secs(300);

/// NOTE(review): presumably the TTL callers use when a lease request does not
/// specify one; it is not referenced in this file, so confirm against callers.
pub const LEASE_TTL_DEFAULT: Duration = Duration::from_secs(60);

/// Maximum number of leases stored at once; new client ids beyond this are
/// refused with `LeaseApplyOutcome::TableFull` (renewals are still accepted).
pub const LEASE_TABLE_CAP: usize = 64;

/// Maximum length, in bytes, of a lease client id.
pub const LEASE_CLIENT_ID_MAX_BYTES: usize = 64;
288
/// Result of `Aggregator::lease_apply`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseApplyOutcome {
    /// The lease was stored (or renewed).
    Applied {
        /// Effective detail level before the lease was applied.
        previous_effective: MetricDetailLevel,
        /// Effective detail level after the lease was applied.
        new_effective: MetricDetailLevel,
    },
    /// The client id is longer than `LEASE_CLIENT_ID_MAX_BYTES`.
    ClientIdTooLong,
    /// The lease table already holds `LEASE_TABLE_CAP` entries and the client id is new.
    TableFull,
    /// The requested TTL exceeds `LEASE_TTL_MAX`.
    TtlOutOfRange,
    /// The existing lease has a known binding that does not match the presented one.
    Unauthorized,
}
324
/// Identity of the peer that owns a lease.
///
/// The default value has both fields unset and is therefore "unknown".
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerBinding {
    /// Process id of the peer, when available.
    pub pid: Option<i32>,
    /// Session ULID of the peer, when available.
    pub session_ulid: Option<u128>,
}

impl PeerBinding {
    /// Returns `true` only when both the pid and the session ULID are set.
    pub fn is_known(&self) -> bool {
        matches!((self.pid, self.session_ulid), (Some(_), Some(_)))
    }

    /// Returns `true` when `self` is fully known and `other` carries the same
    /// pid and session ULID. An unknown binding never matches anything.
    pub fn matches(&self, other: &PeerBinding) -> bool {
        if !self.is_known() {
            return false;
        }
        (self.pid, self.session_ulid) == (other.pid, other.session_ulid)
    }
}
360
/// One stored lease, keyed by client id in `Aggregator::leases`.
#[derive(Clone, Copy, Debug)]
struct LeaseEntry {
    /// Detail level requested by this lease.
    level: MetricDetailLevel,
    /// Instant at or after which `Aggregator::lease_tick` removes the lease.
    expires_at: Instant,
    /// Binding presented when the lease was applied.
    binding: PeerBinding,
}
370
/// Result of `Aggregator::lease_clear`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LeaseClearOutcome {
    /// The lease was removed.
    Cleared {
        /// Effective detail level before the lease was removed.
        previous_effective: MetricDetailLevel,
    },
    /// No lease is stored under the given client id.
    NotFound,
    /// The stored lease has a known binding that does not match the presented one.
    Unauthorized,
}
389
390pub struct Aggregator {
391 prefix: String,
393 network: Option<NetworkDrain>,
395 local: LocalDrain,
397 configured: MetricDetailLevel,
401 effective: MetricDetailLevel,
405 leases: HashMap<String, LeaseEntry>,
410 last_lease_tick: Instant,
414}
415
416impl Aggregator {
417 pub fn new(prefix: String) -> Aggregator {
418 let default_detail = MetricDetailLevel::default();
419 Aggregator {
420 prefix: prefix.clone(),
421 network: None,
422 local: LocalDrain::new(prefix),
423 configured: default_detail,
424 effective: default_detail,
425 leases: HashMap::new(),
426 last_lease_tick: Instant::now(),
427 }
428 }
429
430 pub fn set_up_prefix(&mut self, prefix: String) {
431 self.prefix = prefix;
432 }
433
434 pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
435 self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
436 }
437
438 pub fn set_up_origin(&mut self, origin: String) {
439 if let Some(n) = self.network.as_mut() {
440 n.origin = origin;
441 }
442 }
443
444 pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
445 if let Some(n) = self.network.as_mut() {
446 n.use_tagged_metrics = tagged;
447 }
448 }
449
450 pub fn set_up_detail(&mut self, detail: MetricDetailLevel) {
458 self.configured = detail;
459 self.recompute_effective();
460 }
461
462 pub fn detail_configured(&self) -> MetricDetailLevel {
465 self.configured
466 }
467
468 pub fn detail_effective(&self) -> MetricDetailLevel {
471 self.effective
472 }
473
474 pub fn lease_apply(
492 &mut self,
493 client_id: String,
494 level: MetricDetailLevel,
495 ttl: Duration,
496 binding: PeerBinding,
497 ) -> LeaseApplyOutcome {
498 if client_id.len() > LEASE_CLIENT_ID_MAX_BYTES {
499 return LeaseApplyOutcome::ClientIdTooLong;
500 }
501 if ttl > LEASE_TTL_MAX {
502 return LeaseApplyOutcome::TtlOutOfRange;
503 }
504 let is_renewal = self.leases.contains_key(&client_id);
510 if !is_renewal && self.leases.len() >= LEASE_TABLE_CAP {
511 return LeaseApplyOutcome::TableFull;
512 }
513 if is_renewal
524 && let Some(entry) = self.leases.get(&client_id)
525 && entry.binding.is_known()
526 && !entry.binding.matches(&binding)
527 {
528 return LeaseApplyOutcome::Unauthorized;
529 }
530 let expires_at = Instant::now() + ttl;
531 self.leases.insert(
532 client_id,
533 LeaseEntry {
534 level,
535 expires_at,
536 binding,
537 },
538 );
539 let previous_effective = self.effective;
540 self.recompute_effective();
541 LeaseApplyOutcome::Applied {
542 previous_effective,
543 new_effective: self.effective,
544 }
545 }
546
547 pub fn lease_clear(&mut self, client_id: &str, presented: PeerBinding) -> LeaseClearOutcome {
555 let Some(entry) = self.leases.get(client_id) else {
556 return LeaseClearOutcome::NotFound;
557 };
558 if entry.binding.is_known() && !entry.binding.matches(&presented) {
564 return LeaseClearOutcome::Unauthorized;
565 }
566 self.leases.remove(client_id);
567 let previous = self.effective;
568 self.recompute_effective();
569 LeaseClearOutcome::Cleared {
570 previous_effective: previous,
571 }
572 }
573
574 pub fn lease_count(&self) -> u32 {
578 self.leases.len() as u32
579 }
580
581 pub fn lease_tick(&mut self, now: Instant) -> Option<MetricDetailLevel> {
590 self.last_lease_tick = now;
591 let before = self.leases.len();
592 self.leases.retain(|_, entry| entry.expires_at > now);
593 if self.leases.len() == before {
594 return None;
595 }
596 let previous = self.effective;
597 self.recompute_effective();
598 if previous != self.effective {
599 Some(previous)
600 } else {
601 None
602 }
603 }
604
605 pub fn lease_tick_due(&self, now: Instant) -> bool {
609 now.duration_since(self.last_lease_tick) >= LEASE_TICK_INTERVAL
610 }
611
612 fn recompute_effective(&mut self) {
616 let mut max_lease = self.configured;
617 for entry in self.leases.values() {
618 if entry.level > max_lease {
619 max_lease = entry.level;
620 }
621 }
622 self.effective = max_lease;
623 }
624
625 pub fn socket(&self) -> Option<&UdpSocket> {
626 self.network.as_ref().map(|n| &n.remote.get_ref().socket)
627 }
628
629 pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
630 self.network
631 .as_mut()
632 .map(|n| &mut n.remote.get_mut().socket)
633 }
634
635 pub fn count_add(&mut self, key: &'static str, count_value: i64) {
636 self.receive_metric(key, None, None, MetricValue::Count(count_value));
637 }
638
639 pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
640 self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
641 }
642
643 pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
644 self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
645 }
646
647 pub fn writable(&mut self) {
648 if let Some(ref mut net) = self.network.as_mut() {
649 net.writable();
650 }
651 }
652
653 pub fn send_data(&mut self) {
654 if let Some(ref mut net) = self.network.as_mut() {
655 net.send_metrics();
656 }
657 }
658
659 pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
660 self.local.dump_proxy_metrics(&Vec::new())
661 }
662
663 pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
664 self.local.query(q)
665 }
666
667 pub fn clear_local(&mut self) {
668 if let Some(ref mut net) = self.network.as_mut() {
669 net.clear();
670 }
671 self.local.clear();
672 }
673
674 pub fn configure(&mut self, config: &MetricsConfiguration) {
675 self.local.configure(config);
676 }
677
678 pub fn remove_cluster(&mut self, cluster_id: &str) {
691 if let Some(ref mut net) = self.network.as_mut() {
692 net.remove_cluster(cluster_id);
693 }
694 self.local.remove_cluster(cluster_id);
695 }
696
697 pub fn add_cluster(&mut self, cluster_id: &str) {
703 if let Some(ref mut net) = self.network.as_mut() {
704 net.add_cluster(cluster_id);
705 }
706 self.local.add_cluster(cluster_id);
707 }
708
709 pub fn remove_backend(&mut self, cluster_id: &str, backend_id: &str) {
713 if let Some(ref mut net) = self.network.as_mut() {
714 net.remove_backend(cluster_id, backend_id);
715 }
716 self.local.remove_backend(cluster_id, backend_id);
717 }
718}
719
720impl Subscriber for Aggregator {
721 fn receive_metric(
722 &mut self,
723 label: &'static str,
724 cluster_id: Option<&str>,
725 backend_id: Option<&str>,
726 metric: MetricValue,
727 ) {
728 let (cluster_id, backend_id) =
734 filter_labels_for_detail(self.effective, cluster_id, backend_id);
735 if let Some(ref mut net) = self.network.as_mut() {
736 net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
737 }
738 self.local
739 .receive_metric(label, cluster_id, backend_id, metric);
740 }
741}
742
743pub struct MetricSocket {
744 pub addr: SocketAddr,
745 pub socket: UdpSocket,
746}
747
748impl Write for MetricSocket {
749 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
750 self.socket.send_to(buf, self.addr)
751 }
752
753 fn flush(&mut self) -> io::Result<()> {
754 Ok(())
755 }
756}
757
758pub fn udp_bind() -> Result<UdpSocket, MetricError> {
759 let address = "0.0.0.0:0";
760
761 let udp_address =
762 address
763 .parse::<SocketAddr>()
764 .map_err(|parse_error| MetricError::WrongUdpAddress {
765 address: address.to_owned(),
766 error: parse_error.to_string(),
767 })?;
768
769 UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
770 address: udp_address.to_string(),
771 error: parse_error.to_string(),
772 })
773}
774
/// Adds `$value` to the counter `$key` on the thread-local aggregator.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! count {
    ($key:expr, $value:expr) => ({
        let amount = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().count_add($key, amount);
        });
    });
}
785
/// Increments a counter by one.
///
/// * `incr!(key)`: unlabelled counter, delegated to `count!`.
/// * `incr!(key, cluster_id, backend_id)`: counter labelled with the given
///   optional cluster and backend ids.
#[macro_export]
macro_rules! incr (
    // `$crate::` keeps the nested macro resolvable even when the caller only
    // imported `incr` by path.
    ($key:expr) => ($crate::count!($key, 1));
    ($key:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
            });
        }
    }
);
800
/// Decrements the unlabelled counter `$key` by one, delegating to `count!`.
#[macro_export]
macro_rules! decr (
    // `$crate::` keeps the nested macro resolvable even when the caller only
    // imported `decr` by path.
    ($key:expr) => ($crate::count!($key, -1))
);
805
/// Sets a gauge to an absolute value on the thread-local aggregator.
///
/// * `gauge!(key, value)`: unlabelled gauge.
/// * `gauge!(key, value, cluster_id, backend_id)`: labelled gauge; `value` is
///   cast to `usize`.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge {
    ($key:expr, $value:expr) => ({
        let level = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().set_gauge($key, level);
        });
    });
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;
            let level = $value;

            $crate::metrics::METRICS.with(|metrics| {
                metrics.borrow_mut().receive_metric(
                    $key,
                    $cluster_id,
                    $backend_id,
                    $crate::metrics::MetricValue::Gauge(level as usize),
                );
            });
        }
    };
}
825
/// Applies a signed delta to a gauge on the thread-local aggregator.
///
/// * `gauge_add!(key, value)`: unlabelled gauge.
/// * `gauge_add!(key, value, cluster_id, backend_id)`: labelled gauge.
///
/// `$value` is evaluated before the aggregator is borrowed.
#[macro_export]
macro_rules! gauge_add {
    ($key:expr, $value:expr) => ({
        let delta = $value;
        $crate::metrics::METRICS.with(|metrics| {
            metrics.borrow_mut().gauge_add($key, delta);
        });
    });
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;
            let delta = $value;

            $crate::metrics::METRICS.with(|metrics| {
                metrics.borrow_mut().receive_metric(
                    $key,
                    $cluster_id,
                    $backend_id,
                    $crate::metrics::MetricValue::GaugeAdd(delta),
                );
            });
        }
    };
}
845
/// Records a time measurement on the thread-local aggregator.
///
/// * `time!(key, value)`: unlabelled measurement.
/// * `time!(key, cluster_id, value)`: measurement labelled with a cluster id
///   (a `&str`).
///
/// `$value` is evaluated before the aggregator is borrowed and is cast to `usize`.
#[macro_export]
macro_rules! time {
    ($key:expr, $value:expr) => ({
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let mut aggregator = metrics.borrow_mut();

            aggregator.receive_metric($key, None, None, MetricValue::Time(elapsed as usize));
        });
    });
    ($key:expr, $cluster_id:expr, $value:expr) => ({
        use $crate::metrics::{MetricValue, Subscriber};
        let elapsed = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let mut aggregator = metrics.borrow_mut();
            let cluster: &str = $cluster_id;

            aggregator.receive_metric(
                $key,
                Some(cluster),
                None,
                MetricValue::Time(elapsed as usize),
            );
        });
    });
}
868
/// Records the per-backend metrics of one request on the thread-local
/// aggregator: bytes in, bytes out, response time, connection time (only when
/// `$backend_connection_time` is `Some`) and a request count of one.
///
/// `$cluster_id` and `$backend_id` must be `&str`. The whole expansion is
/// wrapped in a block so the `use` below stays local to the invocation and
/// does not leak into (or clash within) the caller's scope.
#[macro_export]
macro_rules! record_backend_metrics (
    ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {
        {
            use $crate::metrics::{MetricValue,Subscriber};
            $crate::metrics::METRICS.with(|metrics| {
                let m = &mut *metrics.borrow_mut();
                let cluster_id: &str = $cluster_id;
                let backend_id: &str = $backend_id;

                m.receive_metric($crate::metrics::names::backend::BYTES_IN, Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
                m.receive_metric($crate::metrics::names::backend::BYTES_OUT, Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
                m.receive_metric($crate::metrics::names::backend::RESPONSE_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
                if let Some(t) = $backend_connection_time {
                    m.receive_metric($crate::metrics::names::backend::CONNECTION_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
                }

                m.receive_metric($crate::metrics::names::backend::REQUESTS, Some(cluster_id), Some(backend_id), MetricValue::Count(1));
            });
        }
    }
);
889
#[cfg(test)]
mod tests {
    use super::*;

    /// Labels fed to the label-filtering tests.
    const CLUSTER: Option<&str> = Some("c");
    const BACKEND: Option<&str> = Some("b");

    /// Builds an aggregator with the same prefix as the thread-local one.
    fn fresh_aggregator() -> Aggregator {
        Aggregator::new("sozu".to_owned())
    }

    /// Applies a lease with a TTL expressed in whole seconds.
    fn apply_lease(
        agg: &mut Aggregator,
        client_id: &str,
        level: MetricDetailLevel,
        ttl_secs: u64,
        binding: PeerBinding,
    ) -> LeaseApplyOutcome {
        agg.lease_apply(
            client_id.to_owned(),
            level,
            Duration::from_secs(ttl_secs),
            binding,
        )
    }

    #[test]
    fn filter_labels_process_drops_both() {
        let filtered = filter_labels_for_detail(MetricDetailLevel::Process, CLUSTER, BACKEND);
        assert_eq!(filtered, (None, None));
    }

    #[test]
    fn filter_labels_frontend_drops_both_today() {
        let filtered = filter_labels_for_detail(MetricDetailLevel::Frontend, CLUSTER, BACKEND);
        assert_eq!(filtered, (None, None));
    }

    #[test]
    fn filter_labels_cluster_keeps_cluster_drops_backend() {
        let filtered = filter_labels_for_detail(MetricDetailLevel::Cluster, CLUSTER, BACKEND);
        assert_eq!(filtered, (Some("c"), None));
    }

    #[test]
    fn filter_labels_backend_keeps_both() {
        let filtered = filter_labels_for_detail(MetricDetailLevel::Backend, CLUSTER, BACKEND);
        assert_eq!(filtered, (Some("c"), Some("b")));
    }

    #[test]
    fn filter_labels_none_in_none_out() {
        let every_level = [
            MetricDetailLevel::Process,
            MetricDetailLevel::Frontend,
            MetricDetailLevel::Cluster,
            MetricDetailLevel::Backend,
        ];
        for detail in every_level {
            assert_eq!(filter_labels_for_detail(detail, None, None), (None, None));
        }
    }

    #[test]
    fn aggregator_default_detail_is_cluster() {
        let agg = fresh_aggregator();
        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Cluster);
        assert_eq!(agg.lease_count(), 0);
    }

    /// Fully known binding used as the legitimate lease owner.
    fn owner_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0x0123_4567_89ab_cdef_0123_4567_89ab_cdefu128),
        }
    }

    /// Fully known binding that differs from `owner_binding` in both fields.
    fn other_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(5678),
            session_ulid: Some(0xfedc_ba98_7654_3210_fedc_ba98_7654_3210u128),
        }
    }

    /// Extracts `(previous_effective, new_effective)` or panics on any other outcome.
    fn unwrap_applied(outcome: LeaseApplyOutcome) -> (MetricDetailLevel, MetricDetailLevel) {
        match outcome {
            LeaseApplyOutcome::Applied {
                previous_effective,
                new_effective,
            } => (previous_effective, new_effective),
            other => panic!("expected LeaseApplyOutcome::Applied, got {other:?}"),
        }
    }

    #[test]
    fn lease_apply_elevates_effective_above_configured() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Cluster);

        let outcome = apply_lease(
            &mut agg,
            "test:1",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        let (prev, new) = unwrap_applied(outcome);

        assert_eq!(prev, MetricDetailLevel::Cluster);
        assert_eq!(new, MetricDetailLevel::Backend);
        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_below_configured_does_not_lower_effective() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Backend);

        let outcome = apply_lease(
            &mut agg,
            "test:1",
            MetricDetailLevel::Cluster,
            60,
            PeerBinding::default(),
        );
        let (prev, new) = unwrap_applied(outcome);

        assert_eq!(prev, MetricDetailLevel::Backend);
        assert_eq!(new, MetricDetailLevel::Backend);
    }

    #[test]
    fn lease_apply_rejects_client_id_over_cap() {
        let mut agg = fresh_aggregator();
        let too_long = "x".repeat(LEASE_CLIENT_ID_MAX_BYTES + 1);

        let outcome = apply_lease(
            &mut agg,
            &too_long,
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );

        assert_eq!(outcome, LeaseApplyOutcome::ClientIdTooLong);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_rejects_when_table_is_full() {
        let mut agg = fresh_aggregator();

        // Fill the table up to its cap.
        for i in 0..LEASE_TABLE_CAP {
            let outcome = apply_lease(
                &mut agg,
                &format!("client:{i:02}"),
                MetricDetailLevel::Backend,
                60,
                PeerBinding::default(),
            );
            assert!(matches!(outcome, LeaseApplyOutcome::Applied { .. }));
        }
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);

        // A new client id is refused once the table is full.
        let newcomer = apply_lease(
            &mut agg,
            "newcomer",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        assert_eq!(newcomer, LeaseApplyOutcome::TableFull,);
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);

        // Renewing an existing id still works at capacity.
        let renewal = apply_lease(
            &mut agg,
            "client:00",
            MetricDetailLevel::Backend,
            120,
            PeerBinding::default(),
        );
        assert!(matches!(renewal, LeaseApplyOutcome::Applied { .. }));
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
    }

    #[test]
    fn lease_apply_rejects_ttl_over_max() {
        let mut agg = fresh_aggregator();
        let over_max = LEASE_TTL_MAX + Duration::from_secs(1);

        let outcome = agg.lease_apply(
            "client:0".to_owned(),
            MetricDetailLevel::Backend,
            over_max,
            PeerBinding::default(),
        );

        assert_eq!(outcome, LeaseApplyOutcome::TtlOutOfRange,);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_renewal_replaces_previous_for_same_client() {
        let mut agg = fresh_aggregator();
        for ttl_secs in [30, 60] {
            let _ = apply_lease(
                &mut agg,
                "renewer",
                MetricDetailLevel::Backend,
                ttl_secs,
                PeerBinding::default(),
            );
        }
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_renewal_rejects_foreign_binding() {
        let mut agg = fresh_aggregator();
        let victim = PeerBinding {
            pid: Some(4242),
            session_ulid: Some(0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210),
        };
        let attacker = PeerBinding {
            pid: Some(9999),
            session_ulid: Some(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF),
        };

        let first = apply_lease(&mut agg, "topcli", MetricDetailLevel::Backend, 60, victim);
        assert!(
            matches!(first, LeaseApplyOutcome::Applied { .. }),
            "victim's initial apply must succeed"
        );

        let hijack = apply_lease(&mut agg, "topcli", MetricDetailLevel::Backend, 60, attacker);
        assert_eq!(
            hijack,
            LeaseApplyOutcome::Unauthorized,
            "renewal with a mismatched known binding must be refused"
        );

        let clear = agg.lease_clear("topcli", victim);
        assert!(
            matches!(clear, LeaseClearOutcome::Cleared { .. }),
            "victim's original binding must still clear cleanly after \
             the foreign-binding renewal was refused"
        );
    }

    #[test]
    fn lease_apply_renewal_with_matching_binding_succeeds() {
        let mut agg = fresh_aggregator();
        let owner = PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0xAAAA_BBBB_CCCC_DDDD_EEEE_FFFF_0000_1111),
        };

        let _ = apply_lease(&mut agg, "topcli", MetricDetailLevel::Backend, 30, owner);
        let renewal = apply_lease(&mut agg, "topcli", MetricDetailLevel::Backend, 60, owner);

        assert!(
            matches!(renewal, LeaseApplyOutcome::Applied { .. }),
            "renewal with matching binding must succeed (otherwise the \
             TUI's own renewer thread would be locked out)"
        );
    }

    #[test]
    fn lease_apply_max_merge_two_clients() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Process);

        let _ = apply_lease(
            &mut agg,
            "scraper",
            MetricDetailLevel::Frontend,
            60,
            PeerBinding::default(),
        );
        let _ = apply_lease(
            &mut agg,
            "topcli",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 2);

        // Dropping the highest lease falls back to the next highest one.
        let cleared = agg.lease_clear("topcli", PeerBinding::default());
        assert_eq!(
            cleared,
            LeaseClearOutcome::Cleared {
                previous_effective: MetricDetailLevel::Backend,
            }
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_id_is_silent_noop() {
        let mut agg = fresh_aggregator();
        let _ = apply_lease(
            &mut agg,
            "real",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );

        let outcome = agg.lease_clear("ghost", PeerBinding::default());

        assert_eq!(outcome, LeaseClearOutcome::NotFound);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_with_matching_binding_authorised() {
        let mut agg = fresh_aggregator();
        let _ = apply_lease(
            &mut agg,
            "owner-lease",
            MetricDetailLevel::Backend,
            60,
            owner_binding(),
        );

        let outcome = agg.lease_clear("owner-lease", owner_binding());

        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_with_mismatched_binding_is_unauthorized() {
        let mut agg = fresh_aggregator();
        let _ = apply_lease(
            &mut agg,
            "owner-lease",
            MetricDetailLevel::Backend,
            60,
            owner_binding(),
        );

        let outcome = agg.lease_clear("owner-lease", other_binding());

        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_apply_binding_accepts_any_clear() {
        let mut agg = fresh_aggregator();
        let _ = apply_lease(
            &mut agg,
            "legacy",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );

        let outcome = agg.lease_clear("legacy", owner_binding());

        assert!(matches!(outcome, LeaseClearOutcome::Cleared { .. }));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_known_apply_rejects_default_clear() {
        let mut agg = fresh_aggregator();
        let _ = apply_lease(
            &mut agg,
            "owner-lease",
            MetricDetailLevel::Backend,
            60,
            owner_binding(),
        );

        let outcome = agg.lease_clear("owner-lease", PeerBinding::default());

        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
    }

    #[test]
    fn lease_tick_expires_only_past_due_leases() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Process);
        let now = Instant::now();

        // Insert directly so one lease is already past due at `now`.
        let expired = LeaseEntry {
            level: MetricDetailLevel::Backend,
            expires_at: now - Duration::from_secs(1),
            binding: PeerBinding::default(),
        };
        let live = LeaseEntry {
            level: MetricDetailLevel::Frontend,
            expires_at: now + Duration::from_secs(60),
            binding: PeerBinding::default(),
        };
        agg.leases.insert("expired".to_owned(), expired);
        agg.leases.insert("live".to_owned(), live);
        agg.recompute_effective();
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);

        let prev = agg.lease_tick(now);

        assert_eq!(prev, Some(MetricDetailLevel::Backend));
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_tick_no_change_returns_none() {
        let mut agg = fresh_aggregator();
        assert!(agg.lease_tick(Instant::now()).is_none());
    }

    #[test]
    fn lease_apply_at_max_ttl_succeeds() {
        let mut agg = fresh_aggregator();
        let now = Instant::now();

        let outcome = agg.lease_apply(
            "max".to_owned(),
            MetricDetailLevel::Backend,
            LEASE_TTL_MAX,
            PeerBinding::default(),
        );
        assert!(matches!(outcome, LeaseApplyOutcome::Applied { .. }));

        let entry = agg.leases.get("max").unwrap();
        let latest_allowed = now + LEASE_TTL_MAX + Duration::from_millis(50);
        assert!(entry.expires_at <= latest_allowed);
    }
}
1359}