1use std::{
59 collections::HashMap,
60 sync::{
61 Arc, Weak,
62 atomic::{AtomicU64, Ordering},
63 },
64 time::{Duration, SystemTime, UNIX_EPOCH},
65};
66
67use serde::Serialize;
68use web_async::{Lock, spawn};
69
70use crate::{AsPath, Broadcast, Origin, OriginProducer, Path, PathOwned, Track};
71
/// Atomic counters for one tier/role combination of a stats level.
///
/// Within this file the fields are only ever incremented (`fetch_add`);
/// openings and closings are tracked in separate counters rather than by
/// decrementing.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Counters {
    /// Number of broadcast guards created.
    pub broadcasts: AtomicU64,
    /// Number of broadcast guards dropped.
    pub broadcasts_closed: AtomicU64,
    /// Number of track (subscription) guards created.
    pub subscriptions: AtomicU64,
    /// Number of track (subscription) guards dropped.
    pub subscriptions_closed: AtomicU64,
    /// Total bytes reported via the track guards' `bytes(n)`.
    pub bytes: AtomicU64,
    /// Total frames reported via the track guards' `frame()`.
    pub frames: AtomicU64,
    /// Total groups reported via the track guards' `group()`.
    pub groups: AtomicU64,
}
84
85impl Counters {
86 fn snapshot(&self) -> Snapshot {
87 Snapshot {
88 broadcasts: self.broadcasts.load(Ordering::Relaxed),
89 broadcasts_closed: self.broadcasts_closed.load(Ordering::Relaxed),
90 subscriptions: self.subscriptions.load(Ordering::Relaxed),
91 subscriptions_closed: self.subscriptions_closed.load(Ordering::Relaxed),
92 bytes: self.bytes.load(Ordering::Relaxed),
93 frames: self.frames.load(Ordering::Relaxed),
94 groups: self.groups.load(Ordering::Relaxed),
95 }
96 }
97
98 fn active(&self) -> bool {
99 self.subscriptions.load(Ordering::Relaxed) > self.subscriptions_closed.load(Ordering::Relaxed)
100 }
101}
102
/// Which side of the deployment a counter is attributed to.
///
/// The tier selects one of the two counter families kept per level and is
/// emitted as the `tier` field of every snapshot frame.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tier {
    /// Serialized as `"external"`.
    External,
    /// Serialized as `"internal"`.
    Internal,
}

impl Tier {
    /// Lower-case label used in serialized snapshot frames.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Internal => "internal",
            Self::External => "external",
        }
    }
}
120
/// Direction of traffic a counter set describes.
///
/// Together with [`Tier`] this picks one of the four counter sets of a level.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Role {
    /// Serialized as `"publisher"`.
    Publisher,
    /// Serialized as `"subscriber"`.
    Subscriber,
}

