1use std::{
61 collections::HashMap,
62 sync::{
63 Arc, Weak,
64 atomic::{AtomicU64, Ordering},
65 },
66 time::{Duration, SystemTime, UNIX_EPOCH},
67};
68
69use serde::Serialize;
70use web_async::{Lock, spawn};
71
72use crate::{AsPath, Broadcast, Origin, OriginProducer, Path, PathOwned, Track};
73
74#[derive(Default, Debug)]
76#[non_exhaustive]
77pub struct Counters {
78 pub broadcasts: AtomicU64,
79 pub broadcasts_closed: AtomicU64,
80 pub subscriptions: AtomicU64,
81 pub subscriptions_closed: AtomicU64,
82 pub bytes: AtomicU64,
83 pub frames: AtomicU64,
84 pub groups: AtomicU64,
85}
86
87impl Counters {
88 fn snapshot(&self) -> Snapshot {
89 Snapshot {
90 broadcasts: self.broadcasts.load(Ordering::Relaxed),
91 broadcasts_closed: self.broadcasts_closed.load(Ordering::Relaxed),
92 subscriptions: self.subscriptions.load(Ordering::Relaxed),
93 subscriptions_closed: self.subscriptions_closed.load(Ordering::Relaxed),
94 bytes: self.bytes.load(Ordering::Relaxed),
95 frames: self.frames.load(Ordering::Relaxed),
96 groups: self.groups.load(Ordering::Relaxed),
97 }
98 }
99
100 fn active(&self) -> bool {
101 self.subscriptions.load(Ordering::Relaxed) > self.subscriptions_closed.load(Ordering::Relaxed)
102 }
103}
104
105#[derive(Copy, Clone, Debug, PartialEq, Eq)]
109pub enum Tier {
110 External,
111 Internal,
112}
113
114impl Tier {
115 fn as_str(&self) -> &'static str {
116 match self {
117 Tier::External => "external",
118 Tier::Internal => "internal",
119 }
120 }
121}
122
123#[derive(Copy, Clone, Debug, PartialEq, Eq)]
124enum Role {
125 Publisher,
126 Subscriber,
127}
128
129impl Role {
130 fn as_str(&self) -> &'static str {
131 match self {
132 Role::Publisher => "publisher",
133 Role::Subscriber => "subscriber",
134 }
135 }
136}
137
138#[derive(Clone)]
141pub struct Stats {
142 inner: Arc<StatsInner>,
143}
144
145struct StatsInner {
146 prefix: PathOwned,
147 levels: u32,
148 node: Option<PathOwned>,
149 origin: OriginProducer,
150 entries: Lock<HashMap<PathOwned, Arc<Level>>>,
151}
152
153struct Level {
154 advertised: PathOwned,
155 external_publisher: Counters,
156 external_subscriber: Counters,
157 internal_publisher: Counters,
158 internal_subscriber: Counters,
159 task: Lock<Option<()>>,
160 origin: OriginProducer,
161 node: Option<PathOwned>,
162 level_key: PathOwned,
163}
164
165impl Level {
166 fn counters(&self, tier: Tier, role: Role) -> &Counters {
167 match (tier, role) {
168 (Tier::External, Role::Publisher) => &self.external_publisher,
169 (Tier::External, Role::Subscriber) => &self.external_subscriber,
170 (Tier::Internal, Role::Publisher) => &self.internal_publisher,
171 (Tier::Internal, Role::Subscriber) => &self.internal_subscriber,
172 }
173 }
174
175 fn any_active(&self) -> bool {
176 self.external_publisher.active()
177 || self.external_subscriber.active()
178 || self.internal_publisher.active()
179 || self.internal_subscriber.active()
180 }
181}
182
183impl Stats {
184 pub fn new(
204 prefix: impl Into<PathOwned>,
205 levels: u32,
206 node: impl Into<Option<PathOwned>>,
207 origin: OriginProducer,
208 ) -> Self {
209 let node = node.into().filter(|p| !p.is_empty());
212 Self {
213 inner: Arc::new(StatsInner {
214 prefix: prefix.into(),
215 levels,
216 node,
217 origin,
218 entries: Lock::default(),
219 }),
220 }
221 }
222
223 pub fn disabled() -> Self {
227 Self {
231 inner: Arc::new(StatsInner {
232 prefix: PathOwned::default(),
233 levels: 0,
234 node: None,
235 origin: Origin::random().produce(),
236 entries: Lock::default(),
237 }),
238 }
239 }
240
241 pub fn prefix(&self) -> &Path<'static> {
243 &self.inner.prefix
244 }
245
246 pub fn tier(&self, tier: Tier) -> StatsHandle {
249 StatsHandle {
250 stats: self.clone(),
251 tier,
252 }
253 }
254
255 fn broadcast_levels(&self, path: impl AsPath) -> Arc<[Arc<Level>]> {
256 let path = path.as_path();
257 if path.has_prefix(&self.inner.prefix) {
260 return Arc::from([]);
261 }
262
263 let keys = level_keys(&path, self.inner.levels);
264 let mut entries = self.inner.entries.lock();
265 let arcs: Vec<Arc<Level>> = keys
266 .into_iter()
267 .map(|key| {
268 entries
269 .entry(key.clone())
270 .or_insert_with(|| {
271 let advertised = advertised_path(&self.inner.prefix, &key, self.inner.node.as_ref());
272 Arc::new(Level {
273 advertised,
274 external_publisher: Counters::default(),
275 external_subscriber: Counters::default(),
276 internal_publisher: Counters::default(),
277 internal_subscriber: Counters::default(),
278 task: Lock::new(None),
279 origin: self.inner.origin.clone(),
280 node: self.inner.node.clone(),
281 level_key: key,
282 })
283 })
284 .clone()
285 })
286 .collect();
287
288 arcs.into()
289 }
290}
291
292impl Default for Stats {
293 fn default() -> Self {
294 Self::disabled()
295 }
296}
297
298#[derive(Clone)]
301pub struct StatsHandle {
302 stats: Stats,
303 tier: Tier,
304}
305
306impl StatsHandle {
307 pub fn disabled() -> Self {
309 Stats::disabled().tier(Tier::External)
310 }
311
312 pub fn parent(&self) -> &Stats {
314 &self.stats
315 }
316
317 pub fn tier(&self) -> Tier {
319 self.tier
320 }
321
322 pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
329 BroadcastStats {
330 levels: self.stats.broadcast_levels(path),
331 tier: self.tier,
332 }
333 }
334}
335
336impl Default for StatsHandle {
337 fn default() -> Self {
338 Self::disabled()
339 }
340}
341
342#[derive(Clone)]
348pub struct BroadcastStats {
349 levels: Arc<[Arc<Level>]>,
350 tier: Tier,
351}
352
353impl BroadcastStats {
354 pub fn is_empty(&self) -> bool {
358 self.levels.is_empty()
359 }
360
361 pub fn publisher(&self) -> PublisherStats {
364 for level in self.levels.iter() {
365 level
366 .counters(self.tier, Role::Publisher)
367 .broadcasts
368 .fetch_add(1, Ordering::Relaxed);
369 }
370 PublisherStats {
371 levels: self.levels.clone(),
372 tier: self.tier,
373 }
374 }
375
376 pub fn subscriber(&self) -> SubscriberStats {
379 for level in self.levels.iter() {
380 level
381 .counters(self.tier, Role::Subscriber)
382 .broadcasts
383 .fetch_add(1, Ordering::Relaxed);
384 }
385 SubscriberStats {
386 levels: self.levels.clone(),
387 tier: self.tier,
388 }
389 }
390
391 pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
400 for level in self.levels.iter() {
401 level
402 .counters(self.tier, Role::Publisher)
403 .subscriptions
404 .fetch_add(1, Ordering::Relaxed);
405 ensure_task(level);
406 }
407 PublisherTrack {
408 levels: self.levels.clone(),
409 tier: self.tier,
410 }
411 }
412
413 pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
415 for level in self.levels.iter() {
416 level
417 .counters(self.tier, Role::Subscriber)
418 .subscriptions
419 .fetch_add(1, Ordering::Relaxed);
420 ensure_task(level);
421 }
422 SubscriberTrack {
423 levels: self.levels.clone(),
424 tier: self.tier,
425 }
426 }
427}
428
429#[must_use = "drop the guard to record the broadcast as closed"]
431pub struct PublisherStats {
432 levels: Arc<[Arc<Level>]>,
433 tier: Tier,
434}
435
436impl PublisherStats {
437 pub fn track(&self, name: &str) -> PublisherTrack {
441 BroadcastStats {
442 levels: self.levels.clone(),
443 tier: self.tier,
444 }
445 .publisher_track(name)
446 }
447}
448
449impl Drop for PublisherStats {
450 fn drop(&mut self) {
451 for level in self.levels.iter() {
452 level
453 .counters(self.tier, Role::Publisher)
454 .broadcasts_closed
455 .fetch_add(1, Ordering::Relaxed);
456 }
457 }
458}
459
460#[must_use = "drop the guard to record the broadcast as closed"]
462pub struct SubscriberStats {
463 levels: Arc<[Arc<Level>]>,
464 tier: Tier,
465}
466
467impl SubscriberStats {
468 pub fn track(&self, name: &str) -> SubscriberTrack {
470 BroadcastStats {
471 levels: self.levels.clone(),
472 tier: self.tier,
473 }
474 .subscriber_track(name)
475 }
476}
477
478impl Drop for SubscriberStats {
479 fn drop(&mut self) {
480 for level in self.levels.iter() {
481 level
482 .counters(self.tier, Role::Subscriber)
483 .broadcasts_closed
484 .fetch_add(1, Ordering::Relaxed);
485 }
486 }
487}
488
489#[must_use = "drop the guard to record the subscription as closed"]
491pub struct PublisherTrack {
492 levels: Arc<[Arc<Level>]>,
493 tier: Tier,
494}
495
496impl PublisherTrack {
497 pub fn frame(&self) {
499 for level in self.levels.iter() {
500 level
501 .counters(self.tier, Role::Publisher)
502 .frames
503 .fetch_add(1, Ordering::Relaxed);
504 }
505 }
506
507 pub fn bytes(&self, n: u64) {
509 for level in self.levels.iter() {
510 level
511 .counters(self.tier, Role::Publisher)
512 .bytes
513 .fetch_add(n, Ordering::Relaxed);
514 }
515 }
516
517 pub fn group(&self) {
519 for level in self.levels.iter() {
520 level
521 .counters(self.tier, Role::Publisher)
522 .groups
523 .fetch_add(1, Ordering::Relaxed);
524 }
525 }
526}
527
528impl Drop for PublisherTrack {
529 fn drop(&mut self) {
530 for level in self.levels.iter() {
531 level
532 .counters(self.tier, Role::Publisher)
533 .subscriptions_closed
534 .fetch_add(1, Ordering::Relaxed);
535 }
536 }
537}
538
539#[must_use = "drop the guard to record the subscription as closed"]
541pub struct SubscriberTrack {
542 levels: Arc<[Arc<Level>]>,
543 tier: Tier,
544}
545
546impl SubscriberTrack {
547 pub fn frame(&self) {
549 for level in self.levels.iter() {
550 level
551 .counters(self.tier, Role::Subscriber)
552 .frames
553 .fetch_add(1, Ordering::Relaxed);
554 }
555 }
556
557 pub fn bytes(&self, n: u64) {
559 for level in self.levels.iter() {
560 level
561 .counters(self.tier, Role::Subscriber)
562 .bytes
563 .fetch_add(n, Ordering::Relaxed);
564 }
565 }
566
567 pub fn group(&self) {
569 for level in self.levels.iter() {
570 level
571 .counters(self.tier, Role::Subscriber)
572 .groups
573 .fetch_add(1, Ordering::Relaxed);
574 }
575 }
576}
577
578impl Drop for SubscriberTrack {
579 fn drop(&mut self) {
580 for level in self.levels.iter() {
581 level
582 .counters(self.tier, Role::Subscriber)
583 .subscriptions_closed
584 .fetch_add(1, Ordering::Relaxed);
585 }
586 }
587}
588
589fn ensure_task(level: &Arc<Level>) {
590 let mut slot = level.task.lock();
591 if slot.is_none() {
592 *slot = Some(());
593 let weak = Arc::downgrade(level);
594 spawn(run_publisher(weak));
595 }
596}
597
598async fn run_publisher(weak: Weak<Level>) {
599 let setup = {
600 let Some(level) = weak.upgrade() else {
601 return;
602 };
603 let mut broadcast = Broadcast::new().produce();
604 let mut make = |name: &str| {
605 broadcast.create_track(Track {
606 name: name.into(),
607 priority: 0,
608 })
609 };
610 let ext_pub = match make("publisher.json") {
611 Ok(t) => t,
612 Err(err) => {
613 tracing::warn!(?err, "stats: failed to create publisher.json");
614 clear_task(&level);
615 return;
616 }
617 };
618 let ext_sub = match make("subscriber.json") {
619 Ok(t) => t,
620 Err(err) => {
621 tracing::warn!(?err, "stats: failed to create subscriber.json");
622 clear_task(&level);
623 return;
624 }
625 };
626 let int_pub = match make("internal/publisher.json") {
627 Ok(t) => t,
628 Err(err) => {
629 tracing::warn!(?err, "stats: failed to create internal/publisher.json");
630 clear_task(&level);
631 return;
632 }
633 };
634 let int_sub = match make("internal/subscriber.json") {
635 Ok(t) => t,
636 Err(err) => {
637 tracing::warn!(?err, "stats: failed to create internal/subscriber.json");
638 clear_task(&level);
639 return;
640 }
641 };
642 if !level.origin.publish_broadcast(&level.advertised, broadcast.consume()) {
643 tracing::warn!(level = %level.advertised, "stats: origin rejected stats broadcast");
644 clear_task(&level);
645 return;
646 }
647 (broadcast, ext_pub, ext_sub, int_pub, int_sub)
648 };
649 let (broadcast, mut ext_pub, mut ext_sub, mut int_pub, mut int_sub) = setup;
650
651 let mut last_ext_pub: Option<Snapshot> = None;
652 let mut last_ext_sub: Option<Snapshot> = None;
653 let mut last_int_pub: Option<Snapshot> = None;
654 let mut last_int_sub: Option<Snapshot> = None;
655
656 let mut tick = tokio::time::interval(Duration::from_secs(1));
657 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
658
659 loop {
660 tick.tick().await;
661
662 let Some(level) = weak.upgrade() else {
663 return;
664 };
665
666 if !level.any_active() {
667 let mut slot = level.task.lock();
671 if !level.any_active() {
672 *slot = None;
673 drop(slot);
674 drop(level);
675 drop(broadcast);
678 return;
679 }
680 }
681
682 maybe_write(&mut ext_pub, Tier::External, Role::Publisher, &level, &mut last_ext_pub);
683 maybe_write(
684 &mut ext_sub,
685 Tier::External,
686 Role::Subscriber,
687 &level,
688 &mut last_ext_sub,
689 );
690 maybe_write(&mut int_pub, Tier::Internal, Role::Publisher, &level, &mut last_int_pub);
691 maybe_write(
692 &mut int_sub,
693 Tier::Internal,
694 Role::Subscriber,
695 &level,
696 &mut last_int_sub,
697 );
698 }
699}
700
701fn maybe_write(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, last: &mut Option<Snapshot>) {
702 let snapshot = level.counters(tier, role).snapshot();
703 if last.as_ref() == Some(&snapshot) {
704 return;
705 }
706 write_snapshot(track, tier, role, level, snapshot);
707 *last = Some(snapshot);
708}
709
710fn clear_task(level: &Level) {
711 *level.task.lock() = None;
712}
713
714#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
715struct Snapshot {
716 broadcasts: u64,
717 broadcasts_closed: u64,
718 subscriptions: u64,
719 subscriptions_closed: u64,
720 bytes: u64,
721 frames: u64,
722 groups: u64,
723}
724
725#[derive(Debug, Serialize)]
726struct SnapshotFrame<'a> {
727 v: u32,
728 level: &'a str,
729 tier: &'a str,
730 role: &'a str,
731 #[serde(skip_serializing_if = "Option::is_none")]
732 node: Option<&'a str>,
733 ts_ms: u64,
734 #[serde(flatten)]
735 snapshot: Snapshot,
736}
737
738fn write_snapshot(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, snapshot: Snapshot) {
739 let frame = SnapshotFrame {
740 v: 1,
741 level: level.level_key.as_str(),
742 tier: tier.as_str(),
743 role: role.as_str(),
744 node: level.node.as_ref().map(|p| p.as_str()),
745 ts_ms: now_ms(),
746 snapshot,
747 };
748
749 let buf = match serde_json::to_vec(&frame) {
750 Ok(buf) => buf,
751 Err(err) => {
752 tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to serialize snapshot");
753 return;
754 }
755 };
756
757 if let Err(err) = track.write_frame(buf) {
758 tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to write snapshot frame");
759 }
760}
761
762fn now_ms() -> u64 {
763 SystemTime::now()
764 .duration_since(UNIX_EPOCH)
765 .map(|d| d.as_millis() as u64)
766 .unwrap_or(0)
767}
768
769fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
777 if levels == 0 {
778 return Vec::new();
779 }
780 if broadcast.is_empty() {
781 return vec![PathOwned::default()];
782 }
783
784 let segs: Vec<&str> = broadcast.as_str().split('/').collect();
785 let max = (levels as usize).min(segs.len());
786 (0..=max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
787}
788
789fn advertised_path(prefix: &Path, level_key: &Path, node: Option<&Path>) -> PathOwned {
790 let top = prefix.as_str();
793 let mut out = format!("{top}/prefix");
794 if !level_key.is_empty() {
795 out.push('/');
796 out.push_str(level_key.as_str());
797 }
798 if let Some(node) = node {
799 out.push('/');
802 out.push_str(node.as_str());
803 }
804 PathOwned::from(out)
805}
806
807#[cfg(test)]
808mod tests {
809 use std::sync::atomic::Ordering::Relaxed;
810
811 use crate::{Origin, Path};
812
813 use super::*;
814
815 #[test]
816 fn level_keys_basic() {
817 let key = |s: &str, n: u32| {
818 level_keys(&Path::new(s), n)
819 .into_iter()
820 .map(|p| p.as_str().to_string())
821 .collect::<Vec<_>>()
822 };
823
824 assert_eq!(key("demo/bbb", 1), vec!["", "demo"]);
828 assert_eq!(key("demo/bbb", 2), vec!["", "demo", "demo/bbb"]);
830 assert_eq!(key("demo/bbb", 3), vec!["", "demo", "demo/bbb"]);
833 assert_eq!(key("a/b/c/d", 3), vec!["", "a", "a/b", "a/b/c"]);
835 assert_eq!(key("demo", 2), vec!["", "demo"]);
837 assert!(key("demo/bbb", 0).is_empty());
839 }
840
841 #[test]
842 fn advertised_path_root_and_nested() {
843 let prefix = Path::new(".stats");
844 let node = PathOwned::from("sjc");
845 assert_eq!(
846 advertised_path(&prefix, &Path::new(""), Some(&node)).as_str(),
847 ".stats/prefix/sjc"
848 );
849 assert_eq!(
850 advertised_path(&prefix, &Path::new("demo"), Some(&node)).as_str(),
851 ".stats/prefix/demo/sjc"
852 );
853 assert_eq!(
854 advertised_path(&prefix, &Path::new("demo/foo"), Some(&node)).as_str(),
855 ".stats/prefix/demo/foo/sjc"
856 );
857 }
858
859 #[test]
860 fn advertised_path_without_node() {
861 let prefix = Path::new(".stats");
862 assert_eq!(advertised_path(&prefix, &Path::new(""), None).as_str(), ".stats/prefix");
863 assert_eq!(
864 advertised_path(&prefix, &Path::new("demo"), None).as_str(),
865 ".stats/prefix/demo"
866 );
867 }
868
869 #[test]
870 fn advertised_path_honors_custom_prefix() {
871 let prefix = Path::new("metrics");
872 let node = PathOwned::from("lon");
873 assert_eq!(
874 advertised_path(&prefix, &Path::new(""), Some(&node)).as_str(),
875 "metrics/prefix/lon"
876 );
877 assert_eq!(
878 advertised_path(&prefix, &Path::new("demo/room"), Some(&node)).as_str(),
879 "metrics/prefix/demo/room/lon"
880 );
881 }
882
883 #[test]
884 fn advertised_path_multi_segment_node() {
885 let prefix = Path::new(".stats");
886 let node = PathOwned::from("sjc/1");
887 assert_eq!(
888 advertised_path(&prefix, &Path::new(""), Some(&node)).as_str(),
889 ".stats/prefix/sjc/1"
890 );
891 assert_eq!(
892 advertised_path(&prefix, &Path::new("demo"), Some(&node)).as_str(),
893 ".stats/prefix/demo/sjc/1"
894 );
895 assert_eq!(
896 advertised_path(&prefix, &Path::new("demo/foo"), Some(&node)).as_str(),
897 ".stats/prefix/demo/foo/sjc/1"
898 );
899 let node2 = PathOwned::from("sjc/2");
902 assert_eq!(
903 advertised_path(&prefix, &Path::new("demo"), Some(&node2)).as_str(),
904 ".stats/prefix/demo/sjc/2"
905 );
906 }
907
908 #[test]
909 fn new_normalizes_and_drops_empty_node() {
910 let origin = Origin::random().produce();
911
912 let stats = Stats::new(".stats", 1, Some(PathOwned::from("/sjc//1/")), origin.clone());
914 assert_eq!(stats.inner.node.as_ref().unwrap().as_str(), "sjc/1");
915
916 let stats = Stats::new(".stats", 1, Some(PathOwned::from("")), origin.clone());
919 assert!(stats.inner.node.is_none());
920 let stats = Stats::new(".stats", 1, Some(PathOwned::from("///")), origin);
921 assert!(stats.inner.node.is_none());
922 }
923
924 #[tokio::test(start_paused = true)]
925 async fn external_publisher_bumps_external_publisher_counters() {
926 let origin = Origin::random().produce();
927 let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin);
928 let bs = stats.tier(Tier::External).broadcast("demo/bbb");
929 let pub_role = bs.publisher();
930 let track = pub_role.track("video");
931 track.frame();
932 track.bytes(100);
933 track.group();
934 drop(track);
935 drop(pub_role);
936
937 let entries = stats.inner.entries.lock();
938 let root = entries.get(&PathOwned::from("")).expect("root level");
939 assert_eq!(root.external_publisher.frames.load(Relaxed), 1);
940 assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
941 assert_eq!(root.external_publisher.groups.load(Relaxed), 1);
942 assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
943 assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
944 assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 1);
945 assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 1);
946 assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
948 assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
949 assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
950 }
951
952 #[tokio::test(start_paused = true)]
953 async fn external_subscriber_bumps_external_subscriber_counters() {
954 let origin = Origin::random().produce();
955 let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin);
956 let bs = stats.tier(Tier::External).broadcast("demo/bbb");
957 let sub_role = bs.subscriber();
958 let track = sub_role.track("video");
959 track.frame();
960 track.bytes(50);
961
962 let entries = stats.inner.entries.lock();
963 let root = entries.get(&PathOwned::from("")).expect("root level");
964 assert_eq!(root.external_subscriber.frames.load(Relaxed), 1);
965 assert_eq!(root.external_subscriber.bytes.load(Relaxed), 50);
966 assert_eq!(root.external_subscriber.broadcasts.load(Relaxed), 1);
967 assert_eq!(root.external_subscriber.subscriptions.load(Relaxed), 1);
968 assert_eq!(root.external_publisher.bytes.load(Relaxed), 0);
969 assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
970 }
971
972 #[tokio::test(start_paused = true)]
973 async fn external_and_internal_tiers_are_independent() {
974 let origin = Origin::random().produce();
975 let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin);
976 let ext = stats.tier(Tier::External);
977 let int = stats.tier(Tier::Internal);
978
979 let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
980 ext_track.bytes(100);
981 let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
982 int_track.bytes(7);
983
984 let entries = stats.inner.entries.lock();
985 let root = entries.get(&PathOwned::from("")).expect("root level");
986 assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
987 assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
988 assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
989 assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 7);
990 }
991
992 #[tokio::test(start_paused = true)]
993 async fn bumps_fanout_to_all_levels() {
994 let origin = Origin::random().produce();
995 let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin);
996 let bs = stats.tier(Tier::External).broadcast("demo/bbb");
997 let p = bs.publisher();
998 let track = p.track("video");
999 track.bytes(100);
1000
1001 let entries = stats.inner.entries.lock();
1002 let root = entries.get(&PathOwned::from("")).expect("root level");
1003 let demo = entries.get(&PathOwned::from("demo")).expect("demo level");
1004 assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
1005 assert_eq!(demo.external_publisher.bytes.load(Relaxed), 100);
1006 }
1007
1008 #[tokio::test(start_paused = true)]
1009 async fn paths_under_prefix_are_no_op() {
1010 let origin = Origin::random().produce();
1013 let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin);
1014 let bs = stats.tier(Tier::External).broadcast(".stats/prefix/sjc");
1015 assert!(bs.is_empty());
1016
1017 let p = bs.publisher();
1018 let track = p.track("video");
1019 track.bytes(100);
1020 track.frame();
1021 track.group();
1022 drop(track);
1023 drop(p);
1024
1025 assert!(stats.inner.entries.lock().is_empty());
1026 }
1027
1028 #[tokio::test(start_paused = true)]
1029 async fn publisher_track_does_not_bump_broadcasts() {
1030 let origin = Origin::random().produce();
1033 let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin);
1034 let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1035 let track = bs.publisher_track("video");
1036 track.bytes(10);
1037 drop(track);
1038
1039 let entries = stats.inner.entries.lock();
1040 let root = entries.get(&PathOwned::from("")).expect("root level");
1041 assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 0);
1042 assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 0);
1043 assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
1044 assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
1045 assert_eq!(root.external_publisher.bytes.load(Relaxed), 10);
1046 }
1047
1048 #[tokio::test(start_paused = true)]
1049 async fn disabled_stats_are_noop() {
1050 let stats = Stats::disabled();
1052 let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1053 assert!(bs.is_empty());
1054 let p = bs.publisher();
1055 let track = p.track("video");
1056 track.bytes(100);
1057 drop(track);
1058 drop(p);
1059 assert!(stats.inner.entries.lock().is_empty());
1060 }
1061
1062 #[tokio::test(start_paused = true)]
1063 async fn task_spawns_on_first_subscribe_and_announces() {
1064 let origin = Origin::random().produce();
1065 let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin.clone());
1066 let mut consumer = origin.consume();
1067
1068 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1069 let p = bs.publisher();
1070 let _track = p.track("video");
1071
1072 tokio::time::advance(Duration::from_millis(1)).await;
1073 let mut seen = std::collections::HashSet::new();
1075 for _ in 0..2 {
1076 let (path, broadcast) = consumer.announced().await.expect("expected announce");
1077 assert!(broadcast.is_some());
1078 seen.insert(path.as_str().to_string());
1079 }
1080 assert!(seen.contains(".stats/prefix/sjc"));
1081 assert!(seen.contains(".stats/prefix/foo/sjc"));
1082 }
1083
1084 #[tokio::test(start_paused = true)]
1085 async fn task_spawns_with_node_suffix() {
1086 let origin = Origin::random().produce();
1087 let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin.clone());
1088 let mut consumer = origin.consume();
1089
1090 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1091 let p = bs.publisher();
1092 let _track = p.track("video");
1093
1094 tokio::time::advance(Duration::from_millis(1)).await;
1095 let mut seen = std::collections::HashSet::new();
1098 for _ in 0..3 {
1099 let (path, broadcast) = consumer.announced().await.expect("expected announce");
1100 assert!(broadcast.is_some());
1101 seen.insert(path.as_str().to_string());
1102 }
1103 assert!(seen.contains(".stats/prefix/sjc"));
1104 assert!(seen.contains(".stats/prefix/foo/sjc"));
1105 assert!(seen.contains(".stats/prefix/foo/bar/sjc"));
1106 }
1107
1108 #[tokio::test(start_paused = true)]
1109 async fn task_spawns_without_node_suffix() {
1110 let origin = Origin::random().produce();
1112 let stats = Stats::new(".stats", 1, None, origin.clone());
1113 let mut consumer = origin.consume();
1114
1115 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1116 let p = bs.publisher();
1117 let _track = p.track("video");
1118
1119 tokio::time::advance(Duration::from_millis(1)).await;
1120 let mut seen = std::collections::HashSet::new();
1121 for _ in 0..2 {
1122 let (path, broadcast) = consumer.announced().await.expect("expected announce");
1123 assert!(broadcast.is_some());
1124 seen.insert(path.as_str().to_string());
1125 }
1126 assert!(seen.contains(".stats/prefix"));
1127 assert!(seen.contains(".stats/prefix/foo"));
1128 }
1129
1130 #[tokio::test(start_paused = true)]
1131 async fn task_exits_when_all_roles_idle() {
1132 let origin = Origin::random().produce();
1133 let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin.clone());
1137 let mut consumer = origin.consume();
1138
1139 let bs = stats.tier(Tier::External).broadcast("foo/bar");
1140 let p = bs.publisher();
1141 let track = p.track("video");
1142
1143 tokio::time::advance(Duration::from_millis(1)).await;
1144 let mut announced: Vec<String> = Vec::new();
1145 for _ in 0..2 {
1146 let (path, broadcast) = consumer.announced().await.expect("expected announce");
1147 assert!(broadcast.is_some(), "expected an active announce");
1148 announced.push(path.as_str().to_string());
1149 }
1150 announced.sort();
1151 assert_eq!(announced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
1152
1153 drop(track);
1154 drop(p);
1155 drop(bs);
1156
1157 tokio::time::advance(Duration::from_secs(2)).await;
1158 let mut unannounced: Vec<String> = Vec::new();
1159 for _ in 0..2 {
1160 let (path, broadcast) = consumer.announced().await.expect("expected unannounce");
1161 assert!(broadcast.is_none(), "expected an unannounce");
1162 unannounced.push(path.as_str().to_string());
1163 }
1164 unannounced.sort();
1165 assert_eq!(unannounced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
1166 }
1167
1168 }