use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::database::db_access::DbSubscription;
use epics_base_rs::server::recgbl::EventMask;
use epics_pva_rs::pvdata::PvStructure;
use super::provider::{AccessContext, PvaMonitor};
use super::pvif::{NtType, snapshot_to_pv_structure};
use crate::error::{BridgeError, BridgeResult};
/// Monitor over a single `record.field` PV of a [`PvDatabase`].
///
/// The monitor holds up to two database subscriptions (one for value/alarm
/// events, one for property events) and converts every received snapshot
/// into a [`PvStructure`] shaped by `nt_type`.
pub struct BridgeMonitor {
    /// Database the subscriptions are opened against.
    db: Arc<PvDatabase>,
    /// Record name; used for the read-access check and as the first half of
    /// the subscribed PV name.
    record_name: String,
    /// Field name; joined to `record_name` with a `.` to form the PV name.
    field: String,
    /// Normative-type selector handed to `snapshot_to_pv_structure`.
    nt_type: NtType,
    /// Subscription opened with the value mask (and filters, when present).
    /// `None` until `start` succeeds and again after `stop`.
    subscription: Option<DbSubscription>,
    /// Subscription opened with `EventMask::PROPERTY`.
    /// `None` until `start` succeeds and again after `stop`.
    property_subscription: Option<DbSubscription>,
    /// When set, replaces the default `VALUE | ALARM` mask of `subscription`.
    value_mask_override: Option<u16>,
    /// Filter chain passed to the value subscription when it is not empty.
    filters: Arc<epics_base_rs::server::database::filters::FilterChain>,
    /// `true` between a successful `start` and the next `stop`.
    running: bool,
    /// Snapshot that `poll` hands out once before waiting on subscriptions.
    /// NOTE(review): nothing in this file populates it; it stays `None`
    /// unless set elsewhere.
    initial_snapshot: Option<PvStructure>,
    /// Counter exposed through `overflow_count()`.
    /// NOTE(review): no code in this file increments it.
    overflow_count: Arc<AtomicU64>,
    /// Identity consulted by `start` to decide whether reading is allowed.
    access: AccessContext,
}
impl BridgeMonitor {
    /// Creates a stopped monitor for `record_name.field`.
    ///
    /// Defaults: no subscriptions, no value-mask override, an empty filter
    /// chain, a zeroed overflow counter and an allow-all access context.
    /// Use the `with_*` builders to change them before calling `start`.
    pub fn new(db: Arc<PvDatabase>, record_name: String, field: String, nt_type: NtType) -> Self {
        let empty_filters = Arc::new(epics_base_rs::server::database::filters::FilterChain::new());
        let zeroed_counter = Arc::new(AtomicU64::new(0));
        let open_access = AccessContext::allow_all();

        Self {
            db,
            record_name,
            field,
            nt_type,
            subscription: None,
            property_subscription: None,
            value_mask_override: None,
            filters: empty_filters,
            running: false,
            initial_snapshot: None,
            overflow_count: zeroed_counter,
            access: open_access,
        }
    }

    /// Replaces the filter chain handed to the value subscription.
    ///
    /// `start` only passes the chain on when it is not empty.
    pub fn with_filters(
        mut self,
        filters: Arc<epics_base_rs::server::database::filters::FilterChain>,
    ) -> Self {
        self.filters = filters;
        self
    }

    /// Replaces the access context checked by `start`.
    pub fn with_access(mut self, access: AccessContext) -> Self {
        self.access = access;
        self
    }

    /// Overrides the event mask of the value subscription
    /// (the default is `VALUE | ALARM`).
    pub fn with_value_mask(mut self, mask: u16) -> Self {
        self.value_mask_override = Some(mask);
        self
    }

    /// Returns the current value of the overflow counter (relaxed load).
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }
}
impl PvaMonitor for BridgeMonitor {
    /// Opens the value and property subscriptions for `record_name.field`.
    ///
    /// Calling `start` on a monitor that is already running is a no-op.
    ///
    /// # Errors
    ///
    /// * `BridgeError::PutRejected` when the access context denies reading
    ///   the record.
    /// * `BridgeError::RecordNotFound` when either subscription cannot be
    ///   created.
    async fn start(&mut self) -> BridgeResult<()> {
        if self.running {
            return Ok(());
        }

        if !self.access.can_read(&self.record_name) {
            // NOTE(review): a read denial is reported through the
            // `PutRejected` variant; confirm whether a dedicated
            // read-denied variant exists before changing it.
            let reason = format!(
                "monitor read denied for {} (user='{}' host='{}')",
                self.record_name, self.access.user, self.access.host
            );
            return Err(BridgeError::PutRejected(reason));
        }

        let pv_name = format!("{}.{}", self.record_name, self.field);

        // An explicit override wins; otherwise listen for value and alarm
        // changes.
        let value_mask = match self.value_mask_override {
            Some(mask) => mask,
            None => (EventMask::VALUE | EventMask::ALARM).bits(),
        };

        // Only hand the chain to the subscription when it has filters.
        let filters_opt = (!self.filters.is_empty()).then(|| self.filters.as_ref());

        let Some(value_sub) = DbSubscription::subscribe_with_mask_and_filters(
            &self.db,
            &pv_name,
            0,
            value_mask,
            filters_opt,
        )
        .await
        else {
            return Err(BridgeError::RecordNotFound(self.record_name.clone()));
        };

        // Second subscription so that property-only events also wake `poll`.
        let Some(property_sub) =
            DbSubscription::subscribe_with_mask(&self.db, &pv_name, 0, EventMask::PROPERTY.bits())
                .await
        else {
            return Err(BridgeError::RecordNotFound(self.record_name.clone()));
        };

        // Commit state only after both subscriptions exist.
        self.subscription = Some(value_sub);
        self.property_subscription = Some(property_sub);
        self.running = true;
        Ok(())
    }

