use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::runtime::sync::{Mutex, RwLock, mpsc};
use crate::error::CaError;
use crate::server::snapshot::Snapshot;
use crate::types::{DbFieldType, EpicsValue};
fn per_channel_event_depth() -> usize {
crate::runtime::env::get("EPICS_CAS_MAX_EVENTS_PER_CHAN")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(64)
.max(4)
}
pub(crate) fn max_subscribers_per_pv() -> usize {
crate::runtime::env::get("EPICS_CAS_MAX_SUBSCRIBERS_PER_PV")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(1024)
.max(8)
}
static DROPPED_MONITOR_EVENTS: AtomicU64 = AtomicU64::new(0);
pub fn dropped_monitor_events() -> u64 {
DROPPED_MONITOR_EVENTS.load(Ordering::Relaxed)
}
fn record_dropped_monitor() {
DROPPED_MONITOR_EVENTS.fetch_add(1, Ordering::Relaxed);
}
#[derive(Debug, Clone, Default)]
pub struct WriteContext {
pub user: String,
pub host: String,
pub peer: String,
}
pub type WriteHook = Arc<
dyn Fn(
EpicsValue,
WriteContext,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), CaError>> + Send>>
+ Send
+ Sync,
>;
#[derive(Debug, Clone, Copy)]
pub struct AccessDecision {
pub read: bool,
pub write: bool,
}
pub type AccessHook = Arc<dyn Fn(&str, &str) -> AccessDecision + Send + Sync>;
#[derive(Debug, Clone)]
pub struct MonitorEvent {
pub snapshot: Snapshot,
pub origin: u64,
}
pub struct Subscriber {
pub sid: u32,
pub data_type: DbFieldType,
pub mask: u16,
pub tx: mpsc::Sender<MonitorEvent>,
pub coalesced: Arc<StdMutex<Option<MonitorEvent>>>,
pub filters: crate::server::database::filters::FilterChain,
}
impl Subscriber {
fn accepts(&self, post: crate::server::recgbl::EventMask) -> bool {
post.is_empty() || crate::server::recgbl::EventMask::from_bits(self.mask).intersects(post)
}
pub(crate) fn coalesce_overflow(&self, event: MonitorEvent) {
if let Ok(mut slot) = self.coalesced.lock() {
if slot.is_some() {
record_dropped_monitor();
}
*slot = Some(event);
}
}
}
pub struct ProcessVariable {
pub name: String,
pub value: RwLock<EpicsValue>,
pub subscribers: Mutex<Vec<Subscriber>>,
write_hook: parking_lot::RwLock<Option<WriteHook>>,
access_hook: parking_lot::RwLock<Option<AccessHook>>,
}
impl ProcessVariable {
pub fn new(name: String, initial: EpicsValue) -> Self {
Self {
name,
value: RwLock::new(initial),
subscribers: Mutex::new(Vec::new()),
write_hook: parking_lot::RwLock::new(None),
access_hook: parking_lot::RwLock::new(None),
}
}
pub fn set_access_hook(&self, hook: AccessHook) {
*self.access_hook.write() = Some(hook);
}
pub fn access_hook(&self) -> Option<AccessHook> {
self.access_hook.read().clone()
}
pub fn set_write_hook(&self, hook: WriteHook) {
*self.write_hook.write() = Some(hook);
}
pub fn clear_write_hook(&self) {
*self.write_hook.write() = None;
}
pub fn write_hook(&self) -> Option<WriteHook> {
self.write_hook.read().clone()
}
pub async fn get(&self) -> EpicsValue {
self.value.read().await.clone()
}
pub async fn snapshot(&self) -> Snapshot {
let value = self.value.read().await.clone();
Snapshot::new(value, 0, 0, crate::runtime::time::now_wall())
}
pub async fn set(&self, new_value: EpicsValue) {
{
let mut val = self.value.write().await;
*val = new_value.clone();
}
self.notify_subscribers(new_value).await;
}
pub async fn set_snapshot(&self, snapshot: Snapshot) {
{
let mut val = self.value.write().await;
*val = snapshot.value.clone();
}
self.notify_subscribers_from_snapshot(snapshot).await;
}
pub async fn post_alarm(&self, severity: u16, status: u16) {
use crate::server::database::filters::FilteredMonitorEvent;
use crate::server::recgbl::EventMask;
let value = self.value.read().await.clone();
let mut subs = self.subscribers.lock().await;
subs.retain(|sub| !sub.tx.is_closed());
let post = EventMask::ALARM | EventMask::LOG;
for sub in subs.iter() {
if !sub.accepts(post) {
continue;
}
let snapshot = Snapshot::new(
value.clone(),
status,
severity,
crate::runtime::time::now_wall(),
);
let event = MonitorEvent {
snapshot,
origin: 0,
};
let filtered = if sub.filters.is_empty() {
Some(event)
} else {
sub.filters
.apply(FilteredMonitorEvent::new(event, post))
.map(|fe| fe.event)
};
let Some(event) = filtered else {
continue;
};
if sub.tx.try_send(event.clone()).is_err() {
sub.coalesce_overflow(event);
}
}
}
async fn notify_subscribers(&self, value: EpicsValue) {
use crate::server::database::filters::FilteredMonitorEvent;
use crate::server::recgbl::EventMask;
let mut subs = self.subscribers.lock().await;
subs.retain(|sub| !sub.tx.is_closed());
let post = EventMask::VALUE | EventMask::LOG;
for sub in subs.iter() {
if !sub.accepts(post) {
continue;
}
let snapshot = Snapshot::new(value.clone(), 0, 0, crate::runtime::time::now_wall());
let event = MonitorEvent {
snapshot,
origin: 0,
};
let filtered = if sub.filters.is_empty() {
Some(event)
} else {
sub.filters
.apply(FilteredMonitorEvent::new(event, post))
.map(|fe| fe.event)
};
let Some(event) = filtered else {
continue;
};
if sub.tx.try_send(event.clone()).is_err() {
sub.coalesce_overflow(event);
}
}
}
async fn notify_subscribers_from_snapshot(&self, snapshot: Snapshot) {
use crate::server::database::filters::FilteredMonitorEvent;
use crate::server::recgbl::EventMask;
let mut subs = self.subscribers.lock().await;
subs.retain(|sub| !sub.tx.is_closed());
let post = EventMask::VALUE | EventMask::LOG | EventMask::ALARM;
for sub in subs.iter() {
if !sub.accepts(post) {
continue;
}
let event = MonitorEvent {
snapshot: snapshot.clone(),
origin: 0,
};
let filtered = if sub.filters.is_empty() {
Some(event)
} else {
sub.filters
.apply(FilteredMonitorEvent::new(event, post))
.map(|fe| fe.event)
};
let Some(event) = filtered else {
continue;
};
if sub.tx.try_send(event.clone()).is_err() {
sub.coalesce_overflow(event);
}
}
}
pub async fn add_subscriber(
&self,
sid: u32,
data_type: DbFieldType,
mask: u16,
) -> Option<mpsc::Receiver<MonitorEvent>> {
let cap = max_subscribers_per_pv();
let (tx, rx) = mpsc::channel(per_channel_event_depth());
let sub = Subscriber {
sid,
data_type,
mask,
tx,
coalesced: Arc::new(StdMutex::new(None)),
filters: crate::server::database::filters::FilterChain::new(),
};
let mut subs = self.subscribers.lock().await;
subs.retain(|s| !s.tx.is_closed());
if subs.len() >= cap {
tracing::warn!(
pv = %self.name,
live = subs.len(),
cap,
"PV subscriber cap reached, refusing add_subscriber"
);
return None;
}
subs.push(sub);
Some(rx)
}
pub async fn attach_filters_to_subscriber(
&self,
sid: u32,
filters: crate::server::database::filters::FilterChain,
) {
if filters.is_empty() {
return;
}
let mut subs = self.subscribers.lock().await;
if let Some(sub) = subs.iter_mut().find(|s| s.sid == sid) {
sub.filters = filters;
}
}
pub async fn remove_subscriber(&self, sid: u32) {
let mut subs = self.subscribers.lock().await;
subs.retain(|s| s.sid != sid);
}
pub async fn pop_coalesced(&self, sid: u32) -> Option<MonitorEvent> {
let subs = self.subscribers.lock().await;
let sub = subs.iter().find(|s| s.sid == sid)?;
sub.coalesced.lock().ok()?.take()
}
}
#[cfg(test)]
mod mask_gate_tests {
use super::*;
const DBE_VALUE: u16 = 1;
const DBE_LOG: u16 = 2;
const DBE_ALARM: u16 = 4;
fn pv() -> ProcessVariable {
ProcessVariable::new("test:pv".into(), EpicsValue::Double(0.0))
}
#[tokio::test]
async fn alarm_only_subscriber_skips_value_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_ALARM)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert!(
rx.try_recv().is_err(),
"DBE_ALARM-only subscriber must not receive a value post"
);
pv.post_alarm(2, 3).await;
assert!(
rx.try_recv().is_ok(),
"DBE_ALARM subscriber must receive an alarm post"
);
}
#[tokio::test]
async fn value_only_subscriber_skips_alarm_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
pv.post_alarm(2, 3).await;
assert!(
rx.try_recv().is_err(),
"DBE_VALUE-only subscriber must not receive an alarm post"
);
pv.set(EpicsValue::Double(1.0)).await;
assert!(
rx.try_recv().is_ok(),
"DBE_VALUE subscriber must receive a value post"
);
}
fn snapshot() -> Snapshot {
Snapshot::new(
EpicsValue::Double(2.0),
0,
0,
std::time::SystemTime::UNIX_EPOCH,
)
}
#[tokio::test]
async fn log_subscriber_receives_snapshot_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_LOG)
.await
.expect("subscriber added");
pv.set_snapshot(snapshot()).await;
assert!(
rx.try_recv().is_ok(),
"DBE_LOG subscriber must receive a set_snapshot post"
);
}
#[tokio::test]
async fn alarm_only_subscriber_receives_snapshot_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_ALARM)
.await
.expect("subscriber added");
pv.set_snapshot(snapshot()).await;
assert!(
rx.try_recv().is_ok(),
"DBE_ALARM-only subscriber must receive a set_snapshot post"
);
}
#[tokio::test]
async fn value_subscriber_receives_snapshot_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
pv.set_snapshot(snapshot()).await;
assert!(
rx.try_recv().is_ok(),
"DBE_VALUE subscriber must receive a set_snapshot post"
);
}
#[tokio::test]
async fn both_classes_receive_both_posts() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE | DBE_ALARM)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert!(rx.try_recv().is_ok(), "value post delivered to VALUE|ALARM");
pv.post_alarm(2, 3).await;
assert!(rx.try_recv().is_ok(), "alarm post delivered to VALUE|ALARM");
}
#[tokio::test]
async fn br_r52_log_subscriber_receives_value_and_alarm_events() {
const DBE_LOG: u16 = 2;
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_LOG)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert!(
rx.try_recv().is_ok(),
"DBE_LOG subscriber must receive a value post"
);
pv.post_alarm(2, 3).await;
assert!(
rx.try_recv().is_ok(),
"DBE_LOG subscriber must receive an alarm post"
);
}
}