use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::runtime::sync::{Mutex, RwLock, mpsc};
use crate::error::CaError;
use crate::server::snapshot::Snapshot;
use crate::types::{DbFieldType, EpicsValue};
/// Capacity used for each subscriber's monitor event channel.
///
/// The value is read from the `EPICS_CAS_MAX_EVENTS_PER_CHAN` setting on every
/// call. A missing or unparseable setting yields 64, and a parsed value is
/// never allowed to fall below 4.
fn per_channel_event_depth() -> usize {
    const FALLBACK_DEPTH: usize = 64;
    const MINIMUM_DEPTH: usize = 4;

    let requested = crate::runtime::env::get("EPICS_CAS_MAX_EVENTS_PER_CHAN")
        .and_then(|raw| raw.parse::<usize>().ok());
    requested.map_or(FALLBACK_DEPTH, |depth| depth.max(MINIMUM_DEPTH))
}
/// Maximum number of subscribers a single PV will accept.
///
/// The value is read from the `EPICS_CAS_MAX_SUBSCRIBERS_PER_PV` setting on
/// every call. A missing or unparseable setting yields 1024, and a parsed
/// value is never allowed to fall below 8.
pub(crate) fn max_subscribers_per_pv() -> usize {
    const FALLBACK_CAP: usize = 1024;
    const MINIMUM_CAP: usize = 8;

    let requested = crate::runtime::env::get("EPICS_CAS_MAX_SUBSCRIBERS_PER_PV")
        .and_then(|raw| raw.parse::<usize>().ok());
    requested.map_or(FALLBACK_CAP, |cap| cap.max(MINIMUM_CAP))
}
/// Process-wide count of monitor events that were discarded: it is bumped when
/// a pending coalesced event is overwritten before its subscriber collected it.
static DROPPED_MONITOR_EVENTS: AtomicU64 = AtomicU64::new(0);

/// Returns how many monitor events have been recorded as dropped so far.
///
/// The read uses relaxed ordering: the counter is a statistic, not a
/// synchronisation point.
pub fn dropped_monitor_events() -> u64 {
    let counter: &AtomicU64 = &DROPPED_MONITOR_EVENTS;
    counter.load(Ordering::Relaxed)
}

