Skip to main content

moq_net/
stats.rs

1//! Generic stats publishing for moq-net sessions.
2//!
3//! [`Stats`] aggregates per-broadcast counter bumps for traffic this relay
4//! node is handling and publishes them on a single `<prefix>/node/<node>`
5//! broadcast (or `<prefix>/node` when no node is configured). The broadcast
6//! carries four tracks, one per `(tier, role)` pair:
7//!
8//! * `publisher.json`           : external (e.g. customer) egress
9//! * `subscriber.json`          : external ingress
10//! * `internal/publisher.json`  : internal (e.g. mTLS cluster peer) egress
11//! * `internal/subscriber.json` : internal ingress
12//!
13//! Each frame is a JSON object mapping broadcast path to a cumulative
14//! counter snapshot. Tier, role, and node are implied by the track and
15//! broadcast paths, so they aren't repeated inside the frame. An entry
16//! appears in the frame for a given `(tier, role)` on any tick where the
17//! broadcast is live (any open counter still exceeds its `*_closed`
18//! counterpart, so a subscription could begin at any moment) or its
19//! snapshot changed since the previous tick. Once every counter equals its
20//! `*_closed` counterpart no traffic can flow, so the entry is dropped. A
21//! downstream aggregator computes rates from successive cumulative
22//! snapshots and slices the data however a dashboard wants.
23//!
24//! Per-snapshot semantics:
25//!
26//! * `announced` / `announced_closed`: cumulative count of broadcast
27//!   announce/unannounce events on this `(tier, role)`. Bumped on every
28//!   `publisher()` / `subscriber()` guard creation and drop.
29//! * `broadcasts` / `broadcasts_closed`: derived by the snapshot task from
30//!   subscription transitions. `broadcasts` bumps each tick the slot
31//!   transitions from "no active subs" to "one or more active subs" (or
32//!   when subs flickered through 0 within a tick). `broadcasts_closed`
33//!   bumps on the reverse transition. Use these for "active broadcasts"
34//!   billing/UI metrics; use `announced` if you want all broadcasts ever
35//!   seen.
36//! * `subscriptions` / `subscriptions_closed`: cumulative count of
37//!   track-level subscription guards opened/dropped.
38//! * `bytes` / `frames` / `groups`: cumulative payload counters bumped from
39//!   the lite session loops.
40//!
41//! Counters are strictly monotonic (only `fetch_add`); a counter going
42//! backwards across snapshots means the underlying entry was garbage
43//! collected and re-created. Downstream consumers should treat decreases
44//! as a fresh session segment, summing across resets when computing
45//! lifetime totals.
46//!
47//! A caller hands each session a tier-scoped [`StatsHandle`] (built from the
48//! single shared [`Stats`] via [`Stats::tier`]) which determines which counter
49//! set its bumps land in. Multiple relays in the same cluster origin can
50//! coexist by giving each one a distinct `<node>` suffix on the advertised
51//! path. The suffix itself may be multi-segment (e.g. `sjc/1`, `sjc/2`) so a
52//! region with multiple hosts can nest under a shared region key without
53//! colliding.
54//!
55//! # Disabled stats
56//!
57//! A [`StatsConfig`] with no origin (the default) builds a no-op aggregator:
58//! all counter bumps are silently dropped, no snapshot task spawns, and no
59//! broadcast is published. [`Stats::default`] / [`StatsHandle::default`]
60//! return one, so call sites can hold a [`StatsHandle`] unconditionally
61//! instead of threading an `Option`.
62//!
63//! # Lifecycle
64//!
65//! When the config has an origin, [`Stats::new`] spawns the snapshot task
66//! immediately, publishes the stats broadcast, and ticks at the configured
67//! interval, writing a frame per (tier, role) track. The broadcast stays
68//! announced for the lifetime of the [`Stats`] aggregator, even while idle
69//! (frames just go to `{}`). The task exits when the last [`Stats`] clone is
70//! dropped (the task holds only a `Weak` to the shared state).
71//!
72//! # Idle frame skipping
73//!
74//! On each tick the task compares the just-built per-(tier, role) JSON payload
75//! against the last one it emitted and writes a frame only when something
76//! changed. New subscribers still pick up a baseline immediately because
77//! track-latest semantics retain the most recent emitted frame.
78//!
79//! # Snapshot atomicity
80//!
81//! Each [`Counters`] snapshot reads `*_closed` atomics (with `Acquire`)
82//! before their open counterparts (with `Relaxed`). The matching close
83//! bumps in the RAII guards' `Drop` impls use `Release`. With this
84//! pairing the snapshot always satisfies `open >= closed` even on
85//! weakly-ordered architectures (ARM, POWER): the `Acquire` load of
86//! close synchronizes-with the `Release` bump that produced the
87//! observed value, making every write that happened-before that close
88//! (including the matching open bump on whichever thread opened the
89//! guard) visible to the snapshot thread. Open / payload counters can
90//! then stay `Relaxed` because the visibility comes for free through
91//! the close pairing. The cost is a slight upward bias on the open
92//! counts when a bump lands between the two loads, which never produces
93//! a logically impossible (`closed > open`) snapshot for downstream.
94//!
95//! # Cycles
96//!
97//! Calling [`StatsHandle::broadcast`] for a path under the configured
98//! top-level prefix returns an empty handle whose bumps no-op. This breaks
99//! the feedback loop where serving a `<top-prefix>/...` broadcast would
100//! itself generate more stats traffic.
101
102use std::{
103	collections::{BTreeMap, HashMap},
104	sync::{
105		Arc, Weak,
106		atomic::{AtomicU64, Ordering},
107	},
108	time::Duration,
109};
110
111use serde::Serialize;
112use web_async::{Lock, spawn};
113
114use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track, TrackProducer};
115
116/// Cumulative atomic counters for a single `(tier, role)` on a broadcast.
117///
118/// Only `announced` / `announced_closed` and `subscriptions` /
119/// `subscriptions_closed` (and the payload counters) are bumped from the
120/// hot bump path. `broadcasts` / `broadcasts_closed` are not stored here;
121/// they're derived in the snapshot task from observed subscription
122/// transitions.
123#[derive(Default, Debug)]
124#[non_exhaustive]
125pub struct Counters {
126	pub announced: AtomicU64,
127	pub announced_closed: AtomicU64,
128	pub subscriptions: AtomicU64,
129	pub subscriptions_closed: AtomicU64,
130	pub bytes: AtomicU64,
131	pub frames: AtomicU64,
132	pub groups: AtomicU64,
133}
134
135impl Counters {
136	/// Read all atomics into a `RawCounts`. Closed counters are read with
137	/// `Acquire` ordering before their open counterparts so the snapshot
138	/// always satisfies `open >= closed`; see the module-level "Snapshot
139	/// atomicity" note. Open / payload counters stay `Relaxed`: the
140	/// Acquire on close synchronizes-with the matching Release on the
141	/// close bump, which transitively makes all earlier writes (including
142	/// the prior open bump) visible to this thread.
143	fn snapshot(&self) -> RawCounts {
144		let announced_closed = self.announced_closed.load(Ordering::Acquire);
145		let subscriptions_closed = self.subscriptions_closed.load(Ordering::Acquire);
146		let announced = self.announced.load(Ordering::Relaxed);
147		let subscriptions = self.subscriptions.load(Ordering::Relaxed);
148		let bytes = self.bytes.load(Ordering::Relaxed);
149		let frames = self.frames.load(Ordering::Relaxed);
150		let groups = self.groups.load(Ordering::Relaxed);
151		RawCounts {
152			announced,
153			announced_closed,
154			subscriptions,
155			subscriptions_closed,
156			bytes,
157			frames,
158			groups,
159		}
160	}
161}
162
163/// Raw counter readout, before the snapshot task layers on derived
164/// `broadcasts` / `broadcasts_closed`. Intermediate type that doesn't
165/// escape this module.
166#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
167struct RawCounts {
168	announced: u64,
169	announced_closed: u64,
170	subscriptions: u64,
171	subscriptions_closed: u64,
172	bytes: u64,
173	frames: u64,
174	groups: u64,
175}
176
177/// Distinguishes traffic classes so a single [`Stats`] can record
178/// customer-facing and cluster-peer traffic separately. Each tracked
179/// broadcast keeps per-tier [`Counters`] on both its publisher and
180/// subscriber sides.
181#[derive(Copy, Clone, Debug, PartialEq, Eq)]
182pub enum Tier {
183	External,
184	Internal,
185}
186
187impl Tier {
188	fn idx(self) -> usize {
189		match self {
190			Tier::External => 0,
191			Tier::Internal => 1,
192		}
193	}
194}
195
196/// Settings for a [`Stats`] aggregator. Construct with [`StatsConfig::new`]
197/// and chain the `with_*` setters (e.g.
198/// `StatsConfig::new().with_origin(origin).with_prefix(".foo")`), then hand it
199/// to [`Stats::new`].
200///
201/// With no origin set the resulting aggregator is a no-op: bumps are dropped
202/// and no task spawns. Call [`StatsConfig::with_origin`] to publish.
203///
204/// Distinct from the relay's clap-derived `StatsConfig`, which holds the raw
205/// CLI/TOML knobs and resolves into one of these.
206///
207/// `#[non_exhaustive]` so new knobs can land without breaking call sites; build
208/// via [`StatsConfig::new`] rather than a struct literal.
209#[derive(Clone)]
210#[non_exhaustive]
211pub struct StatsConfig {
212	/// Origin that receives the stats broadcast's `publish_broadcast` calls.
213	/// When `None`, [`Stats::new`] spawns no task and publishes nothing.
214	pub origin: Option<OriginProducer>,
215	/// Top-level path stats are published under (default `.stats`). The full
216	/// advertised path is `<prefix>/node/<node>` (or `<prefix>/node` when
217	/// `node` is unset).
218	pub prefix: PathOwned,
219	/// Node suffix that disambiguates broadcasts from different relays sharing a
220	/// cluster origin. Set this on every node in multi-relay deployments. May be
221	/// multi-segment (e.g. `sjc/1`, `sjc/2`) so a region with multiple hosts can
222	/// nest under a shared region key. An empty path is treated as unset.
223	/// Default none.
224	pub node: Option<PathOwned>,
225	/// How long the snapshot task waits between publishes. Default 1s.
226	pub interval: Duration,
227}
228
229impl StatsConfig {
230	/// A config with default settings: no origin (no-op), `.stats` prefix, 1s
231	/// snapshot interval, and no node suffix. Call [`Self::with_origin`] to
232	/// actually publish.
233	pub fn new() -> Self {
234		Self {
235			origin: None,
236			prefix: PathOwned::from(".stats"),
237			node: None,
238			interval: Duration::from_secs(1),
239		}
240	}
241
242	/// Set the origin to publish the stats broadcast on. Without this the
243	/// aggregator is a no-op.
244	pub fn with_origin(mut self, origin: impl Into<Option<OriginProducer>>) -> Self {
245		self.origin = origin.into();
246		self
247	}
248
249	/// Override the top-level prefix (default `.stats`).
250	pub fn with_prefix(mut self, prefix: impl Into<PathOwned>) -> Self {
251		self.prefix = prefix.into();
252		self
253	}
254
255	/// Override the snapshot interval (default 1s).
256	pub fn with_interval(mut self, interval: Duration) -> Self {
257		self.interval = interval;
258		self
259	}
260
261	/// Set the node suffix (default none). An empty path is treated as unset.
262	pub fn with_node(mut self, node: impl Into<Option<PathOwned>>) -> Self {
263		self.node = node.into();
264		self
265	}
266}
267
268impl Default for StatsConfig {
269	fn default() -> Self {
270		Self::new()
271	}
272}
273
274/// Top-level stats aggregator. Cheap to clone (`Arc` inside for the shared
275/// runtime state). One instance per relay; sessions get tier-scoped handles via
276/// [`Stats::tier`]. Build it from a [`StatsConfig`] via [`Stats::new`].
277#[derive(Clone)]
278pub struct Stats {
279	prefix: PathOwned,
280	/// `None` for a no-op aggregator (config had no origin): bumps are
281	/// dropped and no task was spawned.
282	shared: Option<Arc<StatsShared>>,
283}
284
285/// Runtime state shared by every clone of a [`Stats`] and held by the
286/// snapshot task through a `Weak`. Only allocated when an origin is set.
287struct StatsShared {
288	origin: OriginProducer,
289	entries: Lock<HashMap<PathOwned, Arc<BroadcastEntry>>>,
290}
291
292/// Per-broadcast counters split by side then tier. The two side fields are
293/// named explicitly (rather than indexed by some `Role` enum) because the
294/// bump-path call sites always know which side they're on at compile time;
295/// only the tier varies dynamically with the session.
296struct BroadcastEntry {
297	publisher: [Counters; 2],
298	subscriber: [Counters; 2],
299}
300
301impl BroadcastEntry {
302	fn new() -> Self {
303		Self {
304			publisher: Default::default(),
305			subscriber: Default::default(),
306		}
307	}
308}
309
310/// Per-(entry, slot) state owned by the snapshot task. The snapshot task
311/// is single-threaded so this needs no atomics; we keep one of these per
312/// `(path, side, tier)` in a task-local map, mirroring the structure of
313/// [`BroadcastEntry`].
314#[derive(Default)]
315struct SlotState {
316	/// Cumulative count of `inactive -> active` subscription transitions on
317	/// this slot since the snapshot task started. Resets to 0 when the
318	/// entry is GC'd from the local map (consumers must treat decreases as
319	/// a session restart).
320	derived_broadcasts: u64,
321	/// Cumulative count of `active -> inactive` transitions.
322	derived_broadcasts_closed: u64,
323	/// Last `Snapshot` we wrote to the frame for this slot, used to detect
324	/// changes that warrant re-emission and to derive `broadcasts` transitions.
325	prev_emitted: Option<Snapshot>,
326}
327
328/// Snapshot-task-local mirror of [`BroadcastEntry`]: per-side, per-tier
329/// `SlotState`. Same field layout so iteration in the snapshot loop is
330/// trivially parallel between the two.
331#[derive(Default)]
332struct EntrySnapState {
333	publisher: [SlotState; 2],
334	subscriber: [SlotState; 2],
335}
336
337impl EntrySnapState {
338	/// Iterate the four `(track_name, counters, slot_state)` slots in the
339	/// fixed order matching `TRACK_ORDER`.
340	fn zip_slots<'a>(&'a mut self, entry: &'a BroadcastEntry) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] {
341		let [pub_ext_state, pub_int_state] = &mut self.publisher;
342		let [sub_ext_state, sub_int_state] = &mut self.subscriber;
343		[
344			("publisher.json", &entry.publisher[Tier::External.idx()], pub_ext_state),
345			(
346				"subscriber.json",
347				&entry.subscriber[Tier::External.idx()],
348				sub_ext_state,
349			),
350			(
351				"internal/publisher.json",
352				&entry.publisher[Tier::Internal.idx()],
353				pub_int_state,
354			),
355			(
356				"internal/subscriber.json",
357				&entry.subscriber[Tier::Internal.idx()],
358				sub_int_state,
359			),
360		]
361	}
362}
363
364/// Number of `(side, tier)` slots, matching the four tracks per stats
365/// broadcast.
366const NUM_SLOTS: usize = 4;
367
368/// Track names in the same order [`EntrySnapState::zip_slots`] returns
369/// them. Used to construct the per-broadcast track set up front.
370const TRACK_ORDER: [&str; NUM_SLOTS] = [
371	"publisher.json",
372	"subscriber.json",
373	"internal/publisher.json",
374	"internal/subscriber.json",
375];
376
377impl Stats {
378	/// Build a stats aggregator from `config`.
379	///
380	/// When `config` has an origin, this spawns the snapshot task immediately
381	/// and publishes the stats broadcast; the task runs until the last [`Stats`]
382	/// clone is dropped. With no origin the aggregator is a no-op (bumps are
383	/// dropped, nothing is published) and no task spawns, so it's safe to build
384	/// outside an async runtime.
385	pub fn new(config: StatsConfig) -> Self {
386		let StatsConfig {
387			origin,
388			prefix,
389			node,
390			interval,
391		} = config;
392		// An empty path after normalization is indistinguishable from "no node
393		// set"; collapse it so downstream code only sees a single representation.
394		// We do this here (not in `with_node`) so a directly-assigned
395		// `config.node` is normalized too.
396		let node = node.filter(|p| !p.is_empty());
397
398		let shared = origin.map(|origin| {
399			let shared = Arc::new(StatsShared {
400				origin,
401				entries: Lock::default(),
402			});
403			let advertised = advertised_path(&prefix, node.as_ref().map(|p| p.as_str()));
404			spawn(run_publisher(Arc::downgrade(&shared), advertised, interval));
405			shared
406		});
407
408		Self { prefix, shared }
409	}
410
411	/// Returns the configured top-level prefix.
412	pub fn prefix(&self) -> &Path<'static> {
413		&self.prefix
414	}
415
416	/// The shared state, panicking for a no-op aggregator. Tests build with an
417	/// origin so this is always present.
418	#[cfg(test)]
419	fn shared(&self) -> &Arc<StatsShared> {
420		self.shared.as_ref().expect("enabled stats aggregator")
421	}
422
423	/// Returns a tier-scoped handle. Bumps through this handle land in the
424	/// tier's counters.
425	pub fn tier(&self, tier: Tier) -> StatsHandle {
426		StatsHandle {
427			stats: self.clone(),
428			tier,
429		}
430	}
431
432	fn entry(&self, path: impl AsPath) -> Option<Arc<BroadcastEntry>> {
433		// No-op aggregator (no origin) never allocates state.
434		let shared = self.shared.as_ref()?;
435		let path = path.as_path();
436		// Skip our own stats broadcasts (and any sibling category under the
437		// same prefix) so serving a stats broadcast doesn't generate more
438		// stats.
439		if path.has_prefix(&self.prefix) {
440			return None;
441		}
442		let owned = path.to_owned();
443		let mut entries = shared.entries.lock();
444		Some(
445			entries
446				.entry(owned)
447				.or_insert_with(|| Arc::new(BroadcastEntry::new()))
448				.clone(),
449		)
450	}
451}
452
453impl Default for Stats {
454	fn default() -> Self {
455		Self::new(StatsConfig::new())
456	}
457}
458
459/// Tier-scoped wrapper around [`Stats`]. What [`crate::Client::with_stats`] and
460/// [`crate::Server::with_stats`] accept. Cheap to clone.
461#[derive(Clone)]
462pub struct StatsHandle {
463	stats: Stats,
464	tier: Tier,
465}
466
467impl StatsHandle {
468	/// The aggregator this handle is tied to.
469	pub fn parent(&self) -> &Stats {
470		&self.stats
471	}
472
473	/// The tier this handle bumps into.
474	pub fn tier(&self) -> Tier {
475		self.tier
476	}
477
478	/// Returns a per-broadcast handle scoped to this tier.
479	///
480	/// Paths under the aggregator's configured `prefix` return an empty handle
481	/// whose bumps are no-ops. This keeps stats traffic from feeding back into
482	/// the aggregator.
483	pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
484		BroadcastStats {
485			entry: self.stats.entry(path),
486			tier: self.tier,
487		}
488	}
489}
490
491impl Default for StatsHandle {
492	/// A no-op handle backed by a [`Stats::default`] aggregator.
493	fn default() -> Self {
494		Stats::default().tier(Tier::External)
495	}
496}
497
498/// A per-broadcast, tier-scoped handle. Cheap to clone.
499///
500/// Open a broadcast-lifetime guard with [`Self::publisher`] / [`Self::subscriber`],
501/// or skip straight to a track guard with [`Self::publisher_track`] /
502/// [`Self::subscriber_track`] when the broadcast's lifetime is tracked
503/// elsewhere.
504#[derive(Clone)]
505pub struct BroadcastStats {
506	entry: Option<Arc<BroadcastEntry>>,
507	tier: Tier,
508}
509
510impl BroadcastStats {
511	/// True if this handle has no underlying entry (path was under the
512	/// aggregator's own prefix, or stats are disabled). All bumps through an
513	/// empty handle are no-ops.
514	pub fn is_empty(&self) -> bool {
515		self.entry.is_none()
516	}
517
518	/// Open a broadcast-lifetime guard for the publisher (egress) role.
519	/// Bumps `announced` on construction and `announced_closed` on drop.
520	/// (The emitted `broadcasts` counter is derived in the snapshot task
521	/// from subscription activity; see the module docs.)
522	pub fn publisher(&self) -> PublisherStats {
523		if let Some(entry) = &self.entry {
524			entry.publisher[self.tier.idx()]
525				.announced
526				.fetch_add(1, Ordering::Relaxed);
527		}
528		PublisherStats {
529			entry: self.entry.clone(),
530			tier: self.tier,
531		}
532	}
533
534	/// Open a broadcast-lifetime guard for the subscriber (ingress) role.
535	/// Bumps `announced` on construction and `announced_closed` on drop.
536	/// (The emitted `broadcasts` counter is derived in the snapshot task
537	/// from subscription activity; see the module docs.)
538	pub fn subscriber(&self) -> SubscriberStats {
539		if let Some(entry) = &self.entry {
540			entry.subscriber[self.tier.idx()]
541				.announced
542				.fetch_add(1, Ordering::Relaxed);
543		}
544		SubscriberStats {
545			entry: self.entry.clone(),
546			tier: self.tier,
547		}
548	}
549
550	/// Open a publisher-track guard.
551	///
552	/// `_name` is unused; counters are per-broadcast only. The track name
553	/// parameter is kept for symmetry with the rest of moq-net so callers
554	/// don't have to thread an `Option<&str>` through subscribe sites.
555	pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
556		if let Some(entry) = &self.entry {
557			entry.publisher[self.tier.idx()]
558				.subscriptions
559				.fetch_add(1, Ordering::Relaxed);
560		}
561		PublisherTrack {
562			entry: self.entry.clone(),
563			tier: self.tier,
564		}
565	}
566
567	/// Subscriber-side counterpart to [`Self::publisher_track`].
568	pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
569		if let Some(entry) = &self.entry {
570			entry.subscriber[self.tier.idx()]
571				.subscriptions
572				.fetch_add(1, Ordering::Relaxed);
573		}
574		SubscriberTrack {
575			entry: self.entry.clone(),
576			tier: self.tier,
577		}
578	}
579}
580
581/// RAII broadcast guard for the publisher role. See [`BroadcastStats::publisher`].
582#[must_use = "drop the guard to record the broadcast as closed"]
583pub struct PublisherStats {
584	entry: Option<Arc<BroadcastEntry>>,
585	tier: Tier,
586}
587
588impl PublisherStats {
589	/// Open a track-subscription guard. Bumps `subscriptions` on construction
590	/// and `subscriptions_closed` on drop.
591	pub fn track(&self, name: &str) -> PublisherTrack {
592		BroadcastStats {
593			entry: self.entry.clone(),
594			tier: self.tier,
595		}
596		.publisher_track(name)
597	}
598}
599
600impl Drop for PublisherStats {
601	fn drop(&mut self) {
602		if let Some(entry) = &self.entry {
603			// Release pairs with the snapshot reader's Acquire load of
604			// `announced_closed`, propagating the open-bump from this
605			// guard's construction to whichever thread observes the close.
606			entry.publisher[self.tier.idx()]
607				.announced_closed
608				.fetch_add(1, Ordering::Release);
609		}
610	}
611}
612
613/// RAII broadcast guard for the subscriber role. See [`BroadcastStats::subscriber`].
614#[must_use = "drop the guard to record the broadcast as closed"]
615pub struct SubscriberStats {
616	entry: Option<Arc<BroadcastEntry>>,
617	tier: Tier,
618}
619
620impl SubscriberStats {
621	/// Open a track-subscription guard. Mirrors [`PublisherStats::track`].
622	pub fn track(&self, name: &str) -> SubscriberTrack {
623		BroadcastStats {
624			entry: self.entry.clone(),
625			tier: self.tier,
626		}
627		.subscriber_track(name)
628	}
629}
630
631impl Drop for SubscriberStats {
632	fn drop(&mut self) {
633		if let Some(entry) = &self.entry {
634			// See `PublisherStats::drop` for why this is Release.
635			entry.subscriber[self.tier.idx()]
636				.announced_closed
637				.fetch_add(1, Ordering::Release);
638		}
639	}
640}
641
642/// RAII subscription guard for the publisher role.
643#[must_use = "drop the guard to record the subscription as closed"]
644pub struct PublisherTrack {
645	entry: Option<Arc<BroadcastEntry>>,
646	tier: Tier,
647}
648
649impl PublisherTrack {
650	/// Bumps `frames` once.
651	pub fn frame(&self) {
652		if let Some(entry) = &self.entry {
653			entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
654		}
655	}
656
657	/// Bumps `bytes` by `n`.
658	pub fn bytes(&self, n: u64) {
659		if let Some(entry) = &self.entry {
660			entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
661		}
662	}
663
664	/// Bumps `groups` once.
665	pub fn group(&self) {
666		if let Some(entry) = &self.entry {
667			entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
668		}
669	}
670}
671
672impl Drop for PublisherTrack {
673	fn drop(&mut self) {
674		if let Some(entry) = &self.entry {
675			// See `PublisherStats::drop` for why this is Release.
676			entry.publisher[self.tier.idx()]
677				.subscriptions_closed
678				.fetch_add(1, Ordering::Release);
679		}
680	}
681}
682
683/// RAII subscription guard for the subscriber role.
684#[must_use = "drop the guard to record the subscription as closed"]
685pub struct SubscriberTrack {
686	entry: Option<Arc<BroadcastEntry>>,
687	tier: Tier,
688}
689
690impl SubscriberTrack {
691	/// Bumps `frames` once.
692	pub fn frame(&self) {
693		if let Some(entry) = &self.entry {
694			entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
695		}
696	}
697
698	/// Bumps `bytes` by `n`.
699	pub fn bytes(&self, n: u64) {
700		if let Some(entry) = &self.entry {
701			entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
702		}
703	}
704
705	/// Bumps `groups` once.
706	pub fn group(&self) {
707		if let Some(entry) = &self.entry {
708			entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
709		}
710	}
711}
712
713impl Drop for SubscriberTrack {
714	fn drop(&mut self) {
715		if let Some(entry) = &self.entry {
716			// See `PublisherStats::drop` for why this is Release.
717			entry.subscriber[self.tier.idx()]
718				.subscriptions_closed
719				.fetch_add(1, Ordering::Release);
720		}
721	}
722}
723
724/// Per-tick work for a single `(side, tier)` slot: derive `broadcasts` /
725/// `broadcasts_closed` from subscription transitions, build the emitted
726/// `Snapshot`, update the slot's `prev_emitted`, and hand the snap to `emit`
727/// iff the slot is live or changed this tick.
728fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) {
729	let raw = counters.snapshot();
730
731	// Derive `broadcasts` / `broadcasts_closed` from subscription-active
732	// transitions observed across ticks. `delta_subs > 0` catches the case
733	// where a sub opened AND closed within a single tick window so the
734	// snapshot shows `subs == subs_closed` (curr_active false) but real
735	// activity happened. Such flickers count as a full open/close pair.
736	let (prev_subs, prev_subs_closed, prev_broadcasts, prev_broadcasts_closed) = match &slot_state.prev_emitted {
737		Some(prev) => (
738			prev.subscriptions,
739			prev.subscriptions_closed,
740			prev.broadcasts,
741			prev.broadcasts_closed,
742		),
743		None => (0, 0, 0, 0),
744	};
745	let prev_active = prev_subs > prev_subs_closed;
746	let curr_active = raw.subscriptions > raw.subscriptions_closed;
747	let delta_subs = raw.subscriptions.saturating_sub(prev_subs);
748	let active_during = prev_active || curr_active || delta_subs > 0;
749
750	if !prev_active && active_during {
751		slot_state.derived_broadcasts = prev_broadcasts.saturating_add(1);
752	}
753	if active_during && !curr_active {
754		slot_state.derived_broadcasts_closed = prev_broadcasts_closed.saturating_add(1);
755	}
756
757	let snap = Snapshot {
758		announced: raw.announced,
759		announced_closed: raw.announced_closed,
760		broadcasts: slot_state.derived_broadcasts,
761		broadcasts_closed: slot_state.derived_broadcasts_closed,
762		subscriptions: raw.subscriptions,
763		subscriptions_closed: raw.subscriptions_closed,
764		bytes: raw.bytes,
765		frames: raw.frames,
766		groups: raw.groups,
767	};
768
769	// A slot is live while any open counter still exceeds its `*_closed`
770	// counterpart: a guard is held, so a subscription could begin at any
771	// moment. Live slots are emitted every tick so a downstream "currently
772	// active" view always sees the full set. Once every pair is equal no
773	// traffic can flow and the entry is on its way out (the global GC drops
774	// it as soon as the last guard releases its `Arc`).
775	let live = snap.announced != snap.announced_closed
776		|| snap.subscriptions != snap.subscriptions_closed
777		|| snap.broadcasts != snap.broadcasts_closed;
778
779	// Include the entry whenever it's live OR its snapshot changed this
780	// tick. Change-driven inclusion catches bumps since the previous tick
781	// (incl. sub-tick flickers) and emits the final close snapshot on the
782	// tick a slot transitions to fully closed.
783	//
784	// `None` (slot never emitted) is treated as the default Snapshot so a
785	// first-tick all-zeros snap on an unused tier-side slot doesn't count
786	// as a "change". Without this, every entry would surface in all four
787	// tracks with zeros on the tick after creation even if only one slot
788	// is actually in use.
789	let prev_snap = slot_state.prev_emitted.unwrap_or_default();
790	let changed = snap != prev_snap;
791	if changed {
792		slot_state.prev_emitted = Some(snap);
793	}
794	if live || changed {
795		emit(snap);
796	}
797}
798
799/// Publishes the stats broadcast and writes a frame per tick. Spawned once by
800/// [`Stats::new`] when an origin is set; runs until every [`Stats`] clone is
801/// dropped (`weak.upgrade()` returns `None`).
802async fn run_publisher(weak: Weak<StatsShared>, advertised: PathOwned, interval: Duration) {
803	let Some(shared) = weak.upgrade() else {
804		return;
805	};
806
807	let mut broadcast = Broadcast::new().produce();
808	let mut tracks: Vec<TrackProducer> = Vec::with_capacity(NUM_SLOTS);
809	for name in TRACK_ORDER {
810		match broadcast.create_track(Track {
811			name: name.into(),
812			priority: 0,
813		}) {
814			Ok(t) => tracks.push(t),
815			Err(err) => {
816				tracing::warn!(?err, name, "stats: failed to create track");
817				return;
818			}
819		}
820	}
821	if !shared.origin.publish_broadcast(&advertised, broadcast.consume()) {
822		tracing::warn!(advertised = %advertised, "stats: origin rejected stats broadcast");
823		return;
824	}
825	drop(shared);
826
827	// Per-path snapshot state owned by this task. Mirrors the global entries
828	// and serves as the diff source for change detection and `broadcasts`
829	// derivation across ticks.
830	let mut local: HashMap<PathOwned, EntrySnapState> = HashMap::new();
831	let mut last_payload: [Vec<u8>; NUM_SLOTS] = Default::default();
832
833	let mut ticker = tokio::time::interval(interval);
834	ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
835
836	loop {
837		ticker.tick().await;
838
839		let Some(shared) = weak.upgrade() else {
840			return;
841		};
842
843		// Clone the current entries map into a Vec so we can drop the
844		// global lock before the change-detection pass.
845		let entries: Vec<(PathOwned, Arc<BroadcastEntry>)> = {
846			let map = shared.entries.lock();
847			map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
848		};
849
850		let mut frames: [BTreeMap<String, Snapshot>; NUM_SLOTS] = Default::default();
851		for (path, entry) in &entries {
852			let snap_state = local.entry(path.clone()).or_default();
853			for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() {
854				process_slot(counters, slot_state, |snap| {
855					frames[i].insert(path.as_str().to_string(), snap);
856				});
857			}
858		}
859		drop(entries);
860
861		// GC global entries: keep only those an external guard still holds.
862		// `strong_count == 1` (just the map's own `Arc`) means no live
863		// publisher/subscriber/track guard remains, so every open counter
864		// has caught up to its `*_closed` counterpart and no traffic can
865		// flow. We can't key this on the counters directly: a held but idle
866		// `BroadcastStats` (all counters equal) must stay so a later bump
867		// isn't lost on an orphaned `Arc`. Then drop local state for any
868		// path that left the map. We already emitted each removed entry's
869		// final snapshot above, so nothing is lost.
870		{
871			let mut map = shared.entries.lock();
872			map.retain(|_, entry| Arc::strong_count(entry) > 1);
873			local.retain(|path, _| map.contains_key(path));
874		}
875
876		for (((frame, last), track), slot) in frames
877			.iter()
878			.zip(last_payload.iter_mut())
879			.zip(tracks.iter_mut())
880			.zip(0usize..)
881		{
882			let json = match serde_json::to_vec(frame) {
883				Ok(b) => b,
884				Err(err) => {
885					tracing::debug!(?err, slot, "stats: failed to serialize frame");
886					continue;
887				}
888			};
889			if &json == last {
890				continue;
891			}
892			if let Err(err) = track.write_frame(json.clone()) {
893				tracing::debug!(?err, slot, "stats: failed to write frame");
894				// Leave `last_payload` untouched so the next tick retries this
895				// snapshot instead of skipping it as "already written".
896				continue;
897			}
898			*last = json;
899		}
900
901		drop(shared);
902	}
903}
904
905/// What we emit for one entry on one tier-role track. `announced` /
906/// `announced_closed` and the subscription / payload counters come straight
907/// from [`RawCounts`]; `broadcasts` / `broadcasts_closed` are derived in
908/// the snapshot task from observed subscription transitions.
909#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
910#[cfg_attr(test, derive(serde::Deserialize))]
911struct Snapshot {
912	announced: u64,
913	announced_closed: u64,
914	broadcasts: u64,
915	broadcasts_closed: u64,
916	subscriptions: u64,
917	subscriptions_closed: u64,
918	bytes: u64,
919	frames: u64,
920	groups: u64,
921}
922
923fn advertised_path(prefix: &Path, node: Option<&str>) -> PathOwned {
924	// The fixed `node` category leaves room for sibling categories (e.g.
925	// `<top-prefix>/cluster` for relay-mesh stats) under the same prefix.
926	let mut out = format!("{}/node", prefix.as_str());
927	if let Some(node) = node {
928		out.push('/');
929		out.push_str(node);
930	}
931	PathOwned::from(out)
932}
933
934#[cfg(test)]
935mod tests {
936	use std::{collections::BTreeMap, sync::atomic::Ordering::Relaxed};
937
938	use crate::{Origin, Path};
939
940	use super::*;
941
942	fn test_stats(node: Option<&str>) -> (Stats, OriginProducer) {
943		let origin = Origin::random().produce();
944		let stats = Stats::new(
945			StatsConfig::new()
946				.with_origin(origin.clone())
947				.with_node(node.map(|s| PathOwned::from(s.to_string()))),
948		);
949		(stats, origin)
950	}
951
952	#[test]
953	fn advertised_path_with_and_without_node() {
954		let prefix = Path::new(".stats");
955		assert_eq!(advertised_path(&prefix, Some("sjc")).as_str(), ".stats/node/sjc");
956		assert_eq!(advertised_path(&prefix, Some("sjc/1")).as_str(), ".stats/node/sjc/1");
957		assert_eq!(advertised_path(&prefix, None).as_str(), ".stats/node");
958
959		let prefix = Path::new("metrics");
960		assert_eq!(advertised_path(&prefix, Some("lon")).as_str(), "metrics/node/lon");
961	}
962
963	/// The advertised path normalizes a messy node suffix and drops an
964	/// all-empty one. Observed through the announced path, since the task
965	/// announces at construction.
966	async fn announced_path_for_node(node: &str) -> String {
967		let origin = Origin::random().produce();
968		let _stats = Stats::new(
969			StatsConfig::new()
970				.with_origin(origin.clone())
971				.with_node(PathOwned::from(node.to_string())),
972		);
973		let mut consumer = origin.consume();
974		tokio::time::advance(Duration::from_millis(1)).await;
975		let (path, _broadcast) = consumer.announced().await.expect("expected announce");
976		path.as_str().to_string()
977	}
978
979	#[tokio::test(start_paused = true)]
980	async fn new_normalizes_and_drops_empty_node() {
981		assert_eq!(announced_path_for_node("/sjc//1/").await, ".stats/node/sjc/1");
982		assert_eq!(announced_path_for_node("///").await, ".stats/node");
983	}
984
985	#[tokio::test(start_paused = true)]
986	async fn per_broadcast_counters_isolated() {
987		// Bumps on one broadcast must not leak into another.
988		let (stats, _origin) = test_stats(Some("sjc"));
989		let bs1 = stats.tier(Tier::External).broadcast("demo/bbb");
990		let bs2 = stats.tier(Tier::External).broadcast("demo/ccc");
991		let g1 = bs1.publisher().track("video");
992		g1.bytes(100);
993		let g2 = bs2.publisher().track("video");
994		g2.bytes(7);
995
996		let entries = stats.shared().entries.lock();
997		let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
998		let e2 = entries.get(&PathOwned::from("demo/ccc")).expect("entry");
999		assert_eq!(e1.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
1000		assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7);
1001	}
1002
1003	#[tokio::test(start_paused = true)]
1004	async fn external_and_internal_tiers_are_independent() {
1005		let (stats, _origin) = test_stats(Some("sjc"));
1006		let ext = stats.tier(Tier::External);
1007		let int = stats.tier(Tier::Internal);
1008
1009		let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
1010		ext_track.bytes(100);
1011		let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
1012		int_track.bytes(7);
1013
1014		let entries = stats.shared().entries.lock();
1015		let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
1016		assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
1017		assert_eq!(entry.subscriber[Tier::External.idx()].bytes.load(Relaxed), 0);
1018		assert_eq!(entry.publisher[Tier::Internal.idx()].bytes.load(Relaxed), 0);
1019		assert_eq!(entry.subscriber[Tier::Internal.idx()].bytes.load(Relaxed), 7);
1020	}
1021
1022	#[tokio::test(start_paused = true)]
1023	async fn paths_under_prefix_are_no_op() {
1024		// Our own stats broadcasts (and any sibling category under the same
1025		// prefix) must not feed back into the aggregator.
1026		let (stats, _origin) = test_stats(Some("sjc"));
1027		let bs = stats.tier(Tier::External).broadcast(".stats/node/sjc");
1028		assert!(bs.is_empty());
1029		let p = bs.publisher();
1030		let track = p.track("video");
1031		track.bytes(100);
1032		drop(track);
1033		drop(p);
1034		assert!(stats.shared().entries.lock().is_empty());
1035	}
1036
1037	#[tokio::test(start_paused = true)]
1038	async fn disabled_stats_are_noop() {
1039		// A no-op aggregator (no origin) allocates no shared state and never
1040		// announces; every handle is empty and bumps are dropped.
1041		let stats = Stats::default();
1042		assert!(stats.shared.is_none());
1043		let bs = stats.tier(Tier::External).broadcast("demo/bbb");
1044		assert!(bs.is_empty());
1045		let p = bs.publisher();
1046		let track = p.track("video");
1047		track.bytes(100);
1048		drop(track);
1049		drop(p);
1050	}
1051
1052	#[tokio::test(start_paused = true)]
1053	async fn single_broadcast_path_announced() {
1054		// No matter how many broadcasts get bumped, exactly one stats
1055		// broadcast is announced (the per-node aggregate).
1056		let (stats, origin) = test_stats(Some("sjc/1"));
1057		let mut consumer = origin.consume();
1058
1059		let bs1 = stats.tier(Tier::External).broadcast("foo/bar");
1060		let _t1 = bs1.publisher().track("video");
1061		let bs2 = stats.tier(Tier::External).broadcast("baz/qux");
1062		let _t2 = bs2.publisher().track("video");
1063
1064		tokio::time::advance(Duration::from_millis(1)).await;
1065		let (path, broadcast) = consumer.announced().await.expect("expected announce");
1066		assert!(broadcast.is_some());
1067		assert_eq!(path.as_str(), ".stats/node/sjc/1");
1068	}
1069
1070	#[tokio::test(start_paused = true)]
1071	async fn task_announces_without_node_suffix() {
1072		let origin = Origin::random().produce();
1073		let stats = Stats::new(StatsConfig::new().with_origin(origin.clone()));
1074		let mut consumer = origin.consume();
1075
1076		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1077		let _t = bs.publisher().track("video");
1078
1079		tokio::time::advance(Duration::from_millis(1)).await;
1080		let (path, broadcast) = consumer.announced().await.expect("expected announce");
1081		assert!(broadcast.is_some());
1082		assert_eq!(path.as_str(), ".stats/node");
1083	}
1084
1085	/// Drives the snapshot task forward by `count` ticks. In paused-time
1086	/// tests, `tokio::time::advance` doesn't poll spawned tasks itself; we
1087	/// have to combine it with explicit awaits. This helper interleaves
1088	/// `advance` with `consumer.announced()` (and later `yield_now` calls)
1089	/// so the task wakes, processes the tick, and re-parks each iteration.
1090	async fn drive_ticks(count: u32) {
1091		for _ in 0..count {
1092			tokio::time::advance(Duration::from_secs(1)).await;
1093			// Yield several times to let the task wake, snapshot, write the
1094			// frame, and re-await the next tick.
1095			for _ in 0..4 {
1096				tokio::task::yield_now().await;
1097			}
1098		}
1099	}
1100
1101	#[tokio::test(start_paused = true)]
1102	async fn live_entry_kept_while_idle() {
1103		// A broadcast with a live announce guard but no traffic must stay in
1104		// the map indefinitely: announced != announced_closed means a
1105		// subscription could still begin at any moment.
1106		let (stats, _origin) = test_stats(Some("sjc"));
1107		let key = PathOwned::from("foo/bar".to_string());
1108		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1109		let guard = bs.publisher();
1110
1111		drive_ticks(5).await;
1112		assert!(
1113			stats.shared().entries.lock().contains_key(&key),
1114			"announced-but-idle broadcast must stay while the guard is held"
1115		);
1116
1117		drop(guard);
1118		drop(bs);
1119		// announced == announced_closed now, and no guard holds the Arc, so
1120		// the entry is dropped on the next tick.
1121		drive_ticks(1).await;
1122		assert!(
1123			!stats.shared().entries.lock().contains_key(&key),
1124			"entry dropped once the announce guard closes"
1125		);
1126	}
1127
1128	#[tokio::test(start_paused = true)]
1129	async fn entry_dropped_once_fully_closed() {
1130		// Once every open counter equals its `*_closed` counterpart and no
1131		// guard holds the Arc, the entry is removed the very next tick.
1132		let (stats, _origin) = test_stats(Some("sjc"));
1133		let key = PathOwned::from("foo/bar".to_string());
1134		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1135		let track = bs.publisher().track("video");
1136
1137		drive_ticks(1).await;
1138		assert!(
1139			stats.shared().entries.lock().contains_key(&key),
1140			"live entry present while the track guard is held"
1141		);
1142
1143		drop(track);
1144		drop(bs);
1145		drive_ticks(1).await;
1146		assert!(
1147			!stats.shared().entries.lock().contains_key(&key),
1148			"fully-closed entry dropped on the next tick"
1149		);
1150	}
1151
1152	#[tokio::test(start_paused = true)]
1153	async fn frame_emits_expected_counters() {
1154		let (stats, origin) = test_stats(Some("sjc"));
1155		let mut consumer = origin.consume();
1156		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1157		let track = bs.publisher().track("video");
1158		track.bytes(42);
1159		track.frame();
1160
1161		tokio::time::advance(Duration::from_millis(1100)).await;
1162
1163		let (_path, broadcast) = consumer.announced().await.expect("expected announce");
1164		let broadcast = broadcast.expect("active");
1165		let track = broadcast
1166			.subscribe_track(&Track {
1167				name: "publisher.json".into(),
1168				priority: 0,
1169			})
1170			.expect("subscribe");
1171		let frame = read_frame(track).await;
1172		let snap = frame.get("foo/bar").expect("foo/bar entry");
1173		assert_eq!(snap.announced, 1, "publisher() guard bumps announced");
1174		assert_eq!(snap.broadcasts, 1, "subs went 0->1, derived broadcasts++");
1175		assert_eq!(snap.subscriptions, 1);
1176		assert_eq!(snap.bytes, 42);
1177		assert_eq!(snap.frames, 1);
1178	}
1179
1180	#[tokio::test(start_paused = true)]
1181	async fn announced_decouples_from_broadcasts() {
1182		// publisher() with no track subscription should bump announced but
1183		// NOT broadcasts (which only counts slots with sub activity).
1184		let (stats, origin) = test_stats(Some("sjc"));
1185		let mut consumer = origin.consume();
1186		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1187		let _guard = bs.publisher();
1188
1189		tokio::time::advance(Duration::from_millis(1100)).await;
1190
1191		let (_path, broadcast) = consumer.announced().await.expect("announce");
1192		let broadcast = broadcast.expect("active");
1193		let track = broadcast
1194			.subscribe_track(&Track {
1195				name: "publisher.json".into(),
1196				priority: 0,
1197			})
1198			.expect("subscribe");
1199		let frame = read_frame(track).await;
1200		let snap = frame.get("foo/bar").expect("foo/bar entry");
1201		assert_eq!(snap.announced, 1);
1202		assert_eq!(snap.broadcasts, 0, "no sub, no derived broadcasts");
1203		assert_eq!(snap.subscriptions, 0);
1204	}
1205
1206	#[tokio::test(start_paused = true)]
1207	async fn short_lived_sub_is_surfaced() {
1208		// A subscription that opens AND closes within a single tick window
1209		// must still surface as a complete broadcasts open/close cycle.
1210		// Before the change-driven inclusion fix, this entry would never
1211		// have appeared in any frame and would have been GC'd silently.
1212		let (stats, origin) = test_stats(Some("sjc"));
1213		let mut consumer = origin.consume();
1214		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1215		{
1216			let track = bs.publisher().track("video");
1217			track.bytes(123);
1218			track.frame();
1219			// track dropped here, all within tick 1
1220		}
1221
1222		tokio::time::advance(Duration::from_millis(1100)).await;
1223
1224		let (_path, broadcast) = consumer.announced().await.expect("announce");
1225		let broadcast = broadcast.expect("active");
1226		let track = broadcast
1227			.subscribe_track(&Track {
1228				name: "publisher.json".into(),
1229				priority: 0,
1230			})
1231			.expect("subscribe");
1232		let frame = read_frame(track).await;
1233		let snap = frame.get("foo/bar").expect("foo/bar entry");
1234		// subs went 0->1->0 within the same tick. delta_subs > 0 triggers
1235		// both broadcasts++ and broadcasts_closed++.
1236		assert_eq!(snap.subscriptions, 1);
1237		assert_eq!(snap.subscriptions_closed, 1);
1238		assert_eq!(snap.broadcasts, 1, "flicker counts as one broadcast");
1239		assert_eq!(snap.broadcasts_closed, 1);
1240		assert_eq!(snap.bytes, 123);
1241		assert_eq!(snap.frames, 1);
1242	}
1243
1244	#[tokio::test(start_paused = true)]
1245	async fn multiple_subs_count_as_one_broadcast() {
1246		// Two concurrent subs on the same slot should bump broadcasts by 1,
1247		// not 2. broadcasts is "broadcasts that had >=1 active sub" not
1248		// "subscription count". And dropping both subs should bump
1249		// broadcasts_closed by 1 (the 1->0 transition is one event).
1250		let (stats, _origin) = test_stats(Some("sjc"));
1251		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1252		let pub_guard = bs.publisher();
1253		let t1 = pub_guard.track("video");
1254		let t2 = pub_guard.track("audio");
1255
1256		drive_ticks(2).await;
1257		{
1258			let entries = stats.shared().entries.lock();
1259			let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
1260			let raw = entry.publisher[Tier::External.idx()].snapshot();
1261			assert_eq!(raw.subscriptions, 2, "two track subs");
1262			assert_eq!(raw.subscriptions_closed, 0, "neither dropped yet");
1263		}
1264
1265		// Drop only the track guards; keep `pub_guard` (and `bs`) so the
1266		// broadcast stays announced (live) and the entry isn't GC'd before
1267		// we can read the raw counters.
1268		drop(t1);
1269		drop(t2);
1270		drive_ticks(1).await;
1271
1272		// All subs dropped: subscriptions_closed catches up to subscriptions.
1273		// The snapshot task observes the 1->0 transition; derived
1274		// broadcasts_closed (which lives in task-local state, not on the
1275		// global entry) gets bumped to 1, but we can only see that through
1276		// the wire frame. Here we just confirm the raw counters squared up.
1277		let entries = stats.shared().entries.lock();
1278		let entry = entries
1279			.get(&PathOwned::from("foo/bar"))
1280			.expect("entry still live (publisher guard held)");
1281		let raw = entry.publisher[Tier::External.idx()].snapshot();
1282		assert_eq!(raw.subscriptions, 2);
1283		assert_eq!(raw.subscriptions_closed, 2, "both dropped");
1284	}
1285
1286	#[tokio::test(start_paused = true)]
1287	async fn unused_slots_dont_surface() {
1288		// A broadcast that only sees External Publisher traffic must NOT
1289		// appear in the other three tracks with zero counters. Regression
1290		// for the "None != Some(default)" first-tick change-detection bug:
1291		// without the unwrap_or_default fix, every entry would surface
1292		// once in every track even when only one slot had real activity.
1293		let (stats, origin) = test_stats(Some("sjc"));
1294		let mut consumer = origin.consume();
1295		let bs = stats.tier(Tier::External).broadcast("foo/bar");
1296		let track = bs.publisher().track("video");
1297		track.frame();
1298
1299		drive_ticks(2).await;
1300
1301		let (_path, broadcast) = consumer.announced().await.expect("announce");
1302		let broadcast = broadcast.expect("active");
1303
1304		// External publisher slot SHOULD include foo/bar.
1305		let pub_track = broadcast
1306			.subscribe_track(&Track {
1307				name: "publisher.json".into(),
1308				priority: 0,
1309			})
1310			.expect("subscribe");
1311		assert!(
1312			read_frame(pub_track).await.contains_key("foo/bar"),
1313			"publisher.json must include the active foo/bar entry"
1314		);
1315
1316		// The other three slots had zero activity. The first frame on
1317		// each must be `{}`, not `{"foo/bar": {all zeros}}`.
1318		for name in ["subscriber.json", "internal/publisher.json", "internal/subscriber.json"] {
1319			let t = broadcast
1320				.subscribe_track(&Track {
1321					name: name.into(),
1322					priority: 0,
1323				})
1324				.expect("subscribe");
1325			let frame = read_frame(t).await;
1326			assert!(
1327				frame.is_empty(),
1328				"{name} must be empty for an entry with no activity on that slot, got {frame:?}",
1329			);
1330		}
1331	}
1332
1333	#[test]
1334	fn snapshot_reads_closed_before_open() {
1335		// Reading closed counters before their open counterparts is the
1336		// guarantee that the emitted Snapshot never shows close > open
1337		// under concurrent bumps. This unit-test pins the ordering at the
1338		// source level so a future refactor that re-orders the loads
1339		// trips the test.
1340		let src = include_str!("stats.rs");
1341		// Find the body of `impl Counters { fn snapshot(...) ... }` and
1342		// check the line order.
1343		let body_start = src
1344			.find("fn snapshot(&self) -> RawCounts")
1345			.expect("snapshot fn present");
1346		let body = &src[body_start..];
1347		let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load");
1348		let open_pos = body.find("self.announced.load(").expect("announced load");
1349		assert!(
1350			closed_pos < open_pos,
1351			"announced_closed must be loaded before announced; reversing breaks the open>=closed invariant",
1352		);
1353		let subs_closed_pos = body
1354			.find("self.subscriptions_closed.load")
1355			.expect("subscriptions_closed load");
1356		let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load");
1357		assert!(
1358			subs_closed_pos < subs_pos,
1359			"subscriptions_closed must be loaded before subscriptions",
1360		);
1361	}
1362
1363	async fn read_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, Snapshot> {
1364		let bytes = track.read_frame().await.expect("ok").expect("frame");
1365		serde_json::from_slice(&bytes).expect("json parse")
1366	}
1367}