use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::types::EpicsValue;
use tokio::sync::{Mutex, RwLock};
use super::cache::PvCache;
/// Gateway statistics that are published as PVs named `<prefix><suffix>`.
///
/// The public counters are atomics updated with relaxed ordering by the
/// `record_*` methods and by `heartbeat_tick`; `refresh` reads them and writes
/// the values into the PV database.
pub struct Stats {
/// Prepended to every statistic PV name. When empty, `publish_initial` and
/// `refresh` return early and `heartbeat_tick` only counts without publishing.
prefix: String,
/// Bumped by `record_event`; `refresh` publishes it as `totalEvents` and
/// `clientEventCount` and derives the event rate from it.
pub total_events: AtomicU64,
/// Bumped by `record_put`; published as `putCount`.
pub put_count: AtomicU64,
/// Bumped by `record_readonly_reject`; published as `readOnlyRejects`.
pub read_only_rejects: AtomicU64,
/// Bumped by `heartbeat_tick`; published as `heartbeat`.
pub heartbeat: AtomicU64,
/// Bumped by `record_post_event`; published as `postEventCount`.
pub post_event_count: AtomicU64,
/// Bumped by `record_loop`; published as `loopCount`.
pub loop_count: AtomicU64,
/// Host strings added by `record_host` and removed by `forget_host`; the
/// size of this set is published as `perHostConnections`.
per_host: Mutex<HashSet<String>>,
/// Time of the previous `refresh` call (construction time before the first
/// one); used as the start of the event-rate interval.
last_refresh: Mutex<Instant>,
/// Value of `total_events` observed by the previous `refresh` call.
last_total_events: AtomicU64,
}
impl Stats {
/// Builds a `Stats` with every counter at zero, an empty host set, and the
/// event-rate baseline set to the moment of construction.
///
/// `prefix` is prepended to each statistic PV name. An empty prefix turns
/// `publish_initial` and `refresh` into no-ops.
pub fn new(prefix: String) -> Self {
    // All counters start from the same state; build them through one helper.
    let zeroed = || AtomicU64::new(0);
    let no_hosts: HashSet<String> = HashSet::new();
    Self {
        prefix,
        total_events: zeroed(),
        put_count: zeroed(),
        read_only_rejects: zeroed(),
        heartbeat: zeroed(),
        post_event_count: zeroed(),
        loop_count: zeroed(),
        per_host: Mutex::new(no_hosts),
        last_refresh: Mutex::new(Instant::now()),
        last_total_events: zeroed(),
    }
}
/// Adds one to `total_events` (relaxed ordering).
pub fn record_event(&self) {
    let counter = &self.total_events;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `put_count` (relaxed ordering).
pub fn record_put(&self) {
    let counter = &self.put_count;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `post_event_count` (relaxed ordering).
pub fn record_post_event(&self) {
    let counter = &self.post_event_count;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `loop_count` (relaxed ordering).
pub fn record_loop(&self) {
    let counter = &self.loop_count;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to `read_only_rejects` (relaxed ordering).
pub fn record_readonly_reject(&self) {
    let counter = &self.read_only_rejects;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Adds `host` to the tracked host set.
///
/// The set stores each name once, so recording a host that is already
/// present does not change the count.
pub async fn record_host(&self, host: &str) {
    let mut hosts = self.per_host.lock().await;
    hosts.insert(host.to_owned());
}
/// Removes `host` from the tracked host set; unknown hosts are ignored.
pub async fn forget_host(&self, host: &str) {
    let mut hosts = self.per_host.lock().await;
    hosts.remove(host);
}
/// Returns how many distinct hosts are currently tracked.
pub async fn host_count(&self) -> usize {
    let hosts = self.per_host.lock().await;
    hosts.len()
}
/// Registers every statistic PV in `db` with a zero initial value.
///
/// Does nothing when the prefix is empty. The two rate PVs (`eventRate`,
/// `clientEventRate`) start as `Double(0.0)`; all others start as `Long(0)`.
/// A failed `add_pv` is logged as a warning and the remaining PVs are still
/// attempted.
pub async fn publish_initial(&self, db: &PvDatabase) {
    // Registration order of the statistic PVs.
    const SUFFIXES: &[&str] = &[
        "totalPvs",
        "upstreamCount",
        "connectingCount",
        "activeCount",
        "inactiveCount",
        "deadCount",
        "eventRate",
        "totalEvents",
        "heartbeat",
        "putCount",
        "readOnlyRejects",
        "perHostConnections",
        "vctotal",
        "pvtotal",
        "connected",
        "active",
        "inactive",
        "unconnected",
        "dead",
        "connecting",
        "disconnected",
        "clientEventRate",
        "fd",
        "clientEventCount",
        "postEventCount",
        "loopCount",
    ];

    if self.prefix.is_empty() {
        return;
    }

    for &suffix in SUFFIXES {
        // Only the rate PVs are floating point.
        let initial = if matches!(suffix, "eventRate" | "clientEventRate") {
            EpicsValue::Double(0.0)
        } else {
            EpicsValue::Long(0)
        };
        let name = format!("{}{}", self.prefix, suffix);
        if let Err(err) = db.add_pv(&name, initial).await {
            tracing::warn!(
                pv = %name,
                error = %err,
                "ca_gateway stats: pre-register skipped (name already in use)"
            );
        }
    }
}
pub async fn refresh(
&self,
cache: &RwLock<PvCache>,
db: &PvDatabase,
cache_size: usize,
upstream_count: usize,
) {
if self.prefix.is_empty() {
return;
}
let cache_guard = cache.read().await;
let (connecting, active, inactive, dead, _disconnect) = cache_guard.count_states().await;
drop(cache_guard);
let now = Instant::now();
let mut last = self.last_refresh.lock().await;
let elapsed = now.duration_since(*last).as_secs_f64();
*last = now;
drop(last);
let total_events = self.total_events.load(Ordering::Relaxed);
let last_events = self.last_total_events.swap(total_events, Ordering::Relaxed);
let delta = total_events.saturating_sub(last_events);
let event_rate = if elapsed > 0.0 {
delta as f64 / elapsed
} else {
0.0
};
let put_count = self.put_count.load(Ordering::Relaxed);
let readonly = self.read_only_rejects.load(Ordering::Relaxed);
let heartbeat = self.heartbeat.load(Ordering::Relaxed);
let host_count = self.host_count().await;
let post_event_count = self.post_event_count.load(Ordering::Relaxed);
let loop_count = self.loop_count.load(Ordering::Relaxed);
let client_event_count = total_events;
let p = &self.prefix;
let n_total = format!("{p}totalPvs");
let n_upstream = format!("{p}upstreamCount");
let n_connecting = format!("{p}connectingCount");
let n_active = format!("{p}activeCount");
let n_inactive = format!("{p}inactiveCount");
let n_dead = format!("{p}deadCount");
let n_rate = format!("{p}eventRate");
let n_events = format!("{p}totalEvents");
let n_heartbeat = format!("{p}heartbeat");
let n_put = format!("{p}putCount");
let n_readonly = format!("{p}readOnlyRejects");
let n_hosts = format!("{p}perHostConnections");
let n_vctotal = format!("{p}vctotal");
let n_pvtotal = format!("{p}pvtotal");
let n_connected = format!("{p}connected");
let n_active_alias = format!("{p}active");
let n_inactive_alias = format!("{p}inactive");
let n_unconnected = format!("{p}unconnected");
let n_dead_alias = format!("{p}dead");
let n_connecting_alias = format!("{p}connecting");
let n_disconnected = format!("{p}disconnected");
let n_client_event_rate = format!("{p}clientEventRate");
let n_fd = format!("{p}fd");
let n_client_event_count = format!("{p}clientEventCount");
let n_post_event_count = format!("{p}postEventCount");
let n_loop_count = format!("{p}loopCount");
let connected = (active + inactive) as i32;
let unconnected = (connecting + dead) as i32;
let fd_count = open_fd_count();
let _ = tokio::join!(
db.put_pv_and_post(&n_total, EpicsValue::Long(cache_size as i32)),
db.put_pv_and_post(&n_upstream, EpicsValue::Long(upstream_count as i32)),
db.put_pv_and_post(&n_connecting, EpicsValue::Long(connecting as i32)),
db.put_pv_and_post(&n_active, EpicsValue::Long(active as i32)),
db.put_pv_and_post(&n_inactive, EpicsValue::Long(inactive as i32)),
db.put_pv_and_post(&n_dead, EpicsValue::Long(dead as i32)),
db.put_pv_and_post(&n_rate, EpicsValue::Double(event_rate)),
db.put_pv_and_post(&n_events, EpicsValue::Long(total_events as i32)),
db.put_pv_and_post(&n_heartbeat, EpicsValue::Long(heartbeat as i32)),
db.put_pv_and_post(&n_put, EpicsValue::Long(put_count as i32)),
db.put_pv_and_post(&n_readonly, EpicsValue::Long(readonly as i32)),
db.put_pv_and_post(&n_hosts, EpicsValue::Long(host_count as i32)),
db.put_pv_and_post(&n_vctotal, EpicsValue::Long(cache_size as i32)),
db.put_pv_and_post(&n_pvtotal, EpicsValue::Long(cache_size as i32)),
db.put_pv_and_post(&n_connected, EpicsValue::Long(connected)),
db.put_pv_and_post(&n_active_alias, EpicsValue::Long(active as i32)),
db.put_pv_and_post(&n_inactive_alias, EpicsValue::Long(inactive as i32)),
db.put_pv_and_post(&n_unconnected, EpicsValue::Long(unconnected)),
db.put_pv_and_post(&n_dead_alias, EpicsValue::Long(dead as i32)),
db.put_pv_and_post(&n_connecting_alias, EpicsValue::Long(connecting as i32)),
db.put_pv_and_post(&n_disconnected, EpicsValue::Long(dead as i32)),
db.put_pv_and_post(&n_client_event_rate, EpicsValue::Double(event_rate)),
db.put_pv_and_post(
&n_client_event_count,
EpicsValue::Long(client_event_count as i32),
),
db.put_pv_and_post(
&n_post_event_count,
EpicsValue::Long(post_event_count as i32),
),
db.put_pv_and_post(&n_loop_count, EpicsValue::Long(loop_count as i32)),
);
if let Some(fd) = fd_count {
let _ = db.put_pv_and_post(&n_fd, EpicsValue::Long(fd as i32)).await;
}
}
/// Advances the heartbeat counter by one and, when a prefix is configured,
/// posts the new value to the `<prefix>heartbeat` PV.
///
/// The counter is advanced even when the prefix is empty. A failed put is
/// ignored.
pub async fn heartbeat_tick(&self, db: &PvDatabase) {
    // `fetch_add` yields the previous value; publish the incremented one.
    let ticks = self.heartbeat.fetch_add(1, Ordering::Relaxed) + 1;
    if self.prefix.is_empty() {
        return;
    }
    let pv_name = format!("{}heartbeat", self.prefix);
    let _ = db
        .put_pv_and_post(&pv_name, EpicsValue::Long(ticks as i32))
        .await;
}
/// Returns the PV name prefix this instance was created with.
pub fn prefix(&self) -> &str {
    self.prefix.as_str()
}
}
/// Counts the entries of the process's file-descriptor directory.
///
/// Tries `/proc/self/fd` first and `/dev/fd` second, using the first one that
/// can be listed. Entries that fail to read are skipped, and one is
/// subtracted from the total (never going below zero) to discount the
/// descriptor held open by the directory listing itself.
///
/// Returns `None` when neither directory can be opened.
pub fn open_fd_count() -> Option<u64> {
    const FD_DIRS: [&str; 2] = ["/proc/self/fd", "/dev/fd"];
    FD_DIRS.iter().find_map(|&dir| {
        let listing = std::fs::read_dir(dir).ok()?;
        let readable = listing.filter_map(Result::ok).count() as u64;
        Some(readable.saturating_sub(1))
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The event, put and read-only-reject counters start at zero and count calls.
    #[test]
    fn counters_increment() {
        let stats = Stats::new("g:".into());
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 0);
        stats.record_event();
        stats.record_event();
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 2);
        stats.record_put();
        assert_eq!(stats.put_count.load(Ordering::Relaxed), 1);
        stats.record_readonly_reject();
        assert_eq!(stats.read_only_rejects.load(Ordering::Relaxed), 1);
    }

    /// Hosts are tracked as a set: duplicates collapse and removal shrinks it.
    #[tokio::test]
    async fn host_tracking() {
        let stats = Stats::new("g:".into());
        assert_eq!(stats.host_count().await, 0);
        stats.record_host("host1").await;
        stats.record_host("host2").await;
        // Recording the same host again must not add a second entry.
        stats.record_host("host1").await;
        assert_eq!(stats.host_count().await, 2);
        stats.forget_host("host1").await;
        assert_eq!(stats.host_count().await, 1);
    }

    /// `publish_initial` registers the statistic PVs under the prefix.
    #[tokio::test]
    async fn publish_initial_creates_pvs() {
        let stats = Stats::new("g:".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;
        assert!(db.has_name("g:totalPvs").await);
        assert!(db.has_name("g:heartbeat").await);
        assert!(db.has_name("g:eventRate").await);
    }

    /// With an empty prefix nothing is registered.
    #[tokio::test]
    async fn empty_prefix_skips_publish() {
        let stats = Stats::new("".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;
        assert!(!db.has_name("totalPvs").await);
    }

    /// The post-event and loop counters start at zero and count calls.
    #[test]
    fn rate_stats_counters_increment() {
        let stats = Stats::new("g:".into());
        assert_eq!(stats.post_event_count.load(Ordering::Relaxed), 0);
        assert_eq!(stats.loop_count.load(Ordering::Relaxed), 0);
        stats.record_post_event();
        stats.record_post_event();
        stats.record_post_event();
        assert_eq!(stats.post_event_count.load(Ordering::Relaxed), 3);
        stats.record_loop();
        assert_eq!(stats.loop_count.load(Ordering::Relaxed), 1);
    }

    /// When a count is available it must be at least 3.
    #[test]
    fn open_fd_count_is_plausible() {
        if let Some(n) = open_fd_count() {
            assert!(n >= 3, "expected at least 3 open fds, got {n}");
        }
    }

    /// Opening a batch of files must raise the reported descriptor count.
    #[test]
    fn open_fd_count_tracks_new_descriptors() {
        const BATCH: usize = 32;

        /// Owns the probe files. Dropping it closes the handles and deletes
        /// the files, so nothing is left in the temp directory even when an
        /// assertion or `expect` below panics.
        struct ProbeFiles {
            handles: Vec<std::fs::File>,
            paths: Vec<std::path::PathBuf>,
        }

        impl Drop for ProbeFiles {
            fn drop(&mut self) {
                // Close the descriptors before unlinking the files.
                self.handles.clear();
                for path in &self.paths {
                    let _ = std::fs::remove_file(path);
                }
            }
        }

        let before = match open_fd_count() {
            Some(n) => n,
            // No listable fd directory here: nothing to check.
            None => return,
        };

        let dir = std::env::temp_dir();
        let mut probes = ProbeFiles {
            handles: Vec::with_capacity(BATCH),
            paths: Vec::with_capacity(BATCH),
        };
        for i in 0..BATCH {
            let p = dir.join(format!("ca_gw_stats_fd_probe_{}_{i}", std::process::id()));
            let created = std::fs::File::create(&p);
            // Register the path before unwrapping so it is cleaned up even if
            // the create failed part-way.
            probes.paths.push(p);
            probes.handles.push(created.expect("create temp file"));
        }

        let during = open_fd_count().expect("fd count available");
        // Only half the batch is required, leaving slack for descriptors that
        // other concurrently running tests open or close.
        assert!(
            during >= before + (BATCH as u64) / 2,
            "open fd count did not rise enough: before={before} during={during}"
        );
    }

    /// `publish_initial` also registers the fd and event-count PVs.
    #[tokio::test]
    async fn publish_initial_creates_rate_stats_pvs() {
        let stats = Stats::new("g:".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;
        assert!(db.has_name("g:fd").await);
        assert!(db.has_name("g:clientEventCount").await);
        assert!(db.has_name("g:postEventCount").await);
        assert!(db.has_name("g:loopCount").await);
    }

    /// `refresh` writes the current counter values into their PVs.
    #[tokio::test]
    async fn refresh_publishes_rate_stats() {
        let stats = Stats::new("g:".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;
        stats.record_event();
        stats.record_event();
        stats.record_post_event();
        stats.record_loop();
        stats.record_loop();
        stats.record_loop();
        let cache = RwLock::new(PvCache::new());
        stats.refresh(&cache, &db, 0, 0).await;
        assert_eq!(
            db.get_pv("g:clientEventCount").await.unwrap(),
            EpicsValue::Long(2)
        );
        assert_eq!(
            db.get_pv("g:postEventCount").await.unwrap(),
            EpicsValue::Long(1)
        );
        assert_eq!(db.get_pv("g:loopCount").await.unwrap(), EpicsValue::Long(3));
        // The fd PV is only checked when it holds a `Long`.
        if let Ok(EpicsValue::Long(fd)) = db.get_pv("g:fd").await {
            assert!(fd >= 0);
        }
    }
}