1mod local_drain;
10pub mod names;
11mod network_drain;
12mod writer;
13
14use std::{
15 cell::RefCell,
16 collections::{BTreeMap, HashMap},
17 io::{self, Write},
18 net::SocketAddr,
19 str,
20 time::{Duration, Instant},
21};
22
23use mio::net::UdpSocket;
24use sozu_command::config::MetricDetailLevel;
25use sozu_command::proto::command::{
26 FilteredMetrics, MetricsConfiguration, QueryMetricsOptions, ResponseContent,
27};
28
29use crate::metrics::{local_drain::LocalDrain, network_drain::NetworkDrain};
30
31fn filter_labels_for_detail<'a>(
45 detail: MetricDetailLevel,
46 cluster_id: Option<&'a str>,
47 backend_id: Option<&'a str>,
48) -> (Option<&'a str>, Option<&'a str>) {
49 let (out_cluster, out_backend) = match detail {
50 MetricDetailLevel::Process | MetricDetailLevel::Frontend => (None, None),
51 MetricDetailLevel::Cluster => (cluster_id, None),
52 MetricDetailLevel::Backend => (cluster_id, backend_id),
53 };
54 debug_assert!(
59 cluster_id.is_some() || out_cluster.is_none(),
60 "filter must never invent a cluster_id"
61 );
62 debug_assert!(
63 backend_id.is_some() || out_backend.is_none(),
64 "filter must never invent a backend_id"
65 );
66 debug_assert!(
67 out_backend.is_none() || out_cluster.is_some(),
68 "a kept backend label implies a kept cluster label (no orphan backend)"
69 );
70 (out_cluster, out_backend)
71}
72
/// Returns the metric name used to count responses with HTTP `status`.
///
/// Only a fixed set of common status codes gets a dedicated counter. Any other
/// code yields `None`, and the caller decides how to account for it.
pub(crate) fn http_status_code_metric_name(status: u16) -> Option<&'static str> {
    // (status code, metric name) for every individually tracked status.
    const TRACKED: &[(u16, &str)] = &[
        (200, "http.status.200"),
        (201, "http.status.201"),
        (204, "http.status.204"),
        (301, "http.status.301"),
        (302, "http.status.302"),
        (304, "http.status.304"),
        (400, "http.status.400"),
        (401, "http.status.401"),
        (403, "http.status.403"),
        (404, "http.status.404"),
        (408, "http.status.408"),
        (413, "http.status.413"),
        (429, "http.status.429"),
        (500, "http.status.500"),
        (502, "http.status.502"),
        (503, "http.status.503"),
        (504, "http.status.504"),
        (507, "http.status.507"),
    ];
    TRACKED
        .iter()
        .find(|(code, _)| *code == status)
        .map(|(_, name)| *name)
}
108
109thread_local! {
110 pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
111}
112
113#[derive(thiserror::Error, Debug)]
114pub enum MetricError {
115 #[error("Could not parse udp address {address}: {error}")]
116 WrongUdpAddress { address: String, error: String },
117 #[error("Could not bind to udp address {address}: {error}")]
118 UdpBind { address: String, error: String },
119 #[error("No metrics found for object with id {0}")]
120 NoMetrics(String),
121 #[error("Could not create histogram for time metric {time_metric:?}: {error}")]
122 HistogramCreation {
123 time_metric: MetricValue,
124 error: String,
125 },
126 #[error("could not record time metric {time_metric:?}: {error}")]
127 TimeMetricRecordingError {
128 time_metric: MetricValue,
129 error: String,
130 },
131}
132
/// A single metric sample as produced by the recording macros.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricValue {
    /// Absolute gauge value; replaces the stored gauge.
    Gauge(usize),
    /// Signed delta applied to a stored gauge.
    GaugeAdd(i64),
    /// Signed delta added to a counter.
    Count(i64),
    /// Duration sample (`record_backend_metrics!` passes milliseconds).
    Time(usize),
}
140
141impl MetricValue {
142 fn is_time(&self) -> bool {
143 matches!(self, &MetricValue::Time(_))
144 }
145
146 fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
147 match (self, m) {
148 (&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
149 let before = *v1;
154 let changed = *v1 != v2;
155 *v1 = v2;
156 debug_assert_eq!(*v1, v2, "gauge set must store exactly the requested value");
157 debug_assert_eq!(
158 changed,
159 before != v2,
160 "`changed` must report whether the gauge actually moved"
161 );
162 changed
163 }
164 (&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
165 let before = *v1;
182 let changed = v2 != 0;
183 let res = before as i64 + v2;
184 *v1 = if res >= 0 {
185 res as usize
186 } else {
187 error!(
188 "metric {} underflow: previous value: {}, adding: {}",
189 key, before, v2
190 );
191 0
192 };
193 debug_assert!(
194 res >= 0 || *v1 == 0,
195 "gauge underflow must clamp to exactly 0 (never wrap)"
196 );
197 debug_assert!(
198 res < 0 || *v1 as i64 == before as i64 + v2,
199 "non-underflow gauge add must equal before + v2 exactly"
200 );
201
202 changed
203 }
204 (&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
205 let before = *v1;
209 let changed = v2 != 0;
210 *v1 += v2;
211 debug_assert_eq!(*v1, before + v2, "count update must advance by exactly v2");
212 changed
213 }
214 (s, m) => {
215 error!(
223 "tried to update metric {} of value {:?} with an incompatible metric: {:?}",
224 key, s, m
225 );
226 false
227 }
228 }
229 }
230}
231
232#[derive(Debug, Clone)]
233pub struct StoredMetricValue {
234 last_sent: Instant,
235 updated: bool,
236 data: MetricValue,
237}
238
239impl StoredMetricValue {
240 pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
241 let data = if let MetricValue::GaugeAdd(v) = data {
247 if v >= 0 {
248 MetricValue::Gauge(v as usize)
249 } else {
250 error!(
251 "stored metric created with negative GaugeAdd({}), clamping to 0",
252 v
253 );
254 MetricValue::Gauge(0)
255 }
256 } else {
257 data
258 };
259 debug_assert!(
263 !matches!(data, MetricValue::GaugeAdd(_)),
264 "GaugeAdd must be normalised to a non-negative Gauge at construction"
265 );
266 StoredMetricValue {
267 last_sent,
268 updated: true,
269 data,
270 }
271 }
272
273 pub fn update(&mut self, key: &'static str, m: MetricValue) {
274 let was_updated = self.updated;
279 let updated = self.data.update(key, m);
280 if !self.updated {
281 self.updated = updated;
282 }
283 debug_assert!(
284 self.updated || (!was_updated && !updated),
285 "`updated` must stay set once latched; a reported change must latch it"
286 );
287 debug_assert!(
288 !was_updated || self.updated,
289 "`updated` is a monotone latch within a drain cycle (never cleared here)"
290 );
291 }
292}
293
294pub fn setup<O: Into<String>>(
295 metrics_host: &SocketAddr,
296 origin: O,
297 use_tagged_metrics: bool,
298 prefix: Option<String>,
299 detail: MetricDetailLevel,
300) -> Result<(), MetricError> {
301 let metrics_socket = udp_bind()?;
302
303 debug!(
304 "setting up metrics: local address = {:#?}",
305 metrics_socket.local_addr()
306 );
307
308 METRICS.with(|metrics| {
309 if let Some(p) = prefix {
310 (*metrics.borrow_mut()).set_up_prefix(p);
311 }
312 (*metrics.borrow_mut()).set_up_remote(metrics_socket, *metrics_host);
313 (*metrics.borrow_mut()).set_up_origin(origin.into());
314 (*metrics.borrow_mut()).set_up_tagged_metrics(use_tagged_metrics);
315 (*metrics.borrow_mut()).set_up_detail(detail);
316 });
317 Ok(())
318}
319
320pub trait Subscriber {
321 fn receive_metric(
322 &mut self,
323 label: &'static str,
324 cluster_id: Option<&str>,
325 backend_id: Option<&str>,
326 metric: MetricValue,
327 );
328}
329
/// Minimum time between two lease-expiry sweeps (see `Aggregator::lease_tick_due`).
const LEASE_TICK_INTERVAL: Duration = Duration::from_secs(5);