/// Records one more dropped monitor event.
fn record_dropped_monitor() {
    // The previous total is of no interest to callers.
    let _previous = DROPPED_MONITOR_EVENTS.fetch_add(1, Ordering::Relaxed);
}
/// Information about who is performing a write, passed to a [`WriteHook`]
/// together with the value being written.
///
/// `Default` yields three empty strings.
#[derive(Debug, Clone, Default)]
pub struct WriteContext {
/// User name of the writer (presumably as reported by the client — confirm against the caller).
pub user: String,
/// Host name of the writer (presumably as reported by the client — confirm against the caller).
pub host: String,
/// Peer identification of the connection (presumably the remote socket address — confirm against the caller).
pub peer: String,
}
/// Shared, thread-safe asynchronous callback attached to a PV via
/// `ProcessVariable::set_write_hook`.
///
/// The callback receives the value and a [`WriteContext`] and returns a boxed,
/// pinned, `Send` future that resolves to `Ok(())` or a [`CaError`].
/// Nothing in this file invokes the hook; how the result is interpreted is up
/// to the caller that fetches it with `ProcessVariable::write_hook`.
pub type WriteHook = Arc<
dyn Fn(
EpicsValue,
WriteContext,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), CaError>> + Send>>
+ Send
+ Sync,
>;
/// One update delivered to a subscriber's event channel.
#[derive(Debug, Clone)]
pub struct MonitorEvent {
/// Value, alarm status/severity and timestamp captured for this update.
pub snapshot: Snapshot,
/// Origin tag of the event. Every event built in this file uses `0`;
/// the meaning of non-zero values is defined elsewhere — TODO confirm.
pub origin: u64,
}
/// Per-subscription state kept by a `ProcessVariable`.
pub struct Subscriber {
/// Subscription id; used to look the subscriber up for removal and for
/// collecting its coalesced event.
pub sid: u32,
/// Data type supplied when subscribing (presumably the type the client
/// requested — not read in this file, confirm against the consumer).
pub data_type: DbFieldType,
/// Mask supplied when subscribing (presumably an event-selection mask —
/// not read in this file, confirm against the consumer).
pub mask: u16,
/// Sending half of the bounded event channel whose receiver was handed out
/// when the subscriber was added.
pub tx: mpsc::Sender<MonitorEvent>,
/// Overflow slot: holds the most recent event that could not be queued on
/// `tx`, until it is taken out again or overwritten by a newer one.
pub coalesced: Arc<StdMutex<Option<MonitorEvent>>>,
}
/// A named value with a list of monitor subscribers and an optional write hook.
pub struct ProcessVariable {
/// Name of the PV; also used as the `pv` field in log output.
pub name: String,
/// Current value, guarded by the runtime's async read/write lock.
pub value: RwLock<EpicsValue>,
/// Registered subscribers, guarded by the runtime's async mutex.
pub subscribers: Mutex<Vec<Subscriber>>,
/// Optional write callback; kept behind a synchronous `parking_lot` lock so
/// it can be read and replaced from non-async code.
write_hook: parking_lot::RwLock<Option<WriteHook>>,
}
impl ProcessVariable {
pub fn new(name: String, initial: EpicsValue) -> Self {
Self {
name,
value: RwLock::new(initial),
subscribers: Mutex::new(Vec::new()),
write_hook: parking_lot::RwLock::new(None),
}
}
pub fn set_write_hook(&self, hook: WriteHook) {
*self.write_hook.write() = Some(hook);
}
pub fn clear_write_hook(&self) {
*self.write_hook.write() = None;
}
pub fn write_hook(&self) -> Option<WriteHook> {
self.write_hook.read().clone()
}
pub async fn get(&self) -> EpicsValue {
self.value.read().await.clone()
}
pub async fn snapshot(&self) -> Snapshot {
let value = self.value.read().await.clone();
Snapshot::new(value, 0, 0, crate::runtime::time::now_wall())
}
pub async fn set(&self, new_value: EpicsValue) {
{
let mut val = self.value.write().await;
*val = new_value.clone();
}
self.notify_subscribers(new_value).await;
}
pub async fn post_alarm(&self, severity: u16, status: u16) {
let value = self.value.read().await.clone();
let mut subs = self.subscribers.lock().await;
subs.retain(|sub| !sub.tx.is_closed());
for sub in subs.iter() {
let snapshot = Snapshot::new(
value.clone(),
status,
severity,
crate::runtime::time::now_wall(),
);
let event = MonitorEvent {
snapshot,
origin: 0,
};
if sub.tx.try_send(event.clone()).is_err() {
if let Ok(mut slot) = sub.coalesced.lock() {
*slot = Some(event);
}
}
}
}
async fn notify_subscribers(&self, value: EpicsValue) {
let mut subs = self.subscribers.lock().await;
subs.retain(|sub| !sub.tx.is_closed());
for sub in subs.iter() {
let snapshot = Snapshot::new(value.clone(), 0, 0, crate::runtime::time::now_wall());
let event = MonitorEvent {
snapshot,
origin: 0,
};
if sub.tx.try_send(event.clone()).is_err() {
if let Ok(mut slot) = sub.coalesced.lock() {
if slot.is_some() {
record_dropped_monitor();
}
*slot = Some(event);
}
}
}
}
pub async fn add_subscriber(
&self,
sid: u32,
data_type: DbFieldType,
mask: u16,
) -> Option<mpsc::Receiver<MonitorEvent>> {
let cap = max_subscribers_per_pv();
let (tx, rx) = mpsc::channel(per_channel_event_depth());
let sub = Subscriber {
sid,
data_type,
mask,
tx,
coalesced: Arc::new(StdMutex::new(None)),
};
let mut subs = self.subscribers.lock().await;
if subs.len() >= cap {
tracing::warn!(
pv = %self.name,
live = subs.len(),
cap,
"PV subscriber cap reached, refusing add_subscriber"
);
return None;
}
subs.push(sub);
Some(rx)
}
pub async fn remove_subscriber(&self, sid: u32) {
let mut subs = self.subscribers.lock().await;
subs.retain(|s| s.sid != sid);
}
pub async fn pop_coalesced(&self, sid: u32) -> Option<MonitorEvent> {
let subs = self.subscribers.lock().await;
let sub = subs.iter().find(|s| s.sid == sid)?;
sub.coalesced.lock().ok()?.take()
}
}