use std::{
collections::{BTreeMap, HashMap},
sync::{
Arc, Weak,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use serde::Serialize;
use web_async::{Lock, spawn};
use crate::{AsPath, Broadcast, OriginProducer, Path, PathOwned, Track, TrackProducer};
/// Event counters for one side (publisher or subscriber) of one broadcast on one tier.
///
/// Within this file every field is only ever incremented. Each opening counter has a
/// matching `*_closed` counter that is bumped when the corresponding guard is dropped.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Counters {
    /// Bumped each time an announce guard (`PublisherStats` / `SubscriberStats`) is created.
    pub announced: AtomicU64,
    /// Bumped each time an announce guard is dropped.
    pub announced_closed: AtomicU64,
    /// Bumped each time a track guard (`PublisherTrack` / `SubscriberTrack`) is created.
    pub subscriptions: AtomicU64,
    /// Bumped each time a track guard is dropped.
    pub subscriptions_closed: AtomicU64,
    /// Bumped when a `SessionBroadcasts` takes its first subscription to a path.
    pub broadcasts: AtomicU64,
    /// Bumped when a `SessionBroadcasts` releases its last subscription to a path.
    pub broadcasts_closed: AtomicU64,
    /// Sum of the values passed to the track guards' `bytes` method.
    pub bytes: AtomicU64,
    /// Number of calls to the track guards' `frame` method.
    pub frames: AtomicU64,
    /// Number of calls to the track guards' `group` method.
    pub groups: AtomicU64,
}

impl Counters {
    /// Copies every counter into a plain-integer [`RawCounts`].
    ///
    /// The three `*_closed` counters are read first with `Acquire`, and only then
    /// the opening counters with `Relaxed`. Tests in this file assert that source
    /// order, so keep it when editing.
    fn snapshot(&self) -> RawCounts {
        // Struct-literal fields are evaluated in the order they are written, so the
        // closing counters below are all loaded before any opening counter.
        RawCounts {
            announced_closed: self.announced_closed.load(Ordering::Acquire),
            subscriptions_closed: self.subscriptions_closed.load(Ordering::Acquire),
            broadcasts_closed: self.broadcasts_closed.load(Ordering::Acquire),
            announced: self.announced.load(Ordering::Relaxed),
            subscriptions: self.subscriptions.load(Ordering::Relaxed),
            broadcasts: self.broadcasts.load(Ordering::Relaxed),
            bytes: self.bytes.load(Ordering::Relaxed),
            frames: self.frames.load(Ordering::Relaxed),
            groups: self.groups.load(Ordering::Relaxed),
        }
    }
}
/// Open/closed session counters for a single root path on a single tier.
#[derive(Default, Debug)]
struct SessionCounters {
    /// Bumped when a `SessionStats` guard is created.
    sessions: AtomicU64,
    /// Bumped when a `SessionStats` guard is dropped.
    sessions_closed: AtomicU64,
}

impl SessionCounters {
    /// Returns `(sessions, sessions_closed)`.
    ///
    /// The closed counter is read first (with `Acquire`), then the open counter;
    /// a test in this file asserts that source order.
    fn snapshot(&self) -> (u64, u64) {
        let closed_count = self.sessions_closed.load(Ordering::Acquire);
        (self.sessions.load(Ordering::Relaxed), closed_count)
    }
}
/// Plain-integer copy of a [`Counters`] value, as produced by `Counters::snapshot`.
///
/// Field names mirror the atomic fields of [`Counters`] one-to-one.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct RawCounts {
announced: u64,
announced_closed: u64,
broadcasts: u64,
broadcasts_closed: u64,
subscriptions: u64,
subscriptions_closed: u64,
bytes: u64,
frames: u64,
groups: u64,
}
/// Selects which of the two per-tier counter sets an event is recorded against.
///
/// The discriminants double as array indices (see [`Tier::idx`]).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Tier {
    /// Stored at index 0 of every per-tier array.
    External = 0,
    /// Stored at index 1 of every per-tier array.
    Internal = 1,
}

impl Tier {
    /// Index of this tier inside the two-element per-tier arrays.
    fn idx(self) -> usize {
        // The discriminants are pinned explicitly above, so the cast is exact.
        self as usize
    }
}
/// Builder-style configuration consumed by [`Stats::new`].
#[derive(Clone)]
#[non_exhaustive]
pub struct StatsConfig {
    /// Origin the stats broadcast is published to. With `None`, [`Stats::new`]
    /// creates no shared state and spawns no publisher task.
    pub origin: Option<OriginProducer>,
    /// Path prefix of the stats broadcast; paths under it are not tracked.
    pub prefix: PathOwned,
    /// Optional suffix appended after `<prefix>/node` in the advertised path.
    pub node: Option<PathOwned>,
    /// Period between two publisher ticks.
    pub interval: Duration,
}

impl StatsConfig {
    /// Returns the default configuration: no origin, prefix `.stats`, no node,
    /// one-second interval.
    pub fn new() -> Self {
        let prefix = PathOwned::from(".stats");
        let interval = Duration::from_secs(1);
        Self {
            origin: None,
            prefix,
            node: None,
            interval,
        }
    }

    /// Replaces the origin; accepts either an `OriginProducer` or an `Option` of one.
    pub fn with_origin(self, origin: impl Into<Option<OriginProducer>>) -> Self {
        Self {
            origin: origin.into(),
            ..self
        }
    }

    /// Replaces the stats path prefix.
    pub fn with_prefix(self, prefix: impl Into<PathOwned>) -> Self {
        Self {
            prefix: prefix.into(),
            ..self
        }
    }

    /// Replaces the publisher tick interval.
    pub fn with_interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }

    /// Replaces the node suffix; accepts either a `PathOwned` or an `Option` of one.
    pub fn with_node(self, node: impl Into<Option<PathOwned>>) -> Self {
        Self {
            node: node.into(),
            ..self
        }
    }
}