/// Longest TTL `Aggregator::lease_apply` accepts. Longer requests get
/// `LeaseApplyOutcome::TtlOutOfRange`.
pub const LEASE_TTL_MAX: Duration = Duration::from_secs(5 * 60);

/// Default lease TTL. Not referenced in the code shown here; presumably used by
/// callers when a request carries no explicit TTL (verify at call sites).
pub const LEASE_TTL_DEFAULT: Duration = Duration::from_secs(60);

/// Maximum number of distinct lease holders. A new client beyond this gets
/// `LeaseApplyOutcome::TableFull`; renewals of existing leases are still accepted.
pub const LEASE_TABLE_CAP: usize = 64;

/// Maximum `client_id` length in bytes (UTF-8, as measured by `String::len`).
pub const LEASE_CLIENT_ID_MAX_BYTES: usize = 64;
359#[derive(Clone, Copy, Debug, PartialEq, Eq)]
364pub enum LeaseApplyOutcome {
365 Applied {
367 previous_effective: MetricDetailLevel,
368 new_effective: MetricDetailLevel,
369 },
370 ClientIdTooLong,
372 TableFull,
376 TtlOutOfRange,
383 Unauthorized,
393}
394
/// Identity of the peer that owns a lease: a process id plus a session ULID.
///
/// Either field may be `None` when the peer's identity is unknown. The default
/// (both `None`) is presumably what legacy clients present.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerBinding {
    /// Process id of the peer, if known.
    pub pid: Option<i32>,
    /// Session ULID of the peer, if known.
    pub session_ulid: Option<u128>,
}

impl PeerBinding {
    /// A binding is "known" only when both the pid and the session ULID are set.
    pub fn is_known(&self) -> bool {
        matches!((self.pid, self.session_ulid), (Some(_), Some(_)))
    }

