1use std::{
121 collections::{BTreeMap, HashMap},
122 sync::{
123 Arc, Weak,
124 atomic::{AtomicU64, Ordering},
125 },
126 time::Duration,
127};
128
129use serde::Serialize;
130use web_async::{Lock, spawn};
131
132use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track, TrackProducer};
133
134#[derive(Default, Debug)]
144#[non_exhaustive]
145pub struct Counters {
146 pub announced: AtomicU64,
147 pub announced_closed: AtomicU64,
148 pub subscriptions: AtomicU64,
149 pub subscriptions_closed: AtomicU64,
150 pub broadcasts: AtomicU64,
151 pub broadcasts_closed: AtomicU64,
152 pub bytes: AtomicU64,
153 pub frames: AtomicU64,
154 pub groups: AtomicU64,
155}
156
157impl Counters {
158 fn snapshot(&self) -> RawCounts {
166 let announced_closed = self.announced_closed.load(Ordering::Acquire);
167 let subscriptions_closed = self.subscriptions_closed.load(Ordering::Acquire);
168 let broadcasts_closed = self.broadcasts_closed.load(Ordering::Acquire);
169 let announced = self.announced.load(Ordering::Relaxed);
170 let subscriptions = self.subscriptions.load(Ordering::Relaxed);
171 let broadcasts = self.broadcasts.load(Ordering::Relaxed);
172 let bytes = self.bytes.load(Ordering::Relaxed);
173 let frames = self.frames.load(Ordering::Relaxed);
174 let groups = self.groups.load(Ordering::Relaxed);
175 RawCounts {
176 announced,
177 announced_closed,
178 broadcasts,
179 broadcasts_closed,
180 subscriptions,
181 subscriptions_closed,
182 bytes,
183 frames,
184 groups,
185 }
186 }
187}
188
/// Open/close tallies for the sessions recorded under one root path.
#[derive(Debug, Default)]
struct SessionCounters {
    /// Incremented when a session guard is created.
    sessions: AtomicU64,
    /// Incremented when a session guard is dropped.
    sessions_closed: AtomicU64,
}

impl SessionCounters {
    /// Returns `(sessions, sessions_closed)`.
    ///
    /// The closed counter is loaded first (`Acquire`), then the open counter (`Relaxed`).
    fn snapshot(&self) -> (u64, u64) {
        let Self {
            sessions,
            sessions_closed,
        } = self;
        let closed_total = sessions_closed.load(Ordering::Acquire);
        let opened_total = sessions.load(Ordering::Relaxed);
        (opened_total, closed_total)
    }
}
208
/// Plain-integer copy of a [`Counters`] value, as produced by `Counters::snapshot`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct RawCounts {
    announced: u64,
    announced_closed: u64,
    broadcasts: u64,
    broadcasts_closed: u64,
    subscriptions: u64,
    subscriptions_closed: u64,
    bytes: u64,
    frames: u64,
    groups: u64,
}
222
/// Which of the two counter sets an event is recorded in.
///
/// External counters are published on the `publisher.json` / `subscriber.json` /
/// `sessions.json` tracks, internal ones on the matching `internal/...` tracks. Which
/// traffic counts as external or internal is decided by the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Tier {
    External,
    Internal,
}

impl Tier {
    /// Position of this tier in the two-element per-tier arrays.
    fn idx(self) -> usize {
        let slot = match self {
            Self::External => 0,
            Self::Internal => 1,
        };
        slot
    }
}
241
242#[derive(Clone)]
256#[non_exhaustive]
257pub struct StatsConfig {
258 pub origin: Option<OriginProducer>,
261 pub prefix: PathOwned,
265 pub node: Option<PathOwned>,
271 pub interval: Duration,
273}
274
275impl StatsConfig {
276 pub fn new() -> Self {
280 Self {
281 origin: None,
282 prefix: PathOwned::from(".stats"),
283 node: None,
284 interval: Duration::from_secs(1),
285 }
286 }
287
288 pub fn with_origin(mut self, origin: impl Into<Option<OriginProducer>>) -> Self {
291 self.origin = origin.into();
292 self
293 }
294
295 pub fn with_prefix(mut self, prefix: impl Into<PathOwned>) -> Self {
297 self.prefix = prefix.into();
298 self
299 }
300
301 pub fn with_interval(mut self, interval: Duration) -> Self {
303 self.interval = interval;
304 self
305 }
306
307 pub fn with_node(mut self, node: impl Into<Option<PathOwned>>) -> Self {
309 self.node = node.into();
310 self
311 }
312}
313
314impl Default for StatsConfig {
315 fn default() -> Self {
316 Self::new()
317 }
318}
319
/// Clonable handle to the stats aggregator.
#[derive(Clone)]
pub struct Stats {
    // Prefix returned by `prefix()`; broadcast paths under it are rejected by `entry()`.
    prefix: PathOwned,
    // Shared aggregator state. `None` when the config carried no origin, in which case
    // `entry()` and `session_counters()` always return `None`.
    shared: Option<Arc<StatsShared>>,
}
330
/// State shared by every [`Stats`] clone; the publisher task reaches it through a `Weak`.
struct StatsShared {
    // Origin the stats broadcast is published on.
    origin: OriginProducer,
    // Per-broadcast counters, keyed by broadcast path.
    entries: Lock<HashMap<PathOwned, Arc<BroadcastEntry>>>,
    // Per-root session counters, one map per tier (indexed by `Tier::idx`).
    sessions: [Lock<HashMap<PathOwned, Arc<SessionCounters>>>; 2],
}
341
342struct BroadcastEntry {
347 publisher: [Counters; 2],
348 subscriber: [Counters; 2],
349}
350
351impl BroadcastEntry {
352 fn new() -> Self {
353 Self {
354 publisher: Default::default(),
355 subscriber: Default::default(),
356 }
357 }
358}
359
/// Publisher-task bookkeeping for one counter slot.
#[derive(Default)]
struct SlotState {
    // Last snapshot that differed from its predecessor (see `process_slot`); `None`
    // until the slot first shows a non-default snapshot.
    prev_emitted: Option<Snapshot>,
}
370
371#[derive(Default)]
375struct EntrySnapState {
376 publisher: [SlotState; 2],
377 subscriber: [SlotState; 2],
378}
379
380impl EntrySnapState {
381 fn zip_slots<'a>(&'a mut self, entry: &'a BroadcastEntry) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] {
384 let [pub_ext_state, pub_int_state] = &mut self.publisher;
385 let [sub_ext_state, sub_int_state] = &mut self.subscriber;
386 [
387 ("publisher.json", &entry.publisher[Tier::External.idx()], pub_ext_state),
388 (
389 "subscriber.json",
390 &entry.subscriber[Tier::External.idx()],
391 sub_ext_state,
392 ),
393 (
394 "internal/publisher.json",
395 &entry.publisher[Tier::Internal.idx()],
396 pub_int_state,
397 ),
398 (
399 "internal/subscriber.json",
400 &entry.subscriber[Tier::Internal.idx()],
401 sub_int_state,
402 ),
403 ]
404 }
405}
406
// Number of per-broadcast counter slots: two roles times two tiers.
const NUM_SLOTS: usize = 4;

