use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::database::db_access::DbSubscription;
use epics_pva_rs::pvdata::PvStructure;
use super::provider::{AccessContext, PvaMonitor};
use super::pvif::{NtType, snapshot_to_pv_structure};
use crate::error::{BridgeError, BridgeResult};
/// Monitor over a single PV name in a [`PvDatabase`].
///
/// `start` subscribes through [`DbSubscription`], and `poll` hands out each
/// received snapshot converted to a [`PvStructure`] via
/// `snapshot_to_pv_structure`.
pub struct BridgeMonitor {
    /// Database used both for subscribing and for looking up the record when
    /// the initial snapshot is captured in `start`.
    db: Arc<PvDatabase>,
    /// PV name as supplied by the caller. It is passed unchanged to
    /// `DbSubscription::subscribe` and split with `parse_pv_name` before the
    /// record lookup.
    record_name: String,
    /// Passed to `snapshot_to_pv_structure` for every converted snapshot.
    nt_type: NtType,
    /// `Some` after a successful `start`, reset to `None` by `stop`.
    subscription: Option<DbSubscription>,
    /// Set to `true` by a successful `start` and back to `false` by `stop`;
    /// `start` returns early while it is `true`.
    running: bool,
    /// Structure captured during `start`; the next `poll` takes it (leaving
    /// `None`) and returns it before reading from the subscription.
    initial_snapshot: Option<PvStructure>,
    /// Counter initialised to 0 and exposed through `overflow_count()`.
    /// NOTE(review): nothing in the code visible here increments it —
    /// confirm where (or whether) it is updated.
    overflow_count: Arc<AtomicU64>,
    /// Access context checked with `can_read` in `start`. Defaults to
    /// `AccessContext::allow_all()` and can be replaced via `with_access`.
    access: AccessContext,
}
impl BridgeMonitor {
pub fn new(db: Arc<PvDatabase>, record_name: String, nt_type: NtType) -> Self {
Self {
db,
record_name,
nt_type,
subscription: None,
running: false,
initial_snapshot: None,
overflow_count: Arc::new(AtomicU64::new(0)),
access: AccessContext::allow_all(),
}
}
pub fn with_access(mut self, access: AccessContext) -> Self {
self.access = access;
self
}
pub fn overflow_count(&self) -> u64 {
self.overflow_count.load(Ordering::Relaxed)
}
}
impl PvaMonitor for BridgeMonitor {
    /// Starts the monitor: checks read access, subscribes to the PV and
    /// captures an initial snapshot for the first `poll`.
    ///
    /// Returns `Ok(())` immediately, without doing anything, if the monitor
    /// is already running.
    ///
    /// # Errors
    ///
    /// * `BridgeError::PutRejected` when `access.can_read` refuses the PV name.
    ///   NOTE(review): a *read* denial is reported through the `PutRejected`
    ///   variant — confirm this is intentional.
    /// * `BridgeError::RecordNotFound` when `DbSubscription::subscribe`
    ///   yields no subscription.
    async fn start(&mut self) -> BridgeResult<()> {
        if self.running {
            return Ok(());
        }

        let allowed = self.access.can_read(&self.record_name);
        if !allowed {
            let reason = format!(
                "monitor read denied for {} (user='{}' host='{}')",
                self.record_name, self.access.user, self.access.host
            );
            return Err(BridgeError::PutRejected(reason));
        }

        // Subscribe before reading the current value; the snapshot below is
        // taken only once the subscription already exists.
        let Some(subscription) = DbSubscription::subscribe(&self.db, &self.record_name).await
        else {
            return Err(BridgeError::RecordNotFound(self.record_name.clone()));
        };

        // NOTE(review): the second component returned by `parse_pv_name` is
        // discarded and "VAL" is always used for the initial snapshot —
        // confirm this is intended for PV names that carry another field.
        let nt_type = self.nt_type;
        let (base_name, _) = epics_base_rs::server::database::parse_pv_name(&self.record_name);
        if let Some(record) = self.db.get_record(base_name).await {
            let guard = record.read().await;
            let converted = guard
                .snapshot_for_field("VAL")
                .map(|snap| snapshot_to_pv_structure(&snap, nt_type));
            // Only overwrite the pending snapshot when one was produced.
            if converted.is_some() {
                self.initial_snapshot = converted;
            }
        }

        self.subscription = Some(subscription);
        self.running = true;
        Ok(())
    }

    /// Returns the next structure for this monitor.
    ///
    /// The snapshot captured by `start` (if any) is returned first and
    /// cleared. After that, each call awaits the subscription and converts
    /// the received snapshot. Yields `None` when there is no subscription or
    /// when `recv_snapshot` yields `None`.
    async fn poll(&mut self) -> Option<PvStructure> {
        let pending = self.initial_snapshot.take();
        if pending.is_some() {
            return pending;
        }

        let received = self.subscription.as_mut()?.recv_snapshot().await?;
        Some(snapshot_to_pv_structure(&received, self.nt_type))
    }

    /// Stops the monitor: drops the subscription, clears the running flag
    /// and discards any initial snapshot that was never polled.
    async fn stop(&mut self) {
        drop(self.subscription.take());
        self.running = false;
        drop(self.initial_snapshot.take());
    }
}