Skip to main content

moq_net/
stats.rs

1//! Generic stats publishing for moq-net sessions.
2//!
3//! [`Stats`] aggregates per-broadcast counter bumps into per-prefix levels and
4//! publishes a `<top-prefix>/prefix/<level-path>/<node>` broadcast on a caller-provided
5//! [`OriginProducer`]. The `<node>` suffix is omitted when no node is configured.
6//! Each stats broadcast carries four tracks, one per `(tier, role)` pair:
7//!
8//! * `publisher.json`           : external (e.g. customer) egress
9//! * `subscriber.json`          : external ingress
10//! * `internal/publisher.json`  : internal (e.g. mTLS cluster peer) egress
11//! * `internal/subscriber.json` : internal ingress
12//!
13//! A caller hands each session a tier-scoped [`StatsHandle`] (built from the
14//! single shared [`Stats`] via [`Stats::tier`]) which determines which counter
15//! set its bumps land in. Multiple relays in the same cluster origin can
16//! coexist by giving each one a distinct `<node>` suffix on advertised paths.
17//! The suffix itself may be multi-segment (e.g. `sjc/1`, `sjc/2`) so a region
18//! with multiple hosts can nest under a shared region key without colliding.
19//!
20//! Each broadcast contributes to every prefix of its path (within the
21//! configured depth), so publishing `anon/bbb` with `levels = 2` produces:
22//!
23//! * `<top-prefix>/prefix/<node>`           (root aggregate)
24//! * `<top-prefix>/prefix/anon/<node>`      (per-first-segment aggregate)
25//! * `<top-prefix>/prefix/anon/bbb/<node>`  (per-broadcast)
26//!
27//! The fixed `prefix` segment between the top-level prefix and the
28//! aggregation level leaves room for sibling categories under the same prefix
29//! (e.g. `<top-prefix>/nodes/<node>` for host-level stats).
30//!
31//! # Disabled stats
32//!
33//! [`Stats::disabled`] (and the matching [`Default`] impl) returns a no-op
34//! aggregator. All counter bumps through it are silently dropped and no
35//! snapshot task is ever spawned, so call sites can hold a [`StatsHandle`]
36//! unconditionally instead of threading an `Option`.
37//!
38//! # Lifecycle
39//!
40//! No background work runs while no role × tier has an active subscription.
//! The first track guard opened on a level (via `track()`, `publisher_track()`
//! or `subscriber_track()`) spawns a per-level snapshot task that ticks every
//! second. On the first tick at which all four counter sets report zero active
//! subscriptions, the task exits, dropping its [`BroadcastProducer`] and
//! unannouncing.
45//!
46//! # Idle frame skipping
47//!
48//! On each tick the task compares the current `Snapshot` against the last one
49//! it emitted for the same `(tier, role)` and writes a frame only when
50//! something changed. New subscribers still pick up a baseline immediately
51//! because track-latest semantics retain the most recent emitted frame.
52//!
53//! # Cycles
54//!
//! Calling [`StatsHandle::broadcast`] for a path under the configured top-level
//! prefix returns an empty handle whose bumps no-op. This breaks the feedback
57//! loop where serving a `<top-prefix>/...` broadcast would itself generate
58//! more stats traffic.
59
60use std::{
61	collections::HashMap,
62	sync::{
63		Arc, Weak,
64		atomic::{AtomicU64, Ordering},
65	},
66	time::{Duration, SystemTime, UNIX_EPOCH},
67};
68
69use serde::Serialize;
70use web_async::{Lock, spawn};
71
72use crate::{AsPath, Broadcast, Origin, OriginProducer, Path, PathOwned, Track};
73
/// Cumulative atomic counters for a single (tier, role) on a level.
///
/// Every bump in this module is a `fetch_add`, so the values only grow.
/// "Currently open" quantities are derived by subtracting a `*_closed`
/// counter from its opening counter.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Counters {
	/// Broadcast-lifetime guards opened ([`BroadcastStats::publisher`] / [`BroadcastStats::subscriber`]).
	pub broadcasts: AtomicU64,
	/// Broadcast-lifetime guards dropped.
	pub broadcasts_closed: AtomicU64,
	/// Track-subscription guards opened.
	pub subscriptions: AtomicU64,
	/// Track-subscription guards dropped.
	pub subscriptions_closed: AtomicU64,
	/// Sum of every `n` passed to a track guard's `bytes(n)`.
	pub bytes: AtomicU64,
	/// Number of `frame()` calls on track guards.
	pub frames: AtomicU64,
	/// Number of `group()` calls on track guards.
	pub groups: AtomicU64,
}
86
87impl Counters {
88	fn snapshot(&self) -> Snapshot {
89		Snapshot {
90			broadcasts: self.broadcasts.load(Ordering::Relaxed),
91			broadcasts_closed: self.broadcasts_closed.load(Ordering::Relaxed),
92			subscriptions: self.subscriptions.load(Ordering::Relaxed),
93			subscriptions_closed: self.subscriptions_closed.load(Ordering::Relaxed),
94			bytes: self.bytes.load(Ordering::Relaxed),
95			frames: self.frames.load(Ordering::Relaxed),
96			groups: self.groups.load(Ordering::Relaxed),
97		}
98	}
99
100	fn active(&self) -> bool {
101		self.subscriptions.load(Ordering::Relaxed) > self.subscriptions_closed.load(Ordering::Relaxed)
102	}
103}
104
/// Traffic class a counter bump belongs to, so one [`Stats`] can keep
/// customer-facing and cluster-peer traffic apart. Each `(Tier, Role)`
/// combination maps to one of the four tracks on a level's broadcast.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tier {
	External,
	Internal,
}

