use std::{
collections::{BTreeMap, HashMap},
sync::{
Arc, Weak,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use serde::Serialize;
use web_async::{Lock, spawn};
use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track, TrackProducer};
/// Activity counters for one role (publisher or subscriber) and one tier of a broadcast.
///
/// Within this file the counters are only ever incremented. Each `*_closed` field counts
/// dropped guards for the matching open counter, so "currently open" is `opened - closed`.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Counters {
/// Bumped each time an announce guard (`PublisherStats` / `SubscriberStats`) is created.
pub announced: AtomicU64,
/// Bumped when an announce guard is dropped.
pub announced_closed: AtomicU64,
/// Bumped each time a track guard (`PublisherTrack` / `SubscriberTrack`) is created.
pub subscriptions: AtomicU64,
/// Bumped when a track guard is dropped.
pub subscriptions_closed: AtomicU64,
/// Running total of the values passed to a track guard's `bytes(n)`.
pub bytes: AtomicU64,
/// Number of `frame()` calls made on track guards.
pub frames: AtomicU64,
/// Number of `group()` calls made on track guards.
pub groups: AtomicU64,
}
impl Counters {
/// Reads every counter once and returns the values as a plain `RawCounts`.
///
/// The two `*_closed` counters are loaded first, with `Acquire`; the remaining counters are
/// loaded afterwards with `Relaxed`. The guard `Drop` impls in this file bump the closed
/// counters with `Release`.
///
/// The `snapshot_reads_closed_before_open` unit test inspects the source text of this
/// function and fails if a closed counter is loaded after its open counterpart (its message
/// says reversing the order breaks the open >= closed invariant). Keep the statements in
/// this order and keep the load expressions spelled as they are.
fn snapshot(&self) -> RawCounts {
let announced_closed = self.announced_closed.load(Ordering::Acquire);
let subscriptions_closed = self.subscriptions_closed.load(Ordering::Acquire);
let announced = self.announced.load(Ordering::Relaxed);
let subscriptions = self.subscriptions.load(Ordering::Relaxed);
let bytes = self.bytes.load(Ordering::Relaxed);
let frames = self.frames.load(Ordering::Relaxed);
let groups = self.groups.load(Ordering::Relaxed);
RawCounts {
announced,
announced_closed,
subscriptions,
subscriptions_closed,
bytes,
frames,
groups,
}
}
}
/// Plain-integer copy of a `Counters` value, as produced by `Counters::snapshot`.
///
/// Field names mirror `Counters` one-to-one.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct RawCounts {
// Announce guards created.
announced: u64,
// Announce guards dropped.
announced_closed: u64,
// Track guards created.
subscriptions: u64,
// Track guards dropped.
subscriptions_closed: u64,
// Total bytes reported through track guards.
bytes: u64,
// Total frames reported through track guards.
frames: u64,
// Total groups reported through track guards.
groups: u64,
}
/// Traffic class that a set of counters belongs to.
///
/// Every broadcast entry keeps separate publisher/subscriber counters per tier. The external
/// tier is reported on the `publisher.json` / `subscriber.json` tracks and the internal tier
/// on the `internal/...` tracks.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tier {
    /// Counters reported on the top-level tracks.
    External,
    /// Counters reported on the `internal/` tracks.
    Internal,
}

impl Tier {
    /// Position of this tier inside the two-element per-entry counter arrays.
    fn idx(self) -> usize {
        let slot = match self {
            Self::External => 0,
            Self::Internal => 1,
        };
        slot
    }
}
/// Settings used to build a [`Stats`] aggregator.
///
/// Start from [`StatsConfig::new`] (or `Default`) and chain the `with_*` setters.
#[derive(Clone)]
#[non_exhaustive]
pub struct StatsConfig {
    /// Origin the stats broadcast is published on; `None` leaves stats disabled.
    pub origin: Option<OriginProducer>,
    /// Path prefix of the stats broadcast. Paths under this prefix are never tracked.
    pub prefix: PathOwned,
    /// Optional node suffix appended after `<prefix>/node`.
    pub node: Option<PathOwned>,
    /// Time between published snapshots.
    pub interval: Duration,
}

impl StatsConfig {
    /// Returns the defaults: no origin, prefix `.stats`, no node, one-second interval.
    pub fn new() -> Self {
        let prefix = PathOwned::from(".stats");
        let interval = Duration::from_secs(1);
        Self {
            origin: None,
            prefix,
            node: None,
            interval,
        }
    }

