use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use crate::error::CaResult;
use crate::runtime::sync::mpsc;
use crate::server::pv::MonitorEvent;
use crate::server::recgbl::EventMask;
use crate::types::{DbFieldType, EpicsValue};
use super::{PvDatabase, parse_pv_name};
/// Counter backing [`next_sid`]; the first id handed out is 1_000_000.
// NOTE(review): the high starting value presumably keeps these ids apart from
// subscriber ids allocated elsewhere — confirm against the other allocators.
static NEXT_SID: AtomicU32 = AtomicU32::new(1_000_000);

/// Counter backing [`alloc_origin`]; starts at 1, so the first allocations are
/// non-zero (0 is what `DbChannel::new` uses and what disables origin
/// filtering in `DbSubscription`).
static NEXT_ORIGIN: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

/// Returns the current subscriber id and advances the counter by one.
fn next_sid() -> u32 {
    NEXT_SID.fetch_add(1, Ordering::Relaxed)
}

/// Returns the current origin id and advances the counter by one.
///
/// Each call yields a value one greater than the previous call's.
pub fn alloc_origin() -> u64 {
    NEXT_ORIGIN.fetch_add(1, Ordering::Relaxed)
}
#[derive(Clone)]
pub struct DbChannel {
db: PvDatabase,
name: String,
origin: u64,
}
impl DbChannel {
pub fn new(db: &PvDatabase, name: &str) -> Self {
Self {
db: db.clone(),
name: name.to_string(),
origin: 0,
}
}
pub fn with_origin(db: &PvDatabase, name: &str, origin: u64) -> Self {
Self {
db: db.clone(),
name: name.to_string(),
origin,
}
}
pub fn origin(&self) -> u64 {
self.origin
}
pub async fn get_f64(&self) -> f64 {
self.db
.get_pv(&self.name)
.await
.ok()
.and_then(|v| v.to_f64())
.unwrap_or(0.0)
}
pub async fn get_i16(&self) -> i16 {
self.db
.get_pv(&self.name)
.await
.ok()
.and_then(|v| v.to_f64())
.map(|f| f as i16)
.unwrap_or(0)
}
pub async fn get_string(&self) -> String {
match self.db.get_pv(&self.name).await {
Ok(EpicsValue::String(s)) => s,
Ok(v) => v.to_string(),
Err(_) => String::new(),
}
}
pub async fn put_f64(&self, v: f64) -> CaResult<()> {
self.db.put_pv(&self.name, EpicsValue::Double(v)).await
}
pub async fn put_i16(&self, v: i16) -> CaResult<()> {
self.db.put_pv(&self.name, EpicsValue::Short(v)).await
}
pub async fn put_string(&self, v: &str) -> CaResult<()> {
self.db
.put_pv(&self.name, EpicsValue::String(v.to_string()))
.await
}
pub async fn put_f64_post(&self, v: f64) -> CaResult<()> {
self.db
.put_pv_and_post_with_origin(&self.name, EpicsValue::Double(v), self.origin)
.await
}
pub async fn put_i16_post(&self, v: i16) -> CaResult<()> {
self.db
.put_pv_and_post_with_origin(&self.name, EpicsValue::Short(v), self.origin)
.await
}
pub async fn put_string_post(&self, v: &str) -> CaResult<()> {
self.db
.put_pv_and_post_with_origin(&self.name, EpicsValue::String(v.to_string()), self.origin)
.await
}
pub async fn put_f64_process(&self, v: f64) -> CaResult<()> {
let (record_name, field) = parse_pv_name(&self.name);
let _ = self
.db
.put_record_field_from_ca(record_name, field, EpicsValue::Double(v))
.await?;
Ok(())
}
pub async fn put_i16_process(&self, v: i16) -> CaResult<()> {
let (record_name, field) = parse_pv_name(&self.name);
let _ = self
.db
.put_record_field_from_ca(record_name, field, EpicsValue::Short(v))
.await?;
Ok(())
}
pub async fn put_i32_process(&self, v: i32) -> CaResult<()> {
let (record_name, field) = parse_pv_name(&self.name);
let _ = self
.db
.put_record_field_from_ca(record_name, field, EpicsValue::Long(v))
.await?;
Ok(())
}
pub async fn put_string_process(&self, v: &str) -> CaResult<()> {
let (record_name, field) = parse_pv_name(&self.name);
let _ = self
.db
.put_record_field_from_ca(record_name, field, EpicsValue::String(v.to_string()))
.await?;
Ok(())
}
pub async fn get_i32(&self) -> i32 {
self.db
.get_pv(&self.name)
.await
.ok()
.and_then(|v| match v {
EpicsValue::Long(i) => Some(i),
other => other.to_f64().map(|f| f as i32),
})
.unwrap_or(0)
}
pub fn name(&self) -> &str {
&self.name
}
}
/// A monitor subscription on one field of a record.
///
/// Events arrive on `rx`; `record` and `sid` identify the subscriber entry on
/// the record instance so it can be consulted for coalesced events and later
/// removed.
pub struct DbSubscription {
    /// Receiving end of the event channel returned by `add_subscriber`.
    rx: mpsc::Receiver<MonitorEvent>,
    /// PV name exactly as passed to the `subscribe*` constructor.
    pv_name: String,
    /// Events whose `origin` equals this value are skipped; `0` disables the filter.
    ignore_origin: u64,
    /// Record instance the subscriber was registered on.
    record: std::sync::Arc<tokio::sync::RwLock<crate::server::record::RecordInstance>>,
    /// Subscriber id obtained from `next_sid()`.
    sid: u32,
}

