use std::sync::Arc;
use epics_base_rs::server::database::{ExternalPvResolver, LinkSet, PvDatabase};
use epics_base_rs::types::EpicsValue;
use epics_pva_rs::pvdata::{PvField, ScalarValue};
use super::config::{LinkDirection, PvaLinkConfig};
use super::link::{PvaLink, PvaLinkResult};
use super::registry::PvaLinkRegistry;
/// Runs the synchronous closure `f` and returns its result.
///
/// * When the current thread belongs to a multi-thread Tokio runtime, `f` is
///   executed through [`tokio::task::block_in_place`].
/// * For any other runtime flavor, or when no runtime is entered on the
///   current thread, `f` is simply called directly.
///
/// Despite the name, this function does not emit a warning on any path.
fn block_in_place_or_warn<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    use tokio::runtime::{Handle, RuntimeFlavor};

    // `try_current` fails when no runtime is entered; that case is treated
    // the same as a non-multi-thread runtime.
    let on_multi_thread_runtime = Handle::try_current()
        .is_ok_and(|rt| matches!(rt.runtime_flavor(), RuntimeFlavor::MultiThread));

    if on_multi_thread_runtime {
        tokio::task::block_in_place(f)
    } else {
        f()
    }
}
/// Resolver that serves PVA link reads and writes on top of a shared
/// [`PvaLinkRegistry`].
///
/// The registry, the read counter and the enabled flag are all held behind
/// `Arc`, so clones of a resolver share them.
#[derive(Clone)]
pub struct PvaLinkResolver {
/// Registry through which links are looked up and opened.
registry: Arc<PvaLinkRegistry>,
/// Tokio runtime handle used to `block_on` async link operations from
/// synchronous entry points.
handle: tokio::runtime::Handle,
/// Counter incremented each time a value is obtained from a link.
reads: Arc<std::sync::atomic::AtomicU64>,
/// Flag consulted before resolving or writing; initialised to `true` by `new`.
enabled: Arc<std::sync::atomic::AtomicBool>,
}
impl PvaLinkResolver {
pub fn new(handle: tokio::runtime::Handle) -> Self {
Self {
registry: Arc::new(PvaLinkRegistry::new()),
handle,
reads: Arc::new(std::sync::atomic::AtomicU64::new(0)),
enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
}
}
pub fn set_enabled(&self, on: bool) {
self.enabled.store(on, std::sync::atomic::Ordering::Relaxed);
}
pub fn is_enabled(&self) -> bool {
self.enabled.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn open(&self, pv_name: &str) -> PvaLinkResult<Arc<PvaLink>> {
let cfg = PvaLinkConfig {
pv_name: pv_name.to_string(),
field: "value".into(),
monitor: true,
process: false,
notify: false,
scan_on_update: false,
direction: LinkDirection::Inp,
};
self.registry.get_or_open(cfg).await
}
pub fn read_count(&self) -> u64 {
self.reads.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn link_count(&self) -> usize {
self.registry.len()
}
pub async fn wait_for_link_connected(
&self,
pv_name: &str,
timeout: std::time::Duration,
) -> bool {
let link = match self.open(pv_name).await {
Ok(l) => l,
Err(_) => return false,
};
let deadline = std::time::Instant::now() + timeout;
loop {
if link.read().await.is_ok() {
return true;
}
if std::time::Instant::now() >= deadline {
return false;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
pub fn build_resolver(self) -> ExternalPvResolver {
let resolver = self;
Arc::new(move |name: &str| -> Option<EpicsValue> {
if !resolver.is_enabled() {
return None;
}
let name = match name.strip_prefix("pva://") {
Some(stripped) => stripped,
None => {
if name.starts_with("ca://") {
return None;
}
name
}
};
if let Some(link) = resolver.registry.try_get(name, LinkDirection::Inp)
&& let Some(value) = link.try_read_cached()
{
resolver
.reads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return pvfield_to_epics_value(&value);
}
let cfg = PvaLinkConfig {
pv_name: name.to_string(),
field: "value".into(),
monitor: true,
process: false,
notify: false,
scan_on_update: false,
direction: LinkDirection::Inp,
};
let (link, value) = block_in_place_or_warn(|| {
resolver.handle.block_on(async {
let link = resolver.registry.get_or_open(cfg).await.ok()?;
let value = link.read().await.ok()?;
Some((link, value))
})
})?;
let _ = link;
resolver
.reads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
pvfield_to_epics_value(&value)
})
}
}
/// Creates a [`PvaLinkResolver`] bound to `handle` and wires it into `db`.
///
/// A clone of the resolver is first installed as the database's external
/// resolver closure, then another clone is registered as the link set named
/// `"pva"`. The resolver itself is returned to the caller; all three share
/// state because the resolver's fields are `Arc`-backed.
pub async fn install_pvalink_resolver(
    db: &Arc<PvDatabase>,
    handle: tokio::runtime::Handle,
) -> PvaLinkResolver {
    let resolver = PvaLinkResolver::new(handle);

    let external_lookup = resolver.clone().build_resolver();
    db.set_external_resolver(external_lookup).await;

    db.register_link_set("pva", Arc::new(resolver.clone()))
        .await;

    resolver
}
impl LinkSet for PvaLinkResolver {
    /// Reports whether an already-registered input link for `name` is
    /// connected.
    ///
    /// Returns `false` for `ca://` names and for names with no registered
    /// link. Never opens a link and never blocks on the runtime.
    fn is_connected(&self, name: &str) -> bool {
        strip_scheme(name)
            .and_then(|name| self.registry.try_get(name, LinkDirection::Inp))
            .is_some_and(|link| link.is_connected())
    }

    /// Reads the current value behind `name`.
    ///
    /// Returns `None` when the resolver is disabled, when `name` uses the
    /// `ca://` scheme, when the link cannot be opened or read, or when the
    /// PVA field has no `EpicsValue` equivalent. A cached value from an
    /// existing link is preferred; otherwise the call blocks on the runtime
    /// handle to open and read the link.
    fn get_value(&self, name: &str) -> Option<EpicsValue> {
        if !self.is_enabled() {
            return None;
        }
        let name = strip_scheme(name)?;

        // Fast path: existing link with a cached value, no blocking.
        if let Some(link) = self.registry.try_get(name, LinkDirection::Inp)
            && let Some(value) = link.try_read_cached()
        {
            self.reads
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return pvfield_to_epics_value(&value);
        }

        // Slow path: same input configuration as `alarm_message` and
        // `time_stamp` use, built by the shared helper.
        let cfg = default_inp_cfg(name);
        let value = block_in_place_or_warn(|| {
            self.handle.block_on(async {
                let link = self.registry.get_or_open(cfg).await.ok()?;
                link.read().await.ok()
            })
        })?;

        self.reads
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        pvfield_to_epics_value(&value)
    }

    /// Writes `value` to the PV behind `name` through an output link.
    ///
    /// Array values are converted to a PVA field and written with
    /// `write_pv_field`; every other value is written as its `to_string()`
    /// representation. The call blocks on the runtime handle until the write
    /// finishes.
    ///
    /// # Errors
    ///
    /// Returns an error string when the resolver is disabled, when `name`
    /// uses the `ca://` scheme, or when opening the link or the write itself
    /// fails (the underlying error is stringified).
    fn put_value(&self, name: &str, value: EpicsValue) -> Result<(), String> {
        if !self.is_enabled() {
            return Err("pvalink disabled".into());
        }
        let name = strip_scheme(name).ok_or_else(|| {
            format!("pvalink rejects ca:// scheme: {name} (use the CA-link path instead)")
        })?;

        // Output links are opened unmonitored with `process` set.
        let cfg = PvaLinkConfig {
            pv_name: name.to_string(),
            field: "value".into(),
            monitor: false,
            process: true,
            notify: false,
            scan_on_update: false,
            direction: LinkDirection::Out,
        };

        let array_path = matches!(
            value,
            EpicsValue::ShortArray(_)
                | EpicsValue::FloatArray(_)
                | EpicsValue::EnumArray(_)
                | EpicsValue::DoubleArray(_)
                | EpicsValue::LongArray(_)
                | EpicsValue::CharArray(_)
                | EpicsValue::StringArray(_)
        );

        block_in_place_or_warn(|| {
            self.handle.block_on(async {
                let link = self
                    .registry
                    .get_or_open(cfg)
                    .await
                    .map_err(|e| e.to_string())?;
                if array_path {
                    let pv_field = crate::qsrv::convert::epics_to_pv_field(&value);
                    link.write_pv_field(&pv_field)
                        .await
                        .map_err(|e| e.to_string())
                } else {
                    let value_str = value.to_string();
                    link.write(&value_str).await.map_err(|e| e.to_string())
                }
            })
        })
    }

    /// Returns the alarm message reported by the input link for `name`.
    ///
    /// Returns `None` for `ca://` names or when the link cannot be opened.
    /// Blocks on the runtime handle and may open the link as a side effect.
    ///
    /// NOTE(review): unlike `get_value`/`put_value`, this does not consult
    /// `is_enabled()` — confirm that is intended.
    fn alarm_message(&self, name: &str) -> Option<String> {
        let name = strip_scheme(name)?;
        let link = block_in_place_or_warn(|| {
            self.handle
                .block_on(async { self.registry.get_or_open(default_inp_cfg(name)).await.ok() })
        })?;
        link.alarm_message()
    }

    /// Returns the time stamp reported by the input link for `name`.
    ///
    /// Returns `None` for `ca://` names or when the link cannot be opened.
    /// Blocks on the runtime handle and may open the link as a side effect.
    ///
    /// NOTE(review): unlike `get_value`/`put_value`, this does not consult
    /// `is_enabled()` — confirm that is intended.
    fn time_stamp(&self, name: &str) -> Option<(i64, i32)> {
        let name = strip_scheme(name)?;
        let link = block_in_place_or_warn(|| {
            self.handle
                .block_on(async { self.registry.get_or_open(default_inp_cfg(name)).await.ok() })
        })?;
        link.time_stamp()
    }

    /// Always returns an empty list; link names are not enumerated here.
    fn link_names(&self) -> Vec<String> {
        Vec::new()
    }
}
/// Normalises a link name for the PVA link set.
///
/// * `pva://NAME` yields `Some("NAME")`.
/// * Anything starting with `ca://` yields `None` (not handled here).
/// * Every other name is passed through unchanged.
fn strip_scheme(name: &str) -> Option<&str> {
    match name.strip_prefix("pva://") {
        Some(bare) => Some(bare),
        None if name.starts_with("ca://") => None,
        None => Some(name),
    }
}
/// Builds the standard input-link configuration for `pv_name`: the `"value"`
/// field, monitoring on, input direction, and the `process`, `notify` and
/// `scan_on_update` flags all off.
fn default_inp_cfg(pv_name: &str) -> PvaLinkConfig {
    let pv_name = String::from(pv_name);
    let direction = LinkDirection::Inp;

    PvaLinkConfig {
        pv_name,
        direction,
        field: "value".into(),
        monitor: true,
        process: false,
        notify: false,
        scan_on_update: false,
    }
}
/// Converts a PVA field into an [`EpicsValue`], when a mapping exists.
///
/// * A scalar is converted with `scalar_to_epics`.
/// * A structure is searched for its first field named `"value"`, which is
///   then converted recursively; a structure without one yields `None`.
/// * A scalar array is typed by its first element: `Double` produces a
///   `DoubleArray`, `Int` produces a `LongArray`. Elements whose variant
///   differs from the first are skipped. An empty array, or any other
///   element type, yields `None`.
/// * Every other field kind yields `None`.
fn pvfield_to_epics_value(field: &PvField) -> Option<EpicsValue> {
    match field {
        PvField::Scalar(scalar) => Some(scalar_to_epics(scalar)),
        PvField::Structure(structure) => structure
            .fields
            .iter()
            .find(|(name, _)| name == "value")
            .and_then(|(_, sub)| pvfield_to_epics_value(sub)),
        PvField::ScalarArray(items) => match items.first()? {
            ScalarValue::Double(_) => {
                let doubles = items
                    .iter()
                    .filter_map(|item| match item {
                        ScalarValue::Double(d) => Some(*d),
                        _ => None,
                    })
                    .collect::<Vec<f64>>();
                Some(EpicsValue::DoubleArray(doubles))
            }
            ScalarValue::Int(_) => {
                let longs = items
                    .iter()
                    .filter_map(|item| match item {
                        ScalarValue::Int(i) => Some(*i),
                        _ => None,
                    })
                    .collect::<Vec<i32>>();
                Some(EpicsValue::LongArray(longs))
            }
            _ => None,
        },
        _ => None,
    }
}
/// Maps a PVA scalar onto the closest [`EpicsValue`] variant.
///
/// Integer conversions that use `as` are lossy: a source value that does not
/// fit the target type is silently wrapped rather than rejected.
fn scalar_to_epics(sv: &ScalarValue) -> EpicsValue {
    match sv {
        // Payloads that carry over unchanged.
        ScalarValue::Double(v) => EpicsValue::Double(*v),
        ScalarValue::Float(v) => EpicsValue::Float(*v),
        ScalarValue::String(s) => EpicsValue::String(s.clone()),

        // Everything that ends up as `Long`.
        ScalarValue::Int(v) => EpicsValue::Long(*v),
        ScalarValue::Long(v) => EpicsValue::Long(*v as i32),
        ScalarValue::UInt(v) => EpicsValue::Long(*v as i32),
        ScalarValue::ULong(v) => EpicsValue::Long(*v as i32),
        // `true` becomes 1, `false` becomes 0.
        ScalarValue::Boolean(v) => EpicsValue::Long(i32::from(*v)),

        // Everything that ends up as `Short`.
        ScalarValue::Short(v) => EpicsValue::Short(*v),
        ScalarValue::UShort(v) => EpicsValue::Short(*v as i16),
        ScalarValue::UByte(v) => EpicsValue::Short(*v as i16),

        // Signed byte is reinterpreted as an unsigned char.
        ScalarValue::Byte(v) => EpicsValue::Char(*v as u8),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare scalar double converts to `EpicsValue::Double` with the same value.
    #[test]
    fn pvfield_scalar_to_epics_double() {
        let converted = pvfield_to_epics_value(&PvField::Scalar(ScalarValue::Double(2.5)));
        assert_eq!(converted, Some(EpicsValue::Double(2.5)));
    }

    /// A structure's `"value"` field is extracted and converted.
    #[test]
    fn pvfield_struct_with_value_extracts() {
        use epics_pva_rs::pvdata::PvStructure;

        let mut nt_scalar = PvStructure::new("epics:nt/NTScalar:1.0");
        let value_field = PvField::Scalar(ScalarValue::Long(42));
        nt_scalar.fields.push(("value".into(), value_field));

        let converted = pvfield_to_epics_value(&PvField::Structure(nt_scalar));
        assert_eq!(converted, Some(EpicsValue::Long(42)));
    }
}