//! Generic stats publishing for moq-net sessions.
//!
//! [`Stats`] aggregates per-broadcast counter bumps for traffic this relay
//! node is handling and publishes them on a single `<prefix>/node/<node>`
//! broadcast (or `<prefix>/node` when no node is configured). The broadcast
//! carries four per-broadcast tracks, one per `(tier, role)` pair:
//!
//! * `publisher.json`           : external (e.g. customer) egress
//! * `subscriber.json`          : external ingress
//! * `internal/publisher.json`  : internal (e.g. mTLS cluster peer) egress
//! * `internal/subscriber.json` : internal ingress
//!
//! plus two session tracks, one per tier, that count connected sessions
//! keyed by auth root rather than broadcast:
//!
//! * `sessions.json`            : external sessions by root
//! * `internal/sessions.json`   : internal sessions by root
//!
//! Each frame on a per-broadcast track is a JSON object mapping broadcast path
//! to a cumulative counter snapshot. Tier, role, and node are implied by the
//! track and broadcast paths, so they aren't repeated inside the frame. An
//! entry appears in the frame for a given `(tier, role)` on any tick where the
//! broadcast is live (some open counter still exceeds its `*_closed`
//! counterpart, so a subscription could begin at any moment) or its snapshot
//! changed since the previous tick. Once every open counter equals its
//! `*_closed` counterpart no traffic can flow, so after the tick that records
//! that final change the entry is dropped. A downstream aggregator computes
//! rates from successive cumulative snapshots and slices the data however a
//! dashboard wants.
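/*!

A consumer could turn two successive readings of one counter into a rate
roughly as follows. This is a minimal sketch, not part of this crate:
`rate_per_sec` is a hypothetical helper, and it assumes the caller tracks the
time between the two frames it compares (with idle frame skipping, that gap
can span several snapshot intervals).

```rust
// Hypothetical consumer-side helper: per-second rate between two cumulative
// readings of the same counter taken `elapsed_secs` apart.
fn rate_per_sec(prev: u64, cur: u64, elapsed_secs: f64) -> Option<f64> {
	// A decrease means the entry was garbage collected and re-created, so the
	// delta is meaningless; return `None` and let the caller start a new segment.
	if cur < prev || elapsed_secs <= 0.0 {
		return None;
	}
	Some((cur - prev) as f64 / elapsed_secs)
}
```

For example, `bytes` moving from 1000 to 3000 across a 2s gap gives
`Some(1000.0)` bytes per second.

*/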
//!
//! Each session frame maps auth root to a `{ sessions, sessions_closed }`
//! snapshot: `sessions` bumps when a session authenticated under that root
//! connects, `sessions_closed` when it disconnects, so `sessions -
//! sessions_closed` is the live session count for the root. This counts
//! connected sessions regardless of whether any data flows, which is what
//! presence-based billing wants. A root entry is emitted while live or on the
//! tick it changed, then dropped once no session under it remains.
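/*!

The emit-while-live-or-changed rule for session roots can be sketched with
plain maps. This is a hypothetical model, not the task's actual code:
`next_frame` and the `Gauge` tuple stand in for internal state, and "changed"
is taken to mean "differs from the previous tick's reading".

```rust
use std::collections::BTreeMap;

// Hypothetical `(sessions, sessions_closed)` gauge for one auth root.
type Gauge = (u64, u64);

// Build the next session frame: keep a root while it has live sessions or if
// its gauge changed since the previous tick; drop it otherwise.
fn next_frame(current: &BTreeMap<String, Gauge>, prev: &BTreeMap<String, Gauge>) -> BTreeMap<String, Gauge> {
	current
		.iter()
		.filter(|&(root, &(open, closed))| open > closed || prev.get(root) != Some(&(open, closed)))
		.map(|(root, gauge)| (root.clone(), *gauge))
		.collect()
}
```

If a root's last session disconnects during a tick, the sketch still emits its
final `(n, n)` gauge once, then omits the root on the next unchanged tick.

*/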
//!
//! Per-snapshot semantics:
//!
//! * `announced` / `announced_closed`: cumulative count of broadcast
//!   announce/unannounce events on this `(tier, role)`. `announced` is bumped
//!   when a `publisher()` / `subscriber()` guard is created and
//!   `announced_closed` when it is dropped.
//! * `broadcasts` / `broadcasts_closed`: per-(broadcast, session)
//!   subscription sentinel. The first active subscription a peer session
//!   opens for a broadcast bumps `broadcasts`; the last one it closes bumps
//!   `broadcasts_closed`. Summed across sessions, `broadcasts -
//!   broadcasts_closed` is the number of distinct sessions currently
//!   subscribed to the broadcast (i.e. viewers on the egress side). Driven
//!   by [`SessionBroadcasts`]; use `announced` if you want all broadcasts
//!   ever seen.
//! * `subscriptions` / `subscriptions_closed`: cumulative count of
//!   track-level subscription guards opened/dropped.
//! * `bytes` / `frames` / `groups`: cumulative payload counters bumped from
//!   the session loops (both lite and IETF).
//! * `sessions` / `sessions_closed` (session tracks only): cumulative count
//!   of sessions connected/disconnected under an auth root on this tier.
//!   Driven by [`StatsHandle::session`].
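/*!

Consumers typically derive live gauges by subtracting each `*_closed` counter
from its open counterpart. `EntryView` below is a hypothetical consumer-side
mirror of one frame entry (payload counters omitted); `saturating_sub` is
purely defensive, since snapshots are meant to satisfy `open >= closed` (see
"Snapshot atomicity").

```rust
// Hypothetical consumer-side view of one frame entry; field names follow the
// JSON keys listed above (payload counters omitted for brevity).
#[derive(Clone, Copy, Debug, Default)]
struct EntryView {
	announced: u64,
	announced_closed: u64,
	broadcasts: u64,
	broadcasts_closed: u64,
	subscriptions: u64,
	subscriptions_closed: u64,
}

impl EntryView {
	// Announcements currently open on this `(tier, role)`.
	fn live_announcements(&self) -> u64 {
		self.announced.saturating_sub(self.announced_closed)
	}

	// Distinct sessions currently subscribed (viewers, on the egress side).
	fn viewers(&self) -> u64 {
		self.broadcasts.saturating_sub(self.broadcasts_closed)
	}

	// Track-level subscription guards currently open.
	fn open_subscriptions(&self) -> u64 {
		self.subscriptions.saturating_sub(self.subscriptions_closed)
	}
}
```

*/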
//!
//! Counters are monotonically non-decreasing (they are only ever bumped with
//! `fetch_add`); a counter going backwards across snapshots means the
//! underlying entry was garbage collected and re-created. Downstream
//! consumers should treat a decrease as the start of a fresh segment that
//! counts up from zero, summing across resets when computing lifetime totals.
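/*!

Summing across resets means adding each non-negative delta and, after a
decrease, the new reading itself, since the re-created entry counts up from
zero again. `lifetime_total` is a hypothetical consumer-side helper over one
counter's readings in tick order.

```rust
// Hypothetical consumer-side helper: total count for one counter across
// resets, given its readings in tick order.
fn lifetime_total(readings: &[u64]) -> u64 {
	let mut total = 0;
	let mut prev = 0;
	for &cur in readings {
		if cur >= prev {
			// Normal case: the counter grew (or stayed put) since the last reading.
			total += cur - prev;
		} else {
			// The counter went backwards: the entry was re-created, so `cur` is
			// everything counted since the reset.
			total += cur;
		}
		prev = cur;
	}
	total
}
```

For readings `[0, 5, 12, 3, 7]` it returns 19: 12 before the reset plus 7
after.

*/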
//!
//! A caller hands each session a tier-scoped [`StatsHandle`] (built from the
//! single shared [`Stats`] via [`Stats::tier`]) which determines which counter
//! set its bumps land in. Multiple relays in the same cluster origin can
//! coexist by giving each one a distinct `<node>` suffix on the advertised
//! path. The suffix itself may be multi-segment (e.g. `sjc/1`, `sjc/2`) so a
//! region with multiple hosts can nest under a shared region key without
//! colliding.
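/*!

The advertised path layout can be illustrated with plain strings.
`advertised_path_sketch` is hypothetical (this file's own path builder is not
shown here) and ignores any normalization the crate's `Path` type may perform
beyond treating an empty node as unset.

```rust
// Hypothetical helper mirroring the documented layout: `<prefix>/node/<node>`,
// or `<prefix>/node` when the node suffix is unset or empty.
fn advertised_path_sketch(prefix: &str, node: Option<&str>) -> String {
	match node.filter(|n| !n.is_empty()) {
		Some(node) => format!("{prefix}/node/{node}"),
		None => format!("{prefix}/node"),
	}
}
```

With the default `.stats` prefix, nodes `sjc/1` and `sjc/2` would land on
`.stats/node/sjc/1` and `.stats/node/sjc/2`.

*/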
//!
//! # Disabled stats
//!
//! A [`StatsConfig`] with no origin (the default) builds a no-op aggregator:
//! all counter bumps are silently dropped, no snapshot task spawns, and no
//! broadcast is published. [`Stats::default`] / [`StatsHandle::default`]
//! return one, so call sites can hold a [`StatsHandle`] unconditionally
//! instead of threading an `Option`.
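/*!

The no-op behavior comes down to an `Option` inside the handle: `None` means
disabled, and each bump checks it and returns early. `CounterHandle` is a
hypothetical, pared-down illustration of that pattern, not this crate's API.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical handle: `None` means stats are disabled, so call sites can bump
// unconditionally and the handle decides whether anything is recorded.
#[derive(Clone, Default)]
struct CounterHandle {
	counter: Option<Arc<AtomicU64>>,
}

impl CounterHandle {
	// An enabled handle backed by a shared counter.
	fn enabled() -> Self {
		Self {
			counter: Some(Arc::new(AtomicU64::new(0))),
		}
	}

	// Bump the counter, or do nothing if disabled.
	fn bump(&self) {
		if let Some(counter) = &self.counter {
			counter.fetch_add(1, Ordering::Relaxed);
		}
	}

	// Current value, or `None` if disabled.
	fn get(&self) -> Option<u64> {
		self.counter.as_ref().map(|c| c.load(Ordering::Relaxed))
	}
}
```

*/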
//!
//! # Lifecycle
//!
//! When the config has an origin, [`Stats::new`] spawns the snapshot task
//! immediately, publishes the stats broadcast, and builds a frame for every
//! track on each tick of the configured interval. The broadcast stays
//! announced for the lifetime of the [`Stats`] aggregator, even while idle
//! (frames just go to `{}`). The task exits when the last [`Stats`] clone is
//! dropped (the task holds only a `Weak` to the shared state).
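/*!

Holding only a `Weak` is what lets the task notice that every aggregator clone
is gone. The sketch below is a synchronous, hypothetical reduction of one loop
iteration (the real task is async and writes frames); `Shared` and `tick` are
illustrative names.

```rust
use std::sync::{Arc, Weak};

// Hypothetical stand-in for the shared state the snapshot task reads.
struct Shared;

// One iteration of a loop that holds only a `Weak`: it upgrades for the
// duration of the tick, so the task never keeps the aggregator alive itself.
fn tick(weak: &Weak<Shared>) -> bool {
	match weak.upgrade() {
		// Build and write frames from `_shared` here.
		Some(_shared) => true,
		// Every strong `Arc` (every aggregator clone) is gone: stop looping.
		None => false,
	}
}
```

*/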
//!
//! # Idle frame skipping
//!
//! On each tick the task compares the just-built per-(tier, role) JSON payload
//! against the last one it emitted and writes a frame only when something
//! changed. New subscribers still pick up a baseline immediately because
//! track-latest semantics retain the most recent emitted frame.
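/*!

Per track, the skip amounts to remembering the last payload written and
comparing against it. `FrameDeduper` is a hypothetical illustration that
assumes payloads are compared as serialized JSON strings; in this sketch the
first payload, even `{}`, is always written.

```rust
// Hypothetical per-track writer state: remembers the last payload written so
// identical payloads can be skipped.
#[derive(Default)]
struct FrameDeduper {
	last: Option<String>,
}

impl FrameDeduper {
	// Returns the payload to write this tick, or `None` if it is unchanged.
	fn dedup(&mut self, payload: String) -> Option<String> {
		if self.last.as_deref() == Some(payload.as_str()) {
			return None;
		}
		self.last = Some(payload.clone());
		Some(payload)
	}
}
```

*/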
//!
//! # Snapshot atomicity
//!
//! Each [`Counters`] snapshot reads the `*_closed` atomics (with `Acquire`)
//! before their open counterparts (with `Relaxed`). The matching close bumps
//! in the RAII guards' `Drop` impls use `Release`. With this pairing the
//! snapshot always satisfies `open >= closed`, even on weakly-ordered
//! architectures (ARM, POWER). The `Acquire` load of a close counter
//! synchronizes-with the `Release` bump that produced the observed value and,
//! because every close bump is a read-modify-write, also with every earlier
//! close bump on that counter (each heads a release sequence that includes
//! the later bumps). Every write that happened-before one of those closes,
//! including the matching open bump on whichever thread opened the guard, is
//! therefore visible to the snapshot thread. Open / payload counters can then
//! stay `Relaxed` because the visibility comes for free through the close
//! pairing. The cost is a slight upward bias on the open counts when a bump
//! lands between the two loads, which never produces a logically impossible
//! (`closed > open`) snapshot for downstream.
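/*!

The ordering argument can be exercised with a small std-only program. `Pair`
and `count_violations` are hypothetical: writer threads bump open (`Relaxed`)
then closed (`Release`) as a guard's construction and drop would, and the
reader loads in the snapshot's order. Under the Rust memory model the
function should always return 0.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

// Hypothetical open/close pair standing in for one `x` / `x_closed` pair.
#[derive(Default)]
struct Pair {
	open: AtomicU64,
	closed: AtomicU64,
}

// Spawns `writers` threads that each open and close `guards_per_writer`
// guards while this thread takes `reads` snapshots. Returns how many
// snapshots saw `closed > open` (expected: none).
fn count_violations(writers: usize, guards_per_writer: u64, reads: usize) -> usize {
	let pair = Arc::new(Pair::default());
	let handles: Vec<_> = (0..writers)
		.map(|_| {
			let pair = Arc::clone(&pair);
			thread::spawn(move || {
				for _ in 0..guards_per_writer {
					pair.open.fetch_add(1, Ordering::Relaxed); // guard constructed
					pair.closed.fetch_add(1, Ordering::Release); // guard dropped
				}
			})
		})
		.collect();

	let mut violations = 0;
	for _ in 0..reads {
		// Snapshot order: closed with Acquire first, then open with Relaxed.
		let closed = pair.closed.load(Ordering::Acquire);
		let open = pair.open.load(Ordering::Relaxed);
		if closed > open {
			violations += 1;
		}
	}
	for handle in handles {
		handle.join().expect("writer thread panicked");
	}
	violations
}
```

A passing run is evidence rather than proof; the guarantee rests on the
happens-before argument above.

*/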
//!
//! # Cycles
//!
//! Calling [`StatsHandle::broadcast`] for a path under the configured
//! top-level prefix returns an empty handle whose bumps are no-ops. This
//! breaks the feedback loop where serving a `<top-prefix>/...` broadcast
//! would itself generate more stats traffic.
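/*!

The filter is a prefix test on the broadcast path. This sketch assumes
segment-wise matching, so `.stats` does not swallow a sibling such as
`.statsd`; whether the crate's `Path::has_prefix` handles every edge case the
same way is an assumption. `has_prefix` and `should_track` are hypothetical
helpers over plain strings.

```rust
// Hypothetical segment-wise prefix check; empty segments (from leading,
// trailing, or doubled slashes) are ignored on both sides.
fn has_prefix(path: &str, prefix: &str) -> bool {
	let mut path_segments = path.split('/').filter(|s| !s.is_empty());
	prefix
		.split('/')
		.filter(|s| !s.is_empty())
		.all(|want| path_segments.next() == Some(want))
}

// `true` if bumps for `path` should be recorded, i.e. it is not one of the
// aggregator's own stats broadcasts.
fn should_track(stats_prefix: &str, path: &str) -> bool {
	!has_prefix(path, stats_prefix)
}
```

*/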

use std::{
	collections::{BTreeMap, HashMap},
	sync::{
		Arc, Weak,
		atomic::{AtomicU64, Ordering},
	},
	time::Duration,
};

use serde::Serialize;
use web_async::{Lock, spawn};

use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track, TrackProducer};

/// Cumulative atomic counters for a single `(tier, role)` on a broadcast.
///
/// The open counters are bumped when a RAII guard is created and their
/// `_closed` counterparts when it is dropped; the payload counters (`bytes`,
/// `frames`, `groups`) are bumped through the track guards. `broadcasts` /
/// `broadcasts_closed` are the per-(broadcast, session) subscription sentinel
/// driven by [`SessionBroadcasts`] (the first active subscription a session
/// opens for the broadcast bumps `broadcasts`, the last to close bumps
/// `broadcasts_closed`), so summed across sessions `broadcasts -
/// broadcasts_closed` is the count of distinct sessions currently subscribed.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Counters {
	pub announced: AtomicU64,
	pub announced_closed: AtomicU64,
	pub subscriptions: AtomicU64,
	pub subscriptions_closed: AtomicU64,
	pub broadcasts: AtomicU64,
	pub broadcasts_closed: AtomicU64,
	pub bytes: AtomicU64,
	pub frames: AtomicU64,
	pub groups: AtomicU64,
}

impl Counters {
	/// Read all atomics into a `RawCounts`. Closed counters are read with
	/// `Acquire` ordering before their open counterparts so the snapshot
	/// always satisfies `open >= closed`; see the module-level "Snapshot
	/// atomicity" note. Open / payload counters stay `Relaxed`: the
	/// Acquire on close synchronizes-with the matching Release on the
	/// close bump, which transitively makes all earlier writes (including
	/// the prior open bump) visible to this thread.
	fn snapshot(&self) -> RawCounts {
		let announced_closed = self.announced_closed.load(Ordering::Acquire);
		let subscriptions_closed = self.subscriptions_closed.load(Ordering::Acquire);
		let broadcasts_closed = self.broadcasts_closed.load(Ordering::Acquire);
		let announced = self.announced.load(Ordering::Relaxed);
		let subscriptions = self.subscriptions.load(Ordering::Relaxed);
		let broadcasts = self.broadcasts.load(Ordering::Relaxed);
		let bytes = self.bytes.load(Ordering::Relaxed);
		let frames = self.frames.load(Ordering::Relaxed);
		let groups = self.groups.load(Ordering::Relaxed);
		RawCounts {
			announced,
			announced_closed,
			broadcasts,
			broadcasts_closed,
			subscriptions,
			subscriptions_closed,
			bytes,
			frames,
			groups,
		}
	}
}

/// Per-(tier, root) session gauge. One of these is shared (via `Arc`) by every
/// [`SessionStats`] guard for the same auth root on the same tier: `sessions`
/// bumps on connect, `sessions_closed` on disconnect.
#[derive(Default, Debug)]
struct SessionCounters {
	sessions: AtomicU64,
	sessions_closed: AtomicU64,
}

impl SessionCounters {
	/// Read `(sessions, sessions_closed)`. Closed is loaded with `Acquire`
	/// before open with `Relaxed`, the same pairing as [`Counters::snapshot`],
	/// so the readout never shows `closed > open`.
	fn snapshot(&self) -> (u64, u64) {
		let closed = self.sessions_closed.load(Ordering::Acquire);
		let open = self.sessions.load(Ordering::Relaxed);
		(open, closed)
	}
}

/// Raw counter readout. Intermediate type that doesn't escape this module.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct RawCounts {
	announced: u64,
	announced_closed: u64,
	broadcasts: u64,
	broadcasts_closed: u64,
	subscriptions: u64,
	subscriptions_closed: u64,
	bytes: u64,
	frames: u64,
	groups: u64,
}

/// Distinguishes traffic classes so a single [`Stats`] can record
/// customer-facing and cluster-peer traffic separately. Each tracked
/// broadcast keeps per-tier [`Counters`] on both its publisher and
/// subscriber sides.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tier {
	External,
	Internal,
}

impl Tier {
	fn idx(self) -> usize {
		match self {
			Tier::External => 0,
			Tier::Internal => 1,
		}
	}
}

/// Settings for a [`Stats`] aggregator. Construct with [`StatsConfig::new`]
/// and chain the `with_*` setters (e.g.
/// `StatsConfig::new().with_origin(origin).with_prefix(".foo")`), then hand it
/// to [`Stats::new`].
///
/// With no origin set the resulting aggregator is a no-op: bumps are dropped
/// and no task spawns. Call [`StatsConfig::with_origin`] to publish.
///
/// Distinct from the relay's clap-derived `StatsConfig`, which holds the raw
/// CLI/TOML knobs and resolves into one of these.
///
/// `#[non_exhaustive]` so new knobs can land without breaking call sites; build
/// via [`StatsConfig::new`] rather than a struct literal.
#[derive(Clone)]
#[non_exhaustive]
pub struct StatsConfig {
	/// Origin that receives the stats broadcast's `publish_broadcast` calls.
	/// When `None`, [`Stats::new`] spawns no task and publishes nothing.
	pub origin: Option<OriginProducer>,
	/// Top-level path stats are published under (default `.stats`). The full
	/// advertised path is `<prefix>/node/<node>` (or `<prefix>/node` when
	/// `node` is unset).
	pub prefix: PathOwned,
	/// Node suffix that disambiguates broadcasts from different relays sharing a
	/// cluster origin. Set this on every node in multi-relay deployments. May be
	/// multi-segment (e.g. `sjc/1`, `sjc/2`) so a region with multiple hosts can
	/// nest under a shared region key. An empty path is treated as unset.
	/// Default none.
	pub node: Option<PathOwned>,
	/// How long the snapshot task waits between publishes. Default 1s.
	pub interval: Duration,
}

impl StatsConfig {
	/// A config with default settings: no origin (no-op), `.stats` prefix, 1s
	/// snapshot interval, and no node suffix. Call [`Self::with_origin`] to
	/// actually publish.
	pub fn new() -> Self {
		Self {
			origin: None,
			prefix: PathOwned::from(".stats"),
			node: None,
			interval: Duration::from_secs(1),
		}
	}

	/// Set the origin to publish the stats broadcast on. Without this the
	/// aggregator is a no-op.
	pub fn with_origin(mut self, origin: impl Into<Option<OriginProducer>>) -> Self {
		self.origin = origin.into();
		self
	}

	/// Override the top-level prefix (default `.stats`).
	pub fn with_prefix(mut self, prefix: impl Into<PathOwned>) -> Self {
		self.prefix = prefix.into();
		self
	}

	/// Override the snapshot interval (default 1s).
	pub fn with_interval(mut self, interval: Duration) -> Self {
		self.interval = interval;
		self
	}

	/// Set the node suffix (default none). An empty path is treated as unset.
	pub fn with_node(mut self, node: impl Into<Option<PathOwned>>) -> Self {
		self.node = node.into();
		self
	}
}

impl Default for StatsConfig {
	fn default() -> Self {
		Self::new()
	}
}

/// Top-level stats aggregator. Cheap to clone (`Arc` inside for the shared
/// runtime state). One instance per relay; sessions get tier-scoped handles via
/// [`Stats::tier`]. Build it from a [`StatsConfig`] via [`Stats::new`].
#[derive(Clone)]
pub struct Stats {
	prefix: PathOwned,
	/// `None` for a no-op aggregator (config had no origin): bumps are
	/// dropped and no task was spawned.
	shared: Option<Arc<StatsShared>>,
}

/// Runtime state shared by every clone of a [`Stats`] and held by the
/// snapshot task through a `Weak`. Only allocated when an origin is set.
struct StatsShared {
	origin: OriginProducer,
	entries: Lock<HashMap<PathOwned, Arc<BroadcastEntry>>>,
	/// Connected-session gauges keyed by auth root, one map per tier (indexed
	/// by `Tier::idx`). Independent of any broadcast; surfaced on the session
	/// tracks.
	sessions: [Lock<HashMap<PathOwned, Arc<SessionCounters>>>; 2],
}

/// Per-broadcast counters split by side then tier. The two side fields are
/// named explicitly (rather than indexed by some `Role` enum) because the
/// bump-path call sites always know which side they're on at compile time;
/// only the tier varies dynamically with the session.
struct BroadcastEntry {
	publisher: [Counters; 2],
	subscriber: [Counters; 2],
}

impl BroadcastEntry {
	fn new() -> Self {
		Self {
			publisher: Default::default(),
			subscriber: Default::default(),
		}
	}
}

/// Per-(entry, slot) state owned by the snapshot task. The snapshot task
/// is single-threaded so this needs no atomics; we keep one of these per
/// `(path, side, tier)` in a task-local map, mirroring the structure of
/// [`BroadcastEntry`].
#[derive(Default)]
struct SlotState {
	/// Last `Snapshot` we wrote to the frame for this slot, used to detect
	/// changes that warrant re-emission.
	prev_emitted: Option<Snapshot>,
}

/// Snapshot-task-local mirror of [`BroadcastEntry`]: per-side, per-tier
/// `SlotState`. Same field layout so iteration in the snapshot loop is
/// trivially parallel between the two.
#[derive(Default)]
struct EntrySnapState {
	publisher: [SlotState; 2],
	subscriber: [SlotState; 2],
}

impl EntrySnapState {
	/// Iterate the four `(track_name, counters, slot_state)` slots in the
	/// fixed order matching `TRACK_ORDER`.
	fn zip_slots<'a>(&'a mut self, entry: &'a BroadcastEntry) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] {
		let [pub_ext_state, pub_int_state] = &mut self.publisher;
		let [sub_ext_state, sub_int_state] = &mut self.subscriber;
		[
			("publisher.json", &entry.publisher[Tier::External.idx()], pub_ext_state),
			(
				"subscriber.json",
				&entry.subscriber[Tier::External.idx()],
				sub_ext_state,
			),
			(
				"internal/publisher.json",
				&entry.publisher[Tier::Internal.idx()],
				pub_int_state,
			),
			(
				"internal/subscriber.json",
				&entry.subscriber[Tier::Internal.idx()],
				sub_int_state,
			),
		]
	}
}

/// Number of `(side, tier)` slots, matching the four tracks per stats
/// broadcast.
const NUM_SLOTS: usize = 4;

/// Track names in the same order [`EntrySnapState::zip_slots`] returns
/// them. Used to construct the per-broadcast track set up front.
const TRACK_ORDER: [&str; NUM_SLOTS] = [
	"publisher.json",
	"subscriber.json",
	"internal/publisher.json",
	"internal/subscriber.json",
];

/// Session track names, indexed by [`Tier::idx`]: external first, internal
/// second.
const SESSION_TRACK_ORDER: [&str; 2] = ["sessions.json", "internal/sessions.json"];

impl Stats {
	/// Build a stats aggregator from `config`.
	///
	/// When `config` has an origin, this spawns the snapshot task immediately
	/// and publishes the stats broadcast; the task runs until the last [`Stats`]
	/// clone is dropped. With no origin the aggregator is a no-op (bumps are
	/// dropped, nothing is published) and no task spawns, so it's safe to build
	/// outside an async runtime.
	pub fn new(config: StatsConfig) -> Self {
		let StatsConfig {
			origin,
			prefix,
			node,
			interval,
		} = config;
		// An empty path after normalization is indistinguishable from "no node
		// set"; collapse it so downstream code only sees a single representation.
		// We do this here (not in `with_node`) so a directly-assigned
		// `config.node` is normalized too.
		let node = node.filter(|p| !p.is_empty());

		let shared = origin.map(|origin| {
			let shared = Arc::new(StatsShared {
				origin,
				entries: Lock::default(),
				sessions: Default::default(),
			});
			let advertised = advertised_path(&prefix, node.as_ref().map(|p| p.as_str()));
			spawn(run_publisher(Arc::downgrade(&shared), advertised, interval));
			shared
		});

		Self { prefix, shared }
	}

	/// Returns the configured top-level prefix.
	pub fn prefix(&self) -> &Path<'static> {
		&self.prefix
	}

	/// The shared state, panicking for a no-op aggregator. Tests build with an
	/// origin so this is always present.
	#[cfg(test)]
	fn shared(&self) -> &Arc<StatsShared> {
		self.shared.as_ref().expect("enabled stats aggregator")
	}

	/// Returns a tier-scoped handle. Bumps through this handle land in the
	/// tier's counters.
	pub fn tier(&self, tier: Tier) -> StatsHandle {
		StatsHandle {
			stats: self.clone(),
			tier,
		}
	}

	fn entry(&self, path: impl AsPath) -> Option<Arc<BroadcastEntry>> {
		// No-op aggregator (no origin) never allocates state.
		let shared = self.shared.as_ref()?;
		let path = path.as_path();
		// Skip our own stats broadcasts (and any sibling category under the
		// same prefix) so serving a stats broadcast doesn't generate more
		// stats.
		if path.has_prefix(&self.prefix) {
			return None;
		}
		let owned = path.to_owned();
		let mut entries = shared.entries.lock();
		Some(
			entries
				.entry(owned)
				.or_insert_with(|| Arc::new(BroadcastEntry::new()))
				.clone(),
		)
	}

	/// Get-or-create the session gauge for `root` on `tier`. `None` for a no-op
	/// aggregator. Unlike [`Self::entry`], roots are auth scopes (never under
	/// the stats prefix), so no cycle-breaking filter is needed.
	fn session_counters(&self, tier: Tier, root: impl AsPath) -> Option<Arc<SessionCounters>> {
		let shared = self.shared.as_ref()?;
		let owned = root.as_path().to_owned();
		let mut sessions = shared.sessions[tier.idx()].lock();
		Some(sessions.entry(owned).or_default().clone())
	}
}

impl Default for Stats {
	fn default() -> Self {
		Self::new(StatsConfig::new())
	}
}

/// Tier-scoped wrapper around [`Stats`]. What [`crate::Client::with_stats`] and
/// [`crate::Server::with_stats`] accept. Cheap to clone.
#[derive(Clone)]
pub struct StatsHandle {
	stats: Stats,
	tier: Tier,
}

impl StatsHandle {
	/// The aggregator this handle is tied to.
	pub fn parent(&self) -> &Stats {
		&self.stats
	}

	/// The tier this handle bumps into.
	pub fn tier(&self) -> Tier {
		self.tier
	}

	/// Returns a per-broadcast handle scoped to this tier.
	///
	/// Paths under the aggregator's configured `prefix` return an empty handle
	/// whose bumps are no-ops. This keeps stats traffic from feeding back into
	/// the aggregator.
	pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
		BroadcastStats {
			entry: self.stats.entry(path),
			tier: self.tier,
		}
	}

	/// Per-session egress (publisher) broadcast-subscription tracker. Construct
	/// one per session and call [`SessionBroadcasts::subscribe`] for each
	/// downstream subscription so `broadcasts - broadcasts_closed` counts the
	/// distinct sessions watching each broadcast.
	pub fn publisher_broadcasts(&self) -> SessionBroadcasts {
		SessionBroadcasts::new(self.stats.clone(), self.tier, Side::Publisher)
	}

	/// Per-session ingress (subscriber) counterpart to
	/// [`Self::publisher_broadcasts`].
	pub fn subscriber_broadcasts(&self) -> SessionBroadcasts {
		SessionBroadcasts::new(self.stats.clone(), self.tier, Side::Subscriber)
	}

	/// Record a connected session authenticated under `root` on this tier. Hold
	/// the returned guard for the session's lifetime; dropping it bumps
	/// `sessions_closed`. Counts presence regardless of any data flow, so a
	/// session that merely connects is still billable. Surfaced on the session
	/// track for this tier, keyed by `root`.
	pub fn session(&self, root: impl AsPath) -> SessionStats {
		SessionStats::new(self.stats.session_counters(self.tier, root))
	}
}

impl Default for StatsHandle {
	/// A no-op handle backed by a [`Stats::default`] aggregator.
	fn default() -> Self {
		Stats::default().tier(Tier::External)
	}
}

/// A per-broadcast, tier-scoped handle. Cheap to clone.
///
/// Open a broadcast-lifetime guard with [`Self::publisher`] / [`Self::subscriber`],
/// or skip straight to a track guard with [`Self::publisher_track`] /
/// [`Self::subscriber_track`] when the broadcast's lifetime is tracked
/// elsewhere.
#[derive(Clone)]
pub struct BroadcastStats {
	entry: Option<Arc<BroadcastEntry>>,
	tier: Tier,
}

impl BroadcastStats {
	/// True if this handle has no underlying entry (path was under the
	/// aggregator's own prefix, or stats are disabled). All bumps through an
	/// empty handle are no-ops.
	pub fn is_empty(&self) -> bool {
		self.entry.is_none()
	}

	/// Open a broadcast-lifetime guard for the publisher (egress) role.
	/// Bumps `announced` on construction and `announced_closed` on drop.
	/// (The `broadcasts` sentinel is driven separately by
	/// [`SessionBroadcasts`]; see the module docs.)
	pub fn publisher(&self) -> PublisherStats {
		if let Some(entry) = &self.entry {
			entry.publisher[self.tier.idx()]
				.announced
				.fetch_add(1, Ordering::Relaxed);
		}
		PublisherStats {
			entry: self.entry.clone(),
			tier: self.tier,
		}
	}

	/// Open a broadcast-lifetime guard for the subscriber (ingress) role.
	/// Bumps `announced` on construction and `announced_closed` on drop.
	/// (The `broadcasts` sentinel is driven separately by
	/// [`SessionBroadcasts`]; see the module docs.)
	pub fn subscriber(&self) -> SubscriberStats {
		if let Some(entry) = &self.entry {
			entry.subscriber[self.tier.idx()]
				.announced
				.fetch_add(1, Ordering::Relaxed);
		}
		SubscriberStats {
			entry: self.entry.clone(),
			tier: self.tier,
		}
	}

	/// Open a publisher-track guard.
	///
	/// `_name` is unused; counters are per-broadcast only. The track name
	/// parameter is kept for symmetry with the rest of moq-net so callers
	/// don't have to thread an `Option<&str>` through subscribe sites.
	pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
		if let Some(entry) = &self.entry {
			entry.publisher[self.tier.idx()]
				.subscriptions
				.fetch_add(1, Ordering::Relaxed);
		}
		PublisherTrack {
			entry: self.entry.clone(),
			tier: self.tier,
		}
	}

	/// Subscriber-side counterpart to [`Self::publisher_track`].
	pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
		if let Some(entry) = &self.entry {
			entry.subscriber[self.tier.idx()]
				.subscriptions
				.fetch_add(1, Ordering::Relaxed);
		}
		SubscriberTrack {
			entry: self.entry.clone(),
			tier: self.tier,
		}
	}
}

/// Which side of a [`BroadcastEntry`] a [`SessionBroadcasts`] bumps.
#[derive(Copy, Clone)]
enum Side {
	Publisher,
	Subscriber,
}

impl Side {
	fn counters(self, entry: &BroadcastEntry, tier: Tier) -> &Counters {
		match self {
			Side::Publisher => &entry.publisher[tier.idx()],
			Side::Subscriber => &entry.subscriber[tier.idx()],
		}
	}
}

/// Per-session tracker that turns a peer session's per-broadcast subscription
/// lifecycle into `broadcasts` / `broadcasts_closed` bumps.
///
/// Hold one per session (and side). Call [`Self::subscribe`] for every
/// subscription the session opens and keep the returned [`BroadcastSubscription`]
/// alive for that subscription's lifetime. The guard refcounts subscriptions per
/// broadcast for this session, so the session's *first* subscription to a
/// broadcast bumps `broadcasts` and its *last* to drop bumps `broadcasts_closed`.
/// Summed across sessions, `broadcasts - broadcasts_closed` is the number of
/// distinct sessions currently subscribed to the broadcast (viewers on the
/// egress side).
///
/// Cheap to clone; clones share the same per-broadcast refcounts (so a single
/// logical session that clones its handle still counts as one).
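/**

The refcounting reduces to a per-path counter that reports its 0-to-1 and
1-to-0 transitions. `SessionRefcounts` is a hypothetical single-threaded
model; the real type shares its map behind an `Arc<Mutex<_>>` and bumps the
atomics itself.

```rust
use std::collections::HashMap;

// Hypothetical single-threaded model of one session's per-broadcast refcounts.
#[derive(Default)]
struct SessionRefcounts {
	counts: HashMap<String, u32>,
}

impl SessionRefcounts {
	// `true` when this is the session's first active subscription to `path`
	// (the point where `broadcasts` would be bumped).
	fn subscribe(&mut self, path: &str) -> bool {
		let n = self.counts.entry(path.to_string()).or_insert(0);
		*n += 1;
		*n == 1
	}

	// `true` when this was the session's last active subscription to `path`
	// (the point where `broadcasts_closed` would be bumped).
	fn unsubscribe(&mut self, path: &str) -> bool {
		match self.counts.get_mut(path) {
			Some(n) => {
				*n -= 1;
				if *n == 0 {
					self.counts.remove(path);
					true
				} else {
					false
				}
			}
			None => false,
		}
	}
}
```

Two track subscriptions to the same broadcast therefore move the sentinel only
once in each direction.

*/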
692#[derive(Clone)]
693pub struct SessionBroadcasts {
694	stats: Stats,
695	tier: Tier,
696	side: Side,
697	counts: Arc<std::sync::Mutex<HashMap<PathOwned, u32>>>,
698}
699
700impl SessionBroadcasts {
701	fn new(stats: Stats, tier: Tier, side: Side) -> Self {
702		Self {
703			stats,
704			tier,
705			side,
706			counts: Arc::new(std::sync::Mutex::new(HashMap::new())),
707		}
708	}
709
710	/// Register one active subscription to `path` for this session. Hold the
711	/// returned guard for the subscription's lifetime; dropping it releases the
712	/// subscription (bumping `broadcasts_closed` when it was the session's last
713	/// for that broadcast).
714	pub fn subscribe(&self, path: impl AsPath) -> BroadcastSubscription {
715		let path = path.as_path().to_owned();
716		let entry = self.stats.entry(&path);
717		let first = {
718			let mut counts = self.counts.lock().expect("stats refcount poisoned");
719			let n = counts.entry(path.clone()).or_insert(0);
720			let first = *n == 0;
721			*n += 1;
722			first
723		};
724		if first {
725			if let Some(entry) = &entry {
726				self.side
727					.counters(entry, self.tier)
728					.broadcasts
729					.fetch_add(1, Ordering::Relaxed);
730			}
731		}
732		BroadcastSubscription {
733			entry,
734			tier: self.tier,
735			side: self.side,
736			counts: self.counts.clone(),
737			path,
738		}
739	}
740}
741
742/// RAII guard for one of a session's per-broadcast subscriptions.
743/// See [`SessionBroadcasts::subscribe`].
744#[must_use = "drop the guard to release the subscription"]
745pub struct BroadcastSubscription {
746	entry: Option<Arc<BroadcastEntry>>,
747	tier: Tier,
748	side: Side,
749	counts: Arc<std::sync::Mutex<HashMap<PathOwned, u32>>>,
750	path: PathOwned,
751}
752
753impl Drop for BroadcastSubscription {
754	fn drop(&mut self) {
755		let last = {
756			let mut counts = self.counts.lock().expect("stats refcount poisoned");
757			match counts.get_mut(&self.path) {
758				Some(n) => {
759					*n -= 1;
760					if *n == 0 {
761						counts.remove(&self.path);
762						true
763					} else {
764						false
765					}
766				}
767				None => false,
768			}
769		};
770		if last {
771			if let Some(entry) = &self.entry {
772				// Release pairs with the snapshot reader's Acquire load of
773				// `broadcasts_closed`; see `PublisherStats::drop`.
774				self.side
775					.counters(entry, self.tier)
776					.broadcasts_closed
777					.fetch_add(1, Ordering::Release);
778			}
779		}
780	}
781}
782
783/// RAII guard for a connected session, keyed by auth root and tier. Bumps
784/// `sessions` on construction and `sessions_closed` on drop. See
785/// [`StatsHandle::session`].
786#[must_use = "drop the guard to record the session as closed"]
787pub struct SessionStats {
788	/// `None` for a no-op aggregator; bumps are then dropped.
789	counters: Option<Arc<SessionCounters>>,
790}
791
792impl SessionStats {
793	fn new(counters: Option<Arc<SessionCounters>>) -> Self {
794		if let Some(counters) = &counters {
795			counters.sessions.fetch_add(1, Ordering::Relaxed);
796		}
797		Self { counters }
798	}
799}
800
801impl Drop for SessionStats {
802	fn drop(&mut self) {
803		if let Some(counters) = &self.counters {
804			// Release pairs with the snapshot reader's Acquire load of
805			// `sessions_closed`; see `PublisherStats::drop`.
806			counters.sessions_closed.fetch_add(1, Ordering::Release);
807		}
808	}
809}
810
811/// RAII broadcast guard for the publisher role. See [`BroadcastStats::publisher`].
812#[must_use = "drop the guard to record the broadcast as closed"]
813pub struct PublisherStats {
814	entry: Option<Arc<BroadcastEntry>>,
815	tier: Tier,
816}
817
818impl PublisherStats {
819	/// Open a track-subscription guard. Bumps `subscriptions` on construction
820	/// and `subscriptions_closed` on drop.
821	pub fn track(&self, name: &str) -> PublisherTrack {
822		BroadcastStats {
823			entry: self.entry.clone(),
824			tier: self.tier,
825		}
826		.publisher_track(name)
827	}
828}
829
830impl Drop for PublisherStats {
831	fn drop(&mut self) {
832		if let Some(entry) = &self.entry {
833			// Release pairs with the snapshot reader's Acquire load of
834			// `announced_closed`, propagating the open-bump from this
835			// guard's construction to whichever thread observes the close.
836			entry.publisher[self.tier.idx()]
837				.announced_closed
838				.fetch_add(1, Ordering::Release);
839		}
840	}
841}

/// RAII broadcast guard for the subscriber role. See [`BroadcastStats::subscriber`].
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct SubscriberStats {
	entry: Option<Arc<BroadcastEntry>>,
	tier: Tier,
}

impl SubscriberStats {
	/// Open a track-subscription guard. Mirrors [`PublisherStats::track`].
	pub fn track(&self, name: &str) -> SubscriberTrack {
		BroadcastStats {
			entry: self.entry.clone(),
			tier: self.tier,
		}
		.subscriber_track(name)
	}
}

impl Drop for SubscriberStats {
	fn drop(&mut self) {
		if let Some(entry) = &self.entry {
			// See `PublisherStats::drop` for why this is Release.
			entry.subscriber[self.tier.idx()]
				.announced_closed
				.fetch_add(1, Ordering::Release);
		}
	}
}

/// RAII subscription guard for the publisher role.
#[must_use = "drop the guard to record the subscription as closed"]
pub struct PublisherTrack {
	entry: Option<Arc<BroadcastEntry>>,
	tier: Tier,
}

impl PublisherTrack {
	/// Bumps `frames` once.
	pub fn frame(&self) {
		if let Some(entry) = &self.entry {
			entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
		}
	}

	/// Bumps `bytes` by `n`.
	pub fn bytes(&self, n: u64) {
		if let Some(entry) = &self.entry {
			entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
		}
	}

	/// Bumps `groups` once.
	pub fn group(&self) {
		if let Some(entry) = &self.entry {
			entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
		}
	}
}

impl Drop for PublisherTrack {
	fn drop(&mut self) {
		if let Some(entry) = &self.entry {
			// See `PublisherStats::drop` for why this is Release.
			entry.publisher[self.tier.idx()]
				.subscriptions_closed
				.fetch_add(1, Ordering::Release);
		}
	}
}

/// RAII subscription guard for the subscriber role.
#[must_use = "drop the guard to record the subscription as closed"]
pub struct SubscriberTrack {
	entry: Option<Arc<BroadcastEntry>>,
	tier: Tier,
}

impl SubscriberTrack {
	/// Bumps `frames` once.
	pub fn frame(&self) {
		if let Some(entry) = &self.entry {
			entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
		}
	}

	/// Bumps `bytes` by `n`.
	pub fn bytes(&self, n: u64) {
		if let Some(entry) = &self.entry {
			entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
		}
	}

	/// Bumps `groups` once.
	pub fn group(&self) {
		if let Some(entry) = &self.entry {
			entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
		}
	}
}

impl Drop for SubscriberTrack {
	fn drop(&mut self) {
		if let Some(entry) = &self.entry {
			// See `PublisherStats::drop` for why this is Release.
			entry.subscriber[self.tier.idx()]
				.subscriptions_closed
				.fetch_add(1, Ordering::Release);
		}
	}
}

/// Per-tick work for a single `(side, tier)` slot: build the emitted
/// `Snapshot` from the raw counters, update the slot's `prev_emitted`, and
/// hand the snap to `emit` iff the slot is live or changed this tick.
fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) {
	let raw = counters.snapshot();

	let snap = Snapshot {
		announced: raw.announced,
		announced_closed: raw.announced_closed,
		broadcasts: raw.broadcasts,
		broadcasts_closed: raw.broadcasts_closed,
		subscriptions: raw.subscriptions,
		subscriptions_closed: raw.subscriptions_closed,
		bytes: raw.bytes,
		frames: raw.frames,
		groups: raw.groups,
	};

	// A slot is live while any open counter still exceeds its `*_closed`
	// counterpart: a guard is held, so a subscription could begin at any
	// moment. Live slots are emitted every tick so a downstream "currently
	// active" view always sees the full set. Once every pair is equal no
	// traffic can flow and the entry is on its way out (the global GC drops
	// it as soon as the last guard releases its `Arc`).
	let live = snap.announced != snap.announced_closed
		|| snap.subscriptions != snap.subscriptions_closed
		|| snap.broadcasts != snap.broadcasts_closed;

	// Include the entry whenever it's live OR its snapshot changed this
	// tick. Change-driven inclusion catches bumps since the previous tick
	// (incl. sub-tick flickers) and emits the final close snapshot on the
	// tick a slot transitions to fully closed.
	//
	// `None` (slot never emitted) is treated as the default Snapshot so a
	// first-tick all-zeros snap on an unused tier-side slot doesn't count
	// as a "change". Without this, every entry would surface in all four
	// tracks with zeros on the tick after creation even if only one slot
	// is actually in use.
	let prev_snap = slot_state.prev_emitted.unwrap_or_default();
	let changed = snap != prev_snap;
	if changed {
		slot_state.prev_emitted = Some(snap);
	}
	if live || changed {
		emit(snap);
	}
}
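
// Illustrative sketch with hypothetical, simplified types (a two-field
// snapshot instead of `Snapshot`): the live-or-changed rule from
// `process_slot`. `prev` starts as `None`, and `unwrap_or_default` makes a
// never-used, all-zeros slot look unchanged, so it is never emitted. A live
// slot is emitted every tick; a slot that just closed is emitted once,
// because its snapshot changed, and then goes quiet.
#[allow(dead_code)]
mod live_or_changed_sketch {
	#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
	pub struct Snap {
		pub opened: u64,
		pub closed: u64,
	}

	/// Returns `Some(snap)` when the slot belongs in this tick's frame.
	pub fn tick(snap: Snap, prev: &mut Option<Snap>) -> Option<Snap> {
		let live = snap.opened != snap.closed;
		let changed = snap != prev.unwrap_or_default();
		if changed {
			*prev = Some(snap);
		}
		(live || changed).then_some(snap)
	}
}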

/// Snapshot-task-local change-detection state for one session-track root,
/// mirroring [`SlotState`].
#[derive(Default)]
struct SessionSlotState {
	prev_emitted: Option<SessionSnapshot>,
}

/// Per-tick work for one session-track root (a `(tier, root)` gauge): build the
/// snapshot, update `prev_emitted`, and emit iff a session is connected
/// (`sessions != sessions_closed`) or the snapshot changed this tick. Same
/// live-or-changed rule as [`process_slot`].
fn process_session_slot(
	counters: &SessionCounters,
	slot_state: &mut SessionSlotState,
	mut emit: impl FnMut(SessionSnapshot),
) {
	let (sessions, sessions_closed) = counters.snapshot();
	let snap = SessionSnapshot {
		sessions,
		sessions_closed,
	};

	let live = sessions != sessions_closed;
	let prev_snap = slot_state.prev_emitted.unwrap_or_default();
	let changed = snap != prev_snap;
	if changed {
		slot_state.prev_emitted = Some(snap);
	}
	if live || changed {
		emit(snap);
	}
}

/// Serialize `frame` and write it to `track` unless it's byte-identical to
/// `last` (idle-frame skipping). On success `last` is updated; on a serialize
/// or write error it's left untouched so the next tick retries.
fn flush_track<T: Serialize>(track: &mut TrackProducer, frame: &T, last: &mut Vec<u8>, name: &str) {
	let json = match serde_json::to_vec(frame) {
		Ok(b) => b,
		Err(err) => {
			tracing::debug!(?err, name, "stats: failed to serialize frame");
			return;
		}
	};
	if &json == last {
		return;
	}
	if let Err(err) = track.write_frame(json.clone()) {
		tracing::debug!(?err, name, "stats: failed to write frame");
		return;
	}
	*last = json;
}
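
// Illustrative sketch; names are hypothetical. It mirrors the idle-frame
// skipping in `flush_track`, with a `Vec<Vec<u8>>` standing in for the
// `TrackProducer` and `Debug` formatting standing in for
// `serde_json::to_vec`. A frame is written only when its bytes differ from
// the last successfully written frame, so an unchanged tick costs one
// serialization and one comparison but no write. The `BTreeMap` matters:
// its sorted iteration makes equal frames serialize to equal bytes, which a
// `HashMap` would not guarantee.
#[allow(dead_code)]
mod idle_skip_sketch {
	use std::collections::BTreeMap;

	/// Returns `true` if a frame was written.
	pub fn flush(track: &mut Vec<Vec<u8>>, frame: &BTreeMap<String, u64>, last: &mut Vec<u8>) -> bool {
		let bytes = format!("{frame:?}").into_bytes();
		if bytes == *last {
			// Byte-identical to the previous frame: skip the write.
			return false;
		}
		track.push(bytes.clone());
		*last = bytes;
		true
	}
}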

/// Publishes the stats broadcast and writes a frame per tick. Spawned once by
/// [`Stats::new`] when an origin is set; runs until every [`Stats`] clone is
/// dropped (`weak.upgrade()` returns `None`).
async fn run_publisher(weak: Weak<StatsShared>, advertised: PathOwned, interval: Duration) {
	let Some(shared) = weak.upgrade() else {
		return;
	};

	let mut broadcast = Broadcast::new().produce();

	// Create the four per-broadcast tracks and the two session tracks up front.
	let create = |broadcast: &mut crate::BroadcastProducer, name: &str| match broadcast.create_track(Track {
		name: name.into(),
		priority: 0,
	}) {
		Ok(t) => Some(t),
		Err(err) => {
			tracing::warn!(?err, name, "stats: failed to create track");
			None
		}
	};

	let mut tracks: Vec<TrackProducer> = Vec::with_capacity(NUM_SLOTS);
	for name in TRACK_ORDER {
		let Some(t) = create(&mut broadcast, name) else {
			return;
		};
		tracks.push(t);
	}
	let mut session_tracks: Vec<TrackProducer> = Vec::with_capacity(SESSION_TRACK_ORDER.len());
	for name in SESSION_TRACK_ORDER {
		let Some(t) = create(&mut broadcast, name) else {
			return;
		};
		session_tracks.push(t);
	}

	if !shared.origin.publish_broadcast(&advertised, broadcast.consume()) {
		tracing::warn!(advertised = %advertised, "stats: origin rejected stats broadcast");
		return;
	}
	drop(shared);

	// Per-path snapshot state owned by this task. Mirrors the global entries
	// and serves as the diff source for change detection across ticks.
	let mut local: HashMap<PathOwned, EntrySnapState> = HashMap::new();
	let mut last_payload: [Vec<u8>; NUM_SLOTS] = Default::default();
	// Same, for the session tracks: per-tier root -> change-detection state.
	let mut session_local: [HashMap<PathOwned, SessionSlotState>; 2] = Default::default();
	let mut session_last_payload: [Vec<u8>; 2] = Default::default();

	let mut ticker = tokio::time::interval(interval);
	ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

	loop {
		ticker.tick().await;

		let Some(shared) = weak.upgrade() else {
			return;
		};

		// Clone the current entries map into a Vec so we can drop the
		// global lock before the change-detection pass.
		let entries: Vec<(PathOwned, Arc<BroadcastEntry>)> = {
			let map = shared.entries.lock();
			map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
		};

		let mut frames: [BTreeMap<String, Snapshot>; NUM_SLOTS] = Default::default();
		for (path, entry) in &entries {
			let snap_state = local.entry(path.clone()).or_default();
			for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() {
				process_slot(counters, slot_state, |snap| {
					frames[i].insert(path.as_str().to_string(), snap);
				});
			}
		}
		drop(entries);

		// GC global entries: keep only those an external guard still holds.
		// `strong_count == 1` (just the map's own `Arc`) means no live
		// publisher/subscriber/track guard remains, so every open counter
		// has caught up to its `*_closed` counterpart and no traffic can
		// flow. We can't key this on the counters directly: a held but idle
		// `BroadcastStats` (all counters equal) must stay so a later bump
		// isn't lost on an orphaned `Arc`. Then drop local state for any
		// path that left the map. We already emitted each removed entry's
		// final snapshot above, so nothing is lost.
		{
			let mut map = shared.entries.lock();
			map.retain(|_, entry| Arc::strong_count(entry) > 1);
			local.retain(|path, _| map.contains_key(path));
		}

		// Session tracks: one frame per tier, keyed by auth root.
		let mut session_frames: [BTreeMap<String, SessionSnapshot>; 2] = Default::default();
		for tier_idx in 0..2 {
			let roots: Vec<(PathOwned, Arc<SessionCounters>)> = {
				let map = shared.sessions[tier_idx].lock();
				map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
			};
			let states = &mut session_local[tier_idx];
			for (root, counters) in &roots {
				let state = states.entry(root.clone()).or_default();
				process_session_slot(counters, state, |snap| {
					session_frames[tier_idx].insert(root.as_str().to_string(), snap);
				});
			}
			drop(roots);

			// GC roots whose last session guard has dropped (`strong_count == 1`
			// is just the map's own `Arc`), then forget their local state. The
			// final snapshot was already emitted above.
			let mut map = shared.sessions[tier_idx].lock();
			map.retain(|_, counters| Arc::strong_count(counters) > 1);
			states.retain(|root, _| map.contains_key(root));
		}

		for (i, (frame, last)) in frames.iter().zip(last_payload.iter_mut()).enumerate() {
			flush_track(&mut tracks[i], frame, last, TRACK_ORDER[i]);
		}
		for (i, (frame, last)) in session_frames.iter().zip(session_last_payload.iter_mut()).enumerate() {
			flush_track(&mut session_tracks[i], frame, last, SESSION_TRACK_ORDER[i]);
		}

		drop(shared);
	}
}
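
// Illustrative sketch; `Entries`, `handle`, and `gc` are hypothetical names.
// It shows the `Arc::strong_count` GC used in `run_publisher`: the map owns
// one `Arc` per path and every outstanding handle owns another, so a count
// of 1 means only the map still refers to the entry. Keying the GC on the
// count rather than on the counters keeps an idle but still-held handle
// from being orphaned.
#[allow(dead_code)]
mod strong_count_gc_sketch {
	use std::{
		collections::HashMap,
		sync::{atomic::AtomicU64, Arc},
	};

	pub type Entries = HashMap<String, Arc<AtomicU64>>;

	/// Returns a handle to the entry for `path`, inserting a fresh one if needed.
	pub fn handle(map: &mut Entries, path: &str) -> Arc<AtomicU64> {
		map.entry(path.to_string()).or_default().clone()
	}

	/// Drops every entry that no handle outside the map still holds.
	pub fn gc(map: &mut Entries) {
		map.retain(|_, entry| Arc::strong_count(entry) > 1);
	}
}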

/// What we emit for one entry on one tier-role track. Every field comes
/// straight from [`RawCounts`]; `broadcasts` / `broadcasts_closed` are the
/// per-(broadcast, session) subscription sentinel maintained by
/// [`SessionBroadcasts`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
#[cfg_attr(test, derive(serde::Deserialize))]
struct Snapshot {
	announced: u64,
	announced_closed: u64,
	broadcasts: u64,
	broadcasts_closed: u64,
	subscriptions: u64,
	subscriptions_closed: u64,
	bytes: u64,
	frames: u64,
	groups: u64,
}
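
// Illustrative sketch; every name below is hypothetical, and the crate's
// real bookkeeping lives in `SessionBroadcasts`, which is not reproduced
// here. It models the per-(broadcast, session) sentinel behind `broadcasts`
// and `broadcasts_closed` for a single broadcast path: each session
// refcounts its own subscriptions, `broadcasts` bumps only on a session's
// first subscription (0 -> 1), and `broadcasts_closed` bumps only on its
// last unsubscribe (1 -> 0). `broadcasts - broadcasts_closed` therefore
// counts distinct sessions (viewers), not subscriptions.
#[allow(dead_code)]
mod broadcast_sentinel_sketch {
	use std::collections::HashMap;

	/// The two sentinel counters for one broadcast path.
	#[derive(Default)]
	pub struct Totals {
		pub broadcasts: u64,
		pub broadcasts_closed: u64,
	}

	/// One session's live subscription count per broadcast path.
	#[derive(Default)]
	pub struct Session {
		subs: HashMap<String, usize>,
	}

	impl Session {
		pub fn subscribe(&mut self, path: &str, totals: &mut Totals) {
			let n = self.subs.entry(path.to_string()).or_insert(0);
			*n += 1;
			if *n == 1 {
				// First subscription from this session: one more viewer.
				totals.broadcasts += 1;
			}
		}

		pub fn unsubscribe(&mut self, path: &str, totals: &mut Totals) {
			if let Some(n) = self.subs.get_mut(path) {
				*n -= 1;
				if *n == 0 {
					// Last subscription from this session closed.
					self.subs.remove(path);
					totals.broadcasts_closed += 1;
				}
			}
		}
	}
}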

/// What we emit for one root on a session track. `sessions - sessions_closed`
/// is the live session count for the root.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
#[cfg_attr(test, derive(serde::Deserialize))]
struct SessionSnapshot {
	sessions: u64,
	sessions_closed: u64,
}

fn advertised_path(prefix: &Path, node: Option<&str>) -> PathOwned {
	// The fixed `node` category leaves room for sibling categories (e.g.
	// `<top-prefix>/cluster` for relay-mesh stats) under the same prefix.
	let mut out = format!("{}/node", prefix.as_str());
	if let Some(node) = node {
		out.push('/');
		out.push_str(node);
	}
	PathOwned::from(out)
}
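
// Illustrative sketch; `node_path_sketch::advertised` is hypothetical, and
// the crate normalizes the node suffix in `Stats::new`, whose body is not
// reproduced here. It shows one way to get the behavior the
// `new_normalizes_and_drops_empty_node` test expects: drop empty segments
// from the node suffix, treat a suffix with nothing left as no node at all,
// then apply `advertised_path`'s `<prefix>/node[/<node>]` layout.
#[allow(dead_code)]
mod node_path_sketch {
	pub fn advertised(prefix: &str, node: Option<&str>) -> String {
		// "/sjc//1/" -> "sjc/1"; "///" -> "" -> None.
		let node = node
			.map(|n| n.split('/').filter(|s| !s.is_empty()).collect::<Vec<_>>().join("/"))
			.filter(|n| !n.is_empty());
		match node {
			Some(node) => format!("{prefix}/node/{node}"),
			None => format!("{prefix}/node"),
		}
	}
}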

#[cfg(test)]
mod tests {
	use std::{collections::BTreeMap, sync::atomic::Ordering::Relaxed};

	use crate::{Origin, Path};

	use super::*;

	fn test_stats(node: Option<&str>) -> (Stats, OriginProducer) {
		let origin = Origin::random().produce();
		let stats = Stats::new(
			StatsConfig::new()
				.with_origin(origin.clone())
				.with_node(node.map(|s| PathOwned::from(s.to_string()))),
		);
		(stats, origin)
	}

	#[test]
	fn advertised_path_with_and_without_node() {
		let prefix = Path::new(".stats");
		assert_eq!(advertised_path(&prefix, Some("sjc")).as_str(), ".stats/node/sjc");
		assert_eq!(advertised_path(&prefix, Some("sjc/1")).as_str(), ".stats/node/sjc/1");
		assert_eq!(advertised_path(&prefix, None).as_str(), ".stats/node");

		let prefix = Path::new("metrics");
		assert_eq!(advertised_path(&prefix, Some("lon")).as_str(), "metrics/node/lon");
	}

	/// The advertised path normalizes a messy node suffix and drops an
	/// all-empty one. Observed through the announced path, since the task
	/// announces at construction.
	async fn announced_path_for_node(node: &str) -> String {
		let origin = Origin::random().produce();
		let _stats = Stats::new(
			StatsConfig::new()
				.with_origin(origin.clone())
				.with_node(PathOwned::from(node.to_string())),
		);
		let mut consumer = origin.consume();
		tokio::time::advance(Duration::from_millis(1)).await;
		let (path, _broadcast) = consumer.announced().await.expect("expected announce");
		path.as_str().to_string()
	}

	#[tokio::test(start_paused = true)]
	async fn new_normalizes_and_drops_empty_node() {
		assert_eq!(announced_path_for_node("/sjc//1/").await, ".stats/node/sjc/1");
		assert_eq!(announced_path_for_node("///").await, ".stats/node");
	}

	#[tokio::test(start_paused = true)]
	async fn per_broadcast_counters_isolated() {
		// Bumps on one broadcast must not leak into another.
		let (stats, _origin) = test_stats(Some("sjc"));
		let bs1 = stats.tier(Tier::External).broadcast("demo/bbb");
		let bs2 = stats.tier(Tier::External).broadcast("demo/ccc");
		let g1 = bs1.publisher().track("video");
		g1.bytes(100);
		let g2 = bs2.publisher().track("video");
		g2.bytes(7);

		let entries = stats.shared().entries.lock();
		let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
		let e2 = entries.get(&PathOwned::from("demo/ccc")).expect("entry");
		assert_eq!(e1.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
		assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7);
	}

	#[tokio::test(start_paused = true)]
	async fn external_and_internal_tiers_are_independent() {
		let (stats, _origin) = test_stats(Some("sjc"));
		let ext = stats.tier(Tier::External);
		let int = stats.tier(Tier::Internal);

		let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
		ext_track.bytes(100);
		let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
		int_track.bytes(7);

		let entries = stats.shared().entries.lock();
		let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
		assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
		assert_eq!(entry.subscriber[Tier::External.idx()].bytes.load(Relaxed), 0);
		assert_eq!(entry.publisher[Tier::Internal.idx()].bytes.load(Relaxed), 0);
		assert_eq!(entry.subscriber[Tier::Internal.idx()].bytes.load(Relaxed), 7);
	}

	#[tokio::test(start_paused = true)]
	async fn paths_under_prefix_are_no_op() {
		// Our own stats broadcasts (and any sibling category under the same
		// prefix) must not feed back into the aggregator.
		let (stats, _origin) = test_stats(Some("sjc"));
		let bs = stats.tier(Tier::External).broadcast(".stats/node/sjc");
		assert!(bs.is_empty());
		let p = bs.publisher();
		let track = p.track("video");
		track.bytes(100);
		drop(track);
		drop(p);
		assert!(stats.shared().entries.lock().is_empty());
	}

	#[tokio::test(start_paused = true)]
	async fn disabled_stats_are_noop() {
		// A no-op aggregator (no origin) allocates no shared state and never
		// announces; every handle is empty and bumps are dropped.
		let stats = Stats::default();
		assert!(stats.shared.is_none());
		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
		assert!(bs.is_empty());
		let p = bs.publisher();
		let track = p.track("video");
		track.bytes(100);
		drop(track);
		drop(p);
	}

	#[tokio::test(start_paused = true)]
	async fn single_broadcast_path_announced() {
		// No matter how many broadcasts get bumped, exactly one stats
		// broadcast is announced (the per-node aggregate).
		let (stats, origin) = test_stats(Some("sjc/1"));
		let mut consumer = origin.consume();

		let bs1 = stats.tier(Tier::External).broadcast("foo/bar");
		let _t1 = bs1.publisher().track("video");
		let bs2 = stats.tier(Tier::External).broadcast("baz/qux");
		let _t2 = bs2.publisher().track("video");

		tokio::time::advance(Duration::from_millis(1)).await;
		let (path, broadcast) = consumer.announced().await.expect("expected announce");
		assert!(broadcast.is_some());
		assert_eq!(path.as_str(), ".stats/node/sjc/1");
	}

	#[tokio::test(start_paused = true)]
	async fn task_announces_without_node_suffix() {
		let origin = Origin::random().produce();
		let stats = Stats::new(StatsConfig::new().with_origin(origin.clone()));
		let mut consumer = origin.consume();

		let bs = stats.tier(Tier::External).broadcast("foo/bar");
		let _t = bs.publisher().track("video");

		tokio::time::advance(Duration::from_millis(1)).await;
		let (path, broadcast) = consumer.announced().await.expect("expected announce");
		assert!(broadcast.is_some());
		assert_eq!(path.as_str(), ".stats/node");
	}

	/// Drives the snapshot task forward by `count` ticks. In paused-time
	/// tests, `tokio::time::advance` doesn't poll spawned tasks itself; we
	/// have to combine it with explicit awaits. This helper follows each
	/// `advance` with a few `yield_now` calls so the task wakes, processes
	/// the tick, and re-parks before the next iteration.
	async fn drive_ticks(count: u32) {
		for _ in 0..count {
			tokio::time::advance(Duration::from_secs(1)).await;
			// Yield several times to let the task wake, snapshot, write the
			// frame, and re-await the next tick.
			for _ in 0..4 {
				tokio::task::yield_now().await;
			}
		}
	}
1389
1390	#[tokio::test(start_paused = true)]
1391	async fn live_entry_kept_while_idle() {
1392		// A broadcast with a live announce guard but no traffic must stay in
1393		// the map indefinitely: announced != announced_closed means a
1394		// subscription could still begin at any moment.
1395		let (stats, _origin) = test_stats(Some("sjc"));
1396		let key = PathOwned::from("foo/bar".to_string());
1397		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1398		let guard = bs.publisher();
1399
1400		drive_ticks(5).await;
1401		assert!(
1402			stats.shared().entries.lock().contains_key(&key),
1403			"announced-but-idle broadcast must stay while the guard is held"
1404		);
1405
1406		drop(guard);
1407		drop(bs);
1408		// announced == announced_closed now, and no guard holds the Arc, so
1409		// the entry is dropped on the next tick.
1410		drive_ticks(1).await;
1411		assert!(
1412			!stats.shared().entries.lock().contains_key(&key),
1413			"entry dropped once the announce guard closes"
1414		);
1415	}
1416
1417	#[tokio::test(start_paused = true)]
1418	async fn entry_dropped_once_fully_closed() {
1419		// Once every open counter equals its `*_closed` counterpart and no
1420		// guard holds the Arc, the entry is removed the very next tick.
1421		let (stats, _origin) = test_stats(Some("sjc"));
1422		let key = PathOwned::from("foo/bar".to_string());
1423		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1424		let track = bs.publisher().track("video");
1425
1426		drive_ticks(1).await;
1427		assert!(
1428			stats.shared().entries.lock().contains_key(&key),
1429			"live entry present while the track guard is held"
1430		);
1431
1432		drop(track);
1433		drop(bs);
1434		drive_ticks(1).await;
1435		assert!(
1436			!stats.shared().entries.lock().contains_key(&key),
1437			"fully-closed entry dropped on the next tick"
1438		);
1439	}
1440
1441	#[tokio::test(start_paused = true)]
1442	async fn frame_emits_expected_counters() {
1443		let (stats, origin) = test_stats(Some("sjc"));
1444		let mut consumer = origin.consume();
1445		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1446		let track = bs.publisher().track("video");
1447		track.bytes(42);
1448		track.frame();
1449		let sessions = stats.tier(Tier::External).publisher_broadcasts();
1450		let _sub = sessions.subscribe("foo/bar");
1451
1452		tokio::time::advance(Duration::from_millis(1100)).await;
1453
1454		let (_path, broadcast) = consumer.announced().await.expect("expected announce");
1455		let broadcast = broadcast.expect("active");
1456		let track = broadcast
1457			.subscribe_track(&Track {
1458				name: "publisher.json".into(),
1459				priority: 0,
1460			})
1461			.expect("subscribe");
1462		let frame = read_frame(track).await;
1463		let snap = frame.get("foo/bar").expect("foo/bar entry");
1464		assert_eq!(snap.announced, 1, "publisher() guard bumps announced");
1465		assert_eq!(snap.broadcasts, 1, "one session subscribed");
1466		assert_eq!(snap.subscriptions, 1);
1467		assert_eq!(snap.bytes, 42);
1468		assert_eq!(snap.frames, 1);
1469	}
1470
1471	#[tokio::test(start_paused = true)]
1472	async fn announced_decouples_from_broadcasts() {
1473		// publisher() (announce) with no subscription should bump announced but
1474		// NOT broadcasts (which only counts sessions with an active sub).
1475		let (stats, origin) = test_stats(Some("sjc"));
1476		let mut consumer = origin.consume();
1477		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1478		let _guard = bs.publisher();
1479
1480		tokio::time::advance(Duration::from_millis(1100)).await;
1481
1482		let (_path, broadcast) = consumer.announced().await.expect("announce");
1483		let broadcast = broadcast.expect("active");
1484		let track = broadcast
1485			.subscribe_track(&Track {
1486				name: "publisher.json".into(),
1487				priority: 0,
1488			})
1489			.expect("subscribe");
1490		let frame = read_frame(track).await;
1491		let snap = frame.get("foo/bar").expect("foo/bar entry");
1492		assert_eq!(snap.announced, 1);
1493		assert_eq!(snap.broadcasts, 0, "no subscription, no broadcasts sentinel");
1494		assert_eq!(snap.subscriptions, 0);
1495	}
1496
1497	#[tokio::test(start_paused = true)]
1498	async fn short_lived_sub_is_surfaced() {
1499		// A subscription that opens AND closes within a single tick window
1500		// must still surface as a complete broadcasts open/close cycle. The
1501		// cumulative counters retain broadcasts=1/broadcasts_closed=1, and the
1502		// change-driven inclusion surfaces the entry even though it's net-idle
1503		// by snapshot time.
1504		let (stats, origin) = test_stats(Some("sjc"));
1505		let mut consumer = origin.consume();
1506		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1507		let sessions = stats.tier(Tier::External).publisher_broadcasts();
1508		{
1509			let track = bs.publisher().track("video");
1510			track.bytes(123);
1511			track.frame();
1512			let _sub = sessions.subscribe("foo/bar");
1513			// track + sub dropped here, all within tick 1
1514		}
1515
1516		tokio::time::advance(Duration::from_millis(1100)).await;
1517
1518		let (_path, broadcast) = consumer.announced().await.expect("announce");
1519		let broadcast = broadcast.expect("active");
1520		let track = broadcast
1521			.subscribe_track(&Track {
1522				name: "publisher.json".into(),
1523				priority: 0,
1524			})
1525			.expect("subscribe");
1526		let frame = read_frame(track).await;
1527		let snap = frame.get("foo/bar").expect("foo/bar entry");
1528		// One session opened then closed a subscription within the tick.
1529		assert_eq!(snap.subscriptions, 1);
1530		assert_eq!(snap.subscriptions_closed, 1);
1531		assert_eq!(snap.broadcasts, 1, "one session subscribed");
1532		assert_eq!(snap.broadcasts_closed, 1);
1533		assert_eq!(snap.bytes, 123);
1534		assert_eq!(snap.frames, 1);
1535	}
1536
1537	#[tokio::test(start_paused = true)]
1538	async fn multiple_subs_count_as_one_broadcast() {
1539		// Two concurrent subs from the SAME session count as one broadcast, not
1540		// two: broadcasts is "distinct sessions with >=1 active sub", not
1541		// "subscription count". broadcasts_closed only bumps once the session's
1542		// last sub for the broadcast closes.
1543		let (stats, _origin) = test_stats(Some("sjc"));
1544		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1545		let sessions = stats.tier(Tier::External).publisher_broadcasts();
1546		let pub_guard = bs.publisher();
1547		let t1 = pub_guard.track("video");
1548		let t2 = pub_guard.track("audio");
1549		let s1 = sessions.subscribe("foo/bar");
1550		let s2 = sessions.subscribe("foo/bar");
1551
1552		let raw = || {
1553			let entries = stats.shared().entries.lock();
1554			let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
1555			entry.publisher[Tier::External.idx()].snapshot()
1556		};
1557
1558		let r = raw();
1559		assert_eq!(r.subscriptions, 2, "two track subs");
1560		assert_eq!(r.subscriptions_closed, 0, "neither dropped yet");
1561		assert_eq!(r.broadcasts, 1, "one session => one broadcast");
1562		assert_eq!(r.broadcasts_closed, 0);
1563
1564		drop(s1);
1565		assert_eq!(raw().broadcasts_closed, 0, "session still has a sub open");
1566
1567		drop(s2);
1568		drop(t1);
1569		drop(t2);
1570		let r = raw();
1571		assert_eq!(r.subscriptions_closed, 2, "both track subs dropped");
1572		assert_eq!(r.broadcasts, 1);
1573		assert_eq!(r.broadcasts_closed, 1, "last sub closed => one broadcasts_closed");
1574
1575		drop(pub_guard);
1576		drop(bs);
1577	}
1578
1579	#[tokio::test(start_paused = true)]
1580	async fn distinct_sessions_count_as_separate_broadcasts() {
1581		// The viewer-count invariant: two different sessions subscribing to the
1582		// same broadcast bump broadcasts to 2 (each is a distinct viewer).
1583		let (stats, _origin) = test_stats(Some("sjc"));
1584		let viewer1 = stats.tier(Tier::External).publisher_broadcasts();
1585		let viewer2 = stats.tier(Tier::External).publisher_broadcasts();
1586
1587		let raw = || {
1588			let entries = stats.shared().entries.lock();
1589			let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
1590			entry.publisher[Tier::External.idx()].snapshot()
1591		};
1592
1593		let s1 = viewer1.subscribe("foo/bar");
1594		assert_eq!(raw().broadcasts, 1, "one viewer");
1595		let s2 = viewer2.subscribe("foo/bar");
1596		assert_eq!(raw().broadcasts, 2, "two distinct viewers");
1597		assert_eq!(raw().broadcasts_closed, 0);
1598
1599		drop(s1);
1600		let r = raw();
1601		assert_eq!(r.broadcasts, 2, "broadcasts is cumulative");
1602		assert_eq!(r.broadcasts_closed, 1, "one viewer left");
1603		// broadcasts - broadcasts_closed = 1 remaining viewer.
1604
1605		drop(s2);
1606		assert_eq!(raw().broadcasts_closed, 2, "both viewers gone");
1607	}
1608
1609	#[tokio::test(start_paused = true)]
1610	async fn session_counts_by_root() {
1611		// session() counts connected sessions per auth root, independent of any
1612		// broadcast: open bumps `sessions`, drop bumps `sessions_closed`.
1613		let (stats, _origin) = test_stats(Some("sjc"));
1614		let ext = stats.tier(Tier::External);
1615
1616		let snap = |root: &str| {
1617			let map = stats.shared().sessions[Tier::External.idx()].lock();
1618			map.get(&PathOwned::from(root.to_string())).map(|c| c.snapshot())
1619		};
1620
1621		let a1 = ext.session("acme");
1622		let a2 = ext.session("acme");
1623		let b1 = ext.session("globex");
1624		assert_eq!(snap("acme"), Some((2, 0)), "two sessions under one root");
1625		assert_eq!(snap("globex"), Some((1, 0)), "a distinct root is counted separately");
1626
1627		drop(a1);
1628		assert_eq!(snap("acme"), Some((2, 1)));
1629		drop(a2);
1630		drop(b1);
1631		assert_eq!(snap("acme"), Some((2, 2)));
1632		assert_eq!(snap("globex"), Some((1, 1)));
1633	}
1634
1635	#[tokio::test(start_paused = true)]
1636	async fn session_track_surfaces_by_root() {
1637		let (stats, origin) = test_stats(Some("sjc"));
1638		let mut consumer = origin.consume();
1639		let _a = stats.tier(Tier::External).session("acme");
1640		let _b = stats.tier(Tier::External).session("acme");
1641		let _c = stats.tier(Tier::Internal).session("peer");
1642
1643		tokio::time::advance(Duration::from_millis(1100)).await;
1644
1645		let (_path, broadcast) = consumer.announced().await.expect("announce");
1646		let broadcast = broadcast.expect("active");
1647
1648		let track = broadcast
1649			.subscribe_track(&Track {
1650				name: "sessions.json".into(),
1651				priority: 0,
1652			})
1653			.expect("subscribe");
1654		let frame = read_session_frame(track).await;
1655		let snap = frame.get("acme").expect("root entry");
1656		assert_eq!(snap.sessions, 2);
1657		assert_eq!(snap.sessions_closed, 0);
1658		assert!(
1659			!frame.contains_key("peer"),
1660			"internal session must not appear on the external track"
1661		);
1662
1663		let int_track = broadcast
1664			.subscribe_track(&Track {
1665				name: "internal/sessions.json".into(),
1666				priority: 0,
1667			})
1668			.expect("subscribe");
1669		let snap = *read_session_frame(int_track).await.get("peer").expect("internal entry");
1670		assert_eq!(snap.sessions, 1);
1671	}
1672
	#[tokio::test(start_paused = true)]
	async fn session_root_dropped_when_empty() {
		// Once the last session under a root disconnects, the root leaves the
		// map on the next tick (its final snapshot already emitted).
		let (stats, _origin) = test_stats(Some("sjc"));
		let key = PathOwned::from("acme");
		let session = stats.tier(Tier::External).session("acme");

		drive_ticks(1).await;
		assert!(
			stats.shared().sessions[Tier::External.idx()].lock().contains_key(&key),
			"root present while a session is connected"
		);

		drop(session);
		drive_ticks(1).await;
		assert!(
			!stats.shared().sessions[Tier::External.idx()].lock().contains_key(&key),
			"root GC'd after the last session leaves"
		);
	}
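
	// Illustrative sketch only: a hypothetical, self-contained model of the
	// behavior `session_root_dropped_when_empty` checks. None of these names
	// (`Sessions`, `SessionGuard`, `RootCounters`, `tick`) are moq-net APIs.
	// Assumed design: each auth root maps to a pair of monotonic counters; a
	// guard bumps `open` on connect and `closed` on drop; each tick emits every
	// root's `(open, closed)` snapshot and then forgets roots whose counters
	// are equal, so a root's final `(n, n)` snapshot goes out on the same tick
	// that removes it.
	mod sketch_session_root_gc {
		use std::collections::BTreeMap;
		use std::sync::atomic::{AtomicU64, Ordering};
		use std::sync::{Arc, Mutex};

		#[derive(Default)]
		pub struct RootCounters {
			open: AtomicU64,
			closed: AtomicU64,
		}

		impl RootCounters {
			// Returns (open, closed), loading `closed` first.
			fn read(&self) -> (u64, u64) {
				let closed = self.closed.load(Ordering::Acquire);
				let open = self.open.load(Ordering::Relaxed);
				(open, closed)
			}
		}

		// Held for the lifetime of one connected session.
		pub struct SessionGuard(Arc<RootCounters>);

		impl Drop for SessionGuard {
			fn drop(&mut self) {
				self.0.closed.fetch_add(1, Ordering::Release);
			}
		}

		#[derive(Default)]
		pub struct Sessions {
			roots: Mutex<BTreeMap<String, Arc<RootCounters>>>,
		}

		impl Sessions {
			pub fn session(&self, root: &str) -> SessionGuard {
				// `open` is bumped while the map lock is held, so a concurrent
				// tick never sees a freshly inserted root at (0, 0).
				let mut roots = self.roots.lock().unwrap();
				let counters = roots.entry(root.to_string()).or_default().clone();
				counters.open.fetch_add(1, Ordering::Relaxed);
				SessionGuard(counters)
			}

			// Emits `root -> (open, closed)` for every tracked root, then
			// drops roots that have no connected sessions left.
			pub fn tick(&self) -> BTreeMap<String, (u64, u64)> {
				let mut roots = self.roots.lock().unwrap();
				let frame: BTreeMap<String, (u64, u64)> =
					roots.iter().map(|(root, c)| (root.clone(), c.read())).collect();
				roots.retain(|root, _| {
					let (open, closed) = frame[root];
					open > closed
				});
				frame
			}
		}

		#[test]
		fn final_snapshot_then_gc() {
			let sessions = Sessions::default();
			let a = sessions.session("acme");
			let b = sessions.session("acme");
			assert_eq!(sessions.tick().get("acme"), Some(&(2, 0)));
			drop((a, b));
			// Final snapshot is emitted, then the root is forgotten.
			assert_eq!(sessions.tick().get("acme"), Some(&(2, 2)));
			assert!(sessions.tick().is_empty());
		}
	}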

	#[tokio::test(start_paused = true)]
	async fn unused_slots_dont_surface() {
		// A broadcast that only sees External Publisher traffic must NOT
		// appear in the other three tracks with zero counters. Regression
		// for the "None != Some(default)" first-tick change-detection bug:
		// without the unwrap_or_default fix, every entry would surface
		// once in every track even when only one slot had real activity.
		let (stats, origin) = test_stats(Some("sjc"));
		let mut consumer = origin.consume();
		let bs = stats.tier(Tier::External).broadcast("foo/bar");
		let track = bs.publisher().track("video");
		track.frame();

		drive_ticks(2).await;

		let (_path, broadcast) = consumer.announced().await.expect("announce");
		let broadcast = broadcast.expect("active");

		// External publisher slot SHOULD include foo/bar.
		let pub_track = broadcast
			.subscribe_track(&Track {
				name: "publisher.json".into(),
				priority: 0,
			})
			.expect("subscribe");
		assert!(
			read_frame(pub_track).await.contains_key("foo/bar"),
			"publisher.json must include the active foo/bar entry"
		);

		// The other three slots had zero activity. The first frame on
		// each must be `{}`, not `{"foo/bar": {all zeros}}`.
		for name in ["subscriber.json", "internal/publisher.json", "internal/subscriber.json"] {
			let t = broadcast
				.subscribe_track(&Track {
					name: name.into(),
					priority: 0,
				})
				.expect("subscribe");
			let frame = read_frame(t).await;
			assert!(
				frame.is_empty(),
				"{name} must be empty for an entry with no activity on that slot, got {frame:?}",
			);
		}
	}
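
	// Illustrative sketch only: a hypothetical model of the first-tick
	// change-detection rule that `unused_slots_dont_surface` guards. `Counts`
	// and its fields are stand-ins, not moq-net's real snapshot type. Per the
	// module docs, an entry is emitted when it is live (an open counter exceeds
	// its `*_closed` counterpart) or when its snapshot changed since the last
	// tick. Comparing `Option`s directly makes a slot's first all-zero snapshot
	// look like a change (`None != Some(zeros)`); treating a missing previous
	// snapshot as all zeros (`unwrap_or_default`) keeps idle slots out.
	mod sketch_change_detection {
		use std::collections::BTreeMap;

		#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
		pub struct Counts {
			pub announced: u64,
			pub announced_closed: u64,
			pub frames: u64,
		}

		impl Counts {
			// Live: the open counter still exceeds its closed counterpart.
			fn is_live(&self) -> bool {
				self.announced > self.announced_closed
			}
		}

		// Buggy comparison: a slot with no previous snapshot always differs.
		pub fn should_emit_buggy(prev: Option<&Counts>, cur: &Counts) -> bool {
			cur.is_live() || prev != Some(cur)
		}

		// Fixed comparison: a slot with no previous snapshot is compared
		// against all zeros.
		pub fn should_emit(prev: Option<&Counts>, cur: &Counts) -> bool {
			cur.is_live() || prev.copied().unwrap_or_default() != *cur
		}

		// Builds one track's frame from the previous and current snapshots.
		pub fn frame(
			prev: &BTreeMap<String, Counts>,
			cur: &BTreeMap<String, Counts>,
		) -> BTreeMap<String, Counts> {
			cur.iter()
				.filter(|(path, c)| should_emit(prev.get(path.as_str()), c))
				.map(|(path, c)| (path.clone(), *c))
				.collect()
		}

		#[test]
		fn idle_slot_stays_out_of_first_frame() {
			let cur = BTreeMap::from([("foo/bar".to_string(), Counts::default())]);
			assert!(should_emit_buggy(None, &Counts::default()));
			assert!(frame(&BTreeMap::new(), &cur).is_empty());
		}
	}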

	#[test]
	fn snapshot_reads_closed_before_open() {
		// Reading each closed counter before its open counterpart is what
		// guarantees that the emitted Snapshot never shows closed > open
		// under concurrent bumps. This unit test pins the ordering at the
		// source level so that a future refactor that reorders the loads
		// trips the test.
		let src = include_str!("stats.rs");
		// Find the body of `impl Counters { fn snapshot(...) ... }` and
		// check the line order.
		let body_start = src
			.find("fn snapshot(&self) -> RawCounts")
			.expect("snapshot fn present");
		let body = &src[body_start..];
		let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load");
		let open_pos = body.find("self.announced.load(").expect("announced load");
		assert!(
			closed_pos < open_pos,
			"announced_closed must be loaded before announced; reversing breaks the open>=closed invariant",
		);
		let subs_closed_pos = body
			.find("self.subscriptions_closed.load")
			.expect("subscriptions_closed load");
		let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load");
		assert!(
			subs_closed_pos < subs_pos,
			"subscriptions_closed must be loaded before subscriptions",
		);
		let bcast_closed_pos = body
			.find("self.broadcasts_closed.load")
			.expect("broadcasts_closed load");
		let bcast_pos = body.find("self.broadcasts.load").expect("broadcasts load");
		assert!(
			bcast_closed_pos < bcast_pos,
			"broadcasts_closed must be loaded before broadcasts",
		);
	}
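
	// Illustrative sketch only: a hypothetical counter pair (not moq-net's
	// `Counters`) showing why loading `*_closed` before its open counterpart
	// keeps a snapshot from reporting closed > open. Assumptions: both counters
	// only grow, every close is bumped after its matching open, the close bump
	// uses `Release`, and the snapshot loads `closed` with `Acquire`. Every open
	// that preceded an observed close is then visible to the later load of
	// `open`, so `open >= closed`. Loading `open` first could read a stale
	// value and then a `closed` that has since moved past it.
	mod sketch_closed_before_open {
		use std::sync::atomic::{AtomicU64, Ordering};

		#[derive(Default)]
		pub struct Pair {
			open: AtomicU64,
			closed: AtomicU64,
		}

		impl Pair {
			pub fn bump_open(&self) {
				self.open.fetch_add(1, Ordering::Relaxed);
			}

			pub fn bump_closed(&self) {
				// Release publishes this thread's earlier `open` bump to any
				// Acquire load that observes this close.
				self.closed.fetch_add(1, Ordering::Release);
			}

			// Returns (open, closed), loading `closed` first.
			pub fn load_pair(&self) -> (u64, u64) {
				let closed = self.closed.load(Ordering::Acquire);
				let open = self.open.load(Ordering::Relaxed);
				(open, closed)
			}
		}

		// Bumps open-then-close on `workers` threads while the calling thread
		// takes `samples` snapshots; returns how many showed closed > open.
		pub fn count_violations(workers: usize, iters: usize, samples: usize) -> usize {
			let pair = Pair::default();
			std::thread::scope(|s| {
				for _ in 0..workers {
					s.spawn(|| {
						for _ in 0..iters {
							pair.bump_open();
							pair.bump_closed();
						}
					});
				}
				(0..samples)
					.filter(|_| {
						let (open, closed) = pair.load_pair();
						closed > open
					})
					.count()
			})
		}

		#[test]
		fn closed_never_exceeds_open() {
			assert_eq!(count_violations(4, 10_000, 10_000), 0);
		}
	}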

	#[test]
	fn session_snapshot_reads_closed_before_open() {
		// Same `closed`-before-`open` invariant as `Counters::snapshot`, pinned
		// at the source level so a reordering refactor can't let
		// `sessions_closed > sessions` leak into an emitted session frame.
		let src = include_str!("stats.rs");
		let body_start = src
			.find("fn snapshot(&self) -> (u64, u64)")
			.expect("SessionCounters::snapshot fn present");
		let body = &src[body_start..];
		let closed_pos = body.find("self.sessions_closed.load").expect("sessions_closed load");
		let open_pos = body.find("self.sessions.load").expect("sessions load");
		assert!(closed_pos < open_pos, "sessions_closed must be loaded before sessions");
	}

	async fn read_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, Snapshot> {
		let bytes = track.read_frame().await.expect("ok").expect("frame");
		serde_json::from_slice(&bytes).expect("json parse")
	}

	async fn read_session_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, SessionSnapshot> {
		let bytes = track.read_frame().await.expect("ok").expect("frame");
		serde_json::from_slice(&bytes).expect("json parse")
	}
}