impl DbSubscription {
    /// Subscribes to `pv_name` with the default mask and no origin filter.
    ///
    /// Returns `None` under the same conditions as [`Self::subscribe_with_mask`].
    pub async fn subscribe(db: &PvDatabase, pv_name: &str) -> Option<Self> {
        Self::subscribe_filtered(db, pv_name, 0).await
    }

    /// Subscribes to `pv_name` with the `VALUE | LOG` mask, skipping events
    /// whose origin equals `ignore_origin` (unless it is `0`).
    pub async fn subscribe_filtered(
        db: &PvDatabase,
        pv_name: &str,
        ignore_origin: u64,
    ) -> Option<Self> {
        let value_and_log = EventMask::VALUE | EventMask::LOG;
        Self::subscribe_with_mask(db, pv_name, ignore_origin, value_and_log.bits()).await
    }

    /// Subscribes to `pv_name` with an explicit event `mask`.
    ///
    /// The field part of the name is upper-cased before registration.
    /// Returns `None` when the record lookup yields nothing or when
    /// `add_subscriber` declines the registration.
    pub async fn subscribe_with_mask(
        db: &PvDatabase,
        pv_name: &str,
        ignore_origin: u64,
        mask: u16,
    ) -> Option<Self> {
        let (record_name, raw_field) = parse_pv_name(pv_name);
        let record = db.get_record(record_name).await?;
        let field = raw_field.to_ascii_uppercase();
        // Allocate the id only once the record is known to exist.
        let sid = next_sid();
        // NOTE(review): the subscriber is always registered as Double, even for
        // callers that later use `recv()` on non-numeric fields — confirm intended.
        let rx = record
            .write()
            .await
            .add_subscriber(&field, sid, DbFieldType::Double, mask)?;
        Some(Self {
            rx,
            pv_name: pv_name.to_string(),
            ignore_origin,
            record,
            sid,
        })
    }

    /// Waits for the next event that passes the origin filter.
    ///
    /// After each queued event is received, the record is asked for a
    /// coalesced event for this subscriber; when one exists it is used in
    /// place of the queued one. Returns `None` once the channel yields `None`.
    async fn next_event(&mut self) -> Option<MonitorEvent> {
        loop {
            let queued = self.rx.recv().await?;
            let coalesced = self.record.read().await.pop_coalesced(self.sid);
            let event = coalesced.unwrap_or(queued);
            let filtered_out = self.ignore_origin != 0 && event.origin == self.ignore_origin;
            if !filtered_out {
                return Some(event);
            }
        }
    }

    /// Waits for the next event and returns its value as `f64`.
    ///
    /// Returns `None` when the channel has ended *or* when the received value
    /// has no `f64` conversion (that event is still consumed).
    pub async fn recv_f64(&mut self) -> Option<f64> {
        self.next_event().await?.snapshot.value.to_f64()
    }

