Skip to main content

moq_net/
stats.rs

1//! Generic stats publishing for moq-net sessions.
2//!
3//! [`Stats`] aggregates per-broadcast counter bumps into per-prefix levels and
4//! publishes a `<top-prefix>/prefix/<level-path>/<node>` broadcast on a caller-provided
5//! [`OriginProducer`]. The `<node>` suffix is omitted when no node is configured.
6//! Each stats broadcast carries four tracks, one per `(tier, role)` pair:
7//!
8//! * `publisher.json`           : external (e.g. customer) egress
9//! * `subscriber.json`          : external ingress
10//! * `internal/publisher.json`  : internal (e.g. mTLS cluster peer) egress
11//! * `internal/subscriber.json` : internal ingress
12//!
13//! A caller hands each session a tier-scoped [`StatsHandle`] (built from the
14//! single shared [`Stats`] via [`Stats::tier`]) which determines which counter
15//! set its bumps land in. Multiple relays in the same cluster origin can
16//! coexist by giving each one a distinct `<node>` suffix on advertised paths.
17//!
18//! Each broadcast contributes to every prefix of its path (within the
19//! configured depth), so publishing `anon/bbb` with `levels = 2` produces:
20//!
21//! * `<top-prefix>/prefix/<node>`           (root aggregate)
22//! * `<top-prefix>/prefix/anon/<node>`      (per-first-segment aggregate)
23//! * `<top-prefix>/prefix/anon/bbb/<node>`  (per-broadcast)
24//!
25//! The fixed `prefix` segment between the top-level prefix and the
26//! aggregation level leaves room for sibling categories under the same prefix
27//! (e.g. `<top-prefix>/nodes/<node>` for host-level stats).
28//!
29//! # Disabled stats
30//!
31//! [`Stats::disabled`] (and the matching [`Default`] impl) returns a no-op
32//! aggregator. All counter bumps through it are silently dropped and no
33//! snapshot task is ever spawned, so call sites can hold a [`StatsHandle`]
34//! unconditionally instead of threading an `Option`.
35//!
36//! # Lifecycle
37//!
//! No background work runs while no (tier, role) pair has an active subscription.
//! The first track guard opened on a level (via `track()`, `publisher_track()` or
//! `subscriber_track()`) spawns a per-level snapshot task that ticks every
//! second. The task exits on the first tick at which all four counter sets
//! report zero active subscriptions, dropping its `BroadcastProducer` and
//! unannouncing the stats broadcast.
43//!
44//! # Idle frame skipping
45//!
46//! On each tick the task compares the current `Snapshot` against the last one
47//! it emitted for the same `(tier, role)` and writes a frame only when
48//! something changed. New subscribers still pick up a baseline immediately
49//! because track-latest semantics retain the most recent emitted frame.
50//!
51//! # Cycles
52//!
//! Calling [`StatsHandle::broadcast`] for a path under the configured top-level
//! prefix returns an empty handle whose bumps no-op. This breaks the feedback
//! loop where serving a `<top-prefix>/...` broadcast would itself generate
//! more stats traffic.
57
58use std::{
59	collections::HashMap,
60	sync::{
61		Arc, Weak,
62		atomic::{AtomicU64, Ordering},
63	},
64	time::{Duration, SystemTime, UNIX_EPOCH},
65};
66
67use serde::Serialize;
68use web_async::{Lock, spawn};
69
70use crate::{AsPath, Broadcast, Origin, OriginProducer, Path, PathOwned, Track};
71
/// Cumulative atomic counters for a single (tier, role) on a level.
///
/// Within this module every field is only ever incremented. "Currently open"
/// quantities are derived by subtracting a `*_closed` counter from its
/// opening counterpart.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Counters {
	/// Broadcast-lifetime guards opened.
	pub broadcasts: AtomicU64,
	/// Broadcast-lifetime guards dropped.
	pub broadcasts_closed: AtomicU64,
	/// Track-subscription guards opened.
	pub subscriptions: AtomicU64,
	/// Track-subscription guards dropped.
	pub subscriptions_closed: AtomicU64,
	/// Bytes recorded through track guards.
	pub bytes: AtomicU64,
	/// Frames recorded through track guards.
	pub frames: AtomicU64,
	/// Groups recorded through track guards.
	pub groups: AtomicU64,
}
84
85impl Counters {
86	fn snapshot(&self) -> Snapshot {
87		Snapshot {
88			broadcasts: self.broadcasts.load(Ordering::Relaxed),
89			broadcasts_closed: self.broadcasts_closed.load(Ordering::Relaxed),
90			subscriptions: self.subscriptions.load(Ordering::Relaxed),
91			subscriptions_closed: self.subscriptions_closed.load(Ordering::Relaxed),
92			bytes: self.bytes.load(Ordering::Relaxed),
93			frames: self.frames.load(Ordering::Relaxed),
94			groups: self.groups.load(Ordering::Relaxed),
95		}
96	}
97
98	fn active(&self) -> bool {
99		self.subscriptions.load(Ordering::Relaxed) > self.subscriptions_closed.load(Ordering::Relaxed)
100	}
101}
102
/// Traffic class a session's bumps are recorded under.
///
/// A single [`Stats`] keeps customer-facing and cluster-peer traffic apart by
/// tier. Each of the four `(Tier, Role)` combinations is published as its own
/// track on every level's broadcast.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tier {
	/// External traffic (e.g. customer sessions).
	External,
	/// Internal traffic (e.g. cluster peers).
	Internal,
}