impl Role {
    /// Lower-case label used in serialized snapshot frames.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Subscriber => "subscriber",
            Self::Publisher => "publisher",
        }
    }
}
135
/// Entry point of the stats registry.
///
/// Cloning is cheap: all clones share the same `Arc`-wrapped state, including
/// the table of per-level counters.
#[derive(Clone)]
pub struct Stats {
    /// Shared configuration and level table.
    inner: Arc<StatsInner>,
}
142
/// State shared by every clone of [`Stats`].
struct StatsInner {
    /// Path prefix under which stats broadcasts are advertised; broadcasts
    /// whose own path has this prefix are not tracked.
    prefix: PathOwned,
    /// Maximum number of leading path segments that get their own level
    /// (in addition to the root level); `0` yields no levels at all.
    levels: u32,
    /// Optional node name, appended to advertised paths and included in
    /// snapshot frames.
    node: Option<String>,
    /// Origin the per-level stats broadcasts are published to.
    origin: OriginProducer,
    /// Lazily populated table of levels, keyed by level path.
    entries: Lock<HashMap<PathOwned, Arc<Level>>>,
}
150
/// Counters and publishing state for one aggregation level (a path prefix).
struct Level {
    /// Full path the stats broadcast for this level is published under.
    advertised: PathOwned,
    /// Counters for `Tier::External` / `Role::Publisher`.
    external_publisher: Counters,
    /// Counters for `Tier::External` / `Role::Subscriber`.
    external_subscriber: Counters,
    /// Counters for `Tier::Internal` / `Role::Publisher`.
    internal_publisher: Counters,
    /// Counters for `Tier::Internal` / `Role::Subscriber`.
    internal_subscriber: Counters,
    /// `Some(())` while a publisher task is considered to be running for this
    /// level; set by `ensure_task` and reset when the task gives up or exits.
    task: Lock<Option<()>>,
    /// Origin the publisher task publishes this level's broadcast to.
    origin: OriginProducer,
    /// Optional node name copied into every snapshot frame.
    node: Option<String>,
    /// Key of this level (the aggregated path prefix; empty for the root).
    level_key: PathOwned,
}
162
163impl Level {
164 fn counters(&self, tier: Tier, role: Role) -> &Counters {
165 match (tier, role) {
166 (Tier::External, Role::Publisher) => &self.external_publisher,
167 (Tier::External, Role::Subscriber) => &self.external_subscriber,
168 (Tier::Internal, Role::Publisher) => &self.internal_publisher,
169 (Tier::Internal, Role::Subscriber) => &self.internal_subscriber,
170 }
171 }
172
173 fn any_active(&self) -> bool {
174 self.external_publisher.active()
175 || self.external_subscriber.active()
176 || self.internal_publisher.active()
177 || self.internal_subscriber.active()
178 }
179}
180
181impl Stats {
182 pub fn new(
199 prefix: impl Into<PathOwned>,
200 levels: u32,
201 node: impl Into<Option<String>>,
202 origin: OriginProducer,
203 ) -> Self {
204 Self {
205 inner: Arc::new(StatsInner {
206 prefix: prefix.into(),
207 levels,
208 node: node.into(),
209 origin,
210 entries: Lock::default(),
211 }),
212 }
213 }
214
215 pub fn disabled() -> Self {
219 Self {
223 inner: Arc::new(StatsInner {
224 prefix: PathOwned::default(),
225 levels: 0,
226 node: None,
227 origin: Origin::random().produce(),
228 entries: Lock::default(),
229 }),
230 }
231 }
232
233 pub fn prefix(&self) -> &Path<'static> {
235 &self.inner.prefix
236 }
237
238 pub fn tier(&self, tier: Tier) -> StatsHandle {
241 StatsHandle {
242 stats: self.clone(),
243 tier,
244 }
245 }
246
247 fn broadcast_levels(&self, path: impl AsPath) -> Arc<[Arc<Level>]> {
248 let path = path.as_path();
249 if path.has_prefix(&self.inner.prefix) {
252 return Arc::from([]);
253 }
254
255 let keys = level_keys(&path, self.inner.levels);
256 let mut entries = self.inner.entries.lock();
257 let arcs: Vec<Arc<Level>> = keys
258 .into_iter()
259 .map(|key| {
260 entries
261 .entry(key.clone())
262 .or_insert_with(|| {
263 let advertised = advertised_path(&self.inner.prefix, &key, self.inner.node.as_deref());
264 Arc::new(Level {
265 advertised,
266 external_publisher: Counters::default(),
267 external_subscriber: Counters::default(),
268 internal_publisher: Counters::default(),
269 internal_subscriber: Counters::default(),
270 task: Lock::new(None),
271 origin: self.inner.origin.clone(),
272 node: self.inner.node.clone(),
273 level_key: key,
274 })
275 })
276 .clone()
277 })
278 .collect();
279
280 arcs.into()
281 }
282}
283
284impl Default for Stats {
285 fn default() -> Self {
286 Self::disabled()
287 }
288}
289
/// A [`Stats`] registry bound to one [`Tier`].
///
/// Everything recorded through guards derived from this handle is attributed
/// to that tier.
#[derive(Clone)]
pub struct StatsHandle {
    /// The shared registry.
    stats: Stats,
    /// Tier used for every counter update made via this handle.
    tier: Tier,
}
297
298impl StatsHandle {
299 pub fn disabled() -> Self {
301 Stats::disabled().tier(Tier::External)
302 }
303
304 pub fn parent(&self) -> &Stats {
306 &self.stats
307 }
308
309 pub fn tier(&self) -> Tier {
311 self.tier
312 }
313
314 pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
321 BroadcastStats {
322 levels: self.stats.broadcast_levels(path),
323 tier: self.tier,
324 }
325 }
326}
327
328impl Default for StatsHandle {
329 fn default() -> Self {
330 Self::disabled()
331 }
332}
333
/// The levels one broadcast path contributes to, together with the tier to
/// attribute updates to.
///
/// The level list is empty when nothing is tracked for the path.
#[derive(Clone)]
pub struct BroadcastStats {
    /// Levels whose counters are bumped for this broadcast.
    levels: Arc<[Arc<Level>]>,
    /// Tier selecting which counter family is bumped.
    tier: Tier,
}
344
345impl BroadcastStats {
346 pub fn is_empty(&self) -> bool {
350 self.levels.is_empty()
351 }
352
353 pub fn publisher(&self) -> PublisherStats {
356 for level in self.levels.iter() {
357 level
358 .counters(self.tier, Role::Publisher)
359 .broadcasts
360 .fetch_add(1, Ordering::Relaxed);
361 }
362 PublisherStats {
363 levels: self.levels.clone(),
364 tier: self.tier,
365 }
366 }
367
368 pub fn subscriber(&self) -> SubscriberStats {
371 for level in self.levels.iter() {
372 level
373 .counters(self.tier, Role::Subscriber)
374 .broadcasts
375 .fetch_add(1, Ordering::Relaxed);
376 }
377 SubscriberStats {
378 levels: self.levels.clone(),
379 tier: self.tier,
380 }
381 }
382
383 pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
392 for level in self.levels.iter() {
393 level
394 .counters(self.tier, Role::Publisher)
395 .subscriptions
396 .fetch_add(1, Ordering::Relaxed);
397 ensure_task(level);
398 }
399 PublisherTrack {
400 levels: self.levels.clone(),
401 tier: self.tier,
402 }
403 }
404
405 pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
407 for level in self.levels.iter() {
408 level
409 .counters(self.tier, Role::Subscriber)
410 .subscriptions
411 .fetch_add(1, Ordering::Relaxed);
412 ensure_task(level);
413 }
414 SubscriberTrack {
415 levels: self.levels.clone(),
416 tier: self.tier,
417 }
418 }
419}
420
/// Guard representing one publisher-side broadcast.
///
/// Dropping it records the broadcast as closed on every level.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct PublisherStats {
    /// Levels this broadcast is counted on.
    levels: Arc<[Arc<Level>]>,
    /// Tier the counters are attributed to.
    tier: Tier,
}
427
428impl PublisherStats {
429 pub fn track(&self, name: &str) -> PublisherTrack {
433 BroadcastStats {
434 levels: self.levels.clone(),
435 tier: self.tier,
436 }
437 .publisher_track(name)
438 }
439}
440
441impl Drop for PublisherStats {
442 fn drop(&mut self) {
443 for level in self.levels.iter() {
444 level
445 .counters(self.tier, Role::Publisher)
446 .broadcasts_closed
447 .fetch_add(1, Ordering::Relaxed);
448 }
449 }
450}
451
/// Guard representing one subscriber-side broadcast.
///
/// Dropping it records the broadcast as closed on every level.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct SubscriberStats {
    /// Levels this broadcast is counted on.
    levels: Arc<[Arc<Level>]>,
    /// Tier the counters are attributed to.
    tier: Tier,
}
458
459impl SubscriberStats {
460 pub fn track(&self, name: &str) -> SubscriberTrack {
462 BroadcastStats {
463 levels: self.levels.clone(),
464 tier: self.tier,
465 }
466 .subscriber_track(name)
467 }
468}
469
470impl Drop for SubscriberStats {
471 fn drop(&mut self) {
472 for level in self.levels.iter() {
473 level
474 .counters(self.tier, Role::Subscriber)
475 .broadcasts_closed
476 .fetch_add(1, Ordering::Relaxed);
477 }
478 }
479}
480
/// Guard representing one publisher-side track subscription.
///
/// Traffic is reported through its methods; dropping it records the
/// subscription as closed on every level.
#[must_use = "drop the guard to record the subscription as closed"]
pub struct PublisherTrack {
    /// Levels this subscription is counted on.
    levels: Arc<[Arc<Level>]>,
    /// Tier the counters are attributed to.
    tier: Tier,
}
487
488impl PublisherTrack {
489 pub fn frame(&self) {
491 for level in self.levels.iter() {
492 level
493 .counters(self.tier, Role::Publisher)
494 .frames
495 .fetch_add(1, Ordering::Relaxed);
496 }
497 }
498
499 pub fn bytes(&self, n: u64) {
501 for level in self.levels.iter() {
502 level
503 .counters(self.tier, Role::Publisher)
504 .bytes
505 .fetch_add(n, Ordering::Relaxed);
506 }
507 }
508
509 pub fn group(&self) {
511 for level in self.levels.iter() {
512 level
513 .counters(self.tier, Role::Publisher)
514 .groups
515 .fetch_add(1, Ordering::Relaxed);
516 }
517 }
518}
519
520impl Drop for PublisherTrack {
521 fn drop(&mut self) {
522 for level in self.levels.iter() {
523 level
524 .counters(self.tier, Role::Publisher)
525 .subscriptions_closed
526 .fetch_add(1, Ordering::Relaxed);
527 }
528 }
529}
530
/// Guard representing one subscriber-side track subscription.
///
/// Traffic is reported through its methods; dropping it records the
/// subscription as closed on every level.
#[must_use = "drop the guard to record the subscription as closed"]
pub struct SubscriberTrack {
    /// Levels this subscription is counted on.
    levels: Arc<[Arc<Level>]>,
    /// Tier the counters are attributed to.
    tier: Tier,
}
537
538impl SubscriberTrack {
539 pub fn frame(&self) {
541 for level in self.levels.iter() {
542 level
543 .counters(self.tier, Role::Subscriber)
544 .frames
545 .fetch_add(1, Ordering::Relaxed);
546 }
547 }
548
549 pub fn bytes(&self, n: u64) {
551 for level in self.levels.iter() {
552 level
553 .counters(self.tier, Role::Subscriber)
554 .bytes
555 .fetch_add(n, Ordering::Relaxed);
556 }
557 }
558
559 pub fn group(&self) {
561 for level in self.levels.iter() {
562 level
563 .counters(self.tier, Role::Subscriber)
564 .groups
565 .fetch_add(1, Ordering::Relaxed);
566 }
567 }
568}
569
570impl Drop for SubscriberTrack {
571 fn drop(&mut self) {
572 for level in self.levels.iter() {
573 level
574 .counters(self.tier, Role::Subscriber)
575 .subscriptions_closed
576 .fetch_add(1, Ordering::Relaxed);
577 }
578 }
579}
580
581fn ensure_task(level: &Arc<Level>) {
582 let mut slot = level.task.lock();
583 if slot.is_none() {
584 *slot = Some(());
585 let weak = Arc::downgrade(level);
586 spawn(run_publisher(weak));
587 }
588}
589
/// Background task that publishes one level's counters as a stats broadcast.
///
/// It creates a broadcast with four JSON tracks (one per tier/role pair),
/// publishes it on the level's origin under `level.advertised`, then once per
/// second writes a snapshot frame to each track whose counters changed since
/// the last write. The task ends when the level has been dropped or when no
/// counter set has an open subscription any more.
async fn run_publisher(weak: Weak<Level>) {
    // The setup runs in its own block so the strong `level` reference taken
    // here is released before the loop; the loop re-upgrades on every tick.
    let setup = {
        let Some(level) = weak.upgrade() else {
            return;
        };
        let mut broadcast = Broadcast::new().produce();
        let mut make = |name: &str| {
            broadcast.create_track(Track {
                name: name.into(),
                priority: 0,
            })
        };
        // On any setup failure the task slot is cleared so that a later
        // `ensure_task` call can spawn a new task.
        let ext_pub = match make("publisher.json") {
            Ok(t) => t,
            Err(err) => {
                tracing::warn!(?err, "stats: failed to create publisher.json");
                clear_task(&level);
                return;
            }
        };
        let ext_sub = match make("subscriber.json") {
            Ok(t) => t,
            Err(err) => {
                tracing::warn!(?err, "stats: failed to create subscriber.json");
                clear_task(&level);
                return;
            }
        };
        let int_pub = match make("internal/publisher.json") {
            Ok(t) => t,
            Err(err) => {
                tracing::warn!(?err, "stats: failed to create internal/publisher.json");
                clear_task(&level);
                return;
            }
        };
        let int_sub = match make("internal/subscriber.json") {
            Ok(t) => t,
            Err(err) => {
                tracing::warn!(?err, "stats: failed to create internal/subscriber.json");
                clear_task(&level);
                return;
            }
        };
        if !level.origin.publish_broadcast(&level.advertised, broadcast.consume()) {
            tracing::warn!(level = %level.advertised, "stats: origin rejected stats broadcast");
            clear_task(&level);
            return;
        }
        (broadcast, ext_pub, ext_sub, int_pub, int_sub)
    };
    let (broadcast, mut ext_pub, mut ext_sub, mut int_pub, mut int_sub) = setup;

    // Last snapshot written per track; `None` forces the first write.
    let mut last_ext_pub: Option<Snapshot> = None;
    let mut last_ext_sub: Option<Snapshot> = None;
    let mut last_int_pub: Option<Snapshot> = None;
    let mut last_int_sub: Option<Snapshot> = None;

    let mut tick = tokio::time::interval(Duration::from_secs(1));
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tick.tick().await;

        // The level is gone: nothing left to report.
        let Some(level) = weak.upgrade() else {
            return;
        };

        if !level.any_active() {
            let mut slot = level.task.lock();
            // Re-check while holding the slot lock. `ensure_task` takes the
            // same lock after its caller has bumped `subscriptions`, so a new
            // subscription either keeps this task alive or finds the slot
            // cleared and spawns a replacement.
            if !level.any_active() {
                *slot = None;
                drop(slot);
                drop(level);
                // Releases the broadcast producer; the tests in this file
                // expect the origin to report an unannounce after this.
                drop(broadcast);
                return;
            }
        }

        maybe_write(&mut ext_pub, Tier::External, Role::Publisher, &level, &mut last_ext_pub);
        maybe_write(
            &mut ext_sub,
            Tier::External,
            Role::Subscriber,
            &level,
            &mut last_ext_sub,
        );
        maybe_write(&mut int_pub, Tier::Internal, Role::Publisher, &level, &mut last_int_pub);
        maybe_write(
            &mut int_sub,
            Tier::Internal,
            Role::Subscriber,
            &level,
            &mut last_int_sub,
        );
    }
}
692
693fn maybe_write(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, last: &mut Option<Snapshot>) {
694 let snapshot = level.counters(tier, role).snapshot();
695 if last.as_ref() == Some(&snapshot) {
696 return;
697 }
698 write_snapshot(track, tier, role, level, snapshot);
699 *last = Some(snapshot);
700}
701
702fn clear_task(level: &Level) {
703 *level.task.lock() = None;
704}
705
/// Plain-integer copy of a [`Counters`] set at one point in time.
///
/// Used both for change detection (`PartialEq`) and as the counter part of a
/// serialized snapshot frame.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
struct Snapshot {
    /// Value of `Counters::broadcasts`.
    broadcasts: u64,
    /// Value of `Counters::broadcasts_closed`.
    broadcasts_closed: u64,
    /// Value of `Counters::subscriptions`.
    subscriptions: u64,
    /// Value of `Counters::subscriptions_closed`.
    subscriptions_closed: u64,
    /// Value of `Counters::bytes`.
    bytes: u64,
    /// Value of `Counters::frames`.
    frames: u64,
    /// Value of `Counters::groups`.
    groups: u64,
}
716
/// JSON payload of one stats frame: metadata plus the flattened counters.
#[derive(Debug, Serialize)]
struct SnapshotFrame<'a> {
    /// Payload format version.
    v: u32,
    /// Key of the level the counters belong to.
    level: &'a str,
    /// Tier label (`"external"` or `"internal"`).
    tier: &'a str,
    /// Role label (`"publisher"` or `"subscriber"`).
    role: &'a str,
    /// Node name; left out of the JSON entirely when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    node: Option<&'a str>,
    /// Wall-clock time of the snapshot, in milliseconds since the Unix epoch.
    ts_ms: u64,
    /// Counter values, serialized inline as top-level fields.
    #[serde(flatten)]
    snapshot: Snapshot,
}
729
730fn write_snapshot(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, snapshot: Snapshot) {
731 let frame = SnapshotFrame {
732 v: 1,
733 level: level.level_key.as_str(),
734 tier: tier.as_str(),
735 role: role.as_str(),
736 node: level.node.as_deref(),
737 ts_ms: now_ms(),
738 snapshot,
739 };
740
741 let buf = match serde_json::to_vec(&frame) {
742 Ok(buf) => buf,
743 Err(err) => {
744 tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to serialize snapshot");
745 return;
746 }
747 };
748
749 if let Err(err) = track.write_frame(buf) {
750 tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to write snapshot frame");
751 }
752}
753
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock is set before the epoch. The `u128`
/// millisecond count is clamped to `u64::MAX` before narrowing, so the cast
/// can never silently wrap.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Clamp first: after `min` the value always fits in a `u64`.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
760
761fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
769 if levels == 0 {
770 return Vec::new();
771 }
772 if broadcast.is_empty() {
773 return vec![PathOwned::default()];
774 }
775
776 let segs: Vec<&str> = broadcast.as_str().split('/').collect();
777 let max = (levels as usize).min(segs.len());
778 (0..=max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
779}
780
781fn advertised_path(prefix: &Path, level_key: &Path, node: Option<&str>) -> PathOwned {
782 let top = prefix.as_str();
785 let mut out = format!("{top}/prefix");
786 if !level_key.is_empty() {
787 out.push('/');
788 out.push_str(level_key.as_str());
789 }
790 if let Some(node) = node {
791 out.push('/');
792 out.push_str(node);
793 }
794 PathOwned::from(out)
795}
796
/// Unit tests for key derivation, path construction, counter attribution and
/// the lifecycle of the per-level publisher task.
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering::Relaxed;

    use crate::{Origin, Path};

    use super::*;

    /// `level_keys` yields the root key plus up to `levels` leading segments.
    #[test]
    fn level_keys_basic() {
        // Helper: run `level_keys` and collect the keys as plain strings.
        let key = |s: &str, n: u32| {
            level_keys(&Path::new(s), n)
                .into_iter()
                .map(|p| p.as_str().to_string())
                .collect::<Vec<_>>()
        };

        // One level: root plus the first segment.
        assert_eq!(key("demo/bbb", 1), vec!["", "demo"]);
        // Two levels cover the whole two-segment path.
        assert_eq!(key("demo/bbb", 2), vec!["", "demo", "demo/bbb"]);
        // Asking for more levels than segments is capped at the path length.
        assert_eq!(key("demo/bbb", 3), vec!["", "demo", "demo/bbb"]);
        // Longer paths are truncated to the configured depth.
        assert_eq!(key("a/b/c/d", 3), vec!["", "a", "a/b", "a/b/c"]);
        // Single-segment path.
        assert_eq!(key("demo", 2), vec!["", "demo"]);
        // Zero levels: nothing is tracked.
        assert!(key("demo/bbb", 0).is_empty());
    }

    /// With a node name, the node is the last path component at every depth.
    #[test]
    fn advertised_path_root_and_nested() {
        let prefix = Path::new(".stats");
        assert_eq!(
            advertised_path(&prefix, &Path::new(""), Some("sjc")).as_str(),
            ".stats/prefix/sjc"
        );
        assert_eq!(
            advertised_path(&prefix, &Path::new("demo"), Some("sjc")).as_str(),
            ".stats/prefix/demo/sjc"
        );
        assert_eq!(
            advertised_path(&prefix, &Path::new("demo/foo"), Some("sjc")).as_str(),
            ".stats/prefix/demo/foo/sjc"
        );
    }

    /// Without a node name, no trailing component is appended.
    #[test]
    fn advertised_path_without_node() {
        let prefix = Path::new(".stats");
        assert_eq!(advertised_path(&prefix, &Path::new(""), None).as_str(), ".stats/prefix");
        assert_eq!(
            advertised_path(&prefix, &Path::new("demo"), None).as_str(),
            ".stats/prefix/demo"
        );
    }

    /// The configured prefix replaces `.stats` as the leading component.
    #[test]
    fn advertised_path_honors_custom_prefix() {
        let prefix = Path::new("metrics");
        assert_eq!(
            advertised_path(&prefix, &Path::new(""), Some("lon")).as_str(),
            "metrics/prefix/lon"
        );
        assert_eq!(
            advertised_path(&prefix, &Path::new("demo/room"), Some("lon")).as_str(),
            "metrics/prefix/demo/room/lon"
        );
    }

    /// An external publisher guard and its track only touch the
    /// external/publisher counters, including the `*_closed` ones on drop.
    #[tokio::test(start_paused = true)]
    async fn external_publisher_bumps_external_publisher_counters() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin);
        let bs = stats.tier(Tier::External).broadcast("demo/bbb");
        let pub_role = bs.publisher();
        let track = pub_role.track("video");
        track.frame();
        track.bytes(100);
        track.group();
        // Dropping the guards bumps the matching `*_closed` counters.
        drop(track);
        drop(pub_role);

        let entries = stats.inner.entries.lock();
        let root = entries.get(&PathOwned::from("")).expect("root level");
        assert_eq!(root.external_publisher.frames.load(Relaxed), 1);
        assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
        assert_eq!(root.external_publisher.groups.load(Relaxed), 1);
        assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
        assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
        assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 1);
        assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 1);
        // The other three counter sets stay untouched.
        assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
        assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
        assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
    }

    /// An external subscriber guard only touches the external/subscriber
    /// counters.
    #[tokio::test(start_paused = true)]
    async fn external_subscriber_bumps_external_subscriber_counters() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin);
        let bs = stats.tier(Tier::External).broadcast("demo/bbb");
        let sub_role = bs.subscriber();
        let track = sub_role.track("video");
        track.frame();
        track.bytes(50);

        let entries = stats.inner.entries.lock();
        let root = entries.get(&PathOwned::from("")).expect("root level");
        assert_eq!(root.external_subscriber.frames.load(Relaxed), 1);
        assert_eq!(root.external_subscriber.bytes.load(Relaxed), 50);
        assert_eq!(root.external_subscriber.broadcasts.load(Relaxed), 1);
        assert_eq!(root.external_subscriber.subscriptions.load(Relaxed), 1);
        assert_eq!(root.external_publisher.bytes.load(Relaxed), 0);
        assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
    }

    /// Handles for different tiers on the same registry write to separate
    /// counter sets.
    #[tokio::test(start_paused = true)]
    async fn external_and_internal_tiers_are_independent() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin);
        let ext = stats.tier(Tier::External);
        let int = stats.tier(Tier::Internal);

        let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
        ext_track.bytes(100);
        let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
        int_track.bytes(7);

        let entries = stats.inner.entries.lock();
        let root = entries.get(&PathOwned::from("")).expect("root level");
        assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
        assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
        assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
        assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 7);
    }

    /// A single update is applied to every level of the broadcast path.
    #[tokio::test(start_paused = true)]
    async fn bumps_fanout_to_all_levels() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin);
        let bs = stats.tier(Tier::External).broadcast("demo/bbb");
        let p = bs.publisher();
        let track = p.track("video");
        track.bytes(100);

        let entries = stats.inner.entries.lock();
        let root = entries.get(&PathOwned::from("")).expect("root level");
        let demo = entries.get(&PathOwned::from("demo")).expect("demo level");
        assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
        assert_eq!(demo.external_publisher.bytes.load(Relaxed), 100);
    }

    /// Broadcasts under the stats prefix itself are not tracked.
    #[tokio::test(start_paused = true)]
    async fn paths_under_prefix_are_no_op() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin);
        let bs = stats.tier(Tier::External).broadcast(".stats/prefix/sjc");
        assert!(bs.is_empty());

        // Guards derived from an empty scope must still be usable.
        let p = bs.publisher();
        let track = p.track("video");
        track.bytes(100);
        track.frame();
        track.group();
        drop(track);
        drop(p);

        // No level entry was created along the way.
        assert!(stats.inner.entries.lock().is_empty());
    }

    /// `publisher_track` counts a subscription without counting a broadcast.
    #[tokio::test(start_paused = true)]
    async fn publisher_track_does_not_bump_broadcasts() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin);
        let bs = stats.tier(Tier::External).broadcast("demo/bbb");
        let track = bs.publisher_track("video");
        track.bytes(10);
        drop(track);

        let entries = stats.inner.entries.lock();
        let root = entries.get(&PathOwned::from("")).expect("root level");
        assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 0);
        assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 0);
        assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
        assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
        assert_eq!(root.external_publisher.bytes.load(Relaxed), 10);
    }

    /// A disabled registry yields empty scopes and never creates entries.
    #[tokio::test(start_paused = true)]
    async fn disabled_stats_are_noop() {
        let stats = Stats::disabled();
        let bs = stats.tier(Tier::External).broadcast("demo/bbb");
        assert!(bs.is_empty());
        let p = bs.publisher();
        let track = p.track("video");
        track.bytes(100);
        drop(track);
        drop(p);
        assert!(stats.inner.entries.lock().is_empty());
    }

    /// Opening the first track spawns one publisher task per level, each of
    /// which announces its stats broadcast on the origin.
    #[tokio::test(start_paused = true)]
    async fn task_spawns_on_first_subscribe_and_announces() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin.clone());
        let mut consumer = origin.consume();

        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let p = bs.publisher();
        // Keep the track guard alive so the levels stay active.
        let _track = p.track("video");

        // Let the spawned tasks run (the clock is paused).
        tokio::time::advance(Duration::from_millis(1)).await;
        let mut seen = std::collections::HashSet::new();
        // One level configured: root plus `foo` gives two announces.
        for _ in 0..2 {
            let (path, broadcast) = consumer.announced().await.expect("expected announce");
            assert!(broadcast.is_some());
            seen.insert(path.as_str().to_string());
        }
        assert!(seen.contains(".stats/prefix/sjc"));
        assert!(seen.contains(".stats/prefix/foo/sjc"));
    }

    /// With two levels and a node name, three broadcasts are announced, each
    /// ending in the node name.
    #[tokio::test(start_paused = true)]
    async fn task_spawns_with_node_suffix() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin.clone());
        let mut consumer = origin.consume();

        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let p = bs.publisher();
        let _track = p.track("video");

        tokio::time::advance(Duration::from_millis(1)).await;
        let mut seen = std::collections::HashSet::new();
        // Root, `foo` and `foo/bar`.
        for _ in 0..3 {
            let (path, broadcast) = consumer.announced().await.expect("expected announce");
            assert!(broadcast.is_some());
            seen.insert(path.as_str().to_string());
        }
        assert!(seen.contains(".stats/prefix/sjc"));
        assert!(seen.contains(".stats/prefix/foo/sjc"));
        assert!(seen.contains(".stats/prefix/foo/bar/sjc"));
    }

    /// Without a node name, the advertised paths carry no trailing component.
    #[tokio::test(start_paused = true)]
    async fn task_spawns_without_node_suffix() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 1, None, origin.clone());
        let mut consumer = origin.consume();

        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let p = bs.publisher();
        let _track = p.track("video");

        tokio::time::advance(Duration::from_millis(1)).await;
        let mut seen = std::collections::HashSet::new();
        for _ in 0..2 {
            let (path, broadcast) = consumer.announced().await.expect("expected announce");
            assert!(broadcast.is_some());
            seen.insert(path.as_str().to_string());
        }
        assert!(seen.contains(".stats/prefix"));
        assert!(seen.contains(".stats/prefix/foo"));
    }

    /// Once every subscription is closed, the publisher tasks exit and their
    /// broadcasts are unannounced.
    #[tokio::test(start_paused = true)]
    async fn task_exits_when_all_roles_idle() {
        let origin = Origin::random().produce();
        let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin.clone());
        let mut consumer = origin.consume();

        let bs = stats.tier(Tier::External).broadcast("foo/bar");
        let p = bs.publisher();
        let track = p.track("video");

        tokio::time::advance(Duration::from_millis(1)).await;
        let mut announced: Vec<String> = Vec::new();
        for _ in 0..2 {
            let (path, broadcast) = consumer.announced().await.expect("expected announce");
            assert!(broadcast.is_some(), "expected an active announce");
            announced.push(path.as_str().to_string());
        }
        announced.sort();
        assert_eq!(announced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);

        // Close everything; the levels become idle.
        drop(track);
        drop(p);
        drop(bs);

        // Advance past the next one-second tick so the tasks notice.
        tokio::time::advance(Duration::from_secs(2)).await;
        let mut unannounced: Vec<String> = Vec::new();
        for _ in 0..2 {
            let (path, broadcast) = consumer.announced().await.expect("expected unannounce");
            assert!(broadcast.is_none(), "expected an unannounce");
            unannounced.push(path.as_str().to_string());
        }
        unannounced.sort();
        assert_eq!(unannounced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
    }
}