    /// Sets (or clears, when given `None`) the origin the stats broadcast is published on.
    pub fn with_origin(self, origin: impl Into<Option<OriginProducer>>) -> Self {
        Self {
            origin: origin.into(),
            ..self
        }
    }

    /// Replaces the path prefix of the stats broadcast.
    pub fn with_prefix(self, prefix: impl Into<PathOwned>) -> Self {
        Self {
            prefix: prefix.into(),
            ..self
        }
    }

    /// Replaces the interval between published snapshots.
    pub fn with_interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }

    /// Sets (or clears, when given `None`) the node suffix of the advertised path.
    pub fn with_node(self, node: impl Into<Option<PathOwned>>) -> Self {
        Self {
            node: node.into(),
            ..self
        }
    }
}

impl Default for StatsConfig {
    /// Same as [`StatsConfig::new`].
    fn default() -> Self {
        StatsConfig::new()
    }
}
/// Cloneable handle to the stats aggregator.
///
/// When no origin was configured `shared` is `None`; `Stats::entry` then returns `None` and
/// every guard derived from this handle does nothing.
#[derive(Clone)]
pub struct Stats {
// Prefix of the stats broadcast itself; paths under it are not tracked (see `Stats::entry`).
prefix: PathOwned,
// State shared with the background publisher task; `None` when stats are disabled.
shared: Option<Arc<StatsShared>>,
}
/// State shared between every `Stats` clone and the publisher task.
///
/// The task only keeps a `Weak` reference to this between ticks.
struct StatsShared {
// Origin the stats broadcast is published on.
origin: OriginProducer,
// Per-broadcast counters keyed by broadcast path; pruned by the publisher task.
entries: Lock<HashMap<PathOwned, Arc<BroadcastEntry>>>,
}
struct BroadcastEntry {
publisher: [Counters; 2],
subscriber: [Counters; 2],
}
impl BroadcastEntry {
fn new() -> Self {
Self {
publisher: Default::default(),
subscriber: Default::default(),
}
}
}
/// Publisher-task-local bookkeeping for one (role, tier) slot of one broadcast.
///
/// Updated only by `process_slot`.
#[derive(Default)]
struct SlotState {
// Derived count of periods during which the slot had at least one subscription.
derived_broadcasts: u64,
// Derived count of such periods that have ended.
derived_broadcasts_closed: u64,
// Most recent snapshot that differed from its predecessor; `None` until the first change.
prev_emitted: Option<Snapshot>,
}
/// Publisher-task-local state for one broadcast: a [`SlotState`] per role and tier.
///
/// The arrays mirror the layout of `BroadcastEntry` (index 0 = external, 1 = internal).
#[derive(Default)]
struct EntrySnapState {
    /// State for the publisher counters, one per tier.
    publisher: [SlotState; 2],
    /// State for the subscriber counters, one per tier.
    subscriber: [SlotState; 2],
}

impl EntrySnapState {
    /// Pairs each slot's track name and live counters with its mutable local state.
    ///
    /// The result is ordered exactly like `TRACK_ORDER`: external publisher, external
    /// subscriber, internal publisher, internal subscriber.
    fn zip_slots<'a>(&'a mut self, entry: &'a BroadcastEntry) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] {
        let external = Tier::External.idx();
        let internal = Tier::Internal.idx();

        // Split the borrow of `self` so each slot state can be handed out mutably.
        let Self {
            publisher: [pub_ext, pub_int],
            subscriber: [sub_ext, sub_int],
        } = self;

        [
            ("publisher.json", &entry.publisher[external], pub_ext),
            ("subscriber.json", &entry.subscriber[external], sub_ext),
            ("internal/publisher.json", &entry.publisher[internal], pub_int),
            ("internal/subscriber.json", &entry.subscriber[internal], sub_int),
        ]
    }
}
/// Number of (role, tier) slots per broadcast, and therefore of stats tracks.
const NUM_SLOTS: usize = 4;
/// Names of the stats tracks, in slot order.
///
/// Must stay in the same order as the array returned by `EntrySnapState::zip_slots`, because
/// the publisher task matches the two up by index.
const TRACK_ORDER: [&str; NUM_SLOTS] = [
"publisher.json",
"subscriber.json",
"internal/publisher.json",
"internal/subscriber.json",
];
impl Stats {
    /// Builds the aggregator described by `config`.
    ///
    /// When an origin is configured this also spawns the background publisher task, which
    /// advertises the stats broadcast at `<prefix>/node[/<node>]`. Without an origin the
    /// returned handle is disabled and nothing is spawned.
    pub fn new(config: StatsConfig) -> Self {
        let prefix = config.prefix;
        let interval = config.interval;
        // An empty node path is treated the same as no node at all.
        let node = config.node.filter(|candidate| !candidate.is_empty());

        let shared = match config.origin {
            None => None,
            Some(origin) => {
                let state = Arc::new(StatsShared {
                    origin,
                    entries: Lock::default(),
                });
                let suffix = node.as_ref().map(|p| p.as_str());
                let advertised = advertised_path(&prefix, suffix);
                // The task gets a weak reference only, so it does not keep the state alive.
                spawn(run_publisher(Arc::downgrade(&state), advertised, interval));
                Some(state)
            }
        };

        Self { prefix, shared }
    }