impl Tier {
	/// Lowercase label written into the `tier` field of snapshot frames.
	fn as_str(&self) -> &'static str {
		match *self {
			Self::Internal => "internal",
			Self::External => "external",
		}
	}
}
122
/// Direction of the traffic being counted: `Publisher` is egress and
/// `Subscriber` is ingress.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Role {
	Publisher,
	Subscriber,
}

impl Role {
	/// Lowercase label written into the `role` field of snapshot frames.
	fn as_str(&self) -> &'static str {
		match *self {
			Self::Subscriber => "subscriber",
			Self::Publisher => "publisher",
		}
	}
}
137
/// Top-level stats aggregator. Cheap to clone (`Arc` inside). One instance per
/// relay; sessions get tier-scoped handles via [`Stats::tier`].
#[derive(Clone)]
pub struct Stats {
	/// Shared state; every clone of this `Stats` points at the same aggregator.
	inner: Arc<StatsInner>,
}
144
/// Shared state behind every clone of a [`Stats`].
struct StatsInner {
	/// Top-level prefix for stats broadcasts; paths under it are not aggregated.
	prefix: PathOwned,
	/// Maximum segment depth passed to `level_keys`; `0` produces no levels.
	levels: u32,
	/// Optional suffix appended to advertised paths. `Stats::new` filters out
	/// empty paths, so this is either `None` or non-empty.
	node: Option<PathOwned>,
	/// Origin that each level's stats broadcast is published on.
	origin: OriginProducer,
	/// Lazily created levels, keyed by level path.
	// NOTE(review): nothing in this file removes entries, so the map grows with
	// every distinct level key seen over the aggregator's lifetime. Confirm the
	// set of broadcast paths is bounded in practice.
	entries: Lock<HashMap<PathOwned, Arc<Level>>>,
}
152
/// Aggregated counters for one level (a path prefix), plus what the snapshot
/// task needs to publish them as a stats broadcast.
struct Level {
	/// Full path the level's stats broadcast is announced under (see `advertised_path`).
	advertised: PathOwned,
	/// One counter set per `(Tier, Role)`; selected via [`Level::counters`].
	external_publisher: Counters,
	external_subscriber: Counters,
	internal_publisher: Counters,
	internal_subscriber: Counters,
	/// `Some(())` while a snapshot task is considered running for this level.
	/// `ensure_task` sets it; the task itself (directly or via `clear_task`)
	/// resets it to `None` before exiting.
	task: Lock<Option<()>>,
	/// Origin the stats broadcast is published on (cloned from `StatsInner`).
	origin: OriginProducer,
	/// Node suffix copied from `StatsInner`, echoed into every snapshot frame.
	node: Option<PathOwned>,
	/// This level's path within the aggregated namespace (`""` for the root).
	level_key: PathOwned,
}
164
165impl Level {
166	fn counters(&self, tier: Tier, role: Role) -> &Counters {
167		match (tier, role) {
168			(Tier::External, Role::Publisher) => &self.external_publisher,
169			(Tier::External, Role::Subscriber) => &self.external_subscriber,
170			(Tier::Internal, Role::Publisher) => &self.internal_publisher,
171			(Tier::Internal, Role::Subscriber) => &self.internal_subscriber,
172		}
173	}
174
175	fn any_active(&self) -> bool {
176		self.external_publisher.active()
177			|| self.external_subscriber.active()
178			|| self.internal_publisher.active()
179			|| self.internal_subscriber.active()
180	}
181}
182
183impl Stats {
184	/// Build a new stats aggregator.
185	///
186	/// * `prefix` is the top-level path under which stats are published, e.g. `.stats`.
187	///   The full advertised path is `<prefix>/prefix/<level-path>/<node>` (or
188	///   `<prefix>/prefix/<level-path>` when `node` is `None`).
189	/// * `levels` is the maximum segment depth stats are bucketed by, which caps the
190	///   number of aggregation buckets per broadcast. `0` disables stats entirely (no
191	///   buckets, including no root bucket). `1` produces the root bucket plus a
192	///   per-first-segment bucket. `2` adds a per-second-segment bucket, and so on.
193	///   A broadcast within the configured depth also gets its own dedicated bucket;
194	///   broadcasts deeper than `levels` are truncated.
195	/// * `node` disambiguates broadcasts published by different relays into a shared
196	///   cluster origin. Set this on every node in multi-relay deployments. The
197	///   value may be multi-segment (e.g. `sjc/1`, `sjc/2`) so a region with
198	///   multiple hosts can nest under a shared region key. `None` (or an empty
199	///   path after normalization) omits the suffix, which is fine for
200	///   single-relay deployments.
201	/// * `origin` is the [`OriginProducer`] that receives `publish_broadcast` calls
202	///   for each stats broadcast.
203	pub fn new(
204		prefix: impl Into<PathOwned>,
205		levels: u32,
206		node: impl Into<Option<PathOwned>>,
207		origin: OriginProducer,
208	) -> Self {
209		// An empty path after normalization is indistinguishable from "no node
210		// set"; collapse it so downstream code only sees a single representation.
211		let node = node.into().filter(|p| !p.is_empty());
212		Self {
213			inner: Arc::new(StatsInner {
214				prefix: prefix.into(),
215				levels,
216				node,
217				origin,
218				entries: Lock::default(),
219			}),
220		}
221	}
222
223	/// A no-op aggregator. Counter bumps are silently dropped and no snapshot
224	/// task is ever spawned. Use this when stats are disabled so call sites
225	/// can hold a [`Stats`] (or [`StatsHandle`]) unconditionally.
226	pub fn disabled() -> Self {
227		// Levels = 0 short-circuits broadcast_levels to an empty Arc, so every
228		// downstream operation is a no-op iteration. The origin is never
229		// touched because the snapshot task only spawns on the first track.
230		Self {
231			inner: Arc::new(StatsInner {
232				prefix: PathOwned::default(),
233				levels: 0,
234				node: None,
235				origin: Origin::random().produce(),
236				entries: Lock::default(),
237			}),
238		}
239	}
240
241	/// Returns the configured top-level prefix.
242	pub fn prefix(&self) -> &Path<'static> {
243		&self.inner.prefix
244	}
245
246	/// Returns a tier-scoped handle. Bumps through this handle land in the
247	/// tier's counters.
248	pub fn tier(&self, tier: Tier) -> StatsHandle {
249		StatsHandle {
250			stats: self.clone(),
251			tier,
252		}
253	}
254
255	fn broadcast_levels(&self, path: impl AsPath) -> Arc<[Arc<Level>]> {
256		let path = path.as_path();
257		// Skip our own stats broadcasts (and any sibling category under the same
258		// prefix) so serving a stats broadcast doesn't generate more stats.
259		if path.has_prefix(&self.inner.prefix) {
260			return Arc::from([]);
261		}
262
263		let keys = level_keys(&path, self.inner.levels);
264		let mut entries = self.inner.entries.lock();
265		let arcs: Vec<Arc<Level>> = keys
266			.into_iter()
267			.map(|key| {
268				entries
269					.entry(key.clone())
270					.or_insert_with(|| {
271						let advertised = advertised_path(&self.inner.prefix, &key, self.inner.node.as_ref());
272						Arc::new(Level {
273							advertised,
274							external_publisher: Counters::default(),
275							external_subscriber: Counters::default(),
276							internal_publisher: Counters::default(),
277							internal_subscriber: Counters::default(),
278							task: Lock::new(None),
279							origin: self.inner.origin.clone(),
280							node: self.inner.node.clone(),
281							level_key: key,
282						})
283					})
284					.clone()
285			})
286			.collect();
287
288		arcs.into()
289	}
290}
291
292impl Default for Stats {
293	fn default() -> Self {
294		Self::disabled()
295	}
296}
297
/// Tier-scoped wrapper around [`Stats`]. This is the type accepted by
/// [`crate::Client::with_stats`] and [`crate::Server::with_stats`]. Cheap to clone.
#[derive(Clone)]
pub struct StatsHandle {
	/// The shared aggregator this handle bumps into.
	stats: Stats,
	/// Which counter sets (`External` or `Internal`) this handle's bumps land in.
	tier: Tier,
}
305
306impl StatsHandle {
307	/// A no-op handle. See [`Stats::disabled`].
308	pub fn disabled() -> Self {
309		Stats::disabled().tier(Tier::External)
310	}
311
312	/// The aggregator this handle is tied to.
313	pub fn parent(&self) -> &Stats {
314		&self.stats
315	}
316
317	/// The tier this handle bumps into.
318	pub fn tier(&self) -> Tier {
319		self.tier
320	}
321
322	/// Returns a per-broadcast handle scoped to this tier. Cheap; level state is
323	/// created lazily and cached.
324	///
325	/// Paths under the aggregator's configured `prefix` return an empty handle
326	/// whose bumps are no-ops. This keeps stats traffic from feeding back into
327	/// the aggregator.
328	pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
329		BroadcastStats {
330			levels: self.stats.broadcast_levels(path),
331			tier: self.tier,
332		}
333	}
334}
335
336impl Default for StatsHandle {
337	fn default() -> Self {
338		Self::disabled()
339	}
340}
341
/// A per-broadcast, tier-scoped handle. Cheap to clone.
///
/// Open a broadcast-lifetime guard with [`Self::publisher`] / [`Self::subscriber`],
/// or skip straight to a track guard with [`Self::publisher_track`] /
/// [`Self::subscriber_track`] when the broadcast's lifetime is tracked elsewhere.
#[derive(Clone)]
pub struct BroadcastStats {
	/// Every level this broadcast contributes to (empty for no-op handles).
	levels: Arc<[Arc<Level>]>,
	/// Tier whose counter sets receive the bumps.
	tier: Tier,
}
352
353impl BroadcastStats {
354	/// True if this handle covers no levels (its path was under the aggregator's
355	/// own prefix, or stats are disabled). All bumps through an empty handle
356	/// are no-ops.
357	pub fn is_empty(&self) -> bool {
358		self.levels.is_empty()
359	}
360
361	/// Open a broadcast-lifetime guard for the publisher (egress) role.
362	/// Bumps `broadcasts` on construction and `broadcasts_closed` on drop.
363	pub fn publisher(&self) -> PublisherStats {
364		for level in self.levels.iter() {
365			level
366				.counters(self.tier, Role::Publisher)
367				.broadcasts
368				.fetch_add(1, Ordering::Relaxed);
369		}
370		PublisherStats {
371			levels: self.levels.clone(),
372			tier: self.tier,
373		}
374	}
375
376	/// Open a broadcast-lifetime guard for the subscriber (ingress) role.
377	/// Bumps `broadcasts` on construction and `broadcasts_closed` on drop.
378	pub fn subscriber(&self) -> SubscriberStats {
379		for level in self.levels.iter() {
380			level
381				.counters(self.tier, Role::Subscriber)
382				.broadcasts
383				.fetch_add(1, Ordering::Relaxed);
384		}
385		SubscriberStats {
386			levels: self.levels.clone(),
387			tier: self.tier,
388		}
389	}
390
391	/// Open a publisher-track guard without bumping the broadcast counters.
392	///
393	/// Use this when the broadcast is already counted by a [`PublisherStats`]
394	/// guard held elsewhere (e.g. by the announce loop), so the track guard
395	/// only contributes to subscription counters.
396	///
397	/// `_name` is currently unused; counters are per-level only. Reserved for
398	/// future per-track granularity.
399	pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
400		for level in self.levels.iter() {
401			level
402				.counters(self.tier, Role::Publisher)
403				.subscriptions
404				.fetch_add(1, Ordering::Relaxed);
405			ensure_task(level);
406		}
407		PublisherTrack {
408			levels: self.levels.clone(),
409			tier: self.tier,
410		}
411	}
412
413	/// Subscriber-side counterpart to [`Self::publisher_track`].
414	pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
415		for level in self.levels.iter() {
416			level
417				.counters(self.tier, Role::Subscriber)
418				.subscriptions
419				.fetch_add(1, Ordering::Relaxed);
420			ensure_task(level);
421		}
422		SubscriberTrack {
423			levels: self.levels.clone(),
424			tier: self.tier,
425		}
426	}
427}
428
/// RAII broadcast guard for the publisher (egress) role. See [`BroadcastStats::publisher`].
///
/// Construction bumped `broadcasts` on every level; dropping the guard bumps
/// `broadcasts_closed`.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct PublisherStats {
	/// Levels the broadcast was counted on.
	levels: Arc<[Arc<Level>]>,
	/// Tier whose publisher counters are bumped.
	tier: Tier,
}
435
436impl PublisherStats {
437	/// Open a track-subscription guard. Bumps `subscriptions` on every level
438	/// and (on the 0->N transition in any role) spawns the level's snapshot
439	/// task. Drop bumps `subscriptions_closed`.
440	pub fn track(&self, name: &str) -> PublisherTrack {
441		BroadcastStats {
442			levels: self.levels.clone(),
443			tier: self.tier,
444		}
445		.publisher_track(name)
446	}
447}
448
449impl Drop for PublisherStats {
450	fn drop(&mut self) {
451		for level in self.levels.iter() {
452			level
453				.counters(self.tier, Role::Publisher)
454				.broadcasts_closed
455				.fetch_add(1, Ordering::Relaxed);
456		}
457	}
458}
459
/// RAII broadcast guard for the subscriber (ingress) role. See [`BroadcastStats::subscriber`].
///
/// Construction bumped `broadcasts` on every level; dropping the guard bumps
/// `broadcasts_closed`.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct SubscriberStats {
	/// Levels the broadcast was counted on.
	levels: Arc<[Arc<Level>]>,
	/// Tier whose subscriber counters are bumped.
	tier: Tier,
}
466
467impl SubscriberStats {
468	/// Open a track-subscription guard. Mirrors [`PublisherStats::track`].
469	pub fn track(&self, name: &str) -> SubscriberTrack {
470		BroadcastStats {
471			levels: self.levels.clone(),
472			tier: self.tier,
473		}
474		.subscriber_track(name)
475	}
476}
477
478impl Drop for SubscriberStats {
479	fn drop(&mut self) {
480		for level in self.levels.iter() {
481			level
482				.counters(self.tier, Role::Subscriber)
483				.broadcasts_closed
484				.fetch_add(1, Ordering::Relaxed);
485		}
486	}
487}
488
/// RAII subscription guard for the publisher (egress) role, returned by
/// [`PublisherStats::track`] or [`BroadcastStats::publisher_track`].
///
/// Opening it bumped `subscriptions` on every level; dropping it bumps
/// `subscriptions_closed`.
#[must_use = "drop the guard to record the subscription as closed"]
pub struct PublisherTrack {
	/// Levels the subscription was counted on.
	levels: Arc<[Arc<Level>]>,
	/// Tier whose publisher counters are bumped.
	tier: Tier,
}
495
496impl PublisherTrack {
497	/// Bumps `frames` once.
498	pub fn frame(&self) {
499		for level in self.levels.iter() {
500			level
501				.counters(self.tier, Role::Publisher)
502				.frames
503				.fetch_add(1, Ordering::Relaxed);
504		}
505	}
506
507	/// Bumps `bytes` by `n`.
508	pub fn bytes(&self, n: u64) {
509		for level in self.levels.iter() {
510			level
511				.counters(self.tier, Role::Publisher)
512				.bytes
513				.fetch_add(n, Ordering::Relaxed);
514		}
515	}
516
517	/// Bumps `groups` once.
518	pub fn group(&self) {
519		for level in self.levels.iter() {
520			level
521				.counters(self.tier, Role::Publisher)
522				.groups
523				.fetch_add(1, Ordering::Relaxed);
524		}
525	}
526}
527
528impl Drop for PublisherTrack {
529	fn drop(&mut self) {
530		for level in self.levels.iter() {
531			level
532				.counters(self.tier, Role::Publisher)
533				.subscriptions_closed
534				.fetch_add(1, Ordering::Relaxed);
535		}
536	}
537}
538
/// RAII subscription guard for the subscriber (ingress) role, returned by
/// [`SubscriberStats::track`] or [`BroadcastStats::subscriber_track`].
///
/// Opening it bumped `subscriptions` on every level; dropping it bumps
/// `subscriptions_closed`.
#[must_use = "drop the guard to record the subscription as closed"]
pub struct SubscriberTrack {
	/// Levels the subscription was counted on.
	levels: Arc<[Arc<Level>]>,
	/// Tier whose subscriber counters are bumped.
	tier: Tier,
}
545
546impl SubscriberTrack {
547	/// Bumps `frames` once.
548	pub fn frame(&self) {
549		for level in self.levels.iter() {
550			level
551				.counters(self.tier, Role::Subscriber)
552				.frames
553				.fetch_add(1, Ordering::Relaxed);
554		}
555	}
556
557	/// Bumps `bytes` by `n`.
558	pub fn bytes(&self, n: u64) {
559		for level in self.levels.iter() {
560			level
561				.counters(self.tier, Role::Subscriber)
562				.bytes
563				.fetch_add(n, Ordering::Relaxed);
564		}
565	}
566
567	/// Bumps `groups` once.
568	pub fn group(&self) {
569		for level in self.levels.iter() {
570			level
571				.counters(self.tier, Role::Subscriber)
572				.groups
573				.fetch_add(1, Ordering::Relaxed);
574		}
575	}
576}
577
578impl Drop for SubscriberTrack {
579	fn drop(&mut self) {
580		for level in self.levels.iter() {
581			level
582				.counters(self.tier, Role::Subscriber)
583				.subscriptions_closed
584				.fetch_add(1, Ordering::Relaxed);
585		}
586	}
587}
588
589fn ensure_task(level: &Arc<Level>) {
590	let mut slot = level.task.lock();
591	if slot.is_none() {
592		*slot = Some(());
593		let weak = Arc::downgrade(level);
594		spawn(run_publisher(weak));
595	}
596}
597
598async fn run_publisher(weak: Weak<Level>) {
599	let setup = {
600		let Some(level) = weak.upgrade() else {
601			return;
602		};
603		let mut broadcast = Broadcast::new().produce();
604		let mut make = |name: &str| {
605			broadcast.create_track(Track {
606				name: name.into(),
607				priority: 0,
608			})
609		};
610		let ext_pub = match make("publisher.json") {
611			Ok(t) => t,
612			Err(err) => {
613				tracing::warn!(?err, "stats: failed to create publisher.json");
614				clear_task(&level);
615				return;
616			}
617		};
618		let ext_sub = match make("subscriber.json") {
619			Ok(t) => t,
620			Err(err) => {
621				tracing::warn!(?err, "stats: failed to create subscriber.json");
622				clear_task(&level);
623				return;
624			}
625		};
626		let int_pub = match make("internal/publisher.json") {
627			Ok(t) => t,
628			Err(err) => {
629				tracing::warn!(?err, "stats: failed to create internal/publisher.json");
630				clear_task(&level);
631				return;
632			}
633		};
634		let int_sub = match make("internal/subscriber.json") {
635			Ok(t) => t,
636			Err(err) => {
637				tracing::warn!(?err, "stats: failed to create internal/subscriber.json");
638				clear_task(&level);
639				return;
640			}
641		};
642		if !level.origin.publish_broadcast(&level.advertised, broadcast.consume()) {
643			tracing::warn!(level = %level.advertised, "stats: origin rejected stats broadcast");
644			clear_task(&level);
645			return;
646		}
647		(broadcast, ext_pub, ext_sub, int_pub, int_sub)
648	};
649	let (broadcast, mut ext_pub, mut ext_sub, mut int_pub, mut int_sub) = setup;
650
651	let mut last_ext_pub: Option<Snapshot> = None;
652	let mut last_ext_sub: Option<Snapshot> = None;
653	let mut last_int_pub: Option<Snapshot> = None;
654	let mut last_int_sub: Option<Snapshot> = None;
655
656	let mut tick = tokio::time::interval(Duration::from_secs(1));
657	tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
658
659	loop {
660		tick.tick().await;
661
662		let Some(level) = weak.upgrade() else {
663			return;
664		};
665
666		if !level.any_active() {
667			// Take the task slot under the lock and re-check. Any subscribe that
668			// raced with us either landed before we set None (so it sees Some
669			// and won't respawn) or after, in which case it spawns a fresh task.
670			let mut slot = level.task.lock();
671			if !level.any_active() {
672				*slot = None;
673				drop(slot);
674				drop(level);
675				// Drop `broadcast` to unannounce. Leftover producers/consumers
676				// follow the existing `closed()` watcher in OriginProducer.
677				drop(broadcast);
678				return;
679			}
680		}
681
682		maybe_write(&mut ext_pub, Tier::External, Role::Publisher, &level, &mut last_ext_pub);
683		maybe_write(
684			&mut ext_sub,
685			Tier::External,
686			Role::Subscriber,
687			&level,
688			&mut last_ext_sub,
689		);
690		maybe_write(&mut int_pub, Tier::Internal, Role::Publisher, &level, &mut last_int_pub);
691		maybe_write(
692			&mut int_sub,
693			Tier::Internal,
694			Role::Subscriber,
695			&level,
696			&mut last_int_sub,
697		);
698	}
699}
700
701fn maybe_write(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, last: &mut Option<Snapshot>) {
702	let snapshot = level.counters(tier, role).snapshot();
703	if last.as_ref() == Some(&snapshot) {
704		return;
705	}
706	write_snapshot(track, tier, role, level, snapshot);
707	*last = Some(snapshot);
708}
709
710fn clear_task(level: &Level) {
711	*level.task.lock() = None;
712}
713
/// Plain-value copy of one [`Counters`] set, produced by `Counters::snapshot`.
///
/// `PartialEq` drives idle-frame skipping in `maybe_write`. The fields are
/// flattened into `SnapshotFrame`, so their declaration order is the order
/// they appear in the emitted JSON.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
struct Snapshot {
	broadcasts: u64,
	broadcasts_closed: u64,
	subscriptions: u64,
	subscriptions_closed: u64,
	bytes: u64,
	frames: u64,
	groups: u64,
}
724
/// JSON payload of one frame on a stats track (serialized by `write_snapshot`).
#[derive(Debug, Serialize)]
struct SnapshotFrame<'a> {
	/// Frame format version; `write_snapshot` always sets `1`.
	v: u32,
	/// Level path the counters aggregate (`""` for the root level).
	level: &'a str,
	/// `"external"` or `"internal"` (from `Tier::as_str`).
	tier: &'a str,
	/// `"publisher"` or `"subscriber"` (from `Role::as_str`).
	role: &'a str,
	/// Configured node suffix; the key is omitted entirely when there is none.
	#[serde(skip_serializing_if = "Option::is_none")]
	node: Option<&'a str>,
	/// Wall-clock milliseconds since the Unix epoch when the frame was built.
	ts_ms: u64,
	/// Counter values, inlined as top-level JSON fields.
	#[serde(flatten)]
	snapshot: Snapshot,
}
737
738fn write_snapshot(track: &mut crate::TrackProducer, tier: Tier, role: Role, level: &Level, snapshot: Snapshot) {
739	let frame = SnapshotFrame {
740		v: 1,
741		level: level.level_key.as_str(),
742		tier: tier.as_str(),
743		role: role.as_str(),
744		node: level.node.as_ref().map(|p| p.as_str()),
745		ts_ms: now_ms(),
746		snapshot,
747	};
748
749	let buf = match serde_json::to_vec(&frame) {
750		Ok(buf) => buf,
751		Err(err) => {
752			tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to serialize snapshot");
753			return;
754		}
755	};
756
757	if let Err(err) = track.write_frame(buf) {
758		tracing::debug!(?err, ?tier, ?role, level = %level.advertised, "stats: failed to write snapshot frame");
759	}
760}
761
/// Wall-clock time in milliseconds since the Unix epoch, used for the
/// `ts_ms` field of snapshot frames.
///
/// Returns `0` if the system clock reads earlier than the epoch. The `u128`
/// millisecond count is converted with `try_from` and saturates at
/// `u64::MAX`, instead of being silently truncated by an `as` cast.
fn now_ms() -> u64 {
	SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.map_or(0, |elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
}
768
769/// Compute the level prefix keys this broadcast contributes to.
770///
771/// Produces every prefix of the broadcast path from 0 segments (root) up to
772/// `min(levels, segments)` segments, inclusive, so a broadcast within the
773/// configured depth budget gets a dedicated per-broadcast bucket in addition
774/// to the aggregating prefixes. Broadcasts deeper than `levels` are truncated
775/// (no per-broadcast bucket). `levels == 0` produces no buckets at all.
776fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
777	if levels == 0 {
778		return Vec::new();
779	}
780	if broadcast.is_empty() {
781		return vec![PathOwned::default()];
782	}
783
784	let segs: Vec<&str> = broadcast.as_str().split('/').collect();
785	let max = (levels as usize).min(segs.len());
786	(0..=max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
787}
788
789fn advertised_path(prefix: &Path, level_key: &Path, node: Option<&Path>) -> PathOwned {
790	// The fixed `prefix` category leaves room for sibling categories (e.g.
791	// `<top-prefix>/nodes/<node>` for host-level stats) under the same prefix.
792	let top = prefix.as_str();
793	let mut out = format!("{top}/prefix");
794	if !level_key.is_empty() {
795		out.push('/');
796		out.push_str(level_key.as_str());
797	}
798	if let Some(node) = node {
799		// `node` is already a normalized path, so multi-segment values like
800		// `sjc/1` nest under the region without any extra handling.
801		out.push('/');
802		out.push_str(node.as_str());
803	}
804	PathOwned::from(out)
805}
806
807#[cfg(test)]
808mod tests {
809	use std::sync::atomic::Ordering::Relaxed;
810
811	use crate::{Origin, Path};
812
813	use super::*;
814
815	#[test]
816	fn level_keys_basic() {
817		let key = |s: &str, n: u32| {
818			level_keys(&Path::new(s), n)
819				.into_iter()
820				.map(|p| p.as_str().to_string())
821				.collect::<Vec<_>>()
822		};
823
824		// levels=1 covers root + the first segment; for "demo/bbb" that's the
825		// "demo" aggregating prefix (no per-broadcast bucket since we'd need
826		// levels=2 to reach the broadcast itself).
827		assert_eq!(key("demo/bbb", 1), vec!["", "demo"]);
828		// levels=2 reaches the broadcast itself, so we get root + prefix + own.
829		assert_eq!(key("demo/bbb", 2), vec!["", "demo", "demo/bbb"]);
830		// Capped: broadcast is 2 segments, levels=3 still tops out at the
831		// broadcast's own path.
832		assert_eq!(key("demo/bbb", 3), vec!["", "demo", "demo/bbb"]);
833		// Deeper broadcast, levels=3 stops one short of the broadcast itself.
834		assert_eq!(key("a/b/c/d", 3), vec!["", "a", "a/b", "a/b/c"]);
835		// 1-segment broadcast, levels=2 reaches the broadcast.
836		assert_eq!(key("demo", 2), vec!["", "demo"]);
837		// levels=0 yields no buckets at all.
838		assert!(key("demo/bbb", 0).is_empty());
839	}
840
841	#[test]
842	fn advertised_path_root_and_nested() {
843		let prefix = Path::new(".stats");
844		let node = PathOwned::from("sjc");
845		assert_eq!(
846			advertised_path(&prefix, &Path::new(""), Some(&node)).as_str(),
847			".stats/prefix/sjc"
848		);
849		assert_eq!(
850			advertised_path(&prefix, &Path::new("demo"), Some(&node)).as_str(),
851			".stats/prefix/demo/sjc"
852		);
853		assert_eq!(
854			advertised_path(&prefix, &Path::new("demo/foo"), Some(&node)).as_str(),
855			".stats/prefix/demo/foo/sjc"
856		);
857	}
858
859	#[test]
860	fn advertised_path_without_node() {
861		let prefix = Path::new(".stats");
862		assert_eq!(advertised_path(&prefix, &Path::new(""), None).as_str(), ".stats/prefix");
863		assert_eq!(
864			advertised_path(&prefix, &Path::new("demo"), None).as_str(),
865			".stats/prefix/demo"
866		);
867	}
868
869	#[test]
870	fn advertised_path_honors_custom_prefix() {
871		let prefix = Path::new("metrics");
872		let node = PathOwned::from("lon");
873		assert_eq!(
874			advertised_path(&prefix, &Path::new(""), Some(&node)).as_str(),
875			"metrics/prefix/lon"
876		);
877		assert_eq!(
878			advertised_path(&prefix, &Path::new("demo/room"), Some(&node)).as_str(),
879			"metrics/prefix/demo/room/lon"
880		);
881	}
882
883	#[test]
884	fn advertised_path_multi_segment_node() {
885		let prefix = Path::new(".stats");
886		let node = PathOwned::from("sjc/1");
887		assert_eq!(
888			advertised_path(&prefix, &Path::new(""), Some(&node)).as_str(),
889			".stats/prefix/sjc/1"
890		);
891		assert_eq!(
892			advertised_path(&prefix, &Path::new("demo"), Some(&node)).as_str(),
893			".stats/prefix/demo/sjc/1"
894		);
895		assert_eq!(
896			advertised_path(&prefix, &Path::new("demo/foo"), Some(&node)).as_str(),
897			".stats/prefix/demo/foo/sjc/1"
898		);
899		// A second host in the same region nests under the shared region key
900		// without colliding with the first.
901		let node2 = PathOwned::from("sjc/2");
902		assert_eq!(
903			advertised_path(&prefix, &Path::new("demo"), Some(&node2)).as_str(),
904			".stats/prefix/demo/sjc/2"
905		);
906	}
907
908	#[test]
909	fn new_normalizes_and_drops_empty_node() {
910		let origin = Origin::random().produce();
911
912		// Trailing slash and a doubled separator both get normalized away.
913		let stats = Stats::new(".stats", 1, Some(PathOwned::from("/sjc//1/")), origin.clone());
914		assert_eq!(stats.inner.node.as_ref().unwrap().as_str(), "sjc/1");
915
916		// Empty / whitespace-only inputs collapse to None so downstream code
917		// only has to handle a single "no node" representation.
918		let stats = Stats::new(".stats", 1, Some(PathOwned::from("")), origin.clone());
919		assert!(stats.inner.node.is_none());
920		let stats = Stats::new(".stats", 1, Some(PathOwned::from("///")), origin);
921		assert!(stats.inner.node.is_none());
922	}
923
924	#[tokio::test(start_paused = true)]
925	async fn external_publisher_bumps_external_publisher_counters() {
926		let origin = Origin::random().produce();
927		let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin);
928		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
929		let pub_role = bs.publisher();
930		let track = pub_role.track("video");
931		track.frame();
932		track.bytes(100);
933		track.group();
934		drop(track);
935		drop(pub_role);
936
937		let entries = stats.inner.entries.lock();
938		let root = entries.get(&PathOwned::from("")).expect("root level");
939		assert_eq!(root.external_publisher.frames.load(Relaxed), 1);
940		assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
941		assert_eq!(root.external_publisher.groups.load(Relaxed), 1);
942		assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
943		assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
944		assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 1);
945		assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 1);
946		// Other tier/role combos must remain untouched.
947		assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
948		assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
949		assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
950	}
951
952	#[tokio::test(start_paused = true)]
953	async fn external_subscriber_bumps_external_subscriber_counters() {
954		let origin = Origin::random().produce();
955		let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin);
956		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
957		let sub_role = bs.subscriber();
958		let track = sub_role.track("video");
959		track.frame();
960		track.bytes(50);
961
962		let entries = stats.inner.entries.lock();
963		let root = entries.get(&PathOwned::from("")).expect("root level");
964		assert_eq!(root.external_subscriber.frames.load(Relaxed), 1);
965		assert_eq!(root.external_subscriber.bytes.load(Relaxed), 50);
966		assert_eq!(root.external_subscriber.broadcasts.load(Relaxed), 1);
967		assert_eq!(root.external_subscriber.subscriptions.load(Relaxed), 1);
968		assert_eq!(root.external_publisher.bytes.load(Relaxed), 0);
969		assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 0);
970	}
971
972	#[tokio::test(start_paused = true)]
973	async fn external_and_internal_tiers_are_independent() {
974		let origin = Origin::random().produce();
975		let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin);
976		let ext = stats.tier(Tier::External);
977		let int = stats.tier(Tier::Internal);
978
979		let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
980		ext_track.bytes(100);
981		let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
982		int_track.bytes(7);
983
984		let entries = stats.inner.entries.lock();
985		let root = entries.get(&PathOwned::from("")).expect("root level");
986		assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
987		assert_eq!(root.external_subscriber.bytes.load(Relaxed), 0);
988		assert_eq!(root.internal_publisher.bytes.load(Relaxed), 0);
989		assert_eq!(root.internal_subscriber.bytes.load(Relaxed), 7);
990	}
991
992	#[tokio::test(start_paused = true)]
993	async fn bumps_fanout_to_all_levels() {
994		let origin = Origin::random().produce();
995		let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin);
996		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
997		let p = bs.publisher();
998		let track = p.track("video");
999		track.bytes(100);
1000
1001		let entries = stats.inner.entries.lock();
1002		let root = entries.get(&PathOwned::from("")).expect("root level");
1003		let demo = entries.get(&PathOwned::from("demo")).expect("demo level");
1004		assert_eq!(root.external_publisher.bytes.load(Relaxed), 100);
1005		assert_eq!(demo.external_publisher.bytes.load(Relaxed), 100);
1006	}
1007
1008	#[tokio::test(start_paused = true)]
1009	async fn paths_under_prefix_are_no_op() {
1010		// Our own stats broadcasts (and any sibling category under the same
1011		// prefix) must not feed back into the aggregator.
1012		let origin = Origin::random().produce();
1013		let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin);
1014		let bs = stats.tier(Tier::External).broadcast(".stats/prefix/sjc");
1015		assert!(bs.is_empty());
1016
1017		let p = bs.publisher();
1018		let track = p.track("video");
1019		track.bytes(100);
1020		track.frame();
1021		track.group();
1022		drop(track);
1023		drop(p);
1024
1025		assert!(stats.inner.entries.lock().is_empty());
1026	}
1027
1028	#[tokio::test(start_paused = true)]
1029	async fn publisher_track_does_not_bump_broadcasts() {
1030		// Subscription-side track creation should not record a broadcast: the
1031		// broadcast lifetime is tracked separately by the announce loop.
1032		let origin = Origin::random().produce();
1033		let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin);
1034		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1035		let track = bs.publisher_track("video");
1036		track.bytes(10);
1037		drop(track);
1038
1039		let entries = stats.inner.entries.lock();
1040		let root = entries.get(&PathOwned::from("")).expect("root level");
1041		assert_eq!(root.external_publisher.broadcasts.load(Relaxed), 0);
1042		assert_eq!(root.external_publisher.broadcasts_closed.load(Relaxed), 0);
1043		assert_eq!(root.external_publisher.subscriptions.load(Relaxed), 1);
1044		assert_eq!(root.external_publisher.subscriptions_closed.load(Relaxed), 1);
1045		assert_eq!(root.external_publisher.bytes.load(Relaxed), 10);
1046	}
1047
1048	#[tokio::test(start_paused = true)]
1049	async fn disabled_stats_are_noop() {
1050		// A disabled aggregator must not allocate level state or spawn tasks.
1051		let stats = Stats::disabled();
1052		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1053		assert!(bs.is_empty());
1054		let p = bs.publisher();
1055		let track = p.track("video");
1056		track.bytes(100);
1057		drop(track);
1058		drop(p);
1059		assert!(stats.inner.entries.lock().is_empty());
1060	}
1061
1062	#[tokio::test(start_paused = true)]
1063	async fn task_spawns_on_first_subscribe_and_announces() {
1064		let origin = Origin::random().produce();
1065		let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin.clone());
1066		let mut consumer = origin.consume();
1067
1068		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1069		let p = bs.publisher();
1070		let _track = p.track("video");
1071
1072		tokio::time::advance(Duration::from_millis(1)).await;
1073		// levels=1 + broadcast "foo/bar" → buckets ["", "foo"]: root + per-first-segment.
1074		let mut seen = std::collections::HashSet::new();
1075		for _ in 0..2 {
1076			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1077			assert!(broadcast.is_some());
1078			seen.insert(path.as_str().to_string());
1079		}
1080		assert!(seen.contains(".stats/prefix/sjc"));
1081		assert!(seen.contains(".stats/prefix/foo/sjc"));
1082	}
1083
1084	#[tokio::test(start_paused = true)]
1085	async fn task_spawns_with_node_suffix() {
1086		let origin = Origin::random().produce();
1087		let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin.clone());
1088		let mut consumer = origin.consume();
1089
1090		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1091		let p = bs.publisher();
1092		let _track = p.track("video");
1093
1094		tokio::time::advance(Duration::from_millis(1)).await;
1095		// levels=2 + broadcast "foo/bar" → buckets ["", "foo", "foo/bar"], each
1096		// suffixed with `/sjc`.
1097		let mut seen = std::collections::HashSet::new();
1098		for _ in 0..3 {
1099			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1100			assert!(broadcast.is_some());
1101			seen.insert(path.as_str().to_string());
1102		}
1103		assert!(seen.contains(".stats/prefix/sjc"));
1104		assert!(seen.contains(".stats/prefix/foo/sjc"));
1105		assert!(seen.contains(".stats/prefix/foo/bar/sjc"));
1106	}
1107
1108	#[tokio::test(start_paused = true)]
1109	async fn task_spawns_without_node_suffix() {
1110		// node=None: paths should omit the trailing /<node> segment.
1111		let origin = Origin::random().produce();
1112		let stats = Stats::new(".stats", 1, None, origin.clone());
1113		let mut consumer = origin.consume();
1114
1115		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1116		let p = bs.publisher();
1117		let _track = p.track("video");
1118
1119		tokio::time::advance(Duration::from_millis(1)).await;
1120		let mut seen = std::collections::HashSet::new();
1121		for _ in 0..2 {
1122			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1123			assert!(broadcast.is_some());
1124			seen.insert(path.as_str().to_string());
1125		}
1126		assert!(seen.contains(".stats/prefix"));
1127		assert!(seen.contains(".stats/prefix/foo"));
1128	}
1129
1130	#[tokio::test(start_paused = true)]
1131	async fn task_exits_when_all_roles_idle() {
1132		let origin = Origin::random().produce();
1133		// levels=1 + broadcast "foo/bar" → buckets ["", "foo"] (root prefix plus
1134		// the first-segment prefix; the broadcast's own path isn't reachable at
1135		// this depth, so we get exactly two stats announces).
1136		let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin.clone());
1137		let mut consumer = origin.consume();
1138
1139		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1140		let p = bs.publisher();
1141		let track = p.track("video");
1142
1143		tokio::time::advance(Duration::from_millis(1)).await;
1144		let mut announced: Vec<String> = Vec::new();
1145		for _ in 0..2 {
1146			let (path, broadcast) = consumer.announced().await.expect("expected announce");
1147			assert!(broadcast.is_some(), "expected an active announce");
1148			announced.push(path.as_str().to_string());
1149		}
1150		announced.sort();
1151		assert_eq!(announced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
1152
1153		drop(track);
1154		drop(p);
1155		drop(bs);
1156
1157		tokio::time::advance(Duration::from_secs(2)).await;
1158		let mut unannounced: Vec<String> = Vec::new();
1159		for _ in 0..2 {
1160			let (path, broadcast) = consumer.announced().await.expect("expected unannounce");
1161			assert!(broadcast.is_none(), "expected an unannounce");
1162			unannounced.push(path.as_str().to_string());
1163		}
1164		unannounced.sort();
1165		assert_eq!(unannounced, vec![".stats/prefix/foo/sjc", ".stats/prefix/sjc"]);
1166	}
1167
	// Idle-skip behavior (the snapshot task suppresses a write when the
	// current Snapshot equals the last emitted one) is covered end-to-end in
	// local relay verification. Driving the broadcast/track plumbing from a
	// unit test is awkward enough that the skip logic itself
	// (`if last == new { return; }` in `maybe_write`) is verified by
	// inspection instead.
1173}