use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::database::db_access::DbSubscription;
use epics_pva_rs::pvdata::PvStructure;
use super::provider::{AccessContext, PvaMonitor};
use super::pvif::{NtType, snapshot_to_pv_structure};
use crate::error::{BridgeError, BridgeResult};
pub struct BridgeMonitor {
db: Arc<PvDatabase>,
record_name: String,
nt_type: NtType,
subscription: Option<DbSubscription>,
running: bool,
initial_snapshot: Option<PvStructure>,
overflow_count: Arc<AtomicU64>,
access: AccessContext,
}
impl BridgeMonitor {
pub fn new(db: Arc<PvDatabase>, record_name: String, nt_type: NtType) -> Self {
Self {
db,
record_name,
nt_type,
subscription: None,
running: false,
initial_snapshot: None,
overflow_count: Arc::new(AtomicU64::new(0)),
access: AccessContext::allow_all(),
}
}
pub fn with_access(mut self, access: AccessContext) -> Self {
self.access = access;
self
}
pub fn overflow_count(&self) -> u64 {
self.overflow_count.load(Ordering::Relaxed)
}
}
impl PvaMonitor for BridgeMonitor {
async fn start(&mut self) -> BridgeResult<()> {
if self.running {
return Ok(());
}
if !self.access.can_read(&self.record_name) {
return Err(BridgeError::PutRejected(format!(
"monitor read denied for {} (user='{}' host='{}')",
self.record_name, self.access.user, self.access.host
)));
}
let sub = DbSubscription::subscribe(&self.db, &self.record_name)
.await
.ok_or_else(|| BridgeError::RecordNotFound(self.record_name.clone()))?;
self.subscription = Some(sub);
self.running = true;
Ok(())
}
async fn poll(&mut self) -> Option<PvStructure> {
if let Some(initial) = self.initial_snapshot.take() {
return Some(initial);
}
let sub = self.subscription.as_mut()?;
let snapshot = sub.recv_snapshot().await?;
Some(snapshot_to_pv_structure(&snapshot, self.nt_type))
}
async fn stop(&mut self) {
self.subscription = None;
self.running = false;
self.initial_snapshot = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
use epics_base_rs::server::records::ai::AiRecord;
use std::time::Duration;
#[tokio::test]
async fn monitor_stop_releases_subscription() {
let db = Arc::new(PvDatabase::new());
db.add_record("MON_LIFECYCLE", Box::new(AiRecord::new(1.0)))
.await
.unwrap();
let mut mon = BridgeMonitor::new(db.clone(), "MON_LIFECYCLE".into(), NtType::Scalar);
mon.start().await.expect("start ok");
assert!(mon.running);
let polled = tokio::time::timeout(Duration::from_millis(100), mon.poll()).await;
assert!(
polled.is_err(),
"poll() should time out without a fresh update"
);
mon.stop().await;
assert!(!mon.running);
assert!(mon.subscription.is_none());
mon.stop().await;
assert!(!mon.running);
let mut mon2 = BridgeMonitor::new(db.clone(), "MON_LIFECYCLE".into(), NtType::Scalar);
mon2.start().await.expect("re-subscribe ok");
assert!(mon2.running);
mon2.stop().await;
}
}