impl Default for StatsConfig {
    /// Same as [`StatsConfig::new`].
    fn default() -> Self {
        StatsConfig::new()
    }
}
/// Handle to the stats aggregator.
///
/// `shared` is `None` when no origin was configured; in that case `entry` and
/// `session_counters` return `None` and every guard built from them records nothing.
#[derive(Clone)]
pub struct Stats {
// Prefix under which the stats broadcast lives; paths under it are never tracked.
prefix: PathOwned,
// State shared with the publisher task, which only holds a `Weak` to it.
shared: Option<Arc<StatsShared>>,
}
/// State shared between [`Stats`] handles and the publisher task.
struct StatsShared {
// Origin the publisher task publishes the stats broadcast to.
origin: OriginProducer,
// Per-broadcast counters keyed by broadcast path.
entries: Lock<HashMap<PathOwned, Arc<BroadcastEntry>>>,
// Per-root session counters, one map per tier (indexed by `Tier::idx`).
sessions: [Lock<HashMap<PathOwned, Arc<SessionCounters>>>; 2],
}
struct BroadcastEntry {
publisher: [Counters; 2],
subscriber: [Counters; 2],
}
impl BroadcastEntry {
fn new() -> Self {
Self {
publisher: Default::default(),
subscriber: Default::default(),
}
}
}
/// Publisher-task-local memory for one counter slot.
#[derive(Default)]
struct SlotState {
// Last snapshot that differed from its predecessor (see `process_slot`);
// `None` until the slot first shows a non-default snapshot.
prev_emitted: Option<Snapshot>,
}
/// Publisher-task-local state for the four counter slots of one broadcast entry.
#[derive(Default)]
struct EntrySnapState {
    /// Slot state for the publisher counters, indexed like `BroadcastEntry::publisher`.
    publisher: [SlotState; 2],
    /// Slot state for the subscriber counters, indexed like `BroadcastEntry::subscriber`.
    subscriber: [SlotState; 2],
}

impl EntrySnapState {
    /// Pairs each counter set of `entry` with its track name and its mutable slot state.
    ///
    /// The returned array follows the same order as `TRACK_ORDER`: external
    /// publisher, external subscriber, internal publisher, internal subscriber.
    fn zip_slots<'a>(
        &'a mut self,
        entry: &'a BroadcastEntry,
    ) -> [(&'static str, &'a Counters, &'a mut SlotState); 4] {
        let (ext, int) = (Tier::External.idx(), Tier::Internal.idx());
        // Destructure once so the four mutable borrows are provably disjoint.
        let Self {
            publisher: [pub_ext, pub_int],
            subscriber: [sub_ext, sub_int],
        } = self;

        let external_publisher = ("publisher.json", &entry.publisher[ext], pub_ext);
        let external_subscriber = ("subscriber.json", &entry.subscriber[ext], sub_ext);
        let internal_publisher = ("internal/publisher.json", &entry.publisher[int], pub_int);
        let internal_subscriber = ("internal/subscriber.json", &entry.subscriber[int], sub_int);

        [
            external_publisher,
            external_subscriber,
            internal_publisher,
            internal_subscriber,
        ]
    }
}
// Number of counter slots per broadcast entry: {publisher, subscriber} x {external, internal}.
const NUM_SLOTS: usize = 4;
// Track names for the per-broadcast counters, in the same order that
// `EntrySnapState::zip_slots` yields its slots.
const TRACK_ORDER: [&str; NUM_SLOTS] = [
"publisher.json",
"subscriber.json",
"internal/publisher.json",
"internal/subscriber.json",
];
// Track names for the session counters, indexed by `Tier::idx`.
const SESSION_TRACK_ORDER: [&str; 2] = ["sessions.json", "internal/sessions.json"];
impl Stats {
pub fn new(config: StatsConfig) -> Self {
let StatsConfig {
origin,
prefix,
node,
interval,
} = config;
let node = node.filter(|p| !p.is_empty());
let shared = origin.map(|origin| {
let shared = Arc::new(StatsShared {
origin,
entries: Lock::default(),
sessions: Default::default(),
});
let advertised = advertised_path(&prefix, node.as_ref().map(|p| p.as_str()));
spawn(run_publisher(Arc::downgrade(&shared), advertised, interval));
shared
});
Self { prefix, shared }
}
pub fn prefix(&self) -> &Path<'static> {
&self.prefix
}
#[cfg(test)]
fn shared(&self) -> &Arc<StatsShared> {
self.shared.as_ref().expect("enabled stats aggregator")
}
pub fn tier(&self, tier: Tier) -> StatsHandle {
StatsHandle {
stats: self.clone(),
tier,
}
}
fn entry(&self, path: impl AsPath) -> Option<Arc<BroadcastEntry>> {
let shared = self.shared.as_ref()?;
let path = path.as_path();
if path.has_prefix(&self.prefix) {
return None;
}
let owned = path.to_owned();
let mut entries = shared.entries.lock();
Some(
entries
.entry(owned)
.or_insert_with(|| Arc::new(BroadcastEntry::new()))
.clone(),
)
}
fn session_counters(&self, tier: Tier, root: impl AsPath) -> Option<Arc<SessionCounters>> {
let shared = self.shared.as_ref()?;
let owned = root.as_path().to_owned();
let mut sessions = shared.sessions[tier.idx()].lock();
Some(sessions.entry(owned).or_default().clone())
}
}
impl Default for Stats {
fn default() -> Self {
Self::new(StatsConfig::new())
}
}
/// A [`Stats`] handle bound to one [`Tier`]; everything created from it records
/// against that tier.
#[derive(Clone)]
pub struct StatsHandle {
    stats: Stats,
    tier: Tier,
}

impl StatsHandle {
    /// The aggregator this handle was created from.
    pub fn parent(&self) -> &Stats {
        &self.stats
    }

    /// The tier this handle records against.
    pub fn tier(&self) -> Tier {
        self.tier
    }

    /// Returns the per-broadcast stats for `path` on this handle's tier.
    ///
    /// The result is empty (records nothing) when stats are disabled or `path`
    /// lies under the stats prefix.
    pub fn broadcast(&self, path: impl AsPath) -> BroadcastStats {
        let entry = self.stats.entry(path);
        BroadcastStats {
            entry,
            tier: self.tier,
        }
    }

    /// Creates a fresh per-session tracker that counts against the publisher side.
    pub fn publisher_broadcasts(&self) -> SessionBroadcasts {
        let stats = self.stats.clone();
        SessionBroadcasts::new(stats, self.tier, Side::Publisher)
    }

    /// Creates a fresh per-session tracker that counts against the subscriber side.
    pub fn subscriber_broadcasts(&self) -> SessionBroadcasts {
        let stats = self.stats.clone();
        SessionBroadcasts::new(stats, self.tier, Side::Subscriber)
    }