    /// Returns `true` when `self` is known and `other` carries exactly the same
    /// pid and session ULID. An unknown binding never matches anything,
    /// including another unknown binding.
    pub fn matches(&self, other: &PeerBinding) -> bool {
        if !self.is_known() {
            return false;
        }
        (self.pid, self.session_ulid) == (other.pid, other.session_ulid)
    }
}
430
431#[derive(Clone, Copy, Debug)]
435struct LeaseEntry {
436 level: MetricDetailLevel,
437 expires_at: Instant,
438 binding: PeerBinding,
439}
440
441#[derive(Clone, Copy, Debug, PartialEq, Eq)]
443pub enum LeaseClearOutcome {
444 Cleared {
449 previous_effective: MetricDetailLevel,
450 },
451 NotFound,
453 Unauthorized,
458}
459
460pub struct Aggregator {
461 prefix: String,
463 network: Option<NetworkDrain>,
465 local: LocalDrain,
467 configured: MetricDetailLevel,
471 effective: MetricDetailLevel,
475 leases: HashMap<String, LeaseEntry>,
480 last_lease_tick: Instant,
484}
485
486impl Aggregator {
487 pub fn new(prefix: String) -> Aggregator {
488 let default_detail = MetricDetailLevel::default();
489 Aggregator {
490 prefix: prefix.clone(),
491 network: None,
492 local: LocalDrain::new(prefix),
493 configured: default_detail,
494 effective: default_detail,
495 leases: HashMap::new(),
496 last_lease_tick: Instant::now(),
497 }
498 }
499
500 pub fn set_up_prefix(&mut self, prefix: String) {
501 self.prefix = prefix;
502 }
503
504 pub fn set_up_remote(&mut self, socket: UdpSocket, addr: SocketAddr) {
505 self.network = Some(NetworkDrain::new(self.prefix.clone(), socket, addr));
506 }
507
508 pub fn set_up_origin(&mut self, origin: String) {
509 if let Some(n) = self.network.as_mut() {
510 n.origin = origin;
511 }
512 }
513
514 pub fn set_up_tagged_metrics(&mut self, tagged: bool) {
515 if let Some(n) = self.network.as_mut() {
516 n.use_tagged_metrics = tagged;
517 }
518 }
519
520 pub fn set_up_detail(&mut self, detail: MetricDetailLevel) {
528 self.configured = detail;
529 self.recompute_effective();
530 debug_assert!(
533 self.effective >= self.configured,
534 "effective must dominate the freshly-set configured floor"
535 );
536 #[cfg(debug_assertions)]
537 self.check_lease_invariants();
538 }
539
540 pub fn detail_configured(&self) -> MetricDetailLevel {
543 self.configured
544 }
545
546 pub fn detail_effective(&self) -> MetricDetailLevel {
549 self.effective
550 }
551
552 pub fn lease_apply(
570 &mut self,
571 client_id: String,
572 level: MetricDetailLevel,
573 ttl: Duration,
574 binding: PeerBinding,
575 ) -> LeaseApplyOutcome {
576 if client_id.len() > LEASE_CLIENT_ID_MAX_BYTES {
577 return LeaseApplyOutcome::ClientIdTooLong;
578 }
579 if ttl > LEASE_TTL_MAX {
580 return LeaseApplyOutcome::TtlOutOfRange;
581 }
582 let is_renewal = self.leases.contains_key(&client_id);
588 if !is_renewal && self.leases.len() >= LEASE_TABLE_CAP {
589 return LeaseApplyOutcome::TableFull;
590 }
591 if is_renewal
602 && let Some(entry) = self.leases.get(&client_id)
603 && entry.binding.is_known()
604 && !entry.binding.matches(&binding)
605 {
606 return LeaseApplyOutcome::Unauthorized;
607 }
608 let expires_at = Instant::now() + ttl;
609 let before_len = self.leases.len();
610 self.leases.insert(
611 client_id,
612 LeaseEntry {
613 level,
614 expires_at,
615 binding,
616 },
617 );
618 debug_assert_eq!(
624 self.leases.len(),
625 before_len + (!is_renewal) as usize,
626 "lease_apply grows the table by exactly one iff this is a fresh insert"
627 );
628 debug_assert!(
629 self.leases.len() <= LEASE_TABLE_CAP,
630 "lease table must never exceed LEASE_TABLE_CAP after an accepted apply"
631 );
632 let previous_effective = self.effective;
633 self.recompute_effective();
634 debug_assert!(
637 self.effective >= previous_effective,
638 "lease_apply must not lower the effective detail level"
639 );
640 debug_assert!(
641 self.effective >= self.configured,
642 "effective must never drop below the configured floor"
643 );
644 #[cfg(debug_assertions)]
645 self.check_lease_invariants();
646 LeaseApplyOutcome::Applied {
647 previous_effective,
648 new_effective: self.effective,
649 }
650 }
651
652 pub fn lease_clear(&mut self, client_id: &str, presented: PeerBinding) -> LeaseClearOutcome {
660 let Some(entry) = self.leases.get(client_id) else {
661 return LeaseClearOutcome::NotFound;
662 };
663 if entry.binding.is_known() && !entry.binding.matches(&presented) {
669 return LeaseClearOutcome::Unauthorized;
670 }
671 let before_len = self.leases.len();
674 self.leases.remove(client_id);
675 debug_assert!(
676 !self.leases.contains_key(client_id),
677 "lease_clear must evict the cleared client_id"
678 );
679 debug_assert_eq!(
680 self.leases.len(),
681 before_len - 1,
682 "lease_clear removes exactly one entry on the authorised path"
683 );
684 let previous = self.effective;
685 self.recompute_effective();
686 debug_assert!(
689 self.effective <= previous,
690 "lease_clear must not raise the effective detail level"
691 );
692 debug_assert!(
693 self.effective >= self.configured,
694 "effective must never drop below the configured floor"
695 );
696 #[cfg(debug_assertions)]
697 self.check_lease_invariants();
698 LeaseClearOutcome::Cleared {
699 previous_effective: previous,
700 }
701 }
702
703 pub fn lease_count(&self) -> u32 {
707 self.leases.len() as u32
708 }
709
710 pub fn lease_tick(&mut self, now: Instant) -> Option<MetricDetailLevel> {
719 self.last_lease_tick = now;
720 let before = self.leases.len();
721 self.leases.retain(|_, entry| entry.expires_at > now);
722 debug_assert!(
726 self.leases.len() <= before,
727 "lease_tick (expiry) can only shrink the table, never grow it"
728 );
729 debug_assert!(
730 self.leases.values().all(|entry| entry.expires_at > now),
731 "every surviving lease must expire strictly after `now`"
732 );
733 if self.leases.len() == before {
734 return None;
735 }
736 let previous = self.effective;
737 self.recompute_effective();
738 debug_assert!(
740 self.effective <= previous,
741 "lease expiry must not raise the effective detail level"
742 );
743 debug_assert!(
744 self.effective >= self.configured,
745 "effective must never drop below the configured floor"
746 );
747 #[cfg(debug_assertions)]
748 self.check_lease_invariants();
749 if previous != self.effective {
750 Some(previous)
751 } else {
752 None
753 }
754 }
755
756 pub fn lease_tick_due(&self, now: Instant) -> bool {
760 now.duration_since(self.last_lease_tick) >= LEASE_TICK_INTERVAL
761 }
762
763 fn recompute_effective(&mut self) {
767 let mut max_lease = self.configured;
768 for entry in self.leases.values() {
769 if entry.level > max_lease {
770 max_lease = entry.level;
771 }
772 }
773 self.effective = max_lease;
774 debug_assert!(
778 self.effective >= self.configured,
779 "effective is bounded below by the configured floor"
780 );
781 debug_assert!(
782 self.leases.values().all(|e| self.effective >= e.level),
783 "effective dominates every active lease level"
784 );
785 debug_assert!(
786 self.effective == self.configured
787 || self.leases.values().any(|e| e.level == self.effective),
788 "effective must equal the configured floor or one active lease level"
789 );
790 }
791
792 #[cfg(debug_assertions)]
797 fn check_lease_invariants(&self) {
798 debug_assert!(
799 self.leases.len() <= LEASE_TABLE_CAP,
800 "lease table size must stay within LEASE_TABLE_CAP"
801 );
802 debug_assert!(
803 self.leases
804 .keys()
805 .all(|id| id.len() <= LEASE_CLIENT_ID_MAX_BYTES),
806 "every stored client_id must respect LEASE_CLIENT_ID_MAX_BYTES"
807 );
808 debug_assert!(
809 self.effective >= self.configured,
810 "effective is at least the configured floor"
811 );
812 debug_assert!(
813 self.leases.values().all(|e| self.effective >= e.level),
814 "effective dominates every active lease level"
815 );
816 debug_assert!(
817 self.effective == self.configured
818 || self.leases.values().any(|e| e.level == self.effective),
819 "effective equals the configured floor or some active lease level"
820 );
821 }
822
823 pub fn socket(&self) -> Option<&UdpSocket> {
824 self.network.as_ref().map(|n| &n.remote.get_ref().socket)
825 }
826
827 pub fn socket_mut(&mut self) -> Option<&mut UdpSocket> {
828 self.network
829 .as_mut()
830 .map(|n| &mut n.remote.get_mut().socket)
831 }
832
833 pub fn count_add(&mut self, key: &'static str, count_value: i64) {
834 self.receive_metric(key, None, None, MetricValue::Count(count_value));
835 }
836
837 pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
838 self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
839 }
840
841 pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
842 self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
843 }
844
845 pub fn writable(&mut self) {
846 if let Some(ref mut net) = self.network.as_mut() {
847 net.writable();
848 }
849 }
850
851 pub fn send_data(&mut self) {
852 if let Some(ref mut net) = self.network.as_mut() {
853 net.send_metrics();
854 }
855 }
856
857 pub fn dump_local_proxy_metrics(&mut self) -> BTreeMap<String, FilteredMetrics> {
858 self.local.dump_proxy_metrics(&Vec::new())
859 }
860
861 pub fn query(&mut self, q: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
862 self.local.query(q)
863 }
864
865 pub fn clear_local(&mut self) {
866 if let Some(ref mut net) = self.network.as_mut() {
867 net.clear();
868 }
869 self.local.clear();
870 }
871
872 pub fn configure(&mut self, config: &MetricsConfiguration) {
873 self.local.configure(config);
874 }
875
876 pub fn remove_cluster(&mut self, cluster_id: &str) {
889 if let Some(ref mut net) = self.network.as_mut() {
890 net.remove_cluster(cluster_id);
891 }
892 self.local.remove_cluster(cluster_id);
893 }
894
895 pub fn add_cluster(&mut self, cluster_id: &str) {
901 if let Some(ref mut net) = self.network.as_mut() {
902 net.add_cluster(cluster_id);
903 }
904 self.local.add_cluster(cluster_id);
905 }
906
907 pub fn remove_backend(&mut self, cluster_id: &str, backend_id: &str) {
911 if let Some(ref mut net) = self.network.as_mut() {
912 net.remove_backend(cluster_id, backend_id);
913 }
914 self.local.remove_backend(cluster_id, backend_id);
915 }
916}
917
918impl Subscriber for Aggregator {
919 fn receive_metric(
920 &mut self,
921 label: &'static str,
922 cluster_id: Option<&str>,
923 backend_id: Option<&str>,
924 metric: MetricValue,
925 ) {
926 let (cluster_id, backend_id) =
932 filter_labels_for_detail(self.effective, cluster_id, backend_id);
933 if let Some(ref mut net) = self.network.as_mut() {
934 net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
935 }
936 self.local
937 .receive_metric(label, cluster_id, backend_id, metric);
938 }
939}
940
941pub struct MetricSocket {
942 pub addr: SocketAddr,
943 pub socket: UdpSocket,
944}
945
946impl Write for MetricSocket {
947 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
948 self.socket.send_to(buf, self.addr)
949 }
950
951 fn flush(&mut self) -> io::Result<()> {
952 Ok(())
953 }
954}
955
956pub fn udp_bind() -> Result<UdpSocket, MetricError> {
957 let address = "0.0.0.0:0";
958
959 let udp_address =
960 address
961 .parse::<SocketAddr>()
962 .map_err(|parse_error| MetricError::WrongUdpAddress {
963 address: address.to_owned(),
964 error: parse_error.to_string(),
965 })?;
966
967 UdpSocket::bind(udp_address).map_err(|parse_error| MetricError::UdpBind {
968 address: udp_address.to_string(),
969 error: parse_error.to_string(),
970 })
971}
972
/// Adds `$value` to the unlabelled counter `$key` on this thread's `METRICS`.
///
/// `$value` is evaluated once, before the thread-local `RefCell` is borrowed,
/// and is passed to `Aggregator::count_add` (which takes an `i64`).
#[macro_export]
macro_rules! count (
    ($key:expr, $value: expr) => ({
        let v = $value;
        $crate::metrics::METRICS.with(|metrics| {
            (*metrics.borrow_mut()).count_add($key, v);
        });
    })
);
983
/// Increments a counter by 1 on this thread's `METRICS`.
///
/// * `incr!(key)`: unlabelled counter, same as `count!(key, 1)`.
/// * `incr!(key, cluster_id, backend_id)`: counter labelled with the given
///   `Option<&str>` cluster and backend ids. Labels finer than the effective
///   detail level are dropped by the aggregator.
#[macro_export]
macro_rules! incr (
    // `$crate::` resolves `count!` from this crate at the call site, even when
    // the caller has not imported `count!` itself.
    ($key:expr) => ($crate::count!($key, 1));
    ($key:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
            });
        }
    }
);
998
/// Decrements the unlabelled counter `key` by 1 on this thread's `METRICS`
/// (same as `count!(key, -1)`).
#[macro_export]
macro_rules! decr (
    // `$crate::` resolves `count!` from this crate even when the caller has not
    // imported it.
    ($key:expr) => ($crate::count!($key, -1))
);
1003
/// Sets a gauge on this thread's `METRICS`.
///
/// * `gauge!(key, value)`: unlabelled gauge, via `Aggregator::set_gauge`
///   (`value` must be a `usize`).
/// * `gauge!(key, value, cluster_id, backend_id)`: gauge labelled with
///   `Option<&str>` cluster / backend ids; `value` is cast with `as usize`.
///
/// `value` is evaluated before the thread-local is borrowed. In the labelled
/// form, the label expressions are evaluated inside the borrow.
#[macro_export]
macro_rules! gauge (
    ($key:expr, $value: expr) => ({
        let v = $value;
        $crate::metrics::METRICS.with(|metrics| {
            (*metrics.borrow_mut()).set_gauge($key, v);
        });
    });
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;
            let v = $value;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Gauge(v as usize));
            });
        }
    }
);
1023
/// Adds a signed delta to a gauge on this thread's `METRICS`.
///
/// * `gauge_add!(key, delta)`: unlabelled gauge, via `Aggregator::gauge_add`
///   (`delta` is an `i64`).
/// * `gauge_add!(key, delta, cluster_id, backend_id)`: gauge labelled with
///   `Option<&str>` cluster / backend ids.
///
/// A result below zero is clamped to 0 (and logged) when merged into a stored gauge.
#[macro_export]
macro_rules! gauge_add (
    ($key:expr, $value: expr) => ({
        let v = $value;
        $crate::metrics::METRICS.with(|metrics| {
            (*metrics.borrow_mut()).gauge_add($key, v);
        });
    });
    ($key:expr, $value:expr, $cluster_id:expr, $backend_id:expr) => {
        {
            use $crate::metrics::Subscriber;
            let v = $value;

            $crate::metrics::METRICS.with(|metrics| {
                (*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::GaugeAdd(v));
            });
        }
    }
);
1043
/// Records a time sample on this thread's `METRICS`.
///
/// * `time!(key, value)`: unlabelled time metric.
/// * `time!(key, cluster_id, value)`: time metric labelled with a `&str`
///   cluster id (note: the cluster id comes *before* the value here).
///
/// `value` is cast with `as usize`. No backend label is ever attached.
#[macro_export]
macro_rules! time (
    ($key:expr, $value: expr) => ({
        use $crate::metrics::{MetricValue,Subscriber};
        let v = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let m = &mut *metrics.borrow_mut();

            m.receive_metric($key, None, None, MetricValue::Time(v as usize));
        });
    });
    ($key:expr, $cluster_id:expr, $value: expr) => ({
        use $crate::metrics::{MetricValue,Subscriber};
        let v = $value;
        $crate::metrics::METRICS.with(|metrics| {
            let m = &mut *metrics.borrow_mut();
            let cluster: &str = $cluster_id;

            m.receive_metric($key, Some(cluster), None, MetricValue::Time(v as usize));
        });
    })
);
1066
/// Records the per-request backend metrics for one `(cluster_id, backend_id)` pair.
///
/// The recorded metrics are: bytes in / out counts, response time, the optional
/// backend connection time (in milliseconds) and one request count. All of them
/// are labelled with the given `&str` cluster and backend ids.
///
/// NOTE(review): `$backend_connection_time` is matched as an `Option` whose
/// payload has `as_millis()`; presumably `Option<Duration>`.
#[macro_export]
macro_rules! record_backend_metrics (
    ($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {
        // The inner braces make the expansion a single block expression, like
        // the other metric macros. The `use` then stays local to the block:
        // without them it leaked into the caller's scope, so two invocations in
        // one scope clashed (E0252), and the macro could not appear in
        // expression position.
        {
            use $crate::metrics::{MetricValue,Subscriber};
            $crate::metrics::METRICS.with(|metrics| {
                let m = &mut *metrics.borrow_mut();
                let cluster_id: &str = $cluster_id;
                let backend_id: &str = $backend_id;

                m.receive_metric($crate::metrics::names::backend::BYTES_IN, Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
                m.receive_metric($crate::metrics::names::backend::BYTES_OUT, Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
                m.receive_metric($crate::metrics::names::backend::RESPONSE_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
                if let Some(t) = $backend_connection_time {
                    m.receive_metric($crate::metrics::names::backend::CONNECTION_TIME, Some(cluster_id), Some(backend_id), MetricValue::Time(t.as_millis() as usize));
                }

                m.receive_metric($crate::metrics::names::backend::REQUESTS, Some(cluster_id), Some(backend_id), MetricValue::Count(1));
            });
        }
    }
);
1087
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `filter_labels_for_detail` on a sample carrying both labels ("c", "b").
    fn filter_both(detail: MetricDetailLevel) -> (Option<&'static str>, Option<&'static str>) {
        filter_labels_for_detail(detail, Some("c"), Some("b"))
    }

    /// Fresh aggregator with the production prefix.
    fn fresh_aggregator() -> Aggregator {
        Aggregator::new("sozu".to_owned())
    }

    /// `lease_apply` shorthand taking a borrowed id and a TTL in whole seconds.
    fn apply_for(
        agg: &mut Aggregator,
        client_id: &str,
        level: MetricDetailLevel,
        ttl_secs: u64,
        binding: PeerBinding,
    ) -> LeaseApplyOutcome {
        agg.lease_apply(
            client_id.to_owned(),
            level,
            Duration::from_secs(ttl_secs),
            binding,
        )
    }

    fn is_applied(outcome: &LeaseApplyOutcome) -> bool {
        matches!(outcome, LeaseApplyOutcome::Applied { .. })
    }

    fn is_cleared(outcome: &LeaseClearOutcome) -> bool {
        matches!(outcome, LeaseClearOutcome::Cleared { .. })
    }

    /// A fully known binding for the lease owner.
    fn owner_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0x0123_4567_89ab_cdef_0123_4567_89ab_cdefu128),
        }
    }

    /// A fully known binding that differs from `owner_binding` in both fields.
    fn other_binding() -> PeerBinding {
        PeerBinding {
            pid: Some(5678),
            session_ulid: Some(0xfedc_ba98_7654_3210_fedc_ba98_7654_3210u128),
        }
    }

    /// Extracts `(previous_effective, new_effective)`, panicking on any other outcome.
    fn unwrap_applied(outcome: LeaseApplyOutcome) -> (MetricDetailLevel, MetricDetailLevel) {
        if let LeaseApplyOutcome::Applied {
            previous_effective,
            new_effective,
        } = outcome
        {
            return (previous_effective, new_effective);
        }
        let other = outcome;
        panic!("expected LeaseApplyOutcome::Applied, got {other:?}")
    }

    #[test]
    fn filter_labels_process_drops_both() {
        assert_eq!(filter_both(MetricDetailLevel::Process), (None, None));
    }

    #[test]
    fn filter_labels_frontend_drops_both_today() {
        assert_eq!(filter_both(MetricDetailLevel::Frontend), (None, None));
    }

    #[test]
    fn filter_labels_cluster_keeps_cluster_drops_backend() {
        assert_eq!(filter_both(MetricDetailLevel::Cluster), (Some("c"), None));
    }

    #[test]
    fn filter_labels_backend_keeps_both() {
        assert_eq!(filter_both(MetricDetailLevel::Backend), (Some("c"), Some("b")));
    }

    #[test]
    fn filter_labels_none_in_none_out() {
        let levels = [
            MetricDetailLevel::Process,
            MetricDetailLevel::Frontend,
            MetricDetailLevel::Cluster,
            MetricDetailLevel::Backend,
        ];
        for detail in levels {
            assert_eq!(filter_labels_for_detail(detail, None, None), (None, None));
        }
    }

    #[test]
    fn aggregator_default_detail_is_cluster() {
        let agg = fresh_aggregator();
        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Cluster);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_elevates_effective_above_configured() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Cluster);
        let outcome = apply_for(
            &mut agg,
            "test:1",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        let (prev, new) = unwrap_applied(outcome);
        assert_eq!(prev, MetricDetailLevel::Cluster);
        assert_eq!(new, MetricDetailLevel::Backend);
        assert_eq!(agg.detail_configured(), MetricDetailLevel::Cluster);
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_below_configured_does_not_lower_effective() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Backend);
        let outcome = apply_for(
            &mut agg,
            "test:1",
            MetricDetailLevel::Cluster,
            60,
            PeerBinding::default(),
        );
        let (prev, new) = unwrap_applied(outcome);
        assert_eq!(prev, MetricDetailLevel::Backend);
        assert_eq!(new, MetricDetailLevel::Backend);
    }

    #[test]
    fn lease_apply_rejects_client_id_over_cap() {
        let mut agg = fresh_aggregator();
        let too_long = "x".repeat(LEASE_CLIENT_ID_MAX_BYTES + 1);
        let outcome = apply_for(
            &mut agg,
            &too_long,
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        assert_eq!(outcome, LeaseApplyOutcome::ClientIdTooLong);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_rejects_when_table_is_full() {
        let mut agg = fresh_aggregator();
        for i in 0..LEASE_TABLE_CAP {
            let id = format!("client:{i:02}");
            let outcome = apply_for(
                &mut agg,
                &id,
                MetricDetailLevel::Backend,
                60,
                PeerBinding::default(),
            );
            assert!(is_applied(&outcome));
        }
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);

        let newcomer = apply_for(
            &mut agg,
            "newcomer",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        assert_eq!(newcomer, LeaseApplyOutcome::TableFull);
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);

        // Renewing an existing holder is still accepted at capacity.
        let renewal = apply_for(
            &mut agg,
            "client:00",
            MetricDetailLevel::Backend,
            120,
            PeerBinding::default(),
        );
        assert!(is_applied(&renewal));
        assert_eq!(agg.lease_count() as usize, LEASE_TABLE_CAP);
    }

    #[test]
    fn lease_apply_rejects_ttl_over_max() {
        let mut agg = fresh_aggregator();
        let outcome = agg.lease_apply(
            "client:0".to_owned(),
            MetricDetailLevel::Backend,
            LEASE_TTL_MAX + Duration::from_secs(1),
            PeerBinding::default(),
        );
        assert_eq!(outcome, LeaseApplyOutcome::TtlOutOfRange);
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_apply_renewal_replaces_previous_for_same_client() {
        let mut agg = fresh_aggregator();
        for ttl_secs in [30, 60] {
            let _ = apply_for(
                &mut agg,
                "renewer",
                MetricDetailLevel::Backend,
                ttl_secs,
                PeerBinding::default(),
            );
        }
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_apply_renewal_rejects_foreign_binding() {
        let mut agg = fresh_aggregator();
        let victim = PeerBinding {
            pid: Some(4242),
            session_ulid: Some(0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210),
        };
        let attacker = PeerBinding {
            pid: Some(9999),
            session_ulid: Some(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF),
        };

        let first = apply_for(&mut agg, "topcli", MetricDetailLevel::Backend, 60, victim);
        assert!(is_applied(&first), "victim's initial apply must succeed");

        let hijack = apply_for(&mut agg, "topcli", MetricDetailLevel::Backend, 60, attacker);
        assert_eq!(
            hijack,
            LeaseApplyOutcome::Unauthorized,
            "renewal with a mismatched known binding must be refused"
        );

        let clear = agg.lease_clear("topcli", victim);
        assert!(
            is_cleared(&clear),
            "victim's original binding must still clear cleanly after \
             the foreign-binding renewal was refused"
        );
    }

    #[test]
    fn lease_apply_renewal_with_matching_binding_succeeds() {
        let mut agg = fresh_aggregator();
        let owner = PeerBinding {
            pid: Some(1234),
            session_ulid: Some(0xAAAA_BBBB_CCCC_DDDD_EEEE_FFFF_0000_1111),
        };
        let _ = apply_for(&mut agg, "topcli", MetricDetailLevel::Backend, 30, owner);
        let renewal = apply_for(&mut agg, "topcli", MetricDetailLevel::Backend, 60, owner);
        assert!(
            is_applied(&renewal),
            "renewal with matching binding must succeed (otherwise the \
             TUI's own renewer thread would be locked out)"
        );
    }

    #[test]
    fn lease_apply_max_merge_two_clients() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Process);
        let _ = apply_for(
            &mut agg,
            "scraper",
            MetricDetailLevel::Frontend,
            60,
            PeerBinding::default(),
        );
        let _ = apply_for(
            &mut agg,
            "topcli",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 2);

        let cleared = agg.lease_clear("topcli", PeerBinding::default());
        assert_eq!(
            cleared,
            LeaseClearOutcome::Cleared {
                previous_effective: MetricDetailLevel::Backend,
            }
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_id_is_silent_noop() {
        let mut agg = fresh_aggregator();
        let _ = apply_for(
            &mut agg,
            "real",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        assert_eq!(
            agg.lease_clear("ghost", PeerBinding::default()),
            LeaseClearOutcome::NotFound
        );
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_with_matching_binding_authorised() {
        let mut agg = fresh_aggregator();
        let _ = apply_for(&mut agg, "owner-lease", MetricDetailLevel::Backend, 60, owner_binding());
        let outcome = agg.lease_clear("owner-lease", owner_binding());
        assert!(is_cleared(&outcome));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_with_mismatched_binding_is_unauthorized() {
        let mut agg = fresh_aggregator();
        let _ = apply_for(&mut agg, "owner-lease", MetricDetailLevel::Backend, 60, owner_binding());
        let outcome = agg.lease_clear("owner-lease", other_binding());
        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_clear_unknown_apply_binding_accepts_any_clear() {
        let mut agg = fresh_aggregator();
        let _ = apply_for(
            &mut agg,
            "legacy",
            MetricDetailLevel::Backend,
            60,
            PeerBinding::default(),
        );
        let outcome = agg.lease_clear("legacy", owner_binding());
        assert!(is_cleared(&outcome));
        assert_eq!(agg.lease_count(), 0);
    }

    #[test]
    fn lease_clear_known_apply_rejects_default_clear() {
        let mut agg = fresh_aggregator();
        let _ = apply_for(&mut agg, "owner-lease", MetricDetailLevel::Backend, 60, owner_binding());
        let outcome = agg.lease_clear("owner-lease", PeerBinding::default());
        assert_eq!(outcome, LeaseClearOutcome::Unauthorized);
    }

    #[test]
    fn lease_tick_expires_only_past_due_leases() {
        let mut agg = fresh_aggregator();
        agg.set_up_detail(MetricDetailLevel::Process);
        let now = Instant::now();
        // Seed the table directly so one lease is already past due.
        let seed = [
            ("expired", MetricDetailLevel::Backend, now - Duration::from_secs(1)),
            ("live", MetricDetailLevel::Frontend, now + Duration::from_secs(60)),
        ];
        for (id, level, expires_at) in seed {
            agg.leases.insert(
                id.to_owned(),
                LeaseEntry {
                    level,
                    expires_at,
                    binding: PeerBinding::default(),
                },
            );
        }
        agg.recompute_effective();
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Backend);

        let prev = agg.lease_tick(now);
        assert_eq!(prev, Some(MetricDetailLevel::Backend));
        assert_eq!(agg.detail_effective(), MetricDetailLevel::Frontend);
        assert_eq!(agg.lease_count(), 1);
    }

    #[test]
    fn lease_tick_no_change_returns_none() {
        let mut agg = fresh_aggregator();
        assert!(agg.lease_tick(Instant::now()).is_none());
    }

    #[test]
    fn lease_apply_at_max_ttl_succeeds() {
        let mut agg = fresh_aggregator();
        let now = Instant::now();
        let outcome = agg.lease_apply(
            "max".to_owned(),
            MetricDetailLevel::Backend,
            LEASE_TTL_MAX,
            PeerBinding::default(),
        );
        assert!(is_applied(&outcome));
        let entry = agg.leases.get("max").unwrap();
        assert!(entry.expires_at <= now + LEASE_TTL_MAX + Duration::from_millis(50));
    }
}