1use std::{
103 collections::{BTreeMap, HashMap},
104 sync::{
105 Arc, Weak,
106 atomic::{AtomicU64, Ordering},
107 },
108 time::Duration,
109};
110
111use serde::Serialize;
112use web_async::{Lock, spawn};
113
114use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track, TrackProducer};
115
116#[derive(Default, Debug)]
124#[non_exhaustive]
125pub struct Counters {
126 pub announced: AtomicU64,
127 pub announced_closed: AtomicU64,
128 pub subscriptions: AtomicU64,
129 pub subscriptions_closed: AtomicU64,
130 pub bytes: AtomicU64,
131 pub frames: AtomicU64,
132 pub groups: AtomicU64,
133}
134
135impl Counters {
136 fn snapshot(&self) -> RawCounts {
144 let announced_closed = self.announced_closed.load(Ordering::Acquire);
145 let subscriptions_closed = self.subscriptions_closed.load(Ordering::Acquire);
146 let announced = self.announced.load(Ordering::Relaxed);
147 let subscriptions = self.subscriptions.load(Ordering::Relaxed);
148 let bytes = self.bytes.load(Ordering::Relaxed);
149 let frames = self.frames.load(Ordering::Relaxed);
150 let groups = self.groups.load(Ordering::Relaxed);
151 RawCounts {
152 announced,
153 announced_closed,
154 subscriptions,
155 subscriptions_closed,
156 bytes,
157 frames,
158 groups,
159 }
160 }
161}
162
163#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
167struct RawCounts {
168 announced: u64,
169 announced_closed: u64,
170 subscriptions: u64,
171 subscriptions_closed: u64,
172 bytes: u64,
173 frames: u64,
174 groups: u64,
175}
176
177#[derive(Copy, Clone, Debug, PartialEq, Eq)]
182pub enum Tier {
183 External,
184 Internal,
185}
186
187impl Tier {
188 fn idx(self) -> usize {
189 match self {
190 Tier::External => 0,
191 Tier::Internal => 1,
192 }
193 }
194}
195
196#[derive(Clone)]
210#[non_exhaustive]
211pub struct StatsConfig {
212 pub origin: Option<OriginProducer>,
215 pub prefix: PathOwned,
219 pub node: Option<PathOwned>,
225 pub interval: Duration,
227}
228
229impl StatsConfig {
230 pub fn new() -> Self {
234 Self {
235 origin: None,
236 prefix: PathOwned::from(".stats"),
237 node: None,
238 interval: Duration::from_secs(1),
239 }
240 }
241
242 pub fn with_origin(mut self, origin: impl Into<Option<OriginProducer>>) -> Self {
245 self.origin = origin.into();
246 self
247 }
248
249 pub fn with_prefix(mut self, prefix: impl Into<PathOwned>) -> Self {
251 self.prefix = prefix.into();
252 self
253 }
254
255 pub fn with_interval(mut self, interval: Duration) -> Self {
257 self.interval = interval;
258 self
259 }
260
261 pub fn with_node(mut self, node: impl Into<Option<PathOwned>>) -> Self {
263 self.node = node.into();
264 self
265 }
266}
267
268impl Default for StatsConfig {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274#[derive(Clone)]
278pub struct Stats {
279 prefix: PathOwned,
280 shared: Option<Arc<StatsShared>>,
283}
284
285struct StatsShared {
288 origin: OriginProducer,
289 entries: Lock<HashMap<PathOwned, Arc<BroadcastEntry>>>,
290}
291
292struct BroadcastEntry {
297 publisher: [Counters; 2],
298 subscriber: [Counters; 2],
299}
300
301impl BroadcastEntry {
302 fn new() -> Self {
303 Self {
304 publisher: Default::default(),
305 subscriber: Default::default(),
306 }
307 }
308}
309
310#[derive(Default)]
315struct SlotState {
316 derived_broadcasts: u64,
321 derived_broadcasts_closed: u64,
323 prev_emitted: Option<Snapshot>,
326}
327
328#[derive(Default)]
332struct EntrySnapState {
333 publisher: [SlotState; 2],
334 subscriber: [SlotState; 2],
335}
336
337impl EntrySnapState {
338 fn zip_slots<'a>(&'a mut self, entry: &'a BroadcastEntry) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] {
341 let [pub_ext_state, pub_int_state] = &mut self.publisher;
342 let [sub_ext_state, sub_int_state] = &mut self.subscriber;
343 [
344 ("publisher.json", &entry.publisher[Tier::External.idx()], pub_ext_state),
345 (
346 "subscriber.json",
347 &entry.subscriber[Tier::External.idx()],
348 sub_ext_state,
349 ),
350 (
351 "internal/publisher.json",
352 &entry.publisher[Tier::Internal.idx()],
353 pub_int_state,
354 ),
355 (
356 "internal/subscriber.json",
357 &entry.subscriber[Tier::Internal.idx()],
358 sub_int_state,
359 ),
360 ]
361 }
362}
363
364const NUM_SLOTS: usize = 4;
367
368const TRACK_ORDER: [&str; NUM_SLOTS] = [
371 "publisher.json",
372 "subscriber.json",
373 "internal/publisher.json",
374 "internal/subscriber.json",
375];
376
377impl Stats {
378 pub fn new(config: StatsConfig) -> Self {
386 let StatsConfig {
387 origin,
388 prefix,
389 node,
390 interval,
391 } = config;
392 let node = node.filter(|p| !p.is_empty());
397
398 let shared = origin.map(|origin| {
399 let shared = Arc::new(StatsShared {
400 origin,
401 entries: Lock::default(),
402 });
403 let advertised = advertised_path(&prefix, node.as_ref().map(|p| p.as_str()));
404 spawn(run_publisher(Arc::downgrade(&shared), advertised, interval));
405 shared
406 });
407
408 Self { prefix, shared }
409 }
410
411 pub fn prefix(&self) -> &Path<'static> {
413 &self.prefix
414 }
415
416 #[cfg(test)]
419 fn shared(&self) -> &Arc<StatsShared> {
420 self.shared.as_ref().expect("enabled stats aggregator")
421 }
422
423 pub fn tier(&self, tier: Tier) -> StatsHandle {
426 StatsHandle {
427 stats: self.clone(),
428 tier,
429 }
430 }
431
432 fn entry(&self, path: impl AsPath) -> Option<Arc<BroadcastEntry>> {
433 let shared = self.shared.as_ref()?;
435 let path = path.as_path();
436 if path.has_prefix(&self.prefix) {
440 return None;
441 }
442 let owned = path.to_owned();
443 let mut entries = shared.entries.lock();
444 Some(
445 entries
446 .entry(owned)
447 .or_insert_with(|| Arc::new(BroadcastEntry::new()))
448 .clone(),
449 )
450 }
451}
452
453impl Default for Stats {
454 fn default() -> Self {
455 Self::new(StatsConfig::new())
456 }
457}
458
459#[derive(Clone)]
462pub struct StatsHandle {
463 stats: Stats,
464 tier: Tier,
465}
466
467impl StatsHandle {
468 pub fn parent(&self) -> &Stats {
470 &self.stats
471 }
472
473 pub fn tier(&self) -> Tier {
475 self.tier
476 }
477
478 pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
484 BroadcastStats {
485 entry: self.stats.entry(path),
486 tier: self.tier,
487 }
488 }
489}
490
491impl Default for StatsHandle {
492 fn default() -> Self {
494 Stats::default().tier(Tier::External)
495 }
496}
497
498#[derive(Clone)]
505pub struct BroadcastStats {
506 entry: Option<Arc<BroadcastEntry>>,
507 tier: Tier,
508}
509
510impl BroadcastStats {
511 pub fn is_empty(&self) -> bool {
515 self.entry.is_none()
516 }
517
518 pub fn publisher(&self) -> PublisherStats {
523 if let Some(entry) = &self.entry {
524 entry.publisher[self.tier.idx()]
525 .announced
526 .fetch_add(1, Ordering::Relaxed);
527 }
528 PublisherStats {
529 entry: self.entry.clone(),
530 tier: self.tier,
531 }
532 }
533
534 pub fn subscriber(&self) -> SubscriberStats {
539 if let Some(entry) = &self.entry {
540 entry.subscriber[self.tier.idx()]
541 .announced
542 .fetch_add(1, Ordering::Relaxed);
543 }
544 SubscriberStats {
545 entry: self.entry.clone(),
546 tier: self.tier,
547 }
548 }
549
550 pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
556 if let Some(entry) = &self.entry {
557 entry.publisher[self.tier.idx()]
558 .subscriptions
559 .fetch_add(1, Ordering::Relaxed);
560 }
561 PublisherTrack {
562 entry: self.entry.clone(),
563 tier: self.tier,
564 }
565 }
566
567 pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
569 if let Some(entry) = &self.entry {
570 entry.subscriber[self.tier.idx()]
571 .subscriptions
572 .fetch_add(1, Ordering::Relaxed);
573 }
574 SubscriberTrack {
575 entry: self.entry.clone(),
576 tier: self.tier,
577 }
578 }
579}
580
581#[must_use = "drop the guard to record the broadcast as closed"]
583pub struct PublisherStats {
584 entry: Option<Arc<BroadcastEntry>>,
585 tier: Tier,
586}
587
588impl PublisherStats {
589 pub fn track(&self, name: &str) -> PublisherTrack {
592 BroadcastStats {
593 entry: self.entry.clone(),
594 tier: self.tier,
595 }
596 .publisher_track(name)
597 }
598}
599
600impl Drop for PublisherStats {
601 fn drop(&mut self) {
602 if let Some(entry) = &self.entry {
603 entry.publisher[self.tier.idx()]
607 .announced_closed
608 .fetch_add(1, Ordering::Release);
609 }
610 }
611}
612
613#[must_use = "drop the guard to record the broadcast as closed"]
615pub struct SubscriberStats {
616 entry: Option<Arc<BroadcastEntry>>,
617 tier: Tier,
618}
619
620impl SubscriberStats {
621 pub fn track(&self, name: &str) -> SubscriberTrack {
623 BroadcastStats {
624 entry: self.entry.clone(),
625 tier: self.tier,
626 }
627 .subscriber_track(name)
628 }
629}
630
631impl Drop for SubscriberStats {
632 fn drop(&mut self) {
633 if let Some(entry) = &self.entry {
634 entry.subscriber[self.tier.idx()]
636 .announced_closed
637 .fetch_add(1, Ordering::Release);
638 }
639 }
640}
641
642#[must_use = "drop the guard to record the subscription as closed"]
644pub struct PublisherTrack {
645 entry: Option<Arc<BroadcastEntry>>,
646 tier: Tier,
647}
648
649impl PublisherTrack {
650 pub fn frame(&self) {
652 if let Some(entry) = &self.entry {
653 entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
654 }
655 }
656
657 pub fn bytes(&self, n: u64) {
659 if let Some(entry) = &self.entry {
660 entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
661 }
662 }
663
664 pub fn group(&self) {
666 if let Some(entry) = &self.entry {
667 entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
668 }
669 }
670}
671
672impl Drop for PublisherTrack {
673 fn drop(&mut self) {
674 if let Some(entry) = &self.entry {
675 entry.publisher[self.tier.idx()]
677 .subscriptions_closed
678 .fetch_add(1, Ordering::Release);
679 }
680 }
681}
682
683#[must_use = "drop the guard to record the subscription as closed"]
685pub struct SubscriberTrack {
686 entry: Option<Arc<BroadcastEntry>>,
687 tier: Tier,
688}
689
690impl SubscriberTrack {
691 pub fn frame(&self) {
693 if let Some(entry) = &self.entry {
694 entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
695 }
696 }
697
698 pub fn bytes(&self, n: u64) {
700 if let Some(entry) = &self.entry {
701 entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
702 }
703 }
704
705 pub fn group(&self) {
707 if let Some(entry) = &self.entry {
708 entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
709 }
710 }
711}
712
713impl Drop for SubscriberTrack {
714 fn drop(&mut self) {
715 if let Some(entry) = &self.entry {
716 entry.subscriber[self.tier.idx()]
718 .subscriptions_closed
719 .fetch_add(1, Ordering::Release);
720 }
721 }
722}
723
724fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) {
729 let raw = counters.snapshot();
730
731 let (prev_subs, prev_subs_closed, prev_broadcasts, prev_broadcasts_closed) = match &slot_state.prev_emitted {
737 Some(prev) => (
738 prev.subscriptions,
739 prev.subscriptions_closed,
740 prev.broadcasts,
741 prev.broadcasts_closed,
742 ),
743 None => (0, 0, 0, 0),
744 };
745 let prev_active = prev_subs > prev_subs_closed;
746 let curr_active = raw.subscriptions > raw.subscriptions_closed;
747 let delta_subs = raw.subscriptions.saturating_sub(prev_subs);
748 let active_during = prev_active || curr_active || delta_subs > 0;
749
750 if !prev_active && active_during {
751 slot_state.derived_broadcasts = prev_broadcasts.saturating_add(1);
752 }
753 if active_during && !curr_active {
754 slot_state.derived_broadcasts_closed = prev_broadcasts_closed.saturating_add(1);
755 }
756
757 let snap = Snapshot {
758 announced: raw.announced,
759 announced_closed: raw.announced_closed,
760 broadcasts: slot_state.derived_broadcasts,
761 broadcasts_closed: slot_state.derived_broadcasts_closed,
762 subscriptions: raw.subscriptions,
763 subscriptions_closed: raw.subscriptions_closed,
764 bytes: raw.bytes,
765 frames: raw.frames,
766 groups: raw.groups,
767 };
768
769 let live = snap.announced != snap.announced_closed
776 || snap.subscriptions != snap.subscriptions_closed
777 || snap.broadcasts != snap.broadcasts_closed;
778
779 let prev_snap = slot_state.prev_emitted.unwrap_or_default();
790 let changed = snap != prev_snap;
791 if changed {
792 slot_state.prev_emitted = Some(snap);
793 }
794 if live || changed {
795 emit(snap);
796 }
797}
798
799async fn run_publisher(weak: Weak<StatsShared>, advertised: PathOwned, interval: Duration) {
803 let Some(shared) = weak.upgrade() else {
804 return;
805 };
806
807 let mut broadcast = Broadcast::new().produce();
808 let mut tracks: Vec<TrackProducer> = Vec::with_capacity(NUM_SLOTS);
809 for name in TRACK_ORDER {
810 match broadcast.create_track(Track {
811 name: name.into(),
812 priority: 0,
813 }) {
814 Ok(t) => tracks.push(t),
815 Err(err) => {
816 tracing::warn!(?err, name, "stats: failed to create track");
817 return;
818 }
819 }
820 }
821 if !shared.origin.publish_broadcast(&advertised, broadcast.consume()) {
822 tracing::warn!(advertised = %advertised, "stats: origin rejected stats broadcast");
823 return;
824 }
825 drop(shared);
826
827 let mut local: HashMap<PathOwned, EntrySnapState> = HashMap::new();
831 let mut last_payload: [Vec<u8>; NUM_SLOTS] = Default::default();
832
833 let mut ticker = tokio::time::interval(interval);
834 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
835
836 loop {
837 ticker.tick().await;
838
839 let Some(shared) = weak.upgrade() else {
840 return;
841 };
842
843 let entries: Vec<(PathOwned, Arc<BroadcastEntry>)> = {
846 let map = shared.entries.lock();
847 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
848 };
849
850 let mut frames: [BTreeMap<String, Snapshot>; NUM_SLOTS] = Default::default();
851 for (path, entry) in &entries {
852 let snap_state = local.entry(path.clone()).or_default();
853 for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() {
854 process_slot(counters, slot_state, |snap| {
855 frames[i].insert(path.as_str().to_string(), snap);
856 });
857 }
858 }
859 drop(entries);
860
861 {
871 let mut map = shared.entries.lock();
872 map.retain(|_, entry| Arc::strong_count(entry) > 1);
873 local.retain(|path, _| map.contains_key(path));
874 }
875
876 for (((frame, last), track), slot) in frames
877 .iter()
878 .zip(last_payload.iter_mut())
879 .zip(tracks.iter_mut())
880 .zip(0usize..)
881 {
882 let json = match serde_json::to_vec(frame) {
883 Ok(b) => b,
884 Err(err) => {
885 tracing::debug!(?err, slot, "stats: failed to serialize frame");
886 continue;
887 }
888 };
889 if &json == last {
890 continue;
891 }
892 if let Err(err) = track.write_frame(json.clone()) {
893 tracing::debug!(?err, slot, "stats: failed to write frame");
894 continue;
897 }
898 *last = json;
899 }
900
901 drop(shared);
902 }
903}
904
905#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
910#[cfg_attr(test, derive(serde::Deserialize))]
911struct Snapshot {
912 announced: u64,
913 announced_closed: u64,
914 broadcasts: u64,
915 broadcasts_closed: u64,
916 subscriptions: u64,
917 subscriptions_closed: u64,
918 bytes: u64,
919 frames: u64,
920 groups: u64,
921}
922
923fn advertised_path(prefix: &Path, node: Option<&str>) -> PathOwned {
924 let mut out = format!("{}/node", prefix.as_str());
927 if let Some(node) = node {
928 out.push('/');
929 out.push_str(node);
930 }
931 PathOwned::from(out)
932}
933
934#[cfg(test)]
935mod tests {
936 use std::{collections::BTreeMap, sync::atomic::Ordering::Relaxed};
937
938 use crate::{Origin, Path};
939
940 use super::*;
941
942 fn test_stats(node: Option<&str>) -> (Stats, OriginProducer) {
943 let origin = Origin::random().produce();
944 let stats = Stats::new(
945 StatsConfig::new()
946 .with_origin(origin.clone())
947 .with_node(node.map(|s| PathOwned::from(s.to_string()))),
948 );
949 (stats, origin)
950 }
951
952 #[test]
953 fn advertised_path_with_and_without_node() {
954 let prefix = Path::new(".stats");
955 assert_eq!(advertised_path(&prefix, Some("sjc")).as_str(), ".stats/node/sjc");
956 assert_eq!(advertised_path(&prefix, Some("sjc/1")).as_str(), ".stats/node/sjc/1");
957 assert_eq!(advertised_path(&prefix, None).as_str(), ".stats/node");
958
959 let prefix = Path::new("metrics");
960 assert_eq!(advertised_path(&prefix, Some("lon")).as_str(), "metrics/node/lon");
961 }
962
963 async fn announced_path_for_node(node: &str) -> String {
967 let origin = Origin::random().produce();
968 let _stats = Stats::new(
969 StatsConfig::new()
970 .with_origin(origin.clone())
971 .with_node(PathOwned::from(node.to_string())),
972 );
973 let mut consumer = origin.consume();
974 tokio::time::advance(Duration::from_millis(1)).await;
975 let (path, _broadcast) = consumer.announced().await.expect("expected announce");
976 path.as_str().to_string()
977 }
978
979 #[tokio::test(start_paused = true)]
980 async fn new_normalizes_and_drops_empty_node() {
981 assert_eq!(announced_path_for_node("/sjc//1/").await, ".stats/node/sjc/1");
982 assert_eq!(announced_path_for_node("///").await, ".stats/node");
983 }
984
985 #[tokio::test(start_paused = true)]
986 async fn per_broadcast_counters_isolated() {
987 let (stats, _origin) = test_stats(Some("sjc"));
989 let bs1 = stats.tier(Tier::External).broadcast("demo/bbb");
990 let bs2 = stats.tier(Tier::External).broadcast("demo/ccc");
991 let g1 = bs1.publisher().track("video");
992 g1.bytes(100);
993 let g2 = bs2.publisher().track("video");
994 g2.bytes(7);
995
996 let entries = stats.shared().entries.lock();
997 let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
998 let e2 = entries.get(&PathOwned::from("demo/ccc")).expect("entry");
999 assert_eq!(e1.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
1000 assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7);
1001 }
1002
1003 #[tokio::test(start_paused = true)]
1004 async fn external_and_internal_tiers_are_independent() {
1005 let (stats, _origin) = test_stats(Some("sjc"));
1006 let ext = stats.tier(Tier::External);
1007 let int = stats.tier(Tier::Internal);
1008
1009 let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
1010 ext_track.bytes(100);
1011 let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
1012 int_track.bytes(7);
1013
1014 let entries = stats.shared().entries.lock();
1015 let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
1016 assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
1017 assert_eq!(entry.subscriber[Tier::External.idx()].bytes.load(Relaxed), 0);
1018 assert_eq!(entry.publisher[Tier::Internal.idx()].bytes.load(Relaxed), 0);
1019 assert_eq!(entry.subscriber[Tier::Internal.idx()].bytes.load(Relaxed), 7);
1020 }
1021
1022 #[tokio::test(start_paused = true)]
1023 async fn paths_under_prefix_are_no_op() {
1024 let (stats, _origin) = test_stats(Some("sjc"));
1027 let bs = stats.tier(Tier::External).broadcast(".stats/node/sjc");
1028 assert!(bs.is_empty());
1029 let p = bs.publisher();
1030 let track = p.track("video");
1031 track.bytes(100);
1032 drop(track);
1033 drop(p);
1034 assert!(stats.shared().entries.lock().is_empty());
1035 }
1036
1037 #[tokio::test(start_paused = true)]
1038 async fn disabled_stats_are_noop() {
1039 let stats = Stats::default();
1042 assert!(stats.shared.is_none());
1043 let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1044 assert!(bs.is_empty());
1045 let p = bs.publisher();
1046 let track = p.track("video");
1047 track.bytes(100);
1048 drop(track);
1049 drop(p);
1050 }
1051
1052 #[tokio::test(start_paused = true)]
1053 async fn single_broadcast_path_announced() {
1054 let (stats, origin) = test_stats(Some("sjc/1"));
1057 let mut consumer = origin.consume();
1058
1059 let bs1 = stats.tier(Tier::External).broadcast("foo/bar");
1060 let _t1 = bs1.publisher().track("video");
1061 let bs2 = stats.tier(Tier::External).broadcast("baz/qux");
1062 let _t2 = bs2.publisher().track("video");
1063
1064 tokio::time::advance(Duration::from_millis(1)).await;
1065 let (path, broadcast) = consumer.announced().await.expect("expected announce");
1066 assert!(broadcast.is_some());
1067 assert_eq!(path.as_str(), ".stats/node/sjc/1");
1068 }
1069
1070 #[tokio::test(start_paused = true)]
1071 async fn task_announces_without_node_suffix() {
1072 let origin = Origin::random().produce();
1073 let stats = Stats::new(StatsConfig::new().with_origin(origin.clone()));
1074 let mut consumer = origin.consume();
1075
1076 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1077 let _t = bs.publisher().track("video");
1078
1079 tokio::time::advance(Duration::from_millis(1)).await;
1080 let (path, broadcast) = consumer.announced().await.expect("expected announce");
1081 assert!(broadcast.is_some());
1082 assert_eq!(path.as_str(), ".stats/node");
1083 }
1084
1085 async fn drive_ticks(count: u32) {
1091 for _ in 0..count {
1092 tokio::time::advance(Duration::from_secs(1)).await;
1093 for _ in 0..4 {
1096 tokio::task::yield_now().await;
1097 }
1098 }
1099 }
1100
1101 #[tokio::test(start_paused = true)]
1102 async fn live_entry_kept_while_idle() {
1103 let (stats, _origin) = test_stats(Some("sjc"));
1107 let key = PathOwned::from("foo/bar".to_string());
1108 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1109 let guard = bs.publisher();
1110
1111 drive_ticks(5).await;
1112 assert!(
1113 stats.shared().entries.lock().contains_key(&key),
1114 "announced-but-idle broadcast must stay while the guard is held"
1115 );
1116
1117 drop(guard);
1118 drop(bs);
1119 drive_ticks(1).await;
1122 assert!(
1123 !stats.shared().entries.lock().contains_key(&key),
1124 "entry dropped once the announce guard closes"
1125 );
1126 }
1127
1128 #[tokio::test(start_paused = true)]
1129 async fn entry_dropped_once_fully_closed() {
1130 let (stats, _origin) = test_stats(Some("sjc"));
1133 let key = PathOwned::from("foo/bar".to_string());
1134 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1135 let track = bs.publisher().track("video");
1136
1137 drive_ticks(1).await;
1138 assert!(
1139 stats.shared().entries.lock().contains_key(&key),
1140 "live entry present while the track guard is held"
1141 );
1142
1143 drop(track);
1144 drop(bs);
1145 drive_ticks(1).await;
1146 assert!(
1147 !stats.shared().entries.lock().contains_key(&key),
1148 "fully-closed entry dropped on the next tick"
1149 );
1150 }
1151
1152 #[tokio::test(start_paused = true)]
1153 async fn frame_emits_expected_counters() {
1154 let (stats, origin) = test_stats(Some("sjc"));
1155 let mut consumer = origin.consume();
1156 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1157 let track = bs.publisher().track("video");
1158 track.bytes(42);
1159 track.frame();
1160
1161 tokio::time::advance(Duration::from_millis(1100)).await;
1162
1163 let (_path, broadcast) = consumer.announced().await.expect("expected announce");
1164 let broadcast = broadcast.expect("active");
1165 let track = broadcast
1166 .subscribe_track(&Track {
1167 name: "publisher.json".into(),
1168 priority: 0,
1169 })
1170 .expect("subscribe");
1171 let frame = read_frame(track).await;
1172 let snap = frame.get("foo/bar").expect("foo/bar entry");
1173 assert_eq!(snap.announced, 1, "publisher() guard bumps announced");
1174 assert_eq!(snap.broadcasts, 1, "subs went 0->1, derived broadcasts++");
1175 assert_eq!(snap.subscriptions, 1);
1176 assert_eq!(snap.bytes, 42);
1177 assert_eq!(snap.frames, 1);
1178 }
1179
1180 #[tokio::test(start_paused = true)]
1181 async fn announced_decouples_from_broadcasts() {
1182 let (stats, origin) = test_stats(Some("sjc"));
1185 let mut consumer = origin.consume();
1186 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1187 let _guard = bs.publisher();
1188
1189 tokio::time::advance(Duration::from_millis(1100)).await;
1190
1191 let (_path, broadcast) = consumer.announced().await.expect("announce");
1192 let broadcast = broadcast.expect("active");
1193 let track = broadcast
1194 .subscribe_track(&Track {
1195 name: "publisher.json".into(),
1196 priority: 0,
1197 })
1198 .expect("subscribe");
1199 let frame = read_frame(track).await;
1200 let snap = frame.get("foo/bar").expect("foo/bar entry");
1201 assert_eq!(snap.announced, 1);
1202 assert_eq!(snap.broadcasts, 0, "no sub, no derived broadcasts");
1203 assert_eq!(snap.subscriptions, 0);
1204 }
1205
1206 #[tokio::test(start_paused = true)]
1207 async fn short_lived_sub_is_surfaced() {
1208 let (stats, origin) = test_stats(Some("sjc"));
1213 let mut consumer = origin.consume();
1214 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1215 {
1216 let track = bs.publisher().track("video");
1217 track.bytes(123);
1218 track.frame();
1219 }
1221
1222 tokio::time::advance(Duration::from_millis(1100)).await;
1223
1224 let (_path, broadcast) = consumer.announced().await.expect("announce");
1225 let broadcast = broadcast.expect("active");
1226 let track = broadcast
1227 .subscribe_track(&Track {
1228 name: "publisher.json".into(),
1229 priority: 0,
1230 })
1231 .expect("subscribe");
1232 let frame = read_frame(track).await;
1233 let snap = frame.get("foo/bar").expect("foo/bar entry");
1234 assert_eq!(snap.subscriptions, 1);
1237 assert_eq!(snap.subscriptions_closed, 1);
1238 assert_eq!(snap.broadcasts, 1, "flicker counts as one broadcast");
1239 assert_eq!(snap.broadcasts_closed, 1);
1240 assert_eq!(snap.bytes, 123);
1241 assert_eq!(snap.frames, 1);
1242 }
1243
1244 #[tokio::test(start_paused = true)]
1245 async fn multiple_subs_count_as_one_broadcast() {
1246 let (stats, _origin) = test_stats(Some("sjc"));
1251 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1252 let pub_guard = bs.publisher();
1253 let t1 = pub_guard.track("video");
1254 let t2 = pub_guard.track("audio");
1255
1256 drive_ticks(2).await;
1257 {
1258 let entries = stats.shared().entries.lock();
1259 let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
1260 let raw = entry.publisher[Tier::External.idx()].snapshot();
1261 assert_eq!(raw.subscriptions, 2, "two track subs");
1262 assert_eq!(raw.subscriptions_closed, 0, "neither dropped yet");
1263 }
1264
1265 drop(t1);
1269 drop(t2);
1270 drive_ticks(1).await;
1271
1272 let entries = stats.shared().entries.lock();
1278 let entry = entries
1279 .get(&PathOwned::from("foo/bar"))
1280 .expect("entry still live (publisher guard held)");
1281 let raw = entry.publisher[Tier::External.idx()].snapshot();
1282 assert_eq!(raw.subscriptions, 2);
1283 assert_eq!(raw.subscriptions_closed, 2, "both dropped");
1284 }
1285
1286 #[tokio::test(start_paused = true)]
1287 async fn unused_slots_dont_surface() {
1288 let (stats, origin) = test_stats(Some("sjc"));
1294 let mut consumer = origin.consume();
1295 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1296 let track = bs.publisher().track("video");
1297 track.frame();
1298
1299 drive_ticks(2).await;
1300
1301 let (_path, broadcast) = consumer.announced().await.expect("announce");
1302 let broadcast = broadcast.expect("active");
1303
1304 let pub_track = broadcast
1306 .subscribe_track(&Track {
1307 name: "publisher.json".into(),
1308 priority: 0,
1309 })
1310 .expect("subscribe");
1311 assert!(
1312 read_frame(pub_track).await.contains_key("foo/bar"),
1313 "publisher.json must include the active foo/bar entry"
1314 );
1315
1316 for name in ["subscriber.json", "internal/publisher.json", "internal/subscriber.json"] {
1319 let t = broadcast
1320 .subscribe_track(&Track {
1321 name: name.into(),
1322 priority: 0,
1323 })
1324 .expect("subscribe");
1325 let frame = read_frame(t).await;
1326 assert!(
1327 frame.is_empty(),
1328 "{name} must be empty for an entry with no activity on that slot, got {frame:?}",
1329 );
1330 }
1331 }
1332
1333 #[test]
1334 fn snapshot_reads_closed_before_open() {
1335 let src = include_str!("stats.rs");
1341 let body_start = src
1344 .find("fn snapshot(&self) -> RawCounts")
1345 .expect("snapshot fn present");
1346 let body = &src[body_start..];
1347 let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load");
1348 let open_pos = body.find("self.announced.load(").expect("announced load");
1349 assert!(
1350 closed_pos < open_pos,
1351 "announced_closed must be loaded before announced; reversing breaks the open>=closed invariant",
1352 );
1353 let subs_closed_pos = body
1354 .find("self.subscriptions_closed.load")
1355 .expect("subscriptions_closed load");
1356 let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load");
1357 assert!(
1358 subs_closed_pos < subs_pos,
1359 "subscriptions_closed must be loaded before subscriptions",
1360 );
1361 }
1362
1363 async fn read_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, Snapshot> {
1364 let bytes = track.read_frame().await.expect("ok").expect("frame");
1365 serde_json::from_slice(&bytes).expect("json parse")
1366 }
1367}