use std::sync::Arc;
use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure, ScalarType, ScalarValue};
use epics_pva_rs::server_native::ChannelSource;
use tokio::sync::mpsc;
use super::channel_cache::ChannelCache;
use super::source::GatewayChannelSource;
#[derive(Clone)]
pub struct ControlSource {
prefix: String,
cache: Arc<ChannelCache>,
gateway_source: GatewayChannelSource,
}
impl ControlSource {
pub fn new(
prefix: impl Into<String>,
cache: Arc<ChannelCache>,
gateway_source: GatewayChannelSource,
) -> Self {
Self {
prefix: prefix.into(),
cache,
gateway_source,
}
}
fn pv_names(&self) -> [String; 4] {
[
format!("{}:cacheSize", self.prefix),
format!("{}:upstreamCount", self.prefix),
format!("{}:liveSubscribers", self.prefix),
format!("{}:report", self.prefix),
]
}
fn nt_scalar_long(v: i64) -> PvField {
let mut s = PvStructure::new("epics:nt/NTScalar:1.0");
s.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Long(v))));
PvField::Structure(s)
}
fn nt_scalar_long_desc() -> FieldDesc {
FieldDesc::Structure {
struct_id: "epics:nt/NTScalar:1.0".into(),
fields: vec![("value".into(), FieldDesc::Scalar(ScalarType::Long))],
}
}
fn nt_scalar_string(v: String) -> PvField {
let mut s = PvStructure::new("epics:nt/NTScalar:1.0");
s.fields
.push(("value".into(), PvField::Scalar(ScalarValue::String(v))));
PvField::Structure(s)
}
fn nt_scalar_string_desc() -> FieldDesc {
FieldDesc::Structure {
struct_id: "epics:nt/NTScalar:1.0".into(),
fields: vec![("value".into(), FieldDesc::Scalar(ScalarType::String))],
}
}
fn matches(&self, name: &str) -> bool {
self.pv_names().iter().any(|n| n == name)
}
}
impl ChannelSource for ControlSource {
async fn list_pvs(&self) -> Vec<String> {
self.pv_names().to_vec()
}
async fn has_pv(&self, name: &str) -> bool {
self.matches(name)
}
async fn get_introspection(&self, name: &str) -> Option<FieldDesc> {
if !self.matches(name) {
return None;
}
if name.ends_with(":report") {
Some(Self::nt_scalar_string_desc())
} else {
Some(Self::nt_scalar_long_desc())
}
}
async fn get_value(&self, name: &str) -> Option<PvField> {
if !self.matches(name) {
return None;
}
let cache_size = self.cache.entry_count().await as i64;
let live_subs = self.gateway_source.live_subscribers() as i64;
if name.ends_with(":cacheSize") || name.ends_with(":upstreamCount") {
Some(Self::nt_scalar_long(cache_size))
} else if name.ends_with(":liveSubscribers") {
Some(Self::nt_scalar_long(live_subs))
} else if name.ends_with(":report") {
let report = format!(
"cacheSize={cache_size} upstreamCount={cache_size} liveSubscribers={live_subs}"
);
Some(Self::nt_scalar_string(report))
} else {
None
}
}
async fn is_writable(&self, _name: &str) -> bool {
false
}
async fn put_value(&self, _name: &str, _value: PvField) -> Result<(), String> {
Err("control PVs are read-only".to_string())
}
async fn subscribe(&self, name: &str) -> Option<mpsc::Receiver<PvField>> {
if !self.matches(name) {
return None;
}
let (tx, rx) = mpsc::channel::<PvField>(4);
let me = self.clone();
let pv_name = name.to_string();
tokio::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(1));
tick.tick().await; let mut last: Option<PvField> = None;
loop {
tick.tick().await;
let snapshot = me.get_value(&pv_name).await;
if let Some(value) = snapshot {
let changed = match &last {
Some(prev) => prev != &value,
None => true,
};
if changed {
if tx.send(value.clone()).await.is_err() {
break;
}
last = Some(value);
}
}
}
});
Some(rx)
}
}