    /// Records a new session under `root`; the session counts as closed when the
    /// returned guard is dropped.
    pub fn session(&self, root: impl AsPath) -> SessionStats {
        let counters = self.stats.session_counters(self.tier, root);
        SessionStats::new(counters)
    }
}

impl Default for StatsHandle {
    /// A handle on a default (disabled) `Stats`, bound to `Tier::External`.
    fn default() -> Self {
        let stats = Stats::default();
        stats.tier(Tier::External)
    }
}
#[derive(Clone)]
pub struct BroadcastStats {
entry: Option<Arc<BroadcastEntry>>,
tier: Tier,
}
impl BroadcastStats {
pub fn is_empty(&self) -> bool {
self.entry.is_none()
}
pub fn publisher(&self) -> PublisherStats {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()]
.announced
.fetch_add(1, Ordering::Relaxed);
}
PublisherStats {
entry: self.entry.clone(),
tier: self.tier,
}
}
pub fn subscriber(&self) -> SubscriberStats {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()]
.announced
.fetch_add(1, Ordering::Relaxed);
}
SubscriberStats {
entry: self.entry.clone(),
tier: self.tier,
}
}
pub fn publisher_track(&self, _name: &str) -> PublisherTrack {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()]
.subscriptions
.fetch_add(1, Ordering::Relaxed);
}
PublisherTrack {
entry: self.entry.clone(),
tier: self.tier,
}
}
pub fn subscriber_track(&self, _name: &str) -> SubscriberTrack {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()]
.subscriptions
.fetch_add(1, Ordering::Relaxed);
}
SubscriberTrack {
entry: self.entry.clone(),
tier: self.tier,
}
}
}
/// Which half of a [`BroadcastEntry`] an event is recorded against.
#[derive(Copy, Clone)]
enum Side {
    Publisher,
    Subscriber,
}

impl Side {
    /// Picks this side's counter set for `tier` out of `entry`.
    fn counters(self, entry: &BroadcastEntry, tier: Tier) -> &Counters {
        let per_tier = match self {
            Side::Publisher => &entry.publisher,
            Side::Subscriber => &entry.subscriber,
        };
        &per_tier[tier.idx()]
    }
}
/// Per-session tracker that collapses several subscriptions to the same path
/// into a single `broadcasts` count.
///
/// Clones share the same refcount map; a tracker from a separate
/// `publisher_broadcasts` / `subscriber_broadcasts` call counts independently.
#[derive(Clone)]
pub struct SessionBroadcasts {
    stats: Stats,
    tier: Tier,
    side: Side,
    /// Number of live `BroadcastSubscription` guards per path.
    counts: Arc<std::sync::Mutex<HashMap<PathOwned, u32>>>,
}

impl SessionBroadcasts {
    /// Creates a tracker with an empty refcount map.
    fn new(stats: Stats, tier: Tier, side: Side) -> Self {
        let counts = Arc::default();
        Self {
            stats,
            tier,
            side,
            counts,
        }
    }

    /// Registers one more subscription to `path` and returns its guard.
    ///
    /// Only the first live subscription to a given path bumps the `broadcasts`
    /// counter. Panics if the refcount mutex is poisoned.
    pub fn subscribe(&self, path: impl AsPath) -> BroadcastSubscription {
        let path = path.as_path().to_owned();
        let entry = self.stats.entry(&path);

        // Keep the lock scope limited to the refcount update.
        let is_first = {
            let mut counts = self.counts.lock().expect("stats refcount poisoned");
            let held = counts.entry(path.clone()).or_default();
            *held += 1;
            *held == 1
        };

        if let (true, Some(tracked)) = (is_first, entry.as_deref()) {
            self.side
                .counters(tracked, self.tier)
                .broadcasts
                .fetch_add(1, Ordering::Relaxed);
        }

        BroadcastSubscription {
            entry,
            tier: self.tier,
            side: self.side,
            counts: Arc::clone(&self.counts),
            path,
        }
    }
}
/// Guard for one subscription registered through [`SessionBroadcasts::subscribe`].
///
/// Dropping it decrements the per-path refcount; when the refcount reaches zero
/// the `broadcasts_closed` counter is bumped.
#[must_use = "drop the guard to release the subscription"]
pub struct BroadcastSubscription {
    entry: Option<Arc<BroadcastEntry>>,
    tier: Tier,
    side: Side,
    /// Refcount map shared with the `SessionBroadcasts` that created this guard.
    counts: Arc<std::sync::Mutex<HashMap<PathOwned, u32>>>,
    path: PathOwned,
}

impl Drop for BroadcastSubscription {
    fn drop(&mut self) {
        let last = {
            // Never panic on a poisoned lock here: a panic inside `drop` while the
            // thread is already unwinding aborts the process. The critical sections
            // visible in this file only do simple map updates, so the recovered map
            // is still usable.
            let mut counts = self
                .counts
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            match counts.get_mut(&self.path) {
                Some(n) => {
                    *n -= 1;
                    if *n == 0 {
                        // Last subscription for this path: forget the path entirely.
                        counts.remove(&self.path);
                        true
                    } else {
                        false
                    }
                }
                None => false,
            }
        };
        // The counter is bumped after the lock is released.
        if last {
            if let Some(entry) = &self.entry {
                self.side
                    .counters(entry, self.tier)
                    .broadcasts_closed
                    .fetch_add(1, Ordering::Release);
            }
        }
    }
}
/// Guard for one open session; `counters` is `None` when stats are disabled.
#[must_use = "drop the guard to record the session as closed"]
pub struct SessionStats {
    counters: Option<Arc<SessionCounters>>,
}

impl SessionStats {
    /// Bumps `sessions` (when counters are present) and wraps them in a guard.
    fn new(counters: Option<Arc<SessionCounters>>) -> Self {
        if let Some(open) = counters.as_deref() {
            open.sessions.fetch_add(1, Ordering::Relaxed);
        }
        Self { counters }
    }
}

impl Drop for SessionStats {
    /// Bumps `sessions_closed` when counters are present.
    fn drop(&mut self) {
        let Some(counters) = self.counters.as_deref() else {
            return;
        };
        counters.sessions_closed.fetch_add(1, Ordering::Release);
    }
}
/// Publisher-side announce guard; dropping it bumps `announced_closed`.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct PublisherStats {
    entry: Option<Arc<BroadcastEntry>>,
    tier: Tier,
}