    /// Waits for the next event and returns its value.
    pub async fn recv(&mut self) -> Option<EpicsValue> {
        self.next_event().await.map(|event| event.snapshot.value)
    }

    /// Waits for the next event and returns its full snapshot.
    pub async fn recv_snapshot(&mut self) -> Option<crate::server::snapshot::Snapshot> {
        self.next_event().await.map(|event| event.snapshot)
    }

    /// PV name this subscription was created with.
    pub fn pv_name(&self) -> &str {
        &self.pv_name
    }
}
/// Unregisters the subscriber from its record when the subscription goes away.
impl Drop for DbSubscription {
    /// Removes this subscriber id from the record.
    ///
    /// The removal is attempted synchronously first with a non-blocking
    /// `try_write()`, so cleanup also happens when the value is dropped outside
    /// a Tokio runtime and does not depend on a spawned task getting to run.
    /// Only if the lock is currently held elsewhere is the removal deferred to
    /// a task on the current runtime. If the lock is contended *and* no
    /// runtime is current, the subscriber entry is left in place (blocking in
    /// `drop` is deliberately avoided).
    fn drop(&mut self) {
        let sid = self.sid;

        // Fast path: uncontended lock, remove right away.
        if let Ok(mut instance) = self.record.try_write() {
            instance.remove_subscriber(sid);
            return;
        }

        // Slow path: someone holds the lock; finish the cleanup asynchronously.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let record = self.record.clone();
            handle.spawn(async move {
                record.write().await.remove_subscriber(sid);
            });
        }
    }
}
/// Watches several PVs at once and reports whichever changes first.
pub struct DbMultiMonitor {
    /// One subscription per PV that could be subscribed to.
    subs: Vec<DbSubscription>,
}

impl DbMultiMonitor {
    /// Subscribes to every name in `pv_names` with no origin filter.
    ///
    /// Names that cannot be subscribed to are skipped; compare
    /// [`Self::sub_count`] with `pv_names.len()` to detect that.
    pub async fn new(db: &PvDatabase, pv_names: &[String]) -> Self {
        Self::new_filtered(db, pv_names, 0).await
    }

    /// Subscribes to every name in `pv_names`, skipping events whose origin
    /// equals `ignore_origin` (unless it is `0`).
    ///
    /// Names for which `DbSubscription::subscribe_filtered` returns `None` are
    /// silently left out.
    pub async fn new_filtered(db: &PvDatabase, pv_names: &[String], ignore_origin: u64) -> Self {
        let mut subs = Vec::with_capacity(pv_names.len());
        for name in pv_names {
            if let Some(sub) = DbSubscription::subscribe_filtered(db, name, ignore_origin).await {
                subs.push(sub);
            }
        }
        Self { subs }
    }

    /// Number of PVs that were successfully subscribed.
    pub fn sub_count(&self) -> usize {
        self.subs.len()
    }

    /// Waits until any subscribed PV delivers an event that passes its origin
    /// filter, and returns `(pv_name, value)`.
    ///
    /// The value is converted with `to_f64()`; values without a conversion are
    /// reported as `0.0`. Subscriptions are polled in order, with a 10 ms
    /// sleep between rounds when nothing is ready.
    ///
    /// This future never completes when there is nothing that can produce an
    /// event (no subscriptions, or every channel closed); wrap it in a
    /// timeout or `select!` if that matters to the caller.
    pub async fn wait_change(&mut self) -> (String, f64) {
        // With no subscriptions nothing can ever arrive: park instead of
        // waking up every 10 ms forever.
        if self.subs.is_empty() {
            return std::future::pending().await;
        }

        loop {
            for sub in &mut self.subs {
                let queued = match sub.rx.try_recv() {
                    Ok(event) => event,
                    // Empty and closed channels are both skipped.
                    Err(_) => continue,
                };

                // Same rule as `DbSubscription::next_event`: an event held in
                // the record's coalesce slot for this subscriber replaces the
                // queued one, so both receive paths report the same value.
                let coalesced = sub.record.read().await.pop_coalesced(sub.sid);
                let event = coalesced.unwrap_or(queued);

                if sub.ignore_origin != 0 && event.origin == sub.ignore_origin {
                    continue;
                }

                let val = event.snapshot.value.to_f64().unwrap_or(0.0);
                return (sub.pv_name.clone(), val);
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }
}