    /// Waits for the next update and returns it as a `PvStructure`.
    ///
    /// A pending `initial_snapshot` is returned first, without waiting.
    /// Otherwise the call waits on the value subscription and, when present,
    /// the property subscription, converting whichever snapshot arrives
    /// first. Returns `None` when there is no value subscription or when the
    /// subscription that completed yields no snapshot.
    async fn poll(&mut self) -> Option<PvStructure> {
        if let Some(pending) = self.initial_snapshot.take() {
            return Some(pending);
        }

        // Without a value subscription there is nothing to wait on.
        let value_sub = self.subscription.as_mut()?;

        let received = match self.property_subscription.as_mut() {
            Some(prop_sub) => tokio::select! {
                snap = value_sub.recv_snapshot() => snap,
                snap = prop_sub.recv_snapshot() => snap,
            },
            None => value_sub.recv_snapshot().await,
        };

        let snapshot = received?;
        Some(snapshot_to_pv_structure(&snapshot, self.nt_type))
    }

    /// Drops both subscriptions and any pending initial snapshot, and marks
    /// the monitor as not running. Safe to call repeatedly.
    async fn stop(&mut self) {
        drop(self.subscription.take());
        drop(self.property_subscription.take());
        self.initial_snapshot = None;
        self.running = false;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use epics_base_rs::server::records::ai::AiRecord;
    use std::time::Duration;

    /// Builds a database holding one `AiRecord` (initial value 1.0) named
    /// `record`.
    async fn db_with_ai_record(record: &str) -> Arc<PvDatabase> {
        let db = Arc::new(PvDatabase::new());
        db.add_record(record, Box::new(AiRecord::new(1.0)))
            .await
            .unwrap();
        db
    }

    /// Builds a scalar monitor on the `VAL` field of `record`.
    fn scalar_val_monitor(db: &Arc<PvDatabase>, record: &str) -> BridgeMonitor {
        BridgeMonitor::new(Arc::clone(db), record.into(), "VAL".into(), NtType::Scalar)
    }

    /// `stop` must release both subscriptions, be callable twice, and leave
    /// the record available for a fresh monitor.
    #[tokio::test]
    async fn monitor_stop_releases_subscription() {
        let db = db_with_ai_record("MON_LIFECYCLE").await;

        let mut mon = scalar_val_monitor(&db, "MON_LIFECYCLE");
        mon.start().await.expect("start ok");
        assert!(mon.running);

        // Nothing has been posted yet, so poll must still be pending when
        // the timeout fires.
        let polled = tokio::time::timeout(Duration::from_millis(100), mon.poll()).await;
        assert!(
            polled.is_err(),
            "poll() should time out without a fresh update"
        );

        mon.stop().await;
        assert!(!mon.running);
        assert!(mon.subscription.is_none());
        assert!(mon.property_subscription.is_none());

        // A second stop is a harmless no-op.
        mon.stop().await;
        assert!(!mon.running);

        // A brand-new monitor on the same record can still subscribe.
        let mut mon2 = scalar_val_monitor(&db, "MON_LIFECYCLE");
        mon2.start().await.expect("re-subscribe ok");
        assert!(mon2.running);
        mon2.stop().await;
    }

    /// A PROPERTY-only notification must wake `poll` and deliver a
    /// non-empty structure.
    #[tokio::test]
    async fn monitor_property_event_wakes_poll() {
        let db = db_with_ai_record("MON_PROPERTY").await;

        let mut mon = scalar_val_monitor(&db, "MON_PROPERTY");
        mon.start().await.expect("start ok");

        // Scope the record guard so it is released before polling.
        {
            let rec = db.get_record("MON_PROPERTY").await.expect("rec exists");
            let instance = rec.read().await;
            instance.notify_field("VAL", EventMask::PROPERTY);
        }

        let polled = tokio::time::timeout(Duration::from_millis(500), mon.poll()).await;
        let snap = polled
            .expect("PROPERTY event must wake poll within 500ms")
            .expect("snapshot delivered");
        assert!(
            !snap.fields.is_empty(),
            "PROPERTY-event snapshot must carry the full NT structure"
        );

        mon.stop().await;
    }
}