impl Tier {
	/// Lowercase label written into the `tier` field of snapshot frames.
	fn as_str(&self) -> &'static str {
		match *self {
			Self::External => "external",
			Self::Internal => "internal",
		}
	}
}
120
/// Direction of traffic relative to the relay: egress (`Publisher`) or
/// ingress (`Subscriber`).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Role {
	Publisher,
	Subscriber,
}

impl Role {
	/// Lowercase label written into the `role` field of snapshot frames.
	fn as_str(&self) -> &'static str {
		match *self {
			Self::Publisher => "publisher",
			Self::Subscriber => "subscriber",
		}
	}
}
135
136/// Top-level stats aggregator. Cheap to clone (`Arc` inside). One instance per
137/// relay; sessions get tier-scoped handles via [`Stats::tier`].
138#[derive(Clone)]
139pub struct Stats {
140	inner: Arc<StatsInner>,
141}
142
143struct StatsInner {
144	prefix: PathOwned,
145	levels: u32,
146	node: Option<String>,
147	origin: OriginProducer,
148	entries: Lock<HashMap<PathOwned, Arc<Level>>>,
149}
150
151struct Level {
152	advertised: PathOwned,
153	external_publisher: Counters,
154	external_subscriber: Counters,
155	internal_publisher: Counters,
156	internal_subscriber: Counters,
157	task: Lock<Option<()>>,
158	origin: OriginProducer,
159	node: Option<String>,
160	level_key: PathOwned,
161}
162
163impl Level {
164	fn counters(&self, tier: Tier, role: Role) -> &Counters {
165		match (tier, role) {
166			(Tier::External, Role::Publisher) => &self.external_publisher,
167			(Tier::External, Role::Subscriber) => &self.external_subscriber,
168			(Tier::Internal, Role::Publisher) => &self.internal_publisher,
169			(Tier::Internal, Role::Subscriber) => &self.internal_subscriber,
170		}
171	}
172
173	fn any_active(&self) -> bool {
174		self.external_publisher.active()
175			|| self.external_subscriber.active()
176			|| self.internal_publisher.active()
177			|| self.internal_subscriber.active()
178	}
179}
180
181impl Stats {
182	/// Build a new stats aggregator.
183	///
184	/// * `prefix` is the top-level path under which stats are published, e.g. `.stats`.
185	///   The full advertised path is `<prefix>/prefix/<level-path>/<node>` (or
186	///   `<prefix>/prefix/<level-path>` when `node` is `None`).
187	/// * `levels` is the maximum segment depth stats are bucketed by, which caps the
188	///   number of aggregation buckets per broadcast. `0` disables stats entirely (no
189	///   buckets, including no root bucket). `1` produces the root bucket plus a
190	///   per-first-segment bucket. `2` adds a per-second-segment bucket, and so on.
191	///   A broadcast within the configured depth also gets its own dedicated bucket;
192	///   broadcasts deeper than `levels` are truncated.
193	/// * `node` disambiguates broadcasts published by different relays into a shared
194	///   cluster origin. Set this on every node in multi-relay deployments. `None`
195	///   omits the suffix, which is fine for single-relay deployments.
196	/// * `origin` is the [`OriginProducer`] that receives `publish_broadcast` calls
197	///   for each stats broadcast.
198	pub fn new(
199		prefix: impl Into<PathOwned>,
200		levels: u32,
201		node: impl Into<Option<String>>,
202		origin: OriginProducer,
203	) -> Self {
204		Self {
205			inner: Arc::new(StatsInner {
206				prefix: prefix.into(),
207				levels,
208				node: node.into(),
209				origin,
210				entries: Lock::default(),
211			}),
212		}
213	}
214
215	/// A no-op aggregator. Counter bumps are silently dropped and no snapshot
216	/// task is ever spawned. Use this when stats are disabled so call sites
217	/// can hold a [`Stats`] (or [`StatsHandle`]) unconditionally.
218	pub fn disabled() -> Self {
219		// Levels = 0 short-circuits broadcast_levels to an empty Arc, so every
220		// downstream operation is a no-op iteration. The origin is never
221		// touched because the snapshot task only spawns on the first track.
222		Self {
223			inner: Arc::new(StatsInner {
224				prefix: PathOwned::default(),
225				levels: 0,
226				node: None,
227				origin: Origin::random().produce(),
228				entries: Lock::default(),
229			}),
230		}
231	}
232
233	/// Returns the configured top-level prefix.
234	pub fn prefix(&self) -> &Path<'static> {
235		&self.inner.prefix
236	}
237
238	/// Returns a tier-scoped handle. Bumps through this handle land in the
239	/// tier's counters.
240	pub fn tier(&self, tier: Tier) -> StatsHandle {
241		StatsHandle {
242			stats: self.clone(),
243			tier,
244		}
245	}
246
247	fn broadcast_levels(&self, path: impl AsPath) -> Arc<[Arc<Level>]> {
248		let path = path.as_path();
249		// Skip our own stats broadcasts (and any sibling category under the same
250		// prefix) so serving a stats broadcast doesn't generate more stats.
251		if path.has_prefix(&self.inner.prefix) {
252			return Arc::from([]);
253		}
254
255		let keys = level_keys(&path, self.inner.levels);
256		let mut entries = self.inner.entries.lock();
257		let arcs: Vec<Arc<Level>> = keys
258			.into_iter()
259			.map(|key| {
260				entries
261					.entry(key.clone())
262					.or_insert_with(|| {
263						let advertised = advertised_path(&self.inner.prefix, &key, self.inner.node.as_deref());
264						Arc::new(Level {
265							advertised,
266							external_publisher: Counters::default(),
267							external_subscriber: Counters::default(),
268							internal_publisher: Counters::default(),
269							internal_subscriber: Counters::default(),
270							task: Lock::new(None),
271							origin: self.inner.origin.clone(),
272							node: self.inner.node.clone(),
273							level_key: key,
274						})
275					})
276					.clone()
277			})
278			.collect();
279
280		arcs.into()
281	}
282}
283
284impl Default for Stats {
285	fn default() -> Self {
286		Self::disabled()
287	}
288}
289
290/// Tier-scoped wrapper around [`Stats`]. What [`crate::Client::with_stats`] and
291/// [`crate::Server::with_stats`] accept. Cheap to clone.
292#[derive(Clone)]
293pub struct StatsHandle {
294	stats: Stats,
295	tier: Tier,
296}
297
298impl StatsHandle {
299	/// A no-op handle. See [`Stats::disabled`].
300	pub fn disabled() -> Self {
301		Stats::disabled().tier(Tier::External)
302	}
303
304	/// The aggregator this handle is tied to.
305	pub fn parent(&self) -> &Stats {
306		&self.stats
307	}
308
309	/// The tier this handle bumps into.
310	pub fn tier(&self) -> Tier {
311		self.tier
312	}
313
314	/// Returns a per-broadcast handle scoped to this tier. Cheap; level state is
315	/// created lazily and cached.
316	///
317	/// Paths under the aggregator's configured `prefix` return an empty handle
318	/// whose bumps are no-ops. This keeps stats traffic from feeding back into
319	/// the aggregator.
320	pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
321		BroadcastStats {
322			levels: self.stats.broadcast_levels(path),
323			tier: self.tier,
324		}
325	}
326}
327
328impl Default for StatsHandle {
329	fn default() -> Self {
330		Self::disabled()
331	}
332}
333
334/// A per-broadcast, tier-scoped handle. Cheap to clone.
335///
336/// Open a broadcast-lifetime guard with [`Self::publisher`] / [`Self::subscriber`],
337/// or skip straight to a track guard with [`Self::publisher_track`] /
338/// [`Self::subscriber_track`] when the broadcast's lifetime is tracked elsewhere.
339#[derive(Clone)]
340pub struct BroadcastStats {
341	levels: Arc<[Arc<Level>]>,
342	tier: Tier,
343}
344
345impl BroadcastStats {
346	/// True if this handle covers no levels (its path was under the aggregator's
347	/// own prefix, or stats are disabled). All bumps through an empty handle
348	/// are no-ops.
349	pub fn is_empty(&self) -> bool {
350		self.levels.is_empty()
351	}
352
353	/// Open a broadcast-lifetime guard for the publisher (egress) role.
354	/// Bumps `broadcasts` on construction and `broadcasts_closed` on drop.
355	pub fn publisher(&self) -> PublisherStats {
356		for level in self.levels.iter() {
357			level
358				.counters(self.tier, Role::Publisher)
359				.broadcasts
360				.fetch_add(1, Ordering::Relaxed);
361		}
362		PublisherStats {
363			levels: self.levels.clone(),
364			tier: self.tier,
365		}
366	}
367
368	/// Open a broadcast-lifetime guard for the subscriber (ingress) role.
369	/// Bumps `broadcasts` on construction and `broadcasts_closed` on drop.
370	pub fn subscriber(&self) -> SubscriberStats {
371		for level in self.levels.iter() {
372			level
373				.counters(self.tier, Role::Subscriber)
374				.broadcasts
375				.fetch_add(1, Ordering::Relaxed);
376		}
377		SubscriberStats {
378			levels: self.levels.clone(),
379			tier: self.tier,
380		}
381	}
382
383	/// Open a publisher-track guard without bumping the broadcast counters.
384	///
385	/// Use this when the broadcast is already counted by a [`PublisherStats`]
386	/// guard held elsewhere (e.g. by the announce loop), so the track guard
387	/// only contributes to subscription counters.
388	///
389	/// `_name` is currently unused; counters are per-level only. Reserved for
390	/// future per-track granularity.
391	pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
392		for level in self.levels.iter() {
393			level
394				.counters(self.tier, Role::Publisher)
395				.subscriptions
396				.fetch_add(1, Ordering::Relaxed);
397			ensure_task(level);
398		}
399		PublisherTrack {
400			levels: self.levels.clone(),
401			tier: self.tier,
402		}
403	}
404
405	/// Subscriber-side counterpart to [`Self::publisher_track`].
406	pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
407		for level in self.levels.iter() {
408			level
409				.counters(self.tier, Role::Subscriber)
410				.subscriptions
411				.fetch_add(1, Ordering::Relaxed);
412			ensure_task(level);
413		}
414		SubscriberTrack {
415			levels: self.levels.clone(),
416			tier: self.tier,
417		}
418	}
419}
420
421/// RAII broadcast guard for the publisher role. See [`BroadcastStats::publisher`].
422#[must_use = "drop the guard to record the broadcast as closed"]
423pub struct PublisherStats {
424	levels: Arc<[Arc<Level>]>,
425	tier: Tier,
426}
427
428impl PublisherStats {
429	/// Open a track-subscription guard. Bumps `subscriptions` on every level
430	/// and (on the 0->N transition in any role) spawns the level's snapshot
431	/// task. Drop bumps `subscriptions_closed`.
432	pub fn track(&self, name: &str) -> PublisherTrack {
433		BroadcastStats {
434			levels: self.levels.clone(),
435			tier: self.tier,
436		}
437		.publisher_track(name)
438	}
439}
440
441impl Drop for PublisherStats {
442	fn drop(&mut self) {
443		for level in self.levels.iter() {
444			level
445				.counters(self.tier, Role::Publisher)
446				.broadcasts_closed
447				.fetch_add(1, Ordering::Relaxed);
448		}
449	}
450}
451
452/// RAII broadcast guard for the subscriber role. See [`BroadcastStats::subscriber`].
453#[must_use = "drop the guard to record the broadcast as closed"]
454pub struct SubscriberStats {
455	levels: Arc<[Arc<Level>]>,
456	tier: Tier,
457}
458
459impl SubscriberStats {
460	/// Open a track-subscription guard. Mirrors [`PublisherStats::track`].
461	pub fn track(&self, name: &str) -> SubscriberTrack {
462		BroadcastStats {
463			levels: self.levels.clone(),
464			tier: self.tier,
465		}
466		.subscriber_track(name)
467	}
468}
469
470impl Drop for SubscriberStats {
471	fn drop(&mut self) {
472		for level in self.levels.iter() {
473			level
474				.counters(self.tier, Role::Subscriber)
475				.broadcasts_closed
476				.fetch_add(1, Ordering::Relaxed);
477		}
478	}
479}
480
481/// RAII subscription guard for the publisher role.
482#[must_use = "drop the guard to record the subscription as closed"]
483pub struct PublisherTrack {
484	levels: Arc<[Arc<Level>]>,
485	tier: Tier,
486}
487
488impl PublisherTrack {
489	/// Bumps `frames` once.
490	pub fn frame(&self) {
491		for level in self.levels.iter() {
492			level
493				.counters(self.tier, Role::Publisher)
494				.frames
495				.fetch_add(1, Ordering::Relaxed);
496		}
497	}
498
499	/// Bumps `bytes` by `n`.
500	pub fn bytes(&self, n: u64) {
501		for level in self.levels.iter() {
502			level
503				.counters(self.tier, Role::Publisher)
504				.bytes
505				.fetch_add(n, Ordering::Relaxed);
506		}
507	}
508
509	/// Bumps `groups` once.
510	pub fn group(&self) {
511		for level in self.levels.iter() {
512			level
513				.counters(self.tier, Role::Publisher)
514				.groups
515				.fetch_add(1, Ordering::Relaxed);
516		}
517	}
518}
519
520impl Drop for PublisherTrack {
521	fn drop(&mut self) {
522		for level in self.levels.iter() {
523			level
524				.counters(self.tier, Role::Publisher)
525				.subscriptions_closed
526				.fetch_add(1, Ordering::Relaxed);
527		}
528	}
529}
530
531/// RAII subscription guard for the subscriber role.
532#[must_use = "drop the guard to record the subscription as closed"]
533pub struct SubscriberTrack {
534	levels: Arc<[Arc<Level>]>,
535	tier: Tier,
536}
537
538impl SubscriberTrack {
539	/// Bumps `frames` once.
540	pub fn frame(&self) {
541		for level in self.levels.iter() {
542			level
543				.counters(self.tier, Role::Subscriber)
544				.frames
545				.fetch_add(1, Ordering::Relaxed);
546		}
547	}
548
549	/// Bumps `bytes` by `n`.
550	pub fn bytes(&self, n: u64) {
551		for level in self.levels.iter() {
552			level
553				.counters(self.tier, Role::Subscriber)
554				.bytes
555				.fetch_add(n, Ordering::Relaxed);
556		}
557	}
558
559	/// Bumps `groups` once.
560	pub fn group(&self) {
561		for level in self.levels.iter() {
562			level
563				.counters(self.tier, Role::Subscriber)
564				.groups
565				.fetch_add(1, Ordering::Relaxed);
566		}
567	}
568}
569
570impl Drop for SubscriberTrack {
571	fn drop(&mut self) {
572		for level in self.levels.iter() {
573			level
574				.counters(self.tier, Role::Subscriber)
575				.subscriptions_closed
576				.fetch_add(1, Ordering::Relaxed);
577		}
578	}
579}
580
581fn ensure_task(level: &Arc<Level>) {
582	let mut slot = level.task.lock();
583	if slot.is_none() {
584		*slot = Some(());
585		let weak = Arc::downgrade(level);
586		spawn(run_publisher(weak));
587	}
588}
589
590async fn run_publisher(weak: Weak<Level>) {
591	let setup = {
592		let Some(level) = weak.upgrade() else {
593			return;
594		};
595		let mut broadcast = Broadcast::new().produce();
596		let mut make = |name: &str| {
597			broadcast.create_track(Track {
598				name: name.into(),
599				priority: 0,
600			})
601		};
602		let ext_pub = match make("publisher.json") {
603			Ok(t) => t,
604			Err(err) => {
605				tracing::warn!(?err, "stats: failed to create publisher.json");
606				clear_task(&level);
607				return;
608			}
609		};
610		let ext_sub = match make("subscriber.json") {
611			Ok(t) => t,
612			Err(err) => {
613				tracing::warn!(?err, "stats: failed to create subscriber.json");
614				clear_task(&level);
615				return;
616			}
617		};
618		let int_pub = match make("internal/publisher.json") {
619			Ok(t) => t,
620			Err(err) => {
621				tracing::warn!(?err, "stats: failed to create internal/publisher.json");
622				clear_task(&level);
623				return;
624			}
625		};
626		let int_sub = match make("internal/subscriber.json") {
627			Ok(t) => t,
628			Err(err) => {
629				tracing::warn!(?err, "stats: failed to create internal/subscriber.json");
630				clear_task(&level);
631				return;
632			}
633		};
634		if !level.origin.publish_broadcast(&level.advertised, broadcast.consume()) {
635			tracing::warn!(level = %level.advertised, "stats: origin rejected stats broadcast");
636			clear_task(&level);
637			return;
638		}
639		(broadcast, ext_pub, ext_sub, int_pub, int_sub)
640	};
641	let (broadcast, mut ext_pub, mut ext_sub, mut int_pub, mut int_sub) = setup;
642
643	let mut last_ext_pub: Option<Snapshot> = None;
644	let mut last_ext_sub: Option<Snapshot> = None;
645	let mut last_int_pub: Option<Snapshot> = None;
646	let mut last_int_sub: Option<Snapshot> = None;
647
648	let mut tick = tokio::time::interval(Duration::from_secs(1));
649	tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
650
651	loop {
652		tick.tick().await;
653
654		let Some(level) = weak.upgrade() else {
655			return;
656		};
657
658		if !level.any_active() {
659			// Take the task slot under the lock and re-check. Any subscribe that
660			// raced with us either landed before we set None (so it sees Some
661			// and won't respawn) or after, in which case it spawns a fresh task.
662			let mut slot = level.task.lock();
663			if !level.any_active() {
664				*slot = None;
665				drop(slot);
666				drop(level);
667				// Drop `broadcast` to unannounce. Leftover producers/consumers
668				// follow the existing `closed()` watcher in OriginProducer.
669				drop(broadcast);
670				return;
671			}
672		}
673
674		maybe_write(&mut ext_pub, Tier::External, Role::Publisher, &level, &mut last_ext_pub);
675		maybe_write(
676			&mut ext_sub,
677			Tier::External,
678			Role::Subscriber,
679			&level,
680			&mut last_ext_sub,
681		);
682		maybe_write(&mut int_pub, Tier::Internal, Role::Publisher, &level, &mut last_int_pub);
683		maybe_write(
684			&mut int_sub,
685			Tier::Internal,
686			Role::Subscriber,
687			&level,
688			&mut last_int_sub,
689		);
690	}
691}
692
693fn maybe_write(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, last: &mut Option<Snapshot>) {
694	let snapshot = level.counters(tier, role).snapshot();
695	if last.as_ref() == Some(&snapshot) {
696		return;
697	}
698	write_snapshot(track, tier, role, level, snapshot);
699	*last = Some(snapshot);
700}
701
702fn clear_task(level: &Level) {
703	*level.task.lock() = None;
704}
705
706#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
707struct Snapshot {
708	broadcasts: u64,
709	broadcasts_closed: u64,
710	subscriptions: u64,
711	subscriptions_closed: u64,
712	bytes: u64,
713	frames: u64,
714	groups: u64,
715}
716
717#[derive(Debug, Serialize)]
718struct SnapshotFrame<'a> {
719	v: u32,
720	level: &'a str,
721	tier: &'a str,
722	role: &'a str,
723	#[serde(skip_serializing_if = "Option::is_none")]
724	node: Option<&'a str>,
725	ts_ms: u64,
726	#[serde(flatten)]
727	snapshot: Snapshot,
728}
729
730fn write_snapshot(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, snapshot: Snapshot) {
731	let frame = SnapshotFrame {
732		v: 1,
733		level: level.level_key.as_str(),
734		tier: tier.as_str(),
735		role: role.as_str(),
736		node: level.node.as_deref(),
737		ts_ms: now_ms(),
738		snapshot,
739	};
740
741	let buf = match serde_json::to_vec(&frame) {
742		Ok(buf) => buf,
743		Err(err) => {
744			tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to serialize snapshot");
745			return;
746		}
747	};
748
749	if let Err(err) = track.write_frame(buf) {
750		tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to write snapshot frame");
751	}
752}
753
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. The `u128`
/// millisecond count saturates at `u64::MAX` instead of being silently
/// truncated by an `as` cast.
fn now_ms() -> u64 {
	SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.map_or(0, |elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
}
760
761/// Compute the level prefix keys this broadcast contributes to.
762///
763/// Produces every prefix of the broadcast path from 0 segments (root) up to
764/// `min(levels, segments)` segments, inclusive, so a broadcast within the
765/// configured depth budget gets a dedicated per-broadcast bucket in addition
766/// to the aggregating prefixes. Broadcasts deeper than `levels` are truncated
767/// (no per-broadcast bucket). `levels == 0` produces no buckets at all.
768fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
769	if levels == 0 {
770		return Vec::new();
771	}
772	if broadcast.is_empty() {
773		return vec![PathOwned::default()];
774	}
775
776	let segs: Vec<&str> = broadcast.as_str().split('/').collect();
777	let max = (levels as usize).min(segs.len());
778	(0..=max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
779}
780
781fn advertised_path(prefix: &Path, level_key: &Path, node: Option<&str>) -> PathOwned {
782	// The fixed `prefix` category leaves room for sibling categories (e.g.
783	// `<top-prefix>/nodes/<node>` for host-level stats) under the same prefix.
784	let top = prefix.as_str();
785	let mut out = format!("{top}/prefix");
786	if !level_key.is_empty() {
787		out.push('/');
788		out.push_str(level_key.as_str());
789	}
790	if let Some(node) = node {
791		out.push('/');
792		out.push_str(node);
793	}
794	PathOwned::from(out)
795}
796
797#[cfg(test)]
798mod tests {
799	use std::sync::atomic::Ordering::Relaxed;
800
801	use crate::{Origin, Path};
802
803	use super::*;
804
805	#[test]
806	fn level_keys_basic() {
807		let key = |s: &str, n: u32| {
808			level_keys(&Path::new(s), n)
809				.into_iter()
810				.map(|p| p.as_str().to_string())
811				.collect::<Vec<_>>()
812		};
813
814		// levels=1 covers root + the first segment; for "demo/bbb" that's the
815		// "demo" aggregating prefix (no per-broadcast bucket since we'd need
816		// levels=2 to reach the broadcast itself).
817		assert_eq!(key("demo/bbb", 1), vec!["", "demo"]);
818		// levels=2 reaches the broadcast itself, so we get root + prefix + own.
819		assert_eq!(key("demo/bbb", 2), vec!["", "demo", "demo/bbb"]);
820		// Capped: broadcast is 2 segments, levels=3 still tops out at the
821		// broadcast's own path.
822		assert_eq!(key("demo/bbb", 3), vec!["", "demo", "demo/bbb"]);
823		// Deeper broadcast, levels=3 stops one short of the broadcast itself.
824		assert_eq!(key("a/b/c/d", 3), vec!["", "a", "a/b", "a/b/c"]);
825		// 1-segment broadcast, levels=2 reaches the broadcast.
826		assert_eq!(key("demo", 2), vec!["", "demo"]);
827		// levels=0 yields no buckets at all.
828		assert!(key("demo/bbb", 0).is_empty());
829	}
830
831	#[test]
832	fn advertised_path_root_and_nested() {
833		let prefix = Path::new(".stats");
834		assert_eq!(
835			advertised_path(&prefix, &Path::new(""), Some("sjc")).as_str(),
836			".stats/prefix/sjc"
837		);
838		assert_eq!(
839			advertised_path(&prefix, &Path::new("demo"), Some("sjc")).as_str(),
840			".stats/prefix/demo/sjc"
841		);
842		assert_eq!(
843			advertised_path(&prefix, &Path::new("demo/foo"), Some("sjc")).as_str(),
844			".stats/prefix/demo/foo/sjc"
845		);
846	}
847
848	#[test]
849	fn advertised_path_without_node() {
850		let prefix = Path::new(".stats");
851		assert_eq!(advertised_path(&prefix, &Path::new(""), None).as_str(), ".stats/prefix");
852		assert_eq!(
853			advertised_path(&prefix, &Path::new("demo"), None).as_str(),
854			".stats/prefix/demo"
855		);
856	}
857
858	#[test]
859	fn advertised_path_honors_custom_prefix() {
860		let prefix = Path::new("metrics");
861		assert_eq!(
862			advertised_path(&prefix, &Path::new(""), Some("lon")).as_str(),
863			"metrics/prefix/lon"
864		);
865		assert_eq!(
866			advertised_path(&prefix, &Path::new("demo/room"), Some("lon")).as_str(),
867			"metrics/prefix/demo/room/lon"
868		);
869	}
870
871	#[tokio::test(start_paused = true)]
872	async fn external_publisher_bumps_external_publisher_counters() {
873		let origin = Origin::random().produce();
874		let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin);
875		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
876		let pub_role = bs.publisher();
877		let track = pub_role.track("video");
878		track.frame();
879		track.bytes(100);
880		track.group();
881		drop(track);
882		drop(pub_role);
883
884		let entries = stats.inner.entries.lock();
885		let root = entries.get(&PathOwned::from("")).expect("root level");
886		assert_eq!(root.external_publisher.frames.load(Relaxed), 1);
887		assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
888		assert_eq!(root.external_publisher.groups.load(Relaxed), 1);
889		assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
890		assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
891		assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 1);
892		assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 1);
893		// Other tier/role combos must remain untouched.
894		assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
895		assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
896		assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
897	}
898
899	#[tokio::test(start_paused = true)]
900	async fn external_subscriber_bumps_external_subscriber_counters() {
901		let origin = Origin::random().produce();
902		let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin);
903		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
904		let sub_role = bs.subscriber();
905		let track = sub_role.track("video");
906		track.frame();
907		track.bytes(50);
908
909		let entries = stats.inner.entries.lock();
910		let root = entries.get(&PathOwned::from("")).expect("root level");
911		assert_eq!(root.external_subscriber.frames.load(Relaxed), 1);
912		assert_eq!(root.external_subscriber.bytes.load(Relaxed), 50);
913		assert_eq!(root.external_subscriber.broadcasts.load(Relaxed), 1);
914		assert_eq!(root.external_subscriber.subscriptions.load(Relaxed), 1);
915		assert_eq!(root.external_publisher.bytes.load(Relaxed), 0);
916		assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
917	}
918
919	#[tokio::test(start_paused = true)]
920	async fn external_and_internal_tiers_are_independent() {
921		let origin = Origin::random().produce();
922		let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin);
923		let ext = stats.tier(Tier::External);
924		let int = stats.tier(Tier::Internal);
925
926		let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
927		ext_track.bytes(100);
928		let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
929		int_track.bytes(7);
930
931		let entries = stats.inner.entries.lock();
932		let root = entries.get(&PathOwned::from("")).expect("root level");
933		assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
934		assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
935		assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
936		assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 7);
937	}
938
939	#[tokio::test(start_paused = true)]
940	async fn bumps_fanout_to_all_levels() {
941		let origin = Origin::random().produce();
942		let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin);
943		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
944		let p = bs.publisher();
945		let track = p.track("video");
946		track.bytes(100);
947
948		let entries = stats.inner.entries.lock();
949		let root = entries.get(&PathOwned::from("")).expect("root level");
950		let demo = entries.get(&PathOwned::from("demo")).expect("demo level");
951		assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
952		assert_eq!(demo.external_publisher.bytes.load(Relaxed), 100);
953	}
954
955	#[tokio::test(start_paused = true)]
956	async fn paths_under_prefix_are_no_op() {
957		// Our own stats broadcasts (and any sibling category under the same
958		// prefix) must not feed back into the aggregator.
959		let origin = Origin::random().produce();
960		let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin);
961		let bs = stats.tier(Tier::External).broadcast(".stats/prefix/sjc");
962		assert!(bs.is_empty());
963
964		let p = bs.publisher();
965		let track = p.track("video");
966		track.bytes(100);
967		track.frame();
968		track.group();
969		drop(track);
970		drop(p);
971
972		assert!(stats.inner.entries.lock().is_empty());
973	}
974
975	#[tokio::test(start_paused = true)]
976	async fn publisher_track_does_not_bump_broadcasts() {
977		// Subscription-side track creation should not record a broadcast: the
978		// broadcast lifetime is tracked separately by the announce loop.
979		let origin = Origin::random().produce();
980		let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin);
981		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
982		let track = bs.publisher_track("video");
983		track.bytes(10);
984		drop(track);
985
986		let entries = stats.inner.entries.lock();
987		let root = entries.get(&PathOwned::from("")).expect("root level");
988		assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 0);
989		assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 0);
990		assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
991		assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
992		assert_eq!(root.external_publisher.bytes.load(Relaxed), 10);
993	}
994
995	#[tokio::test(start_paused = true)]
996	async fn disabled_stats_are_noop() {
997		// A disabled aggregator must not allocate level state or spawn tasks.
998		let stats = Stats::disabled();
999		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1000		assert!(bs.is_empty());
1001		let p = bs.publisher();
1002		let track = p.track("video");
1003		track.bytes(100);
1004		drop(track);
1005		drop(p);
1006		assert!(stats.inner.entries.lock().is_empty());
1007	}
1008
1009	#[tokio::test(start_paused = true)]
1010	async fn task_spawns_on_first_subscribe_and_announces() {
1011		let origin = Origin::random().produce();
1012		let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin.clone());
1013		let mut consumer = origin.consume();
1014
1015		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1016		let p = bs.publisher();
1017		let _track = p.track("video");
1018
1019		tokio::time::advance(Duration::from_millis(1)).await;
1020		// levels=1 + broadcast "foo/bar" → buckets ["", "foo"]: root + per-first-segment.
1021		let mut seen = std::collections::HashSet::new();
1022		for _ in 0..2 {
1023			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1024			assert!(broadcast.is_some());
1025			seen.insert(path.as_str().to_string());
1026		}
1027		assert!(seen.contains(".stats/prefix/sjc"));
1028		assert!(seen.contains(".stats/prefix/foo/sjc"));
1029	}
1030
1031	#[tokio::test(start_paused = true)]
1032	async fn task_spawns_with_node_suffix() {
1033		let origin = Origin::random().produce();
1034		let stats = Stats::new(".stats", 2, Some("sjc".to_string()), origin.clone());
1035		let mut consumer = origin.consume();
1036
1037		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1038		let p = bs.publisher();
1039		let _track = p.track("video");
1040
1041		tokio::time::advance(Duration::from_millis(1)).await;
1042		// levels=2 + broadcast "foo/bar" → buckets ["", "foo", "foo/bar"], each
1043		// suffixed with `/sjc`.
1044		let mut seen = std::collections::HashSet::new();
1045		for _ in 0..3 {
1046			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1047			assert!(broadcast.is_some());
1048			seen.insert(path.as_str().to_string());
1049		}
1050		assert!(seen.contains(".stats/prefix/sjc"));
1051		assert!(seen.contains(".stats/prefix/foo/sjc"));
1052		assert!(seen.contains(".stats/prefix/foo/bar/sjc"));
1053	}
1054
1055	#[tokio::test(start_paused = true)]
1056	async fn task_spawns_without_node_suffix() {
1057		// node=None: paths should omit the trailing /<node> segment.
1058		let origin = Origin::random().produce();
1059		let stats = Stats::new(".stats", 1, None, origin.clone());
1060		let mut consumer = origin.consume();
1061
1062		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1063		let p = bs.publisher();
1064		let _track = p.track("video");
1065
1066		tokio::time::advance(Duration::from_millis(1)).await;
1067		let mut seen = std::collections::HashSet::new();
1068		for _ in 0..2 {
1069			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1070			assert!(broadcast.is_some());
1071			seen.insert(path.as_str().to_string());
1072		}
1073		assert!(seen.contains(".stats/prefix"));
1074		assert!(seen.contains(".stats/prefix/foo"));
1075	}
1076
1077	#[tokio::test(start_paused = true)]
1078	async fn task_exits_when_all_roles_idle() {
1079		let origin = Origin::random().produce();
1080		// levels=1 + broadcast "foo/bar" → buckets ["", "foo"] (root prefix plus
1081		// the first-segment prefix; the broadcast's own path isn't reachable at
1082		// this depth, so we get exactly two stats announces).
1083		let stats = Stats::new(".stats", 1, Some("sjc".to_string()), origin.clone());
1084		let mut consumer = origin.consume();
1085
1086		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1087		let p = bs.publisher();
1088		let track = p.track("video");
1089
1090		tokio::time::advance(Duration::from_millis(1)).await;
1091		let mut announced: Vec<String> = Vec::new();
1092		for _ in 0..2 {
1093			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1094			assert!(broadcast.is_some(), "expected an active announce");
1095			announced.push(path.as_str().to_string());
1096		}
1097		announced.sort();
1098		assert_eq!(announced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
1099
1100		drop(track);
1101		drop(p);
1102		drop(bs);
1103
1104		tokio::time::advance(Duration::from_secs(2)).await;
1105		let mut unannounced: Vec<String> = Vec::new();
1106		for _ in 0..2 {
1107			let (path, broadcast) = consumer.announced().await.expect("expected unannounce");
1108			assert!(broadcast.is_none(), "expected an unannounce");
1109			unannounced.push(path.as_str().to_string());
1110		}
1111		unannounced.sort();
1112		assert_eq!(unannounced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
1113	}
1114
	// Idle-skip behavior (the snapshot task suppresses a write when the
	// current Snapshot equals the last emitted one) is not unit-tested here;
	// it is covered end-to-end by local relay verification. Driving the
	// broadcast/track plumbing from a unit test is awkward enough that the
	// skip logic itself (`if last == new { return; }` in `maybe_write`) is
	// better verified by reading the code.
1120}