    /// Path prefix of the stats broadcast.
    pub fn prefix(&self) -> &Path<'static> {
        &self.prefix
    }

    /// Test-only access to the shared state; panics when stats are disabled.
    #[cfg(test)]
    fn shared(&self) -> &Arc<StatsShared> {
        self.shared.as_ref().expect("enabled stats aggregator")
    }

    /// Returns a handle that records into the counters of `tier`.
    pub fn tier(&self, tier: Tier) -> StatsHandle {
        let stats = self.clone();
        StatsHandle { stats, tier }
    }

    /// Looks up, or creates, the counters for `path`.
    ///
    /// Returns `None` when stats are disabled or when `path` lies under the stats prefix.
    fn entry(&self, path: impl AsPath) -> Option<Arc<BroadcastEntry>> {
        let shared = self.shared.as_deref()?;
        let path = path.as_path();
        if path.has_prefix(&self.prefix) {
            return None;
        }

        let key = path.to_owned();
        let mut table = shared.entries.lock();
        let slot = table.entry(key).or_insert_with(|| Arc::new(BroadcastEntry::new()));
        Some(Arc::clone(slot))
    }
}

impl Default for Stats {
    /// A disabled aggregator: built from the default config, which has no origin.
    fn default() -> Self {
        let config = StatsConfig::new();
        Self::new(config)
    }
}
/// A [`Stats`] aggregator bound to one [`Tier`].
#[derive(Clone)]
pub struct StatsHandle {
    stats: Stats,
    tier: Tier,
}

impl StatsHandle {
    /// The aggregator this handle records into.
    pub fn parent(&self) -> &Stats {
        let Self { stats, .. } = self;
        stats
    }

    /// The tier this handle records under.
    pub fn tier(&self) -> Tier {
        let Self { tier, .. } = self;
        *tier
    }

    /// Returns the per-broadcast handle for `path`.
    ///
    /// The result is empty when the aggregator has no entry for `path`.
    pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
        let entry = self.stats.entry(path);
        BroadcastStats {
            entry,
            tier: self.tier,
        }
    }
}

impl Default for StatsHandle {
    /// An external-tier handle on a default [`Stats`].
    fn default() -> Self {
        let stats = Stats::default();
        stats.tier(Tier::External)
    }
}
/// Per-broadcast view of the counters for one tier.
///
/// An empty value (no entry) hands out guards that record nothing.
#[derive(Clone)]
pub struct BroadcastStats {
    entry: Option<Arc<BroadcastEntry>>,
    tier: Tier,
}

impl BroadcastStats {
    /// `true` when this handle is not backed by any counters.
    pub fn is_empty(&self) -> bool {
        self.entry.is_none()
    }

    /// Counts one publisher announce and returns the guard that records its close on drop.
    pub fn publisher(&self) -> PublisherStats {
        let tier = self.tier;
        if let Some(entry) = self.entry.as_deref() {
            let counters = &entry.publisher[tier.idx()];
            counters.announced.fetch_add(1, Ordering::Relaxed);
        }
        PublisherStats {
            entry: self.entry.clone(),
            tier,
        }
    }

    /// Counts one subscriber announce and returns the guard that records its close on drop.
    pub fn subscriber(&self) -> SubscriberStats {
        let tier = self.tier;
        if let Some(entry) = self.entry.as_deref() {
            let counters = &entry.subscriber[tier.idx()];
            counters.announced.fetch_add(1, Ordering::Relaxed);
        }
        SubscriberStats {
            entry: self.entry.clone(),
            tier,
        }
    }

