use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use crate::runtime::sync::{Mutex, RwLock, mpsc};
use crate::error::CaError;
use crate::server::snapshot::{ControlInfo, DisplayInfo, EnumInfo, Snapshot};
use crate::types::{DbFieldType, EpicsValue, WallTime};
fn per_channel_event_depth() -> usize {
crate::runtime::env::get("EPICS_CAS_MAX_EVENTS_PER_CHAN")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(64)
.max(4)
}
pub(crate) fn max_subscribers_per_pv() -> usize {
crate::runtime::env::get("EPICS_CAS_MAX_SUBSCRIBERS_PER_PV")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(1024)
.max(8)
}
static DROPPED_MONITOR_EVENTS: AtomicU64 = AtomicU64::new(0);
pub fn dropped_monitor_events() -> u64 {
DROPPED_MONITOR_EVENTS.load(Ordering::Relaxed)
}
fn record_dropped_monitor() {
DROPPED_MONITOR_EVENTS.fetch_add(1, Ordering::Relaxed);
}
#[derive(Debug, Clone, Default)]
pub struct WriteContext {
pub user: String,
pub host: String,
pub peer: String,
}
pub type WriteHook = Arc<
dyn Fn(
EpicsValue,
WriteContext,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), CaError>> + Send>>
+ Send
+ Sync,
>;
#[derive(Debug, Clone, Copy)]
pub struct AccessDecision {
pub read: bool,
pub write: bool,
}
pub type AccessHook = Arc<dyn Fn(&str, &str) -> AccessDecision + Send + Sync>;
pub type ReadHook = Arc<
dyn Fn()
-> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Snapshot, CaError>> + Send>>
+ Send
+ Sync,
>;
#[derive(Debug, Clone)]
pub struct MonitorEvent {
pub snapshot: Snapshot,
pub origin: u64,
pub mask: crate::server::recgbl::EventMask,
}
pub struct Subscriber {
pub sid: u32,
pub data_type: DbFieldType,
pub mask: u16,
pub tx: mpsc::Sender<MonitorEvent>,
pub coalesced: Arc<StdMutex<Option<MonitorEvent>>>,
pub filters: crate::server::database::filters::FilterChain,
pub active: bool,
}
impl Subscriber {
fn accepts(&self, post: crate::server::recgbl::EventMask) -> bool {
post.is_empty() || crate::server::recgbl::EventMask::from_bits(self.mask).intersects(post)
}
pub(crate) fn coalesce_overflow(&self, mut event: MonitorEvent) {
if let Ok(mut slot) = self.coalesced.lock() {
if let Some(prior) = slot.as_ref() {
record_dropped_monitor();
event.mask |= prior.mask;
}
*slot = Some(event);
}
}
}
pub fn coalesce_consume(
rx: &mut mpsc::Receiver<MonitorEvent>,
queued: MonitorEvent,
coalesced: Option<MonitorEvent>,
) -> MonitorEvent {
let Some(mut newest) = coalesced else {
return queued;
};
record_dropped_monitor();
newest.mask |= queued.mask;
while let Ok(stale) = rx.try_recv() {
record_dropped_monitor();
newest.mask |= stale.mask;
}
newest
}
#[derive(Debug, Clone, Default)]
pub struct PvMetadata {
pub display: Option<DisplayInfo>,
pub control: Option<ControlInfo>,
pub enums: Option<EnumInfo>,
}
#[derive(Clone)]
struct PostedMeta {
alarm: crate::server::snapshot::AlarmInfo,
timestamp: WallTime,
user_tag: i32,
}
pub struct ProcessVariable {
pub name: String,
pub value: RwLock<EpicsValue>,
pub subscribers: Mutex<Vec<Subscriber>>,
posted_meta: parking_lot::RwLock<Option<PostedMeta>>,
metadata: parking_lot::RwLock<PvMetadata>,
write_hook: parking_lot::RwLock<Option<WriteHook>>,
access_hook: parking_lot::RwLock<Option<AccessHook>>,
read_hook: parking_lot::RwLock<Option<ReadHook>>,
}
impl ProcessVariable {
pub fn new(name: String, initial: EpicsValue) -> Self {
Self {
name,
value: RwLock::new(initial),
subscribers: Mutex::new(Vec::new()),
metadata: parking_lot::RwLock::new(PvMetadata::default()),
posted_meta: parking_lot::RwLock::new(None),
write_hook: parking_lot::RwLock::new(None),
access_hook: parking_lot::RwLock::new(None),
read_hook: parking_lot::RwLock::new(None),
}
}
pub fn set_metadata(&self, metadata: PvMetadata) {
*self.metadata.write() = metadata;
}
pub fn metadata(&self) -> PvMetadata {
self.metadata.read().clone()
}
fn apply_metadata(&self, snap: &mut Snapshot) {
let meta = self.metadata.read();
if snap.display.is_none() {
snap.display = meta.display.clone();
}
if snap.control.is_none() {
snap.control = meta.control.clone();
}
if snap.enums.is_none() {
snap.enums = meta.enums.clone();
}
}
pub fn set_access_hook(&self, hook: AccessHook) {
*self.access_hook.write() = Some(hook);
}
pub fn access_hook(&self) -> Option<AccessHook> {
self.access_hook.read().clone()
}
pub fn set_read_hook(&self, hook: ReadHook) {
*self.read_hook.write() = Some(hook);
}
pub fn read_hook(&self) -> Option<ReadHook> {
self.read_hook.read().clone()
}
pub fn set_write_hook(&self, hook: WriteHook) {
*self.write_hook.write() = Some(hook);
}
pub fn clear_write_hook(&self) {
*self.write_hook.write() = None;
}
pub fn write_hook(&self) -> Option<WriteHook> {
self.write_hook.read().clone()
}
pub async fn get(&self) -> EpicsValue {
self.value.read().await.clone()
}
pub async fn snapshot(&self) -> Snapshot {
let value = self.value.read().await.clone();
let mut snap = match self.posted_meta.read().clone() {
Some(m) => {
let mut s = Snapshot::new(value, m.alarm.status, m.alarm.severity, m.timestamp);
s.alarm.ackt = m.alarm.ackt;
s.alarm.acks = m.alarm.acks;
s.user_tag = m.user_tag;
s
}
None => Snapshot::new(value, 0, 0, crate::runtime::time::now_wall()),
};
self.apply_metadata(&mut snap);
snap
}
pub async fn read_snapshot(&self) -> Result<Snapshot, CaError> {
match self.read_hook() {
Some(hook) => {
let mut snap = hook().await?;
self.apply_metadata(&mut snap);
Ok(snap)
}
None => Ok(self.snapshot().await),
}
}
pub async fn set(&self, new_value: EpicsValue) {
{
let mut val = self.value.write().await;
*val = new_value.clone();
}
*self.posted_meta.write() = None;
self.notify_subscribers(new_value).await;
}
pub async fn set_snapshot(&self, snapshot: Snapshot) {
{
let mut val = self.value.write().await;
*val = snapshot.value.clone();
}
*self.posted_meta.write() = Some(PostedMeta {
alarm: snapshot.alarm.clone(),
timestamp: snapshot.timestamp,
user_tag: snapshot.user_tag,
});
self.notify_subscribers_from_snapshot(snapshot).await;
}
async fn deliver(&self, post: crate::server::recgbl::EventMask, snapshot: Snapshot) {
use crate::server::database::filters::FilteredMonitorEvent;
let mut subs = self.subscribers.lock().await;
subs.retain(|sub| !sub.tx.is_closed());
for sub in subs.iter() {
if !sub.active {
continue;
}
if !sub.accepts(post) {
continue;
}
let event = MonitorEvent {
snapshot: snapshot.clone(),
origin: 0,
mask: post,
};
let filtered = if sub.filters.is_empty() {
Some(event)
} else {
sub.filters
.apply(FilteredMonitorEvent::new(event))
.map(|fe| fe.event)
};
let Some(event) = filtered else {
continue;
};
if sub.tx.try_send(event.clone()).is_err() {
sub.coalesce_overflow(event);
}
}
}
pub async fn post_alarm(&self, severity: u16, status: u16) {
use crate::server::recgbl::EventMask;
let value = self.value.read().await.clone();
let mut snapshot = Snapshot::new(value, status, severity, crate::runtime::time::now_wall());
self.apply_metadata(&mut snapshot);
self.deliver(EventMask::ALARM | EventMask::LOG, snapshot)
.await;
}
pub async fn post_property(&self, mut snapshot: Snapshot) {
use crate::server::recgbl::EventMask;
self.apply_metadata(&mut snapshot);
self.deliver(EventMask::PROPERTY, snapshot).await;
}
async fn notify_subscribers(&self, value: EpicsValue) {
use crate::server::recgbl::EventMask;
let mut snapshot = Snapshot::new(value, 0, 0, crate::runtime::time::now_wall());
self.apply_metadata(&mut snapshot);
self.deliver(EventMask::VALUE | EventMask::LOG, snapshot)
.await;
}
async fn notify_subscribers_from_snapshot(&self, mut snapshot: Snapshot) {
use crate::server::recgbl::EventMask;
self.apply_metadata(&mut snapshot);
self.deliver(
EventMask::VALUE | EventMask::LOG | EventMask::ALARM,
snapshot,
)
.await;
}
pub async fn add_subscriber(
&self,
sid: u32,
data_type: DbFieldType,
mask: u16,
) -> Option<mpsc::Receiver<MonitorEvent>> {
let cap = max_subscribers_per_pv();
let (tx, rx) = mpsc::channel(per_channel_event_depth());
let sub = Subscriber {
sid,
data_type,
mask,
tx,
coalesced: Arc::new(StdMutex::new(None)),
filters: crate::server::database::filters::FilterChain::new(),
active: true,
};
let mut subs = self.subscribers.lock().await;
subs.retain(|s| !s.tx.is_closed());
if subs.len() >= cap {
tracing::warn!(
pv = %self.name,
live = subs.len(),
cap,
"PV subscriber cap reached, refusing add_subscriber"
);
return None;
}
subs.push(sub);
Some(rx)
}
pub async fn attach_filters_to_subscriber(
&self,
sid: u32,
filters: crate::server::database::filters::FilterChain,
) {
if filters.is_empty() {
return;
}
let mut subs = self.subscribers.lock().await;
if let Some(sub) = subs.iter_mut().find(|s| s.sid == sid) {
sub.filters = filters;
}
}
pub async fn remove_subscriber(&self, sid: u32) {
let mut subs = self.subscribers.lock().await;
subs.retain(|s| s.sid != sid);
}
pub async fn pop_coalesced(&self, sid: u32) -> Option<MonitorEvent> {
let subs = self.subscribers.lock().await;
let sub = subs.iter().find(|s| s.sid == sid)?;
sub.coalesced.lock().ok()?.take()
}
}
static NEXT_PV_SUB_SID: AtomicU32 = AtomicU32::new(1_000_000);
fn next_pv_sub_sid() -> u32 {
NEXT_PV_SUB_SID.fetch_add(1, Ordering::Relaxed)
}
pub struct PvSubscription {
rx: mpsc::Receiver<MonitorEvent>,
pv: Arc<ProcessVariable>,
sid: u32,
}
impl PvSubscription {
pub async fn subscribe(pv: Arc<ProcessVariable>) -> Option<Self> {
use crate::server::recgbl::EventMask;
let mask = (EventMask::VALUE | EventMask::LOG).bits();
let sid = next_pv_sub_sid();
let rx = pv.add_subscriber(sid, DbFieldType::Double, mask).await?;
Some(Self { rx, pv, sid })
}
pub async fn recv_snapshot(&mut self) -> Option<Snapshot> {
let queued = self.rx.recv().await?;
let coalesced = self.pv.pop_coalesced(self.sid).await;
let event = coalesce_consume(&mut self.rx, queued, coalesced);
Some(event.snapshot)
}
}
impl Drop for PvSubscription {
fn drop(&mut self) {
let pv = self.pv.clone();
let sid = self.sid;
if tokio::runtime::Handle::try_current().is_ok() {
tokio::spawn(async move {
pv.remove_subscriber(sid).await;
});
}
}
}
#[cfg(test)]
mod mask_gate_tests {
use super::*;
const DBE_VALUE: u16 = 1;
const DBE_LOG: u16 = 2;
const DBE_ALARM: u16 = 4;
fn pv() -> ProcessVariable {
ProcessVariable::new("test:pv".into(), EpicsValue::Double(0.0))
}
#[tokio::test]
async fn set_snapshot_metadata_persists_then_value_set_clears() {
let pv = pv();
let posted_time = WallTime::from_unix(1_600_000_000, 42);
let mut snap = Snapshot::new(EpicsValue::Double(7.0), 3, 2, posted_time);
snap.user_tag = 9;
pv.set_snapshot(snap).await;
let got = pv.snapshot().await;
assert_eq!(got.value, EpicsValue::Double(7.0), "value persisted");
assert_eq!(got.alarm.status, 3, "alarm.status persisted to GET");
assert_eq!(got.alarm.severity, 2, "alarm.severity persisted to GET");
assert_eq!(got.user_tag, 9, "userTag persisted to GET");
assert_eq!(got.timestamp, posted_time, "timestamp persisted to GET");
pv.set(EpicsValue::Double(8.0)).await;
let after = pv.snapshot().await;
assert_eq!(after.value, EpicsValue::Double(8.0));
assert_eq!(after.alarm.status, 0, "value set clears posted alarm");
assert_eq!(after.alarm.severity, 0, "value set clears posted severity");
assert_eq!(after.user_tag, 0, "value set clears posted userTag");
assert_ne!(
after.timestamp, posted_time,
"value set must restamp the timestamp, not keep the posted one"
);
}
#[tokio::test]
async fn alarm_only_subscriber_skips_value_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_ALARM)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert!(
rx.try_recv().is_err(),
"DBE_ALARM-only subscriber must not receive a value post"
);
pv.post_alarm(2, 3).await;
assert!(
rx.try_recv().is_ok(),
"DBE_ALARM subscriber must receive an alarm post"
);
}
#[tokio::test]
async fn value_only_subscriber_skips_alarm_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
pv.post_alarm(2, 3).await;
assert!(
rx.try_recv().is_err(),
"DBE_VALUE-only subscriber must not receive an alarm post"
);
pv.set(EpicsValue::Double(1.0)).await;
assert!(
rx.try_recv().is_ok(),
"DBE_VALUE subscriber must receive a value post"
);
}
fn snapshot() -> Snapshot {
Snapshot::new(
EpicsValue::Double(2.0),
0,
0,
std::time::SystemTime::UNIX_EPOCH,
)
}
#[tokio::test]
async fn log_subscriber_receives_snapshot_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_LOG)
.await
.expect("subscriber added");
pv.set_snapshot(snapshot()).await;
assert!(
rx.try_recv().is_ok(),
"DBE_LOG subscriber must receive a set_snapshot post"
);
}
#[tokio::test]
async fn alarm_only_subscriber_receives_snapshot_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_ALARM)
.await
.expect("subscriber added");
pv.set_snapshot(snapshot()).await;
assert!(
rx.try_recv().is_ok(),
"DBE_ALARM-only subscriber must receive a set_snapshot post"
);
}
#[tokio::test]
async fn value_subscriber_receives_snapshot_post() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
pv.set_snapshot(snapshot()).await;
assert!(
rx.try_recv().is_ok(),
"DBE_VALUE subscriber must receive a set_snapshot post"
);
}
#[tokio::test]
async fn both_classes_receive_both_posts() {
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE | DBE_ALARM)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert!(rx.try_recv().is_ok(), "value post delivered to VALUE|ALARM");
pv.post_alarm(2, 3).await;
assert!(rx.try_recv().is_ok(), "alarm post delivered to VALUE|ALARM");
}
#[tokio::test]
async fn br_r52_log_subscriber_receives_value_and_alarm_events() {
const DBE_LOG: u16 = 2;
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_LOG)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert!(
rx.try_recv().is_ok(),
"DBE_LOG subscriber must receive a value post"
);
pv.post_alarm(2, 3).await;
assert!(
rx.try_recv().is_ok(),
"DBE_LOG subscriber must receive an alarm post"
);
}
#[tokio::test]
async fn monitor_event_carries_post_class_mask() {
use crate::server::recgbl::EventMask;
let pv = pv();
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE | DBE_LOG | DBE_ALARM)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(1.0)).await;
assert_eq!(
rx.try_recv().expect("value event").mask,
EventMask::VALUE | EventMask::LOG,
"value post carries VALUE|LOG"
);
pv.post_alarm(2, 3).await;
assert_eq!(
rx.try_recv().expect("alarm event").mask,
EventMask::ALARM | EventMask::LOG,
"alarm post carries ALARM|LOG"
);
}
#[tokio::test]
async fn coalescing_accumulates_event_class_masks() {
use crate::server::recgbl::EventMask;
let pv = Arc::new(ProcessVariable::new(
"coalesce:mask".into(),
EpicsValue::Double(0.0),
));
let mut rx = pv
.add_subscriber(7, DbFieldType::Double, DBE_VALUE | DBE_LOG | DBE_ALARM)
.await
.expect("subscriber added");
for i in 1..=64u32 {
pv.set(EpicsValue::Double(i as f64)).await;
}
pv.post_alarm(2, 3).await;
pv.set(EpicsValue::Double(99.0)).await;
let queued = rx.recv().await.expect("queued event");
let coalesced = pv.pop_coalesced(7).await;
let delivered = coalesce_consume(&mut rx, queued, coalesced);
assert_eq!(
delivered.snapshot.value.to_f64(),
Some(99.0),
"delivery converges on the newest value"
);
assert!(
delivered
.mask
.contains(EventMask::VALUE | EventMask::ALARM | EventMask::LOG),
"squashed alarm class survives in the delivered mask (got {:?})",
delivered.mask
);
}
#[tokio::test]
async fn r0604_pv_overflow_never_delivers_newest_then_old() {
use std::time::Duration;
let pv = Arc::new(ProcessVariable::new(
"coalesce:pv".into(),
EpicsValue::Double(0.0),
));
let mut sub = PvSubscription::subscribe(pv.clone())
.await
.expect("subscribe");
for i in 1..=80u32 {
pv.set(EpicsValue::Double(i as f64)).await;
}
let mut seq = Vec::new();
while let Ok(Some(snap)) =
tokio::time::timeout(Duration::from_millis(200), sub.recv_snapshot()).await
{
seq.push(snap.value.to_f64().expect("double value"));
}
assert!(!seq.is_empty(), "consumer must observe at least one value");
for w in seq.windows(2) {
assert!(
w[0] <= w[1],
"monitor delivery stepped backward {} -> {} (sequence {seq:?})",
w[0],
w[1],
);
}
assert_eq!(
*seq.last().unwrap(),
80.0,
"consumer must converge on the newest produced value (sequence {seq:?})"
);
}
}
#[cfg(test)]
mod metadata_tests {
use super::*;
fn meta() -> PvMetadata {
PvMetadata {
display: Some(DisplayInfo {
units: "degC".into(),
precision: 2,
upper_disp_limit: 100.0,
lower_disp_limit: -50.0,
upper_alarm_limit: 90.0,
upper_warning_limit: 80.0,
lower_warning_limit: -20.0,
lower_alarm_limit: -40.0,
..Default::default()
}),
control: Some(ControlInfo {
upper_ctrl_limit: 95.0,
lower_ctrl_limit: -45.0,
}),
enums: None,
}
}
fn pv() -> ProcessVariable {
ProcessVariable::new("m:pv".into(), EpicsValue::Double(1.0))
}
#[tokio::test]
async fn set_metadata_serves_on_get_snapshot() {
let pv = pv();
assert!(
pv.snapshot().await.display.is_none(),
"bare PV must carry no metadata before install"
);
pv.set_metadata(meta());
let snap = pv.snapshot().await;
let d = snap.display.expect("display installed");
assert_eq!(d.units, "degC");
assert_eq!(d.precision, 2);
assert_eq!(
snap.control.expect("control installed").upper_ctrl_limit,
95.0
);
}
#[tokio::test]
async fn installed_metadata_rides_value_posts() {
const DBE_VALUE: u16 = 1;
let pv = pv();
pv.set_metadata(meta());
let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
pv.set(EpicsValue::Double(2.0)).await;
let ev = rx.try_recv().expect("value event delivered");
assert_eq!(
ev.snapshot.display.expect("metadata on value post").units,
"degC"
);
}
#[tokio::test]
async fn apply_metadata_does_not_clobber_caller_metadata() {
const DBE_VALUE: u16 = 1;
let pv = pv();
pv.set_metadata(meta()); let mut rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
let mut snap = Snapshot::new(
EpicsValue::Double(3.0),
0,
0,
std::time::SystemTime::UNIX_EPOCH,
);
snap.display = Some(DisplayInfo {
units: "volts".into(),
..Default::default()
});
pv.set_snapshot(snap).await;
let ev = rx.try_recv().expect("snapshot delivered");
assert_eq!(
ev.snapshot.display.expect("caller display kept").units,
"volts"
);
}
#[tokio::test]
async fn post_property_reaches_only_property_subscribers() {
const DBE_VALUE: u16 = 1;
const DBE_PROPERTY: u16 = 8;
let pv = pv();
pv.set_metadata(meta());
let mut prop_rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_PROPERTY)
.await
.expect("subscriber added");
let mut val_rx = pv
.add_subscriber(2, DbFieldType::Double, DBE_VALUE)
.await
.expect("subscriber added");
pv.post_property(Snapshot::new(
EpicsValue::Double(1.0),
0,
0,
std::time::SystemTime::UNIX_EPOCH,
))
.await;
let ev = prop_rx
.try_recv()
.expect("DBE_PROPERTY subscriber receives property post");
assert_eq!(
ev.snapshot
.display
.expect("property post carries metadata")
.units,
"degC"
);
assert!(
val_rx.try_recv().is_err(),
"DBE_VALUE-only subscriber must not receive a property post"
);
}
#[tokio::test]
async fn post_property_preserves_upstream_alarm_and_timestamp() {
const DBE_PROPERTY: u16 = 8;
const MAJOR: u16 = 2; const HIGH: u16 = 3; let pv = pv();
pv.set_metadata(meta());
let mut prop_rx = pv
.add_subscriber(1, DbFieldType::Double, DBE_PROPERTY)
.await
.expect("subscriber added");
let upstream_ts = WallTime::from_unix(1_000_000, 0);
pv.post_property(Snapshot::new(
EpicsValue::Double(2.0),
HIGH,
MAJOR,
upstream_ts,
))
.await;
let ev = prop_rx.try_recv().expect("property post delivered");
assert_eq!(
ev.snapshot.alarm.severity, MAJOR,
"property post must carry the upstream MAJOR severity, not NO_ALARM"
);
assert_eq!(ev.snapshot.alarm.status, HIGH, "upstream status preserved");
assert_eq!(
ev.snapshot.timestamp, upstream_ts,
"property post must keep the upstream timestamp, not a fresh wall clock"
);
assert_eq!(
ev.snapshot
.display
.expect("property post carries shadow metadata")
.units,
"degC"
);
}
}
#[cfg(test)]
mod read_hook_tests {
use super::*;
fn pv() -> ProcessVariable {
ProcessVariable::new("g:pv".into(), EpicsValue::Double(1.0))
}
#[tokio::test]
async fn read_snapshot_without_hook_equals_snapshot() {
let pv = pv();
let read = pv.read_snapshot().await.expect("no-hook read never errors");
let stored = pv.snapshot().await;
assert_eq!(read.value, stored.value);
assert_eq!(read.value, EpicsValue::Double(1.0));
}
#[tokio::test]
async fn read_snapshot_fires_hook_for_fresh_value() {
let pv = pv();
pv.set(EpicsValue::Double(999.0)).await;
pv.set_read_hook(Arc::new(|| {
Box::pin(async {
Ok(Snapshot::new(
EpicsValue::Double(42.0),
0,
0,
std::time::UNIX_EPOCH,
))
})
}));
let read = pv.read_snapshot().await.expect("hook returns Ok");
assert_eq!(
read.value,
EpicsValue::Double(42.0),
"GET must serve the hook's fresh value, not the stored sentinel"
);
}
#[tokio::test]
async fn read_snapshot_propagates_hook_error() {
let pv = pv();
pv.set_read_hook(Arc::new(|| Box::pin(async { Err(CaError::Disconnected) })));
let err = pv.read_snapshot().await.expect_err("hook error propagates");
assert!(matches!(err, CaError::Disconnected));
}
#[tokio::test]
async fn snapshot_ignores_read_hook() {
let pv = pv();
pv.set(EpicsValue::Double(7.0)).await;
pv.set_read_hook(Arc::new(|| {
Box::pin(async {
Ok(Snapshot::new(
EpicsValue::Double(42.0),
0,
0,
std::time::UNIX_EPOCH,
))
})
}));
let snap = pv.snapshot().await;
assert_eq!(
snap.value,
EpicsValue::Double(7.0),
"snapshot must serve the stored value, never the read hook"
);
}
#[tokio::test]
async fn read_snapshot_carries_shadow_metadata() {
let pv = pv();
pv.set_metadata(PvMetadata {
display: Some(DisplayInfo {
units: "mm".into(),
precision: 3,
..Default::default()
}),
control: None,
enums: None,
});
pv.set_read_hook(Arc::new(|| {
Box::pin(async {
Ok(Snapshot::new(
EpicsValue::Double(5.0),
0,
0,
std::time::UNIX_EPOCH,
))
})
}));
let read = pv.read_snapshot().await.expect("hook returns Ok");
assert_eq!(read.value, EpicsValue::Double(5.0));
assert_eq!(
read.display
.expect("shadow property metadata rides fresh value")
.units,
"mm"
);
}
#[tokio::test]
async fn read_snapshot_carries_upstream_alarm_not_shadow() {
use std::time::{Duration, UNIX_EPOCH};
let pv = pv();
let shadow_time = UNIX_EPOCH + Duration::from_secs(1_000);
pv.set_snapshot(Snapshot::new(EpicsValue::Double(1.0), 7, 1, shadow_time))
.await;
let upstream_time = WallTime::from_unix(2_000, 0);
pv.set_read_hook(Arc::new(move || {
Box::pin(
async move { Ok(Snapshot::new(EpicsValue::Double(5.0), 17, 2, upstream_time)) },
)
}));
let read = pv.read_snapshot().await.expect("hook returns Ok");
assert_eq!(read.value, EpicsValue::Double(5.0), "fresh upstream value");
assert_eq!(
read.alarm.status, 17,
"upstream alarm status, not shadow's 7"
);
assert_eq!(read.alarm.severity, 2, "upstream severity, not shadow's 1");
assert_eq!(
read.timestamp, upstream_time,
"upstream timestamp, not shadow's"
);
}
}