impl PublisherStats {
    /// Opens a publisher track guard on the same entry and tier.
    pub fn track(&self, name: &str) -> PublisherTrack {
        let broadcast = BroadcastStats {
            entry: self.entry.clone(),
            tier: self.tier,
        };
        broadcast.publisher_track(name)
    }
}

impl Drop for PublisherStats {
    fn drop(&mut self) {
        let Some(entry) = self.entry.as_deref() else {
            return;
        };
        let counters = &entry.publisher[self.tier.idx()];
        counters.announced_closed.fetch_add(1, Ordering::Release);
    }
}
/// Subscriber-side announce guard; dropping it bumps `announced_closed`.
#[must_use = "drop the guard to record the broadcast as closed"]
pub struct SubscriberStats {
    entry: Option<Arc<BroadcastEntry>>,
    tier: Tier,
}

impl SubscriberStats {
    /// Opens a subscriber track guard on the same entry and tier.
    pub fn track(&self, name: &str) -> SubscriberTrack {
        let broadcast = BroadcastStats {
            entry: self.entry.clone(),
            tier: self.tier,
        };
        broadcast.subscriber_track(name)
    }
}

impl Drop for SubscriberStats {
    fn drop(&mut self) {
        let Some(entry) = self.entry.as_deref() else {
            return;
        };
        let counters = &entry.subscriber[self.tier.idx()];
        counters.announced_closed.fetch_add(1, Ordering::Release);
    }
}
#[must_use = "drop the guard to record the subscription as closed"]
pub struct PublisherTrack {
entry: Option<Arc<BroadcastEntry>>,
tier: Tier,
}
impl PublisherTrack {
pub fn frame(&self) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
}
}
pub fn bytes(&self, n: u64) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
}
}
pub fn group(&self) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
}
}
}
impl Drop for PublisherTrack {
fn drop(&mut self) {
if let Some(entry) = &self.entry {
entry.publisher[self.tier.idx()]
.subscriptions_closed
.fetch_add(1, Ordering::Release);
}
}
}
#[must_use = "drop the guard to record the subscription as closed"]
pub struct SubscriberTrack {
entry: Option<Arc<BroadcastEntry>>,
tier: Tier,
}
impl SubscriberTrack {
pub fn frame(&self) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()].frames.fetch_add(1, Ordering::Relaxed);
}
}
pub fn bytes(&self, n: u64) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()].bytes.fetch_add(n, Ordering::Relaxed);
}
}
pub fn group(&self) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()].groups.fetch_add(1, Ordering::Relaxed);
}
}
}
impl Drop for SubscriberTrack {
fn drop(&mut self) {
if let Some(entry) = &self.entry {
entry.subscriber[self.tier.idx()]
.subscriptions_closed
.fetch_add(1, Ordering::Release);
}
}
}
fn process_slot(counters: &Counters, slot_state: &mut SlotState, mut emit: impl FnMut(Snapshot)) {
let raw = counters.snapshot();
let snap = Snapshot {
announced: raw.announced,
announced_closed: raw.announced_closed,
broadcasts: raw.broadcasts,
broadcasts_closed: raw.broadcasts_closed,
subscriptions: raw.subscriptions,
subscriptions_closed: raw.subscriptions_closed,
bytes: raw.bytes,
frames: raw.frames,
groups: raw.groups,
};
let live = snap.announced != snap.announced_closed
|| snap.subscriptions != snap.subscriptions_closed
|| snap.broadcasts != snap.broadcasts_closed;
let prev_snap = slot_state.prev_emitted.unwrap_or_default();
let changed = snap != prev_snap;
if changed {
slot_state.prev_emitted = Some(snap);
}
if live || changed {
emit(snap);
}
}
/// Publisher-task-local memory for the session counters of one root.
#[derive(Default)]
struct SessionSlotState {
// Last session snapshot that differed from its predecessor (see
// `process_session_slot`); `None` until the first non-default snapshot.
prev_emitted: Option<SessionSnapshot>,
}
/// Snapshots `counters` and calls `emit` when the root is worth reporting.
///
/// A root is reported when it still has open sessions (`sessions` differs from
/// `sessions_closed`) or when the snapshot differs from the one remembered in
/// `slot_state`; a missing remembered snapshot is treated as all zeros.
fn process_session_slot(
    counters: &SessionCounters,
    slot_state: &mut SessionSlotState,
    mut emit: impl FnMut(SessionSnapshot),
) {
    let (opened, closed) = counters.snapshot();
    let snap = SessionSnapshot {
        sessions: opened,
        sessions_closed: closed,
    };

    let baseline = slot_state.prev_emitted.unwrap_or_default();
    let changed = baseline != snap;
    if changed {
        slot_state.prev_emitted = Some(snap);
    }

    let all_closed = opened == closed;
    if changed || !all_closed {
        emit(snap);
    }
}
/// Serializes `frame` as JSON and writes it to `track` unless the bytes are
/// identical to `last`.
///
/// `last` is updated only after a successful write. Serialization and write
/// failures are logged at debug level (tagged with `name`) and otherwise ignored.
fn flush_track<T: Serialize>(track: &mut TrackProducer, frame: &T, last: &mut Vec<u8>, name: &str) {
    let payload = match serde_json::to_vec(frame) {
        Err(err) => {
            tracing::debug!(?err, name, "stats: failed to serialize frame");
            return;
        }
        Ok(bytes) => bytes,
    };

    // Nothing new since the last successful write: skip the frame.
    if *last == payload {
        return;
    }

    match track.write_frame(payload.clone()) {
        Ok(_) => *last = payload,
        Err(err) => {
            tracing::debug!(?err, name, "stats: failed to write frame");
        }
    }
}
/// Background task that periodically publishes all counters as JSON frames.
///
/// Creates one broadcast with a track per entry in `TRACK_ORDER` and
/// `SESSION_TRACK_ORDER`, publishes it on the shared origin at `advertised`,
/// then on every tick of `interval` snapshots the counters, garbage-collects
/// entries nobody else references, and flushes one frame per track.
///
/// The task holds only a `Weak` between ticks and returns as soon as the shared
/// state can no longer be upgraded. It also returns early if a track cannot be
/// created or the origin rejects the broadcast.
async fn run_publisher(weak: Weak<StatsShared>, advertised: PathOwned, interval: Duration) {
let Some(shared) = weak.upgrade() else {
return;
};
let mut broadcast = Broadcast::new().produce();
// Helper: create a track by name, logging a warning and yielding `None` on failure.
let create = |broadcast: &mut crate::BroadcastProducer, name: &str| match broadcast.create_track(Track {
name: name.into(),
priority: 0,
}) {
Ok(t) => Some(t),
Err(err) => {
tracing::warn!(?err, name, "stats: failed to create track");
None
}
};
// `tracks[i]` carries the frames for `TRACK_ORDER[i]`.
let mut tracks: Vec<TrackProducer> = Vec::with_capacity(NUM_SLOTS);
for name in TRACK_ORDER {
let Some(t) = create(&mut broadcast, name) else {
return;
};
tracks.push(t);
}
// `session_tracks[i]` carries the frames for `SESSION_TRACK_ORDER[i]`.
let mut session_tracks: Vec<TrackProducer> = Vec::with_capacity(SESSION_TRACK_ORDER.len());
for name in SESSION_TRACK_ORDER {
let Some(t) = create(&mut broadcast, name) else {
return;
};
session_tracks.push(t);
}
if !shared.origin.publish_broadcast(&advertised, broadcast.consume()) {
tracing::warn!(advertised = %advertised, "stats: origin rejected stats broadcast");
return;
}
// Release the strong reference so this task does not keep the shared state alive.
drop(shared);
// Task-local state: last-changed snapshots per path/root and last written payload per track.
let mut local: HashMap<PathOwned, EntrySnapState> = HashMap::new();
let mut last_payload: [Vec<u8>; NUM_SLOTS] = Default::default();
let mut session_local: [HashMap<PathOwned, SessionSlotState>; 2] = Default::default();
let mut session_last_payload: [Vec<u8>; 2] = Default::default();
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
let Some(shared) = weak.upgrade() else {
return;
};
// Copy the entries out so the map lock is not held while snapshotting.
let entries: Vec<(PathOwned, Arc<BroadcastEntry>)> = {
let map = shared.entries.lock();
map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
// One path -> snapshot map per track, in `TRACK_ORDER` order.
let mut frames: [BTreeMap<String, Snapshot>; NUM_SLOTS] = Default::default();
for (path, entry) in &entries {
let snap_state = local.entry(path.clone()).or_default();
for (i, (_track_name, counters, slot_state)) in snap_state.zip_slots(entry).into_iter().enumerate() {
process_slot(counters, slot_state, |snap| {
frames[i].insert(path.as_str().to_string(), snap);
});
}
}
// Drop our clones first so `strong_count` below only sees the map plus outside holders.
drop(entries);
{
// Keep only entries still referenced outside the map, and forget local state for the rest.
// NOTE(review): an entry that is updated and fully released after the snapshot
// above but before this `retain` appears to be removed without its final
// counts ever being emitted — confirm whether that window matters.
let mut map = shared.entries.lock();
map.retain(|_, entry| Arc::strong_count(entry) > 1);
local.retain(|path, _| map.contains_key(path));
}
// Same snapshot-then-collect procedure for the session counters of each tier.
let mut session_frames: [BTreeMap<String, SessionSnapshot>; 2] = Default::default();
for tier_idx in 0..2 {
let roots: Vec<(PathOwned, Arc<SessionCounters>)> = {
let map = shared.sessions[tier_idx].lock();
map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
let states = &mut session_local[tier_idx];
for (root, counters) in &roots {
let state = states.entry(root.clone()).or_default();
process_session_slot(counters, state, |snap| {
session_frames[tier_idx].insert(root.as_str().to_string(), snap);
});
}
drop(roots);
let mut map = shared.sessions[tier_idx].lock();
map.retain(|_, counters| Arc::strong_count(counters) > 1);
states.retain(|root, _| map.contains_key(root));
}
// `flush_track` skips a track whose serialized payload did not change.
for (i, (frame, last)) in frames.iter().zip(last_payload.iter_mut()).enumerate() {
flush_track(&mut tracks[i], frame, last, TRACK_ORDER[i]);
}
for (i, (frame, last)) in session_frames.iter().zip(session_last_payload.iter_mut()).enumerate() {
flush_track(&mut session_tracks[i], frame, last, SESSION_TRACK_ORDER[i]);
}
// Go back to holding only the `Weak` until the next tick.
drop(shared);
}
}
/// Serializable per-slot counter values; the publisher task emits a map of
/// broadcast path to `Snapshot` on each per-broadcast track.
///
/// Field names mirror [`Counters`]. `Deserialize` is derived only for tests.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
#[cfg_attr(test, derive(serde::Deserialize))]
struct Snapshot {
announced: u64,
announced_closed: u64,
broadcasts: u64,
broadcasts_closed: u64,
subscriptions: u64,
subscriptions_closed: u64,
bytes: u64,
frames: u64,
groups: u64,
}
/// Serializable session counter values; the publisher task emits a map of
/// root path to `SessionSnapshot` on each session track.
///
/// `Deserialize` is derived only for tests.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
#[cfg_attr(test, derive(serde::Deserialize))]
struct SessionSnapshot {
sessions: u64,
sessions_closed: u64,
}
/// Builds the path the stats broadcast is advertised under:
/// `<prefix>/node`, followed by `/<node>` when a node is given.
fn advertised_path(prefix: &Path, node: Option<&str>) -> PathOwned {
    let base = prefix.as_str();
    let joined = match node {
        Some(node) => format!("{base}/node/{node}"),
        None => format!("{base}/node"),
    };
    PathOwned::from(joined)
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::atomic::Ordering::Relaxed};
use crate::{Origin, Path};
use super::*;
// Builds an enabled `Stats` on a fresh random origin with the given optional node,
// returning both so tests can consume announcements from the origin.
fn test_stats(node: Option<&str>) -> (Stats, OriginProducer) {
let origin = Origin::random().produce();
let stats = Stats::new(
StatsConfig::new()
.with_origin(origin.clone())
.with_node(node.map(|s| PathOwned::from(s.to_string()))),
);
(stats, origin)
}
// `advertised_path` yields `<prefix>/node[/<node>]` for single- and multi-segment
// nodes, for no node, and for a non-default prefix.
#[test]
fn advertised_path_with_and_without_node() {
let prefix = Path::new(".stats");
assert_eq!(advertised_path(&prefix, Some("sjc")).as_str(), ".stats/node/sjc");
assert_eq!(advertised_path(&prefix, Some("sjc/1")).as_str(), ".stats/node/sjc/1");
assert_eq!(advertised_path(&prefix, None).as_str(), ".stats/node");
let prefix = Path::new("metrics");
assert_eq!(advertised_path(&prefix, Some("lon")).as_str(), "metrics/node/lon");
}
// Creates a `Stats` with the given node on a fresh origin and returns the path of
// the first broadcast announced on that origin.
async fn announced_path_for_node(node: &str) -> String {
let origin = Origin::random().produce();
let _stats = Stats::new(
StatsConfig::new()
.with_origin(origin.clone())
.with_node(PathOwned::from(node.to_string())),
);
let mut consumer = origin.consume();
// Nudge the paused clock before waiting for the announcement.
tokio::time::advance(Duration::from_millis(1)).await;
let (path, _broadcast) = consumer.announced().await.expect("expected announce");
path.as_str().to_string()
}
// A node with redundant slashes is announced in normalized form, and a node made
// only of slashes is announced as if no node were set.
#[tokio::test(start_paused = true)]
async fn new_normalizes_and_drops_empty_node() {
assert_eq!(announced_path_for_node("/sjc//1/").await, ".stats/node/sjc/1");
assert_eq!(announced_path_for_node("///").await, ".stats/node");
}
// Bytes recorded on two different broadcast paths land in two separate entries.
#[tokio::test(start_paused = true)]
async fn per_broadcast_counters_isolated() {
let (stats, _origin) = test_stats(Some("sjc"));
let bs1 = stats.tier(Tier::External).broadcast("demo/bbb");
let bs2 = stats.tier(Tier::External).broadcast("demo/ccc");
let g1 = bs1.publisher().track("video");
g1.bytes(100);
let g2 = bs2.publisher().track("video");
g2.bytes(7);
let entries = stats.shared().entries.lock();
let e1 = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
let e2 = entries.get(&PathOwned::from("demo/ccc")).expect("entry");
assert_eq!(e1.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
assert_eq!(e2.publisher[Tier::External.idx()].bytes.load(Relaxed), 7);
}
// For the same path, external-publisher and internal-subscriber bytes are stored in
// their own slots and the other two slots stay at zero.
#[tokio::test(start_paused = true)]
async fn external_and_internal_tiers_are_independent() {
let (stats, _origin) = test_stats(Some("sjc"));
let ext = stats.tier(Tier::External);
let int = stats.tier(Tier::Internal);
let ext_track = ext.broadcast("demo/bbb").publisher().track("video");
ext_track.bytes(100);
let int_track = int.broadcast("demo/bbb").subscriber().track("audio");
int_track.bytes(7);
let entries = stats.shared().entries.lock();
let entry = entries.get(&PathOwned::from("demo/bbb")).expect("entry");
assert_eq!(entry.publisher[Tier::External.idx()].bytes.load(Relaxed), 100);
assert_eq!(entry.subscriber[Tier::External.idx()].bytes.load(Relaxed), 0);
assert_eq!(entry.publisher[Tier::Internal.idx()].bytes.load(Relaxed), 0);
assert_eq!(entry.subscriber[Tier::Internal.idx()].bytes.load(Relaxed), 7);
}
// A broadcast path under the stats prefix yields an empty `BroadcastStats`, and using
// its guards never creates an entry.
#[tokio::test(start_paused = true)]
async fn paths_under_prefix_are_no_op() {
let (stats, _origin) = test_stats(Some("sjc"));
let bs = stats.tier(Tier::External).broadcast(".stats/node/sjc");
assert!(bs.is_empty());
let p = bs.publisher();
let track = p.track("video");
track.bytes(100);
drop(track);
drop(p);
assert!(stats.shared().entries.lock().is_empty());
}
// A default `Stats` has no shared state; its guards can be created, used and dropped
// without panicking.
#[tokio::test(start_paused = true)]
async fn disabled_stats_are_noop() {
let stats = Stats::default();
assert!(stats.shared.is_none());
let bs = stats.tier(Tier::External).broadcast("demo/bbb");
assert!(bs.is_empty());
let p = bs.publisher();
let track = p.track("video");
track.bytes(100);
drop(track);
drop(p);
}
// With two tracked broadcasts, the first announcement seen on the origin is the stats
// broadcast at `<prefix>/node/<node>`.
#[tokio::test(start_paused = true)]
async fn single_broadcast_path_announced() {
let (stats, origin) = test_stats(Some("sjc/1"));
let mut consumer = origin.consume();
let bs1 = stats.tier(Tier::External).broadcast("foo/bar");
let _t1 = bs1.publisher().track("video");
let bs2 = stats.tier(Tier::External).broadcast("baz/qux");
let _t2 = bs2.publisher().track("video");
tokio::time::advance(Duration::from_millis(1)).await;
let (path, broadcast) = consumer.announced().await.expect("expected announce");
assert!(broadcast.is_some());
assert_eq!(path.as_str(), ".stats/node/sjc/1");
}
// Without a configured node the stats broadcast is announced at `<prefix>/node`.
#[tokio::test(start_paused = true)]
async fn task_announces_without_node_suffix() {
let origin = Origin::random().produce();
let stats = Stats::new(StatsConfig::new().with_origin(origin.clone()));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let _t = bs.publisher().track("video");
tokio::time::advance(Duration::from_millis(1)).await;
let (path, broadcast) = consumer.announced().await.expect("expected announce");
assert!(broadcast.is_some());
assert_eq!(path.as_str(), ".stats/node");
}
// Advances the paused clock by one second `count` times, yielding four times after
// each step (presumably to let the spawned publisher task run its tick).
async fn drive_ticks(count: u32) {
for _ in 0..count {
tokio::time::advance(Duration::from_secs(1)).await;
for _ in 0..4 {
tokio::task::yield_now().await;
}
}
}
// An entry survives publisher ticks while an announce guard is held, and is removed
// on the tick after the guard and the `BroadcastStats` are dropped.
#[tokio::test(start_paused = true)]
async fn live_entry_kept_while_idle() {
let (stats, _origin) = test_stats(Some("sjc"));
let key = PathOwned::from("foo/bar".to_string());
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let guard = bs.publisher();
drive_ticks(5).await;
assert!(
stats.shared().entries.lock().contains_key(&key),
"announced-but-idle broadcast must stay while the guard is held"
);
drop(guard);
drop(bs);
drive_ticks(1).await;
assert!(
!stats.shared().entries.lock().contains_key(&key),
"entry dropped once the announce guard closes"
);
}
// An entry is present while a track guard is held and is removed on the tick after
// the guard and the `BroadcastStats` are dropped.
#[tokio::test(start_paused = true)]
async fn entry_dropped_once_fully_closed() {
let (stats, _origin) = test_stats(Some("sjc"));
let key = PathOwned::from("foo/bar".to_string());
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let track = bs.publisher().track("video");
drive_ticks(1).await;
assert!(
stats.shared().entries.lock().contains_key(&key),
"live entry present while the track guard is held"
);
drop(track);
drop(bs);
drive_ticks(1).await;
assert!(
!stats.shared().entries.lock().contains_key(&key),
"fully-closed entry dropped on the next tick"
);
}
// The first `publisher.json` frame carries the announce, subscription, broadcast,
// byte and frame counts recorded before the tick.
#[tokio::test(start_paused = true)]
async fn frame_emits_expected_counters() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let track = bs.publisher().track("video");
track.bytes(42);
track.frame();
let sessions = stats.tier(Tier::External).publisher_broadcasts();
let _sub = sessions.subscribe("foo/bar");
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("expected announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(track).await;
let snap = frame.get("foo/bar").expect("foo/bar entry");
assert_eq!(snap.announced, 1, "publisher() guard bumps announced");
assert_eq!(snap.broadcasts, 1, "one session subscribed");
assert_eq!(snap.subscriptions, 1);
assert_eq!(snap.bytes, 42);
assert_eq!(snap.frames, 1);
}
// Holding only an announce guard reports `announced == 1` while `broadcasts` and
// `subscriptions` stay at zero.
#[tokio::test(start_paused = true)]
async fn announced_decouples_from_broadcasts() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let _guard = bs.publisher();
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(track).await;
let snap = frame.get("foo/bar").expect("foo/bar entry");
assert_eq!(snap.announced, 1);
assert_eq!(snap.broadcasts, 0, "no subscription, no broadcasts sentinel");
assert_eq!(snap.subscriptions, 0);
}
// A track and subscription that open and close before the first tick still show up
// in the first frame, with matching opened and closed counts.
#[tokio::test(start_paused = true)]
async fn short_lived_sub_is_surfaced() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let sessions = stats.tier(Tier::External).publisher_broadcasts();
{
let track = bs.publisher().track("video");
track.bytes(123);
track.frame();
let _sub = sessions.subscribe("foo/bar");
}
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(track).await;
let snap = frame.get("foo/bar").expect("foo/bar entry");
assert_eq!(snap.subscriptions, 1);
assert_eq!(snap.subscriptions_closed, 1);
assert_eq!(snap.broadcasts, 1, "one session subscribed");
assert_eq!(snap.broadcasts_closed, 1);
assert_eq!(snap.bytes, 123);
assert_eq!(snap.frames, 1);
}
// Two subscriptions from the same `SessionBroadcasts` count as one broadcast, which
// is closed only when the second subscription is dropped.
#[tokio::test(start_paused = true)]
async fn multiple_subs_count_as_one_broadcast() {
let (stats, _origin) = test_stats(Some("sjc"));
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let sessions = stats.tier(Tier::External).publisher_broadcasts();
let pub_guard = bs.publisher();
let t1 = pub_guard.track("video");
let t2 = pub_guard.track("audio");
let s1 = sessions.subscribe("foo/bar");
let s2 = sessions.subscribe("foo/bar");
// Reads the raw external-publisher counters of the `foo/bar` entry.
let raw = || {
let entries = stats.shared().entries.lock();
let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
entry.publisher[Tier::External.idx()].snapshot()
};
let r = raw();
assert_eq!(r.subscriptions, 2, "two track subs");
assert_eq!(r.subscriptions_closed, 0, "neither dropped yet");
assert_eq!(r.broadcasts, 1, "one session => one broadcast");
assert_eq!(r.broadcasts_closed, 0);
drop(s1);
assert_eq!(raw().broadcasts_closed, 0, "session still has a sub open");
drop(s2);
drop(t1);
drop(t2);
let r = raw();
assert_eq!(r.subscriptions_closed, 2, "both track subs dropped");
assert_eq!(r.broadcasts, 1);
assert_eq!(r.broadcasts_closed, 1, "last sub closed => one broadcasts_closed");
drop(pub_guard);
drop(bs);
}
// Subscriptions from two separately created `SessionBroadcasts` each count as their
// own broadcast and each close independently.
#[tokio::test(start_paused = true)]
async fn distinct_sessions_count_as_separate_broadcasts() {
let (stats, _origin) = test_stats(Some("sjc"));
let viewer1 = stats.tier(Tier::External).publisher_broadcasts();
let viewer2 = stats.tier(Tier::External).publisher_broadcasts();
// Reads the raw external-publisher counters of the `foo/bar` entry.
let raw = || {
let entries = stats.shared().entries.lock();
let entry = entries.get(&PathOwned::from("foo/bar")).expect("entry");
entry.publisher[Tier::External.idx()].snapshot()
};
let s1 = viewer1.subscribe("foo/bar");
assert_eq!(raw().broadcasts, 1, "one viewer");
let s2 = viewer2.subscribe("foo/bar");
assert_eq!(raw().broadcasts, 2, "two distinct viewers");
assert_eq!(raw().broadcasts_closed, 0);
drop(s1);
let r = raw();
assert_eq!(r.broadcasts, 2, "broadcasts is cumulative");
assert_eq!(r.broadcasts_closed, 1, "one viewer left");
drop(s2);
assert_eq!(raw().broadcasts_closed, 2, "both viewers gone");
}
// Session guards are counted per root: opening bumps `sessions`, dropping bumps
// `sessions_closed`, and different roots do not affect each other.
#[tokio::test(start_paused = true)]
async fn session_counts_by_root() {
let (stats, _origin) = test_stats(Some("sjc"));
let ext = stats.tier(Tier::External);
// Reads `(sessions, sessions_closed)` for a root on the external tier, if present.
let snap = |root: &str| {
let map = stats.shared().sessions[Tier::External.idx()].lock();
map.get(&PathOwned::from(root.to_string())).map(|c| c.snapshot())
};
let a1 = ext.session("acme");
let a2 = ext.session("acme");
let b1 = ext.session("globex");
assert_eq!(snap("acme"), Some((2, 0)), "two sessions under one root");
assert_eq!(snap("globex"), Some((1, 0)), "a distinct root is counted separately");
drop(a1);
assert_eq!(snap("acme"), Some((2, 1)));
drop(a2);
drop(b1);
assert_eq!(snap("acme"), Some((2, 2)));
assert_eq!(snap("globex"), Some((1, 1)));
}
// External sessions are published on `sessions.json` and internal sessions on
// `internal/sessions.json`, each keyed by root, without leaking across tiers.
#[tokio::test(start_paused = true)]
async fn session_track_surfaces_by_root() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let _a = stats.tier(Tier::External).session("acme");
let _b = stats.tier(Tier::External).session("acme");
let _c = stats.tier(Tier::Internal).session("peer");
tokio::time::advance(Duration::from_millis(1100)).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let track = broadcast
.subscribe_track(&Track {
name: "sessions.json".into(),
priority: 0,
})
.expect("subscribe");
let frame = read_session_frame(track).await;
let snap = frame.get("acme").expect("root entry");
assert_eq!(snap.sessions, 2);
assert_eq!(snap.sessions_closed, 0);
assert!(
!frame.contains_key("peer"),
"internal session must not appear on the external track"
);
let int_track = broadcast
.subscribe_track(&Track {
name: "internal/sessions.json".into(),
priority: 0,
})
.expect("subscribe");
let snap = *read_session_frame(int_track).await.get("peer").expect("internal entry");
assert_eq!(snap.sessions, 1);
}
// A session root stays in the map while a session guard is held and is removed on
// the tick after the guard is dropped.
#[tokio::test(start_paused = true)]
async fn session_root_dropped_when_empty() {
let (stats, _origin) = test_stats(Some("sjc"));
let key = PathOwned::from("acme");
let session = stats.tier(Tier::External).session("acme");
drive_ticks(1).await;
assert!(
stats.shared().sessions[Tier::External.idx()].lock().contains_key(&key),
"root present while a session is connected"
);
drop(session);
drive_ticks(1).await;
assert!(
!stats.shared().sessions[Tier::External.idx()].lock().contains_key(&key),
"root GC'd after the last session leaves"
);
}
// An entry with activity only on the external publisher slot appears in
// `publisher.json` and leaves the frames of the other three tracks empty.
#[tokio::test(start_paused = true)]
async fn unused_slots_dont_surface() {
let (stats, origin) = test_stats(Some("sjc"));
let mut consumer = origin.consume();
let bs = stats.tier(Tier::External).broadcast("foo/bar");
let track = bs.publisher().track("video");
track.frame();
drive_ticks(2).await;
let (_path, broadcast) = consumer.announced().await.expect("announce");
let broadcast = broadcast.expect("active");
let pub_track = broadcast
.subscribe_track(&Track {
name: "publisher.json".into(),
priority: 0,
})
.expect("subscribe");
assert!(
read_frame(pub_track).await.contains_key("foo/bar"),
"publisher.json must include the active foo/bar entry"
);
for name in ["subscriber.json", "internal/publisher.json", "internal/subscriber.json"] {
let t = broadcast
.subscribe_track(&Track {
name: name.into(),
priority: 0,
})
.expect("subscribe");
let frame = read_frame(t).await;
assert!(
frame.is_empty(),
"{name} must be empty for an entry with no activity on that slot, got {frame:?}",
);
}
}
// Source-text check: starting at the counter snapshot function, each `*_closed` load
// must appear in the file before the load of its opening counterpart. This inspects
// the text of `stats.rs`, not runtime behavior, so it is sensitive to how the
// snapshot function is written.
#[test]
fn snapshot_reads_closed_before_open() {
let src = include_str!("stats.rs");
let body_start = src
.find("fn snapshot(&self) -> RawCounts")
.expect("snapshot fn present");
// Search from the function signature onward; the first match of each needle wins.
let body = &src[body_start..];
let closed_pos = body.find("self.announced_closed.load").expect("announced_closed load");
let open_pos = body.find("self.announced.load(").expect("announced load");
assert!(
closed_pos < open_pos,
"announced_closed must be loaded before announced; reversing breaks the open>=closed invariant",
);
let subs_closed_pos = body
.find("self.subscriptions_closed.load")
.expect("subscriptions_closed load");
let subs_pos = body.find("self.subscriptions.load").expect("subscriptions load");
assert!(
subs_closed_pos < subs_pos,
"subscriptions_closed must be loaded before subscriptions",
);
let bcast_closed_pos = body
.find("self.broadcasts_closed.load")
.expect("broadcasts_closed load");
let bcast_pos = body.find("self.broadcasts.load").expect("broadcasts load");
assert!(
bcast_closed_pos < bcast_pos,
"broadcasts_closed must be loaded before broadcasts",
);
}
// Source-text check: starting at the session snapshot function, the `sessions_closed`
// load must appear in the file before the `sessions` load. Like its sibling test,
// this inspects the text of `stats.rs` rather than runtime behavior.
#[test]
fn session_snapshot_reads_closed_before_open() {
let src = include_str!("stats.rs");
let body_start = src
.find("fn snapshot(&self) -> (u64, u64)")
.expect("SessionCounters::snapshot fn present");
let body = &src[body_start..];
let closed_pos = body.find("self.sessions_closed.load").expect("sessions_closed load");
let open_pos = body.find("self.sessions.load").expect("sessions load");
assert!(closed_pos < open_pos, "sessions_closed must be loaded before sessions",);
}
// Reads the next frame from `track` and parses it as a path -> `Snapshot` map;
// panics if the read fails, no frame is available, or the JSON does not parse.
async fn read_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, Snapshot> {
let bytes = track.read_frame().await.expect("ok").expect("frame");
serde_json::from_slice(&bytes).expect("json parse")
}
// Reads the next frame from `track` and parses it as a root -> `SessionSnapshot` map;
// panics if the read fails, no frame is available, or the JSON does not parse.
async fn read_session_frame(mut track: crate::TrackConsumer) -> BTreeMap<String, SessionSnapshot> {
let bytes = track.read_frame().await.expect("ok").expect("frame");
serde_json::from_slice(&bytes).expect("json parse")
}
}