    /// Counts one publisher-side subscription and returns its track guard.
    ///
    /// The track name is currently unused; counters are kept per broadcast.
    pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
        let tier = self.tier;
        if let Some(entry) = self.entry.as_deref() {
            let counters = &entry.publisher[tier.idx()];
            counters.subscriptions.fetch_add(1, Ordering::Relaxed);
        }
        PublisherTrack {
            entry: self.entry.clone(),
            tier,
        }
    }

    /// Counts one subscriber-side subscription and returns its track guard.
    ///
    /// The track name is currently unused; counters are kept per broadcast.
    pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
        let tier = self.tier;
        if let Some(entry) = self.entry.as_deref() {
            let counters = &entry.subscriber[tier.idx()];
            counters.subscriptions.fetch_add(1, Ordering::Relaxed);
        }
        SubscriberTrack {
            entry: self.entry.clone(),
            tier,
        }
    }
}
/// Announce guard for the publisher side of a broadcast.
///
/// Created by `BroadcastStats::publisher`, which counts the announce. Dropping the guard
/// bumps `announced_closed` on the same counters.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct PublisherStats {
    entry: Option<Arc<BroadcastEntry>>,
    tier: Tier,
}

impl PublisherStats {
    /// Opens a publisher-side track guard on the same broadcast and tier.
    pub fn track(&self, name: &str) -> PublisherTrack {
        let view = BroadcastStats {
            entry: self.entry.clone(),
            tier: self.tier,
        };
        view.publisher_track(name)
    }
}

impl Drop for PublisherStats {
    /// Records the announce as closed.
    fn drop(&mut self) {
        let Some(entry) = self.entry.as_deref() else {
            return;
        };
        let counters = &entry.publisher[self.tier.idx()];
        counters.announced_closed.fetch_add(1, Ordering::Release);
    }
}
/// Announce guard for the subscriber side of a broadcast.
///
/// Created by `BroadcastStats::subscriber`, which counts the announce. Dropping the guard
/// bumps `announced_closed` on the same counters.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct SubscriberStats {
    entry: Option<Arc<BroadcastEntry>>,
    tier: Tier,
}

impl SubscriberStats {
    /// Opens a subscriber-side track guard on the same broadcast and tier.
    pub fn track(&self, name: &str) -> SubscriberTrack {
        let view = BroadcastStats {
            entry: self.entry.clone(),
            tier: self.tier,
        };
        view.subscriber_track(name)
    }
}