// Names of the per-broadcast stats tracks. The order must match the array returned by
// `EntrySnapState::zip_slots`, because the publisher loop pairs them by index.
const TRACK_ORDER: [&str; NUM_SLOTS] = [
    "publisher.json",
    "subscriber.json",
    "internal/publisher.json",
    "internal/subscriber.json",
];

// Names of the session stats tracks, indexed by `Tier::idx`.
const SESSION_TRACK_ORDER: [&str; 2] = ["sessions.json", "internal/sessions.json"];
423
424impl Stats {
425 pub fn new(config: StatsConfig) -> Self {
433 let StatsConfig {
434 origin,
435 prefix,
436 node,
437 interval,
438 } = config;
439 let node = node.filter(|p| !p.is_empty());
444
445 let shared = origin.map(|origin| {
446 let shared = Arc::new(StatsShared {
447 origin,
448 entries: Lock::default(),
449 sessions: Default::default(),
450 });
451 let advertised = advertised_path(&prefix, node.as_ref().map(|p| p.as_str()));
452 spawn(run_publisher(Arc::downgrade(&shared), advertised, interval));
453 shared
454 });
455
456 Self { prefix, shared }
457 }
458
459 pub fn prefix(&self) -> &Path<'static> {
461 &self.prefix
462 }
463
464 #[cfg(test)]
467 fn shared(&self) -> &Arc<StatsShared> {
468 self.shared.as_ref().expect("enabled stats aggregator")
469 }
470
471 pub fn tier(&self, tier: Tier) -> StatsHandle {
474 StatsHandle {
475 stats: self.clone(),
476 tier,
477 }
478 }
479
480 fn entry(&self, path: impl AsPath) -> Option<Arc<BroadcastEntry>> {
481 let shared = self.shared.as_ref()?;
483 let path = path.as_path();
484 if path.has_prefix(&self.prefix) {
488 return None;
489 }
490 let owned = path.to_owned();
491 let mut entries = shared.entries.lock();
492 Some(
493 entries
494 .entry(owned)
495 .or_insert_with(|| Arc::new(BroadcastEntry::new()))
496 .clone(),
497 )
498 }
499
500 fn session_counters(&self, tier: Tier, root: impl AsPath) -> Option<Arc<SessionCounters>> {
504 let shared = self.shared.as_ref()?;
505 let owned = root.as_path().to_owned();
506 let mut sessions = shared.sessions[tier.idx()].lock();
507 Some(sessions.entry(owned).or_default().clone())
508 }
509}
510
511impl Default for Stats {
512 fn default() -> Self {
513 Self::new(StatsConfig::new())
514 }
515}
516
517#[derive(Clone)]
520pub struct StatsHandle {
521 stats: Stats,
522 tier: Tier,
523}
524
525impl StatsHandle {
526 pub fn parent(&self) -> &Stats {
528 &self.stats
529 }
530
531 pub fn tier(&self) -> Tier {
533 self.tier
534 }
535
536 pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
542 BroadcastStats {
543 entry: self.stats.entry(path),
544 tier: self.tier,
545 }
546 }
547
548 pub fn publisher_broadcasts(&self) -> SessionBroadcasts {
553 SessionBroadcasts::new(self.stats.clone(), self.tier, Side::Publisher)
554 }
555
556 pub fn subscriber_broadcasts(&self) -> SessionBroadcasts {
559 SessionBroadcasts::new(self.stats.clone(), self.tier, Side::Subscriber)
560 }
561
562 pub fn session(&self, root: impl AsPath) -> SessionStats {
568 SessionStats::new(self.stats.session_counters(self.tier, root))
569 }
570}
571
572impl Default for StatsHandle {
573 fn default() -> Self {
575 Stats::default().tier(Tier::External)
576 }
577}
578
579#[derive(Clone)]
586pub struct BroadcastStats {
587 entry: Option<Arc<BroadcastEntry>>,
588 tier: Tier,
589}
590
591impl BroadcastStats {
592 pub fn is_empty(&self) -> bool {
596 self.entry.is_none()
597 }
598
599 pub fn publisher(&self) -> PublisherStats {
604 if let Some(entry) = &self.entry {
605 entry.publisher[self.tier.idx()]
606 .announced
607 .fetch_add(1, Ordering::Relaxed);
608 }
609 PublisherStats {
610 entry: self.entry.clone(),
611 tier: self.tier,
612 }
613 }
614
615 pub fn subscriber(&self) -> SubscriberStats {
620 if let Some(entry) = &self.entry {
621 entry.subscriber[self.tier.idx()]
622 .announced
623 .fetch_add(1, Ordering::Relaxed);
624 }
625 SubscriberStats {
626 entry: self.entry.clone(),
627 tier: self.tier,
628 }
629 }
630
631 pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
637 if let Some(entry) = &self.entry {
638 entry.publisher[self.tier.idx()]
639 .subscriptions
640 .fetch_add(1, Ordering::Relaxed);
641 }
642 PublisherTrack {
643 entry: self.entry.clone(),
644 tier: self.tier,
645 }
646 }
647
648 pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
650 if let Some(entry) = &self.entry {
651 entry.subscriber[self.tier.idx()]
652 .subscriptions
653 .fetch_add(1, Ordering::Relaxed);
654 }
655 SubscriberTrack {
656 entry: self.entry.clone(),
657 tier: self.tier,
658 }
659 }
660}
661
662#[derive(Copy, Clone)]
664enum Side {
665 Publisher,
666 Subscriber,
667}
668
669impl Side {
670 fn counters(self, entry: &BroadcastEntry, tier: Tier) -> &Counters {
671 match self {
672 Side::Publisher => &entry.publisher[tier.idx()],
673 Side::Subscriber => &entry.subscriber[tier.idx()],
674 }
675 }
676}
677
678#[derive(Clone)]
693pub struct SessionBroadcasts {
694 stats: Stats,
695 tier: Tier,
696 side: Side,
697 counts: Arc<std::sync::Mutex<HashMap<PathOwned, u32>>>,
698}
699
700impl SessionBroadcasts {
701 fn new(stats: Stats, tier: Tier, side: Side) -> Self {
702 Self {
703 stats,
704 tier,
705 side,
706 counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
707 }
708 }
709
710 pub fn subscribe(&self, path: impl AsPath) -> BroadcastSubscription {
715 let path = path.as_path().to_owned();
716 let entry = self.stats.entry(&path);
717 let first = {
718 let mut counts = self.counts.lock().expect("stats refcount poisoned");
719 let n = counts.entry(path.clone()).or_insert(0);
720 let first = *n == 0;
721 *n += 1;
722 first
723 };
724 if first {
725 if let Some(entry) = &entry {
726 self.side
727 .counters(entry, self.tier)
728 .broadcasts
729 .fetch_add(1, Ordering::Relaxed);
730 }
731 }
732 BroadcastSubscription {
733 entry,
734 tier: self.tier,
735 side: self.side,
736 counts: self.counts.clone(),
737 path,
738 }
739 }
740}
741
742#[must_use = "drop the guard to release the subscription"]
745pub struct BroadcastSubscription {
746 entry: Option<Arc<BroadcastEntry>>,
747 tier: Tier,
748 side: Side,
749 counts: Arc<std::sync::Mutex<HashMap<PathOwned, u32>>>,
750 path: PathOwned,
751}
752
753impl Drop for BroadcastSubscription {
754 fn drop(&mut self) {
755 let last = {
756 let mut counts = self.counts.lock().expect("stats refcount poisoned");
757 match counts.get_mut(&self.path) {
758 Some(n) => {
759 *n -= 1;
760 if *n == 0 {
761 counts.remove(&self.path);
762 true
763 } else {
764 false
765 }
766 }
767 None => false,
768 }
769 };
770 if last {
771 if let Some(entry) = &self.entry {
772 self.side
775 .counters(entry, self.tier)
776 .broadcasts_closed
777 .fetch_add(1, Ordering::Release);
778 }
779 }
780 }
781}
782
783#[must_use = "drop the guard to record the session as closed"]
787pub struct SessionStats {
788 counters: Option<Arc<SessionCounters>>,
790}
791
792impl SessionStats {
793 fn new(counters: Option<Arc<SessionCounters>>) -> Self {
794 if let Some(counters) = &counters {
795 counters.sessions.fetch_add(1, Ordering::Relaxed);
796 }
797 Self { counters }
798 }
799}
800
801impl Drop for SessionStats {
802 fn drop(&mut self) {
803 if let Some(counters) = &self.counters {
804 counters.sessions_closed.fetch_add(1, Ordering::Release);
807 }
808 }
809}
810
811#[must_use = "drop the guard to record the broadcast as closed"]
813pub struct PublisherStats {
814 entry: Option<Arc<BroadcastEntry>>,
815 tier: Tier,
816}
817
818impl PublisherStats {
819 pub fn track(&self, name: &str) -> PublisherTrack {
822 BroadcastStats {
823 entry: self.entry.clone(),
824 tier: self.tier,
825 }
826 .publisher_track(name)
827 }
828}
829
830impl Drop for PublisherStats {
831 fn drop(&mut self) {
832 if let Some(entry) = &self.entry {
833 entry.publisher[self.tier.idx()]
837 .announced_closed
838 .fetch_add(1, Ordering::Release);
839 }
840 }
841}
842
843#[must_use = "drop the guard to record the broadcast as closed"]
845pub struct SubscriberStats {
846 entry: Option<Arc<BroadcastEntry>>,
847 tier: Tier,
848}
849
850impl SubscriberStats {
851 pub fn track(&self, name: &str) -> SubscriberTrack {
853 BroadcastStats {
854 entry: self.entry.clone(),
855 tier: self.tier,
856 }
857 .subscriber_track(name)
858 }
859}
860
861impl Drop for SubscriberStats {
862 fn drop(&mut self) {
863 if let Some(entry) = &self.entry {
864 entry.subscriber[self.tier.idx()]
866 .announced_closed
867 .fetch_add(1, Ordering::Release);
868 }
869 }
870}
871
872#[must_use = "drop the guard to record the subscription as closed"]
874pub struct PublisherTrack {
875 entry: Option<Arc<BroadcastEntry>>,
876 tier: Tier,
877}
878
879impl PublisherTrack {
880 pub fn frame(&self) {
882 if let Some(entry) = &self.entry {
883 entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
884 }
885 }
886
887 pub fn bytes(&self, n: u64) {
889 if let Some(entry) = &self.entry {
890 entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
891 }
892 }
893
894 pub fn group(&self) {
896 if let Some(entry) = &self.entry {
897 entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
898 }
899 }
900}
901
902impl Drop for PublisherTrack {
903 fn drop(&mut self) {
904 if let Some(entry) = &self.entry {
905 entry.publisher[self.tier.idx()]
907 .subscriptions_closed
908 .fetch_add(1, Ordering::Release);
909 }
910 }
911}
912
913#[must_use = "drop the guard to record the subscription as closed"]
915pub struct SubscriberTrack {
916 entry: Option<Arc<BroadcastEntry>>,
917 tier: Tier,
918}
919
920impl SubscriberTrack {
921 pub fn frame(&self) {
923 if let Some(entry) = &self.entry {
924 entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
925 }
926 }
927
928 pub fn bytes(&self, n: u64) {
930 if let Some(entry) = &self.entry {
931 entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
932 }
933 }
934
935 pub fn group(&self) {
937 if let Some(entry) = &self.entry {
938 entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
939 }
940 }
941}
942
943impl Drop for SubscriberTrack {
944 fn drop(&mut self) {
945 if let Some(entry) = &self.entry {
946 entry.subscriber[self.tier.idx()]
948 .subscriptions_closed
949 .fetch_add(1, Ordering::Release);
950 }
951 }
952}
953
954fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) {
958 let raw = counters.snapshot();
959
960 let snap = Snapshot {
961 announced: raw.announced,
962 announced_closed: raw.announced_closed,
963 broadcasts: raw.broadcasts,
964 broadcasts_closed: raw.broadcasts_closed,
965 subscriptions: raw.subscriptions,
966 subscriptions_closed: raw.subscriptions_closed,
967 bytes: raw.bytes,
968 frames: raw.frames,
969 groups: raw.groups,
970 };
971
972 let live = snap.announced != snap.announced_closed
979 || snap.subscriptions != snap.subscriptions_closed
980 || snap.broadcasts != snap.broadcasts_closed;
981
982 let prev_snap = slot_state.prev_emitted.unwrap_or_default();
993 let changed = snap != prev_snap;
994 if changed {
995 slot_state.prev_emitted = Some(snap);
996 }
997 if live || changed {
998 emit(snap);
999 }
1000}
1001
/// Publisher-task bookkeeping for one session root.
#[derive(Default)]
struct SessionSlotState {
    // Last snapshot that differed from its predecessor (see `process_session_slot`);
    // `None` until the root first shows a non-default snapshot.
    prev_emitted: Option<SessionSnapshot>,
}
1008
1009fn process_session_slot(
1014 counters: &SessionCounters,
1015 slot_state: &mut SessionSlotState,
1016 mut emit: impl FnMut(SessionSnapshot),
1017) {
1018 let (sessions, sessions_closed) = counters.snapshot();
1019 let snap = SessionSnapshot {
1020 sessions,
1021 sessions_closed,
1022 };
1023
1024 let live = sessions != sessions_closed;
1025 let prev_snap = slot_state.prev_emitted.unwrap_or_default();
1026 let changed = snap != prev_snap;
1027 if changed {
1028 slot_state.prev_emitted = Some(snap);
1029 }
1030 if live || changed {
1031 emit(snap);
1032 }
1033}
1034
1035fn flush_track<T: Serialize>(track: &mut TrackProducer, frame: &T, last: &mut Vec<u8>, name: &str) {
1039 let json = match serde_json::to_vec(frame) {
1040 Ok(b) => b,
1041 Err(err) => {
1042 tracing::debug!(?err, name, "stats: failed to serialize frame");
1043 return;
1044 }
1045 };
1046 if &json == last {
1047 return;
1048 }
1049 if let Err(err) = track.write_frame(json.clone()) {
1050 tracing::debug!(?err, name, "stats: failed to write frame");
1051 return;
1052 }
1053 *last = json;
1054}
1055
1056async fn run_publisher(weak: Weak<StatsShared>, advertised: PathOwned, interval: Duration) {
1060 let Some(shared) = weak.upgrade() else {
1061 return;
1062 };
1063
1064 let mut broadcast = Broadcast::new().produce();
1065
1066 let create = |broadcast: &mut crate::BroadcastProducer, name: &str| match broadcast.create_track(Track {
1068 name: name.into(),
1069 priority: 0,
1070 }) {
1071 Ok(t) => Some(t),
1072 Err(err) => {
1073 tracing::warn!(?err, name, "stats: failed to create track");
1074 None
1075 }
1076 };
1077
1078 let mut tracks: Vec<TrackProducer> = Vec::with_capacity(NUM_SLOTS);
1079 for name in TRACK_ORDER {
1080 let Some(t) = create(&mut broadcast, name) else {
1081 return;
1082 };
1083 tracks.push(t);
1084 }
1085 let mut session_tracks: Vec<TrackProducer> = Vec::with_capacity(SESSION_TRACK_ORDER.len());
1086 for name in SESSION_TRACK_ORDER {
1087 let Some(t) = create(&mut broadcast, name) else {
1088 return;
1089 };
1090 session_tracks.push(t);
1091 }
1092
1093 if !shared.origin.publish_broadcast(&advertised, broadcast.consume()) {
1094 tracing::warn!(advertised = %advertised, "stats: origin rejected stats broadcast");
1095 return;
1096 }
1097 drop(shared);
1098
1099 let mut local: HashMap<PathOwned, EntrySnapState> = HashMap::new();
1102 let mut last_payload: [Vec<u8>; NUM_SLOTS] = Default::default();
1103 let mut session_local: [HashMap<PathOwned, SessionSlotState>; 2] = Default::default();
1105 let mut session_last_payload: [Vec<u8>; 2] = Default::default();
1106
1107 let mut ticker = tokio::time::interval(interval);
1108 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1109
1110 loop {
1111 ticker.tick().await;
1112
1113 let Some(shared) = weak.upgrade() else {
1114 return;
1115 };
1116
1117 let entries: Vec<(PathOwned, Arc<BroadcastEntry>)> = {
1120 let map = shared.entries.lock();
1121 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1122 };
1123
1124 let mut frames: [BTreeMap<String, Snapshot>; NUM_SLOTS] = Default::default();
1125 for (path, entry) in &entries {
1126 let snap_state = local.entry(path.clone()).or_default();
1127 for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() {
1128 process_slot(counters, slot_state, |snap| {
1129 frames[i].insert(path.as_str().to_string(), snap);
1130 });
1131 }
1132 }
1133 drop(entries);
1134
1135 {
1145 let mut map = shared.entries.lock();
1146 map.retain(|_, entry| Arc::strong_count(entry) > 1);
1147 local.retain(|path, _| map.contains_key(path));
1148 }
1149
1150 let mut session_frames: [BTreeMap<String, SessionSnapshot>; 2] = Default::default();
1152 for tier_idx in 0..2 {
1153 let roots: Vec<(PathOwned, Arc<SessionCounters>)> = {
1154 let map = shared.sessions[tier_idx].lock();
1155 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1156 };
1157 let states = &mut session_local[tier_idx];
1158 for (root, counters) in &roots {
1159 let state = states.entry(root.clone()).or_default();
1160 process_session_slot(counters, state, |snap| {
1161 session_frames[tier_idx].insert(root.as_str().to_string(), snap);
1162 });
1163 }
1164 drop(roots);
1165
1166 let mut map = shared.sessions[tier_idx].lock();
1170 map.retain(|_, counters| Arc::strong_count(counters) > 1);
1171 states.retain(|root, _| map.contains_key(root));
1172 }
1173
1174 for (i, (frame, last)) in frames.iter().zip(last_payload.iter_mut()).enumerate() {
1175 flush_track(&mut tracks[i], frame, last, TRACK_ORDER[i]);
1176 }
1177 for (i, (frame, last)) in session_frames.iter().zip(session_last_payload.iter_mut()).enumerate() {
1178 flush_track(&mut session_tracks[i], frame, last, SESSION_TRACK_ORDER[i]);
1179 }
1180
1181 drop(shared);
1182 }
1183}
1184
1185#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
1190#[cfg_attr(test, derive(serde::Deserialize))]
1191struct Snapshot {
1192 announced: u64,
1193 announced_closed: u64,
1194 broadcasts: u64,
1195 broadcasts_closed: u64,
1196 subscriptions: u64,
1197 subscriptions_closed: u64,
1198 bytes: u64,
1199 frames: u64,
1200 groups: u64,
1201}
1202
1203#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
1206#[cfg_attr(test, derive(serde::Deserialize))]
1207struct SessionSnapshot {
1208 sessions: u64,
1209 sessions_closed: u64,
1210}
1211
1212fn advertised_path(prefix: &Path, node: Option<&str>) -> PathOwned {
1213 let mut out = format!("{}/node", prefix.as_str());
1216 if let Some(node) = node {
1217 out.push('/');
1218 out.push_str(node);
1219 }
1220 PathOwned::from(out)
1221}
1222
1223#[cfg(test)]
1224mod tests {
1225 use std::{collections::BTreeMap, sync::atomic::Ordering::Relaxed};
1226
1227 use crate::{Origin, Path};
1228
1229 use super::*;
1230
    /// Builds a `Stats` that publishes on a fresh random origin, with an optional node name.
    ///
    /// The origin is returned as well so the caller can keep it alive and consume from it.
    fn test_stats(node: Option<&str>) -> (Stats, OriginProducer) {
        let origin = Origin::random().produce();
        let stats = Stats::new(
            StatsConfig::new()
                .with_origin(origin.clone())
                .with_node(node.map(|s| PathOwned::from(s.to_string()))),
        );
        (stats, origin)
    }
1240
    /// `advertised_path` appends `/node` and, when given, `/<node>` to the prefix.
    #[test]
    fn advertised_path_with_and_without_node() {
        let prefix = Path::new(".stats");
        assert_eq!(advertised_path(&prefix, Some("sjc")).as_str(), ".stats/node/sjc");
        // A multi-segment node name is appended verbatim.
        assert_eq!(advertised_path(&prefix, Some("sjc/1")).as_str(), ".stats/node/sjc/1");
        assert_eq!(advertised_path(&prefix, None).as_str(), ".stats/node");

        // A custom prefix is used as given.
        let prefix = Path::new("metrics");
        assert_eq!(advertised_path(&prefix, Some("lon")).as_str(), "metrics/node/lon");
    }
1251
    /// Creates a `Stats` with the given node name on a fresh origin and returns the path
    /// of the first announcement seen on that origin.
    async fn announced_path_for_node(node: &str) -> String {
        let origin = Origin::random().produce();
        // Bound to a name so the aggregator stays alive until the function returns.
        let _stats = Stats::new(
            StatsConfig::new()
                .with_origin(origin.clone())
                .with_node(PathOwned::from(node.to_string())),
        );
        let mut consumer = origin.consume();
        // Nudge the paused clock before waiting for the announcement.
        tokio::time::advance(Duration::from_millis(1)).await;
        let (path, _broadcast) = consumer.announced().await.expect("expected announce");
        path.as_str().to_string()
    }
1267
    /// Redundant slashes in the node name are normalised away, and a node made only of
    /// slashes leaves the advertised path without a node suffix.
    #[tokio::test(start_paused = true)]
    async fn new_normalizes_and_drops_empty_node() {
        assert_eq!(announced_path_for_node("/sjc//1/").await, ".stats/node/sjc/1");
        assert_eq!(announced_path_for_node("///").await, ".stats/node");
    }
1273
    /// Bytes recorded for one broadcast path do not show up under another path.
    #[tokio::test(start_paused = true)]
    async fn per_broadcast_counters_isolated() {
        let (stats, _origin) = test_stats(Some("sjc"));
        let bs1 = stats.tier(Tier::External).broadcast("demo/bbb");
        let bs2 = stats.tier(Tier::External).broadcast("demo/ccc");
        let g1 = bs1.publisher().track("video");
        g1.bytes(100);
        let g2 = bs2.publisher().track("video");
        g2.bytes(7);

        // Read the raw counters straight from the shared map.
        let entries = stats.shared().entries.lock();
        let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
        let e2 = entries.get(&PathOwned::from("demo/ccc")).expect("entry");
        assert_eq!(e1.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
        assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7);
    }
1291
    /// For one broadcast path, each (role, tier) pair has its own counters.
    #[tokio::test(start_paused = true)]
    async fn external_and_internal_tiers_are_independent() {
        let (stats, _origin) = test_stats(Some("sjc"));
        let ext = stats.tier(Tier::External);
        let int = stats.tier(Tier::Internal);

        // External publisher bytes and internal subscriber bytes on the same path.
        let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
        ext_track.bytes(100);
        let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
        int_track.bytes(7);

        let entries = stats.shared().entries.lock();
        let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
        assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
        assert_eq!(entry.subscriber[Tier::External.idx()].bytes.load(Relaxed), 0);
        assert_eq!(entry.publisher[Tier::Internal.idx()].bytes.load(Relaxed), 0);
        assert_eq!(entry.subscriber[Tier::Internal.idx()].bytes.load(Relaxed), 7);
    }
1310
    /// A broadcast path under the stats prefix gets no entry, and its guards are no-ops.
    #[tokio::test(start_paused = true)]
    async fn paths_under_prefix_are_no_op() {
        let (stats, _origin) = test_stats(Some("sjc"));
        // This is the path the stats broadcast itself is advertised under.
        let bs = stats.tier(Tier::External).broadcast(".stats/node/sjc");
        assert!(bs.is_empty());
        let p = bs.publisher();
        let track = p.track("video");
        track.bytes(100);
        drop(track);
        drop(p);
        // Nothing was inserted into the shared map.
        assert!(stats.shared().entries.lock().is_empty());
    }
1325
    /// A default `Stats` has no shared state; creating and dropping guards must not panic.
    #[tokio::test(start_paused = true)]
    async fn disabled_stats_are_noop() {
        let stats = Stats::default();
        assert!(stats.shared.is_none());
        let bs = stats.tier(Tier::External).broadcast("demo/bbb");
        assert!(bs.is_empty());
        let p = bs.publisher();
        let track = p.track("video");
        track.bytes(100);
        drop(track);
        drop(p);
    }
1340
    /// With two tracked broadcasts, the first announcement on the origin is the stats
    /// broadcast at `<prefix>/node/<node>`.
    #[tokio::test(start_paused = true)]
    async fn single_broadcast_path_announced() {
        let (stats, origin) = test_stats(Some("sjc/1"));
        let mut consumer = origin.consume();

        let bs1 = stats.tier(Tier::External).broadcast("foo/bar");
        let _t1 = bs1.publisher().track("video");
        let bs2 = stats.tier(Tier::External).broadcast("baz/qux");
        let _t2 = bs2.publisher().track("video");

        tokio::time::advance(Duration::from_millis(1)).await;
        let (path, broadcast) = consumer.announced().await.expect("expected announce");
        assert!(broadcast.is_some());
        assert_eq!(path.as_str(), ".stats/node/sjc/1");
    }
1358
    /// Without a configured node, the stats broadcast is announced at `<prefix>/node`.
    #[tokio::test(start_paused = true)]
    async fn task_announces_without_node_suffix() {
        let origin = Origin::random().produce();
        let stats = Stats::new(StatsConfig::new().with_origin(origin.clone()));
        let mut consumer = origin.consume();

        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let _t = bs.publisher().track("video");

        tokio::time::advance(Duration::from_millis(1)).await;
        let (path, broadcast) = consumer.announced().await.expect("expected announce");
        assert!(broadcast.is_some());
        assert_eq!(path.as_str(), ".stats/node");
    }
1373
    /// Advances the paused clock by one second `count` times, yielding to the scheduler
    /// four times after each step.
    async fn drive_ticks(count: u32) {
        for _ in 0..count {
            tokio::time::advance(Duration::from_secs(1)).await;
            for _ in 0..4 {
                // NOTE(review): presumably gives the spawned publisher task room to
                // finish its tick; confirm that four yields is enough.
                tokio::task::yield_now().await;
            }
        }
    }
1389
    /// An entry stays in the shared map while an announce guard is held, even with no
    /// traffic, and is pruned on a tick after the guard and view are dropped.
    #[tokio::test(start_paused = true)]
    async fn live_entry_kept_while_idle() {
        let (stats, _origin) = test_stats(Some("sjc"));
        let key = PathOwned::from("foo/bar".to_string());
        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let guard = bs.publisher();

        drive_ticks(5).await;
        assert!(
            stats.shared().entries.lock().contains_key(&key),
            "announced-but-idle broadcast must stay while the guard is held"
        );

        // Both the guard and the view hold a reference to the entry.
        drop(guard);
        drop(bs);
        drive_ticks(1).await;
        assert!(
            !stats.shared().entries.lock().contains_key(&key),
            "entry dropped once the announce guard closes"
        );
    }
1416
    /// An entry is present while a track guard is held and is pruned on the tick after
    /// every reference to it is dropped.
    #[tokio::test(start_paused = true)]
    async fn entry_dropped_once_fully_closed() {
        let (stats, _origin) = test_stats(Some("sjc"));
        let key = PathOwned::from("foo/bar".to_string());
        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        // The temporary announce guard is dropped at the end of this statement; only
        // the track guard stays alive.
        let track = bs.publisher().track("video");

        drive_ticks(1).await;
        assert!(
            stats.shared().entries.lock().contains_key(&key),
            "live entry present while the track guard is held"
        );

        drop(track);
        drop(bs);
        drive_ticks(1).await;
        assert!(
            !stats.shared().entries.lock().contains_key(&key),
            "fully-closed entry dropped on the next tick"
        );
    }
1440
    /// The `publisher.json` frame carries the counters recorded for a live broadcast.
    #[tokio::test(start_paused = true)]
    async fn frame_emits_expected_counters() {
        let (stats, origin) = test_stats(Some("sjc"));
        let mut consumer = origin.consume();
        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let track = bs.publisher().track("video");
        track.bytes(42);
        track.frame();
        let sessions = stats.tier(Tier::External).publisher_broadcasts();
        let _sub = sessions.subscribe("foo/bar");

        // Move past the default one-second publish interval.
        tokio::time::advance(Duration::from_millis(1100)).await;

        let (_path, broadcast) = consumer.announced().await.expect("expected announce");
        let broadcast = broadcast.expect("active");
        let track = broadcast
            .subscribe_track(&Track {
                name: "publisher.json".into(),
                priority: 0,
            })
            .expect("subscribe");
        // `read_frame` is defined outside this view; it presumably decodes the next
        // JSON frame into a map keyed by broadcast path.
        let frame = read_frame(track).await;
        let snap = frame.get("foo/bar").expect("foo/bar entry");
        assert_eq!(snap.announced, 1, "publisher() guard bumps announced");
        assert_eq!(snap.broadcasts, 1, "one session subscribed");
        assert_eq!(snap.subscriptions, 1);
        assert_eq!(snap.bytes, 42);
        assert_eq!(snap.frames, 1);
    }
1470
    /// Holding only an announce guard bumps `announced` but neither `broadcasts` nor
    /// `subscriptions`.
    #[tokio::test(start_paused = true)]
    async fn announced_decouples_from_broadcasts() {
        let (stats, origin) = test_stats(Some("sjc"));
        let mut consumer = origin.consume();
        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let _guard = bs.publisher();

        // Move past the default one-second publish interval.
        tokio::time::advance(Duration::from_millis(1100)).await;

        let (_path, broadcast) = consumer.announced().await.expect("announce");
        let broadcast = broadcast.expect("active");
        let track = broadcast
            .subscribe_track(&Track {
                name: "publisher.json".into(),
                priority: 0,
            })
            .expect("subscribe");
        // `read_frame` is defined outside this view.
        let frame = read_frame(track).await;
        let snap = frame.get("foo/bar").expect("foo/bar entry");
        assert_eq!(snap.announced, 1);
        assert_eq!(snap.broadcasts, 0, "no subscription, no broadcasts sentinel");
        assert_eq!(snap.subscriptions, 0);
    }
1496
    /// A subscription that opens and closes before the first publish still appears in
    /// the frame, with both its open and its close counted.
    #[tokio::test(start_paused = true)]
    async fn short_lived_sub_is_surfaced() {
        let (stats, origin) = test_stats(Some("sjc"));
        let mut consumer = origin.consume();
        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let sessions = stats.tier(Tier::External).publisher_broadcasts();
        {
            // Everything created in this scope is dropped before time advances.
            let track = bs.publisher().track("video");
            track.bytes(123);
            track.frame();
            let _sub = sessions.subscribe("foo/bar");
        }

        // Move past the default one-second publish interval.
        tokio::time::advance(Duration::from_millis(1100)).await;

        let (_path, broadcast) = consumer.announced().await.expect("announce");
        let broadcast = broadcast.expect("active");
        let track = broadcast
            .subscribe_track(&Track {
                name: "publisher.json".into(),
                priority: 0,
            })
            .expect("subscribe");
        // `read_frame` is defined outside this view.
        let frame = read_frame(track).await;
        let snap = frame.get("foo/bar").expect("foo/bar entry");
        assert_eq!(snap.subscriptions, 1);
        assert_eq!(snap.subscriptions_closed, 1);
        assert_eq!(snap.broadcasts, 1, "one session subscribed");
        assert_eq!(snap.broadcasts_closed, 1);
        assert_eq!(snap.bytes, 123);
        assert_eq!(snap.frames, 1);
    }
1536
    /// Several subscriptions from one session to one path count as a single broadcast,
    /// which closes only when the last of them is dropped.
    #[tokio::test(start_paused = true)]
    async fn multiple_subs_count_as_one_broadcast() {
        let (stats, _origin) = test_stats(Some("sjc"));
        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let sessions = stats.tier(Tier::External).publisher_broadcasts();
        let pub_guard = bs.publisher();
        let t1 = pub_guard.track("video");
        let t2 = pub_guard.track("audio");
        let s1 = sessions.subscribe("foo/bar");
        let s2 = sessions.subscribe("foo/bar");

        // Reads the external publisher counters for "foo/bar" straight from the map.
        let raw = || {
            let entries = stats.shared().entries.lock();
            let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
            entry.publisher[Tier::External.idx()].snapshot()
        };

        let r = raw();
        assert_eq!(r.subscriptions, 2, "two track subs");
        assert_eq!(r.subscriptions_closed, 0, "neither dropped yet");
        assert_eq!(r.broadcasts, 1, "one session => one broadcast");
        assert_eq!(r.broadcasts_closed, 0);

        drop(s1);
        assert_eq!(raw().broadcasts_closed, 0, "session still has a sub open");

        drop(s2);
        drop(t1);
        drop(t2);
        let r = raw();
        assert_eq!(r.subscriptions_closed, 2, "both track subs dropped");
        assert_eq!(r.broadcasts, 1);
        assert_eq!(r.broadcasts_closed, 1, "last sub closed => one broadcasts_closed");

        drop(pub_guard);
        drop(bs);
    }
1578
1579 #[tokio::test(start_paused = true)]
1580 async fn distinct_sessions_count_as_separate_broadcasts() {
1581 let (stats, _origin) = test_stats(Some("sjc"));
1584 let viewer1 = stats.tier(Tier::External).publisher_broadcasts();
1585 let viewer2 = stats.tier(Tier::External).publisher_broadcasts();
1586
1587 let raw = || {
1588 let entries = stats.shared().entries.lock();
1589 let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
1590 entry.publisher[Tier::External.idx()].snapshot()
1591 };
1592
1593 let s1 = viewer1.subscribe("foo/bar");
1594 assert_eq!(raw().broadcasts, 1, "one viewer");
1595 let s2 = viewer2.subscribe("foo/bar");
1596 assert_eq!(raw().broadcasts, 2, "two distinct viewers");
1597 assert_eq!(raw().broadcasts_closed, 0);
1598
1599 drop(s1);
1600 let r = raw();
1601 assert_eq!(r.broadcasts, 2, "broadcasts is cumulative");
1602 assert_eq!(r.broadcasts_closed, 1, "one viewer left");
1603 drop(s2);
1606 assert_eq!(raw().broadcasts_closed, 2, "both viewers gone");
1607 }
1608
1609 #[tokio::test(start_paused = true)]
1610 async fn session_counts_by_root() {
1611 let (stats, _origin) = test_stats(Some("sjc"));
1614 let ext = stats.tier(Tier::External);
1615
1616 let snap = |root: &str| {
1617 let map = stats.shared().sessions[Tier::External.idx()].lock();
1618 map.get(&PathOwned::from(root.to_string())).map(|c| c.snapshot())
1619 };
1620
1621 let a1 = ext.session("acme");
1622 let a2 = ext.session("acme");
1623 let b1 = ext.session("globex");
1624 assert_eq!(snap("acme"), Some((2, 0)), "two sessions under one root");
1625 assert_eq!(snap("globex"), Some((1, 0)), "a distinct root is counted separately");
1626
1627 drop(a1);
1628 assert_eq!(snap("acme"), Some((2, 1)));
1629 drop(a2);
1630 drop(b1);
1631 assert_eq!(snap("acme"), Some((2, 2)));
1632 assert_eq!(snap("globex"), Some((1, 1)));
1633 }
1634
1635 #[tokio::test(start_paused = true)]
1636 async fn session_track_surfaces_by_root() {
1637 let (stats, origin) = test_stats(Some("sjc"));
1638 let mut consumer = origin.consume();
1639 let _a = stats.tier(Tier::External).session("acme");
1640 let _b = stats.tier(Tier::External).session("acme");
1641 let _c = stats.tier(Tier::Internal).session("peer");
1642
1643 tokio::time::advance(Duration::from_millis(1100)).await;
1644
1645 let (_path, broadcast) = consumer.announced().await.expect("announce");
1646 let broadcast = broadcast.expect("active");
1647
1648 let track = broadcast
1649 .subscribe_track(&Track {
1650 name: "sessions.json".into(),
1651 priority: 0,
1652 })
1653 .expect("subscribe");
1654 let frame = read_session_frame(track).await;
1655 let snap = frame.get("acme").expect("root entry");
1656 assert_eq!(snap.sessions, 2);
1657 assert_eq!(snap.sessions_closed, 0);
1658 assert!(
1659 !frame.contains_key("peer"),
1660 "internal session must not appear on the external track"
1661 );
1662
1663 let int_track = broadcast
1664 .subscribe_track(&Track {
1665 name: "internal/sessions.json".into(),
1666 priority: 0,
1667 })
1668 .expect("subscribe");
1669 let snap = *read_session_frame(int_track).await.get("peer").expect("internal entry");
1670 assert_eq!(snap.sessions, 1);
1671 }
1672
1673 #[tokio::test(start_paused = true)]
1674 async fn session_root_dropped_when_empty() {
1675 let (stats, _origin) = test_stats(Some("sjc"));
1678 let key = PathOwned::from("acme");
1679 let session = stats.tier(Tier::External).session("acme");
1680
1681 drive_ticks(1).await;
1682 assert!(
1683 stats.shared().sessions[Tier::External.idx()].lock().contains_key(&key),
1684 "root present while a session is connected"
1685 );
1686
1687 drop(session);
1688 drive_ticks(1).await;
1689 assert!(
1690 !stats.shared().sessions[Tier::External.idx()].lock().contains_key(&key),
1691 "root GC'd after the last session leaves"
1692 );
1693 }
1694
1695 #[tokio::test(start_paused = true)]
1696 async fn unused_slots_dont_surface() {
1697 let (stats, origin) = test_stats(Some("sjc"));
1703 let mut consumer = origin.consume();
1704 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1705 let track = bs.publisher().track("video");
1706 track.frame();
1707
1708 drive_ticks(2).await;
1709
1710 let (_path, broadcast) = consumer.announced().await.expect("announce");
1711 let broadcast = broadcast.expect("active");
1712
1713 let pub_track = broadcast
1715 .subscribe_track(&Track {
1716 name: "publisher.json".into(),
1717 priority: 0,
1718 })
1719 .expect("subscribe");
1720 assert!(
1721 read_frame(pub_track).await.contains_key("foo/bar"),
1722 "publisher.json must include the active foo/bar entry"
1723 );
1724
1725 for name in ["subscriber.json", "internal/publisher.json", "internal/subscriber.json"] {
1728 let t = broadcast
1729 .subscribe_track(&Track {
1730 name: name.into(),
1731 priority: 0,
1732 })
1733 .expect("subscribe");
1734 let frame = read_frame(t).await;
1735 assert!(
1736 frame.is_empty(),
1737 "{name} must be empty for an entry with no activity on that slot, got {frame:?}",
1738 );
1739 }
1740 }
1741
1742 #[test]
1743 fn snapshot_reads_closed_before_open() {
1744 let src = include_str!("stats.rs");
1750 let body_start = src
1753 .find("fn snapshot(&self) -> RawCounts")
1754 .expect("snapshot fn present");
1755 let body = &src[body_start..];
1756 let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load");
1757 let open_pos = body.find("self.announced.load(").expect("announced load");
1758 assert!(
1759 closed_pos < open_pos,
1760 "announced_closed must be loaded before announced; reversing breaks the open>=closed invariant",
1761 );
1762 let subs_closed_pos = body
1763 .find("self.subscriptions_closed.load")
1764 .expect("subscriptions_closed load");
1765 let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load");
1766 assert!(
1767 subs_closed_pos < subs_pos,
1768 "subscriptions_closed must be loaded before subscriptions",
1769 );
1770 let bcast_closed_pos = body
1771 .find("self.broadcasts_closed.load")
1772 .expect("broadcasts_closed load");
1773 let bcast_pos = body.find("self.broadcasts.load").expect("broadcasts load");
1774 assert!(
1775 bcast_closed_pos < bcast_pos,
1776 "broadcasts_closed must be loaded before broadcasts",
1777 );
1778 }
1779
1780 #[test]
1781 fn session_snapshot_reads_closed_before_open() {
1782 let src = include_str!("stats.rs");
1786 let body_start = src
1787 .find("fn snapshot(&self) -> (u64, u64)")
1788 .expect("SessionCounters::snapshot fn present");
1789 let body = &src[body_start..];
1790 let closed_pos = body.find("self.sessions_closed.load").expect("sessions_closed load");
1791 let open_pos = body.find("self.sessions.load").expect("sessions load");
1792 assert!(closed_pos < open_pos, "sessions_closed must be loaded before sessions",);
1793 }
1794
1795 async fn read_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, Snapshot> {
1796 let bytes = track.read_frame().await.expect("ok").expect("frame");
1797 serde_json::from_slice(&bytes).expect("json parse")
1798 }
1799
1800 async fn read_session_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, SessionSnapshot> {
1801 let bytes = track.read_frame().await.expect("ok").expect("frame");
1802 serde_json::from_slice(&bytes).expect("json parse")
1803 }
1804}