impl Drop for SubscriberStats {
    /// Records the announce as closed.
    fn drop(&mut self) {
        let Some(entry) = self.entry.as_deref() else {
            return;
        };
        let counters = &entry.subscriber[self.tier.idx()];
        counters.announced_closed.fetch_add(1, Ordering::Release);
    }
}
#[must_use = "drop the guard to record the subscription as closed"]
pub struct PublisherTrack {
entry: Option<Arc<BroadcastEntry>>,
tier: Tier,
}
impl PublisherTrack {
pub fn frame(&self) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
}
}
pub fn bytes(&self, n: u64) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
}
}
pub fn group(&self) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
}
}
}
impl Drop for PublisherTrack {
fn drop(&mut self) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()]
.subscriptions_closed
.fetch_add(1, Ordering::Release);
}
}
}
#[must_use = "drop the guard to record the subscription as closed"]
pub struct SubscriberTrack {
entry: Option<Arc<BroadcastEntry>>,
tier: Tier,
}
impl SubscriberTrack {
pub fn frame(&self) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
}
}
pub fn bytes(&self, n: u64) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
}
}
pub fn group(&self) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
}
}
}
impl Drop for SubscriberTrack {
fn drop(&mut self) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()]
.subscriptions_closed
.fetch_add(1, Ordering::Release);
}
}
}
/// Snapshots one slot, updates its derived broadcast counters, and emits when warranted.
///
/// A "broadcast" is derived from subscription activity: the slot counts as active while it
/// has more subscriptions opened than closed. `derived_broadcasts` is bumped when the slot
/// was idle at the previous recorded snapshot but saw activity since (including a
/// subscription that opened and closed entirely between two calls), and
/// `derived_broadcasts_closed` is bumped when such activity has ended by now.
///
/// `emit` is called with the new snapshot when it differs from the last recorded one, or
/// when the slot still has something open (announces, subscriptions, or derived broadcasts).
fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) {
    let raw = counters.snapshot();
    // A slot that has never recorded a snapshot is compared against all zeros.
    let before = slot_state.prev_emitted.unwrap_or_default();

    let was_active = before.subscriptions > before.subscriptions_closed;
    let is_active = raw.subscriptions > raw.subscriptions_closed;
    // New subscriptions since the last recorded snapshot, even if already closed again.
    let gained_subs = raw.subscriptions > before.subscriptions;
    let touched = was_active || is_active || gained_subs;

    if touched && !was_active {
        slot_state.derived_broadcasts = before.broadcasts.saturating_add(1);
    }
    if touched && !is_active {
        slot_state.derived_broadcasts_closed = before.broadcasts_closed.saturating_add(1);
    }

    let snap = Snapshot {
        announced: raw.announced,
        announced_closed: raw.announced_closed,
        broadcasts: slot_state.derived_broadcasts,
        broadcasts_closed: slot_state.derived_broadcasts_closed,
        subscriptions: raw.subscriptions,
        subscriptions_closed: raw.subscriptions_closed,
        bytes: raw.bytes,
        frames: raw.frames,
        groups: raw.groups,
    };

    // Settled means every open counter is matched by its closed counter.
    let settled = snap.announced == snap.announced_closed
        && snap.subscriptions == snap.subscriptions_closed
        && snap.broadcasts == snap.broadcasts_closed;
    let changed = snap != before;

    if changed {
        slot_state.prev_emitted = Some(snap);
    }
    if changed || !settled {
        emit(snap);
    }
}
/// Background task that periodically publishes per-broadcast counters as JSON.
///
/// Creates one track per name in `TRACK_ORDER`, publishes the broadcast on the shared origin
/// at `advertised`, and then on every tick:
///
/// 1. snapshots each entry's counters through `process_slot`,
/// 2. prunes entries that nothing outside the map references any more, and
/// 3. writes one JSON object (broadcast path -> `Snapshot`) per track, skipping tracks whose
///    payload is unchanged since the last successful write.
///
/// Only a `Weak` reference to the shared state is held between ticks, so the task returns on
/// the first tick after the shared state has been dropped. It also returns early, after
/// logging a warning, when a track cannot be created or the origin rejects the broadcast.
///
/// A zero `interval` is clamped to one millisecond, because `tokio::time::interval` panics on
/// a zero period and that would silently kill this task.
async fn run_publisher(weak: Weak<StatsShared>, advertised: PathOwned, interval: Duration) {
    /// Tick period used in place of a zero interval.
    const MIN_INTERVAL: Duration = Duration::from_millis(1);

    let Some(shared) = weak.upgrade() else {
        return;
    };
    let mut broadcast = Broadcast::new().produce();
    let mut tracks: Vec<TrackProducer> = Vec::with_capacity(NUM_SLOTS);
    for name in TRACK_ORDER {
        match broadcast.create_track(Track {
            name: name.into(),
            priority: 0,
        }) {
            Ok(t) => tracks.push(t),
            Err(err) => {
                tracing::warn!(?err, name, "stats: failed to create track");
                return;
            }
        }
    }
    if !shared.origin.publish_broadcast(&advertised, broadcast.consume()) {
        tracing::warn!(advertised = %advertised, "stats: origin rejected stats broadcast");
        return;
    }
    // Do not keep the shared state alive while waiting for ticks.
    drop(shared);

    // Derived per-slot state, keyed like the shared entry map.
    let mut local: HashMap<PathOwned, EntrySnapState> = HashMap::new();
    // Last payload successfully written to each track, used to suppress duplicates.
    let mut last_payload: [Vec<u8>; NUM_SLOTS] = Default::default();

    let period = if interval.is_zero() {
        tracing::warn!("stats: publish interval must be non-zero; clamping to 1ms");
        MIN_INTERVAL
    } else {
        interval
    };
    let mut ticker = tokio::time::interval(period);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        ticker.tick().await;
        let Some(shared) = weak.upgrade() else {
            return;
        };

        // Copy the entries out so the lock is not held while snapshotting.
        let entries: Vec<(PathOwned, Arc<BroadcastEntry>)> = {
            let map = shared.entries.lock();
            map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
        };

        let mut frames: [BTreeMap<String, Snapshot>; NUM_SLOTS] = Default::default();
        for (path, entry) in &entries {
            let snap_state = local.entry(path.clone()).or_default();
            for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() {
                process_slot(counters, slot_state, |snap| {
                    frames[i].insert(path.as_str().to_string(), snap);
                });
            }
        }
        // Release our clones first so `strong_count` below only sees outside holders.
        drop(entries);

        {
            // NOTE(review): a guard dropped between the snapshot above and this prune makes
            // its entry eligible for removal before its final counts were published.
            let mut map = shared.entries.lock();
            map.retain(|_, entry| Arc::strong_count(entry) > 1);
            local.retain(|path, _| map.contains_key(path));
        }

        for (slot, ((frame, last), track)) in frames
            .iter()
            .zip(last_payload.iter_mut())
            .zip(tracks.iter_mut())
            .enumerate()
        {
            let json = match serde_json::to_vec(frame) {
                Ok(b) => b,
                Err(err) => {
                    tracing::debug!(?err, slot, "stats: failed to serialize frame");
                    continue;
                }
            };
            if &json == last {
                continue;
            }
            if let Err(err) = track.write_frame(json.clone()) {
                tracing::debug!(?err, slot, "stats: failed to write frame");
                continue;
            }
            // Remember the payload only after a successful write so failures are retried.
            *last = json;
        }
        drop(shared);
    }
}
/// One slot's counters as serialized into the stats JSON frames.
///
/// The field names are the JSON keys. `Deserialize` is only derived for tests, which parse
/// the published frames back.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
#[cfg_attr(test, derive(serde::Deserialize))]
struct Snapshot {
// Copied from the raw `announced` counter.
announced: u64,
// Copied from the raw `announced_closed` counter.
announced_closed: u64,
// Derived by `process_slot` from subscription activity.
broadcasts: u64,
// Derived by `process_slot` from subscription activity.
broadcasts_closed: u64,
// Copied from the raw `subscriptions` counter.
subscriptions: u64,
// Copied from the raw `subscriptions_closed` counter.
subscriptions_closed: u64,
// Copied from the raw `bytes` counter.
bytes: u64,
// Copied from the raw `frames` counter.
frames: u64,
// Copied from the raw `groups` counter.
groups: u64,
}
/// Builds the path the stats broadcast is advertised at.
///
/// The result is `<prefix>/node`, followed by `/<node>` when a node is given.
fn advertised_path(prefix: &Path, node: Option<&str>) -> PathOwned {
    let base = prefix.as_str();
    let joined = match node {
        Some(node) => format!("{base}/node/{node}"),
        None => format!("{base}/node"),
    };
    PathOwned::from(joined)
}
// Unit tests. The async tests run on a paused tokio clock and drive the publisher task by
// advancing time explicitly.
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::atomic::Ordering::Relaxed};
use crate::{Origin, Path};
use super::*;
/// Builds an enabled `Stats` on a fresh origin, optionally with a node suffix.
fn test_stats(node: Option<&str>) -> (Stats, OriginProducer) {
let origin = Origin::random().produce();
let stats = Stats::new(
StatsConfig::new()
.with_origin(origin.clone())
.with_node(node.map(|s| PathOwned::from(s.to_string()))),
);
(stats, origin)
}
/// `advertised_path` appends `/node` and, when present, the node suffix.
#[test]
fn advertised_path_with_and_without_node() {
let prefix = Path::new(".stats");
assert_eq!(advertised_path(&prefix, Some("sjc")).as_str(), ".stats/node/sjc");
assert_eq!(advertised_path(&prefix, Some("sjc/1")).as_str(), ".stats/node/sjc/1");
assert_eq!(advertised_path(&prefix, None).as_str(), ".stats/node");
let prefix = Path::new("metrics");
assert_eq!(advertised_path(&prefix, Some("lon")).as_str(), "metrics/node/lon");
}
/// Creates a `Stats` with the given node and returns the path it announces on the origin.
async fn announced_path_for_node(node: &str) -> String {
let origin = Origin::random().produce();
let _stats = Stats::new(
StatsConfig::new()
.with_origin(origin.clone())
.with_node(PathOwned::from(node.to_string())),
);
let mut consumer = origin.consume();
tokio::time::advance(Duration::from_millis(1)).await;
let (path, _broadcast) = consumer.announced().await.expect("expected announce");
path.as_str().to_string()
}
/// Redundant slashes in the node are normalized; an all-slash node yields no suffix.
#[tokio::test(start_paused = true)]
async fn new_normalizes_and_drops_empty_node() {
assert_eq!(announced_path_for_node("/sjc//1/").await, ".stats/node/sjc/1");
assert_eq!(announced_path_for_node("///").await, ".stats/node");
}
/// Bytes recorded for one broadcast path do not show up under another.
#[tokio::test(start_paused = true)]
async fn per_broadcast_counters_isolated() {
let (stats, _origin) = test_stats(Some("sjc"));
let bs1 = stats.tier(Tier::External).broadcast("demo/bbb");
let bs2 = stats.tier(Tier::External).broadcast("demo/ccc");
let g1 = bs1.publisher().track("video");
g1.bytes(100);
let g2 = bs2.publisher().track("video");
g2.bytes(7);
let entries = stats.shared().entries.lock();
let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
let e2 = entries.get(&PathOwned::from("demo/ccc")).expect("entry");
assert_eq!(e1.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7);
}
/// The four (role, tier) counter sets of one entry are updated independently.
#[tokio::test(start_paused = true)]
async fn external_and_internal_tiers_are_independent() {
let (stats, _origin) = test_stats(Some("sjc"));
let ext = stats.tier(Tier::External);
let int = stats.tier(Tier::Internal);
let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
ext_track.bytes(100);
let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
int_track.bytes(7);
let entries = stats.shared().entries.lock();
let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
assert_eq!(entry.subscriber[Tier::External.idx()].bytes.load(Relaxed), 0);
assert_eq!(entry.publisher[Tier::Internal.idx()].bytes.load(Relaxed), 0);
assert_eq!(entry.subscriber[Tier::Internal.idx()].bytes.load(Relaxed), 7);
}
/// Paths under the stats prefix get an empty handle and never create an entry.
#[tokio::test(start_paused = true)]
async fn paths_under_prefix_are_no_op() {
let (stats, _origin) = test_stats(Some("sjc"));
let bs = stats.tier(Tier::External).broadcast(".stats/node/sjc");
assert!(bs.is_empty());
let p = bs.publisher();
let track = p.track("video");
track.bytes(100);
drop(track);
drop(p);
assert!(stats.shared().entries.lock().is_empty());
}
/// A default (origin-less) `Stats` has no shared state and its guards are usable no-ops.
#[tokio::test(start_paused = true)]
async fn disabled_stats_are_noop() {
let stats = Stats::default();
assert!(stats.shared.is_none());
let bs = stats.tier(Tier::External).broadcast("demo/bbb");
assert!(bs.is_empty());
let p = bs.publisher();
let track = p.track("video");
track.bytes(100);
drop(track);
drop(p);
}
/// The first announce seen on the origin is the stats broadcast at the node path.
#[tokio::test(start_paused = true)]
async fn single_broadcast_path_announced() {
let (stats, origin) = test_stats(Some("sjc/1"));
let mut consumer = origin.consume();
let bs1 = stats.tier(Tier::External).broadcast("foo/bar");
let _t1 = bs1.publisher().track("video");
let bs2 = stats.tier(Tier::External).broadcast("baz/qux");
let _t2 = bs2.publisher().track("video");
tokio::time::advance(Duration::from_millis(1)).await;
let (path, broadcast) = consumer.announced().await.expect("expected announce");
assert!(broadcast.is_some());
assert_eq!(path.as_str(), ".stats/node/sjc/1");
}
/// Without a configured node the stats broadcast is announced at `<prefix>/node`.
#[tokio::test(start_paused = true)]
async fn task_announces_without_node_suffix() {
let origin = Origin::random().produce();
let stats = Stats::new(StatsConfig::new().with_origin(origin.clone()));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let _t = bs.publisher().track("video");
tokio::time::advance(Duration::from_millis(1)).await;
let (path, broadcast) = consumer.announced().await.expect("expected announce");
assert!(broadcast.is_some());
assert_eq!(path.as_str(), ".stats/node");
}
/// Advances the paused clock one second at a time, yielding so the publisher task can run.
async fn drive_ticks(count: u32) {
for _ in 0..count {
tokio::time::advance(Duration::from_secs(1)).await;
for _ in 0..4 {
tokio::task::yield_now().await;
}
}
}
/// An entry survives pruning while an announce guard is held and is removed afterwards.
#[tokio::test(start_paused = true)]
async fn live_entry_kept_while_idle() {
let (stats, _origin) = test_stats(Some("sjc"));
let key = PathOwned::from("foo/bar".to_string());
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let guard = bs.publisher();
drive_ticks(5).await;
assert!(
stats.shared().entries.lock().contains_key(&key),
"announced-but-idle broadcast must stay while the guard is held"
);
drop(guard);
drop(bs);
drive_ticks(1).await;
assert!(
!stats.shared().entries.lock().contains_key(&key),
"entry dropped once the announce guard closes"
);
}
/// An entry is pruned on the tick after its last guard and handle are dropped.
#[tokio::test(start_paused = true)]
async fn entry_dropped_once_fully_closed() {
let (stats, _origin) = test_stats(Some("sjc"));
let key = PathOwned::from("foo/bar".to_string());
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let track = bs.publisher().track("video");
drive_ticks(1).await;
assert!(
stats.shared().entries.lock().contains_key(&key),
"live entry present while the track guard is held"
);
drop(track);
drop(bs);
drive_ticks(1).await;
assert!(
!stats.shared().entries.lock().contains_key(&key),
"fully-closed entry dropped on the next tick"
);
}
/// The published `publisher.json` frame carries the counters recorded through the guards.
#[tokio::test(start_paused = true)]
async fn frame_emits_expected_counters() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let track = bs.publisher().track("video");
track.bytes(42);
track.frame();
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("expected announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(track).await;
let snap = frame.get("foo/bar").expect("foo/bar entry");
assert_eq!(snap.announced, 1, "publisher() guard bumps announced");
assert_eq!(snap.broadcasts, 1, "subs went 0->1, derived broadcasts++");
assert_eq!(snap.subscriptions, 1);
assert_eq!(snap.bytes, 42);
assert_eq!(snap.frames, 1);
}
/// An announce without any subscription is reported with zero derived broadcasts.
#[tokio::test(start_paused = true)]
async fn announced_decouples_from_broadcasts() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let _guard = bs.publisher();
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(track).await;
let snap = frame.get("foo/bar").expect("foo/bar entry");
assert_eq!(snap.announced, 1);
assert_eq!(snap.broadcasts, 0, "no sub, no derived broadcasts");
assert_eq!(snap.subscriptions, 0);
}
/// A subscription opened and closed before the first tick still counts as one broadcast.
#[tokio::test(start_paused = true)]
async fn short_lived_sub_is_surfaced() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
{
let track = bs.publisher().track("video");
track.bytes(123);
track.frame();
}
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(track).await;
let snap = frame.get("foo/bar").expect("foo/bar entry");
assert_eq!(snap.subscriptions, 1);
assert_eq!(snap.subscriptions_closed, 1);
assert_eq!(snap.broadcasts, 1, "flicker counts as one broadcast");
assert_eq!(snap.broadcasts_closed, 1);
assert_eq!(snap.bytes, 123);
assert_eq!(snap.frames, 1);
}
/// Checks the raw subscription counters for two track guards opened and then dropped while
/// the publisher guard keeps the entry alive.
#[tokio::test(start_paused = true)]
async fn multiple_subs_count_as_one_broadcast() {
let (stats, _origin) = test_stats(Some("sjc"));
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let pub_guard = bs.publisher();
let t1 = pub_guard.track("video");
let t2 = pub_guard.track("audio");
drive_ticks(2).await;
{
let entries = stats.shared().entries.lock();
let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
let raw = entry.publisher[Tier::External.idx()].snapshot();
assert_eq!(raw.subscriptions, 2, "two track subs");
assert_eq!(raw.subscriptions_closed, 0, "neither dropped yet");
}
drop(t1);
drop(t2);
drive_ticks(1).await;
let entries = stats.shared().entries.lock();
let entry = entries
.get(&PathOwned::from("foo/bar"))
.expect("entry still live (publisher guard held)");
let raw = entry.publisher[Tier::External.idx()].snapshot();
assert_eq!(raw.subscriptions, 2);
assert_eq!(raw.subscriptions_closed, 2, "both dropped");
}
/// Slots without any activity publish an empty JSON object rather than zeroed entries.
#[tokio::test(start_paused = true)]
async fn unused_slots_dont_surface() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let track = bs.publisher().track("video");
track.frame();
drive_ticks(2).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let pub_track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
assert!(
read_frame(pub_track).await.contains_key("foo/bar"),
"publisher.json must include the active foo/bar entry"
);
for name in ["subscriber.json", "internal/publisher.json", "internal/subscriber.json"] {
let t = broadcast
.subscribe_track(&Track {
name: name.into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(t).await;
assert!(
frame.is_empty(),
"{name} must be empty for an entry with no activity on that slot, got {frame:?}",
);
}
}
/// Source-text check: inside the snapshot function each closed counter must be loaded before
/// its open counterpart. This test reads this file's own text, so renaming the file or
/// rewording those load expressions breaks it.
#[test]
fn snapshot_reads_closed_before_open() {
let src = include_str!("stats.rs");
let body_start = src
.find("fn snapshot(&self) -> RawCounts")
.expect("snapshot fn present");
let body = &src[body_start..];
let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load");
let open_pos = body.find("self.announced.load(").expect("announced load");
assert!(
closed_pos < open_pos,
"announced_closed must be loaded before announced; reversing breaks the open>=closed invariant",
);
let subs_closed_pos = body
.find("self.subscriptions_closed.load")
.expect("subscriptions_closed load");
let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load");
assert!(
subs_closed_pos < subs_pos,
"subscriptions_closed must be loaded before subscriptions",
);
}
/// Reads the next frame from a stats track and parses it as path -> `Snapshot`.
async fn read_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, Snapshot> {
let bytes = track.read_frame().await.expect("ok").expect("frame");
serde_json::from_slice(&bytes).expect("json parse")
}
}